Skip to content

Commit 573776d

Browse files
authored
[scheduler] support pull contract data & fix task timeout func & test fix (#406)
1 parent b01d850 commit 573776d

38 files changed

+768
-1378
lines changed

cmd/coordinator/config/config.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ type Config struct {
1111
ServiceEndpoint string `env:"HTTP_SERVICE_ENDPOINT"`
1212
ChainEndpoint string `env:"CHAIN_ENDPOINT"`
1313
DatabaseDSN string `env:"DATABASE_DSN"`
14-
DatasourceDSN string `env:"DATASOURCE_DSN"`
1514
BootNodeMultiAddr string `env:"BOOTNODE_MULTIADDR"`
1615
IoTeXChainID int `env:"IOTEX_CHAINID"`
1716
ProjectContractAddress string `env:"PROJECT_CONTRACT_ADDRESS,optional"`
@@ -32,43 +31,40 @@ var (
3231
ServiceEndpoint: ":9001",
3332
ChainEndpoint: "https://babel-api.testnet.iotex.io",
3433
DatabaseDSN: "postgres://test_user:test_passwd@postgres:5432/test?sslmode=disable",
35-
DatasourceDSN: "postgres://test_user:test_passwd@postgres:5432/test?sslmode=disable",
3634
BootNodeMultiAddr: "/dns4/bootnode-0.testnet.iotex.one/tcp/4689/ipfs/12D3KooWFnaTYuLo8Mkbm3wzaWHtUuaxBRe24Uiopu15Wr5EhD3o",
3735
IoTeXChainID: 2,
38-
ProjectContractAddress: "0x02feBE78F3A740b3e9a1CaFAA1b23a2ac0793D26",
36+
ProjectContractAddress: "0x2339644f65c371Ca36b335A7eC3EB8AD8CBd5F51",
3937
IPFSEndpoint: "ipfs.mainnet.iotex.io",
4038
DIDAuthServerEndpoint: "didkit:9999",
41-
SequencerPubKey: "",
39+
SequencerPubKey: "0x04df6acbc5b355aabfb2145b36b20b7942c831c245c423a20b189fab4cf3a3dba3d564080841f2eb4890c118ca5e0b80b25f81269621c5e28273a962996c109afa",
4240
LogLevel: int(slog.LevelDebug),
4341
}
4442
// local debug default config for coordinator; all config elements come from docker-compose-dev.yaml in root of project
4543
defaultDebugConfig = &Config{
4644
ServiceEndpoint: ":9001",
4745
ChainEndpoint: "https://babel-api.testnet.iotex.io",
4846
DatabaseDSN: "postgres://test_user:test_passwd@localhost:5432/test?sslmode=disable",
49-
DatasourceDSN: "postgres://test_user:test_passwd@localhost:5432/test?sslmode=disable",
5047
BootNodeMultiAddr: "/dns4/bootnode-0.testnet.iotex.one/tcp/4689/ipfs/12D3KooWFnaTYuLo8Mkbm3wzaWHtUuaxBRe24Uiopu15Wr5EhD3o",
5148
IoTeXChainID: 2,
52-
ProjectContractAddress: "0x02feBE78F3A740b3e9a1CaFAA1b23a2ac0793D26",
49+
ProjectContractAddress: "0x2339644f65c371Ca36b335A7eC3EB8AD8CBd5F51",
5350
IPFSEndpoint: "ipfs.mainnet.iotex.io",
5451
DIDAuthServerEndpoint: "localhost:9999",
5552
ProjectCacheDirectory: "./project_cache",
56-
SequencerPubKey: "",
53+
SequencerPubKey: "0x04df6acbc5b355aabfb2145b36b20b7942c831c245c423a20b189fab4cf3a3dba3d564080841f2eb4890c118ca5e0b80b25f81269621c5e28273a962996c109afa",
5754
LogLevel: int(slog.LevelDebug),
5855
}
5956
// integration default config for coordinator; all config elements come from Makefile in `integration_test` entry
6057
defaultTestConfig = &Config{
6158
ServiceEndpoint: ":19001",
6259
ChainEndpoint: "https://babel-api.testnet.iotex.io",
6360
DatabaseDSN: "postgres://test_user:test_passwd@localhost:15432/test?sslmode=disable",
64-
DatasourceDSN: "postgres://test_user:test_passwd@localhost:15432/test?sslmode=disable",
6561
BootNodeMultiAddr: "/dns4/bootnode-0.testnet.iotex.one/tcp/4689/ipfs/12D3KooWFnaTYuLo8Mkbm3wzaWHtUuaxBRe24Uiopu15Wr5EhD3o",
6662
IoTeXChainID: 2,
6763
ProjectContractAddress: "", //"0x02feBE78F3A740b3e9a1CaFAA1b23a2ac0793D26",
6864
IPFSEndpoint: "ipfs.mainnet.iotex.io",
6965
DIDAuthServerEndpoint: "localhost:19999",
7066
ProjectFileDirectory: "./testdata",
71-
SequencerPubKey: "",
67+
SequencerPubKey: "0x04df6acbc5b355aabfb2145b36b20b7942c831c245c423a20b189fab4cf3a3dba3d564080841f2eb4890c118ca5e0b80b25f81269621c5e28273a962996c109afa",
7268
LogLevel: int(slog.LevelDebug),
7369
}
7470
)

cmd/coordinator/config/config_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ func TestConfig_Init(t *testing.T) {
1919
ServiceEndpoint: ":1999",
2020
ChainEndpoint: "http://iotex.chainendpoint.io",
2121
DatabaseDSN: "postgres://username:password@host:port/database?ext=1",
22-
DatasourceDSN: "postgres://username:password@host:port/database?ext=1",
2322
BootNodeMultiAddr: "/dns4/a.b.com/tcp/1000/ipfs/123123123",
2423
IoTeXChainID: 100,
2524
ProjectContractAddress: "0x02feBE78F3A740b3e9a1CaFAA1b23a2ac0793D26",
@@ -33,7 +32,6 @@ func TestConfig_Init(t *testing.T) {
3332
_ = os.Setenv("HTTP_SERVICE_ENDPOINT", expected.ServiceEndpoint)
3433
_ = os.Setenv("CHAIN_ENDPOINT", expected.ChainEndpoint)
3534
_ = os.Setenv("DATABASE_DSN", expected.DatabaseDSN)
36-
_ = os.Setenv("DATASOURCE_DSN", expected.DatasourceDSN)
3735
_ = os.Setenv("BOOTNODE_MULTIADDR", expected.BootNodeMultiAddr)
3836
_ = os.Setenv("IOTEX_CHAINID", strconv.Itoa(expected.IoTeXChainID))
3937
_ = os.Setenv("PROJECT_CONTRACT_ADDRESS", expected.ProjectContractAddress)

cmd/coordinator/main.go

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"os/signal"
88
"syscall"
99

10-
"github.com/ethereum/go-ethereum/common/hexutil"
1110
"github.com/pkg/errors"
1211

1312
"github.com/machinefi/sprout/cmd/coordinator/api"
@@ -21,45 +20,35 @@ import (
2120
func main() {
2221
conf, err := config.Get()
2322
if err != nil {
24-
log.Fatal(err)
23+
log.Fatal(errors.Wrap(err, "failed to get config"))
2524
}
2625
conf.Print()
2726
slog.Info("coordinator config loaded")
2827

2928
persistence, err := persistence.NewPostgres(conf.DatabaseDSN)
3029
if err != nil {
31-
log.Fatal(err)
30+
log.Fatal(errors.Wrap(err, "failed to new postgres persistence"))
3231
}
3332

34-
projectConfigManager, err := project.NewConfigManager(conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectCacheDirectory, conf.IPFSEndpoint)
33+
projectConfigManager, err := project.NewManager(conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectCacheDirectory, conf.IPFSEndpoint)
3534
if err != nil {
36-
log.Fatal(err)
35+
log.Fatal(errors.Wrap(err, "failed to new project config manager"))
3736
}
3837

39-
datasource, err := datasource.NewPostgres(conf.DatasourceDSN)
40-
if err != nil {
41-
log.Fatal(err)
38+
if err := task.RunDispatcher(persistence, datasource.NewPostgres, projectConfigManager.Get, conf.BootNodeMultiAddr, conf.OperatorPrivateKey, conf.OperatorPrivateKeyED25519, conf.ChainEndpoint, conf.ProjectContractAddress, conf.IoTeXChainID); err != nil {
39+
log.Fatal(errors.Wrap(err, "failed to run dispatcher"))
4240
}
4341

44-
nextTaskID, err := persistence.FetchNextTaskID()
45-
if err != nil {
46-
log.Fatal(err)
47-
}
48-
49-
dispatcher, err := task.NewDispatcher(persistence, projectConfigManager, datasource, conf.BootNodeMultiAddr, conf.OperatorPrivateKey, conf.OperatorPrivateKeyED25519, conf.IoTeXChainID)
50-
if err != nil {
51-
log.Fatal(err)
52-
}
53-
54-
sequencerPubKey, err := hexutil.Decode(conf.SequencerPubKey)
55-
if err != nil {
56-
log.Fatal(errors.Wrap(err, "failed to decode sequencer pubkey"))
57-
}
58-
go dispatcher.Dispatch(nextTaskID, sequencerPubKey)
42+
// TODO verify sig
43+
// sequencerPubKey, err := hexutil.Decode(conf.SequencerPubKey)
44+
// if err != nil {
45+
// log.Fatal(errors.Wrap(err, "failed to decode sequencer pubkey"))
46+
// }
47+
//go dispatcher.Dispatch(nextTaskID, sequencerPubKey)
5948

6049
go func() {
6150
if err := api.NewHttpServer(persistence, conf).Run(conf.ServiceEndpoint); err != nil {
62-
log.Fatal(err)
51+
log.Fatal(errors.Wrap(err, "failed to run http server"))
6352
}
6453
}()
6554

cmd/internal/env.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func Print(c any) {
5959
rv := reflect.ValueOf(c).Elem()
6060

6161
if env, ok := c.(interface{ Env() string }); ok {
62-
fmt.Println(color.RedString("ENV: %s", env.Env()))
62+
fmt.Println(color.CyanString("ENV: %s", env.Env()))
6363
}
6464

6565
for i := 0; i < rt.NumField(); i++ {

cmd/internal/env_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ func TestParseEnv(t *testing.T) {
1919
ServiceEndpoint: ":1999",
2020
ChainEndpoint: "http://iotex.chainendpoint.io",
2121
DatabaseDSN: "postgres://username:password@host:port/database?ext=1",
22-
DatasourceDSN: "postgres://username:password@host:port/database?ext=1",
2322
BootNodeMultiAddr: "/dns4/a.b.com/tcp/1000/ipfs/123123123",
2423
IoTeXChainID: 100,
2524
ProjectContractAddress: "0x02feBE78F3A740b3e9a1CaFAA1b23a2ac0793D26",
@@ -33,7 +32,6 @@ func TestParseEnv(t *testing.T) {
3332
_ = os.Setenv("HTTP_SERVICE_ENDPOINT", expected.ServiceEndpoint)
3433
_ = os.Setenv("CHAIN_ENDPOINT", expected.ChainEndpoint)
3534
_ = os.Setenv("DATABASE_DSN", expected.DatabaseDSN)
36-
_ = os.Setenv("DATASOURCE_DSN", expected.DatasourceDSN)
3735
_ = os.Setenv("BOOTNODE_MULTIADDR", expected.BootNodeMultiAddr)
3836
_ = os.Setenv("IOTEX_CHAINID", strconv.Itoa(expected.IoTeXChainID))
3937
_ = os.Setenv("PROJECT_CONTRACT_ADDRESS", expected.ProjectContractAddress)

cmd/prover/config/config.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,16 @@ var (
3535
ZKWasmServerEndpoint: "zkwasm:4001",
3636
WasmServerEndpoint: "wasm:4001",
3737
ChainEndpoint: "https://babel-api.testnet.iotex.io",
38-
ProjectContractAddress: "0x02feBE78F3A740b3e9a1CaFAA1b23a2ac0793D26",
38+
ProjectContractAddress: "0x2339644f65c371Ca36b335A7eC3EB8AD8CBd5F51",
3939
DatabaseDSN: "postgres://test_user:test_passwd@postgres:5432/test?sslmode=disable",
4040
BootNodeMultiAddr: "/dns4/bootnode-0.testnet.iotex.one/tcp/4689/ipfs/12D3KooWFnaTYuLo8Mkbm3wzaWHtUuaxBRe24Uiopu15Wr5EhD3o",
41-
ProverContractAddress: "0xB2b3f3c8BB00493c6f12232C2cb3e20A65698939",
41+
ProverContractAddress: "0x2B9BC8eC74E7F2526045eb13fFa37b10e0d40464",
4242
ProverPrivateKey: "did:key:z6MkmF1AgufHf8ASaxDcCR8iSZjEsEbJMp7LkqyEHw6123",
4343
IoTeXChainID: 2,
44-
SchedulerEpoch: 720,
44+
SchedulerEpoch: 20,
4545
IPFSEndpoint: "ipfs.mainnet.iotex.io",
4646
LogLevel: int(slog.LevelDebug),
47-
SequencerPubKey: "",
47+
SequencerPubKey: "0x04df6acbc5b355aabfb2145b36b20b7942c831c245c423a20b189fab4cf3a3dba3d564080841f2eb4890c118ca5e0b80b25f81269621c5e28273a962996c109afa",
4848
}
4949

5050
defaultDebugConfig = &Config{
@@ -53,17 +53,17 @@ var (
5353
ZKWasmServerEndpoint: "localhost:4003",
5454
WasmServerEndpoint: "localhost:4004",
5555
ChainEndpoint: "https://babel-api.testnet.iotex.io",
56-
ProjectContractAddress: "0x02feBE78F3A740b3e9a1CaFAA1b23a2ac0793D26",
56+
ProjectContractAddress: "0x2339644f65c371Ca36b335A7eC3EB8AD8CBd5F51",
5757
DatabaseDSN: "postgres://test_user:test_passwd@localhost:5432/test?sslmode=disable",
5858
BootNodeMultiAddr: "/dns4/bootnode-0.testnet.iotex.one/tcp/4689/ipfs/12D3KooWFnaTYuLo8Mkbm3wzaWHtUuaxBRe24Uiopu15Wr5EhD3o",
59-
ProverContractAddress: "",
59+
ProverContractAddress: "0x2B9BC8eC74E7F2526045eb13fFa37b10e0d40464",
6060
ProverPrivateKey: "",
6161
IoTeXChainID: 2,
62-
SchedulerEpoch: 720,
62+
SchedulerEpoch: 20,
6363
IPFSEndpoint: "ipfs.mainnet.iotex.io",
6464
ProjectCacheDirectory: "./project_cache",
6565
LogLevel: int(slog.LevelDebug),
66-
SequencerPubKey: "",
66+
SequencerPubKey: "0x04df6acbc5b355aabfb2145b36b20b7942c831c245c423a20b189fab4cf3a3dba3d564080841f2eb4890c118ca5e0b80b25f81269621c5e28273a962996c109afa",
6767
}
6868

6969
defaultTestConfig = &Config{
@@ -78,11 +78,11 @@ var (
7878
ProverContractAddress: "",
7979
ProverPrivateKey: "",
8080
IoTeXChainID: 2,
81-
SchedulerEpoch: 720,
81+
SchedulerEpoch: 20,
8282
IPFSEndpoint: "ipfs.mainnet.iotex.io",
8383
ProjectFileDirectory: "./testdata",
8484
LogLevel: int(slog.LevelDebug),
85-
SequencerPubKey: "",
85+
SequencerPubKey: "0x04df6acbc5b355aabfb2145b36b20b7942c831c245c423a20b189fab4cf3a3dba3d564080841f2eb4890c118ca5e0b80b25f81269621c5e28273a962996c109afa",
8686
}
8787
)
8888

cmd/prover/main.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package main
22

33
import (
4-
"crypto/x509"
5-
"encoding/hex"
64
"log"
75
"log/slog"
86
"os"
@@ -42,7 +40,7 @@ func main() {
4240
},
4341
)
4442

45-
projectConfigManager, err := project.NewConfigManager(conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectCacheDirectory, conf.IPFSEndpoint)
43+
projectConfigManager, err := project.NewManager(conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectCacheDirectory, conf.IPFSEndpoint)
4644
if err != nil {
4745
log.Fatal(err)
4846
}
@@ -51,25 +49,23 @@ func main() {
5149
if err != nil {
5250
log.Fatal(errors.Wrap(err, "failed to parse prover private key"))
5351
}
54-
pubKeyBytes, err := x509.MarshalPKIXPublicKey(sk.PublicKey)
55-
if err != nil {
56-
log.Fatal(errors.Wrap(err, "failed to marshal public key"))
57-
}
58-
pubKeyHex := hex.EncodeToString(pubKeyBytes)
52+
proverID := crypto.PubkeyToAddress(sk.PublicKey).String()
53+
54+
slog.Info("my prover id", "prover_id", proverID)
5955

6056
sequencerPubKey, err := hexutil.Decode(conf.SequencerPubKey)
6157
if err != nil {
6258
log.Fatal(errors.Wrap(err, "failed to decode sequencer pubkey"))
6359
}
6460

65-
taskProcessor := task.NewProcessor(vmHandler, projectConfigManager, sk, sequencerPubKey, pubKeyHex)
61+
taskProcessor := task.NewProcessor(vmHandler, projectConfigManager, sk, sequencerPubKey, proverID)
6662

6763
pubSubs, err := p2p.NewPubSubs(taskProcessor.HandleP2PData, conf.BootNodeMultiAddr, conf.IoTeXChainID)
6864
if err != nil {
6965
log.Fatal(err)
7066
}
7167

72-
if err := scheduler.Run(conf.SchedulerEpoch, conf.ChainEndpoint, conf.ProverContractAddress, conf.ProjectContractAddress, pubKeyHex, pubSubs, taskProcessor.HandleProjectProvers); err != nil {
68+
if err := scheduler.Run(conf.SchedulerEpoch, conf.ChainEndpoint, conf.ProverContractAddress, conf.ProjectContractAddress, proverID, pubSubs, taskProcessor.HandleProjectProvers); err != nil {
7369
log.Fatal(err)
7470
}
7571

cmd/sequencer/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os/signal"
99
"syscall"
1010

11+
"github.com/ethereum/go-ethereum/common/hexutil"
1112
"github.com/ethereum/go-ethereum/crypto"
1213
"github.com/pkg/errors"
1314

@@ -31,7 +32,7 @@ func init() {
3132
flag.StringVar(&coordinatorAddress, "coordinatorAddress", "localhost:9001", "coordinator address")
3233
flag.StringVar(&databaseDSN, "databaseDSN", "postgres://test_user:test_passwd@localhost:5432/test?sslmode=disable", "database dsn")
3334
flag.StringVar(&didAuthServer, "didAuthServer", "localhost:9999", "did auth server endpoint")
34-
flag.StringVar(&privateKey, "privateKey", "", "sequencer private key")
35+
flag.StringVar(&privateKey, "privateKey", "dbfe03b0406549232b8dccc04be8224fcc0afa300a33d4f335dcfdfead861c85", "sequencer private key")
3536
}
3637

3738
func main() {
@@ -44,6 +45,8 @@ func main() {
4445
log.Fatal(errors.Wrap(err, "failed parse private key"))
4546
}
4647

48+
slog.Info("sequencer public key", "public_key", hexutil.Encode(crypto.FromECDSAPub(&sk.PublicKey)))
49+
4750
_ = clients.NewManager()
4851

4952
p, err := newPersistence(databaseDSN)

cmd/sequencer/persistence.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,16 @@ type message struct {
2929

3030
type task struct {
3131
gorm.Model
32+
ProjectID uint64 `gorm:"index:task_fetch,not null"`
3233
InternalTaskID string `gorm:"index:internal_task_id,not null"`
3334
MessageIDs datatypes.JSON `gorm:"not null"`
34-
Signature string
35+
Signature string `gorm:"not null,default:''"`
3536
}
3637

3738
func (t *task) sign(sk *ecdsa.PrivateKey, projectID uint64, clientID string, messages ...[]byte) (string, error) {
3839
buf := bytes.NewBuffer(nil)
3940

40-
if err := binary.Write(buf, binary.BigEndian, t.ID); err != nil {
41+
if err := binary.Write(buf, binary.BigEndian, uint64(t.ID)); err != nil {
4142
return "", err
4243
}
4344
if err := binary.Write(buf, binary.BigEndian, projectID); err != nil {
@@ -78,7 +79,7 @@ func (p *persistence) aggregateTaskTx(tx *gorm.DB, amount int, m *message, sk *e
7879
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
7980
Order("created_at").
8081
Where(
81-
"project_id = ? AND project_version = ? AND client_did = ? AND internal_task_id = ?",
82+
"project_id = ? AND project_version = ? AND client_id = ? AND internal_task_id = ?",
8283
m.ProjectID, m.ProjectVersion, m.ClientID, "",
8384
).Limit(amount).Find(&messages).Error; err != nil {
8485
return errors.Wrap(err, "failed to fetch unpacked messages")
@@ -104,6 +105,7 @@ func (p *persistence) aggregateTaskTx(tx *gorm.DB, amount int, m *message, sk *e
104105

105106
t := &task{
106107
InternalTaskID: taskID,
108+
ProjectID: m.ProjectID,
107109
MessageIDs: messageIDsJson,
108110
}
109111

@@ -120,8 +122,7 @@ func (p *persistence) aggregateTaskTx(tx *gorm.DB, amount int, m *message, sk *e
120122
return errors.Wrap(err, "failed to sign task")
121123
}
122124

123-
t.Signature = sig
124-
if err := tx.Model(t).Update("sign", sig).Where("id = ?", t.ID).Error; err != nil {
125+
if err := tx.Model(t).Update("signature", sig).Where("id = ?", t.ID).Error; err != nil {
125126
return errors.Wrap(err, "failed to update task sign")
126127
}
127128

cmd/tests/init.go

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func runProver(conf *proverconfig.Config) {
117117
},
118118
)
119119

120-
projectConfigManager, err := project.NewConfigManager(conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectCacheDirectory, conf.IPFSEndpoint)
120+
projectConfigManager, err := project.NewManager(conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectCacheDirectory, conf.IPFSEndpoint)
121121
if err != nil {
122122
log.Fatal(err)
123123
}
@@ -160,30 +160,14 @@ func runCoordinator(conf *coordinatorconfig.Config) {
160160

161161
_ = clients.NewManager()
162162

163-
projectConfigManager, err := project.NewConfigManager(conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectCacheDirectory, conf.IPFSEndpoint)
163+
projectConfigManager, err := project.NewManager(conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectCacheDirectory, conf.IPFSEndpoint)
164164
if err != nil {
165165
log.Fatal(err)
166166
}
167167

168-
datasource, err := datasource.NewPostgres(conf.DatasourceDSN)
169-
if err != nil {
170-
log.Fatal(err)
171-
}
172-
173-
nextTaskID, err := pg.FetchNextTaskID()
174-
if err != nil {
175-
log.Fatal(err)
176-
}
177-
178-
dispatcher, err := task.NewDispatcher(pg, projectConfigManager, datasource, conf.BootNodeMultiAddr, conf.OperatorPrivateKey, conf.OperatorPrivateKeyED25519, conf.IoTeXChainID)
179-
if err != nil {
180-
log.Fatal(err)
181-
}
182-
sequencerPubKey, err := hexutil.Decode(conf.SequencerPubKey)
183-
if err != nil {
184-
log.Fatal(errors.Wrap(err, "failed to decode sequencer pubkey"))
168+
if err := task.RunDispatcher(pg, datasource.NewPostgres, projectConfigManager.Get, conf.BootNodeMultiAddr, conf.OperatorPrivateKey, conf.OperatorPrivateKeyED25519, conf.ChainEndpoint, conf.ProjectContractAddress, conf.IoTeXChainID); err != nil {
169+
log.Fatal(errors.Wrap(err, "failed to run dispatcher"))
185170
}
186-
go dispatcher.Dispatch(nextTaskID, sequencerPubKey)
187171

188172
go func() {
189173
if err := api.NewHttpServer(pg, conf).Run(conf.ServiceEndpoint); err != nil {

0 commit comments

Comments
 (0)