Skip to content

Commit 7cd19db

Browse files
committed
Improve tracking of apiInflight metric, add IPQ tests
Signed-off-by: Neil Twigg <neil@nats.io>
1 parent 72f5e57 commit 7cd19db

File tree

2 files changed

+129
-5
lines changed

2 files changed

+129
-5
lines changed

server/ipqueue_test.go

Lines changed: 126 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,99 @@ func TestIPQueuePopOne(t *testing.T) {
250250
q.recycle(&values)
251251
}
252252

253+
func TestIPQueuePopOneLast(t *testing.T) {
254+
s := &Server{}
255+
q := newIPQueue[int](s, "test")
256+
q.push(1)
257+
<-q.ch
258+
e, ok := q.popOneLast()
259+
if !ok {
260+
t.Fatal("Got nil")
261+
}
262+
if i := e; i != 1 {
263+
t.Fatalf("Expected 1, got %v", i)
264+
}
265+
if l := q.len(); l != 0 {
266+
t.Fatalf("Expected len to be 0, got %v", l)
267+
}
268+
// That does not affect the number of notProcessed
269+
if n := q.inProgress(); n != 0 {
270+
t.Fatalf("Expected count to be 0, got %v", n)
271+
}
272+
select {
273+
case <-q.ch:
274+
t.Fatalf("Should not have been notified of addition")
275+
default:
276+
// OK
277+
}
278+
q.push(2)
279+
q.push(3)
280+
e, ok = q.popOneLast()
281+
if !ok {
282+
t.Fatal("Got nil")
283+
}
284+
if i := e; i != 3 {
285+
t.Fatalf("Expected 3, got %v", i)
286+
}
287+
if l := q.len(); l != 1 {
288+
t.Fatalf("Expected len to be 1, got %v", l)
289+
}
290+
select {
291+
case <-q.ch:
292+
// OK
293+
default:
294+
t.Fatalf("Should have been notified that there is more")
295+
}
296+
e, ok = q.popOneLast()
297+
if !ok {
298+
t.Fatal("Got nil")
299+
}
300+
if i := e; i != 2 {
301+
t.Fatalf("Expected 2, got %v", i)
302+
}
303+
if l := q.len(); l != 0 {
304+
t.Fatalf("Expected len to be 0, got %v", l)
305+
}
306+
select {
307+
case <-q.ch:
308+
t.Fatalf("Should not have been notified that there is more")
309+
default:
310+
// OK
311+
}
312+
// Calling it again now that we know there is nothing, we
313+
// should get nil.
314+
if e, ok = q.popOneLast(); ok {
315+
t.Fatalf("Expected nil, got %v", e)
316+
}
317+
318+
q = newIPQueue[int](s, "test2")
319+
q.push(1)
320+
q.push(2)
321+
// Capture current capacity
322+
q.Lock()
323+
c := cap(q.elts)
324+
q.Unlock()
325+
e, ok = q.popOneLast()
326+
if !ok || e != 2 {
327+
t.Fatalf("Invalid value: %v", e)
328+
}
329+
if l := q.len(); l != 1 {
330+
t.Fatalf("Expected len to be 1, got %v", l)
331+
}
332+
values := q.pop()
333+
if len(values) != 1 || values[0] != 1 {
334+
t.Fatalf("Unexpected values: %v", values)
335+
}
336+
if cap(values) != c {
337+
t.Fatalf("Unexpected capacity: %v vs %v", cap(values), c)
338+
}
339+
if l := q.len(); l != 0 {
340+
t.Fatalf("Expected len to be 0, got %v", l)
341+
}
342+
// Just make sure that this is ok...
343+
q.recycle(&values)
344+
}
345+
253346
func TestIPQueueMultiProducers(t *testing.T) {
254347
s := &Server{}
255348
q := newIPQueue[int](s, "test")
@@ -382,7 +475,36 @@ func TestIPQueueDrain(t *testing.T) {
382475
}
383476
}
384477

385-
func TestIPQueueSizeCalculation(t *testing.T) {
478+
func TestIPQueueSizeCalculationPopOne(t *testing.T) {
479+
type testType = [16]byte
480+
var testValue testType
481+
482+
calc := ipqSizeCalculation[testType](func(e testType) uint64 {
483+
return uint64(len(e))
484+
})
485+
s := &Server{}
486+
q := newIPQueue[testType](s, "test", calc)
487+
488+
for i := 0; i < 10; i++ {
489+
testValue[0] = byte(i)
490+
q.push(testValue)
491+
require_Equal(t, q.len(), i+1)
492+
require_Equal(t, q.size(), uint64(i+1)*uint64(len(testValue)))
493+
}
494+
495+
for i := 10; i > 5; i-- {
496+
v, _ := q.popOne()
497+
require_Equal(t, 10-v[0], byte(i))
498+
require_Equal(t, q.len(), i-1)
499+
require_Equal(t, q.size(), uint64(i-1)*uint64(len(testValue)))
500+
}
501+
502+
q.pop()
503+
require_Equal(t, q.len(), 0)
504+
require_Equal(t, q.size(), 0)
505+
}
506+
507+
func TestIPQueueSizeCalculationPopOneLast(t *testing.T) {
386508
type testType = [16]byte
387509
var testValue testType
388510

@@ -393,13 +515,15 @@ func TestIPQueueSizeCalculation(t *testing.T) {
393515
q := newIPQueue[testType](s, "test", calc)
394516

395517
for i := 0; i < 10; i++ {
518+
testValue[0] = byte(i)
396519
q.push(testValue)
397520
require_Equal(t, q.len(), i+1)
398521
require_Equal(t, q.size(), uint64(i+1)*uint64(len(testValue)))
399522
}
400523

401524
for i := 10; i > 5; i-- {
402-
q.popOne()
525+
v, _ := q.popOneLast()
526+
require_Equal(t, v[0]+1, byte(i))
403527
require_Equal(t, q.len(), i-1)
404528
require_Equal(t, q.size(), uint64(i-1)*uint64(len(testValue)))
405529
}

server/jetstream_api.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -888,12 +888,14 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
888888
// Check pending and warn if getting backed up.
889889
limit := atomic.LoadInt64(&js.queueLimit)
890890
retry:
891+
atomic.AddInt64(&js.apiInflight, 1)
891892
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
892893
if pending >= int(limit) {
893894
if _, ok := s.jsAPIRoutedReqs.popOne(); ok {
894895
// If we were able to take one of the oldest items off the queue, then
895896
// retry the insert.
896897
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping oldest request")
898+
atomic.AddInt64(&js.apiInflight, -1)
897899
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
898900
TypedEvent: TypedEvent{
899901
Type: JSAPILimitReachedAdvisoryType,
@@ -910,8 +912,8 @@ retry:
910912
// It's likely not possible to get to this point, but if for some reason we have got here,
911913
// then something is wrong for us to be both over the limit but unable to pull entries, so
912914
// throw everything away and hope we recover from it.
913-
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
914915
drained := int64(s.jsAPIRoutedReqs.drain())
916+
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", drained)
915917
atomic.AddInt64(&js.apiInflight, -drained)
916918

917919
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
@@ -924,8 +926,6 @@ retry:
924926
Domain: js.config.Domain,
925927
Dropped: drained,
926928
})
927-
} else {
928-
atomic.StoreInt64(&js.apiInflight, int64(pending))
929929
}
930930
}
931931

0 commit comments

Comments
 (0)