-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Fix concurrency bug in NamespacedHierarchicalStore.computeIfAbsent
#5209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
87d7037
4207b3a
267e938
32faf28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,8 @@ | |
| import java.util.Optional; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.FutureTask; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.function.Function; | ||
| import java.util.function.Supplier; | ||
|
|
@@ -241,24 +243,78 @@ public void close() { | |
| public <K, V> Object computeIfAbsent(N namespace, K key, Function<? super K, ? extends V> defaultCreator) { | ||
| Preconditions.notNull(defaultCreator, "defaultCreator must not be null"); | ||
| CompositeKey<N> compositeKey = new CompositeKey<>(namespace, key); | ||
| StoredValue storedValue = getStoredValue(compositeKey); | ||
| var result = StoredValue.evaluateIfNotNull(storedValue); | ||
| if (result == null) { | ||
| StoredValue newStoredValue = this.storedValues.compute(compositeKey, (__, oldStoredValue) -> { | ||
| if (StoredValue.evaluateIfNotNull(oldStoredValue) == null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In your analysis you said:
But looking at the existing implementation, the
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right that the previous implementation checked The previous implementation ran The fix moves |
||
| rejectIfClosed(); | ||
| var computedValue = Preconditions.notNull(defaultCreator.apply(key), | ||
| "defaultCreator must not return null"); | ||
| return newStoredValue(() -> { | ||
| rejectIfClosed(); | ||
| return computedValue; | ||
| }); | ||
|
|
||
| // CAS-retry loop: retry if another thread concurrently modifies the entry | ||
| for (;;) { | ||
| StoredValue localStoredValue = this.storedValues.get(compositeKey); | ||
| if (localStoredValue != null) { | ||
| Object localValue = evaluateForComputeIfAbsent(compositeKey, localStoredValue); | ||
| if (localValue != null) { | ||
| return localValue; | ||
| } | ||
|
|
||
| Object computed = computeAndInstall(compositeKey, localStoredValue, key, defaultCreator); | ||
| if (computed != null) { | ||
| return computed; | ||
| } | ||
| continue; | ||
| } | ||
|
|
||
| // No local mapping: consult parent first. | ||
| if (this.parentStore != null) { | ||
| StoredValue parentStoredValue = this.parentStore.getStoredValue(compositeKey); | ||
| Object parentValue = StoredValue.evaluateIfNotNull(parentStoredValue); | ||
| if (parentValue != null) { | ||
| return parentValue; | ||
| } | ||
| return oldStoredValue; | ||
| }); | ||
| return requireNonNull(newStoredValue.evaluate()); | ||
| } | ||
|
|
||
| Object computed = computeAndInstall(compositeKey, null, key, defaultCreator); | ||
| if (computed != null) { | ||
| return computed; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private @Nullable Object evaluateForComputeIfAbsent(CompositeKey<N> compositeKey, StoredValue storedValue) { | ||
| Supplier<@Nullable Object> supplier = storedValue.supplier(); | ||
| if (supplier instanceof DeferredSupplier deferred) { | ||
| deferred.run(); | ||
| try { | ||
| return deferred.getOrThrow(); | ||
| } | ||
| catch (Throwable t) { | ||
| this.storedValues.remove(compositeKey, storedValue); | ||
| throw t; | ||
| } | ||
| } | ||
| return storedValue.evaluate(); | ||
| } | ||
|
|
||
| private <K, V> @Nullable Object computeAndInstall(CompositeKey<N> compositeKey, @Nullable StoredValue expectedOld, | ||
| K key, Function<? super K, ? extends V> defaultCreator) { | ||
|
|
||
| var deferred = new DeferredSupplier(() -> { | ||
| rejectIfClosed(); | ||
| return Preconditions.notNull(defaultCreator.apply(key), "defaultCreator must not return null"); | ||
| }); | ||
| StoredValue newStoredValue = newStoredValue(deferred); | ||
|
|
||
| boolean installed = (expectedOld == null ? this.storedValues.putIfAbsent(compositeKey, newStoredValue) == null | ||
| : this.storedValues.replace(compositeKey, expectedOld, newStoredValue)); | ||
|
|
||
| if (!installed) { | ||
| return null; | ||
| } | ||
|
|
||
| deferred.run(); | ||
| try { | ||
| return requireNonNull(deferred.getOrThrow()); | ||
| } | ||
| catch (Throwable t) { | ||
| this.storedValues.remove(compositeKey, newStoredValue); | ||
| throw t; | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -460,6 +516,66 @@ private void close(CloseAction<N> closeAction) throws Throwable { | |
|
|
||
| } | ||
|
|
||
| /** | ||
| * Deferred computation that can be installed into the store without executing | ||
| * user code while holding internal map locks. | ||
| * | ||
| * <p>For {@link #get(Object, Object)}, failures are treated as logically absent | ||
| * (returning {@code null}) so exceptions are not observable via {@code get()}. | ||
| * | ||
| * <p>For {@link #computeIfAbsent(Object, Object, Function)}, | ||
| * {@link #getOrThrow()} rethrows the original failure. | ||
| */ | ||
| private static final class DeferredSupplier implements Supplier<@Nullable Object> { | ||
|
|
||
| private final FutureTask<@Nullable Object> task; | ||
|
|
||
| private DeferredSupplier(Supplier<@Nullable Object> delegate) { | ||
| this.task = new FutureTask<>(delegate::get); | ||
| } | ||
|
|
||
| private void run() { | ||
| this.task.run(); | ||
| } | ||
|
|
||
| @Override | ||
| public @Nullable Object get() { | ||
| try { | ||
| return this.task.get(); | ||
| } | ||
| catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw throwAsUncheckedException(e); | ||
| } | ||
| catch (ExecutionException e) { | ||
| Throwable t = e.getCause(); | ||
| if (t == null) { | ||
| t = e; | ||
| } | ||
| UnrecoverableExceptions.rethrowIfUnrecoverable(t); | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| private @Nullable Object getOrThrow() { | ||
| try { | ||
| return this.task.get(); | ||
| } | ||
| catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw throwAsUncheckedException(e); | ||
| } | ||
| catch (ExecutionException e) { | ||
| Throwable t = e.getCause(); | ||
| if (t == null) { | ||
| t = e; | ||
| } | ||
| UnrecoverableExceptions.rethrowIfUnrecoverable(t); | ||
| throw throwAsUncheckedException(t); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Thread-safe {@link Supplier} that memoizes the result of calling its | ||
| * delegate and ensures it is called at most once. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.