Skip to content
This repository was archived by the owner on May 30, 2024. It is now read-only.

Commit 28ca2af

Browse files
committed
add unit tests for StreamProcessor
1 parent 770169b commit 28ca2af

File tree

2 files changed

+353
-5
lines changed

2 files changed

+353
-5
lines changed

src/main/java/com/launchdarkly/client/StreamProcessor.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

14+
import com.google.common.annotations.VisibleForTesting;
1415
import com.google.common.util.concurrent.SettableFuture;
1516
import com.google.gson.Gson;
1617
import com.google.gson.JsonElement;
@@ -161,8 +162,19 @@ public void onError(Throwable throwable) {
161162
}
162163
};
163164

165+
es = createEventSource(handler,
166+
URI.create(config.streamURI.toASCIIString() + "/all"),
167+
connectionErrorHandler,
168+
headers);
169+
es.start();
170+
return initFuture;
171+
}
172+
173+
@VisibleForTesting
174+
protected EventSource createEventSource(EventHandler handler, URI streamUri, ConnectionErrorHandler errorHandler,
175+
Headers headers) {
164176
EventSource.Builder builder = new EventSource.Builder(handler, URI.create(config.streamURI.toASCIIString() + "/all"))
165-
.connectionErrorHandler(connectionErrorHandler)
177+
.connectionErrorHandler(errorHandler)
166178
.headers(headers)
167179
.reconnectTimeMs(config.reconnectTimeMs)
168180
.connectTimeoutMs(config.connectTimeoutMillis)
@@ -179,11 +191,9 @@ public void onError(Throwable throwable) {
179191
}
180192
}
181193

