Skip to content

Testing EMQx MQTT message forwarding, received messages keep decreasing #150

@bobbydog

Description

@bobbydog

Using the attached plan to test EMQx message forwarding, 500 messages are sent per second, with 25 clients configured to subscribe. Theoretically, tens of thousands of messages should flow out per second. After setting QoS to 1, the outbound message rate dropped from an initial 5000 to 2000 after continuous operation. The JMeter error log shows that each time an error is reported, the received message count decreases. Could you please help investigate the cause?
2024-02-03 09:35:12,677 ERROR o.a.j.JMeter: Uncaught exception in thread Thread[RxComputationThreadPool-11,5,main]
java.lang.IllegalStateException: Current state = RESET, new state = FLUSHED
at java.base/java.nio.charset.CharsetDecoder.throwIllegalStateException(CharsetDecoder.java:989) ~[?:?]
at java.base/java.nio.charset.CharsetDecoder.flush(CharsetDecoder.java:672) ~[?:?]
at java.base/java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:801) ~[?:?]
at net.xmeter.samplers.mqtt.hivemq.HiveMQTTConnection.decode(HiveMQTTConnection.java:115) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at net.xmeter.samplers.mqtt.hivemq.HiveMQTTConnection.handlePublishReceived(HiveMQTTConnection.java:108) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.mqtt3.Mqtt3AsyncClientView.lambda$callbackView$1(Mqtt3AsyncClientView.java:73) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:227) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:212) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.rx.FlowableWithSingle$SingleFutureSubscriber.onNext(FlowableWithSingle.java:377) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$SplitSubscriber$Default.tryOnNextActual(FlowableWithSingleCombine.java:206) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$SplitSubscriber.tryOnNext(FlowableWithSingleCombine.java:171) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnConditionalSubscriber.runAsync(FlowableObserveOn.java:649) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66) [mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57) [mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
2024-02-03 09:35:30,054 INFO o.a.j.r.Summariser: summary + 39 in 00:00:30 = 1.3/s Avg: 1 Min: 1 Max: 8 Err: 0 (0.00%) Active: 25 Started: 25 Finished: 0
2024-02-03 09:35:30,054 INFO o.a.j.r.Summariser: summary = 10012 in 00:03:02 = 55.1/s Avg: 0 Min: 0 Max: 435 Err: 0 (0.00%)
2024-02-03 09:35:52,629 ERROR o.a.j.JMeter: Uncaught exception in thread Thread[RxComputationThreadPool-7,5,main]
java.lang.IllegalStateException: Current state = RESET, new state = FLUSHED
at java.base/java.nio.charset.CharsetDecoder.throwIllegalStateException(CharsetDecoder.java:989) ~[?:?]
at java.base/java.nio.charset.CharsetDecoder.flush(CharsetDecoder.java:672) ~[?:?]
at java.base/java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:801) ~[?:?]
at net.xmeter.samplers.mqtt.hivemq.HiveMQTTConnection.decode(HiveMQTTConnection.java:115) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at net.xmeter.samplers.mqtt.hivemq.HiveMQTTConnection.handlePublishReceived(HiveMQTTConnection.java:108) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.mqtt3.Mqtt3AsyncClientView.lambda$callbackView$1(Mqtt3AsyncClientView.java:73) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:227) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:212) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.rx.FlowableWithSingle$SingleFutureSubscriber.onNext(FlowableWithSingle.java:377) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$SplitSubscriber$Default.tryOnNextActual(FlowableWithSingleCombine.java:206) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$SplitSubscriber.tryOnNext(FlowableWithSingleCombine.java:171) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnConditionalSubscriber.runAsync(FlowableObserveOn.java:649) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176) ~[mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66) [mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57) [mqtt-xmeter-2.0.2-jar-with-dependencies.jar:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

jmeter.log
JmeterPlans.zip
image


This issue was automatically translated from Chinese.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions