Skip to content

Commit 52df7e6

Browse files
Improve error message when attempting to change consumer type (#6408)
Before this change, an attempt to change consumer type (by either setting or removing deliver subject), resulted in `max waiting can not be updated` error as `MaxWaiting` has a default value for pull consumers. This ensures that changing consumer type is checked before max waiting to return a more useful error message. Signed-off-by: Piotr Piotrowski [piotr@synadia.com](mailto:piotr@synadia.com)
2 parents 654d051 + d408266 commit 52df7e6

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

server/consumer.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2099,9 +2099,6 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
20992099
if cfg.FlowControl != ncfg.FlowControl {
21002100
return errors.New("flow control can not be updated")
21012101
}
2102-
if cfg.MaxWaiting != ncfg.MaxWaiting {
2103-
return errors.New("max waiting can not be updated")
2104-
}
21052102

21062103
// Deliver Subject is conditional on if its bound.
21072104
if cfg.DeliverSubject != ncfg.DeliverSubject {
@@ -2116,6 +2113,10 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
21162113
}
21172114
}
21182115

2116+
if cfg.MaxWaiting != ncfg.MaxWaiting {
2117+
return errors.New("max waiting can not be updated")
2118+
}
2119+
21192120
// Check if BackOff is defined, MaxDeliver is within range.
21202121
if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && lbo > ncfg.MaxDeliver {
21212122
return NewJSConsumerMaxDeliverBackoffError()

server/jetstream_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9279,6 +9279,47 @@ func TestJetStreamPullConsumerMaxWaiting(t *testing.T) {
92799279
}
92809280
}
92819281

9282+
func TestJetStreamChangeConsumerType(t *testing.T) {
9283+
s := RunBasicJetStreamServer(t)
9284+
defer s.Shutdown()
9285+
9286+
nc, js := jsClientConnect(t, s)
9287+
defer nc.Close()
9288+
9289+
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"test.*"}})
9290+
require_NoError(t, err)
9291+
9292+
// create pull consumer
9293+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
9294+
Name: "pull",
9295+
AckPolicy: nats.AckExplicitPolicy,
9296+
})
9297+
require_NoError(t, err)
9298+
9299+
// cannot update pull -> push
9300+
_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
9301+
Name: "pull",
9302+
AckPolicy: nats.AckExplicitPolicy,
9303+
DeliverSubject: "foo",
9304+
})
9305+
require_Contains(t, err.Error(), "can not update pull consumer to push based")
9306+
9307+
// create push consumer
9308+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
9309+
Name: "push",
9310+
AckPolicy: nats.AckExplicitPolicy,
9311+
DeliverSubject: "foo",
9312+
})
9313+
require_NoError(t, err)
9314+
9315+
// cannot change push -> pull
9316+
_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
9317+
Name: "push",
9318+
AckPolicy: nats.AckExplicitPolicy,
9319+
})
9320+
require_Contains(t, err.Error(), "can not update push consumer to pull based")
9321+
}
9322+
92829323
////////////////////////////////////////
92839324
// Benchmark placeholders
92849325
// TODO(dlc) - move

0 commit comments

Comments
 (0)