Skip to content

Commit 6f9ec28

Browse files
Dream95lhotari
authored andcommitted
[fix][broker] Flaky-test: TopicTransactionBufferTest.testMessagePublishInOrder (apache#24826)
Signed-off-by: Dream95 <zhou_8621@163.com> Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com> (cherry picked from commit 8828734) (cherry picked from commit 74c366c)
1 parent 051f11e commit 6f9ec28

File tree

2 files changed

+6
-17
lines changed

2 files changed

+6
-17
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,11 @@ public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
229229
return CompletableFuture.completedFuture(null);
230230
}
231231

232+
@VisibleForTesting
233+
public void setPublishFuture(CompletableFuture<Position> publishFuture) {
234+
this.publishFuture = publishFuture;
235+
}
236+
232237
@VisibleForTesting
233238
public CompletableFuture<Position> getPublishFuture() {
234239
return publishFuture;
@@ -295,7 +300,7 @@ public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceI
295300
"Transaction Buffer recover failed, the current state is: " + getState()));
296301
}
297302
}).whenComplete(((position, throwable) -> buffer.release()));
298-
publishFuture = future;
303+
setPublishFuture(future);
299304
return future;
300305
}
301306

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,36 +18,20 @@
1818
*/
1919
package org.apache.pulsar.broker.transaction.buffer.utils;
2020

21-
import java.util.concurrent.CompletableFuture;
2221
import lombok.Setter;
23-
import org.apache.bookkeeper.mledger.Position;
2422
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
2523
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
2624

2725
public class TransactionBufferTestImpl extends TopicTransactionBuffer {
28-
@Setter
29-
public CompletableFuture<Void> transactionBufferFuture = null;
3026
@Setter
3127
public State state = null;
32-
@Setter
33-
public CompletableFuture<Position> publishFuture = null;
3428

3529
public TransactionBufferTestImpl(PersistentTopic topic) {
3630
super(topic);
3731
}
3832

39-
@Override
40-
public CompletableFuture<Void> getTransactionBufferFuture() {
41-
return transactionBufferFuture == null ? super.getTransactionBufferFuture() : transactionBufferFuture;
42-
}
43-
4433
@Override
4534
public State getState() {
4635
return state == null ? super.getState() : state;
4736
}
48-
49-
@Override
50-
public CompletableFuture<Position> getPublishFuture() {
51-
return publishFuture == null ? super.getPublishFuture() : publishFuture;
52-
}
5337
}

0 commit comments

Comments
 (0)