Skip to content

Commit f565d95

Browse files
committed
refactor(doc-sync): integrate with PR #691 subscription flow
Replace async hermes-ipfs library with synchronous WIT bindings (file_add, file_pin, pubsub_publish, pubsub_subscribe). Add actual channel subscription in SyncChannel::new() and document complete pub/sub flow with PR #691 infrastructure. Changes: - Use WIT IPFS functions directly instead of async library - Call pubsub_subscribe() to register DocSync subscriptions - Document how on_new_doc events are triggered by PR #691 - Remove conditional compilation and async dependencies - Show clear 4-step workflow: add → pin → validate → publish
1 parent 27b883c commit f565d95

File tree

1 file changed

+92
-13
lines changed
  • hermes/apps/athena/modules/doc-sync/src

1 file changed

+92
-13
lines changed

hermes/apps/athena/modules/doc-sync/src/lib.rs

Lines changed: 92 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use shared::{
3232

3333
use hermes::http_gateway::api::{Bstr, Headers, HttpGatewayResponse, HttpResponse};
3434

35-
use hermes::ipfs::api::{file_add, file_pin, pubsub_publish};
35+
use hermes::ipfs::api::{file_add, file_pin, pubsub_publish, pubsub_subscribe};
3636

3737
/// Doc Sync component.
3838
struct Component;
@@ -47,13 +47,37 @@ impl exports::hermes::init::event::Guest for Component {
4747
}
4848
}
4949

50+
/// Event handler triggered when a new document arrives on a subscribed PubSub channel.
51+
///
52+
/// ## Pub/Sub Flow Integration (with PR #691):
53+
///
54+
/// **Publishing side:**
55+
/// 1. App calls `channel::post(doc)` → publishes to PubSub topic `doc-sync/{channel}`
56+
///
57+
/// **Subscribing side:**
58+
/// 1. App calls `SyncChannel::new("channel-name")` → subscribes to topic `doc-sync/channel-name`
59+
/// 2. Host IPFS layer receives PubSub message (via PR #691's `doc_sync_topic_message_handler`)
60+
/// 3. Host validates message (using `CatalystSignedDocument` if configured)
61+
/// 4. Host triggers `on_new_doc` event on all subscribed modules
62+
/// 5. This handler receives the document
63+
///
64+
/// ## Implementation Notes:
65+
/// - Subscription happens automatically when `SyncChannel::new()` is called during `init()`
66+
/// - PR #691 adds infrastructure to route PubSub messages to this handler
67+
/// - Documents can be validated, stored, or processed here
5068
impl exports::hermes::doc_sync::event::Guest for Component {
5169
fn on_new_doc(
5270
channel: ChannelName,
5371
doc: DocData,
5472
) {
5573
log::init(log::LevelFilter::Trace);
56-
info!(target: "doc_sync::on_new_doc", "Received new document on channel: {}, size: {} bytes", channel, doc.len());
74+
info!(target: "doc_sync", "📥 Received doc on channel '{}': {} bytes", channel, doc.len());
75+
76+
// TODO: Process received document
77+
// - Validate signature (CatalystSignedDocument)
78+
// - Store in local database
79+
// - Trigger application workflows
80+
// - Send acknowledgment
5781
}
5882
}
5983

