@@ -2,6 +2,7 @@ package driver
22
33import (
44 "context"
5+ "fmt"
56 "time"
67
78 "github.com/NethermindEth/juno/consensus/db"
@@ -62,16 +63,40 @@ func (d *Driver[V, H, A]) Run(ctx context.Context) error {
6263 }
6364 }()
6465
65- d .stateMachine .ReplayWAL ()
66+ if err := d .replay (ctx ); err != nil {
67+ return err
68+ }
69+
70+ return d .listen (ctx )
71+ }
72+
73+ func (d * Driver [V , H , A ]) replay (ctx context.Context ) error {
74+ for walEntry , err := range d .db .LoadAllEntries () {
75+ if err != nil {
76+ return fmt .Errorf ("failed to load WAL entries: %w" , err )
77+ }
6678
79+ if _ , err := d .execute (ctx , true , d .stateMachine .ProcessWAL (walEntry )); err != nil {
80+ return err
81+ }
82+ }
83+
84+ return nil
85+ }
86+
87+ func (d * Driver [V , H , A ]) listen (ctx context.Context ) error {
6788 for {
6889 select {
6990 case <- ctx .Done ():
7091 return nil
7192 default :
7293 }
94+
7395 actions := d .stateMachine .ProcessStart (0 )
74- isCommitted := d .execute (ctx , actions )
96+ isCommitted , err := d .execute (ctx , false , actions )
97+ if err != nil {
98+ return err
99+ }
75100
76101 // Todo: check message signature everytime a message is received.
77102 // For the time being it can be assumed the signature is correct.
@@ -100,46 +125,69 @@ func (d *Driver[V, H, A]) Run(ctx context.Context) error {
100125 actions = d .stateMachine .ProcessPrecommit (p )
101126 }
102127
103- isCommitted = d .execute (ctx , actions )
128+ isCommitted , err = d .execute (ctx , false , actions )
129+ if err != nil {
130+ return err
131+ }
104132 }
105133 }
106134}
107135
108136// This function executes the actions returned by the stateMachine.
109137// It returns true if a commit action was executed. This is to notify the caller to start a new height with round 0.
138+ // Note: `WriteWAL` actions are generated as part of processing the event itself, so there's no
139+ // need to write them to the WAL again here. `isReplaying` is used to disable the writing of WAL.
110140func (d * Driver [V , H , A ]) execute (
111141 ctx context.Context ,
112- executingActions []actions.Action [V , H , A ],
113- ) (isCommitted bool ) {
114- for _ , action := range executingActions {
142+ isReplaying bool ,
143+ resultActions []actions.Action [V , H , A ],
144+ ) (isCommitted bool , err error ) {
145+ for _ , action := range resultActions {
146+ if ! isReplaying && action .RequiresWALFlush () {
147+ if err := d .db .Flush (); err != nil {
148+ return false , fmt .Errorf ("failed to flush WAL: %w" , err )
149+ }
150+ }
151+
115152 switch action := action .(type ) {
153+ case * actions.WriteWAL [V , H , A ]:
154+ if ! isReplaying {
155+ if err := d .db .SetWALEntry (action .Entry ); err != nil {
156+ return false , fmt .Errorf ("failed to write WAL: %w" , err )
157+ }
158+ }
159+
116160 case * actions.BroadcastProposal [V , H , A ]:
117161 d .broadcasters .ProposalBroadcaster .Broadcast (ctx , (* types.Proposal [V , H , A ])(action ))
162+
118163 case * actions.BroadcastPrevote [H , A ]:
119164 d .broadcasters .PrevoteBroadcaster .Broadcast (ctx , (* types.Prevote [H , A ])(action ))
165+
120166 case * actions.BroadcastPrecommit [H , A ]:
121167 d .broadcasters .PrecommitBroadcaster .Broadcast (ctx , (* types.Precommit [H , A ])(action ))
168+
122169 case * actions.ScheduleTimeout :
123- d .scheduledTms [types .Timeout (* action )] = time .AfterFunc (d .getTimeout (action .Step , action .Round ), func () {
124- select {
125- case <- ctx .Done ():
126- case d .timeoutsCh <- types .Timeout (* action ):
127- }
128- })
129- case * actions.Commit [V , H , A ]:
130- if err := d .db .Flush (); err != nil {
131- d .log .Fatalf ("failed to flush WAL during commit" , "height" , action .Height , "round" , action .Round , "err" , err )
132- }
170+ d .setTimeout (ctx , types .Timeout (* action ))
133171
172+ case * actions.Commit [V , H , A ]:
134173 d .log .Debugw ("Committing" , "height" , action .Height , "round" , action .Round )
135174 d .commitListener .OnCommit (ctx , action .Height , * action .Value )
136175
137176 if err := d .db .DeleteWALEntries (action .Height ); err != nil {
138- d . log . Errorw ("failed to delete WAL messages during commit" , "height" , action . Height , "round" , action . Round , "err " , err )
177+ return true , fmt . Errorf ("failed to delete WAL messages during commit: %w " , err )
139178 }
140179
141- return true
180+ return true , nil
142181 }
143182 }
144- return false
183+ return false , nil
184+ }
185+
186+ func (d * Driver [V , H , A ]) setTimeout (ctx context.Context , timeout types.Timeout ) {
187+ d .scheduledTms [timeout ] = time .AfterFunc (d .getTimeout (timeout .Step , timeout .Round ), func () {
188+ select {
189+ case <- ctx .Done ():
190+ case d .timeoutsCh <- timeout :
191+ }
192+ })
145193}
0 commit comments