Skip to content

Commit 3d8eb31

Browse files
committed
fix: lifetime identifier was missing
1 parent b2f1630 commit 3d8eb31

File tree

2 files changed

+27
-41
lines changed

2 files changed

+27
-41
lines changed

src/flow_queue/delegate.rs

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,48 +3,37 @@ use lapin::ConsumerDelegate;
33
use log::debug;
44
use std::future::Future;
55
use std::pin::Pin;
6+
use std::sync::Arc;
67

78
/// Delegate trait to implement.
8-
///
9-
/// Use as delegate for RabbitMQ
10-
///
11-
/// # Example
12-
/// ```
13-
/// use lapin::message::Delivery;
14-
/// use code0_flow::flow_queue::delegate::Delegate;
15-
///
16-
/// struct HttpDelegate;
17-
///
18-
/// impl Delegate for HttpDelegate {
19-
/// fn handle_delivery(&self, delivery: Delivery) {
20-
/// todo!("Handle delivery!")
21-
/// }
22-
/// }
23-
/// ```
24-
pub trait Delegate {
9+
pub trait Delegate: Sync + Send {
2510
fn handle_delivery(&self, delivery: Delivery);
2611
}
2712

2813
pub struct QueueDelegate<T: Delegate> {
29-
pub delegate: T,
14+
pub delegate: Arc<T>, // Use Arc for safe ownership transfer
3015
}
3116

3217
impl<T: Delegate> QueueDelegate<T> {
3318
pub fn new(delegate: T) -> Self {
34-
QueueDelegate { delegate }
19+
QueueDelegate {
20+
delegate: Arc::new(delegate),
21+
}
3522
}
3623

3724
pub fn deliver(&self, delivery: Delivery) {
3825
self.delegate.handle_delivery(delivery);
3926
}
4027
}
4128

42-
impl<T: Delegate> ConsumerDelegate for QueueDelegate<T> {
29+
impl<T: Delegate + Send + Sync + 'static> ConsumerDelegate for QueueDelegate<T> {
4330
fn on_new_delivery(
4431
&self,
4532
delivery: DeliveryResult,
46-
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
47-
async move {
33+
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
34+
let delegate = Arc::clone(&self.delegate);
35+
36+
Box::pin(async move {
4837
let optional_delivery = match delivery {
4938
Ok(option) => option,
5039
Err(_) => return,
@@ -54,15 +43,13 @@ impl<T: Delegate> ConsumerDelegate for QueueDelegate<T> {
5443
None => return,
5544
};
5645

57-
self.delegate.handle_delivery(delivery);
58-
}
46+
delegate.handle_delivery(delivery); // Use cloned delegate
47+
})
5948
}
6049

61-
fn drop_prefetched_messages(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
62-
let future = async move {
50+
fn drop_prefetched_messages(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
51+
Box::pin(async move {
6352
debug!("Dropping prefetched messages...");
64-
};
65-
66-
Box::pin(future)
53+
})
6754
}
6855
}

src/flow_queue/handler.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
1-
use crate::flow_queue::connection::{get_flow_channel, FlowChannel, FlowQueue};
1+
use crate::flow_queue::connection::FlowChannel;
22
use crate::flow_queue::delegate::{Delegate, QueueDelegate};
3-
use crate::flow_queue::name::{QueueName, QueuePrefix, QueueProtocol};
4-
use lapin::message::{Delivery, DeliveryResult};
3+
use crate::flow_queue::name::QueueName;
54
use lapin::options::{BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions};
6-
use lapin::protocol::basic::gen_return;
75
use lapin::types::FieldTable;
8-
use lapin::{ConsumerDelegate, Error};
9-
use log::{debug, error, info};
10-
use std::future::Future;
11-
use std::pin::Pin;
6+
use lapin::Error;
7+
use log::info;
8+
use std::sync::Arc;
129

1310
/// # Declares all given queues
1411
///
@@ -72,7 +69,7 @@ pub async fn send_message(
7269
/// - delegate: Consumer delegate of the message
7370
///
7471
/// # Example
75-
/// ```
72+
/// ``` ignore
7673
/// use lapin::message::Delivery;
7774
/// use code0_flow::flow_queue::delegate::Delegate;
7875
/// use code0_flow::flow_queue::connection::get_flow_channel;
@@ -98,15 +95,15 @@ pub async fn send_message(
9895
/// consume_message(channel, queue_name, HttpDelegate).await;
9996
/// }
10097
/// ```
101-
pub async fn consume_message<T: Delegate>(
98+
pub async fn consume_message<T: Delegate + 'static>(
10299
channel: FlowChannel,
103100
queue_name: QueueName,
104101
delegate: T,
105102
) {
106103
let name = queue_name.prefix + queue_name.protocol;
107104
let channel_arc = channel.lock().await;
108105

109-
let mut consumer = channel_arc
106+
let consumer = channel_arc
110107
.basic_consume(
111108
&*name,
112109
"",
@@ -116,5 +113,7 @@ pub async fn consume_message<T: Delegate>(
116113
.await
117114
.unwrap();
118115

119-
consumer.set_delegate(QueueDelegate { delegate });
116+
consumer.set_delegate(QueueDelegate {
117+
delegate: Arc::new(delegate),
118+
});
120119
}

0 commit comments

Comments
 (0)