Skip to content

Commit 029bc40

Browse files
authored
cleanup bad transactions from pending in peek (#65)
1 parent c839569 commit 029bc40

File tree

1 file changed

+45
-18
lines changed

1 file changed

+45
-18
lines changed

executors/src/eoa/store/mod.rs

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -577,24 +577,51 @@ impl EoaExecutorStore {
577577
pipe.hget(&tx_data_key, "user_request");
578578
}
579579

580-
let user_requests: Vec<String> = pipe.query_async(&mut conn).await?;
581-
582-
let user_requests: Vec<EoaTransactionRequest> = user_requests
583-
.into_iter()
584-
.map(|user_request_json| serde_json::from_str(&user_request_json))
585-
.collect::<Result<Vec<EoaTransactionRequest>, serde_json::Error>>()?;
586-
587-
let pending_transactions: Vec<PendingTransaction> = transaction_ids
588-
.into_iter()
589-
.zip(user_requests)
590-
.map(
591-
|((transaction_id, queued_at), user_request)| PendingTransaction {
592-
transaction_id,
593-
queued_at,
594-
user_request,
595-
},
596-
)
597-
.collect();
580+
let user_requests: Vec<Option<String>> = pipe.query_async(&mut conn).await?;
581+
582+
let mut pending_transactions: Vec<PendingTransaction> = Vec::new();
583+
let mut deletion_pipe = twmq::redis::pipe();
584+
585+
for ((transaction_id, queued_at), user_request) in transaction_ids.into_iter().zip(user_requests) {
586+
match user_request {
587+
Some(user_request) => {
588+
let user_request_parsed = serde_json::from_str(&user_request)?;
589+
pending_transactions.push(PendingTransaction {
590+
transaction_id,
591+
queued_at,
592+
user_request: user_request_parsed,
593+
});
594+
}
595+
None => {
596+
tracing::warn!(
597+
"Transaction {} data was missing, deleting transaction from redis",
598+
transaction_id
599+
);
600+
deletion_pipe.zrem(self.keys.pending_transactions_zset_name(), transaction_id);
601+
}
602+
}
603+
}
604+
605+
if !deletion_pipe.is_empty() {
606+
deletion_pipe.query_async::<()>(&mut conn).await?;
607+
}
608+
609+
// let user_requests: Vec<EoaTransactionRequest> = user_requests
610+
// .into_iter()
611+
// .map(|user_request_json| serde_json::from_str(&user_request_json))
612+
// .collect::<Result<Vec<EoaTransactionRequest>, serde_json::Error>>()?;
613+
614+
// let pending_transactions: Vec<PendingTransaction> = transaction_ids
615+
// .into_iter()
616+
// .zip(user_requests)
617+
// .map(
618+
// |((transaction_id, queued_at), user_request)| PendingTransaction {
619+
// transaction_id,
620+
// queued_at,
621+
// user_request,
622+
// },
623+
// )
624+
// .collect();
598625

599626
Ok(pending_transactions)
600627
}

0 commit comments

Comments
 (0)