Skip to content

Commit c6149b7

Browse files
author
Daisuke Kanda
committed
process handshake after joining
Signed-off-by: Daisuke Kanda <daisuke.kanda@datachain.jp>
1 parent 262e7c6 commit c6149b7

File tree

2 files changed

+112
-175
lines changed

2 files changed

+112
-175
lines changed

core/channel.go

Lines changed: 55 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,6 @@ type createChannelFutureProofs struct {
140140
chanRes *chantypes.QueryChannelResponse
141141
}
142142

143-
type createChannelFutureMsg = func(proofs *createChannelFutureProofs) (msg []sdk.Msg, last bool)
144-
145143
func resolveCreateChannelFutureProofs(
146144
ctx context.Context,
147145
sh SyncHeaders,
@@ -167,63 +165,8 @@ func resolveCreateChannelFutureProofs(
167165
return nil
168166
}
169167

170-
type createChannelFutureMsgs struct {
171-
Src, Dst []createChannelFutureMsg
172-
}
173-
174-
func resolveCreateChannelFutureMsgs(
175-
ctx context.Context,
176-
sh SyncHeaders,
177-
fmsgs *createChannelFutureMsgs,
178-
src, dst *ProvableChain,
179-
srcProofs, dstProofs *createChannelFutureProofs,
180-
) (*RelayMsgs, error) {
181-
ret := NewRelayMsgs()
182-
183-
err := retry.Do(func() error {
184-
var eg = new(errgroup.Group)
185-
186-
if len(fmsgs.Dst) > 0 { // send to Dst
187-
eg.Go(func() error {
188-
err := resolveCreateChannelFutureProofs(ctx, sh, src, dst, srcProofs)
189-
if err != nil {
190-
return err
191-
}
192-
193-
for _, fm := range fmsgs.Dst {
194-
msgs, last := fm(srcProofs)
195-
ret.Dst = append(ret.Dst, msgs...)
196-
ret.Last = ret.Last || last
197-
}
198-
return nil
199-
})
200-
}
201-
202-
if len(fmsgs.Src) > 0 { // send to Src
203-
eg.Go(func() error {
204-
err := resolveCreateChannelFutureProofs(ctx, sh, dst, src, dstProofs)
205-
if err != nil {
206-
return err
207-
}
208-
209-
for _, fm := range fmsgs.Src {
210-
msgs, last := fm(dstProofs)
211-
ret.Src = append(ret.Src, msgs...)
212-
ret.Last = ret.Last || last
213-
}
214-
return nil
215-
})
216-
}
217-
return eg.Wait()
218-
}, rtyAtt, rtyDel, rtyErr, retry.Context(ctx))
219-
if err != nil {
220-
return nil, err
221-
}
222-
return ret, nil
223-
}
224-
225168
func createChannelStep(ctx context.Context, src, dst *ProvableChain) (*RelayMsgs, error) {
226-
fmsgs := createChannelFutureMsgs{}
169+
out := NewRelayMsgs()
227170
if err := validatePaths(src, dst); err != nil {
228171
return nil, err
229172
}
@@ -248,13 +191,14 @@ func createChannelStep(ctx context.Context, src, dst *ProvableChain) (*RelayMsgs
248191
if err != nil {
249192
return nil, err
250193
} else if !settled {
251-
return NewRelayMsgs(), nil
194+
return out, nil
252195
}
253196

197+
var srcFutureMsgs, dstFutureMsgs []func() (msg []sdk.Msg, last bool)
254198
switch {
255199
// Handshake hasn't been started on src or dst, relay `chanOpenInit` to src
256200
case srcProofs.chanRes.Channel.State == chantypes.UNINITIALIZED && dstProofs.chanRes.Channel.State == chantypes.UNINITIALIZED:
257-
fmsgs.Src = append(fmsgs.Src, func(p *createChannelFutureProofs) ([]sdk.Msg, bool) {
201+
srcFutureMsgs = append(srcFutureMsgs, func() ([]sdk.Msg, bool) {
258202
logChannelStates(ctx, src, dst, srcProofs.chanRes, dstProofs.chanRes)
259203
var msgs []sdk.Msg
260204
addr := mustGetAddress(src)
@@ -263,88 +207,114 @@ func createChannelStep(ctx context.Context, src, dst *ProvableChain) (*RelayMsgs
263207
})
264208
// Handshake has started on dst (1 step done), relay `chanOpenTry` and `updateClient` to src
265209
case srcProofs.chanRes.Channel.State == chantypes.UNINITIALIZED && dstProofs.chanRes.Channel.State == chantypes.INIT:
266-
fmsgs.Src = append(fmsgs.Src, func(p *createChannelFutureProofs) ([]sdk.Msg, bool) {
210+
srcFutureMsgs = append(srcFutureMsgs, func() ([]sdk.Msg, bool) {
267211
logChannelStates(ctx, src, dst, srcProofs.chanRes, dstProofs.chanRes)
268212
var msgs []sdk.Msg
269213
addr := mustGetAddress(src)
270-
if len(p.updateHeaders) > 0 {
271-
msgs = append(msgs, src.Path().UpdateClients(p.updateHeaders, addr)...)
214+
if len(dstProofs.updateHeaders) > 0 {
215+
msgs = append(msgs, src.Path().UpdateClients(dstProofs.updateHeaders, addr)...)
272216
}
273-
msgs = append(msgs, src.Path().ChanTry(dst.Path(), p.chanRes, addr))
217+
msgs = append(msgs, src.Path().ChanTry(dst.Path(), dstProofs.chanRes, addr))
274218
return msgs, false
275219
})
276220
// Handshake has started on src (1 step done), relay `chanOpenTry` and `updateClient` to dst
277221
case srcProofs.chanRes.Channel.State == chantypes.INIT && dstProofs.chanRes.Channel.State == chantypes.UNINITIALIZED:
278-
fmsgs.Dst = append(fmsgs.Dst, func(p *createChannelFutureProofs) ([]sdk.Msg, bool) {
222+
dstFutureMsgs = append(dstFutureMsgs, func() ([]sdk.Msg, bool) {
279223
logChannelStates(ctx, dst, src, dstProofs.chanRes, srcProofs.chanRes)
280224
var msgs []sdk.Msg
281225
addr := mustGetAddress(dst)
282-
if len(p.updateHeaders) > 0 {
283-
msgs = append(msgs, dst.Path().UpdateClients(p.updateHeaders, addr)...)
226+
if len(srcProofs.updateHeaders) > 0 {
227+
msgs = append(msgs, dst.Path().UpdateClients(srcProofs.updateHeaders, addr)...)
284228
}
285-
msgs = append(msgs, dst.Path().ChanTry(src.Path(), p.chanRes, addr))
229+
msgs = append(msgs, dst.Path().ChanTry(src.Path(), srcProofs.chanRes, addr))
286230
return msgs, false
287231
})
288232

289233
// Handshake has started on src (2 steps done), relay `chanOpenAck` and `updateClient` to dst
290234
case srcProofs.chanRes.Channel.State == chantypes.TRYOPEN && dstProofs.chanRes.Channel.State == chantypes.INIT:
291-
fmsgs.Dst = append(fmsgs.Dst, func(p *createChannelFutureProofs) ([]sdk.Msg, bool) {
235+
dstFutureMsgs = append(dstFutureMsgs, func() ([]sdk.Msg, bool) {
292236
logChannelStates(ctx, dst, src, dstProofs.chanRes, srcProofs.chanRes)
293237
var msgs []sdk.Msg
294238
addr := mustGetAddress(dst)
295-
if len(p.updateHeaders) > 0 {
296-
msgs = append(msgs, dst.Path().UpdateClients(p.updateHeaders, addr)...)
239+
if len(srcProofs.updateHeaders) > 0 {
240+
msgs = append(msgs, dst.Path().UpdateClients(srcProofs.updateHeaders, addr)...)
297241
}
298-
msgs = append(msgs, dst.Path().ChanAck(src.Path(), p.chanRes, addr))
242+
msgs = append(msgs, dst.Path().ChanAck(src.Path(), srcProofs.chanRes, addr))
299243
return msgs, false
300244
})
301245

302246
// Handshake has started on dst (2 steps done), relay `chanOpenAck` and `updateClient` to src
303247
case srcProofs.chanRes.Channel.State == chantypes.INIT && dstProofs.chanRes.Channel.State == chantypes.TRYOPEN:
304-
fmsgs.Src = append(fmsgs.Src, func(p *createChannelFutureProofs) ([]sdk.Msg, bool) {
248+
srcFutureMsgs = append(srcFutureMsgs, func() ([]sdk.Msg, bool) {
305249
logChannelStates(ctx, src, dst, srcProofs.chanRes, dstProofs.chanRes)
306250
var msgs []sdk.Msg
307251
addr := mustGetAddress(src)
308-
if len(p.updateHeaders) > 0 {
309-
msgs = append(msgs, src.Path().UpdateClients(p.updateHeaders, addr)...)
252+
if len(dstProofs.updateHeaders) > 0 {
253+
msgs = append(msgs, src.Path().UpdateClients(dstProofs.updateHeaders, addr)...)
310254
}
311-
msgs = append(msgs, src.Path().ChanAck(dst.Path(), p.chanRes, addr))
255+
msgs = append(msgs, src.Path().ChanAck(dst.Path(), dstProofs.chanRes, addr))
312256
return msgs, false
313257
})
314258

315259
// Handshake has confirmed on dst (3 steps done), relay `chanOpenConfirm` and `updateClient` to src
316260
case srcProofs.chanRes.Channel.State == chantypes.TRYOPEN && dstProofs.chanRes.Channel.State == chantypes.OPEN:
317-
fmsgs.Src = append(fmsgs.Src, func(p *createChannelFutureProofs) ([]sdk.Msg, bool) {
261+
srcFutureMsgs = append(srcFutureMsgs, func() ([]sdk.Msg, bool) {
318262
logChannelStates(ctx, src, dst, srcProofs.chanRes, dstProofs.chanRes)
319263
var msgs []sdk.Msg
320264
addr := mustGetAddress(src)
321-
if len(p.updateHeaders) > 0 {
322-
msgs = append(msgs, src.Path().UpdateClients(p.updateHeaders, addr)...)
265+
if len(dstProofs.updateHeaders) > 0 {
266+
msgs = append(msgs, src.Path().UpdateClients(dstProofs.updateHeaders, addr)...)
323267
}
324-
msgs = append(msgs, src.Path().ChanConfirm(p.chanRes, addr))
268+
msgs = append(msgs, src.Path().ChanConfirm(dstProofs.chanRes, addr))
325269
return msgs, true
326270
})
327271

328272
// Handshake has confirmed on src (3 steps done), relay `chanOpenConfirm` and `updateClient` to dst
329273
case srcProofs.chanRes.Channel.State == chantypes.OPEN && dstProofs.chanRes.Channel.State == chantypes.TRYOPEN:
330-
fmsgs.Dst = append(fmsgs.Dst, func(p *createChannelFutureProofs) ([]sdk.Msg, bool) {
274+
dstFutureMsgs = append(dstFutureMsgs, func() ([]sdk.Msg, bool) {
331275
logChannelStates(ctx, dst, src, dstProofs.chanRes, srcProofs.chanRes)
332276
var msgs []sdk.Msg
333277
addr := mustGetAddress(dst)
334-
if len(p.updateHeaders) > 0 {
335-
msgs = append(msgs, dst.Path().UpdateClients(p.updateHeaders, addr)...)
278+
if len(srcProofs.updateHeaders) > 0 {
279+
msgs = append(msgs, dst.Path().UpdateClients(srcProofs.updateHeaders, addr)...)
336280
}
337-
msgs = append(msgs, dst.Path().ChanConfirm(p.chanRes, addr))
281+
msgs = append(msgs, dst.Path().ChanConfirm(srcProofs.chanRes, addr))
338282
return msgs, true
339283
})
340284
default:
341285
panic(fmt.Sprintf("not implemeneted error: %v <=> %v", srcProofs.chanRes.Channel.State.String(), dstProofs.chanRes.Channel.State.String()))
342286
}
343287

344-
out, err := resolveCreateChannelFutureMsgs(ctx, sh, &fmsgs, src, dst, &srcProofs, &dstProofs)
288+
err = retry.Do(func() error {
289+
var eg = new(errgroup.Group)
290+
291+
if 0 < len(dstFutureMsgs) {
292+
eg.Go(func() error {
293+
return resolveCreateChannelFutureProofs(ctx, sh, src, dst, &srcProofs)
294+
})
295+
}
296+
if 0 < len(srcFutureMsgs) {
297+
eg.Go(func() error {
298+
return resolveCreateChannelFutureProofs(ctx, sh, dst, src, &dstProofs)
299+
})
300+
}
301+
return eg.Wait()
302+
}, rtyAtt, rtyDel, rtyErr, retry.Context(ctx))
345303
if err != nil {
346304
return nil, err
347305
}
306+
307+
for _, f := range dstFutureMsgs {
308+
msgs, last := f()
309+
out.Dst = append(out.Dst, msgs...)
310+
out.Last = out.Last || last
311+
}
312+
for _, f := range srcFutureMsgs {
313+
msgs, last := f()
314+
out.Src = append(out.Src, msgs...)
315+
out.Last = out.Last || last
316+
}
317+
348318
return out, nil
349319
}
350320

0 commit comments

Comments
 (0)