Skip to content

Commit 8396708

Browse files
committed
Redis waitgroup error handling, lint, tests
1 parent 00f30db commit 8396708

File tree

3 files changed

+25
-9
lines changed

3 files changed

+25
-9
lines changed

datastore/redis.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
146146
keyBlockBuilderStatus: fmt.Sprintf("%s/%s:block-builder-status", redisPrefix, prefix),
147147
keyLastSlotDelivered: fmt.Sprintf("%s/%s:last-slot-delivered", redisPrefix, prefix),
148148
keyLastHashDelivered: fmt.Sprintf("%s/%s:last-hash-delivered", redisPrefix, prefix),
149+
currentSlot: 0,
149150
}, nil
150151
}
151152

@@ -814,7 +815,7 @@ func (r *RedisCache) SetFloorBidValue(slot uint64, parentHash, proposerPubkey, v
814815
func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err error) {
815816
// Should never process more than one slot at a time
816817
if r.currentSlot != 0 {
817-
return fmt.Errorf("already processing slot %d", r.currentSlot)
818+
return fmt.Errorf("already processing slot %d", r.currentSlot) //nolint:goerr113
818819
}
819820

820821
keyProcessingSlot := r.keyProcessingSlot(slot)

services/api/optimistic_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ func TestPrepareBuildersForSlot(t *testing.T) {
358358
pkStr := pubkey.String()
359359
// Clear cache.
360360
backend.relay.blockBuildersCache = map[string]*blockBuilderCacheEntry{}
361-
backend.relay.prepareBuildersForSlot(slot + 1)
361+
backend.relay.prepareBuildersForSlot(slot+1, slot)
362362
entry, ok := backend.relay.blockBuildersCache[pkStr]
363363
require.True(t, ok)
364364
require.Equal(t, true, entry.status.IsHighPrio)

services/api/service.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ func (api *RelayAPI) IsReady() bool {
516516
// - Stop returning bids
517517
// - Set ready /readyz to negative status
518518
// - Wait a bit to allow removal of service from load balancer and draining of requests
519-
// - If in the middle of proccessing optimistic blocks, wait for those to finish and release redis lock
519+
// - If in the middle of processing optimistic blocks, wait for those to finish and release redis lock
520520
func (api *RelayAPI) StopServer() (err error) {
521521
// avoid running this twice. setting srvShutdown to true makes /readyz switch to negative status
522522
if wasStopping := api.srvShutdown.Swap(true); wasStopping {
@@ -541,7 +541,10 @@ func (api *RelayAPI) StopServer() (err error) {
541541

542542
// wait for optimistic blocks
543543
api.optimisticBlocksWG.Wait()
544-
api.redis.EndProcessingSlot(context.Background())
544+
err = api.redis.EndProcessingSlot(context.Background())
545+
if err != nil {
546+
api.log.WithError(err).Error("failed to update redis optimistic processing slot")
547+
}
545548

546549
// shutdown
547550
return api.srv.Shutdown(context.Background())
@@ -834,13 +837,19 @@ func (api *RelayAPI) updateProposerDuties(headSlot uint64) {
834837
api.log.Infof("proposer duties updated: %s", strings.Join(_duties, ", "))
835838
}
836839

837-
func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64, prevHeadSlot uint64) {
840+
func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) {
838841
// First wait for this process to finish processing optimistic blocks
839842
api.optimisticBlocksWG.Wait()
840843

841844
// Now we release our lock and wait for all other builder processes to wrap up
842-
api.redis.EndProcessingSlot(context.Background())
843-
api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot + 1)
845+
err := api.redis.EndProcessingSlot(context.Background())
846+
if err != nil {
847+
api.log.WithError(err).Error("failed to update redis optimistic processing slot")
848+
}
849+
err = api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot+1)
850+
if err != nil {
851+
api.log.WithError(err).Error("failed to get redis optimistic processing slot")
852+
}
844853

845854
// Prevent race with StopServer, make sure we don't lock up redis if the server is shutting down
846855
if api.srvShutdown.Load() {
@@ -849,7 +858,10 @@ func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64, prevHeadSlot uint64
849858

850859
// Update the optimistic slot and signal processing of the next slot
851860
api.optimisticSlot.Store(headSlot + 1)
852-
api.redis.BeginProcessingSlot(context.Background(), headSlot + 1)
861+
err = api.redis.BeginProcessingSlot(context.Background(), headSlot+1)
862+
if err != nil {
863+
api.log.WithError(err).Error("failed to update redis optimistic processing slot")
864+
}
853865

854866
builders, err := api.db.GetBlockBuilders()
855867
if err != nil {
@@ -1404,7 +1416,10 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
14041416
}
14051417

14061418
// Wait until optimistic blocks are complete using the redis waitgroup
1407-
api.redis.WaitForSlotComplete(context.Background(), uint64(slot))
1419+
err = api.redis.WaitForSlotComplete(context.Background(), uint64(slot))
1420+
if err != nil {
1421+
api.log.WithError(err).Error("failed to get redis optimistic processing slot")
1422+
}
14081423

14091424
// Check if there is a demotion for the winning block.
14101425
_, err = api.db.GetBuilderDemotion(bidTrace)

0 commit comments

Comments
 (0)