Skip to content

Commit e34efc6

Browse files
committed
feat: Integrate Sync P2P protocol to consensus
1 parent 4fea7c1 commit e34efc6

File tree

15 files changed

+421
-450
lines changed

15 files changed

+421
-450
lines changed

consensus/consensus.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ import (
1212
"github.com/NethermindEth/juno/consensus/proposal"
1313
"github.com/NethermindEth/juno/consensus/proposer"
1414
"github.com/NethermindEth/juno/consensus/starknet"
15+
consensusSync "github.com/NethermindEth/juno/consensus/sync"
1516
"github.com/NethermindEth/juno/consensus/tendermint"
1617
"github.com/NethermindEth/juno/consensus/types"
1718
"github.com/NethermindEth/juno/consensus/votecounter"
1819
"github.com/NethermindEth/juno/core/felt"
1920
"github.com/NethermindEth/juno/db"
21+
"github.com/NethermindEth/juno/p2p/sync"
2022
"github.com/NethermindEth/juno/utils"
2123
"github.com/NethermindEth/juno/vm"
2224
"github.com/libp2p/go-libp2p/core/host"
@@ -36,6 +38,7 @@ func Init(
3638
database db.KeyValueStore,
3739
blockchain *blockchain.Blockchain,
3840
vm vm.VM,
41+
blockFetcher *sync.BlockFetcher,
3942
nodeAddress *starknet.Address,
4043
validators votecounter.Validators[starknet.Address],
4144
timeoutFn driver.TimeoutFn,
@@ -49,7 +52,7 @@ func Init(
4952

5053
tendermintDB := consensusDB.NewTendermintDB[starknet.Value, starknet.Hash, starknet.Address](database)
5154

52-
executor := builder.NewExecutor(blockchain, vm, logger, false, true) // TODO: We're currently skipping signature validation
55+
executor := builder.NewExecutor(blockchain, vm, logger, false, false)
5356
builder := builder.New(blockchain, executor)
5457

5558
proposalStore := proposal.ProposalStore[starknet.Hash]{}
@@ -59,13 +62,18 @@ func Init(
5962
p2p := p2p.New(host, logger, &builder, &proposalStore, currentHeight, &config.DefaultBufferSizes, bootstrapPeersFn)
6063

6164
commitListener := driver.NewCommitListener(logger, &proposalStore, proposer, p2p)
65+
66+
messageExtractor := consensusSync.New(validators, toValue, &proposalStore)
67+
6268
driver := driver.New(
6369
logger,
6470
tendermintDB,
6571
stateMachine,
6672
commitListener,
6773
p2p.Broadcasters(),
6874
p2p.Listeners(),
75+
blockFetcher,
76+
&messageExtractor,
6977
timeoutFn,
7078
)
7179

consensus/consensus_test.go

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package consensus_test
33
import (
44
"fmt"
55
goitre "iter"
6+
gosync "sync"
67
"testing"
78
"time"
89

@@ -15,12 +16,15 @@ import (
1516
"github.com/NethermindEth/juno/core/felt"
1617
"github.com/NethermindEth/juno/db/memory"
1718
"github.com/NethermindEth/juno/genesis"
19+
"github.com/NethermindEth/juno/p2p/dht"
1820
"github.com/NethermindEth/juno/p2p/pubsub/testutils"
21+
"github.com/NethermindEth/juno/p2p/server"
22+
"github.com/NethermindEth/juno/p2p/starknetp2p"
23+
p2psync "github.com/NethermindEth/juno/p2p/sync"
1924
"github.com/NethermindEth/juno/sync"
2025
"github.com/NethermindEth/juno/utils"
2126
"github.com/NethermindEth/juno/vm"
2227
"github.com/sourcegraph/conc"
23-
"github.com/sourcegraph/conc/iter"
2428
"github.com/stretchr/testify/assert"
2529
"github.com/stretchr/testify/require"
2630
"go.uber.org/zap"
@@ -91,7 +95,8 @@ func loadGenesis(t *testing.T, log *utils.ZapLogger) (core.StateDiff, map[felt.F
9195
func initNode(
9296
t *testing.T,
9397
index int,
94-
node *testutils.Node,
98+
consensusNode *testutils.Node,
99+
syncNode *testutils.Node,
95100
logger *utils.ZapLogger,
96101
commits chan commit,
97102
cfg *testConfig,
@@ -113,20 +118,41 @@ func initNode(
113118
}
114119
vm := vm.New(&chainInfo, false, logger)
115120

121+
blockFetcher := p2psync.NewBlockFetcher(bc, syncNode.Host, &network, logger)
122+
syncServer := server.New(syncNode.Host, bc, logger)
123+
116124
services, err := consensus.Init(
117-
node.Host,
125+
consensusNode.Host,
118126
logger,
119127
consensusDB,
120128
bc,
121129
vm,
130+
&blockFetcher,
122131
&mockServices.NodeAddress,
123132
mockServices.Validators,
124133
mockServices.TimeoutFn,
125-
node.GetBootstrapPeers,
134+
consensusNode.GetBootstrapPeers,
126135
)
127136
require.NoError(t, err)
128137

129138
wg := conc.NewWaitGroup()
139+
wg.Go(func() {
140+
dht, err := dht.New(
141+
t.Context(),
142+
syncNode.Host,
143+
&network,
144+
starknetp2p.SyncProtocolID,
145+
syncNode.GetBootstrapPeers,
146+
)
147+
require.NoError(t, err)
148+
require.NoError(t, dht.Bootstrap(t.Context()))
149+
t.Cleanup(func() {
150+
require.NoError(t, dht.Close())
151+
})
152+
})
153+
wg.Go(func() {
154+
require.NoError(t, syncServer.Run(t.Context()))
155+
})
130156
wg.Go(func() {
131157
require.NoError(t, services.Proposer.Run(t.Context()))
132158
})
@@ -269,12 +295,26 @@ func runTest(t *testing.T, cfg testConfig) {
269295

270296
commits := make(chan commit, commitBufferSize)
271297

272-
nodes := testutils.BuildNetworks(t, cfg.networkSetup(honestNodeCount))
298+
consensusNodes := testutils.BuildNetworks(t, cfg.networkSetup(honestNodeCount))
299+
syncNodes := testutils.BuildNetworks(t, cfg.networkSetup(honestNodeCount))
273300

274-
iterator := iter.Iterator[testutils.Node]{MaxGoroutines: honestNodeCount}
275-
iterator.ForEachIdx(nodes, func(i int, node *testutils.Node) {
276-
initNode(t, i, node, logger, commits, &cfg, genesisDiff, genesisClasses)
277-
})
301+
wg := gosync.WaitGroup{}
302+
t.Cleanup(wg.Wait)
303+
for i := range honestNodeCount {
304+
wg.Go(func() {
305+
initNode(
306+
t,
307+
i,
308+
&consensusNodes[i],
309+
&syncNodes[i],
310+
logger,
311+
commits,
312+
&cfg,
313+
genesisDiff,
314+
genesisClasses,
315+
)
316+
})
317+
}
278318

279319
assertCommits(t, commits, cfg, logger)
280320
}

consensus/driver/commit_listener.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ type CommitHook[V types.Hashable[H], H types.Hash] interface {
1515
}
1616

1717
// CommitListener is a component that is used to notify different components that a new committed block is available.
18-
//
19-
//go:generate mockgen -destination=../mocks/mock_commit_listener.go -package=mocks github.com/NethermindEth/juno/consensus/driver CommitListener
2018
type CommitListener[V types.Hashable[H], H types.Hash] interface {
2119
CommitHook[V, H]
2220
// Listen returns a channel that will receive committed blocks.

consensus/driver/driver.go

Lines changed: 102 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,37 @@ package driver
33
import (
44
"context"
55
"fmt"
6+
gosync "sync"
67
"time"
78

89
"github.com/NethermindEth/juno/consensus/db"
910
"github.com/NethermindEth/juno/consensus/p2p"
11+
consensusSync "github.com/NethermindEth/juno/consensus/sync"
1012
"github.com/NethermindEth/juno/consensus/tendermint"
1113
"github.com/NethermindEth/juno/consensus/types"
1214
"github.com/NethermindEth/juno/consensus/types/actions"
15+
"github.com/NethermindEth/juno/p2p/sync"
1316
"github.com/NethermindEth/juno/utils"
1417
)
1518

1619
type TimeoutFn func(step types.Step, round types.Round) time.Duration
1720

1821
type Driver[V types.Hashable[H], H types.Hash, A types.Addr] struct {
19-
log utils.Logger
20-
db db.TendermintDB[V, H, A]
21-
stateMachine tendermint.StateMachine[V, H, A]
22-
commitListener CommitListener[V, H]
23-
broadcasters p2p.Broadcasters[V, H, A]
24-
listeners p2p.Listeners[V, H, A]
25-
26-
getTimeout TimeoutFn
22+
log utils.Logger
23+
db db.TendermintDB[V, H, A]
24+
stateMachine tendermint.StateMachine[V, H, A]
25+
commitListener CommitListener[V, H]
26+
broadcasters p2p.Broadcasters[V, H, A]
27+
listeners p2p.Listeners[V, H, A]
28+
blockFetcher *sync.BlockFetcher
29+
messageExtractor *consensusSync.MessageExtractor[V, H, A]
30+
getTimeout TimeoutFn
2731

2832
scheduledTms map[types.Timeout]*time.Timer
2933
timeoutsCh chan types.Timeout
34+
syncListener chan sync.BlockBody
35+
lastQuorum types.Height
36+
wg gosync.WaitGroup
3037
}
3138

3239
func New[V types.Hashable[H], H types.Hash, A types.Addr](
@@ -36,18 +43,24 @@ func New[V types.Hashable[H], H types.Hash, A types.Addr](
3643
commitListener CommitListener[V, H],
3744
broadcasters p2p.Broadcasters[V, H, A],
3845
listeners p2p.Listeners[V, H, A],
46+
blockFetcher *sync.BlockFetcher,
47+
messageExtractor *consensusSync.MessageExtractor[V, H, A],
3948
getTimeout TimeoutFn,
4049
) Driver[V, H, A] {
4150
return Driver[V, H, A]{
42-
log: log,
43-
db: db,
44-
stateMachine: stateMachine,
45-
commitListener: commitListener,
46-
broadcasters: broadcasters,
47-
listeners: listeners,
48-
getTimeout: getTimeout,
49-
scheduledTms: make(map[types.Timeout]*time.Timer),
50-
timeoutsCh: make(chan types.Timeout),
51+
log: log,
52+
db: db,
53+
stateMachine: stateMachine,
54+
commitListener: commitListener,
55+
broadcasters: broadcasters,
56+
listeners: listeners,
57+
blockFetcher: blockFetcher,
58+
messageExtractor: messageExtractor,
59+
getTimeout: getTimeout,
60+
scheduledTms: make(map[types.Timeout]*time.Timer),
61+
timeoutsCh: make(chan types.Timeout),
62+
syncListener: make(chan sync.BlockBody, 1),
63+
wg: gosync.WaitGroup{},
5164
}
5265
}
5366

@@ -84,7 +97,9 @@ func (d *Driver[V, H, A]) replay(ctx context.Context) error {
8497
return nil
8598
}
8699

100+
//nolint:gocyclo // This is having higher complexity due to adding ok check for unmanaged channels.
87101
func (d *Driver[V, H, A]) listen(ctx context.Context) error {
102+
defer d.wg.Wait()
88103
for {
89104
select {
90105
case <-ctx.Done():
@@ -123,13 +138,26 @@ func (d *Driver[V, H, A]) listen(ctx context.Context) error {
123138
return nil
124139
}
125140
actions = d.stateMachine.ProcessPrecommit(p)
141+
case p, ok := <-d.syncListener:
142+
if !ok {
143+
return nil
144+
}
145+
146+
if p.Err != nil {
147+
d.syncCurrentHeight(ctx)
148+
} else {
149+
proposal, precommits := d.messageExtractor.Extract(&p)
150+
actions = d.stateMachine.ProcessSync(&proposal, precommits)
151+
}
126152
}
127153

128154
isCommitted, err = d.execute(ctx, false, actions)
129155
if err != nil {
130156
return err
131157
}
132158
}
159+
160+
d.syncCurrentHeight(ctx)
133161
}
134162
}
135163

@@ -167,34 +195,76 @@ func (d *Driver[V, H, A]) execute(
167195
d.broadcasters.PrecommitBroadcaster.Broadcast(ctx, (*types.Precommit[H, A])(action))
168196

169197
case *actions.ScheduleTimeout:
170-
d.setTimeout(ctx, types.Timeout(*action))
198+
d.scheduleTimeout(ctx, types.Timeout(*action))
171199

172200
case *actions.Commit[V, H, A]:
173-
d.log.Debugw("Committing", "height", action.Height, "round", action.Round)
174-
d.commitListener.OnCommit(ctx, action.Height, *action.Value)
175-
176-
if err := d.db.DeleteWALEntries(action.Height); err != nil {
177-
return true, fmt.Errorf("failed to delete WAL messages during commit: %w", err)
178-
}
179-
180-
return true, nil
201+
return true, d.commit(ctx, action)
181202

182203
case *actions.TriggerSync:
183-
d.triggerSync(*action)
204+
d.triggerSync(ctx, *action)
184205
}
185206
}
186207
return false, nil
187208
}
188209

189-
func (d *Driver[V, H, A]) triggerSync(sync actions.TriggerSync) {
190-
// TODO: Implement this
191-
}
192-
193-
func (d *Driver[V, H, A]) setTimeout(ctx context.Context, timeout types.Timeout) {
210+
func (d *Driver[V, H, A]) scheduleTimeout(ctx context.Context, timeout types.Timeout) {
194211
d.scheduledTms[timeout] = time.AfterFunc(d.getTimeout(timeout.Step, timeout.Round), func() {
195212
select {
196213
case <-ctx.Done():
197214
case d.timeoutsCh <- timeout:
198215
}
199216
})
200217
}
218+
219+
func (d *Driver[V, H, A]) commit(ctx context.Context, commit *actions.Commit[V, H, A]) error {
220+
d.log.Debugw("Committing", "height", commit.Height, "round", commit.Round)
221+
d.commitListener.OnCommit(ctx, commit.Height, *commit.Value)
222+
223+
if err := d.db.DeleteWALEntries(commit.Height); err != nil {
224+
return fmt.Errorf("failed to delete WAL messages during commit: %w", err)
225+
}
226+
227+
return nil
228+
}
229+
230+
func (d *Driver[V, H, A]) triggerSync(ctx context.Context, triggerSync actions.TriggerSync) {
231+
currentlyHasFutureQuorum := d.hasFutureQuorum()
232+
// TODO: Temporary workaround to only trigger the next height, because the sync process
233+
// doesn't support triggering multiple heights at once.
234+
d.lastQuorum = max(d.lastQuorum, triggerSync.End)
235+
236+
// Only trigger sync if haven't triggered yet.
237+
if !currentlyHasFutureQuorum {
238+
d.syncCurrentHeight(ctx)
239+
}
240+
}
241+
242+
// TODO: Temporary workaround to only trigger the next height, because the sync process currently
243+
// doesn't support triggering multiple heights at once.
244+
func (d *Driver[V, H, A]) syncCurrentHeight(ctx context.Context) {
245+
height := d.stateMachine.Height()
246+
247+
if !d.hasFutureQuorum() {
248+
return
249+
}
250+
251+
d.wg.Go(func() {
252+
for {
253+
select {
254+
case <-ctx.Done():
255+
return
256+
default:
257+
}
258+
259+
if err := d.blockFetcher.ProcessBlock(ctx, uint64(height), d.syncListener); err != nil {
260+
d.log.Errorw("failed to trigger sync", "block", height, "err", err)
261+
} else {
262+
break
263+
}
264+
}
265+
})
266+
}
267+
268+
func (d *Driver[V, H, A]) hasFutureQuorum() bool {
269+
return d.lastQuorum > d.stateMachine.Height()
270+
}

consensus/driver/driver_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ func TestDriver(t *testing.T) {
178178
newMockCommitListener(t, &commitAction),
179179
broadcasters,
180180
listeners,
181+
nil, // TODO: Add tests for trigger sync
182+
nil, // TODO: Add tests for trigger sync
181183
mockTimeoutFn,
182184
)
183185

0 commit comments

Comments
 (0)