Skip to content

Commit db04425

Browse files
ARTEMIS-5601 auto created queues in AMQP are not waiting a sync
The protocol should recover the OperationContext before the auto create.
1 parent 3ebc633 commit db04425

File tree

4 files changed

+127
-128
lines changed

4 files changed

+127
-128
lines changed

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.netty.channel.EventLoop;
4040
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
4141
import org.apache.activemq.artemis.api.core.SimpleString;
42+
import org.apache.activemq.artemis.core.persistence.OperationContext;
4243
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
4344
import org.apache.activemq.artemis.core.security.CheckType;
4445
import org.apache.activemq.artemis.core.security.SecurityAuth;
@@ -360,7 +361,6 @@ public void close(ErrorCondition errorCondition) {
360361
public AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
361362
AMQPSessionContext sessionExtension = sessions.get(realSession);
362363
if (sessionExtension == null) {
363-
// how this is possible? Log a warn here
364364
sessionExtension = newSessionExtension(realSession);
365365
realSession.setContext(sessionExtension);
366366
sessions.put(realSession, sessionExtension);
@@ -410,43 +410,52 @@ public AMQPConnectionCallback getConnectionCallback() {
410410
protected void remoteLinkOpened(Link link) throws Exception {
411411
final AMQPSessionContext protonSession = getSessionExtension(link.getSession());
412412

413-
final Runnable runnable = link.attachments().get(AMQP_LINK_INITIALIZER_KEY, Runnable.class);
414-
if (runnable != null) {
415-
link.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, null);
416-
runnable.run();
417-
return;
418-
}
413+
// The operation context is needed in case there are auto-created destinations.
414+
// We need to ensure the binding storage operation is complete before sending the remoteLinkOpened response.
415+
OperationContext oldContext = protonSession.sessionSPI.recoverContext();
419416

420-
if (link.getLocalState() == EndpointState.ACTIVE) { // if already active it's probably from the AMQP bridge and hence we just ignore it
421-
return;
422-
}
417+
try {
423418

424-
link.setSource(link.getRemoteSource());
425-
link.setTarget(link.getRemoteTarget());
419+
final Runnable runnable = link.attachments().get(AMQP_LINK_INITIALIZER_KEY, Runnable.class);
420+
if (runnable != null) {
421+
link.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, null);
422+
runnable.run();
423+
return;
424+
}
426425

427-
if (link instanceof Receiver receiver) {
428-
if (link.getRemoteTarget() instanceof Coordinator coordinator) {
429-
protonSession.addTransactionHandler(coordinator, receiver);
430-
} else if (isReplicaTarget(receiver)) {
431-
handleReplicaTargetLinkOpened(protonSession, receiver);
432-
} else if (isFederationControlLink(receiver)) {
433-
handleFederationControlLinkOpened(protonSession, receiver);
434-
} else if (isFederationEventLink(receiver)) {
435-
protonSession.addFederationEventProcessor(receiver);
436-
} else {
437-
protonSession.addReceiver(receiver);
426+
if (link.getLocalState() == EndpointState.ACTIVE) { // if already active it's probably from the AMQP bridge and hence we just ignore it
427+
return;
438428
}
439-
} else {
440-
final Sender sender = (Sender) link;
441-
if (isFederationAddressReceiver(sender)) {
442-
protonSession.addFederationAddressSender(sender);
443-
} else if (isFederationQueueReceiver(sender)) {
444-
protonSession.addFederationQueueSender(sender);
445-
} else if (isFederationEventLink(sender)) {
446-
protonSession.addFederationEventDispatcher(sender);
429+
430+
link.setSource(link.getRemoteSource());
431+
link.setTarget(link.getRemoteTarget());
432+
433+
if (link instanceof Receiver receiver) {
434+
if (link.getRemoteTarget() instanceof Coordinator coordinator) {
435+
protonSession.addTransactionHandler(coordinator, receiver);
436+
} else if (isReplicaTarget(receiver)) {
437+
handleReplicaTargetLinkOpened(protonSession, receiver);
438+
} else if (isFederationControlLink(receiver)) {
439+
handleFederationControlLinkOpened(protonSession, receiver);
440+
} else if (isFederationEventLink(receiver)) {
441+
protonSession.addFederationEventProcessor(receiver);
442+
} else {
443+
protonSession.addReceiver(receiver);
444+
}
447445
} else {
448-
protonSession.addSender(sender);
446+
final Sender sender = (Sender) link;
447+
if (isFederationAddressReceiver(sender)) {
448+
protonSession.addFederationAddressSender(sender);
449+
} else if (isFederationQueueReceiver(sender)) {
450+
protonSession.addFederationQueueSender(sender);
451+
} else if (isFederationEventLink(sender)) {
452+
protonSession.addFederationEventDispatcher(sender);
453+
} else {
454+
protonSession.addSender(sender);
455+
}
449456
}
457+
} finally {
458+
protonSession.sessionSPI.resetContext(oldContext);
450459
}
451460
}
452461

tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java

Lines changed: 64 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,17 @@
3030
import javax.jms.Session;
3131
import javax.jms.TextMessage;
3232
import java.io.File;
33-
import java.util.ArrayList;
34-
import java.util.Collection;
35-
import java.util.List;
33+
import java.util.concurrent.CountDownLatch;
3634
import java.util.concurrent.ExecutorService;
3735
import java.util.concurrent.Executors;
3836
import java.util.concurrent.TimeUnit;
3937
import java.util.concurrent.atomic.AtomicInteger;
4038

4139
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
4240
import org.apache.activemq.artemis.tests.util.CFUtil;
41+
import org.apache.activemq.artemis.util.ServerUtil;
42+
import org.apache.activemq.artemis.utils.FileUtil;
4343
import org.apache.activemq.artemis.utils.RandomUtil;
44-
import org.apache.activemq.artemis.utils.ReusableLatch;
4544
import org.apache.activemq.artemis.utils.TestParameters;
4645
import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
4746
import org.junit.jupiter.api.BeforeAll;
@@ -61,18 +60,19 @@ public class HorizontalPagingTest extends SoakTestBase {
6160
private static final String TEST_NAME = "HORIZONTAL";
6261

6362
private static final boolean TEST_ENABLED = Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true"));
64-
private static final int SERVER_START_TIMEOUT = testProperty(TEST_NAME, "SERVER_START_TIMEOUT", 300_000);
65-
private static final int TIMEOUT_MINUTES = testProperty(TEST_NAME, "TIMEOUT_MINUTES", 120);
66-
private static final String PROTOCOL_LIST = testProperty(TEST_NAME, "PROTOCOL_LIST", "OPENWIRE,CORE,AMQP");
63+
private static final int SERVER_START_TIMEOUT = testProperty(TEST_NAME, "SERVER_START_TIMEOUT", 20_000);
64+
private static final int TIMEOUT_MINUTES = testProperty(TEST_NAME, "TIMEOUT_MINUTES", 5);
6765
private static final int PRINT_INTERVAL = testProperty(TEST_NAME, "PRINT_INTERVAL", 100);
66+
// This property is useful if you want to validate a setup on a different data folder, for example a remote directory on a NFS server or anything like that
67+
private static final String DATA_FOLDER = testProperty(TEST_NAME, "DATA_FOLDER", null);
68+
private static final int EXECUTOR_SIZE = testProperty(TEST_NAME, "EXECUTOR_SIZE", 50);
6869

6970
private final int DESTINATIONS;
7071
private final int MESSAGES;
7172
private final int COMMIT_INTERVAL;
7273
// if 0 will use AUTO_ACK
7374
private final int RECEIVE_COMMIT_INTERVAL;
7475
private final int MESSAGE_SIZE;
75-
private final int PARALLEL_SENDS;
7676

7777
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
7878

@@ -84,35 +84,34 @@ public static void createServers() throws Exception {
8484
File serverLocation = getFileServerLocation(SERVER_NAME_0);
8585
deleteDirectory(serverLocation);
8686

87+
File dataFolder = null;
88+
if (DATA_FOLDER != null) {
89+
dataFolder = new File(DATA_FOLDER);
90+
if (dataFolder.exists()) {
91+
deleteDirectory(dataFolder);
92+
}
93+
}
94+
8795
HelperCreate cliCreateServer = helperCreate();
8896
cliCreateServer.setRole("amq").setUser("admin").setPassword("admin").setAllowAnonymous(true).setNoWeb(false).setArtemisInstance(serverLocation);
89-
// some limited memory to make it more likely to fail
9097
cliCreateServer.setArgs("--java-memory", "2g");
91-
cliCreateServer.setConfiguration("./src/main/resources/servers/subscriptionPaging");
98+
cliCreateServer.setConfiguration("./src/main/resources/servers/horizontalPaging");
9299
cliCreateServer.createServer();
93-
}
94-
}
95100

96-
public static List<String> parseProtocolList() {
97-
String[] protocols = PROTOCOL_LIST.split(",");
98-
99-
List<String> protocolList = new ArrayList<>();
100-
for (String str : protocols) {
101-
logger.info("Adding {} to the list for the test", str);
102-
protocolList.add(str);
101+
if (dataFolder != null) {
102+
assertTrue(FileUtil.findReplace(new File(getFileServerLocation(SERVER_NAME_0), "/etc/broker.xml"), "data/", dataFolder.getAbsolutePath() + "/"));
103+
}
103104
}
104-
105-
return protocolList;
106105
}
107106

108107
public HorizontalPagingTest() {
109-
DESTINATIONS = TestParameters.testProperty(TEST_NAME, "DESTINATIONS", 5);
110-
MESSAGES = TestParameters.testProperty(TEST_NAME, "MESSAGES", 1000);
108+
DESTINATIONS = TestParameters.testProperty(TEST_NAME, "DESTINATIONS", 100);
109+
MESSAGES = TestParameters.testProperty(TEST_NAME, "MESSAGES", 100);
111110
COMMIT_INTERVAL = TestParameters.testProperty(TEST_NAME, "COMMIT_INTERVAL", 100);
112111
// if 0 will use AUTO_ACK
113112
RECEIVE_COMMIT_INTERVAL = TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT_INTERVAL", 100);
114113
MESSAGE_SIZE = TestParameters.testProperty(TEST_NAME, "MESSAGE_SIZE", 10_000);
115-
PARALLEL_SENDS = TestParameters.testProperty(TEST_NAME, "PARALLEL_SENDS", 5);
114+
116115
}
117116

118117
Process serverProcess;
@@ -125,29 +124,43 @@ public void before() throws Exception {
125124
serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT);
126125
}
127126

127+
/// ///////////////////////////////////////////////////
128+
/// It is important to keep separate tests here
129+
/// as the server has to be killed within the timeframe of protocol being executed
130+
/// to validate proper callbacks are in place
131+
@Test
132+
public void testHorizontalAMQP() throws Exception {
133+
testHorizontal("AMQP");
134+
}
135+
136+
@Test
137+
public void testHorizontalCORE() throws Exception {
138+
testHorizontal("CORE");
139+
}
140+
128141
@Test
129-
public void testHorizontal() throws Exception {
130-
Collection<String> protocolList = parseProtocolList();
142+
public void testHorizontalOPENWIRE() throws Exception {
143+
testHorizontal("OPENWIRE");
144+
}
145+
146+
private void testHorizontal(String protocol) throws Exception {
131147
AtomicInteger errors = new AtomicInteger(0);
132148

133-
ExecutorService service = Executors.newFixedThreadPool(DESTINATIONS * protocolList.size());
149+
ExecutorService service = Executors.newFixedThreadPool(EXECUTOR_SIZE);
134150
runAfter(service::shutdownNow);
135151

136152
String text = RandomUtil.randomAlphaNumericString(MESSAGE_SIZE);
137153

138-
ReusableLatch latchDone = new ReusableLatch(0);
139-
140-
for (String protocol : protocolList) {
141-
String protocolUsed = protocol;
154+
{
155+
CountDownLatch latchDone = new CountDownLatch(DESTINATIONS);
142156

143157
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
144158
Connection connection = factory.createConnection();
145159
runAfter(connection::close);
146160

147161
for (int i = 0; i < DESTINATIONS; i++) {
148-
latchDone.countUp();
149162
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
150-
Queue queue = session.createQueue("queue_" + i + protocolUsed);
163+
Queue queue = session.createQueue("queue_" + i + "_" + protocol);
151164
service.execute(() -> {
152165
try {
153166
logger.info("*******************************************************************************************************************************\ndestination {}", queue.getQueueName());
@@ -172,25 +185,24 @@ public void testHorizontal() throws Exception {
172185
}
173186
});
174187
}
188+
assertTrue(latchDone.await(TIMEOUT_MINUTES, TimeUnit.MINUTES));
175189
}
176190

177-
assertTrue(latchDone.await(TIMEOUT_MINUTES, TimeUnit.MINUTES));
178-
179191
killServer(serverProcess, true);
180-
181-
serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT);
192+
serverProcess = startServer(SERVER_NAME_0, 0, -1);
193+
assertTrue(ServerUtil.waitForServerToStart(0, SERVER_START_TIMEOUT));
194+
assertEquals(0, errors.get());
182195

183196
AtomicInteger completedFine = new AtomicInteger(0);
184197

185-
for (String protocol : protocolList) {
186-
String protocolUsed = protocol;
198+
{
199+
CountDownLatch latchDone = new CountDownLatch(DESTINATIONS);
187200

188201
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
189202
Connection connectionConsumer = factory.createConnection();
190203
runAfter(connectionConsumer::close);
191204

192205
for (int i = 0; i < DESTINATIONS; i++) {
193-
latchDone.countUp();
194206
int destination = i;
195207
service.execute(() -> {
196208
try {
@@ -202,10 +214,13 @@ public void testHorizontal() throws Exception {
202214
sessionConsumer = connectionConsumer.createSession(true, Session.SESSION_TRANSACTED);
203215
}
204216

205-
MessageConsumer messageConsumer = sessionConsumer.createConsumer(sessionConsumer.createQueue("queue_" + destination + protocolUsed));
217+
String queueName = "queue_" + destination + "_" + protocol;
218+
219+
MessageConsumer messageConsumer = sessionConsumer.createConsumer(sessionConsumer.createQueue(queueName));
206220
for (int m = 0; m < MESSAGES; m++) {
207-
TextMessage message = (TextMessage) messageConsumer.receive(50_000);
221+
TextMessage message = (TextMessage) messageConsumer.receive(1_000);
208222
if (message == null) {
223+
logger.info("message is null on {}, m={}", queueName, m);
209224
m--;
210225
continue;
211226
}
@@ -238,14 +253,18 @@ public void testHorizontal() throws Exception {
238253
}
239254

240255
connectionConsumer.start();
241-
}
242256

243-
assertTrue(latchDone.await(TIMEOUT_MINUTES, TimeUnit.MINUTES));
257+
assertTrue(latchDone.await(TIMEOUT_MINUTES, TimeUnit.MINUTES));
258+
}
244259

245260
service.shutdown();
246261
assertTrue(service.awaitTermination(TIMEOUT_MINUTES, TimeUnit.MINUTES), "Test Timed Out");
247262
assertEquals(0, errors.get());
248-
assertEquals(DESTINATIONS * protocolList.size(), completedFine.get());
263+
assertEquals(DESTINATIONS, completedFine.get());
264+
265+
killServer(serverProcess, true);
266+
serverProcess = startServer(SERVER_NAME_0, 0, -1);
267+
assertTrue(ServerUtil.waitForServerToStart(0, SERVER_START_TIMEOUT));
249268
}
250269

251270
}

0 commit comments

Comments
 (0)