Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import io.split.storages.SplitCacheProducer;
import io.split.storages.RuleBasedSegmentCache;
import io.split.storages.RuleBasedSegmentCacheProducer;
import io.split.storages.RuleBasedSegmentCacheConsumer;
import io.split.storages.enums.OperationMode;
import io.split.storages.memory.InMemoryCacheImp;
import io.split.storages.memory.SegmentCacheInMemoryImpl;
Expand Down Expand Up @@ -218,7 +219,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
splitCache, _segmentCache, telemetryStorage, _startTime);

// Segments
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache);
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache, ruleBasedSegmentCache);

SplitParser splitParser = new SplitParser();
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
Expand Down Expand Up @@ -420,7 +421,8 @@ protected SplitFactoryImpl(SplitClientConfig config) {
segmentCache,
_telemetryStorageProducer,
_splitCache,
config.getThreadFactory());
config.getThreadFactory(),
_ruleBasedSegmentCache);

// SplitFetcher
SplitChangeFetcher splitChangeFetcher = createSplitChangeFetcher(config);
Expand Down Expand Up @@ -607,7 +609,7 @@ private static HttpClientBuilder setupProxy(HttpClientBuilder httpClientbuilder,

private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config,
SegmentCacheProducer segmentCacheProducer,
SplitCacheConsumer splitCacheConsumer) throws URISyntaxException {
SplitCacheConsumer splitCacheConsumer, RuleBasedSegmentCacheConsumer ruleBasedSegmentCache) throws URISyntaxException {
SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(_splitHttpClient, _rootTarget,
_telemetryStorageProducer);

Expand All @@ -617,7 +619,8 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config,
segmentCacheProducer,
_telemetryStorageProducer,
splitCacheConsumer,
config.getThreadFactory());
config.getThreadFactory(),
ruleBasedSegmentCache);
}

private SplitFetcher buildSplitFetcher(SplitCacheProducer splitCacheProducer, SplitParser splitParser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
import com.google.common.collect.Maps;
import io.split.client.utils.SplitExecutorFactory;
import io.split.engine.common.FetchOptions;
import io.split.storages.RuleBasedSegmentCacheConsumer;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheConsumer;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -38,12 +41,14 @@ public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask
private final ScheduledExecutorService _scheduledExecutorService;
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
private final SplitCacheConsumer _splitCacheConsumer;
private final RuleBasedSegmentCacheConsumer _ruleBasedSegmentCacheConsumer;

private ScheduledFuture<?> _scheduledFuture;

public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher, long refreshEveryNSeconds, int numThreads,
SegmentCacheProducer segmentCacheProducer, TelemetryRuntimeProducer telemetryRuntimeProducer,
SplitCacheConsumer splitCacheConsumer, ThreadFactory threadFactory) {
SplitCacheConsumer splitCacheConsumer, ThreadFactory threadFactory,
RuleBasedSegmentCacheConsumer ruleBasedSegmentCacheConsumer) {
_segmentChangeFetcher = checkNotNull(segmentChangeFetcher);

checkArgument(refreshEveryNSeconds >= 0L);
Expand All @@ -54,6 +59,7 @@ public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher,
_segmentCacheProducer = checkNotNull(segmentCacheProducer);
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
_splitCacheConsumer = checkNotNull(splitCacheConsumer);
_ruleBasedSegmentCacheConsumer = checkNotNull(ruleBasedSegmentCacheConsumer);
}

