Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions core/src/rpc_clients/bundler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,18 @@ pub struct TwExecuteResponse {
}

/// Response from tw_getTransactionHash bundler method
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "status")]
pub enum TwGetTransactionHashResponse {
Pending,
Success { transaction_hash: String },
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TwGetTransactionHashResponse {
/// The transaction hash
pub transaction_hash: Option<String>,
pub enum TwGetTransactionHashStatus {
Pending,
Success,
}

impl BundlerClient {
Expand Down Expand Up @@ -152,12 +159,12 @@ impl BundlerClient {
pub async fn tw_get_transaction_hash(
&self,
transaction_id: &str,
) -> TransportResult<Option<String>> {
) -> TransportResult<TwGetTransactionHashResponse> {
let params = serde_json::json!([transaction_id]);

let response: TwGetTransactionHashResponse =
self.inner.request("tw_getTransactionHash", params).await?;

Ok(response.transaction_hash)
Ok(response)
}
}
6 changes: 3 additions & 3 deletions core/src/rpc_clients/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ impl HeaderInjectingTransport {
.map_err(TransportErrorKind::custom)?;

let status = resp.status();
debug!(%status, "received response from server");
debug!(?status, "received response from server");

// Get response body
let body = resp.bytes().await.map_err(TransportErrorKind::custom)?;
debug!(bytes = body.len(), "retrieved response body");
trace!(body = %String::from_utf8_lossy(&body), "response body");
trace!(body = ?String::from_utf8_lossy(&body), "response body");

// Check for HTTP errors
if !status.is_success() {
Expand Down Expand Up @@ -100,7 +100,7 @@ impl Service<RequestPacket> for HeaderInjectingTransport {
#[inline]
fn call(&mut self, req: RequestPacket) -> Self::Future {
let this = self.clone(); // Clone is cheap - just clones the Arc inside Client
let span = debug_span!("HeaderInjectingTransport", url = %this.url);
let span = debug_span!("HeaderInjectingTransport", url = ?this.url);
Box::pin(this.do_request(req).instrument(span))
}
}
Expand Down
46 changes: 16 additions & 30 deletions executors/src/eip7702_executor/confirm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use alloy::primitives::{Address, TxHash};
use alloy::providers::Provider;
use alloy::rpc::types::TransactionReceipt;
use engine_core::error::{AlloyRpcErrorToEngineError, EngineError};
use engine_core::rpc_clients::TwGetTransactionHashResponse;
use engine_core::{
chain::{Chain, ChainService, RpcCredentials},
execution_options::WebhookOptions,
Expand Down Expand Up @@ -31,11 +32,7 @@ pub struct Eip7702ConfirmationJobData {
pub transaction_id: String,
pub chain_id: u64,
pub bundler_transaction_id: String,
/// ! Deprecated todo: remove this field after all jobs are processed
pub eoa_address: Option<Address>,

// TODO: make non-optional after all jobs are processed
pub sender_details: Option<Eip7702Sender>,
pub sender_details: Eip7702Sender,

pub rpc_credentials: RpcCredentials,
#[serde(default)]
Expand Down Expand Up @@ -189,7 +186,7 @@ where
let chain = chain.with_new_default_headers(chain_auth_headers);

// 2. Get transaction hash from bundler
let transaction_hash_str = chain
let transaction_hash_res = chain
.bundler_client()
.tw_get_transaction_hash(&job_data.bundler_transaction_id)
.await
Expand All @@ -198,16 +195,19 @@ where
})
.map_err_fail()?;

let transaction_hash = match transaction_hash_str {
Some(hash) => hash.parse::<TxHash>().map_err(|e| {
Eip7702ConfirmationError::TransactionHashError {
message: format!("Invalid transaction hash format: {}", e),
}
.fail()
})?,
None => {
let transaction_hash = match transaction_hash_res {
TwGetTransactionHashResponse::Success { transaction_hash } => {
transaction_hash.parse::<TxHash>().map_err(|e| {
Eip7702ConfirmationError::TransactionHashError {
message: format!("Invalid transaction hash format: {}", e),
}
.fail()
})?
}

TwGetTransactionHashResponse::Pending => {
return Err(Eip7702ConfirmationError::TransactionHashError {
message: "Transaction not found".to_string(),
message: "Transaction not yet confirmed".to_string(),
})
.map_err_nack(Some(Duration::from_secs(2)), RequeuePosition::Last);
}
Expand Down Expand Up @@ -262,25 +262,11 @@ where
"Transaction confirmed successfully"
);

// todo: remove this after all jobs are processed
let sender_details = job_data
.sender_details
.clone()
.or_else(|| {
job_data
.eoa_address
.map(|eoa_address| Eip7702Sender::Owner { eoa_address })
})
.ok_or_else(|| Eip7702ConfirmationError::InternalError {
message: "No sender details found".to_string(),
})
.map_err_fail()?;

Ok(Eip7702ConfirmationResult {
transaction_id: job_data.transaction_id.clone(),
transaction_hash,
receipt,
sender_details,
sender_details: job_data.sender_details.clone(),
})
}

Expand Down
40 changes: 10 additions & 30 deletions executors/src/eip7702_executor/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,7 @@ pub struct Eip7702SendJobData {
pub transaction_id: String,
pub chain_id: u64,
pub transactions: Vec<InnerTransaction>,

// !IMPORTANT TODO
// To preserve backwards compatibility with pre-existing queued jobs, we continue keeping the eoa_address field until the next release
// However, we make it optional now, and rely on the Eip7702ExecutionOptions instead
pub eoa_address: Option<Address>,

// We must also keep the execution_options as optional to prevent deserialization errors
// when we remove the eoa_address field, we can make execution_options required
// at runtime we resolve from both, with preference to execution_options
// if both are none, we return an error
#[serde(skip_serializing_if = "Option::is_none")]
pub execution_options: Option<Eip7702ExecutionOptions>,

pub execution_options: Eip7702ExecutionOptions,
pub signing_credential: SigningCredential,
#[serde(default)]
pub webhook_options: Vec<WebhookOptions>,
Expand Down Expand Up @@ -208,24 +196,17 @@ where

let chain = chain.with_new_default_headers(chain_auth_headers);

let owner_address = job_data
.eoa_address
.or(job_data.execution_options.as_ref().map(|e| match e {
Eip7702ExecutionOptions::Owner(o) => o.from,
Eip7702ExecutionOptions::SessionKey(s) => s.session_key_address,
}))
.ok_or(Eip7702SendError::InternalError {
message: "No owner address found".to_string(),
})
.map_err_fail()?;
let owner_address = match &job_data.execution_options {
Eip7702ExecutionOptions::Owner(o) => o.from,
Eip7702ExecutionOptions::SessionKey(s) => s.session_key_address,
};

let account = DelegatedAccount::new(owner_address, chain);

let session_key_target_address =
job_data.execution_options.as_ref().and_then(|e| match e {
Eip7702ExecutionOptions::Owner(_) => None,
Eip7702ExecutionOptions::SessionKey(s) => Some(s.account_address),
});
let session_key_target_address = match &job_data.execution_options {
Eip7702ExecutionOptions::Owner(_) => None,
Eip7702ExecutionOptions::SessionKey(s) => Some(s.account_address),
};

let transactions = match session_key_target_address {
Some(target_address) => {
Expand Down Expand Up @@ -343,8 +324,7 @@ where
transaction_id: job.job.data.transaction_id.clone(),
chain_id: job.job.data.chain_id,
bundler_transaction_id: success_data.result.transaction_id.clone(),
eoa_address: None,
sender_details: Some(success_data.result.sender_details.clone()),
sender_details: success_data.result.sender_details.clone(),
rpc_credentials: job.job.data.rpc_credentials.clone(),
webhook_options: job.job.data.webhook_options.clone(),
})
Expand Down
24 changes: 12 additions & 12 deletions executors/src/eoa/store/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,30 +105,30 @@ impl AtomicEoaExecutorStore {
{
Ok(()) => {
tracing::debug!(
eoa = %self.eoa(),
chain_id = %self.chain_id(),
worker_id = %self.worker_id(),
eoa = ?self.eoa(),
chain_id = self.chain_id(),
worker_id = self.worker_id(),
"Successfully released EOA lock"
);
Ok(self.store)
}
Err(TransactionStoreError::LockLost { .. }) => {
// Lock was already taken over, which is fine for release
tracing::debug!(
eoa = %self.eoa(),
chain_id = %self.chain_id(),
worker_id = %self.worker_id(),
eoa = ?self.eoa(),
chain_id = self.chain_id(),
worker_id = self.worker_id(),
"Lock already released or taken over by another worker"
);
Ok(self.store)
}
Err(e) => {
// Other errors shouldn't fail the worker, just log
tracing::warn!(
eoa = %self.eoa(),
chain_id = %self.chain_id(),
worker_id = %self.worker_id(),
error = %e,
eoa = ?self.eoa(),
chain_id = self.chain_id(),
worker_id = self.worker_id(),
error = ?e,
"Failed to release EOA lock"
);
Ok(self.store)
Expand Down Expand Up @@ -197,7 +197,7 @@ impl AtomicEoaExecutorStore {
tracing::debug!(
retry_count = retry_count,
delay_ms = delay_ms,
eoa = %self.eoa(),
eoa = ?self.eoa(),
chain_id = self.chain_id(),
"Retrying lock check operation"
);
Expand Down Expand Up @@ -312,7 +312,7 @@ impl AtomicEoaExecutorStore {
tracing::debug!(
retry_count = retry_count,
delay_ms = delay_ms,
eoa = %self.eoa,
eoa = ?self.eoa,
chain_id = self.chain_id,
operation = safe_tx.name(),
"Retrying atomic operation"
Expand Down
6 changes: 4 additions & 2 deletions executors/src/eoa/store/borrowed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
valid_results.push(result.clone());
} else {
tracing::warn!(
transaction_id = %transaction_id,
nonce = %result.transaction.nonce,
transaction_id = ?transaction_id,
nonce = result.transaction.nonce,
"Submission result not found in borrowed state, ignoring"
);
}
Expand Down Expand Up @@ -174,6 +174,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
// Update transaction data status
let tx_data_key = self.keys.transaction_data_key_name(transaction_id);
pipeline.hset(&tx_data_key, "status", "pending");
pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);

// Queue webhook event using user_request from SubmissionResult
let event = EoaExecutorEvent {
Expand Down Expand Up @@ -206,6 +207,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
pipeline.hset(&tx_data_key, "status", "failed");
pipeline.hset(&tx_data_key, "completed_at", now);
pipeline.hset(&tx_data_key, "failure_reason", err.to_string());
pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);

// Queue webhook event using user_request from SubmissionResult
let event = EoaExecutorEvent {
Expand Down
33 changes: 26 additions & 7 deletions executors/src/eoa/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,9 @@ impl EoaExecutorStore {
}
// Lock exists, forcefully take it over
tracing::warn!(
eoa = %self.eoa,
chain_id = %self.chain_id,
worker_id = %worker_id,
eoa = ?self.eoa,
chain_id = self.chain_id,
worker_id = worker_id,
"Forcefully taking over EOA lock from stalled worker"
);
// Force set - no expiry, only released by explicit takeover
Expand Down Expand Up @@ -504,13 +504,32 @@ impl EoaExecutorStore {
&self,
limit: u64,
) -> Result<Vec<PendingTransaction>, TransactionStoreError> {
self.peek_pending_transactions_paginated(0, limit).await
}

/// Peek at pending transactions with pagination support
pub async fn peek_pending_transactions_paginated(
&self,
offset: u64,
limit: u64,
) -> Result<Vec<PendingTransaction>, TransactionStoreError> {
if limit == 0 {
return Ok(Vec::new());
}

let pending_key = self.pending_transactions_zset_name();
let mut conn = self.redis.clone();

// Use ZRANGE to peek without removing
let transaction_ids: Vec<PendingTransactionStringWithQueuedAt> = conn
.zrange_withscores(&pending_key, 0, (limit - 1) as isize)
.await?;
// Use ZRANGE to peek without removing, with offset support
let start = offset as isize;
let stop = (offset + limit - 1) as isize;

let transaction_ids: Vec<PendingTransactionStringWithQueuedAt> =
conn.zrange_withscores(&pending_key, start, stop).await?;

if transaction_ids.is_empty() {
return Ok(Vec::new());
}

let mut pipe = twmq::redis::pipe();

Expand Down
2 changes: 2 additions & 0 deletions executors/src/eoa/store/submitted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> {
.zrange(self.keys.recycled_nonces_zset_name(), 0, -1)
.await?;

// filter out nonces that are higher than the highest submitted nonce
// these don't need to be recycled, they'll be used up by incrementing the nonce
let recycled_nonces = recycled_nonces
.into_iter()
.filter(|nonce| *nonce < highest_submitted_nonce)
Expand Down
Loading
Loading