Skip to content

Commit a67ef95

Browse files
committed
Better payload job log message
1 parent 2bebf2b commit a67ef95

File tree

6 files changed

+85
-36
lines changed

6 files changed

+85
-36
lines changed

src/pipelines/job.rs

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
use {
22
super::traits,
33
crate::{
4+
alloy,
45
pipelines::{exec::PipelineExecutor, service::ServiceContext},
56
prelude::*,
6-
reth::{
7-
api::PayloadBuilderAttributes,
8-
payload::builder::{PayloadJob as RethPayloadJobTrait, *},
9-
},
7+
reth,
108
},
9+
alloy::consensus::BlockHeader,
1110
core::{
1211
pin::Pin,
1312
task::{Context, Poll},
1413
},
1514
futures::{FutureExt, future::Shared},
16-
reth_node_builder::BuiltPayload,
17-
std::sync::Arc,
15+
reth::{
16+
api::PayloadBuilderAttributes,
17+
node::builder::{BlockBody, BuiltPayload},
18+
payload::builder::{PayloadJob as RethPayloadJobTrait, *},
19+
},
20+
std::{fmt::Write as _, sync::Arc, time::Instant},
1821
tracing::{debug, warn},
1922
};
2023

@@ -146,6 +149,7 @@ where
146149
Provider: traits::ProviderBounds<P>,
147150
{
148151
payload_id: PayloadId,
152+
started_at: Instant,
149153
state: ExecutorFutureState<P, Provider>,
150154
}
151155

@@ -173,10 +177,58 @@ where
173177
{
174178
pub fn new(executor: PipelineExecutor<P, Provider>) -> Self {
175179
Self {
180+
started_at: Instant::now(),
176181
payload_id: executor.payload_id(),
177182
state: ExecutorFutureState::Future(executor.shared()),
178183
}
179184
}
185+
186+
fn log_payload_job_result(
187+
&self,
188+
result: &Result<types::BuiltPayload<P>, Arc<PayloadBuilderError>>,
189+
) -> core::fmt::Result {
190+
match &result {
191+
Ok(built_payload) => {
192+
let built_block = built_payload.block();
193+
let fees = built_payload.fees();
194+
195+
let mut out = String::new();
196+
writeln!(&mut out, "Payload job {}:", self.payload_id)?;
197+
writeln!(&mut out, " Status: Success")?;
198+
writeln!(&mut out, " Duration: {:?}", self.started_at.elapsed())?;
199+
writeln!(&mut out, " Fees: {fees}")?;
200+
writeln!(&mut out, " Built Block:")?;
201+
writeln!(&mut out, " Number: {:?}", built_block.number())?;
202+
writeln!(&mut out, " Hash: {:?}", built_block.hash())?;
203+
writeln!(&mut out, " Parent: {:?}", built_block.parent_hash())?;
204+
writeln!(&mut out, " Timestamp: {:?}", built_block.timestamp())?;
205+
writeln!(
206+
&mut out,
207+
" Gas: {}/{} ({}%)",
208+
built_block.gas_used(),
209+
built_block.gas_limit(),
210+
built_block.gas_used() * 100 / built_block.gas_limit()
211+
)?;
212+
writeln!(
213+
&mut out,
214+
" Transactions: {}",
215+
built_block.body().transactions().len()
216+
)?;
217+
218+
debug!("{out}");
219+
}
220+
Err(error) => {
221+
let mut out = String::new();
222+
writeln!(&mut out, "Payload job {}:", self.payload_id)?;
223+
writeln!(&mut out, " Status: Failed")?;
224+
writeln!(&mut out, " Duration: {:?}", self.started_at.elapsed())?;
225+
writeln!(&mut out, " Error: {error:#?}")?;
226+
warn!("{out}");
227+
}
228+
}
229+
230+
Ok(())
231+
}
180232
}
181233

