7
7
import java .util .HashMap ;
8
8
import java .util .Map ;
9
9
import java .util .concurrent .CountDownLatch ;
10
+ import java .util .concurrent .atomic .AtomicBoolean ;
10
11
11
12
public class MessageConsumerTest extends VertxTestBase {
12
13
13
14
14
15
@ Test
15
- public void testMessageConsumptionStayOnWorkerThreadAfterResume () throws Exception {
16
- TestVerticle verticle = new TestVerticle (2 );
16
+ public void testMessageConsumptionStayOnWorkerThreadAfterResumeAndOnlyDispatchOneMessageAtOneMoment () throws Exception {
17
+ int numberOfExpectedMessages = 10 ;
18
+ TestVerticle verticle = new TestVerticle (numberOfExpectedMessages );
19
+ EchoVerticle echoVerticle = new EchoVerticle ();
17
20
Future <String > deployVerticle = vertx .deployVerticle (verticle , new DeploymentOptions ().setThreadingModel (ThreadingModel .WORKER ));
21
+ Future <String > deployEchoVerticle = vertx .deployVerticle (echoVerticle , new DeploymentOptions ().setThreadingModel (ThreadingModel .WORKER ));
18
22
19
23
CountDownLatch startLatch = new CountDownLatch (1 );
20
- deployVerticle .onComplete (onSuccess (cf -> startLatch .countDown ()));
24
+ Future .all (deployVerticle , deployEchoVerticle )
25
+ .onComplete (onSuccess (cf -> startLatch .countDown ()));
21
26
awaitLatch (startLatch );
22
27
23
- vertx .eventBus ().send ("testAddress" , "message1" );
24
- vertx .eventBus ().send ("testAddress" , "message2" );
28
+ for (int i = 1 ; i <= numberOfExpectedMessages ; i ++) {
29
+ vertx .eventBus ().send ("testAddress" , "message" + i );
30
+ }
25
31
26
32
awaitLatch (verticle .msgLatch );
27
33
28
- assertEquals (2 , verticle .messageArrivedOnWorkerThread .size ());
29
- assertTrue ("message1 should be processed on worker thread" , verticle .messageArrivedOnWorkerThread .get ("message1" ));
30
- assertTrue ("message2 should be processed on worker thread" , verticle .messageArrivedOnWorkerThread .get ("message2" ));
34
+ assertEquals (numberOfExpectedMessages , verticle .messageArrivedOnWorkerThread .size ());
35
+ for (int i = 1 ; i <= numberOfExpectedMessages ; i ++) {
36
+ assertTrue ("message" + i + " should be processed on worker thread" , verticle .messageArrivedOnWorkerThread .get ("message" + i ));
37
+ }
31
38
}
32
39
33
40
34
41
private static class TestVerticle extends AbstractVerticle {
35
42
36
43
private final CountDownLatch msgLatch ;
44
+ private final AtomicBoolean messageProcessingOngoing = new AtomicBoolean ();
37
45
38
46
private final Map <String , Boolean > messageArrivedOnWorkerThread = new HashMap <>();
39
47
@@ -51,11 +59,34 @@ private void handleMessages(MessageConsumer<String> consumer) {
51
59
consumer .handler (msg -> {
52
60
consumer .pause ();
53
61
messageArrivedOnWorkerThread .putIfAbsent (msg .body (), Context .isOnWorkerThread ());
54
- msgLatch .countDown ();
55
- vertx .setTimer (20 , id -> {
56
- consumer .resume ();
62
+ if (messageProcessingOngoing .compareAndSet (false , true )) {
63
+ msgLatch .countDown ();
64
+ } else {
65
+ System .err .println ("Received message while processing another message" );
66
+ }
67
+ vertx .eventBus ().request ("echoAddress" , 20 )
68
+ .onComplete (ar -> {
69
+ messageProcessingOngoing .set (false );
70
+ consumer .resume ();
71
+ });
72
+ });
73
+ }
74
+ }
75
+
76
+ private static class EchoVerticle extends AbstractVerticle {
77
+ @ Override
78
+ public void start () {
79
+ MessageConsumer <Integer > consumer = vertx .eventBus ().localConsumer ("echoAddress" );
80
+ handleMessages (consumer );
81
+ }
82
+
83
+ private void handleMessages (MessageConsumer <Integer > consumer ) {
84
+ consumer .handler (msg -> {
85
+ vertx .setTimer (msg .body (), id -> {
86
+ msg .reply (msg .body ());
57
87
});
58
88
});
59
89
}
60
90
}
91
+
61
92
}
0 commit comments