From 773b0b290ac0196bc63b11d79737c8f2da619fd4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=96mer=20Faruk=20IRMAK?=
Date: Wed, 16 Jul 2025 15:03:29 +0300
Subject: [PATCH 1/4] trie: swap out the pools in StackTrie with an arena allocator

Arena allocators are a perfect match for something like StackTrie, where
allocations happen in stack-like "frames" and can then be deallocated at
the end of the "allocation frame".

We can construct the "allocation frames" every time we create a new
extension/branch node and deconstruct the frame by deallocating everything
allocated on the arena during that frame. This saves us from dealing with
individual allocations/deallocations.
---
 common/arena.go        |  61 ++++++++++++++++++
 trie/bytepool.go       |  64 -------------------
 trie/stacktrie.go      | 136 +++++++++++++++++++++++++++++------------
 trie/stacktrie_test.go |  73 ++++++++++++++++++++++
 4 files changed, 231 insertions(+), 103 deletions(-)
 create mode 100644 common/arena.go
 delete mode 100644 trie/bytepool.go

diff --git a/common/arena.go b/common/arena.go
new file mode 100644
index 00000000000..3d38defd975
--- /dev/null
+++ b/common/arena.go
@@ -0,0 +1,61 @@
+// Copyright 2025 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package common
+
+// Arena is an allocation primitive that allows individual allocations
+// out of a page of items and bulk de-allocations of last N allocations.
+// The most common way of using an Arena is to pair it up with a sync.Pool
+// of pages.
+type Arena[T any] struct {
+	used  uint32
+	pages [][]T
+
+	PageSize    uint32
+	NewPage     func() any
+	ReleasePage func(any)
+}
+
+// Alloc returns the next free item on the arena
+// Allocates a new page if needed
+func (a *Arena[T]) Alloc() *T {
+	pageIndex := a.used / a.PageSize
+	pageOffset := a.used % a.PageSize
+	if pageOffset == 0 {
+		a.pages = append(a.pages, a.NewPage().([]T))
+	}
+	a.used++
+	return &a.pages[pageIndex][pageOffset]
+}
+
+// Used returns the number of items that live on this arena
+func (a *Arena[T]) Used() uint32 {
+	return a.used
+}
+
+// Reset rollsback the active set of live elements to the given number
+func (a *Arena[T]) Reset(to uint32) {
+	a.used = to
+}
+
+// Release releases all the pages that the arena currently owns
+func (a *Arena[T]) Release() {
+	for _, page := range a.pages {
+		a.ReleasePage(page)
+	}
+	a.pages = nil
+	a.used = 0
+}
diff --git a/trie/bytepool.go b/trie/bytepool.go
deleted file mode 100644
index 4f9c5672fd9..00000000000
--- a/trie/bytepool.go
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright 2024 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package trie - -// bytesPool is a pool for byte slices. It is safe for concurrent use. -type bytesPool struct { - c chan []byte - w int -} - -// newBytesPool creates a new bytesPool. The sliceCap sets the capacity of -// newly allocated slices, and the nitems determines how many items the pool -// will hold, at maximum. -func newBytesPool(sliceCap, nitems int) *bytesPool { - return &bytesPool{ - c: make(chan []byte, nitems), - w: sliceCap, - } -} - -// Get returns a slice. Safe for concurrent use. -func (bp *bytesPool) Get() []byte { - select { - case b := <-bp.c: - return b - default: - return make([]byte, 0, bp.w) - } -} - -// GetWithSize returns a slice with specified byte slice size. -func (bp *bytesPool) GetWithSize(s int) []byte { - b := bp.Get() - if cap(b) < s { - return make([]byte, s) - } - return b[:s] -} - -// Put returns a slice to the pool. Safe for concurrent use. This method -// will ignore slices that are too small or too large (>3x the cap) -func (bp *bytesPool) Put(b []byte) { - if c := cap(b); c < bp.w || c > 3*bp.w { - return - } - select { - case bp.c <- b: - default: - } -} diff --git a/trie/stacktrie.go b/trie/stacktrie.go index 2b7366c3c51..ac8baca18d1 100644 --- a/trie/stacktrie.go +++ b/trie/stacktrie.go @@ -20,15 +20,31 @@ import ( "bytes" "errors" "sync" + "unsafe" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/metrics" ) +var stNodeAllocationMeter = metrics.NewRegisteredMeter("stacktrie/allocation/node", nil) +var byteAllocationMeter = metrics.NewRegisteredMeter("stacktrie/allocation/byte", nil) + var ( - stPool = sync.Pool{New: func() any { return new(stNode) }} - bPool = newBytesPool(32, 100) - _ = types.TrieHasher((*StackTrie)(nil)) + stPageSize = 1024 + stPagePool = sync.Pool{ + New: func() any { + stNodeAllocationMeter.Mark(int64(unsafe.Sizeof(stNode{})) * int64(stPageSize)) + return make([]stNode, stPageSize) + }, + } + bytePagePool = sync.Pool{ + New: func() any { + byteAllocationMeter.Mark(int64(unsafe.Sizeof([32]byte{})) * int64(stPageSize)) + return make([][32]byte, stPageSize) + }, + } + _ = types.TrieHasher((*StackTrie)(nil)) ) // OnTrieNode is a callback method invoked when a trie node is committed @@ -40,6 +56,14 @@ var ( // it after the call ends. type OnTrieNode func(path []byte, hash common.Hash, blob []byte) +// allocationFrame keeps track of the position of the allocators in StackTrie +// at the time of a new branch/ext node being created. At the point where this +// node is hashed, allocators can be reset to the positions stored in the allocation +// frame +type allocationFrame struct { + node, bytes uint32 +} + // StackTrie is a trie implementation that expects keys to be inserted // in order. 
Once it determines that a subtree will no longer be inserted // into, it will hash it and free up the memory it uses. @@ -48,20 +72,47 @@ type StackTrie struct { h *hasher last []byte onTrieNode OnTrieNode - kBuf []byte // buf space used for hex-key during insertions - pBuf []byte // buf space used for path during insertions + + nodeAllocator common.Arena[stNode] + byteAllocator common.Arena[[32]byte] + allocationStackFrames []allocationFrame + + kBuf []byte // buf space used for hex-key during insertions + pBuf []byte // buf space used for path during insertions + tmpNode *stNode // used as a temporary ext node when needed } // NewStackTrie allocates and initializes an empty trie. The committed nodes // will be discarded immediately if no callback is configured. func NewStackTrie(onTrieNode OnTrieNode) *StackTrie { - return &StackTrie{ - root: stPool.Get().(*stNode), - h: newHasher(false), - onTrieNode: onTrieNode, - kBuf: make([]byte, 64), - pBuf: make([]byte, 64), + t := StackTrie{ + h: newHasher(false), + onTrieNode: onTrieNode, + kBuf: make([]byte, 64), + pBuf: make([]byte, 64), + nodeAllocator: common.Arena[stNode]{NewPage: stPagePool.Get, PageSize: uint32(stPageSize), ReleasePage: stPagePool.Put}, + byteAllocator: common.Arena[[32]byte]{NewPage: bytePagePool.Get, PageSize: uint32(stPageSize), ReleasePage: bytePagePool.Put}, } + t.root = t.nodeAllocator.Alloc().reset() + t.tmpNode = t.nodeAllocator.Alloc().reset() + return &t +} + +// creates a new allocation frame by saving positions of internal allocators +func (t *StackTrie) pushAllocationFrame() { + t.allocationStackFrames = append(t.allocationStackFrames, allocationFrame{ + node: t.nodeAllocator.Used(), + bytes: t.byteAllocator.Used(), + }) +} + +// pops a saved allocation frame and rollsback allocators to the state they were +// at the beginning of the frame +func (t *StackTrie) popAllocationFrame() { + allocationFrame := t.allocationStackFrames[len(t.allocationStackFrames)-1] + t.nodeAllocator.Reset(allocationFrame.node) + t.byteAllocator.Reset(allocationFrame.bytes) + t.allocationStackFrames = t.allocationStackFrames[:len(t.allocationStackFrames)-1] } func (t *StackTrie) grow(key []byte) { @@ -94,7 +145,10 @@ func (t *StackTrie) Update(key, value []byte) error { // Reset resets the stack trie object to empty state. func (t *StackTrie) Reset() { - t.root = stPool.Get().(*stNode) + t.nodeAllocator.Reset(0) + t.byteAllocator.Reset(0) + t.root = t.nodeAllocator.Alloc().reset() + t.tmpNode = t.nodeAllocator.Alloc().reset() t.last = nil } @@ -116,18 +170,17 @@ type stNode struct { // newLeaf constructs a leaf node with provided node key and value. The key // will be deep-copied in the function and safe to modify afterwards, but // value is not. -func newLeaf(key, val []byte) *stNode { - st := stPool.Get().(*stNode) +func (t *StackTrie) newLeaf(key, val []byte) *stNode { + st := t.nodeAllocator.Alloc().reset() st.typ = leafNode st.key = append(st.key, key...) st.val = val return st } -// newExt constructs an extension node with provided node key and child. The +// makeExt constructs an extension node with provided node key and child. The // key will be deep-copied in the function and safe to modify afterwards. -func newExt(key []byte, child *stNode) *stNode { - st := stPool.Get().(*stNode) +func makeExt(st *stNode, key []byte, child *stNode) *stNode { st.typ = extNode st.key = append(st.key, key...) 
st.children[0] = child @@ -144,12 +197,6 @@ const ( ) func (n *stNode) reset() *stNode { - if n.typ == hashedNode { - // On hashnodes, we 'own' the val: it is guaranteed to be not held - // by external caller. Hence, when we arrive here, we can put it back - // into the pool - bPool.Put(n.val) - } n.key = n.key[:0] n.val = nil for i := range n.children { @@ -194,7 +241,7 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, path []byte) { // Add new child if st.children[idx] == nil { - st.children[idx] = newLeaf(key[1:], value) + st.children[idx] = t.newLeaf(key[1:], value) } else { t.insert(st.children[idx], key[1:], value, append(path, key[0])) } @@ -223,8 +270,14 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, path []byte) { // Break on the non-last byte, insert an intermediate // extension. The path prefix of the newly-inserted // extension should also contain the different byte. - n = newExt(st.key[diffidx+1:], st.children[0]) - t.hash(n, append(path, st.key[:diffidx+1]...)) + e := makeExt(t.tmpNode.reset(), st.key[diffidx+1:], st.children[0]) // build a temporary extension node to hash + t.hash(e, append(path, st.key[:diffidx+1]...)) // frame belonging to st gets popped here + + // allocate a new node to hold the hashed e + n = t.nodeAllocator.Alloc().reset() + n.typ = hashedNode + n.val = e.val + t.pushAllocationFrame() // for the "new" st that might end up being a branch or a ext } else { // Break on the last byte, no need to insert // an extension node: reuse the current node. @@ -245,12 +298,14 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, path []byte) { // the common prefix is at least one byte // long, insert a new intermediate branch // node. - st.children[0] = stPool.Get().(*stNode) + st.children[0] = t.nodeAllocator.Alloc().reset() st.children[0].typ = branchNode + t.pushAllocationFrame() // for the new branch child p = st.children[0] } + // Create a leaf for the inserted part - o := newLeaf(key[diffidx+1:], value) + o := t.newLeaf(key[diffidx+1:], value) // Insert both child leaves where they belong: origIdx := st.key[diffidx] @@ -282,11 +337,14 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, path []byte) { st.typ = branchNode p = st st.children[0] = nil + t.pushAllocationFrame() // leafnode turning into a branch node } else { // Convert current node into an ext, // and insert a child branch node. st.typ = extNode - st.children[0] = stPool.Get().(*stNode) + t.pushAllocationFrame() // leafnode turning into a ext node + st.children[0] = t.nodeAllocator.Alloc().reset() + t.pushAllocationFrame() // new branch node st.children[0].typ = branchNode p = st.children[0] } @@ -295,11 +353,11 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, path []byte) { // value and another containing the new value. The child leaf // is hashed directly in order to free up some memory. origIdx := st.key[diffidx] - p.children[origIdx] = newLeaf(st.key[diffidx+1:], st.val) + p.children[origIdx] = t.newLeaf(st.key[diffidx+1:], st.val) t.hash(p.children[origIdx], append(path, st.key[:diffidx+1]...)) newIdx := key[diffidx] - p.children[newIdx] = newLeaf(key[diffidx+1:], value) + p.children[newIdx] = t.newLeaf(key[diffidx+1:], value) // Finally, cut off the key part that has been passed // over to the children. @@ -359,9 +417,8 @@ func (t *StackTrie) hash(st *stNode, path []byte) { continue } st.children[i] = nil - stPool.Put(child.reset()) // Release child back to pool. 
} - + t.popAllocationFrame() case extNode: // recursively hash and commit child as the first step t.hash(st.children[0], append(path, st.key...)) @@ -373,10 +430,8 @@ func (t *StackTrie) hash(st *stNode, path []byte) { } n.encode(t.h.encbuf) blob = t.h.encodedBytes() - - stPool.Put(st.children[0].reset()) // Release child back to pool. st.children[0] = nil - + t.popAllocationFrame() case leafNode: st.key = append(st.key, byte(16)) n := leafNodeEncoder{ @@ -398,13 +453,13 @@ func (t *StackTrie) hash(st *stNode, path []byte) { // Skip committing the non-root node if the size is smaller than 32 bytes // as tiny nodes are always embedded in their parent except root node. if len(blob) < 32 && len(path) > 0 { - st.val = bPool.GetWithSize(len(blob)) + st.val = t.byteAllocator.Alloc()[:len(blob)] copy(st.val, blob) return } // Write the hash to the 'val'. We allocate a new val here to not mutate // input values. - st.val = bPool.GetWithSize(32) + st.val = t.byteAllocator.Alloc()[:32] t.h.hashDataTo(st.val, blob) // Invoke the callback it's provided. Notably, the path and blob slices are @@ -422,5 +477,8 @@ func (t *StackTrie) hash(st *stNode, path []byte) { func (t *StackTrie) Hash() common.Hash { n := t.root t.hash(n, nil) - return common.BytesToHash(n.val) + hash := common.BytesToHash(n.val) + t.byteAllocator.Release() + t.nodeAllocator.Release() + return hash } diff --git a/trie/stacktrie_test.go b/trie/stacktrie_test.go index 7e342e64bf4..66c8eaa604e 100644 --- a/trie/stacktrie_test.go +++ b/trie/stacktrie_test.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestStackTrieInsertAndHash(t *testing.T) { @@ -444,3 +445,75 @@ func TestInsert100K(t *testing.T) { t.Fatalf("hash wrong, have %x want %x", have, want) } } + +func TestAllocationFrame(t *testing.T) { + val := []byte{0} + + t.Run("leaf turning into a branch", func(t *testing.T) { + s := NewStackTrie(nil) + require.NoError(t, s.Update([]byte{0x01, 0x00}, val)) + assert.Equal(t, uint8(leafNode), s.root.typ) + assert.Equal(t, []byte{0, 0x1, 0, 0}, s.root.key) + require.NoError(t, s.Update([]byte{0x11, 0x00}, val)) + assert.Equal(t, uint8(branchNode), s.root.typ) + assert.Len(t, s.allocationStackFrames, 1) + s.Hash() + assert.Len(t, s.allocationStackFrames, 0) + }) + + t.Run("leaf turning into an extension node", func(t *testing.T) { + s := NewStackTrie(nil) + require.NoError(t, s.Update([]byte{0x01, 0x00}, val)) + assert.Equal(t, uint8(leafNode), s.root.typ) + assert.Equal(t, []byte{0, 0x1, 0, 0}, s.root.key) + require.NoError(t, s.Update([]byte{0x01, 0x10}, val)) + assert.Equal(t, uint8(extNode), s.root.typ) + assert.Len(t, s.allocationStackFrames, 2) + s.Hash() + assert.Len(t, s.allocationStackFrames, 0) + }) + + t.Run("extension node creates an intermediate branch", func(t *testing.T) { + s := NewStackTrie(nil) + require.NoError(t, s.Update([]byte{0x01, 0x00}, val)) + assert.Equal(t, uint8(leafNode), s.root.typ) + assert.Equal(t, []byte{0, 0x1, 0, 0}, s.root.key) + require.NoError(t, s.Update([]byte{0x01, 0x10}, val)) + assert.Equal(t, uint8(extNode), s.root.typ) + assert.Len(t, s.allocationStackFrames, 2) + require.NoError(t, s.Update([]byte{0x02, 0x10}, val)) + assert.Equal(t, uint8(extNode), s.root.typ) + assert.Len(t, s.allocationStackFrames, 2) + s.Hash() + assert.Len(t, s.allocationStackFrames, 0) + }) + + t.Run("extension node creates an intermediate extension node", func(t 
*testing.T) { + s := NewStackTrie(nil) + require.NoError(t, s.Update([]byte{0x01, 0x00}, val)) + assert.Equal(t, uint8(leafNode), s.root.typ) + assert.Equal(t, []byte{0, 0x1, 0, 0}, s.root.key) + require.NoError(t, s.Update([]byte{0x01, 0x10}, val)) + assert.Equal(t, uint8(extNode), s.root.typ) + assert.Len(t, s.allocationStackFrames, 2) + require.NoError(t, s.Update([]byte{0x11, 0x10}, val)) + assert.Equal(t, uint8(branchNode), s.root.typ) + assert.Len(t, s.allocationStackFrames, 1) + s.Hash() + assert.Len(t, s.allocationStackFrames, 0) + }) + t.Run("extension node creates an intermediate extension node (2)", func(t *testing.T) { + s := NewStackTrie(nil) + require.NoError(t, s.Update([]byte{0x00, 0x10}, val)) + assert.Equal(t, uint8(leafNode), s.root.typ) + assert.Equal(t, []byte{0, 0, 1, 0}, s.root.key) + require.NoError(t, s.Update([]byte{0x00, 0x11}, val)) + assert.Equal(t, uint8(extNode), s.root.typ) + assert.Len(t, s.allocationStackFrames, 2) + require.NoError(t, s.Update([]byte{0x01, 0x11}, val)) + assert.Equal(t, uint8(extNode), s.root.typ) + assert.Len(t, s.allocationStackFrames, 2) + s.Hash() + assert.Len(t, s.allocationStackFrames, 0) + }) +} From c5b268c31df4346e0541976c37a7e3760dd8e38c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20IRMAK?= Date: Mon, 21 Jul 2025 18:40:55 +0300 Subject: [PATCH 2/4] eth/snap: track time spent on trie rebuilding --- eth/protocols/snap/sync.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 84ceb9105ea..1e280984045 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/msgrate" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -106,6 +107,9 @@ var ( // storageConcurrency is the number of chunks to split a large contract // storage trie into to allow concurrent retrievals. storageConcurrency = 16 + + // + trieRebuildTimeGauge = metrics.NewRegisteredGauge("snap/sync/rebuild", nil) ) // ErrCancelled is returned from snap syncing if the operation was prematurely @@ -502,8 +506,9 @@ type Syncer struct { storageHealed uint64 // Number of storage slots downloaded during the healing stage storageHealedBytes common.StorageSize // Number of raw storage bytes persisted to disk during the healing stage - startTime time.Time // Time instance when snapshot sync started - logTime time.Time // Time instance when status was last reported + startTime time.Time // Time instance when snapshot sync started + logTime time.Time // Time instance when status was last reported + trieRebuildTime time.Duration // Total duration it took to rebuild trie intermediate nodes pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root) @@ -2202,28 +2207,37 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // Keep the left boundary as it's complete tr = newPathTrie(account, false, s.db, batch) } + + start := time.Now() for j := 0; j < len(res.hashes[i]); j++ { tr.update(res.hashes[i][j][:], res.slots[i][j]) } tr.commit(true) + s.trieRebuildTime += time.Since(start) } // Persist the received storage segments. 
These flat state maybe // outdated during the sync, but it can be fixed later during the // snapshot generation. for j := 0; j < len(res.hashes[i]); j++ { rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) + } + start := time.Now() + for j := 0; j < len(res.hashes[i]); j++ { // If we're storing large contracts, generate the trie nodes // on the fly to not trash the gluing points if i == len(res.hashes)-1 && res.subTask != nil { res.subTask.genTrie.update(res.hashes[i][j][:], res.slots[i][j]) } } + s.trieRebuildTime += time.Since(start) } // Large contracts could have generated new trie nodes, flush them to disk if res.subTask != nil { if res.subTask.done { + start := time.Now() root := res.subTask.genTrie.commit(res.subTask.Last == common.MaxHash) + s.trieRebuildTime += time.Since(start) if err := res.subTask.genBatch.Write(); err != nil { log.Error("Failed to persist stack slots", "err", err) } @@ -2241,7 +2255,9 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { } } } else if res.subTask.genBatch.ValueSize() > batchSizeThreshold { + start := time.Now() res.subTask.genTrie.commit(false) + s.trieRebuildTime += time.Since(start) if err := res.subTask.genBatch.Write(); err != nil { log.Error("Failed to persist stack slots", "err", err) } @@ -2417,6 +2433,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { slim := types.SlimAccountRLP(*res.accounts[i]) rawdb.WriteAccountSnapshot(batch, hash, slim) + start := time.Now() if !task.needHeal[i] { // If the storage task is complete, drop it into the stack trie // to generate account trie nodes for it @@ -2433,6 +2450,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { panic(err) // Really shouldn't ever happen } } + s.trieRebuildTime += time.Since(start) } // Flush anything written just now and update the stats if err := batch.Write(); err != nil { @@ -2464,18 +2482,23 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { // flush after finalizing task.done. It's fine even if we crash and lose this // write as it will only cause more data to be downloaded during heal. if task.done { + start := time.Now() task.genTrie.commit(task.Last == common.MaxHash) + s.trieRebuildTime += time.Since(start) if err := task.genBatch.Write(); err != nil { log.Error("Failed to persist stack account", "err", err) } task.genBatch.Reset() } else if task.genBatch.ValueSize() > batchSizeThreshold { + start := time.Now() task.genTrie.commit(false) + s.trieRebuildTime += time.Since(start) if err := task.genBatch.Write(); err != nil { log.Error("Failed to persist stack account", "err", err) } task.genBatch.Reset() } + trieRebuildTimeGauge.Update(s.trieRebuildTime.Microseconds()) log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes) } From 3d40a60bebd9926a882e62d17056b7721c5c55b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20IRMAK?= Date: Wed, 23 Jul 2025 17:43:21 +0300 Subject: [PATCH 3/4] Revert "eth/snap: track time spent on trie rebuilding" This reverts commit c5b268c31df4346e0541976c37a7e3760dd8e38c. 
--- eth/protocols/snap/sync.go | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 1e280984045..84ceb9105ea 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -38,7 +38,6 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/msgrate" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -107,9 +106,6 @@ var ( // storageConcurrency is the number of chunks to split a large contract // storage trie into to allow concurrent retrievals. storageConcurrency = 16 - - // - trieRebuildTimeGauge = metrics.NewRegisteredGauge("snap/sync/rebuild", nil) ) // ErrCancelled is returned from snap syncing if the operation was prematurely @@ -506,9 +502,8 @@ type Syncer struct { storageHealed uint64 // Number of storage slots downloaded during the healing stage storageHealedBytes common.StorageSize // Number of raw storage bytes persisted to disk during the healing stage - startTime time.Time // Time instance when snapshot sync started - logTime time.Time // Time instance when status was last reported - trieRebuildTime time.Duration // Total duration it took to rebuild trie intermediate nodes + startTime time.Time // Time instance when snapshot sync started + logTime time.Time // Time instance when status was last reported pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root) @@ -2207,37 +2202,28 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // Keep the left boundary as it's complete tr = newPathTrie(account, false, s.db, batch) } - - start := time.Now() for j := 0; j < len(res.hashes[i]); j++ { tr.update(res.hashes[i][j][:], res.slots[i][j]) } tr.commit(true) - s.trieRebuildTime += time.Since(start) } // Persist the received storage segments. These flat state maybe // outdated during the sync, but it can be fixed later during the // snapshot generation. 
for j := 0; j < len(res.hashes[i]); j++ { rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) - } - start := time.Now() - for j := 0; j < len(res.hashes[i]); j++ { // If we're storing large contracts, generate the trie nodes // on the fly to not trash the gluing points if i == len(res.hashes)-1 && res.subTask != nil { res.subTask.genTrie.update(res.hashes[i][j][:], res.slots[i][j]) } } - s.trieRebuildTime += time.Since(start) } // Large contracts could have generated new trie nodes, flush them to disk if res.subTask != nil { if res.subTask.done { - start := time.Now() root := res.subTask.genTrie.commit(res.subTask.Last == common.MaxHash) - s.trieRebuildTime += time.Since(start) if err := res.subTask.genBatch.Write(); err != nil { log.Error("Failed to persist stack slots", "err", err) } @@ -2255,9 +2241,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { } } } else if res.subTask.genBatch.ValueSize() > batchSizeThreshold { - start := time.Now() res.subTask.genTrie.commit(false) - s.trieRebuildTime += time.Since(start) if err := res.subTask.genBatch.Write(); err != nil { log.Error("Failed to persist stack slots", "err", err) } @@ -2433,7 +2417,6 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { slim := types.SlimAccountRLP(*res.accounts[i]) rawdb.WriteAccountSnapshot(batch, hash, slim) - start := time.Now() if !task.needHeal[i] { // If the storage task is complete, drop it into the stack trie // to generate account trie nodes for it @@ -2450,7 +2433,6 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { panic(err) // Really shouldn't ever happen } } - s.trieRebuildTime += time.Since(start) } // Flush anything written just now and update the stats if err := batch.Write(); err != nil { @@ -2482,23 +2464,18 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { // flush after finalizing task.done. It's fine even if we crash and lose this // write as it will only cause more data to be downloaded during heal. if task.done { - start := time.Now() task.genTrie.commit(task.Last == common.MaxHash) - s.trieRebuildTime += time.Since(start) if err := task.genBatch.Write(); err != nil { log.Error("Failed to persist stack account", "err", err) } task.genBatch.Reset() } else if task.genBatch.ValueSize() > batchSizeThreshold { - start := time.Now() task.genTrie.commit(false) - s.trieRebuildTime += time.Since(start) if err := task.genBatch.Write(); err != nil { log.Error("Failed to persist stack account", "err", err) } task.genBatch.Reset() } - trieRebuildTimeGauge.Update(s.trieRebuildTime.Microseconds()) log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes) } From ff5beb274b604b9cb9862dea34bd7ca6539c36b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20IRMAK?= Date: Mon, 28 Jul 2025 09:12:55 +0300 Subject: [PATCH 4/4] updates from gary --- common/arena.go | 46 ++++++++++++++++++++++++++++++++-------------- trie/stacktrie.go | 4 ++-- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/common/arena.go b/common/arena.go index 3d38defd975..296fdaa88b9 100644 --- a/common/arena.go +++ b/common/arena.go @@ -16,37 +16,55 @@ package common +import "fmt" + // Arena is an allocation primitive that allows individual allocations // out of a page of items and bulk de-allocations of last N allocations. // The most common way of using an Arena is to pair it up with a sync.Pool // of pages. 
+// +// Notably, arena is not thread safe, please manage the concurrency issues +// by yourselves. type Arena[T any] struct { - used uint32 - pages [][]T + used uint32 + pages [][]T + pageSize uint32 + newPage func() any + releasePage func(any) +} - PageSize uint32 - NewPage func() any - ReleasePage func(any) +// NewArena constructs the arena for the given type. +func NewArena[T any](pageSize uint32, newPage func() any, releasePage func(any)) *Arena[T] { + return &Arena[T]{ + pageSize: pageSize, + newPage: newPage, + releasePage: releasePage, + } } -// Alloc returns the next free item on the arena -// Allocates a new page if needed +// Alloc returns the next free item on the arena. func (a *Arena[T]) Alloc() *T { - pageIndex := a.used / a.PageSize - pageOffset := a.used % a.PageSize - if pageOffset == 0 { - a.pages = append(a.pages, a.NewPage().([]T)) + pageIndex := a.used / a.pageSize + pageOffset := a.used % a.pageSize + + // Allocate additional items if all pre-allocated ones have been exhausted + if pageOffset == 0 && int(pageIndex) == len(a.pages) { + items := a.newPage().([]T) + if len(items) != int(a.pageSize) { + panic(fmt.Errorf("invalid page size, want: %d, got: %d", a.pageSize, len(items))) + } + a.pages = append(a.pages, items) } a.used++ return &a.pages[pageIndex][pageOffset] } -// Used returns the number of items that live on this arena +// Used returns the number of items that live on this arena. func (a *Arena[T]) Used() uint32 { return a.used } -// Reset rollsback the active set of live elements to the given number +// Reset rollbacks the active set of live elements to the given number. func (a *Arena[T]) Reset(to uint32) { a.used = to } @@ -54,7 +72,7 @@ func (a *Arena[T]) Reset(to uint32) { // Release releases all the pages that the arena currently owns func (a *Arena[T]) Release() { for _, page := range a.pages { - a.ReleasePage(page) + a.releasePage(page) } a.pages = nil a.used = 0 diff --git a/trie/stacktrie.go b/trie/stacktrie.go index ac8baca18d1..01e1d311eff 100644 --- a/trie/stacktrie.go +++ b/trie/stacktrie.go @@ -90,8 +90,8 @@ func NewStackTrie(onTrieNode OnTrieNode) *StackTrie { onTrieNode: onTrieNode, kBuf: make([]byte, 64), pBuf: make([]byte, 64), - nodeAllocator: common.Arena[stNode]{NewPage: stPagePool.Get, PageSize: uint32(stPageSize), ReleasePage: stPagePool.Put}, - byteAllocator: common.Arena[[32]byte]{NewPage: bytePagePool.Get, PageSize: uint32(stPageSize), ReleasePage: bytePagePool.Put}, + nodeAllocator: *common.NewArena[stNode](uint32(stPageSize), stPagePool.Get, stPagePool.Put), + byteAllocator: *common.NewArena[[32]byte](uint32(stPageSize), bytePagePool.Get, bytePagePool.Put), } t.root = t.nodeAllocator.Alloc().reset() t.tmpNode = t.nodeAllocator.Alloc().reset()
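
For reference, the allocation pattern this series is built around (fixed-size pages recycled through a sync.Pool, with Used/Reset acting as the push/pop of an allocation frame) can be sketched roughly as below. This is an illustrative snippet rather than code from the patches: the node type, page size and pool wiring are made up, and common.NewArena only exists once PATCH 4/4 is applied.

package main

import (
	"fmt"
	"sync"

	"github.com/ethereum/go-ethereum/common"
)

// node is a stand-in for whatever fixed-size item the caller allocates,
// e.g. the stNode used by StackTrie.
type node struct {
	key []byte
	val []byte
}

const pageSize = 1024

// Pages are recycled through a sync.Pool; New must return a slice whose
// length matches the arena's page size, otherwise Alloc panics.
var pagePool = sync.Pool{
	New: func() any { return make([]node, pageSize) },
}

func main() {
	arena := common.NewArena[node](pageSize, pagePool.Get, pagePool.Put)

	a := arena.Alloc() // stays live for the whole example
	a.key = append(a.key[:0], 0x01)

	// Open an "allocation frame" by remembering the current watermark,
	// mirroring pushAllocationFrame in the StackTrie changes.
	frame := arena.Used()

	b := arena.Alloc() // allocations made inside the frame
	c := arena.Alloc()
	_, _ = b, c

	// Closing the frame bulk-frees everything allocated after the mark,
	// mirroring popAllocationFrame; only a stays live, b and c may be reused.
	arena.Reset(frame)
	fmt.Println("live items:", arena.Used()) // prints: live items: 1

	// Once the caller is completely done, all pages go back to the pool.
	arena.Release()
}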