Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ To see how to make this your own, look here:
[README]((https://openapi-generator.tech))

- API version: 0.58.0
- Build date: 2025-12-08T21:21:04.734650742Z[Etc/UTC]
- Build date: 2025-12-10T10:02:01.541355-06:00[America/Chicago]



Expand Down
16 changes: 12 additions & 4 deletions api-server/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -675,12 +675,16 @@ components:
description: A Ceramic event as part of a Ceramic Stream. Contains both the
root CID used to identify the Event as well as the Event payload data.
example:
offset: 0
data: data
id: id
properties:
id:
description: Multibase encoding of event root CID bytes.
type: string
offset:
description: Numeric offset of the event (monotonic index) used for resuming.
type: integer
data:
description: Multibase encoding of event data.
type: string
Expand All @@ -705,9 +709,11 @@ components:
example:
resumeToken: resumeToken
events:
- data: data
- offset: 0
data: data
id: id
- data: data
- offset: 0
data: data
id: id
properties:
events:
Expand All @@ -728,9 +734,11 @@ components:
example:
resumeOffset: 0
events:
- data: data
- offset: 0
data: data
id: id
- data: data
- offset: 0
data: data
id: id
isComplete: true
properties:
Expand Down
1 change: 1 addition & 0 deletions api-server/docs/Event.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**id** | **String** | Multibase encoding of event root CID bytes. |
**offset** | **i32** | Numeric offset of the event (monotonic index) used for resuming. | [optional] [default to None]
**data** | **String** | Multibase encoding of event data. | [optional] [default to None]

[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
Expand Down
20 changes: 19 additions & 1 deletion api-server/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ pub struct Event {
#[serde(rename = "id")]
pub id: String,

/// Numeric offset of the event (monotonic index) used for resuming.
#[serde(rename = "offset")]
#[serde(skip_serializing_if = "Option::is_none")]
pub offset: Option<i32>,

/// Multibase encoding of event data.
#[serde(rename = "data")]
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -295,7 +300,11 @@ pub struct Event {
impl Event {
#[allow(clippy::new_without_default)]
pub fn new(id: String) -> Event {
Event { id, data: None }
Event {
id,
offset: None,
data: None,
}
}
}

Expand All @@ -307,6 +316,9 @@ impl std::string::ToString for Event {
let params: Vec<Option<String>> = vec![
Some("id".to_string()),
Some(self.id.to_string()),
self.offset
.as_ref()
.map(|offset| ["offset".to_string(), offset.to_string()].join(",")),
self.data
.as_ref()
.map(|data| ["data".to_string(), data.to_string()].join(",")),
Expand All @@ -328,6 +340,7 @@ impl std::str::FromStr for Event {
#[allow(dead_code)]
struct IntermediateRep {
pub id: Vec<String>,
pub offset: Vec<i32>,
pub data: Vec<String>,
}

Expand Down Expand Up @@ -355,6 +368,10 @@ impl std::str::FromStr for Event {
<String as std::str::FromStr>::from_str(val).map_err(|x| x.to_string())?,
),
#[allow(clippy::redundant_clone)]
"offset" => intermediate_rep.offset.push(
<i32 as std::str::FromStr>::from_str(val).map_err(|x| x.to_string())?,
),
#[allow(clippy::redundant_clone)]
"data" => intermediate_rep.data.push(
<String as std::str::FromStr>::from_str(val).map_err(|x| x.to_string())?,
),
Expand All @@ -377,6 +394,7 @@ impl std::str::FromStr for Event {
.into_iter()
.next()
.ok_or_else(|| "id missing in Event".to_string())?,
offset: intermediate_rep.offset.into_iter().next(),
data: intermediate_rep.data.into_iter().next(),
})
}
Expand Down
3 changes: 3 additions & 0 deletions api/ceramic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,9 @@ components:
id:
type: string
description: Multibase encoding of event root CID bytes.
offset:
type: integer
description: Numeric offset of the event (monotonic index) used for resuming.
data:
type: string
description: Multibase encoding of event data.
Expand Down
38 changes: 27 additions & 11 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,16 @@ const INSERT_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_se
// Helper to build responses consistent as we can't implement for the api_server::models directly
pub struct BuildResponse {}
impl BuildResponse {
pub fn event(id: Cid, data: Option<Vec<u8>>) -> models::Event {
pub fn event(id: Cid, data: Option<Vec<u8>>, offset: Option<i64>) -> models::Event {
let id = id.to_string();
let mut res = models::Event::new(id);
if data.as_ref().is_some_and(|e| !e.is_empty()) {
res.data = Some(multibase::encode(multibase::Base::Base64, data.unwrap()));
}
if let Some(o) = offset {
// clamp to i32 range where applicable
res.offset = Some(o as i32);
}
res
}
}
Expand Down Expand Up @@ -218,11 +222,13 @@ pub struct EventDataResult {
pub id: ceramic_core::Cid,
/// The data as a car file. Can be none if not requested.
pub data: Option<Vec<u8>>,
/// The storage-delivered monotonic offset for this event (node-scoped, starts at 1)
pub offset: i64,
}

impl EventDataResult {
pub fn new(id: ceramic_core::Cid, data: Option<Vec<u8>>) -> Self {
Self { id, data }
pub fn new(id: ceramic_core::Cid, data: Option<Vec<u8>>, offset: i64) -> Self {
Self { id, data, offset }
}
}

Expand Down Expand Up @@ -585,9 +591,11 @@ where
.events_since_highwater_mark(hw, limit as i64, include_data)
.await
.map_err(|e| ErrorResponse::new(format!("failed to get event data: {e}")))?;
// For the feed endpoint keep the response simple and do not expose
// storage-delivered offsets here; reserve offsets for experimental endpoints.
let events = event_ids
.into_iter()
.map(|ev| BuildResponse::event(ev.id, ev.data))
.map(|ev| BuildResponse::event(ev.id, ev.data, None))
.collect();

Ok(FeedEventsGetResponse::Success(models::EventFeed {
Expand Down Expand Up @@ -670,19 +678,26 @@ where
let (start, stop) =
self.build_start_stop_range(&sep_key, &sep_value, controller, stream_id)?;

let events = self
let rows = self
.model
.range_with_values(start..stop, offset, limit)
.await
.map_err(|err| ErrorResponse::new(format!("failed to get keys: {err}")))?
.into_iter()
.map(|(id, data)| BuildResponse::event(id, Some(data)))
.collect::<Vec<_>>();
.map_err(|err| ErrorResponse::new(format!("failed to get keys: {err}")))?;

let mut events = Vec::with_capacity(rows.len());
// Fall back to computing a monotonic offset relative to the provided `offset` when
// the store doesn't provide authoritative delivered values for range queries.
let mut last_offset: i64 = offset as i64;
for (i, (cid, data)) in rows.into_iter().enumerate() {
let delivered = offset as i64 + i as i64 + 1;
events.push(BuildResponse::event(cid, Some(data), Some(delivered)));
last_offset = delivered;
}

let event_cnt = events.len() as u32;
Ok(ExperimentalEventsSepSepValueGetResponse::Success(
models::EventsGet {
resume_offset: (offset + event_cnt) as i32,
resume_offset: last_offset as i32,
events,
is_complete: event_cnt < limit,
},
Expand Down Expand Up @@ -807,7 +822,8 @@ where
};
match data {
Ok(Some(data)) => {
let event = BuildResponse::event(cid, Some(data));
// when returning a single event, offset is not known here; leave None
let event = BuildResponse::event(cid, Some(data), None);
Ok(EventsEventIdGetResponse::Success(event))
}
Ok(None) => Ok(EventsEventIdGetResponse::EventNotFound(format!(
Expand Down
2 changes: 1 addition & 1 deletion api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ async fn get_events_for_interest_range() {
r: Success(EventsGet { events: [Event { id: "fce0105ff012616e0f0c1e987ef0f772afbe2c7f05c50102bc800", data: "" }], resume_offset: 1, is_complete: false })
*/
let mock_interest = MockAccessInterestStoreTest::new();
let expected = BuildResponse::event(cid, None);
let expected = BuildResponse::event(cid, None, Some(1));
let mut mock_event_store = MockEventStoreTest::new();
mock_event_store
.expect_range_with_values()
Expand Down
12 changes: 11 additions & 1 deletion event-svc/src/event/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,18 @@ impl ceramic_api::EventService for EventService {
.event_access
.new_events_since_value(highwater, limit)
.await?;
// Compute per-event delivered offsets when the storage layer only returns CIDs.
// The storage returns `hw` as the next highwater mark (last_row + 1), so we
// reconstruct delivered values relative to `hw` and the number of returned
// events to preserve a monotonic sequence for clients.
let event_cnt = cids.len() as i64;
let res = cids
.into_iter()
.map(|cid| ceramic_api::EventDataResult::new(cid, None))
.enumerate()
.map(|(i, cid)| {
let delivered = hw - event_cnt + i as i64 + 1;
ceramic_api::EventDataResult::new(cid, None, delivered)
})
.collect();
(hw, res)
}
Expand All @@ -234,6 +243,7 @@ impl ceramic_api::EventService for EventService {
res.push(ceramic_api::EventDataResult::new(
row.cid,
Some(row.event.encode_car()?),
row.delivered,
));
}
(hw, res)
Expand Down
2 changes: 2 additions & 0 deletions event-svc/src/store/sql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ impl EventQuery {
ORDER BY key.order_key, eb.idx;"#
}

// No delivered-specific grouped query; use `value_blocks_by_order_key_many` for ranges

/// Find event CIDs that have not yet been delivered to the client
/// Useful after a restart, or if the task managing delivery has availability to try old events
/// Requires binding two parameters:
Expand Down
4 changes: 4 additions & 0 deletions sdk/packages/http-client/src/__generated__/api.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.