From fb590535b59c692870acea90f2693a5b74c1d9e3 Mon Sep 17 00:00:00 2001
From: Timur Tuktamyshev
Date: Tue, 22 Jul 2025 14:48:20 +0300
Subject: [PATCH 1/4] mem leak test

Signed-off-by: Timur Tuktamyshev
---
 examples/101-monitor-pods/hooks/pods-hook.sh | 7 +-
 .../101-monitor-pods/shell-operator-pod.yaml | 4 +-
 .../101-monitor-pods/shell-operator-rbac.yaml | 6 +-
 .../kubernetes_bindings_controller.go | 10 +-
 pkg/kube_events_manager/monitor.go | 28 ++++--
 pkg/kube_events_manager/resource_informer.go | 91 ++++++++++++-------
 6 files changed, 95 insertions(+), 51 deletions(-)

diff --git a/examples/101-monitor-pods/hooks/pods-hook.sh b/examples/101-monitor-pods/hooks/pods-hook.sh
index 046c2f1a..b428a51f 100755
--- a/examples/101-monitor-pods/hooks/pods-hook.sh
+++ b/examples/101-monitor-pods/hooks/pods-hook.sh
@@ -6,13 +6,16 @@ configVersion: v1
 kubernetes:
 - apiVersion: v1
   kind: Pod
+  jqFilter: "."
   executeHookOnEvent:
-  - Added
+  - Added
+  - Modified
+  - Deleted
 EOF
 else
   type=$(jq -r '.[0].type' $BINDING_CONTEXT_PATH)
   if [[ $type == "Event" ]] ; then
     podName=$(jq -r '.[0].object.metadata.name' $BINDING_CONTEXT_PATH)
-    echo "Pod '${podName}' added"
+    echo "Pod '${podName}' modified"
   fi
 fi
diff --git a/examples/101-monitor-pods/shell-operator-pod.yaml b/examples/101-monitor-pods/shell-operator-pod.yaml
index a4562389..c50ab70a 100644
--- a/examples/101-monitor-pods/shell-operator-pod.yaml
+++ b/examples/101-monitor-pods/shell-operator-pod.yaml
@@ -6,6 +6,6 @@ metadata:
 spec:
   containers:
   - name: shell-operator
-    image: registry.mycompany.com/shell-operator:monitor-pods
-    imagePullPolicy: Always
+    image: monitor-pods:v2
+    imagePullPolicy: Never
   serviceAccountName: monitor-pods-acc
diff --git a/examples/101-monitor-pods/shell-operator-rbac.yaml b/examples/101-monitor-pods/shell-operator-rbac.yaml
index da59f09e..6c9642d5 100644
--- a/examples/101-monitor-pods/shell-operator-rbac.yaml
+++ b/examples/101-monitor-pods/shell-operator-rbac.yaml
@@ -5,7 +5,7 @@ metadata:
   name: monitor-pods-acc
 
 ---
-apiVersion: rbac.authorization.k8s.io/v1beta1
+apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRole
 metadata:
   name: monitor-pods
@@ -15,7 +15,7 @@ rules:
   verbs: ["get", "watch", "list"]
 
 ---
-apiVersion: rbac.authorization.k8s.io/v1beta1
+apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRoleBinding
 metadata:
   name: monitor-pods
@@ -26,4 +26,4 @@ roleRef:
 subjects:
 - kind: ServiceAccount
   name: monitor-pods-acc
