@@ -22,9 +22,6 @@ type Manager struct {
2222 // activeReservations contains all the active reservationsFSMs.
2323 activeReservations map [ID ]* FSM
2424
25- // hasL402 is true if the client has a valid L402.
26- hasL402 bool
27-
2825 runCtx context.Context
2926
3027 sync.Mutex
@@ -59,22 +56,15 @@ func (m *Manager) Run(ctx context.Context, height int32) error {
5956 return err
6057 }
6158
62- reservationResChan := make (
63- chan * reservationrpc.ServerReservationNotification ,
64- )
65-
66- err = m .RegisterReservationNotifications (reservationResChan )
67- if err != nil {
68- return err
69- }
59+ ntfnChan := m .cfg .NotificationManager .SubscribeReservations (ctx )
7060
7161 for {
7262 select {
7363 case height := <- newBlockChan :
7464 log .Debugf ("Received block %v" , height )
7565 currentHeight = height
7666
77- case reservationRes := <- reservationResChan :
67+ case reservationRes := <- ntfnChan :
7868 log .Debugf ("Received reservation %x" ,
7969 reservationRes .ReservationId )
8070 _ , err := m .newReservation (
@@ -157,101 +147,6 @@ func (m *Manager) newReservation(ctx context.Context, currentHeight uint32,
157147 return reservationFSM , nil
158148}
159149
160- // fetchL402 fetches the L402 from the server. This method will keep on
161- // retrying until it gets a valid response.
162- func (m * Manager ) fetchL402 (ctx context.Context ) {
163- // Add a 0 timer so that we initially fetch the L402 immediately.
164- timer := time .NewTimer (0 )
165- for {
166- select {
167- case <- ctx .Done ():
168- return
169-
170- case <- timer .C :
171- err := m .cfg .FetchL402 (ctx )
172- if err != nil {
173- log .Warnf ("Error fetching L402: %v" , err )
174- timer .Reset (time .Second * 10 )
175- continue
176- }
177- m .hasL402 = true
178- return
179- }
180- }
181- }
182-
183- // RegisterReservationNotifications registers a new reservation notification
184- // stream.
185- func (m * Manager ) RegisterReservationNotifications (
186- reservationChan chan * reservationrpc.ServerReservationNotification ) error {
187-
188- // In order to create a valid l402 we first are going to call
189- // the FetchL402 method. As a client might not have outbound capacity
190- // yet, we'll retry until we get a valid response.
191- if ! m .hasL402 {
192- m .fetchL402 (m .runCtx )
193- }
194-
195- ctx , cancel := context .WithCancel (m .runCtx )
196-
197- // We'll now subscribe to the reservation notifications.
198- reservationStream , err := m .cfg .ReservationClient .
199- ReservationNotificationStream (
200- ctx , & reservationrpc.ReservationNotificationRequest {},
201- )
202- if err != nil {
203- cancel ()
204- return err
205- }
206-
207- log .Debugf ("Successfully subscribed to reservation notifications" )
208-
209- // We'll now start a goroutine that will forward all the reservation
210- // notifications to the reservationChan.
211- go func () {
212- for {
213- reservationRes , err := reservationStream .Recv ()
214- if err == nil && reservationRes != nil {
215- log .Debugf ("Received reservation %x" ,
216- reservationRes .ReservationId )
217- reservationChan <- reservationRes
218- continue
219- }
220- log .Errorf ("Error receiving " +
221- "reservation: %v" , err )
222-
223- cancel ()
224-
225- // If we encounter an error, we'll
226- // try to reconnect.
227- for {
228- select {
229- case <- m .runCtx .Done ():
230- return
231-
232- case <- time .After (time .Second * 10 ):
233- log .Debugf ("Reconnecting to " +
234- "reservation notifications" )
235- err = m .RegisterReservationNotifications (
236- reservationChan ,
237- )
238- if err != nil {
239- log .Errorf ("Error " +
240- "reconnecting: %v" , err )
241- continue
242- }
243-
244- // If we were able to reconnect, we'll
245- // return.
246- return
247- }
248- }
249- }
250- }()
251-
252- return nil
253- }
254-
255150// RecoverReservations tries to recover all reservations that are still active
256151// from the database.
257152func (m * Manager ) RecoverReservations (ctx context.Context ) error {
0 commit comments