diff --git a/core/src/rpc_clients/bundler.rs b/core/src/rpc_clients/bundler.rs
index 7a1a3a2..004db70 100644
--- a/core/src/rpc_clients/bundler.rs
+++ b/core/src/rpc_clients/bundler.rs
@@ -71,11 +71,18 @@ pub struct TwExecuteResponse {
 }
 
 /// Response from tw_getTransactionHash bundler method
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase", tag = "status")]
+pub enum TwGetTransactionHashResponse {
+    Pending,
+    Success { transaction_hash: String },
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
-pub struct TwGetTransactionHashResponse {
-    /// The transaction hash
-    pub transaction_hash: Option<String>,
+pub enum TwGetTransactionHashStatus {
+    Pending,
+    Success,
 }
 
 impl BundlerClient {
@@ -152,12 +159,12 @@ impl BundlerClient {
     pub async fn tw_get_transaction_hash(
         &self,
         transaction_id: &str,
-    ) -> TransportResult<Option<String>> {
+    ) -> TransportResult<TwGetTransactionHashResponse> {
         let params = serde_json::json!([transaction_id]);
 
         let response: TwGetTransactionHashResponse =
             self.inner.request("tw_getTransactionHash", params).await?;
 
-        Ok(response.transaction_hash)
+        Ok(response)
     }
 }
diff --git a/core/src/rpc_clients/transport.rs b/core/src/rpc_clients/transport.rs
index ec38eb5..508adba 100644
--- a/core/src/rpc_clients/transport.rs
+++ b/core/src/rpc_clients/transport.rs
@@ -64,12 +64,12 @@ impl HeaderInjectingTransport {
             .map_err(TransportErrorKind::custom)?;
 
         let status = resp.status();
-        debug!(%status, "received response from server");
+        debug!(?status, "received response from server");
 
         // Get response body
         let body = resp.bytes().await.map_err(TransportErrorKind::custom)?;
         debug!(bytes = body.len(), "retrieved response body");
-        trace!(body = %String::from_utf8_lossy(&body), "response body");
+        trace!(body = ?String::from_utf8_lossy(&body), "response body");
 
         // Check for HTTP errors
         if !status.is_success() {
@@ -100,7 +100,7 @@ impl Service<RequestPacket> for HeaderInjectingTransport {
     #[inline]
     fn call(&mut self, req: RequestPacket) -> Self::Future {
         let this = self.clone(); // Clone is cheap - just clones the Arc inside Client
-        let span = debug_span!("HeaderInjectingTransport", url = %this.url);
+        let span = debug_span!("HeaderInjectingTransport", url = ?this.url);
         Box::pin(this.do_request(req).instrument(span))
     }
 }
diff --git a/executors/src/eip7702_executor/confirm.rs b/executors/src/eip7702_executor/confirm.rs
index aaeebf7..70ae221 100644
--- a/executors/src/eip7702_executor/confirm.rs
+++ b/executors/src/eip7702_executor/confirm.rs
@@ -2,6 +2,7 @@ use alloy::primitives::{Address, TxHash};
 use alloy::providers::Provider;
 use alloy::rpc::types::TransactionReceipt;
 use engine_core::error::{AlloyRpcErrorToEngineError, EngineError};
+use engine_core::rpc_clients::TwGetTransactionHashResponse;
 use engine_core::{
     chain::{Chain, ChainService, RpcCredentials},
     execution_options::WebhookOptions,
@@ -31,11 +32,7 @@ pub struct Eip7702ConfirmationJobData {
     pub transaction_id: String,
     pub chain_id: u64,
     pub bundler_transaction_id: String,
-    /// ! Deprecated todo: remove this field after all jobs are processed
-    pub eoa_address: Option<Address>,
-
-    // TODO: make non-optional after all jobs are processed
-    pub sender_details: Option<Eip7702Sender>,
+    pub sender_details: Eip7702Sender,
     pub rpc_credentials: RpcCredentials,
 
     #[serde(default)]
@@ -189,7 +186,7 @@ where
         let chain = chain.with_new_default_headers(chain_auth_headers);
 
         // 2. Get transaction hash from bundler
-        let transaction_hash_str = chain
+        let transaction_hash_res = chain
            .bundler_client()
            .tw_get_transaction_hash(&job_data.bundler_transaction_id)
            .await
            .map_err(|e| { })
            .map_err_fail()?;
 
-        let transaction_hash = match transaction_hash_str {
-            Some(hash) => hash.parse::<TxHash>().map_err(|e| {
-                Eip7702ConfirmationError::TransactionHashError {
-                    message: format!("Invalid transaction hash format: {}", e),
-                }
-                .fail()
-            })?,
-            None => {
+        let transaction_hash = match transaction_hash_res {
+            TwGetTransactionHashResponse::Success { transaction_hash } => {
+                transaction_hash.parse::<TxHash>().map_err(|e| {
+                    Eip7702ConfirmationError::TransactionHashError {
+                        message: format!("Invalid transaction hash format: {}", e),
+                    }
+                    .fail()
+                })?
+            }
+
+            TwGetTransactionHashResponse::Pending => {
                 return Err(Eip7702ConfirmationError::TransactionHashError {
-                    message: "Transaction not found".to_string(),
+                    message: "Transaction not yet confirmed".to_string(),
                 })
                 .map_err_nack(Some(Duration::from_secs(2)), RequeuePosition::Last);
             }
@@ -262,25 +262,11 @@ where
             "Transaction confirmed successfully"
         );
 
-        // todo: remove this after all jobs are processed
-        let sender_details = job_data
-            .sender_details
-            .clone()
-            .or_else(|| {
-                job_data
-                    .eoa_address
-                    .map(|eoa_address| Eip7702Sender::Owner { eoa_address })
-            })
-            .ok_or_else(|| Eip7702ConfirmationError::InternalError {
-                message: "No sender details found".to_string(),
-            })
-            .map_err_fail()?;
-
         Ok(Eip7702ConfirmationResult {
             transaction_id: job_data.transaction_id.clone(),
             transaction_hash,
             receipt,
-            sender_details,
+            sender_details: job_data.sender_details.clone(),
         })
     }
diff --git a/executors/src/eip7702_executor/send.rs b/executors/src/eip7702_executor/send.rs
index c7f5283..3f106cb 100644
--- a/executors/src/eip7702_executor/send.rs
+++ b/executors/src/eip7702_executor/send.rs
@@ -38,19 +38,7 @@ pub struct Eip7702SendJobData {
     pub transaction_id: String,
     pub chain_id: u64,
     pub transactions: Vec,
-
-    // !IMPORTANT TODO
-    // To preserve backwards compatibility with pre-existing queued jobs, we continue keeping the eoa_address field until the next release
-    // However, we make it optional now, and rely on the Eip7702ExecutionOptions instead
-    pub eoa_address: Option<Address>,
-
-    // We must also keep the execution_options as optional to prevent deserialization errors
-    // when we remove the eoa_address field, we can make execution_options required
-    // at runtime we resolve from both, with preference to execution_options
-    // if both are none, we return an error
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub execution_options: Option<Eip7702ExecutionOptions>,
-
+    pub execution_options: Eip7702ExecutionOptions,
     pub signing_credential: SigningCredential,
     #[serde(default)]
     pub webhook_options: Vec,
@@ -208,24 +196,17 @@ where
         let chain = chain.with_new_default_headers(chain_auth_headers);
 
-        let owner_address = job_data
-            .eoa_address
-            .or(job_data.execution_options.as_ref().map(|e| match e {
-                Eip7702ExecutionOptions::Owner(o) => o.from,
-                Eip7702ExecutionOptions::SessionKey(s) => s.session_key_address,
-            }))
-            .ok_or(Eip7702SendError::InternalError {
-                message: "No owner address found".to_string(),
-            })
-            .map_err_fail()?;
+        let owner_address = match &job_data.execution_options {
+            Eip7702ExecutionOptions::Owner(o) => o.from,
+            Eip7702ExecutionOptions::SessionKey(s) => s.session_key_address,
+        };
 
         let account = DelegatedAccount::new(owner_address, chain);
 
-        let session_key_target_address =
-            job_data.execution_options.as_ref().and_then(|e| match e {
-                Eip7702ExecutionOptions::Owner(_) => None,
-                Eip7702ExecutionOptions::SessionKey(s) => Some(s.account_address),
-            });
+        let session_key_target_address = match &job_data.execution_options {
+            Eip7702ExecutionOptions::Owner(_) => None,
+            Eip7702ExecutionOptions::SessionKey(s) => Some(s.account_address),
+        };
 
         let transactions = match session_key_target_address {
             Some(target_address) => {
@@ -343,8 +324,7 @@ where
                     transaction_id: job.job.data.transaction_id.clone(),
                     chain_id: job.job.data.chain_id,
                     bundler_transaction_id: success_data.result.transaction_id.clone(),
-                    eoa_address: None,
-                    sender_details: Some(success_data.result.sender_details.clone()),
+                    sender_details: success_data.result.sender_details.clone(),
                     rpc_credentials: job.job.data.rpc_credentials.clone(),
                     webhook_options: job.job.data.webhook_options.clone(),
                 })
diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs
index e054be6..6c3e672 100644
--- a/executors/src/eoa/store/atomic.rs
+++ b/executors/src/eoa/store/atomic.rs
@@ -105,9 +105,9 @@ impl AtomicEoaExecutorStore {
         {
             Ok(()) => {
                 tracing::debug!(
-                    eoa = %self.eoa(),
-                    chain_id = %self.chain_id(),
-                    worker_id = %self.worker_id(),
+                    eoa = ?self.eoa(),
+                    chain_id = self.chain_id(),
+                    worker_id = self.worker_id(),
                     "Successfully released EOA lock"
                 );
                 Ok(self.store)
             }
@@ -115,9 +115,9 @@ impl AtomicEoaExecutorStore {
             Err(TransactionStoreError::LockLost { .. }) => {
                 // Lock was already taken over, which is fine for release
                 tracing::debug!(
-                    eoa = %self.eoa(),
-                    chain_id = %self.chain_id(),
-                    worker_id = %self.worker_id(),
+                    eoa = ?self.eoa(),
+                    chain_id = self.chain_id(),
+                    worker_id = self.worker_id(),
                     "Lock already released or taken over by another worker"
                 );
                 Ok(self.store)
             }
@@ -125,10 +125,10 @@ impl AtomicEoaExecutorStore {
             Err(e) => {
                 // Other errors shouldn't fail the worker, just log
                 tracing::warn!(
-                    eoa = %self.eoa(),
-                    chain_id = %self.chain_id(),
-                    worker_id = %self.worker_id(),
-                    error = %e,
+                    eoa = ?self.eoa(),
+                    chain_id = self.chain_id(),
+                    worker_id = self.worker_id(),
+                    error = ?e,
                     "Failed to release EOA lock"
                 );
                 Ok(self.store)
             }
@@ -197,7 +197,7 @@ impl AtomicEoaExecutorStore {
                     tracing::debug!(
                         retry_count = retry_count,
                         delay_ms = delay_ms,
-                        eoa = %self.eoa(),
+                        eoa = ?self.eoa(),
                         chain_id = self.chain_id(),
                         "Retrying lock check operation"
                     );
@@ -312,7 +312,7 @@ impl AtomicEoaExecutorStore {
                     tracing::debug!(
                         retry_count = retry_count,
                         delay_ms = delay_ms,
-                        eoa = %self.eoa,
+                        eoa = ?self.eoa,
                         chain_id = self.chain_id,
                         operation = safe_tx.name(),
                         "Retrying atomic operation"
diff --git a/executors/src/eoa/store/borrowed.rs b/executors/src/eoa/store/borrowed.rs
index 7408ec7..98c73cc 100644
--- a/executors/src/eoa/store/borrowed.rs
+++ b/executors/src/eoa/store/borrowed.rs
@@ -81,8 +81,8 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
                 valid_results.push(result.clone());
             } else {
                 tracing::warn!(
-                    transaction_id = %transaction_id,
-                    nonce = %result.transaction.nonce,
+                    transaction_id = ?transaction_id,
+                    nonce = result.transaction.nonce,
                     "Submission result not found in borrowed state, ignoring"
                 );
             }
@@ -174,6 +174,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
                     // Update transaction data status
                     let tx_data_key = self.keys.transaction_data_key_name(transaction_id);
                     pipeline.hset(&tx_data_key, "status", "pending");
+                    pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);
 
                     // Queue webhook event using user_request from SubmissionResult
                     let event = EoaExecutorEvent {
@@ -206,6 +207,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
                     pipeline.hset(&tx_data_key, "status", "failed");
                     pipeline.hset(&tx_data_key, "completed_at", now);
                     pipeline.hset(&tx_data_key, "failure_reason", err.to_string());
+                    pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);
 
                     // Queue webhook event using user_request from SubmissionResult
                     let event = EoaExecutorEvent {
diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs
index c17da80..98b97da 100644
--- a/executors/src/eoa/store/mod.rs
+++ b/executors/src/eoa/store/mod.rs
@@ -404,9 +404,9 @@ impl EoaExecutorStore {
         }
         // Lock exists, forcefully take it over
         tracing::warn!(
-            eoa = %self.eoa,
-            chain_id = %self.chain_id,
-            worker_id = %worker_id,
+            eoa = ?self.eoa,
+            chain_id = self.chain_id,
+            worker_id = worker_id,
             "Forcefully taking over EOA lock from stalled worker"
         );
         // Force set - no expiry, only released by explicit takeover
@@ -504,13 +504,32 @@ impl EoaExecutorStore {
         &self,
         limit: u64,
     ) -> Result, TransactionStoreError> {
+        self.peek_pending_transactions_paginated(0, limit).await
+    }
+
+    /// Peek at pending transactions with pagination support
+    pub async fn peek_pending_transactions_paginated(
+        &self,
+        offset: u64,
+        limit: u64,
+    ) -> Result, TransactionStoreError> {
+        if limit == 0 {
+            return Ok(Vec::new());
+        }
+
         let pending_key = self.pending_transactions_zset_name();
         let mut conn = self.redis.clone();
 
-        // Use ZRANGE to peek without removing
-        let transaction_ids: Vec = conn
-            .zrange_withscores(&pending_key, 0, (limit - 1) as isize)
-            .await?;
+        // Use ZRANGE to peek without removing, with offset support
+        let start = offset as isize;
+        let stop = (offset + limit - 1) as isize;
+
+        let transaction_ids: Vec =
+            conn.zrange_withscores(&pending_key, start, stop).await?;
+
+        if transaction_ids.is_empty() {
+            return Ok(Vec::new());
+        }
 
         let mut pipe = twmq::redis::pipe();
diff --git a/executors/src/eoa/store/submitted.rs b/executors/src/eoa/store/submitted.rs
index 3f3229e..92514ab 100644
--- a/executors/src/eoa/store/submitted.rs
+++ b/executors/src/eoa/store/submitted.rs
@@ -475,6 +475,8 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> {
             .zrange(self.keys.recycled_nonces_zset_name(), 0, -1)
             .await?;
 
+        // filter out nonces that are higher than the highest submitted nonce
+        // these don't need to be recycled, they'll be used up by incrementing the nonce
         let recycled_nonces = recycled_nonces
             .into_iter()
             .filter(|nonce| *nonce < highest_submitted_nonce)
diff --git a/executors/src/eoa/worker/confirm.rs b/executors/src/eoa/worker/confirm.rs
index 57579a1..c961e6e 100644
--- a/executors/src/eoa/worker/confirm.rs
+++ b/executors/src/eoa/worker/confirm.rs
@@ -80,7 +80,7 @@ impl EoaExecutorWorker {
                 .await
             {
                 tracing::warn!(
-                    error = %e,
+                    error = ?e,
                     "Failed to attempt gas bump for stalled nonce"
                 );
             }
@@ -122,9 +122,9 @@ impl EoaExecutorWorker {
                 Ok(receipt_json) => receipt_json,
                 Err(e) => {
                     tracing::warn!(
-                        transaction_id = %tx.transaction_id,
-                        hash = %tx.transaction_hash,
-                        error = %e,
+                        transaction_id = ?tx.transaction_id,
+                        hash = tx.transaction_hash,
+                        error = ?e,
                         "Failed to serialize receipt as JSON, using debug format"
                     );
                     format!("{:?}", tx.receipt)
                 }
             };
 
             tracing::info!(
-                transaction_id = %tx.transaction_id,
+                transaction_id = ?tx.transaction_id,
                 nonce = tx.nonce,
-                hash = %tx.transaction_hash,
+                hash = tx.transaction_hash,
                 "Transaction confirmed"
             );
 
@@ -262,7 +262,7 @@ impl EoaExecutorWorker {
         if let Some((transaction_id, tx_data)) = newest_transaction {
             tracing::info!(
-                transaction_id = %transaction_id,
+                transaction_id = ?transaction_id,
                 nonce = expected_nonce,
                 "Found newest transaction for gas bump"
             );
@@ -294,9 +294,9 @@ impl EoaExecutorWorker {
                     }
 
                     tracing::warn!(
-                        transaction_id = %transaction_id,
+                        transaction_id = ?transaction_id,
                         nonce = expected_nonce,
-                        error = %e,
+                        error = ?e,
                         "Failed to build typed transaction for gas bump"
                     );
                     return Ok(false);
@@ -310,9 +310,9 @@ impl EoaExecutorWorker {
             Ok(tx) => tx,
             Err(e) => {
                 tracing::warn!(
-                    transaction_id = %transaction_id,
+                    transaction_id = ?transaction_id,
                     nonce = expected_nonce,
-                    error = %e,
+                    error = ?e,
                     "Failed to sign transaction for gas bump"
                 );
                 return Ok(false);
@@ -337,7 +337,7 @@ impl EoaExecutorWorker {
         match self.chain.provider().send_tx_envelope(tx_envelope).await {
             Ok(_) => {
                 tracing::info!(
-                    transaction_id = %transaction_id,
+                    transaction_id = ?transaction_id,
                     nonce = expected_nonce,
                     "Successfully sent gas bumped transaction"
                 );
             }
             Err(e) => {
                 tracing::warn!(
-                    transaction_id = %transaction_id,
+                    transaction_id = ?transaction_id,
                     nonce = expected_nonce,
-                    error = %e,
+                    error = ?e,
                     "Failed to send gas bumped transaction"
                 );
                 // Don't fail the worker, just log the error
diff --git a/executors/src/eoa/worker/error.rs b/executors/src/eoa/worker/error.rs
index cc54363..9420831 100644
---
a/executors/src/eoa/worker/error.rs +++ b/executors/src/eoa/worker/error.rs @@ -150,7 +150,7 @@ pub enum SendContext { InitialBroadcast, } -#[tracing::instrument(skip_all, fields(error = %error, context = ?context))] +#[tracing::instrument(skip_all, fields(error = ?error, context = ?context))] pub fn classify_send_error( error: &RpcError, context: SendContext, diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs index 68de7ce..2ecec52 100644 --- a/executors/src/eoa/worker/mod.rs +++ b/executors/src/eoa/worker/mod.rs @@ -127,7 +127,7 @@ where type ErrorData = EoaExecutorWorkerError; type JobData = EoaExecutorWorkerJobData; - #[tracing::instrument(skip_all, fields(eoa = %job.job.data.eoa_address, chain_id = job.job.data.chain_id))] + #[tracing::instrument(name = "eoa_executor_worker", skip_all, fields(eoa = ?job.job.data.eoa_address, chain_id = job.job.data.chain_id))] async fn process( &self, job: &BorrowedJob, @@ -170,7 +170,7 @@ where let result = worker.execute_main_workflow().await?; if let Err(e) = worker.release_eoa_lock().await { - tracing::error!("Error releasing EOA lock: {}", e); + tracing::error!(error = ?e, "Error releasing EOA lock"); } if result.is_work_remaining() { @@ -232,9 +232,9 @@ where let mut conn = self.redis.clone(); if let Err(e) = conn.del::<&str, ()>(&lock_key).await { tracing::error!( - eoa = %job_data.eoa_address, - chain_id = %job_data.chain_id, - error = %e, + eoa = ?job_data.eoa_address, + chain_id = job_data.chain_id, + error = ?e, "Failed to release EOA lock" ); } @@ -276,7 +276,7 @@ impl EoaExecutorWorker { .confirm_flow() .await .map_err(|e| { - tracing::error!("Error in confirm flow: {}", e); + tracing::error!(error = ?e, "Error in confirm flow"); e }) .map_err(|e| e.handle())?; @@ -286,7 +286,7 @@ impl EoaExecutorWorker { .send_flow() .await .map_err(|e| { - tracing::error!("Error in send_flow: {}", e); + tracing::error!(error = ?e, "Error in send_flow"); e }) .map_err(|e| e.handle())?; @@ -297,7 +297,7 @@ impl EoaExecutorWorker { .peek_pending_transactions(1000) .await .map_err(|e| { - tracing::error!("Error in peek_pending_transactions: {}", e); + tracing::error!(error = ?e, "Error in peek_pending_transactions"); e }) .map_err(|e| Into::::into(e).handle())? @@ -308,7 +308,7 @@ impl EoaExecutorWorker { .peek_borrowed_transactions() .await .map_err(|e| { - tracing::error!("Error in peek_borrowed_transactions: {}", e); + tracing::error!(error = ?e, "Error in peek_borrowed_transactions"); e }) .map_err(|e| Into::::into(e).handle())? @@ -319,7 +319,7 @@ impl EoaExecutorWorker { .peek_recycled_nonces() .await .map_err(|e| { - tracing::error!("Error in peek_recycled_nonces: {}", e); + tracing::error!(error = ?e, "Error in peek_recycled_nonces"); e }) .map_err(|e| Into::::into(e).handle())? 
@@ -330,7 +330,7 @@ impl EoaExecutorWorker { .get_submitted_transactions_count() .await .map_err(|e| { - tracing::error!("Error in get_submitted_transactions_count: {}", e); + tracing::error!(error = ?e, "Error in get_submitted_transactions_count"); e }) .map_err(|e| Into::::into(e).handle())?; @@ -376,7 +376,7 @@ impl EoaExecutorWorker { let transaction_id = borrowed.transaction_id.clone(); tracing::info!( - transaction_id = %transaction_id, + transaction_id = ?transaction_id, nonce = nonce, "Recovering borrowed transaction" ); @@ -475,7 +475,7 @@ impl EoaExecutorWorker { } } - #[tracing::instrument(skip_all, fields(eoa = %self.eoa, chain_id = %self.chain.chain_id()))] + #[tracing::instrument(skip_all, fields(eoa = ?self.eoa, chain_id = self.chain.chain_id()))] async fn update_balance_threshold(&self) -> Result<(), EoaExecutorWorkerError> { let mut health = self.get_eoa_health().await?; diff --git a/executors/src/eoa/worker/send.rs b/executors/src/eoa/worker/send.rs index 19b7b88..ea2c4e1 100644 --- a/executors/src/eoa/worker/send.rs +++ b/executors/src/eoa/worker/send.rs @@ -118,11 +118,12 @@ impl EoaExecutorWorker { let prepared_results = futures::future::join_all(build_tasks).await; let prepared_results_with_pending = pending_txs .iter() + .take(prepared_results.len()) .zip(prepared_results.into_iter()) .collect::>(); let cleaned_results = self - .clean_prepration_results(prepared_results_with_pending) + .clean_prepration_results(prepared_results_with_pending, false) .await?; if cleaned_results.is_empty() { @@ -217,23 +218,40 @@ impl EoaExecutorWorker { Ok(total_sent as u32) } + /// Clean preparation results to only contain successful transactions. + /// + /// If `should_break_on_failure` is true, the function will break on the first failure. + /// + /// Otherwise, it will continue to process all transactions. + /// + /// `should_break_on_failure` is used to handle incremented nonce processing + /// where we want to break on the first failure to maintain nonce continuity. + /// + /// Regardless of break condition, all errors are still processed for non-retryable errors, and cleaned up async fn clean_prepration_results( &self, results: Vec<( &PendingTransaction, Result, )>, + should_break_on_failure: bool, ) -> Result, EoaExecutorWorkerError> { let mut cleaned_results = Vec::new(); let mut balance_threshold_update_needed = false; + let mut failure_occurred = false; for (pending, result) in results.into_iter() { - match result { - Ok(borrowed_data) => { + match (failure_occurred, result) { + (false, Ok(borrowed_data)) => { cleaned_results.push(borrowed_data); } - Err(e) => { + (_, Err(e)) => { // Track balance threshold issues + + if should_break_on_failure { + failure_occurred = true; + } + if let EoaExecutorWorkerError::TransactionSimulationFailed { inner_error, .. 
} = &e @@ -249,17 +267,23 @@ impl EoaExecutorWorker { // For deterministic build failures, fail the transaction immediately if !is_retryable_preparation_error(&e) { + tracing::error!( + error = ?e, + transaction_id = pending.transaction_id, + "Transaction permanently failed due to non-retryable preparation error", + ); self.store .fail_pending_transaction(pending, e, self.webhook_queue.clone()) .await?; } } + (true, Ok(_)) => continue, } } if balance_threshold_update_needed { if let Err(e) = self.update_balance_threshold().await { - tracing::error!("Failed to update balance threshold: {}", e); + tracing::error!(error = ?e, "Failed to update balance threshold"); } } @@ -322,7 +346,7 @@ impl EoaExecutorWorker { // Clean preparation results (handles failures and removes bad transactions) let cleaned_results = self - .clean_prepration_results(prepared_results_with_pending) + .clean_prepration_results(prepared_results_with_pending, true) .await?; if cleaned_results.is_empty() { @@ -392,12 +416,10 @@ impl EoaExecutorWorker { ); total_sent += processing_report.moved_to_submitted; - remaining_budget = remaining_budget.saturating_sub(moved_count as u64); - // If we didn't use all our budget in this iteration, we're likely done - if moved_count < batch_size { - break; - } + // Update remaining budget by actual nonce consumption + remaining_budget = + remaining_budget.saturating_sub(processing_report.moved_to_submitted as u64); } Ok(total_sent as u32) diff --git a/executors/src/eoa/worker/transaction.rs b/executors/src/eoa/worker/transaction.rs index cac5313..0c7efcf 100644 --- a/executors/src/eoa/worker/transaction.rs +++ b/executors/src/eoa/worker/transaction.rs @@ -50,7 +50,7 @@ impl EoaExecutorWorker { tokio::time::sleep(Duration::from_millis(delay)).await; tracing::debug!( - transaction_id = %pending_transaction.transaction_id, + transaction_id = ?pending_transaction.transaction_id, attempt = attempt, "Retrying transaction preparation" ); @@ -64,9 +64,9 @@ impl EoaExecutorWorker { Err(error) => { if is_retryable_preparation_error(&error) && attempt < MAX_PREPARATION_RETRIES { tracing::warn!( - transaction_id = %pending_transaction.transaction_id, + transaction_id = ?pending_transaction.transaction_id, attempt = attempt, - error = %error, + error = ?error, "Retryable error during transaction preparation, will retry" ); last_error = Some(error); diff --git a/executors/src/external_bundler/confirm.rs b/executors/src/external_bundler/confirm.rs index 273eb79..b44297c 100644 --- a/executors/src/external_bundler/confirm.rs +++ b/executors/src/external_bundler/confirm.rs @@ -204,10 +204,10 @@ where // 3. We got a receipt - that's confirmation success! 
// Whether it reverted or not is just information in the receipt tracing::info!( - transaction_id = %job_data.transaction_id, + transaction_id = job_data.transaction_id, user_op_hash = ?job_data.user_op_hash, transaction_hash = ?receipt.receipt.transaction_hash, - success = %receipt.success, + success = ?receipt.success, "User operation confirmed on-chain" ); @@ -240,7 +240,7 @@ where ); tracing::info!( - transaction_id = %job.job.data.transaction_id, + transaction_id = job.job.data.transaction_id, account_address = ?job.job.data.account_address, "Added atomic lock release and cache update to transaction pipeline" ); @@ -249,8 +249,8 @@ where // Queue success webhook if let Err(e) = self.queue_success_webhook(job, success_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue success webhook" ); } @@ -266,15 +266,15 @@ where // Just queue webhook with current status if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue nack webhook" ); } tracing::debug!( - transaction_id = %job.job.data.transaction_id, - attempt = %job.job.attempts, + transaction_id = job.job.data.transaction_id, + attempt = job.job.attempts, "Confirmation job NACKed, retaining lock for retry" ); } @@ -305,9 +305,9 @@ where }; tracing::error!( - transaction_id = %job.job.data.transaction_id, + transaction_id = job.job.data.transaction_id, account_address = ?job.job.data.account_address, - reason = %failure_reason, + reason = failure_reason, "Added lock release to transaction pipeline due to permanent failure" ); } @@ -315,8 +315,8 @@ where // Queue failure webhook if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue fail webhook" ); } diff --git a/executors/src/external_bundler/send.rs b/executors/src/external_bundler/send.rs index 427d53b..5f61c95 100644 --- a/executors/src/external_bundler/send.rs +++ b/executors/src/external_bundler/send.rs @@ -547,16 +547,16 @@ where if let Err(e) = tx.queue_job(confirmation_job) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue confirmation job" ); } if let Err(e) = self.queue_success_webhook(job, success_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue success webhook" ); } @@ -578,8 +578,8 @@ where if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue nack webhook" ); } @@ -605,8 +605,8 @@ where if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) { tracing::error!( - transaction_id = %job.job.data.transaction_id, - error = %e, + transaction_id = job.job.data.transaction_id, + error = ?e, "Failed to queue fail webhook" ); } diff --git a/executors/src/webhook/envelope.rs b/executors/src/webhook/envelope.rs index 0422f16..b043173 100644 --- a/executors/src/webhook/envelope.rs +++ b/executors/src/webhook/envelope.rs @@ -270,11 +270,11 @@ pub 
trait WebhookCapable: DurableExecution + ExecutorStage { tx.queue_job(webhook_job)?; tracing::info!( - transaction_id = %job.job.transaction_id(), - executor = %Self::executor_name(), - stage = %Self::stage_name(), + transaction_id = job.job.transaction_id(), + executor = Self::executor_name(), + stage = Self::stage_name(), event = ?envelope.event_type, - notification_id = %envelope.notification_id, + notification_id = envelope.notification_id, "Queued webhook notification" ); diff --git a/executors/src/webhook/mod.rs b/executors/src/webhook/mod.rs index e2cfc1b..c7a9e26 100644 --- a/executors/src/webhook/mod.rs +++ b/executors/src/webhook/mod.rs @@ -5,8 +5,8 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use engine_core::execution_options::WebhookOptions; use hex; use hmac::{Hmac, Mac}; -use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use reqwest::StatusCode; +use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use serde::{Deserialize, Serialize}; use twmq::error::TwmqError; use twmq::hooks::TransactionContext; @@ -237,10 +237,10 @@ impl DurableExecution for WebhookJobHandler { .body(payload.body.clone()); tracing::debug!( - job_id = %job.job.id, - url = %payload.url, - method = %http_method_str, - attempt = %job.job.attempts, + job_id = job.job.id, + url = payload.url, + method = http_method_str, + attempt = job.job.attempts, "Sending webhook request" ); @@ -259,13 +259,18 @@ impl DurableExecution for WebhookJobHandler { )); return Err(err).map_err_fail(); } - tracing::warn!(job_id = %job.job.id, "Failed to read response body for error status {}: {}", status, e); + tracing::warn!( + job_id = job.job.id, + "Failed to read response body for error status {}: {}", + status, + e + ); None } }; if status.is_success() { - tracing::info!(job_id = %job.job.id, status = %status, "Webhook delivered successfully"); + tracing::info!(job_id = job.job.id, status = ?status, "Webhook delivered successfully"); Ok(WebhookJobOutput { status_code: status.as_u16(), response_body: response_body_text, @@ -298,11 +303,11 @@ impl DurableExecution for WebhookJobHandler { let delay = Duration::from_millis(delay_ms); tracing::warn!( - job_id = %job.job.id, - status = %status, - attempt = %job.job.attempts, - max_attempts = %self.retry_config.max_attempts, - delay_ms = %delay.as_millis(), + job_id = job.job.id, + status = ?status, + attempt = job.job.attempts, + max_attempts = self.retry_config.max_attempts, + delay_ms = delay.as_millis(), "Webhook failed with retryable status, NACKing." ); Err(JobError::Nack { @@ -312,17 +317,17 @@ impl DurableExecution for WebhookJobHandler { }) } else { tracing::error!( - job_id = %job.job.id, - status = %status, - attempt = %job.job.attempts, + job_id = job.job.id, + status = ?status, + attempt = job.job.attempts, "Webhook failed after max attempts, FAILING." ); Err(JobError::Fail(webhook_error)) } } else { tracing::error!( - job_id = %job.job.id, - status = %status, + job_id = job.job.id, + status = ?status, "Webhook failed with non-retryable client error, FAILING." 
); Err(JobError::Fail(webhook_error)) @@ -342,7 +347,7 @@ impl DurableExecution for WebhookJobHandler { && !e.is_connect() && !e.is_timeout() { - tracing::error!(job_id = %job.job.id, error = %webhook_error, "Webhook construction error, FAILING."); + tracing::error!(job_id = job.job.id, error = ?webhook_error, "Webhook construction error, FAILING."); return Err(JobError::Fail(webhook_error)); } @@ -356,11 +361,11 @@ impl DurableExecution for WebhookJobHandler { let delay = Duration::from_millis(delay_ms); tracing::warn!( - job_id = %job.job.id, - error = %webhook_error, - attempt = %job.job.attempts, - max_attempts = %self.retry_config.max_attempts, - delay_ms = %delay.as_millis(), + job_id = job.job.id, + error = ?webhook_error, + attempt = job.job.attempts, + max_attempts = self.retry_config.max_attempts, + delay_ms = delay.as_millis(), "Webhook request failed, NACKing." ); @@ -371,9 +376,9 @@ impl DurableExecution for WebhookJobHandler { }) } else { tracing::error!( - job_id = %job.job.id, - error = %webhook_error, - attempt = %job.job.attempts, + job_id = job.job.id, + error = ?webhook_error, + attempt = job.job.attempts, "Webhook request failed after max attempts, FAILING." ); Err(JobError::Fail(webhook_error)) @@ -390,9 +395,9 @@ impl DurableExecution for WebhookJobHandler { _tx: &mut TransactionContext<'_>, ) { tracing::info!( - job_id = %job.job.id, - url = %job.job.data.url, - status = %d.result.status_code, + job_id = job.job.id, + url = job.job.data.url, + status = d.result.status_code, "Webhook successfully processed (on_success hook)." ); } @@ -405,11 +410,11 @@ impl DurableExecution for WebhookJobHandler { _tx: &mut TransactionContext<'_>, ) { tracing::warn!( - job_id = %job.job.id, - url = %job.job.data.url, - attempt = %job.job.attempts, + job_id = job.job.id, + url = job.job.data.url, + attempt = job.job.attempts, error = ?d.error, - delay_ms = %d.delay.map_or(0, |dur| dur.as_millis()), + delay_ms = d.delay.map_or(0, |dur| dur.as_millis()), "Webhook NACKed (on_nack hook)." ); } @@ -422,9 +427,9 @@ impl DurableExecution for WebhookJobHandler { _tx: &mut TransactionContext<'_>, ) { tracing::error!( - job_id = %job.job.id, - url = %job.job.data.url, - attempt = %job.job.attempts, + job_id = job.job.id, + url = job.job.data.url, + attempt = job.job.attempts, error = ?d.error, "Webhook FAILED permanently (on_fail hook)." 
); @@ -442,9 +447,12 @@ pub fn queue_webhook_envelopes( webhook_options .iter() .map(|webhook_option| { - let webhook_notification_envelope = envelope - .clone() - .into_webhook_notification_envelope(now, webhook_option.url.clone(), webhook_option.user_metadata.clone()); + let webhook_notification_envelope = + envelope.clone().into_webhook_notification_envelope( + now, + webhook_option.url.clone(), + webhook_option.user_metadata.clone(), + ); let serialised_envelope = serde_json::to_string(&webhook_notification_envelope)?; Ok(( serialised_envelope, @@ -493,11 +501,11 @@ pub fn queue_webhook_envelopes( tx.queue_job(webhook_job)?; tracing::info!( - transaction_id = %webhook_notification_envelope.transaction_id, - executor = %webhook_notification_envelope.executor_name, - stage = %webhook_notification_envelope.stage_name, + transaction_id = webhook_notification_envelope.transaction_id, + executor = webhook_notification_envelope.executor_name, + stage = webhook_notification_envelope.stage_name, event = ?webhook_notification_envelope.event_type, - notification_id = %webhook_notification_envelope.notification_id, + notification_id = webhook_notification_envelope.notification_id, "Queued webhook notification" ); } diff --git a/server/src/execution_router/mod.rs b/server/src/execution_router/mod.rs index 7325d4a..3d028bf 100644 --- a/server/src/execution_router/mod.rs +++ b/server/src/execution_router/mod.rs @@ -337,7 +337,7 @@ impl ExecutionRouter { .await?; tracing::debug!( - transaction_id = %base_execution_options.idempotency_key, + transaction_id = base_execution_options.idempotency_key, queue = "external_bundler_send", "Job queued successfully" ); @@ -358,8 +358,7 @@ impl ExecutionRouter { transaction_id: base_execution_options.idempotency_key.clone(), chain_id: base_execution_options.chain_id, transactions: transactions.to_vec(), - eoa_address: None, - execution_options: Some(eip7702_execution_options.clone()), + execution_options: eip7702_execution_options.clone(), signing_credential, webhook_options, rpc_credentials, @@ -383,7 +382,7 @@ impl ExecutionRouter { .await?; tracing::debug!( - transaction_id = %base_execution_options.idempotency_key, + transaction_id = base_execution_options.idempotency_key, queue = "eip7702_send", "Job queued successfully" ); @@ -504,9 +503,9 @@ impl ExecutionRouter { .await?; tracing::debug!( - transaction_id = %base_execution_options.idempotency_key, - eoa = %eoa_execution_options.from, - chain_id = %base_execution_options.chain_id, + transaction_id = base_execution_options.idempotency_key, + eoa = ?eoa_execution_options.from, + chain_id = base_execution_options.chain_id, queue = "eoa_executor", "EOA transaction added to store and worker job ensured" ); diff --git a/server/src/http/routes/admin/queue.rs b/server/src/http/routes/admin/queue.rs index 0026dfc..632070f 100644 --- a/server/src/http/routes/admin/queue.rs +++ b/server/src/http/routes/admin/queue.rs @@ -42,7 +42,7 @@ pub async fn empty_queue_idempotency_set( Path(queue_name): Path, ) -> Result { tracing::info!( - queue_name = %queue_name, + queue_name = queue_name, "Processing empty idempotency set request" ); @@ -100,7 +100,7 @@ pub async fn empty_queue_idempotency_set( match result { Ok(()) => { tracing::info!( - queue_name = %queue_name, + queue_name = queue_name, "Successfully emptied idempotency set" ); @@ -117,8 +117,8 @@ pub async fn empty_queue_idempotency_set( } Err(e) => { tracing::error!( - queue_name = %queue_name, - error = %e, + queue_name = queue_name, + error = ?e, "Failed to empty 
idempotency set" ); diff --git a/server/src/http/routes/contract_write.rs b/server/src/http/routes/contract_write.rs index acd4c41..7a762cb 100644 --- a/server/src/http/routes/contract_write.rs +++ b/server/src/http/routes/contract_write.rs @@ -113,9 +113,9 @@ pub async fn write_contract( let executor_type = request.execution_options.executor_type(); tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, executor_type = ?executor_type, - chain_id = %chain_id, + chain_id = chain_id, "Processing contract write request" ); @@ -186,7 +186,7 @@ pub async fn write_contract( .map_err(ApiEngineError)?; tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, executor_type = ?executor_type, "Contract write transaction queued successfully" ); diff --git a/server/src/http/routes/transaction.rs b/server/src/http/routes/transaction.rs index a14e75b..a791ef0 100644 --- a/server/src/http/routes/transaction.rs +++ b/server/src/http/routes/transaction.rs @@ -53,7 +53,7 @@ pub async fn cancel_transaction( Path(transaction_id): Path, ) -> Result { tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, "Processing transaction cancellation request" ); @@ -85,7 +85,7 @@ pub async fn cancel_transaction( .map_err(|e| ApiEngineError(e.into()))?; tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, "Transaction cancelled immediately" ); @@ -93,7 +93,7 @@ pub async fn cancel_transaction( } TwmqCancelResult::CancellationPending => { tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, "Transaction cancellation pending" ); @@ -101,7 +101,7 @@ pub async fn cancel_transaction( } TwmqCancelResult::NotFound => { tracing::warn!( - transaction_id = %transaction_id, + transaction_id = transaction_id, "Transaction not found in send queue" ); @@ -111,7 +111,7 @@ pub async fn cancel_transaction( } Some("userop_confirm") => { tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, "Cannot cancel transaction - already sent and waiting for mine" ); @@ -121,8 +121,8 @@ pub async fn cancel_transaction( } Some(other_queue) => { tracing::warn!( - transaction_id = %transaction_id, - queue = %other_queue, + transaction_id = transaction_id, + queue = other_queue, "Transaction in unsupported queue for cancellation" ); @@ -132,7 +132,7 @@ pub async fn cancel_transaction( } None => { tracing::warn!( - transaction_id = %transaction_id, + transaction_id = transaction_id, "Transaction not found in registry" ); diff --git a/server/src/http/routes/transaction_write.rs b/server/src/http/routes/transaction_write.rs index 0977eb6..6a40034 100644 --- a/server/src/http/routes/transaction_write.rs +++ b/server/src/http/routes/transaction_write.rs @@ -47,9 +47,9 @@ pub async fn write_transaction( let executor_type = request.execution_options.executor_type(); tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, executor_type = ?executor_type, - chain_id = %request.execution_options.chain_id(), + chain_id = request.execution_options.chain_id(), "Processing transaction request" ); @@ -60,7 +60,7 @@ pub async fn write_transaction( .map_err(ApiEngineError)?; tracing::info!( - transaction_id = %transaction_id, + transaction_id = transaction_id, executor_type = ?executor_type, "Transaction queued successfully" ); diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs index 96582bb..fcb0d47 100644 --- a/twmq/src/lib.rs +++ b/twmq/src/lib.rs @@ 
-446,7 +446,7 @@ impl Queue { // Process the cancellation through hook system if let Err(e) = self.process_cancelled_job(job_id).await { tracing::error!( - job_id = %job_id, + job_id = job_id, error = ?e, "Failed to process immediately cancelled job" ); @@ -757,10 +757,10 @@ impl Queue { // Log individual lease timeouts and cancellations for job_id in &timed_out_jobs { - tracing::warn!(job_id = %job_id, "Job lease expired, moved back to pending"); + tracing::warn!(job_id = job_id, "Job lease expired, moved back to pending"); } for job_id in &cancelled_jobs { - tracing::info!(job_id = %job_id, "Job cancelled by user request"); + tracing::info!(job_id = job_id, "Job cancelled by user request"); } let mut jobs = Vec::new(); @@ -841,7 +841,7 @@ impl Queue { tokio::spawn(async move { if let Err(e) = queue_clone.process_cancelled_job(&job_id).await { tracing::error!( - job_id = %job_id, + job_id = job_id, error = ?e, "Failed to process cancelled job" ); @@ -884,7 +884,7 @@ impl Queue { pipeline.query_async::<()>(&mut self.redis.clone()).await?; tracing::info!( - job_id = %job_id, + job_id = job_id, "Successfully processed job cancellation hooks" ); @@ -892,7 +892,7 @@ impl Queue { } None => { tracing::warn!( - job_id = %job_id, + job_id = job_id, "Cancelled job not found when trying to process hooks" ); Ok(()) @@ -1190,7 +1190,7 @@ impl Queue { let lease_exists: bool = conn.exists(&lease_key).await?; if !lease_exists { redis::cmd("UNWATCH").query_async::<()>(&mut conn).await?; - tracing::warn!(job_id = %job.job.id, "Lease no longer exists, job was cancelled or timed out"); + tracing::warn!(job_id = job.job.id, "Lease no longer exists, job was cancelled or timed out"); return Ok(()); } @@ -1211,12 +1211,12 @@ impl Queue { Err(JobError::Fail(_)) => self.post_fail_completion().await?, } - tracing::debug!(job_id = %job.job.id, "Job completion successful"); + tracing::debug!(job_id = job.job.id, "Job completion successful"); return Ok(()); } Err(_) => { // WATCH failed (lease key changed), retry - tracing::debug!(job_id = %job.job.id, "WATCH failed during completion, retrying"); + tracing::debug!(job_id = job.job.id, "WATCH failed during completion, retrying"); continue; } } @@ -1290,7 +1290,7 @@ impl Queue { let lease_exists: bool = conn.exists(&lease_key).await?; if !lease_exists { redis::cmd("UNWATCH").query_async::<()>(&mut conn).await?; - tracing::warn!(job_id = %job.id, "Lease no longer exists, job was cancelled or timed out"); + tracing::warn!(job_id = job.id, "Lease no longer exists, job was cancelled or timed out"); return Ok(()); } @@ -1306,12 +1306,12 @@ impl Queue { Ok(_) => { // Success! 
Run post-completion self.post_fail_completion().await?; - tracing::debug!(job_id = %job.id, "Queue error job completion successful"); + tracing::debug!(job_id = job.id, "Queue error job completion successful"); return Ok(()); } Err(_) => { // WATCH failed (lease key changed), retry - tracing::debug!(job_id = %job.id, "WATCH failed during queue error completion, retrying"); + tracing::debug!(job_id = job.id, "WATCH failed during queue error completion, retrying"); continue; } } diff --git a/twmq/src/multilane.rs b/twmq/src/multilane.rs index bb9b359..cd25ae3 100644 --- a/twmq/src/multilane.rs +++ b/twmq/src/multilane.rs @@ -424,7 +424,7 @@ impl MultilaneQueue { "cancelled_immediately" => { if let Err(e) = self.process_cancelled_job(job_id).await { tracing::error!( - job_id = %job_id, + job_id = job_id, error = ?e, "Failed to process immediately cancelled job" ); @@ -771,10 +771,18 @@ impl MultilaneQueue { // Log lease timeouts and cancellations with lane context for (lane_id, job_id) in &timed_out_jobs { - tracing::warn!(job_id = %job_id, lane_id = %lane_id, "Job lease expired, moved back to pending"); + tracing::warn!( + job_id = job_id, + lane_id = lane_id, + "Job lease expired, moved back to pending" + ); } for (lane_id, job_id) in &cancelled_jobs { - tracing::info!(job_id = %job_id, lane_id = %lane_id, "Job cancelled by user request"); + tracing::info!( + job_id = job_id, + lane_id = lane_id, + "Job cancelled by user request" + ); } let mut jobs = Vec::new(); @@ -856,8 +864,8 @@ impl MultilaneQueue { tokio::spawn(async move { if let Err(e) = queue_clone.process_cancelled_job(&job_id).await { tracing::error!( - job_id = %job_id, - lane_id = %lane_id, + job_id = job_id, + lane_id = lane_id, error = ?e, "Failed to process cancelled job" ); @@ -892,7 +900,7 @@ impl MultilaneQueue { pipeline.query_async::<()>(&mut self.redis.clone()).await?; tracing::info!( - job_id = %job_id, + job_id = job_id, "Successfully processed job cancellation hooks" ); @@ -900,7 +908,7 @@ impl MultilaneQueue { } None => { tracing::warn!( - job_id = %job_id, + job_id = job_id, "Cancelled job not found when trying to process hooks" ); Ok(()) @@ -1216,7 +1224,10 @@ impl MultilaneQueue { let lease_exists: bool = conn.exists(&lease_key).await?; if !lease_exists { redis::cmd("UNWATCH").query_async::<()>(&mut conn).await?; - tracing::warn!(job_id = %job.job.id, "Lease no longer exists, job was cancelled or timed out"); + tracing::warn!( + job_id = job.job.id, + "Lease no longer exists, job was cancelled or timed out" + ); return Ok(()); } @@ -1234,11 +1245,18 @@ impl MultilaneQueue { Err(JobError::Fail(_)) => self.post_fail_completion().await?, } - tracing::debug!(job_id = %job.job.id, lane_id = %lane_id, "Job completion successful"); + tracing::debug!( + job_id = job.job.id, + lane_id = lane_id, + "Job completion successful" + ); return Ok(()); } Err(_) => { - tracing::debug!(job_id = %job.job.id, "WATCH failed during completion, retrying"); + tracing::debug!( + job_id = job.job.id, + "WATCH failed during completion, retrying" + ); continue; } } @@ -1313,7 +1331,10 @@ impl MultilaneQueue { let lease_exists: bool = conn.exists(&lease_key).await?; if !lease_exists { redis::cmd("UNWATCH").query_async::<()>(&mut conn).await?; - tracing::warn!(job_id = %job.id, "Lease no longer exists, job was cancelled or timed out"); + tracing::warn!( + job_id = job.id, + "Lease no longer exists, job was cancelled or timed out" + ); return Ok(()); } @@ -1326,11 +1347,18 @@ impl MultilaneQueue { { Ok(_) => { 
                     self.post_fail_completion().await?;
-                    tracing::debug!(job_id = %job.id, lane_id = %lane_id, "Queue error job completion successful");
+                    tracing::debug!(
+                        job_id = job.id,
+                        lane_id = lane_id,
+                        "Queue error job completion successful"
+                    );
                     return Ok(());
                 }
                 Err(_) => {
-                    tracing::debug!(job_id = %job.id, "WATCH failed during queue error completion, retrying");
+                    tracing::debug!(
+                        job_id = job.id,
+                        "WATCH failed during queue error completion, retrying"
+                    );
                     continue;
                 }
             }