From c2fe1db337c50b100cea37c636641852db0fdb73 Mon Sep 17 00:00:00 2001
From: Joaquim Verges
Date: Fri, 21 Nov 2025 20:56:24 +1300
Subject: [PATCH] Add detailed job lifecycle logging to EOA executor send flow

---
 aa-types/src/lib.rs                           |   2 +-
 core/src/rpc_clients/bundler.rs               |   7 +-
 core/src/signer.rs                            |   2 +-
 eip7702-core/src/constants.rs                 |   2 +-
 eip7702-core/src/delegated_account.rs         |  13 +-
 eip7702-core/src/lib.rs                       |   2 +-
 eip7702-core/src/transaction.rs               |  11 +-
 eip7702-core/tests/integration_tests.rs       |  36 ++-
 executors/src/eip7702_executor/mod.rs         |   4 +-
 executors/src/eoa/store/mod.rs                |   4 +-
 executors/src/eoa/worker/confirm.rs           |   3 +-
 executors/src/eoa/worker/send.rs              | 187 +++++++++++++--
 executors/src/eoa/worker/transaction.rs       |  20 +-
 executors/src/external_bundler/confirm.rs     |  24 +-
 executors/src/external_bundler/deployment.rs  |  40 ++--
 executors/src/external_bundler/send.rs        |  22 +-
 executors/src/metrics.rs                      | 183 ++++++++++-----
 executors/src/solana_executor/mod.rs          |   4 +-
 executors/src/solana_executor/worker.rs       | 212 +++++++++++-------
 executors/src/transaction_registry.rs         |  24 +-
 integration-tests/src/lib.rs                  |  51 ++---
 integration-tests/tests/setup.rs              |  11 +-
 .../tests/sign_solana_transaction.rs          |   6 +-
 server/src/execution_router/mod.rs            |   7 +-
 server/src/http/error.rs                      |  11 +-
 server/src/http/extractors.rs                 |   6 +-
 .../src/http/routes/admin/eoa_diagnostics.rs  | 127 ++++++++++-
 server/src/http/routes/admin/metrics.rs       |  23 +-
 server/src/http/routes/admin/mod.rs           |   2 +-
 .../http/routes/sign_solana_transaction.rs    |  20 +-
 server/src/http/routes/solana_transaction.rs  |   6 +-
 server/src/http/server.rs                     |   6 +-
 server/src/lib.rs                             |   4 +-
 thirdweb-core/src/iaw/mod.rs                  |   4 +-
 twmq/benches/throughput.rs                    |   4 +-
 twmq/src/lib.rs                               |  20 +-
 twmq/src/queue.rs                             |   2 +-
 twmq/src/shutdown.rs                          |  13 +-
 twmq/tests/basic_hook.rs                      |   4 +-
 twmq/tests/delay.rs                           |   5 +-
 twmq/tests/fixtures.rs                        |   5 +-
 twmq/tests/multilane_batch_pop.rs             |   8 +-
 twmq/tests/prune_race_condition.rs            |  96 ++++----
 twmq/tests/prune_race_random_ids.rs           | 132 +++++++----
 44 files changed, 951 insertions(+), 424 deletions(-)

diff --git a/aa-types/src/lib.rs b/aa-types/src/lib.rs
index 18cb98e..f44278b 100644
--- a/aa-types/src/lib.rs
+++ b/aa-types/src/lib.rs
@@ -1,3 +1,3 @@
 pub mod userop;
 
-pub use userop::*;
\ No newline at end of file
+pub use userop::*;
diff --git a/core/src/rpc_clients/bundler.rs b/core/src/rpc_clients/bundler.rs
index fbf3f3f..09e53ab 100644
--- a/core/src/rpc_clients/bundler.rs
+++ b/core/src/rpc_clients/bundler.rs
@@ -177,8 +177,11 @@ impl BundlerClient {
         Ok(response)
     }
 
-    pub async fn tw_get_delegation_contract(&self) -> TransportResult<TwGetDelegationContractResponse> {
-        let response: TwGetDelegationContractResponse = self.inner.request("tw_getDelegationContract", ()).await?;
+    pub async fn tw_get_delegation_contract(
+        &self,
+    ) -> TransportResult<TwGetDelegationContractResponse> {
+        let response: TwGetDelegationContractResponse =
+            self.inner.request("tw_getDelegationContract", ()).await?;
         Ok(response)
     }
 }
diff --git a/core/src/signer.rs b/core/src/signer.rs
index 9b4614b..181df13 100644
--- a/core/src/signer.rs
+++ b/core/src/signer.rs
@@ -621,7 +621,7 @@ impl SolanaSigner {
                 solana_sdk::signature::Signature::default(),
             );
         };
-        
+
         transaction.signatures[signer_index] = signature;
 
         Ok(transaction)
diff --git a/eip7702-core/src/constants.rs b/eip7702-core/src/constants.rs
index a8e7a56..f034626 100644
--- a/eip7702-core/src/constants.rs
+++ b/eip7702-core/src/constants.rs
@@ -2,4 +2,4 @@
 pub const EIP_7702_DELEGATION_PREFIX: [u8; 3] = [0xef, 0x01, 0x00];
 
 /// EIP-7702 delegation code length (prefix + address)
-pub const EIP_7702_DELEGATION_CODE_LENGTH: usize = 23;
\ No newline at end of file
+pub const EIP_7702_DELEGATION_CODE_LENGTH: usize = 23;
diff --git a/eip7702-core/src/delegated_account.rs b/eip7702-core/src/delegated_account.rs
index 6b6aa75..9fed9a6 100644
--- a/eip7702-core/src/delegated_account.rs
+++ b/eip7702-core/src/delegated_account.rs
@@ -28,7 +28,10 @@ impl DelegatedAccount {
     }
 
     /// Check if the EOA has EIP-7702 delegation to the minimal account implementation
