@@ -7,26 +7,31 @@ import (
77
88 "github.com/NethermindEth/juno/consensus/db"
99 "github.com/NethermindEth/juno/consensus/p2p"
10+ consensusSync "github.com/NethermindEth/juno/consensus/sync"
1011 "github.com/NethermindEth/juno/consensus/tendermint"
1112 "github.com/NethermindEth/juno/consensus/types"
1213 "github.com/NethermindEth/juno/consensus/types/actions"
14+ "github.com/NethermindEth/juno/p2p/sync"
1315 "github.com/NethermindEth/juno/utils"
1416)
1517
1618type TimeoutFn func (step types.Step , round types.Round ) time.Duration
1719
1820type Driver [V types.Hashable [H ], H types.Hash , A types.Addr ] struct {
19- log utils.Logger
20- db db.TendermintDB [V , H , A ]
21- stateMachine tendermint.StateMachine [V , H , A ]
22- commitListener CommitListener [V , H ]
23- broadcasters p2p.Broadcasters [V , H , A ]
24- listeners p2p.Listeners [V , H , A ]
21+ log utils.Logger
22+ db db.TendermintDB [V , H , A ]
23+ stateMachine tendermint.StateMachine [V , H , A ]
24+ commitListener CommitListener [V , H ]
25+ broadcasters p2p.Broadcasters [V , H , A ]
26+ listeners p2p.Listeners [V , H , A ]
27+ blockFetcher * sync.BlockFetcher
28+ messageExtractor * consensusSync.MessageExtractor [V , H , A ]
2529
2630 getTimeout TimeoutFn
2731
2832 scheduledTms map [types.Timeout ]* time.Timer
2933 timeoutsCh chan types.Timeout
34+ syncListener chan sync.BlockBody
3035}
3136
3237func New [V types.Hashable [H ], H types.Hash , A types.Addr ](
@@ -36,18 +41,23 @@ func New[V types.Hashable[H], H types.Hash, A types.Addr](
3641 commitListener CommitListener [V , H ],
3742 broadcasters p2p.Broadcasters [V , H , A ],
3843 listeners p2p.Listeners [V , H , A ],
44+ blockFetcher * sync.BlockFetcher ,
45+ messageExtractor * consensusSync.MessageExtractor [V , H , A ],
3946 getTimeout TimeoutFn ,
4047) Driver [V , H , A ] {
4148 return Driver [V , H , A ]{
42- log : log ,
43- db : db ,
44- stateMachine : stateMachine ,
45- commitListener : commitListener ,
46- broadcasters : broadcasters ,
47- listeners : listeners ,
48- getTimeout : getTimeout ,
49- scheduledTms : make (map [types.Timeout ]* time.Timer ),
50- timeoutsCh : make (chan types.Timeout ),
49+ log : log ,
50+ db : db ,
51+ stateMachine : stateMachine ,
52+ commitListener : commitListener ,
53+ broadcasters : broadcasters ,
54+ listeners : listeners ,
55+ blockFetcher : blockFetcher ,
56+ messageExtractor : messageExtractor ,
57+ getTimeout : getTimeout ,
58+ scheduledTms : make (map [types.Timeout ]* time.Timer ),
59+ timeoutsCh : make (chan types.Timeout ),
60+ syncListener : make (chan sync.BlockBody ),
5161 }
5262}
5363
@@ -123,6 +133,12 @@ func (d *Driver[V, H, A]) listen(ctx context.Context) error {
123133 return nil
124134 }
125135 actions = d .stateMachine .ProcessPrecommit (p )
136+ case p , ok := <- d .syncListener :
137+ if ! ok {
138+ return nil
139+ }
140+ proposal , precommits := d .messageExtractor .Extract (& p )
141+ actions = d .stateMachine .ProcessSync (& proposal , precommits )
126142 }
127143
128144 isCommitted , err = d .execute (ctx , false , actions )
@@ -180,14 +196,28 @@ func (d *Driver[V, H, A]) execute(
180196 return true , nil
181197
182198 case * actions.TriggerSync :
183- d .triggerSync (* action )
199+ d .triggerSync (ctx , * action )
184200 }
185201 }
186202 return false , nil
187203}
188204
189- func (d * Driver [V , H , A ]) triggerSync (sync actions.TriggerSync ) {
190- // TODO: Implement this
205+ func (d * Driver [V , H , A ]) triggerSync (ctx context.Context , sync actions.TriggerSync ) {
206+ for i := sync .Start ; i <= sync .End ; i ++ {
207+ for {
208+ select {
209+ case <- ctx .Done ():
210+ return
211+ default :
212+ }
213+
214+ if err := d .blockFetcher .ProcessBlock (ctx , uint64 (i ), d .syncListener ); err != nil {
215+ d .log .Errorw ("failed to process block" , "block" , i , "err" , err )
216+ } else {
217+ break
218+ }
219+ }
220+ }
191221}
192222
193223func (d * Driver [V , H , A ]) setTimeout (ctx context.Context , timeout types.Timeout ) {
0 commit comments