182234
impl<P, Provider> Future for ExecutorFuture<P, Provider>
@@ -202,7 +254,7 @@ where
202254
Poll::Ready(result) => {
203255
// got a result. All future polls will return the result directly
204256
// without polling the executor again.
205-
log_payload_job_result::<P>(this.payload_id, &result);
257+
let _ = this.log_payload_job_result(&result);
206258

207259
this.state = ExecutorFutureState::Ready(result.clone());
208260
Poll::Ready(
@@ -230,6 +282,7 @@ where
230282
fn clone(&self) -> Self {
231283
Self {
232284
payload_id: self.payload_id,
285+
started_at: self.started_at,
233286
state: match &self.state {
234287
ExecutorFutureState::Ready(result) => {
235288
ExecutorFutureState::Ready(result.clone())
@@ -241,22 +294,3 @@ where
241294
}
242295
}
243296
}
244-
245-
fn log_payload_job_result<P: Platform>(
246-
payload_id: PayloadId,
247-
result: &Result<types::BuiltPayload<P>, Arc<PayloadBuilderError>>,
248-
) {
249-
match &result {
250-
Ok(built_payload) => {
251-
let built_block = built_payload.block();
252-
let fees = built_payload.fees();
253-
254-
debug!(
255-
"Payload job {payload_id} completed: {built_block:#?}, fees: {fees}"
256-
);
257-
}
258-
Err(error) => {
259-
warn!("Payload job {payload_id} failed: {error:#?}");
260-
}
261-
}
262-
}

src/pool/host.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use {
1111
StateProviderFactory,
1212
},
1313
tasks::shutdown::Shutdown,
14+
transaction_pool::TransactionPool,
1415
},
1516
std::sync::OnceLock,
1617
tracing::debug,
@@ -104,6 +105,14 @@ impl<P: Platform> HostNode<P> {
104105
pub fn system_pool(&self) -> Option<&impl traits::PoolBounds<P>> {
105106
self.instances.get().map(|i| &i.system_pool)
106107
}
108+
109+
/// If attached to a host node, this will remove a transaction from the reth
110+
/// native transaction pool.
111+
pub fn remove_transaction(&self, txhash: TxHash) {
112+
if let Some(pool) = self.instances.get().map(|i| &i.system_pool) {
113+
pool.remove_transactions(vec![txhash]);
114+
}
115+
}
107116
}
108117

109118
impl<P: Platform> HostNode<P> {

src/pool/maintain.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ impl<P: Platform> OrderPool<P> {
4949
tokio::select! {
5050
Some(OrderInclusionAttempt(order, payload_id)) = inclusion.next() => {
5151
tracing::trace!(">--> order inclusion attempt: {order} in payload job {payload_id}");
52+
order_pool.report_inclusion_attempt(order, payload_id);
5253
}
5354
Some(OrderInclusionSuccess(order, payload_id)) = success.next() => {
5455
tracing::trace!(">--> order inclusion success: {order} in payload job {payload_id}");

src/pool/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,14 @@ impl<P: Platform> OrderPool<P> {
108108

109109
/// Removes an order from the pool and makes it no longer available through
110110
/// `best_orders()`.
111-
pub fn remove(&self, order_hash: &B256) -> Option<Order<P>> {
112-
self.inner.orders.remove(order_hash).map(|(_, order)| order)
111+
pub fn remove(&self, order_hash: &B256) {
112+
self.inner.orders.remove(order_hash);
113+
self.inner.host.remove_transaction(*order_hash);
113114
}
114115

115116
/// Removes all orders that contain the a specific transaction hash.
116117
pub fn remove_any_with(&self, txhash: TxHash) {
118+
self.inner.host.remove_transaction(txhash);
117119
if let Some((_, orders)) = self.inner.txmap.remove(&txhash) {
118120
for order_hash in orders {
119121
self.remove(&order_hash);

src/pool/report.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use {
88
super::*,
99
reth::{node::builder::BlockBody, primitives::SealedBlock},
10+
reth_payload_builder::PayloadId,
1011
tracing::trace,
1112
};
1213

@@ -48,12 +49,8 @@ impl<P: Platform> OrderPool<P> {
4849
/// and there was an attempt to include it in a payload. Once an order was
4950
/// proposed once by the pool, it will be removed from the orders list and
5051
/// will not be proposed again.
51-
pub fn report_inclusion_attempt(
52-
&self,
53-
_order_hash: B256,
54-
_block: &BlockContext<P>,
55-
) {
56-
// todo
52+
pub fn report_inclusion_attempt(&self, _order_hash: B256, _: PayloadId) {
53+
// self.remove(&order_hash);
5754
}
5855

5956
/// Signals to the order pool that a block has been committed to the chain.

src/pool/step.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use {
66
dashmap::DashSet,
77
metrics::{Counter, Histogram},
88
reth::{
9+
chainspec::MIN_TRANSACTION_GAS,
910
ethereum::primitives::SignedTransaction,
1011
payload::builder::PayloadId,
1112
},
@@ -279,7 +280,13 @@ impl<'a, P: Platform> Run<'a, P> {
279280
return true;
280281
}
281282

282-
if self.payload.cumulative_gas_used() >= self.limits().gas_limit {
283+
let remaining_gas = self
284+
.limits()
285+
.gas_limit
286+
.saturating_sub(self.payload.cumulative_gas_used());
287+
288+
if remaining_gas < MIN_TRANSACTION_GAS {
289+
// won't be able to fit any more transactions
283290
return true;
284291
}
285292

@@ -388,7 +395,6 @@ impl<'a, P: Platform> Run<'a, P> {
388395
if candidate.is_bundle() {
389396
self.step.metrics.bundles_skipped.increment(1);
390397
}
391-
392398
return;
393399
}
394400

0 commit comments

Comments
 (0)