11package com .launchdarkly .client ;
22
3+ import com .google .common .util .concurrent .ThreadFactoryBuilder ;
34import com .google .gson .Gson ;
45import com .launchdarkly .eventsource .EventHandler ;
56import com .launchdarkly .eventsource .EventSource ;
1213
1314import java .io .IOException ;
1415import java .net .URI ;
15- import java .util .concurrent .Executors ;
16- import java .util .concurrent .Future ;
17- import java .util .concurrent .ScheduledExecutorService ;
18- import java .util .concurrent .TimeUnit ;
16+ import java .util .concurrent .*;
1917import java .util .concurrent .atomic .AtomicBoolean ;
2018
2119class StreamProcessor implements UpdateProcessor {
@@ -32,7 +30,7 @@ class StreamProcessor implements UpdateProcessor {
3230 private final String sdkKey ;
3331 private final FeatureRequestor requestor ;
3432 private final ScheduledExecutorService heartbeatDetectorService ;
35- private DateTime lastHeartbeat ;
33+ private volatile DateTime lastHeartbeat ;
3634 private volatile EventSource es ;
3735 private AtomicBoolean initialized = new AtomicBoolean (false );
3836
@@ -42,7 +40,10 @@ class StreamProcessor implements UpdateProcessor {
4240 this .config = config ;
4341 this .sdkKey = sdkKey ;
4442 this .requestor = requestor ;
45- this .heartbeatDetectorService = Executors .newScheduledThreadPool (1 );
43+ ThreadFactory threadFactory = new ThreadFactoryBuilder ()
44+ .setNameFormat ("LaunchDarkly-HeartbeatDetector-%d" )
45+ .build ();
46+ this .heartbeatDetectorService = Executors .newSingleThreadScheduledExecutor (threadFactory );
4647 heartbeatDetectorService .scheduleAtFixedRate (new HeartbeatDetector (), 1 , 1 , TimeUnit .MINUTES );
4748 }
4849
@@ -200,8 +201,10 @@ private final class HeartbeatDetector implements Runnable {
200201
201202 @ Override
202203 public void run () {
203- DateTime fiveMinutesAgo = DateTime .now ().minusSeconds (DEAD_CONNECTION_INTERVAL_SECONDS );
204- if (lastHeartbeat .isBefore (fiveMinutesAgo ) && es .getState () == ReadyState .OPEN ) {
204+ DateTime reconnectThresholdTime = DateTime .now ().minusSeconds (DEAD_CONNECTION_INTERVAL_SECONDS );
205+ // We only want to force the reconnect if the ES connection is open. If not, it's already trying to
206+ // connect anyway, or this processor was shut down
207+ if (lastHeartbeat .isBefore (reconnectThresholdTime ) && es .getState () == ReadyState .OPEN ) {
205208 try {
206209 logger .info ("Stream stopped receiving heartbeats- reconnecting." );
207210 es .close ();
0 commit comments