diff --git a/.config/dictionaries/project.dic b/.config/dictionaries/project.dic index ece094a570..6f45bf29a5 100644 --- a/.config/dictionaries/project.dic +++ b/.config/dictionaries/project.dic @@ -12,10 +12,12 @@ asyncio Attributes auditability Auliffe +autonat auxdata babystep backpressure bech +Behaviour bimap bindgen bkioshn @@ -48,6 +50,7 @@ ciphertexts Coap codegen codepoints +connexa coti coverallsapp cpus @@ -62,6 +65,7 @@ Datelike DBSTATUS dbsync dcbor +dcutr decompressor delegators devnet @@ -141,6 +145,7 @@ jormungandr Jörmungandr jsonschema Justfile +keypair kiduri labelloc lcov @@ -245,6 +250,7 @@ pytype qpsg qrcode quic +rafal rankdir rapidoc readlinkat diff --git a/rust/c509-certificate/Cargo.toml b/rust/c509-certificate/Cargo.toml index 01b28a49d0..a1249827cd 100644 --- a/rust/c509-certificate/Cargo.toml +++ b/rust/c509-certificate/Cargo.toml @@ -23,7 +23,7 @@ hex = "0.4.3" oid = "0.2.1" oid-registry = "0.7.1" asn1-rs = "0.6.2" -anyhow = "1.0.95" +anyhow = "1.0.99" bimap = "0.6.3" strum = "0.26.3" strum_macros = "0.26.4" diff --git a/rust/hermes-ipfs/Cargo.toml b/rust/hermes-ipfs/Cargo.toml index 106e4c73aa..4f32bd8bf1 100644 --- a/rust/hermes-ipfs/Cargo.toml +++ b/rust/hermes-ipfs/Cargo.toml @@ -13,13 +13,16 @@ repository.workspace = true workspace = true [dependencies] -anyhow = "1.0.95" +anyhow = "1.0.100" derive_more = {version = "2.0.1", features = ["from","into","display"] } ipld-core = { version = "0.4.1", features = ["serde"]} # A fork of crates-io version with updated dependencies (`libp2p` and `ring` in particular). # rev-26b99cb as at July 30, 2025 -rust-ipfs = { version = "0.15.0", git = "https://github.com/dariusc93/rust-ipfs", rev = "26b99cb" } +rust-ipfs = { version = "0.15.0", git = "https://github.com/dariusc93/rust-ipfs", rev = "0a6269e05a4ccfaa547a47a56f92171e4abc0564" } tokio = "1.46.0" +futures = "0.3.31" +libp2p = "0.56.0" +connexa = { version = "0.4.1", features = ["identify", "dcutr", "gossipsub", "autonat", "relay", "kad", "keypair_base64_encoding", "ping", "request-response", "request-response-misc", "rendezvous", "mdns"] } [dev-dependencies] # Dependencies used by examples diff --git a/rust/hermes-ipfs/examples/hermes-ipfs-cli.rs b/rust/hermes-ipfs/examples/hermes-ipfs-cli.rs index f8926792db..0bc0f81aa1 100644 --- a/rust/hermes-ipfs/examples/hermes-ipfs-cli.rs +++ b/rust/hermes-ipfs/examples/hermes-ipfs-cli.rs @@ -1,7 +1,8 @@ //! Hermes IPFS VFS compatibility use clap::{Parser, Subcommand}; -use hermes_ipfs::{HermesIpfs, IpfsBuilder}; +use connexa::dummy; +use hermes_ipfs::{HermesIpfs, HermesIpfsBuilder}; use lipsum::lipsum; use rust_ipfs::IpfsPath; @@ -44,11 +45,11 @@ async fn main() -> anyhow::Result<()> { let args = Cli::parse(); let base_dir = dirs::data_dir().unwrap_or_else(|| std::path::PathBuf::from(".")); let ipfs_data_path = base_dir.as_path().join("hermes/ipfs"); - let builder = IpfsBuilder::new() + let builder = HermesIpfsBuilder::::new() .with_default() .set_default_listener() // TODO(saibatizoku): Re-Enable default transport config when libp2p Cert bug is fixed - .disable_tls() + //.enable_secure_websocket() .set_disk_storage(ipfs_data_path); let hermes_node: HermesIpfs = builder.start().await?.into(); match args.command { diff --git a/rust/hermes-ipfs/examples/pubsub.rs b/rust/hermes-ipfs/examples/pubsub.rs index 34d93ef843..48b296ebc1 100644 --- a/rust/hermes-ipfs/examples/pubsub.rs +++ b/rust/hermes-ipfs/examples/pubsub.rs @@ -14,8 +14,8 @@ //! * The task that reads lines from stdin and publishes them as either node. use std::io::Write; -use hermes_ipfs::{FutureExt, HermesIpfs, StreamExt, pin_mut}; -use rust_ipfs::PubsubEvent; +use futures::{FutureExt, StreamExt, pin_mut}; +use hermes_ipfs::HermesIpfs; use rustyline_async::Readline; #[allow(clippy::indexing_slicing)] @@ -57,15 +57,11 @@ async fn start_bootstrapped_nodes() -> anyhow::Result<(HermesIpfs, HermesIpfs)> /// Main function async fn main() -> anyhow::Result<()> { let topic = String::from("ipfs-chat"); - let option_topic = Option::Some(topic.clone()); // Initialize the repo and start a daemon let (hermes_a, hermes_b) = start_bootstrapped_nodes().await?; let (mut rl, mut stdout) = Readline::new(format!("{} > ", "Write message to publish"))?; - let mut event_stream = hermes_a.pubsub_events(option_topic.clone()).await?; - let mut event_stream_b = hermes_b.pubsub_events(option_topic).await?; - let stream = hermes_a.pubsub_subscribe(topic.clone()).await?; let stream_b = hermes_b.pubsub_subscribe(topic.clone()).await?; @@ -79,24 +75,12 @@ async fn main() -> anyhow::Result<()> { tokio::select! { data = stream.next() => { if let Some(msg) = data { - writeln!(stdout, "NODE A RECV: {}", String::from_utf8_lossy(&msg.data))?; + writeln!(stdout, "NODE A RECV: {:?}", &msg)?; } } data = stream_b.next() => { if let Some(msg) = data { - writeln!(stdout, "NODE B RECV: {}", String::from_utf8_lossy(&msg.data))?; - } - } - Some(event) = event_stream.next() => { - match event { - PubsubEvent::Subscribe { peer_id, topic } => writeln!(stdout, "{peer_id} subscribed to {topic:?}")?, - PubsubEvent::Unsubscribe { peer_id, topic } => writeln!(stdout, "{peer_id} unsubscribed from {topic:?}")?, - } - } - Some(event) = event_stream_b.next() => { - match event { - PubsubEvent::Subscribe { peer_id , topic} => writeln!(stdout, "{peer_id} subscribed to {topic:?}")?, - PubsubEvent::Unsubscribe { peer_id, topic } => writeln!(stdout, "{peer_id} unsubscribed from {topic:?}")?, + writeln!(stdout, "NODE B RECV: {:?}", &msg)?; } } line = rl.readline().fuse() => match line { diff --git a/rust/hermes-ipfs/examples/put-get-dht.rs b/rust/hermes-ipfs/examples/put-get-dht.rs index 3cfacc9f5b..1af79d9c0a 100644 --- a/rust/hermes-ipfs/examples/put-get-dht.rs +++ b/rust/hermes-ipfs/examples/put-get-dht.rs @@ -48,7 +48,8 @@ async fn main() -> anyhow::Result<()> { println!("* Hermes IPFS node A is publishing 'my_key' to DHT."); hermes_ipfs_a.dht_put(b"my_key", ipfs_file).await?; println!("* Hermes IPFS node B is getting 'my_key' from DHT."); - let data_retrieved = hermes_ipfs_b.dht_get(b"my_key").await?; + let key: Vec = "my_key".bytes().collect(); + let data_retrieved = hermes_ipfs_b.dht_get(key).await?; let data = String::from_utf8(data_retrieved)?; println!(" Got data: {data:?}"); // Stop the nodes and exit. diff --git a/rust/hermes-ipfs/src/lib.rs b/rust/hermes-ipfs/src/lib.rs index 64ab8edf5a..2a80db0a3f 100644 --- a/rust/hermes-ipfs/src/lib.rs +++ b/rust/hermes-ipfs/src/lib.rs @@ -2,13 +2,15 @@ //! //! Provides support for storage, and `PubSub` functionality. -use std::str::FromStr; +use std::{convert::Infallible, str::FromStr}; use derive_more::{Display, From, Into}; +use futures::{StreamExt, pin_mut, stream::BoxStream}; /// IPFS Content Identifier. pub use ipld_core::cid::Cid; /// IPLD pub use ipld_core::ipld::Ipld; +use libp2p::gossipsub::MessageId as PubsubMessageId; /// `rust_ipfs` re-export. pub use rust_ipfs; /// Server, Client, or Auto mode @@ -19,12 +21,6 @@ pub use rust_ipfs::Ipfs; pub use rust_ipfs::Multiaddr; /// Peer ID type. pub use rust_ipfs::PeerId; -/// Stream for `PubSub` Topic Subscriptions. -pub use rust_ipfs::SubscriptionStream; -/// Builder type for IPFS Node configuration. -use rust_ipfs::UninitializedIpfsDefault as UninitializedIpfs; -/// libp2p re-exports. -pub use rust_ipfs::libp2p::futures::{FutureExt, StreamExt, pin_mut, stream::BoxStream}; /// Peer Info type. pub use rust_ipfs::p2p::PeerInfo; /// Enum for specifying paths in IPFS. @@ -32,10 +28,8 @@ pub use rust_ipfs::path::IpfsPath; /// Storage type for IPFS node. pub use rust_ipfs::repo::StorageTypes; use rust_ipfs::{ - PubsubEvent, Quorum, - dag::ResolveError, - libp2p::gossipsub::{Message as PubsubMessage, MessageId as PubsubMessageId}, - unixfs::AddOpt, + GossipsubMessage, NetworkBehaviour, Quorum, ToRecordKey, builder::IpfsBuilder, + dag::ResolveError, dummy, gossipsub::IntoGossipsubTopic, unixfs::AddOpt, }; #[derive(Debug, Display, From, Into)] @@ -43,13 +37,22 @@ use rust_ipfs::{ pub struct MessageId(pub PubsubMessageId); /// Builder type for IPFS Node configuration. -pub struct IpfsBuilder(UninitializedIpfs); +pub struct HermesIpfsBuilder(IpfsBuilder) +where N: NetworkBehaviour + Send + Sync; + +impl Default for HermesIpfsBuilder { + fn default() -> Self { + Self(IpfsBuilder::new()) + } +} -impl IpfsBuilder { +impl HermesIpfsBuilder +where N: NetworkBehaviour + Send + Sync +{ #[must_use] /// Create a new` IpfsBuilder`. pub fn new() -> Self { - Self(UninitializedIpfs::new()) + Self(IpfsBuilder::new()) } #[must_use] @@ -78,26 +81,6 @@ impl IpfsBuilder { ) } - #[must_use] - /// Set the transport configuration for the IPFS node. - pub fn set_transport_configuration( - self, - transport: rust_ipfs::p2p::TransportConfig, - ) -> Self { - Self(self.0.set_transport_configuration(transport)) - } - - #[must_use] - /// Disable TLS for the IPFS node. - pub fn disable_tls(self) -> Self { - let transport = rust_ipfs::p2p::TransportConfig { - enable_quic: false, - enable_secure_websocket: false, - ..Default::default() - }; - Self(self.0.set_transport_configuration(transport)) - } - /// Start the IPFS node. /// /// ## Errors @@ -107,12 +90,6 @@ impl IpfsBuilder { } } -impl Default for IpfsBuilder { - fn default() -> Self { - Self::new() - } -} - /// Hermes IPFS Node. pub struct HermesIpfs { /// IPFS node @@ -130,11 +107,13 @@ impl HermesIpfs { /// /// Returns an error if the IPFS daemon fails to start. pub async fn start() -> anyhow::Result { - let node: Ipfs = IpfsBuilder::new() + let node: Ipfs = HermesIpfsBuilder::::new() .with_default() .set_default_listener() // TODO(saibatizoku): Re-Enable default transport config when libp2p Cert bug is fixed - .disable_tls() + // TODO(rafal-ch): TLS is disabled by default, we can enable it by calling + // on of the `IpfsBuilder::enable_secure...()` flavors. + //.enable_secure_websocket() .start() .await?; Ok(HermesIpfs { node }) @@ -394,7 +373,7 @@ impl HermesIpfs { key: impl AsRef<[u8]>, value: impl Into>, ) -> anyhow::Result<()> { - self.node.dht_put(key, value, Quorum::One).await + self.node.dht_put(key, value.into(), Quorum::One).await } /// Get content from DHT. @@ -412,7 +391,7 @@ impl HermesIpfs { /// Returns error if unable to get content from DHT pub async fn dht_get( &self, - key: impl AsRef<[u8]>, + key: impl AsRef<[u8]> + ToRecordKey, ) -> anyhow::Result> { let record_stream = self.node.dht_get(key).await?; pin_mut!(record_stream); @@ -456,26 +435,6 @@ impl HermesIpfs { self.node.bootstrap().await } - /// Returns a stream of pubsub swarm events for a topic. - /// - /// ## Parameters - /// - /// * `topic` - `impl Into>` - /// - /// ## Returns - /// - /// * A result with `BoxStream<'static, PubsubEvent>` - /// - /// ## Errors - /// - /// Returns error if unable to retrieve pubsub swarm events. - pub async fn pubsub_events( - &self, - topic: impl Into>, - ) -> anyhow::Result> { - self.node.pubsub_events(topic).await - } - /// Subscribes to a pubsub topic. /// /// ## Parameters @@ -484,7 +443,7 @@ impl HermesIpfs { /// /// ## Returns /// - /// * `SubscriptionStream` + /// * Stream of `GossipsubEvent` /// /// ## Errors /// @@ -492,8 +451,10 @@ impl HermesIpfs { pub async fn pubsub_subscribe( &self, topic: impl Into, - ) -> anyhow::Result { - self.node.pubsub_subscribe(topic).await + ) -> anyhow::Result> { + let topic = topic.into(); + self.node.pubsub_subscribe(&topic).await?; + self.node.pubsub_listener(&topic).await } /// Unsubscribes from a pubsub topic. @@ -502,17 +463,13 @@ impl HermesIpfs { /// /// * `topic` - `impl Into` /// - /// ## Returns - /// - /// * `bool` - /// /// ## Errors /// /// Returns error if unable to unsubscribe from pubsub topic. pub async fn pubsub_unsubscribe( &self, - topic: impl Into, - ) -> anyhow::Result { + topic: impl Into + IntoGossipsubTopic, + ) -> anyhow::Result<()> { self.node.pubsub_unsubscribe(topic).await } @@ -523,22 +480,15 @@ impl HermesIpfs { /// * `topic` - `impl Into` /// * `message` - `Vec` /// - /// ## Returns - /// - /// * `Result` - /// /// ## Errors /// /// Returns error if unable to publish to a pubsub topic. pub async fn pubsub_publish( &self, - topic: impl Into, + topic: impl IntoGossipsubTopic, message: Vec, - ) -> anyhow::Result { - self.node - .pubsub_publish(topic, message) - .await - .map(std::convert::Into::into) + ) -> anyhow::Result<()> { + self.node.pubsub_publish(topic, message).await } /// Ban peer from node. @@ -647,15 +597,43 @@ impl FromStr for GetIpfsFile { } } +/// `GossipsubEvents` related to subscription state +#[derive(Display, Debug)] +pub enum SubscriptionStatusEvent { + /// Peer has been subscribed + Subscribed { + /// Peer id + peer_id: PeerId, + }, + /// Peer has been unsubscribed + Unsubscribed { + /// Peer id + peer_id: PeerId, + }, +} + /// Handle stream of messages from the IPFS pubsub topic -pub fn subscription_stream_task( - stream: SubscriptionStream, - handler: fn(PubsubMessage), -) -> tokio::task::JoinHandle<()> { +pub fn subscription_stream_task( + stream: BoxStream<'static, connexa::prelude::GossipsubEvent>, + message_handler: MH, + subscription_handler: SH, +) -> tokio::task::JoinHandle<()> +where + MH: Fn(GossipsubMessage) + Send + 'static, + SH: Fn(SubscriptionStatusEvent) + Send + 'static, +{ tokio::spawn(async move { pin_mut!(stream); while let Some(msg) = stream.next().await { - handler(msg); + match msg { + connexa::prelude::GossipsubEvent::Subscribed { peer_id } => { + subscription_handler(SubscriptionStatusEvent::Subscribed { peer_id }); + }, + connexa::prelude::GossipsubEvent::Unsubscribed { peer_id } => { + subscription_handler(SubscriptionStatusEvent::Unsubscribed { peer_id }); + }, + connexa::prelude::GossipsubEvent::Message { message } => message_handler(message), + } } }) }