Skip to content

Commit 5a024f8

Browse files
committed
Fix Observables queuing & request race condition
JAVA-2977
1 parent 8e31820 commit 5a024f8

8 files changed

+61
-33
lines changed

driver-async/src/main/com/mongodb/async/client/AbstractSubscription.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,12 @@ void addToQueue(@Nullable final TResult result) {
109109
if (result != null) {
110110
resultsQueue.add(result);
111111
}
112-
tryProcessResultsQueue();
113112
}
114113

115114
void addToQueue(@Nullable final List<TResult> results) {
116115
if (results != null) {
117116
resultsQueue.addAll(results);
118117
}
119-
tryProcessResultsQueue();
120118
}
121119

122120
void onError(final Throwable t) {
@@ -156,7 +154,7 @@ private void tryRequestInitialData() {
156154
}
157155
}
158156

159-
private void tryProcessResultsQueue() {
157+
void tryProcessResultsQueue() {
160158
try {
161159
processResultsQueue();
162160
} catch (Throwable t) {

driver-async/src/main/com/mongodb/async/client/FlatteningSingleResultCallbackSubscription.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ public void onResult(final List<TResult> result, final Throwable t) {
4444
if (t != null) {
4545
onError(t);
4646
} else {
47+
addToQueue(result);
4748
synchronized (FlatteningSingleResultCallbackSubscription.this) {
4849
completed = true;
4950
}
50-
addToQueue(result);
51+
tryProcessResultsQueue();
5152
}
5253
}
5354
});

driver-async/src/main/com/mongodb/async/client/MongoIterableSubscription.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,20 @@ void requestMoreData() {
8585
batchCursor.next(new SingleResultCallback<List<TResult>>() {
8686
@Override
8787
public void onResult(final List<TResult> result, final Throwable t) {
88-
8988
synchronized (MongoIterableSubscription.this) {
9089
isReading = false;
91-
if (t == null && result == null) {
92-
completed = true;
93-
}
9490
}
9591

9692
if (t != null) {
9793
onError(t);
9894
} else {
9995
addToQueue(result);
96+
synchronized (MongoIterableSubscription.this) {
97+
if (result == null) {
98+
completed = true;
99+
}
100+
}
101+
tryProcessResultsQueue();
100102
}
101103
}
102104
});

driver-async/src/main/com/mongodb/async/client/SingleResultCallbackSubscription.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ public void onResult(final TResult result, final Throwable t) {
4141
if (t != null) {
4242
onError(t);
4343
} else {
44+
addToQueue(result);
4445
synchronized (SingleResultCallbackSubscription.this) {
4546
completed = true;
4647
}
47-
addToQueue(result);
48+
tryProcessResultsQueue();
4849
}
4950
}
5051
});

driver-async/src/test/unit/com/mongodb/async/client/FlatteningSingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import com.mongodb.MongoException
2121
import com.mongodb.async.SingleResultCallback
2222
import spock.lang.Specification
2323

24+
import java.util.concurrent.Executors
25+
import java.util.concurrent.TimeUnit
26+
2427
import static com.mongodb.async.client.Observables.observeAndFlatten
2528

