diff --git a/pom.xml b/pom.xml index 7e7ecce5..5c754c7b 100644 --- a/pom.xml +++ b/pom.xml @@ -176,9 +176,10 @@ .github/** src/test/resources/certs/* src/test/**/*.log - src/test/resources/META-INF/service/* + **/src/main/resources/META-INF/services/* + **/src/test/resources/META-INF/services/* **/target/** - */*.iml + **/*.iml **/*/spring.factories **/application.properties diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java new file mode 100644 index 00000000..184a0695 --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.samples.springboot; + +import org.apache.rocketmq.spring.metric.MetricExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +public class AtomicLongMetricExtension implements MetricExtension { + + private final static Logger LOGGER = LoggerFactory.getLogger(AtomicLongMetricExtension.class); + + private final Map consumerMessageCountMap = new ConcurrentHashMap<>(); + + @Override + public void addProducerMessageCount(String topic, int count) { + //nothing + } + + @Override + public void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) { + String key = topic + "_" + consumerMode.name(); + AtomicLong atomicLong = consumerMessageCountMap.computeIfAbsent(key, t -> new AtomicLong()); + LOGGER.info("The count of {} consumer messages for {} is {}" + , consumerMode.name(), topic, atomicLong.addAndGet(count)); + } +} diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension new file mode 100644 index 00000000..ba1a24d1 --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension @@ -0,0 +1 @@ +org.apache.rocketmq.samples.springboot.AtomicLongMetricExtension \ No newline at end of file diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java new file mode 100644 index 00000000..ff3a0b25 --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/AtomicLongMetricExtension.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.samples.springboot; + +import org.apache.rocketmq.spring.metric.MetricExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +public class AtomicLongMetricExtension implements MetricExtension { + + private final static Logger LOGGER = LoggerFactory.getLogger(AtomicLongMetricExtension.class); + + private final Map producerMessageCountMap = new ConcurrentHashMap<>(); + + @Override + public void addProducerMessageCount(String topic, int count) { + AtomicLong atomicLong = producerMessageCountMap.computeIfAbsent(topic, t -> new AtomicLong()); + LOGGER.info("The count of producer messages for {} is {}", topic, atomicLong.addAndGet(count)); + } + + @Override + public void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) { + //nothing + } +} diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension new file mode 100644 index 00000000..ba1a24d1 --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension @@ -0,0 +1 @@ +org.apache.rocketmq.samples.springboot.AtomicLongMetricExtension \ No newline at end of file diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/DefaultLitePullConsumerWithTopic.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/DefaultLitePullConsumerWithTopic.java new file mode 100644 index 00000000..ddf92165 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/DefaultLitePullConsumerWithTopic.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.core; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.remoting.RPCHook; + +public class DefaultLitePullConsumerWithTopic extends DefaultLitePullConsumer { + + private String topic; + + public DefaultLitePullConsumerWithTopic(final String consumerGroup) { + super(consumerGroup); + } + + public DefaultLitePullConsumerWithTopic(final String consumerGroup, RPCHook rpcHook) { + super(consumerGroup, rpcHook); + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java index dd6a4d8d..b5742275 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java @@ -31,6 +31,8 @@ import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.metric.MetricExtension.EConsumerMode; +import org.apache.rocketmq.spring.metric.MetricExtensionProvider; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.slf4j.Logger; @@ -239,6 +241,7 @@ public T sendAndReceive(String destination, Message message, Type type, S if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); MessageExt replyMessage; if (Objects.isNull(hashKey) || hashKey.isEmpty()) { @@ -444,6 +447,7 @@ public void sendAndReceive(String destination, Message message, } }; } + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); if (Objects.isNull(hashKey) || hashKey.isEmpty()) { producer.request(rocketMsg, requestCallback, timeout); } else { @@ -514,7 +518,7 @@ public SendResult syncSend(String destination, Collection try { long now = System.currentTimeMillis(); - Collection rmqMsgs = new ArrayList<>(); + List rmqMsgs = new ArrayList<>(); for (Message msg : messages) { if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) { log.warn("Found a message empty in the batch, skip it"); @@ -523,6 +527,10 @@ public SendResult syncSend(String destination, Collection rmqMsgs.add(this.createRocketMqMessage(destination, msg)); } + if (!rmqMsgs.isEmpty()) { + MetricExtensionProvider.addProducerMessageCount(rmqMsgs.get(0).getTopic(), rmqMsgs.size()); + } + SendResult sendResult = producer.send(rmqMsgs, timeout); long costTime = System.currentTimeMillis() - now; if (log.isDebugEnabled()) { @@ -555,6 +563,7 @@ public SendResult syncSend(String destination, Message message, long timeout, if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); SendResult sendResult = producer.send(rocketMsg, timeout); long costTime = System.currentTimeMillis() - now; if (log.isDebugEnabled()) { @@ -620,6 +629,7 @@ public SendResult syncSendOrderly(String destination, Message message, String try { long now = System.currentTimeMillis(); org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout); long costTime = System.currentTimeMillis() - now; if (log.isDebugEnabled()) { @@ -726,6 +736,7 @@ public void asyncSend(String destination, Message message, SendCallback sendC if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); producer.send(rocketMsg, sendCallback, timeout); } catch (Exception e) { log.info("asyncSend failed. destination:{}, message:{} ", destination, message); @@ -805,6 +816,7 @@ public void asyncSendOrderly(String destination, Message message, String hash } try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout); } catch (Exception e) { log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message); @@ -867,6 +879,7 @@ public void sendOneWay(String destination, Message message) { } try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); producer.sendOneway(rocketMsg); } catch (Exception e) { log.error("sendOneWay failed. destination:{}, message:{} ", destination, message); @@ -899,6 +912,7 @@ public void sendOneWayOrderly(String destination, Message message, String has } try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); producer.sendOneway(rocketMsg, messageQueueSelector, hashKey); } catch (Exception e) { log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message); @@ -973,6 +987,7 @@ public TransactionSendResult sendMessageInTransaction(final String destination, throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener"); } org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); + MetricExtensionProvider.producerMessageCountIncrement(rocketMsg.getTopic()); return producer.sendMessageInTransaction(rocketMsg, arg); } catch (MQClientException e) { throw RocketMQUtil.convert(e); @@ -1082,6 +1097,8 @@ public List receive(Class clazz, long timeout) { for (MessageExt messageExt : messageExts) { list.add(doConvertMessage(messageExt, clazz)); } + MetricExtensionProvider.addConsumerMessageCount( + ((DefaultLitePullConsumerWithTopic) consumer).getTopic(), EConsumerMode.Pull, list.size()); return list; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java new file mode 100644 index 00000000..53c59a28 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtension.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.metric; + +public interface MetricExtension { + + enum EConsumerMode { + /** + * pull mode + */ + Pull, + /** + * push mode + */ + Push, + } + + /** + * Add current count of message from the producer. + * + * @param topic topic name + * @param count count of message + */ + void addProducerMessageCount(String topic, int count); + + /** + * Add current count of message from the consumer. + * + * @param topic topic name + * @param consumerMode consumer mode + * @param count count of message + */ + void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count); +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java new file mode 100644 index 00000000..a4d12c1d --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/metric/MetricExtensionProvider.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.metric; + +import org.apache.rocketmq.spring.metric.MetricExtension.EConsumerMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.ServiceLoader; + +public class MetricExtensionProvider { + + private final static Logger log = LoggerFactory.getLogger(MetricExtensionProvider.class); + + private static List metricExtensions = new ArrayList<>(); + + static { + resolveInstance(); + } + + private static void resolveInstance() { + try { + ServiceLoader serviceLoader = ServiceLoader.load(MetricExtension.class); + for (MetricExtension spi : serviceLoader) { + metricExtensions.add(spi); + } + log.info("[MetricExtensionProvider] MetricExtension resolved, size=" + metricExtensions.size()); + } catch (Throwable t) { + log.warn("[MetricExtensionProvider] WARN: MetricExtension resolve failure"); + } + } + + /** + * Get all metric extensions. DO NOT MODIFY the returned list, use {@link #addMetricExtension(MetricExtension)}. + * + * @return all metric extensions. + */ + public static List getMetricExtensions() { + return metricExtensions; + } + + /** + * Add metric extension. + *

+ * Not that this method is NOT thread safe. + *

+ * + * @param metricExtension the metric extension to add. + */ + public static void addMetricExtension(MetricExtension metricExtension) { + metricExtensions.add(metricExtension); + } + + public static void addProducerMessageCount(String topic, int count) { + for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { + m.addProducerMessageCount(topic, count); + } + } + + public static void producerMessageCountIncrement(String topic) { + addProducerMessageCount(topic, 1); + } + + public static void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) { + for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { + m.addConsumerMessageCount(topic, consumerMode, count); + } + } + + public static void consumerMessageCountIncrement(String topic, EConsumerMode consumerMode) { + addConsumerMessageCount(topic, consumerMode, 1); + } +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index b6705db2..4039dfa6 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -48,6 +48,8 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.apache.rocketmq.spring.core.RocketMQReplyListener; +import org.apache.rocketmq.spring.metric.MetricExtension.EConsumerMode; +import org.apache.rocketmq.spring.metric.MetricExtensionProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; @@ -392,6 +394,7 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderly private void handleMessage( MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException { + MetricExtensionProvider.consumerMessageCountIncrement(messageExt.getTopic(), EConsumerMode.Push); if (rocketMQListener != null) { rocketMQListener.onMessage(doConvertMessage(messageExt)); } else if (rocketMQReplyListener != null) { diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java index 381d9365..dae77e0f 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.DefaultLitePullConsumerWithTopic; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.slf4j.Logger; @@ -296,13 +297,14 @@ public static DefaultLitePullConsumer createDefaultLitePullConsumer(String nameS String groupName, String topicName, MessageModel messageModel, SelectorType selectorType, String selectorExpression, String ak, String sk, int pullBatchSize) throws MQClientException { - DefaultLitePullConsumer litePullConsumer = null; + DefaultLitePullConsumerWithTopic litePullConsumer = null; if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { - litePullConsumer = new DefaultLitePullConsumer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk))); + litePullConsumer = new DefaultLitePullConsumerWithTopic(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk))); litePullConsumer.setVipChannelEnabled(false); } else { - litePullConsumer = new DefaultLitePullConsumer(groupName); + litePullConsumer = new DefaultLitePullConsumerWithTopic(groupName); } + litePullConsumer.setTopic(topicName); litePullConsumer.setNamesrvAddr(nameServer); litePullConsumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer)); litePullConsumer.setPullBatchSize(pullBatchSize); diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java new file mode 100644 index 00000000..62c6e4cc --- /dev/null +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/metric/MetricExtensionTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.metric; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class MetricExtensionTest implements MetricExtension { + + private static String TOPIC; + private static int COUNT; + private static EConsumerMode CONSUMER_MODE; + + @Override + public void addProducerMessageCount(String topic, int count) { + TOPIC = topic; + COUNT = count; + } + + @Override + public void addConsumerMessageCount(String topic, EConsumerMode consumerMode, int count) { + TOPIC = topic; + COUNT = count; + CONSUMER_MODE = consumerMode; + } + + @Test + public void testAddProducerMessageCount() { + MetricExtensionProvider.addProducerMessageCount("topic1", 111); + assertEquals("topic1", TOPIC); + assertEquals(111, COUNT); + } + + @Test + public void testAddConsumerMessageCount() { + MetricExtensionProvider.addConsumerMessageCount("topic2", EConsumerMode.Push, 222); + assertEquals("topic2", TOPIC); + assertEquals(222, COUNT); + assertEquals(EConsumerMode.Push, CONSUMER_MODE); + } +} diff --git a/rocketmq-spring-boot/src/test/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension b/rocketmq-spring-boot/src/test/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension new file mode 100644 index 00000000..0e69187c --- /dev/null +++ b/rocketmq-spring-boot/src/test/resources/META-INF/services/org.apache.rocketmq.spring.metric.MetricExtension @@ -0,0 +1 @@ +org.apache.rocketmq.spring.metric.MetricExtensionTest \ No newline at end of file