Skip to content

Commit 0c7b5d6

Browse files
jbertrambrusdev
authored andcommitted
ARTEMIS-5597 initial distribution broken when sending to FQQN
Ensure the broker considers remote queue bindings as well when routing messages to an FQQN. Otherwise consumers might starve.
1 parent e49cda5 commit 0c7b5d6

File tree

4 files changed

+103
-15
lines changed
  • artemis-commons/src/main/java/org/apache/activemq/artemis/utils
  • artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl
  • tests

4 files changed

+103
-15
lines changed

artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.activemq.artemis.utils;
1818

19+
import org.apache.activemq.artemis.api.core.SimpleString;
20+
1921
/**
2022
* UUID represents Universally Unique Identifiers (aka Global UID in Windows world). UUIDs are usually generated via
2123
* UUIDGenerator (or in case of 'Null UUID', 16 zero bytes, via static method getNullUUID()), or received from external
@@ -63,6 +65,8 @@ public final class UUID {
6365

6466
public static final char HYPHEN = '-';
6567

68+
public static final int UUID_STRING_LENGTH = 36;
69+
6670
// 'Standard' namespaces defined (suggested) by UUID specs:
6771
public static final String NAMESPACE_DNS = "6ba7b810-9dad-11d1-80b4-00c04fd430c8";
6872

@@ -174,7 +178,7 @@ public String toString() {
174178
*/
175179

