2323import io .netty .channel .nio .NioEventLoopGroup ;
2424import io .opentelemetry .api .OpenTelemetry ;
2525import java .util .Collections ;
26+ import java .util .concurrent .TimeUnit ;
2627import lombok .Cleanup ;
2728import org .apache .bookkeeper .mledger .ManagedLedger ;
2829import org .apache .bookkeeper .mledger .ManagedLedgerFactory ;
@@ -56,6 +57,13 @@ public void testCheckSequenceId() throws Exception {
5657 Consumer <String > consumer = client .newConsumer (Schema .STRING ).topic (topicName ).subscriptionName ("sub" )
5758 .subscribe ();
5859
60+ // Create a producer
61+ Producer <String > producer = client .newProducer (Schema .STRING ).topic (topicName ).create ();
62+ // Move the fence timing to after the first message is successfully written
63+ // The current ledger is not empty, the Broker recovery will not take the abnormal path of
64+ // "deleting empty ledger + unable to find old ledger"
65+ producer .send ("Hello-0" );
66+
5967 // Fence the topic by opening the ManagedLedger for the topic outside the Pulsar broker. This will cause the
6068 // broker to fail subsequent send operation and it will trigger a recover
6169 EventLoopGroup eventLoopGroup = new NioEventLoopGroup (1 );
@@ -67,15 +75,12 @@ public void testCheckSequenceId() throws Exception {
6775 ml .close ();
6876 clientFactory .close ();
6977
70- // Create a producer
71- Producer <String > producer = client .newProducer (Schema .STRING ).topic (topicName ).create ();
72-
73- for (int i = 0 ; i < num ; i ++) {
78+ for (int i = 1 ; i < num ; i ++) {
7479 producer .send ("Hello-" + i );
7580 }
7681
7782 for (int i = 0 ; i < num ; i ++) {
78- Message <String > msg = consumer .receive ();
83+ Message <String > msg = consumer .receive (10 , TimeUnit . SECONDS );
7984 assertEquals (msg .getValue (), "Hello-" + i );
8085 assertEquals (msg .getSequenceId (), i );
8186 consumer .acknowledge (msg );
0 commit comments