77import org .ldk .batteries .NioPeerHandler ;
88import org .ldk .enums .Currency ;
99import org .ldk .enums .Network ;
10- import org .ldk .impl .bindings ;
1110import org .ldk .structs .*;
1211import org .ldk .util .TwoTuple ;
1312
@@ -202,7 +201,10 @@ private Peer(Object _dummy, byte seed) {
202201 logger = Logger .new_impl ((String arg ) -> System .out .println (seed + ": " + arg ));
203202 fee_estimator = FeeEstimator .new_impl ((confirmation_target -> 253 ));
204203 tx_broadcaster = BroadcasterInterface .new_impl (tx -> {
205- broadcast_set .add (tx );
204+ synchronized (broadcast_set ) {
205+ broadcast_set .add (tx );
206+ broadcast_set .notifyAll ();
207+ }
206208 });
207209 monitors = new HashMap <>();
208210 this .seed = seed ;
@@ -418,12 +420,16 @@ Event[] get_monitor_events(int expected_len) {
418420 if (use_chan_manager_constructor ) {
419421 while (true ) {
420422 synchronized (this .pending_manager_events ) {
421- if (expected_len != 0 && this .pending_manager_events .size () ! = expected_len ) {
423+ if (expected_len != 0 && this .pending_manager_events .size () = = expected_len ) {
422424 break ;
423425 }
424426 }
425- try { Thread .sleep (500 ); } catch (InterruptedException e ) { assert false ; }
426- break ;
427+ if (expected_len == 0 ) {
428+ try { Thread .sleep (500 ); } catch (InterruptedException e ) { assert false ; }
429+ break ;
430+ } else {
431+ Thread .yield ();
432+ }
427433 }
428434 synchronized (this .pending_manager_events ) {
429435 Event [] res = this .pending_manager_events .toArray (new Event [0 ]);
@@ -449,7 +455,11 @@ Event[] get_monitor_events(int expected_len) {
449455 }
450456 }
451457
452- Event [] get_manager_events (int expected_len ) {
458+ Event [] get_manager_events (int expected_len , Peer peer1 , Peer peer2 ) {
459+ assert expected_len != 0 ;
460+ if (!use_nio_peer_handler ) {
461+ maybe_exchange_peer_messages (peer1 , peer2 );
462+ }
453463 Event [] res = new Event [0 ];
454464 if (use_chan_manager_constructor ) {
455465 while (res .length < expected_len ) {
@@ -464,7 +474,15 @@ Event[] get_manager_events(int expected_len) {
464474 }
465475 } else {
466476 ArrayList <Event > l = new ArrayList <Event >();
467- chan_manager .as_EventsProvider ().process_pending_events (EventHandler .new_impl (l ::add ));
477+ while (l .size () < expected_len ) {
478+ Thread .yield ();
479+ if (use_nio_peer_handler ) {
480+ peer1 .nio_peer_handler .check_events ();
481+ peer2 .nio_peer_handler .check_events ();
482+ }
483+ chan_manager .as_EventsProvider ().process_pending_events (EventHandler .new_impl (l ::add ));
484+ assert l .size () == expected_len || l .size () == 0 ; // We don't handle partial results
485+ }
468486 return l .toArray (new Event [0 ]);
469487 }
470488 assert res .length == expected_len ;
@@ -498,12 +516,9 @@ static class DescriptorHolder { SocketDescriptor val; }
498516 }
499517 }
500518 });
501- void wait_events_processed (Peer peer1 , Peer peer2 ) {
502- if (use_nio_peer_handler ) {
503- peer1 .nio_peer_handler .check_events ();
504- peer2 .nio_peer_handler .check_events ();
505- try { Thread .sleep (500 ); } catch (InterruptedException e ) { assert false ; }
506- } else {
519+
520+ void maybe_exchange_peer_messages (Peer peer1 , Peer peer2 ) {
521+ if (!use_nio_peer_handler ) {
507522 synchronized (runqueue ) {
508523 ran = false ;
509524 }
@@ -520,6 +535,9 @@ void wait_events_processed(Peer peer1, Peer peer2) {
520535 try { runqueue .wait (); } catch (InterruptedException e ) { assert false ; }
521536 }
522537 }
538+ } else if (!use_chan_manager_constructor ) {
539+ peer1 .nio_peer_handler .check_events ();
540+ peer2 .nio_peer_handler .check_events ();
523541 }
524542 }
525543 void do_read_event (PeerManager pm , SocketDescriptor descriptor , byte [] data ) {
@@ -540,6 +558,9 @@ void connect_peers(final Peer peer1, final Peer peer2) {
540558 try {
541559 peer1 .nio_peer_handler .connect (peer2 .chan_manager .get_our_node_id (), new InetSocketAddress ("127.0.0.1" , peer2 .nio_port ), 100 );
542560 } catch (IOException e ) { assert false ; }
561+ while (peer1 .peer_manager .get_peer_node_ids ().length == 0 || peer2 .peer_manager .get_peer_node_ids ().length == 0 ) {
562+ Thread .yield ();
563+ }
543564 } else {
544565 DescriptorHolder descriptor1 = new DescriptorHolder ();
545566 DescriptorHolder descriptor1ref = descriptor1 ;
@@ -573,6 +594,8 @@ public long send_data(byte[] data, boolean resume_read) {
573594 Result_NonePeerHandleErrorZ inbound_conn_res = peer2 .peer_manager .new_inbound_connection (descriptor2 );
574595 assert inbound_conn_res instanceof Result_NonePeerHandleErrorZ .Result_NonePeerHandleErrorZ_OK ;
575596 do_read_event (peer2 .peer_manager , descriptor2 , ((Result_CVec_u8ZPeerHandleErrorZ .Result_CVec_u8ZPeerHandleErrorZ_OK ) conn_res ).res );
597+
598+ maybe_exchange_peer_messages (peer1 , peer2 );
576599 }
577600 }
578601
@@ -581,13 +604,11 @@ TestState do_test_message_handler() throws InterruptedException {
581604 Peer peer2 = new Peer ((byte ) 2 );
582605
583606 connect_peers (peer1 , peer2 );
584- wait_events_processed (peer1 , peer2 );
585607
586608 Result_NoneAPIErrorZ cc_res = peer1 .chan_manager .create_channel (peer2 .node_id , 10000 , 1000 , 42 , null );
587609 assert cc_res instanceof Result_NoneAPIErrorZ .Result_NoneAPIErrorZ_OK ;
588- wait_events_processed (peer1 , peer2 );
589610
590- Event [] events = peer1 .get_manager_events (1 );
611+ Event [] events = peer1 .get_manager_events (1 , peer1 , peer2 );
591612 assert events [0 ] instanceof Event .FundingGenerationReady ;
592613 assert ((Event .FundingGenerationReady ) events [0 ]).channel_value_satoshis == 10000 ;
593614 assert ((Event .FundingGenerationReady ) events [0 ]).user_channel_id == 42 ;
@@ -604,7 +625,13 @@ TestState do_test_message_handler() throws InterruptedException {
604625 funding .addOutput (Coin .SATOSHI .multiply (10000 ), new Script (funding_spk ));
605626 Result_NoneAPIErrorZ funding_res = peer1 .chan_manager .funding_transaction_generated (chan_id , funding .bitcoinSerialize ());
606627 assert funding_res instanceof Result_NoneAPIErrorZ .Result_NoneAPIErrorZ_OK ;
607- wait_events_processed (peer1 , peer2 );
628+
629+ maybe_exchange_peer_messages (peer1 , peer2 );
630+ synchronized (peer1 .broadcast_set ) {
631+ while (peer1 .broadcast_set .size () != 1 ) {
632+ peer1 .broadcast_set .wait ();
633+ }
634+ }
608635
609636 assert peer1 .broadcast_set .size () == 1 ;
610637 assert Arrays .equals (peer1 .broadcast_set .get (0 ), funding .bitcoinSerialize ());
@@ -619,7 +646,9 @@ TestState do_test_message_handler() throws InterruptedException {
619646 peer1 .connect_block (b , height , 0 );
620647 peer2 .connect_block (b , height , 0 );
621648 }
622- wait_events_processed (peer1 , peer2 );
649+
650+ maybe_exchange_peer_messages (peer1 , peer2 );
651+ while (peer1 .chan_manager .list_usable_channels ().length != 1 || peer2 .chan_manager .list_usable_channels ().length != 1 )
623652
624653 peer1 .chan_manager .list_channels ();
625654 ChannelDetails [] peer1_chans = peer1 .chan_manager .list_usable_channels ();
@@ -664,7 +693,6 @@ TestState do_test_message_handler() throws InterruptedException {
664693
665694 Result_NonePaymentSendFailureZ payment_res = peer1 .chan_manager .send_payment (route , payment_hash , payment_secret );
666695 assert payment_res instanceof Result_NonePaymentSendFailureZ .Result_NonePaymentSendFailureZ_OK ;
667- wait_events_processed (peer1 , peer2 );
668696
669697 RouteHop [][] hops = new RouteHop [1 ][1 ];
670698 byte [] hop_pubkey = new byte [33 ];
@@ -675,21 +703,13 @@ TestState do_test_message_handler() throws InterruptedException {
675703 payment_res = peer1 .chan_manager .send_payment (r2 , payment_hash , payment_secret );
676704 assert payment_res instanceof Result_NonePaymentSendFailureZ .Result_NonePaymentSendFailureZ_Err ;
677705
678- if (!use_chan_manager_constructor ) {
679- peer1 .get_monitor_events (0 );
680- peer2 .get_monitor_events (0 );
681- } else {
682- // The events are combined across manager + monitors but peer1 still has no events
683- }
684-
685706 if (reload_peers ) {
686- if (use_nio_peer_handler ) {
687- peer1 .nio_peer_handler .interrupt ();
688- peer2 .nio_peer_handler .interrupt ();
689- }
690707 if (use_chan_manager_constructor ) {
691708 peer1 .constructor .interrupt ();
692709 peer2 .constructor .interrupt ();
710+ } else if (use_nio_peer_handler ) {
711+ peer1 .nio_peer_handler .interrupt ();
712+ peer2 .nio_peer_handler .interrupt ();
693713 }
694714 WeakReference <Peer > op1 = new WeakReference <Peer >(peer1 );
695715 peer1 = new Peer (peer1 );
@@ -713,7 +733,7 @@ public TestState(WeakReference<Peer> ref_block, Peer peer1, Peer peer2, Sha256Ha
713733 this .best_blockhash = best_blockhash ;
714734 }
715735 }
716- void do_test_message_handler_b (TestState state ) {
736+ void do_test_message_handler_b (TestState state ) throws InterruptedException {
717737 GcCheck obj = new GcCheck ();
718738 if (state .ref_block != null ) {
719739 // Ensure the original peers get freed before we move on. Note that we have to be in a different function
@@ -724,36 +744,52 @@ void do_test_message_handler_b(TestState state) {
724744 }
725745 connect_peers (state .peer1 , state .peer2 );
726746 }
727- wait_events_processed (state .peer1 , state .peer2 );
728747
729- Event [] events = state .peer2 .get_manager_events (1 );
748+ Event [] events = state .peer2 .get_manager_events (1 , state . peer1 , state . peer2 );
730749 assert events [0 ] instanceof Event .PendingHTLCsForwardable ;
731750 state .peer2 .chan_manager .process_pending_htlc_forwards ();
732751
733- events = state .peer2 .get_manager_events (1 );
752+ events = state .peer2 .get_manager_events (1 , state . peer1 , state . peer2 );
734753 assert events [0 ] instanceof Event .PaymentReceived ;
735754 byte [] payment_preimage = ((Event .PaymentReceived )events [0 ]).payment_preimage ;
736755 assert !Arrays .equals (payment_preimage , new byte [32 ]);
737756 state .peer2 .chan_manager .claim_funds (payment_preimage );
738- wait_events_processed (state .peer1 , state .peer2 );
739757
740- events = state .peer1 .get_manager_events (1 );
758+ events = state .peer1 .get_manager_events (1 , state . peer1 , state . peer2 );
741759 assert events [0 ] instanceof Event .PaymentSent ;
742760 assert Arrays .equals (((Event .PaymentSent ) events [0 ]).payment_preimage , payment_preimage );
743- wait_events_processed (state .peer1 , state .peer2 );
761+
762+ if (use_nio_peer_handler ) {
763+ // We receive PaymentSent immediately upon receipt of the payment preimage, but we expect to not have an
764+ // HTLC transaction to broadcast below, which requires a bit more time to fully complete the
765+ // commitment-transaction-update dance between both peers.
766+ Thread .sleep (100 );
767+ }
744768
745769 ChannelDetails [] peer1_chans = state .peer1 .chan_manager .list_channels ();
746770
747771 if (nice_close ) {
748772 Result_NoneAPIErrorZ close_res = state .peer1 .chan_manager .close_channel (peer1_chans [0 ].get_channel_id ());
749773 assert close_res instanceof Result_NoneAPIErrorZ .Result_NoneAPIErrorZ_OK ;
750- wait_events_processed (state .peer1 , state .peer2 );
774+ maybe_exchange_peer_messages (state .peer1 , state .peer2 );
775+ synchronized (state .peer1 .broadcast_set ) {
776+ while (state .peer1 .broadcast_set .size () != 1 ) state .peer1 .broadcast_set .wait ();
777+ }
778+ synchronized (state .peer2 .broadcast_set ) {
779+ while (state .peer2 .broadcast_set .size () != 1 ) state .peer2 .broadcast_set .wait ();
780+ }
751781
752782 assert state .peer1 .broadcast_set .size () == 1 ;
753783 assert state .peer2 .broadcast_set .size () == 1 ;
754784 } else {
755785 state .peer1 .chan_manager .force_close_all_channels ();
756- wait_events_processed (state .peer1 , state .peer2 );
786+ maybe_exchange_peer_messages (state .peer1 , state .peer2 );
787+ synchronized (state .peer1 .broadcast_set ) {
788+ while (state .peer1 .broadcast_set .size () != 1 ) state .peer1 .broadcast_set .wait ();
789+ }
790+ synchronized (state .peer2 .broadcast_set ) {
791+ while (state .peer2 .broadcast_set .size () != 1 ) state .peer2 .broadcast_set .wait ();
792+ }
757793
758794 assert state .peer1 .broadcast_set .size () == 1 ;
759795 assert state .peer2 .broadcast_set .size () == 1 ;
@@ -794,9 +830,8 @@ void do_test_message_handler_b(TestState state) {
794830
795831 if (use_nio_peer_handler ) {
796832 state .peer1 .peer_manager .disconnect_by_node_id (state .peer2 .chan_manager .get_our_node_id (), false );
797- wait_events_processed (state .peer1 , state .peer2 );
798- assert state .peer1 .peer_manager .get_peer_node_ids ().length == 0 ;
799- assert state .peer2 .peer_manager .get_peer_node_ids ().length == 0 ;
833+ while (state .peer1 .peer_manager .get_peer_node_ids ().length != 0 ) Thread .yield ();
834+ while (state .peer2 .peer_manager .get_peer_node_ids ().length != 0 ) Thread .yield ();
800835 state .peer1 .nio_peer_handler .interrupt ();
801836 state .peer2 .nio_peer_handler .interrupt ();
802837 }
0 commit comments