2222import io .netty .channel .EventLoopGroup ;
2323import io .netty .channel .nio .NioEventLoopGroup ;
2424import java .util .Collections ;
25+ import java .util .concurrent .TimeUnit ;
2526import lombok .Cleanup ;
2627import org .apache .bookkeeper .mledger .ManagedLedger ;
2728import org .apache .bookkeeper .mledger .ManagedLedgerFactory ;
@@ -55,6 +56,13 @@ public void testCheckSequenceId() throws Exception {
5556 Consumer <String > consumer = client .newConsumer (Schema .STRING ).topic (topicName ).subscriptionName ("sub" )
5657 .subscribe ();
5758
59+ // Create a producer
60+ Producer <String > producer = client .newProducer (Schema .STRING ).topic (topicName ).create ();
61+ // Move the fence timing to after the first message is successfully written
62+ // The current ledger is not empty, the Broker recovery will not take the abnormal path of
63+ // "deleting empty ledger + unable to find old ledger"
64+ producer .send ("Hello-0" );
65+
5866 // Fence the topic by opening the ManagedLedger for the topic outside the Pulsar broker. This will cause the
5967 // broker to fail subsequent send operation and it will trigger a recover
6068 EventLoopGroup eventLoopGroup = new NioEventLoopGroup (1 );
@@ -66,15 +74,12 @@ public void testCheckSequenceId() throws Exception {
6674 ml .close ();
6775 clientFactory .close ();
6876
69- // Create a producer
70- Producer <String > producer = client .newProducer (Schema .STRING ).topic (topicName ).create ();
71-
72- for (int i = 0 ; i < N ; i ++) {
77+ for (int i = 1 ; i < N ; i ++) {
7378 producer .send ("Hello-" + i );
7479 }
7580
7681 for (int i = 0 ; i < N ; i ++) {
77- Message <String > msg = consumer .receive ();
82+ Message <String > msg = consumer .receive (10 , TimeUnit . SECONDS );
7883 assertEquals (msg .getValue (), "Hello-" + i );
7984 assertEquals (msg .getSequenceId (), i );
8085 consumer .acknowledge (msg );
0 commit comments