From edbe2b62d3dcd6b60df819e2fa6992b12638926e Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Sat, 18 Oct 2025 14:56:06 -0400 Subject: [PATCH] removing tombstones if version parsing / comparable versions is not enabled, then the temporary cache will effectively be disabled Signed-off-by: Steve Hawkins --- .../api/config/ConfigurationService.java | 10 +- .../PrimaryUpdateAndCacheUtils.java | 5 + .../source/informer/InformerEventSource.java | 14 +-- .../source/informer/InformerManager.java | 5 + .../source/informer/InformerWrapper.java | 4 + .../informer/ManagedInformerEventSource.java | 29 +++-- .../informer/TemporaryResourceCache.java | 106 +++++------------- .../TemporaryPrimaryResourceCacheTest.java | 45 ++------ 8 files changed, 72 insertions(+), 146 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 6215c20179..0c760982ba 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -491,18 +491,16 @@ default Set> withPreviousAnnotationForDependentReso /** * If the event logic should parse the resourceVersion to determine the ordering of dependent - * resource events. This is typically not needed. + * resource events. * - *

Disabled by default as Kubernetes does not support, and discourages, this interpretation of - * resourceVersions. Enable only if your api server event processing seems to lag the operator - * logic, and you want to further minimize the amount of work done / updates issued by the - * operator. + *

Enabled by default as Kubernetes does support this interpretation of resourceVersions. + * Disable only if your api server provides non comparable resource versions.. * * @return if resource version should be parsed (as integer) * @since 4.5.0 */ default boolean parseResourceVersionsForEventFilteringAndCaching() { - return false; + return true; } /** diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index b4b3405ec4..72f0c8da36 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -451,6 +451,11 @@ public static

P addFinalizerWithSSA( } } + public static int compareResourceVersions(HasMetadata h1, HasMetadata h2) { + return compareResourceVersions( + h1.getMetadata().getResourceVersion(), h2.getMetadata().getResourceVersion()); + } + public static int compareResourceVersions(String v1, String v2) { var v1Length = v1.length(); if (v1Length == 0) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index ec11db25f4..4474a26c08 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -92,7 +92,7 @@ public InformerEventSource( } InformerEventSource(InformerEventSourceConfiguration configuration, KubernetesClient client) { - this(configuration, client, false); + this(configuration, client, true); } @SuppressWarnings({"unchecked", "rawtypes"}) @@ -211,17 +211,7 @@ private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) { if (res.isEmpty()) { return isEventKnownFromAnnotation(newObject, oldObject); } - boolean resVersionsEqual = - newObject - .getMetadata() - .getResourceVersion() - .equals(res.get().getMetadata().getResourceVersion()); - log.debug( - "Resource found in temporal cache for id: {} resource versions equal: {}", - resourceID, - resVersionsEqual); - return resVersionsEqual - || temporaryResourceCache.isLaterResourceVersion(resourceID, res.get(), newObject); + return temporaryResourceCache.isLaterResourceVersion(resourceID, res.get(), newObject); } private boolean isEventKnownFromAnnotation(R newObject, R oldObject) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index abd2b6a752..27ba5f5b16 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -221,6 +221,11 @@ public Optional get(ResourceID resourceID) { : r); } + public Optional getLastSyncResourceVersion(Optional namespace) { + return getSource(namespace.orElse(WATCH_ALL_NAMESPACES)) + .map(source -> source.getLastSyncResourceVersion()); + } + @Override public Stream keys() { return sources.values().stream().flatMap(Cache::keys); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 07db980fcd..096c201c12 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -156,6 +156,10 @@ public Optional get(ResourceID resourceID) { return Optional.ofNullable(cache.getByKey(getKey(resourceID))); } + public String getLastSyncResourceVersion() { + return this.informer.lastSyncResourceVersion(); + } + private String getKey(ResourceID resourceID) { return Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), resourceID.getName()); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 2679918b60..13192499f5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -34,6 +34,7 @@ import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.Informable; import io.javaoperatorsdk.operator.api.config.NamespaceChangeable; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; @@ -133,19 +134,27 @@ public void handleRecentResourceCreate(ResourceID resourceID, R resource) { @Override public Optional get(ResourceID resourceID) { + var res = cache.get(resourceID); Optional resource = temporaryResourceCache.getResourceFromCache(resourceID); - if (resource.isPresent()) { - log.debug("Resource found in temporary cache for Resource ID: {}", resourceID); + if (resource.isPresent() + && res.filter( + r -> + PrimaryUpdateAndCacheUtils.compareResourceVersions(r, resource.orElseThrow()) + > 0) + .isEmpty()) { + log.debug("Latest resource found in temporary cache for Resource ID: {}", resourceID); return resource; - } else { - log.debug( - "Resource not found in temporary cache reading it from informer cache," - + " for Resource ID: {}", - resourceID); - var res = cache.get(resourceID); - log.debug("Resource found in cache: {} for id: {}", res.isPresent(), resourceID); - return res; } + log.debug( + "Resource not found, or older, in temporary cache. Found in informer cache {}, for" + + " Resource ID: {}", + res.isPresent(), + resourceID); + return res; + } + + public Optional getLastSyncResourceVersion(Optional namespace) { + return cache.getLastSyncResourceVersion(namespace); } @SuppressWarnings("unused") diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 06226ae4ba..19b50718b2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -15,7 +15,6 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -25,6 +24,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -46,53 +46,10 @@ */ public class TemporaryResourceCache { - static class ExpirationCache { - private final LinkedHashMap cache; - private final int ttlMs; - - public ExpirationCache(int maxEntries, int ttlMs) { - this.ttlMs = ttlMs; - this.cache = - new LinkedHashMap<>() { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > maxEntries; - } - }; - } - - public void add(K key) { - clean(); - cache.putIfAbsent(key, System.currentTimeMillis()); - } - - public boolean contains(K key) { - clean(); - return cache.get(key) != null; - } - - void clean() { - if (!cache.isEmpty()) { - long currentTimeMillis = System.currentTimeMillis(); - var iter = cache.entrySet().iterator(); - // the order will already be from oldest to newest, clean a fixed number of entries to - // amortize the cost amongst multiple calls - for (int i = 0; i < 10 && iter.hasNext(); i++) { - var entry = iter.next(); - if (currentTimeMillis - entry.getValue() > ttlMs) { - iter.remove(); - } - } - } - } - } - private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); private final Map cache = new ConcurrentHashMap<>(); - // keep up to the last million deletions for up to 10 minutes - private final ExpirationCache tombstones = new ExpirationCache<>(1000000, 1200000); private final ManagedInformerEventSource managedInformerEventSource; private final boolean parseResourceVersions; @@ -104,7 +61,6 @@ public TemporaryResourceCache( } public synchronized void onDeleteEvent(T resource, boolean unknownState) { - tombstones.add(resource.getMetadata().getUid()); onEvent(resource, unknownState); } @@ -130,37 +86,37 @@ public synchronized void putAddedResource(T newResource) { * @param previousResourceVersion null indicates an add */ public synchronized void putResource(T newResource, String previousResourceVersion) { + if (!parseResourceVersions) { + return; + } + var resourceId = ResourceID.fromResource(newResource); - var cachedResource = managedInformerEventSource.get(resourceId).orElse(null); - boolean moveAhead = false; - if (previousResourceVersion == null && cachedResource == null) { - if (tombstones.contains(newResource.getMetadata().getUid())) { - log.debug( - "Won't resurrect uid {} for resource id: {}", - newResource.getMetadata().getUid(), - resourceId); - return; - } - // we can skip further checks as this is a simple add and there's no previous entry to - // consider - moveAhead = true; + String latest = + managedInformerEventSource + .getLastSyncResourceVersion(resourceId.getNamespace()) + .orElse(null); + if (latest != null + && PrimaryUpdateAndCacheUtils.compareResourceVersions( + latest, newResource.getMetadata().getResourceVersion()) + >= 0) { + log.debug( + "Resource {}: resourceVersion {} is not later than latest {}", + resourceId, + newResource.getMetadata().getResourceVersion(), + latest); + return; } - if (moveAhead - || (cachedResource != null - && (cachedResource - .getMetadata() - .getResourceVersion() - .equals(previousResourceVersion)) - || isLaterResourceVersion(resourceId, newResource, cachedResource))) { + var cachedResource = managedInformerEventSource.get(resourceId).orElse(null); + + if (cachedResource == null + || PrimaryUpdateAndCacheUtils.compareResourceVersions(newResource, cachedResource) > 0) { log.debug( "Temporarily moving ahead to target version {} for resource id: {}", newResource.getMetadata().getResourceVersion(), resourceId); cache.put(resourceId, newResource); - } else if (cache.remove(resourceId) != null) { - log.debug("Removed an obsolete resource from cache for id: {}", resourceId); } } @@ -170,20 +126,8 @@ public synchronized void putResource(T newResource, String previousResourceVersi * cachedResource, otherwise false */ public boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) { - try { - if (parseResourceVersions - && Long.parseLong(newResource.getMetadata().getResourceVersion()) - > Long.parseLong(cachedResource.getMetadata().getResourceVersion())) { - return true; - } - } catch (NumberFormatException e) { - log.debug( - "Could not compare resourceVersions {} and {} for {}", - newResource.getMetadata().getResourceVersion(), - cachedResource.getMetadata().getResourceVersion(), - resourceId); - } - return false; + return parseResourceVersions + && PrimaryUpdateAndCacheUtils.compareResourceVersions(newResource, cachedResource) > 0; } public synchronized Optional getResourceFromCache(ResourceID resourceID) { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index e3dc2c82e4..87e3e9320b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -17,9 +17,7 @@ import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,7 +25,6 @@ import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.ExpirationCache; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -46,17 +43,17 @@ class TemporaryPrimaryResourceCacheTest { @BeforeEach void setup() { informerEventSource = mock(InformerEventSource.class); - temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, false); + temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, true); } @Test void updateAddsTheResourceIntoCacheIfTheInformerHasThePreviousResourceVersion() { var testResource = testResource(); var prevTestResource = testResource(); - prevTestResource.getMetadata().setResourceVersion("0"); + prevTestResource.getMetadata().setResourceVersion("1"); when(informerEventSource.get(any())).thenReturn(Optional.of(prevTestResource)); - temporaryResourceCache.putResource(testResource, "0"); + temporaryResourceCache.putResource(testResource, "2"); var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); assertThat(cached).isPresent(); @@ -66,10 +63,10 @@ void updateAddsTheResourceIntoCacheIfTheInformerHasThePreviousResourceVersion() void updateNotAddsTheResourceIntoCacheIfTheInformerHasOtherVersion() { var testResource = testResource(); var informerCachedResource = testResource(); - informerCachedResource.getMetadata().setResourceVersion("x"); + informerCachedResource.getMetadata().setResourceVersion("2"); when(informerEventSource.get(any())).thenReturn(Optional.of(informerCachedResource)); - temporaryResourceCache.putResource(testResource, "0"); + temporaryResourceCache.putResource(testResource, "1"); var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); assertThat(cached).isNotPresent(); @@ -143,41 +140,15 @@ void rapidDeletion() { .endMetadata() .build(), false); + when(informerEventSource.getLastSyncResourceVersion( + Optional.of(testResource.getMetadata().getNamespace()))) + .thenReturn(Optional.of("3")); temporaryResourceCache.putAddedResource(testResource); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isEmpty(); } - @Test - void expirationCacheMax() { - ExpirationCache cache = new ExpirationCache<>(2, Integer.MAX_VALUE); - - cache.add(1); - cache.add(2); - cache.add(3); - - assertThat(cache.contains(1)).isFalse(); - assertThat(cache.contains(2)).isTrue(); - assertThat(cache.contains(3)).isTrue(); - } - - @Test - void expirationCacheTtl() { - ExpirationCache cache = new ExpirationCache<>(2, 1); - - cache.add(1); - cache.add(2); - - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .untilAsserted( - () -> { - assertThat(cache.contains(1)).isFalse(); - assertThat(cache.contains(2)).isFalse(); - }); - } - private ConfigMap propagateTestResourceToCache() { var testResource = testResource(); when(informerEventSource.get(any())).thenReturn(Optional.empty());