Skip to content

Commit 3a1ff95

Browse files
committed
reservation: remove notifications from reservation
This commit removes the notification stream from the reservation manager and replaces it with a subscriber interface.
1 parent e7bd486 commit 3a1ff95

File tree

5 files changed

+30
-142
lines changed

5 files changed

+30
-142
lines changed

instantout/reservation/actions_test.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,6 @@ func (m *mockReservationClient) OpenReservation(ctx context.Context,
5757
args.Error(1)
5858
}
5959

60-
func (m *mockReservationClient) ReservationNotificationStream(
61-
ctx context.Context, in *swapserverrpc.ReservationNotificationRequest,
62-
opts ...grpc.CallOption,
63-
) (swapserverrpc.ReservationService_ReservationNotificationStreamClient,
64-
error) {
65-
66-
args := m.Called(ctx, in, opts)
67-
return args.Get(0).(swapserverrpc.ReservationService_ReservationNotificationStreamClient),
68-
args.Error(1)
69-
}
70-
7160
func (m *mockReservationClient) FetchL402(ctx context.Context,
7261
in *swapserverrpc.FetchL402Request,
7362
opts ...grpc.CallOption) (*swapserverrpc.FetchL402Response, error) {

instantout/reservation/fsm.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ type Config struct {
3030

3131
// FetchL402 is the function used to fetch the l402 token.
3232
FetchL402 func(context.Context) error
33+
34+
// NotificationManager is the manager that handles the notification
35+
// subscriptions.
36+
NotificationManager ReservationNotificationManager
3337
}
3438

3539
// FSM is the state machine that manages the reservation lifecycle.

instantout/reservation/interfaces.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package reservation
33
import (
44
"context"
55
"fmt"
6+
7+
"github.com/lightninglabs/loop/swapserverrpc"
68
)
79

810
var (
@@ -31,3 +33,10 @@ type Store interface {
3133
// made.
3234
ListReservations(ctx context.Context) ([]*Reservation, error)
3335
}
36+
37+
// ReservationNotificationManager handles subscribing to incoming reservation
38+
// subscriptions.
39+
type ReservationNotificationManager interface {
40+
SubscribeReservations(context.Context,
41+
) <-chan *swapserverrpc.ServerReservationNotification
42+
}

instantout/reservation/manager.go

Lines changed: 2 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@ type Manager struct {
2222
// activeReservations contains all the active reservationsFSMs.
2323
activeReservations map[ID]*FSM
2424

25-
// hasL402 is true if the client has a valid L402.
26-
hasL402 bool
27-
2825
runCtx context.Context
2926

3027
sync.Mutex
@@ -59,22 +56,15 @@ func (m *Manager) Run(ctx context.Context, height int32) error {
5956
return err
6057
}
6158

62-
reservationResChan := make(
63-
chan *reservationrpc.ServerReservationNotification,
64-
)
65-
66-
err = m.RegisterReservationNotifications(reservationResChan)
67-
if err != nil {
68-
return err
69-
}
59+
ntfnChan := m.cfg.NotificationManager.SubscribeReservations(ctx)
7060

7161
for {
7262
select {
7363
case height := <-newBlockChan:
7464
log.Debugf("Received block %v", height)
7565
currentHeight = height
7666

77-
case reservationRes := <-reservationResChan:
67+
case reservationRes := <-ntfnChan:
7868
log.Debugf("Received reservation %x",
7969
reservationRes.ReservationId)
8070
_, err := m.newReservation(
@@ -157,101 +147,6 @@ func (m *Manager) newReservation(ctx context.Context, currentHeight uint32,
157147
return reservationFSM, nil
158148
}
159149

160-
// fetchL402 fetches the L402 from the server. This method will keep on
161-
// retrying until it gets a valid response.
162-
func (m *Manager) fetchL402(ctx context.Context) {
163-
// Add a 0 timer so that we initially fetch the L402 immediately.
164-
timer := time.NewTimer(0)
165-
for {
166-
select {
167-
case <-ctx.Done():
168-
return
169-
170-
case <-timer.C:
171-
err := m.cfg.FetchL402(ctx)
172-
if err != nil {
173-
log.Warnf("Error fetching L402: %v", err)
174-
timer.Reset(time.Second * 10)
175-
continue
176-
}
177-
m.hasL402 = true
178-
return
179-
}
180-
}
181-
}
182-
183-
// RegisterReservationNotifications registers a new reservation notification
184-
// stream.
185-
func (m *Manager) RegisterReservationNotifications(
186-
reservationChan chan *reservationrpc.ServerReservationNotification) error {
187-
188-
// In order to create a valid l402 we first are going to call
189-
// the FetchL402 method. As a client might not have outbound capacity
190-
// yet, we'll retry until we get a valid response.
191-
if !m.hasL402 {
192-
m.fetchL402(m.runCtx)
193-
}
194-
195-
ctx, cancel := context.WithCancel(m.runCtx)
196-
197-
// We'll now subscribe to the reservation notifications.
198-
reservationStream, err := m.cfg.ReservationClient.
199-
ReservationNotificationStream(
200-
ctx, &reservationrpc.ReservationNotificationRequest{},
201-
)
202-
if err != nil {
203-
cancel()
204-
return err
205-
}
206-
207-
log.Debugf("Successfully subscribed to reservation notifications")
208-
209-
// We'll now start a goroutine that will forward all the reservation
210-
// notifications to the reservationChan.
211-
go func() {
212-
for {
213-
reservationRes, err := reservationStream.Recv()
214-
if err == nil && reservationRes != nil {
215-
log.Debugf("Received reservation %x",
216-
reservationRes.ReservationId)
217-
reservationChan <- reservationRes
218-
continue
219-
}
220-
log.Errorf("Error receiving "+
221-
"reservation: %v", err)
222-
223-
cancel()
224-
225-
// If we encounter an error, we'll
226-
// try to reconnect.
227-
for {
228-
select {
229-
case <-m.runCtx.Done():
230-
return
231-
232-
case <-time.After(time.Second * 10):
233-
log.Debugf("Reconnecting to " +
234-
"reservation notifications")
235-
err = m.RegisterReservationNotifications(
236-
reservationChan,
237-
)
238-
if err != nil {
239-
log.Errorf("Error "+
240-
"reconnecting: %v", err)
241-
continue
242-
}
243-
244-
// If we were able to reconnect, we'll
245-
// return.
246-
return
247-
}
248-
}
249-
}
250-
}()
251-
252-
return nil
253-
}
254-
255150
// RecoverReservations tries to recover all reservations that are still active
256151
// from the database.
257152
func (m *Manager) RecoverReservations(ctx context.Context) error {

instantout/reservation/manager_test.go

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/lightningnetwork/lnd/chainntnfs"
1414
"github.com/stretchr/testify/mock"
1515
"github.com/stretchr/testify/require"
16-
"google.golang.org/grpc"
1716
)
1817

1918
var (
@@ -115,17 +114,10 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext {
115114
store := NewSQLStore(loopdb.NewTypedStore[Querier](dbFixture))
116115

117116
mockReservationClient := new(mockReservationClient)
118-
119117
sendChan := make(chan *swapserverrpc.ServerReservationNotification)
120-
121-
mockReservationClient.On(
122-
"ReservationNotificationStream", mock.Anything, mock.Anything,
123-
mock.Anything,
124-
).Return(
125-
&dummyReservationNotificationServer{
126-
SendChan: sendChan,
127-
}, nil,
128-
)
118+
mockNtfnManager := &mockNtfnManager{
119+
sendChan: sendChan,
120+
}
129121

130122
mockReservationClient.On(
131123
"OpenReservation", mock.Anything, mock.Anything, mock.Anything,
@@ -134,11 +126,12 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext {
134126
)
135127

136128
cfg := &Config{
137-
Store: store,
138-
Wallet: mockLnd.WalletKit,
139-
ChainNotifier: mockLnd.ChainNotifier,
140-
FetchL402: func(context.Context) error { return nil },
141-
ReservationClient: mockReservationClient,
129+
Store: store,
130+
Wallet: mockLnd.WalletKit,
131+
ChainNotifier: mockLnd.ChainNotifier,
132+
FetchL402: func(context.Context) error { return nil },
133+
ReservationClient: mockReservationClient,
134+
NotificationManager: mockNtfnManager,
142135
}
143136

144137
manager := NewManager(cfg)
@@ -152,17 +145,15 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext {
152145
}
153146
}
154147

155-
type dummyReservationNotificationServer struct {
156-
grpc.ClientStream
157-
158-
// SendChan is the channel that is used to send notifications.
159-
SendChan chan *swapserverrpc.ServerReservationNotification
148+
type mockNtfnManager struct {
149+
sendChan chan *swapserverrpc.ServerReservationNotification
160150
}
161151

162-
func (d *dummyReservationNotificationServer) Recv() (
163-
*swapserverrpc.ServerReservationNotification, error) {
152+
func (m *mockNtfnManager) SubscribeReservations(
153+
ctx context.Context,
154+
) <-chan *swapserverrpc.ServerReservationNotification {
164155

165-
return <-d.SendChan, nil
156+
return m.sendChan
166157
}
167158

168159
func mustDecodeID(id string) ID {

0 commit comments

Comments
 (0)