Skip to content

Commit 64074d1

Browse files
committed
Switch routed JS API queue to LIFO
Signed-off-by: Neil Twigg <neil@nats.io>
1 parent 52df7e6 commit 64074d1

File tree

2 files changed

+42
-6
lines changed

2 files changed

+42
-6
lines changed

server/ipqueue.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,43 @@ func (q *ipQueue[T]) popOne() (T, bool) {
207207
return e, true
208208
}
209209

210+
// Returns the last element from the queue, if any. See comment above
211+
// regarding calling after being notified that there is something and
212+
// the use of drain(). In short, the caller should always check the
213+
// boolean return value to ensure that the value is genuine and not a
214+
// default empty value.
215+
func (q *ipQueue[T]) popOneLast() (T, bool) {
216+
q.Lock()
217+
l := len(q.elts) - q.pos
218+
if l == 0 {
219+
q.Unlock()
220+
var empty T
221+
return empty, false
222+
}
223+
e := q.elts[len(q.elts)-1]
224+
q.elts = q.elts[:len(q.elts)-1]
225+
if l--; l > 0 {
226+
if q.calc != nil {
227+
q.sz -= q.calc(e)
228+
}
229+
// We need to re-signal
230+
select {
231+
case q.ch <- struct{}{}:
232+
default:
233+
}
234+
} else {
235+
// We have just emptied the queue, so we can reuse unless it is too big.
236+
if cap(q.elts) <= q.mrs {
237+
q.elts = q.elts[:0]
238+
} else {
239+
q.elts = nil
240+
}
241+
q.pos, q.sz = 0, 0
242+
}
243+
q.Unlock()
244+
return e, true
245+
}
246+
210247
// After a pop(), the slice can be recycled for the next push() when
211248
// a first element is added to the queue.
212249
// This will also decrement the "in progress" count with the length

server/jetstream_api.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -883,14 +883,13 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
883883
// If we are here we have received this request over a non-client connection.
884884
// We need to make sure not to block. We will send the request to a long-lived
885885
// pool of go routines.
886-
887-
// Increment inflight. Do this before queueing.
888-
atomic.AddInt64(&js.apiInflight, 1)
889-
890886
// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
891887
// header from the msg body. No other references are needed.
892888
// Check pending and warn if getting backed up.
893-
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
889+
pending, err := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
890+
if err == nil {
891+
atomic.AddInt64(&js.apiInflight, 1)
892+
}
894893
limit := atomic.LoadInt64(&js.queueLimit)
895894
if pending >= int(limit) {
896895
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
@@ -926,7 +925,7 @@ func (s *Server) processJSAPIRoutedRequests() {
926925
// Only pop one item at a time here, otherwise if the system is recovering
927926
// from queue buildup, then one worker will pull off all the tasks and the
928927
// others will be starved of work.
929-
for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
928+
for r, ok := queue.popOneLast(); ok && r != nil; r, ok = queue.popOneLast() {
930929
client.pa = r.pa
931930
start := time.Now()
932931
r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)

0 commit comments

Comments
 (0)