Skip to content

Commit 9abd102

Browse files
authored
[task/dispatcher] support prover amount dispatch window size (#537)
1 parent d9504a7 commit 9abd102

File tree

7 files changed

+259
-177
lines changed

7 files changed

+259
-177
lines changed

task/dispatcher/dispatcher.go

Lines changed: 68 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@ import (
44
"log/slog"
55
"strconv"
66
"sync"
7+
"time"
78

8-
"github.com/ethereum/go-ethereum/common"
99
pubsub "github.com/libp2p/go-libp2p-pubsub"
10-
"github.com/pkg/errors"
1110

1211
"github.com/machinefi/sprout/datasource"
1312
"github.com/machinefi/sprout/p2p"
@@ -55,6 +54,7 @@ type Dispatcher struct {
5554
chainHeadNotification <-chan uint64
5655
contract Contract
5756
taskStateHandler *taskStateHandler
57+
windowSizeSetInterval time.Duration
5858
}
5959

6060
func (d *Dispatcher) handleP2PData(data *p2p.Data, topic *pubsub.Topic) {
@@ -71,7 +71,7 @@ func (d *Dispatcher) handleP2PData(data *p2p.Data, topic *pubsub.Topic) {
7171
pd.(*projectDispatcher).handle(s)
7272
}
7373

74-
func (d *Dispatcher) setWindowSize(head uint64) {
74+
func (d *Dispatcher) setRequiredProverAmount(head uint64) {
7575
ps := d.projectOffsets.Projects(head)
7676
for _, p := range ps {
7777
cp := d.contract.Project(p.ID, head)
@@ -81,7 +81,6 @@ func (d *Dispatcher) setWindowSize(head uint64) {
8181
}
8282
pd, ok := d.projectDispatchers.Load(p.ID)
8383
if !ok {
84-
slog.Error("the project dispatcher not exist when set window size", "project_id", p.ID)
8584
continue
8685
}
8786

@@ -94,88 +93,10 @@ func (d *Dispatcher) setWindowSize(head uint64) {
9493
}
9594
proverAmount = n
9695
}
97-
pd.(*projectDispatcher).window.size.Store(proverAmount)
96+
pd.(*projectDispatcher).requiredProverAmount.Store(proverAmount)
9897
}
9998
}
10099

101-
func New(persistence Persistence, newDatasource NewDatasource,
102-
projectManager ProjectManager, defaultDatasourceURI, bootNodeMultiaddr, operatorPrivateKey, operatorPrivateKeyED25519 string,
103-
sequencerPubKey []byte, iotexChainID int, projectNotification <-chan uint64, chainHeadNotification <-chan uint64,
104-
contract Contract, projectOffsets *scheduler.ProjectEpochOffsets) (*Dispatcher, error) {
105-
106-
projectDispatchers := &sync.Map{}
107-
taskStateHandler := newTaskStateHandler(persistence, contract, projectManager, operatorPrivateKey, operatorPrivateKeyED25519)
108-
d := &Dispatcher{
109-
local: false,
110-
persistence: persistence,
111-
newDatasource: newDatasource,
112-
projectManager: projectManager,
113-
defaultDatasourceURI: defaultDatasourceURI,
114-
bootNodeMultiaddr: bootNodeMultiaddr,
115-
operatorPrivateKey: operatorPrivateKey,
116-
operatorPrivateKeyED25519: operatorPrivateKeyED25519,
117-
sequencerPubKey: sequencerPubKey,
118-
iotexChainID: iotexChainID,
119-
projectNotification: projectNotification,
120-
chainHeadNotification: chainHeadNotification,
121-
contract: contract,
122-
projectOffsets: projectOffsets,
123-
projectDispatchers: projectDispatchers,
124-
taskStateHandler: taskStateHandler,
125-
}
126-
ps, err := p2p.NewPubSubs(d.handleP2PData, bootNodeMultiaddr, iotexChainID)
127-
if err != nil {
128-
return nil, err
129-
}
130-
d.pubSubs = ps
131-
return d, nil
132-
}
133-
134-
func NewLocal(persistence Persistence, newDatasource NewDatasource,
135-
projectManager ProjectManager, defaultDatasourceURI, operatorPrivateKey, operatorPrivateKeyED25519, bootNodeMultiaddr string,
136-
sequencerPubKey []byte, iotexChainID int) (*Dispatcher, error) {
137-
138-
projectDispatchers := &sync.Map{}
139-
taskStateHandler := newTaskStateHandler(persistence, nil, projectManager, operatorPrivateKey, operatorPrivateKeyED25519)
140-
d := &Dispatcher{
141-
local: true,
142-
projectDispatchers: projectDispatchers,
143-
}
144-
ps, err := p2p.NewPubSubs(d.handleP2PData, bootNodeMultiaddr, iotexChainID)
145-
if err != nil {
146-
return nil, err
147-
}
148-
149-
projectIDs := projectManager.ProjectIDs()
150-
for _, id := range projectIDs {
151-
_, ok := projectDispatchers.Load(id)
152-
if ok {
153-
continue
154-
}
155-
p, err := projectManager.Project(id)
156-
if err != nil {
157-
return nil, errors.Wrapf(err, "failed to get project, project_id %v", id)
158-
}
159-
if err := ps.Add(id); err != nil {
160-
return nil, errors.Wrapf(err, "failed to add pubsubs, project_id %v", id)
161-
}
162-
cp := &contract.Project{
163-
ID: id,
164-
Attributes: map[common.Hash][]byte{},
165-
}
166-
uri := p.DatasourceURI
167-
if uri == "" {
168-
uri = defaultDatasourceURI
169-
}
170-
pd, err := newProjectDispatcher(persistence, uri, newDatasource, cp, ps, taskStateHandler, sequencerPubKey)
171-
if err != nil {
172-
return nil, errors.Wrapf(err, "failed to new project dispatcher, project_id %v", id)
173-
}
174-
projectDispatchers.Store(id, pd)
175-
}
176-
return d, nil
177-
}
178-
179100
func (d *Dispatcher) setProjectDispatcher(pid uint64) {
180101
cp := d.contract.LatestProject(pid)
181102
if cp == nil {
@@ -213,6 +134,34 @@ func (d *Dispatcher) setProjectDispatcher(pid uint64) {
213134
slog.Info("a new project dispatcher started", "project_id", cp.ID)
214135
}
215136

137+
func (d *Dispatcher) setWindowSize() {
138+
ticker := time.NewTicker(d.windowSizeSetInterval)
139+
for range ticker.C {
140+
provers := d.contract.LatestProvers()
141+
proverAmount := uint64(0)
142+
for _, p := range provers {
143+
if p.Paused {
144+
continue
145+
}
146+
proverAmount++
147+
}
148+
d.projectDispatchers.Range(func(k, v interface{}) bool {
149+
pd := v.(*projectDispatcher)
150+
if pd.idle.Load() || proverAmount == 0 {
151+
pd.window.setSize(0)
152+
return true
153+
}
154+
size := proverAmount
155+
if size > pd.requiredProverAmount.Load() {
156+
size = pd.requiredProverAmount.Load()
157+
}
158+
proverAmount -= size
159+
pd.window.setSize(size)
160+
return true
161+
})
162+
}
163+
}
164+
216165
func (d *Dispatcher) Run() {
217166
if d.local {
218167
return
@@ -230,7 +179,42 @@ func (d *Dispatcher) Run() {
230179
}()
231180
go func() {
232181
for head := range d.chainHeadNotification {
233-
d.setWindowSize(head)
182+
d.setRequiredProverAmount(head)
234183
}
235184
}()
185+
go d.setWindowSize()
186+
}
187+
188+
func New(persistence Persistence, newDatasource NewDatasource,
189+
projectManager ProjectManager, defaultDatasourceURI, bootNodeMultiaddr, operatorPrivateKey, operatorPrivateKeyED25519 string,
190+
sequencerPubKey []byte, iotexChainID int, projectNotification <-chan uint64, chainHeadNotification <-chan uint64,
191+
contract Contract, projectOffsets *scheduler.ProjectEpochOffsets) (*Dispatcher, error) {
192+
193+
projectDispatchers := &sync.Map{}
194+
taskStateHandler := newTaskStateHandler(persistence, contract, projectManager, operatorPrivateKey, operatorPrivateKeyED25519)
195+
d := &Dispatcher{
196+
local: false,
197+
persistence: persistence,
198+
newDatasource: newDatasource,
199+
projectManager: projectManager,
200+
defaultDatasourceURI: defaultDatasourceURI,
201+
bootNodeMultiaddr: bootNodeMultiaddr,
202+
operatorPrivateKey: operatorPrivateKey,
203+
operatorPrivateKeyED25519: operatorPrivateKeyED25519,
204+
sequencerPubKey: sequencerPubKey,
205+
iotexChainID: iotexChainID,
206+
projectNotification: projectNotification,
207+
chainHeadNotification: chainHeadNotification,
208+
contract: contract,
209+
projectOffsets: projectOffsets,
210+
projectDispatchers: projectDispatchers,
211+
taskStateHandler: taskStateHandler,
212+
windowSizeSetInterval: 5 * time.Second,
213+
}
214+
ps, err := p2p.NewPubSubs(d.handleP2PData, bootNodeMultiaddr, iotexChainID)
215+
if err != nil {
216+
return nil, err
217+
}
218+
d.pubSubs = ps
219+
return d, nil
236220
}

task/dispatcher/dispatcher_test.go

Lines changed: 9 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestDispatcher_handleP2PData(t *testing.T) {
6565
})
6666
}
6767

68-
func TestDispatcher_setWindowSize(t *testing.T) {
68+
func TestDispatcher_setRequiredProverAmount(t *testing.T) {
6969
r := require.New(t)
7070
po := &scheduler.ProjectEpochOffsets{}
7171
c := &contract.Contract{}
@@ -79,23 +79,23 @@ func TestDispatcher_setWindowSize(t *testing.T) {
7979
defer p.Reset()
8080

8181
p.ApplyMethodReturn(po, "Projects", nil)
82-
d.setWindowSize(1)
82+
d.setRequiredProverAmount(1)
8383
})
8484
t.Run("ContractProjectNotExist", func(t *testing.T) {
8585
p := gomonkey.NewPatches()
8686
defer p.Reset()
8787

8888
p.ApplyMethodReturn(po, "Projects", []*scheduler.ScheduledProject{{ID: 1, ScheduledBlockNumber: 0}})
8989
p.ApplyMethodReturn(c, "Project", nil)
90-
d.setWindowSize(1)
90+
d.setRequiredProverAmount(1)
9191
})
9292
t.Run("ProjectDispatcherNotExist", func(t *testing.T) {
9393
p := gomonkey.NewPatches()
9494
defer p.Reset()
9595

9696
p.ApplyMethodReturn(po, "Projects", []*scheduler.ScheduledProject{{ID: 1, ScheduledBlockNumber: 0}})
9797
p.ApplyMethodReturn(c, "Project", &contract.Project{ID: 1})
98-
d.setWindowSize(1)
98+
d.setRequiredProverAmount(1)
9999
})
100100
t.Run("FailedToParseProjectRequiredProverAmount", func(t *testing.T) {
101101
p := gomonkey.NewPatches()
@@ -107,7 +107,7 @@ func TestDispatcher_setWindowSize(t *testing.T) {
107107
Attributes: map[common.Hash][]byte{contract.RequiredProverAmountHash: []byte("err")},
108108
})
109109
d.projectDispatchers.Store(uint64(1), &projectDispatcher{})
110-
d.setWindowSize(1)
110+
d.setRequiredProverAmount(1)
111111
})
112112
t.Run("Success", func(t *testing.T) {
113113
p := gomonkey.NewPatches()
@@ -120,9 +120,9 @@ func TestDispatcher_setWindowSize(t *testing.T) {
120120
})
121121
size := atomic.Uint64{}
122122
d.projectDispatchers.Store(uint64(1), &projectDispatcher{
123-
window: &window{size: &size},
123+
requiredProverAmount: &size,
124124
})
125-
d.setWindowSize(1)
125+
d.setRequiredProverAmount(1)
126126
r.Equal(size.Load(), uint64(2))
127127
})
128128
}
@@ -150,69 +150,6 @@ func TestNew(t *testing.T) {
150150
})
151151
}
152152