-  namespace: example-monitor-pods
\ No newline at end of file
+  namespace: default
\ No newline at end of file
diff --git a/pkg/hook/controller/kubernetes_bindings_controller.go b/pkg/hook/controller/kubernetes_bindings_controller.go
index 97f500a0..f8d5ee39 100644
--- a/pkg/hook/controller/kubernetes_bindings_controller.go
+++ b/pkg/hook/controller/kubernetes_bindings_controller.go
@@ -316,21 +316,27 @@ func (c *kubernetesBindingsController) SnapshotsInfo() []string {
 }
 
 func (c *kubernetesBindingsController) SnapshotsDump() map[string]interface{} {
+    fmt.Println("[SNAPSHOT] kubernetesBindingsController.SnapshotsDump: called")
     dumps := make(map[string]interface{})
+    bindingCount := 0
     for _, binding := range c.KubernetesBindings {
         monitorID := binding.Monitor.Metadata.MonitorId
         if c.kubeEventsManager.HasMonitor(monitorID) {
             total, last := c.kubeEventsManager.GetMonitor(monitorID).SnapshotOperations()
+            snapshot := c.kubeEventsManager.GetMonitor(monitorID).Snapshot()
+            fmt.Println("[SNAPSHOT] SnapshotsDump: binding=", binding.BindingName, "snapshot size=", len(snapshot))
+            fmt.Println("[SNAPSHOT] SnapshotsDump: binding=", binding.BindingName, "operations since start:", total, "since last:", last)
             dumps[binding.BindingName] = map[string]interface{}{
-                "snapshot": c.kubeEventsManager.GetMonitor(monitorID).Snapshot(),
+                "snapshot": snapshot,
                 "operations": map[string]interface{}{
                     "sinceStart": total,
                     "sinceLastExecution": last,
                 },
             }
+            bindingCount++
         }
     }
-
+    fmt.Println("[SNAPSHOT] kubernetesBindingsController.SnapshotsDump: total bindings in dump:", bindingCount)
     return dumps
 }
