Skip to content

Commit bf144b0

Browse files
authored
Optimize Relay (#108)
* Optimize Relay Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix curerntTime Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Add src, dst current time Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Add optimizeRelay Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Refactoring optimize relay Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * * Add src, dst parameter * Fix start time variable * Change relay parameter Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix update clients Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix src, dst parameters Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix src, dst Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix tm2tm test Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Refactoring shouldExecuteRelay Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Refactoring test-service Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Add ack test to service Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix error log Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix service interval Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix test sleep Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix service test Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Remove startTime Use eventHeight Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Add logger Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * * Fix Error messages * Fix check src, dst timestamp * Fix test case comments Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix error logs Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix error message Signed-off-by: Dongri Jin <dongri.jin@speee.jp> * Fix error message Signed-off-by: Dongri Jin <dongri.jin@speee.jp> --------- Signed-off-by: Dongri Jin <dongri.jin@speee.jp>
1 parent f9ed49d commit bf144b0

File tree

13 files changed

+439
-100
lines changed

13 files changed

+439
-100
lines changed

chains/tendermint/query.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -184,22 +184,22 @@ func (c *Chain) QueryUnreceivedPackets(ctx core.QueryContext, seqs []uint64) ([]
184184
PacketCommitmentSequences: seqs,
185185
})
186186
if err != nil {
187-
return nil, err
187+
return nil, fmt.Errorf("failed to query unreceived packets: error=%w height=%v", err, ctx.Height())
188188
}
189189
return res.Sequences, nil
190190
}
191191

