57 changes: 47 additions & 10 deletions pkg/kube_events_manager/factory.go
@@ -36,38 +36,44 @@ type Factory struct {
handlerRegistrations map[string]cache.ResourceEventHandlerRegistration
ctx context.Context
cancel context.CancelFunc
// done is closed when the underlying informer.Run returns
done chan struct{}
}

type FactoryStore struct {
mu sync.Mutex
data map[FactoryIndex]Factory
cond *sync.Cond
data map[FactoryIndex]*Factory
}

func NewFactoryStore() *FactoryStore {
return &FactoryStore{
data: make(map[FactoryIndex]Factory),
fs := &FactoryStore{
data: make(map[FactoryIndex]*Factory),
}
fs.cond = sync.NewCond(&fs.mu)
return fs
}

func (c *FactoryStore) Reset() {
c.mu.Lock()
defer c.mu.Unlock()
c.data = make(map[FactoryIndex]Factory)
c.data = make(map[FactoryIndex]*Factory)
}

func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) {
ctx, cancel := context.WithCancel(context.Background())
c.data[index] = Factory{
c.data[index] = &Factory{
shared: f,
handlerRegistrations: make(map[string]cache.ResourceEventHandlerRegistration),
ctx: ctx,
cancel: cancel,
done: nil,
}
log.Debug("Factory store: added a new factory for index",
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
}

func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory {
func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) *Factory {
f, ok := c.data[index]
if ok {
log.Debug("Factory store: the factory with index found",
@@ -115,9 +121,18 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna
slog.Int("value", len(factory.handlerRegistrations)),
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))

if !informer.HasSynced() {
go informer.Run(factory.ctx.Done())
// Ensure informer.Run is started once and tracked
if factory.done == nil {
factory.done = make(chan struct{})
go func() {
informer.Run(factory.ctx.Done())
close(factory.done)
log.Debug("Factory store: informer goroutine exited",
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
}()
}

if !informer.HasSynced() {
if err := wait.PollUntilContextCancel(ctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) {
return informer.HasSynced(), nil
}); err != nil {
@@ -131,11 +146,10 @@ func (c *FactoryStore) Stop(informerId string, index FactoryIndex) {

func (c *FactoryStore) Stop(informerId string, index FactoryIndex) {
c.mu.Lock()
defer c.mu.Unlock()

f, ok := c.data[index]
if !ok {
// already deleted
c.mu.Unlock()
return
}

@@ -146,15 +160,38 @@ func (c *FactoryStore) Stop(informerId string, index FactoryIndex) {
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()),
log.Err(err))
}

delete(f.handlerRegistrations, informerId)
log.Debug("Factory store: decreased usage counter of the factory",
slog.Int("value", len(f.handlerRegistrations)),
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
if len(f.handlerRegistrations) == 0 {
log.Debug("Factory store: last handler removed, canceling shared informer",
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
done := f.done
f.cancel()
c.mu.Unlock()
if done != nil {
<-done
}
c.mu.Lock()
delete(c.data, index)
log.Debug("Factory store: deleted factory",
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
c.cond.Broadcast()
}
}
c.mu.Unlock()
}

// WaitStopped blocks until there is no factory for the index
func (c *FactoryStore) WaitStopped(index FactoryIndex) {
c.mu.Lock()
for {
if _, ok := c.data[index]; !ok {
c.mu.Unlock()
return
}
c.cond.Wait()
}
}
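The Stop/WaitStopped pair above is a done-channel plus sync.Cond handshake: Stop cancels the factory's context, releases the store mutex while waiting for the informer goroutine to exit, and only then removes the entry and broadcasts, so WaitStopped callers wake up once the shared informer is fully gone. Below is a minimal, self-contained sketch of that coordination; a hypothetical worker stands in for informer.Run, and the store, worker, and key names are illustrative, not part of this package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker stands in for a shared informer: cancel stops it,
// done is closed when its goroutine returns.
type worker struct {
	cancel context.CancelFunc
	done   chan struct{}
}

type store struct {
	mu   sync.Mutex
	cond *sync.Cond
	data map[string]*worker
}

func newStore() *store {
	s := &store{data: make(map[string]*worker)}
	s.cond = sync.NewCond(&s.mu)
	return s
}

func (s *store) start(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	ctx, cancel := context.WithCancel(context.Background())
	w := &worker{cancel: cancel, done: make(chan struct{})}
	s.data[key] = w
	go func() {
		defer close(w.done) // signal exit exactly once
		<-ctx.Done()        // the real code runs informer.Run here
	}()
}

func (s *store) stop(key string) {
	s.mu.Lock()
	w, ok := s.data[key]
	if !ok {
		s.mu.Unlock()
		return
	}
	done := w.done
	w.cancel()
	s.mu.Unlock() // do not hold the lock while waiting
	<-done
	s.mu.Lock()
	delete(s.data, key)
	s.cond.Broadcast() // wake every waitStopped caller
	s.mu.Unlock()
}

// waitStopped blocks until no worker exists for key.
func (s *store) waitStopped(key string) {
	s.mu.Lock()
	for {
		if _, ok := s.data[key]; !ok {
			s.mu.Unlock()
			return
		}
		s.cond.Wait() // releases mu while sleeping, reacquires on wake
	}
}

func main() {
	s := newStore()
	s.start("pods")
	go func() {
		time.Sleep(10 * time.Millisecond)
		s.stop("pods")
	}()
	s.waitStopped("pods")
	fmt.Println("worker fully stopped")
}
```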
23 changes: 15 additions & 8 deletions pkg/kube_events_manager/kube_events_manager.go
@@ -24,7 +24,8 @@ type KubeEventsManager interface {
StopMonitor(monitorID string) error

Ch() chan kemtypes.KubeEvent
PauseHandleEvents()
Stop()
Wait()
}

// kubeEventsManager is a main implementation of KubeEventsManager.
@@ -138,14 +139,20 @@ func (mgr *kubeEventsManager) Ch() chan kemtypes.KubeEvent {
return mgr.KubeEventCh
}

// PauseHandleEvents set flags for all informers to ignore incoming events.
// Useful for shutdown without panicking.
// Calling cancel() leads to a race and panicking, see https://github.com/kubernetes/kubernetes/issues/59822
func (mgr *kubeEventsManager) PauseHandleEvents() {
// Stop stops the kube events manager and all informers inside monitors.
func (mgr *kubeEventsManager) Stop() {
mgr.cancel()
}

func (mgr *kubeEventsManager) Wait() {
mgr.m.RLock()
defer mgr.m.RUnlock()
for _, monitor := range mgr.Monitors {
monitor.PauseHandleEvents()
monitors := make([]Monitor, 0, len(mgr.Monitors))
for _, mon := range mgr.Monitors {
monitors = append(monitors, mon)
}
mgr.m.RUnlock()
for _, mon := range monitors {
mon.Wait()
}
}
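Wait above copies the monitor list under the read lock and blocks only after releasing it, so a slow informer shutdown cannot hold mgr.m and stall concurrent StopMonitor calls. A minimal sketch of that snapshot-then-wait pattern, with illustrative manager and waiter types:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waiter stands in for a Monitor; Wait blocks until done is closed.
type waiter struct{ done chan struct{} }

func (w *waiter) Wait() { <-w.done }

type manager struct {
	mu       sync.RWMutex
	monitors map[string]*waiter
}

// WaitAll snapshots the monitors under the read lock,
// then waits outside the lock so writers are not starved.
func (m *manager) WaitAll() {
	m.mu.RLock()
	snapshot := make([]*waiter, 0, len(m.monitors))
	for _, w := range m.monitors {
		snapshot = append(snapshot, w)
	}
	m.mu.RUnlock()
	for _, w := range snapshot {
		w.Wait()
	}
}

func main() {
	w := &waiter{done: make(chan struct{})}
	m := &manager{monitors: map[string]*waiter{"pods": w}}
	go func() {
		time.Sleep(time.Millisecond)
		close(w.done) // the monitor finishes shutting down
	}()
	m.WaitAll()
	fmt.Println("all monitors stopped")
}
```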

16 changes: 6 additions & 10 deletions pkg/kube_events_manager/monitor.go
@@ -19,7 +19,7 @@ type Monitor interface {
CreateInformers() error
Start(context.Context)
Stop()
PauseHandleEvents()
Wait()
Snapshot() []kemtypes.ObjectAndFilterResult
EnableKubeEventCb()
GetConfig() *MonitorConfig
@@ -373,22 +373,18 @@ func (m *monitor) Stop() {
}
}

// PauseHandleEvents set flags for all informers to ignore incoming events.
// Useful for shutdown without panicking.
// Calling cancel() leads to a race and panicking, see https://github.com/kubernetes/kubernetes/issues/59822
func (m *monitor) PauseHandleEvents() {
// Wait waits for all started informers to stop
func (m *monitor) Wait() {
for _, informer := range m.ResourceInformers {
informer.pauseHandleEvents()
informer.wait()
}

m.VaryingInformers.RangeValue(func(value []*resourceInformer) {
for _, informer := range value {
informer.pauseHandleEvents()
informer.wait()
}
})

if m.NamespaceInformer != nil {
m.NamespaceInformer.pauseHandleEvents()
m.NamespaceInformer.wait()
}
}

14 changes: 11 additions & 3 deletions pkg/kube_events_manager/namespace_informer.go
@@ -21,6 +21,7 @@ type namespaceInformer struct {
ctx context.Context
cancel context.CancelFunc
stopped bool
done chan struct{}

KubeClient *klient.Client
Monitor *MonitorConfig
@@ -128,13 +129,18 @@ func (ni *namespaceInformer) start() {
return
}
cctx, cancel := context.WithCancel(ni.ctx)
ni.done = make(chan struct{})
go func() {
<-ni.ctx.Done()
ni.stopped = true
cancel()
}()

go ni.SharedInformer.Run(cctx.Done())
go func() {
ni.SharedInformer.Run(cctx.Done())
close(ni.done)
log.Debug("Namespace informer goroutine exited", slog.String("name", ni.Monitor.Metadata.DebugName))
}()

if err := wait.PollUntilContextCancel(cctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) {
return ni.SharedInformer.HasSynced(), nil
@@ -146,6 +152,8 @@
log.Debug("Informer is ready", slog.String("debugName", ni.Monitor.Metadata.DebugName))
}

func (ni *namespaceInformer) pauseHandleEvents() {
ni.stopped = true
func (ni *namespaceInformer) wait() {
if ni.done != nil {
<-ni.done
}
}
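The nil guard makes wait a no-op when start never ran (start can return early before creating the channel), and otherwise blocks until the Run goroutine has closed done. A minimal sketch of this nil-guarded done channel; the informer type is illustrative:

```go
package main

import "fmt"

type informer struct {
	done chan struct{} // nil until start is called
}

func (i *informer) start() {
	i.done = make(chan struct{})
	go func() {
		defer close(i.done)
		// the real code runs SharedInformer.Run(stopCh) here
	}()
}

func (i *informer) wait() {
	if i.done != nil { // started at least once
		<-i.done
	}
}

func main() {
	var a informer
	a.wait() // returns immediately: never started
	a.start()
	a.wait() // blocks until the goroutine exits
	fmt.Println("informer stopped")
}
```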
6 changes: 3 additions & 3 deletions pkg/kube_events_manager/resource_informer.go
@@ -469,9 +469,9 @@ func (ei *resourceInformer) start() {
log.Debug("informer is ready", slog.String("debugName", ei.Monitor.Metadata.DebugName))
}

func (ei *resourceInformer) pauseHandleEvents() {
log.Debug("PAUSE resource informer", slog.String("debugName", ei.Monitor.Metadata.DebugName))
ei.stopped = true
// wait blocks until the underlying shared informer for this FactoryIndex is stopped
func (ei *resourceInformer) wait() {
DefaultFactoryStore.WaitStopped(ei.FactoryIndex)
}

// CachedObjectsInfo returns info accumulated from start.
11 changes: 10 additions & 1 deletion pkg/shell-operator/operator.go
@@ -960,9 +960,18 @@ func (op *ShellOperator) runMetrics() {

// Shutdown stops Kubernetes events handling and the task queues, waiting for both to finish.
func (op *ShellOperator) Shutdown() {
op.logger.Info("shutdown begin", slog.String("phase", "shutdown"))
op.ScheduleManager.Stop()
op.KubeEventsManager.PauseHandleEvents()
op.logger.Info("schedule manager stopped", slog.String("phase", "shutdown"))

op.KubeEventsManager.Stop()
op.logger.Info("waiting informers", slog.String("phase", "shutdown"))
op.KubeEventsManager.Wait()
op.logger.Info("informers stopped", slog.String("phase", "shutdown"))

op.TaskQueues.Stop()
op.logger.Info("waiting task queues", slog.String("phase", "shutdown"))
// Wait for queues to stop, but no more than 10 seconds
op.TaskQueues.WaitStopWithTimeout(WaitQueuesTimeout)
op.logger.Info("task queues stopped", slog.String("phase", "shutdown"))
}
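The order here matters: cancel first, wait for the informer goroutines to actually exit, and only then stop the task queues with a bounded wait so a stuck queue cannot hang the process forever. A minimal sketch of a bounded final wait, assuming a done channel that is closed when the queues drain; waitWithTimeout is illustrative, not the real WaitStopWithTimeout:

```go
package main

import (
	"fmt"
	"time"
)

// waitWithTimeout returns true if done closes before the timeout fires.
func waitWithTimeout(done <-chan struct{}, timeout time.Duration) bool {
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	queuesDone := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond) // stands in for queues draining
		close(queuesDone)
	}()
	if waitWithTimeout(queuesDone, 10*time.Second) {
		fmt.Println("task queues stopped")
	} else {
		fmt.Println("timed out waiting for task queues")
	}
}
```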
@@ -56,7 +56,7 @@ var _ = Describe("Binding 'kubernetes' with kind 'Pod' should emit KubeEvent obj

AfterEach(func() {
fmt.Fprintf(GinkgoWriter, "Starting AfterEach\n")
KubeEventsManager.PauseHandleEvents()
KubeEventsManager.Stop()
fmt.Fprintf(GinkgoWriter, "Finished AfterEach\n")
})
