Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8b7f574
fix(flagd): no retry for certain error codes, implement test steps
leakonvalinka Nov 10, 2025
f0a1db2
attempt to handle fatal error
leakonvalinka Nov 14, 2025
654c8da
fix(flagd): update testbed + step, fix event
leakonvalinka Nov 24, 2025
07195a7
adjust rpc resolver
leakonvalinka Dec 12, 2025
ccf5120
Merge branch 'main' into fix/flagd-infinite-connection-retries
leakonvalinka Dec 15, 2025
e6d4057
fix e2e tests
leakonvalinka Dec 17, 2025
75392e6
Merge branch 'main' into fix/flagd-infinite-connection-retries
leakonvalinka Dec 17, 2025
95a880c
clean up
leakonvalinka Dec 17, 2025
45a9822
fatal only on first connection
leakonvalinka Dec 17, 2025
e50aa7f
remove exclusion of sync e2e test tag
leakonvalinka Dec 17, 2025
a636257
add shutdown after fatal, fix tests
leakonvalinka Dec 19, 2025
5794b1a
Merge branch 'main' into fix/flagd-infinite-connection-retries
leakonvalinka Dec 19, 2025
d27e4e9
remove shutdown
leakonvalinka Dec 19, 2025
94c7691
fix lint issues
leakonvalinka Dec 19, 2025
ee42405
fix spotless
leakonvalinka Dec 22, 2025
02539d0
feat(flagd): Communicate Fatal and shutdown connectors
guidobrei Dec 22, 2025
4e32125
fixup: update tests
toddbaert Dec 22, 2025
701069a
fixup: revert rpc test expectations
toddbaert Dec 22, 2025
bdc3e68
fixup: revert enum change
toddbaert Dec 22, 2025
a969488
fixup: test timeout
toddbaert Dec 22, 2025
e05539d
fixup: flaky test and init changedflags
toddbaert Jan 2, 2026
39b59ea
Merge branch 'main' into fix/flagd-infinite-connection-retries
toddbaert Jan 2, 2026
9397ec2
fixup: race condition with FATAL
toddbaert Jan 6, 2026
586b4e1
Merge branch 'main' into fix/flagd-infinite-connection-retries
toddbaert Jan 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

/** Helper class to hold configuration default values. */
Expand Down Expand Up @@ -37,6 +40,7 @@ public final class Config {
static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS";
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
static final String FATAL_STATUS_CODES_ENV_VAR_NAME = "FLAGD_FATAL_STATUS_CODES";
/**
* Environment variable to fetch Provider id.
*
Expand Down Expand Up @@ -93,6 +97,18 @@ static long fallBackToEnvOrDefault(String key, long defaultValue) {
}
}

/**
 * Reads a comma-separated list from the environment variable {@code key}.
 *
 * <p>Every entry is trimmed and blank entries are dropped, so values such as {@code ""},
 * {@code "A,,B"} or {@code "A, "} never produce empty-string elements in the result.
 *
 * @param key name of the environment variable to read
 * @param defaultValue value returned when the variable is not set or cannot be read
 * @return the parsed, non-blank entries (possibly an empty list), or {@code defaultValue} when
 *     the variable is not set or the lookup fails
 */
static List<String> fallBackToEnvOrDefaultList(String key, List<String> defaultValue) {
    try {
        // Read the variable once so the null check and the parsed value cannot diverge.
        final String raw = System.getenv(key);
        if (raw == null) {
            return defaultValue;
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .collect(Collectors.toList());
    } catch (Exception e) {
        // Best effort: any lookup failure (e.g. null key, denied access) falls back to the default.
        return defaultValue;
    }
}

static Resolver fromValueProvider(Function<String, String> provider) {
final String resolverVar = provider.apply(RESOLVER_ENV_VAR);
if (resolverVar == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.openfeature.contrib.providers.flagd;

import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefault;
import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefaultList;
import static dev.openfeature.contrib.providers.flagd.Config.fromValueProvider;

import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
Expand Down Expand Up @@ -122,6 +123,15 @@ public class FlagdOptions {
@Builder.Default
private int retryGracePeriod =
fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);

/**
* List of gRPC response status codes for which the provider transitions into fatal state upon first connection.
* When not set through the builder, the value is read from the environment variable
* {@code FLAGD_FATAL_STATUS_CODES} as a comma-separated list; if that variable is not set,
* it defaults to an empty list.
*/
@Builder.Default
private List<String> fatalStatusCodes =
fallBackToEnvOrDefaultList(Config.FATAL_STATUS_CODES_ENV_VAR_NAME, List.of());

/**
* Selector to be used with flag sync gRPC contract.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache;
import dev.openfeature.sdk.ErrorCode;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.Hook;
Expand Down Expand Up @@ -192,8 +192,9 @@ EvaluationContext getEnrichedContext() {
}

@SuppressWarnings("checkstyle:fallthrough")
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
log.debug("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent());
private void onProviderEvent(
ProviderEvent providerEvent, ProviderEventDetails providerEventDetails, Structure syncMetadata) {
log.debug("FlagdProviderEvent event {} ", providerEvent);
synchronized (syncResources) {
/*
* We only use Error and Ready as previous states.
Expand All @@ -204,10 +205,10 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
* forward a configuration changed to the ready, if we are not in the ready
* state.
*/
switch (flagdProviderEvent.getEvent()) {
switch (providerEvent) {
case PROVIDER_CONFIGURATION_CHANGED:
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_READY) {
onConfigurationChanged(flagdProviderEvent);
emit(providerEvent, providerEventDetails);
break;
}
// intentional fall through
Expand All @@ -216,33 +217,30 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
* Sync metadata is used to enrich the context, and is immutable in flagd,
* so we only need it to be fetched once at READY.
*/
if (flagdProviderEvent.getSyncMetadata() != null) {
syncResources.setEnrichedContext(contextEnricher.apply(flagdProviderEvent.getSyncMetadata()));
if (syncMetadata != null) {
syncResources.setEnrichedContext(contextEnricher.apply(syncMetadata));
}
onReady();
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY);
break;

case PROVIDER_ERROR:
if (providerEventDetails != null
&& providerEventDetails.getErrorCode() == ErrorCode.PROVIDER_FATAL) {
onFatal();
break;
}

if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) {
onError();
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR);
}
break;

default:
log.warn("Unknown event {}", flagdProviderEvent.getEvent());
log.warn("Unknown event {}", providerEvent);
}
}
}