153-
func TestNewLocal(t *testing.T) {
154-
r := require.New(t)
155-
t.Run("FailedToNewPubSubs", func(t *testing.T) {
156-
p := gomonkey.NewPatches()
157-
defer p.Reset()
158-
159-
p.ApplyFuncReturn(p2p.NewPubSubs, nil, errors.New(t.Name()))
160-
161-
_, err := NewLocal(&mockPersistence{}, nil, nil, "", "", "", "", []byte(""), 0)
162-
r.ErrorContains(err, t.Name())
163-
})
164-
t.Run("FailedToGetProject", func(t *testing.T) {
165-
p := gomonkey.NewPatches()
166-
defer p.Reset()
167-
168-
pm := &mockProjectManager{}
169-
p.ApplyMethodReturn(pm, "Project", nil, errors.New(t.Name()))
170-
p.ApplyFuncReturn(p2p.NewPubSubs, &p2p.PubSubs{}, nil)
171-
172-
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", "", []byte(""), 0)
173-
r.ErrorContains(err, t.Name())
174-
})
175-
t.Run("FailedToAddPubSubs", func(t *testing.T) {
176-
p := gomonkey.NewPatches()
177-
defer p.Reset()
178-
179-
pm := &mockProjectManager{}
180-
p.ApplyFuncReturn(p2p.NewPubSubs, &p2p.PubSubs{}, nil)
181-
p.ApplyMethodReturn(&p2p.PubSubs{}, "Add", errors.New(t.Name()))
182-
p.ApplyMethodReturn(pm, "Project", nil, nil)
183-
184-
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", "", []byte(""), 0)
185-
r.ErrorContains(err, t.Name())
186-
})
187-
t.Run("FailedToNewProjectDispatch", func(t *testing.T) {
188-
p := gomonkey.NewPatches()
189-
defer p.Reset()
190-
191-
pm := &mockProjectManager{}
192-
p.ApplyFuncReturn(p2p.NewPubSubs, &p2p.PubSubs{}, nil)
193-
p.ApplyMethodReturn(&p2p.PubSubs{}, "Add", nil)
194-
p.ApplyFuncReturn(newProjectDispatcher, nil, errors.New(t.Name()))
195-
p.ApplyMethodReturn(pm, "Project", &project.Project{}, nil)
196-
197-
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", "", []byte(""), 0)
198-
r.ErrorContains(err, t.Name())
199-
})
200-
t.Run("Success", func(t *testing.T) {
201-
p := gomonkey.NewPatches()
202-
defer p.Reset()
203-
204-
pm := &mockProjectManager{}
205-
p.ApplyFuncReturn(p2p.NewPubSubs, &p2p.PubSubs{}, nil)
206-
p.ApplyMethodReturn(&p2p.PubSubs{}, "Add", nil)
207-
p.ApplyFuncReturn(newProjectDispatcher, &projectDispatcher{}, nil)
208-
p.ApplyMethodReturn(pm, "ProjectIDs", []uint64{0, 0})
209-
p.ApplyMethodReturn(pm, "Project", &project.Project{}, nil)
210-
211-
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", "", []byte(""), 0)
212-
r.NoError(err)
213-
})
214-
}
215-
216153
func TestDispatcher_setProjectDispatcher(t *testing.T) {
217154
paused := true
218155
cp := &contract.Project{
@@ -356,7 +293,8 @@ func TestDispatcher_Run(t *testing.T) {
356293

357294
p.ApplyMethodReturn(d.contract, "LatestProjects", []*contract.Project{{}})
358295
p.ApplyPrivateMethod(d, "setProjectDispatcher", func(*contract.Project) {})
359-
p.ApplyPrivateMethod(d, "setWindowSize", func(head uint64) {})
296+
p.ApplyPrivateMethod(d, "setRequiredProverAmount", func(head uint64) {})
297+
p.ApplyPrivateMethod(d, "setWindowSize", func() {})
360298

361299
d.Run()
362300
time.Sleep(10 * time.Millisecond)

0 commit comments

Comments
 (0)