Skip to content

Commit d408266

Browse files
committed
Improve error message when attempting to change consumer type
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
1 parent f2eb565 commit d408266

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

server/consumer.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2094,9 +2094,6 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
20942094
if cfg.FlowControl != ncfg.FlowControl {
20952095
return errors.New("flow control can not be updated")
20962096
}
2097-
if cfg.MaxWaiting != ncfg.MaxWaiting {
2098-
return errors.New("max waiting can not be updated")
2099-
}
21002097

21012098
// Deliver Subject is conditional on if its bound.
21022099
if cfg.DeliverSubject != ncfg.DeliverSubject {
@@ -2111,6 +2108,10 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
21112108
}
21122109
}
21132110

2111+
if cfg.MaxWaiting != ncfg.MaxWaiting {
2112+
return errors.New("max waiting can not be updated")
2113+
}
2114+
21142115
// Check if BackOff is defined, MaxDeliver is within range.
21152116
if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && lbo > ncfg.MaxDeliver {
21162117
return NewJSConsumerMaxDeliverBackoffError()

server/jetstream_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9279,6 +9279,47 @@ func TestJetStreamPullConsumerMaxWaiting(t *testing.T) {
92799279
}
92809280
}
92819281

9282+
func TestJetStreamChangeConsumerType(t *testing.T) {
9283+
s := RunBasicJetStreamServer(t)
9284+
defer s.Shutdown()
9285+
9286+
nc, js := jsClientConnect(t, s)
9287+
defer nc.Close()
9288+
9289+
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"test.*"}})
9290+
require_NoError(t, err)
9291+
9292+
// create pull consumer
9293+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
9294+
Name: "pull",
9295+
AckPolicy: nats.AckExplicitPolicy,
9296+
})
9297+
require_NoError(t, err)
9298+
9299+
// cannot update pull -> push
9300+
_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
9301+
Name: "pull",
9302+
AckPolicy: nats.AckExplicitPolicy,
9303+
DeliverSubject: "foo",
9304+
})
9305+
require_Contains(t, err.Error(), "can not update pull consumer to push based")
9306+
9307+
// create push consumer
9308+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
9309+
Name: "push",
9310+
AckPolicy: nats.AckExplicitPolicy,
9311+
DeliverSubject: "foo",
9312+
})
9313+
require_NoError(t, err)
9314+
9315+
// cannot change push -> pull
9316+
_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
9317+
Name: "push",
9318+
AckPolicy: nats.AckExplicitPolicy,
9319+
})
9320+
require_Contains(t, err.Error(), "can not update push consumer to pull based")
9321+
}
9322+
92829323
////////////////////////////////////////
92839324
// Benchmark placeholders
92849325
// TODO(dlc) - move

0 commit comments

Comments
 (0)