2 changes: 1 addition & 1 deletion examples/101-monitor-pods/Dockerfile
@@ -1,2 +1,2 @@
FROM ghcr.io/flant/shell-operator:latest
FROM shell:v2
ADD hooks /hooks
7 changes: 5 additions & 2 deletions examples/101-monitor-pods/hooks/pods-hook.sh
@@ -6,13 +6,16 @@ configVersion: v1
kubernetes:
- apiVersion: v1
kind: Pod
jqFilter: "."
executeHookOnEvent:
- Added
- Added
- Modified
- Deleted
EOF
else
type=$(jq -r '.[0].type' $BINDING_CONTEXT_PATH)
if [[ $type == "Event" ]] ; then
podName=$(jq -r '.[0].object.metadata.name' $BINDING_CONTEXT_PATH)
echo "Pod '${podName}' added"
echo "Pod '${podName}' modified"
fi
fi
4 changes: 2 additions & 2 deletions examples/101-monitor-pods/shell-operator-pod.yaml
@@ -6,6 +6,6 @@ metadata:
spec:
containers:
- name: shell-operator
image: registry.mycompany.com/shell-operator:monitor-pods
imagePullPolicy: Always
image: monitor-pods:v3
imagePullPolicy: Never
serviceAccountName: monitor-pods-acc
6 changes: 3 additions & 3 deletions examples/101-monitor-pods/shell-operator-rbac.yaml
@@ -5,7 +5,7 @@ metadata:
name: monitor-pods-acc

---
apiVersion: rbac.authorization.k8s.io/v1beta1
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: monitor-pods
@@ -15,7 +15,7 @@ rules:
verbs: ["get", "watch", "list"]

---
apiVersion: rbac.authorization.k8s.io/v1beta1
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: monitor-pods
@@ -26,4 +26,4 @@ roleRef:
subjects:
- kind: ServiceAccount
name: monitor-pods-acc
namespace: example-monitor-pods
namespace: default
10 changes: 8 additions & 2 deletions pkg/hook/controller/kubernetes_bindings_controller.go
@@ -316,21 +316,27 @@ func (c *kubernetesBindingsController) SnapshotsInfo() []string {
}

func (c *kubernetesBindingsController) SnapshotsDump() map[string]interface{} {
fmt.Println("[SNAPSHOT] kubernetesBindingsController.SnapshotsDump: called")
dumps := make(map[string]interface{})
bindingCount := 0
for _, binding := range c.KubernetesBindings {
monitorID := binding.Monitor.Metadata.MonitorId
if c.kubeEventsManager.HasMonitor(monitorID) {
total, last := c.kubeEventsManager.GetMonitor(monitorID).SnapshotOperations()
snapshot := c.kubeEventsManager.GetMonitor(monitorID).Snapshot()
fmt.Println("[SNAPSHOT] SnapshotsDump: binding=", binding.BindingName, "snapshot size=", len(snapshot))
fmt.Println("[SNAPSHOT] SnapshotsDump: binding=", binding.BindingName, "operations since start:", total, "since last:", last)
dumps[binding.BindingName] = map[string]interface{}{
"snapshot": c.kubeEventsManager.GetMonitor(monitorID).Snapshot(),
"snapshot": snapshot,
"operations": map[string]interface{}{
"sinceStart": total,
"sinceLastExecution": last,
},
}
bindingCount++
}
}

fmt.Println("[SNAPSHOT] kubernetesBindingsController.SnapshotsDump: total bindings in dump:", bindingCount)
return dumps
}

1 change: 0 additions & 1 deletion pkg/kube_events_manager/factory.go
@@ -117,7 +117,6 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna

if !informer.HasSynced() {
go informer.Run(factory.ctx.Done())

if err := wait.PollUntilContextCancel(ctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) {
return informer.HasSynced(), nil
}); err != nil {
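The factory.go hunk above starts the shared informer and then blocks until its cache has synced, using wait.PollUntilContextCancel. A minimal sketch of that wait pattern, assuming a hypothetical syncPollInterval constant and a hasSynced callback standing in for informer.HasSynced(); this is not the actual FactoryStore code:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// syncPollInterval is a hypothetical stand-in for DefaultSyncTime in the diff.
const syncPollInterval = 100 * time.Millisecond

// waitForSync polls the condition until it returns true, the context is
// cancelled, or the condition returns an error. The `true` argument asks
// for an immediate first check before the interval elapses.
func waitForSync(ctx context.Context, hasSynced func() bool) error {
	return wait.PollUntilContextCancel(ctx, syncPollInterval, true, func(_ context.Context) (bool, error) {
		return hasSynced(), nil
	})
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Fake "informer" that reports synced after 300ms.
	start := time.Now()
	synced := func() bool { return time.Since(start) > 300*time.Millisecond }

	if err := waitForSync(ctx, synced); err != nil {
		fmt.Println("informer did not sync:", err)
		return
	}
	fmt.Println("informer synced")
}
```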
28 changes: 20 additions & 8 deletions pkg/kube_events_manager/monitor.go
@@ -274,21 +274,33 @@ func (m *monitor) CreateInformers() error {

// Snapshot returns all existed objects from all created informers
func (m *monitor) Snapshot() []kemtypes.ObjectAndFilterResult {
fmt.Println("[SNAPSHOT] monitor.Snapshot: called for monitor:", m.Name)
objects := make([]kemtypes.ObjectAndFilterResult, 0)

for _, informer := range m.ResourceInformers {
objects = append(objects, informer.getCachedObjects()...)
fmt.Println("[SNAPSHOT] monitor.Snapshot: ResourceInformers count:", len(m.ResourceInformers))
for idx, informer := range m.ResourceInformers {
objs := informer.getCachedObjects()
fmt.Println("[SNAPSHOT] monitor.Snapshot: ResourceInformer #", idx, "returned", len(objs), "objects")
for _, obj := range objs {
fmt.Println("[SNAPSHOT] monitor.Snapshot: ResourceInformer #", idx, "object:", obj.Metadata.ResourceId)
}
objects = append(objects, objs...)
}

countVarying := 0
m.VaryingInformers.RangeValue(func(value []*resourceInformer) {
for _, informer := range value {
objects = append(objects, informer.getCachedObjects()...)
fmt.Println("[SNAPSHOT] monitor.Snapshot: VaryingInformers group #", countVarying, "count:", len(value))
for idx, informer := range value {
objs := informer.getCachedObjects()
fmt.Println("[SNAPSHOT] monitor.Snapshot: VaryingInformer #", countVarying, "/", idx, "returned", len(objs), "objects")
for _, obj := range objs {
fmt.Println("[SNAPSHOT] monitor.Snapshot: VaryingInformer #", countVarying, "/", idx, "object:", obj.Metadata.ResourceId)
}
objects = append(objects, objs...)
}
countVarying++
})

fmt.Println("[SNAPSHOT] monitor.Snapshot: total objects in snapshot:", len(objects))
// Sort objects by namespace and name
sort.Sort(kemtypes.ByNamespaceAndName(objects))

return objects
}

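Snapshot finishes by ordering the aggregated objects with sort.Sort(kemtypes.ByNamespaceAndName(objects)) so the hook sees a deterministic snapshot. A minimal sketch of that namespace-then-name ordering, using a hypothetical obj struct in place of kemtypes.ObjectAndFilterResult:

```go
package main

import (
	"fmt"
	"sort"
)

// obj is a hypothetical stand-in for kemtypes.ObjectAndFilterResult.
type obj struct {
	Namespace string
	Name      string
}

// sortByNamespaceAndName orders objects by namespace first, then by name,
// mirroring the intent of ByNamespaceAndName in the diff.
func sortByNamespaceAndName(objects []obj) {
	sort.Slice(objects, func(i, j int) bool {
		if objects[i].Namespace != objects[j].Namespace {
			return objects[i].Namespace < objects[j].Namespace
		}
		return objects[i].Name < objects[j].Name
	})
}

func main() {
	objects := []obj{
		{"kube-system", "coredns"},
		{"default", "nginx"},
		{"default", "alpine"},
	}
	sortByNamespaceAndName(objects)
	fmt.Println(objects) // [{default alpine} {default nginx} {kube-system coredns}]
}
```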
109 changes: 75 additions & 34 deletions pkg/kube_events_manager/resource_informer.go
@@ -154,15 +154,18 @@ func (ei *resourceInformer) createSharedInformer() error {
// Snapshot returns all cached objects for this informer
func (ei *resourceInformer) getCachedObjects() []kemtypes.ObjectAndFilterResult {
ei.cacheLock.RLock()
fmt.Println("[CACHE] getCachedObjects: reading all cached objects, count:", len(ei.cachedObjects))
res := make([]kemtypes.ObjectAndFilterResult, 0)
for _, obj := range ei.cachedObjects {
for k, obj := range ei.cachedObjects {
fmt.Println("[CACHE] getCachedObjects: key=", k, "object=", obj)
res = append(res, *obj)
}
ei.cacheLock.RUnlock()

// Reset eventBuf if needed.
ei.eventBufLock.Lock()
if !ei.eventCbEnabled {
fmt.Println("[CACHE] getCachedObjects: eventCb not enabled, resetting eventBuf")
ei.eventBuf = nil
}
ei.eventBufLock.Unlock()
@@ -226,7 +229,9 @@ func (ei *resourceInformer) loadExistedObjects() error {
defer measure.Duration(func(d time.Duration) {
ei.metricStorage.HistogramObserve("{PREFIX}kube_jq_filter_duration_seconds", d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil)
})()

filter := jq.NewFilter()

objFilterRes, err = applyFilter(ei.Monitor.JqFilter, filter, ei.Monitor.FilterFunc, &obj)
}()

@@ -239,19 +244,17 @@ func (ei *resourceInformer) loadExistedObjects() error {
}

filteredObjects[objFilterRes.Metadata.ResourceId] = objFilterRes

log.Debug("initial list: cached with checksum",
slog.String("debugName", ei.Monitor.Metadata.DebugName),
slog.String("resourceId", objFilterRes.Metadata.ResourceId),
slog.String("checksum", objFilterRes.Metadata.Checksum))
fmt.Println("[CACHE] loadExistedObjects: will add object to cache, key=", objFilterRes.Metadata.ResourceId, "object=", objFilterRes)
}

// Save objects to the cache.
ei.cacheLock.Lock()
defer ei.cacheLock.Unlock()
for k, v := range filteredObjects {
fmt.Println("[CACHE] loadExistedObjects: writing to cache, key=", k, "object=", v)
ei.cachedObjects[k] = v
}
fmt.Println("[CACHE] loadExistedObjects: cache now has", len(ei.cachedObjects), "objects")
ei.cacheLock.Unlock()

ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects))
ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels)
@@ -327,49 +330,35 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty
case kemtypes.WatchEventAdded:
fallthrough
case kemtypes.WatchEventModified:
// Update object in cache
ei.cacheLock.Lock()
cachedObject, objectInCache := ei.cachedObjects[resourceId]
skipEvent := false
if objectInCache && cachedObject.Metadata.Checksum == objFilterRes.Metadata.Checksum {
// update object in cache and do not send event
log.Debug("skip KubeEvent",
slog.String("debugName", ei.Monitor.Metadata.DebugName),
slog.String("eventType", string(eventType)),
slog.String("resourceId", resourceId),
)
fmt.Println("[CACHE] handleWatchEvent:", eventType, "skipping event for object with same checksum, key=", resourceId)
skipEvent = true
}
ei.cachedObjects[resourceId] = objFilterRes
// Update cached objects info.
ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects))
if eventType == kemtypes.WatchEventAdded {
ei.cachedObjectsInfo.Added++
ei.cachedObjectsIncrement.Added++
if objectInCache {
fmt.Println("[CACHE] handleWatchEvent:", eventType, "update existing object in cache, key=", resourceId)
} else {
ei.cachedObjectsInfo.Modified++
ei.cachedObjectsIncrement.Modified++
fmt.Println("[CACHE] handleWatchEvent:", eventType, "add new object to cache, key=", resourceId)
}
// Update metrics.
ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels)
ei.cachedObjects[resourceId] = objFilterRes
fmt.Println("[CACHE] handleWatchEvent:", eventType, "cache now has", len(ei.cachedObjects), "objects")
ei.cacheLock.Unlock()
if skipEvent {
return
}

case kemtypes.WatchEventDeleted:
ei.cacheLock.Lock()
delete(ei.cachedObjects, resourceId)
// Update cached objects info.
ei.cachedObjectsInfo.Count = uint64(len(ei.cachedObjects))
if ei.cachedObjectsInfo.Count == 0 {
ei.cachedObjectsInfo.Cleaned++
ei.cachedObjectsIncrement.Cleaned++
_, existed := ei.cachedObjects[resourceId]
if existed {
fmt.Println("[CACHE] handleWatchEvent: Deleted, removing object from cache, key=", resourceId)
delete(ei.cachedObjects, resourceId)
fmt.Println("[CACHE] handleWatchEvent: Deleted, cache now has", len(ei.cachedObjects), "objects")
} else {
fmt.Println("[CACHE] handleWatchEvent: Deleted, object not found in cache, key=", resourceId)
}
ei.cachedObjectsInfo.Deleted++
ei.cachedObjectsIncrement.Deleted++
// Update metrics.
ei.metricStorage.GaugeSet("{PREFIX}kube_snapshot_objects", float64(len(ei.cachedObjects)), ei.Monitor.Metadata.MetricLabels)
ei.cacheLock.Unlock()
}

@@ -458,6 +447,27 @@ func (ei *resourceInformer) start() {
}
}()

go func() {
fmt.Println("[DANGLING] LogDanglingObjects ticker goroutine started for informer:", ei.Monitor.Metadata.DebugName)
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ei.ctx.Done():
fmt.Println("[DANGLING] LogDanglingObjects ticker goroutine stopped for informer:", ei.Monitor.Metadata.DebugName)
return
case <-ticker.C:
fmt.Println("[DANGLING] LogDanglingObjects ticker tick for informer:", ei.Monitor.Metadata.DebugName)
dangling := ei.LogDanglingObjects()
if len(dangling) == 0 {
fmt.Println("[DANGLING] LogDanglingObjects ticker: SUCCESS — no dangling objects for informer:", ei.Monitor.Metadata.DebugName)
} else {
fmt.Println("[DANGLING] LogDanglingObjects ticker: found", len(dangling), "dangling objects for informer:", ei.Monitor.Metadata.DebugName, dangling)
}
}
}
}()

// TODO: separate handler and informer
errorHandler := newWatchErrorHandler(ei.Monitor.Metadata.DebugName, ei.Monitor.Kind, ei.Monitor.Metadata.LogLabels, ei.metricStorage, ei.logger.Named("watch-error-handler"))
err := DefaultFactoryStore.Start(ei.ctx, ei.id, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler)
@@ -489,3 +499,34 @@ func (ei *resourceInformer) getCachedObjectsInfoIncrement() CachedObjectsInfo {
ei.cachedObjectsIncrement = &CachedObjectsInfo{}
return info
}

func (ei *resourceInformer) LogDanglingObjects() []string {
fmt.Println("[DANGLING] LogDanglingObjects: start for informer:", ei.Monitor.Metadata.DebugName)
objList, err := ei.KubeClient.Dynamic().
Resource(ei.GroupVersionResource).
Namespace(ei.Namespace).
List(context.TODO(), ei.ListOptions)
if err != nil {
fmt.Println("[DANGLING] LogDanglingObjects: error listing objects from API:", err)
return nil
}
actual := make(map[string]bool)
for _, item := range objList.Items {
actual[resourceId(&item)] = true
}
ei.cacheLock.RLock()
dangling := []string{}
for k := range ei.cachedObjects {
if !actual[k] {
dangling = append(dangling, k)
}
}
fmt.Println("[DANGLING] LogDanglingObjects: informer:", ei.Monitor.Metadata.DebugName, "cache size:", len(ei.cachedObjects), "actual in API:", len(actual))
if len(dangling) > 0 {
fmt.Println("[DANGLING] LogDanglingObjects: found", len(dangling), "dangling objects:", dangling)
} else {
fmt.Println("[DANGLING] LogDanglingObjects: SUCCESS — no dangling objects found!")
}
ei.cacheLock.RUnlock()
return dangling
}
10 changes: 6 additions & 4 deletions pkg/utils/checksum/checksum.go
@@ -1,20 +1,22 @@
package checksum

import (
"crypto/md5"
"encoding/hex"
"os"
"path/filepath"
"sort"

"github.com/cespare/xxhash/v2"
)

func CalculateChecksum(stringArr ...string) string {
hasher := md5.New()
digest := xxhash.New()
sort.Strings(stringArr)
for _, value := range stringArr {
_, _ = hasher.Write([]byte(value))
_, _ = digest.Write([]byte(value))
}
return hex.EncodeToString(hasher.Sum(nil))

return hex.EncodeToString(digest.Sum(nil))
}

func CalculateChecksumOfFile(path string) (string, error) {
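The checksum.go change above swaps the MD5 digest for xxhash while keeping the sort-then-hash shape of CalculateChecksum. A self-contained sketch of the same behavior, assuming only the github.com/cespare/xxhash/v2 dependency already imported by the diff; sorting the inputs first makes the checksum independent of argument order:

```go
package main

import (
	"encoding/hex"
	"fmt"
	"sort"

	"github.com/cespare/xxhash/v2"
)

// calculateChecksum mirrors the updated CalculateChecksum from the diff:
// sort the inputs, feed them into a 64-bit xxhash digest, hex-encode the sum.
func calculateChecksum(stringArr ...string) string {
	digest := xxhash.New()
	sort.Strings(stringArr)
	for _, value := range stringArr {
		_, _ = digest.Write([]byte(value))
	}
	return hex.EncodeToString(digest.Sum(nil))
}

func main() {
	a := calculateChecksum("pods", "default")
	b := calculateChecksum("default", "pods")
	fmt.Println(a == b, a) // true, plus a 16-character hex digest (64-bit hash)
}
```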