Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 6 additions & 19 deletions client/src/main/java/io/split/client/HttpSplitChangeFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import io.split.Spec;
import io.split.client.dtos.SplitChange;
import io.split.client.dtos.SplitHttpResponse;
import io.split.client.dtos.RuleBasedSegment;
import io.split.client.dtos.SplitChangesOldPayloadDto;
import io.split.client.dtos.ChangeDto;
import io.split.client.dtos.Split;
import io.split.client.exceptions.UriTooLongException;
import io.split.client.utils.Json;
import io.split.client.utils.Utils;
Expand All @@ -25,7 +22,6 @@

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.split.Spec.SPEC_1_3;
Expand Down Expand Up @@ -91,7 +87,7 @@ public SplitChange fetch(long since, long sinceRBS, FetchOptions options) {
_log.warn("Detected proxy without support for Feature flags spec {} version, will switch to spec version {}",
SPEC_1_3, SPEC_1_1);
_lastProxyCheckTimestamp = System.currentTimeMillis();
return fetch(since, 0, options);
return fetch(since, sinceRBS, options);
}

_telemetryRuntimeProducer.recordSyncError(ResourceEnum.SPLIT_SYNC, response.statusCode());
Expand All @@ -100,30 +96,21 @@ public SplitChange fetch(long since, long sinceRBS, FetchOptions options) {
);
}

String body = response.body();
if (specVersion.equals(Spec.SPEC_1_1)) {
return Json.fromJson(body, SplitChangesOldPayloadDto.class).toSplitChange();
return Json.fromJson(response.body(), SplitChangesOldPayloadDto.class).toSplitChange();
}

return Json.fromJson(body, SplitChange.class);

SplitChange splitChange = Json.fromJson(response.body(), SplitChange.class);
splitChange.clearCache = _lastProxyCheckTimestamp != 0;
_lastProxyCheckTimestamp = 0L;
return splitChange;
} catch (Exception e) {
throw new IllegalStateException(String.format("Problem fetching splitChanges since %s: %s", since, e), e);
} finally {
_telemetryRuntimeProducer.recordSyncLatency(HTTPLatenciesEnum.SPLITS, System.currentTimeMillis() - start);
}
}

public Long getLastProxyCheckTimestamp() {
return _lastProxyCheckTimestamp;
}

public void setLastProxyCheckTimestamp(long lastProxyCheckTimestamp) {
synchronized (_lock) {
_lastProxyCheckTimestamp = lastProxyCheckTimestamp;
}
}