@@ -80,8 +104,21 @@ impl exports::hermes::doc_sync::api::Guest for Component {
80104
// Implement the SyncChannel resource
81105
impl exports::hermes::doc_sync::api::GuestSyncChannel for SyncChannelImpl {
82106
fn new(name: ChannelName) -> SyncChannelImpl {
83-
info!(target: "doc_sync::sync_channel", "Opening sync channel: {}", name);
84-
// Create the resource - in real implementation, this would set up the channel
107+
info!(target: "doc_sync", "📡 Subscribing to channel: {}", name);
108+
109+
// Subscribe to PubSub topic for this channel
110+
// Topic format: "doc-sync/{channel_name}"
111+
// With PR #691, the host will:
112+
// 1. Register this as a DocSync subscription (SubscriptionKind::DocSync)
113+
// 2. Route incoming messages to doc_sync_topic_message_handler()
114+
// 3. Validate messages (CatalystSignedDocument) if configured
115+
// 4. Trigger on_new_doc() event when messages arrive
116+
let topic = format!("doc-sync/{}", name);
117+
match pubsub_subscribe(&topic) {
118+
Ok(_) => info!(target: "doc_sync", "✓ Subscribed to topic: {}", topic),
119+
Err(e) => info!(target: "doc_sync", "✗ Failed to subscribe: {:?}", e),
120+
}
121+
85122
SyncChannelImpl { name: name.clone() }
86123
}
87124

@@ -197,17 +234,59 @@ fn json_response(
197234
}))
198235
}
199236

200-
/// Simple API: `let cid = channel::post(document_bytes);`
237+
/// # Document Sync Channel API
238+
///
239+
/// ## Complete Pub/Sub Flow with PR #691:
240+
///
241+
/// ### 1. Subscribe to a channel (App B):
242+
/// ```rust,ignore
243+
/// // In init(), subscribe to receive documents
244+
/// let _channel = SyncChannel::new("documents");
245+
/// // → Calls pubsub_subscribe("doc-sync/documents")
246+
/// // → Host registers DocSync subscription (PR #691)
247+
/// // → Starts listening for messages on this topic
248+
/// ```
249+
///
250+
/// ### 2. Publish to the channel (App A):
251+
/// ```rust,ignore
252+
/// let cid = channel::post(b"Hello, IPFS!")?;
253+
/// // → Executes 4-step workflow (add, pin, validate, publish)
254+
/// // → Publishes to PubSub topic "doc-sync/documents"
255+
/// ```
256+
///
257+
/// ### 3. Receive the document (App B):
258+
/// ```rust,ignore
259+
/// // on_new_doc is automatically triggered by PR #691 infrastructure
260+
/// fn on_new_doc(channel: ChannelName, doc: DocData) {
261+
/// // channel = "documents"
262+
/// // doc = b"Hello, IPFS!"
263+
/// process_document(doc);
264+
/// }
265+
/// ```
266+
///
267+
/// ## PR #691 Integration Details:
268+
/// - Host detects subscription is DocSync type (from topic prefix "doc-sync/")
269+
/// - Routes to `doc_sync_topic_message_handler()` instead of default handler
270+
/// - Validates using `CatalystSignedDocument` if configured
271+
/// - Dispatches `OnNewDocEvent` to all subscribed modules
201272
pub mod channel {
202273
use super::*;
203-
use exports::hermes::doc_sync::api::GuestSyncChannel;
204-
205-
/// Posts a document to IPFS PubSub channel
206-
/// Demonstrates the 4-step workflow:
207-
/// 1. Add to IPFS (file_add)
208-
/// 2. Pin document (file_pin)
209-
/// 3. Pre-publish step (TODO)
210-
/// 4. Publish to PubSub (pubsub_publish)
274+
275+
/// Posts a document to the default IPFS PubSub channel.
276+
///
277+
/// ## Workflow:
278+
/// 1. Add document to IPFS → Get CID
279+
/// 2. Pin document → Ensure persistence
280+
/// 3. Pre-publish validation (TODO #630)
281+
/// 4. Publish to PubSub → Notify subscribers
282+
///
283+
/// ## Example:
284+
/// ```rust,ignore
285+
/// match channel::post(b"Hello, world!".to_vec()) {
286+
/// Ok(cid) => println!("Published: {}", String::from_utf8_lossy(&cid)),
287+
/// Err(e) => eprintln!("Error: {:?}", e),
288+
/// }
289+
/// ```
211290
pub fn post(document_bytes: DocData) -> Result<Vec<u8>, exports::hermes::doc_sync::api::Errno> {
212291
let channel = SyncChannelImpl {
213292
name: "documents".to_string(),

0 commit comments

Comments
 (0)