diff --git a/services/api/optimistic_test.go b/services/api/optimistic_test.go index 6635b753..39e7e1ec 100644 --- a/services/api/optimistic_test.go +++ b/services/api/optimistic_test.go @@ -1,6 +1,7 @@ package api import ( + "bytes" "context" "encoding/json" "fmt" @@ -12,6 +13,7 @@ import ( "time" "github.com/alicebob/miniredis/v2" + "github.com/attestantio/go-builder-client/api/capella" v1 "github.com/attestantio/go-builder-client/api/v1" "github.com/attestantio/go-eth2-client/spec/bellatrix" consensuscapella "github.com/attestantio/go-eth2-client/spec/capella" @@ -640,3 +642,81 @@ func TestBuilderApiSubmitNewBlockOptimisticV2_full(t *testing.T) { }) } } + +func TestBuilderApiOptimisticV2SlowPath_fail_ssz_decode_header(t *testing.T) { + pubkey, _ := generateKeyPair(t) + backend := startTestBackend(t, pubkey) + outBytes := make([]byte, 944) // 944 bytes is min required to try ssz decoding. + outBytes[0] = 0xaa + + r := bytes.NewReader(outBytes) + + backend.relay.optimisticV2SlowPath(r, v2SlowPathOpts{ + payload: &common.BuilderSubmitBlockRequest{ + Capella: &capella.SubmitBlockRequest{ + Message: &v1.BidTrace{ + BuilderPubkey: *pubkey, + }, + }, + }, + }) + + // Check that demotion occurred. + mockDB, ok := backend.relay.db.(*database.MockDB) + require.True(t, ok) + require.Equal(t, mockDB.Demotions[pubkey.String()], true) +} + +func TestBuilderApiOptimisticV2SlowPath(t *testing.T) { + pubkey, secretkey := generateKeyPair(t) + + testReq := common.TestBuilderSubmitBlockRequestV2(secretkey, getTestBidTrace(*pubkey, 10)) + testPayload := &common.BuilderSubmitBlockRequest{ + Capella: &capella.SubmitBlockRequest{ + Message: &v1.BidTrace{ + BuilderPubkey: *pubkey, + }, + ExecutionPayload: &consensuscapella.ExecutionPayload{}, + }, + } + + testCases := []struct { + description string + simError error + expectDemotion bool + }{ + { + description: "success", + }, + { + description: "failure_empty_payload", + simError: errFake, + expectDemotion: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + backend := startTestBackend(t, pubkey) + backend.relay.blockSimRateLimiter = &MockBlockSimulationRateLimiter{ + simulationError: tc.simError, + } + + submissionBytes, err := testReq.MarshalSSZ() + require.NoError(t, err) + + r := bytes.NewReader(submissionBytes) + + opts := v2SlowPathOpts{ + payload: testPayload, + entry: &blockBuilderCacheEntry{}, + } + backend.relay.optimisticV2SlowPath(r, opts) + + // Check demotion status. + mockDB, ok := backend.relay.db.(*database.MockDB) + require.True(t, ok) + require.Equal(t, mockDB.Demotions[pubkey.String()], tc.expectDemotion) + }) + } +} diff --git a/services/api/service.go b/services/api/service.go index 428d8a0d..c9147385 100644 --- a/services/api/service.go +++ b/services/api/service.go @@ -2311,15 +2311,12 @@ func (api *RelayAPI) handleSubmitNewBlockV2(w http.ResponseWriter, req *http.Req remainderReader := bytes.NewReader(remainder) slowPathOpts := v2SlowPathOpts{ - header: &header, - payload: payload, - receivedAt: receivedAt, - eligibleAt: eligibleAt, - pf: pf, - isCancellationEnabled: isCancellationEnabled, - entry: builderEntry, - gasLimit: gasLimit, - pipeliner: tx, + payload: payload, + receivedAt: receivedAt, + eligibleAt: eligibleAt, + pf: pf, + entry: builderEntry, + gasLimit: gasLimit, } // Join the header bytes with the remaining bytes. @@ -2333,21 +2330,114 @@ func (api *RelayAPI) handleSubmitNewBlockV2(w http.ResponseWriter, req *http.Req } type v2SlowPathOpts struct { - header *common.SubmitBlockRequestV2Optimistic - payload *common.BuilderSubmitBlockRequest - receivedAt time.Time - eligibleAt time.Time - pf common.Profile - isCancellationEnabled bool - entry *blockBuilderCacheEntry - gasLimit uint64 - pipeliner redis.Pipeliner + payload *common.BuilderSubmitBlockRequest + receivedAt time.Time + eligibleAt time.Time + pf common.Profile + entry *blockBuilderCacheEntry + gasLimit uint64 } func (api *RelayAPI) optimisticV2SlowPath(r io.Reader, v2Opts v2SlowPathOpts) { log := api.log.WithFields(logrus.Fields{"method": "optimisticV2SlowPath"}) - // TODO(mikeneuder): slow path + payload := v2Opts.payload + msg, err := io.ReadAll(r) + if err != nil { + demotionErr := fmt.Errorf("%w: could not read full message", err) + api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr) + log.WithError(err).Warn("could not read full message") + return + } + + // Unmarshall full request. + var req common.SubmitBlockRequestV2Optimistic + err = req.UnmarshalSSZ(msg) + if err != nil { + demotionErr := fmt.Errorf("%w: could not unmarshall full request", err) + api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr) + log.WithError(err).Warn("could not unmarshall full request") + return + } + + // Fill in txns and withdrawals. + payload.Capella.ExecutionPayload.Transactions = req.Transactions + payload.Capella.ExecutionPayload.Withdrawals = req.Withdrawals + + getPayloadResponse, err := common.BuildGetPayloadResponse(payload) + if err != nil { + demotionErr := fmt.Errorf("%w: could not construct getPayloadResponse", err) + api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr) + log.WithError(err).Warn("could not construct getPayloadResponse") + return + } + + // Create the redis pipeline tx + tx := api.redis.NewTxPipeline() + + // Save payload. + err = api.redis.SaveExecutionPayloadCapella(context.Background(), tx, payload.Slot(), payload.ProposerPubkey(), payload.BlockHash(), getPayloadResponse.Capella.Capella) + if err != nil { + demotionErr := fmt.Errorf("%w: could not save execution payload", err) + api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr) + log.WithError(err).Warn("could not save execution payload") + return + } + + currentTime := time.Now().UTC() + log.WithFields(logrus.Fields{ + "timeStampExecutionPayloadSaved": currentTime.UnixMilli(), + "timeSinceReceivedAt": v2Opts.receivedAt.Sub(currentTime).Milliseconds(), + "timeSinceEligibleAt": v2Opts.eligibleAt.Sub(currentTime).Milliseconds(), + }).Info("v2 execution payload saved") + + // Used to communicate simulation result to the deferred function. + simResultC := make(chan *blockSimResult, 1) + + // Deferred saving of the builder submission to database (whenever this function ends) + defer func() { + savePayloadToDatabase := !api.ffDisablePayloadDBStorage + var simResult *blockSimResult + select { + case simResult = <-simResultC: + case <-time.After(10 * time.Second): + log.Warn("timed out waiting for simulation result") + simResult = &blockSimResult{false, false, nil, nil} + } + + submissionEntry, err := api.db.SaveBuilderBlockSubmission(payload, simResult.requestErr, simResult.validationErr, v2Opts.receivedAt, v2Opts.eligibleAt, simResult.wasSimulated, savePayloadToDatabase, v2Opts.pf, simResult.optimisticSubmission) + if err != nil { + log.WithError(err).WithField("payload", payload).Error("saving builder block submission to database failed") + return + } + + err = api.db.UpsertBlockBuilderEntryAfterSubmission(submissionEntry, simResult.validationErr != nil) + if err != nil { + log.WithError(err).Error("failed to upsert block-builder-entry") + } + }() + + // Simulate the block submission and save to db + timeBeforeValidation := time.Now().UTC() + + log = log.WithFields(logrus.Fields{ + "timestampBeforeValidation": timeBeforeValidation.UTC().UnixMilli(), + }) + + // Construct simulation request. + opts := blockSimOptions{ + isHighPrio: v2Opts.entry.status.IsHighPrio, + log: log, + builder: v2Opts.entry, + req: &common.BuilderBlockValidationRequest{ + BuilderSubmitBlockRequest: *payload, + RegisteredGasLimit: v2Opts.gasLimit, + }, + } + api.processOptimisticBlock(opts, simResultC) + + nextTime := time.Now().UTC() + v2Opts.pf.Simulation = uint64(nextTime.Sub(v2Opts.eligibleAt).Microseconds()) // All done log.Info("received v2 block from builder")