Skip to content

Commit 5f1092c

Browse files
authored
Merge pull request #394 from input-output-hk/whankinsiv/peer-network-dynamic-sync
feat: Add dynamic sync mode to peer_network_interface
2 parents c54e5ac + 725a021 commit 5f1092c

File tree

18 files changed

+384
-13
lines changed

18 files changed

+384
-13
lines changed

Cargo.lock

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ members = [
3535
"processes/replayer", # All-inclusive process to replay messages
3636
"processes/golden_tests", # All-inclusive golden tests process
3737
"processes/tx_submitter_cli", # CLI wrapper for TX submitter
38+
"processes/indexer", # Minimal example indexer
3839
]
3940
resolver = "2"
4041

common/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ regex = "1"
2929
serde = { workspace = true, features = ["rc"] }
3030
serde_json = { workspace = true }
3131
serde_with = { workspace = true, features = ["base64"] }
32-
tempfile = "3"
3332
tokio = { workspace = true }
3433
tracing = { workspace = true }
3534
futures = "0.3.31"
@@ -40,8 +39,11 @@ rayon = "1.11.0"
4039
cryptoxide = "0.5.1"
4140
thiserror = "2.0.17"
4241
sha2 = "0.10.8"
42+
43+
[dev-dependencies]
4344
caryatid_process = { workspace = true }
4445
config = { workspace = true }
46+
tempfile = "3"
4547

4648
[lib]
4749
crate-type = ["rlib"]

common/src/commands/chain_sync.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
use crate::{BlockHash, Slot};
2+
3+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4+
pub enum ChainSyncCommand {
5+
FindIntersect { slot: Slot, hash: BlockHash },
6+
}

common/src/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
pub mod chain_sync;
12
pub mod transactions;

common/src/messages.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// We don't use these messages in the acropolis_common crate itself
44
#![allow(dead_code)]
55

6+
use crate::commands::chain_sync::ChainSyncCommand;
67
use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse};
78
use crate::genesis_values::GenesisValues;
89
use crate::ledger_state::SPOState;
@@ -453,6 +454,7 @@ pub enum StateQueryResponse {
453454
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
454455
pub enum Command {
455456
Transactions(TransactionsCommand),
457+
ChainSync(ChainSyncCommand),
456458
}
457459

458460
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]

modules/indexer/Cargo.toml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Acropolis indexer module
2+
3+
[package]
4+
name = "acropolis_module_indexer"
5+
version = "0.1.0"
6+
edition = "2021"
7+
authors = ["William Hankins <william@sundae.fi>"]
8+
description = "Core indexer logic"
9+
license = "Apache-2.0"
10+
11+
[dependencies]
12+
acropolis_common = { path = "../../common" }
13+
14+
caryatid_sdk = { workspace = true }
15+
16+
anyhow = { workspace = true }
17+
config = { workspace = true }
18+
serde = { workspace = true, features = ["rc"] }
19+
tracing = { workspace = true }
20+
21+
[lib]
22+
path = "src/indexer.rs"
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# The topic to publish sync commands on
2+
sync-command-topic = "cardano.sync.command"
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use anyhow::Result;
2+
use config::Config;
3+
4+
#[derive(serde::Deserialize)]
5+
#[serde(rename_all = "kebab-case")]
6+
pub struct IndexerConfig {
7+
pub sync_command_topic: String,
8+
}
9+
10+
impl IndexerConfig {
11+
pub fn try_load(config: &Config) -> Result<Self> {
12+
let full_config = Config::builder()
13+
.add_source(config::File::from_str(
14+
include_str!("../config.default.toml"),
15+
config::FileFormat::Toml,
16+
))
17+
.add_source(config.clone())
18+
.build()?;
19+
Ok(full_config.try_deserialize()?)
20+
}
21+
}

modules/indexer/src/indexer.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
//! Acropolis indexer module for Caryatid
2+
mod configuration;
3+
4+
use acropolis_common::{
5+
commands::chain_sync::ChainSyncCommand,
6+
hash::Hash,
7+
messages::{Command, Message},
8+
};
9+
use anyhow::Result;
10+
use caryatid_sdk::{module, Context};
11+
use config::Config;
12+
use std::{str::FromStr, sync::Arc};
13+
use tracing::info;
14+
15+
use crate::configuration::IndexerConfig;
16+
17+
/// Indexer module
18+
#[module(
19+
message_type(Message),
20+
name = "indexer",
21+
description = "Core indexer module for indexer process"
22+
)]
23+
pub struct Indexer;
24+
25+
impl Indexer {
26+
/// Async initialisation
27+
pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {
28+
let cfg = IndexerConfig::try_load(&config)?;
29+
info!(
30+
"Creating sync command publisher on '{}'",
31+
cfg.sync_command_topic
32+
);
33+
34+
let ctx = context.clone();
35+
36+
// This is a placeholder to test dynamic sync
37+
context.run(async move {
38+
let example = ChainSyncCommand::FindIntersect {
39+
slot: 4492799,
40+
hash: Hash::from_str(
41+
"f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457",
42+
)
43+
.expect("Valid hash"),
44+
};
45+
46+
// Initial sync message (This will be read from config for first sync and from DB on subsequent runs)
47+
ctx.message_bus
48+
.publish(
49+
&cfg.sync_command_topic,
50+
Arc::new(Message::Command(Command::ChainSync(example.clone()))),
51+
)
52+
.await
53+
.unwrap();
54+
55+
// Simulate a later sync command to reset sync point to where we started
56+
57+
ctx.message_bus
58+
.publish(
59+
&cfg.sync_command_topic,
60+
Arc::new(Message::Command(Command::ChainSync(example))),
61+
)
62+
.await
63+
.unwrap();
64+
});
65+
Ok(())
66+
}
67+
}

0 commit comments

Comments
 (0)