2629
class FlatteningSingleResultCallbackSubscriptionSpecification extends Specification {
@@ -45,7 +48,8 @@ class FlatteningSingleResultCallbackSubscriptionSpecification extends Specificat
4548

4649
def 'should call onComplete after all data has been consumed'() {
4750
given:
48-
SingleResultCallback<List> listSingleResultCallback
51+
SingleResultCallback<List> listSingleResultCallback = null
52+
def executor = Executors.newFixedThreadPool(5)
4953
def observer = new TestObserver()
5054
observeAndFlatten(new Block<SingleResultCallback<List>>() {
5155
@Override
@@ -64,12 +68,17 @@ class FlatteningSingleResultCallbackSubscriptionSpecification extends Specificat
6468
observer.assertNoTerminalEvent()
6569

6670
when:
67-
listSingleResultCallback.onResult([1, 2, 3, 4], null)
71+
100.times { executor.submit { observer.requestMore(1) } }
72+
listSingleResultCallback?.onResult([1, 2, 3, 4], null)
6873

6974
then:
7075
observer.assertNoErrors()
7176
observer.assertReceivedOnNext([1, 2, 3, 4])
7277
observer.assertTerminalEvent()
78+
79+
cleanup:
80+
executor?.shutdown()
81+
executor?.awaitTermination(10, TimeUnit.SECONDS)
7382
}
7483

7584
def 'should throw an error if request is less than 1'() {

driver-async/src/test/unit/com/mongodb/async/client/MongoIterableSubscriptionSpecification.groovy

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import com.mongodb.MongoException
2020
import com.mongodb.async.AsyncBatchCursor
2121
import spock.lang.Specification
2222

23+
import java.util.concurrent.Executors
24+
import java.util.concurrent.TimeUnit
25+
2326
import static com.mongodb.async.client.Observables.observe
2427

2528
class MongoIterableSubscriptionSpecification extends Specification {
@@ -72,16 +75,22 @@ class MongoIterableSubscriptionSpecification extends Specification {
7275
def 'should call onComplete after cursor has completed and all onNext values requested'() {
7376
given:
7477
def mongoIterable = getMongoIterable()
78+
def executor = Executors.newFixedThreadPool(5)
7579
def observer = new TestObserver()
7680
observe(mongoIterable).subscribe(observer)
7781

7882
when:
83+
100.times { executor.submit { observer.requestMore(1) } }
7984
observer.requestMore(10)
8085

8186
then:
8287
observer.assertNoErrors()
8388
observer.assertReceivedOnNext([1, 2, 3, 4])
8489
observer.assertTerminalEvent()
90+
91+
cleanup:
92+
executor?.shutdown()
93+
executor?.awaitTermination(10, TimeUnit.SECONDS)
8594
}
8695

8796
def 'should call onError if batchCursor returns an throwable in the callback'() {

driver-async/src/test/unit/com/mongodb/async/client/SingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import com.mongodb.MongoException
2121
import com.mongodb.async.SingleResultCallback
2222
import spock.lang.Specification
2323

24+
import java.util.concurrent.Executors
25+
import java.util.concurrent.TimeUnit
26+
2427
import static com.mongodb.async.client.Observables.observe
2528

2629
class SingleResultCallbackSubscriptionSpecification extends Specification {
@@ -45,7 +48,8 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
4548

4649
def 'should call onComplete after all data has been consumed'() {
4750
given:
48-
SingleResultCallback<Integer> singleResultCallback
51+
SingleResultCallback<Integer> singleResultCallback = null
52+
def executor = Executors.newFixedThreadPool(5)
4953
def observer = new TestObserver()
5054
observe(new Block<SingleResultCallback<Integer>>() {
5155
@Override
@@ -56,20 +60,24 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
5660

5761
when:
5862
observer.requestMore(5)
59-
observer.requestMore(5)
6063

6164
then:
65+
observer.assertNoTerminalEvent()
6266
observer.assertNoErrors()
6367
observer.assertReceivedOnNext([])
64-
observer.assertNoTerminalEvent()
6568

6669
when:
67-
singleResultCallback.onResult(1, null)
70+
100.times { executor.submit { observer.requestMore(1) } }
71+
singleResultCallback?.onResult(1, null)
6872

6973
then:
7074
observer.assertNoErrors()
71-
observer.assertReceivedOnNext([1])
7275
observer.assertTerminalEvent()
76+
observer.assertReceivedOnNext([1])
77+
78+
cleanup:
79+
executor?.shutdown()
80+
executor?.awaitTermination(10, TimeUnit.SECONDS)
7381
}
7482

7583
def 'should throw an error if request is less than 1'() {
@@ -91,7 +99,7 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
9199
observe(new Block<SingleResultCallback<Integer>>() {
92100
@Override
93101
void apply(final SingleResultCallback<Integer> callback) {
94-
callback.onResult(null, new MongoException('failed'));
102+
callback.onResult(null, new MongoException('failed'))
95103
}
96104
}).subscribe(observer)
97105

@@ -267,7 +275,7 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
267275
observe(new Block<SingleResultCallback<Integer>>() {
268276
@Override
269277
void apply(final SingleResultCallback<Integer> callback) {
270-
throw new MongoException('failed');
278+
throw new MongoException('failed')
271279
}
272280
}).subscribe(observer)
273281

@@ -288,7 +296,7 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
288296

289297
@Override
290298
void apply(final SingleResultCallback<Integer> callback) {
291-
callback.onResult(results, null);
299+
callback.onResult(results, null)
292300
}
293301
}
294302
}

driver-async/src/test/unit/com/mongodb/async/client/TestObserver.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void onError(final Throwable e) {
6060
}
6161

6262
@Override
63-
public void onSubscribe(final Subscription subscription) {
63+
public synchronized void onSubscribe(final Subscription subscription) {
6464
this.subscription = subscription;
6565
delegate.onSubscribe(subscription);
6666
}
@@ -77,7 +77,7 @@ public void onSubscribe(final Subscription subscription) {
7777
* @param result the item emitted by the obserable
7878
*/
7979
@Override
80-
public void onNext(final T result) {
80+
public synchronized void onNext(final T result) {
8181
onNextEvents.add(result);
8282
delegate.onNext(result);
8383
}
@@ -92,7 +92,7 @@ public void onNext(final T result) {
9292
* @param e the exception encountered by the obserable
9393
*/
9494
@Override
95-
public void onError(final Throwable e) {
95+
public synchronized void onError(final Throwable e) {
9696
try {
9797
onErrorEvents.add(e);
9898
delegate.onError(e);
@@ -108,7 +108,7 @@ public void onError(final Throwable e) {
108108
* </p>
109109
*/
110110
@Override
111-
public void onComplete() {
111+
public synchronized void onComplete() {
112112
try {
113113
onCompleteEvents.add(null);
114114
delegate.onComplete();
@@ -123,7 +123,7 @@ public void onComplete() {
123123
* @param n the maximum number of items you want the obserable to emit to the Subscriber at this time, or
124124
* {@code Long.MAX_VALUE} if you want the obserable to emit items at its own pace
125125
*/
126-
public void requestMore(final long n) {
126+
public synchronized void requestMore(final long n) {
127127
subscription.request(n);
128128
}
129129

@@ -133,7 +133,7 @@ public void requestMore(final long n) {
133133
*
134134
* @return a list of the Throwables that were passed to this Subscriber's {@link #onError} method
135135
*/
136-
public List<Throwable> getOnErrorEvents() {
136+
public synchronized List<Throwable> getOnErrorEvents() {
137137
return onErrorEvents;
138138
}
139139

@@ -142,7 +142,7 @@ public List<Throwable> getOnErrorEvents() {
142142
*
143143
* @return a list of items observed by this Subscriber, in the order in which they were observed
144144
*/
145-
public List<T> getOnNextEvents() {
145+
public synchronized List<T> getOnNextEvents() {
146146
return onNextEvents;
147147
}
148148

@@ -151,7 +151,7 @@ public List<T> getOnNextEvents() {
151151
*
152152
* @return the subscription or null if not subscribed to
153153
*/
154-
public Subscription getSubscription() {
154+
public synchronized Subscription getSubscription() {
155155
return subscription;
156156
}
157157

@@ -161,7 +161,7 @@ public Subscription getSubscription() {
161161
* @param items the sequence of items expected to have been observed
162162
* @throws AssertionError if the sequence of items observed does not exactly match {@code items}
163163
*/
164-
public void assertReceivedOnNext(final List<T> items) {
164+
public synchronized void assertReceivedOnNext(final List<T> items) {
165165
if (getOnNextEvents().size() != items.size()) {
166166
throw new AssertionError("Number of items does not match. Provided: " + items.size() + " Actual: " + getOnNextEvents().size());
167167
}
@@ -186,7 +186,7 @@ public void assertReceivedOnNext(final List<T> items) {
186186
*
187187
* @throws AssertionError if not exactly one terminal event notification was received
188188
*/
189-
public void assertTerminalEvent() {
189+
public synchronized void assertTerminalEvent() {
190190
if (onErrorEvents.size() > 1) {
191191
throw new AssertionError("Too many onError events: " + onErrorEvents.size());
192192
}
@@ -209,7 +209,7 @@ public void assertTerminalEvent() {
209209
*
210210
* @throws AssertionError if a terminal event notification was received
211211
*/
212-
public void assertNoTerminalEvent() {
212+
public synchronized void assertNoTerminalEvent() {
213213
if (onCompleteEvents.size() > 0 || onErrorEvents.size() > 0) {
214214
throw new AssertionError("Terminal events received.");
215215
}
@@ -220,7 +220,7 @@ public void assertNoTerminalEvent() {
220220
*
221221
* @throws AssertionError if this {@link Subscription} is not unsubscribed
222222
*/
223-
public void assertUnsubscribed() {
223+
public synchronized void assertUnsubscribed() {
224224
if (subscription == null || !subscription.isUnsubscribed()) {
225225
throw new AssertionError("Not unsubscribed.");
226226
}
@@ -242,7 +242,7 @@ public void assertSubscribed() {
242242
*
243243
* @throws AssertionError if this {@link Observer} has received one or more {@link #onError} notifications
244244
*/
245-
public void assertNoErrors() {
245+
public synchronized void assertNoErrors() {
246246
if (onErrorEvents.size() > 0) {
247247
// can't use AssertionError because (message, cause) doesn't exist until Java 7
248248
throw new RuntimeException("Unexpected onError events: " + getOnErrorEvents().size(), getOnErrorEvents().get(0));
@@ -254,7 +254,7 @@ public void assertNoErrors() {
254254
*
255255
* @throws AssertionError if this {@link Observer} did not received an {@link #onError} notifications
256256
*/
257-
public void assertErrored() {
257+
public synchronized void assertErrored() {
258258
if (onErrorEvents.size() == 0) {
259259
// can't use AssertionError because (message, cause) doesn't exist until Java 7
260260
throw new RuntimeException("No onError events");

0 commit comments

Comments
 (0)