Skip to content
This repository was archived by the owner on May 30, 2024. It is now read-only.

Commit 6ae30f3

Browse files
authored
Merge pull request #52 from launchdarkly/eb/fix-stream-put
fix stream put handling - and add unit tests
2 parents f61acd6 + 28ca2af commit 6ae30f3

File tree

2 files changed

+363
-7
lines changed

2 files changed

+363
-7
lines changed

src/main/java/com/launchdarkly/client/StreamProcessor.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

14+
import com.google.common.annotations.VisibleForTesting;
1415
import com.google.common.util.concurrent.SettableFuture;
1516
import com.google.gson.Gson;
1617
import com.google.gson.JsonElement;
@@ -83,8 +84,8 @@ public void onMessage(String name, MessageEvent event) throws Exception {
8384
Gson gson = new Gson();
8485
switch (name) {
8586
case PUT: {
86-
FeatureRequestor.AllData allData = gson.fromJson(event.getData(), FeatureRequestor.AllData.class);
87-
store.init(FeatureRequestor.toVersionedDataMap(allData));
87+
PutData putData = gson.fromJson(event.getData(), PutData.class);
88+
store.init(FeatureRequestor.toVersionedDataMap(putData.data));
8889
if (!initialized.getAndSet(true)) {
8990
initFuture.set(null);
9091
logger.info("Initialized LaunchDarkly client.");
@@ -161,8 +162,19 @@ public void onError(Throwable throwable) {
161162
}
162163
};
163164

165+
es = createEventSource(handler,
166+
URI.create(config.streamURI.toASCIIString() + "/all"),
167+
connectionErrorHandler,
168+
headers);
169+
es.start();
170+
return initFuture;
171+
}
172+
173+
@VisibleForTesting
174+
protected EventSource createEventSource(EventHandler handler, URI streamUri, ConnectionErrorHandler errorHandler,
175+
Headers headers) {
164176
EventSource.Builder builder = new EventSource.Builder(handler, URI.create(config.streamURI.toASCIIString() + "/all"))
165-
.connectionErrorHandler(connectionErrorHandler)
177+
.connectionErrorHandler(errorHandler)
166178
.headers(headers)
167179
.reconnectTimeMs(config.reconnectTimeMs)
168180
.connectTimeoutMs(config.connectTimeoutMillis)
@@ -179,11 +191,9 @@ public void onError(Throwable throwable) {
179191
}
180192
}
181193

182-
es = builder.build();
183-
es.start();
184-
return initFuture;
194+
return builder.build();
185195
}
186-
196+
187197
@Override
188198
public void close() throws IOException {
189199
logger.info("Closing LaunchDarkly StreamProcessor");
@@ -200,6 +210,14 @@ public boolean initialized() {
200210
return initialized.get();
201211
}
202212

213+
private static final class PutData {
214+
FeatureRequestor.AllData data;
215+
216+
public PutData() {
217+
218+
}
219+
}
220+
203221
private static final class PatchData {
204222
String path;
205223
JsonElement data;

0 commit comments

Comments
 (0)