From 7c815f36de8310201c9570b05a09951bddeebee1 Mon Sep 17 00:00:00 2001
From: Austonst
Date: Tue, 26 Sep 2023 12:00:01 -0600
Subject: [PATCH 1/4] Sync progress of optimistic block validation across builder instances via redis

---
 datastore/redis.go      | 55 +++++++++++++++++++++++++++++++++++++++++
 services/api/service.go | 20 +++++++++++++--
 2 files changed, 73 insertions(+), 2 deletions(-)

diff --git a/datastore/redis.go b/datastore/redis.go
index f1f0e9b9..86cf6454 100644
--- a/datastore/redis.go
+++ b/datastore/redis.go
@@ -24,6 +24,7 @@ var (
 	redisPrefix = "boost-relay"
 
 	expiryBidCache = 45 * time.Second
+	expiryLock     = 24 * time.Second
 
 	RedisConfigFieldPubkey    = "pubkey"
 	RedisStatsFieldLatestSlot = "latest-slot"
@@ -91,6 +92,7 @@ type RedisCache struct {
 	prefixTopBidValue    string
 	prefixFloorBid       string
 	prefixFloorBidValue  string
+	prefixProcessingSlot string
 
 	// keys
 	keyValidatorRegistrationTimestamp string
@@ -101,6 +103,8 @@ type RedisCache struct {
 	keyBlockBuilderStatus             string
 	keyLastSlotDelivered              string
 	keyLastHashDelivered              string
+
+	currentSlot uint64
 }
 
 func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
@@ -132,6 +136,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
 		prefixTopBidValue:    fmt.Sprintf("%s/%s:top-bid-value", redisPrefix, prefix),   // prefix:slot_parentHash_proposerPubkey
 		prefixFloorBid:       fmt.Sprintf("%s/%s:bid-floor", redisPrefix, prefix),       // prefix:slot_parentHash_proposerPubkey
 		prefixFloorBidValue:  fmt.Sprintf("%s/%s:bid-floor-value", redisPrefix, prefix), // prefix:slot_parentHash_proposerPubkey
+		prefixProcessingSlot: fmt.Sprintf("%s/%s:processing-slot", redisPrefix, prefix), // prefix:slot
 
 		keyValidatorRegistrationTimestamp: fmt.Sprintf("%s/%s:validator-registration-timestamp", redisPrefix, prefix),
 		keyRelayConfig:                    fmt.Sprintf("%s/%s:relay-config", redisPrefix, prefix),
@@ -190,6 +195,11 @@ func (r *RedisCache) keyFloorBidValue(slot uint64, parentHash, proposerPubkey st
 	return fmt.Sprintf("%s:%d_%s_%s", r.prefixFloorBidValue, slot, parentHash, proposerPubkey)
 }
 
+// keyProcessingSlot returns the key for the counter of builder processes working on a given slot
+func (r *RedisCache) keyProcessingSlot(slot uint64) string {
+	return fmt.Sprintf("%s:%d", r.prefixProcessingSlot, slot)
+}
+
 func (r *RedisCache) GetObj(key string, obj any) (err error) {
 	value, err := r.client.Get(context.Background(), key).Result()
 	if err != nil {
@@ -800,6 +810,51 @@ func (r *RedisCache) SetFloorBidValue(slot uint64, parentHash, proposerPubkey, v
 	return err
 }
 
+// BeginProcessingSlot signals that a builder process is handling blocks for a given slot
+func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err error) {
+	// Should never process more than one slot at a time
+	if r.currentSlot != 0 {
+		return fmt.Errorf("already processing slot %d", r.currentSlot)
+	}
+
+	keyProcessingSlot := r.keyProcessingSlot(slot)
+	err = r.client.Incr(ctx, keyProcessingSlot).Err()
+	if err != nil {
+		return err
+	}
+	r.currentSlot = slot
+	err = r.client.Expire(ctx, keyProcessingSlot, expiryLock).Err()
+	return err
+}
+
+// EndProcessingSlot signals that a builder process is done handling blocks for the current slot
+func (r *RedisCache) EndProcessingSlot(ctx context.Context) (err error) {
+	// Do not decrement if called multiple times
+	if r.currentSlot == 0 {
+		return nil
+	}
+
+	keyProcessingSlot := r.keyProcessingSlot(r.currentSlot)
+	err = r.client.Decr(ctx, keyProcessingSlot).Err()
+	r.currentSlot = 0
+	return err
+}
+
+// WaitForSlotComplete waits for a slot to be completed by all builder processes
+func (r *RedisCache) WaitForSlotComplete(ctx context.Context, slot uint64) (err error) {
+	keyProcessingSlot := r.keyProcessingSlot(slot)
+	for {
+		processing, err := r.client.Get(ctx, keyProcessingSlot).Uint64()
+		if err != nil {
+			return err
+		}
+		if processing == 0 {
+			return nil
+		}
+		time.Sleep(50 * time.Millisecond)
+	}
+}
+
 func (r *RedisCache) NewPipeline() redis.Pipeliner { //nolint:ireturn,nolintlint
 	return r.client.Pipeline()
 }
diff --git a/services/api/service.go b/services/api/service.go
index 9dea594f..feb47a41 100644
--- a/services/api/service.go
+++ b/services/api/service.go
@@ -516,6 +516,7 @@ func (api *RelayAPI) IsReady() bool {
 // - Stop returning bids
 // - Set ready /readyz to negative status
 // - Wait a bit to allow removal of service from load balancer and draining of requests
+// - If in the middle of proccessing optimistic blocks, wait for those to finish and release redis lock
 func (api *RelayAPI) StopServer() (err error) {
 	// avoid running this twice. setting srvShutdown to true makes /readyz switch to negative status
 	if wasStopping := api.srvShutdown.Swap(true); wasStopping {
@@ -538,6 +539,10 @@ func (api *RelayAPI) StopServer() (err error) {
 	// wait for any active getPayload call to finish
 	api.getPayloadCallsInFlight.Wait()
 
+	// wait for optimistic blocks
+	api.optimisticBlocksWG.Wait()
+	api.redis.EndProcessingSlot(context.Background())
+
 	// shutdown
 	return api.srv.Shutdown(context.Background())
 }
@@ -826,10 +831,21 @@ func (api *RelayAPI) updateProposerDuties(headSlot uint64) {
 }
 
 func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64) {
-	// Wait until there are no optimistic blocks being processed. Then we can
-	// safely update the slot.
+	// First wait for this process to finish processing optimistic blocks
 	api.optimisticBlocksWG.Wait()
+
+	// Now we release our lock and wait for all other builder processes to wrap up
+	api.redis.EndProcessingSlot(context.Background())
+	api.redis.WaitForSlotComplete(context.Background(), headSlot)
+
+	// Prevent race with StopServer, make sure we don't lock up redis if the server is shutting down
+	if api.srvShutdown.Load() {
+		return
+	}
+
+	// Update the optimistic slot and signal processing of the next slot
 	api.optimisticSlot.Store(headSlot + 1)
+	api.redis.BeginProcessingSlot(context.Background(), headSlot + 1)
 
 	builders, err := api.db.GetBlockBuilders()
 	if err != nil {
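The functions added in this patch behave like a distributed waitgroup keyed by slot: every relay instance increments the per-slot counter before it starts optimistic validation, decrements it when it is done, and WaitForSlotComplete polls the counter until every instance has finished, with expiryLock bounding how long a crashed instance can keep the counter from draining. Below is a minimal standalone sketch of the same pattern, assuming go-redis v9 and an illustrative key prefix rather than the relay's own RedisCache type:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

// slotKey builds a per-slot counter key (illustrative prefix, not the relay's).
func slotKey(slot uint64) string {
	return fmt.Sprintf("example:processing-slot:%d", slot)
}

// begin registers this process as working on a slot and sets a TTL so a
// crashed process cannot hold the counter up forever.
func begin(ctx context.Context, rdb *redis.Client, slot uint64) error {
	key := slotKey(slot)
	if err := rdb.Incr(ctx, key).Err(); err != nil {
		return err
	}
	return rdb.Expire(ctx, key, 24*time.Second).Err()
}

// end signals that this process is done with the slot.
func end(ctx context.Context, rdb *redis.Client, slot uint64) error {
	return rdb.Decr(ctx, slotKey(slot)).Err()
}

// waitComplete polls until no process is still working on the slot.
// A missing key is treated as already complete here, unlike the patch above.
func waitComplete(ctx context.Context, rdb *redis.Client, slot uint64) error {
	for {
		n, err := rdb.Get(ctx, slotKey(slot)).Uint64()
		if errors.Is(err, redis.Nil) || (err == nil && n == 0) {
			return nil
		}
		if err != nil {
			return err
		}
		time.Sleep(50 * time.Millisecond)
	}
}

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	_ = begin(ctx, rdb, 101) // this instance starts validating blocks for slot 101
	// ... validate optimistic blocks ...
	_ = end(ctx, rdb, 101)          // this instance is done
	_ = waitComplete(ctx, rdb, 101) // block until every instance is done
}

Unlike this sketch, WaitForSlotComplete in the patch surfaces the redis.Nil error to its caller when the key is missing; the TTL set by BeginProcessingSlot is what keeps a crashed or partitioned instance from stalling every other relay indefinitely.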
From 00f30db31a9c42e2eaac480c1da23d238a244e90 Mon Sep 17 00:00:00 2001
From: Austonst
Date: Thu, 5 Oct 2023 10:31:49 -0600
Subject: [PATCH 2/4] Bug fixes: correct proposer API behavior for redis waitgroup and improve missed slot handling

---
 services/api/service.go | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/services/api/service.go b/services/api/service.go
index feb47a41..a84e4830 100644
--- a/services/api/service.go
+++ b/services/api/service.go
@@ -759,15 +759,19 @@ func (api *RelayAPI) processNewSlot(headSlot uint64) {
 	// store the head slot
 	api.headSlot.Store(headSlot)
 
-	// only for builder-api
+	// for both apis
 	if api.opts.BlockBuilderAPI || api.opts.ProposerAPI {
 		// update proposer duties in the background
 		go api.updateProposerDuties(headSlot)
+	}
 
+	// for block builder api
+	if api.opts.BlockBuilderAPI {
 		// update the optimistic slot
-		go api.prepareBuildersForSlot(headSlot)
+		go api.prepareBuildersForSlot(headSlot, prevHeadSlot)
 	}
 
+	// for proposer api
 	if api.opts.ProposerAPI {
 		go api.datastore.RefreshKnownValidators(api.log, api.beaconClient, headSlot)
 	}
@@ -830,13 +834,13 @@ func (api *RelayAPI) updateProposerDuties(headSlot uint64) {
 	api.log.Infof("proposer duties updated: %s", strings.Join(_duties, ", "))
 }
 
-func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64) {
+func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64, prevHeadSlot uint64) {
 	// First wait for this process to finish processing optimistic blocks
 	api.optimisticBlocksWG.Wait()
 
 	// Now we release our lock and wait for all other builder processes to wrap up
 	api.redis.EndProcessingSlot(context.Background())
-	api.redis.WaitForSlotComplete(context.Background(), headSlot)
+	api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot + 1)
 
 	// Prevent race with StopServer, make sure we don't lock up redis if the server is shutting down
 	if api.srvShutdown.Load() {
@@ -1399,8 +1403,8 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
 		log.WithError(err).Error("failed to increment builder-stats after getPayload")
 	}
 
-	// Wait until optimistic blocks are complete.
-	api.optimisticBlocksWG.Wait()
+	// Wait until optimistic blocks are complete using the redis waitgroup
+	api.redis.WaitForSlotComplete(context.Background(), uint64(slot))
 
 	// Check if there is a demotion for the winning block.
 	_, err = api.db.GetBuilderDemotion(bidTrace)
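The switch from headSlot to prevHeadSlot + 1 is what makes missed slots work: the counter a process actually incremented is the one for the slot right after the previous head (the slot it announced it would process), and that only equals the new head when no slots were skipped. A small worked example of the key arithmetic, with made-up slot numbers:

package main

import "fmt"

func main() {
	// Previous head was slot 100, slots 101 and 102 were missed, new head is 103.
	prevHeadSlot, headSlot := uint64(100), uint64(103)

	begun := prevHeadSlot + 1     // counter this process incremented last time: slot 101
	mustDrain := prevHeadSlot + 1 // the key to wait on is that same slot 101 ...
	notThis := headSlot           // ... not the new head 103, which nobody incremented
	beginNext := headSlot + 1     // counter to increment for the upcoming slot: 104

	fmt.Println(begun, mustDrain, notThis, beginNext)
}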
From 83967081aee664977ea026caf3bc7035ceb787ba Mon Sep 17 00:00:00 2001
From: Austonst
Date: Thu, 5 Oct 2023 11:13:35 -0600
Subject: [PATCH 3/4] Redis waitgroup error handling, lint, tests

---
 datastore/redis.go              |  3 ++-
 services/api/optimistic_test.go |  2 +-
 services/api/service.go         | 29 ++++++++++++++++++++++-------
 3 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/datastore/redis.go b/datastore/redis.go
index 86cf6454..9cadf740 100644
--- a/datastore/redis.go
+++ b/datastore/redis.go
@@ -146,6 +146,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
 		keyBlockBuilderStatus:             fmt.Sprintf("%s/%s:block-builder-status", redisPrefix, prefix),
 		keyLastSlotDelivered:              fmt.Sprintf("%s/%s:last-slot-delivered", redisPrefix, prefix),
 		keyLastHashDelivered:              fmt.Sprintf("%s/%s:last-hash-delivered", redisPrefix, prefix),
+		currentSlot:                       0,
 	}, nil
 }
 
@@ -814,7 +815,7 @@ func (r *RedisCache) SetFloorBidValue(slot uint64, parentHash, proposerPubkey, v
 func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err error) {
 	// Should never process more than one slot at a time
 	if r.currentSlot != 0 {
-		return fmt.Errorf("already processing slot %d", r.currentSlot)
+		return fmt.Errorf("already processing slot %d", r.currentSlot) //nolint:goerr113
 	}
 
 	keyProcessingSlot := r.keyProcessingSlot(slot)
diff --git a/services/api/optimistic_test.go b/services/api/optimistic_test.go
index b7a35b80..e9b33e30 100644
--- a/services/api/optimistic_test.go
+++ b/services/api/optimistic_test.go
@@ -358,7 +358,7 @@ func TestPrepareBuildersForSlot(t *testing.T) {
 	pkStr := pubkey.String()
 	// Clear cache.
 	backend.relay.blockBuildersCache = map[string]*blockBuilderCacheEntry{}
-	backend.relay.prepareBuildersForSlot(slot + 1)
+	backend.relay.prepareBuildersForSlot(slot+1, slot)
 	entry, ok := backend.relay.blockBuildersCache[pkStr]
 	require.True(t, ok)
 	require.Equal(t, true, entry.status.IsHighPrio)
diff --git a/services/api/service.go b/services/api/service.go
index a84e4830..5523d259 100644
--- a/services/api/service.go
+++ b/services/api/service.go
@@ -516,7 +516,7 @@ func (api *RelayAPI) IsReady() bool {
 // - Stop returning bids
 // - Set ready /readyz to negative status
 // - Wait a bit to allow removal of service from load balancer and draining of requests
-// - If in the middle of proccessing optimistic blocks, wait for those to finish and release redis lock
+// - If in the middle of processing optimistic blocks, wait for those to finish and release redis lock
 func (api *RelayAPI) StopServer() (err error) {
 	// avoid running this twice. setting srvShutdown to true makes /readyz switch to negative status
 	if wasStopping := api.srvShutdown.Swap(true); wasStopping {
@@ -541,7 +541,10 @@ func (api *RelayAPI) StopServer() (err error) {
 
 	// wait for optimistic blocks
 	api.optimisticBlocksWG.Wait()
-	api.redis.EndProcessingSlot(context.Background())
+	err = api.redis.EndProcessingSlot(context.Background())
+	if err != nil {
+		api.log.WithError(err).Error("failed to update redis optimistic processing slot")
+	}
 
 	// shutdown
 	return api.srv.Shutdown(context.Background())
@@ -834,13 +837,19 @@ func (api *RelayAPI) updateProposerDuties(headSlot uint64) {
 	api.log.Infof("proposer duties updated: %s", strings.Join(_duties, ", "))
 }
 
-func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64, prevHeadSlot uint64) {
+func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) {
 	// First wait for this process to finish processing optimistic blocks
 	api.optimisticBlocksWG.Wait()
 
 	// Now we release our lock and wait for all other builder processes to wrap up
-	api.redis.EndProcessingSlot(context.Background())
-	api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot + 1)
+	err := api.redis.EndProcessingSlot(context.Background())
+	if err != nil {
+		api.log.WithError(err).Error("failed to update redis optimistic processing slot")
+	}
+	err = api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot+1)
+	if err != nil {
+		api.log.WithError(err).Error("failed to get redis optimistic processing slot")
+	}
 
 	// Prevent race with StopServer, make sure we don't lock up redis if the server is shutting down
 	if api.srvShutdown.Load() {
@@ -849,7 +858,10 @@ func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64, prevHeadSlot uint64
 
 	// Update the optimistic slot and signal processing of the next slot
 	api.optimisticSlot.Store(headSlot + 1)
-	api.redis.BeginProcessingSlot(context.Background(), headSlot + 1)
+	err = api.redis.BeginProcessingSlot(context.Background(), headSlot+1)
+	if err != nil {
+		api.log.WithError(err).Error("failed to update redis optimistic processing slot")
+	}
 
 	builders, err := api.db.GetBlockBuilders()
 	if err != nil {
@@ -1404,7 +1416,10 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
 	}
 
 	// Wait until optimistic blocks are complete using the redis waitgroup
-	api.redis.WaitForSlotComplete(context.Background(), uint64(slot))
+	err = api.redis.WaitForSlotComplete(context.Background(), uint64(slot))
+	if err != nil {
+		api.log.WithError(err).Error("failed to get redis optimistic processing slot")
+	}
 
 	// Check if there is a demotion for the winning block.
 	_, err = api.db.GetBuilderDemotion(bidTrace)
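With the error handling above, a failed Redis call is logged and the relay carries on rather than blocking the slot transition. A caller can also bound the wait itself: the poll loop in WaitForSlotComplete only exits when the counter reaches zero or a Redis call fails, and a cancelled or expired context makes the GET fail. The following is a usage sketch only, not something these patches add, assuming the relay's datastore package and a logrus entry:

package example

import (
	"context"
	"time"

	"github.com/flashbots/mev-boost-relay/datastore"
	"github.com/sirupsen/logrus"
)

// waitBounded caps how long we block on the redis waitgroup by cancelling the
// context, at which point the GET inside WaitForSlotComplete fails and the
// error is logged instead of waiting forever.
func waitBounded(redisCache *datastore.RedisCache, slot uint64, log *logrus.Entry) {
	ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
	defer cancel()

	if err := redisCache.WaitForSlotComplete(ctx, slot); err != nil {
		log.WithError(err).Warn("gave up waiting for optimistic block processing")
	}
}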
From d668875c71f546aa391207b17faf0cb48428699b Mon Sep 17 00:00:00 2001
From: Auston Sterling
Date: Mon, 18 Mar 2024 15:26:17 -0600
Subject: [PATCH 4/4] Better prevention and handling of redis waitgroup key expiry issues

---
 datastore/redis.go      | 24 +++++++++++++++++++-----
 services/api/service.go |  5 +++--
 2 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/datastore/redis.go b/datastore/redis.go
index 9cadf740..13363808 100644
--- a/datastore/redis.go
+++ b/datastore/redis.go
@@ -819,13 +819,18 @@ func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err
 	}
 
 	keyProcessingSlot := r.keyProcessingSlot(slot)
-	err = r.client.Incr(ctx, keyProcessingSlot).Err()
+
+	pipe := r.client.TxPipeline()
+	pipe.Incr(ctx, keyProcessingSlot)
+	pipe.Expire(ctx, keyProcessingSlot, expiryLock)
+	_, err = pipe.Exec(ctx)
+
 	if err != nil {
 		return err
 	}
+
 	r.currentSlot = slot
-	err = r.client.Expire(ctx, keyProcessingSlot, expiryLock).Err()
-	return err
+	return nil
 }
 
 // EndProcessingSlot signals that a builder process is done handling blocks for the current slot
@@ -836,9 +841,18 @@ func (r *RedisCache) EndProcessingSlot(ctx context.Context) (err error) {
 	}
 
 	keyProcessingSlot := r.keyProcessingSlot(r.currentSlot)
-	err = r.client.Decr(ctx, keyProcessingSlot).Err()
+
+	pipe := r.client.TxPipeline()
+	pipe.Decr(ctx, keyProcessingSlot)
+	pipe.Expire(ctx, keyProcessingSlot, expiryLock)
+	_, err = pipe.Exec(ctx)
+
+	if err != nil {
+		return err
+	}
+
 	r.currentSlot = 0
-	return err
+	return nil
 }
 
 // WaitForSlotComplete waits for a slot to be completed by all builder processes
diff --git a/services/api/service.go b/services/api/service.go
index 5523d259..96b56693 100644
--- a/services/api/service.go
+++ b/services/api/service.go
@@ -844,7 +844,7 @@ func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) {
 	// Now we release our lock and wait for all other builder processes to wrap up
 	err := api.redis.EndProcessingSlot(context.Background())
 	if err != nil {
-		api.log.WithError(err).Error("failed to update redis optimistic processing slot")
+		api.log.WithError(err).Error("failed to unlock redis optimistic processing slot")
 	}
 	err = api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot+1)
 	if err != nil {
@@ -860,7 +860,8 @@ func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) {
 	api.optimisticSlot.Store(headSlot + 1)
 	err = api.redis.BeginProcessingSlot(context.Background(), headSlot+1)
 	if err != nil {
-		api.log.WithError(err).Error("failed to update redis optimistic processing slot")
+		api.log.WithError(err).Error("failed to lock redis optimistic processing slot")
+		api.optimisticSlot.Store(0)
 	}
 
 	builders, err := api.db.GetBlockBuilders()
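The TxPipeline change means the INCR/DECR and the EXPIRE reach Redis as a single MULTI/EXEC transaction, so the counter can no longer be left without a TTL when a process dies between the two calls, and the TTL is refreshed every time the key is touched. A minimal sketch of the same pattern, assuming go-redis v9 and an illustrative key name:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	key := "example:processing-slot:101" // illustrative key, not the relay's prefix

	// Queue both commands and send them as one MULTI/EXEC transaction. If the
	// process dies before Exec, neither command runs, so the counter can never
	// exist without a TTL.
	pipe := rdb.TxPipeline()
	incr := pipe.Incr(ctx, key)
	pipe.Expire(ctx, key, 24*time.Second)
	if _, err := pipe.Exec(ctx); err != nil {
		panic(err)
	}

	// Results of queued commands are populated once Exec returns.
	fmt.Println("counter is now", incr.Val())
}

Because EndProcessingSlot now also refreshes the expiry, the key stays alive for as long as any instance keeps touching it and expires expiryLock after the last touch.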