Skip to content

Commit 5664272

Browse files
lhotarinodece
authored andcommitted
[fix] Fix mixed lookup/partition metadata requests causing reliability issues and incorrect responses (apache#24832)
(cherry picked from commit 4457b08)
1 parent 2468fa9 commit 5664272

File tree

4 files changed

+26
-5
lines changed

4 files changed

+26
-5
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -494,8 +494,12 @@ private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName,
494494
}
495495

496496
@Override
497-
protected void handleLookup(CommandLookupTopic lookup) {
497+
protected void handleLookup(CommandLookupTopic lookupParam) {
498498
checkArgument(state == State.Connected);
499+
500+
// Make a copy since the command is handled asynchronously
501+
CommandLookupTopic lookup = new CommandLookupTopic().copyFrom(lookupParam);
502+
499503
final long requestId = lookup.getRequestId();
500504
final boolean authoritative = lookup.isAuthoritative();
501505

@@ -575,8 +579,13 @@ private void writeAndFlush(ByteBuf cmd) {
575579
}
576580

577581
@Override
578-
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
582+
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadataParam) {
579583
checkArgument(state == State.Connected);
584+
585+
// Make a copy since the command is handled asynchronously
586+
CommandPartitionedTopicMetadata partitionMetadata =
587+
new CommandPartitionedTopicMetadata().copyFrom(partitionMetadataParam);
588+
580589
final long requestId = partitionMetadata.getRequestId();
581590
if (log.isDebugEnabled()) {
582591
log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(),
@@ -2958,8 +2967,12 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
29582967
}
29592968

29602969
@Override
2961-
protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) {
2970+
protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicListParam) {
29622971
checkArgument(state == State.Connected);
2972+
2973+
// make a copy since command is handled asynchronously
2974+
CommandWatchTopicList commandWatchTopicList = new CommandWatchTopicList().copyFrom(commandWatchTopicListParam);
2975+
29632976
final long requestId = commandWatchTopicList.getRequestId();
29642977
final long watcherId = commandWatchTopicList.getWatcherId();
29652978
final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace());

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1166,7 +1166,7 @@ protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess c
11661166
CompletableFuture<CommandWatchTopicListSuccess> requestFuture =
11671167
(CompletableFuture<CommandWatchTopicListSuccess>) pendingRequests.remove(requestId);
11681168
if (requestFuture != null) {
1169-
requestFuture.complete(commandWatchTopicListSuccess);
1169+
requestFuture.complete(new CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess));
11701170
} else {
11711171
duplicatedResponseCounter.incrementAndGet();
11721172
log.warn("{} Received unknown request id from server: {}",

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import static org.assertj.core.api.Assertions.assertThat;
2122
import static org.mockito.Mockito.any;
2223
import static org.mockito.Mockito.mock;
2324
import static org.mockito.Mockito.verify;
@@ -315,7 +316,9 @@ public void testCreateWatcher() {
315316
.setRequestId(7)
316317
.setWatcherId(5).setTopicsHash("f00");
317318
cnx.handleCommandWatchTopicListSuccess(success);
318-
assertEquals(result.getNow(null), success);
319+
assertThat(result.getNow(null))
320+
.usingRecursiveComparison()
321+
.comparingOnlyFields("requestId", "watcherId", "topicsHash");
319322
});
320323
}
321324

pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
482482
}
483483
} finally {
484484
buffer.release();
485+
// Clear the fields in cmd to release memory.
486+
// The clear() call below also helps prevent misuse of holding references to command objects after
487+
// handle* methods complete, as per the class javadoc requirement.
488+
// While this doesn't completely prevent such misuse, it makes tests more likely to catch violations.
489+
cmd.clear();
485490
}
486491
}
487492

0 commit comments

Comments
 (0)