private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
this.emitProviderConfigurationChanged(ProviderEventDetails.builder()
.flagsChanged(flagdProviderEvent.getFlagsChanged())
.message("configuration changed")
.build());
}

private void onReady() {
if (syncResources.initialize()) {
log.info("Initialized FlagdProvider");
Expand Down Expand Up @@ -284,4 +282,17 @@ private void onError() {
TimeUnit.SECONDS);
}
}

/**
 * Reacts to a fatal provider error.
 *
 * <p>Cancels the error task if one exists and has not been cancelled yet, flags the sync
 * resources as fatal, emits a provider error event carrying {@link ErrorCode#PROVIDER_FATAL}
 * and finally shuts this provider down.
 */
private void onFatal() {
    final boolean hasActiveErrorTask = errorTask != null && !errorTask.isCancelled();
    if (hasActiveErrorTask) {
        // do not interrupt the task if it is already running
        errorTask.cancel(false);
    }

    // must be flagged before shutdown() so the fatal state is visible once shut down
    syncResources.setFatal(true);

    emitProviderError(
            ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build());

    shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.ImmutableContext;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.exceptions.FatalError;
import dev.openfeature.sdk.exceptions.GeneralError;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -16,8 +17,11 @@ class FlagdProviderSyncResources {
@Setter
private volatile ProviderEvent previousEvent = null;

@Setter
private volatile boolean isFatal;

private volatile EvaluationContext enrichedContext = new ImmutableContext();
private volatile boolean initialized;
private volatile boolean isInitialized;
private volatile boolean isShutDown;

public void setEnrichedContext(EvaluationContext context) {
Expand All @@ -31,32 +35,40 @@ public void setEnrichedContext(EvaluationContext context) {
* @return true iff this was the first call to {@code initialize()}
*/
public synchronized boolean initialize() {
if (this.initialized) {
if (this.isInitialized) {
return false;
}
this.initialized = true;
this.isInitialized = true;
this.isFatal = false;
this.notifyAll();
return true;
}

/**
* Blocks the calling thread until either {@link FlagdProviderSyncResources#initialize()} or
* {@link FlagdProviderSyncResources#shutdown()} is called or the deadline is exceeded, whatever happens first. If
* {@link FlagdProviderSyncResources#initialize()} has been executed before {@code waitForInitialization(long)} is
* called, it will return instantly. If the deadline is exceeded, a GeneralError will be thrown.
* If {@link FlagdProviderSyncResources#shutdown()} is called in the meantime, an {@link IllegalStateException} will
* Blocks the calling thread until either
* {@link FlagdProviderSyncResources#initialize()} or
* {@link FlagdProviderSyncResources#shutdown()} is called or the deadline is
* exceeded, whatever happens first. If
* {@link FlagdProviderSyncResources#initialize()} has been executed before
* {@code waitForInitialization(long)} is
* called, it will return instantly. If the deadline is exceeded, a GeneralError
* will be thrown.
* If {@link FlagdProviderSyncResources#shutdown()} is called in the meantime,
* a {@link FatalError} (if the provider was marked as fatal) or a {@link GeneralError} will
* be thrown. Otherwise, the method will return cleanly.
*
* @param deadline the maximum time in ms to wait
* @throws GeneralError when the deadline is exceeded before
* {@link FlagdProviderSyncResources#initialize()} is called on this object
* @throws IllegalStateException when {@link FlagdProviderSyncResources#shutdown()} is called or has been called on
* this object
* @throws GeneralError when the deadline is exceeded before
* {@link FlagdProviderSyncResources#initialize()} is
* called on this object, or when
* {@link FlagdProviderSyncResources#shutdown()} is called or has been
* called on this object without the provider being marked as fatal
* @throws FatalError when the provider has been marked as fatal during
* shutdown
*/
public void waitForInitialization(long deadline) {
long start = System.currentTimeMillis();
long end = start + deadline;
while (!initialized && !isShutDown) {
while (!isInitialized && !isShutDown) {
long now = System.currentTimeMillis();
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
if (now >= end) {
Expand All @@ -68,7 +80,7 @@ public void waitForInitialization(long deadline) {
if (isShutDown) {
break;
}
if (initialized) { // might have changed in the meantime
if (isInitialized) { // might have changed in the meantime
return;
}
try {
Expand All @@ -80,7 +92,11 @@ public void waitForInitialization(long deadline) {
}
}
if (isShutDown) {
throw new IllegalStateException("Already shut down");
String msg = "Already shut down due to previous error.";
if (isFatal) {
throw new FatalError(msg);
}
throw new GeneralError(msg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage;
Expand All @@ -20,13 +19,15 @@
import dev.openfeature.sdk.ImmutableMetadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.Reason;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.Value;
import dev.openfeature.sdk.exceptions.GeneralError;
import dev.openfeature.sdk.exceptions.ParseError;
import dev.openfeature.sdk.exceptions.TypeMismatchError;
import dev.openfeature.sdk.internal.TriConsumer;
import java.util.Map;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -38,7 +39,7 @@
@Slf4j
public class InProcessResolver implements Resolver {
private final Storage flagStore;
private final Consumer<FlagdProviderEvent> onConnectionEvent;
private final TriConsumer<ProviderEvent, ProviderEventDetails, Structure> onConnectionEvent;
private final Operator operator;
private final String scope;
private final QueueSource queueSource;
Expand All @@ -52,7 +53,8 @@ public class InProcessResolver implements Resolver {
* @param onConnectionEvent lambda which handles changes in the
* connection/stream
*/
public InProcessResolver(FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
public InProcessResolver(
FlagdOptions options, TriConsumer<ProviderEvent, ProviderEventDetails, Structure> onConnectionEvent) {
this.queueSource = getQueueSource(options);
this.flagStore = new FlagStore(queueSource);
this.onConnectionEvent = onConnectionEvent;
Expand All @@ -73,14 +75,29 @@ public void init() throws Exception {
switch (storageStateChange.getStorageState()) {
case OK:
log.debug("onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
onConnectionEvent.accept(new FlagdProviderEvent(

var eventDetails = ProviderEventDetails.builder()
.flagsChanged(storageStateChange.getChangedFlagsKeys())
.message("configuration changed")
.build();

onConnectionEvent.accept(
ProviderEvent.PROVIDER_CONFIGURATION_CHANGED,
storageStateChange.getChangedFlagsKeys(),
storageStateChange.getSyncMetadata()));
eventDetails,
storageStateChange.getSyncMetadata());

log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
break;
case STALE:
onConnectionEvent.accept(ProviderEvent.PROVIDER_ERROR, null, null);
break;
case ERROR:
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR));
onConnectionEvent.accept(
ProviderEvent.PROVIDER_ERROR,
ProviderEventDetails.builder()
.errorCode(ErrorCode.PROVIDER_FATAL)
.build(),
null);
break;
default:
log.warn(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Structure;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -109,7 +110,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
switch (payload.getType()) {
case DATA:
try {
List<String> changedFlagsKeys;
List<String> changedFlagsKeys = Collections.emptyList();
ParsingResult parsingResult = FlagParser.parseString(payload.getFlagData(), throwIfInvalid);
Map<String, FeatureFlag> flagMap = parsingResult.getFlags();
Map<String, Object> flagSetMetadataMap = parsingResult.getFlagSetMetadata();
Expand All @@ -133,13 +134,19 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
// catch all exceptions and avoid stream listener interruptions
log.warn("Invalid flag sync payload from connector", e);
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) {
log.warn("Failed to convey STALE status, queue is full");
log.warn("Failed to convey TRANSIENT_ERROR status, queue is full");
}
}
break;
case ERROR:
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) {
log.warn("Failed to convey TRANSIENT_ERROR status, queue is full");
}
break;
case SHUTDOWN:
shutdown();
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) {
log.warn("Failed to convey ERROR status, queue is full");
log.warn("Failed to convey FATAL_ERROR status, queue is full");
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;

/** Satus of the storage. */
/** Status of the storage. */
public enum StorageState {
/** Storage is up to date and working as expected. */
OK,
/** Storage has gone stale(most recent sync failed). May get to OK status with next sync. */
/** Storage has gone stale (most recent sync failed). May get to OK status with next sync. */
STALE,
/** Storage is in an unrecoverable error state. */
ERROR,
Expand Down
Loading
Loading