Skip to content

Commit b7ea5bb

Browse files
committed
refactor: Separate Sync P2P server from P2P service
1 parent e3768ce commit b7ea5bb

File tree

5 files changed

+121
-83
lines changed

5 files changed

+121
-83
lines changed

node/node.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,6 @@ func New(cfg *Config, version string, logLevel *utils.LogLevel) (*Node, error) {
396396
} else if p2pService != nil {
397397
// regular p2p node
398398
p2pService.WithListener(makeSyncMetrics(&sync.NoopSynchronizer{}, chain))
399-
p2pService.WithGossipTracer()
400399
}
401400
}
402401
earlyServices = append(earlyServices, makeMetrics(cfg.MetricsHost, cfg.MetricsPort))

p2p/p2p.go

Lines changed: 19 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,21 @@ import (
1212
"github.com/NethermindEth/juno/blockchain"
1313
"github.com/NethermindEth/juno/db"
1414
"github.com/NethermindEth/juno/p2p/dht"
15-
p2pPeers "github.com/NethermindEth/juno/p2p/peers"
15+
"github.com/NethermindEth/juno/p2p/server"
1616
"github.com/NethermindEth/juno/p2p/starknetp2p"
1717
p2pSync "github.com/NethermindEth/juno/p2p/sync"
1818
junoSync "github.com/NethermindEth/juno/sync"
1919
"github.com/NethermindEth/juno/utils"
2020
"github.com/libp2p/go-libp2p"
2121
libp2pdht "github.com/libp2p/go-libp2p-kad-dht"
22-
pubsub "github.com/libp2p/go-libp2p-pubsub"
2322
"github.com/libp2p/go-libp2p/core/crypto"
2423
"github.com/libp2p/go-libp2p/core/crypto/pb"
2524
"github.com/libp2p/go-libp2p/core/host"
2625
"github.com/libp2p/go-libp2p/core/network"
2726
"github.com/libp2p/go-libp2p/core/peer"
2827
"github.com/libp2p/go-libp2p/core/protocol"
2928
"github.com/multiformats/go-multiaddr"
29+
"golang.org/x/sync/errgroup"
3030
"google.golang.org/protobuf/proto"
3131
)
3232

@@ -40,14 +40,12 @@ type Service struct {
4040
host host.Host
4141

4242
network *utils.Network
43-
handler *p2pPeers.Handler
43+
server *server.Server
4444
log utils.SimpleLogger
4545

46-
dht *libp2pdht.IpfsDHT
47-
pubsub *pubsub.PubSub
46+
dht *libp2pdht.IpfsDHT
4847

4948
synchroniser *p2pSync.Service
50-
gossipTracer *gossipTracer
5149

5250
feederNode bool
5351
database db.KeyValueStore
@@ -146,14 +144,15 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai
146144

147145
blockFetcher := p2pSync.NewBlockFetcher(bc, p2phost, snNetwork, log)
148146
synchroniser := p2pSync.New(bc, log, &blockFetcher)
147+
server := server.New(p2phost, bc, log)
149148
s := &Service{
150149
synchroniser: synchroniser,
151150
log: log,
152151
host: p2phost,
153152
network: snNetwork,
154153
dht: p2pdht,
155154
feederNode: feederNode,
156-
handler: p2pPeers.NewHandler(bc, log),
155+
server: &server,
157156
database: database,
158157
}
159158
return s, nil
@@ -208,16 +207,6 @@ func (s *Service) Run(ctx context.Context) error {
208207
return err
209208
}
210209

211-
var options []pubsub.Option
212-
if s.gossipTracer != nil {
213-
options = append(options, pubsub.WithRawTracer(s.gossipTracer))
214-
}
215-
216-
s.pubsub, err = pubsub.NewGossipSub(ctx, s.host, options...)
217-
if err != nil {
218-
return err
219-
}
220-
221210
defer s.callAndLogErr(s.dht.Close, "Failed stopping DHT")
222211

223212
listenAddrs, err := s.ListenAddrs()
@@ -228,10 +217,21 @@ func (s *Service) Run(ctx context.Context) error {
228217
s.log.Infow("Listening on", "addr", addr)
229218
}
230219

231-
s.setProtocolHandlers()
220+
g := errgroup.Group{}
232221

233222
if !s.feederNode {
234-
s.synchroniser.Run(ctx)
223+
g.Go(func() error {
224+
s.synchroniser.Run(ctx)
225+
return nil
226+
})
227+
}
228+
229+
g.Go(func() error {
230+
return s.server.Run(ctx)
231+
})
232+
233+
if err := g.Wait(); err != nil {
234+
return err
235235
}
236236

237237
<-ctx.Done()
@@ -244,14 +244,6 @@ func (s *Service) Run(ctx context.Context) error {
244244
return nil
245245
}
246246

247-
func (s *Service) setProtocolHandlers() {
248-
s.SetProtocolHandler(starknetp2p.HeadersSyncSubProtocol, s.handler.HeadersHandler)
249-
s.SetProtocolHandler(starknetp2p.EventsSyncSubProtocol, s.handler.EventsHandler)
250-
s.SetProtocolHandler(starknetp2p.TransactionsSyncSubProtocol, s.handler.TransactionsHandler)
251-
s.SetProtocolHandler(starknetp2p.ClassesSyncSubProtocol, s.handler.ClassesHandler)
252-
s.SetProtocolHandler(starknetp2p.StateDiffSyncSubProtocol, s.handler.StateDiffHandler)
253-
}
254-
255247
func (s *Service) callAndLogErr(f func() error, msg string) {
256248
err := f()
257249
if err != nil {
@@ -299,21 +291,10 @@ func (s *Service) NewStream(ctx context.Context, pids ...protocol.ID) (network.S
299291
}
300292
}
301293

302-
func (s *Service) SetProtocolHandler(
303-
syncSubProtocol starknetp2p.SyncSubProtocol,
304-
handler func(network.Stream),
305-
) {
306-
s.host.SetStreamHandler(starknetp2p.Sync(s.network, syncSubProtocol), handler)
307-
}
308-
309294
func (s *Service) WithListener(l junoSync.EventListener) {
310295
s.synchroniser.WithListener(l)
311296
}
312297

313-
func (s *Service) WithGossipTracer() {
314-
s.gossipTracer = NewGossipTracer(s.host)
315-
}
316-
317298
// persistPeers stores the given peers in the peers database
318299
func (s *Service) persistPeers() error {
319300
txn := s.database.NewBatch()

p2p/peers/iterator.go renamed to p2p/server/iterator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package peers
1+
package server
22

33
import (
44
"errors"

p2p/peers/iterator_test.go renamed to p2p/server/iterator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package peers
1+
package server
22

33
import (
44
"testing"

0 commit comments

Comments
 (0)