diff --git a/tasks/scheduler.go b/tasks/scheduler.go index a31ac1e9..cebf966f 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -56,6 +56,7 @@ type deliverTxTask struct { Response *types.ResponseDeliverTx VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore TxTracer sdk.TxTracer + CumulativeGas int64 } // AppendDependencies appends the given indexes to the task's dependencies @@ -112,6 +113,7 @@ type scheduler struct { metrics *schedulerMetrics synchronous bool // true if maxIncarnation exceeds threshold maxIncarnation int // current highest incarnation + execMetrics *execMetrics } // NewScheduler creates a new scheduler @@ -121,6 +123,7 @@ func NewScheduler(workers int, tracingInfo *tracing.Info, deliverTxFunc func(ctx deliverTx: deliverTxFunc, tracingInfo: tracingInfo, metrics: &schedulerMetrics{}, + execMetrics: &execMetrics{}, } } @@ -202,10 +205,23 @@ func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTa return allTasks, tasksMap } +type execMetrics struct { + totalGas int64 + maxGas int64 + cumulativeGas int64 + longestRunningTxHash string + longestRunningTxDuration time.Duration +} + func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { res := make([]types.ResponseDeliverTx, 0, len(tasks)) for _, t := range tasks { res = append(res, *t.Response) + if t.Response.GasUsed > s.execMetrics.maxGas { + s.execMetrics.maxGas = t.Response.GasUsed + } + s.execMetrics.totalGas += t.Response.GasUsed + s.execMetrics.cumulativeGas += t.CumulativeGas if t.TxTracer != nil { t.TxTracer.Commit() @@ -346,9 +362,24 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t } s.metrics.maxIncarnation = s.maxIncarnation - ctx.Logger().Info("occ scheduler", "height", ctx.BlockHeight(), "txs", len(tasks), "latency_ms", time.Since(startTime).Milliseconds(), "retries", s.metrics.retries, "maxIncarnation", s.maxIncarnation, "iterations", iterations, "sync", s.synchronous, "workers", s.workers) + r := s.collectResponses(tasks) + + ctx.Logger().Info("occ scheduler", + "height", ctx.BlockHeight(), + "txs", len(tasks), + "latency_ms", time.Since(startTime).Milliseconds(), + "retries", s.metrics.retries, + "maxIncarnation", s.maxIncarnation, + "totalGas", s.execMetrics.totalGas, + "maxGas", s.execMetrics.maxGas, + "cumulativeGas", s.execMetrics.cumulativeGas, + "longestRunningTxHash", s.execMetrics.longestRunningTxHash, + "longestRunningTxDuration", s.execMetrics.longestRunningTxDuration.Milliseconds(), + "iterations", iterations, + "sync", s.synchronous, + "workers", s.workers) - return s.collectResponses(tasks), nil + return r, nil } func (s *scheduler) shouldRerun(task *deliverTxTask) bool { @@ -552,7 +583,14 @@ func (s *scheduler) executeTask(task *deliverTxTask) { s.prepareTask(task) + startTime := time.Now() resp := s.deliverTx(task.Ctx, task.Request, task.SdkTx, task.Checksum) + duration := time.Since(startTime) + if duration > s.execMetrics.longestRunningTxDuration { + s.execMetrics.longestRunningTxDuration = duration + s.execMetrics.longestRunningTxHash = fmt.Sprintf("%X", sha256.Sum256(task.Request.Tx)) + } + // close the abort channel close(task.AbortCh) abort, ok := <-task.AbortCh @@ -570,6 +608,7 @@ func (s *scheduler) executeTask(task *deliverTxTask) { task.SetStatus(statusExecuted) task.Response = &resp + task.CumulativeGas += resp.GasUsed // write from version store to multiversion stores for _, v := range task.VersionStores {