From e93ab841e781b788e557c74283a3bcd059b8fc3b Mon Sep 17 00:00:00 2001 From: Laszlo Hadadi Date: Fri, 14 Mar 2025 08:54:06 +0100 Subject: [PATCH 1/3] make sure the delivery of the message has no race condition --- .../eventbus/impl/MessageConsumerImpl.java | 36 +++++-------- .../core/eventbus/MessageConsumerTest.java | 53 +++++++++++++++---- 2 files changed, 54 insertions(+), 35 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index 590f9dd2a9b..82ab31a9b9f 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -133,36 +133,24 @@ public synchronized Future unregister() { } protected boolean doReceive(Message message) { - Handler> theHandler; synchronized (this) { if (handler == null) { return false; } - if (demand == 0L) { - if (pending.size() < maxBufferedMessages) { - pending.add(message); - return true; - } else { - discard(message); - if (discardHandler != null) { - discardHandler.handle(message); - } else { - log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address); - } - } + if (pending.size() < maxBufferedMessages) { + pending.add(message); + checkNextTick(); return true; } else { - if (pending.size() > 0) { - pending.add(message); - message = pending.poll(); - } - if (demand != Long.MAX_VALUE) { - demand--; + discard(message); + if (discardHandler != null) { + discardHandler.handle(message); + } else { + String pause = demand == 0 ? "paused" : "NOT paused"; + log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in " + pause + " consumer. address: " + address); } - theHandler = handler; } } - deliver(theHandler, message); return true; } @@ -171,7 +159,7 @@ protected void dispatch(Message msg, ContextInternal context, Handler> theHandler, Message message) { @@ -183,8 +171,8 @@ private void deliver(Handler> theHandler, Message message) { private synchronized void checkNextTick() { // Check if there are more pending messages in the queue that can be processed next time around - if (!pending.isEmpty() && demand > 0L) { - context.nettyEventLoop().execute(() -> { + if (demand > 0L && !pending.isEmpty()) { + context.emit(__ -> { Message message; Handler> theHandler; synchronized (MessageConsumerImpl.this) { diff --git a/src/test/java/io/vertx/core/eventbus/MessageConsumerTest.java b/src/test/java/io/vertx/core/eventbus/MessageConsumerTest.java index 11ed44919fb..a64a7b049ae 100644 --- a/src/test/java/io/vertx/core/eventbus/MessageConsumerTest.java +++ b/src/test/java/io/vertx/core/eventbus/MessageConsumerTest.java @@ -7,33 +7,41 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; public class MessageConsumerTest extends VertxTestBase { @Test - public void testMessageConsumptionStayOnWorkerThreadAfterResume() throws Exception { - TestVerticle verticle = new TestVerticle(2); + public void testMessageConsumptionStayOnWorkerThreadAfterResumeAndOnlyDispatchOneMessageAtOneMoment() throws Exception { + int numberOfExpectedMessages = 10; + TestVerticle verticle = new TestVerticle(numberOfExpectedMessages); + EchoVerticle echoVerticle = new EchoVerticle(); Future deployVerticle = vertx.deployVerticle(verticle, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)); + Future deployEchoVerticle = vertx.deployVerticle(echoVerticle, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)); CountDownLatch startLatch = new CountDownLatch(1); - deployVerticle.onComplete(onSuccess(cf -> startLatch.countDown())); + Future.all(deployVerticle, deployEchoVerticle) + .onComplete(onSuccess(cf -> startLatch.countDown())); awaitLatch(startLatch); - vertx.eventBus().send("testAddress", "message1"); - vertx.eventBus().send("testAddress", "message2"); + for (int i = 1; i <= numberOfExpectedMessages; i++) { + vertx.eventBus().send("testAddress", "message" + i); + } awaitLatch(verticle.msgLatch); - assertEquals(2, verticle.messageArrivedOnWorkerThread.size()); - assertTrue("message1 should be processed on worker thread", verticle.messageArrivedOnWorkerThread.get("message1")); - assertTrue("message2 should be processed on worker thread", verticle.messageArrivedOnWorkerThread.get("message2")); + assertEquals(numberOfExpectedMessages, verticle.messageArrivedOnWorkerThread.size()); + for (int i = 1; i <= numberOfExpectedMessages; i++) { + assertTrue("message" + i + " should be processed on worker thread", verticle.messageArrivedOnWorkerThread.get("message" + i)); + } } private static class TestVerticle extends AbstractVerticle { private final CountDownLatch msgLatch; + private final AtomicBoolean messageProcessingOngoing = new AtomicBoolean(); private final Map messageArrivedOnWorkerThread = new HashMap<>(); @@ -51,11 +59,34 @@ private void handleMessages(MessageConsumer consumer) { consumer.handler(msg -> { consumer.pause(); messageArrivedOnWorkerThread.putIfAbsent(msg.body(), Context.isOnWorkerThread()); - msgLatch.countDown(); - vertx.setTimer(20, id -> { - consumer.resume(); + if (messageProcessingOngoing.compareAndSet(false, true)) { + msgLatch.countDown(); + } else { + System.err.println("Received message while processing another message"); + } + vertx.eventBus().request("echoAddress", 20) + .onComplete(ar -> { + messageProcessingOngoing.set(false); + consumer.resume(); + }); + }); + } + } + + private static class EchoVerticle extends AbstractVerticle { + @Override + public void start() { + MessageConsumer consumer = vertx.eventBus().localConsumer("echoAddress"); + handleMessages(consumer); + } + + private void handleMessages(MessageConsumer consumer) { + consumer.handler(msg -> { + vertx.setTimer(msg.body(), id -> { + msg.reply(msg.body()); }); }); } } + } From b408a6d94cbaff56dce0d2e8f59b3fc3bbc04dac Mon Sep 17 00:00:00 2001 From: Laszlo Hadadi Date: Fri, 14 Mar 2025 11:30:30 +0100 Subject: [PATCH 2/3] removing the resume as that is basically wiping out pending msgs during the test --- src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java index 407d4a67c91..6c21457ca99 100644 --- a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java @@ -1401,7 +1401,6 @@ public void testUnregisterConsumerDiscardPendingMessages() { eb.send(ADDRESS1, "val1"); Context ctx = Vertx.currentContext(); ctx.runOnContext(v -> { - consumer.resume(); ((MessageConsumerImpl) consumer).discardHandler(discarded -> { assertEquals("val1", discarded.body()); testComplete(); From ebaab7680850b7d0b138651cb1bdf98fa7e07d84 Mon Sep 17 00:00:00 2001 From: Laszlo Hadadi Date: Mon, 17 Mar 2025 11:04:20 +0100 Subject: [PATCH 3/3] avoid StackOverflow and do return with false when msg discarded so metricks could be updatedy --- .../core/eventbus/impl/MessageConsumerImpl.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index 82ab31a9b9f..7e70f6c4d70 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -133,14 +133,22 @@ public synchronized Future unregister() { } protected boolean doReceive(Message message) { + Handler> theHandler; synchronized (this) { if (handler == null) { return false; } if (pending.size() < maxBufferedMessages) { pending.add(message); - checkNextTick(); - return true; + if (demand == 0) { + return true; + } else { + if (demand != Long.MAX_VALUE) { + demand--; + } + message = pending.poll(); + theHandler = handler; + } } else { discard(message); if (discardHandler != null) { @@ -149,8 +157,10 @@ protected boolean doReceive(Message message) { String pause = demand == 0 ? "paused" : "NOT paused"; log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in " + pause + " consumer. address: " + address); } + return false; } } + deliver(theHandler, message); return true; } @@ -172,7 +182,7 @@ private void deliver(Handler> theHandler, Message message) { private synchronized void checkNextTick() { // Check if there are more pending messages in the queue that can be processed next time around if (demand > 0L && !pending.isEmpty()) { - context.emit(__ -> { + context.execute(__ -> { Message message; Handler> theHandler; synchronized (MessageConsumerImpl.this) {