diff --git a/pkg/kube_events_manager/monitor.go b/pkg/kube_events_manager/monitor.go
index da11c384..cfda39eb 100644
--- a/pkg/kube_events_manager/monitor.go
+++ b/pkg/kube_events_manager/monitor.go
@@ -274,21 +274,33 @@ func (m *monitor) CreateInformers() error {
 
 // Snapshot returns all existed objects from all created informers
 func (m *monitor) Snapshot() []kemtypes.ObjectAndFilterResult {
+    fmt.Println("[SNAPSHOT] monitor.Snapshot: called for monitor:", m.Name)
     objects := make([]kemtypes.ObjectAndFilterResult, 0)
-
-    for _, informer := range m.ResourceInformers {
-        objects = append(objects, informer.getCachedObjects()...)
+    fmt.Println("[SNAPSHOT] monitor.Snapshot: ResourceInformers count:", len(m.ResourceInformers))
+    for idx, informer := range m.ResourceInformers {
+        objs := informer.getCachedObjects()
+        fmt.Println("[SNAPSHOT] monitor.Snapshot: ResourceInformer #", idx, "returned", len(objs), "objects")
+        for _, obj := range objs {
+            fmt.Println("[SNAPSHOT] monitor.Snapshot: ResourceInformer #", idx, "object:", obj.Metadata.ResourceId)
+        }
+        objects = append(objects, objs...)
     }
-
+    countVarying := 0
     m.VaryingInformers.RangeValue(func(value []*resourceInformer) {
-        for _, informer := range value {
-            objects = append(objects, informer.getCachedObjects()...)
+        fmt.Println("[SNAPSHOT] monitor.Snapshot: VaryingInformers group #", countVarying, "count:", len(value))
+        for idx, informer := range value {
+            objs := informer.getCachedObjects()
+            fmt.Println("[SNAPSHOT] monitor.Snapshot: VaryingInformer #", countVarying, "/", idx, "returned", len(objs), "objects")
+            for _, obj := range objs {
+                fmt.Println("[SNAPSHOT] monitor.Snapshot: VaryingInformer #", countVarying, "/", idx, "object:", obj.Metadata.ResourceId)
+            }
+            objects = append(objects, objs...)
         }
+        countVarying++
     })
-
+    fmt.Println("[SNAPSHOT] monitor.Snapshot: total objects in snapshot:", len(objects))
     // Sort objects by namespace and name
     sort.Sort(kemtypes.ByNamespaceAndName(objects))
-
     return objects
 }
diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go
index 3d15e570..e14cd966 100644
--- a/pkg/kube_events_manager/resource_informer.go
+++ b/pkg/kube_events_manager/resource_informer.go
@@ -154,8 +154,10 @@ func (ei *resourceInformer) createSharedInformer() error {
 // Snapshot returns all cached objects for this informer
 func (ei *resourceInformer) getCachedObjects() []kemtypes.ObjectAndFilterResult {
     ei.cacheLock.RLock()
+    fmt.Println("[CACHE] getCachedObjects: reading all cached objects, count:", len(ei.cachedObjects))
     res := make([]kemtypes.ObjectAndFilterResult, 0)
-    for _, obj := range ei.cachedObjects {
+    for k, obj := range ei.cachedObjects {
+        fmt.Println("[CACHE] getCachedObjects: key=", k, "object=", obj)
         res = append(res, *obj)
     }
     ei.cacheLock.RUnlock()
@@ -163,6 +165,7 @@ func (ei *resourceInformer) getCachedObjects() []kemtypes.ObjectAndFilterResult
     // Reset eventBuf if needed.
     ei.eventBufLock.Lock()
     if !ei.eventCbEnabled {
+        fmt.Println("[CACHE] getCachedObjects: eventCb not enabled, resetting eventBuf")
         ei.eventBuf = nil
     }
     ei.eventBufLock.Unlock()
@@ -239,19 +242,17 @@ func (ei *resourceInformer) loadExistedObjects() error {
         }
 
         filteredObjects[objFilterRes.Metadata.ResourceId] = objFilterRes
-
-        log.Debug("initial list: cached with checksum",
-            slog.String("debugName", ei.Monitor.Metadata.DebugName),
-            slog.String("resourceId", objFilterRes.Metadata.ResourceId),
-            slog.String("checksum", objFilterRes.Metadata.Checksum))
+        fmt.Println("[CACHE] loadExistedObjects: will add object to cache, key=", objFilterRes.Metadata.ResourceId, "object=", objFilterRes)
     }
 
     // Save objects to the cache.
     ei.cacheLock.Lock()
-    defer ei.cacheLock.Unlock()
     for k, v := range filteredObjects {
+        fmt.Println("[CACHE] loadExistedObjects: writing to cache, key=", k, "object=", v)
         ei.cachedObjects[k] = v
     }
+    fmt.Println("[CACHE] loadExistedObjects: cache now has", len(ei.cachedObjects), "objects")
+    ei.cacheLock.Unlock()
 
     ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects))
     ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels)
@@ -327,31 +328,20 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty
     case kemtypes.WatchEventAdded:
         fallthrough
     case kemtypes.WatchEventModified:
-        // Update object in cache
         ei.cacheLock.Lock()
         cachedObject, objectInCache := ei.cachedObjects[resourceId]
         skipEvent := false
         if objectInCache && cachedObject.Metadata.Checksum == objFilterRes.Metadata.Checksum {
-            // update object in cache and do not send event
-            log.Debug("skip KubeEvent",
-                slog.String("debugName", ei.Monitor.Metadata.DebugName),
-                slog.String("eventType", string(eventType)),
-                slog.String("resourceId", resourceId),
-            )
+            fmt.Println("[CACHE] handleWatchEvent:", eventType, "skipping event for object with same checksum, key=", resourceId)
             skipEvent = true
         }
-        ei.cachedObjects[resourceId] = objFilterRes
-        // Update cached objects info.
-        ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects))
-        if eventType == kemtypes.WatchEventAdded {
-            ei.cachedObjectsInfo.Added++
-            ei.cachedObjectsIncrement.Added++
+        if objectInCache {
+            fmt.Println("[CACHE] handleWatchEvent:", eventType, "update existing object in cache, key=", resourceId, "old=", cachedObject, "new=", objFilterRes)
         } else {
-            ei.cachedObjectsInfo.Modified++
-            ei.cachedObjectsIncrement.Modified++
+            fmt.Println("[CACHE] handleWatchEvent:", eventType, "add new object to cache, key=", resourceId, "object=", objFilterRes)
         }
-        // Update metrics.
-        ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels)
+        ei.cachedObjects[resourceId] = objFilterRes
+        fmt.Println("[CACHE] handleWatchEvent:", eventType, "cache now has", len(ei.cachedObjects), "objects")
         ei.cacheLock.Unlock()
         if skipEvent {
             return
@@ -359,17 +349,14 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty
 
     case kemtypes.WatchEventDeleted:
         ei.cacheLock.Lock()
-        delete(ei.cachedObjects, resourceId)
-        // Update cached objects info.
-        ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects))
-        if ei.cachedObjectsInfo.Count == 0 {
-            ei.cachedObjectsInfo.Cleaned++
-            ei.cachedObjectsIncrement.Cleaned++
+        _, existed := ei.cachedObjects[resourceId]
+        if existed {
+            fmt.Println("[CACHE] handleWatchEvent: Deleted, removing object from cache, key=", resourceId)
+            delete(ei.cachedObjects, resourceId)
+            fmt.Println("[CACHE] handleWatchEvent: Deleted, cache now has", len(ei.cachedObjects), "objects")
+        } else {
+            fmt.Println("[CACHE] handleWatchEvent: Deleted, object not found in cache, key=", resourceId)
         }
-        ei.cachedObjectsInfo.Deleted++
-        ei.cachedObjectsIncrement.Deleted++
-        // Update metrics.
-        ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels)
         ei.cacheLock.Unlock()
     }
