44import com .launchdarkly .eventsource .EventHandler ;
55import com .launchdarkly .eventsource .EventSource ;
66import com .launchdarkly .eventsource .MessageEvent ;
7+ import com .launchdarkly .eventsource .ReadyState ;
78import okhttp3 .Headers ;
9+ import org .joda .time .DateTime ;
810import org .slf4j .Logger ;
911import org .slf4j .LoggerFactory ;
1012
1113import java .io .IOException ;
1214import java .net .URI ;
15+ import java .util .concurrent .Executors ;
1316import java .util .concurrent .Future ;
17+ import java .util .concurrent .ScheduledExecutorService ;
18+ import java .util .concurrent .TimeUnit ;
1419import java .util .concurrent .atomic .AtomicBoolean ;
1520
1621class StreamProcessor implements UpdateProcessor {
@@ -20,12 +25,15 @@ class StreamProcessor implements UpdateProcessor {
2025 private static final String INDIRECT_PUT = "indirect/put" ;
2126 private static final String INDIRECT_PATCH = "indirect/patch" ;
2227 private static final Logger logger = LoggerFactory .getLogger (StreamProcessor .class );
28+ private static final int DEAD_CONNECTION_INTERVAL_SECONDS = 300 ;
2329
2430 private final FeatureStore store ;
2531 private final LDConfig config ;
2632 private final String sdkKey ;
2733 private final FeatureRequestor requestor ;
28- private EventSource es ;
34+ private final ScheduledExecutorService heartbeatDetectorService ;
35+ private DateTime lastHeartbeat ;
36+ private volatile EventSource es ;
2937 private AtomicBoolean initialized = new AtomicBoolean (false );
3038
3139
@@ -34,6 +42,8 @@ class StreamProcessor implements UpdateProcessor {
3442 this .config = config ;
3543 this .sdkKey = sdkKey ;
3644 this .requestor = requestor ;
45+ this .heartbeatDetectorService = Executors .newScheduledThreadPool (1 );
46+ heartbeatDetectorService .scheduleAtFixedRate (new HeartbeatDetector (), 1 , 1 , TimeUnit .MINUTES );
3747 }
3848
3949 @ Override
@@ -50,7 +60,7 @@ public Future<Void> start() {
5060
5161 @ Override
5262 public void onOpen () throws Exception {
53-
63+ lastHeartbeat = DateTime . now ();
5464 }
5565
5666 @ Override
@@ -100,6 +110,12 @@ public void onMessage(String name, MessageEvent event) throws Exception {
100110 }
101111 }
102112
113+ @ Override
114+ public void onComment (String comment ) {
115+ logger .debug ("Received a heartbeat" );
116+ lastHeartbeat = DateTime .now ();
117+ }
118+
103119 @ Override
104120 public void onError (Throwable throwable ) {
105121 logger .error ("Encountered EventSource error: " + throwable .getMessage ());
@@ -125,6 +141,14 @@ public void close() throws IOException {
125141 if (store != null ) {
126142 store .close ();
127143 }
144+ if (heartbeatDetectorService != null ) {
145+ heartbeatDetectorService .shutdownNow ();
146+ try {
147+ heartbeatDetectorService .awaitTermination (100 , TimeUnit .MILLISECONDS );
148+ } catch (InterruptedException e ) {
149+ logger .error ("Encountered an exception terminating heartbeat detector: " + e .getMessage ());
150+ }
151+ }
128152 }
129153
130154 @ Override
@@ -171,4 +195,21 @@ int version() {
171195 }
172196
173197 }
198+
199+ private final class HeartbeatDetector implements Runnable {
200+
201+ @ Override
202+ public void run () {
203+ DateTime fiveMinutesAgo = DateTime .now ().minusSeconds (DEAD_CONNECTION_INTERVAL_SECONDS );
204+ if (lastHeartbeat .isBefore (fiveMinutesAgo ) && es .getState () == ReadyState .OPEN ) {
205+ try {
206+ logger .info ("Stream stopped receiving heartbeats- reconnecting." );
207+ es .close ();
208+ start ();
209+ } catch (IOException e ) {
210+ logger .warn ("Encountered exception closing stream connection: " + e .getMessage ());
211+ }
212+ }
213+ }
214+ }
174215}
0 commit comments