11package com .launchdarkly .client ;
22
3- import static com .launchdarkly .client .VersionedDataKind .FEATURES ;
4-
5- import java .io .IOException ;
6- import java .net .URI ;
7- import java .util .HashMap ;
8- import java .util .Map ;
9- import java .util .concurrent .ExecutorService ;
10- import java .util .concurrent .Executors ;
11- import java .util .concurrent .ThreadFactory ;
12- import java .util .concurrent .TimeUnit ;
13-
14- import org .slf4j .Logger ;
15- import org .slf4j .LoggerFactory ;
16-
3+ import com .google .common .annotations .VisibleForTesting ;
174import com .google .common .base .Optional ;
185import com .google .common .cache .CacheBuilder ;
196import com .google .common .cache .CacheLoader ;
2411import com .google .common .util .concurrent .ThreadFactoryBuilder ;
2512import com .google .gson .Gson ;
2613
14+ import org .slf4j .Logger ;
15+ import org .slf4j .LoggerFactory ;
16+
17+ import java .io .IOException ;
18+ import java .util .HashMap ;
19+ import java .util .List ;
20+ import java .util .Map ;
21+ import java .util .concurrent .ExecutorService ;
22+ import java .util .concurrent .Executors ;
23+ import java .util .concurrent .ThreadFactory ;
24+ import java .util .concurrent .TimeUnit ;
25+
26+ import static com .launchdarkly .client .VersionedDataKind .FEATURES ;
27+
2728import redis .clients .jedis .Jedis ;
2829import redis .clients .jedis .JedisPool ;
2930import redis .clients .jedis .JedisPoolConfig ;
@@ -38,12 +39,15 @@ public class RedisFeatureStore implements FeatureStore {
3839 private static final String DEFAULT_PREFIX = "launchdarkly" ;
3940 private static final String INIT_KEY = "$initialized$" ;
4041 private static final String CACHE_REFRESH_THREAD_POOL_NAME_FORMAT = "RedisFeatureStore-cache-refresher-pool-%d" ;
42+ private static final Gson gson = new Gson ();
43+
4144 private final JedisPool pool ;
4245 private LoadingCache <CacheKey , Optional <VersionedData >> cache ;
4346 private final LoadingCache <String , Boolean > initCache = createInitCache ();
4447 private String prefix ;
4548 private ListeningExecutorService executorService ;
46-
49+ private UpdateListener updateListener ;
50+
4751 private static class CacheKey {
4852 final VersionedDataKind <?> kind ;
4953 final String key ;
@@ -102,10 +106,6 @@ private void setPrefix(String prefix) {
102106 }
103107 }
104108
105- private void createCache (long cacheTimeSecs ) {
106- createCache (cacheTimeSecs , false , false );
107- }
108-
109109 private void createCache (long cacheTimeSecs , boolean refreshStaleValues , boolean asyncRefresh ) {
110110 if (cacheTimeSecs > 0 ) {
111111 if (refreshStaleValues ) {
@@ -120,7 +120,9 @@ private CacheLoader<CacheKey, Optional<VersionedData>> createDefaultCacheLoader(
120120 return new CacheLoader <CacheKey , Optional <VersionedData >>() {
121121 @ Override
122122 public Optional <VersionedData > load (CacheKey key ) throws Exception {
123- return Optional .<VersionedData >fromNullable (getRedis (key .kind , key .key ));
123+ try (Jedis jedis = pool .getResource ()) {
124+ return Optional .<VersionedData >fromNullable (getRedisEvenIfDeleted (key .kind , key .key , jedis ));
125+ }
124126 }
125127 };
126128 }
@@ -169,7 +171,13 @@ public <T extends VersionedData> T get(VersionedDataKind<T> kind, String key) {
169171 if (cache != null ) {
170172 item = (T ) cache .getUnchecked (new CacheKey (kind , key )).orNull ();
171173 } else {
172- item = getRedis (kind , key );
174+ try (Jedis jedis = pool .getResource ()) {
175+ item = getRedisEvenIfDeleted (kind , key , jedis );
176+ }
177+ }
178+ if (item != null && item .isDeleted ()) {
179+ logger .debug ("[get] Key: {} has been deleted in \" {}\" . Returning null" , key , kind .getNamespace ());
180+ return null ;
173181 }
174182 if (item != null ) {
175183 logger .debug ("[get] Key: {} with version: {} found in \" {}\" ." , key , item .getVersion (), kind .getNamespace ());
@@ -182,7 +190,6 @@ public <T extends VersionedData> Map<String, T> all(VersionedDataKind<T> kind) {
182190 try (Jedis jedis = pool .getResource ()) {
183191 Map <String , String > allJson = jedis .hgetAll (itemsKey (kind ));
184192 Map <String , T > result = new HashMap <>();
185- Gson gson = new Gson ();
186193
187194 for (Map .Entry <String , String > entry : allJson .entrySet ()) {
188195 T item = gson .fromJson (entry .getValue (), kind .getItemClass ());
@@ -197,7 +204,6 @@ public <T extends VersionedData> Map<String, T> all(VersionedDataKind<T> kind) {
197204 @ Override
198205 public void init (Map <VersionedDataKind <?>, Map <String , ? extends VersionedData >> allData ) {
199206 try (Jedis jedis = pool .getResource ()) {
200- Gson gson = new Gson ();
201207 Transaction t = jedis .multi ();
202208
203209 for (Map .Entry <VersionedDataKind <?>, Map <String , ? extends VersionedData >> entry : allData .entrySet ()) {
@@ -216,63 +222,54 @@ public void init(Map<VersionedDataKind<?>, Map<String, ? extends VersionedData>>
216222
217223 @ Override
218224 public <T extends VersionedData > void delete (VersionedDataKind <T > kind , String key , int version ) {
219- Jedis jedis = null ;
220- try {
221- Gson gson = new Gson ();
222- jedis = pool .getResource ();
223- String baseKey = itemsKey (kind );
224- jedis .watch (baseKey );
225-
226- VersionedData item = getRedis (kind , key , jedis );
227-
228- if (item != null && item .getVersion () >= version ) {
229- logger .warn ("Attempted to delete key: {} version: {}" +
230- " with a version that is the same or older: {} in \" {}\" " ,
231- key , item .getVersion (), version , kind .getNamespace ());
232- return ;
233- }
234-
235- VersionedData deletedItem = kind .makeDeletedItem (key , version );
236- jedis .hset (baseKey , key , gson .toJson (deletedItem ));
237-
238- if (cache != null ) {
239- cache .invalidate (new CacheKey (kind , key ));
240- }
241- } finally {
242- if (jedis != null ) {
243- jedis .unwatch ();
244- jedis .close ();
245- }
246- }
225+ T deletedItem = kind .makeDeletedItem (key , version );
226+ updateItemWithVersioning (kind , deletedItem );
247227 }
248-
228+
249229 @ Override
250230 public <T extends VersionedData > void upsert (VersionedDataKind <T > kind , T item ) {
251- Jedis jedis = null ;
252- try {
253- jedis = pool .getResource ();
254- Gson gson = new Gson ();
255- String baseKey = itemsKey (kind );
256- jedis .watch (baseKey );
257-
258- VersionedData old = getRedisEvenIfDeleted (kind , item .getKey (), jedis );
259-
260- if (old != null && old .getVersion () >= item .getVersion ()) {
261- logger .warn ("Attempted to update key: {} version: {}" +
262- " with a version that is the same or older: {} in \" {}\" " ,
263- item .getKey (), old .getVersion (), item .getVersion (), kind .getNamespace ());
264- return ;
265- }
266-
267- jedis .hset (baseKey , item .getKey (), gson .toJson (item ));
231+ updateItemWithVersioning (kind , item );
232+ }
268233
269- if (cache != null ) {
270- cache .invalidate (new CacheKey (kind , item .getKey ()));
271- }
272- } finally {
273- if (jedis != null ) {
274- jedis .unwatch ();
275- jedis .close ();
234+ private <T extends VersionedData > void updateItemWithVersioning (VersionedDataKind <T > kind , T newItem ) {
235+ while (true ) {
236+ Jedis jedis = null ;
237+ try {
238+ jedis = pool .getResource ();
239+ String baseKey = itemsKey (kind );
240+ jedis .watch (baseKey );
241+
242+ if (updateListener != null ) {
243+ updateListener .aboutToUpdate (baseKey , newItem .getKey ());
244+ }
245+
246+ VersionedData oldItem = getRedisEvenIfDeleted (kind , newItem .getKey (), jedis );
247+
248+ if (oldItem != null && oldItem .getVersion () >= newItem .getVersion ()) {
249+ logger .warn ("Attempted to {} key: {} version: {}" +
250+ " with a version that is the same or older: {} in \" {}\" " ,
251+ newItem .isDeleted () ? "delete" : "update" ,
252+ newItem .getKey (), oldItem .getVersion (), newItem .getVersion (), kind .getNamespace ());
253+ return ;
254+ }
255+
256+ Transaction tx = jedis .multi ();
257+ tx .hset (baseKey , newItem .getKey (), gson .toJson (newItem ));
258+ List <Object > result = tx .exec ();
259+ if (result .isEmpty ()) {
260+ // if exec failed, it means the watch was triggered and we should retry
261+ logger .debug ("Concurrent modification detected, retrying" );
262+ continue ;
263+ }
264+
265+ if (cache != null ) {
266+ cache .invalidate (new CacheKey (kind , newItem .getKey ()));
267+ }
268+ } finally {
269+ if (jedis != null ) {
270+ jedis .unwatch ();
271+ jedis .close ();
272+ }
276273 }
277274 }
278275 }
@@ -323,23 +320,7 @@ private Boolean getInit() {
323320 }
324321 }
325322
326- private <T extends VersionedData > T getRedis (VersionedDataKind <T > kind , String key ) {
327- try (Jedis jedis = pool .getResource ()) {
328- return getRedis (kind , key , jedis );
329- }
330- }
331-
332- private <T extends VersionedData > T getRedis (VersionedDataKind <T > kind , String key , Jedis jedis ) {
333- T item = getRedisEvenIfDeleted (kind , key , jedis );
334- if (item != null && item .isDeleted ()) {
335- logger .debug ("[get] Key: {} has been deleted in \" {}\" . Returning null" , key , kind .getNamespace ());
336- return null ;
337- }
338- return item ;
339- }
340-
341323 private <T extends VersionedData > T getRedisEvenIfDeleted (VersionedDataKind <T > kind , String key , Jedis jedis ) {
342- Gson gson = new Gson ();
343324 String json = jedis .hget (itemsKey (kind ), key );
344325
345326 if (json == null ) {
@@ -354,4 +335,12 @@ private static JedisPoolConfig getPoolConfig() {
354335 return new JedisPoolConfig ();
355336 }
356337
338+ static interface UpdateListener {
339+ void aboutToUpdate (String baseKey , String itemKey );
340+ }
341+
342+ @ VisibleForTesting
343+ void setUpdateListener (UpdateListener updateListener ) {
344+ this .updateListener = updateListener ;
345+ }
357346}
0 commit comments