Skip to content

Commit 9ffe4e0

Browse files
Merge pull request #11 from code0-tech/3-rabbit-mq-messages
3 rabbit mq messages
2 parents c720b7a + b9f17e4 commit 9ffe4e0

File tree

10 files changed

+238
-279
lines changed

10 files changed

+238
-279
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ redis = { version = "0.29.0", features = [
1919
"json",
2020
] }
2121
serde_json = "1.0.138"
22+
serde = "1.0.138"
2223
lapin = "2.5.0"
24+
futures-lite = "2.6.0"
2325

2426
[dev-dependencies]
2527
testcontainers = "0.23.2"

src/flow_queue/connection.rs

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,56 @@
1-
use lapin::{Channel, Connection, ConnectionProperties};
2-
use std::sync::Arc;
3-
use tokio::sync::Mutex;
1+
use lapin::Connection;
42

5-
pub type FlowQueue = Arc<Mutex<Box<Connection>>>;
6-
7-
pub type FlowChannel = Arc<Mutex<Box<Channel>>>;
8-
9-
async fn build_connection(rabbitmq_url: &str) -> Connection {
10-
match Connection::connect(rabbitmq_url, ConnectionProperties::default()).await {
3+
pub async fn build_connection(rabbitmq_url: &str) -> Connection {
4+
match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await {
115
Ok(env) => env,
12-
Err(error) => panic!("Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}", error),
13-
}
14-
}
15-
16-
pub async fn create_flow_channel_connection(uri: &str) -> FlowChannel {
17-
let connection = build_connection(uri).await;
18-
19-
match connection.create_channel().await {
20-
Ok(channel) => Arc::new(Mutex::new(Box::new(channel))),
21-
Err(error) => panic!("Cannot create channel {:?}", error),
6+
Err(error) => panic!(
7+
"Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}",
8+
error
9+
),
2210
}
2311
}
2412

2513
#[cfg(test)]
2614
mod tests {
15+
use crate::flow_queue::connection::build_connection;
2716
use testcontainers::core::{IntoContainerPort, WaitFor};
2817
use testcontainers::runners::AsyncRunner;
2918
use testcontainers::GenericImage;
30-
use crate::flow_queue::connection::build_connection;
3119

3220
macro_rules! rabbitmq_container_test {
3321
($test_name:ident, $consumer:expr) => {
34-
3522
#[tokio::test]
3623
async fn $test_name() {
3724
let port: u16 = 5672;
3825
let image_name = "rabbitmq";
3926
let wait_message = "Server startup complete";
40-
27+
4128
let container = GenericImage::new(image_name, "latest")
4229
.with_exposed_port(port.tcp())
4330
.with_wait_for(WaitFor::message_on_stdout(wait_message))
4431
.start()
4532
.await
4633
.unwrap();
47-
34+
4835
let host_port = container.get_host_port_ipv4(port).await.unwrap();
4936
let url = format!("amqp://guest:guest@localhost:{}", host_port);
50-
37+
5138
$consumer(url).await;
5239
}
5340
};
5441
}
5542

56-
rabbitmq_container_test!(test_rabbitmq_startup, (|url: String| async move {
57-
println!("RabbitMQ started with the url: {}", url);
58-
}));
59-
60-
rabbitmq_container_test!(test_rabbitmq_connection, (|url: String| async move {
61-
build_connection(&*url).await;
62-
}));
63-
43+
rabbitmq_container_test!(
44+
test_rabbitmq_startup,
45+
(|url: String| async move {
46+
println!("RabbitMQ started with the url: {}", url);
47+
})
48+
);
49+
50+
rabbitmq_container_test!(
51+
test_rabbitmq_connection,
52+
(|url: String| async move {
53+
build_connection(&*url).await;
54+
})
55+
);
6456
}

src/flow_queue/delegate.rs

Lines changed: 0 additions & 55 deletions
This file was deleted.

src/flow_queue/handler.rs

Lines changed: 0 additions & 119 deletions
This file was deleted.

src/flow_queue/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,2 @@
11
pub mod connection;
2-
pub mod name;
3-
pub mod handler;
4-
pub mod delegate;
2+
pub mod service;

src/flow_queue/name.rs

Lines changed: 0 additions & 62 deletions
This file was deleted.

0 commit comments

Comments
 (0)