Skip to content

Commit 625d1da

Browse files
committed
Add heartbeat timer reset after it occurs
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
1 parent 516c40a commit 625d1da

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

async-nats/src/jetstream/consumer/push.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,10 @@ impl futures::Stream for Messages {
132132
.poll_unpin(cx)
133133
{
134134
Poll::Ready(_) => {
135+
self.heartbeat_sleep = None;
135136
return Poll::Ready(Some(Err(MessagesError::new(
136137
MessagesErrorKind::MissingHeartbeat,
137-
))))
138+
))));
138139
}
139140
Poll::Pending => (),
140141
}
@@ -569,9 +570,10 @@ impl<'a> futures::Stream for Ordered<'a> {
569570
.poll_unpin(cx)
570571
{
571572
Poll::Ready(_) => {
573+
self.heartbeat_sleep = None;
572574
return Poll::Ready(Some(Err(OrderedError::new(
573575
OrderedErrorKind::MissingHeartbeat,
574-
))))
576+
))));
575577
}
576578
Poll::Pending => (),
577579
}

async-nats/tests/jetstream_tests.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1650,6 +1650,17 @@ mod jetstream {
16501650
messages.next().await.unwrap().unwrap_err().kind(),
16511651
async_nats::jetstream::consumer::push::MessagesErrorKind::MissingHeartbeat
16521652
);
1653+
stream
1654+
.create_consumer(consumer::push::Config {
1655+
deliver_subject: "delivery".to_string(),
1656+
durable_name: Some("delete_me".to_string()),
1657+
idle_heartbeat: Duration::from_secs(5),
1658+
..Default::default()
1659+
})
1660+
.await
1661+
.unwrap();
1662+
1663+
messages.next().await.unwrap().unwrap();
16531664
}
16541665

16551666
#[tokio::test]

0 commit comments

Comments
 (0)