diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index 590f9dd2a9b..7e70f6c4d70 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -138,28 +138,26 @@ protected boolean doReceive(Message message) { if (handler == null) { return false; } - if (demand == 0L) { - if (pending.size() < maxBufferedMessages) { - pending.add(message); + if (pending.size() < maxBufferedMessages) { + pending.add(message); + if (demand == 0) { return true; } else { - discard(message); - if (discardHandler != null) { - discardHandler.handle(message); - } else { - log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address); + if (demand != Long.MAX_VALUE) { + demand--; } - } - return true; - } else { - if (pending.size() > 0) { - pending.add(message); message = pending.poll(); + theHandler = handler; } - if (demand != Long.MAX_VALUE) { - demand--; + } else { + discard(message); + if (discardHandler != null) { + discardHandler.handle(message); + } else { + String pause = demand == 0 ? "paused" : "NOT paused"; + log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in " + pause + " consumer. address: " + address); } - theHandler = handler; + return false; } } deliver(theHandler, message); @@ -171,7 +169,7 @@ protected void dispatch(Message msg, ContextInternal context, Handler> theHandler, Message message) { @@ -183,8 +181,8 @@ private void deliver(Handler> theHandler, Message message) { private synchronized void checkNextTick() { // Check if there are more pending messages in the queue that can be processed next time around - if (!pending.isEmpty() && demand > 0L) { - context.nettyEventLoop().execute(() -> { + if (demand > 0L && !pending.isEmpty()) { + context.execute(__ -> { Message message; Handler> theHandler; synchronized (MessageConsumerImpl.this) { diff --git a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java index 407d4a67c91..6c21457ca99 100644 --- a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java @@ -1401,7 +1401,6 @@ public void testUnregisterConsumerDiscardPendingMessages() { eb.send(ADDRESS1, "val1"); Context ctx = Vertx.currentContext(); ctx.runOnContext(v -> { - consumer.resume(); ((MessageConsumerImpl) consumer).discardHandler(discarded -> { assertEquals("val1", discarded.body()); testComplete(); diff --git a/src/test/java/io/vertx/core/eventbus/MessageConsumerTest.java b/src/test/java/io/vertx/core/eventbus/MessageConsumerTest.java index 11ed44919fb..a64a7b049ae 100644 --- a/src/test/java/io/vertx/core/eventbus/MessageConsumerTest.java +++ b/src/test/java/io/vertx/core/eventbus/MessageConsumerTest.java @@ -7,33 +7,41 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; public class MessageConsumerTest extends VertxTestBase { @Test - public void testMessageConsumptionStayOnWorkerThreadAfterResume() throws Exception { - TestVerticle verticle = new TestVerticle(2); + public void testMessageConsumptionStayOnWorkerThreadAfterResumeAndOnlyDispatchOneMessageAtOneMoment() throws Exception { + int numberOfExpectedMessages = 10; + TestVerticle verticle = new TestVerticle(numberOfExpectedMessages); + EchoVerticle echoVerticle = new EchoVerticle(); Future deployVerticle = vertx.deployVerticle(verticle, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)); + Future deployEchoVerticle = vertx.deployVerticle(echoVerticle, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)); CountDownLatch startLatch = new CountDownLatch(1); - deployVerticle.onComplete(onSuccess(cf -> startLatch.countDown())); + Future.all(deployVerticle, deployEchoVerticle) + .onComplete(onSuccess(cf -> startLatch.countDown())); awaitLatch(startLatch); - vertx.eventBus().send("testAddress", "message1"); - vertx.eventBus().send("testAddress", "message2"); + for (int i = 1; i <= numberOfExpectedMessages; i++) { + vertx.eventBus().send("testAddress", "message" + i); + } awaitLatch(verticle.msgLatch); - assertEquals(2, verticle.messageArrivedOnWorkerThread.size()); - assertTrue("message1 should be processed on worker thread", verticle.messageArrivedOnWorkerThread.get("message1")); - assertTrue("message2 should be processed on worker thread", verticle.messageArrivedOnWorkerThread.get("message2")); + assertEquals(numberOfExpectedMessages, verticle.messageArrivedOnWorkerThread.size()); + for (int i = 1; i <= numberOfExpectedMessages; i++) { + assertTrue("message" + i + " should be processed on worker thread", verticle.messageArrivedOnWorkerThread.get("message" + i)); + } } private static class TestVerticle extends AbstractVerticle { private final CountDownLatch msgLatch; + private final AtomicBoolean messageProcessingOngoing = new AtomicBoolean(); private final Map messageArrivedOnWorkerThread = new HashMap<>(); @@ -51,11 +59,34 @@ private void handleMessages(MessageConsumer consumer) { consumer.handler(msg -> { consumer.pause(); messageArrivedOnWorkerThread.putIfAbsent(msg.body(), Context.isOnWorkerThread()); - msgLatch.countDown(); - vertx.setTimer(20, id -> { - consumer.resume(); + if (messageProcessingOngoing.compareAndSet(false, true)) { + msgLatch.countDown(); + } else { + System.err.println("Received message while processing another message"); + } + vertx.eventBus().request("echoAddress", 20) + .onComplete(ar -> { + messageProcessingOngoing.set(false); + consumer.resume(); + }); + }); + } + } + + private static class EchoVerticle extends AbstractVerticle { + @Override + public void start() { + MessageConsumer consumer = vertx.eventBus().localConsumer("echoAddress"); + handleMessages(consumer); + } + + private void handleMessages(MessageConsumer consumer) { + consumer.handler(msg -> { + vertx.setTimer(msg.body(), id -> { + msg.reply(msg.body()); }); }); } } + }