Skip to content

Commit c51297a

Browse files
committed
Add option to requeue message if processing takes too long
Fixes #137 References #23
1 parent c07515b commit c51297a

File tree

11 files changed

+359
-40
lines changed

11 files changed

+359
-40
lines changed

src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved. */
1+
/* Copyright (c) 2013-2021 VMware, Inc. or its affiliates. All rights reserved. */
22
package com.rabbitmq.jms.admin;
33

44
import com.rabbitmq.client.AMQP;
@@ -68,9 +68,23 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
6868
* Whether requeue message on {@link RuntimeException} in the
6969
* {@link javax.jms.MessageListener} or not.
7070
* Default is false.
71+
*
72+
* @since 1.7.0
73+
* @see RMQConnectionFactory#requeueOnTimeout
7174
*/
7275
private boolean requeueOnMessageListenerException = false;
7376

77+
/**
78+
* Whether to requeue a message that timed out or not.
79+
*
80+
* Only taken into account if requeueOnMessageListenerException is true.
81+
* Default is false.
82+
*
83+
* @since 2.3.0
84+
* @see RMQConnectionFactory#requeueOnMessageListenerException
85+
*/
86+
private boolean requeueOnTimeout = false;
87+
7488
/**
7589
* Whether to <a href="https://www.rabbitmq.com/nack.html">nack</a> messages on rollback or not.
7690
*
@@ -275,6 +289,7 @@ protected Connection createConnection(String username, String password, Connecti
275289
.setReceivingContextConsumer(rcc)
276290
.setConfirmListener(confirmListener)
277291
.setTrustedPackages(this.trustedPackages)
292+
.setRequeueOnTimeout(this.requeueOnTimeout)
278293
);
279294
logger.debug("Connection {} created.", conn);
280295
return conn;
@@ -858,7 +873,11 @@ public boolean isPreferProducerMessageProperty() {
858873
/**
859874
* Whether requeue message on {@link RuntimeException} in the
860875
* {@link javax.jms.MessageListener} or not.
876+
*
861877
* Default is false.
878+
*
879+
* @since 1.7.0
880+
* @see RMQConnectionFactory#setRequeueOnTimeout(boolean)
862881
*/
863882
public void setRequeueOnMessageListenerException(boolean requeueOnMessageListenerException) {
864883
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
@@ -1002,6 +1021,19 @@ public void setConfirmListener(ConfirmListener confirmListener) {
10021021
this.confirmListener = confirmListener;
10031022
}
10041023

1024+
/**
1025+
* Whether to requeue a message that timed out or not.
1026+
*
1027+
* Only taken into account if requeueOnMessageListenerException is true.
1028+
* Default is false.
1029+
*
1030+
* @since 2.3.0
1031+
* @see RMQConnectionFactory#setRequeueOnMessageListenerException(boolean)
1032+
*/
1033+
public void setRequeueOnTimeout(boolean requeueOnTimeout) {
1034+
this.requeueOnTimeout = requeueOnTimeout;
1035+
}
1036+
10051037
@FunctionalInterface
10061038
private interface ConnectionCreator {
10071039
com.rabbitmq.client.Connection create(com.rabbitmq.client.ConnectionFactory cf) throws Exception;

src/main/java/com/rabbitmq/jms/client/ConnectionParams.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// License, v. 2.0. If a copy of the MPL was not distributed with this
33
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
//
5-
// Copyright (c) 2016-2020 VMware, Inc. or its affiliates. All rights reserved.
5+
// Copyright (c) 2016-2021 VMware, Inc. or its affiliates. All rights reserved.
66
package com.rabbitmq.jms.client;
77

88
import com.rabbitmq.client.AMQP;
@@ -50,9 +50,23 @@ public class ConnectionParams {
5050
* Whether requeue message on {@link RuntimeException} in the
5151
* {@link javax.jms.MessageListener} or not.
5252
* Default is false.
53+
*
54+
* @since 1.7.0
55+
* @see ConnectionParams#requeueOnTimeout
5356
*/
5457
private boolean requeueOnMessageListenerException = false;
5558

59+
/**
60+
* Whether to requeue a message that timed out or not.
61+
*
62+
* Only taken into account if requeueOnMessageListenerException is true.
63+
* Default is false.
64+
*
65+
* @since 2.3.0
66+
* @see ConnectionParams#requeueOnMessageListenerException
67+
*/
68+
private boolean requeueOnTimeout = false;
69+
5670
/**
5771
* Whether to commit nack on rollback or not.
5872
* Default is false.
@@ -226,4 +240,13 @@ public ConnectionParams setTrustedPackages(List<String> trustedPackages) {
226240
public List<String> getTrustedPackages() {
227241
return trustedPackages;
228242
}
243+
244+
public ConnectionParams setRequeueOnTimeout(boolean requeueOnTimeout) {
245+
this.requeueOnTimeout = requeueOnTimeout;
246+
return this;
247+
}
248+
249+
public boolean willRequeueOnTimeout() {
250+
return requeueOnTimeout;
251+
}
229252
}

src/main/java/com/rabbitmq/jms/client/DeliveryExecutor.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
// License, v. 2.0. If a copy of the MPL was not distributed with this
33
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
//
5-
// Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved.
5+
// Copyright (c) 2013-2021 VMware, Inc. or its affiliates. All rights reserved.
66
package com.rabbitmq.jms.client;
77

88
import java.util.concurrent.Callable;
99
import java.util.concurrent.ExecutionException;
1010
import java.util.concurrent.ExecutorService;
1111
import java.util.concurrent.Executors;
12+
import java.util.concurrent.Future;
1213
import java.util.concurrent.TimeUnit;
1314
import java.util.concurrent.TimeoutException;
1415

@@ -22,7 +23,7 @@
2223
* if execution takes too long (set on instantiation), and interrupts execution on closure or timeout. Also serialises
2324
* calls. There is one instance of this executor per session.
2425
*/
25-
public class DeliveryExecutor {
26+
class DeliveryExecutor {
2627

2728
private final class CallOnMessage implements Callable<Boolean> {
2829
private final RMQMessage rmqMessage;
@@ -42,12 +43,15 @@ public Boolean call() throws Exception {
4243
/** Timeout for onMessage executions */
4344
private final long onMessageTimeoutMs;
4445

46+
private final boolean closeOnTimeout;
47+
4548
/** Executor allocated if/when onMessage calls are made; used to isolate us from potential hangs. */
4649
private ExecutorService onMessageExecutorService = null;
4750
private final Object lockOnMessageExecutorService = new Object();
4851

49-
public DeliveryExecutor(long onMessageTimeoutMs) {
52+
DeliveryExecutor(long onMessageTimeoutMs, boolean closeOnTimeout) {
5053
this.onMessageTimeoutMs = onMessageTimeoutMs;
54+
this.closeOnTimeout = closeOnTimeout;
5155
}
5256

5357
/**
@@ -62,11 +66,18 @@ public DeliveryExecutor(long onMessageTimeoutMs) {
6266
* @throws InterruptedException if executing thread is interrupted
6367
*/
6468
public void deliverMessageWithProtection(RMQMessage rmqMessage, MessageListener messageListener) throws JMSException, InterruptedException {
69+
Future<Boolean> task = null;
6570
try {
66-
this.getExecutorService().submit(new CallOnMessage(rmqMessage, messageListener)).get(this.onMessageTimeoutMs, TimeUnit.MILLISECONDS);
71+
task = this.getExecutorService().submit(new CallOnMessage(rmqMessage, messageListener));
72+
task.get(this.onMessageTimeoutMs, TimeUnit.MILLISECONDS);
6773
} catch (TimeoutException e) {
68-
this.closeAbruptly();
69-
throw new RMQJMSException("onMessage took too long and was interrupted", null);
74+
if (this.closeOnTimeout) {
75+
this.closeAbruptly();
76+
throw new RMQJMSException("onMessage took too long and was interrupted", null);
77+
} else {
78+
task.cancel(true);
79+
throw new DeliveryProcessingTimeoutException();
80+
}
7081
} catch (ExecutionException e) {
7182
throw new RMQMessageListenerExecutionJMSException("onMessage threw exception", e.getCause());
7283
}
@@ -113,4 +124,8 @@ private boolean waitForTerminatedExecutorService(ExecutorService executorService
113124
return false;
114125
}
115126
}
127+
128+
static class DeliveryProcessingTimeoutException extends RuntimeException {
129+
130+
}
116131
}

src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// License, v. 2.0. If a copy of the MPL was not distributed with this
33
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
//
5-
// Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved.
5+
// Copyright (c) 2013-2021 VMware, Inc. or its affiliates. All rights reserved.
66
package com.rabbitmq.jms.client;
77

88
import java.io.IOException;
@@ -44,6 +44,7 @@ class MessageListenerConsumer implements Consumer, Abortable {
4444
private final long terminationTimeout;
4545
private volatile boolean rejecting;
4646
private final boolean requeueOnMessageListenerException;
47+
private final boolean requeueOnTimeout;
4748

4849
/**
4950
* True when AMQP auto-ack is true as well. Happens
@@ -61,8 +62,12 @@ class MessageListenerConsumer implements Consumer, Abortable {
6162
* @param messageListener to call {@link MessageListener#onMessage(javax.jms.Message) onMessage(Message)} with received messages
6263
* @param terminationTimeout wait time (in nanoseconds) for cancel to take effect
6364
*/
64-
public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel channel, MessageListener messageListener, long terminationTimeout,
65-
boolean requeueOnMessageListenerException, ReceivingContextConsumer receivingContextConsumer) {
65+
MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel channel, MessageListener messageListener, long terminationTimeout,
66+
boolean requeueOnMessageListenerException, ReceivingContextConsumer receivingContextConsumer,
67+
boolean requeueOnTimeout) {
68+
if (requeueOnTimeout && !requeueOnMessageListenerException) {
69+
throw new IllegalArgumentException("requeueOnTimeout can be true only if requeueOnMessageListenerException is true as well");
70+
}
6671
this.messageConsumer = messageConsumer;
6772
this.channel = channel;
6873
this.messageListener = messageListener;
@@ -73,6 +78,7 @@ public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel chann
7378
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
7479
this.skipAck = messageConsumer.amqpAutoAck();
7580
this.receivingContextConsumer = receivingContextConsumer;
81+
this.requeueOnTimeout = requeueOnTimeout;
7682
}
7783

7884
private String getConsTag() {
@@ -110,7 +116,7 @@ public void handleCancelOk(String consumerTag) {
110116
* {@inheritDoc}
111117
*/
112118
@Override
113-
public void handleCancel(String consumerTag) throws IOException {
119+
public void handleCancel(String consumerTag) {
114120
logger.trace("consumerTag='{}'", consumerTag);
115121
this.completion.setComplete();
116122
}
@@ -136,19 +142,24 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
136142
RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(),
137143
response, this.receivingContextConsumer);
138144
this.messageConsumer.getSession().addUncommittedTag(dtag);
139-
boolean runtimeExceptionInListener = false;
145+
boolean alreadyNacked = false;
140146
try {
141147
this.messageConsumer.getSession().deliverMessage(msg, this.messageListener);
148+
} catch (DeliveryExecutor.DeliveryProcessingTimeoutException timeoutException) {
149+
// happens only if requeueOnTimeout is true
150+
logger.debug("nacking {} because of timeout", dtag);
151+
alreadyNacked = true;
152+
nack(dtag);
142153
} catch(RMQMessageListenerExecutionJMSException e) {
143154
if (e.getCause() instanceof RuntimeException) {
144-
runtimeExceptionInListener = true;
155+
alreadyNacked = true;
145156
nack(dtag);
146157
this.abort();
147158
} else {
148159
throw e;
149160
}
150161
}
151-
if (!runtimeExceptionInListener) {
162+
if (!alreadyNacked) {
152163
dealWithAcknowledgments(dtag);
153164
}
154165
} else {

src/main/java/com/rabbitmq/jms/client/RMQConnection.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// License, v. 2.0. If a copy of the MPL was not distributed with this
33
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
//
5-
// Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved.
5+
// Copyright (c) 2013-2021 VMware, Inc. or its affiliates. All rights reserved.
66
package com.rabbitmq.jms.client;
77

88
import java.io.IOException;
@@ -94,9 +94,23 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
9494
* Whether requeue message on {@link RuntimeException} in the
9595
* {@link javax.jms.MessageListener} or not.
9696
* Default is false.
97+
*
98+
* @since 1.7.0
99+
* @see RMQConnection#requeueOnTimeout
97100
*/
98101
private final boolean requeueOnMessageListenerException;
99102

103+
/**
104+
* Whether to requeue a message that timed out or not.
105+
*
106+
* Only taken into account if requeueOnMessageListenerException is true.
107+
* Default is false.
108+
*
109+
* @since 2.3.0
110+
* @see RMQConnection#requeueOnMessageListenerException
111+
*/
112+
private final boolean requeueOnTimeout;
113+
100114
/**
101115
* Whether to commit nack on rollback or not.
102116
* Default is false.
@@ -149,6 +163,9 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
149163
* @param connectionParams parameters for this connection
150164
*/
151165
public RMQConnection(ConnectionParams connectionParams) {
166+
if (connectionParams.willRequeueOnTimeout() && !connectionParams.willRequeueOnMessageListenerException()) {
167+
throw new IllegalArgumentException("requeueOnTimeout can be true only if requeueOnMessageListenerException is true as well");
168+
}
152169

153170
connectionParams.getRabbitConnection().addShutdownListener(new RMQConnectionShutdownListener());
154171

@@ -166,6 +183,7 @@ public RMQConnection(ConnectionParams connectionParams) {
166183
this.receivingContextConsumer = connectionParams.getReceivingContextConsumer();
167184
this.confirmListener = connectionParams.getConfirmListener();
168185
this.trustedPackages = connectionParams.getTrustedPackages();
186+
this.requeueOnTimeout = connectionParams.willRequeueOnTimeout();
169187
}
170188

171189
/**
@@ -220,6 +238,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
220238
.setReceivingContextConsumer(this.receivingContextConsumer)
221239
.setConfirmListener(this.confirmListener)
222240
.setTrustedPackages(this.trustedPackages)
241+
.setRequeueOnTimeout(this.requeueOnTimeout)
223242
);
224243
this.sessions.add(session);
225244
return session;

src/main/java/com/rabbitmq/jms/client/RMQMessageConsumer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// License, v. 2.0. If a copy of the MPL was not distributed with this
33
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
//
5-
// Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved.
5+
// Copyright (c) 2013-2021 VMware, Inc. or its affiliates. All rights reserved.
66
package com.rabbitmq.jms.client;
77

88
import java.io.IOException;
@@ -45,7 +45,7 @@
4545
* {@link MessageListener#onMessage} calls are implemented with a more conventional {@link Consumer}.
4646
* </p>
4747
*/
48-
public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, TopicSubscriber {
48+
class RMQMessageConsumer implements MessageConsumer, QueueReceiver, TopicSubscriber {
4949
private final Logger logger = LoggerFactory.getLogger(RMQMessageConsumer.class);
5050

5151
private static final String DIRECT_REPLY_TO = "amq.rabbitmq.reply-to";
@@ -80,6 +80,7 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
8080
private final DelayedReceiver delayedReceiver;
8181
/** Record and preserve the need to acknowledge automatically */
8282
private final boolean autoAck;
83+
private final boolean requeueOnTimeout;
8384

8485
/** Track how this consumer is being used. */
8586
private final AtomicInteger numberOfReceives = new AtomicInteger(0);
@@ -103,7 +104,10 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
103104
* @param requeueOnMessageListenerException true to requeue message on RuntimeException in listener, false otherwise
104105
*/
105106
RMQMessageConsumer(RMQSession session, RMQDestination destination, String uuidTag, boolean paused, String messageSelector, boolean requeueOnMessageListenerException,
106-
ReceivingContextConsumer receivingContextConsumer) {
107+
ReceivingContextConsumer receivingContextConsumer, boolean requeueOnTimeout) {
108+
if (requeueOnTimeout && !requeueOnMessageListenerException) {
109+
throw new IllegalArgumentException("requeueOnTimeout can be true only if requeueOnMessageListenerException is true as well");
110+
}
107111
this.session = session;
108112
this.destination = destination;
109113
this.uuidTag = uuidTag;
@@ -114,6 +118,7 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
114118
this.autoAck = session.isAutoAck();
115119
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
116120
this.receivingContextConsumer = receivingContextConsumer;
121+
this.requeueOnTimeout = requeueOnTimeout;
117122
}
118123

119124
/**
@@ -212,7 +217,8 @@ private void setNewListenerConsumer(MessageListener messageListener) throws Exce
212217
messageListener,
213218
TimeUnit.MILLISECONDS.toNanos(this.session.getConnection()
214219
.getTerminationTimeout()),
215-
this.requeueOnMessageListenerException, this.receivingContextConsumer);
220+
this.requeueOnMessageListenerException, this.receivingContextConsumer,
221+
this.requeueOnTimeout);
216222
if (this.listenerConsumer.compareAndSet(null, mlConsumer)) {
217223
this.abortables.add(mlConsumer);
218224
if (!this.getSession().getConnection().isStopped()) {

0 commit comments

Comments
 (0)