@@ -458,6 +445,19 @@ func (ei *resourceInformer) start() {
         }
     }()
 
+    go func() {
+        ticker := time.NewTicker(1 * time.Minute)
+        defer ticker.Stop()
+        for {
+            select {
+            case <-ei.ctx.Done():
+                return
+            case <-ticker.C:
+                ei.LogDanglingObjects()
+            }
+        }
+    }()
+
     // TODO: separate handler and informer
     errorHandler := newWatchErrorHandler(ei.Monitor.Metadata.DebugName, ei.Monitor.Kind, ei.Monitor.Metadata.LogLabels, ei.metricStorage, ei.logger.Named("watch-error-handler"))
     err := DefaultFactoryStore.Start(ei.ctx, ei.id, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler)
@@ -489,3 +489,26 @@ func (ei *resourceInformer) getCachedObjectsInfoIncrement() CachedObjectsInfo {
     ei.cachedObjectsIncrement = &CachedObjectsInfo{}
     return info
 }
+
+func (ei *resourceInformer) LogDanglingObjects() {
+    // Get the current list of objects from Kubernetes
+    objList, err := ei.KubeClient.Dynamic().
+        Resource(ei.GroupVersionResource).
+        Namespace(ei.Namespace).
+        List(context.TODO(), ei.ListOptions)
+    if err != nil {
+        fmt.Println("[DANGLING] Error listing objects from API:", err)
+        return
+    }
+    actual := make(map[string]bool)
+    for _, item := range objList.Items {
+        actual[resourceId(&item)] = true
+    }
+    ei.cacheLock.RLock()
+    for k := range ei.cachedObjects {
+        if !actual[k] {
+            fmt.Println("[DANGLING] Object in cache but not in API:", k)
+        }
+    }
+    ei.cacheLock.RUnlock()
+}

From de73b85e6acbfc20e716ec49037e31a55b47068a Mon Sep 17 00:00:00 2001
From: Timur Tuktamyshev
Date: Tue, 22 Jul 2025 16:15:56 +0300
Subject: [PATCH 2/4] update logs

Signed-off-by: Timur Tuktamyshev
---
 examples/101-monitor-pods/Dockerfile | 2 +-
 .../101-monitor-pods/shell-operator-pod.yaml | 2 +-
 pkg/kube_events_manager/resource_informer.go | 28 +++++++++++++++----
 3 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/examples/101-monitor-pods/Dockerfile b/examples/101-monitor-pods/Dockerfile