182-
es = builder.build();
183-
es.start();
184-
return initFuture;
194+
return builder.build();
185195
}
186-
196+
187197
@Override
188198
public void close() throws IOException {
189199
logger.info("Closing LaunchDarkly StreamProcessor");
Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
package com.launchdarkly.client;
2+
3+
import com.launchdarkly.eventsource.ConnectionErrorHandler;
4+
import com.launchdarkly.eventsource.EventHandler;
5+
import com.launchdarkly.eventsource.EventSource;
6+
import com.launchdarkly.eventsource.MessageEvent;
7+
import com.launchdarkly.eventsource.UnsuccessfulResponseException;
8+
9+
import org.easymock.EasyMockSupport;
10+
import org.junit.Before;
11+
import org.junit.Test;
12+
13+
import java.io.IOException;
14+
import java.net.URI;
15+
import java.util.Collections;
16+
import java.util.concurrent.Future;
17+
18+
import static com.launchdarkly.client.VersionedDataKind.FEATURES;
19+
import static com.launchdarkly.client.VersionedDataKind.SEGMENTS;
20+
import static org.easymock.EasyMock.expect;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
23+
import static org.junit.Assert.assertNull;
24+
import static org.junit.Assert.assertTrue;
25+
26+
import okhttp3.Headers;
27+
28+
public class StreamProcessorTest extends EasyMockSupport {
29+
30+
private static final String SDK_KEY = "sdk_key";
31+
private static final URI STREAM_URI = URI.create("http://stream.test.com");
32+
private static final String FEATURE1_KEY = "feature1";
33+
private static final int FEATURE1_VERSION = 11;
34+
private static final FeatureFlag FEATURE = new FeatureFlagBuilder(FEATURE1_KEY).version(FEATURE1_VERSION).build();
35+
private static final String SEGMENT1_KEY = "segment1";
36+
private static final int SEGMENT1_VERSION = 22;
37+
private static final Segment SEGMENT = new Segment.Builder(SEGMENT1_KEY).version(SEGMENT1_VERSION).build();
38+
39+
private InMemoryFeatureStore featureStore;
40+
private LDConfig.Builder configBuilder;
41+
private FeatureRequestor mockRequestor;
42+
private EventSource mockEventSource;
43+
private EventHandler eventHandler;
44+
private URI actualStreamUri;
45+
private ConnectionErrorHandler errorHandler;
46+
private Headers headers;
47+
48+
@Before
49+
public void setup() {
50+
featureStore = new InMemoryFeatureStore();
51+
configBuilder = new LDConfig.Builder().featureStore(featureStore);
52+
mockRequestor = createStrictMock(FeatureRequestor.class);
53+
mockEventSource = createStrictMock(EventSource.class);
54+
}
55+
56+
@Test
57+
public void streamUriHasCorrectEndpoint() {
58+
LDConfig config = configBuilder.streamURI(STREAM_URI).build();
59+
createStreamProcessor(SDK_KEY, config).start();
60+
assertEquals(URI.create(STREAM_URI.toString() + "/all"), actualStreamUri);
61+
}
62+
63+
@Test
64+
public void headersHaveAuthorization() {
65+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
66+
assertEquals(SDK_KEY, headers.get("Authorization"));
67+
}
68+
69+
@Test
70+
public void headersHaveUserAgent() {
71+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
72+
assertEquals("JavaClient/" + LDClient.CLIENT_VERSION, headers.get("User-Agent"));
73+
}
74+
75+
@Test
76+
public void headersHaveAccept() {
77+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
78+
assertEquals("text/event-stream", headers.get("Accept"));
79+
}
80+
81+
@Test
82+
public void putCausesFeatureToBeStored() throws Exception {
83+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
84+
MessageEvent event = new MessageEvent("{\"data\":{\"flags\":{\"" +
85+
FEATURE1_KEY + "\":" + featureJson(FEATURE1_KEY, FEATURE1_VERSION) + "}," +
86+
"\"segments\":{}}}");
87+
eventHandler.onMessage("put", event);
88+
89+
assertFeatureInStore(FEATURE);
90+
}
91+
92+
@Test
93+
public void putCausesSegmentToBeStored() throws Exception {
94+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
95+
MessageEvent event = new MessageEvent("{\"data\":{\"flags\":{},\"segments\":{\"" +
96+
SEGMENT1_KEY + "\":" + segmentJson(SEGMENT1_KEY, SEGMENT1_VERSION) + "}}}");
97+
eventHandler.onMessage("put", event);
98+
99+
assertSegmentInStore(SEGMENT);
100+
}
101+
102+
@Test
103+
public void storeNotInitializedByDefault() throws Exception {
104+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
105+
assertFalse(featureStore.initialized());
106+
}
107+
108+
@Test
109+
public void putCausesStoreToBeInitialized() throws Exception {
110+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
111+
eventHandler.onMessage("put", emptyPutEvent());
112+
assertTrue(featureStore.initialized());
113+
}
114+
115+
@Test
116+
public void processorNotInitializedByDefault() throws Exception {
117+
StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build());
118+
sp.start();
119+
assertFalse(sp.initialized());
120+
}
121+
122+
@Test
123+
public void putCausesProcessorToBeInitialized() throws Exception {
124+
StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build());
125+
sp.start();
126+
eventHandler.onMessage("put", emptyPutEvent());
127+
assertTrue(sp.initialized());
128+
}
129+
130+
@Test
131+
public void futureIsNotSetByDefault() throws Exception {
132+
StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build());
133+
Future<Void> future = sp.start();
134+
assertFalse(future.isDone());
135+
}
136+
137+
@Test
138+
public void putCausesFutureToBeSet() throws Exception {
139+
StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build());
140+
Future<Void> future = sp.start();
141+
eventHandler.onMessage("put", emptyPutEvent());
142+
assertTrue(future.isDone());
143+
}
144+
145+
@Test
146+
public void patchUpdatesFeature() throws Exception {
147+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
148+
eventHandler.onMessage("put", emptyPutEvent());
149+
150+
String path = "/flags/" + FEATURE1_KEY;
151+
MessageEvent event = new MessageEvent("{\"path\":\"" + path + "\",\"data\":" +
152+
featureJson(FEATURE1_KEY, FEATURE1_VERSION) + "}");
153+
eventHandler.onMessage("patch", event);
154+
155+
assertFeatureInStore(FEATURE);
156+
}
157+
158+
@Test
159+
public void patchUpdatesSegment() throws Exception {
160+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
161+
eventHandler.onMessage("put", emptyPutEvent());
162+
163+
String path = "/segments/" + SEGMENT1_KEY;
164+
MessageEvent event = new MessageEvent("{\"path\":\"" + path + "\",\"data\":" +
165+
segmentJson(SEGMENT1_KEY, SEGMENT1_VERSION) + "}");
166+
eventHandler.onMessage("patch", event);
167+
168+
assertSegmentInStore(SEGMENT);
169+
}
170+
171+
@Test
172+
public void deleteDeletesFeature() throws Exception {
173+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
174+
eventHandler.onMessage("put", emptyPutEvent());
175+
featureStore.upsert(FEATURES, FEATURE);
176+
177+
String path = "/flags/" + FEATURE1_KEY;
178+
MessageEvent event = new MessageEvent("{\"path\":\"" + path + "\",\"version\":" +
179+
(FEATURE1_VERSION + 1) + "}");
180+
eventHandler.onMessage("delete", event);
181+
182+
assertNull(featureStore.get(FEATURES, FEATURE1_KEY));
183+
}
184+
185+
@Test
186+
public void deleteDeletesSegment() throws Exception {
187+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
188+
eventHandler.onMessage("put", emptyPutEvent());
189+
featureStore.upsert(SEGMENTS, SEGMENT);
190+
191+
String path = "/segments/" + SEGMENT1_KEY;
192+
MessageEvent event = new MessageEvent("{\"path\":\"" + path + "\",\"version\":" +
193+
(SEGMENT1_VERSION + 1) + "}");
194+
eventHandler.onMessage("delete", event);
195+
196+
assertNull(featureStore.get(SEGMENTS, SEGMENT1_KEY));
197+
}
198+
199+
@Test
200+
public void indirectPutRequestsAndStoresFeature() throws Exception {
201+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
202+
setupRequestorToReturnAllDataWithFlag(FEATURE);
203+
replayAll();
204+
205+
eventHandler.onMessage("indirect/put", new MessageEvent(""));
206+
207+
assertFeatureInStore(FEATURE);
208+
}
209+
210+
@Test
211+
public void indirectPutInitializesStore() throws Exception {
212+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
213+
setupRequestorToReturnAllDataWithFlag(FEATURE);
214+
replayAll();
215+
216+
eventHandler.onMessage("indirect/put", new MessageEvent(""));
217+
218+
assertTrue(featureStore.initialized());
219+
}
220+
221+
@Test
222+
public void indirectPutInitializesProcessor() throws Exception {
223+
StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build());
224+
sp.start();
225+
setupRequestorToReturnAllDataWithFlag(FEATURE);
226+
replayAll();
227+
228+
eventHandler.onMessage("indirect/put", new MessageEvent(""));
229+
230+
assertTrue(featureStore.initialized());
231+
}
232+
233+
@Test
234+
public void indirectPutSetsFuture() throws Exception {
235+
StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build());
236+
Future<Void> future = sp.start();
237+
setupRequestorToReturnAllDataWithFlag(FEATURE);
238+
replayAll();
239+
240+
eventHandler.onMessage("indirect/put", new MessageEvent(""));
241+
242+
assertTrue(future.isDone());
243+
}
244+
245+
@Test
246+
public void indirectPatchRequestsAndUpdatesFeature() throws Exception {
247+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
248+
expect(mockRequestor.getFlag(FEATURE1_KEY)).andReturn(FEATURE);
249+
replayAll();
250+
251+
eventHandler.onMessage("put", emptyPutEvent());
252+
eventHandler.onMessage("indirect/patch", new MessageEvent("/flags/" + FEATURE1_KEY));
253+
254+
assertFeatureInStore(FEATURE);
255+
}
256+
257+
@Test
258+
public void indirectPatchRequestsAndUpdatesSegment() throws Exception {
259+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
260+
expect(mockRequestor.getSegment(SEGMENT1_KEY)).andReturn(SEGMENT);
261+
replayAll();
262+
263+
eventHandler.onMessage("put", emptyPutEvent());
264+
eventHandler.onMessage("indirect/patch", new MessageEvent("/segments/" + SEGMENT1_KEY));
265+
266+
assertSegmentInStore(SEGMENT);
267+
}
268+
269+
@Test
270+
public void unknownEventTypeDoesNotThrowException() throws Exception {
271+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
272+
eventHandler.onMessage("what", new MessageEvent(""));
273+
}
274+
275+
@Test
276+
public void streamWillReconnectAfterGeneralIOException() throws Exception {
277+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
278+
ConnectionErrorHandler.Action action = errorHandler.onConnectionError(new IOException());
279+
assertEquals(ConnectionErrorHandler.Action.PROCEED, action);
280+
}
281+
282+
@Test
283+
public void streamWillReconnectAfterHttp500Error() throws Exception {
284+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
285+
UnsuccessfulResponseException e = new UnsuccessfulResponseException(500);
286+
ConnectionErrorHandler.Action action = errorHandler.onConnectionError(e);
287+
assertEquals(ConnectionErrorHandler.Action.PROCEED, action);
288+
}
289+
290+
@Test
291+
public void streamWillCloseAfterHttp401Error() throws Exception {
292+
createStreamProcessor(SDK_KEY, configBuilder.build()).start();
293+
UnsuccessfulResponseException e = new UnsuccessfulResponseException(401);
294+
ConnectionErrorHandler.Action action = errorHandler.onConnectionError(e);
295+
assertEquals(ConnectionErrorHandler.Action.SHUTDOWN, action);
296+
}
297+
298+
private StreamProcessor createStreamProcessor(String sdkKey, LDConfig config) {
299+
return new StreamProcessor(sdkKey, config, mockRequestor) {
300+
@Override
301+
protected EventSource createEventSource(EventHandler handler, URI streamUri, ConnectionErrorHandler errorHandler,
302+
Headers headers) {
303+
304+
StreamProcessorTest.this.eventHandler = handler;
305+
StreamProcessorTest.this.actualStreamUri = streamUri;
306+
StreamProcessorTest.this.errorHandler = errorHandler;
307+
StreamProcessorTest.this.headers = headers;
308+
return mockEventSource;
309+
}
310+
};
311+
}
312+
313+
private String featureJson(String key, int version) {
314+
return "{\"key\":\"" + key + "\",\"version\":" + version + ",\"on\":true}";
315+
}
316+
317+
private String segmentJson(String key, int version) {
318+
return "{\"key\":\"" + key + "\",\"version\":" + version + ",\"includes\":[],\"excludes\":[],\"rules\":[]}";
319+
}
320+
321+
private MessageEvent emptyPutEvent() {
322+
return new MessageEvent("{\"data\":{\"flags\":{},\"segments\":{}}}");
323+
}
324+
325+
private void setupRequestorToReturnAllDataWithFlag(FeatureFlag feature) throws Exception {
326+
FeatureRequestor.AllData data = new FeatureRequestor.AllData(
327+
Collections.singletonMap(feature.getKey(), feature), Collections.<String, Segment>emptyMap());
328+
expect(mockRequestor.getAllData()).andReturn(data);
329+
}
330+
331+
private void assertFeatureInStore(FeatureFlag feature) {
332+
assertEquals(feature.getVersion(), featureStore.get(FEATURES, feature.getKey()).getVersion());
333+
}
334+
335+
private void assertSegmentInStore(Segment segment) {
336+
assertEquals(segment.getVersion(), featureStore.get(SEGMENTS, segment.getKey()).getVersion());
337+
}
338+
}

0 commit comments

Comments
 (0)