Skip to content

Async Mqtt3Client does not release threads created by publishes() when disconnected #633

@ASunc

Description

@ASunc

🐛 Bug Report

🔬 How To Reproduce

Steps to reproduce the behavior:

  1. ... create client by Mqtt3Client.builder()......buildAsync()
  2. call client.publishes(MqttGlobalPublishFilter.REMAINING, ....) // thread com.hivemq.client.mqtt-1-1 is created
  3. call client.connectIWth()....
  4. call client.disconnect().get() // thread is not released

Code sample

public class HiveTest {

public static void main(String[] args) throws InterruptedException, ExecutionException
{
	var test = new HiveTest();

	for (int i = 0; i < 10; i++) {
		var cli = test.connect();

		Thread.sleep(2 * 1000L);

		test.disconnect(cli);
	}

	System.out.println("Sleeping... com.hivemq.client.mqtt-n-n threads are still there.");
	Thread.sleep(300 * 1000L);
}

private void disconnect(Mqtt3AsyncClient cli) throws InterruptedException, ExecutionException
{
	cli.disconnect().get();
}

public Mqtt3AsyncClient connect() throws InterruptedException, ExecutionException
{
	var client = Mqtt3Client.builder()
			.identifier("myid")
			.serverHost("broker")
			.serverPort(1883)
			.addDisconnectedListener(ctx -> {

				System.err.println("MQTT disconnected from broker: " + ctx.getCause());
			})
			.addConnectedListener(ctx -> {

				System.out.println("MQTT connected to broker");
			})
			.buildAsync();

	client.publishes(MqttGlobalPublishFilter.REMAINING, pub -> {

		try {

			String payload = new String(pub.getPayloadAsBytes(), "UTF-8");
			System.out.println("MESSAGE:" + pub.getTopic().toString() + ":" + payload);
		}
		catch (UnsupportedEncodingException e) {

			System.err.println("Bad payload encoding.");
		}
	});

	client.connectWith().cleanSession(false).keepAlive(60).send().thenCompose(connAck -> {

		return client.subscribeWith()
				.addSubscription(Mqtt3Subscription.builder().topicFilter("test").build())
				.send();

	}).get();

	return client;
}

}

Environment

Where are you running/using this client? Windows 11

Hardware or Device? Dell Laptop

What version of this client are you using? 1.3.3

JVM version? 11.0.22.7

Operating System? Windows 11

Which MQTT protocol version is being used? 3

Which MQTT broker (name and version)? mosquitto 2.0.18

Screenshots

📈 Expected behavior

When disconnecting, I would expect that thread would be terminated. Don't confuse this
thread with various RxComputation threads, I know that number of those are capped.

📎 Additional context

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions