Skip to content

feat: implement fake DMQ node #2635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 31 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
931384e
refactor(dmq): move 'publisher' in 'client' sub-module
jpraynaud Jul 7, 2025
f84479c
refactor(dmq): move 'consumer' in 'client' sub-module
jpraynaud Jul 7, 2025
afd839a
refactor(dmq): rename 'DmqConsumerPallas' to 'DmqConsumerClientPallas'
jpraynaud Jul 7, 2025
ad49138
refactor(dmq): rename 'DmqPublisherPallas' to 'DmqPublisherClientPallas'
jpraynaud Jul 7, 2025
bbaf832
refactor(dmq): promote 'message' module to directory
jpraynaud Jul 8, 2025
e2ef2a7
feat(dmq): add 'DmqMessage' type to wrap a 'DmqMsg'
jpraynaud Jul 8, 2025
7d02662
feat(dmq): add 'DmqPublisherServer' trait
jpraynaud Jul 8, 2025
c354c89
feat(dmq): add 'DmqPublisherServerPallas' implementation of 'DmqPubli…
jpraynaud Jul 8, 2025
97ee0f4
feat(dmq): add 'DmqConsumerServer' trait
jpraynaud Jul 8, 2025
b6768a4
feat(dmq): add 'MessageQueue' implementation for the consumer server
jpraynaud Jul 8, 2025
1776fdf
feat(relay): add support for DMQ messages
jpraynaud Jul 8, 2025
11e99f5
feat(relay): update passive relay for DMQ messages
jpraynaud Jul 8, 2025
d26ada6
feat(relay): update signer relay for DMQ messages
jpraynaud Jul 8, 2025
c378dcb
feat(relay): update signer command for DMQ messages
jpraynaud Jul 8, 2025
f0aed24
feat(relay): update aggregator relay for DMQ messages
jpraynaud Jul 8, 2025
42f198a
feat(relay): update aggregator command for DMQ messages
jpraynaud Jul 8, 2025
d1cfd60
feat(relay): update integration test for DMQ messages
jpraynaud Jul 8, 2025
623bb43
feat(dmq): export DMQ servers for publisher and consumer
jpraynaud Jul 8, 2025
e54ce22
feat(relay): use binary encoding for exchanging messages in P2P pubsu…
jpraynaud Jul 21, 2025
59b0115
fix(dmq): activate Pallas server sides for Unix only
jpraynaud Jul 15, 2025
624e5ed
fix(dmq): add missing 'kes_period' in 'DmqMsg'
jpraynaud Jul 15, 2025
56ac1b3
fix(dmq): gate code behind 'future_dmq' feature
jpraynaud Jul 15, 2025
4a5f54f
fix(dmq): missing wait for Done message in publisher server state mac…
jpraynaud Jul 22, 2025
f4f5bbe
fix(relay): fix clippy warning
jpraynaud Jul 16, 2025
529a6f3
feat(infra): support for DMQ protocol in aggregator infrastructure
jpraynaud Jul 16, 2025
1b58d52
feat(infra): support for DMQ protocol in signer infrastructure
jpraynaud Jul 16, 2025
c557a10
feat(ci): support for using DMQ in infrastucture deployment
jpraynaud Jul 16, 2025
c166630
wip(e2e): support for fake DMQ node
jpraynaud Jul 8, 2025
0b5a048
wip: DO NOT MERGE - activate 'future_dmq'
jpraynaud Jul 15, 2025
142b703
wip: DO NOT MERGE - activate 'allow_skip_signer_certification'
jpraynaud Jul 16, 2025
57452a5
wip: DO MOT MERGE - skip KES signature in publisher
jpraynaud Jul 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ inputs:
description: Mithril use P2P network (experimental, for test only).
required: false
default: "false"
mithril_p2p_use_dmq_protocol:
description: Mithril P2P network use DMQ protocol (experimental, for test only).
required: false
default: "false"
mithril_p2p_network_bootstrap_peer:
description: Mithril P2P network bootstrap peer (experimental, for test only).
required: false
Expand Down Expand Up @@ -247,6 +251,7 @@ runs:
google_compute_instance_ssh_keys_environment = "${{ inputs.google_compute_instance_ssh_keys_environment }}"
google_service_credentials_json_file = "./google-application-credentials.json"
mithril_use_p2p_network = "${{ inputs.mithril_use_p2p_network }}"
mithril_p2p_use_dmq_protocol = "${{ inputs.mithril_p2p_use_dmq_protocol }}"
mithril_p2p_network_bootstrap_peer = "${{ inputs.mithril_p2p_network_bootstrap_peer }}"
mithril_p2p_signer_relay_signer_registration_mode = "${{ inputs.mithril_p2p_signer_relay_signer_registration_mode }}"
mithril_p2p_signer_relay_signature_registration_mode = "${{ inputs.mithril_p2p_signer_relay_signature_registration_mode }}"
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/test-deploy-network.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
environment_prefix: dev
cardano_network: preview
mithril_use_p2p_network: true
mithril_p2p_use_dmq_protocol: true
mithril_p2p_signer_relay_signer_registration_mode: passthrough
mithril_p2p_signer_relay_signature_registration_mode: p2p
mithril_api_domain: api.mithril.network
Expand Down Expand Up @@ -72,6 +73,7 @@ jobs:
environment_prefix: dev-follower
cardano_network: preview
mithril_use_p2p_network: true
mithril_p2p_use_dmq_protocol: true
mithril_p2p_network_bootstrap_peer: "/dns4/aggregator.dev-preview.api.mithril.network/tcp/6060"
mithril_p2p_signer_relay_signer_registration_mode: passthrough
mithril_p2p_signer_relay_signature_registration_mode: p2p
Expand Down Expand Up @@ -103,6 +105,7 @@ jobs:
environment_prefix: dev
cardano_network: mainnet
mithril_use_p2p_network: false
mithril_p2p_use_dmq_protocol: true
mithril_api_domain: api.mithril.network
mithril_era_reader_adapter_type: bootstrap
mithril_protocol_parameters: |
Expand Down Expand Up @@ -160,6 +163,7 @@ jobs:
google_compute_instance_ssh_keys_environment: testing
google_application_credentials: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }}
mithril_use_p2p_network: ${{ matrix.mithril_use_p2p_network }}
mithril_p2p_use_dmq_protocol: ${{ matrix.mithril_p2p_use_dmq_protocol }}
mithril_p2p_network_bootstrap_peer: ${{ matrix.mithril_p2p_network_bootstrap_peer }}
mithril_api_domain: ${{ matrix.mithril_api_domain }}
mithril_image_id: ${{ inputs.mithril_image_id }}
Expand Down
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions internal/mithril-dmq/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,24 @@ license.workspace = true
repository.workspace = true
include = ["**/*.rs", "Cargo.toml", "README.md", ".gitignore"]

[package.metadata.cargo-machete]
# `serde_bytes` is used for DmqMessage serialization
ignored = ["serde_bytes"]

[lib]
crate-type = ["lib", "cdylib", "staticlib"]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
bincode = { version = "2.0.1" }
blake2 = "0.10.6"
mithril-cardano-node-chain = { path = "../cardano-node/mithril-cardano-node-chain" }
mithril-common = { path = "../../mithril-common" }
pallas-network = { git = "https://github.com/txpipe/pallas.git", branch = "main" }
pallas-codec = { git = "https://github.com/txpipe/pallas.git", branch = "main" }
serde = { workspace = true }
serde_bytes = "0.11.17"
slog = { workspace = true }
tokio = { workspace = true, features = ["sync"] }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::fmt::Debug;

use mithril_common::{StdResult, crypto_helper::TryFromBytes, entities::PartyId};

/// Trait for consuming messages from a DMQ node.
/// Trait for the client side of consuming messages from a DMQ node.
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait DmqConsumer<M: TryFromBytes + Debug + Send + Sync>: Send + Sync {
pub trait DmqConsumerClient<M: TryFromBytes + Debug + Send + Sync>: Send + Sync {
/// Consume messages from the DMQ node.
async fn consume_messages(&self) -> StdResult<Vec<(M, PartyId)>>;
}
5 changes: 5 additions & 0 deletions internal/mithril-dmq/src/consumer/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod interface;
mod pallas;

pub use interface::*;
pub use pallas::*;
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@ use mithril_common::{
logging::LoggerExtensions,
};

use crate::DmqConsumer;
use crate::DmqConsumerClient;

/// A DMQ consumer implementation.
/// A DMQ client consumer implementation.
///
/// This implementation is built upon the n2c mini-protocols DMQ implementation in Pallas.
pub struct DmqConsumerPallas<M: TryFromBytes + Debug> {
pub struct DmqConsumerClientPallas<M: TryFromBytes + Debug> {
socket: PathBuf,
network: CardanoNetwork,
client: Mutex<Option<DmqClient>>,
logger: Logger,
phantom: PhantomData<M>,
}

impl<M: TryFromBytes + Debug> DmqConsumerPallas<M> {
/// Creates a new `DmqConsumerPallas` instance.
impl<M: TryFromBytes + Debug> DmqConsumerClientPallas<M> {
/// Creates a new `DmqConsumerClientPallas` instance.
pub fn new(socket: PathBuf, network: CardanoNetwork, logger: Logger) -> Self {
Self {
socket,
Expand All @@ -47,7 +47,7 @@ impl<M: TryFromBytes + Debug> DmqConsumerPallas<M> {
);
DmqClient::connect(&self.socket, self.network.magic_id())
.await
.with_context(|| "DmqConsumerPallas failed to create a new client")
.with_context(|| "DmqConsumerClientPallas failed to create a new client")
}

/// Gets the cached `DmqClient`, creating a new one if it does not exist.
Expand Down Expand Up @@ -128,7 +128,7 @@ impl<M: TryFromBytes + Debug> DmqConsumerPallas<M> {
}

#[async_trait::async_trait]
impl<M: TryFromBytes + Debug + Sync + Send> DmqConsumer<M> for DmqConsumerPallas<M> {
impl<M: TryFromBytes + Debug + Sync + Send> DmqConsumerClient<M> for DmqConsumerClientPallas<M> {
async fn consume_messages(&self) -> StdResult<Vec<(M, PartyId)>> {
let messages = self.consume_messages_internal().await;
if messages.is_err() {
Expand Down Expand Up @@ -247,7 +247,7 @@ mod tests {
let reply_messages = fake_msgs();
let server = setup_dmq_server(socket_path.clone(), reply_messages);
let client = tokio::spawn(async move {
let consumer = DmqConsumerPallas::new(
let consumer = DmqConsumerClientPallas::new(
socket_path,
CardanoNetwork::TestNet(0),
TestLogger::stdout(),
Expand Down Expand Up @@ -280,7 +280,7 @@ mod tests {
let reply_messages = vec![];
let server = setup_dmq_server(socket_path.clone(), reply_messages);
let client = tokio::spawn(async move {
let consumer = DmqConsumerPallas::<DmqMessageTestPayload>::new(
let consumer = DmqConsumerClientPallas::<DmqMessageTestPayload>::new(
socket_path,
CardanoNetwork::TestNet(0),
TestLogger::stdout(),
Expand All @@ -304,7 +304,7 @@ mod tests {
let reply_messages = fake_msgs();
let server = setup_dmq_server(socket_path.clone(), reply_messages);
let client = tokio::spawn(async move {
let consumer = DmqConsumerPallas::<DmqMessageTestPayload>::new(
let consumer = DmqConsumerClientPallas::<DmqMessageTestPayload>::new(
socket_path,
CardanoNetwork::TestNet(0),
TestLogger::stdout(),
Expand Down
8 changes: 4 additions & 4 deletions internal/mithril-dmq/src/consumer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod interface;
mod pallas;
mod client;
mod server;

pub use interface::*;
pub use pallas::*;
pub use client::*;
pub use server::*;
12 changes: 12 additions & 0 deletions internal/mithril-dmq/src/consumer/server/interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use mithril_common::StdResult;

/// Trait for the server side of consuming messages from a DMQ node.
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait DmqConsumerServer: Send + Sync {
/// Processes the next message received from the DMQ network.
async fn process_message(&self) -> StdResult<()>;

/// Runs the DMQ publisher server.
async fn run(&self) -> StdResult<()>;
}
9 changes: 9 additions & 0 deletions internal/mithril-dmq/src/consumer/server/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
mod interface;
#[cfg(unix)]
mod pallas;
mod queue;

pub use interface::*;

#[cfg(unix)]
pub use pallas::*;
Loading
Loading