From 21c863cc4acf2eb1a6f7db863800c8237d956af9 Mon Sep 17 00:00:00 2001 From: codchen Date: Thu, 24 Oct 2024 22:53:18 +0800 Subject: [PATCH 1/2] Add invariance check & logs to occ scheduler --- tasks/scheduler.go | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/tasks/scheduler.go b/tasks/scheduler.go index 1be6fa779..45437d5ad 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -6,6 +6,7 @@ import ( "fmt" "sort" "sync" + "sync/atomic" "time" "github.com/cosmos/cosmos-sdk/store/multiversion" @@ -132,7 +133,9 @@ func (s *scheduler) invalidateTask(task *deliverTxTask) { } } -func start(ctx context.Context, ch chan func(), workers int) { +func start(ctx context.Context, ch chan func(), workers int) *atomic.Int32 { + res := new(atomic.Int32) + res.Store(0) for i := 0; i < workers; i++ { go func() { for { @@ -140,11 +143,14 @@ func start(ctx context.Context, ch chan func(), workers int) { case <-ctx.Done(): return case work := <-ch: + res.Add(1) work() + res.Add(-1) } } }() } + return res } func (s *scheduler) DoValidate(work func()) { @@ -306,19 +312,24 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t defer cancel() // execution tasks are limited by workers - start(workerCtx, s.executeCh, workers) + executeBusyCnt := start(workerCtx, s.executeCh, workers) // validation tasks uses length of tasks to avoid blocking on validation - start(workerCtx, s.validateCh, len(tasks)) + validateBusyCnt := start(workerCtx, s.validateCh, len(tasks)) toExecute := tasks for !allValidated(tasks) { // if the max incarnation >= x, we should revert to synchronous if iterations >= maximumIterations { + ctx.Logger().Info(fmt.Sprintf("iteration number %d exceeds max iteration %d, switching to sequential", iterations, maximumIterations)) + if s.synchronous { + ctx.Logger().Error("already synchronous") + } // process synchronously s.synchronous = true startIdx, anyLeft := s.findFirstNonValidated() if !anyLeft { + ctx.Logger().Error("should be impossible to have any non-validated when the outer for loop hasn't exited") break } toExecute = tasks[startIdx:] @@ -329,6 +340,10 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t return nil, err } + if eBusy := executeBusyCnt.Load(); eBusy != 0 { + ctx.Logger().Error(fmt.Sprintf("%d goroutines are still executing when none is expected", eBusy)) + } + // validate returns any that should be re-executed // note this processes ALL tasks, not just those recently executed var err error @@ -336,6 +351,10 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t if err != nil { return nil, err } + + if vBusy := validateBusyCnt.Load(); vBusy != 0 { + ctx.Logger().Error(fmt.Sprintf("%d goroutines are still validating when none is expected", vBusy)) + } // these are retries which apply to metrics s.metrics.retries += len(toExecute) iterations++ @@ -559,6 +578,9 @@ func (s *scheduler) executeTask(task *deliverTxTask) { close(task.AbortCh) abort, ok := <-task.AbortCh if ok { + if s.synchronous { + task.Ctx.Logger().Error("synchronous processing received abort signal", "incarnation", task.Incarnation, "index", task.AbsoluteIndex) + } // if there is an abort item that means we need to wait on the dependent tx task.SetStatus(statusAborted) task.Abort = &abort @@ -573,6 +595,10 @@ func (s *scheduler) executeTask(task *deliverTxTask) { task.SetStatus(statusExecuted) task.Response = &resp + if len(task.VersionStores) == 0 { + task.Ctx.Logger().Error("no version store found when writing to multiversion store", "incarnation", task.Incarnation, "index", task.AbsoluteIndex) + } + // write from version store to multiversion stores for _, v := range task.VersionStores { v.WriteToMultiVersionStore() From 0ec7b874c02dbed49fdfcdad5e4c39ae8303463d Mon Sep 17 00:00:00 2001 From: codchen Date: Fri, 25 Oct 2024 11:23:37 +0800 Subject: [PATCH 2/2] more logs --- store/multiversion/store.go | 10 ++++++++-- tasks/scheduler.go | 6 ++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/store/multiversion/store.go b/store/multiversion/store.go index 693be21ec..1f9dc6161 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -2,6 +2,7 @@ package multiversion import ( "bytes" + "fmt" "sort" "sync" @@ -15,7 +16,7 @@ type MultiVersionStore interface { GetLatest(key []byte) (value MultiVersionValueItem) GetLatestBeforeIndex(index int, key []byte) (value MultiVersionValueItem) Has(index int, key []byte) bool - WriteLatestToStore() + WriteLatestToStore() int SetWriteset(index int, incarnation int, writeset WriteSet) InvalidateWriteset(index int, incarnation int) SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) @@ -396,7 +397,8 @@ func (s *Store) ValidateTransactionState(index int) (bool, []int) { return iteratorValid && readsetValid, conflictIndices } -func (s *Store) WriteLatestToStore() { +func (s *Store) WriteLatestToStore() int { + keysWritten := 0 // sort the keys keys := []string{} s.multiVersionMap.Range(func(key, value interface{}) bool { @@ -413,6 +415,7 @@ func (s *Store) WriteLatestToStore() { mvValue, found := val.(MultiVersionValue).GetLatestNonEstimate() if !found { // this means that at some point, there was an estimate, but we have since removed it so there isn't anything writeable at the key, so we can skip + fmt.Printf("DEBUG: previously estimated key %s now removed and not written", key) continue } // we shouldn't have any ESTIMATE values when performing the write, because we read the latest non-estimate values only @@ -426,10 +429,13 @@ func (s *Store) WriteLatestToStore() { // not. Once we get confirmation that .Delete is guaranteed not to // save the byteslice, then we can assume only a read-only copy is sufficient. s.parentStore.Delete([]byte(key)) + keysWritten++ continue } if mvValue.Value() != nil { s.parentStore.Set([]byte(key), mvValue.Value()) + keysWritten++ } } + return keysWritten } diff --git a/tasks/scheduler.go b/tasks/scheduler.go index 45437d5ad..6f8759f26 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -360,8 +360,10 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t iterations++ } - for _, mv := range s.multiVersionStores { - mv.WriteLatestToStore() + for storeKey, mv := range s.multiVersionStores { + if keysWritten := mv.WriteLatestToStore(); keysWritten > 0 { + ctx.Logger().Info(fmt.Sprintf("writing %d keys from MV store to parent for %s", keysWritten, storeKey.Name())) + } } s.metrics.maxIncarnation = s.maxIncarnation