29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -35,6 +35,7 @@ members = [
"processes/replayer", # All-inclusive process to replay messages
"processes/golden_tests", # All-inclusive golden tests process
"processes/tx_submitter_cli", # CLI wrapper for TX submitter
"processes/indexer", # Minimal example indexer
]
resolver = "2"

6 changes: 6 additions & 0 deletions common/src/commands/chain_sync.rs
@@ -0,0 +1,6 @@
use crate::{BlockHash, Slot};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ChainSyncCommand {
FindIntersect { slot: Slot, hash: BlockHash },
}
1 change: 1 addition & 0 deletions common/src/commands/mod.rs
@@ -1 +1,2 @@
pub mod chain_sync;
pub mod transactions;
2 changes: 2 additions & 0 deletions common/src/messages.rs
@@ -3,6 +3,7 @@
// We don't use these messages in the acropolis_common crate itself
#![allow(dead_code)]

use crate::commands::chain_sync::ChainSyncCommand;
use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse};
use crate::genesis_values::GenesisValues;
use crate::ledger_state::SPOState;
@@ -453,6 +454,7 @@ pub enum StateQueryResponse {
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum Command {
Transactions(TransactionsCommand),
ChainSync(ChainSyncCommand),
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
22 changes: 22 additions & 0 deletions modules/indexer/Cargo.toml
@@ -0,0 +1,22 @@
# Acropolis indexer module

[package]
name = "acropolis_module_indexer"
version = "0.1.0"
edition = "2021"
authors = ["William Hankins <william@sundae.fi>"]
description = "Core indexer logic"
license = "Apache-2.0"

[dependencies]
acropolis_common = { path = "../../common" }

caryatid_sdk = { workspace = true }

anyhow = { workspace = true }
config = { workspace = true }
serde = { workspace = true, features = ["rc"] }
tracing = { workspace = true }

[lib]
path = "src/indexer.rs"
2 changes: 2 additions & 0 deletions modules/indexer/config.default.toml
@@ -0,0 +1,2 @@
# The topic to publish sync commands on
sync-command-topic = "cardano.sync.command"
21 changes: 21 additions & 0 deletions modules/indexer/src/configuration.rs
@@ -0,0 +1,21 @@
use anyhow::Result;
use config::Config;

#[derive(serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct IndexerConfig {
pub sync_command_topic: String,
}

impl IndexerConfig {
pub fn try_load(config: &Config) -> Result<Self> {
let full_config = Config::builder()
.add_source(config::File::from_str(
include_str!("../config.default.toml"),
config::FileFormat::Toml,
))
.add_source(config.clone())
.build()?;
Ok(full_config.try_deserialize()?)
}
}
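
try_load layers the bundled config.default.toml under whatever Config the process passes in, so a process-supplied value overrides the default. A minimal sketch of that override behaviour, illustrative only and not part of this diff; it assumes a surrounding function returning anyhow::Result<()> and a made-up override value:

// Illustrative sketch -- not part of this PR.
let overrides = Config::builder()
    .add_source(config::File::from_str(
        r#"sync-command-topic = "my.custom.topic""#,
        config::FileFormat::Toml,
    ))
    .build()?;

// The process-supplied source is added last, so it wins over the bundled default.
let cfg = IndexerConfig::try_load(&overrides)?;
assert_eq!(cfg.sync_command_topic, "my.custom.topic");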
67 changes: 67 additions & 0 deletions modules/indexer/src/indexer.rs
@@ -0,0 +1,67 @@
//! Acropolis indexer module for Caryatid
mod configuration;

use acropolis_common::{
commands::chain_sync::ChainSyncCommand,
hash::Hash,
messages::{Command, Message},
};
use anyhow::Result;
use caryatid_sdk::{module, Context};
use config::Config;
use std::{str::FromStr, sync::Arc};
use tracing::info;

use crate::configuration::IndexerConfig;

/// Indexer module
#[module(
message_type(Message),
name = "indexer",
description = "Core indexer module for indexer process"
)]
pub struct Indexer;

impl Indexer {
/// Async initialisation
pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {
let cfg = IndexerConfig::try_load(&config)?;
info!(
"Creating sync command publisher on '{}'",
cfg.sync_command_topic
);

let ctx = context.clone();

// This is a placeholder to test dynamic sync
context.run(async move {
let example = ChainSyncCommand::FindIntersect {
slot: 4492799,
hash: Hash::from_str(
"f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457",
)
.expect("Valid hash"),
};

// Initial sync message (this will be read from config on the first sync and from the DB on subsequent runs)
ctx.message_bus
.publish(
&cfg.sync_command_topic,
Arc::new(Message::Command(Command::ChainSync(example.clone()))),
)
.await
.unwrap();

// Simulate a later sync command to reset the sync point to where we started

ctx.message_bus
.publish(
&cfg.sync_command_topic,
Arc::new(Message::Command(Command::ChainSync(example))),
)
.await
.unwrap();
});
Ok(())
}
}
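
The placeholder above hard-codes the first intersect point; per the comment, it is meant to come from config on the first run and from a database afterwards. A minimal sketch of the config-driven variant, assuming hypothetical start-slot / start-hash fields and that Slot is a plain numeric alias; none of this is part of the PR:

// Hypothetical config fields -- not part of this PR.
#[derive(serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct StartPoint {
    pub start_slot: u64,
    pub start_hash: String,
}

// Builds the first FindIntersect command from the assumed config fields,
// mirroring the hard-coded example above.
fn initial_sync_command(start: &StartPoint) -> ChainSyncCommand {
    ChainSyncCommand::FindIntersect {
        slot: start.start_slot,
        hash: Hash::from_str(&start.start_hash).expect("valid block hash in config"),
    }
}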
3 changes: 3 additions & 0 deletions modules/peer_network_interface/config.default.toml
@@ -4,6 +4,8 @@ block-topic = "cardano.block.available"
snapshot-completion-topic = "cardano.snapshot.complete"
# The topic to wait for when listening for genesis values from another module
genesis-completion-topic = "cardano.sequence.bootstrapped"
# The topic to listen on for runtime sync commands
sync-command-topic = "cardano.sync.command"

# Upstream node connections
node-addresses = [
@@ -19,6 +21,7 @@ magic-number = 764824073
# - "tip": sync from the very end of the chain
# - "cache": replay messages from a local filesystem cache, then sync from the point right after that cache.
# - "snapshot": wait for another module to restore from a snapshot, then sync from the point right after that snapshot.
# - "dynamic": awaits a sync command to begin fetching blocks, can change sync point at runtime.
sync-point = "snapshot"
# The cache dir to use when sync-point is "cache"
cache-dir = "upstream-cache"
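
With the new "dynamic" option the interface starts without a sync point and waits for a FindIntersect command on sync-command-topic; any later command resets the sync point at runtime. A minimal sketch of how the defaults in this PR line up across the two modules (module-level keys only; how a process groups them into sections is not shown in this diff):

# peer_network_interface: wait for a runtime command instead of a fixed start point
sync-point = "dynamic"
sync-command-topic = "cardano.sync.command"

# indexer: publishes FindIntersect commands on the same topic
sync-command-topic = "cardano.sync.command"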
2 changes: 2 additions & 0 deletions modules/peer_network_interface/src/configuration.rs
@@ -11,6 +11,7 @@ pub enum SyncPoint {
Tip,
Cache,
Snapshot,
Dynamic,
}

#[derive(serde::Deserialize)]
@@ -20,6 +21,7 @@ pub struct InterfaceConfig {
pub sync_point: SyncPoint,
pub snapshot_completion_topic: String,
pub genesis_completion_topic: String,
pub sync_command_topic: String,
pub node_addresses: Vec<String>,
pub magic_number: u64,
pub cache_dir: PathBuf,
32 changes: 27 additions & 5 deletions modules/peer_network_interface/src/network.rs
@@ -78,14 +78,35 @@ impl NetworkManager {

pub async fn run(mut self) -> Result<()> {
while let Some(event) = self.events.recv().await {
match event {
NetworkEvent::PeerUpdate { peer, event } => {
self.handle_peer_update(peer, event);
self.publish_blocks().await?;
self.on_network_event(event).await?;
}

Ok(())
}

async fn on_network_event(&mut self, event: NetworkEvent) -> Result<()> {
match event {
NetworkEvent::PeerUpdate { peer, event } => {
self.handle_peer_update(peer, event);
self.publish_blocks().await?;
}
NetworkEvent::SyncPointUpdate { point } => {
self.chain = ChainState::new();

for peer in self.peers.values_mut() {
peer.reqs.clear();
}

if let Point::Specific(slot, _) = point {
let (epoch, _) = self.block_sink.genesis_values.slot_to_epoch(slot);
self.block_sink.last_epoch = Some(epoch);
}

self.sync_to_point(point);
}
}
bail!("event sink closed")

Ok(())
}

pub fn handle_new_connection(&mut self, address: String, delay: Duration) {
Expand Down Expand Up @@ -235,6 +256,7 @@ impl NetworkManager {

pub enum NetworkEvent {
PeerUpdate { peer: PeerId, event: PeerEvent },
SyncPointUpdate { point: Point },
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
46 changes: 39 additions & 7 deletions modules/peer_network_interface/src/peer_network_interface.rs
@@ -5,8 +5,9 @@ mod network

use acropolis_common::{
BlockInfo, BlockStatus,
commands::chain_sync::ChainSyncCommand,
genesis_values::GenesisValues,
messages::{CardanoMessage, Message, RawBlockMessage},
messages::{CardanoMessage, Command, Message, RawBlockMessage},
upstream_cache::{UpstreamCache, UpstreamCacheRecord},
};
use anyhow::{Result, bail};
@@ -21,7 +22,7 @@ use std::{path::Path, sync::Arc, time::Duration};
use crate::{
configuration::{InterfaceConfig, SyncPoint},
connection::Header,
network::NetworkManager,
network::{NetworkEvent, NetworkManager},
};

#[module(
@@ -43,6 +44,7 @@ impl PeerNetworkInterface {
SyncPoint::Snapshot => Some(context.subscribe(&cfg.snapshot_completion_topic).await?),
_ => None,
};
let command_subscription = context.subscribe(&cfg.sync_command_topic).await?;

context.clone().run(async move {
let genesis_values = if let Some(mut sub) = genesis_complete {
@@ -82,20 +84,20 @@

let manager = match cfg.sync_point {
SyncPoint::Origin => {
let mut manager = Self::init_manager(cfg, sink);
let mut manager = Self::init_manager(cfg, sink, command_subscription);
manager.sync_to_point(Point::Origin);
manager
}
SyncPoint::Tip => {
let mut manager = Self::init_manager(cfg, sink);
let mut manager = Self::init_manager(cfg, sink, command_subscription);
if let Err(error) = manager.sync_to_tip().await {
warn!("could not sync to tip: {error:#}");
return;
}
manager
}
SyncPoint::Cache => {
let mut manager = Self::init_manager(cfg, sink);
let mut manager = Self::init_manager(cfg, sink, command_subscription);
manager.sync_to_point(cache_sync_point);
manager
}
@@ -108,7 +110,7 @@
let (epoch, _) = sink.genesis_values.slot_to_epoch(slot);
sink.last_epoch = Some(epoch);
}
let mut manager = Self::init_manager(cfg, sink);
let mut manager = Self::init_manager(cfg, sink, command_subscription);
manager.sync_to_point(point);
manager
}
@@ -118,6 +120,7 @@
}
}
}
SyncPoint::Dynamic => Self::init_manager(cfg, sink, command_subscription),
};

if let Err(err) = manager.run().await {
@@ -128,15 +131,44 @@
Ok(())
}

fn init_manager(cfg: InterfaceConfig, sink: BlockSink) -> NetworkManager {
fn init_manager(
cfg: InterfaceConfig,
sink: BlockSink,
command_subscription: Box<dyn Subscription<Message>>,
) -> NetworkManager {
let (events_sender, events) = mpsc::channel(1024);
tokio::spawn(Self::forward_commands_to_events(
command_subscription,
events_sender.clone(),
));
let mut manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink);
for address in cfg.node_addresses {
manager.handle_new_connection(address, Duration::ZERO);
}
manager
}

async fn forward_commands_to_events(
mut subscription: Box<dyn Subscription<Message>>,
events_sender: mpsc::Sender<NetworkEvent>,
) -> Result<()> {
while let Ok((_, msg)) = subscription.read().await {
if let Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect {
slot,
hash,
})) = msg.as_ref()
{
let point = Point::new(*slot, hash.to_vec());

if events_sender.send(NetworkEvent::SyncPointUpdate { point }).await.is_err() {
bail!("event channel closed");
}
}
}

bail!("subscription closed");
}

async fn init_cache(
cache_dir: &Path,
block_topic: &str,