Skip to content

Commit 9cbf830

Browse files
committed
Free memory for MqttPublishes.receive on timeout/interrupt
1 parent c070dec commit 9cbf830

File tree

1 file changed

+53
-39
lines changed

1 file changed

+53
-39
lines changed

src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java

Lines changed: 53 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.hivemq.client.internal.mqtt.util.MqttChecks;
3131
import com.hivemq.client.internal.util.AsyncRuntimeException;
3232
import com.hivemq.client.internal.util.Checks;
33+
import com.hivemq.client.internal.util.collections.NodeList;
3334
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
3435
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
3536
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException;
@@ -51,7 +52,6 @@
5152
import org.jetbrains.annotations.Nullable;
5253
import org.reactivestreams.Subscription;
5354

54-
import java.util.LinkedList;
5555
import java.util.Optional;
5656
import java.util.concurrent.CancellationException;
5757
import java.util.concurrent.CountDownLatch;
@@ -219,7 +219,7 @@ public void disconnect(final @NotNull Mqtt5Disconnect disconnect) {
219219
private static class MqttPublishes implements Mqtt5Publishes, FlowableSubscriber<Mqtt5Publish> {
220220

221221
private final @NotNull AtomicReference<@Nullable Subscription> subscription = new AtomicReference<>();
222-
private final @NotNull LinkedList<Entry> entries = new LinkedList<>();
222+
private final @NotNull NodeList<Entry> entries = new NodeList<>();
223223
private @Nullable Mqtt5Publish queuedPublish;
224224
private @Nullable Throwable error;
225225

@@ -248,16 +248,15 @@ public void onNext(final @NotNull Mqtt5Publish publish) {
248248
if (error != null) {
249249
return;
250250
}
251-
Entry entry;
252-
while ((entry = entries.poll()) != null) {
253-
final boolean success = entry.result.compareAndSet(null, publish);
251+
final Entry entry = entries.getFirst();
252+
if (entry == null) {
253+
queuedPublish = publish;
254+
} else {
255+
entries.remove(entry);
256+
entry.result = publish;
254257
entry.latch.countDown();
255-
if (success) {
256-
request();
257-
return;
258-
}
258+
request();
259259
}
260-
queuedPublish = publish;
261260
}
262261
}
263262

@@ -273,9 +272,9 @@ public void onError(final @NotNull Throwable t) {
273272
return;
274273
}
275274
error = t;
276-
Entry entry;
277-
while ((entry = entries.poll()) != null) {
278-
entry.result.set(t);
275+
for (Entry entry = entries.getFirst(); entry != null; entry = entry.getNext()) {
276+
entries.remove(entry);
277+
entry.result = t;
279278
entry.latch.countDown();
280279
}
281280
}
@@ -293,26 +292,27 @@ public void onError(final @NotNull Throwable t) {
293292
return publish;
294293
}
295294
entry = new Entry();
296-
entries.offer(entry);
295+
entries.add(entry);
297296
}
298297

299-
InterruptedException interruptedException = null;
298+
Object result;
300299
try {
301300
entry.latch.await();
301+
result = entry.result;
302+
assert (result instanceof Mqtt5Publish) || (result instanceof Throwable);
302303
} catch (final InterruptedException e) {
303-
interruptedException = e;
304+
result = tryCancel(entry, e);
304305
}
305-
final Object result = entry.result.getAndSet(Entry.CANCELLED);
306306
if (result instanceof Mqtt5Publish) {
307307
return (Mqtt5Publish) result;
308308
}
309309
if (result instanceof Throwable) {
310+
if (result instanceof InterruptedException) {
311+
throw (InterruptedException) result;
312+
}
310313
throw handleError((Throwable) result);
311314
}
312-
if (interruptedException != null) {
313-
throw interruptedException;
314-
}
315-
throw new InterruptedException();
315+
throw new IllegalStateException("This must not happen and is a bug.");
316316
}
317317

318318
@Override
@@ -334,25 +334,29 @@ public void onError(final @NotNull Throwable t) {
334334
return Optional.of(publish);
335335
}
336336
entry = new Entry();
337-
entries.offer(entry);
337+
entries.add(entry);
338338
}
339339

340-
InterruptedException interruptedException = null;
340+
Object result;
341341
try {
342-
entry.latch.await(timeout, timeUnit);
342+
if (entry.latch.await(timeout, timeUnit)) {
343+
result = entry.result;
344+
assert (result instanceof Mqtt5Publish) || (result instanceof Throwable);
345+
} else {
346+
result = tryCancel(entry, null);
347+
}
343348
} catch (final InterruptedException e) {
344-
interruptedException = e;
349+
result = tryCancel(entry, e);
345350
}
346-
final Object result = entry.result.getAndSet(Entry.CANCELLED);
347351
if (result instanceof Mqtt5Publish) {
348352
return Optional.of((Mqtt5Publish) result);
349353
}
350354
if (result instanceof Throwable) {
355+
if (result instanceof InterruptedException) {
356+
throw (InterruptedException) result;
357+
}
351358
throw handleError((Throwable) result);
352359
}
353-
if (interruptedException != null) {
354-
throw interruptedException;
355-
}
356360
return Optional.empty();
357361
}
358362

@@ -369,13 +373,25 @@ public void onError(final @NotNull Throwable t) {
369373
}
370374

371375
private @Nullable Mqtt5Publish receiveNowUnsafe() {
376+
final Mqtt5Publish queuedPublish = this.queuedPublish;
372377
if (queuedPublish != null) {
373-
final Mqtt5Publish queuedPublish = this.queuedPublish;
374378
this.queuedPublish = null;
375379
request();
376-
return queuedPublish;
377380
}
378-
return null;
381+
return queuedPublish;
382+
}
383+
384+
private @Nullable Object tryCancel(final @NotNull Entry entry, final @Nullable Object resultOnCancel) {
385+
synchronized (entries) {
386+
final Object result = entry.result;
387+
if (result == null) {
388+
entries.remove(entry);
389+
return resultOnCancel;
390+
} else {
391+
assert (result instanceof Mqtt5Publish) || (result instanceof Throwable);
392+
return result;
393+
}
394+
}
379395
}
380396

381397
@Override
@@ -389,9 +405,9 @@ public void close() {
389405
return;
390406
}
391407
error = new CancellationException();
392-
Entry entry;
393-
while ((entry = entries.poll()) != null) {
394-
entry.result.set(error);
408+
for (Entry entry = entries.getFirst(); entry != null; entry = entry.getNext()) {
409+
entries.remove(entry);
410+
entry.result = error;
395411
entry.latch.countDown();
396412
}
397413
}
@@ -404,12 +420,10 @@ public void close() {
404420
throw new RuntimeException(t);
405421
}
406422

407-
private static class Entry {
408-
409-
static final @NotNull Object CANCELLED = new Object();
423+
private static class Entry extends NodeList.Node<Entry> {
410424

411425
final @NotNull CountDownLatch latch = new CountDownLatch(1);
412-
final @NotNull AtomicReference<@Nullable Object> result = new AtomicReference<>();
426+
@Nullable Object result = null;
413427
}
414428
}
415429
}

0 commit comments

Comments
 (0)