Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type deliverTxTask struct {
Response *types.ResponseDeliverTx
VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore
TxTracer sdk.TxTracer
CumulativeGas int64
}

// AppendDependencies appends the given indexes to the task's dependencies
Expand Down Expand Up @@ -112,6 +113,7 @@ type scheduler struct {
metrics *schedulerMetrics
synchronous bool // true if maxIncarnation exceeds threshold
maxIncarnation int // current highest incarnation
execMetrics *execMetrics
}

// NewScheduler creates a new scheduler
Expand All @@ -121,6 +123,7 @@ func NewScheduler(workers int, tracingInfo *tracing.Info, deliverTxFunc func(ctx
deliverTx: deliverTxFunc,
tracingInfo: tracingInfo,
metrics: &schedulerMetrics{},
execMetrics: &execMetrics{},
}
}

Expand Down Expand Up @@ -202,10 +205,23 @@ func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTa
return allTasks, tasksMap
}

type execMetrics struct {
totalGas int64
maxGas int64
cumulativeGas int64
longestRunningTxHash string
longestRunningTxDuration time.Duration
}

func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx {
res := make([]types.ResponseDeliverTx, 0, len(tasks))
for _, t := range tasks {
res = append(res, *t.Response)
if t.Response.GasUsed > s.execMetrics.maxGas {
s.execMetrics.maxGas = t.Response.GasUsed
}
s.execMetrics.totalGas += t.Response.GasUsed
s.execMetrics.cumulativeGas += t.CumulativeGas

if t.TxTracer != nil {
t.TxTracer.Commit()
Expand Down Expand Up @@ -346,9 +362,24 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
}
s.metrics.maxIncarnation = s.maxIncarnation

ctx.Logger().Info("occ scheduler", "height", ctx.BlockHeight(), "txs", len(tasks), "latency_ms", time.Since(startTime).Milliseconds(), "retries", s.metrics.retries, "maxIncarnation", s.maxIncarnation, "iterations", iterations, "sync", s.synchronous, "workers", s.workers)
r := s.collectResponses(tasks)

ctx.Logger().Info("occ scheduler",
"height", ctx.BlockHeight(),
"txs", len(tasks),
"latency_ms", time.Since(startTime).Milliseconds(),
"retries", s.metrics.retries,
"maxIncarnation", s.maxIncarnation,
"totalGas", s.execMetrics.totalGas,
"maxGas", s.execMetrics.maxGas,
"cumulativeGas", s.execMetrics.cumulativeGas,
"longestRunningTxHash", s.execMetrics.longestRunningTxHash,
"longestRunningTxDuration", s.execMetrics.longestRunningTxDuration.Milliseconds(),
"iterations", iterations,
"sync", s.synchronous,
"workers", s.workers)

return s.collectResponses(tasks), nil
return r, nil
}

func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
Expand Down Expand Up @@ -552,7 +583,14 @@ func (s *scheduler) executeTask(task *deliverTxTask) {

s.prepareTask(task)

startTime := time.Now()
resp := s.deliverTx(task.Ctx, task.Request, task.SdkTx, task.Checksum)
duration := time.Since(startTime)
if duration > s.execMetrics.longestRunningTxDuration {
s.execMetrics.longestRunningTxDuration = duration
s.execMetrics.longestRunningTxHash = fmt.Sprintf("%X", sha256.Sum256(task.Request.Tx))
}

// close the abort channel
close(task.AbortCh)
abort, ok := <-task.AbortCh
Expand All @@ -570,6 +608,7 @@ func (s *scheduler) executeTask(task *deliverTxTask) {

task.SetStatus(statusExecuted)
task.Response = &resp
task.CumulativeGas += resp.GasUsed

// write from version store to multiversion stores
for _, v := range task.VersionStores {
Expand Down
Loading