From b7527798f07143a7dfd251ef41bf5759d01a3c56 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Thu, 4 Sep 2025 12:43:05 +0300 Subject: [PATCH 1/4] fix: graceful shutdown Signed-off-by: Timur Tuktamyshev --- hooks/pods-hook.sh | 20 ++++++ pkg/debug/server.go | 65 +++++++++---------- pkg/kube_events_manager/factory.go | 1 + .../kube_events_manager.go | 14 ++-- pkg/kube_events_manager/monitor.go | 20 ------ pkg/kube_events_manager/namespace_informer.go | 4 -- pkg/kube_events_manager/resource_informer.go | 5 -- pkg/shell-operator/operator.go | 2 +- .../kube_event_manager_test.go | 2 +- 9 files changed, 58 insertions(+), 75 deletions(-) create mode 100755 hooks/pods-hook.sh diff --git a/hooks/pods-hook.sh b/hooks/pods-hook.sh new file mode 100755 index 00000000..2cfe50ef --- /dev/null +++ b/hooks/pods-hook.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +if [[ $1 == "--config" ]] ; then + cat < Date: Thu, 4 Sep 2025 13:00:17 +0300 Subject: [PATCH 2/4] fix: add more graceful shutdown Signed-off-by: Timur Tuktamyshev --- pkg/kube_events_manager/factory.go | 56 +++++++++++++++---- .../kube_events_manager.go | 13 +++++ pkg/kube_events_manager/monitor.go | 16 ++++++ pkg/kube_events_manager/namespace_informer.go | 14 ++++- pkg/kube_events_manager/resource_informer.go | 5 ++ pkg/shell-operator/operator.go | 11 ++++ 6 files changed, 104 insertions(+), 11 deletions(-) diff --git a/pkg/kube_events_manager/factory.go b/pkg/kube_events_manager/factory.go index 25b7f43f..66cc7a9a 100644 --- a/pkg/kube_events_manager/factory.go +++ b/pkg/kube_events_manager/factory.go @@ -36,38 +36,44 @@ type Factory struct { handlerRegistrations map[string]cache.ResourceEventHandlerRegistration ctx context.Context cancel context.CancelFunc + // done is closed when the underlying informer.Run returns + done chan struct{} } type FactoryStore struct { mu sync.Mutex - data map[FactoryIndex]Factory + cond *sync.Cond + data map[FactoryIndex]*Factory } func NewFactoryStore() *FactoryStore { - return &FactoryStore{ - data: make(map[FactoryIndex]Factory), + fs := &FactoryStore{ + data: make(map[FactoryIndex]*Factory), } + fs.cond = sync.NewCond(&fs.mu) + return fs } func (c *FactoryStore) Reset() { c.mu.Lock() defer c.mu.Unlock() - c.data = make(map[FactoryIndex]Factory) + c.data = make(map[FactoryIndex]*Factory) } func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) { ctx, cancel := context.WithCancel(context.Background()) - c.data[index] = Factory{ + c.data[index] = &Factory{ shared: f, handlerRegistrations: make(map[string]cache.ResourceEventHandlerRegistration), ctx: ctx, cancel: cancel, + done: nil, } log.Debug("Factory store: added a new factory for index", slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) } -func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory { +func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) *Factory { f, ok := c.data[index] if ok { log.Debug("Factory store: the factory with index found", @@ -115,9 +121,18 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna slog.Int("value", len(factory.handlerRegistrations)), slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) - if !informer.HasSynced() { - go informer.Run(factory.ctx.Done()) + // Ensure informer.Run is started once and tracked + if factory.done == nil { + factory.done = make(chan struct{}) + go func() { + informer.Run(factory.ctx.Done()) + close(factory.done) + log.Debug("Factory store: informer goroutine exited", + slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + }() + } + if !informer.HasSynced() { if err := wait.PollUntilContextCancel(ctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) { return informer.HasSynced(), nil }); err != nil { @@ -131,11 +146,10 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna func (c *FactoryStore) Stop(informerId string, index FactoryIndex) { c.mu.Lock() - defer c.mu.Unlock() - f, ok := c.data[index] if !ok { // already deleted + c.mu.Unlock() return } @@ -152,10 +166,32 @@ func (c *FactoryStore) Stop(informerId string, index FactoryIndex) { slog.Int("value", len(f.handlerRegistrations)), slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) if len(f.handlerRegistrations) == 0 { + log.Debug("Factory store: last handler removed, canceling shared informer", + slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + done := f.done f.cancel() + c.mu.Unlock() + if done != nil { + <-done + } + c.mu.Lock() delete(c.data, index) log.Debug("Factory store: deleted factory", slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + c.cond.Broadcast() + } + } + c.mu.Unlock() +} + +// WaitStopped blocks until there is no factory for the index +func (c *FactoryStore) WaitStopped(index FactoryIndex) { + c.mu.Lock() + for { + if _, ok := c.data[index]; !ok { + c.mu.Unlock() + return } + c.cond.Wait() } } diff --git a/pkg/kube_events_manager/kube_events_manager.go b/pkg/kube_events_manager/kube_events_manager.go index ee98af53..8e13a401 100644 --- a/pkg/kube_events_manager/kube_events_manager.go +++ b/pkg/kube_events_manager/kube_events_manager.go @@ -25,6 +25,7 @@ type KubeEventsManager interface { Ch() chan kemtypes.KubeEvent Stop() + Wait() } // kubeEventsManager is a main implementation of KubeEventsManager. @@ -143,6 +144,18 @@ func (mgr *kubeEventsManager) Stop() { mgr.cancel() } +func (mgr *kubeEventsManager) Wait() { + mgr.m.RLock() + monitors := make([]Monitor, 0, len(mgr.Monitors)) + for _, mon := range mgr.Monitors { + monitors = append(monitors, mon) + } + mgr.m.RUnlock() + for _, mon := range monitors { + mon.Wait() + } +} + func (mgr *kubeEventsManager) MetricStorage() metric.Storage { return mgr.metricStorage } diff --git a/pkg/kube_events_manager/monitor.go b/pkg/kube_events_manager/monitor.go index 7eafc881..9e252eb8 100644 --- a/pkg/kube_events_manager/monitor.go +++ b/pkg/kube_events_manager/monitor.go @@ -19,6 +19,7 @@ type Monitor interface { CreateInformers() error Start(context.Context) Stop() + Wait() Snapshot() []kemtypes.ObjectAndFilterResult EnableKubeEventCb() GetConfig() *MonitorConfig @@ -372,6 +373,21 @@ func (m *monitor) Stop() { } } +// Wait waits for all started informers to stop +func (m *monitor) Wait() { + for _, informer := range m.ResourceInformers { + informer.wait() + } + m.VaryingInformers.RangeValue(func(value []*resourceInformer) { + for _, informer := range value { + informer.wait() + } + }) + if m.NamespaceInformer != nil { + m.NamespaceInformer.wait() + } +} + func (m *monitor) SnapshotOperations() (*CachedObjectsInfo /*total*/, *CachedObjectsInfo /*last*/) { total := &CachedObjectsInfo{} last := &CachedObjectsInfo{} diff --git a/pkg/kube_events_manager/namespace_informer.go b/pkg/kube_events_manager/namespace_informer.go index 8bfde99f..5e0e8f3f 100644 --- a/pkg/kube_events_manager/namespace_informer.go +++ b/pkg/kube_events_manager/namespace_informer.go @@ -21,6 +21,7 @@ type namespaceInformer struct { ctx context.Context cancel context.CancelFunc stopped bool + done chan struct{} KubeClient *klient.Client Monitor *MonitorConfig @@ -128,13 +129,18 @@ func (ni *namespaceInformer) start() { return } cctx, cancel := context.WithCancel(ni.ctx) + ni.done = make(chan struct{}) go func() { <-ni.ctx.Done() ni.stopped = true cancel() }() - go ni.SharedInformer.Run(cctx.Done()) + go func() { + ni.SharedInformer.Run(cctx.Done()) + close(ni.done) + log.Debug("Namespace informer goroutine exited", slog.String("name", ni.Monitor.Metadata.DebugName)) + }() if err := wait.PollUntilContextCancel(cctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) { return ni.SharedInformer.HasSynced(), nil @@ -145,3 +151,9 @@ func (ni *namespaceInformer) start() { log.Debug("Informer is ready", slog.String("debugName", ni.Monitor.Metadata.DebugName)) } + +func (ni *namespaceInformer) wait() { + if ni.done != nil { + <-ni.done + } +} diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index 6a8f8ffe..ef9db494 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -469,6 +469,11 @@ func (ei *resourceInformer) start() { log.Debug("informer is ready", slog.String("debugName", ei.Monitor.Metadata.DebugName)) } +// wait blocks until the underlying shared informer for this FactoryIndex is stopped +func (ei *resourceInformer) wait() { + DefaultFactoryStore.WaitStopped(ei.FactoryIndex) +} + // CachedObjectsInfo returns info accumulated from start. func (ei *resourceInformer) getCachedObjectsInfo() CachedObjectsInfo { ei.cacheLock.RLock() diff --git a/pkg/shell-operator/operator.go b/pkg/shell-operator/operator.go index 356d6976..7988a6ed 100644 --- a/pkg/shell-operator/operator.go +++ b/pkg/shell-operator/operator.go @@ -960,9 +960,20 @@ func (op *ShellOperator) runMetrics() { // Shutdown pause kubernetes events handling and stop queues. Wait for queues to stop. func (op *ShellOperator) Shutdown() { + log.Info("shutdown: begin") op.ScheduleManager.Stop() + log.Info("shutdown: schedule manager stopped") + op.KubeEventsManager.Stop() + log.Info("shutdown: kube events manager canceled, waiting for informers") + if kem, ok := op.KubeEventsManager.(interface{ Wait() }); ok { + kem.Wait() + } + log.Info("shutdown: kube events manager done") + op.TaskQueues.Stop() + log.Info("shutdown: task queues stop signaled, waiting") // Wait for queues to stop, but no more than 10 seconds op.TaskQueues.WaitStopWithTimeout(WaitQueuesTimeout) + log.Info("shutdown: task queues stopped") } From cb63c6fa44b05b3545a0d1d361250390e039ddbd Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Thu, 4 Sep 2025 13:18:43 +0300 Subject: [PATCH 3/4] chore: verbose logging Signed-off-by: Timur Tuktamyshev --- hooks/pods-hook.sh | 20 ----------- pkg/debug/server.go | 65 ++++++++++++++++++---------------- pkg/shell-operator/operator.go | 16 ++++----- 3 files changed, 41 insertions(+), 60 deletions(-) delete mode 100755 hooks/pods-hook.sh diff --git a/hooks/pods-hook.sh b/hooks/pods-hook.sh deleted file mode 100755 index 2cfe50ef..00000000 --- a/hooks/pods-hook.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env bash - -if [[ $1 == "--config" ]] ; then - cat < Date: Thu, 4 Sep 2025 16:53:57 +0300 Subject: [PATCH 4/4] chore: finalize Signed-off-by: Timur Tuktamyshev --- pkg/shell-operator/operator.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/shell-operator/operator.go b/pkg/shell-operator/operator.go index eadb47b1..4c57464c 100644 --- a/pkg/shell-operator/operator.go +++ b/pkg/shell-operator/operator.go @@ -960,18 +960,18 @@ func (op *ShellOperator) runMetrics() { // Shutdown pause kubernetes events handling and stop queues. Wait for queues to stop. func (op *ShellOperator) Shutdown() { - fmt.Println("shutdown begin", slog.String("phase", "shutdown")) + op.logger.Info("shutdown begin", slog.String("phase", "shutdown")) op.ScheduleManager.Stop() - fmt.Println("schedule manager stopped", slog.String("phase", "shutdown")) + op.logger.Info("schedule manager stopped", slog.String("phase", "shutdown")) op.KubeEventsManager.Stop() - fmt.Println("waiting informers", slog.String("phase", "shutdown")) + op.logger.Info("waiting informers", slog.String("phase", "shutdown")) op.KubeEventsManager.Wait() - fmt.Println("informers stopped", slog.String("phase", "shutdown")) + op.logger.Info("informers stopped", slog.String("phase", "shutdown")) op.TaskQueues.Stop() - fmt.Println("waiting task queues", slog.String("phase", "shutdown")) + op.logger.Info("waiting task queues", slog.String("phase", "shutdown")) // Wait for queues to stop, but no more than 10 seconds op.TaskQueues.WaitStopWithTimeout(WaitQueuesTimeout) - fmt.Println("task queues stopped", slog.String("phase", "shutdown")) + op.logger.Info("task queues stopped", slog.String("phase", "shutdown")) }