192192
func (c *Chain) QueryUnfinalizedRelayPackets(ctx core.QueryContext, counterparty core.LightClientICS04Querier) (core.PacketInfoList, error) {
193193
res, err := c.queryPacketCommitments(ctx, 0, 1000)
194194
if err != nil {
195-
return nil, err
195+
return nil, fmt.Errorf("failed to query packet commitments: error=%w height=%v", err, ctx.Height())
196196
}
197197

198198
var packets core.PacketInfoList
199199
for _, ps := range res.Commitments {
200200
packet, height, err := c.querySentPacket(ctx, ps.Sequence)
201201
if err != nil {
202-
return nil, err
202+
return nil, fmt.Errorf("failed to query sent packet: error=%w height=%v", err, ctx.Height())
203203
}
204204
packets = append(packets, &core.PacketInfo{
205205
Packet: *packet,
@@ -210,14 +210,14 @@ func (c *Chain) QueryUnfinalizedRelayPackets(ctx core.QueryContext, counterparty
210210

211211
var counterpartyCtx core.QueryContext
212212
if counterpartyH, err := counterparty.GetLatestFinalizedHeader(); err != nil {
213-
return nil, err
213+
return nil, fmt.Errorf("failed to get latest finalized header: error=%w height=%v", err, ctx.Height())
214214
} else {
215215
counterpartyCtx = core.NewQueryContext(context.TODO(), counterpartyH.GetHeight())
216216
}
217217

218218
seqs, err := counterparty.QueryUnreceivedPackets(counterpartyCtx, packets.ExtractSequenceList())
219219
if err != nil {
220-
return nil, err
220+
return nil, fmt.Errorf("failed to query counterparty for unreceived packets: error=%w, height=%v", err, counterpartyCtx.Height())
221221
}
222222
packets = packets.Filter(seqs)
223223

@@ -233,26 +233,26 @@ func (c *Chain) QueryUnreceivedAcknowledgements(ctx core.QueryContext, seqs []ui
233233
PacketAckSequences: seqs,
234234
})
235235
if err != nil {
236-
return nil, err
236+
return nil, fmt.Errorf("failed to query unreceived acks: : error=%w height=%v", err, ctx.Height())
237237
}
238238
return res.Sequences, nil
239239
}
240240

241241
func (c *Chain) QueryUnfinalizedRelayAcknowledgements(ctx core.QueryContext, counterparty core.LightClientICS04Querier) (core.PacketInfoList, error) {
242242
res, err := c.queryPacketAcknowledgementCommitments(ctx, 0, 1000)
243243
if err != nil {
244-
return nil, err
244+
return nil, fmt.Errorf("failed to query packet acknowledgement commitments: error=%w height=%v", err, ctx.Height())
245245
}
246246

247247
var packets core.PacketInfoList
248248
for _, ps := range res.Acknowledgements {
249249
packet, rpHeight, err := c.queryReceivedPacket(ctx, ps.Sequence)
250250
if err != nil {
251-
return nil, err
251+
return nil, fmt.Errorf("failed to query received packet: error=%w height=%v", err, ctx.Height())
252252
}
253253
ack, _, err := c.queryWrittenAcknowledgement(ctx, ps.Sequence)
254254
if err != nil {
255-
return nil, err
255+
return nil, fmt.Errorf("failed to query written acknowledgement: error=%w height=%v", err, ctx.Height())
256256
}
257257
packets = append(packets, &core.PacketInfo{
258258
Packet: *packet,
@@ -263,14 +263,14 @@ func (c *Chain) QueryUnfinalizedRelayAcknowledgements(ctx core.QueryContext, cou
263263

264264
var counterpartyCtx core.QueryContext
265265
if counterpartyH, err := counterparty.GetLatestFinalizedHeader(); err != nil {
266-
return nil, err
266+
return nil, fmt.Errorf("failed to get latest finalized header: error=%w height=%v", err, ctx.Height())
267267
} else {
268268
counterpartyCtx = core.NewQueryContext(context.TODO(), counterpartyH.GetHeight())
269269
}
270270

271271
seqs, err := counterparty.QueryUnreceivedAcknowledgements(counterpartyCtx, packets.ExtractSequenceList())
272272
if err != nil {
273-
return nil, err
273+
return nil, fmt.Errorf("failed to query counterparty for unreceived acknowledgements: error=%w height=%v", err, counterpartyCtx.Height())
274274
}
275275
packets = packets.Filter(seqs)
276276

cmd/service.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,18 @@ func serviceCmd(ctx *config.Context) *cobra.Command {
2727

2828
func startCmd(ctx *config.Context) *cobra.Command {
2929
const (
30-
flagRelayInterval = "relay-interval"
31-
flagPrometheusAddr = "prometheus-addr"
30+
flagRelayInterval = "relay-interval"
31+
flagPrometheusAddr = "prometheus-addr"
32+
flagSrcRelayOptimizeInterval = "src-relay-optimize-interval"
33+
flagSrcRelayOptimizeCount = "src-relay-optimize-count"
34+
flagDstRelayOptimizeInterval = "dst-relay-optimize-interval"
35+
flagDstRelayOptimizeCount = "dst-relay-optimize-count"
3236
)
3337
const (
34-
defaultRelayInterval = 3 * time.Second
35-
defaultPrometheusAddr = "localhost:2223"
38+
defaultRelayInterval = 3 * time.Second
39+
defaultPrometheusAddr = "localhost:2223"
40+
defaultRelayOptimizeInterval = 10 * time.Second
41+
defaultRelayOptimizeCount = 5
3642
)
3743

3844
cmd := &cobra.Command{
@@ -60,10 +66,24 @@ func startCmd(ctx *config.Context) *cobra.Command {
6066
if err := st.SetupRelay(context.TODO(), c[src], c[dst]); err != nil {
6167
return err
6268
}
63-
return core.StartService(context.Background(), st, c[src], c[dst], viper.GetDuration(flagRelayInterval))
69+
return core.StartService(
70+
context.Background(),
71+
st,
72+
c[src],
73+
c[dst],
74+
viper.GetDuration(flagRelayInterval),
75+
viper.GetDuration(flagSrcRelayOptimizeInterval),
76+
viper.GetUint64(flagSrcRelayOptimizeCount),
77+
viper.GetDuration(flagDstRelayOptimizeInterval),
78+
viper.GetUint64(flagDstRelayOptimizeCount),
79+
)
6480
},
6581
}
6682
cmd.Flags().Duration(flagRelayInterval, defaultRelayInterval, "time interval to perform relays")
6783
cmd.Flags().String(flagPrometheusAddr, defaultPrometheusAddr, "host address to which the prometheus exporter listens")
84+
cmd.Flags().Duration(flagSrcRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization")
85+
cmd.Flags().Uint64(flagSrcRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization")
86+
cmd.Flags().Duration(flagDstRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization")
87+
cmd.Flags().Uint64(flagDstRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization")
6888
return cmd
6989
}

cmd/tx.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,18 @@ func relayMsgsCmd(ctx *config.Context) *cobra.Command {
245245

246246
msgs := core.NewRelayMsgs()
247247

248-
if m, err := st.UpdateClients(c[src], c[dst], sp, &core.RelayPackets{}, sh, viper.GetBool(flagDoRefresh)); err != nil {
248+
doExecuteRelaySrc := len(sp.Dst) > 0
249+
doExecuteRelayDst := len(sp.Src) > 0
250+
doExecuteAckSrc := false
251+
doExecuteAckDst := false
252+
253+
if m, err := st.UpdateClients(c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil {
249254
return err
250255
} else {
251256
msgs.Merge(m)
252257
}
253258

254-
if m, err := st.RelayPackets(c[src], c[dst], sp, sh); err != nil {
259+
if m, err := st.RelayPackets(c[src], c[dst], sp, sh, doExecuteRelaySrc, doExecuteRelayDst); err != nil {
255260
return err
256261
} else {
257262
msgs.Merge(m)
@@ -315,13 +320,18 @@ func relayAcksCmd(ctx *config.Context) *cobra.Command {
315320

316321
msgs := core.NewRelayMsgs()
317322

318-
if m, err := st.UpdateClients(c[src], c[dst], &core.RelayPackets{}, sp, sh, viper.GetBool(flagDoRefresh)); err != nil {
323+
doExecuteRelaySrc := false
324+
doExecuteRelayDst := false
325+
doExecuteAckSrc := len(sp.Dst) > 0
326+
doExecuteAckDst := len(sp.Src) > 0
327+
328+
if m, err := st.UpdateClients(c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil {
319329
return err
320330
} else {
321331
msgs.Merge(m)
322332
}
323333

324-
if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh); err != nil {
334+
if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh, doExecuteAckSrc, doExecuteAckDst); err != nil {
325335
return err
326336
} else {
327337
msgs.Merge(m)

core/naive-strategy.go

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
101101
now := time.Now()
102102
srcPackets, err = src.QueryUnfinalizedRelayPackets(srcCtx, dst)
103103
if err != nil {
104-
return err
104+
return fmt.Errorf("failed to query unfinalized relay packets on src chain: %w", err)
105105
}
106106
logger.TimeTrack(now, "QueryUnfinalizedRelayPackets", "queried_chain", "src", "num_packets", len(srcPackets))
107107
return nil
@@ -123,7 +123,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
123123
now := time.Now()
124124
dstPackets, err = dst.QueryUnfinalizedRelayPackets(dstCtx, src)
125125
if err != nil {
126-
return err
126+
return fmt.Errorf("failed to query unfinalized relay packets on dst chain: %w", err)
127127
}
128128
logger.TimeTrack(now, "QueryUnfinalizedRelayPackets", "queried_chain", "dst", "num_packets", len(dstPackets))
129129
return nil
@@ -168,7 +168,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
168168
now := time.Now()
169169
seqs, err := dst.QueryUnreceivedPackets(dstCtx, srcPackets.ExtractSequenceList())
170170
if err != nil {
171-
return err
171+
return fmt.Errorf("failed to query unreceived packets on dst chain: %w", err)
172172
}
173173
logger.TimeTrack(now, "QueryUnreceivedPackets", "queried_chain", "dst", "num_seqs", len(seqs))
174174
srcPackets = srcPackets.Filter(seqs)
@@ -179,7 +179,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
179179
now := time.Now()
180180
seqs, err := src.QueryUnreceivedPackets(srcCtx, dstPackets.ExtractSequenceList())
181181
if err != nil {
182-
return err
182+
return fmt.Errorf("failed to query unreceived packets on src chain: %w", err)
183183
}
184184
logger.TimeTrack(now, "QueryUnreceivedPackets", "queried_chain", "src", "num_seqs", len(seqs))
185185
dstPackets = dstPackets.Filter(seqs)
@@ -199,7 +199,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
199199
}, nil
200200
}
201201

202-
func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error) {
202+
func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error) {
203203
logger := GetChannelPairLogger(src, dst)
204204
defer logger.TimeTrack(time.Now(), "RelayPackets", "num_src", len(rp.Src), "num_dst", len(rp.Dst))
205205

@@ -225,21 +225,26 @@ func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets,
225225
return nil, err
226226
}
227227

228-
msgs.Dst, err = collectPackets(srcCtx, src, rp.Src, dstAddress)
229-
if err != nil {
230-
logger.Error(
231-
"error collecting packets",
232-
err,
233-
)
234-
return nil, err
228+
if doExecuteRelayDst {
229+
msgs.Dst, err = collectPackets(srcCtx, src, rp.Src, dstAddress)
230+
if err != nil {
231+
logger.Error(
232+
"error collecting packets",
233+
err,
234+
)
235+
return nil, err
236+
}
235237
}
236-
msgs.Src, err = collectPackets(dstCtx, dst, rp.Dst, srcAddress)
237-
if err != nil {
238-
logger.Error(
239-
"error collecting packets",
240-
err,
241-
)
242-
return nil, err
238+
239+
if doExecuteRelaySrc {
240+
msgs.Src, err = collectPackets(dstCtx, dst, rp.Dst, srcAddress)
241+
if err != nil {
242+
logger.Error(
243+
"error collecting packets",
244+
err,
245+
)
246+
return nil, err
247+
}
243248
}
244249

245250
if len(msgs.Dst) == 0 && len(msgs.Src) == 0 {
@@ -281,7 +286,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S
281286
now := time.Now()
282287
srcAcks, err = src.QueryUnfinalizedRelayAcknowledgements(srcCtx, dst)
283288
if err != nil {
284-
return err
289+
return fmt.Errorf("failed to query unfinalized relay acknowledgements on src chain: %w", err)
285290
}
286291
logger.TimeTrack(now, "QueryUnfinalizedRelayAcknowledgements", "queried_chain", "src", "num_packets", len(srcAcks))
287292
return nil
@@ -306,7 +311,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S
306311
now := time.Now()
307312
dstAcks, err = dst.QueryUnfinalizedRelayAcknowledgements(dstCtx, src)
308313
if err != nil {
309-
return err
314+
return fmt.Errorf("failed to query unfinalized relay acknowledgements on dst chain: %w", err)
310315
}
311316
logger.TimeTrack(now, "QueryUnfinalizedRelayAcknowledgements", "queried_chain", "dst", "num_packets", len(dstAcks))
312317
return nil
@@ -350,7 +355,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S
350355
now := time.Now()
351356
seqs, err := dst.QueryUnreceivedAcknowledgements(dstCtx, srcAcks.ExtractSequenceList())
352357
if err != nil {
353-
return err
358+
return fmt.Errorf("failed to query unreceived acknowledgements on dst chain: %w", err)
354359
}
355360
logger.TimeTrack(now, "QueryUnreceivedAcknowledgements", "queried_chain", "dst", "num_seqs", len(seqs))
356361
srcAcks = srcAcks.Filter(seqs)
@@ -363,7 +368,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S
363368
now := time.Now()
364369
seqs, err := src.QueryUnreceivedAcknowledgements(srcCtx, dstAcks.ExtractSequenceList())
365370
if err != nil {
366-
return err
371+
return fmt.Errorf("failed to query unreceived acknowledgements on src chain: %w", err)
367372
}
368373
logger.TimeTrack(now, "QueryUnreceivedAcknowledgements", "queried_chain", "src", "num_seqs", len(seqs))
369374
dstAcks = dstAcks.Filter(seqs)
@@ -415,7 +420,7 @@ func logPacketsRelayed(src, dst Chain, num int, obj string, dir string) {
415420
)
416421
}
417422

418-
func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error) {
423+
func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteAckSrc, doExecuteAckDst bool) (*RelayMsgs, error) {
419424
logger := GetChannelPairLogger(src, dst)
420425
defer logger.TimeTrack(time.Now(), "RelayAcknowledgements", "num_src", len(rp.Src), "num_dst", len(rp.Dst))
421426

@@ -440,13 +445,13 @@ func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *Rela
440445
return nil, err
441446
}
442447

443-
if !st.dstNoAck {
448+
if !st.dstNoAck && doExecuteAckDst {
444449
msgs.Dst, err = collectAcks(srcCtx, src, rp.Src, dstAddress)
445450
if err != nil {
446451
return nil, err
447452
}
448453
}
449-
if !st.srcNoAck {
454+
if !st.srcNoAck && doExecuteAckSrc {
450455
msgs.Src, err = collectAcks(dstCtx, dst, rp.Dst, srcAddress)
451456
if err != nil {
452457
return nil, err
@@ -490,16 +495,13 @@ func collectAcks(ctx QueryContext, chain *ProvableChain, packets PacketInfoList,
490495
return msgs, nil
491496
}
492497

493-
func (st *NaiveStrategy) UpdateClients(src, dst *ProvableChain, rpForRecv, rpForAck *RelayPackets, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) {
498+
func (st *NaiveStrategy) UpdateClients(src, dst *ProvableChain, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst bool, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) {
494499
logger := GetChannelPairLogger(src, dst)
495500

496501
msgs := NewRelayMsgs()
497502

498-
// check if unrelayed packets or acks exist
499-
needsUpdateForSrc := len(rpForRecv.Dst) > 0 ||
500-
!st.srcNoAck && len(rpForAck.Dst) > 0
501-
needsUpdateForDst := len(rpForRecv.Src) > 0 ||
502-
!st.dstNoAck && len(rpForAck.Src) > 0
503+
needsUpdateForSrc := doExecuteRelaySrc || (doExecuteAckSrc && !st.srcNoAck)
504+
needsUpdateForDst := doExecuteRelayDst || (doExecuteAckDst && !st.dstNoAck)
503505

504506
// check if LC refresh is needed
505507
if !needsUpdateForSrc && doRefresh {

0 commit comments

Comments
 (0)