Skip to content

Commit 1b0da8b

Browse files
committed
feat: added function without timeout
1 parent 9ffe4e0 commit 1b0da8b

File tree

1 file changed

+53
-42
lines changed

1 file changed

+53
-42
lines changed

src/flow_queue/service.rs

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,11 @@ impl RabbitmqClient {
122122
}
123123

124124
// Receive messages from a queue
125-
pub async fn await_message(
125+
// Receive messages from a queue with no timeout
126+
pub async fn await_message_no_timeout(
126127
&self,
127128
queue_name: &str,
128129
message_id: String,
129-
timeout: Duration,
130130
ack_on_success: bool,
131131
) -> Result<Message, RabbitMqError> {
132132
let mut consumer = {
@@ -149,53 +149,64 @@ impl RabbitmqClient {
149149

150150
debug!("Starting to consume from {}", queue_name);
151151

152-
// Create a future for the next message
153-
let receive_future = async {
154-
while let Some(delivery_result) = consumer.next().await {
155-
let delivery = match delivery_result {
156-
Ok(del) => del,
157-
Err(_) => return Err(RabbitMqError::DeserializationError),
158-
};
159-
let data = &delivery.data;
160-
let message_str = match std::str::from_utf8(&data) {
161-
Ok(str) => str,
162-
Err(_) => {
163-
return Err(RabbitMqError::DeserializationError);
164-
}
165-
};
166-
167-
debug!("Received message: {}", message_str);
168-
169-
// Parse the message
170-
let message = match serde_json::from_str::<Message>(message_str) {
171-
Ok(m) => m,
172-
Err(e) => {
173-
log::error!("Failed to parse message: {}", e);
174-
return Err(RabbitMqError::DeserializationError);
175-
}
176-
};
177-
178-
if message.message_id == message_id {
179-
if ack_on_success {
180-
delivery
181-
.ack(lapin::options::BasicAckOptions::default())
182-
.await
183-
.expect("Failed to acknowledge message");
184-
}
185-
186-
return Ok(message);
152+
while let Some(delivery_result) = consumer.next().await {
153+
let delivery = match delivery_result {
154+
Ok(del) => del,
155+
Err(_) => return Err(RabbitMqError::DeserializationError),
156+
};
157+
let data = &delivery.data;
158+
let message_str = match std::str::from_utf8(&data) {
159+
Ok(str) => str,
160+
Err(_) => {
161+
return Err(RabbitMqError::DeserializationError);
162+
}
163+
};
164+
165+
debug!("Received message: {}", message_str);
166+
167+
// Parse the message
168+
let message = match serde_json::from_str::<Message>(message_str) {
169+
Ok(m) => m,
170+
Err(e) => {
171+
log::error!("Failed to parse message: {}", e);
172+
return Err(RabbitMqError::DeserializationError);
187173
}
174+
};
175+
176+
if message.message_id == message_id {
177+
if ack_on_success {
178+
delivery
179+
.ack(lapin::options::BasicAckOptions::default())
180+
.await
181+
.expect("Failed to acknowledge message");
182+
}
183+
184+
return Ok(message);
188185
}
189-
Err(RabbitMqError::DeserializationError)
190-
};
186+
}
187+
Err(RabbitMqError::DeserializationError)
188+
}
191189

192-
// Set a timeout of 10 seconds
193-
match tokio::time::timeout(timeout, receive_future).await {
190+
// Receive messages from a queue with timeout
191+
pub async fn await_message(
192+
&self,
193+
queue_name: &str,
194+
message_id: String,
195+
timeout: Duration,
196+
ack_on_success: bool,
197+
) -> Result<Message, RabbitMqError> {
198+
// Set a timeout
199+
match tokio::time::timeout(
200+
timeout,
201+
self.await_message_no_timeout(queue_name, message_id, ack_on_success),
202+
)
203+
.await
204+
{
194205
Ok(result) => result,
195206
Err(_) => {
196207
debug!(
197208
"Timeout waiting for message after {} seconds",
198-
timeout.as_millis() / 1000
209+
timeout.as_secs()
199210
);
200211
Err(RabbitMqError::TimeoutError)
201212
}

0 commit comments

Comments
 (0)