From d633f0a778309b273b2ce2d9023c776bbaaff2a6 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Tue, 29 Apr 2025 18:01:56 -0700 Subject: [PATCH 1/3] Fixed segment sync --- .../io/split/client/SplitFactoryImpl.java | 11 +++-- .../SegmentSynchronizationTaskImp.java | 8 +++- .../common/LocalhostSynchronizerTest.java | 17 ++++---- .../split/engine/common/SynchronizerTest.java | 9 +---- .../experiments/SplitFetcherImpTest.java | 2 +- .../engine/experiments/SplitFetcherTest.java | 26 ++++++------ .../SegmentSynchronizationTaskImpTest.java | 20 ++++------ client/src/test/resources/segment_2.json | 10 +++++ client/src/test/resources/split_init.json | 40 ++++++++++++++++++- 9 files changed, 93 insertions(+), 50 deletions(-) create mode 100644 client/src/test/resources/segment_2.json diff --git a/client/src/main/java/io/split/client/SplitFactoryImpl.java b/client/src/main/java/io/split/client/SplitFactoryImpl.java index d2779c666..199d0d1ae 100644 --- a/client/src/main/java/io/split/client/SplitFactoryImpl.java +++ b/client/src/main/java/io/split/client/SplitFactoryImpl.java @@ -69,6 +69,7 @@ import io.split.storages.SplitCacheProducer; import io.split.storages.RuleBasedSegmentCache; import io.split.storages.RuleBasedSegmentCacheProducer; +import io.split.storages.RuleBasedSegmentCacheConsumer; import io.split.storages.enums.OperationMode; import io.split.storages.memory.InMemoryCacheImp; import io.split.storages.memory.SegmentCacheInMemoryImpl; @@ -218,7 +219,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn splitCache, _segmentCache, telemetryStorage, _startTime); // Segments - _segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache); + _segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache, ruleBasedSegmentCache); SplitParser splitParser = new SplitParser(); RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); @@ -420,7 +421,8 @@ protected SplitFactoryImpl(SplitClientConfig config) { segmentCache, _telemetryStorageProducer, _splitCache, - config.getThreadFactory()); + config.getThreadFactory(), + _ruleBasedSegmentCache); // SplitFetcher SplitChangeFetcher splitChangeFetcher = createSplitChangeFetcher(config); @@ -607,7 +609,7 @@ private static HttpClientBuilder setupProxy(HttpClientBuilder httpClientbuilder, private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config, SegmentCacheProducer segmentCacheProducer, - SplitCacheConsumer splitCacheConsumer) throws URISyntaxException { + SplitCacheConsumer splitCacheConsumer, RuleBasedSegmentCacheConsumer ruleBasedSegmentCache) throws URISyntaxException { SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(_splitHttpClient, _rootTarget, _telemetryStorageProducer); @@ -617,7 +619,8 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config, segmentCacheProducer, _telemetryStorageProducer, splitCacheConsumer, - config.getThreadFactory()); + config.getThreadFactory(), + ruleBasedSegmentCache); } private SplitFetcher buildSplitFetcher(SplitCacheProducer splitCacheProducer, SplitParser splitParser, diff --git a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java index fc7db7a98..efcf5f945 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java @@ -3,6 +3,7 @@ import com.google.common.collect.Maps; import io.split.client.utils.SplitExecutorFactory; import io.split.engine.common.FetchOptions; +import io.split.storages.RuleBasedSegmentCacheConsumer; import io.split.storages.SegmentCacheProducer; import io.split.storages.SplitCacheConsumer; import io.split.telemetry.storage.TelemetryRuntimeProducer; @@ -38,12 +39,14 @@ public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask private final ScheduledExecutorService _scheduledExecutorService; private final TelemetryRuntimeProducer _telemetryRuntimeProducer; private final SplitCacheConsumer _splitCacheConsumer; + private final RuleBasedSegmentCacheConsumer _ruleBasedSegmentCacheConsumer; private ScheduledFuture _scheduledFuture; public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher, long refreshEveryNSeconds, int numThreads, SegmentCacheProducer segmentCacheProducer, TelemetryRuntimeProducer telemetryRuntimeProducer, - SplitCacheConsumer splitCacheConsumer, ThreadFactory threadFactory) { + SplitCacheConsumer splitCacheConsumer, ThreadFactory threadFactory, + RuleBasedSegmentCacheConsumer ruleBasedSegmentCacheConsumer) { _segmentChangeFetcher = checkNotNull(segmentChangeFetcher); checkArgument(refreshEveryNSeconds >= 0L); @@ -54,6 +57,7 @@ public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher, _segmentCacheProducer = checkNotNull(segmentCacheProducer); _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer); _splitCacheConsumer = checkNotNull(splitCacheConsumer); + _ruleBasedSegmentCacheConsumer = checkNotNull(ruleBasedSegmentCacheConsumer); } public void initializeSegment(String segmentName) { @@ -137,6 +141,7 @@ public boolean isRunning() { public void fetchAll(boolean addCacheHeader) { _splitCacheConsumer.getSegments().forEach(this::initialize); + _ruleBasedSegmentCacheConsumer.getSegments().forEach(this::initialize); for (Map.Entry entry : _segmentFetchers.entrySet()) { SegmentFetcher fetcher = entry.getValue(); @@ -155,6 +160,7 @@ public void fetchAll(boolean addCacheHeader) { public boolean fetchAllSynchronous() { _splitCacheConsumer.getSegments().forEach(this::initialize); + _ruleBasedSegmentCacheConsumer.getSegments().forEach(this::initialize); List> segmentFetchExecutions = _segmentFetchers.entrySet() .stream().map(e -> _scheduledExecutorService.submit(e.getValue()::runWhitCacheHeader)) .collect(Collectors.toList()); diff --git a/client/src/test/java/io/split/engine/common/LocalhostSynchronizerTest.java b/client/src/test/java/io/split/engine/common/LocalhostSynchronizerTest.java index 7edbea333..04163aedd 100644 --- a/client/src/test/java/io/split/engine/common/LocalhostSynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/LocalhostSynchronizerTest.java @@ -9,10 +9,7 @@ import io.split.engine.experiments.*; import io.split.engine.segments.SegmentChangeFetcher; import io.split.engine.segments.SegmentSynchronizationTaskImp; -import io.split.storages.RuleBasedSegmentCacheProducer; -import io.split.storages.SegmentCacheProducer; -import io.split.storages.SplitCache; -import io.split.storages.SplitCacheProducer; +import io.split.storages.*; import io.split.storages.memory.InMemoryCacheImp; import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp; import io.split.storages.memory.SegmentCacheInMemoryImpl; @@ -36,11 +33,11 @@ public void testSyncAll(){ InputStreamProvider inputStreamProvider = new FileInputStreamProvider("src/test/resources/split_init.json"); SplitChangeFetcher splitChangeFetcher = new JsonLocalhostSplitChangeFetcher(inputStreamProvider); SplitParser splitParser = new SplitParser(); - RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp(); + RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp(); RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); SplitFetcher splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, TELEMETRY_STORAGE_NOOP, FLAG_SETS_FILTER, - ruleBasedSegmentParser, ruleBasedSegmentCacheProducer); + ruleBasedSegmentParser, ruleBasedSegmentCache); SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCacheProducer, 1000L, null); @@ -48,7 +45,7 @@ public void testSyncAll(){ SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl(); SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer, - TELEMETRY_STORAGE_NOOP, splitCacheProducer, null); + TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache); SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null); LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, false); @@ -62,11 +59,11 @@ public void testPeriodicFetching() throws InterruptedException { SplitChangeFetcher splitChangeFetcher = Mockito.mock(JsonLocalhostSplitChangeFetcher.class); SplitParser splitParser = new SplitParser(); - RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp(); + RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp(); RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); SplitFetcher splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, TELEMETRY_STORAGE_NOOP, FLAG_SETS_FILTER, - ruleBasedSegmentParser, ruleBasedSegmentCacheProducer); + ruleBasedSegmentParser, ruleBasedSegmentCache); SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCacheProducer, 1000L, null); FetchOptions fetchOptions = new FetchOptions.Builder().build(); @@ -75,7 +72,7 @@ public void testPeriodicFetching() throws InterruptedException { SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl(); SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer, - TELEMETRY_STORAGE_NOOP, splitCacheProducer, null); + TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache); SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null); LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, true); diff --git a/client/src/test/java/io/split/engine/common/SynchronizerTest.java b/client/src/test/java/io/split/engine/common/SynchronizerTest.java index 5ea05c35a..0ce439c7f 100644 --- a/client/src/test/java/io/split/engine/common/SynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/SynchronizerTest.java @@ -7,12 +7,7 @@ import io.split.client.interceptors.FlagSetsFilterImpl; import io.split.engine.segments.SegmentChangeFetcher; import io.split.engine.segments.SegmentSynchronizationTaskImp; -import io.split.storages.SegmentCache; -import io.split.storages.SegmentCacheProducer; -import io.split.storages.SplitCache; -import io.split.storages.SplitCacheConsumer; -import io.split.storages.SplitCacheProducer; -import io.split.storages.RuleBasedSegmentCacheProducer; +import io.split.storages.*; import io.split.storages.memory.InMemoryCacheImp; import io.split.engine.experiments.FetchResult; import io.split.engine.experiments.SplitFetcherImp; @@ -88,7 +83,7 @@ public void syncAll() throws InterruptedException { public void testSyncAllSegments() throws InterruptedException, NoSuchFieldException, IllegalAccessException { SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(Mockito.mock(SegmentChangeFetcher.class), 20L, 1, _segmentCacheProducer, Mockito.mock(TelemetryRuntimeProducer.class), - Mockito.mock(SplitCacheConsumer.class), null); + Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class)); Field synchronizerSegmentFetcher = SynchronizerImp.class.getDeclaredField("_segmentSynchronizationTaskImp"); synchronizerSegmentFetcher.setAccessible(true); Field modifiersField = Field.class.getDeclaredField("modifiers"); diff --git a/client/src/test/java/io/split/engine/experiments/SplitFetcherImpTest.java b/client/src/test/java/io/split/engine/experiments/SplitFetcherImpTest.java index 3a1b0b993..78b6eac66 100644 --- a/client/src/test/java/io/split/engine/experiments/SplitFetcherImpTest.java +++ b/client/src/test/java/io/split/engine/experiments/SplitFetcherImpTest.java @@ -190,7 +190,7 @@ public void testLocalHost() { FetchResult fetchResult = splitFetcher.forceRefresh(fetchOptions); - Assert.assertEquals(1, fetchResult.getSegments().size()); + Assert.assertEquals(2, fetchResult.getSegments().size()); } @Test diff --git a/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java b/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java index 98845fc62..f2cc0cc4c 100644 --- a/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java +++ b/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java @@ -3,12 +3,10 @@ import com.google.common.collect.Lists; import io.split.client.interceptors.FlagSetsFilter; import io.split.client.interceptors.FlagSetsFilterImpl; -import io.split.storages.RuleBasedSegmentCacheProducer; +import io.split.storages.*; import io.split.storages.memory.InMemoryCacheImp; -import io.split.storages.SegmentCache; import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp; import io.split.storages.memory.SegmentCacheInMemoryImpl; -import io.split.storages.SplitCache; import io.split.client.dtos.*; import io.split.engine.ConditionsTestUtil; import io.split.engine.common.FetchOptions; @@ -157,14 +155,14 @@ public void whenParserFailsWeRemoveTheExperiment() throws InterruptedException { SegmentCache segmentCache = new SegmentCacheInMemoryImpl(); SplitCache cache = new InMemoryCacheImp(-1, FLAG_SETS_FILTER); - RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp(); + RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp(); RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class); - SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null); + SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null, ruleBasedSegmentCache); segmentSynchronizationTask.start(); SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER, - ruleBasedSegmentParser, ruleBasedSegmentCacheProducer); + ruleBasedSegmentParser, ruleBasedSegmentCache); // execute the fetcher for a little bit. @@ -182,14 +180,14 @@ public void ifThereIsAProblemTalkingToSplitChangeCountDownLatchIsNotDecremented( SplitChangeFetcher splitChangeFetcher = mock(SplitChangeFetcher.class); when(splitChangeFetcher.fetch(-1L, -1, new FetchOptions.Builder().build())).thenThrow(new RuntimeException()); SegmentCache segmentCache = new SegmentCacheInMemoryImpl(); - RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp(); + RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp(); RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class); - SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null); + SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null, ruleBasedSegmentCache); segmentSynchronizationTask.start(); SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER, - ruleBasedSegmentParser, ruleBasedSegmentCacheProducer); + ruleBasedSegmentParser, ruleBasedSegmentCache); // execute the fetcher for a little bit. executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS); @@ -224,11 +222,11 @@ public void addFeatureFlags() throws InterruptedException { SplitChangeFetcher splitChangeFetcher = mock(SplitChangeFetcher.class); when(splitChangeFetcher.fetch(Mockito.eq(-1L), Mockito.eq(-1L), Mockito.any())).thenReturn(validReturn); - RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp(); + RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp(); RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(new HashSet<>(Arrays.asList("set_1", "set_2"))); SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, flagSetsFilter, - ruleBasedSegmentParser, ruleBasedSegmentCacheProducer); + ruleBasedSegmentParser, ruleBasedSegmentCache); executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS); @@ -282,16 +280,16 @@ public void worksWithUserDefinedSegments() throws Exception { AChangePerCallSplitChangeFetcher experimentChangeFetcher = new AChangePerCallSplitChangeFetcher(segmentName); SplitCache cache = new InMemoryCacheImp(startingChangeNumber, FLAG_SETS_FILTER); SegmentCache segmentCache = new SegmentCacheInMemoryImpl(); - RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp(); + RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp(); RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class); SegmentChange segmentChange = getSegmentChange(0L, 0L, segmentName); when(segmentChangeFetcher.fetch(anyString(), anyLong(), any())).thenReturn(segmentChange); - SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, Mockito.mock(TelemetryStorage.class), cache, null); + SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, Mockito.mock(TelemetryStorage.class), cache, null, ruleBasedSegmentCache); segmentSynchronizationTask.start(); SplitFetcherImp fetcher = new SplitFetcherImp(experimentChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER, - ruleBasedSegmentParser, ruleBasedSegmentCacheProducer); + ruleBasedSegmentParser, ruleBasedSegmentCache); // execute the fetcher for a little bit. executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS); diff --git a/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java b/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java index ae33691e3..f6f7f04f4 100644 --- a/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java +++ b/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java @@ -10,10 +10,7 @@ import io.split.client.utils.StaticContentInputStreamProvider; import io.split.engine.common.FetchOptions; import io.split.engine.experiments.*; -import io.split.storages.RuleBasedSegmentCacheProducer; -import io.split.storages.SegmentCacheProducer; -import io.split.storages.SplitCache; -import io.split.storages.SplitCacheConsumer; +import io.split.storages.*; import io.split.storages.memory.InMemoryCacheImp; import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp; import io.split.storages.memory.SegmentCacheInMemoryImpl; @@ -68,7 +65,7 @@ public void works() { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, segmentCacheProducer, - TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), null); + TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class)); // create two tasks that will separately call segment and make sure @@ -113,7 +110,7 @@ public void testFetchAllAsynchronousAndGetFalse() throws NoSuchFieldException, I SegmentFetcherImp segmentFetcher = Mockito.mock(SegmentFetcherImp.class); _segmentFetchers.put("SF", segmentFetcher); final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, - segmentCacheProducer, TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), null); + segmentCacheProducer, TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class)); Mockito.when(segmentFetcher.runWhitCacheHeader()).thenReturn(false); Mockito.when(segmentFetcher.fetch(Mockito.anyObject())).thenReturn(false); @@ -137,7 +134,7 @@ public void testFetchAllAsynchronousAndGetTrue() throws NoSuchFieldException, Il SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentFetcherImp segmentFetcher = Mockito.mock(SegmentFetcherImp.class); final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, segmentCacheProducer, - TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), null); + TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class)); // Before executing, we'll update the map of segmentFecthers via reflection. Field segmentFetchersForced = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers"); @@ -152,8 +149,6 @@ public void testFetchAllAsynchronousAndGetTrue() throws NoSuchFieldException, Il Assert.assertEquals(true, fetch); } - // TODO: Enable the test when Localhost support sppec 1.3 - @Ignore @Test public void testLocalhostSegmentChangeFetcher() throws InterruptedException, FileNotFoundException { FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(new HashSet<>()); @@ -164,11 +159,11 @@ public void testLocalhostSegmentChangeFetcher() throws InterruptedException, Fil SplitChangeFetcher splitChangeFetcher = new JsonLocalhostSplitChangeFetcher(inputStreamProvider); SplitParser splitParser = new SplitParser(); FetchOptions fetchOptions = new FetchOptions.Builder().build(); - RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp(); + RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp(); RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); SplitFetcher splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, TELEMETRY_STORAGE_NOOP, flagSetsFilter, - ruleBasedSegmentParser, ruleBasedSegmentCacheProducer); + ruleBasedSegmentParser, ruleBasedSegmentCache); SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCacheProducer, 1000, null); @@ -180,12 +175,13 @@ public void testLocalhostSegmentChangeFetcher() throws InterruptedException, Fil SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl(); SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer, - TELEMETRY_STORAGE_NOOP, splitCacheProducer, null); + TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache); segmentSynchronizationTaskImp.start(); Thread.sleep(2000); Mockito.verify(segmentChangeFetcher, Mockito.times(1)).fetch("segment_1",-1, fetchOptions); + Mockito.verify(segmentChangeFetcher, Mockito.times(1)).fetch("segment_2",-1, fetchOptions); } } \ No newline at end of file diff --git a/client/src/test/resources/segment_2.json b/client/src/test/resources/segment_2.json new file mode 100644 index 000000000..7e8c81e79 --- /dev/null +++ b/client/src/test/resources/segment_2.json @@ -0,0 +1,10 @@ +{ + "name": "segment_2", + "added": [ + "user1", + "user4" + ], + "removed": [], + "since": -1, + "till": 1585948850110 +} \ No newline at end of file diff --git a/client/src/test/resources/split_init.json b/client/src/test/resources/split_init.json index 6b5abb671..01d57975a 100644 --- a/client/src/test/resources/split_init.json +++ b/client/src/test/resources/split_init.json @@ -564,4 +564,42 @@ ], "s": -1, "t": 1660326991072 -}, "rbs":{"d": [], "s": -1, "t": -1}} \ No newline at end of file +}, "rbs":{"d": [ + { + "changeNumber": 5, + "name": "sample_rule_based_segment", + "status": "ACTIVE", + "trafficTypeName": "user", + "excluded":{ + "keys":["mauro@split.io","gaston@split.io"], + "segments":[] + }, + "conditions": [ + { + "conditionType": "ROLLOUT", + "matcherGroup": { + "combiner": "AND", + "matchers": [ + { + "keySelector": { + "trafficType": "user", + "attribute": null + }, + "matcherType": "IN_SEGMENT", + "negate": false, + "userDefinedSegmentMatcherData": { + "segmentName": "segment_2" + }, + "whitelistMatcherData": null, + "unaryNumericMatcherData": null, + "betweenMatcherData": null, + "booleanMatcherData": null, + "dependencyMatcherData": null, + "stringMatcherData": null + } + ] + } + } + ] + } +], "s": -1, "t": -1}} \ No newline at end of file From 6b51bcbe79ffeeb9907efbae338908dcb4377b8e Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany <41021307+chillaq@users.noreply.github.com> Date: Wed, 30 Apr 2025 08:06:03 -0700 Subject: [PATCH 2/3] Update client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java Co-authored-by: gthea --- .../split/engine/segments/SegmentSynchronizationTaskImp.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java index efcf5f945..b83d319e2 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java @@ -140,8 +140,8 @@ public boolean isRunning() { } public void fetchAll(boolean addCacheHeader) { - _splitCacheConsumer.getSegments().forEach(this::initialize); - _ruleBasedSegmentCacheConsumer.getSegments().forEach(this::initialize); + Set names = getSegmentNames(); + names.forEach(this::initialize); for (Map.Entry entry : _segmentFetchers.entrySet()) { SegmentFetcher fetcher = entry.getValue(); From 58a5be5937012a3e5fa8553770a08ab3299aa88c Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Wed, 30 Apr 2025 08:51:34 -0700 Subject: [PATCH 3/3] added getSegmentNames method --- .../engine/segments/SegmentSynchronizationTaskImp.java | 9 +++++++++ .../io/split/engine/sse/NotificationProcessorImp.java | 6 ++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java index b83d319e2..48877ea3e 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java @@ -11,8 +11,10 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -198,4 +200,11 @@ private void initialize(String segmentName) { _segmentFetchers.putIfAbsent(segmentName, segment); } } + + private Set getSegmentNames() { + Set names = new HashSet<>(_splitCacheConsumer.getSegments()); + names.addAll(_ruleBasedSegmentCacheConsumer.getSegments()); + + return names; + } } \ No newline at end of file diff --git a/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java b/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java index 7e38cbadc..b833efc31 100644 --- a/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java +++ b/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java @@ -1,11 +1,13 @@ package io.split.engine.sse; import com.google.common.annotations.VisibleForTesting; +import io.split.client.dtos.Split; import io.split.engine.sse.dtos.GenericNotificationData; import io.split.engine.sse.dtos.IncomingNotification; import io.split.engine.sse.dtos.SplitKillNotification; import io.split.engine.sse.dtos.StatusNotification; import io.split.engine.sse.dtos.SegmentQueueDto; +import io.split.engine.sse.dtos.CommonChangeNotification; import io.split.engine.sse.workers.FeatureFlagsWorker; import io.split.engine.sse.workers.Worker; @@ -42,10 +44,10 @@ public void process(IncomingNotification notification) { @Override public void processSplitKill(SplitKillNotification splitKillNotification) { _featureFlagsWorker.kill(splitKillNotification); - _featureFlagsWorker.addToQueue(new SplitKillNotification(GenericNotificationData.builder() + _featureFlagsWorker.addToQueue(new CommonChangeNotification<>(GenericNotificationData.builder() .changeNumber(splitKillNotification.getChangeNumber()) .channel(splitKillNotification.getChannel()) - .build())); + .build(), Split.class)); } @Override