private URI buildURL(FetchOptions options, long since, long sinceRBS) throws URISyntaxException {
URIBuilder uriBuilder = new URIBuilder(_target).addParameter(SPEC, "" + specVersion);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.split.client;

import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import io.split.client.dtos.SplitChange;
import io.split.client.dtos.SplitChangesOldPayloadDto;
import io.split.client.utils.InputStreamProvider;
import io.split.client.utils.Json;
import io.split.client.utils.LocalhostSanitizer;
Expand Down Expand Up @@ -37,13 +39,20 @@ public JsonLocalhostSplitChangeFetcher(InputStreamProvider inputStreamProvider)
public SplitChange fetch(long since, long sinceRBS, FetchOptions options) {
try {
JsonReader jsonReader = new JsonReader(new BufferedReader(new InputStreamReader(_inputStreamProvider.get(), StandardCharsets.UTF_8)));
if (checkOldSpec(new JsonReader(new BufferedReader(new InputStreamReader(_inputStreamProvider.get(), StandardCharsets.UTF_8))))) {
return Json.fromJson(jsonReader, SplitChangesOldPayloadDto.class).toSplitChange();
}
SplitChange splitChange = Json.fromJson(jsonReader, SplitChange.class);
return processSplitChange(splitChange, since, sinceRBS);
} catch (Exception e) {
throw new IllegalStateException("Problem fetching splitChanges: " + e.getMessage(), e);
}
}

private boolean checkOldSpec(JsonReader jsonReader) {
return Json.fromJson(jsonReader, JsonObject.class).has("splits");
}

private SplitChange processSplitChange(SplitChange splitChange, long changeNumber, long changeNumberRBS) throws NoSuchAlgorithmException {
SplitChange splitChangeToProcess = LocalhostSanitizer.sanitization(splitChange);
// if the till is less than storage CN and different from the default till ignore the change
Expand Down
1 change: 1 addition & 0 deletions client/src/main/java/io/split/client/dtos/ChangeDto.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.split.client.dtos;

import java.util.ArrayList;
import java.util.List;

public class ChangeDto<T> {
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions client/src/main/java/io/split/client/dtos/SplitChange.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ public class SplitChange {
public ChangeDto<Split> featureFlags;
@SerializedName("rbs")
public ChangeDto<RuleBasedSegment> ruleBasedSegments;
public boolean clearCache;
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ private Set<String> runWithoutExceptionHandling(FetchOptions options) throws Int
throw new IllegalStateException("SplitChange was null");
}

if (change.clearCache) {
_splitCacheProducer.clear();
_ruleBasedSegmentCacheProducer.clear();
}

if (checkExitConditions(change.featureFlags, _splitCacheProducer.getChangeNumber()) ||
checkExitConditions(change.ruleBasedSegments, _ruleBasedSegmentCacheProducer.getChangeNumber())) {
return segments;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ public interface RuleBasedSegmentCacheProducer {
void setChangeNumber(long changeNumber);
long getChangeNumber();
void update(List<ParsedRuleBasedSegment> toAdd, List<String> toRemove, long changeNumber);
void clear();
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ public void kill(String splitName, String defaultTreatment, long changeNumber) {
@Override
public void clear() {
_concurrentMap.clear();
_changeNumber.set(-1);
_concurrentTrafficTypeNameSet.clear();
_flagSets.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ public List<String> ruleBasedSegmentNames() {
return ruleBasedSegmentNamesList;
}

@Override
public void clear() {
_changeNumber.set(-1);
_concurrentMap.clear();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.split.storages.pluggable.adapters;

import io.split.client.dtos.RuleBasedSegment;
import io.split.client.utils.Json;
import io.split.engine.experiments.ParsedRuleBasedSegment;
import io.split.storages.RuleBasedSegmentCacheProducer;
import io.split.storages.pluggable.domain.PrefixAdapter;
import io.split.storages.pluggable.domain.UserStorageWrapper;
import io.split.storages.pluggable.utils.Helper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pluggable.CustomStorageWrapper;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static com.google.common.base.Preconditions.checkNotNull;

public class UserCustomRuleBasedSegmentAdapterProducer implements RuleBasedSegmentCacheProducer {

private static final Logger _log = LoggerFactory.getLogger(UserCustomRuleBasedSegmentAdapterProducer.class);

private final UserStorageWrapper _userStorageWrapper;

public UserCustomRuleBasedSegmentAdapterProducer(CustomStorageWrapper customStorageWrapper) {
_userStorageWrapper = new UserStorageWrapper(checkNotNull(customStorageWrapper));
}

@Override
public long getChangeNumber() {
String wrapperResponse = _userStorageWrapper.get(PrefixAdapter.buildRuleBasedSegmentChangeNumber());
return Helper.responseToLong(wrapperResponse, -1L);
}

@Override
public boolean remove(String ruleBasedSegmentName) {
// NoOp
return true;
}

@Override
public void setChangeNumber(long changeNumber) {
//NoOp
}

@Override
public void clear() {
//NoOp
}

@Override
public void update(List<ParsedRuleBasedSegment> toAdd, List<String> toRemove, long changeNumber) {
//NoOp
}

public Set<String> getSegments() {
//NoOp
return new HashSet<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,6 @@ public void testSwitchingToOldSpec() throws URISyntaxException, InvocationTarget
HttpSplitChangeFetcher fetcher = HttpSplitChangeFetcher.create(splitHtpClient, rootTarget,
Mockito.mock(TelemetryRuntimeProducer.class), true);

Field specVersion = fetcher.getClass().getDeclaredField("specVersion");
specVersion.setAccessible(true);
specVersion.set(fetcher, Spec.SPEC_1_1);

SplitChange change = fetcher.fetch(-1, -1, new FetchOptions.Builder().cacheControlHeaders(true).build());

List<ClassicHttpRequest> captured = requestCaptor.getAllValues();
Expand All @@ -264,7 +260,6 @@ public void testSwitchingToOldSpec() throws URISyntaxException, InvocationTarget
Assert.assertEquals(0, change.ruleBasedSegments.d.size());
Assert.assertEquals(-1, change.ruleBasedSegments.s);
Assert.assertEquals(-1, change.ruleBasedSegments.t);
Assert.assertTrue(fetcher.getLastProxyCheckTimestamp() > 0);

// Set proxy interval to low number to force check for spec 1.3
Field proxyInterval = fetcher.getClass().getDeclaredField("PROXY_CHECK_INTERVAL_MILLISECONDS_SS");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,23 @@ public void processTestForException() {

SplitChange splitChange = localhostSplitChangeFetcher.fetch(-1L, -1, fetchOptions);
}

@Test
public void testParseOldSpec() throws FileNotFoundException {
InputStream inputStream = new FileInputStream("src/test/resources/split_old_spec.json");
InputStreamProvider inputStreamProvider = new StaticContentInputStreamProvider(inputStream);
JsonLocalhostSplitChangeFetcher localhostSplitChangeFetcher = new JsonLocalhostSplitChangeFetcher(inputStreamProvider);
FetchOptions fetchOptions = Mockito.mock(FetchOptions.class);

SplitChange splitChange = localhostSplitChangeFetcher.fetch(-1L, -1, fetchOptions);

List<Split> split = splitChange.featureFlags.d;
Assert.assertEquals(7, split.size());
Assert.assertEquals(1660326991072L, splitChange.featureFlags.t);
Assert.assertEquals(-1L, splitChange.featureFlags.s);

Assert.assertEquals(new ArrayList<>(), splitChange.ruleBasedSegments.d);
Assert.assertEquals(-1L, splitChange.ruleBasedSegments.t);
Assert.assertEquals(-1L, splitChange.ruleBasedSegments.s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,21 @@ public void works() throws IOException, URISyntaxException, InterruptedException
Assert.assertEquals("on_whitelist", client.getTreatment("admin", "push_test"));
client.destroy();
}

@Test
public void testOldSpec() throws IOException, URISyntaxException, InterruptedException, TimeoutException {
SplitClientConfig config = SplitClientConfig.builder()
.splitFile("src/test/resources/split_old_spec.json")
.segmentDirectory("src/test/resources")
.setBlockUntilReadyTimeout(10000)
.build();
SplitFactory splitFactory = SplitFactoryBuilder.build("localhost", config);
SplitClient client = splitFactory.client();
client.blockUntilReady();

Assert.assertEquals("on", client.getTreatment("bilal", "split_1"));
Assert.assertEquals("off", client.getTreatment("bilal", "split_2"));
Assert.assertEquals("v5", client.getTreatment("admin", "split_2"));
client.destroy();
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package io.split.client;

import io.split.SSEMockServer;
import io.split.Spec;
import io.split.SplitMockServer;
import io.split.client.api.SplitView;
import io.split.client.dtos.EvaluationOptions;
import io.split.client.impressions.ImpressionsManager;
import io.split.client.utils.CustomDispatcher;
import io.split.engine.experiments.SplitFetcherImp;
import io.split.storages.enums.OperationMode;
import io.split.storages.enums.StorageMode;
import io.split.storages.pluggable.CustomStorageWrapperImp;
Expand All @@ -26,7 +24,6 @@

import javax.ws.rs.sse.OutboundSseEvent;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand Down Expand Up @@ -213,7 +210,7 @@ public void getTreatmentWithStreamingDisabled() throws Exception {

@Test
public void managerSplitsWithStreamingEnabled() throws Exception {
MockResponse response = new MockResponse().setBody("{\"ff\":{\"d\": [], \"s\":1585948850109, \"t\":1585948850109}, \"rbs\":{\"d\":[],\"s\":-1,\"t\":-1}}");
MockResponse response = new MockResponse().setBody("{\"ff\":{\"d\": [], \"s\":1585948850109, \"t\":1585948850109}, \"rbs\":{\"d\":[],\"s\":1585948850109,\"t\":1585948850109}}");
Queue responses = new LinkedList<>();
responses.add(response);
SplitMockServer splitServer = new SplitMockServer(CustomDispatcher.builder()
Expand Down Expand Up @@ -249,7 +246,6 @@ public void managerSplitsWithStreamingEnabled() throws Exception {

splitServer.stop();
sseServer.stop();
factory.destroy();
}

@Test
Expand Down Expand Up @@ -424,7 +420,7 @@ public void splitClientControlNotifications() throws Exception {

@Test
public void splitClientMultiFactory() throws Exception {
MockResponse response = new MockResponse().setBody("{\"ff\":{\"d\":[],\"s\":1585948850109, \"t\":1585948850109},\"rbs\":{\"d\":[],\"s\":1585948850109,\"t\":1585948850109}}");
MockResponse response = new MockResponse().setBody("{\"ff\":{\"d\": [], \"s\":1585948850109, \"t\":1585948850109}, \"rbs\":{\"d\":[],\"s\":1585948850109,\"t\":1585948850109}}");
Queue responses = new LinkedList<>();
responses.add(response);
responses.add(response);
Expand Down Expand Up @@ -577,10 +573,9 @@ public void keepAlive() throws Exception {
Queue responses = new LinkedList<>();
responses.add(response);

CustomDispatcher dispatcher = CustomDispatcher.builder()
SplitMockServer splitServer = new SplitMockServer(CustomDispatcher.builder()
.path(CustomDispatcher.SINCE_1585948850109, responses)
.build();
SplitMockServer splitServer = new SplitMockServer(dispatcher);
.build());

//plitMockServer splitServer = new SplitMockServer(CustomDispatcher.builder().build());
SSEMockServer.SseEventQueue eventQueue = new SSEMockServer.SseEventQueue();
Expand All @@ -599,6 +594,7 @@ public void keepAlive() throws Exception {

// wait to check keep alive notification.
Thread.sleep(50000);

// must reconnect and after the second syncAll the result must be different
Awaitility.await()
.atMost(1L, TimeUnit.MINUTES)
Expand Down Expand Up @@ -645,7 +641,6 @@ public void testConnectionClosedByRemoteHostIsProperlyHandled() throws Exception
Thread.sleep(1000);
result = client.getTreatment("admin", "push_test");
Assert.assertNotEquals("on_whitelist", result);
client.destroy();
}

@Test
Expand Down Expand Up @@ -682,7 +677,6 @@ public void testConnectionClosedIsProperlyHandled() throws Exception {
Thread.sleep(1000);
result = client.getTreatment("admin", "push_test");
Assert.assertNotEquals("on_whitelist", result);
client.destroy();
}

@Test
Expand Down Expand Up @@ -751,15 +745,15 @@ public void testPluggableMode() throws IOException, URISyntaxException {
Assert.assertNotNull(customStorageWrapper.getConfig());
String key = customStorageWrapper.getConfig().keySet().stream().collect(Collectors.toList()).get(0);
Assert.assertTrue(customStorageWrapper.getConfig().get(key).contains(StorageMode.PLUGGABLE.name()));
client.destroy();

} catch (TimeoutException | InterruptedException e) {
}
}

@Test
public void getTreatmentFlagSetWithPolling() throws Exception {
MockResponse response = new MockResponse().setBody("{\"ff\":{\"d\":[{\"trafficTypeName\":\"client\",\"name\":\"workm\",\"trafficAllocation\":100,\"trafficAllocationSeed\":147392224,\"seed\":524417105,\"status\":\"ACTIVE\",\"killed\":false,\"defaultTreatment\":\"on\",\"changeNumber\":1602796638344,\"algo\":2,\"configurations\":{},\"sets\":[\"set1\",\"set2\"],\"conditions\":[{\"conditionType\":\"ROLLOUT\",\"matcherGroup\":{\"combiner\":\"AND\",\"matchers\":[{\"keySelector\":{\"trafficType\":\"client\",\"attribute\":null},\"matcherType\":\"IN_SEGMENT\",\"negate\":false,\"userDefinedSegmentMatcherData\":{\"segmentName\":\"new_segment\"},\"whitelistMatcherData\":null,\"unaryNumericMatcherData\":null,\"betweenMatcherData\":null,\"booleanMatcherData\":null,\"dependencyMatcherData\":null,\"stringMatcherData\":null}]},\"partitions\":[{\"treatment\":\"on\",\"size\":0},{\"treatment\":\"off\",\"size\":0},{\"treatment\":\"free\",\"size\":100},{\"treatment\":\"conta\",\"size\":0}],\"label\":\"in segment new_segment\"},{\"conditionType\":\"ROLLOUT\",\"matcherGroup\":{\"combiner\":\"AND\",\"matchers\":[{\"keySelector\":{\"trafficType\":\"client\",\"attribute\":null},\"matcherType\":\"ALL_KEYS\",\"negate\":false,\"userDefinedSegmentMatcherData\":null,\"whitelistMatcherData\":null,\"unaryNumericMatcherData\":null,\"betweenMatcherData\":null,\"booleanMatcherData\":null,\"dependencyMatcherData\":null,\"stringMatcherData\":null}]},\"partitions\":[{\"treatment\":\"on\",\"size\":100},{\"treatment\":\"off\",\"size\":0},{\"treatment\":\"free\",\"size\":0},{\"treatment\":\"conta\",\"size\":0}],\"label\":\"default rule\"}]},{\"trafficTypeName\":\"client\",\"name\":\"workm_set_3\",\"trafficAllocation\":100,\"trafficAllocationSeed\":147392224,\"seed\":524417105,\"status\":\"ACTIVE\",\"killed\":false,\"defaultTreatment\":\"on\",\"changeNumber\":1602796638344,\"algo\":2,\"configurations\":{},\"sets\":[\"set3\"],\"conditions\":[{\"conditionType\":\"ROLLOUT\",\"matcherGroup\":{\"combiner\":\"AND\",\"matchers\":[{\"keySelector\":{\"trafficType\":\"client\",\"attribute\":null},\"matcherType\":\"IN_SEGMENT\",\"negate\":false,\"userDefinedSegmentMatcherData\":{\"segmentName\":\"new_segment\"},\"whitelistMatcherData\":null,\"unaryNumericMatcherData\":null,\"betweenMatcherData\":null,\"booleanMatcherData\":null,\"dependencyMatcherData\":null,\"stringMatcherData\":null}]},\"partitions\":[{\"treatment\":\"on\",\"size\":0},{\"treatment\":\"off\",\"size\":0},{\"treatment\":\"free\",\"size\":100},{\"treatment\":\"conta\",\"size\":0}],\"label\":\"in segment new_segment\"},{\"conditionType\":\"ROLLOUT\",\"matcherGroup\":{\"combiner\":\"AND\",\"matchers\":[{\"keySelector\":{\"trafficType\":\"client\",\"attribute\":null},\"matcherType\":\"ALL_KEYS\",\"negate\":false,\"userDefinedSegmentMatcherData\":null,\"whitelistMatcherData\":null,\"unaryNumericMatcherData\":null,\"betweenMatcherData\":null,\"booleanMatcherData\":null,\"dependencyMatcherData\":null,\"stringMatcherData\":null}]},\"partitions\":[{\"treatment\":\"on\",\"size\":100},{\"treatment\":\"off\",\"size\":0},{\"treatment\":\"free\",\"size\":0},{\"treatment\":\"conta\",\"size\":0}],\"label\":\"default rule\"}]}],\"s\":-1,\"t\":1602796638344},\"rbs\":{\"d\":[],\"t\":-1,\"s\":-1}}");
MockResponse responseFlag = new MockResponse().setBody("{\"ff\":{\"d\":[],\"s\":1602796638344,\"t\":1602796638344},\"rbs\":{\"d\":[],\"t\":-1,\"s\":-1}}");
MockResponse responseFlag = new MockResponse().setBody("{\"ff\":{\"d\": [], \"s\":1602796638344, \"t\":1602796638344},\"rbs\":{\"d\":[],\"t\":-1,\"s\":-1}}");
MockResponse segmentResponse = new MockResponse().setBody("{\"name\":\"new_segment\",\"added\":[\"user-1\"],\"removed\":[\"user-2\",\"user-3\"],\"since\":-1,\"till\":-1}");
Queue responses = new LinkedList<>();
responses.add(response);
Expand Down
Loading
Loading