Commit a1657ec

[consensus/marshal] Sharded broadcast Application wrapper
Introduces a wrapper around the new `Application` trait that minimizes the boilerplate required to implement applications that use sharded broadcast. The wrapper is possible because, under erasure coding, the verify and broadcast steps are identical for all applications.
1 parent 9ba5c0e commit a1657ec
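For illustration, a minimal sketch of how an application might be wired through the new wrapper. Only the `CodingAdapter::new` signature comes from this commit; the surrounding bindings (`context`, `my_app`, `marshal_mailbox`, `identity`, `supervisor`) are assumptions:

// Hypothetical wiring; every binding except the constructor is assumed.
let adapter = CodingAdapter::new(
    context,         // runtime handle: Rng + Spawner + Metrics + Clock
    my_app,          // the chain-specific Application implementation
    marshal_mailbox, // marshal::Mailbox for block retrieval and broadcast
    identity,        // this node's public key
    supervisor,      // maps views to participant sets
);
// `adapter` implements Automaton, Relay, and Reporter, so it can be handed
// to consensus in place of a hand-rolled erasure-coding integration.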

File tree

3 files changed: +230 -5 lines changed
Lines changed: 224 additions & 0 deletions
@@ -0,0 +1,224 @@
//! A wrapper around an [Application] that intercepts messages from consensus and marshal,
//! hiding details of erasure coded broadcast and shard verification.

use crate::{
    marshal::{self, ingress::coding::types::CodedBlock},
    threshold_simplex::types::Context,
    types::{Round, View},
    Application, Automaton, Block, Epochable, Relay, Reporter, Supervisor, Viewable,
};
use commonware_coding::{Config as CodingConfig, Scheme};
use commonware_cryptography::{bls12381::primitives::variant::Variant, Committable, PublicKey};
use commonware_runtime::{Clock, Metrics, Spawner};
use futures::channel::oneshot;
use rand::Rng;
use std::sync::{Arc, Mutex};
use tracing::{debug, info, warn};

/// An [Application] adapter that handles erasure coding and shard verification for consensus.
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct CodingAdapter<E, A, V, B, S, P, Z>
where
    E: Rng + Spawner + Metrics + Clock,
    A: Application<E>,
    V: Variant,
    B: Block<Commitment = S::Commitment>,
    S: Scheme,
    P: PublicKey,
    Z: Supervisor<Index = View, PublicKey = P>,
{
    context: E,
    application: A,
    marshal: marshal::Mailbox<V, B, S, P>,
    identity: P,
    supervisor: Z,
    last_built: Arc<Mutex<Option<(View, CodedBlock<B, S>)>>>,
}

impl<E, A, V, B, S, P, Z> CodingAdapter<E, A, V, B, S, P, Z>
where
    E: Rng + Spawner + Metrics + Clock,
    A: Application<E, Block = B, Context = Context<B::Commitment>>,
    V: Variant,
    B: Block<Commitment = S::Commitment>,
    S: Scheme,
    P: PublicKey,
    Z: Supervisor<Index = View, PublicKey = P>,
{
    pub fn new(
        context: E,
        application: A,
        marshal: marshal::Mailbox<V, B, S, P>,
        identity: P,
        supervisor: Z,
    ) -> Self {
        Self {
            context,
            application,
            marshal,
            identity,
            supervisor,
            last_built: Arc::new(Mutex::new(None)),
        }
    }
}

impl<E, A, V, B, S, P, Z> Automaton for CodingAdapter<E, A, V, B, S, P, Z>
where
    E: Rng + Spawner + Metrics + Clock,
    A: Application<E, Block = B, Context = Context<B::Commitment>>,
    V: Variant,
    B: Block<Commitment = S::Commitment>,
    S: Scheme,
    P: PublicKey,
    Z: Supervisor<Index = View, PublicKey = P>,
{
    type Digest = B::Commitment;
    type Context = A::Context;

    async fn genesis(&mut self, epoch: <Self::Context as Epochable>::Epoch) -> Self::Digest {
        self.application.genesis(epoch).await.commitment()
    }

    async fn propose(&mut self, context: Context<Self::Digest>) -> oneshot::Receiver<Self::Digest> {
        let (parent_view, parent_commitment) = context.parent;
        let genesis = self.application.genesis(context.epoch()).await;
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let last_built = self.last_built.clone();

        let participants = self
            .supervisor
            .participants(context.view())
            .expect("failed to get participants for round");

        // Compute the coding configuration from the number of participants.
        //
        // Currently, `CodingAdapter` mandates the use of `threshold_simplex`,
        // which requires at least `3f + 1` participants to tolerate `f` faults.
        let n_participants = participants.len() as u16;
        assert!(
            n_participants >= 4,
            "Need at least 4 participants to maintain fault tolerance with threshold_simplex"
        );
        let max_faults = ((n_participants - 1) / 3) as u16;
        let coding_config = CodingConfig {
            minimum_shards: max_faults + 1,
            extra_shards: n_participants - (max_faults + 1),
        };

        let (tx, rx) = oneshot::channel();
        self.context
            .with_label("propose")
            .spawn(move |r_ctx| async move {
                let parent_block = if parent_commitment == genesis.commitment() {
                    genesis
                } else {
                    let block_request = marshal
                        .subscribe(
                            Some(Round::new(context.epoch(), parent_view)),
                            parent_commitment,
                        )
                        .await
                        .await;

                    if let Ok(block) = block_request {
                        block
                    } else {
                        warn!("propose job aborted");
                        return;
                    }
                };

                let built_block = application
                    .build(r_ctx.with_label("build"), parent_commitment, parent_block)
                    .await;
                let coded_block = CodedBlock::new(built_block, coding_config);
                let commitment = coded_block.commitment();

                // Update the latest built block.
                let mut lock = last_built.lock().expect("failed to lock last_built mutex");
                *lock = Some((context.view(), coded_block));

                let result = tx.send(commitment);
                info!(
                    round = %context.round,
                    ?commitment,
                    success = result.is_ok(),
                    "proposed new block"
                );
            });
        rx
    }

    async fn verify(
        &mut self,
        context: Context<Self::Digest>,
        payload: Self::Digest,
    ) -> oneshot::Receiver<bool> {
        let mut marshal = self.marshal.clone();
        let self_index = self
            .supervisor
            .is_participant(context.view(), &self.identity)
            .expect("failed to get self index among participants");

        #[allow(clippy::async_yields_async)]
        self.context
            .with_label("verify")
            .spawn(move |_| async move { marshal.verify_shard(payload, self_index as usize).await })
            .await
            .expect("failed to spawn verify task")
    }
}

impl<E, A, V, B, S, P, Z> Relay for CodingAdapter<E, A, V, B, S, P, Z>
where
    E: Rng + Spawner + Metrics + Clock,
    A: Application<E, Block = B, Context = Context<B::Commitment>>,
    V: Variant,
    B: Block<Commitment = S::Commitment>,
    S: Scheme,
    P: PublicKey,
    Z: Supervisor<Index = View, PublicKey = P>,
{
    type Digest = B::Commitment;

    async fn broadcast(&mut self, _commitment: Self::Digest) {
        let Some((round, block)) = self.last_built.lock().unwrap().clone() else {
            warn!("missing block to broadcast");
            return;
        };

        let participants = self
            .supervisor
            .participants(round)
            .cloned()
            .expect("failed to get participants for round");

        debug!(
            round = %round,
            commitment = %block.commitment(),
            height = block.height(),
            "requested broadcast of built block"
        );
        self.marshal.broadcast(block, participants).await;
    }
}

impl<E, A, V, B, S, P, Z> Reporter for CodingAdapter<E, A, V, B, S, P, Z>
where
    E: Rng + Spawner + Metrics + Clock,
    A: Application<E, Block = B, Context = Context<B::Commitment>>,
    V: Variant,
    B: Block<Commitment = S::Commitment>,
    S: Scheme,
    P: PublicKey,
    Z: Supervisor<Index = View, PublicKey = P>,
{
    type Activity = B;

    async fn report(&mut self, block: Self::Activity) {
        self.application.finalize(block).await
    }
}
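To make the shard arithmetic in `propose` concrete, a standalone sketch of the same computation (plain arithmetic, not part of the commit):

// With n = 3f + 1 = 10 participants, threshold_simplex tolerates f = 3 faults.
let n_participants: u16 = 10;
let max_faults = (n_participants - 1) / 3;          // 3
let minimum_shards = max_faults + 1;                // 4: enough shards to rebuild the block
let extra_shards = n_participants - minimum_shards; // 6: redundancy
assert_eq!(minimum_shards + extra_shards, n_participants); // one shard per participant

Each participant holds exactly one shard, and any f + 1 of the n shards suffice to reconstruct the block, so recovery succeeds even if 2f participants never deliver theirs.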
Lines changed: 1 addition & 0 deletions
@@ -1,2 +1,3 @@
+pub mod application;
 pub mod mailbox;
 pub mod types;

consensus/src/marshal/mod.rs

Lines changed: 5 additions & 5 deletions
@@ -389,7 +389,7 @@ mod tests {
         setup_network_links(&mut oracle, &peers, link.clone()).await;

         let coding_config = commonware_coding::Config {
-            minimum_shards: (peers.len() / 2) as u16,
+            minimum_shards: peers.len().div_ceil(2) as u16,
             extra_shards: (peers.len() / 2) as u16,
         };

@@ -512,7 +512,7 @@ mod tests {
         setup_network_links(&mut oracle, &peers, LINK).await;

         let coding_config = commonware_coding::Config {
-            minimum_shards: (peers.len() / 2) as u16,
+            minimum_shards: peers.len().div_ceil(2) as u16,
             extra_shards: (peers.len() / 2) as u16,
         };

@@ -576,7 +576,7 @@ mod tests {
         setup_network_links(&mut oracle, &peers, LINK).await;

         let coding_config = commonware_coding::Config {
-            minimum_shards: (peers.len() / 2) as u16,
+            minimum_shards: peers.len().div_ceil(2) as u16,
             extra_shards: (peers.len() / 2) as u16,
         };

@@ -661,7 +661,7 @@ mod tests {
         setup_network_links(&mut oracle, &peers, LINK).await;

         let coding_config = commonware_coding::Config {
-            minimum_shards: (peers.len() / 2) as u16,
+            minimum_shards: peers.len().div_ceil(2) as u16,
             extra_shards: (peers.len() / 2) as u16,
         };

@@ -738,7 +738,7 @@ mod tests {
         setup_network_links(&mut oracle, &peers, LINK).await;

         let coding_config = commonware_coding::Config {
-            minimum_shards: (peers.len() / 2) as u16,
+            minimum_shards: peers.len().div_ceil(2) as u16,
             extra_shards: (peers.len() / 2) as u16,
         };
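The motivation for the `div_ceil` switch in these tests, sketched as standalone arithmetic (an inference from the diff, not stated in the commit message): with an odd peer count, flooring both halves leaves the shard total one short of the peer count.

let peers: usize = 5;
assert_eq!(peers / 2 + peers / 2, 4);         // old: only 4 shards for 5 peers
assert_eq!(peers.div_ceil(2) + peers / 2, 5); // new: totals always match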
