@@ -34,7 +34,7 @@ public static class InvalidSerializedDataException extends Exception {}
3434 */
3535 public final TwoTuple <ChannelMonitor , byte []>[] channel_monitors ;
3636
37- private final Watch chain_watch ;
37+ private final ChainMonitor chain_monitor ;
3838
3939 /**
4040 * Deserializes a channel manager and a set of channel monitors from the given serialized copies and interface implementations
@@ -44,7 +44,7 @@ public static class InvalidSerializedDataException extends Exception {}
4444 * outputs will be loaded when chain_sync_completed is called.
4545 */
4646 public ChannelManagerConstructor (byte [] channel_manager_serialized , byte [][] channel_monitors_serialized ,
47- KeysInterface keys_interface , FeeEstimator fee_estimator , Watch chain_watch , @ Nullable Filter filter ,
47+ KeysInterface keys_interface , FeeEstimator fee_estimator , ChainMonitor chain_monitor , @ Nullable Filter filter ,
4848 BroadcasterInterface tx_broadcaster , Logger logger ) throws InvalidSerializedDataException {
4949 final ChannelMonitor [] monitors = new ChannelMonitor [channel_monitors_serialized .length ];
5050 this .channel_monitors = new TwoTuple [monitors .length ];
@@ -57,14 +57,14 @@ public ChannelManagerConstructor(byte[] channel_manager_serialized, byte[][] cha
5757 this .channel_monitors [i ] = new TwoTuple <>(monitors [i ], ((Result_C2Tuple_BlockHashChannelMonitorZDecodeErrorZ .Result_C2Tuple_BlockHashChannelMonitorZDecodeErrorZ_OK )res ).res .a );
5858 }
5959 Result_C2Tuple_BlockHashChannelManagerZDecodeErrorZ res =
60- UtilMethods .constructor_BlockHashChannelManagerZ_read (channel_manager_serialized , keys_interface , fee_estimator , chain_watch , tx_broadcaster ,
60+ UtilMethods .constructor_BlockHashChannelManagerZ_read (channel_manager_serialized , keys_interface , fee_estimator , chain_monitor . as_Watch () , tx_broadcaster ,
6161 logger , UserConfig .constructor_default (), monitors );
6262 if (res instanceof Result_C2Tuple_BlockHashChannelManagerZDecodeErrorZ .Result_C2Tuple_BlockHashChannelManagerZDecodeErrorZ_Err ) {
6363 throw new InvalidSerializedDataException ();
6464 }
6565 this .channel_manager = ((Result_C2Tuple_BlockHashChannelManagerZDecodeErrorZ .Result_C2Tuple_BlockHashChannelManagerZDecodeErrorZ_OK )res ).res .b ;
6666 this .channel_manager_latest_block_hash = ((Result_C2Tuple_BlockHashChannelManagerZDecodeErrorZ .Result_C2Tuple_BlockHashChannelManagerZDecodeErrorZ_OK )res ).res .a ;
67- this .chain_watch = chain_watch ;
67+ this .chain_monitor = chain_monitor ;
6868 if (filter != null ) {
6969 for (ChannelMonitor monitor : monitors ) {
7070 monitor .load_outputs_to_watch (filter );
@@ -76,21 +76,74 @@ public ChannelManagerConstructor(byte[] channel_manager_serialized, byte[][] cha
7676 * Constructs a channel manager from the given interface implementations
7777 */
7878 public ChannelManagerConstructor (LDKNetwork network , UserConfig config , byte [] current_blockchain_tip_hash , int current_blockchain_tip_height ,
79- KeysInterface keys_interface , FeeEstimator fee_estimator , Watch chain_watch ,
79+ KeysInterface keys_interface , FeeEstimator fee_estimator , ChainMonitor chain_monitor ,
8080 BroadcasterInterface tx_broadcaster , Logger logger ) throws InvalidSerializedDataException {
8181 channel_monitors = new TwoTuple [0 ];
8282 channel_manager_latest_block_hash = null ;
83- this .chain_watch = chain_watch ;
84- channel_manager = ChannelManager .constructor_new (fee_estimator , chain_watch , tx_broadcaster , logger , keys_interface , config , network , current_blockchain_tip_hash , current_blockchain_tip_height );
83+ this .chain_monitor = chain_monitor ;
84+ channel_manager = ChannelManager .constructor_new (fee_estimator , chain_monitor . as_Watch () , tx_broadcaster , logger , keys_interface , config , network , current_blockchain_tip_hash , current_blockchain_tip_height );
8585 }
8686
87+ /**
88+ * Abstract interface which should handle Events and persist the ChannelManager. When you call chain_sync_completed
89+ * a background thread is started which will automatically call these methods for you when events occur.
90+ */
91+ public interface ChannelManagerPersister {
92+ void handle_events (Event [] events );
93+ void persist_manager (byte [] channel_manager_bytes );
94+ }
95+
96+ Thread persister_thread = null ;
97+ volatile boolean shutdown = false ;
98+
8799 /**
88100 * Utility which adds all of the deserialized ChannelMonitors to the chain watch so that further updates from the
89101 * ChannelManager are processed as normal.
102+ *
103+ * This also spawns a background thread which will call the appropriate methods on the provided
104+ * ChannelManagerPersister as required.
90105 */
91- public void chain_sync_completed () {
106+ public void chain_sync_completed (ChannelManagerPersister persister ) {
107+ if (persister_thread != null ) { return ; }
92108 for (TwoTuple <ChannelMonitor , byte []> monitor : channel_monitors ) {
93- this .chain_watch .watch_channel (monitor .a .get_funding_txo ().a , monitor .a );
109+ this .chain_monitor . as_Watch () .watch_channel (monitor .a .get_funding_txo ().a , monitor .a );
94110 }
111+ persister_thread = new Thread (() -> {
112+ long lastTimerTick = System .currentTimeMillis ();
113+ while (true ) {
114+ boolean need_persist = this .channel_manager .await_persistable_update_timeout (1 );
115+ Event [] events = this .channel_manager .as_EventsProvider ().get_and_clear_pending_events ();
116+ if (events .length != 0 ) {
117+ persister .handle_events (events );
118+ need_persist = true ;
119+ }
120+ events = this .chain_monitor .as_EventsProvider ().get_and_clear_pending_events ();
121+ if (events .length != 0 ) {
122+ persister .handle_events (events );
123+ need_persist = true ;
124+ }
125+ if (need_persist ) {
126+ persister .persist_manager (this .channel_manager .write ());
127+ }
128+ if (shutdown ) {
129+ return ;
130+ }
131+ if (lastTimerTick < System .currentTimeMillis () - 60 * 1000 ) {
132+ this .channel_manager .timer_chan_freshness_every_min ();
133+ lastTimerTick = System .currentTimeMillis ();
134+ }
135+ }
136+ }, "NioPeerHandler NIO Thread" );
137+ persister_thread .start ();
138+ }
139+
140+ /**
141+ * Interrupt the background thread, stopping the background handling of
142+ */
143+ public void interrupt () {
144+ shutdown = true ;
145+ try {
146+ persister_thread .join ();
147+ } catch (InterruptedException ignored ) { }
95148 }
96149}
0 commit comments