Skip to content

Commit bdf86b3

Browse files
committed
make sure the delivery of the message has no race condition
1 parent 8c639de commit bdf86b3

File tree

2 files changed

+53
-35
lines changed

2 files changed

+53
-35
lines changed

src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -133,36 +133,23 @@ public synchronized Future<Void> unregister() {
133133
}
134134

135135
protected boolean doReceive(Message<T> message) {
136-
Handler<Message<T>> theHandler;
137136
synchronized (this) {
138137
if (handler == null) {
139138
return false;
140139
}
141-
if (demand == 0L) {
142-
if (pending.size() < maxBufferedMessages) {
143-
pending.add(message);
144-
return true;
145-
} else {
146-
discard(message);
147-
if (discardHandler != null) {
148-
discardHandler.handle(message);
149-
} else {
150-
log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address);
151-
}
152-
}
140+
if (pending.size() < maxBufferedMessages) {
141+
pending.add(message);
142+
checkNextTick();
153143
return true;
154144
} else {
155-
if (pending.size() > 0) {
156-
pending.add(message);
157-
message = pending.poll();
158-
}
159-
if (demand != Long.MAX_VALUE) {
160-
demand--;
145+
discard(message);
146+
if (discardHandler != null) {
147+
discardHandler.handle(message);
148+
} else {
149+
log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address);
161150
}
162-
theHandler = handler;
163151
}
164152
}
165-
deliver(theHandler, message);
166153
return true;
167154
}
168155

@@ -171,7 +158,7 @@ protected void dispatch(Message<T> msg, ContextInternal context, Handler<Message
171158
if (handler == null) {
172159
throw new NullPointerException();
173160
}
174-
context.emit(msg, handler);
161+
context.dispatch(msg, handler);
175162
}
176163

177164
private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
@@ -183,8 +170,8 @@ private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
183170

184171
private synchronized void checkNextTick() {
185172
// Check if there are more pending messages in the queue that can be processed next time around
186-
if (!pending.isEmpty() && demand > 0L) {
187-
context.nettyEventLoop().execute(() -> {
173+
if (demand > 0L && !pending.isEmpty()) {
174+
context.executor().execute(() -> {
188175
Message<T> message;
189176
Handler<Message<T>> theHandler;
190177
synchronized (MessageConsumerImpl.this) {

src/test/java/io/vertx/core/eventbus/MessageConsumerTest.java

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,41 @@
77
import java.util.HashMap;
88
import java.util.Map;
99
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.atomic.AtomicBoolean;
1011

1112
public class MessageConsumerTest extends VertxTestBase {
1213

1314

1415
@Test
15-
public void testMessageConsumptionStayOnWorkerThreadAfterResume() throws Exception {
16-
TestVerticle verticle = new TestVerticle(2);
16+
public void testMessageConsumptionStayOnWorkerThreadAfterResumeAndOnlyDispatchOneMessageAtOneMoment() throws Exception {
17+
int numberOfExpectedMessages = 10;
18+
TestVerticle verticle = new TestVerticle(numberOfExpectedMessages);
19+
EchoVerticle echoVerticle = new EchoVerticle();
1720
Future<String> deployVerticle = vertx.deployVerticle(verticle, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER));
21+
Future<String> deployEchoVerticle = vertx.deployVerticle(echoVerticle, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER));
1822

1923
CountDownLatch startLatch = new CountDownLatch(1);
20-
deployVerticle.onComplete(onSuccess(cf -> startLatch.countDown()));
24+
Future.all(deployVerticle, deployEchoVerticle)
25+
.onComplete(onSuccess(cf -> startLatch.countDown()));
2126
awaitLatch(startLatch);
2227

23-
vertx.eventBus().send("testAddress", "message1");
24-
vertx.eventBus().send("testAddress", "message2");
28+
for (int i = 1; i <= numberOfExpectedMessages; i++) {
29+
vertx.eventBus().send("testAddress", "message" + i);
30+
}
2531

2632
awaitLatch(verticle.msgLatch);
2733

28-
assertEquals(2, verticle.messageArrivedOnWorkerThread.size());
29-
assertTrue("message1 should be processed on worker thread", verticle.messageArrivedOnWorkerThread.get("message1"));
30-
assertTrue("message2 should be processed on worker thread", verticle.messageArrivedOnWorkerThread.get("message2"));
34+
assertEquals(numberOfExpectedMessages, verticle.messageArrivedOnWorkerThread.size());
35+
for (int i = 1; i <= numberOfExpectedMessages; i++) {
36+
assertTrue("message" + i + " should be processed on worker thread", verticle.messageArrivedOnWorkerThread.get("message" + i));
37+
}
3138
}
3239

3340

3441
private static class TestVerticle extends AbstractVerticle {
3542

3643
private final CountDownLatch msgLatch;
44+
private final AtomicBoolean messageProcessingOngoing = new AtomicBoolean();
3745

3846
private final Map<String, Boolean> messageArrivedOnWorkerThread = new HashMap<>();
3947

@@ -51,11 +59,34 @@ private void handleMessages(MessageConsumer<String> consumer) {
5159
consumer.handler(msg -> {
5260
consumer.pause();
5361
messageArrivedOnWorkerThread.putIfAbsent(msg.body(), Context.isOnWorkerThread());
54-
msgLatch.countDown();
55-
vertx.setTimer(20, id -> {
56-
consumer.resume();
62+
if (messageProcessingOngoing.compareAndSet(false, true)) {
63+
msgLatch.countDown();
64+
} else {
65+
System.err.println("Received message while processing another message");
66+
}
67+
vertx.eventBus().request("echoAddress", 20)
68+
.onComplete(ar -> {
69+
messageProcessingOngoing.set(false);
70+
consumer.resume();
71+
});
72+
});
73+
}
74+
}
75+
76+
private static class EchoVerticle extends AbstractVerticle {
77+
@Override
78+
public void start() {
79+
MessageConsumer<Integer> consumer = vertx.eventBus().localConsumer("echoAddress");
80+
handleMessages(consumer);
81+
}
82+
83+
private void handleMessages(MessageConsumer<Integer> consumer) {
84+
consumer.handler(msg -> {
85+
vertx.setTimer(msg.body(), id -> {
86+
msg.reply(msg.body());
5787
});
5888
});
5989
}
6090
}
91+
6192
}

0 commit comments

Comments
 (0)