-    pub async fn is_minimal_account(&self, delegation_contract: Option<Address>) -> Result<bool, EngineError> {
+    pub async fn is_minimal_account(
+        &self,
+        delegation_contract: Option<Address>,
+    ) -> Result<bool, EngineError> {
         // Get the bytecode at the EOA address using eth_getCode
         let code = self
             .chain
@@ -65,12 +68,8 @@ impl DelegatedAccount {
 
         // Compare with the minimal account implementation address
         let is_delegated = match delegation_contract {
-            Some(delegation_contract) => {
-                target_address == delegation_contract
-            }
-            None => {
-                true
-            }
+            Some(delegation_contract) => target_address == delegation_contract,
+            None => true,
         };
 
         tracing::debug!(
diff --git a/eip7702-core/src/lib.rs b/eip7702-core/src/lib.rs
index dff5410..ec0cac1 100644
--- a/eip7702-core/src/lib.rs
+++ b/eip7702-core/src/lib.rs
@@ -1,3 +1,3 @@
 pub mod constants;
 pub mod delegated_account;
-pub mod transaction;
\ No newline at end of file
+pub mod transaction;
diff --git a/eip7702-core/src/transaction.rs b/eip7702-core/src/transaction.rs
index 8676d63..336499f 100644
--- a/eip7702-core/src/transaction.rs
+++ b/eip7702-core/src/transaction.rs
@@ -195,11 +195,18 @@ impl MinimalAccountTransaction {
         credentials: &SigningCredential,
         delegation_contract: Address,
     ) -> Result<Self, EngineError> {
-        if self.account.is_minimal_account(Some(delegation_contract)).await? {
+        if self
+            .account
+            .is_minimal_account(Some(delegation_contract))
+            .await?
+        {
             return Ok(self);
         }
 
-        let authorization = self.account.sign_authorization(signer, credentials, delegation_contract).await?;
+        let authorization = self
+            .account
+            .sign_authorization(signer, credentials, delegation_contract)
+            .await?;
         self.authorization = Some(authorization);
         Ok(self)
     }
diff --git a/eip7702-core/tests/integration_tests.rs b/eip7702-core/tests/integration_tests.rs
index 82f85ca..893fdfc 100644
--- a/eip7702-core/tests/integration_tests.rs
+++ b/eip7702-core/tests/integration_tests.rs
@@ -657,7 +657,9 @@ impl TestSetup {
 async fn test_eip7702_integration() -> Result<(), Box<dyn std::error::Error>> {
     // Set up test environment
     let mut setup = TestSetup::new().await?;
-    let delegation_contract = setup.delegation_contract.expect("Delegation contract should be set");
+    let delegation_contract = setup
+        .delegation_contract
+        .expect("Delegation contract should be set");
 
     // Step 1: Fetch and set bytecode from Base Sepolia
     setup.fetch_and_set_bytecode().await?;
@@ -667,11 +669,15 @@ async fn test_eip7702_integration() -> Result<(), Box<dyn std::error::Error>> {
 
     // Step 3: Test is_minimal_account - all should be false initially
     assert!(
-        !developer_account.is_minimal_account(Some(delegation_contract)).await?,
+        !developer_account
+            .is_minimal_account(Some(delegation_contract))
+            .await?,
         "Developer should not be minimal account initially"
     );
     assert!(
-        !user_account.is_minimal_account(Some(delegation_contract)).await?,
+        !user_account
+            .is_minimal_account(Some(delegation_contract))
+            .await?,
         "User should not be minimal account initially"
     );
     println!("✓ All accounts are not minimal accounts initially");
@@ -696,9 +702,11 @@ async fn test_eip7702_integration() -> Result<(), Box<dyn std::error::Error>> {
         .clone()
         .owner_transaction(&[mint_transaction])
         .add_authorization_if_needed(
-            &setup.signer, 
+            &setup.signer,
             &setup.developer_credentials,
-            setup.delegation_contract.expect("Delegation contract should be set")
+            setup
+                .delegation_contract
+                .expect("Delegation contract should be set"),
         )
         .await?;
 
@@ -732,7 +740,9 @@ async fn test_eip7702_integration() -> Result<(), Box<dyn std::error::Error>> {
     );
 
     assert!(
-        developer_account.is_minimal_account(Some(delegation_contract)).await?,
+        developer_account
+            .is_minimal_account(Some(delegation_contract))
+            .await?,
         "Developer should be minimal account after minting"
     );
 
@@ -740,9 +750,11 @@ async fn test_eip7702_integration() -> Result<(), Box<dyn std::error::Error>> {
     // User signs authorization but executor broadcasts it (user has no funds)
     let user_authorization = user_account
         .sign_authorization(
-            &setup.signer, 
+            &setup.signer,
             &setup.user_credentials,
-            setup.delegation_contract.expect("Delegation contract should be set")
+            setup
+                .delegation_contract
+                .expect("Delegation contract should be set"),
         )
         .await?;
 
@@ -752,14 +764,18 @@ async fn test_eip7702_integration() -> Result<(), Box<dyn std::error::Error>> {
         .await?;
 
     assert!(
-        user_account.is_minimal_account(Some(delegation_contract)).await?,
+        user_account
+            .is_minimal_account(Some(delegation_contract))
+            .await?,
         "User (session key granter) should be minimal account after delegation"
     );
     println!("✓ User (session key granter) is now a minimal account (delegated by executor)");
 
     // Step 9: Developer is already delegated via add_authorization_if_needed in owner_transaction
     assert!(
-        developer_account.is_minimal_account(Some(delegation_contract)).await?,
+        developer_account
+            .is_minimal_account(Some(delegation_contract))
+            .await?,
         "Developer (session key grantee) should already be minimal account from earlier delegation"
     );
     println!("✓ Developer (session key grantee) was already delegated in previous step");
diff --git a/executors/src/eip7702_executor/mod.rs b/executors/src/eip7702_executor/mod.rs
index 6191a3e..2f9219d 100644
--- a/executors/src/eip7702_executor/mod.rs
+++ b/executors/src/eip7702_executor/mod.rs
@@ -1,3 +1,3 @@
-pub mod send;
 pub mod confirm;
-pub mod delegation_cache;
\ No newline at end of file
+pub mod delegation_cache;
+pub mod send;
diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs
index ef9db42..c1fa275 100644
--- a/executors/src/eoa/store/mod.rs
+++ b/executors/src/eoa/store/mod.rs
@@ -582,7 +582,9 @@ impl EoaExecutorStore {
         let mut pending_transactions: Vec<PendingTransaction> = Vec::new();
         let mut deletion_pipe = twmq::redis::pipe();
 
-        for ((transaction_id, queued_at), user_request) in transaction_ids.into_iter().zip(user_requests) {
+        for ((transaction_id, queued_at), user_request) in
+            transaction_ids.into_iter().zip(user_requests)
+        {
             match user_request {
                 Some(user_request) => {
                     let user_request_parsed = serde_json::from_str(&user_request)?;
diff --git a/executors/src/eoa/worker/confirm.rs b/executors/src/eoa/worker/confirm.rs
index fd5a045..beb3feb 100644
--- a/executors/src/eoa/worker/confirm.rs
+++ b/executors/src/eoa/worker/confirm.rs
@@ -376,7 +376,8 @@ impl EoaExecutorWorker {
             "Found newest transaction for gas bump"
         );
 
-        let time_since_queuing = EoaExecutorStore::now().saturating_sub(newest_transaction_data.created_at);
+        let time_since_queuing =
+            EoaExecutorStore::now().saturating_sub(newest_transaction_data.created_at);
 
         if time_since_queuing < NONCE_STALL_LIMIT_MS {
             tracing::warn!(
diff --git a/executors/src/eoa/worker/send.rs b/executors/src/eoa/worker/send.rs
index 296ebd4..d90eb52 100644
--- a/executors/src/eoa/worker/send.rs
+++ b/executors/src/eoa/worker/send.rs
@@ -1,16 +1,19 @@
 use alloy::{consensus::Transaction, providers::Provider};
 use engine_core::{chain::Chain, error::AlloyRpcErrorToEngineError};
 
-use crate::eoa::{
-    EoaExecutorStore,
-    store::{BorrowedTransaction, PendingTransaction, SubmissionResult, SubmissionResultType},
-    worker::{
-        EoaExecutorWorker,
-        error::{
-            EoaExecutorWorkerError, SendContext, is_retryable_preparation_error,
-            should_update_balance_threshold,
+use crate::{
+    eoa::{
+        EoaExecutorStore,
+        store::{BorrowedTransaction, PendingTransaction, SubmissionResult, SubmissionResultType},
+        worker::{
+            EoaExecutorWorker,
+            error::{
+                EoaExecutorWorkerError, SendContext, is_retryable_preparation_error,
+                should_update_balance_threshold,
+            },
         },
     },
+    metrics::{calculate_duration_seconds, current_timestamp_ms},
 };
 
 const HEALTH_CHECK_INTERVAL_MS: u64 = 60 * 5 * 1000; // 5 minutes in milliseconds
@@ -19,10 +22,20 @@ impl EoaExecutorWorker {
     // ========== SEND FLOW ==========
     #[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
     pub async fn send_flow(&self) -> Result<u32, EoaExecutorWorkerError> {
+        let start_time = current_timestamp_ms();
+
         // 1. Get EOA health (initializes if needed) and check if we should update balance
         let mut health = self.get_eoa_health().await?;
         let now = EoaExecutorStore::now();
 
+        tracing::info!(
+            duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()),
+            eoa = ?self.eoa,
+            chain_id = self.chain_id,
+            worker_id = %self.store.worker_id,
+            "JOB_LIFECYCLE - send_flow: Got EOA health"
+        );
+
         // Update balance if it's stale
         // TODO: refactor this, very ugly
         if health.balance <= health.balance_threshold {
@@ -57,17 +70,57 @@ impl EoaExecutorWorker {
             }
         }
 
+        tracing::info!(
+            duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()),
+            eoa = ?self.eoa,
+            chain_id = self.chain_id,
+            worker_id = %self.store.worker_id,
+            "JOB_LIFECYCLE - send_flow: Completed balance check"
+        );
+
         let mut total_sent = 0;
 
         // 2. Process recycled nonces first
+        let recycled_start = current_timestamp_ms();
         total_sent += self.process_recycled_nonces().await?;
 
+        tracing::info!(
+            duration_seconds = calculate_duration_seconds(recycled_start, current_timestamp_ms()),
+            total_duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()),
+            transactions_sent = total_sent,
+            eoa = ?self.eoa,
+            chain_id = self.chain_id,
+            worker_id = %self.store.worker_id,
+            "JOB_LIFECYCLE - send_flow: Completed processing recycled nonces"
+        );
+
         // 3. Only proceed to new nonces if we successfully used all recycled nonces
         let remaining_recycled = self.store.clean_and_get_recycled_nonces().await?.len();
 
         if remaining_recycled == 0 {
             let inflight_budget = self.store.get_inflight_budget(self.max_inflight).await?;
+
+            tracing::info!(
+                duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()),
+                inflight_budget = inflight_budget,
+                eoa = ?self.eoa,
+                chain_id = self.chain_id,
+                worker_id = %self.store.worker_id,
+                "JOB_LIFECYCLE - send_flow: Got inflight budget"
+            );
+
             if inflight_budget > 0 {
+                let new_tx_start = current_timestamp_ms();
                 total_sent += self.process_new_transactions(inflight_budget).await?;
+
+                tracing::info!(
+                    duration_seconds = calculate_duration_seconds(new_tx_start, current_timestamp_ms()),
+                    total_duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()),
+                    transactions_sent = total_sent,
+                    eoa = ?self.eoa,
+                    chain_id = self.chain_id,
+                    worker_id = %self.store.worker_id,
+                    "JOB_LIFECYCLE - send_flow: Completed processing new transactions"
+                );
             } else {
                 tracing::warn!("No inflight budget, not sending new transactions");
             }
@@ -78,6 +131,15 @@ impl EoaExecutorWorker {
             );
         }
 
+        tracing::info!(
+            total_sent = total_sent,
+            total_duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()),
+            eoa = ?self.eoa,
+            chain_id = self.chain_id,
+            worker_id = %self.store.worker_id,
+            "JOB_LIFECYCLE - send_flow: Completed send flow"
+        );
+
         Ok(total_sent)
     }
 
@@ -311,6 +373,7 @@ impl EoaExecutorWorker {
             return Ok(0);
         }
 
+        let function_start = current_timestamp_ms();
         let mut total_sent: usize = 0;
         let mut remaining_budget = budget;
 
@@ -320,19 +383,44 @@ impl EoaExecutorWorker {
                 break;
             }
 
+            let iteration_start = current_timestamp_ms();
+
             // Get pending transactions
+            let pending_start = current_timestamp_ms();
             let pending_txs = self
                 .store
                 .peek_pending_transactions(remaining_budget)
                 .await?;
 
+            tracing::info!(
+                duration_seconds = calculate_duration_seconds(pending_start, current_timestamp_ms()),
+                iteration = iteration,
+                pending_count = pending_txs.len(),
+                eoa = ?self.eoa,
+                chain_id = self.chain_id,
+                worker_id = %self.store.worker_id,
+                "JOB_LIFECYCLE - process_new_transactions: Got pending transactions"
+            );
+
             if pending_txs.is_empty() {
                 break;
             }
 
+            let nonce_start = current_timestamp_ms();
             let optimistic_nonce = self.store.get_optimistic_transaction_count().await?;
             let batch_size = pending_txs.len().min(remaining_budget as usize);
 
+            tracing::info!(
+                duration_seconds = calculate_duration_seconds(nonce_start, current_timestamp_ms()),
+                iteration = iteration,
+                optimistic_nonce = optimistic_nonce,
+                batch_size = batch_size,
+                eoa = ?self.eoa,
+                chain_id = self.chain_id,
+                worker_id = %self.store.worker_id,
+                "JOB_LIFECYCLE - process_new_transactions: Got optimistic nonce"
+            );
+
             tracing::debug!(
                 iteration = iteration,
                 batch_size = batch_size,
@@ -342,6 +430,7 @@ impl EoaExecutorWorker {
             );
 
             // Build and sign all transactions in parallel with sequential nonces
+            let build_start = current_timestamp_ms();
             let build_tasks: Vec<_> = pending_txs
                 .iter()
                 .take(batch_size)
@@ -353,6 +442,17 @@ impl EoaExecutorWorker {
                 .collect();
 
             let prepared_results = futures::future::join_all(build_tasks).await;
+
+            tracing::info!(
+                duration_seconds = calculate_duration_seconds(build_start, current_timestamp_ms()),
+                iteration = iteration,
+                batch_size = batch_size,
+                eoa = ?self.eoa,
+                chain_id = self.chain_id,
+                worker_id = %self.store.worker_id,
+                "JOB_LIFECYCLE - process_new_transactions: Built and signed transactions"
+            );
+
             let prepared_results_with_pending = pending_txs
                 .iter()
                 .take(batch_size)
                 .zip(prepared_results)
                 .collect::<Vec<_>>();
 
             // Clean preparation results (handles failures and removes bad transactions)
+            let clean_start = current_timestamp_ms();
             let cleaned_results = self
                 .clean_prepration_results(prepared_results_with_pending, true)
                 .await?;
 
+            tracing::info!(
+                duration_seconds = calculate_duration_seconds(clean_start, current_timestamp_ms()),
+                iteration = iteration,
+                cleaned_count = cleaned_results.len(),
+                eoa = ?self.eoa,
+                chain_id = self.chain_id,
+                worker_id = %self.store.worker_id,
+                "JOB_LIFECYCLE - process_new_transactions: Cleaned preparation results"
+            );
+
             if cleaned_results.is_empty() {
                 // No successful preparations, reduce budget and continue
                 continue;
             }
 
             // Move prepared transactions to borrowed state with incremented nonces
+            let move_start = current_timestamp_ms();
             let moved_count = self
                 .store
                 .atomic_move_pending_to_borrowed_with_incremented_nonces(
@@ -380,13 +492,19 @@ impl EoaExecutorWorker {
                 )
                 .await?;
 
-            tracing::debug!(
+            tracing::info!(
+                duration_seconds = calculate_duration_seconds(move_start, current_timestamp_ms()),
+                iteration = iteration,
                 moved_count = moved_count,
                 total_prepared = cleaned_results.len(),
-                "Moved transactions to borrowed state using incremented nonces"
+                eoa = ?self.eoa,
+                chain_id = self.chain_id,
+                worker_id = %self.store.worker_id,
+                "JOB_LIFECYCLE - process_new_transactions: Moved transactions to borrowed state"
             );
 
             // Send the transactions to the blockchain
+            let send_start = current_timestamp_ms();
             let send_tasks: Vec<_> = cleaned_results
                 .iter()
                 .map(|borrowed_tx| {
@@ -402,6 +520,16 @@ impl EoaExecutorWorker {
 
             let send_results = futures::future::join_all(send_tasks).await;
 
+            tracing::info!(
+                duration_seconds = calculate_duration_seconds(send_start, current_timestamp_ms()),
+                iteration = iteration,
+                transactions_sent = cleaned_results.len(),
+                eoa = ?self.eoa,
+                chain_id = self.chain_id,
+                worker_id = %self.store.worker_id,
+                "JOB_LIFECYCLE - process_new_transactions: Sent transactions to blockchain"
+            );
+
             // Process send results and update states
             let submission_results = send_results
                 .into_iter()
@@ -429,17 +557,23 @@ impl EoaExecutorWorker {
                 .collect();
 
             // Use batch processing to handle all submission results
+            let process_start = current_timestamp_ms();
             let processing_report = self
                 .store
                 .process_borrowed_transactions(submission_results, self.webhook_queue.clone())
                 .await?;
 
-            tracing::debug!(
-                "Processed {} borrowed transactions: {} moved to submitted, {} moved to pending, {} failed",
-                processing_report.total_processed,
-                processing_report.moved_to_submitted,
-                processing_report.moved_to_pending,
-                processing_report.failed_transactions
+            tracing::info!(
+                duration_seconds = calculate_duration_seconds(process_start, current_timestamp_ms()),
+                iteration = iteration,
+                total_processed = processing_report.total_processed,
+                moved_to_submitted = processing_report.moved_to_submitted,
+                moved_to_pending = processing_report.moved_to_pending,
+                failed_transactions = processing_report.failed_transactions,
+                eoa = ?self.eoa,
+                chain_id = self.chain_id,
+                worker_id = %self.store.worker_id,
+                "JOB_LIFECYCLE - process_new_transactions: Processed submission results"
             );
 
             total_sent += processing_report.moved_to_submitted;
 
             // Update remaining budget by actual nonce consumption
             remaining_budget =
                remaining_budget.saturating_sub(processing_report.moved_to_submitted as u64);
+
+            tracing::info!(
+                iteration_duration_seconds = calculate_duration_seconds(iteration_start, current_timestamp_ms()),
+                total_duration_seconds = calculate_duration_seconds(function_start, current_timestamp_ms()),
+                iteration = iteration,
+                eoa = ?self.eoa,
+                chain_id = self.chain_id,
+                worker_id = %self.store.worker_id,
+                "JOB_LIFECYCLE - process_new_transactions: Completed iteration"
+            );
         }
 
+        tracing::info!(
+            total_sent = total_sent,
+            initial_budget = budget,
+            remaining_budget = remaining_budget,
+            total_duration_seconds = calculate_duration_seconds(function_start, current_timestamp_ms()),
+            eoa = ?self.eoa,
+            chain_id = self.chain_id,
+            worker_id = %self.store.worker_id,
+            "JOB_LIFECYCLE - process_new_transactions: Completed processing new transactions"
+        );
+
         Ok(total_sent as u32)
     }
 }
diff --git a/executors/src/eoa/worker/transaction.rs b/executors/src/eoa/worker/transaction.rs
index fa234bc..6b03e5b 100644
--- a/executors/src/eoa/worker/transaction.rs
+++ b/executors/src/eoa/worker/transaction.rs
@@ -20,11 +20,16 @@ use engine_core::{
 };
 
 use crate::eoa::{
+    EoaTransactionRequest,
     store::{
         BorrowedTransaction, BorrowedTransactionData, PendingTransaction,
         SubmittedNoopTransaction,
-    }, worker::{
-        error::{is_retryable_preparation_error, is_unsupported_eip1559_error, EoaExecutorWorkerError},
-        EoaExecutorWorker
-    }, EoaTransactionRequest
+    },
+    worker::{
+        EoaExecutorWorker,
+        error::{
+            EoaExecutorWorkerError, is_retryable_preparation_error, is_unsupported_eip1559_error,
+        },
+    },
 };
 
 // Retry constants for preparation phase
@@ -397,10 +402,13 @@ impl EoaExecutorWorker {
         nonce: u64,
     ) -> Result<Signed<TypedTransaction>, EoaExecutorWorkerError> {
         let typed_tx = self.build_typed_transaction(request, nonce).await?;
-        
+
         // Inject KMS cache into the signing credential (after deserialization from Redis)
-        let credential_with_cache = request.signing_credential.clone().with_aws_kms_cache(&self.kms_client_cache);
-        
+        let credential_with_cache = request
+            .signing_credential
+            .clone()
+            .with_aws_kms_cache(&self.kms_client_cache);
+
         self.sign_transaction(typed_tx, &credential_with_cache)
             .await
     }
diff --git a/executors/src/external_bundler/confirm.rs b/executors/src/external_bundler/confirm.rs
index bc273be..2da468d 100644
--- a/executors/src/external_bundler/confirm.rs
+++ b/executors/src/external_bundler/confirm.rs
@@ -15,7 +15,10 @@ use twmq::{
 };
 
 use crate::{
-    metrics::{record_transaction_queued_to_confirmed, current_timestamp_ms, calculate_duration_seconds_from_twmq},
+    metrics::{
+        calculate_duration_seconds_from_twmq, current_timestamp_ms,
+        record_transaction_queued_to_confirmed,
+    },
     transaction_registry::TransactionRegistry,
     webhook::{
         WebhookJobHandler,
@@ -215,12 +218,17 @@ where
             "User operation confirmed on-chain"
         );
 
-        // 4. Record metrics if original timestamp is available
-        if let Some(original_timestamp) = job_data.original_queued_timestamp {
-            let confirmed_timestamp = current_timestamp_ms();
-            let queued_to_confirmed_duration = calculate_duration_seconds_from_twmq(original_timestamp, confirmed_timestamp);
-            record_transaction_queued_to_confirmed("erc4337-external", job_data.chain_id, queued_to_confirmed_duration);
-        }
+        // 4. Record metrics if original timestamp is available
+        if let Some(original_timestamp) = job_data.original_queued_timestamp {
+            let confirmed_timestamp = current_timestamp_ms();
+            let queued_to_confirmed_duration =
+                calculate_duration_seconds_from_twmq(original_timestamp, confirmed_timestamp);
+            record_transaction_queued_to_confirmed(
+                "erc4337-external",
+                job_data.chain_id,
+                queued_to_confirmed_duration,
+            );
+        }
 
         // 5. Success! Lock cleanup will happen atomically in on_success hook
         Ok(UserOpConfirmationResult {
@@ -274,7 +282,7 @@ where
         tx: &mut TransactionContext<'_>,
     ) {
         // NEVER release lock on NACK - job will be retried with the same lock
-        
+
         // Only queue webhook for actual errors, not for "waiting for receipt" states
         let should_queue_webhook = !matches!(
             nack_data.error,
diff --git a/executors/src/external_bundler/deployment.rs b/executors/src/external_bundler/deployment.rs
index 364e64f..ac8a969 100644
--- a/executors/src/external_bundler/deployment.rs
+++ b/executors/src/external_bundler/deployment.rs
@@ -150,7 +150,9 @@ impl DeploymentLock for RedisDeploymentLock {
         let lock_id = Uuid::new_v4().to_string();
         let now = SystemTime::now()
             .duration_since(UNIX_EPOCH)
-            .map_err(|e| EngineError::InternalError { message: format!("System time error: {e}") })?
+            .map_err(|e| EngineError::InternalError {
+                message: format!("System time error: {e}"),
+            })?
             .as_secs();
 
         let lock_data = LockData {
@@ -158,22 +160,29 @@ impl DeploymentLock for RedisDeploymentLock {
             acquired_at: now,
         };
 
-        let lock_data_str = serde_json::to_string(&lock_data)
-            .map_err(|e| EngineError::InternalError { message: format!("Serialization failed: {e}") })?;
+        let lock_data_str =
+            serde_json::to_string(&lock_data).map_err(|e| EngineError::InternalError {
+                message: format!("Serialization failed: {e}"),
+            })?;
 
         // Use SET NX EX for atomic acquire
-        let result: Option<String> = conn
-            .set_nx(&key, &lock_data_str)
-            .await
-            .map_err(|e| EngineError::InternalError { message: format!("Lock acquire failed: {e}") })?;
+        let result: Option<String> =
+            conn.set_nx(&key, &lock_data_str)
+                .await
+                .map_err(|e| EngineError::InternalError {
+                    message: format!("Lock acquire failed: {e}"),
+                })?;
 
         match result {
             Some(_) => Ok(AcquireLockResult::Acquired),
             None => {
                 // Lock already exists, get the lock_id
-                let existing_data: Option<String> = conn.get(&key).await.map_err(|e| {
-                    EngineError::InternalError { message: format!("Failed to read existing lock: {e}") }
-                })?;
+                let existing_data: Option<String> =
+                    conn.get(&key)
+                        .await
+                        .map_err(|e| EngineError::InternalError {
+                            message: format!("Failed to read existing lock: {e}"),
+                        })?;
 
                 let existing_lock_id = existing_data
                     .and_then(|data| serde_json::from_str::<LockData>(&data).ok())
@@ -194,11 +203,12 @@ impl DeploymentLock for RedisDeploymentLock {
 
         let key = self.lock_key(chain_id, account_address);
 
-        let deleted = conn.del::<&str, usize>(&key).await.map_err(|e| {
-            EngineError::InternalError { message: format!(
-                "Failed to delete lock for account {account_address}: {e}"
-            ) }
-        })?;
+        let deleted =
+            conn.del::<&str, usize>(&key)
+                .await
+                .map_err(|e| EngineError::InternalError {
+                    message: format!("Failed to delete lock for account {account_address}: {e}"),
+                })?;
 
         Ok(deleted > 0)
     }
diff --git a/executors/src/external_bundler/send.rs b/executors/src/external_bundler/send.rs
index 1dc0037..e869451 100644
--- a/executors/src/external_bundler/send.rs
+++ b/executors/src/external_bundler/send.rs
@@ -27,7 +27,10 @@ use twmq::{
 };
 
 use crate::{
-    metrics::{record_transaction_queued_to_sent, current_timestamp_ms, calculate_duration_seconds_from_twmq},
+    metrics::{
+        calculate_duration_seconds_from_twmq, current_timestamp_ms,
+        record_transaction_queued_to_sent,
+    },
     transaction_registry::TransactionRegistry,
     webhook::{
         WebhookJobHandler,
@@ -350,9 +353,7 @@ where
             DeploymentStatus::BeingDeployed { stale, lock_id } => {
                 return Err(ExternalBundlerSendError::DeploymentLocked {
                     account_address: smart_account.address,
-                    message: format!(
-                        "Deployment in progress (stale: {stale}, lock_id: {lock_id})"
-                    ),
+                    message: format!("Deployment in progress (stale: {stale}, lock_id: {lock_id})"),
                 })
                 .map_err_nack(
                     Some(Duration::from_secs(if stale { 5 } else { 30 })),
@@ -504,10 +505,15 @@ where
 
         tracing::debug!(userop_hash = ?user_op_hash, "User operation sent to bundler");
 
-        // Record metrics: transaction queued to sent
-        let sent_timestamp = current_timestamp_ms();
-        let queued_to_sent_duration = calculate_duration_seconds_from_twmq(job.job.created_at, sent_timestamp);
-        record_transaction_queued_to_sent("erc4337-external", job_data.chain_id, queued_to_sent_duration);
+        // Record metrics: transaction queued to sent
+        let sent_timestamp = current_timestamp_ms();
+        let queued_to_sent_duration =
+            calculate_duration_seconds_from_twmq(job.job.created_at, sent_timestamp);
+        record_transaction_queued_to_sent(
+            "erc4337-external",
+            job_data.chain_id,
+            queued_to_sent_duration,
+        );
 
         Ok(ExternalBundlerSendResult {
             account_address: smart_account.address,
diff --git a/executors/src/metrics.rs b/executors/src/metrics.rs
index 1cf982a..ad9c08d 100644
--- a/executors/src/metrics.rs
+++ b/executors/src/metrics.rs
@@ -1,5 +1,8 @@
 use lazy_static::lazy_static;
-use prometheus::{HistogramOpts, Registry, Encoder, TextEncoder, register_histogram_vec_with_registry, HistogramVec};
+use prometheus::{
+    Encoder, HistogramOpts, HistogramVec, Registry, TextEncoder,
+    register_histogram_vec_with_registry,
+};
 use std::sync::Arc;
 
 /// Metrics configuration for executor metrics
@@ -20,20 +23,26 @@ impl ExecutorMetrics {
             HistogramOpts::new(
                 "tw_engine_executor_transaction_queued_to_sent_duration_seconds",
                 "Time from when transaction is queued to when it's sent to the network"
-            ).buckets(vec![0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0]),
+            )
+            .buckets(vec![
+                0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0
+            ]),
             &["executor_type", "chain_id"],
             registry
         )?;
-        
+
         let transaction_queued_to_confirmed_duration = register_histogram_vec_with_registry!(
             HistogramOpts::new(
                 "tw_engine_executor_transaction_queued_to_confirmed_duration_seconds",
                 "Time from when transaction is queued to when it's confirmed on-chain"
-            ).buckets(vec![0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1800.0]),
+            )
+            .buckets(vec![
+                0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1800.0
+            ]),
             &["executor_type", "chain_id"],
             registry
         )?;
-        
+
         let eoa_job_processing_duration = register_histogram_vec_with_registry!(
             HistogramOpts::new(
                 "tw_engine_eoa_executor_job_processing_duration_seconds",
@@ -48,12 +57,14 @@ impl ExecutorMetrics {
             HistogramOpts::new(
                 "tw_engine_executor_eoa_degraded_send_duration_seconds",
                 "Duration of EOA transactions that exceeded the send degradation threshold"
-            ).buckets(vec![5.0, 10.0, 20.0, 30.0, 60.0, 120.0, 300.0, 600.0]),
+            )
+            .buckets(vec![5.0, 10.0, 20.0, 30.0, 60.0, 120.0, 300.0, 600.0]),
             &["eoa_address", "chain_id"],
             registry
         )?;
-        
-        let eoa_degraded_confirmation_duration = register_histogram_vec_with_registry!(
+
+        let eoa_degraded_confirmation_duration =
+            register_histogram_vec_with_registry!(
                HistogramOpts::new(
                    "tw_engine_executor_eoa_degraded_confirmation_duration_seconds",
                    "Duration of EOA transactions that exceeded the confirmation degradation threshold"
                ),
                &["eoa_address", "chain_id"],
                registry
            )?;
-        
+
         let eoa_stuck_duration = register_histogram_vec_with_registry!(
             HistogramOpts::new(
                 "tw_engine_executor_eoa_stuck_duration_seconds",
                 "Duration since last nonce movement for EOAs that are considered stuck"
-            ).buckets(vec![200.0, 300.0, 600.0, 1200.0, 1800.0, 3600.0, 7200.0, 14400.0]),
+            )
+            .buckets(vec![
+                200.0, 300.0, 600.0, 1200.0, 1800.0, 3600.0, 7200.0, 14400.0
+            ]),
             &["eoa_address", "chain_id", "out_of_funds"],
             registry
         )?;
@@ -85,14 +99,14 @@ impl ExecutorMetrics {
 lazy_static! {
     /// Default metrics registry for executors (fallback if no external registry provided)
     static ref DEFAULT_EXECUTOR_METRICS_REGISTRY: Registry = Registry::new();
-    
+
     /// Default executor metrics instance (used when no external metrics are provided)
-    static ref DEFAULT_EXECUTOR_METRICS: ExecutorMetrics = 
+    static ref DEFAULT_EXECUTOR_METRICS: ExecutorMetrics =
         ExecutorMetrics::new(&DEFAULT_EXECUTOR_METRICS_REGISTRY)
             .expect("Failed to create default executor metrics");
-    
+
     /// Global metrics instance - can be set by the binary crate or uses default
-    static ref EXECUTOR_METRICS_INSTANCE: std::sync::RwLock<Option<Arc<ExecutorMetrics>>> = 
+    static ref EXECUTOR_METRICS_INSTANCE: std::sync::RwLock<Option<Arc<ExecutorMetrics>>> =
         std::sync::RwLock::new(None);
 }
 
@@ -112,8 +126,10 @@ fn get_metrics() -> Arc<ExecutorMetrics> {
             // Use default metrics if no custom metrics were set
             // This ensures backward compatibility
             drop(instance); // Release read lock
-            Arc::new(ExecutorMetrics::new(&DEFAULT_EXECUTOR_METRICS_REGISTRY)
-                .expect("Failed to create default metrics"))
+            Arc::new(
+                ExecutorMetrics::new(&DEFAULT_EXECUTOR_METRICS_REGISTRY)
+                    .expect("Failed to create default metrics"),
+            )
         }
     }
 }
@@ -129,16 +145,26 @@ pub fn export_default_metrics() -> Result
         if duration_seconds > self.send_degradation_threshold_seconds as f64 {
             let metrics = get_metrics();
-            metrics.eoa_degraded_send_duration
+            metrics
+                .eoa_degraded_send_duration
                 .with_label_values(&[&eoa_address.to_string(), &chain_id.to_string()])
                 .observe(duration_seconds);
         }
     }
 
     /// Record EOA transaction confirmation metrics with automatic degradation detection
-    pub fn record_transaction_confirmed(&self, eoa_address: alloy::primitives::Address, chain_id: u64, duration_seconds: f64) {
+    pub fn record_transaction_confirmed(
+        &self,
+        eoa_address: alloy::primitives::Address,
+        chain_id: u64,
+        duration_seconds: f64,
+    ) {
         // Always record the regular metric
         record_transaction_queued_to_confirmed("eoa", chain_id, duration_seconds);
-        
+
         // Only record degraded metric if threshold exceeded (low cardinality)
         if duration_seconds > self.confirmation_degradation_threshold_seconds as f64 {
             let metrics = get_metrics();
-            metrics.eoa_degraded_confirmation_duration
+            metrics
+                .eoa_degraded_confirmation_duration
                 .with_label_values(&[&eoa_address.to_string(), &chain_id.to_string()])
                 .observe(duration_seconds);
         }
     }
 
     /// Record stuck EOA metric when nonce hasn't moved for too long
-    pub fn record_stuck_eoa(&self, eoa_address: alloy::primitives::Address, chain_id: u64, time_since_last_movement_seconds: f64, out_of_funds: bool) {
+    pub fn record_stuck_eoa(
+        &self,
+        eoa_address: alloy::primitives::Address,
+        chain_id: u64,
+        time_since_last_movement_seconds: f64,
+        out_of_funds: bool,
+    ) {
         // Only record if EOA is actually stuck (exceeds threshold)
         if time_since_last_movement_seconds > self.stuck_threshold_seconds as f64 {
             let metrics = get_metrics();
-            metrics.eoa_stuck_duration
-                .with_label_values(&[&eoa_address.to_string(), &chain_id.to_string(), &out_of_funds.to_string()])
+            metrics
+                .eoa_stuck_duration
+                .with_label_values(&[
+                    &eoa_address.to_string(),
+                    &chain_id.to_string(),
+                    &out_of_funds.to_string(),
+                ])
                 .observe(time_since_last_movement_seconds);
         }
     }
@@ -219,15 +269,16 @@ impl EoaMetrics {
     }
 }
 
-
-
 /// Helper to calculate duration in seconds from unix timestamps (milliseconds)
 pub fn calculate_duration_seconds(start_timestamp_ms: u64, end_timestamp_ms: u64) -> f64 {
     (end_timestamp_ms.saturating_sub(start_timestamp_ms)) as f64 / 1000.0
 }
 
 /// Helper to calculate duration in seconds when start timestamp is in seconds (TWMQ format)
-pub fn calculate_duration_seconds_from_twmq(start_timestamp_seconds: u64, end_timestamp_ms: u64) -> f64 {
+pub fn calculate_duration_seconds_from_twmq(
+    start_timestamp_seconds: u64,
+    end_timestamp_ms: u64,
+) -> f64 {
     let start_timestamp_ms = start_timestamp_seconds * 1000;
     calculate_duration_seconds(start_timestamp_ms, end_timestamp_ms)
 }
@@ -247,46 +298,64 @@ mod tests {
         let start = 1000;
         let end = 3500;
         assert_eq!(calculate_duration_seconds(start, end), 2.5);
-        
+
         // Test edge case where end < start (should return 0)
         assert_eq!(calculate_duration_seconds(3500, 1000), 0.0);
     }
-    
+
     #[test]
     fn test_calculate_duration_seconds_from_twmq() {
         // Test TWMQ timestamp (seconds) to milliseconds conversion
         let start_seconds = 1640995200; // Unix timestamp in seconds
         let end_ms = 1640995203500; // Unix timestamp in milliseconds (3.5 seconds later)
-        assert_eq!(calculate_duration_seconds_from_twmq(start_seconds, end_ms), 3.5);
-        
+        assert_eq!(
+            calculate_duration_seconds_from_twmq(start_seconds, end_ms),
+            3.5
+        );
+
         // Test edge case where end < start (should return 0)
-        assert_eq!(calculate_duration_seconds_from_twmq(1640995203, 1640995200000), 0.0);
+        assert_eq!(
+            calculate_duration_seconds_from_twmq(1640995203, 1640995200000),
+            0.0
+        );
     }
-    
+
     #[test]
     fn test_metrics_recording() {
        // Test that metrics can be recorded without panicking
        record_transaction_queued_to_sent("test", 1, 1.5);
        record_transaction_queued_to_confirmed("test", 1, 10.0);
        record_eoa_job_processing_time(1, 2.0);
-        
+
        // Test new EOA metrics abstraction
        let eoa_metrics = EoaMetrics::new(10, 120, 600);
-        let test_address = "0x1234567890123456789012345678901234567890".parse().unwrap();
-        
+        let test_address = "0x1234567890123456789012345678901234567890"
+            .parse()
+            .unwrap();
+
        eoa_metrics.record_transaction_sent(test_address, 1, 5.0); // Won't record degradation (below threshold)
        eoa_metrics.record_transaction_sent(test_address, 1, 15.0); // Will record degradation (above threshold)
        eoa_metrics.record_transaction_confirmed(test_address, 1, 60.0); // Won't record degradation (below threshold)
        eoa_metrics.record_transaction_confirmed(test_address, 1, 180.0); // Will record degradation (above threshold)
        eoa_metrics.record_stuck_eoa(test_address, 1, 900.0, false); // Will record stuck EOA
-        
+
        // Test that default metrics can be exported
-        let metrics_output = export_default_metrics().expect("Should be able to export default metrics");
-        assert!(metrics_output.contains("tw_engine_executor_transaction_queued_to_sent_duration_seconds"));
-        assert!(metrics_output.contains("tw_engine_executor_transaction_queued_to_confirmed_duration_seconds"));
+        let metrics_output =
+            export_default_metrics().expect("Should be able to export default metrics");
+        assert!(
+            metrics_output
.contains("tw_engine_executor_transaction_queued_to_sent_duration_seconds") + ); + assert!( + metrics_output + .contains("tw_engine_executor_transaction_queued_to_confirmed_duration_seconds") + ); assert!(metrics_output.contains("tw_engine_eoa_executor_job_processing_duration_seconds")); assert!(metrics_output.contains("tw_engine_executor_eoa_degraded_send_duration_seconds")); - assert!(metrics_output.contains("tw_engine_executor_eoa_degraded_confirmation_duration_seconds")); + assert!( + metrics_output + .contains("tw_engine_executor_eoa_degraded_confirmation_duration_seconds") + ); assert!(metrics_output.contains("tw_engine_executor_eoa_stuck_duration_seconds")); } @@ -294,22 +363,28 @@ mod tests { fn test_custom_metrics_registry() { // Test using a custom registry let custom_registry = Registry::new(); - let custom_metrics = ExecutorMetrics::new(&custom_registry).expect("Should create custom metrics"); - + let custom_metrics = + ExecutorMetrics::new(&custom_registry).expect("Should create custom metrics"); + // Initialize with custom metrics initialize_metrics(custom_metrics); - + // Record some metrics record_transaction_queued_to_sent("custom_test", 42, 2.5); - + // Export from custom registry let encoder = TextEncoder::new(); let metric_families = custom_registry.gather(); let mut buffer = Vec::new(); - encoder.encode(&metric_families, &mut buffer).expect("Should encode metrics"); + encoder + .encode(&metric_families, &mut buffer) + .expect("Should encode metrics"); let metrics_output = String::from_utf8(buffer).expect("Should convert to string"); - - assert!(metrics_output.contains("tw_engine_executor_transaction_queued_to_sent_duration_seconds")); + + assert!( + metrics_output + .contains("tw_engine_executor_transaction_queued_to_sent_duration_seconds") + ); assert!(metrics_output.contains("custom_test")); assert!(metrics_output.contains("42")); } diff --git a/executors/src/solana_executor/mod.rs b/executors/src/solana_executor/mod.rs index 9cf80bb..755b273 100644 --- a/executors/src/solana_executor/mod.rs +++ b/executors/src/solana_executor/mod.rs @@ -1,7 +1,7 @@ -pub mod worker; pub mod rpc_cache; pub mod storage; +pub mod worker; -pub use worker::{SolanaExecutorJobHandler, SolanaExecutorJobData, SolanaExecutorResult}; pub use rpc_cache::SolanaRpcCache; pub use storage::{LockError, SolanaTransactionAttempt, SolanaTransactionStorage, TransactionLock}; +pub use worker::{SolanaExecutorJobData, SolanaExecutorJobHandler, SolanaExecutorResult}; diff --git a/executors/src/solana_executor/worker.rs b/executors/src/solana_executor/worker.rs index b58dd77..c1ec06e 100644 --- a/executors/src/solana_executor/worker.rs +++ b/executors/src/solana_executor/worker.rs @@ -1,10 +1,17 @@ +use base64::Engine; use engine_core::{ credentials::SigningCredential, error::{EngineError, SolanaRpcErrorToEngineError}, - execution_options::{solana::{SolanaPriorityFee, SolanaTransactionOptions}, WebhookOptions}, + execution_options::{ + WebhookOptions, + solana::{SolanaPriorityFee, SolanaTransactionOptions}, + }, signer::SolanaSigner, }; -use engine_solana_core::{transaction::{InstructionDataEncoding, SolanaTransaction}, SolanaInstructionData}; +use engine_solana_core::{ + SolanaInstructionData, + transaction::{InstructionDataEncoding, SolanaTransaction}, +}; use serde::{Deserialize, Serialize}; use solana_client::{ nonblocking::rpc_client::RpcClient, @@ -12,12 +19,9 @@ use solana_client::{ }; use solana_commitment_config::{CommitmentConfig, CommitmentLevel}; use solana_sdk::pubkey::Pubkey; -use 
-    EncodedTransactionWithStatusMeta, UiTransactionEncoding
-};
+use solana_transaction_status::{EncodedTransactionWithStatusMeta, UiTransactionEncoding};
 use spl_memo_interface::instruction::build_memo;
 use std::{sync::Arc, time::Duration};
-use base64::Engine;
 use tracing::{error, info, warn};
 use twmq::{
     DurableExecution, FailHookData, NackHookData, Queue, SuccessHookData, UserCancellable,
@@ -107,7 +111,9 @@ pub enum SolanaExecutorError {
     #[serde(rename_all = "camelCase")]
     PriorityFeeError { inner_error: EngineError },
 
-    #[error("Blockhash expired, retrying with new blockhash (resubmission {submission_attempt_number})")]
+    #[error(
+        "Blockhash expired, retrying with new blockhash (resubmission {submission_attempt_number})"
+    )]
     #[serde(rename_all = "camelCase")]
     BlockhashExpired { submission_attempt_number: u32 },
 
@@ -173,7 +179,7 @@ impl SolanaExecutorError {
     pub fn is_send_success(&self) -> bool {
         matches!(self, SolanaExecutorError::TransactionSent { .. })
     }
-    
+
     /// Get the signature if this error contains one
     pub fn signature(&self) -> Option<&str> {
         match self {
@@ -249,9 +255,14 @@ impl DurableExecution for SolanaExecutorJobHandler {
 
         info!(transaction_id = %transaction_id, "Acquired lock");
 
-        let rpc_client = self.rpc_cache.get_or_create(data.transaction.execution_options.chain_id).await;
+        let rpc_client = self
+            .rpc_cache
+            .get_or_create(data.transaction.execution_options.chain_id)
+            .await;
 
-        let result = self.execute_transaction(&rpc_client, data, &lock, job.job.attempts).await;
+        let result = self
+            .execute_transaction(&rpc_client, data, &lock, job.job.attempts)
+            .await;
 
         if let Err(e) = lock.release().await {
             warn!(transaction_id = %transaction_id, error = ?e, "Failed to release lock");
@@ -260,8 +271,6 @@ impl DurableExecution for SolanaExecutorJobHandler {
         result
     }
 
-
-
     async fn on_success(
         &self,
         job: &BorrowedJob<Self::JobData>,
@@ -322,10 +331,13 @@ impl DurableExecution for SolanaExecutorJobHandler {
         tx: &mut TransactionContext<'_>,
     ) {
         let transaction_id = &job.job.data.transaction_id;
-        
+
         match nack_data.error {
             // Special case: TransactionSent is actually a success, send success webhook with stage="send"
-            SolanaExecutorError::TransactionSent { signature, submission_attempt_number } => {
+            SolanaExecutorError::TransactionSent {
+                signature,
+                submission_attempt_number,
+            } => {
                 info!(
                     transaction_id = %transaction_id,
                     signature = %signature,
@@ -339,12 +351,12 @@ impl DurableExecution for SolanaExecutorJobHandler {
                     signature: String,
                     submission_attempt_number: u32,
                 }
-                
+
                 let payload = TransactionSentPayload {
                     signature: signature.clone(),
                     submission_attempt_number: *submission_attempt_number,
                 };
-                
+
                 if let Err(e) = self.queue_webhook_with_custom_payload(
                     job,
                     payload,
@@ -359,10 +371,10 @@ impl DurableExecution for SolanaExecutorJobHandler {
                     );
                 }
             }
-            
+
             // Don't send webhook for NotYetConfirmed - silent retry
             SolanaExecutorError::NotYetConfirmed { .. } => {}
-            
+
             // For all other errors (network errors, RPC errors, etc.), send nack webhook
             _ => {
                 warn!(
@@ -384,7 +396,11 @@ impl DurableExecution for SolanaExecutorJobHandler {
 
 impl SolanaExecutorJobHandler {
     /// Helper to convert Solana RPC errors with context
-    fn to_engine_solana_error(&self, e: &solana_client::client_error::ClientError, chain_id: &str) -> EngineError {
+    fn to_engine_solana_error(
+        &self,
+        e: &solana_client::client_error::ClientError,
+        chain_id: &str,
+    ) -> EngineError {
         e.to_engine_solana_error(chain_id)
     }
 
@@ -406,21 +422,25 @@ impl SolanaExecutorJobHandler {
                 error = %e,
                 "Failed to get priority fees"
             );
-            let engine_error = self.to_engine_solana_error(&e, chain_id,);
+            let engine_error = self.to_engine_solana_error(&e, chain_id);
             SolanaExecutorError::PriorityFeeError {
                 inner_error: engine_error,
             }
             .nack(Some(NETWORK_ERROR_RETRY_DELAY), RequeuePosition::Last)
        })?;
-        
+
         fee_history.sort_by_key(|a| a.prioritization_fee);
-        let percentile_index = ((percentile as f64 / 100.0) * fee_history.len() as f64).round() as usize;
-        
+        let percentile_index =
+            ((percentile as f64 / 100.0) * fee_history.len() as f64).round() as usize;
+
         let result = if percentile_index < fee_history.len() {
             fee_history[percentile_index].prioritization_fee
         } else {
             // Fallback to max if index out of bounds
-            let fallback = fee_history.last().map(|f| f.prioritization_fee).unwrap_or(0);
+            let fallback = fee_history
+                .last()
+                .map(|f| f.prioritization_fee)
+                .unwrap_or(0);
             warn!(
                 chain_id = %chain_id,
                 percentile_index = percentile_index,
@@ -453,20 +473,15 @@ impl SolanaExecutorJobHandler {
         chain_id: &str,
     ) -> JobResult<u64, SolanaExecutorError> {
         let writable_accounts = Self::get_writable_accounts(instructions);
-        
+
         match priority_fee {
             SolanaPriorityFee::Auto => {
-                self.get_percentile_compute_unit_price(
-                    rpc_client,
-                    &writable_accounts,
-                    75,
-                    chain_id,
-                )
-                .await
-            }
-            SolanaPriorityFee::Manual { micro_lamports_per_unit } => {
-                Ok(*micro_lamports_per_unit)
+                self.get_percentile_compute_unit_price(rpc_client, &writable_accounts, 75, chain_id)
+                    .await
             }
+            SolanaPriorityFee::Manual {
+                micro_lamports_per_unit,
+            } => Ok(*micro_lamports_per_unit),
             SolanaPriorityFee::Percentile { percentile } => {
                 self.get_percentile_compute_unit_price(
                     rpc_client,
@@ -479,7 +494,6 @@ impl SolanaExecutorJobHandler {
         }
     }
 
-
     /// Main execution flow:
     /// 1. Fetch last attempt from storage
     /// 2. If attempt exists, check if confirmed
@@ -497,8 +511,14 @@ impl SolanaExecutorJobHandler {
         let transaction_id = &job_data.transaction_id;
         let chain_id_str = &job_data.transaction.execution_options.chain_id;
         let signer_address = job_data.transaction.execution_options.signer_address;
-        let commitment_level = job_data.transaction.execution_options.commitment.to_commitment_level();
-        let commitment = CommitmentConfig { commitment: commitment_level };
+        let commitment_level = job_data
+            .transaction
+            .execution_options
+            .commitment
+            .to_commitment_level();
+        let commitment = CommitmentConfig {
+            commitment: commitment_level,
+        };
         let max_retries = job_data.transaction.execution_options.max_blockhash_retries;
 
         // Verify we still hold the lock
@@ -517,7 +537,10 @@ impl SolanaExecutorJobHandler {
             let submission_attempt_number = attempt.submission_attempt_number;
 
             // Step 3: Check if transaction is confirmed
-            match rpc_client.get_signature_status_with_commitment(&signature, commitment).await {
+            match rpc_client
+                .get_signature_status_with_commitment(&signature, commitment)
+                .await
+            {
                 Ok(Some(Ok(()))) => {
                     // Transaction confirmed! Fetch full transaction details
                    info!(
                        transaction_id = %transaction_id,
                        signature = %signature,
                        submission_attempt_number = submission_attempt_number,
                        "Transaction confirmed, fetching details"
                    );
-                    
+
                    let transaction_details = rpc_client
-                        .get_transaction_with_config(&signature, RpcTransactionConfig {
-                            encoding: Some(UiTransactionEncoding::Json),
-                            commitment: Some(commitment),
-                            max_supported_transaction_version: Some(0),
-                        })
+                        .get_transaction_with_config(
+                            &signature,
+                            RpcTransactionConfig {
+                                encoding: Some(UiTransactionEncoding::Json),
+                                commitment: Some(commitment),
+                                max_supported_transaction_version: Some(0),
+                            },
+                        )
                        .await
                        .map_err(|e| {
                            error!(
@@ -541,7 +567,8 @@ impl SolanaExecutorJobHandler {
                                error = %e,
                                "Failed to fetch transaction details"
                            );
-                            let engine_error = self.to_engine_solana_error(&e, chain_id_str.as_str());
+                            let engine_error =
+                                self.to_engine_solana_error(&e, chain_id_str.as_str());
                            SolanaExecutorError::RpcError {
                                inner_error: engine_error,
                            }
@@ -576,7 +603,8 @@ impl SolanaExecutorJobHandler {
                }
                Ok(None) => {
                    // Step 4: Not confirmed yet, check if blockhash is still valid
-                    let time_since_sent = crate::metrics::current_timestamp_ms().saturating_sub(attempt.sent_at);
+                    let time_since_sent =
+                        crate::metrics::current_timestamp_ms().saturating_sub(attempt.sent_at);
                    let min_wait_before_resubmit_ms = 30_000; // Wait at least 30 seconds before resubmitting
 
                    if time_since_sent < min_wait_before_resubmit_ms {
@@ -586,9 +614,17 @@ impl SolanaExecutorJobHandler {
                        }
                        .nack(Some(CONFIRMATION_RETRY_DELAY), RequeuePosition::Last));
                    }
-                    
+
                    // Check blockhash validity
-                    match rpc_client.is_blockhash_valid(&attempt.blockhash, CommitmentConfig { commitment: CommitmentLevel::Finalized }).await {
+                    match rpc_client
+                        .is_blockhash_valid(
+                            &attempt.blockhash,
+                            CommitmentConfig {
+                                commitment: CommitmentLevel::Finalized,
+                            },
+                        )
+                        .await
+                    {
                        Ok(true) => {
                            // Blockhash still valid, not confirmed yet - retry
                            return Err(SolanaExecutorError::NotYetConfirmed {
@@ -598,7 +634,7 @@ impl SolanaExecutorJobHandler {
                        }
                        Ok(false) => {
                            // Blockhash expired
-                            
+
                            // For serialized transactions with existing signatures, we cannot retry with a new blockhash
                            // because the signatures will become invalid. Check if there are any non-default signatures.
                            if let engine_solana_core::transaction::SolanaTransactionInput::Serialized(t) = &job_data.transaction.input {
if let engine_solana_core::transaction::SolanaTransactionInput::Serialized (t) = &job_data.transaction.input { @@ -628,7 +664,7 @@ impl SolanaExecutorJobHandler { // If no signatures, we can retry - will be signed during execution } } - + // For instruction-based transactions or serialized without signatures, we can retry with a new blockhash warn!( transaction_id = %transaction_id, @@ -675,7 +711,8 @@ impl SolanaExecutorJobHandler { error = %e, "RPC error checking blockhash" ); - let engine_error = self.to_engine_solana_error(&e, chain_id_str.as_str()); + let engine_error = + self.to_engine_solana_error(&e, chain_id_str.as_str()); return Err(SolanaExecutorError::RpcError { inner_error: engine_error, } @@ -700,7 +737,10 @@ impl SolanaExecutorJobHandler { } // Step 5: No attempt exists or blockhash expired - send new transaction - let submission_attempt_number = stored_attempt.as_ref().map(|a| a.submission_attempt_number + 1).unwrap_or(1); + let submission_attempt_number = stored_attempt + .as_ref() + .map(|a| a.submission_attempt_number + 1) + .unwrap_or(1); // Check job attempt limits if stored_attempt.is_none() && job_attempt_number > MAX_SEND_ATTEMPTS_WITHOUT_TRANSACTION { @@ -709,11 +749,12 @@ impl SolanaExecutorJobHandler { job_attempt_number = job_attempt_number, "Max send attempts exceeded" ); - return Err(SolanaExecutorError::MaxRetriesExceeded { - max_retries: MAX_SEND_ATTEMPTS_WITHOUT_TRANSACTION - }.fail()); + return Err(SolanaExecutorError::MaxRetriesExceeded { + max_retries: MAX_SEND_ATTEMPTS_WITHOUT_TRANSACTION, + } + .fail()); } - + if submission_attempt_number > max_retries + 1 { error!( transaction_id = %transaction_id, @@ -750,26 +791,37 @@ impl SolanaExecutorJobHandler { let versioned_tx = match &job_data.transaction.input { engine_solana_core::transaction::SolanaTransactionInput::Instructions(i) => { // For instruction-based transactions: calculate priority fees and apply execution options - let compute_unit_price = if let Some(priority_fee) = &job_data.transaction.execution_options.priority_fee { - Some(self.get_compute_unit_price(priority_fee, &i.instructions, rpc_client, chain_id_str.as_str()).await?) 
+                let compute_unit_price = if let Some(priority_fee) =
+                    &job_data.transaction.execution_options.priority_fee
+                {
+                    Some(
+                        self.get_compute_unit_price(
+                            priority_fee,
+                            &i.instructions,
+                            rpc_client,
+                            chain_id_str.as_str(),
+                        )
+                        .await?,
+                    )
                } else {
                    None
                };
-                
+
                // Add memo instruction with transaction_id for unique signatures
                // This ensures that even with the same blockhash, each resubmission has a unique signature
                let memo_data = format!("thirdweb-engine:{}", transaction_id);
                let memo_ix = build_memo(&spl_memo_interface::v3::id(), memo_data.as_bytes(), &[]);
-                
+
                let mut instructions_with_memo = i.instructions.clone();
-                let memo_data_base64 = base64::engine::general_purpose::STANDARD.encode(memo_data.as_bytes());
+                let memo_data_base64 =
+                    base64::engine::general_purpose::STANDARD.encode(memo_data.as_bytes());
                instructions_with_memo.push(SolanaInstructionData {
                    program_id: memo_ix.program_id,
                    accounts: vec![],
                    data: memo_data_base64,
                    encoding: InstructionDataEncoding::Base64,
                });
-                
+
                let solana_tx = SolanaTransaction {
                    input: engine_solana_core::transaction::SolanaTransactionInput::new_with_instructions(instructions_with_memo),
                    compute_unit_limit: job_data.transaction.execution_options.compute_unit_limit,
@@ -861,7 +913,10 @@ impl SolanaExecutorJobHandler {
            ..Default::default()
        };
 
-        match rpc_client.send_transaction_with_config(&signed_tx, config).await {
+        match rpc_client
+            .send_transaction_with_config(&signed_tx, config)
+            .await
+        {
            Ok(_) => {
                info!(
                    transaction_id = %transaction_id,
@@ -930,7 +985,8 @@ impl SolanaExecutorJobHandler {
    ) -> JobError<SolanaExecutorError> {
        if error.to_string().contains("lock lost") {
            warn!(transaction_id = %transaction_id, "Lock lost during Redis operation");
-            SolanaExecutorError::LockLost.nack(Some(CONFIRMATION_RETRY_DELAY), RequeuePosition::Last)
+            SolanaExecutorError::LockLost
+                .nack(Some(CONFIRMATION_RETRY_DELAY), RequeuePosition::Last)
        } else {
            SolanaExecutorError::InternalError {
                message: format!("Redis error: {error}"),
@@ -945,20 +1001,20 @@ impl SolanaExecutorJobHandler {
 /// Bad transactions (invalid signature, insufficient funds, etc.) should not be retried
 fn is_send_error_retryable(error: &EngineError) -> bool {
    use engine_core::error::{SolanaRpcErrorKind, SolanaRpcResponseErrorData};
-    
+
    match error {
-        EngineError::SolanaRpcError { kind, .. } => match kind {
+        EngineError::SolanaRpcError { kind, .. } => match kind {
            // Network/IO errors are always retryable
            SolanaRpcErrorKind::Io { .. } => true,
            SolanaRpcErrorKind::Reqwest { .. } => true,
-            
+
            // RPC errors need more inspection
            SolanaRpcErrorKind::RpcError { data, message, .. } => {
                // Check if it's a preflight failure
                if let SolanaRpcResponseErrorData::SendTransactionPreflightFailure { .. } = data {
                    return false;
                }
-                
+
                // Check message for permanent errors
                let msg_lower = message.to_lowercase();
                if msg_lower.contains("invalid signature")
@@ -970,27 +1026,25 @@ fn is_send_error_retryable(error: &EngineError) -> bool {
                {
                    return false;
                }
-                
+
                true
            }
-            
+
            // Transaction errors are permanent
            SolanaRpcErrorKind::TransactionError { .. } => false,
-            
+
            // Signing errors are permanent
            SolanaRpcErrorKind::SigningError { .. } => false,
-            
+
            // JSON/parse errors might be temporary
            SolanaRpcErrorKind::SerdeJson { .. } => true,
-            
+
            // Custom errors - check message
-            SolanaRpcErrorKind::Custom { message } => {
-                !message.to_lowercase().contains("invalid")
-            }
-            
+            SolanaRpcErrorKind::Custom { message } => !message.to_lowercase().contains("invalid"),
+
            // Unknown errors - be conservative and retry
            SolanaRpcErrorKind::Unknown { .. } => true,
-        }
+        },
        // Other engine errors - be conservative and retry
        _ => true,
    }
diff --git a/executors/src/transaction_registry.rs b/executors/src/transaction_registry.rs
index db72613..a63584c 100644
--- a/executors/src/transaction_registry.rs
+++ b/executors/src/transaction_registry.rs
@@ -1,19 +1,21 @@
-use twmq::redis::{AsyncCommands, aio::ConnectionManager, Pipeline};
 use engine_core::error::EngineError;
 use thiserror::Error;
+use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
 
 #[derive(Debug, Error)]
 pub enum TransactionRegistryError {
     #[error("Redis error: {0}")]
     RedisError(#[from] twmq::redis::RedisError),
-    
+
     #[error("Transaction not found: {transaction_id}")]
     TransactionNotFound { transaction_id: String },
 }
 
 impl From<TransactionRegistryError> for EngineError {
     fn from(err: TransactionRegistryError) -> Self {
-        EngineError::InternalError { message: err.to_string() }
+        EngineError::InternalError {
+            message: err.to_string(),
+        }
     }
 }
 
@@ -34,7 +36,10 @@ impl TransactionRegistry {
         }
     }
 
-    pub async fn get_transaction_queue(&self, transaction_id: &str) -> Result<Option<String>, TransactionRegistryError> {
+    pub async fn get_transaction_queue(
+        &self,
+        transaction_id: &str,
+    ) -> Result<Option<String>, TransactionRegistryError> {
         let mut conn = self.redis.clone();
         let queue_name: Option<String> = conn.hget(self.registry_key(), transaction_id).await?;
         Ok(queue_name)
@@ -46,11 +51,16 @@ impl TransactionRegistry {
         queue_name: &str,
     ) -> Result<(), TransactionRegistryError> {
         let mut conn = self.redis.clone();
-        let _: () = conn.hset(self.registry_key(), transaction_id, queue_name).await?;
+        let _: () = conn
+            .hset(self.registry_key(), transaction_id, queue_name)
+            .await?;
         Ok(())
     }
 
-    pub async fn remove_transaction(&self, transaction_id: &str) -> Result<(), TransactionRegistryError> {
+    pub async fn remove_transaction(
+        &self,
+        transaction_id: &str,
+    ) -> Result<(), TransactionRegistryError> {
         let mut conn = self.redis.clone();
         let _: u32 = conn.hdel(self.registry_key(), transaction_id).await?;
         Ok(())
@@ -65,4 +75,4 @@ impl TransactionRegistry {
     pub fn add_remove_command(&self, pipeline: &mut Pipeline, transaction_id: &str) {
         pipeline.hdel(self.registry_key(), transaction_id);
     }
-}
\ No newline at end of file
+}
diff --git a/integration-tests/src/lib.rs b/integration-tests/src/lib.rs
index 9d3acaa..62ddf88 100644
--- a/integration-tests/src/lib.rs
+++ b/integration-tests/src/lib.rs
@@ -1,7 +1,7 @@
 use solana_sdk::{
     hash::Hash,
     instruction::Instruction,
-    message::{v0, VersionedMessage},
+    message::{VersionedMessage, v0},
     pubkey::Pubkey,
     signature::Signature,
     transaction::VersionedTransaction,
@@ -11,7 +11,7 @@ use std::str::FromStr;
 /// Helper function to create a simple SPL token transfer transaction
 /// This creates a transaction that requires multiple signers (partial signature scenario)
 pub fn create_spl_token_transfer_transaction(
-    fee_payer: Pubkey,      // Engine wallet (will sign second)
+    fee_payer: Pubkey,       // Engine wallet (will sign second)
     token_authority: Pubkey, // User wallet (will sign first)
     token_account: Pubkey,   // Source token account
     destination: Pubkey,     // Destination token account
@@ -27,28 +27,28 @@ pub fn create_spl_token_transfer_transaction(
     // This ensures correct instruction layout with decimals and proper discriminator
decimals and proper discriminator let transfer_ix = spl_token_2022_interface::instruction::transfer_checked( &token_program_id, - &token_account, // Source account - &mint, // Token mint - &destination, // Destination account - &token_authority, // Authority (will be a required signer) - &[], // No multisig signers + &token_account, // Source account + &mint, // Token mint + &destination, // Destination account + &token_authority, // Authority (will be a required signer) + &[], // No multisig signers amount, decimals, )?; // Create the message with fee payer let message = v0::Message::try_compile( - &fee_payer, // Fee payer (engine wallet) + &fee_payer, // Fee payer (engine wallet) &[transfer_ix], - &[], // No address lookup tables + &[], // No address lookup tables recent_blockhash, )?; let message = VersionedMessage::V0(message); - + // Calculate number of required signatures let num_signatures = message.header().num_required_signatures as usize; - + // Initialize with default (empty) signatures let signatures = vec![Signature::default(); num_signatures]; @@ -60,23 +60,20 @@ pub fn create_spl_token_transfer_transaction( /// Helper function to verify transaction signature /// Returns true if the signature at the given index is valid -pub fn verify_signature( - transaction: &VersionedTransaction, - signer_pubkey: &Pubkey, -) -> bool { +pub fn verify_signature(transaction: &VersionedTransaction, signer_pubkey: &Pubkey) -> bool { // Find the signer's index in the account keys let account_keys = transaction.message.static_account_keys(); - - let signer_index = account_keys - .iter() - .position(|key| key == signer_pubkey); - if let Some(index) = signer_index && index < transaction.signatures.len() { + let signer_index = account_keys.iter().position(|key| key == signer_pubkey); + + if let Some(index) = signer_index + && index < transaction.signatures.len() + { // Check if the signature is not default (i.e., has been signed) let sig = &transaction.signatures[index]; return sig != &Signature::default(); } - + false } @@ -93,15 +90,10 @@ pub fn create_system_transfer( recent_blockhash: Hash, ) -> Result { use solana_system_interface::instruction as system_instruction; - + let transfer_ix = system_instruction::transfer(&from, &to, lamports); - - let message = v0::Message::try_compile( - &from, - &[transfer_ix], - &[], - recent_blockhash, - )?; + + let message = v0::Message::try_compile(&from, &[transfer_ix], &[], recent_blockhash)?; let message = VersionedMessage::V0(message); let num_signatures = message.header().num_required_signatures as usize; @@ -112,4 +104,3 @@ pub fn create_system_transfer( message, }) } - diff --git a/integration-tests/tests/setup.rs b/integration-tests/tests/setup.rs index 9262cb1..d105cff 100644 --- a/integration-tests/tests/setup.rs +++ b/integration-tests/tests/setup.rs @@ -14,10 +14,15 @@ use serde::Deserialize; use std::{env, sync::Arc, time::Duration}; use thirdweb_core::{abi::ThirdwebAbiServiceBuilder, auth::ThirdwebAuth, iaw::IAWClient}; use thirdweb_engine::{ - EngineServer, EngineServerState, QueueManager, ThirdwebChainService, + EngineServer, + EngineServerState, + QueueConfig, + QueueManager, + RedisConfig as ServerRedisConfig, + SolanaConfig, + ThirdwebChainService, // Import config types instead of duplicating them - ThirdwebConfig, RedisConfig as ServerRedisConfig, - SolanaConfig, QueueConfig, + ThirdwebConfig, }; use tokio::net::TcpListener; use tracing::info; diff --git a/integration-tests/tests/sign_solana_transaction.rs 
b/integration-tests/tests/sign_solana_transaction.rs
index 7f0bf7d..714318b 100644
--- a/integration-tests/tests/sign_solana_transaction.rs
+++ b/integration-tests/tests/sign_solana_transaction.rs
@@ -216,11 +216,7 @@ async fn test_transaction_signature_verification() -> Result<()> {
     let status = response.status();
     if !status.is_success() {
         let error_body = response.text().await?;
-        anyhow::bail!(
-            "Request failed with status {}: {}",
-            status,
-            error_body
-        );
+        anyhow::bail!("Request failed with status {}: {}", status, error_body);
     }
 
     let sign_response: SignSolanaTransactionResponse = response.json().await?;
diff --git a/server/src/execution_router/mod.rs b/server/src/execution_router/mod.rs
index 8443054..5a246b7 100644
--- a/server/src/execution_router/mod.rs
+++ b/server/src/execution_router/mod.rs
@@ -506,8 +506,11 @@ impl ExecutionRouter {
         &self,
         request: engine_core::execution_options::solana::SendSolanaTransactionRequest,
         signing_credential: SigningCredential,
-    ) -> Result {
-        use engine_core::execution_options::solana::{QueuedSolanaTransactionResponse, SolanaTransactionOptions};
+    ) -> Result
+    {
+        use engine_core::execution_options::solana::{
+            QueuedSolanaTransactionResponse, SolanaTransactionOptions,
+        };
 
         let transaction_id = request.idempotency_key.clone();
         let chain_id = request.execution_options.chain_id;
diff --git a/server/src/http/error.rs b/server/src/http/error.rs
index f8ceb3e..09bdb55 100644
--- a/server/src/http/error.rs
+++ b/server/src/http/error.rs
@@ -1,5 +1,7 @@
 use axum::{Json, http::StatusCode, response::IntoResponse};
-use engine_core::error::{ContractInteractionErrorKind, EngineError, RpcErrorKind, SolanaRpcErrorKind};
+use engine_core::error::{
+    ContractInteractionErrorKind, EngineError, RpcErrorKind, SolanaRpcErrorKind,
+};
 use serde_json::json;
 
 // Extension trait that lets you pair an error with a status code
@@ -86,10 +88,9 @@ impl ApiEngineError {
             EngineError::AwsKmsSignerError { .. } => StatusCode::BAD_GATEWAY,
             EngineError::SolanaRpcError { kind, .. } => match kind {
                 SolanaRpcErrorKind::Io { .. } => StatusCode::BAD_GATEWAY,
-                SolanaRpcErrorKind::Reqwest { status, .. } => {
-                    status.map(|s| StatusCode::from_u16(s).unwrap_or(StatusCode::BAD_GATEWAY))
-                        .unwrap_or(StatusCode::BAD_GATEWAY)
-                }
+                SolanaRpcErrorKind::Reqwest { status, .. } => status
+                    .map(|s| StatusCode::from_u16(s).unwrap_or(StatusCode::BAD_GATEWAY))
+                    .unwrap_or(StatusCode::BAD_GATEWAY),
                 SolanaRpcErrorKind::RpcError { .. } => StatusCode::BAD_GATEWAY,
                 SolanaRpcErrorKind::SerdeJson { .. } => StatusCode::BAD_GATEWAY,
                 SolanaRpcErrorKind::TransactionError { .. } => StatusCode::BAD_REQUEST,
diff --git a/server/src/http/extractors.rs b/server/src/http/extractors.rs
index 2492505..760fc06 100644
--- a/server/src/http/extractors.rs
+++ b/server/src/http/extractors.rs
@@ -7,7 +7,7 @@ use axum::{
 };
 use engine_core::{
     chain::RpcCredentials,
-    credentials::{AwsKmsCredential, SigningCredential, KmsClientCache},
+    credentials::{AwsKmsCredential, KmsClientCache, SigningCredential},
     error::EngineError,
 };
 use thirdweb_core::auth::ThirdwebAuth;
@@ -109,7 +109,9 @@ impl FromRequestParts<EngineServerState> for SigningCredentialsExtractor {
         state: &EngineServerState,
     ) -> Result<Self, Self::Rejection> {
         // Try AWS KMS credentials first (with cache)
-        if let Some(aws_kms) = Self::try_extract_aws_kms_with_cache(parts, state.kms_client_cache.clone())? {
+        if let Some(aws_kms) =
+            Self::try_extract_aws_kms_with_cache(parts, state.kms_client_cache.clone())?
+        {
             return Ok(SigningCredentialsExtractor(SigningCredential::AwsKms(
                 aws_kms,
             )));
diff --git a/server/src/http/routes/admin/eoa_diagnostics.rs b/server/src/http/routes/admin/eoa_diagnostics.rs
index 82f9ef3..bdc1834 100644
--- a/server/src/http/routes/admin/eoa_diagnostics.rs
+++ b/server/src/http/routes/admin/eoa_diagnostics.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use alloy::{consensus::Transaction, primitives::Address};
 use axum::{
     Router, debug_handler,
@@ -5,9 +7,15 @@ use axum::{
     http::StatusCode,
     response::{IntoResponse, Json},
 };
-use engine_executors::eoa::store::{EoaExecutorStore, EoaHealth, TransactionData};
+use engine_core::{credentials::SigningCredential, error::EngineError};
+use engine_executors::eoa::{
+    EoaExecutorWorkerJobData,
+    store::{EoaExecutorStore, EoaHealth, TransactionData},
+};
 use serde::{Deserialize, Serialize};
+use twmq::{Queue, redis::AsyncCommands};
 
+use crate::chains::ThirdwebChainService;
 use crate::http::{
     error::ApiEngineError, extractors::DiagnosticAuthExtractor, server::EngineServerState,
     types::SuccessResponse,
@@ -73,6 +81,12 @@ pub struct BorrowedTransactionResponse {
     pub borrowed_at: u64,
 }
 
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ManualResetResponse {
+    pub job_enqueued: bool,
+}
+
 #[derive(Debug, Deserialize)]
 pub struct PaginationQuery {
     pub offset: Option,
@@ -405,7 +419,25 @@ pub async fn schedule_manual_reset(
         })
     })?;
 
-    Ok((StatusCode::OK, Json(SuccessResponse::new(()))))
+    let job_enqueued = ensure_eoa_executor_job(
+        &state.queue_manager.eoa_executor_queue,
+        &store,
+        eoa_address,
+        chain_id,
+    )
+    .await?;
+
+    tracing::info!(
+        eoa = ?eoa_address,
+        chain_id,
+        job_enqueued,
+        "Manual reset scheduled via admin endpoint"
+    );
+
+    Ok((
+        StatusCode::OK,
+        Json(SuccessResponse::new(ManualResetResponse { job_enqueued })),
+    ))
 }
 
 // ===== HELPER FUNCTIONS =====
@@ -431,6 +463,97 @@ fn parse_eoa_chain(eoa_chain: &str) -> Result<(String, u64), ApiEngineError> {
     Ok((eoa, chain_id))
 }
 
+async fn ensure_eoa_executor_job(
+    queue: &Arc>>,
+    store: &EoaExecutorStore,
+    eoa: Address,
+    chain_id: u64,
+) -> Result<bool, ApiEngineError> {
+    let job_id = format!("eoa_{}_{}", eoa, chain_id);
+
+    if job_exists_in_queue(queue, &job_id).await? {
+        return Ok(false);
+    }
+
+    let signing_credential = find_signing_credential(store).await?;
+
+    let job_data = EoaExecutorWorkerJobData {
+        eoa_address: eoa,
+        chain_id,
+        noop_signing_credential: signing_credential,
+    };
+
+    queue
+        .clone()
+        .job(job_data)
+        .with_id(&job_id)
+        .push()
+        .await
+        .map_err(|e| {
+            ApiEngineError(EngineError::InternalError {
+                message: format!("Failed to enqueue EOA executor job: {e}"),
+            })
+        })?;
+
+    Ok(true)
+}
+
+async fn job_exists_in_queue(
+    queue: &Arc>>,
+    job_id: &str,
+) -> Result<bool, ApiEngineError> {
+    let mut conn = queue.redis.clone();
+    let dedupe_key = queue.dedupe_set_name();
+
+    conn.sismember(&dedupe_key, job_id).await.map_err(|e| {
+        ApiEngineError(EngineError::InternalError {
+            message: format!("Failed to query queue dedupe set: {e}"),
+        })
+    })
+}
+
+async fn find_signing_credential(
+    store: &EoaExecutorStore,
+) -> Result<SigningCredential, ApiEngineError> {
+    if let Some(pending) = store
+        .peek_pending_transactions(1)
+        .await
+        .map_err(|e| {
+            ApiEngineError(EngineError::InternalError {
+                message: format!("Failed to inspect pending transactions: {e}"),
+            })
+        })?
+ .into_iter() + .next() + { + return Ok(pending.user_request.signing_credential.clone()); + } + + let borrowed_transactions = store.peek_borrowed_transactions().await.map_err(|e| { + ApiEngineError(EngineError::InternalError { + message: format!("Failed to inspect borrowed transactions: {e}"), + }) + })?; + + for borrowed in borrowed_transactions { + if let Some(tx_data) = store + .get_transaction_data(&borrowed.transaction_id) + .await + .map_err(|e| { + ApiEngineError(EngineError::InternalError { + message: format!("Failed to hydrate transaction data: {e}"), + }) + })? + { + return Ok(tx_data.user_request.signing_credential.clone()); + } + } + + Err(ApiEngineError(EngineError::ValidationError { + message: "Unable to determine signing credential for this EOA; no pending or borrowed transactions found".to_string(), + })) +} + pub fn eoa_diagnostics_router() -> Router { // Add hidden admin diagnostic routes (not included in OpenAPI) Router::new() diff --git a/server/src/http/routes/admin/metrics.rs b/server/src/http/routes/admin/metrics.rs index 3775f6e..b9b7897 100644 --- a/server/src/http/routes/admin/metrics.rs +++ b/server/src/http/routes/admin/metrics.rs @@ -7,7 +7,7 @@ use prometheus::{Encoder, TextEncoder}; use std::sync::Arc; /// Prometheus metrics endpoint -/// +/// /// Returns metrics in Prometheus text format for all executors pub async fn get_metrics( State(registry): State>, @@ -15,24 +15,26 @@ pub async fn get_metrics( let encoder = TextEncoder::new(); let metric_families = registry.gather(); let mut buffer = Vec::new(); - encoder.encode(&metric_families, &mut buffer) + encoder + .encode(&metric_families, &mut buffer) .map_err(|e| MetricsError::EncodingFailed(e.to_string()))?; - - let metrics_output = String::from_utf8(buffer) - .map_err(|e| MetricsError::Utf8Error(e.to_string()))?; - + + let metrics_output = + String::from_utf8(buffer).map_err(|e| MetricsError::Utf8Error(e.to_string()))?; + Ok(( StatusCode::OK, [("content-type", "text/plain; version=0.0.4; charset=utf-8")], metrics_output, - ).into_response()) + ) + .into_response()) } #[derive(Debug, thiserror::Error)] pub enum MetricsError { #[error("Failed to encode metrics: {0}")] EncodingFailed(String), - + #[error("UTF-8 conversion error: {0}")] Utf8Error(String), } @@ -41,11 +43,12 @@ impl IntoResponse for MetricsError { fn into_response(self) -> Response { let error_message = self.to_string(); tracing::error!("Metrics error: {}", error_message); - + ( StatusCode::INTERNAL_SERVER_ERROR, [("content-type", "text/plain")], format!("Metrics export failed: {error_message}"), - ).into_response() + ) + .into_response() } } diff --git a/server/src/http/routes/admin/mod.rs b/server/src/http/routes/admin/mod.rs index 2129d18..ca74687 100644 --- a/server/src/http/routes/admin/mod.rs +++ b/server/src/http/routes/admin/mod.rs @@ -1,3 +1,3 @@ pub mod eoa_diagnostics; pub mod metrics; -pub mod queue; \ No newline at end of file +pub mod queue; diff --git a/server/src/http/routes/sign_solana_transaction.rs b/server/src/http/routes/sign_solana_transaction.rs index 6d117f8..685618d 100644 --- a/server/src/http/routes/sign_solana_transaction.rs +++ b/server/src/http/routes/sign_solana_transaction.rs @@ -77,14 +77,11 @@ pub async fn sign_solana_transaction( let rpc_client = state.solana_rpc_cache.get_or_create(chain_id).await; // Get recent blockhash - let recent_blockhash = rpc_client - .get_latest_blockhash() - .await - .map_err(|e| { - ApiEngineError(EngineError::ValidationError { - message: format!("Failed to get recent blockhash: 
{}", e), - }) - })?; + let recent_blockhash = rpc_client.get_latest_blockhash().await.map_err(|e| { + ApiEngineError(EngineError::ValidationError { + message: format!("Failed to get recent blockhash: {}", e), + }) + })?; // Build the transaction let solana_tx = SolanaTransaction { @@ -113,13 +110,12 @@ pub async fn sign_solana_transaction( let signature = signed_tx.signatures[0]; // Serialize the signed transaction to base64 - let signed_tx_bytes = bincode::serde::encode_to_vec(&signed_tx, bincode_standard()).map_err( - |e| { + let signed_tx_bytes = + bincode::serde::encode_to_vec(&signed_tx, bincode_standard()).map_err(|e| { ApiEngineError(EngineError::ValidationError { message: format!("Failed to serialize signed transaction: {}", e), }) - }, - )?; + })?; let signed_tx_base64 = Base64Engine.encode(&signed_tx_bytes); let response = SignSolanaTransactionResponse { diff --git a/server/src/http/routes/solana_transaction.rs b/server/src/http/routes/solana_transaction.rs index f823945..3537e39 100644 --- a/server/src/http/routes/solana_transaction.rs +++ b/server/src/http/routes/solana_transaction.rs @@ -60,9 +60,5 @@ pub async fn send_solana_transaction( "Solana transaction queued successfully" ); - Ok(( - StatusCode::ACCEPTED, - Json(SuccessResponse::new(response)), - )) + Ok((StatusCode::ACCEPTED, Json(SuccessResponse::new(response)))) } - diff --git a/server/src/http/server.rs b/server/src/http/server.rs index 490740c..29aec53 100644 --- a/server/src/http/server.rs +++ b/server/src/http/server.rs @@ -1,7 +1,11 @@ use std::sync::Arc; use axum::{Json, Router, routing::get}; -use engine_core::{signer::{EoaSigner, SolanaSigner}, userop::UserOpSigner, credentials::KmsClientCache}; +use engine_core::{ + credentials::KmsClientCache, + signer::{EoaSigner, SolanaSigner}, + userop::UserOpSigner, +}; use engine_executors::solana_executor::rpc_cache::SolanaRpcCache; use serde_json::json; use thirdweb_core::abi::ThirdwebAbiService; diff --git a/server/src/lib.rs b/server/src/lib.rs index 238a559..0c1af00 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -7,8 +7,8 @@ pub mod queue; // Re-export commonly used types for integration tests and external usage pub use chains::ThirdwebChainService; pub use config::{ - EngineConfig, MonitoringConfig, QueueConfig, RedisConfig, ServerConfig, SolanaConfig, - SolanRpcConfigData, ThirdwebConfig, ThirdwebUrls, + EngineConfig, MonitoringConfig, QueueConfig, RedisConfig, ServerConfig, SolanRpcConfigData, + SolanaConfig, ThirdwebConfig, ThirdwebUrls, }; pub use execution_router::ExecutionRouter; pub use http::server::{EngineServer, EngineServerState}; diff --git a/thirdweb-core/src/iaw/mod.rs b/thirdweb-core/src/iaw/mod.rs index ac7489e..8181b60 100644 --- a/thirdweb-core/src/iaw/mod.rs +++ b/thirdweb-core/src/iaw/mod.rs @@ -67,15 +67,13 @@ impl From for IAWError { } /// Message format for signing operations -#[derive(Debug, Clone, Serialize, Deserialize)] -#[derive(Default)] +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub enum MessageFormat { #[default] Text, Hex, } - /// Response data for message signing operations #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SignMessageData { diff --git a/twmq/benches/throughput.rs b/twmq/benches/throughput.rs index 6e27def..ee2c815 100644 --- a/twmq/benches/throughput.rs +++ b/twmq/benches/throughput.rs @@ -288,9 +288,7 @@ async fn load_test_throughput( let success_rate = metrics.success_rate(); let avg_processing_time = metrics.avg_processing_time_ms(); - println!( - "Load Test Results - 
{jobs_per_second}jobs/s for {duration_seconds}s:" - ); + println!("Load Test Results - {jobs_per_second}jobs/s for {duration_seconds}s:"); println!(" Jobs pushed: {jobs_pushed}"); println!(" Jobs processed: {total_processed}"); println!(" Simulated success rate: {:.1}%", success_rate * 100.0); diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs index 853b7fd..164d234 100644 --- a/twmq/src/lib.rs +++ b/twmq/src/lib.rs @@ -1249,7 +1249,10 @@ impl Queue { let lease_exists: bool = conn.exists(&lease_key).await?; if !lease_exists { redis::cmd("UNWATCH").query_async::<()>(&mut conn).await?; - tracing::warn!(job_id = job.job.id, "Lease no longer exists, job was cancelled or timed out"); + tracing::warn!( + job_id = job.job.id, + "Lease no longer exists, job was cancelled or timed out" + ); return Ok(()); } @@ -1275,7 +1278,10 @@ impl Queue { } Err(_) => { // WATCH failed (lease key changed), retry - tracing::debug!(job_id = job.job.id, "WATCH failed during completion, retrying"); + tracing::debug!( + job_id = job.job.id, + "WATCH failed during completion, retrying" + ); continue; } } @@ -1349,7 +1355,10 @@ impl Queue { let lease_exists: bool = conn.exists(&lease_key).await?; if !lease_exists { redis::cmd("UNWATCH").query_async::<()>(&mut conn).await?; - tracing::warn!(job_id = job.id, "Lease no longer exists, job was cancelled or timed out"); + tracing::warn!( + job_id = job.id, + "Lease no longer exists, job was cancelled or timed out" + ); return Ok(()); } @@ -1370,7 +1379,10 @@ impl Queue { } Err(_) => { // WATCH failed (lease key changed), retry - tracing::debug!(job_id = job.id, "WATCH failed during queue error completion, retrying"); + tracing::debug!( + job_id = job.id, + "WATCH failed during queue error completion, retrying" + ); continue; } } diff --git a/twmq/src/queue.rs b/twmq/src/queue.rs index 819a3d6..288b89f 100644 --- a/twmq/src/queue.rs +++ b/twmq/src/queue.rs @@ -1,7 +1,7 @@ use std::{marker::PhantomData, sync::Arc, time::Duration}; use redis::{Client, aio::ConnectionManager}; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use crate::{DurableExecution, Queue, error::TwmqError}; diff --git a/twmq/src/shutdown.rs b/twmq/src/shutdown.rs index a0ce94b..ba19255 100644 --- a/twmq/src/shutdown.rs +++ b/twmq/src/shutdown.rs @@ -63,7 +63,9 @@ impl WorkerHandle { self.queue.queue_name(), e ); - Err(TwmqError::Runtime { message: format!("Worker panic: {e}") }) + Err(TwmqError::Runtime { + message: format!("Worker panic: {e}"), + }) } } } @@ -117,7 +119,9 @@ impl ShutdownHandle { errors.push(e); } Err(e) => { - let runtime_error = TwmqError::Runtime { message: format!("Worker {i} panic: {e}") }; + let runtime_error = TwmqError::Runtime { + message: format!("Worker {i} panic: {e}"), + }; tracing::error!("Worker {} task panicked during shutdown: {:?}", i, e); errors.push(runtime_error); } @@ -156,10 +160,7 @@ impl ShutdownHandle { } /// Add multiple workers at once - pub fn add_workers( - &mut self, - workers: impl IntoIterator>, - ) { + pub fn add_workers(&mut self, workers: impl IntoIterator>) { for worker in workers { self.add_worker(worker); } diff --git a/twmq/tests/basic_hook.rs b/twmq/tests/basic_hook.rs index b1f43b2..0306ac2 100644 --- a/twmq/tests/basic_hook.rs +++ b/twmq/tests/basic_hook.rs @@ -256,9 +256,7 @@ async fn test_cross_queue_job_scheduling() { let webhook_pending = webhook_queue.count(JobStatus::Pending).await.unwrap(); let webhook_success = webhook_queue.count(JobStatus::Success).await.unwrap(); - println!( - "Webhook queue - Pending: 
{webhook_pending}, Success: {webhook_success}" - ); + println!("Webhook queue - Pending: {webhook_pending}, Success: {webhook_success}"); // Either the webhook job is still pending or already succeeded assert!( diff --git a/twmq/tests/delay.rs b/twmq/tests/delay.rs index ef42509..5245188 100644 --- a/twmq/tests/delay.rs +++ b/twmq/tests/delay.rs @@ -60,7 +60,10 @@ impl DurableExecution for DelayTestJobHandler { type ErrorData = TestJobErrorData; type JobData = DelayTestJobData; - async fn process(&self, job: &BorrowedJob) -> JobResult { + async fn process( + &self, + job: &BorrowedJob, + ) -> JobResult { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() diff --git a/twmq/tests/fixtures.rs b/twmq/tests/fixtures.rs index 5e26c8a..f98845a 100644 --- a/twmq/tests/fixtures.rs +++ b/twmq/tests/fixtures.rs @@ -57,7 +57,10 @@ impl DurableExecution for TestJobHandler { // If not using async_trait, the signature is: // fn process(&self) -> impl std::future::Future> + Send + Sync { - async fn process(&self, job: &BorrowedJob) -> JobResult { + async fn process( + &self, + job: &BorrowedJob, + ) -> JobResult { println!( "TEST_JOB: Processing job with id_to_check: {}", job.job.data.id_to_check diff --git a/twmq/tests/multilane_batch_pop.rs b/twmq/tests/multilane_batch_pop.rs index 450c54b..3f8c398 100644 --- a/twmq/tests/multilane_batch_pop.rs +++ b/twmq/tests/multilane_batch_pop.rs @@ -327,9 +327,7 @@ async fn multilane_test_batch_pop_distributed_jobs_across_100k_lanes() { .await .expect("First batch pop should complete within 10 seconds"); let duration1 = start.elapsed(); - println!( - "[200 jobs - 200/100k lanes] ✅ First batch pop completed in {duration1:?}" - ); + println!("[200 jobs - 200/100k lanes] ✅ First batch pop completed in {duration1:?}"); let new_lanes_count = harness.queue.lanes_count().await.unwrap(); println!( @@ -355,9 +353,7 @@ async fn multilane_test_batch_pop_distributed_jobs_across_100k_lanes() { .await .expect("Second batch pop should complete within 10 seconds"); let duration2 = start.elapsed(); - println!( - "[200 jobs - 200/100k lanes] ✅ Second batch pop completed in {duration2:?}" - ); + println!("[200 jobs - 200/100k lanes] ✅ Second batch pop completed in {duration2:?}"); let total_jobs_2: usize = result2.values().map(|jobs| jobs.len()).sum(); assert_eq!(total_jobs_2, 100, "Second batch should return 100 jobs"); diff --git a/twmq/tests/prune_race_condition.rs b/twmq/tests/prune_race_condition.rs index 1db15fd..c9898f9 100644 --- a/twmq/tests/prune_race_condition.rs +++ b/twmq/tests/prune_race_condition.rs @@ -19,7 +19,7 @@ use std::{ use serde::{Deserialize, Serialize}; use twmq::{ - DurableExecution, Queue, NackHookData, SuccessHookData, IdempotencyMode, + DurableExecution, IdempotencyMode, NackHookData, Queue, SuccessHookData, hooks::TransactionContext, job::{BorrowedJob, JobError, JobResult, RequeuePosition}, queue::QueueOptions, @@ -33,7 +33,7 @@ static SUCCESS_COUNT: AtomicU32 = AtomicU32::new(0); static NACK_COUNT: AtomicU32 = AtomicU32::new(0); static ACTIVE_WORKERS: AtomicU32 = AtomicU32::new(0); static RACE_DETECTED: AtomicBool = AtomicBool::new(false); -static PROCESS_SLOWLY: AtomicBool = AtomicBool::new(false); // Control processing speed +static PROCESS_SLOWLY: AtomicBool = AtomicBool::new(false); // Control processing speed // Job that simulates EOA executor behavior with static ID #[derive(Serialize, Deserialize, Clone, Debug)] @@ -60,7 +60,7 @@ impl DurableExecution for EoaSimulatorJobHandler { job: &BorrowedJob, ) -> JobResult { let 
worker_count = ACTIVE_WORKERS.fetch_add(1, Ordering::SeqCst) + 1; - + tracing::warn!( job_id = ?job.job.id, lease_token = ?job.lease_token, @@ -81,15 +81,15 @@ impl DurableExecution for EoaSimulatorJobHandler { let process_slowly = PROCESS_SLOWLY.load(Ordering::SeqCst); if process_slowly { tracing::warn!("Processing SLOWLY to keep job active..."); - tokio::time::sleep(Duration::from_secs(2)).await; // Long enough for pruning to happen + tokio::time::sleep(Duration::from_secs(2)).await; // Long enough for pruning to happen } else { tokio::time::sleep(Duration::from_millis(50)).await; } let should_nack = SHOULD_NACK.load(Ordering::SeqCst); - + ACTIVE_WORKERS.fetch_sub(1, Ordering::SeqCst); - + if should_nack { let nack_num = NACK_COUNT.fetch_add(1, Ordering::SeqCst) + 1; tracing::info!( @@ -97,7 +97,7 @@ impl DurableExecution for EoaSimulatorJobHandler { nack_count = nack_num, "Job nacking (simulating work remaining)" ); - + Err(JobError::Nack { error: TestJobErrorData { reason: format!("Work remaining (nack #{})", nack_num), @@ -112,7 +112,7 @@ impl DurableExecution for EoaSimulatorJobHandler { success_count = success_num, "Job succeeding" ); - + Ok(EoaSimulatorJobOutput { message: format!("Success #{}", success_num), success_number: success_num, @@ -177,24 +177,26 @@ async fn test_prune_race_condition_two_workers() { .try_init(); let queue_name = format!("test_two_worker_race_{}", nanoid::nanoid!(6)); - + // CRITICAL: Configure to only keep 1 successful job to trigger aggressive pruning // Use TWO workers and local_concurrency=2 to maximize race probability let queue_options = QueueOptions { - local_concurrency: 2, // Allow 2 concurrent jobs per worker - max_success: 1, // Aggressive pruning - only keep 1 success + local_concurrency: 2, // Allow 2 concurrent jobs per worker + max_success: 1, // Aggressive pruning - only keep 1 success max_failed: 10, lease_duration: Duration::from_secs(3), - idempotency_mode: IdempotencyMode::Active, // CRITICAL: allows same ID after success + idempotency_mode: IdempotencyMode::Active, // CRITICAL: allows same ID after success ..Default::default() }; tracing::warn!("=== TWO WORKER RACE TEST ==="); tracing::warn!("Queue: {}", queue_name); tracing::warn!("Max success jobs: {}", queue_options.max_success); - tracing::warn!("This test reproduces the race where pruning deletes metadata while job is active"); + tracing::warn!( + "This test reproduces the race where pruning deletes metadata while job is active" + ); tracing::warn!("Using 2 workers to enable the race condition (impossible with 1 worker)"); - + // Reset test state SHOULD_NACK.store(true, Ordering::SeqCst); SUCCESS_COUNT.store(0, Ordering::SeqCst); @@ -204,24 +206,19 @@ async fn test_prune_race_condition_two_workers() { PROCESS_SLOWLY.store(false, Ordering::SeqCst); let handler = EoaSimulatorJobHandler; - + // Create queue let queue = Arc::new( - Queue::new( - REDIS_URL, - &queue_name, - Some(queue_options), - handler, - ) - .await - .expect("Failed to create queue"), + Queue::new(REDIS_URL, &queue_name, Some(queue_options), handler) + .await + .expect("Failed to create queue"), ); cleanup_redis_keys(&queue.redis, &queue_name).await; // Static job ID (simulating eoa_chainId pattern from production) let static_job_id = "eoa_0x1234_137"; - + let job_payload = EoaSimulatorJobPayload { eoa: "0x1234".to_string(), chain_id: 137, @@ -241,14 +238,14 @@ async fn test_prune_race_condition_two_workers() { // Start TWO workers to enable the race condition let worker1 = queue.work(); let worker2 = queue.work(); 
- + tracing::warn!("Two workers started!"); // PHASE 1: Create one successful job to fill success list tracing::warn!("=== PHASE 1: Creating initial success ==="); SHOULD_NACK.store(false, Ordering::SeqCst); PROCESS_SLOWLY.store(false, Ordering::SeqCst); - + // Wait for first success for _ in 0..50 { if SUCCESS_COUNT.load(Ordering::SeqCst) > 0 { @@ -256,7 +253,7 @@ async fn test_prune_race_condition_two_workers() { } tokio::time::sleep(Duration::from_millis(50)).await; } - + let first_success = SUCCESS_COUNT.load(Ordering::SeqCst); tracing::warn!("First success completed: {}", first_success); assert!(first_success > 0, "Should have at least one success"); @@ -265,18 +262,22 @@ async fn test_prune_race_condition_two_workers() { // The race window is: job completes → moves to success → pruning runs // We need a new job with same ID to become active right in that window tracing::warn!("=== PHASE 2: Running iterations to catch the pruning race ==="); - + let mut iteration = 0; let max_iterations = 10000; - + while iteration < max_iterations { iteration += 1; - + if iteration % 100 == 0 { - tracing::info!("Iteration {}/{}, race detected: {}", - iteration, max_iterations, RACE_DETECTED.load(Ordering::SeqCst)); + tracing::info!( + "Iteration {}/{}, race detected: {}", + iteration, + max_iterations, + RACE_DETECTED.load(Ordering::SeqCst) + ); } - + // Queue a new job with same ID queue .clone() @@ -285,21 +286,21 @@ async fn test_prune_race_condition_two_workers() { .push() .await .ok(); // May fail if already queued - + // Very short wait to let the job process tokio::time::sleep(Duration::from_millis(5)).await; - + // Check if we caught the race if RACE_DETECTED.load(Ordering::SeqCst) { tracing::error!("🎯 RACE CAUGHT AT ITERATION {}!", iteration); break; } } - + let race_detected = RACE_DETECTED.load(Ordering::SeqCst); - + let final_race_detected = RACE_DETECTED.load(Ordering::SeqCst); - + tracing::warn!("=== RESULTS ==="); tracing::warn!("Race detected: {}", final_race_detected); tracing::warn!("Total successes: {}", SUCCESS_COUNT.load(Ordering::SeqCst)); @@ -310,15 +311,24 @@ async fn test_prune_race_condition_two_workers() { cleanup_redis_keys(&queue.redis, &queue_name).await; if final_race_detected { - tracing::error!("⚠️ UNEXPECTED: Race condition detected after {} iterations!", iteration); + tracing::error!( + "⚠️ UNEXPECTED: Race condition detected after {} iterations!", + iteration + ); tracing::error!(" This should NOT happen with the fix in place!"); tracing::error!(" The fix should prevent pruning from deleting active job metadata."); panic!("Race condition detected - the fix is not working properly!"); } else { - tracing::info!("✅ SUCCESS: No race detected after {} iterations", iteration); + tracing::info!( + "✅ SUCCESS: No race detected after {} iterations", + iteration + ); tracing::info!(" The fix is working correctly!"); - tracing::info!(" Pruning now checks if a job is active/pending/delayed before deleting metadata."); - tracing::info!(" This prevents the race where Worker 1's pruning deletes Worker 2's active job metadata."); + tracing::info!( + " Pruning now checks if a job is active/pending/delayed before deleting metadata." + ); + tracing::info!( + " This prevents the race where Worker 1's pruning deletes Worker 2's active job metadata." 
+ ); } } - diff --git a/twmq/tests/prune_race_random_ids.rs b/twmq/tests/prune_race_random_ids.rs index 21f436e..fd5e9ea 100644 --- a/twmq/tests/prune_race_random_ids.rs +++ b/twmq/tests/prune_race_random_ids.rs @@ -4,7 +4,7 @@ mod fixtures; use fixtures::TestJobErrorData; -use redis::{aio::ConnectionManager, AsyncCommands}; +use redis::{AsyncCommands, aio::ConnectionManager}; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use std::{ @@ -17,7 +17,7 @@ use std::{ use serde::{Deserialize, Serialize}; use twmq::{ - DurableExecution, Queue, NackHookData, SuccessHookData, IdempotencyMode, + DurableExecution, IdempotencyMode, NackHookData, Queue, SuccessHookData, hooks::TransactionContext, job::{BorrowedJob, JobError, JobResult, RequeuePosition}, queue::QueueOptions, @@ -64,7 +64,7 @@ impl DurableExecution for RandomJobHandler { tokio::time::sleep(Duration::from_millis(50)).await; let should_nack = SHOULD_NACK.load(Ordering::SeqCst); - + if should_nack { let nack_num = NACK_COUNT.fetch_add(1, Ordering::SeqCst) + 1; tracing::debug!( @@ -72,7 +72,7 @@ impl DurableExecution for RandomJobHandler { nack_count = nack_num, "Job nacking" ); - + Err(JobError::Nack { error: TestJobErrorData { reason: format!("Work remaining (nack #{})", nack_num), @@ -87,7 +87,7 @@ impl DurableExecution for RandomJobHandler { success_count = success_num, "Job succeeding" ); - + Ok(RandomJobOutput { message: format!("Success #{}", success_num), success_number: success_num, @@ -152,11 +152,11 @@ async fn test_prune_with_random_ids() { .try_init(); let queue_name = format!("test_random_ids_{}", nanoid::nanoid!(6)); - + // Aggressive pruning settings - only keep 1 successful job let queue_options = QueueOptions { local_concurrency: 2, - max_success: 1, // Aggressive pruning + max_success: 1, // Aggressive pruning max_failed: 10, lease_duration: Duration::from_secs(3), idempotency_mode: IdempotencyMode::Active, @@ -167,24 +167,19 @@ async fn test_prune_with_random_ids() { tracing::info!("Queue: {}", queue_name); tracing::info!("Max success jobs: {}", queue_options.max_success); tracing::info!("Testing pruning behavior with unique random job IDs"); - + // Reset test state - SHOULD_NACK.store(false, Ordering::SeqCst); // Start with successes + SHOULD_NACK.store(false, Ordering::SeqCst); // Start with successes SUCCESS_COUNT.store(0, Ordering::SeqCst); NACK_COUNT.store(0, Ordering::SeqCst); let handler = RandomJobHandler; - + // Create queue let queue = Arc::new( - Queue::new( - REDIS_URL, - &queue_name, - Some(queue_options), - handler, - ) - .await - .expect("Failed to create queue"), + Queue::new(REDIS_URL, &queue_name, Some(queue_options), handler) + .await + .expect("Failed to create queue"), ); cleanup_redis_keys(&queue.redis, &queue_name).await; @@ -192,13 +187,13 @@ async fn test_prune_with_random_ids() { // Start two workers let worker1 = queue.work(); let worker2 = queue.work(); - + tracing::info!("Two workers started!"); // Push jobs with random IDs and let them succeed for i in 0..100 { - let random_job_id = nanoid::nanoid!(16); // Random unique ID - + let random_job_id = nanoid::nanoid!(16); // Random unique ID + let job_payload = RandomJobPayload { eoa: format!("0x{}", nanoid::nanoid!(8)), chain_id: 137, @@ -212,15 +207,15 @@ async fn test_prune_with_random_ids() { .push() .await .expect("Failed to push job"); - + if i % 10 == 0 { tracing::info!("Pushed {} jobs", i + 1); } - + // Small delay between pushes tokio::time::sleep(Duration::from_millis(10)).await; } - + // Wait for 
jobs to complete
     tracing::info!("Waiting for jobs to complete...");
     for _ in 0..100 {
@@ -230,19 +225,19 @@
         }
         tokio::time::sleep(Duration::from_millis(100)).await;
     }
-    
+
     let final_success = SUCCESS_COUNT.load(Ordering::SeqCst);
-    
+
     // Check Redis directly to see if pruning occurred
     let mut conn = queue.redis.clone();
     let success_list_len: usize = conn.llen(queue.success_list_name()).await.unwrap();
     let pending_list_len: usize = conn.llen(queue.pending_list_name()).await.unwrap();
     let delayed_zset_len: usize = conn.zcard(queue.delayed_zset_name()).await.unwrap();
     let active_hash_len: usize = conn.hlen(queue.active_hash_name()).await.unwrap();
-    
+
     // Get actual job IDs in success list
     let success_job_ids: Vec<String> = conn.lrange(queue.success_list_name(), 0, -1).await.unwrap();
-    
+
     // Count how many job metadata hashes still exist (should match success list length if pruning works)
     let meta_pattern = format!("twmq:{}:job:*:meta", queue.name());
     let meta_keys: Vec<String> = redis::cmd("KEYS")
@@ -251,12 +246,15 @@
         .await
         .unwrap_or_default();
     let metadata_count = meta_keys.len();
-    
+
     // Count job data entries
     let job_data_count: usize = conn.hlen(queue.job_data_hash_name()).await.unwrap();
-    
+
     // Get what's in pending/delayed/active to understand why pruning might be blocked
-    let pending_jobs: Vec<String> = conn.lrange(queue.pending_list_name(), 0, -1).await.unwrap_or_default();
+    let pending_jobs: Vec<String> = conn
+        .lrange(queue.pending_list_name(), 0, -1)
+        .await
+        .unwrap_or_default();
     let delayed_jobs: Vec<String> = redis::cmd("ZRANGE")
         .arg(queue.delayed_zset_name())
         .arg(0)
@@ -264,18 +262,45 @@
         .query_async(&mut conn)
         .await
         .unwrap_or_default();
-    let active_jobs: Vec<String> = conn.hkeys(queue.active_hash_name()).await.unwrap_or_default();
-    
+    let active_jobs: Vec<String> = conn
+        .hkeys(queue.active_hash_name())
+        .await
+        .unwrap_or_default();
+
     // Get debug info from Lua script
-    let debug_candidates: String = conn.get("debug:candidates_count").await.unwrap_or_else(|_| "N/A".to_string());
-    let debug_active_count: String = conn.get("debug:active_count").await.unwrap_or_else(|_| "N/A".to_string());
-    let debug_delayed_count: String = conn.get("debug:delayed_count").await.unwrap_or_else(|_| "N/A".to_string());
-    let debug_blocked_job: String = conn.get("debug:last_blocked_job_id").await.unwrap_or_else(|_| "N/A".to_string());
-    let debug_hexists: String = conn.get("debug:last_hexists_result").await.unwrap_or_else(|_| "N/A".to_string());
-    let debug_zscore: String = conn.get("debug:last_zscore_result").await.unwrap_or_else(|_| "N/A".to_string());
-    let debug_is_active: String = conn.get("debug:last_is_active").await.unwrap_or_else(|_| "N/A".to_string());
-    let debug_is_delayed: String = conn.get("debug:last_is_delayed").await.unwrap_or_else(|_| "N/A".to_string());
-    
+    let debug_candidates: String = conn
+        .get("debug:candidates_count")
+        .await
+        .unwrap_or_else(|_| "N/A".to_string());
+    let debug_active_count: String = conn
+        .get("debug:active_count")
+        .await
+        .unwrap_or_else(|_| "N/A".to_string());
+    let debug_delayed_count: String = conn
+        .get("debug:delayed_count")
+        .await
+        .unwrap_or_else(|_| "N/A".to_string());
+    let debug_blocked_job: String = conn
+        .get("debug:last_blocked_job_id")
+        .await
+        .unwrap_or_else(|_| "N/A".to_string());
+    let debug_hexists: String = conn
+        .get("debug:last_hexists_result")
+        .await
+        .unwrap_or_else(|_| "N/A".to_string());
+    let debug_zscore: String = conn
.get("debug:last_zscore_result") + .await + .unwrap_or_else(|_| "N/A".to_string()); + let debug_is_active: String = conn + .get("debug:last_is_active") + .await + .unwrap_or_else(|_| "N/A".to_string()); + let debug_is_delayed: String = conn + .get("debug:last_is_delayed") + .await + .unwrap_or_else(|_| "N/A".to_string()); + tracing::info!("=== RESULTS ==="); tracing::info!("Total successes: {}", final_success); tracing::info!("Total nacks: {}", NACK_COUNT.load(Ordering::SeqCst)); @@ -305,25 +330,34 @@ async fn test_prune_with_random_ids() { tracing::info!(" is_delayed (ZSCORE~=nil): {}", debug_is_delayed); tracing::info!(""); tracing::info!("Max success setting: {}", queue.options.max_success); - + if success_list_len <= queue.options.max_success { tracing::info!("✅ List pruning is working - success list is within max_success limit"); } else { - tracing::warn!("⚠️ Success list ({}) exceeds max_success ({})", - success_list_len, queue.options.max_success); + tracing::warn!( + "⚠️ Success list ({}) exceeds max_success ({})", + success_list_len, + queue.options.max_success + ); tracing::warn!(" This might indicate list pruning is not working correctly"); } - + if metadata_count == success_list_len { tracing::info!("✅ Metadata cleanup is working - metadata count matches list length"); } else { tracing::warn!("⚠️ Metadata leak detected!"); - tracing::warn!(" Job metadata hashes: {}, Success list length: {}", metadata_count, success_list_len); - tracing::warn!(" {} job metadata entries were not cleaned up", metadata_count.saturating_sub(success_list_len)); + tracing::warn!( + " Job metadata hashes: {}, Success list length: {}", + metadata_count, + success_list_len + ); + tracing::warn!( + " {} job metadata entries were not cleaned up", + metadata_count.saturating_sub(success_list_len) + ); } worker1.shutdown().await.unwrap(); worker2.shutdown().await.unwrap(); cleanup_redis_keys(&queue.redis, &queue_name).await; } -