index 3e7d33cb..96256277 100644
--- a/examples/101-monitor-pods/Dockerfile
+++ b/examples/101-monitor-pods/Dockerfile
@@ -1,2 +1,2 @@
-FROM ghcr.io/flant/shell-operator:latest
+FROM shell:v2
 ADD hooks /hooks
diff --git a/examples/101-monitor-pods/shell-operator-pod.yaml b/examples/101-monitor-pods/shell-operator-pod.yaml
index c50ab70a..0d4922dd 100644
--- a/examples/101-monitor-pods/shell-operator-pod.yaml
+++ b/examples/101-monitor-pods/shell-operator-pod.yaml
@@ -6,6 +6,6 @@ metadata:
 spec:
   containers:
   - name: shell-operator
-    image: monitor-pods:v2
+    image: monitor-pods:v3
     imagePullPolicy: Never
   serviceAccountName: monitor-pods-acc
diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go
index e14cd966..af34495f 100644
--- a/pkg/kube_events_manager/resource_informer.go
+++ b/pkg/kube_events_manager/resource_informer.go
@@ -446,14 +446,22 @@ func (ei *resourceInformer) start() {
     }()
 
     go func() {
+        fmt.Println("[DANGLING] LogDanglingObjects ticker goroutine started for informer:", ei.Monitor.Metadata.DebugName)
         ticker := time.NewTicker(1 * time.Minute)
         defer ticker.Stop()
         for {
             select {
             case <-ei.ctx.Done():
+                fmt.Println("[DANGLING] LogDanglingObjects ticker goroutine stopped for informer:", ei.Monitor.Metadata.DebugName)
                 return
             case <-ticker.C:
-                ei.LogDanglingObjects()
+                fmt.Println("[DANGLING] LogDanglingObjects ticker tick for informer:", ei.Monitor.Metadata.DebugName)
+                dangling := ei.LogDanglingObjects()
+                if len(dangling) == 0 {
+                    fmt.Println("[DANGLING] LogDanglingObjects ticker: SUCCESS — no dangling objects for informer:", ei.Monitor.Metadata.DebugName)
+                } else {
+                    fmt.Println("[DANGLING] LogDanglingObjects ticker: found", len(dangling), "dangling objects for informer:", ei.Monitor.Metadata.DebugName, dangling)
+                }
             }
         }
     }()
@@ -490,25 +498,33 @@ func (ei *resourceInformer) getCachedObjectsInfoIncrement() CachedObjectsInfo {
     return info
 }
 
-func (ei *resourceInformer) LogDanglingObjects() {
-    // Get the current list of objects from Kubernetes
+func (ei *resourceInformer) LogDanglingObjects() []string {
+    fmt.Println("[DANGLING] LogDanglingObjects: start for informer:", ei.Monitor.Metadata.DebugName)
     objList, err := ei.KubeClient.Dynamic().
         Resource(ei.GroupVersionResource).
         Namespace(ei.Namespace).
         List(context.TODO(), ei.ListOptions)
     if err != nil {
-        fmt.Println("[DANGLING] Error listing objects from API:", err)
-        return
+        fmt.Println("[DANGLING] LogDanglingObjects: error listing objects from API:", err)
+        return nil
     }
     actual := make(map[string]bool)
     for _, item := range objList.Items {
         actual[resourceId(&item)] = true
     }
     ei.cacheLock.RLock()
+    dangling := []string{}
     for k := range ei.cachedObjects {
         if !actual[k] {
-            fmt.Println("[DANGLING] Object in cache but not in API:", k)
+            dangling = append(dangling, k)
         }
     }
+    fmt.Println("[DANGLING] LogDanglingObjects: informer:", ei.Monitor.Metadata.DebugName, "cache size:", len(ei.cachedObjects), "actual in API:", len(actual))
+    if len(dangling) > 0 {
+        fmt.Println("[DANGLING] LogDanglingObjects: found", len(dangling), "dangling objects:", dangling)
+    } else {
+        fmt.Println("[DANGLING] LogDanglingObjects: SUCCESS — no dangling objects found!")
+    }
     ei.cacheLock.RUnlock()
+    return dangling
 }

From e2f1e74ad76ef75a288a9ecb81d49a8ec6b47352 Mon Sep 17 00:00:00 2001
From: Timur Tuktamyshev
Date: Tue, 22 Jul 2025 17:45:59 +0300
Subject: [PATCH 3/4] test

Signed-off-by: Timur Tuktamyshev
---
 pkg/kube_events_manager/resource_informer.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go
index af34495f..e8682581 100644
--- a/pkg/kube_events_manager/resource_informer.go
+++ b/pkg/kube_events_manager/resource_informer.go
@@ -336,9 +336,9 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty
             skipEvent = true
         }
         if objectInCache {
-            fmt.Println("[CACHE] handleWatchEvent:", eventType, "update existing object in cache, key=", resourceId, "old=", cachedObject, "new=", objFilterRes)
+            fmt.Println("[CACHE] handleWatchEvent:", eventType, "update existing object in cache, key=", resourceId)
         } else {
-            fmt.Println("[CACHE] handleWatchEvent:", eventType, "add new object to cache, key=", resourceId, "object=", objFilterRes)
+            fmt.Println("[CACHE] handleWatchEvent:", eventType, "add new object to cache, key=", resourceId)
         }
         ei.cachedObjects[resourceId] = objFilterRes
         fmt.Println("[CACHE] handleWatchEvent:", eventType, "cache now has", len(ei.cachedObjects), "objects")

