getMetricExtensions() {
+ return metricExtensions;
+ }
+
+ /**
+ * Add metric extension.
+ *
+ * Not that this method is NOT thread safe.
+ *
+ *
+ * @param metricExtension the metric extension to add.
+ */
+ public static void addMetricExtension(MetricExtension metricExtension) {
+ metricExtensions.add(metricExtension);
+ }
+
+ public static void addProducerMessageCount(String topic, int count) {
+ for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
+ m.addProducerMessageCount(topic, count);
+ }
+ }
+
+ public static void producerMessageCountIncrement(String topic) {
+ addProducerMessageCount(topic, 1);
+ }
+
+ public static void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) {
+ for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
+ m.addConsumerMessageCount(topic, consumerMode, count);
+ }
+ }
+
+ public static void consumerMessageCountIncrement(String topic, EConsumerMode consumerMode) {
+ addConsumerMessageCount(topic, consumerMode, 1);
+ }
+}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index b6705db2..4039dfa6 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -48,6 +48,8 @@
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.apache.rocketmq.spring.metric.MetricExtension.EConsumerMode;
+import org.apache.rocketmq.spring.metric.MetricExtensionProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
@@ -392,6 +394,7 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderly
private void handleMessage(
MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
+ MetricExtensionProvider.consumerMessageCountIncrement(messageExt.getTopic(), EConsumerMode.Push);
if (rocketMQListener != null) {
rocketMQListener.onMessage(doConvertMessage(messageExt));
} else if (rocketMQReplyListener != null) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 381d9365..dae77e0f 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -37,6 +37,7 @@
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType;
+import org.apache.rocketmq.spring.core.DefaultLitePullConsumerWithTopic;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.slf4j.Logger;
@@ -296,13 +297,14 @@ public static DefaultLitePullConsumer createDefaultLitePullConsumer(String nameS
String groupName, String topicName, MessageModel messageModel, SelectorType selectorType,
String selectorExpression, String ak, String sk, int pullBatchSize)
throws MQClientException {
- DefaultLitePullConsumer litePullConsumer = null;
+ DefaultLitePullConsumerWithTopic litePullConsumer = null;
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
- litePullConsumer = new DefaultLitePullConsumer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
+ litePullConsumer = new DefaultLitePullConsumerWithTopic(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
litePullConsumer.setVipChannelEnabled(false);
} else {
- litePullConsumer = new DefaultLitePullConsumer(groupName);
+ litePullConsumer = new DefaultLitePullConsumerWithTopic(groupName);
}
+ litePullConsumer.setTopic(topicName);
litePullConsumer.setNamesrvAddr(nameServer);
litePullConsumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
litePullConsumer.setPullBatchSize(pullBatchSize);
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java
new file mode 100644
index 00000000..62c6e4cc
--- /dev/null
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.metric;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MetricExtensionTest implements MetricExtension {
+
+ private static String TOPIC;
+ private static int COUNT;
+ private static EConsumerMode CONSUMER_MODE;
+
+ @Override
+ public void addProducerMessageCount(String topic, int count) {
+ TOPIC = topic;
+ COUNT = count;
+ }
+
+ @Override
+ public void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) {
+ TOPIC = topic;
+ COUNT = count;
+ CONSUMER_MODE = consumerMode;
+ }
+
+ @Test
+ public void testAddProducerMessageCount() {
+ MetricExtensionProvider.addProducerMessageCount("topic1", 111);
+ assertEquals("topic1", TOPIC);
+ assertEquals(111, COUNT);
+ }
+
+ @Test
+ public void testAddConsumerMessageCount() {
+ MetricExtensionProvider.addConsumerMessageCount("topic2", EConsumerMode.Push, 222);
+ assertEquals("topic2", TOPIC);
+ assertEquals(222, COUNT);
+ assertEquals(EConsumerMode.Push, CONSUMER_MODE);
+ }
+}
diff --git a/rocketmq-spring-boot/src/test/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension b/rocketmq-spring-boot/src/test/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension
new file mode 100644
index 00000000..0e69187c
--- /dev/null
+++ b/rocketmq-spring-boot/src/test/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension
@@ -0,0 +1 @@
+org.apache.rocketmq.spring.metric.MetricExtensionTest
\ No newline at end of file