Skip to content

Commit c839569

Browse files
authored
Enhance transaction hydration logic to handle missing data by deleting entries from Redis. Update results type to accommodate optional values and implement deletion pipeline for missing transactions. (#64)
1 parent 5879bae commit c839569

File tree

1 file changed

+30
-2
lines changed

1 file changed

+30
-2
lines changed

executors/src/eoa/store/hydrate.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,39 @@ impl EoaExecutorStore {
8080
);
8181
}
8282

83-
let results: Vec<String> = pipe.query_async(&mut self.redis.clone()).await?;
83+
let results: Vec<Option<String>> = pipe.query_async(&mut self.redis.clone()).await?;
8484

8585
let mut hydrated = Vec::with_capacity(dehydrated.len());
86+
87+
let mut deletion_pipe = twmq::redis::pipe();
88+
8689
for (d, r) in dehydrated.into_iter().zip(results.iter()) {
87-
hydrated.push(d.hydrate(serde_json::from_str::<EoaTransactionRequest>(r)?));
90+
match r {
91+
Some(r) => {
92+
hydrated.push(d.hydrate(serde_json::from_str::<EoaTransactionRequest>(r)?))
93+
}
94+
None => {
95+
// delete this transaction entry from pending, borrowed, submitted
96+
deletion_pipe.zrem(
97+
self.keys.pending_transactions_zset_name(),
98+
d.transaction_id(),
99+
);
100+
deletion_pipe.hdel(
101+
self.keys.borrowed_transactions_hashmap_name(),
102+
d.transaction_id(),
103+
);
104+
tracing::warn!(
105+
"Transaction {} data was missing, deleting transaction from redis",
106+
d.transaction_id()
107+
);
108+
}
109+
}
110+
}
111+
112+
if !deletion_pipe.is_empty() {
113+
deletion_pipe
114+
.query_async::<()>(&mut self.redis.clone())
115+
.await?;
88116
}
89117

90118
Ok(hydrated)

0 commit comments

Comments
 (0)