Skip to content

Commit e05e949

Browse files
authored
Detect zombie consumer (#390)
* Detect zombie consumer * With a load balancer configuration, the disconnection can take time due to heartbeat In this pr, the client detects possible zombie consumers. --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 72bff1c commit e05e949

File tree

6 files changed

+53
-18
lines changed

6 files changed

+53
-18
lines changed

pkg/ha/ha_consumer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ func (c *ReliableConsumer) GetStatusAsString() string {
4646
func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) {
4747
go func() {
4848
event := <-channelClose
49-
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) {
49+
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) || strings.EqualFold(event.Reason, stream.ZombieConsumer) {
5050
c.setStatus(StatusReconnecting)
51-
logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo())
51+
logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", c.getInfo(), event.Reason)
5252
c.bootstrap = false
5353
err, reconnected := retry(1, c)
5454
if err != nil {

pkg/stream/client.go

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -487,16 +487,19 @@ func (c *Client) Close() error {
487487
}
488488
}
489489

490-
for _, cs := range c.coordinator.consumers {
491-
err := c.coordinator.RemoveConsumerById(cs.(*Consumer).ID, Event{
492-
Command: CommandClose,
493-
StreamName: cs.(*Consumer).GetStreamName(),
494-
Name: cs.(*Consumer).GetName(),
495-
Reason: SocketClosed,
496-
Err: nil,
497-
})
498-
if err != nil {
499-
logs.LogWarn("error removing consumer: %s", err)
490+
for _, cs := range c.coordinator.GetConsumers() {
491+
if cs != nil {
492+
err := c.coordinator.RemoveConsumerById(cs.(*Consumer).ID, Event{
493+
Command: CommandClose,
494+
StreamName: cs.(*Consumer).GetStreamName(),
495+
Name: cs.(*Consumer).GetName(),
496+
Reason: SocketClosed,
497+
Err: nil,
498+
})
499+
500+
if err != nil {
501+
logs.LogWarn("error removing consumer: %s", err)
502+
}
500503
}
501504
}
502505
if c.getSocket().isOpen() {
@@ -1019,10 +1022,30 @@ func (c *Client) declareSubscriber(streamName string,
10191022
}
10201023
}
10211024

1022-
case <-time.After(consumer.options.autoCommitStrategy.flushInterval):
1023-
consumer.cacheStoreOffset()
1025+
case <-time.After(1_000 * time.Millisecond):
1026+
if consumer.options.autocommit && time.Since(consumer.getLastAutoCommitStored()) >= consumer.options.autoCommitStrategy.flushInterval {
1027+
consumer.cacheStoreOffset()
1028+
}
1029+
1030+
// This is a very edge case where the consumer is not active anymore
1031+
// but the consumer is still in the list of consumers
1032+
// It can happen during the reconnection with load-balancing
1033+
// found this problem with a caos test where random killing the load-balancer and node where
1034+
// the client should be connected
1035+
if consumer.isZombie() {
1036+
logs.LogWarn("Detected zombie consumer for stream %s, closing", streamName)
1037+
consumer.close(Event{
1038+
Command: CommandUnsubscribe,
1039+
StreamName: consumer.GetStreamName(),
1040+
Name: consumer.GetName(),
1041+
Reason: ZombieConsumer,
1042+
Err: nil,
1043+
})
1044+
return
1045+
}
10241046
}
10251047
}
1048+
10261049
}()
10271050
return consumer, err.Err
10281051
}

pkg/stream/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ const (
114114
defaultConfirmationTimeOut = 10 * time.Second
115115
//
116116

117+
ZombieConsumer = "zombie-consumer"
117118
SocketClosed = "socket client closed"
118119
MetaDataUpdate = "metadata Data update"
119120
LeaderLocatorBalanced = "balanced"

pkg/stream/consumer.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ func (consumer *Consumer) getStatus() int {
5454
return consumer.status
5555
}
5656

57+
func (consumer *Consumer) isZombie() bool {
58+
consumer.mutex.Lock()
59+
defer consumer.mutex.Unlock()
60+
return consumer.status == open && !consumer.options.client.socket.isOpen()
61+
}
62+
5763
func (consumer *Consumer) GetStreamName() string {
5864
if consumer.options == nil {
5965
return ""
@@ -341,9 +347,9 @@ func (consumer *Consumer) close(reason Event) error {
341347
consumer.closeHandler = nil
342348
}
343349

344-
close(consumer.chunkForConsumer)
345-
346350
if consumer.response.data != nil {
351+
close(consumer.chunkForConsumer)
352+
347353
close(consumer.response.data)
348354
consumer.response.data = nil
349355
}

pkg/stream/coordinator.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,14 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event)
8989
return consumer.close(reason)
9090

9191
}
92-
func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error {
92+
func (coordinator *Coordinator) GetConsumers() map[interface{}]interface{} {
93+
coordinator.mutex.Lock()
94+
defer coordinator.mutex.Unlock()
95+
return coordinator.consumers
9396

97+
}
98+
99+
func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error {
94100
producer, err := coordinator.ExtractProducerById(id)
95101
if err != nil {
96102
return err

pkg/stream/producer_unconfirmed.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ func newUnConfirmed(maxSize int) *unConfirmed {
3434
func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID uint8) {
3535

3636
if u.size() > u.maxSize {
37-
logs.LogDebug("unConfirmed size: %d reached, producer blocked", u.maxSize)
3837
u.blockSignal.L.Lock()
3938
u.blockSignal.Wait()
4039
u.blockSignal.L.Unlock()

0 commit comments

Comments
 (0)