Skip to content

Commit 72f5e57

Browse files
committed
Evict oldest entry when the routed JS API queue is full
Signed-off-by: Neil Twigg <neil@nats.io>
1 parent 64074d1 commit 72f5e57

File tree

1 file changed

+24
-4
lines changed

1 file changed

+24
-4
lines changed

server/jetstream_api.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -886,12 +886,30 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
886886
// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
887887
// header from the msg body. No other references are needed.
888888
// Check pending and warn if getting backed up.
889-
pending, err := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
890-
if err == nil {
891-
atomic.AddInt64(&js.apiInflight, 1)
892-
}
893889
limit := atomic.LoadInt64(&js.queueLimit)
890+
retry:
891+
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
894892
if pending >= int(limit) {
893+
if _, ok := s.jsAPIRoutedReqs.popOne(); ok {
894+
// If we were able to take one of the oldest items off the queue, then
895+
// retry the insert.
896+
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping oldest request")
897+
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
898+
TypedEvent: TypedEvent{
899+
Type: JSAPILimitReachedAdvisoryType,
900+
ID: nuid.Next(),
901+
Time: time.Now().UTC(),
902+
},
903+
Server: s.Name(),
904+
Domain: js.config.Domain,
905+
Dropped: 1,
906+
})
907+
goto retry
908+
}
909+
910+
// It's likely not possible to get to this point, but if for some reason we have got here,
911+
// then something is wrong for us to be both over the limit but unable to pull entries, so
912+
// throw everything away and hope we recover from it.
895913
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
896914
drained := int64(s.jsAPIRoutedReqs.drain())
897915
atomic.AddInt64(&js.apiInflight, -drained)
@@ -906,6 +924,8 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
906924
Domain: js.config.Domain,
907925
Dropped: drained,
908926
})
927+
} else {
928+
atomic.StoreInt64(&js.apiInflight, int64(pending))
909929
}
910930
}
911931

0 commit comments

Comments
 (0)