Skip to content

Commit 28d3142

Browse files
authored
Merge pull request #52 from zongtanghu/master
[ISSUE #51]Consumer should support bulk subscription queue and poll message by offset
2 parents 0c809cb + b3e5d3f commit 28d3142

File tree

1 file changed

+91
-3
lines changed
  • openmessaging-api/src/main/java/io/openmessaging/consumer

1 file changed

+91
-3
lines changed

openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.openmessaging.Client;
2121
import io.openmessaging.MessagingAccessPoint;
2222
import io.openmessaging.ServiceLifecycle;
23+
import io.openmessaging.annotation.Optional;
2324
import io.openmessaging.exception.OMSDestinationException;
2425
import io.openmessaging.exception.OMSRuntimeException;
2526
import io.openmessaging.exception.OMSSecurityException;
@@ -97,6 +98,20 @@ public interface Consumer extends ServiceLifecycle, Client {
9798
*/
9899
void bindQueue(String queueName);
99100

101+
/**
102+
* Bind the {@code Consumer} to a collection of queue in pull model, user can use {@link Consumer#receive(long)} to get
103+
* messages from a collection of queue.
104+
* <p>
105+
* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is
106+
* coming.
107+
*
108+
* @param queueNames a collection of queues.
109+
* @throws OMSSecurityException when have no authority to bind to this queue.
110+
* @throws OMSDestinationException when have no given destination in the server.
111+
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
112+
*/
113+
void bindQueue(List<String> queueNames);
114+
100115
/**
101116
* Bind the {@code Consumer} to a specified queue, with a {@code MessageListener}.
102117
* <p>
@@ -111,6 +126,20 @@ public interface Consumer extends ServiceLifecycle, Client {
111126
*/
112127
void bindQueue(String queueName, MessageListener listener);
113128

129+
/**
130+
* Bind the {@code Consumer} to a collection of queue, with a {@code MessageListener}.
131+
* <p>
132+
* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is
133+
* coming.
134+
*
135+
* @param queueNames a collection of queues.
136+
* @param listener a specified listener to receive new message.
137+
* @throws OMSSecurityException when have no authority to bind to this queue.
138+
* @throws OMSDestinationException when have no given destination in the server.
139+
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
140+
*/
141+
void bindQueues(List<String> queueNames, MessageListener listener);
142+
114143
/**
115144
* Bind the {@code Consumer} to a specified queue, with a {@code BatchMessageListener}.
116145
* <p>
@@ -125,6 +154,20 @@ public interface Consumer extends ServiceLifecycle, Client {
125154
*/
126155
void bindQueue(String queueName, BatchMessageListener listener);
127156

157+
/**
158+
* Bind the {@code Consumer} to a collection of queue, with a {@code BatchMessageListener}.
159+
* <p>
160+
* {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered
161+
* messages is coming.
162+
*
163+
* @param queueNames a collection of queues.
164+
* @param listener a specified listener to receive new messages.
165+
* @throws OMSSecurityException when have no authority to bind to this queue.
166+
* @throws OMSDestinationException when have no given destination in the server.
167+
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
168+
*/
169+
void bindQueues(List<String> queueNames, BatchMessageListener listener);
170+
128171
/**
129172
* Unbind the {@code Consumer} from a specified queue.
130173
* <p>
@@ -134,6 +177,15 @@ public interface Consumer extends ServiceLifecycle, Client {
134177
*/
135178
void unbindQueue(String queueName);
136179

180+
/**
181+
* Unbind the {@code Consumer} from a collection of queues.
182+
* <p>
183+
* After the success call, this consumer won't receive new message from the specified queue any more.
184+
*
185+
* @param queueNames a collection of queues.
186+
*/
187+
void unbindQueues(List<String> queueNames);
188+
137189
/**
138190
* This method is used to find out whether the {@code Consumer} in bind queue.
139191
*
@@ -142,11 +194,11 @@ public interface Consumer extends ServiceLifecycle, Client {
142194
boolean isBindQueue();
143195

144196
/**
145-
* This method is used to find out the queue bind to {@code Consumer}.
197+
* This method is used to find out the collection of queues bind to {@code Consumer}.
146198
*
147-
* @return the queue this consumer is bind, or null if the consumer is not bind queue.
199+
* @return the queues this consumer is bind, or null if the consumer is not bind queue.
148200
*/
149-
String getBindQueue();
201+
List<String> getBindQueues();
150202

151203
/**
152204
* Adds a {@code ConsumerInterceptor} instance to this consumer.
@@ -176,6 +228,25 @@ public interface Consumer extends ServiceLifecycle, Client {
176228
*/
177229
Message receive(long timeout);
178230

231+
/**
232+
* Receives the next message from the which bind queue,partition and receiptId of this consumer in pull model.
233+
* <p>
234+
* This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer}
235+
* is shut down.
236+
*
237+
* @param queueName receive message from which queueName in Message Queue.
238+
* @param partitionId receive message from which partition in Message Queue.
239+
* @param receiptId receive message from which receipt position in Message Queue.
240+
* @param timeout receive message will blocked at most <code>timeout</code> milliseconds.
241+
* @return the next message received from the bind queues, or null if the consumer is concurrently shut down.
242+
* @throws OMSSecurityException when have no authority to receive messages from this queue.
243+
* @throws OMSTimeOutException when the given timeout elapses before the send operation completes.
244+
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
245+
*/
246+
@Optional
247+
Message receive(String queueName, int partitionId, long receiptId, long timeout);
248+
249+
179250
/**
180251
* Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic
181252
* should implement in the {@link MessageListener}.
@@ -189,6 +260,23 @@ public interface Consumer extends ServiceLifecycle, Client {
189260
*/
190261
List<Message> batchReceive(long timeout);
191262

263+
/**
264+
* Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic
265+
* should implement in the {@link MessageListener}.
266+
* <p>
267+
*
268+
* @param queueName receive message from which queueName in Message Queue.
269+
* @param partitionId receive message from which partition in Message Queue.
270+
* @param receiptId receive message from which receipt position in Message Queue.
271+
* @param timeout receive messages will blocked at most <code>timeout</code> milliseconds.
272+
* @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down.
273+
* @throws OMSSecurityException when have no authority to receive messages from this queue.
274+
* @throws OMSTimeOutException when the given timeout elapses before the send operation completes.
275+
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
276+
*/
277+
@Optional
278+
List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout);
279+
192280
/**
193281
* Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using
194282
* manual commit.

0 commit comments

Comments
 (0)