diff --git a/examples/101-monitor-pods/Dockerfile b/examples/101-monitor-pods/Dockerfile index 3e7d33cb..96256277 100644 --- a/examples/101-monitor-pods/Dockerfile +++ b/examples/101-monitor-pods/Dockerfile @@ -1,2 +1,2 @@ -FROM ghcr.io/flant/shell-operator:latest +FROM shell:v2 ADD hooks /hooks diff --git a/examples/101-monitor-pods/hooks/pods-hook.sh b/examples/101-monitor-pods/hooks/pods-hook.sh index 046c2f1a..b428a51f 100755 --- a/examples/101-monitor-pods/hooks/pods-hook.sh +++ b/examples/101-monitor-pods/hooks/pods-hook.sh @@ -6,13 +6,16 @@ configVersion: v1 kubernetes: - apiVersion: v1 kind: Pod + jqFilter: "." executeHookOnEvent: - - Added + - Added + - Modified + - Deleted EOF else type=$(jq -r '.[0].type' $BINDING_CONTEXT_PATH) if [[ $type == "Event" ]] ; then podName=$(jq -r '.[0].object.metadata.name' $BINDING_CONTEXT_PATH) - echo "Pod '${podName}' added" + echo "Pod '${podName}' modified" fi fi diff --git a/examples/101-monitor-pods/shell-operator-pod.yaml b/examples/101-monitor-pods/shell-operator-pod.yaml index a4562389..0d4922dd 100644 --- a/examples/101-monitor-pods/shell-operator-pod.yaml +++ b/examples/101-monitor-pods/shell-operator-pod.yaml @@ -6,6 +6,6 @@ metadata: spec: containers: - name: shell-operator - image: registry.mycompany.com/shell-operator:monitor-pods - imagePullPolicy: Always + image: monitor-pods:v3 + imagePullPolicy: Never serviceAccountName: monitor-pods-acc diff --git a/examples/101-monitor-pods/shell-operator-rbac.yaml b/examples/101-monitor-pods/shell-operator-rbac.yaml index da59f09e..6c9642d5 100644 --- a/examples/101-monitor-pods/shell-operator-rbac.yaml +++ b/examples/101-monitor-pods/shell-operator-rbac.yaml @@ -5,7 +5,7 @@ metadata: name: monitor-pods-acc --- -apiVersion: rbac.authorization.k8s.io/v1beta1 +apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: monitor-pods @@ -15,7 +15,7 @@ rules: verbs: ["get", "watch", "list"] --- -apiVersion: rbac.authorization.k8s.io/v1beta1 +apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: monitor-pods @@ -26,4 +26,4 @@ roleRef: subjects: - kind: ServiceAccount name: monitor-pods-acc - namespace: example-monitor-pods \ No newline at end of file + namespace: default \ No newline at end of file diff --git a/pkg/hook/controller/kubernetes_bindings_controller.go b/pkg/hook/controller/kubernetes_bindings_controller.go index 97f500a0..f8d5ee39 100644 --- a/pkg/hook/controller/kubernetes_bindings_controller.go +++ b/pkg/hook/controller/kubernetes_bindings_controller.go @@ -316,21 +316,27 @@ func (c *kubernetesBindingsController) SnapshotsInfo() []string { } func (c *kubernetesBindingsController) SnapshotsDump() map[string]interface{} { + fmt.Println("[SNAPSHOT] kubernetesBindingsController.SnapshotsDump: called") dumps := make(map[string]interface{}) + bindingCount := 0 for _, binding := range c.KubernetesBindings { monitorID := binding.Monitor.Metadata.MonitorId if c.kubeEventsManager.HasMonitor(monitorID) { total, last := c.kubeEventsManager.GetMonitor(monitorID).SnapshotOperations() + snapshot := c.kubeEventsManager.GetMonitor(monitorID).Snapshot() + fmt.Println("[SNAPSHOT] SnapshotsDump: binding=", binding.BindingName, "snapshot size=", len(snapshot)) + fmt.Println("[SNAPSHOT] SnapshotsDump: binding=", binding.BindingName, "operations since start:", total, "since last:", last) dumps[binding.BindingName] = map[string]interface{}{ - "snapshot": c.kubeEventsManager.GetMonitor(monitorID).Snapshot(), + "snapshot": snapshot, "operations": map[string]interface{}{ "sinceStart": total, "sinceLastExecution": last, }, } + bindingCount++ } } - + fmt.Println("[SNAPSHOT] kubernetesBindingsController.SnapshotsDump: total bindings in dump:", bindingCount) return dumps } diff --git a/pkg/kube_events_manager/factory.go b/pkg/kube_events_manager/factory.go index a52ebab4..8e960a59 100644 --- a/pkg/kube_events_manager/factory.go +++ b/pkg/kube_events_manager/factory.go @@ -117,7 +117,6 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna if !informer.HasSynced() { go informer.Run(factory.ctx.Done()) - if err := wait.PollUntilContextCancel(ctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) { return informer.HasSynced(), nil }); err != nil { diff --git a/pkg/kube_events_manager/monitor.go b/pkg/kube_events_manager/monitor.go index da11c384..cfda39eb 100644 --- a/pkg/kube_events_manager/monitor.go +++ b/pkg/kube_events_manager/monitor.go @@ -274,21 +274,33 @@ func (m *monitor) CreateInformers() error { // Snapshot returns all existed objects from all created informers func (m *monitor) Snapshot() []kemtypes.ObjectAndFilterResult { + fmt.Println("[SNAPSHOT] monitor.Snapshot: called for monitor:", m.Name) objects := make([]kemtypes.ObjectAndFilterResult, 0) - - for _, informer := range m.ResourceInformers { - objects = append(objects, informer.getCachedObjects()...) + fmt.Println("[SNAPSHOT] monitor.Snapshot: ResourceInformers count:", len(m.ResourceInformers)) + for idx, informer := range m.ResourceInformers { + objs := informer.getCachedObjects() + fmt.Println("[SNAPSHOT] monitor.Snapshot: ResourceInformer #", idx, "returned", len(objs), "objects") + for _, obj := range objs { + fmt.Println("[SNAPSHOT] monitor.Snapshot: ResourceInformer #", idx, "object:", obj.Metadata.ResourceId) + } + objects = append(objects, objs...) } - + countVarying := 0 m.VaryingInformers.RangeValue(func(value []*resourceInformer) { - for _, informer := range value { - objects = append(objects, informer.getCachedObjects()...) + fmt.Println("[SNAPSHOT] monitor.Snapshot: VaryingInformers group #", countVarying, "count:", len(value)) + for idx, informer := range value { + objs := informer.getCachedObjects() + fmt.Println("[SNAPSHOT] monitor.Snapshot: VaryingInformer #", countVarying, "/", idx, "returned", len(objs), "objects") + for _, obj := range objs { + fmt.Println("[SNAPSHOT] monitor.Snapshot: VaryingInformer #", countVarying, "/", idx, "object:", obj.Metadata.ResourceId) + } + objects = append(objects, objs...) } + countVarying++ }) - + fmt.Println("[SNAPSHOT] monitor.Snapshot: total objects in snapshot:", len(objects)) // Sort objects by namespace and name sort.Sort(kemtypes.ByNamespaceAndName(objects)) - return objects } diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index 3d15e570..2a2ca5b3 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -154,8 +154,10 @@ func (ei *resourceInformer) createSharedInformer() error { // Snapshot returns all cached objects for this informer func (ei *resourceInformer) getCachedObjects() []kemtypes.ObjectAndFilterResult { ei.cacheLock.RLock() + fmt.Println("[CACHE] getCachedObjects: reading all cached objects, count:", len(ei.cachedObjects)) res := make([]kemtypes.ObjectAndFilterResult, 0) - for _, obj := range ei.cachedObjects { + for k, obj := range ei.cachedObjects { + fmt.Println("[CACHE] getCachedObjects: key=", k, "object=", obj) res = append(res, *obj) } ei.cacheLock.RUnlock() @@ -163,6 +165,7 @@ func (ei *resourceInformer) getCachedObjects() []kemtypes.ObjectAndFilterResult // Reset eventBuf if needed. ei.eventBufLock.Lock() if !ei.eventCbEnabled { + fmt.Println("[CACHE] getCachedObjects: eventCb not enabled, resetting eventBuf") ei.eventBuf = nil } ei.eventBufLock.Unlock() @@ -226,7 +229,9 @@ func (ei *resourceInformer) loadExistedObjects() error { defer measure.Duration(func(d time.Duration) { ei.metricStorage.HistogramObserve("{PREFIX}kube_jq_filter_duration_seconds", d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) })() + filter := jq.NewFilter() + objFilterRes, err = applyFilter(ei.Monitor.JqFilter, filter, ei.Monitor.FilterFunc, &obj) }() @@ -239,19 +244,17 @@ func (ei *resourceInformer) loadExistedObjects() error { } filteredObjects[objFilterRes.Metadata.ResourceId] = objFilterRes - - log.Debug("initial list: cached with checksum", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("resourceId", objFilterRes.Metadata.ResourceId), - slog.String("checksum", objFilterRes.Metadata.Checksum)) + fmt.Println("[CACHE] loadExistedObjects: will add object to cache, key=", objFilterRes.Metadata.ResourceId, "object=", objFilterRes) } // Save objects to the cache. ei.cacheLock.Lock() - defer ei.cacheLock.Unlock() for k, v := range filteredObjects { + fmt.Println("[CACHE] loadExistedObjects: writing to cache, key=", k, "object=", v) ei.cachedObjects[k] = v } + fmt.Println("[CACHE] loadExistedObjects: cache now has", len(ei.cachedObjects), "objects") + ei.cacheLock.Unlock() ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects)) ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels) @@ -327,31 +330,20 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty case kemtypes.WatchEventAdded: fallthrough case kemtypes.WatchEventModified: - // Update object in cache ei.cacheLock.Lock() cachedObject, objectInCache := ei.cachedObjects[resourceId] skipEvent := false if objectInCache && cachedObject.Metadata.Checksum == objFilterRes.Metadata.Checksum { - // update object in cache and do not send event - log.Debug("skip KubeEvent", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("eventType", string(eventType)), - slog.String("resourceId", resourceId), - ) + fmt.Println("[CACHE] handleWatchEvent:", eventType, "skipping event for object with same checksum, key=", resourceId) skipEvent = true } - ei.cachedObjects[resourceId] = objFilterRes - // Update cached objects info. - ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects)) - if eventType == kemtypes.WatchEventAdded { - ei.cachedObjectsInfo.Added++ - ei.cachedObjectsIncrement.Added++ + if objectInCache { + fmt.Println("[CACHE] handleWatchEvent:", eventType, "update existing object in cache, key=", resourceId) } else { - ei.cachedObjectsInfo.Modified++ - ei.cachedObjectsIncrement.Modified++ + fmt.Println("[CACHE] handleWatchEvent:", eventType, "add new object to cache, key=", resourceId) } - // Update metrics. - ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels) + ei.cachedObjects[resourceId] = objFilterRes + fmt.Println("[CACHE] handleWatchEvent:", eventType, "cache now has", len(ei.cachedObjects), "objects") ei.cacheLock.Unlock() if skipEvent { return @@ -359,17 +351,14 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty case kemtypes.WatchEventDeleted: ei.cacheLock.Lock() - delete(ei.cachedObjects, resourceId) - // Update cached objects info. - ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects)) - if ei.cachedObjectsInfo.Count == 0 { - ei.cachedObjectsInfo.Cleaned++ - ei.cachedObjectsIncrement.Cleaned++ + _, existed := ei.cachedObjects[resourceId] + if existed { + fmt.Println("[CACHE] handleWatchEvent: Deleted, removing object from cache, key=", resourceId) + delete(ei.cachedObjects, resourceId) + fmt.Println("[CACHE] handleWatchEvent: Deleted, cache now has", len(ei.cachedObjects), "objects") + } else { + fmt.Println("[CACHE] handleWatchEvent: Deleted, object not found in cache, key=", resourceId) } - ei.cachedObjectsInfo.Deleted++ - ei.cachedObjectsIncrement.Deleted++ - // Update metrics. - ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels) ei.cacheLock.Unlock() } @@ -458,6 +447,27 @@ func (ei *resourceInformer) start() { } }() + go func() { + fmt.Println("[DANGLING] LogDanglingObjects ticker goroutine started for informer:", ei.Monitor.Metadata.DebugName) + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ei.ctx.Done(): + fmt.Println("[DANGLING] LogDanglingObjects ticker goroutine stopped for informer:", ei.Monitor.Metadata.DebugName) + return + case <-ticker.C: + fmt.Println("[DANGLING] LogDanglingObjects ticker tick for informer:", ei.Monitor.Metadata.DebugName) + dangling := ei.LogDanglingObjects() + if len(dangling) == 0 { + fmt.Println("[DANGLING] LogDanglingObjects ticker: SUCCESS — no dangling objects for informer:", ei.Monitor.Metadata.DebugName) + } else { + fmt.Println("[DANGLING] LogDanglingObjects ticker: found", len(dangling), "dangling objects for informer:", ei.Monitor.Metadata.DebugName, dangling) + } + } + } + }() + // TODO: separate handler and informer errorHandler := newWatchErrorHandler(ei.Monitor.Metadata.DebugName, ei.Monitor.Kind, ei.Monitor.Metadata.LogLabels, ei.metricStorage, ei.logger.Named("watch-error-handler")) err := DefaultFactoryStore.Start(ei.ctx, ei.id, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler) @@ -489,3 +499,34 @@ func (ei *resourceInformer) getCachedObjectsInfoIncrement() CachedObjectsInfo { ei.cachedObjectsIncrement = &CachedObjectsInfo{} return info } + +func (ei *resourceInformer) LogDanglingObjects() []string { + fmt.Println("[DANGLING] LogDanglingObjects: start for informer:", ei.Monitor.Metadata.DebugName) + objList, err := ei.KubeClient.Dynamic(). + Resource(ei.GroupVersionResource). + Namespace(ei.Namespace). + List(context.TODO(), ei.ListOptions) + if err != nil { + fmt.Println("[DANGLING] LogDanglingObjects: error listing objects from API:", err) + return nil + } + actual := make(map[string]bool) + for _, item := range objList.Items { + actual[resourceId(&item)] = true + } + ei.cacheLock.RLock() + dangling := []string{} + for k := range ei.cachedObjects { + if !actual[k] { + dangling = append(dangling, k) + } + } + fmt.Println("[DANGLING] LogDanglingObjects: informer:", ei.Monitor.Metadata.DebugName, "cache size:", len(ei.cachedObjects), "actual in API:", len(actual)) + if len(dangling) > 0 { + fmt.Println("[DANGLING] LogDanglingObjects: found", len(dangling), "dangling objects:", dangling) + } else { + fmt.Println("[DANGLING] LogDanglingObjects: SUCCESS — no dangling objects found!") + } + ei.cacheLock.RUnlock() + return dangling +} diff --git a/pkg/utils/checksum/checksum.go b/pkg/utils/checksum/checksum.go index ce1a4ae7..da90cb0a 100644 --- a/pkg/utils/checksum/checksum.go +++ b/pkg/utils/checksum/checksum.go @@ -1,20 +1,22 @@ package checksum import ( - "crypto/md5" "encoding/hex" "os" "path/filepath" "sort" + + "github.com/cespare/xxhash/v2" ) func CalculateChecksum(stringArr ...string) string { - hasher := md5.New() + digest := xxhash.New() sort.Strings(stringArr) for _, value := range stringArr { - _, _ = hasher.Write([]byte(value)) + _, _ = digest.Write([]byte(value)) } - return hex.EncodeToString(hasher.Sum(nil)) + + return hex.EncodeToString(digest.Sum(nil)) } func CalculateChecksumOfFile(path string) (string, error) {