176180
if (mDesc == null) {
177-
StringBuilder b = new StringBuilder(36);
181+
StringBuilder b = new StringBuilder(UUID_STRING_LENGTH);
178182

179183
for (int i = 0; i < 16; ++i) {
180184
// Need to bypass hyphens:
@@ -262,7 +266,7 @@ public boolean equals(final Object obj) {
262266
* @return {@code true} if the string is a valid UUID format; {@code false} otherwise
263267
*/
264268
public static boolean isUUID(CharSequence str) {
265-
if (str == null || str.length() != 36) {
269+
if (str == null || str.length() != UUID_STRING_LENGTH) {
266270
return false;
267271
}
268272

@@ -285,4 +289,21 @@ public static boolean isUUID(CharSequence str) {
285289
private static boolean isHex(char c) {
286290
return (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f');
287291
}
292+
293+
/**
294+
* Removes a trailing UUID from the given input, if present. The method checks whether the last characters of the
295+
* input adhere to a valid UUID format and removes them if true.
296+
*
297+
* @param input the input {@code SimpleString} which may contain a trailing UUID to be stripped
298+
* @return a {@code SimpleString} with the trailing UUID removed if present; otherwise, returns the original input
299+
* unchanged
300+
*/
301+
public static SimpleString stripTrailingUUID(SimpleString input) {
302+
int length = input.length();
303+
if (length >= UUID_STRING_LENGTH && UUID.isUUID(input.subSeq(length - UUID_STRING_LENGTH, length))) {
304+
return input.subSeq(0, length - UUID_STRING_LENGTH);
305+
} else {
306+
return input;
307+
}
308+
}
288309
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.activemq.artemis.core.server.group.impl.Response;
5050
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
5151
import org.apache.activemq.artemis.utils.CompositeAddress;
52+
import org.apache.activemq.artemis.utils.UUID;
5253
import org.slf4j.Logger;
5354
import org.slf4j.LoggerFactory;
5455
import java.lang.invoke.MethodHandles;
@@ -343,12 +344,6 @@ private void route(final Message message,
343344
} else if (groupRouting && groupingHandler != null && (groupId = message.getGroupID()) != null) {
344345
context.clear().setReusable(false);
345346
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
346-
} else if (CompositeAddress.isFullyQualified(message.getAddress())) {
347-
context.clear().setReusable(false);
348-
final Binding theBinding = bindingsNameMap.get(String.valueOf(CompositeAddress.extractQueueName(message.getAddressSimpleString())));
349-
if (theBinding != null && (theBinding.getFilter() == null || theBinding.getFilter().match(message))) {
350-
theBinding.route(message, context);
351-
}
352347
} else {
353348
// in a optimization, we are reusing the previous context if everything is right for it
354349
// so the simpleRouting will only happen if needed
@@ -494,8 +489,11 @@ private static boolean matchBinding(final Message message,
494489
return false;
495490
}
496491

497-
final Filter filter = binding.getFilter();
492+
if (CompositeAddress.isFullyQualified(message.getAddress()) && binding instanceof QueueBinding && !UUID.stripTrailingUUID(binding.getClusterName()).equals(CompositeAddress.extractQueueName(message.getAddressSimpleString()))) {
493+
return false;
494+
}
498495

496+
final Filter filter = binding.getFilter();
499497
if (filter == null || filter.match(message)) {
500498
return true;
501499
}

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,6 @@
1616
*/
1717
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
1818

19-
import static org.junit.jupiter.api.Assertions.assertEquals;
20-
import static org.junit.jupiter.api.Assertions.assertFalse;
21-
import static org.junit.jupiter.api.Assertions.assertNotNull;
22-
import static org.junit.jupiter.api.Assertions.assertTrue;
23-
2419
import java.lang.invoke.MethodHandles;
2520
import java.util.ArrayList;
2621
import java.util.Arrays;
@@ -37,8 +32,8 @@
3732
import org.apache.activemq.artemis.api.core.client.ClientProducer;
3833
import org.apache.activemq.artemis.api.core.client.ClientSession;
3934
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
40-
import org.apache.activemq.artemis.core.config.DivertConfiguration;
4135
import org.apache.activemq.artemis.api.core.management.QueueControl;
36+
import org.apache.activemq.artemis.core.config.DivertConfiguration;
4237
import org.apache.activemq.artemis.core.postoffice.Binding;
4338
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
4439
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -49,11 +44,17 @@
4944
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
5045
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
5146
import org.apache.activemq.artemis.tests.util.Wait;
47+
import org.apache.activemq.artemis.utils.CompositeAddress;
5248
import org.junit.jupiter.api.Disabled;
5349
import org.junit.jupiter.api.Test;
5450
import org.slf4j.Logger;
5551
import org.slf4j.LoggerFactory;
5652

53+
import static org.junit.jupiter.api.Assertions.assertEquals;
54+
import static org.junit.jupiter.api.Assertions.assertFalse;
55+
import static org.junit.jupiter.api.Assertions.assertNotNull;
56+
import static org.junit.jupiter.api.Assertions.assertTrue;
57+
5758
public class SimpleSymmetricClusterTest extends ClusterTestBase {
5859

5960
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -370,6 +371,64 @@ public void testSimple_TwoNodes() throws Exception {
370371

371372
}
372373

374+
@Test
375+
public void testInitialDistributionSendToFQQNConsumeFromFQQN() throws Exception {
376+
testInitialDistribution(true, true);
377+
}
378+
379+
@Test
380+
public void testInitialDistributionSendToFQQNConsumeFromQueue() throws Exception {
381+
testInitialDistribution(true, false);
382+
}
383+
384+
@Test
385+
public void testInitialDistributionSendToAddressConsumeFromFQQN() throws Exception {
386+
testInitialDistribution(false, true);
387+
}
388+
389+
@Test
390+
public void testInitialDistributionSendToAddressConsumeFromQueue() throws Exception {
391+
testInitialDistribution(false, false);
392+
}
393+
394+
private void testInitialDistribution(boolean sendToFQQN, boolean consumeFromFQQN) throws Exception {
395+
CountDownLatch countDownLatch = new CountDownLatch(1);
396+
final String address = "myAddress";
397+
final String queue = "myQueue";
398+
final String fqqn = CompositeAddress.toFullyQualified(address, queue);
399+
400+
setupServer(0, false, isNetty());
401+
setupServer(1, false, isNetty());
402+
403+
setupClusterConnection("cluster0", address, MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
404+
setupClusterConnection("cluster1", address, MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
405+
406+
startServers(0, 1);
407+
408+
setupSessionFactory(0, isNetty());
409+
setupSessionFactory(1, isNetty());
410+
411+
createQueue(0, address, queue, null, false);
412+
createQueue(1, address, queue, null, false);
413+
414+
waitForBindings(0, address, 1, 0, true);
415+
waitForBindings(0, address, 1, 0, false);
416+
waitForBindings(1, address, 1, 0, true);
417+
waitForBindings(1, address, 1, 0, false);
418+
419+
addConsumer(0, 0, consumeFromFQQN ? fqqn : queue, null);
420+
consumers[0].consumer.setMessageHandler((m) -> countDownLatch.countDown());
421+
422+
waitForBindings(0, address, 1, 1, true);
423+
waitForBindings(1, address, 1, 1, false);
424+
425+
send(1, sendToFQQN ? fqqn : address, 1, true, null);
426+
427+
assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
428+
429+
closeAllConsumers();
430+
}
431+
373432
@Test
374433
public void testSimpleSnFManagement() throws Exception {
375434
final String address = "queues.testaddress";

tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.apache.activemq.artemis.api.core.SimpleString;
2727
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
28+
import org.apache.activemq.artemis.utils.RandomUtil;
2829
import org.apache.activemq.artemis.utils.UUID;
2930
import org.apache.activemq.artemis.utils.UUIDGenerator;
3031
import org.junit.jupiter.api.Test;
@@ -101,4 +102,13 @@ public void testIsUUID() {
101102
// null
102103
assertFalse(UUID.isUUID(null));
103104
}
105+
106+
@Test
107+
public void testStringTrailingUUID() {
108+
SimpleString uuid = RandomUtil.randomUUIDSimpleString();
109+
assertEquals(uuid, UUID.stripTrailingUUID(uuid.concat(RandomUtil.randomUUIDSimpleString())));
110+
111+
SimpleString foo = SimpleString.of("foo");
112+
assertEquals(foo, UUID.stripTrailingUUID(foo));
113+
}
104114
}

0 commit comments

Comments
 (0)