Skip to content

Commit 9987e91

Browse files
authored
Merge pull request #53 from duhenglucky/consumer_polish
[ISSUE #54]Polish consumer and metadata definition
2 parents 28d3142 + c189db0 commit 9987e91

File tree

12 files changed

+375
-274
lines changed

12 files changed

+375
-274
lines changed

openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919

2020
import io.openmessaging.MessagingAccessPoint;
2121
import io.openmessaging.OMS;
22-
import io.openmessaging.consumer.Consumer;
22+
import io.openmessaging.consumer.PullConsumer;
2323
import io.openmessaging.message.Message;
24+
import java.util.Arrays;
2425

2526
public class PullConsumerApp {
2627
public static void main(String[] args) {
@@ -29,7 +30,7 @@ public static void main(String[] args) {
2930
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
3031

3132
//Start a PullConsumer to receive messages from the specific queue.
32-
final Consumer consumer = messagingAccessPoint.createConsumer();
33+
final PullConsumer consumer = messagingAccessPoint.createPullConsumer();
3334

3435
//Register a shutdown hook to close the opened endpoints.
3536
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@@ -39,7 +40,7 @@ public void run() {
3940
}
4041
}));
4142

42-
consumer.bindQueue("NS://HELLO_QUEUE");
43+
consumer.bindQueue(Arrays.asList("NS://HELLO_QUEUE"));
4344
consumer.start();
4445

4546
Message message = consumer.receive(1000);

openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import io.openmessaging.OMS;
2222
import io.openmessaging.consumer.Consumer;
2323
import io.openmessaging.consumer.MessageListener;
24+
import io.openmessaging.consumer.PushConsumer;
2425
import io.openmessaging.manager.ResourceManager;
2526
import io.openmessaging.message.Message;
27+
import java.util.Arrays;
2628

2729
public class PushConsumerApp {
2830
public static void main(String[] args) {
@@ -33,7 +35,7 @@ public static void main(String[] args) {
3335
//Fetch a ResourceManager to create Queue resource.
3436
ResourceManager resourceManager = messagingAccessPoint.resourceManager();
3537
resourceManager.createNamespace("NS://XXXX");
36-
final Consumer consumer = messagingAccessPoint.createConsumer();
38+
final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
3739
consumer.start();
3840

3941
//Register a shutdown hook to close the opened endpoints.
@@ -49,7 +51,7 @@ public void run() {
4951
resourceManager.createQueue(simpleQueue);
5052
//This queue doesn't has a source queue, so only the message delivered to the queue directly can
5153
//be consumed by this consumer.
52-
consumer.bindQueue(simpleQueue, new MessageListener() {
54+
consumer.bindQueue(Arrays.asList(simpleQueue), new MessageListener() {
5355
@Override
5456
public void onReceived(Message message, Context context) {
5557
System.out.println("Received one message: " + message);
@@ -58,7 +60,7 @@ public void onReceived(Message message, Context context) {
5860

5961
});
6062

61-
consumer.unbindQueue(simpleQueue);
63+
consumer.unbindQueue(Arrays.asList(simpleQueue));
6264

6365
consumer.stop();
6466
}

openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void run() {
6161
Message message = producer.createMessage(
6262
"NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
6363
message.header().setBornHost("127.0.0.1").setDurability((short) 0);
64-
message.extensionHeader().get().setPartition(1);
64+
message.extensionHeader().setPartition(1);
6565
SendResult sendResult = producer.send(message);
6666
System.out.println("SendResult: " + sendResult);
6767

openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
package io.openmessaging.samples.routing;
1919

20+
import io.openmessaging.consumer.PushConsumer;
2021
import io.openmessaging.message.Message;
2122
import io.openmessaging.MessagingAccessPoint;
2223
import io.openmessaging.OMS;
2324
import io.openmessaging.consumer.Consumer;
2425
import io.openmessaging.consumer.MessageListener;
2526
import io.openmessaging.manager.ResourceManager;
2627
import io.openmessaging.producer.Producer;
28+
import java.util.Arrays;
2729

2830
public class RoutingApp {
2931
public static void main(String[] args) {
@@ -54,10 +56,10 @@ public static void main(String[] args) {
5456
producer.send(message);
5557

5658
//Consume messages from the queue behind the routing.
57-
final Consumer consumer = messagingAccessPoint.createConsumer();
59+
final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
5860
consumer.start();
5961

60-
consumer.bindQueue(destinationQueue, new MessageListener() {
62+
consumer.bindQueue(Arrays.asList(destinationQueue), new MessageListener() {
6163
@Override
6264
public void onReceived(Message message, Context context) {
6365
//The message sent to the sourceQueue will be delivered to anotherConsumer by the routing rule

openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919

2020
import io.openmessaging.consumer.Consumer;
2121
import io.openmessaging.consumer.MessageListener;
22+
import io.openmessaging.consumer.PullConsumer;
23+
import io.openmessaging.consumer.PushConsumer;
2224
import io.openmessaging.exception.OMSRuntimeException;
2325
import io.openmessaging.exception.OMSSecurityException;
2426
import io.openmessaging.manager.ResourceManager;
2527
import io.openmessaging.message.MessageFactory;
2628
import io.openmessaging.producer.Producer;
2729
import io.openmessaging.producer.TransactionStateCheckListener;
30+
import java.util.Collection;
2831

2932
/**
3033
* An instance of {@code MessagingAccessPoint} may be obtained from {@link OMS}, which is capable of creating {@code
@@ -91,15 +94,43 @@ public interface MessagingAccessPoint {
9194
Producer createProducer(TransactionStateCheckListener transactionStateCheckListener);
9295

9396
/**
94-
* Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}. The returned {@code Consumer}
95-
* isn't bind to any queue, uses {@link Consumer#bindQueue(String, MessageListener)} to bind queues.
97+
* Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}.
98+
* The returned {@code PushConsumer} isn't attached to any queue,
99+
* uses {@link PushConsumer#bindQueue(Collection, MessageListener)} to attach queues.
96100
*
97101
* @return the created {@code PushConsumer}
98-
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
99-
* error
100-
* @throws OMSSecurityException if have no authority to create a consumer.
102+
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
103+
* due to some internal error
104+
*/
105+
PushConsumer createPushConsumer();
106+
107+
/**
108+
* Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}.
109+
*
110+
* @return the created {@code PullConsumer}
111+
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
112+
* due to some internal error
113+
*/
114+
PullConsumer createPullConsumer();
115+
116+
/**
117+
* Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint} with some preset attributes.
118+
*
119+
* @param attributes the preset attributes
120+
* @return the created {@code PushConsumer}
121+
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
122+
* due to some internal error
123+
*/
124+
PushConsumer createPushConsumer(KeyValue attributes);
125+
126+
/**
127+
* Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}.
128+
*
129+
* @return the created {@code PullConsumer}
130+
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
131+
* due to some internal error
101132
*/
102-
Consumer createConsumer();
133+
PullConsumer createPullConsumer(KeyValue attributes);
103134

104135
/**
105136
* Gets a lightweight {@code ResourceManager} instance from the specified {@code MessagingAccessPoint}.

0 commit comments

Comments
 (0)