Skip to content

Commit 715871e

Browse files
committed
Decoupled Pipelines from Reth Transaction Pool
1 parent fb6874d commit 715871e

File tree

23 files changed

+322
-218
lines changed

23 files changed

+322
-218
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ members = [
77
resolver = "2"
88

99
[workspace.package]
10-
version = "0.2.2"
10+
version = "0.3.0"
1111
edition = "2024"
1212
rust-version = "1.87"
1313
license = "MIT"

src/pipelines/exec/mod.rs

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
//! [`BlockBuilder::apply_pre_execution_changes`] applied to its state.
1212
1313
use {
14-
super::service::ServiceContext,
14+
super::{StepInstance, service::ServiceContext},
1515
crate::{prelude::*, reth},
1616
core::{
1717
pin::Pin,
@@ -38,10 +38,9 @@ type PipelineOutput<P: Platform> =
3838
/// the step is async and needs many polls before it completes. The executor
3939
/// future will resolve when the whole pipeline has been executed, or when an
4040
/// error occurs.
41-
pub(super) struct PipelineExecutor<P, Provider, Pool>
41+
pub(super) struct PipelineExecutor<P, Provider>
4242
where
4343
P: Platform,
44-
Pool: traits::PoolBounds<P>,
4544
Provider: traits::ProviderBounds<P>,
4645
{
4746
/// The current state of the executor state machine.
@@ -54,24 +53,21 @@ where
5453
block: BlockContext<P>,
5554

5655
// The reth payload builder service context that is running this payload job.
57-
service: Arc<ServiceContext<P, Provider, Pool>>,
56+
service: Arc<ServiceContext<P, Provider>>,
5857

5958
/// Execution scopes. This root scope represents the top-level pipeline that
6059
/// may contain nested scopes for each nested pipeline.
6160
scope: Arc<RootScope>,
6261
}
6362

64-
impl<
65-
P: Platform,
66-
Provider: traits::ProviderBounds<P>,
67-
Pool: traits::PoolBounds<P>,
68-
> PipelineExecutor<P, Provider, Pool>
63+
impl<P: Platform, Provider: traits::ProviderBounds<P>>
64+
PipelineExecutor<P, Provider>
6965
{
7066
/// Begins the execution of a pipeline for a new block/payload job.
7167
pub fn run(
7268
pipeline: Arc<Pipeline<P>>,
7369
block: BlockContext<P>,
74-
service: Arc<ServiceContext<P, Provider, Pool>>,
70+
service: Arc<ServiceContext<P, Provider>>,
7571
) -> Self {
7672
// Emit a system event for this new payload job and record initial metrics.
7773
pipeline.events.publish(PayloadJobStarted(block.clone()));
@@ -89,7 +85,6 @@ impl<
8985
cursor: Cursor::<P>::Initializing({
9086
let block = block.clone();
9187
let pipeline = Arc::clone(&pipeline);
92-
let service = Arc::clone(&service);
9388
let scope = Arc::clone(&root);
9489

9590
async move {
@@ -102,7 +97,7 @@ impl<
10297
implementation.",
10398
);
10499
let scope = scope.of(&step).expect("invalid step path");
105-
let ctx = StepContext::new(&block, &service, &navi, scope);
100+
let ctx = StepContext::new(&block, &navi, scope);
106101
navi.instance().before_job(ctx).await?;
107102
}
108103
Ok(())
@@ -123,11 +118,8 @@ impl<
123118
}
124119

125120
/// private implementation details for the `PipelineExecutor`.
126-
impl<
127-
P: Platform,
128-
Provider: traits::ProviderBounds<P>,
129-
Pool: traits::PoolBounds<P>,
130-
> PipelineExecutor<P, Provider, Pool>
121+
impl<P: Platform, Provider: traits::ProviderBounds<P>>
122+
PipelineExecutor<P, Provider>
131123
{
132124
/// This method creates a future that encapsulates the execution an an async
133125
/// step. The created future will be held inside `Cursor::StepInProgress` and
@@ -151,7 +143,7 @@ impl<
151143
implementation.",
152144
);
153145

154-
let ctx = StepContext::new(&self.block, &self.service, &step_navi, scope);
146+
let ctx = StepContext::new(&self.block, &step_navi, scope);
155147
let step = Arc::clone(step_navi.instance());
156148
async move { step.step(input, ctx).await }.boxed()
157149
}
@@ -236,7 +228,6 @@ impl<
236228
let pipeline = Arc::clone(&self.pipeline);
237229
let block = self.block.clone();
238230
let pipeline = Arc::clone(&pipeline);
239-
let service = Arc::clone(&self.service);
240231
let scope = Arc::clone(&self.scope);
241232

242233
async move {
@@ -249,7 +240,7 @@ impl<
249240
implementation.",
250241
);
251242
let scope = scope.of(&step).expect("invalid step path");
252-
let ctx = StepContext::new(&block, &service, &navi, scope);
243+
let ctx = StepContext::new(&block, &navi, scope);
253244
navi.instance().after_job(ctx, output.clone()).await?;
254245
}
255246

@@ -264,11 +255,10 @@ impl<
264255
}
265256
}
266257

267-
impl<P, Provider, Pool> Future for PipelineExecutor<P, Provider, Pool>
258+
impl<P, Provider> Future for PipelineExecutor<P, Provider>
268259
where
269260
P: Platform,
270261
Provider: traits::ProviderBounds<P>,
271-
Pool: traits::PoolBounds<P>,
272262
{
273263
type Output = PipelineOutput<P>;
274264

src/pipelines/exec/navi.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
//! etc.
66
77
use {
8-
super::super::step::instance::StepInstance,
8+
super::StepInstance,
99
crate::prelude::*,
1010
core::fmt::Formatter,
1111
derive_more::{From, Into},

src/pipelines/exec/scope.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ impl RootScope {
7070
unsafe impl Send for RootScope {}
7171
unsafe impl Sync for RootScope {}
7272

73+
/// Given a path to a step in the pipeline, returns a path to the immediate
74+
/// pipeline that contains it.
7375
#[inline]
7476
fn scope_of(step: &StepPath) -> StepPath {
7577
step.clone().remove_leaf().unwrap_or(StepPath::empty())
@@ -79,14 +81,14 @@ fn scope_of(step: &StepPath) -> StepPath {
7981
///
8082
/// Each pipeline has its scope that may include nested scopes for each nested
8183
/// pipeline. Scopes are used to manage limits and metrics for each pipeline
82-
/// execution. All steps in a pipeline run within the scope of the pipeline that
83-
/// contains it. When a scope is active, then all its parent scopes are active
84-
/// as well.
84+
/// execution. All steps in a pipeline run within the scopes of the pipelines
85+
/// that contain it. When a scope is active, then all its parent scopes are
86+
/// active as well.
8587
#[derive(Debug)]
8688
pub struct Scope {
8789
limits: Limits,
8890
metrics: Metrics,
89-
entered: Cell<Option<Instant>>,
91+
entered_at: Cell<Option<Instant>>,
9092
enter_counter: AtomicU32,
9193
nested: HashMap<usize, Scope>,
9294
}
@@ -96,18 +98,18 @@ impl Scope {
9698
/// When a scope is active it means that one of its steps (or in its nested
9799
/// scopes) is currently being executed,
98100
pub const fn is_active(&self) -> bool {
99-
self.entered.get().is_some()
101+
self.entered_at.get().is_some()
100102
}
101103

102104
/// Returns the elapsed time since the scope was entered.
103105
/// This will only return a value if the scope is currently active.
104106
pub fn elapsed(&self) -> Option<Duration> {
105-
self.entered.get().map(|start| start.elapsed())
107+
self.entered_at.get().map(|start| start.elapsed())
106108
}
107109

108110
/// Returns when the scope was entered most recently.
109111
pub fn started_at(&self) -> Option<Instant> {
110-
self.entered.get()
112+
self.entered_at.get()
111113
}
112114

113115
/// Returns the payload limits for steps running within the current scope.
@@ -143,14 +145,14 @@ impl Scope {
143145
limits,
144146
metrics,
145147
nested,
146-
entered: Cell::new(None),
148+
entered_at: Cell::new(None),
147149
enter_counter: AtomicU32::new(0),
148150
}
149151
}
150152

151153
fn enter(&self) {
152154
assert!(!self.is_active(), "Scope is already active");
153-
self.entered.set(Some(Instant::now()));
155+
self.entered_at.set(Some(Instant::now()));
154156
self.enter_counter.fetch_add(1, Ordering::Relaxed);
155157
self.metrics.iter_count_total.increment(1);
156158
}
@@ -175,7 +177,7 @@ impl Scope {
175177
.increment(duration.as_millis() as u64);
176178

177179
self.metrics.exec_duration_histogram.record(duration);
178-
self.entered.replace(None);
180+
self.entered_at.replace(None);
179181
}
180182

181183
/// Returns the scope with a given path.
@@ -221,7 +223,7 @@ impl Scope {
221223
limits,
222224
metrics,
223225
nested,
224-
entered: Cell::new(None),
226+
entered_at: Cell::new(None),
225227
enter_counter: AtomicU32::new(0),
226228
}
227229
}

src/pipelines/job.rs

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,26 +28,24 @@ use {
2828
/// This is a long-running job that will be polled by the CL node until it is
2929
/// resolved. The job future must resolve within 1 second from the moment
3030
/// [`PayloadJob::resolve_kind`] is called with [`PaylodKind::Earliest`].
31-
pub struct PayloadJob<P, Provider, Pool>
31+
pub struct PayloadJob<P, Provider>
3232
where
3333
P: Platform,
34-
Pool: traits::PoolBounds<P>,
3534
Provider: traits::ProviderBounds<P>,
3635
{
3736
block: BlockContext<P>,
38-
fut: ExecutorFuture<P, Provider, Pool>,
37+
fut: ExecutorFuture<P, Provider>,
3938
}
4039

41-
impl<P, Provider, Pool> PayloadJob<P, Provider, Pool>
40+
impl<P, Provider> PayloadJob<P, Provider>
4241
where
4342
P: Platform,
44-
Pool: traits::PoolBounds<P>,
4543
Provider: traits::ProviderBounds<P>,
4644
{
4745
pub fn new(
4846
pipeline: &Arc<Pipeline<P>>,
4947
block: BlockContext<P>,
50-
service: &Arc<ServiceContext<P, Provider, Pool>>,
48+
service: &Arc<ServiceContext<P, Provider>>,
5149
) -> Self {
5250
debug!(
5351
"New Payload Job {} with block context: {block:#?}",
@@ -64,15 +62,14 @@ where
6462
}
6563
}
6664

67-
impl<P, Provider, Pool> RethPayloadJobTrait for PayloadJob<P, Provider, Pool>
65+
impl<P, Provider> RethPayloadJobTrait for PayloadJob<P, Provider>
6866
where
6967
P: Platform,
70-
Pool: traits::PoolBounds<P>,
7168
Provider: traits::ProviderBounds<P>,
7269
{
7370
type BuiltPayload = types::BuiltPayload<P>;
7471
type PayloadAttributes = types::PayloadBuilderAttributes<P>;
75-
type ResolvePayloadFuture = ExecutorFuture<P, Provider, Pool>;
72+
type ResolvePayloadFuture = ExecutorFuture<P, Provider>;
7673

7774
fn best_payload(&self) -> Result<Self::BuiltPayload, PayloadBuilderError> {
7875
unimplemented!("PayloadJob::best_payload is not implemented");
@@ -111,10 +108,9 @@ where
111108
/// This future is polled for the first time by the Reth runtime when the
112109
/// `PayloadJob` is created. Here we want to immediately start executing
113110
/// the pipeline instead of waiting for the `resolve_kind` to be called.
114-
impl<P, Provider, Pool> Future for PayloadJob<P, Provider, Pool>
111+
impl<P, Provider> Future for PayloadJob<P, Provider>
115112
where
116113
P: Platform,
117-
Pool: traits::PoolBounds<P>,
118114
Provider: traits::ProviderBounds<P>,
119115
{
120116
type Output = Result<(), PayloadBuilderError>;
@@ -144,14 +140,13 @@ where
144140
/// This future wraps the `PipelineExecutor` and is used to poll the
145141
/// internal executor of the pipeline. Once this future is resolved, it
146142
/// can be polled again and will return copie of the resolved payload.
147-
pub struct ExecutorFuture<P, Provider, Pool>
143+
pub struct ExecutorFuture<P, Provider>
148144
where
149145
P: Platform,
150-
Pool: traits::PoolBounds<P>,
151146
Provider: traits::ProviderBounds<P>,
152147
{
153148
payload_id: PayloadId,
154-
state: ExecutorFutureState<P, Provider, Pool>,
149+
state: ExecutorFutureState<P, Provider>,
155150
}
156151

157152
/// This enum allows us to wrap the `PipelineExecutor` future
@@ -162,35 +157,32 @@ where
162157
/// Whenever any of the copies of the future is polled, it will poll the
163158
/// executor, if any copy resolved, all copies will also resolve with the same
164159
/// result.
165-
enum ExecutorFutureState<P, Provider, Pool>
160+
enum ExecutorFutureState<P, Provider>
166161
where
167162
P: Platform,
168-
Pool: traits::PoolBounds<P>,
169163
Provider: traits::ProviderBounds<P>,
170164
{
171165
Ready(Result<types::BuiltPayload<P>, Arc<PayloadBuilderError>>),
172-
Future(Shared<PipelineExecutor<P, Provider, Pool>>),
166+
Future(Shared<PipelineExecutor<P, Provider>>),
173167
}
174168

175-
impl<P, Provider, Pool> ExecutorFuture<P, Provider, Pool>
169+
impl<P, Provider> ExecutorFuture<P, Provider>
176170
where
177171
P: Platform,
178-
Pool: traits::PoolBounds<P>,
179172
Provider: traits::ProviderBounds<P>,
180173
{
181-
pub fn new(executor: PipelineExecutor<P, Provider, Pool>) -> Self {
174+
pub fn new(executor: PipelineExecutor<P, Provider>) -> Self {
182175
Self {
183176
payload_id: executor.payload_id(),
184177
state: ExecutorFutureState::Future(executor.shared()),
185178
}
186179
}
187180
}
188181

189-
impl<P, Provider, Pool> Future for ExecutorFuture<P, Provider, Pool>
182+
impl<P, Provider> Future for ExecutorFuture<P, Provider>
190183
where
191184
P: Platform,
192185
Provider: traits::ProviderBounds<P>,
193-
Pool: traits::PoolBounds<P>,
194186
{
195187
type Output = Result<types::BuiltPayload<P>, PayloadBuilderError>;
196188

@@ -230,11 +222,10 @@ where
230222
/// We want this to be clonable because the `resolve_kind` method could
231223
/// potentially return multiple copies of the future, and we want all of them to
232224
/// resolve with the same result at the same time.
233-
impl<P, Provider, Pool> Clone for ExecutorFuture<P, Provider, Pool>
225+
impl<P, Provider> Clone for ExecutorFuture<P, Provider>
234226
where
235227
P: Platform,
236228
Provider: traits::ProviderBounds<P>,
237-
Pool: traits::PoolBounds<P>,
238229
{
239230
fn clone(&self) -> Self {
240231
Self {

src/pipelines/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,9 @@ use {
1010
futures::Stream,
1111
pipelines_macros::impl_into_pipeline_steps,
1212
std::sync::Arc,
13-
step::instance::StepInstance,
13+
step::StepInstance,
1414
};
1515

16-
mod context;
1716
mod events;
1817
mod exec;
1918
mod iter;
@@ -28,9 +27,8 @@ mod tests;
2827
// public API exports
2928
pub use {
3029
Behavior::{Loop, Once},
31-
context::StepContext,
3230
events::system_events::*,
33-
step::{ControlFlow, InitContext, PayloadBuilderError, Step},
31+
step::{ControlFlow, InitContext, PayloadBuilderError, Step, StepContext},
3432
};
3533

3634
#[derive(Debug, Clone, Copy, PartialEq, Eq)]

0 commit comments

Comments
 (0)