diff --git a/execution/evm/eth_rpc_client.go b/execution/evm/eth_rpc_client.go new file mode 100644 index 000000000..8799a8177 --- /dev/null +++ b/execution/evm/eth_rpc_client.go @@ -0,0 +1,30 @@ +package evm + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" +) + +type ethRPCClient struct { + client *ethclient.Client +} + +func NewEthRPCClient(client *ethclient.Client) EthRPCClient { + return ðRPCClient{client: client} +} + +func (e *ethRPCClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + return e.client.HeaderByNumber(ctx, number) +} + +func (e *ethRPCClient) GetTxs(ctx context.Context) ([]string, error) { + var result []string + err := e.client.Client().CallContext(ctx, &result, "txpoolExt_getTxs") + if err != nil { + return nil, err + } + return result, nil +} diff --git a/execution/evm/eth_rpc_tracing.go b/execution/evm/eth_rpc_tracing.go new file mode 100644 index 000000000..a2842d7f2 --- /dev/null +++ b/execution/evm/eth_rpc_tracing.go @@ -0,0 +1,82 @@ +package evm + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// tracedEthRPCClient wraps an EthRPCClient and records spans for observability. +type tracedEthRPCClient struct { + inner EthRPCClient + tracer trace.Tracer +} + +// withTracingEthRPCClient decorates an EthRPCClient with OpenTelemetry tracing. +func withTracingEthRPCClient(inner EthRPCClient) EthRPCClient { + return &tracedEthRPCClient{ + inner: inner, + tracer: otel.Tracer("ev-node/execution/eth-rpc"), + } +} + +func (t *tracedEthRPCClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + var blockNumber string + if number == nil { + blockNumber = "latest" + } else { + blockNumber = number.String() + } + + ctx, span := t.tracer.Start(ctx, "Eth.GetBlockByNumber", + trace.WithAttributes( + attribute.String("method", "eth_getBlockByNumber"), + attribute.String("block_number", blockNumber), + ), + ) + defer span.End() + + result, err := t.inner.HeaderByNumber(ctx, number) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.String("block_hash", result.Hash().Hex()), + attribute.String("state_root", result.Root.Hex()), + attribute.Int64("gas_limit", int64(result.GasLimit)), + attribute.Int64("gas_used", int64(result.GasUsed)), + attribute.Int64("timestamp", int64(result.Time)), + ) + + return result, nil +} + +func (t *tracedEthRPCClient) GetTxs(ctx context.Context) ([]string, error) { + ctx, span := t.tracer.Start(ctx, "TxPool.GetTxs", + trace.WithAttributes( + attribute.String("method", "txpoolExt_getTxs"), + ), + ) + defer span.End() + + result, err := t.inner.GetTxs(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Int("tx_count", len(result)), + ) + + return result, nil +} diff --git a/execution/evm/eth_rpc_tracing_test.go b/execution/evm/eth_rpc_tracing_test.go new file mode 100644 index 000000000..832a03fef --- /dev/null +++ b/execution/evm/eth_rpc_tracing_test.go @@ -0,0 +1,301 @@ +package evm + +import ( + "context" + "errors" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +// setupTestEthRPCTracing creates a traced eth RPC client with an in-memory span recorder +func setupTestEthRPCTracing(t *testing.T, mockClient EthRPCClient) (EthRPCClient, *tracetest.SpanRecorder) { + t.Helper() + + // create in-memory span recorder + sr := tracetest.NewSpanRecorder() + tp := trace.NewTracerProvider( + trace.WithSpanProcessor(sr), + ) + t.Cleanup(func() { + _ = tp.Shutdown(context.Background()) + }) + + // set as global provider for the test + otel.SetTracerProvider(tp) + + // create traced client + traced := withTracingEthRPCClient(mockClient) + + return traced, sr +} + +// mockEthRPCClient is a simple mock for testing +type mockEthRPCClient struct { + headerByNumberFn func(ctx context.Context, number *big.Int) (*types.Header, error) + getTxsFn func(ctx context.Context) ([]string, error) +} + +func (m *mockEthRPCClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + if m.headerByNumberFn != nil { + return m.headerByNumberFn(ctx, number) + } + return nil, nil +} + +func (m *mockEthRPCClient) GetTxs(ctx context.Context) ([]string, error) { + if m.getTxsFn != nil { + return m.getTxsFn(ctx) + } + return nil, nil +} + +func TestTracedEthRPCClient_HeaderByNumber_Success(t *testing.T) { + expectedHeader := &types.Header{ + GasLimit: 30000000, + GasUsed: 15000000, + Time: 1234567890, + } + + mockClient := &mockEthRPCClient{ + headerByNumberFn: func(ctx context.Context, number *big.Int) (*types.Header, error) { + return expectedHeader, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + blockNumber := big.NewInt(100) + + header, err := traced.HeaderByNumber(ctx, blockNumber) + + require.NoError(t, err) + require.Equal(t, expectedHeader, header) + + // verify span was created + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "Eth.GetBlockByNumber", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + // verify attributes + attrs := span.Attributes() + requireAttribute(t, attrs, "method", "eth_getBlockByNumber") + requireAttribute(t, attrs, "block_number", "100") + requireAttribute(t, attrs, "block_hash", expectedHeader.Hash().Hex()) + requireAttribute(t, attrs, "state_root", expectedHeader.Root.Hex()) + requireAttribute(t, attrs, "gas_limit", int64(expectedHeader.GasLimit)) + requireAttribute(t, attrs, "gas_used", int64(expectedHeader.GasUsed)) + requireAttribute(t, attrs, "timestamp", int64(expectedHeader.Time)) +} + +func TestTracedEthRPCClient_HeaderByNumber_Latest(t *testing.T) { + expectedHeader := &types.Header{ + GasLimit: 30000000, + GasUsed: 15000000, + Time: 1234567890, + } + + mockClient := &mockEthRPCClient{ + headerByNumberFn: func(ctx context.Context, number *big.Int) (*types.Header, error) { + require.Nil(t, number, "number should be nil for latest block") + return expectedHeader, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + header, err := traced.HeaderByNumber(ctx, nil) + + require.NoError(t, err) + require.Equal(t, expectedHeader, header) + + // verify span + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "Eth.GetBlockByNumber", span.Name()) + + // verify block_number is "latest" when nil + attrs := span.Attributes() + requireAttribute(t, attrs, "block_number", "latest") +} + +func TestTracedEthRPCClient_HeaderByNumber_Error(t *testing.T) { + expectedErr := errors.New("failed to get block header") + + mockClient := &mockEthRPCClient{ + headerByNumberFn: func(ctx context.Context, number *big.Int) (*types.Header, error) { + return nil, expectedErr + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + blockNumber := big.NewInt(100) + + _, err := traced.HeaderByNumber(ctx, blockNumber) + + require.Error(t, err) + require.Equal(t, expectedErr, err) + + // verify span recorded the error + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "Eth.GetBlockByNumber", span.Name()) + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) + + // verify error event was recorded + events := span.Events() + require.Len(t, events, 1) + require.Equal(t, "exception", events[0].Name) + + // verify block header attributes NOT set on error + attrs := span.Attributes() + for _, attr := range attrs { + key := string(attr.Key) + require.NotEqual(t, "block_hash", key) + require.NotEqual(t, "state_root", key) + require.NotEqual(t, "gas_limit", key) + require.NotEqual(t, "gas_used", key) + } +} + +func TestTracedEthRPCClient_GetTxs_Success(t *testing.T) { + expectedTxs := []string{"0xabcd", "0xef01", "0x2345"} + + mockClient := &mockEthRPCClient{ + getTxsFn: func(ctx context.Context) ([]string, error) { + return expectedTxs, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + txs, err := traced.GetTxs(ctx) + + require.NoError(t, err) + require.Equal(t, expectedTxs, txs) + + // verify span was created + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "TxPool.GetTxs", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + // verify attributes + attrs := span.Attributes() + requireAttribute(t, attrs, "method", "txpoolExt_getTxs") + requireAttribute(t, attrs, "tx_count", len(expectedTxs)) +} + +func TestTracedEthRPCClient_GetTxs_EmptyPool(t *testing.T) { + mockClient := &mockEthRPCClient{ + getTxsFn: func(ctx context.Context) ([]string, error) { + return []string{}, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + txs, err := traced.GetTxs(ctx) + + require.NoError(t, err) + require.Empty(t, txs) + + // verify span + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "TxPool.GetTxs", span.Name()) + + // verify tx_count is 0 + attrs := span.Attributes() + requireAttribute(t, attrs, "tx_count", 0) +} + +func TestTracedEthRPCClient_GetTxs_Error(t *testing.T) { + expectedErr := errors.New("failed to get transactions") + + mockClient := &mockEthRPCClient{ + getTxsFn: func(ctx context.Context) ([]string, error) { + return nil, expectedErr + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + _, err := traced.GetTxs(ctx) + + require.Error(t, err) + require.Equal(t, expectedErr, err) + + // verify span recorded the error + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "TxPool.GetTxs", span.Name()) + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) + + // verify error event was recorded + events := span.Events() + require.Len(t, events, 1) + require.Equal(t, "exception", events[0].Name) + + // verify tx_count NOT set on error + attrs := span.Attributes() + for _, attr := range attrs { + require.NotEqual(t, "tx_count", string(attr.Key)) + } +} + +// requireAttribute is a helper to check span attributes +func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { + t.Helper() + found := false + for _, attr := range attrs { + if string(attr.Key) == key { + found = true + switch v := expected.(type) { + case string: + require.Equal(t, v, attr.Value.AsString()) + case int64: + require.Equal(t, v, attr.Value.AsInt64()) + case int: + require.Equal(t, int64(v), attr.Value.AsInt64()) + default: + t.Fatalf("unsupported attribute type: %T", expected) + } + break + } + } + require.True(t, found, "attribute %s not found", key) +} diff --git a/execution/evm/execution.go b/execution/evm/execution.go index e360867b5..c310af06d 100644 --- a/execution/evm/execution.go +++ b/execution/evm/execution.go @@ -142,13 +142,22 @@ type EngineRPCClient interface { NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error) } +// EthRPCClient abstracts Ethereum JSON-RPC calls for tracing and testing. +type EthRPCClient interface { + // HeaderByNumber retrieves a block header by number (nil = latest). + HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + + // GetTxs retrieves pending transactions from the transaction pool. + GetTxs(ctx context.Context) ([]string, error) +} + // EngineClient represents a client that interacts with an Ethereum execution engine // through the Engine API. It manages connections to both the engine and standard Ethereum // APIs, and maintains state related to block processing. type EngineClient struct { - engineClient EngineRPCClient // Client for Engine API calls - ethClient *ethclient.Client // Client for standard Ethereum API calls - genesisHash common.Hash // Hash of the genesis block + engineClient EngineRPCClient // Client for Engine API calls + ethClient EthRPCClient // Client for standard Ethereum API calls + genesisHash common.Hash // Hash of the genesis block initialHeight uint64 feeRecipient common.Address // Address to receive transaction fees @@ -171,7 +180,7 @@ type EngineClient struct { // execution and crash recovery. The db is wrapped with a prefix to isolate // EVM execution data from other ev-node data. // When tracingEnabled is true, the client will inject W3C trace context headers -// and wrap Engine API calls with OpenTelemetry spans. +// and wrap Engine API and Eth API calls with OpenTelemetry spans. func NewEngineExecutionClient( ethURL, engineURL string, @@ -197,7 +206,7 @@ func NewEngineExecutionClient( if err != nil { return nil, err } - ethClient := ethclient.NewClient(ethRPC) + rawEthClient := ethclient.NewClient(ethRPC) secret, err := decodeSecret(jwtSecret) if err != nil { @@ -223,12 +232,14 @@ func NewEngineExecutionClient( return nil, err } - // raw engine client + // wrap raw clients with interfaces engineClient := NewEngineRPCClient(rawEngineClient) + ethClient := NewEthRPCClient(rawEthClient) - // if tracing enabled, wrap with traced decorator + // if tracing enabled, wrap with traced decorators if tracingEnabled { engineClient = withTracingEngineRPCClient(engineClient) + ethClient = withTracingEthRPCClient(ethClient) } return &EngineClient{ @@ -298,8 +309,7 @@ func (c *EngineClient) InitChain(ctx context.Context, genesisTime time.Time, ini // GetTxs retrieves transactions from the current execution payload func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) { - var result []string - err := c.ethClient.Client().CallContext(ctx, &result, "txpoolExt_getTxs") + result, err := c.ethClient.GetTxs(ctx) if err != nil { return nil, fmt.Errorf("failed to get tx pool content: %w", err) } diff --git a/execution/evm/go.mod b/execution/evm/go.mod index 083cbe7ad..739aa825c 100644 --- a/execution/evm/go.mod +++ b/execution/evm/go.mod @@ -11,6 +11,7 @@ require ( github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.39.0 + go.opentelemetry.io/otel/sdk v1.39.0 go.opentelemetry.io/otel/trace v1.39.0 google.golang.org/protobuf v1.36.11 ) @@ -20,8 +21,6 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 // indirect go.opentelemetry.io/otel/metric v1.39.0 // indirect - go.opentelemetry.io/otel/sdk v1.39.0 // indirect - go.opentelemetry.io/proto/otlp v1.9.0 // indirect ) require ( @@ -89,6 +88,7 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect + go.opentelemetry.io/proto/otlp v1.9.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.47.0 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect