From ae0cc7fa254b84512c68098821ad43eb6310ccc0 Mon Sep 17 00:00:00 2001 From: eloramirez1356 Date: Thu, 11 Dec 2025 13:21:42 -0600 Subject: [PATCH] feat: experimental/events API now uses absolute numeric offsets --- api-server/README.md | 2 +- api-server/api/openapi.yaml | 16 ++++++-- api-server/docs/Event.md | 1 + api-server/src/models.rs | 20 +++++++++- api/ceramic.yaml | 3 ++ api/src/server.rs | 38 +++++++++++++------ api/src/tests.rs | 2 +- event-svc/src/event/store.rs | 12 +++++- event-svc/src/store/sql/query.rs | 2 + .../http-client/src/__generated__/api.ts | 4 ++ 10 files changed, 81 insertions(+), 19 deletions(-) diff --git a/api-server/README.md b/api-server/README.md index b2a603664..9cc2cc867 100644 --- a/api-server/README.md +++ b/api-server/README.md @@ -15,7 +15,7 @@ To see how to make this your own, look here: [README]((https://openapi-generator.tech)) - API version: 0.58.0 -- Build date: 2025-12-08T21:21:04.734650742Z[Etc/UTC] +- Build date: 2025-12-10T10:02:01.541355-06:00[America/Chicago] diff --git a/api-server/api/openapi.yaml b/api-server/api/openapi.yaml index ae0bb8d51..b9417ea24 100644 --- a/api-server/api/openapi.yaml +++ b/api-server/api/openapi.yaml @@ -675,12 +675,16 @@ components: description: A Ceramic event as part of a Ceramic Stream. Contains both the root CID used to identify the Event as well as the Event payload data. example: + offset: 0 data: data id: id properties: id: description: Multibase encoding of event root CID bytes. type: string + offset: + description: Numeric offset of the event (monotonic index) used for resuming. + type: integer data: description: Multibase encoding of event data. type: string @@ -705,9 +709,11 @@ components: example: resumeToken: resumeToken events: - - data: data + - offset: 0 + data: data id: id - - data: data + - offset: 0 + data: data id: id properties: events: @@ -728,9 +734,11 @@ components: example: resumeOffset: 0 events: - - data: data + - offset: 0 + data: data id: id - - data: data + - offset: 0 + data: data id: id isComplete: true properties: diff --git a/api-server/docs/Event.md b/api-server/docs/Event.md index 474e40a2c..250edcbed 100644 --- a/api-server/docs/Event.md +++ b/api-server/docs/Event.md @@ -4,6 +4,7 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **id** | **String** | Multibase encoding of event root CID bytes. | +**offset** | **i32** | Numeric offset of the event (monotonic index) used for resuming. | [optional] [default to None] **data** | **String** | Multibase encoding of event data. | [optional] [default to None] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/api-server/src/models.rs b/api-server/src/models.rs index 01b6467f6..72d6282bc 100644 --- a/api-server/src/models.rs +++ b/api-server/src/models.rs @@ -286,6 +286,11 @@ pub struct Event { #[serde(rename = "id")] pub id: String, + /// Numeric offset of the event (monotonic index) used for resuming. + #[serde(rename = "offset")] + #[serde(skip_serializing_if = "Option::is_none")] + pub offset: Option, + /// Multibase encoding of event data. #[serde(rename = "data")] #[serde(skip_serializing_if = "Option::is_none")] @@ -295,7 +300,11 @@ pub struct Event { impl Event { #[allow(clippy::new_without_default)] pub fn new(id: String) -> Event { - Event { id, data: None } + Event { + id, + offset: None, + data: None, + } } } @@ -307,6 +316,9 @@ impl std::string::ToString for Event { let params: Vec> = vec![ Some("id".to_string()), Some(self.id.to_string()), + self.offset + .as_ref() + .map(|offset| ["offset".to_string(), offset.to_string()].join(",")), self.data .as_ref() .map(|data| ["data".to_string(), data.to_string()].join(",")), @@ -328,6 +340,7 @@ impl std::str::FromStr for Event { #[allow(dead_code)] struct IntermediateRep { pub id: Vec, + pub offset: Vec, pub data: Vec, } @@ -355,6 +368,10 @@ impl std::str::FromStr for Event { ::from_str(val).map_err(|x| x.to_string())?, ), #[allow(clippy::redundant_clone)] + "offset" => intermediate_rep.offset.push( + ::from_str(val).map_err(|x| x.to_string())?, + ), + #[allow(clippy::redundant_clone)] "data" => intermediate_rep.data.push( ::from_str(val).map_err(|x| x.to_string())?, ), @@ -377,6 +394,7 @@ impl std::str::FromStr for Event { .into_iter() .next() .ok_or_else(|| "id missing in Event".to_string())?, + offset: intermediate_rep.offset.into_iter().next(), data: intermediate_rep.data.into_iter().next(), }) } diff --git a/api/ceramic.yaml b/api/ceramic.yaml index 82f097450..7a9d14ddb 100644 --- a/api/ceramic.yaml +++ b/api/ceramic.yaml @@ -626,6 +626,9 @@ components: id: type: string description: Multibase encoding of event root CID bytes. + offset: + type: integer + description: Numeric offset of the event (monotonic index) used for resuming. data: type: string description: Multibase encoding of event data. diff --git a/api/src/server.rs b/api/src/server.rs index 00707438d..1d965c2cf 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -74,12 +74,16 @@ const INSERT_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_se // Helper to build responses consistent as we can't implement for the api_server::models directly pub struct BuildResponse {} impl BuildResponse { - pub fn event(id: Cid, data: Option>) -> models::Event { + pub fn event(id: Cid, data: Option>, offset: Option) -> models::Event { let id = id.to_string(); let mut res = models::Event::new(id); if data.as_ref().is_some_and(|e| !e.is_empty()) { res.data = Some(multibase::encode(multibase::Base::Base64, data.unwrap())); } + if let Some(o) = offset { + // clamp to i32 range where applicable + res.offset = Some(o as i32); + } res } } @@ -218,11 +222,13 @@ pub struct EventDataResult { pub id: ceramic_core::Cid, /// The data as a car file. Can be none if not requested. pub data: Option>, + /// The storage-delivered monotonic offset for this event (node-scoped, starts at 1) + pub offset: i64, } impl EventDataResult { - pub fn new(id: ceramic_core::Cid, data: Option>) -> Self { - Self { id, data } + pub fn new(id: ceramic_core::Cid, data: Option>, offset: i64) -> Self { + Self { id, data, offset } } } @@ -585,9 +591,11 @@ where .events_since_highwater_mark(hw, limit as i64, include_data) .await .map_err(|e| ErrorResponse::new(format!("failed to get event data: {e}")))?; + // For the feed endpoint keep the response simple and do not expose + // storage-delivered offsets here; reserve offsets for experimental endpoints. let events = event_ids .into_iter() - .map(|ev| BuildResponse::event(ev.id, ev.data)) + .map(|ev| BuildResponse::event(ev.id, ev.data, None)) .collect(); Ok(FeedEventsGetResponse::Success(models::EventFeed { @@ -670,19 +678,26 @@ where let (start, stop) = self.build_start_stop_range(&sep_key, &sep_value, controller, stream_id)?; - let events = self + let rows = self .model .range_with_values(start..stop, offset, limit) .await - .map_err(|err| ErrorResponse::new(format!("failed to get keys: {err}")))? - .into_iter() - .map(|(id, data)| BuildResponse::event(id, Some(data))) - .collect::>(); + .map_err(|err| ErrorResponse::new(format!("failed to get keys: {err}")))?; + + let mut events = Vec::with_capacity(rows.len()); + // Fall back to computing a monotonic offset relative to the provided `offset` when + // the store doesn't provide authoritative delivered values for range queries. + let mut last_offset: i64 = offset as i64; + for (i, (cid, data)) in rows.into_iter().enumerate() { + let delivered = offset as i64 + i as i64 + 1; + events.push(BuildResponse::event(cid, Some(data), Some(delivered))); + last_offset = delivered; + } let event_cnt = events.len() as u32; Ok(ExperimentalEventsSepSepValueGetResponse::Success( models::EventsGet { - resume_offset: (offset + event_cnt) as i32, + resume_offset: last_offset as i32, events, is_complete: event_cnt < limit, }, @@ -807,7 +822,8 @@ where }; match data { Ok(Some(data)) => { - let event = BuildResponse::event(cid, Some(data)); + // when returning a single event, offset is not known here; leave None + let event = BuildResponse::event(cid, Some(data), None); Ok(EventsEventIdGetResponse::Success(event)) } Ok(None) => Ok(EventsEventIdGetResponse::EventNotFound(format!( diff --git a/api/src/tests.rs b/api/src/tests.rs index 41bd4ab07..cd49aabee 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -764,7 +764,7 @@ async fn get_events_for_interest_range() { r: Success(EventsGet { events: [Event { id: "fce0105ff012616e0f0c1e987ef0f772afbe2c7f05c50102bc800", data: "" }], resume_offset: 1, is_complete: false }) */ let mock_interest = MockAccessInterestStoreTest::new(); - let expected = BuildResponse::event(cid, None); + let expected = BuildResponse::event(cid, None, Some(1)); let mut mock_event_store = MockEventStoreTest::new(); mock_event_store .expect_range_with_values() diff --git a/event-svc/src/event/store.rs b/event-svc/src/event/store.rs index 146894d7d..751df87e7 100644 --- a/event-svc/src/event/store.rs +++ b/event-svc/src/event/store.rs @@ -218,9 +218,18 @@ impl ceramic_api::EventService for EventService { .event_access .new_events_since_value(highwater, limit) .await?; + // Compute per-event delivered offsets when the storage layer only returns CIDs. + // The storage returns `hw` as the next highwater mark (last_row + 1), so we + // reconstruct delivered values relative to `hw` and the number of returned + // events to preserve a monotonic sequence for clients. + let event_cnt = cids.len() as i64; let res = cids .into_iter() - .map(|cid| ceramic_api::EventDataResult::new(cid, None)) + .enumerate() + .map(|(i, cid)| { + let delivered = hw - event_cnt + i as i64 + 1; + ceramic_api::EventDataResult::new(cid, None, delivered) + }) .collect(); (hw, res) } @@ -234,6 +243,7 @@ impl ceramic_api::EventService for EventService { res.push(ceramic_api::EventDataResult::new( row.cid, Some(row.event.encode_car()?), + row.delivered, )); } (hw, res) diff --git a/event-svc/src/store/sql/query.rs b/event-svc/src/store/sql/query.rs index b51ba3166..35d0bc2b8 100644 --- a/event-svc/src/store/sql/query.rs +++ b/event-svc/src/store/sql/query.rs @@ -79,6 +79,8 @@ impl EventQuery { ORDER BY key.order_key, eb.idx;"# } + // No delivered-specific grouped query; use `value_blocks_by_order_key_many` for ranges + /// Find event CIDs that have not yet been delivered to the client /// Useful after a restart, or if the task managing delivery has availability to try old events /// Requires binding two parameters: diff --git a/sdk/packages/http-client/src/__generated__/api.ts b/sdk/packages/http-client/src/__generated__/api.ts index 868aa6548..ce2261873 100644 --- a/sdk/packages/http-client/src/__generated__/api.ts +++ b/sdk/packages/http-client/src/__generated__/api.ts @@ -1237,6 +1237,10 @@ export interface components { * Multibase encoding of event data. */ data?: string + /** + * Numeric offset of the event (monotonic index) used for resuming. + */ + offset?: number } /** * A Ceramic Event Data Payload