Skip to content

Commit a68e2f6

Browse files
committed
Added RBS support for SSE classes
1 parent da05139 commit a68e2f6

26 files changed

+392
-99
lines changed

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
278278

279279
_syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI,
280280
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser,
281-
flagSetsFilter);
281+
ruleBasedSegmentParser, flagSetsFilter, ruleBasedSegmentCache);
282282
_syncManager.start();
283283

284284
// DestroyOnShutDown

client/src/main/java/io/split/engine/common/ConsumerSynchronizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public void stopPeriodicFetching() {
3535
}
3636

3737
@Override
38-
public void refreshSplits(Long targetChangeNumber) {
38+
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
3939
//No-Op
4040
}
4141

client/src/main/java/io/split/engine/common/LocalhostSynchronizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void stopPeriodicFetching() {
5656
}
5757

5858
@Override
59-
public void refreshSplits(Long targetChangeNumber) {
59+
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
6060
FetchResult fetchResult = _splitFetcher.forceRefresh(new FetchOptions.Builder().cacheControlHeaders(true).build());
6161
if (fetchResult.isSuccess()){
6262
_log.debug("Refresh feature flags completed");

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.annotations.VisibleForTesting;
44
import io.split.client.interceptors.FlagSetsFilter;
5+
import io.split.engine.experiments.RuleBasedSegmentParser;
56
import io.split.engine.experiments.SplitParser;
67
import io.split.engine.sse.AuthApiClient;
78
import io.split.engine.sse.AuthApiClientImp;
@@ -17,6 +18,7 @@
1718
import io.split.engine.sse.workers.FeatureFlagWorkerImp;
1819
import io.split.engine.sse.workers.Worker;
1920

21+
import io.split.storages.RuleBasedSegmentCache;
2022
import io.split.storages.SplitCacheProducer;
2123
import io.split.telemetry.domain.StreamingEvent;
2224
import io.split.telemetry.domain.enums.StreamEventsEnum;
@@ -79,9 +81,11 @@ public static PushManagerImp build(Synchronizer synchronizer,
7981
ThreadFactory threadFactory,
8082
SplitParser splitParser,
8183
SplitCacheProducer splitCacheProducer,
82-
FlagSetsFilter flagSetsFilter) {
83-
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer,
84-
telemetryRuntimeProducer, flagSetsFilter);
84+
FlagSetsFilter flagSetsFilter,
85+
RuleBasedSegmentCache ruleBasedSegmentCache,
86+
RuleBasedSegmentParser ruleBasedSegmentParser) {
87+
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer,
88+
ruleBasedSegmentCache, telemetryRuntimeProducer, flagSetsFilter);
8589
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
8690
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);
8791

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
import io.split.client.SplitClientConfig;
66
import io.split.client.interceptors.FlagSetsFilter;
77
import io.split.engine.SDKReadinessGates;
8+
import io.split.engine.experiments.RuleBasedSegmentParser;
89
import io.split.engine.experiments.SplitFetcher;
910
import io.split.engine.experiments.SplitParser;
1011
import io.split.engine.experiments.SplitSynchronizationTask;
1112
import io.split.engine.segments.SegmentSynchronizationTask;
13+
import io.split.storages.RuleBasedSegmentCache;
1214
import io.split.storages.SegmentCacheProducer;
1315
import io.split.storages.SplitCacheProducer;
1416
import io.split.telemetry.domain.StreamingEvent;
@@ -89,12 +91,15 @@ public static SyncManagerImp build(SplitTasks splitTasks,
8991
TelemetrySynchronizer telemetrySynchronizer,
9092
SplitClientConfig config,
9193
SplitParser splitParser,
92-
FlagSetsFilter flagSetsFilter) {
94+
RuleBasedSegmentParser ruleBasedSegmentParser,
95+
FlagSetsFilter flagSetsFilter,
96+
RuleBasedSegmentCache ruleBasedSegmentCache) {
9397
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
9498
Synchronizer synchronizer = new SynchronizerImp(splitTasks,
9599
splitFetcher,
96100
splitCacheProducer,
97101
segmentCacheProducer,
102+
ruleBasedSegmentCache,
98103
config.streamingRetryDelay(),
99104
config.streamingFetchMaxRetries(),
100105
config.failedAttemptsBeforeLogging(),
@@ -109,7 +114,9 @@ public static SyncManagerImp build(SplitTasks splitTasks,
109114
config.getThreadFactory(),
110115
splitParser,
111116
splitCacheProducer,
112-
flagSetsFilter);
117+
flagSetsFilter,
118+
ruleBasedSegmentCache,
119+
ruleBasedSegmentParser);
113120

114121
return new SyncManagerImp(splitTasks,
115122
config.streamingEnabled(),

client/src/main/java/io/split/engine/common/Synchronizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ public interface Synchronizer {
66
boolean syncAll();
77
void startPeriodicFetching();
88
void stopPeriodicFetching();
9-
void refreshSplits(Long targetChangeNumber);
9+
void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber);
1010
void localKillSplit(SplitKillNotification splitKillNotification);
1111
void refreshSegment(String segmentName, Long targetChangeNumber);
1212
void startPeriodicDataRecording();

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.split.engine.segments.SegmentFetcher;
1010
import io.split.engine.segments.SegmentSynchronizationTask;
1111
import io.split.engine.sse.dtos.SplitKillNotification;
12+
import io.split.storages.RuleBasedSegmentCacheProducer;
1213
import io.split.storages.SegmentCacheProducer;
1314
import io.split.storages.SplitCacheProducer;
1415
import io.split.telemetry.synchronizer.TelemetrySyncTask;
@@ -34,6 +35,7 @@ public class SynchronizerImp implements Synchronizer {
3435
private final SplitFetcher _splitFetcher;
3536
private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
3637
private final SplitCacheProducer _splitCacheProducer;
38+
private final RuleBasedSegmentCacheProducer _ruleBasedSegmentCacheProducer;
3739
private final SegmentCacheProducer segmentCacheProducer;
3840
private final ImpressionsManager _impressionManager;
3941
private final EventsTask _eventsTask;
@@ -48,6 +50,7 @@ public SynchronizerImp(SplitTasks splitTasks,
4850
SplitFetcher splitFetcher,
4951
SplitCacheProducer splitCacheProducer,
5052
SegmentCacheProducer segmentCacheProducer,
53+
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer,
5154
int onDemandFetchRetryDelayMs,
5255
int onDemandFetchMaxRetries,
5356
int failedAttemptsBeforeLogging,
@@ -56,6 +59,7 @@ public SynchronizerImp(SplitTasks splitTasks,
5659
_splitFetcher = checkNotNull(splitFetcher);
5760
_segmentSynchronizationTaskImp = checkNotNull(splitTasks.getSegmentSynchronizationTask());
5861
_splitCacheProducer = checkNotNull(splitCacheProducer);
62+
_ruleBasedSegmentCacheProducer = checkNotNull(ruleBasedSegmentCacheProducer);
5963
this.segmentCacheProducer = checkNotNull(segmentCacheProducer);
6064
_onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs);
6165
_onDemandFetchMaxRetries = onDemandFetchMaxRetries;
@@ -103,7 +107,7 @@ private static class SyncResult {
103107
private final FetchResult _fetchResult;
104108
}
105109

106-
private SyncResult attemptSplitsSync(long targetChangeNumber,
110+
private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegmentChangeNumber,
107111
FetchOptions opts,
108112
Function<Void, Long> nextWaitMs,
109113
int maxRetries) {
@@ -114,7 +118,8 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
114118
if (fetchResult != null && !fetchResult.retry() && !fetchResult.isSuccess()) {
115119
return new SyncResult(false, remainingAttempts, fetchResult);
116120
}
117-
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) {
121+
if ((targetChangeNumber != 0 && targetChangeNumber <= _splitCacheProducer.getChangeNumber()) ||
122+
(ruleBasedSegmentChangeNumber != 0 && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber())) {
118123
return new SyncResult(true, remainingAttempts, fetchResult);
119124
} else if (remainingAttempts <= 0) {
120125
return new SyncResult(false, remainingAttempts, fetchResult);
@@ -130,9 +135,11 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
130135
}
131136

132137
@Override
133-
public void refreshSplits(Long targetChangeNumber) {
138+
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
134139

135-
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) {
140+
if ((targetChangeNumber != 0 && targetChangeNumber <= _splitCacheProducer.getChangeNumber()) ||
141+
(ruleBasedSegmentChangeNumber != 0 && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) ||
142+
(ruleBasedSegmentChangeNumber == 0 && targetChangeNumber == 0)) {
136143
return;
137144
}
138145

@@ -142,7 +149,7 @@ public void refreshSplits(Long targetChangeNumber) {
142149
.flagSetsFilter(_sets)
143150
.build();
144151

145-
SyncResult regularResult = attemptSplitsSync(targetChangeNumber, opts,
152+
SyncResult regularResult = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, opts,
146153
(discard) -> (long) _onDemandFetchRetryDelayMs, _onDemandFetchMaxRetries);
147154

148155
int attempts = _onDemandFetchMaxRetries - regularResult.remainingAttempts();
@@ -157,7 +164,7 @@ public void refreshSplits(Long targetChangeNumber) {
157164
_log.info(String.format("No changes fetched after %s attempts. Will retry bypassing CDN.", attempts));
158165
FetchOptions withCdnBypass = new FetchOptions.Builder(opts).targetChangeNumber(targetChangeNumber).build();
159166
Backoff backoff = new Backoff(ON_DEMAND_FETCH_BACKOFF_BASE_MS, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS);
160-
SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, withCdnBypass,
167+
SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, withCdnBypass,
161168
(discard) -> backoff.interval(), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES);
162169

163170
int withoutCDNAttempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - withCDNBypassed._remainingAttempts;
@@ -175,7 +182,7 @@ public void localKillSplit(SplitKillNotification splitKillNotification) {
175182
if (splitKillNotification.getChangeNumber() > _splitCacheProducer.getChangeNumber()) {
176183
_splitCacheProducer.kill(splitKillNotification.getSplitName(), splitKillNotification.getDefaultTreatment(),
177184
splitKillNotification.getChangeNumber());
178-
refreshSplits(splitKillNotification.getChangeNumber());
185+
refreshSplits(splitKillNotification.getChangeNumber(), 0L);
179186
}
180187
}
181188

client/src/main/java/io/split/engine/experiments/ParsedSplit.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.collect.ImmutableList;
44
import io.split.engine.matchers.AttributeMatcher;
5+
import io.split.engine.matchers.RuleBasedSegmentMatcher;
56
import io.split.engine.matchers.UserDefinedSegmentMatcher;
67

78
import java.util.HashSet;
@@ -243,6 +244,15 @@ public Set<String> getSegmentsNames() {
243244
.collect(Collectors.toSet());
244245
}
245246

247+
public Set<String> getRuleBasedSegmentsNames() {
248+
return parsedConditions().stream()
249+
.flatMap(parsedCondition -> parsedCondition.matcher().attributeMatchers().stream())
250+
.filter(ParsedSplit::isRuleBasedSegmentMatcher)
251+
.map(ParsedSplit::asRuleBasedSegmentMatcherForEach)
252+
.map(RuleBasedSegmentMatcher::getSegmentName)
253+
.collect(Collectors.toSet());
254+
}
255+
246256
private static boolean isSegmentMatcher(AttributeMatcher attributeMatcher) {
247257
return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof UserDefinedSegmentMatcher;
248258
}
@@ -251,4 +261,11 @@ private static UserDefinedSegmentMatcher asSegmentMatcherForEach(AttributeMatche
251261
return (UserDefinedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate();
252262
}
253263

264+
private static boolean isRuleBasedSegmentMatcher(AttributeMatcher attributeMatcher) {
265+
return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof RuleBasedSegmentMatcher;
266+
}
267+
268+
private static RuleBasedSegmentMatcher asRuleBasedSegmentMatcherForEach(AttributeMatcher attributeMatcher) {
269+
return (RuleBasedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate();
270+
}
254271
}

client/src/main/java/io/split/engine/experiments/ParserUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.split.engine.matchers.LessThanOrEqualToSemverMatcher;
2323
import io.split.engine.matchers.InListSemverMatcher;
2424
import io.split.engine.matchers.BetweenSemverMatcher;
25+
import io.split.engine.matchers.RuleBasedSegmentMatcher;
2526
import io.split.engine.matchers.collections.ContainsAllOfSetMatcher;
2627
import io.split.engine.matchers.collections.ContainsAnyOfSetMatcher;
2728
import io.split.engine.matchers.collections.EqualToSetMatcher;
@@ -183,6 +184,11 @@ public static AttributeMatcher toMatcher(Matcher matcher) {
183184
checkNotNull(matcher.betweenStringMatcherData, "betweenStringMatcherData is required for BETWEEN_SEMVER matcher type");
184185
delegate = new BetweenSemverMatcher(matcher.betweenStringMatcherData.start, matcher.betweenStringMatcherData.end);
185186
break;
187+
case IN_RULE_BASED_SEGMENT:
188+
checkNotNull(matcher.userDefinedSegmentMatcherData);
189+
String ruleBasedSegmentName = matcher.userDefinedSegmentMatcherData.segmentName;
190+
delegate = new RuleBasedSegmentMatcher(ruleBasedSegmentName);
191+
break;
186192
default:
187193
throw new IllegalArgumentException("Unknown matcher type: " + matcher.matcherType);
188194
}

client/src/main/java/io/split/engine/sse/NotificationParserImp.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.split.engine.sse.dtos.RawMessageNotification;
1212
import io.split.engine.sse.dtos.SegmentChangeNotification;
1313
import io.split.engine.sse.dtos.SplitKillNotification;
14+
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
1415
import io.split.engine.sse.exceptions.EventParsingException;
1516

1617
public class NotificationParserImp implements NotificationParser {
@@ -48,6 +49,8 @@ private IncomingNotification parseNotification(GenericNotificationData genericNo
4849
switch (genericNotificationData.getType()) {
4950
case SPLIT_UPDATE:
5051
return new FeatureFlagChangeNotification(genericNotificationData);
52+
case RB_SEGMENT_UPDATE:
53+
return new RuleBasedSegmentChangeNotification(genericNotificationData);
5154
case SPLIT_KILL:
5255
return new SplitKillNotification(genericNotificationData);
5356
case SEGMENT_UPDATE:

0 commit comments

Comments
 (0)