failed to release memory #706

@JackieJK

During stress testing, after sending a large number of messages, the program froze and failed to release memory even after the stress-testing tool had been shut down.

The JVM was started with `-Xmx1G`.

Can anyone help? Sorry for my poor English.
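To narrow down what "failed to release memory" means here, it may help to compare heap figures before and after a GC request. A JVM can keep heap committed after a collection has freed it, so memory seen from the operating system may stay high even when few objects are still live. The sketch below uses only the standard `java.lang.management` API; the class `HeapSnapshot` and its method names are hypothetical and are not part of the reported program.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Hypothetical diagnostic helper: prints heap figures before and after a GC request. */
class HeapSnapshot {

    /** Reads the current heap usage from the standard JMX memory bean. */
    static MemoryUsage heap() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
    }

    /** Formats used, committed and max heap in whole megabytes. */
    static String describe(String label, MemoryUsage usage) {
        long mb = 1024L * 1024L;
        return String.format("%s: used=%dMB committed=%dMB max=%dMB",
                label, usage.getUsed() / mb, usage.getCommitted() / mb, usage.getMax() / mb);
    }

    public static void main(String[] args) {
        System.out.println(describe("before gc", heap()));
        System.gc(); // only a request; the JVM may ignore it
        System.out.println(describe("after gc", heap()));
    }
}
```

If `used` drops sharply after the GC request while `committed` stays near the `-Xmx` limit, the heap is probably being held by the JVM rather than by live objects. If `used` stays high, a heap dump would be the next step to see what is retaining the memory.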


```java
@Override
public PooledObject makeObject() throws Exception {
    MqttClientParam params = paramsPool.get(paramCounter.getAndIncrement());
    assert params != null;
    // serverUri is split into host and port
    String[] split = params.serverUri().split(":");
    // Build an MQTT 3 async client that retries every 10 seconds after a disconnect
    Mqtt3AsyncClient client = Mqtt3Client.builder()
            .identifier(String.join("_", HostNameUtil.getHostName(), IDUtils.getNextId()))
            .serverHost(split[0])
            .serverPort(Integer.parseInt(split[1]))
            .automaticReconnect(MqttClientAutoReconnect.builder()
                    .initialDelay(10, TimeUnit.SECONDS)
                    .maxDelay(10, TimeUnit.SECONDS)
                    .build())
            .addConnectedListener(e -> {
                log.info("{}->{}:{}", e.getClientConfig().getClientIdentifier(),
                        e.getClientConfig().getServerHost(), e.getClientConfig().getServerPort());
            })
            .addDisconnectedListener(e -> {
                log.error("", e.getCause());
            })
            .simpleAuth(Mqtt3SimpleAuth.builder()
                    .username(params.username())
                    .password(params.password().getBytes())
                    .build())
            .buildAsync();

    // Block until the initial connection succeeds
    client.connect(Mqtt3Connect.builder().keepAlive(60).cleanSession(true).build()).get();

    // Hand every subscribed message to the message service on the I/O executor
    client.publishes(MqttGlobalPublishFilter.SUBSCRIBED, message -> {
        try {
            messageService.consume(message.getTopic().toString(), message.getPayloadAsBytes());
        } catch (Exception e) {
            log.error("", e);
        }
    }, MqttMessageService.getIO_EXECUTOR());
    MqttInitialization.subscribe(client);
    return new PooledObject2(client, params);
}
```

    GenericObjectPoolConfig<Mqtt3AsyncClient> config = new GenericObjectPoolConfig<>();
    config.setBlockWhenExhausted(true);
    config.setMaxWait(Duration.ofSeconds(2));
    config.setMinIdle(properties.getMinIdle());
    config.setMaxIdle(config.getMinIdle());
    config.setMaxTotal(config.getMinIdle());
    config.setTestOnCreate(true);
    config.setTestOnBorrow(true);
    config.setTestWhileIdle(false);
    config.setLifo(false);
    config.setTimeBetweenEvictionRuns(Duration.ofSeconds(-1));
    config.setMinEvictableIdleDuration(Duration.ofSeconds(-1));

    message.setId(IDUtils.getNextId());
    message.setSn(sn);
    byte[] payload = OBJECT_MAPPER.writeValueAsBytes(message);
    TopicCommand command = TopicCommand.get(message.getClass());
    String messageTopic = command.getMessageTopic();
    messageTopic = messageTopic.replace("{sn}", sn);
    Mqtt3AsyncClient client = mqttClientPool.borrow();
    client.publishWith().topic(messageTopic).qos(MqttQos.AT_MOST_ONCE).payload(payload).send();
    mqttClientPool.revert(client);
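The publish path above calls `send()` and returns the client to the pool without waiting for the result. One thing that might be worth ruling out, although nothing in this report confirms it as the cause, is that publishes are submitted faster than the connection can write them, so pending messages pile up in memory. The sketch below caps the number of sends that are still pending, using only `java.util.concurrent`. `BoundedSender`, `trySend` and `available` are hypothetical names, and the limit and timeout values are assumptions to be tuned.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: caps the number of asynchronous sends that are still pending. */
class BoundedSender {
    private final Semaphore permits;

    BoundedSender(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /**
     * Starts the send only if a permit becomes free within the timeout.
     * Returns null when the limit is reached (or the thread is interrupted),
     * so the caller can drop the message or retry later.
     */
    <T> CompletableFuture<T> trySend(Supplier<CompletableFuture<T>> send, long timeoutMs) {
        try {
            if (!permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
                return null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
        try {
            // Give the permit back when the send completes, successfully or not.
            return send.get().whenComplete((result, error) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // the send never started
            throw e;
        }
    }

    /** Number of sends that could be started right now without waiting. */
    int available() {
        return permits.availablePermits();
    }
}
```

With the HiveMQ async client, the supplier could wrap the existing call, for example `sender.trySend(() -> client.publishWith().topic(messageTopic).qos(MqttQos.AT_MOST_ONCE).payload(payload).send(), 100)`, assuming `send()` returns a `CompletableFuture` as it does in the async API. A `null` result then signals that the sender is ahead of the connection.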
