-
Notifications
You must be signed in to change notification settings - Fork 197
Description
Using the attached test plan to test EMQX message forwarding, 500 messages are published per second and 25 clients are configured to subscribe. Theoretically, more than ten thousand messages should flow out per second (500 × 25 = 12,500, if every subscriber receives every message). After setting QoS to 1, the outbound message rate dropped from an initial 5,000 messages per second to 2,000 messages per second after continuous operation. The JMeter error log shows that each time the error below is reported, the received message count decreases. Could you please help investigate the cause?
2024-02-03 09:35:12,677 ERROR o.a.j.JMeter: Uncaught exception in thread Thread[RxComputationThreadPool-11,5,main]
java.lang.IllegalStateException: Current state = RESET, new state = FLUSHED
at java.base/java.nio.charset.CharsetDecoder.throwIllegalStateException(CharsetDecoder.java:989) ~[?:?]
at java.base/java.nio.charset.CharsetDecoder.flush(CharsetDecoder.java:672) ~[?:?]
at java.base/java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:801) ~[?:?]
at net.xmeter.samplers.mqtt.hivemq.HiveMQTTConnection.decode(HiveMQTTConnection.java:115) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at net.xmeter.samplers.mqtt.hivemq.HiveMQTTConnection.handlePublishReceived(HiveMQTTConnection.java:108) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.mqtt3.Mqtt3AsyncClientView.lambda$callbackView$1(Mqtt3AsyncClientView.java:73) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:227) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:212) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.rx.FlowableWithSingle$SingleFutureSubscriber.onNext(FlowableWithSingle.java:377) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$SplitSubscriber$Default.tryOnNextActual(FlowableWithSingleCombine.java:206) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$SplitSubscriber.tryOnNext(FlowableWithSingleCombine.java:171) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnConditionalSubscriber.runAsync(FlowableObserveOn.java:649) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66) [mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57) [mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
2024-02-03 09:35:30,054 INFO o.a.j.r.Summariser: summary + 39 in 00:00:30 = 1.3/s Avg: 1 Min: 1 Max: 8 Err: 0 (0.00%) Active: 25 Started: 25 Finished: 0
2024-02-03 09:35:30,054 INFO o.a.j.r.Summariser: summary = 10012 in 00:03:02 = 55.1/s Avg: 0 Min: 0 Max: 435 Err: 0 (0.00%)
2024-02-03 09:35:52,629 ERROR o.a.j.JMeter: Uncaught exception in thread Thread[RxComputationThreadPool-7,5,main]
java.lang.IllegalStateException: Current state = RESET, new state = FLUSHED
at java.base/java.nio.charset.CharsetDecoder.throwIllegalStateException(CharsetDecoder.java:989) ~[?:?]
at java.base/java.nio.charset.CharsetDecoder.flush(CharsetDecoder.java:672) ~[?:?]
at java.base/java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:801) ~[?:?]
at net.xmeter.samplers.mqtt.hivemq.HiveMQTTConnection.decode(HiveMQTTConnection.java:115) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at net.xmeter.samplers.mqtt.hivemq.HiveMQTTConnection.handlePublishReceived(HiveMQTTConnection.java:108) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.mqtt3.Mqtt3AsyncClientView.lambda$callbackView$1(Mqtt3AsyncClientView.java:73) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:227) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:212) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.rx.FlowableWithSingle$SingleFutureSubscriber.onNext(FlowableWithSingle.java:377) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$SplitSubscriber$Default.tryOnNextActual(FlowableWithSingleCombine.java:206) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$SplitSubscriber.tryOnNext(FlowableWithSingleCombine.java:171) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnConditionalSubscriber.runAsync(FlowableObserveOn.java:649) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66) [mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57) [mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
This issue was automatically translated from Chinese.