From 5c76ddd3770e1c047d6009c735ae80570357b0ae Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 2 Jun 2025 17:26:47 -0400 Subject: [PATCH 1/3] add gas indicators on log --- tasks/scheduler.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/tasks/scheduler.go b/tasks/scheduler.go index a31ac1e9..5e651655 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -202,16 +202,26 @@ func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTa return allTasks, tasksMap } -func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { +type execMetrics struct { + totalGas int64 + maxGas int64 +} + +func (s *scheduler) collectResponses(tasks []*deliverTxTask) ([]types.ResponseDeliverTx, *execMetrics) { res := make([]types.ResponseDeliverTx, 0, len(tasks)) + m := &execMetrics{} for _, t := range tasks { res = append(res, *t.Response) + if t.Response.GasUsed > m.maxGas { + m.maxGas = t.Response.GasUsed + } + m.totalGas += t.Response.GasUsed if t.TxTracer != nil { t.TxTracer.Commit() } } - return res + return res, m } func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) { @@ -346,9 +356,21 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t } s.metrics.maxIncarnation = s.maxIncarnation - ctx.Logger().Info("occ scheduler", "height", ctx.BlockHeight(), "txs", len(tasks), "latency_ms", time.Since(startTime).Milliseconds(), "retries", s.metrics.retries, "maxIncarnation", s.maxIncarnation, "iterations", iterations, "sync", s.synchronous, "workers", s.workers) - - return s.collectResponses(tasks), nil + r, m := s.collectResponses(tasks) + + ctx.Logger().Info("occ scheduler", + "height", ctx.BlockHeight(), + "txs", len(tasks), + "latency_ms", time.Since(startTime).Milliseconds(), + "retries", s.metrics.retries, + "maxIncarnation", s.maxIncarnation, + "totalGas", m.totalGas, + "maxGas", m.maxGas, + "iterations", iterations, + "sync", s.synchronous, + "workers", s.workers) + + return r, nil } func (s *scheduler) shouldRerun(task *deliverTxTask) bool { From 2867e5d1abfa98417d4140d7a65ab9e2ec0e6b15 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 2 Jun 2025 17:39:24 -0400 Subject: [PATCH 2/3] add cumulative gas --- tasks/scheduler.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tasks/scheduler.go b/tasks/scheduler.go index 5e651655..b7abcd58 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -56,6 +56,7 @@ type deliverTxTask struct { Response *types.ResponseDeliverTx VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore TxTracer sdk.TxTracer + CumulativeGas int64 } // AppendDependencies appends the given indexes to the task's dependencies @@ -203,8 +204,9 @@ func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTa } type execMetrics struct { - totalGas int64 - maxGas int64 + totalGas int64 + maxGas int64 + cumulativeGas int64 } func (s *scheduler) collectResponses(tasks []*deliverTxTask) ([]types.ResponseDeliverTx, *execMetrics) { @@ -216,6 +218,7 @@ func (s *scheduler) collectResponses(tasks []*deliverTxTask) ([]types.ResponseDe m.maxGas = t.Response.GasUsed } m.totalGas += t.Response.GasUsed + m.cumulativeGas += t.CumulativeGas if t.TxTracer != nil { t.TxTracer.Commit() @@ -366,6 +369,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t "maxIncarnation", s.maxIncarnation, "totalGas", m.totalGas, "maxGas", m.maxGas, + "cumulativeGas", m.cumulativeGas, "iterations", iterations, "sync", s.synchronous, "workers", s.workers) @@ -592,6 +596,7 @@ func (s *scheduler) executeTask(task *deliverTxTask) { task.SetStatus(statusExecuted) task.Response = &resp + task.CumulativeGas += resp.GasUsed // write from version store to multiversion stores for _, v := range task.VersionStores { From c1ef505e01cab5bf3302ebd226240e398d391f2a Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Mon, 2 Jun 2025 21:03:11 -0500 Subject: [PATCH 3/3] Add metrics for longest running tx --- tasks/scheduler.go | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/tasks/scheduler.go b/tasks/scheduler.go index b7abcd58..cebf966f 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -113,6 +113,7 @@ type scheduler struct { metrics *schedulerMetrics synchronous bool // true if maxIncarnation exceeds threshold maxIncarnation int // current highest incarnation + execMetrics *execMetrics } // NewScheduler creates a new scheduler @@ -122,6 +123,7 @@ func NewScheduler(workers int, tracingInfo *tracing.Info, deliverTxFunc func(ctx deliverTx: deliverTxFunc, tracingInfo: tracingInfo, metrics: &schedulerMetrics{}, + execMetrics: &execMetrics{}, } } @@ -204,27 +206,28 @@ func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTa } type execMetrics struct { - totalGas int64 - maxGas int64 - cumulativeGas int64 + totalGas int64 + maxGas int64 + cumulativeGas int64 + longestRunningTxHash string + longestRunningTxDuration time.Duration } -func (s *scheduler) collectResponses(tasks []*deliverTxTask) ([]types.ResponseDeliverTx, *execMetrics) { +func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { res := make([]types.ResponseDeliverTx, 0, len(tasks)) - m := &execMetrics{} for _, t := range tasks { res = append(res, *t.Response) - if t.Response.GasUsed > m.maxGas { - m.maxGas = t.Response.GasUsed + if t.Response.GasUsed > s.execMetrics.maxGas { + s.execMetrics.maxGas = t.Response.GasUsed } - m.totalGas += t.Response.GasUsed - m.cumulativeGas += t.CumulativeGas + s.execMetrics.totalGas += t.Response.GasUsed + s.execMetrics.cumulativeGas += t.CumulativeGas if t.TxTracer != nil { t.TxTracer.Commit() } } - return res, m + return res } func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) { @@ -359,7 +362,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t } s.metrics.maxIncarnation = s.maxIncarnation - r, m := s.collectResponses(tasks) + r := s.collectResponses(tasks) ctx.Logger().Info("occ scheduler", "height", ctx.BlockHeight(), @@ -367,9 +370,11 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t "latency_ms", time.Since(startTime).Milliseconds(), "retries", s.metrics.retries, "maxIncarnation", s.maxIncarnation, - "totalGas", m.totalGas, - "maxGas", m.maxGas, - "cumulativeGas", m.cumulativeGas, + "totalGas", s.execMetrics.totalGas, + "maxGas", s.execMetrics.maxGas, + "cumulativeGas", s.execMetrics.cumulativeGas, + "longestRunningTxHash", s.execMetrics.longestRunningTxHash, + "longestRunningTxDuration", s.execMetrics.longestRunningTxDuration.Milliseconds(), "iterations", iterations, "sync", s.synchronous, "workers", s.workers) @@ -578,7 +583,14 @@ func (s *scheduler) executeTask(task *deliverTxTask) { s.prepareTask(task) + startTime := time.Now() resp := s.deliverTx(task.Ctx, task.Request, task.SdkTx, task.Checksum) + duration := time.Since(startTime) + if duration > s.execMetrics.longestRunningTxDuration { + s.execMetrics.longestRunningTxDuration = duration + s.execMetrics.longestRunningTxHash = fmt.Sprintf("%X", sha256.Sum256(task.Request.Tx)) + } + // close the abort channel close(task.AbortCh) abort, ok := <-task.AbortCh