Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import java.time.Instant;
Expand Down Expand Up @@ -102,10 +105,35 @@ protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTele
StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName()));
registerListener(this);

this.childrenCache = Caffeine.newBuilder()
long childrenCacheMaxSizeBytes = getChildrenCacheMaxSizeBytes();

Caffeine<Object, Object> childrenCacheBuilder = Caffeine.newBuilder()
.recordStats()
.refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
.expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS)
.expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS);
if (childrenCacheMaxSizeBytes > 0) {
childrenCacheBuilder.maximumWeight(childrenCacheMaxSizeBytes)
.weigher((String key, List<String> children) -> {
// calculate the total byte size of the key and entries in the children list
// to get some estimation of the required heap memory required for the entry.
// add 16 bytes overhead for Java object header and 16 bytes for java.lang.String fields.
int totalSize = ByteBufUtil.utf8Bytes(key) + 32;
for (String child : children) {
totalSize += ByteBufUtil.utf8Bytes(child) + 32;
}
return totalSize;
});
}
this.childrenCache = childrenCacheBuilder
.evictionListener(new RemovalListener<String, List<String>>() {
@Override
public void onRemoval(String key, List<String> value, RemovalCause cause) {
if (cause == RemovalCause.SIZE) {
log.warn("[{}] Evicting path {} from children cache because the size of the cache is too "
+ "large. Consider increasing the maximum heap size.", metadataStoreName, key);
}
}
})
.buildAsync(new AsyncCacheLoader<String, List<String>>() {
@Override
public CompletableFuture<List<String>> asyncLoad(String key, Executor executor) {
Expand Down Expand Up @@ -152,6 +180,20 @@ public CompletableFuture<Boolean> asyncReload(String key, Boolean oldValue,
this.metadataStoreStats = new MetadataStoreStats(metadataStoreName, openTelemetry);
}

/**
* Return the maximum size of the children cache in bytes.
* @return maximum size of the children cache in bytes.
*/
protected long getChildrenCacheMaxSizeBytes() {
long heapMaxSizeBytes = Runtime.getRuntime().maxMemory();
// default 20% of max heap size, this should be sufficient to prevent OOME in the use case
// when a lot of namespaces with lots of topics are listed in the metadata store.
long defaultSizeBytes = heapMaxSizeBytes / 5;
// min size 20MB
int minSizeBytes = 1024 * 1024 * 20;
Copy link
Contributor

@Technoboy- Technoboy- Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better if it were configurable

return Math.max(defaultSizeBytes, minSizeBytes);
}

@Override
public CompletableFuture<Void> handleMetadataEvent(MetadataEvent event) {
CompletableFuture<Void> result = new CompletableFuture<>();
Expand Down
Loading