public void initializeSegment(String segmentName) {
Expand Down Expand Up @@ -136,7 +142,8 @@ public boolean isRunning() {
}

public void fetchAll(boolean addCacheHeader) {
_splitCacheConsumer.getSegments().forEach(this::initialize);
Set<String> names = getSegmentNames();
names.forEach(this::initialize);
for (Map.Entry<String, SegmentFetcher> entry : _segmentFetchers.entrySet()) {
SegmentFetcher fetcher = entry.getValue();

Expand All @@ -155,6 +162,7 @@ public void fetchAll(boolean addCacheHeader) {

public boolean fetchAllSynchronous() {
_splitCacheConsumer.getSegments().forEach(this::initialize);
_ruleBasedSegmentCacheConsumer.getSegments().forEach(this::initialize);
List<Future<Boolean>> segmentFetchExecutions = _segmentFetchers.entrySet()
.stream().map(e -> _scheduledExecutorService.submit(e.getValue()::runWhitCacheHeader))
.collect(Collectors.toList());
Expand Down Expand Up @@ -192,4 +200,11 @@ private void initialize(String segmentName) {
_segmentFetchers.putIfAbsent(segmentName, segment);
}
}

private Set<String> getSegmentNames() {
Set<String> names = new HashSet<>(_splitCacheConsumer.getSegments());
names.addAll(_ruleBasedSegmentCacheConsumer.getSegments());

return names;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.split.engine.sse;

import com.google.common.annotations.VisibleForTesting;
import io.split.client.dtos.Split;
import io.split.engine.sse.dtos.GenericNotificationData;
import io.split.engine.sse.dtos.IncomingNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.dtos.StatusNotification;
import io.split.engine.sse.dtos.SegmentQueueDto;
import io.split.engine.sse.dtos.CommonChangeNotification;
import io.split.engine.sse.workers.FeatureFlagsWorker;
import io.split.engine.sse.workers.Worker;

Expand Down Expand Up @@ -42,10 +44,10 @@ public void process(IncomingNotification notification) {
@Override
public void processSplitKill(SplitKillNotification splitKillNotification) {
_featureFlagsWorker.kill(splitKillNotification);
_featureFlagsWorker.addToQueue(new SplitKillNotification(GenericNotificationData.builder()
_featureFlagsWorker.addToQueue(new CommonChangeNotification<>(GenericNotificationData.builder()
.changeNumber(splitKillNotification.getChangeNumber())
.channel(splitKillNotification.getChannel())
.build()));
.build(), Split.class));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
import io.split.engine.experiments.*;
import io.split.engine.segments.SegmentChangeFetcher;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.storages.RuleBasedSegmentCacheProducer;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCache;
import io.split.storages.SplitCacheProducer;
import io.split.storages.*;
import io.split.storages.memory.InMemoryCacheImp;
import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp;
import io.split.storages.memory.SegmentCacheInMemoryImpl;
Expand All @@ -36,19 +33,19 @@ public void testSyncAll(){
InputStreamProvider inputStreamProvider = new FileInputStreamProvider("src/test/resources/split_init.json");
SplitChangeFetcher splitChangeFetcher = new JsonLocalhostSplitChangeFetcher(inputStreamProvider);
SplitParser splitParser = new SplitParser();
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();

SplitFetcher splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, TELEMETRY_STORAGE_NOOP, FLAG_SETS_FILTER,
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
ruleBasedSegmentParser, ruleBasedSegmentCache);

SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCacheProducer, 1000L, null);

SegmentChangeFetcher segmentChangeFetcher = new LocalhostSegmentChangeFetcher("src/test/resources/");
SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl();

SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer,
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null);
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache);
SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null);

LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, false);
Expand All @@ -62,11 +59,11 @@ public void testPeriodicFetching() throws InterruptedException {

SplitChangeFetcher splitChangeFetcher = Mockito.mock(JsonLocalhostSplitChangeFetcher.class);
SplitParser splitParser = new SplitParser();
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();

SplitFetcher splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, TELEMETRY_STORAGE_NOOP, FLAG_SETS_FILTER,
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
ruleBasedSegmentParser, ruleBasedSegmentCache);

SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCacheProducer, 1000L, null);
FetchOptions fetchOptions = new FetchOptions.Builder().build();
Expand All @@ -75,7 +72,7 @@ public void testPeriodicFetching() throws InterruptedException {
SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl();

SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer,
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null);
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache);

SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null);
LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,7 @@
import io.split.client.interceptors.FlagSetsFilterImpl;
import io.split.engine.segments.SegmentChangeFetcher;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.storages.SegmentCache;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCache;
import io.split.storages.SplitCacheConsumer;
import io.split.storages.SplitCacheProducer;
import io.split.storages.RuleBasedSegmentCacheProducer;
import io.split.storages.*;
import io.split.storages.memory.InMemoryCacheImp;
import io.split.engine.experiments.FetchResult;
import io.split.engine.experiments.SplitFetcherImp;
Expand Down Expand Up @@ -88,7 +83,7 @@ public void syncAll() throws InterruptedException {
public void testSyncAllSegments() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(Mockito.mock(SegmentChangeFetcher.class),
20L, 1, _segmentCacheProducer, Mockito.mock(TelemetryRuntimeProducer.class),
Mockito.mock(SplitCacheConsumer.class), null);
Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class));
Field synchronizerSegmentFetcher = SynchronizerImp.class.getDeclaredField("_segmentSynchronizationTaskImp");
synchronizerSegmentFetcher.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void testLocalHost() {

FetchResult fetchResult = splitFetcher.forceRefresh(fetchOptions);

Assert.assertEquals(1, fetchResult.getSegments().size());
Assert.assertEquals(2, fetchResult.getSegments().size());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import com.google.common.collect.Lists;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.interceptors.FlagSetsFilterImpl;
import io.split.storages.RuleBasedSegmentCacheProducer;
import io.split.storages.*;
import io.split.storages.memory.InMemoryCacheImp;
import io.split.storages.SegmentCache;
import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp;
import io.split.storages.memory.SegmentCacheInMemoryImpl;
import io.split.storages.SplitCache;
import io.split.client.dtos.*;
import io.split.engine.ConditionsTestUtil;
import io.split.engine.common.FetchOptions;
Expand Down Expand Up @@ -157,14 +155,14 @@ public void whenParserFailsWeRemoveTheExperiment() throws InterruptedException {

SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
SplitCache cache = new InMemoryCacheImp(-1, FLAG_SETS_FILTER);
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();

SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null, ruleBasedSegmentCache);
segmentSynchronizationTask.start();
SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
ruleBasedSegmentParser, ruleBasedSegmentCache);


// execute the fetcher for a little bit.
Expand All @@ -182,14 +180,14 @@ public void ifThereIsAProblemTalkingToSplitChangeCountDownLatchIsNotDecremented(
SplitChangeFetcher splitChangeFetcher = mock(SplitChangeFetcher.class);
when(splitChangeFetcher.fetch(-1L, -1, new FetchOptions.Builder().build())).thenThrow(new RuntimeException());
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();

SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null, ruleBasedSegmentCache);
segmentSynchronizationTask.start();
SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
ruleBasedSegmentParser, ruleBasedSegmentCache);

// execute the fetcher for a little bit.
executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -224,11 +222,11 @@ public void addFeatureFlags() throws InterruptedException {
SplitChangeFetcher splitChangeFetcher = mock(SplitChangeFetcher.class);
when(splitChangeFetcher.fetch(Mockito.eq(-1L), Mockito.eq(-1L), Mockito.any())).thenReturn(validReturn);

RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(new HashSet<>(Arrays.asList("set_1", "set_2")));
SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, flagSetsFilter,
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
ruleBasedSegmentParser, ruleBasedSegmentCache);

executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS);

Expand Down Expand Up @@ -282,16 +280,16 @@ public void worksWithUserDefinedSegments() throws Exception {
AChangePerCallSplitChangeFetcher experimentChangeFetcher = new AChangePerCallSplitChangeFetcher(segmentName);
SplitCache cache = new InMemoryCacheImp(startingChangeNumber, FLAG_SETS_FILTER);
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();

SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
SegmentChange segmentChange = getSegmentChange(0L, 0L, segmentName);
when(segmentChangeFetcher.fetch(anyString(), anyLong(), any())).thenReturn(segmentChange);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, Mockito.mock(TelemetryStorage.class), cache, null);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, Mockito.mock(TelemetryStorage.class), cache, null, ruleBasedSegmentCache);
segmentSynchronizationTask.start();
SplitFetcherImp fetcher = new SplitFetcherImp(experimentChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
ruleBasedSegmentParser, ruleBasedSegmentCache);

// execute the fetcher for a little bit.
executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS);
Expand Down
Loading