From d54acb59143adc20da955be53ed361c67dc639e6 Mon Sep 17 00:00:00 2001
From: Timur Tuktamyshev
Date: Wed, 23 Jul 2025 16:38:27 +0300
Subject: [PATCH 4/4] t

Signed-off-by: Timur Tuktamyshev
---
 pkg/kube_events_manager/factory.go | 1 -
 pkg/kube_events_manager/resource_informer.go | 2 ++
 pkg/utils/checksum/checksum.go | 10 ++++++----
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/pkg/kube_events_manager/factory.go b/pkg/kube_events_manager/factory.go
index a52ebab4..8e960a59 100644
--- a/pkg/kube_events_manager/factory.go
+++ b/pkg/kube_events_manager/factory.go
@@ -117,7 +117,6 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna
 
     if !informer.HasSynced() {
         go informer.Run(factory.ctx.Done())
-
         if err := wait.PollUntilContextCancel(ctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) {
             return informer.HasSynced(), nil
         }); err != nil {
diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go
index e8682581..2a2ca5b3 100644
--- a/pkg/kube_events_manager/resource_informer.go
+++ b/pkg/kube_events_manager/resource_informer.go
@@ -229,7 +229,9 @@ func (ei *resourceInformer) loadExistedObjects() error {
             defer measure.Duration(func(d time.Duration) {
                 ei.metricStorage.HistogramObserve("{PREFIX}kube_jq_filter_duration_seconds", d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil)
             })()
+
             filter := jq.NewFilter()
+
             objFilterRes, err = applyFilter(ei.Monitor.JqFilter, filter, ei.Monitor.FilterFunc, &obj)
         }()
diff --git a/pkg/utils/checksum/checksum.go b/pkg/utils/checksum/checksum.go
index ce1a4ae7..da90cb0a 100644
--- a/pkg/utils/checksum/checksum.go
+++ b/pkg/utils/checksum/checksum.go
@@ -1,20 +1,22 @@
 package checksum
 
 import (
-    "crypto/md5"
     "encoding/hex"
     "os"
     "path/filepath"
     "sort"
+
+    "github.com/cespare/xxhash/v2"
 )
 
 func CalculateChecksum(stringArr ...string) string {
-    hasher := md5.New()
+    digest := xxhash.New()
     sort.Strings(stringArr)
     for _, value := range stringArr {
-        _, _ = hasher.Write([]byte(value))
+        _, _ = digest.Write([]byte(value))
     }
-    return hex.EncodeToString(hasher.Sum(nil))
+
+    return hex.EncodeToString(digest.Sum(nil))
 }
 
 func CalculateChecksumOfFile(path string) (string, error) {