diff --git a/Cargo.lock b/Cargo.lock index c570d64d7..9858ac6e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2242,6 +2242,7 @@ name = "ceramic-api" version = "0.58.0" dependencies = [ "anyhow", + "arrow", "async-trait", "ceramic-actor", "ceramic-api-server", @@ -2250,9 +2251,11 @@ dependencies = [ "ceramic-event", "ceramic-metadata", "ceramic-pipeline", + "datafusion", "expect-test", "futures", "hex", + "int-enum", "ipld-core", "jemalloc_pprof", "mockall", diff --git a/api-server/.openapi-generator/FILES b/api-server/.openapi-generator/FILES index 7120523a3..7e8074f13 100644 --- a/api-server/.openapi-generator/FILES +++ b/api-server/.openapi-generator/FILES @@ -6,6 +6,8 @@ api/openapi.yaml docs/BadRequestResponse.md docs/ErrorResponse.md docs/Event.md +docs/EventAcceptedResponse.md +docs/EventCreatedResponse.md docs/EventData.md docs/EventFeed.md docs/EventsGet.md diff --git a/api-server/README.md b/api-server/README.md index b2a603664..6fad5e4fd 100644 --- a/api-server/README.md +++ b/api-server/README.md @@ -15,7 +15,7 @@ To see how to make this your own, look here: [README]((https://openapi-generator.tech)) - API version: 0.58.0 -- Build date: 2025-12-08T21:21:04.734650742Z[Etc/UTC] +- Build date: 2025-12-14T22:29:40.738562-05:00[America/New_York] @@ -160,6 +160,8 @@ Method | HTTP request | Description - [BadRequestResponse](docs/BadRequestResponse.md) - [ErrorResponse](docs/ErrorResponse.md) - [Event](docs/Event.md) + - [EventAcceptedResponse](docs/EventAcceptedResponse.md) + - [EventCreatedResponse](docs/EventCreatedResponse.md) - [EventData](docs/EventData.md) - [EventFeed](docs/EventFeed.md) - [EventsGet](docs/EventsGet.md) diff --git a/api-server/api/openapi.yaml b/api-server/api/openapi.yaml index ae0bb8d51..e54cbace6 100644 --- a/api-server/api/openapi.yaml +++ b/api-server/api/openapi.yaml @@ -101,8 +101,20 @@ paths: requestBody: $ref: '#/components/requestBodies/EventData' responses: - "204": + "201": + content: + application/json: + schema: + $ref: '#/components/schemas/EventCreatedResponse' description: success + "202": + content: + application/json: + schema: + $ref: '#/components/schemas/EventAcceptedResponse' + description: Event accepted but validation pending. The event was stored + but validation did not complete in time. Use the returned event_id to + check status. "400": content: application/json: @@ -700,6 +712,38 @@ components: - data title: A Ceramic Event Data Payload type: object + EventCreatedResponse: + description: Returned when an event is successfully created and validated + example: + eventId: eventId + streamId: streamId + properties: + eventId: + description: The CID of the created event + type: string + streamId: + description: The stream ID this event belongs to + type: string + required: + - eventId + - streamId + title: Response after event creation + type: object + EventAcceptedResponse: + description: Returned when an event is stored but validation did not complete + in time. The event may still be validated successfully. + properties: + eventId: + description: The CID of the accepted event + type: string + message: + description: Information about the pending validation status + type: string + required: + - eventId + - message + title: Response when event is accepted but validation is pending + type: object EventFeed: description: Ceramic event keys as part of a Ceramic Stream example: diff --git a/api-server/docs/EventAcceptedResponse.md b/api-server/docs/EventAcceptedResponse.md new file mode 100644 index 000000000..556dd7e11 --- /dev/null +++ b/api-server/docs/EventAcceptedResponse.md @@ -0,0 +1,11 @@ +# EventAcceptedResponse + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**event_id** | **String** | The CID of the accepted event | +**message** | **String** | Information about the pending validation status | + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/api-server/docs/EventCreatedResponse.md b/api-server/docs/EventCreatedResponse.md new file mode 100644 index 000000000..7cd3b6512 --- /dev/null +++ b/api-server/docs/EventCreatedResponse.md @@ -0,0 +1,11 @@ +# EventCreatedResponse + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**event_id** | **String** | The CID of the created event | +**stream_id** | **String** | The stream ID this event belongs to | + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/api-server/docs/default_api.md b/api-server/docs/default_api.md index e614738ec..945403a39 100644 --- a/api-server/docs/default_api.md +++ b/api-server/docs/default_api.md @@ -197,7 +197,7 @@ No authorization required [[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) # **** -> (event_data) +> models::EventCreatedResponse (event_data) Creates a new event ### Required Parameters @@ -208,7 +208,7 @@ Name | Type | Description | Notes ### Return type - (empty response body) +[**models::EventCreatedResponse**](EventCreatedResponse.md) ### Authorization diff --git a/api-server/src/client/mod.rs b/api-server/src/client/mod.rs index aa9dfe357..6124e3533 100644 --- a/api-server/src/client/mod.rs +++ b/api-server/src/client/mod.rs @@ -1060,7 +1060,34 @@ where .await?; match response.status().as_u16() { - 204 => Ok(EventsPostResponse::Success), + 201 => { + let body = response.into_body(); + let body = body + .into_raw() + .map_err(|e| ApiError(format!("Failed to read response: {}", e))) + .await?; + let body = str::from_utf8(&body) + .map_err(|e| ApiError(format!("Response was not valid UTF8: {}", e)))?; + let body = + serde_json::from_str::(body).map_err(|e| { + ApiError(format!("Response body did not match the schema: {}", e)) + })?; + Ok(EventsPostResponse::Success(body)) + } + 202 => { + let body = response.into_body(); + let body = body + .into_raw() + .map_err(|e| ApiError(format!("Failed to read response: {}", e))) + .await?; + let body = str::from_utf8(&body) + .map_err(|e| ApiError(format!("Response was not valid UTF8: {}", e)))?; + let body = + serde_json::from_str::(body).map_err(|e| { + ApiError(format!("Response body did not match the schema: {}", e)) + })?; + Ok(EventsPostResponse::EventAcceptedButValidationPending(body)) + } 400 => { let body = response.into_body(); let body = body diff --git a/api-server/src/lib.rs b/api-server/src/lib.rs index 84168c81e..9b00aa450 100644 --- a/api-server/src/lib.rs +++ b/api-server/src/lib.rs @@ -81,7 +81,9 @@ pub enum EventsOptionsResponse { #[must_use] pub enum EventsPostResponse { /// success - Success, + Success(models::EventCreatedResponse), + /// Event accepted but validation pending. The event was stored but validation did not complete in time. Use the returned event_id to check status. + EventAcceptedButValidationPending(models::EventAcceptedResponse), /// bad request BadRequest(models::BadRequestResponse), /// Internal server error diff --git a/api-server/src/models.rs b/api-server/src/models.rs index 01b6467f6..2a14841f1 100644 --- a/api-server/src/models.rs +++ b/api-server/src/models.rs @@ -425,6 +425,319 @@ impl std::convert::TryFrom for header::IntoHeaderVal } } +/// Returned when an event is stored but validation did not complete in time. The event may still be validated successfully. +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, validator::Validate)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct EventAcceptedResponse { + /// The CID of the accepted event + #[serde(rename = "eventId")] + pub event_id: String, + + /// Information about the pending validation status + #[serde(rename = "message")] + pub message: String, +} + +impl EventAcceptedResponse { + #[allow(clippy::new_without_default)] + pub fn new(event_id: String, message: String) -> EventAcceptedResponse { + EventAcceptedResponse { event_id, message } + } +} + +/// Converts the EventAcceptedResponse value to the Query Parameters representation (style=form, explode=false) +/// specified in https://swagger.io/docs/specification/serialization/ +/// Should be implemented in a serde serializer +impl std::string::ToString for EventAcceptedResponse { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("eventId".to_string()), + Some(self.event_id.to_string()), + Some("message".to_string()), + Some(self.message.to_string()), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a EventAcceptedResponse value +/// as specified in https://swagger.io/docs/specification/serialization/ +/// Should be implemented in a serde deserializer +impl std::str::FromStr for EventAcceptedResponse { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + #[allow(dead_code)] + struct IntermediateRep { + pub event_id: Vec, + pub message: Vec, + } + + let mut intermediate_rep = IntermediateRep::default(); + + // Parse into intermediate representation + let mut string_iter = s.split(','); + let mut key_result = string_iter.next(); + + while key_result.is_some() { + let val = match string_iter.next() { + Some(x) => x, + None => { + return std::result::Result::Err( + "Missing value while parsing EventAcceptedResponse".to_string(), + ) + } + }; + + if let Some(key) = key_result { + #[allow(clippy::match_single_binding)] + match key { + #[allow(clippy::redundant_clone)] + "eventId" => intermediate_rep.event_id.push( + ::from_str(val).map_err(|x| x.to_string())?, + ), + #[allow(clippy::redundant_clone)] + "message" => intermediate_rep.message.push( + ::from_str(val).map_err(|x| x.to_string())?, + ), + _ => { + return std::result::Result::Err( + "Unexpected key while parsing EventAcceptedResponse".to_string(), + ) + } + } + } + + // Get the next key + key_result = string_iter.next(); + } + + // Use the intermediate representation to return the struct + std::result::Result::Ok(EventAcceptedResponse { + event_id: intermediate_rep + .event_id + .into_iter() + .next() + .ok_or_else(|| "eventId missing in EventAcceptedResponse".to_string())?, + message: intermediate_rep + .message + .into_iter() + .next() + .ok_or_else(|| "message missing in EventAcceptedResponse".to_string())?, + }) + } +} + +// Methods for converting between header::IntoHeaderValue and hyper::header::HeaderValue + +#[cfg(any(feature = "client", feature = "server"))] +impl std::convert::TryFrom> + for hyper::header::HeaderValue +{ + type Error = String; + + fn try_from( + hdr_value: header::IntoHeaderValue, + ) -> std::result::Result { + let hdr_value = hdr_value.to_string(); + match hyper::header::HeaderValue::from_str(&hdr_value) { + std::result::Result::Ok(value) => std::result::Result::Ok(value), + std::result::Result::Err(e) => std::result::Result::Err(format!( + "Invalid header value for EventAcceptedResponse - value: {} is invalid {}", + hdr_value, e + )), + } + } +} + +#[cfg(any(feature = "client", feature = "server"))] +impl std::convert::TryFrom + for header::IntoHeaderValue +{ + type Error = String; + + fn try_from(hdr_value: hyper::header::HeaderValue) -> std::result::Result { + match hdr_value.to_str() { + std::result::Result::Ok(value) => { + match ::from_str(value) { + std::result::Result::Ok(value) => { + std::result::Result::Ok(header::IntoHeaderValue(value)) + } + std::result::Result::Err(err) => std::result::Result::Err(format!( + "Unable to convert header value '{}' into EventAcceptedResponse - {}", + value, err + )), + } + } + std::result::Result::Err(e) => std::result::Result::Err(format!( + "Unable to convert header: {:?} to string: {}", + hdr_value, e + )), + } + } +} + +/// Returned when an event is successfully created and validated +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, validator::Validate)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct EventCreatedResponse { + /// The CID of the created event + #[serde(rename = "eventId")] + pub event_id: String, + + /// The stream ID this event belongs to + #[serde(rename = "streamId")] + pub stream_id: String, +} + +impl EventCreatedResponse { + #[allow(clippy::new_without_default)] + pub fn new(event_id: String, stream_id: String) -> EventCreatedResponse { + EventCreatedResponse { + event_id, + stream_id, + } + } +} + +/// Converts the EventCreatedResponse value to the Query Parameters representation (style=form, explode=false) +/// specified in https://swagger.io/docs/specification/serialization/ +/// Should be implemented in a serde serializer +impl std::string::ToString for EventCreatedResponse { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("eventId".to_string()), + Some(self.event_id.to_string()), + Some("streamId".to_string()), + Some(self.stream_id.to_string()), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a EventCreatedResponse value +/// as specified in https://swagger.io/docs/specification/serialization/ +/// Should be implemented in a serde deserializer +impl std::str::FromStr for EventCreatedResponse { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + #[allow(dead_code)] + struct IntermediateRep { + pub event_id: Vec, + pub stream_id: Vec, + } + + let mut intermediate_rep = IntermediateRep::default(); + + // Parse into intermediate representation + let mut string_iter = s.split(','); + let mut key_result = string_iter.next(); + + while key_result.is_some() { + let val = match string_iter.next() { + Some(x) => x, + None => { + return std::result::Result::Err( + "Missing value while parsing EventCreatedResponse".to_string(), + ) + } + }; + + if let Some(key) = key_result { + #[allow(clippy::match_single_binding)] + match key { + #[allow(clippy::redundant_clone)] + "eventId" => intermediate_rep.event_id.push( + ::from_str(val).map_err(|x| x.to_string())?, + ), + #[allow(clippy::redundant_clone)] + "streamId" => intermediate_rep.stream_id.push( + ::from_str(val).map_err(|x| x.to_string())?, + ), + _ => { + return std::result::Result::Err( + "Unexpected key while parsing EventCreatedResponse".to_string(), + ) + } + } + } + + // Get the next key + key_result = string_iter.next(); + } + + // Use the intermediate representation to return the struct + std::result::Result::Ok(EventCreatedResponse { + event_id: intermediate_rep + .event_id + .into_iter() + .next() + .ok_or_else(|| "eventId missing in EventCreatedResponse".to_string())?, + stream_id: intermediate_rep + .stream_id + .into_iter() + .next() + .ok_or_else(|| "streamId missing in EventCreatedResponse".to_string())?, + }) + } +} + +// Methods for converting between header::IntoHeaderValue and hyper::header::HeaderValue + +#[cfg(any(feature = "client", feature = "server"))] +impl std::convert::TryFrom> + for hyper::header::HeaderValue +{ + type Error = String; + + fn try_from( + hdr_value: header::IntoHeaderValue, + ) -> std::result::Result { + let hdr_value = hdr_value.to_string(); + match hyper::header::HeaderValue::from_str(&hdr_value) { + std::result::Result::Ok(value) => std::result::Result::Ok(value), + std::result::Result::Err(e) => std::result::Result::Err(format!( + "Invalid header value for EventCreatedResponse - value: {} is invalid {}", + hdr_value, e + )), + } + } +} + +#[cfg(any(feature = "client", feature = "server"))] +impl std::convert::TryFrom + for header::IntoHeaderValue +{ + type Error = String; + + fn try_from(hdr_value: hyper::header::HeaderValue) -> std::result::Result { + match hdr_value.to_str() { + std::result::Result::Ok(value) => { + match ::from_str(value) { + std::result::Result::Ok(value) => { + std::result::Result::Ok(header::IntoHeaderValue(value)) + } + std::result::Result::Err(err) => std::result::Result::Err(format!( + "Unable to convert header value '{}' into EventCreatedResponse - {}", + value, err + )), + } + } + std::result::Result::Err(e) => std::result::Result::Err(format!( + "Unable to convert header: {:?} to string: {}", + hdr_value, e + )), + } + } +} + /// The data for a Ceramic event that is part of a Ceramic Stream #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, validator::Validate)] #[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] diff --git a/api-server/src/server/mod.rs b/api-server/src/server/mod.rs index 1bb831c44..5b6f659dc 100644 --- a/api-server/src/server/mod.rs +++ b/api-server/src/server/mod.rs @@ -624,8 +624,26 @@ where match result { Ok(rsp) => match rsp { EventsPostResponse::Success + (body) => { - *response.status_mut() = StatusCode::from_u16(204).expect("Unable to turn 204 into a StatusCode"); + *response.status_mut() = StatusCode::from_u16(201).expect("Unable to turn 201 into a StatusCode"); + response.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_str("application/json") + .expect("Unable to create Content-Type header for EVENTS_POST_SUCCESS")); + let body_content = serde_json::to_string(&body).expect("impossible to fail to serialize"); + *response.body_mut() = Body::from(body_content); + }, + EventsPostResponse::EventAcceptedButValidationPending + (body) + => { + *response.status_mut() = StatusCode::from_u16(202).expect("Unable to turn 202 into a StatusCode"); + response.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_str("application/json") + .expect("Unable to create Content-Type header for EVENTS_POST_EVENT_ACCEPTED_BUT_VALIDATION_PENDING")); + let body_content = serde_json::to_string(&body).expect("impossible to fail to serialize"); + *response.body_mut() = Body::from(body_content); }, EventsPostResponse::BadRequest (body) diff --git a/api/Cargo.toml b/api/Cargo.toml index db7158445..aa174c6dd 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -39,7 +39,11 @@ jemalloc_pprof = "0.1.0" tikv-jemalloc-ctl.workspace = true [dev-dependencies] +arrow.workspace = true +datafusion.workspace = true expect-test.workspace = true +futures.workspace = true +int-enum.workspace = true test-log.workspace = true mockall.workspace = true object_store.workspace = true diff --git a/api/ceramic.yaml b/api/ceramic.yaml index 82f097450..ed052ea32 100644 --- a/api/ceramic.yaml +++ b/api/ceramic.yaml @@ -107,8 +107,18 @@ paths: requestBody: $ref: "#/components/requestBodies/EventData" responses: - "204": + "201": description: success + content: + application/json: + schema: + $ref: "#/components/schemas/EventCreatedResponse" + "202": + description: Event accepted but validation pending. The event was stored but validation did not complete in time. Use the returned event_id to check status. + content: + application/json: + schema: + $ref: "#/components/schemas/EventAcceptedResponse" "400": description: bad request content: @@ -639,6 +649,34 @@ components: data: type: string description: Multibase encoding of event data. + EventCreatedResponse: + title: Response after event creation + description: Returned when an event is successfully created and validated + type: object + required: + - eventId + - streamId + properties: + eventId: + type: string + description: The CID of the created event + streamId: + type: string + description: The stream ID this event belongs to + EventAcceptedResponse: + title: Response when event is accepted but validation is pending + description: Returned when an event is stored but validation did not complete in time. The event may still be validated successfully. + type: object + required: + - eventId + - message + properties: + eventId: + type: string + description: The CID of the accepted event + message: + type: string + description: Information about the pending validation status EventFeed: title: Ceramic Event feed data description: Ceramic event keys as part of a Ceramic Stream diff --git a/api/src/server.rs b/api/src/server.rs index 00707438d..ef2a2d962 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -7,7 +7,7 @@ mod event; use std::collections::{HashMap, HashSet}; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{future::Future, ops::Range}; use std::{marker::PhantomData, ops::RangeBounds}; use std::{net::SocketAddr, ops::Bound}; @@ -38,7 +38,7 @@ use ceramic_api_server::{ use ceramic_core::{ ensure_multiaddr_has_p2p, Cid, EventId, Interest, Network, NodeId, PeerId, StreamId, }; -use ceramic_pipeline::aggregator::{AggregatorHandle, StreamStateMsg}; +use ceramic_pipeline::aggregator::{AggregatorHandle, EventValidationStatus, StreamStateMsg}; use ceramic_pipeline::PipelineHandle; use futures::TryFutureExt; use multiaddr::Protocol; @@ -47,7 +47,6 @@ use shutdown::Shutdown; use swagger::{ApiError, ByteArray}; #[cfg(not(target_env = "msvc"))] use tikv_jemalloc_ctl::epoch; -use tokio::sync::broadcast; use tracing::{instrument, trace, Level}; use crate::server::event::event_id_from_car; @@ -706,6 +705,10 @@ where } }; + let event_cid = event_id + .cid() + .ok_or_else(|| ErrorResponse::new("Event ID missing CID".to_string()))?; + let (tx, rx) = tokio::sync::oneshot::channel(); tokio::time::timeout( INSERT_ENQUEUE_TIMEOUT, @@ -738,13 +741,63 @@ where .map_err(|e| ErrorResponse::new(format!("Failed to insert event: {e}")))?; match new { - EventInsertResult::Success(_) => Ok(EventsPostResponse::Success), + EventInsertResult::Success(_) => { + // Wait for the event to be processed by the pipeline and check validation + self.wait_for_event_validation(event_cid).await + } EventInsertResult::Failed(_, reason) => Ok(EventsPostResponse::BadRequest( BadRequestResponse::new(reason), )), } } + /// Wait for an event to be processed by the pipeline and return validation result. + /// + /// Uses event-based subscription to the aggregator's broadcast channel rather than polling. + /// The aggregator broadcasts processed events, and we filter for the specific event CID. + async fn wait_for_event_validation( + &self, + event_cid: Cid, + ) -> Result { + // If no pipeline/aggregator, return success immediately (no validation) + let aggregator = match self.pipeline.as_ref().and_then(|p| p.aggregator()) { + Some(agg) => agg, + None => { + // No aggregator available, return success with just the event ID + return Ok(EventsPostResponse::Success( + models::EventCreatedResponse::new(event_cid.to_string(), String::new()), + )); + } + }; + + // Wait for the event to be processed with timeout + let status = aggregator + .wait_for_event_validation(event_cid, INSERT_REQUEST_TIMEOUT) + .await + .map_err(|e| ErrorResponse::new(format!("Error waiting for event validation: {e}")))?; + + match status { + Some(EventValidationStatus::Valid { stream_id }) => Ok(EventsPostResponse::Success( + models::EventCreatedResponse::new(event_cid.to_string(), stream_id.to_string()), + )), + Some(EventValidationStatus::Invalid { stream_id, errors }) => { + Ok(EventsPostResponse::BadRequest(BadRequestResponse::new( + format!( + "Validation failed for stream {}: {}", + stream_id, + errors.join("; ") + ), + ))) + } + None => Ok(EventsPostResponse::EventAcceptedButValidationPending( + models::EventAcceptedResponse::new( + event_cid.to_string(), + "Event accepted but validation did not complete in time. The event is stored and may still be validated. Use the event_id to check status.".to_string(), + ), + )), + } + } + pub async fn post_interests( &self, interest: models::Interest, diff --git a/api/src/tests.rs b/api/src/tests.rs index 41bd4ab07..b85a5cdae 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -10,6 +10,8 @@ use crate::{ }; use anyhow::Result; +use arrow::array::{BinaryArray, ListBuilder, RecordBatch, StringBuilder, UInt8Array}; +use arrow::datatypes::{DataType, Field, Schema}; use async_trait::async_trait; use ceramic_api_server::{ models::{self}, @@ -21,7 +23,9 @@ use ceramic_pipeline::{ aggregator::{mock::MockAggregator, StreamState}, PipelineHandle, }; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use expect_test::expect; +use futures::stream; use mockall::{mock, predicate}; use multiaddr::Multiaddr; use multibase::Base; @@ -248,7 +252,7 @@ async fn create_event() { ) .await .unwrap(); - assert!(matches!(resp, EventsPostResponse::Success)); + assert!(matches!(resp, EventsPostResponse::Success(_))); } #[test(tokio::test)] @@ -303,8 +307,8 @@ async fn create_event_twice() { &Context, ), ); - assert_eq!(resp1.unwrap(), EventsPostResponse::Success); - assert_eq!(resp2.unwrap(), EventsPostResponse::Success); + assert!(matches!(resp1.unwrap(), EventsPostResponse::Success(_))); + assert!(matches!(resp2.unwrap(), EventsPostResponse::Success(_))); } #[test(tokio::test)] async fn create_event_fails() { @@ -1122,3 +1126,224 @@ async fn stream_state() { "#]] .assert_debug_eq(&String::from_utf8(multibase::decode(state.data).unwrap().1)); } + +/// Creates a RecordBatch for mocking validation status responses. +/// This creates the minimal schema needed by AggregatorHandle::parse_validation_status_from_batch +fn create_validation_status_batch( + stream_id: &StreamId, + validation_errors: Vec, +) -> RecordBatch { + use int_enum::IntEnum; + + // Create a minimal schema with just the columns needed for validation status + let schema = Arc::new(Schema::new(vec![ + Field::new("stream_cid", DataType::Binary, false), + Field::new("stream_type", DataType::UInt8, false), + Field::new( + "validation_errors", + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), + true, + ), + ])); + + // Build the arrays + let stream_cid_bytes = stream_id.cid.to_bytes(); + let stream_cid_array = BinaryArray::from(vec![stream_cid_bytes.as_slice()]); + let stream_type_array = UInt8Array::from(vec![stream_id.r#type.int_value() as u8]); + + // Build validation_errors list + // For ListBuilder: first append values, then call append(true) to finalize the list row + let mut list_builder = ListBuilder::new(StringBuilder::new()); + for error in &validation_errors { + list_builder.values().append_value(error); + } + list_builder.append(true); // Finalize the list for this row + let validation_errors_array = list_builder.finish(); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(stream_cid_array), + Arc::new(stream_type_array), + Arc::new(validation_errors_array), + ], + ) + .expect("Failed to create RecordBatch") +} + +#[test(tokio::test)] +async fn create_event_with_pipeline_valid() { + let node_id = NodeKey::random().id(); + let network = Network::Mainnet; + let expected_event_id = EventId::try_from(hex::decode(DATA_EVENT_ID).unwrap()).unwrap(); + + // Remove whitespace from event CAR file + let event_data = DATA_EVENT_CAR + .chars() + .filter(|c| !c.is_whitespace()) + .collect::(); + let mock_interest = MockAccessInterestStoreTest::new(); + let mut mock_event_store = MockEventStoreTest::new(); + mock_get_init_event(&mut mock_event_store); + let args = vec![ApiItem::new( + expected_event_id.clone(), + decode_multibase_data(&event_data).unwrap(), + )]; + + mock_event_store + .expect_insert_many() + .with(predicate::eq(args), predicate::eq(node_id)) + .times(1) + .returning(|input, _| { + Ok(input + .into_iter() + .map(|v| EventInsertResult::new_ok(v.key.clone())) + .collect()) + }); + + // Mock aggregator to return valid validation status via subscribe_since + let mut aggregator = MockAggregator::new(); + let stream_id: StreamId = "k2t6wzhjp5kk3zrbu5tyfjqdrhxyvwnzmxv8htviiganzacva34pfedi5g72tp" + .parse() + .unwrap(); + let batch = create_validation_status_batch(&stream_id, vec![]); + let schema = batch.schema(); + aggregator + .expect_handle_subscribe_since() + .returning(move |_msg| { + let batch = batch.clone(); + let schema = schema.clone(); + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::once(async move { Ok(batch) }), + ))) + }); + + let pipeline = PipelineHandle::new( + ceramic_pipeline::pipeline_ctx(Arc::new(InMemory::new())) + .await + .unwrap(), + None, + Some(MockAggregator::spawn(aggregator)), + ); + + let server = create_test_server( + node_id, + network, + mock_interest, + Arc::new(mock_event_store), + MockP2PService::new(), + Some(pipeline), + ); + let resp = server + .events_post( + models::EventData { + data: event_data.to_string(), + }, + &Context, + ) + .await + .unwrap(); + + // Verify we get a Success response with event ID and stream ID + match resp { + EventsPostResponse::Success(created) => { + assert!(!created.event_id.is_empty()); + assert!(!created.stream_id.is_empty()); + } + other => panic!("Expected Success response, got: {:?}", other), + } +} + +#[test(tokio::test)] +async fn create_event_with_pipeline_invalid() { + let node_id = NodeKey::random().id(); + let network = Network::Mainnet; + let expected_event_id = EventId::try_from(hex::decode(DATA_EVENT_ID).unwrap()).unwrap(); + + // Remove whitespace from event CAR file + let event_data = DATA_EVENT_CAR + .chars() + .filter(|c| !c.is_whitespace()) + .collect::(); + let mock_interest = MockAccessInterestStoreTest::new(); + let mut mock_event_store = MockEventStoreTest::new(); + mock_get_init_event(&mut mock_event_store); + let args = vec![ApiItem::new( + expected_event_id.clone(), + decode_multibase_data(&event_data).unwrap(), + )]; + + mock_event_store + .expect_insert_many() + .with(predicate::eq(args), predicate::eq(node_id)) + .times(1) + .returning(|input, _| { + Ok(input + .into_iter() + .map(|v| EventInsertResult::new_ok(v.key.clone())) + .collect()) + }); + + // Mock aggregator to return invalid validation status with errors via subscribe_since + let mut aggregator = MockAggregator::new(); + let stream_id: StreamId = "k2t6wzhjp5kk3zrbu5tyfjqdrhxyvwnzmxv8htviiganzacva34pfedi5g72tp" + .parse() + .unwrap(); + let batch = create_validation_status_batch( + &stream_id, + vec![ + "Missing required field: name".to_string(), + "Invalid field type: age must be integer".to_string(), + ], + ); + let schema = batch.schema(); + aggregator + .expect_handle_subscribe_since() + .returning(move |_msg| { + let batch = batch.clone(); + let schema = schema.clone(); + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::once(async move { Ok(batch) }), + ))) + }); + + let pipeline = PipelineHandle::new( + ceramic_pipeline::pipeline_ctx(Arc::new(InMemory::new())) + .await + .unwrap(), + None, + Some(MockAggregator::spawn(aggregator)), + ); + + let server = create_test_server( + node_id, + network, + mock_interest, + Arc::new(mock_event_store), + MockP2PService::new(), + Some(pipeline), + ); + let resp = server + .events_post( + models::EventData { + data: event_data.to_string(), + }, + &Context, + ) + .await + .unwrap(); + + // Verify we get a BadRequest response with validation errors + match resp { + EventsPostResponse::BadRequest(bad_request) => { + assert!(bad_request.message.contains("Validation failed")); + assert!(bad_request.message.contains("Missing required field: name")); + assert!(bad_request + .message + .contains("Invalid field type: age must be integer")); + } + other => panic!("Expected BadRequest response, got: {:?}", other), + } +} diff --git a/pipeline/src/aggregator/metrics.rs b/pipeline/src/aggregator/metrics.rs index 35e6147fc..e11db721a 100644 --- a/pipeline/src/aggregator/metrics.rs +++ b/pipeline/src/aggregator/metrics.rs @@ -3,7 +3,7 @@ use ceramic_metrics::Recorder; use crate::metrics::{MessageLabels, Metrics}; -use super::{AggregatorRecorder, NewConclusionEventsMsg, StreamStateMsg}; +use super::{AggregatorRecorder, EventValidationStatusMsg, NewConclusionEventsMsg, StreamStateMsg}; impl Recorder> for Metrics { fn record(&self, event: &MessageEvent) { @@ -21,4 +21,11 @@ impl Recorder> for Metrics { .inc(); } } +impl Recorder> for Metrics { + fn record(&self, event: &MessageEvent) { + self.message_count + .get_or_create(&MessageLabels::from(event)) + .inc(); + } +} impl AggregatorRecorder for Metrics {} diff --git a/pipeline/src/aggregator/mock.rs b/pipeline/src/aggregator/mock.rs index c73159c69..ce7fe769c 100644 --- a/pipeline/src/aggregator/mock.rs +++ b/pipeline/src/aggregator/mock.rs @@ -7,8 +7,8 @@ use prometheus_client::registry::Registry; use crate::metrics::Metrics; use super::{ - Aggregator, AggregatorActor, AggregatorEnvelope, AggregatorHandle, NewConclusionEventsMsg, - StreamStateMsg, SubscribeSinceMsg, + Aggregator, AggregatorActor, AggregatorEnvelope, AggregatorHandle, EventValidationStatusMsg, + NewConclusionEventsMsg, StreamStateMsg, SubscribeSinceMsg, }; mock! { @@ -34,6 +34,11 @@ mock! { message: StreamStateMsg, ) -> ::Result; + #[allow(missing_docs)] + pub fn handle_event_validation_status( + &mut self, + message: EventValidationStatusMsg, + ) -> ::Result; } } @@ -64,6 +69,16 @@ impl Handler for MockAggregator { } } +#[async_trait] +impl Handler for MockAggregator { + async fn handle( + &mut self, + message: EventValidationStatusMsg, + ) -> ::Result { + self.handle_event_validation_status(message) + } +} + impl Actor for MockAggregator { type Envelope = AggregatorEnvelope; } diff --git a/pipeline/src/aggregator/mod.rs b/pipeline/src/aggregator/mod.rs index d26d05de7..3123c2669 100644 --- a/pipeline/src/aggregator/mod.rs +++ b/pipeline/src/aggregator/mod.rs @@ -958,6 +958,7 @@ actor_envelope! { // TODO: Remove this message and use the analogous message on the Resolver. // This way the canonical stream state is provided via the API StreamState => StreamStateMsg, + EventValidationStatus => EventValidationStatusMsg, } #[async_trait] @@ -1219,6 +1220,108 @@ impl Handler for Aggregator { } } +/// Request the validation status of an event by its CID +#[derive(Debug)] +pub struct EventValidationStatusMsg { + /// CID of the event + pub event_cid: Cid, +} + +impl Message for EventValidationStatusMsg { + type Result = anyhow::Result>; +} + +/// Validation status of an event +#[derive(Debug)] +pub enum EventValidationStatus { + /// Event passed validation + Valid { + /// Stream ID this event belongs to + stream_id: StreamId, + }, + /// Event failed validation + Invalid { + /// Stream ID this event belongs to + stream_id: StreamId, + /// Validation error messages + errors: Vec, + }, +} + +#[async_trait] +impl Handler for Aggregator { + #[instrument(skip(self), ret, err)] + async fn handle( + &mut self, + message: EventValidationStatusMsg, + ) -> ::Result { + let event_cid = message.event_cid; + + // Query event_states table for this event CID + let result_batch = self + .ctx + .table(EVENT_STATES_TABLE) + .await + .context("table not found {EVENT_STATES_TABLE}")? + .select(vec![col("stream_cid"), col("validation_errors")]) + .context("invalid select")? + .filter(col("event_cid").eq(lit(event_cid.to_bytes()))) + .context("invalid filter")? + .collect() + .await + .context("invalid query")?; + + let num_rows: usize = result_batch.iter().map(|b| b.num_rows()).sum(); + if num_rows == 0 { + // Event not yet processed + return Ok(None); + } + + let batch = concat_batches(&result_batch[0].schema(), result_batch.iter()) + .context("concat batches")?; + + // Get stream_cid + let stream_cid_col = as_binary_array( + batch + .column_by_name("stream_cid") + .ok_or_else(|| anyhow::anyhow!("stream_cid column should exist"))?, + ) + .context("stream_cid as a binary column")?; + let stream_cid = Cid::read_bytes(stream_cid_col.value(0)).context("stream_cid as a CID")?; + let stream_id = StreamId { + r#type: StreamIdType::ModelInstanceDocument, // Default, could be determined from data + cid: stream_cid, + }; + + // Get validation_errors + let validation_errors_col = batch + .column_by_name("validation_errors") + .ok_or_else(|| anyhow::anyhow!("validation_errors column should exist"))?; + + // Check if validation_errors is empty + let list_array = validation_errors_col + .as_any() + .downcast_ref::() + .ok_or_else(|| anyhow::anyhow!("validation_errors should be a list array"))?; + + if list_array.is_null(0) || list_array.value(0).is_empty() { + return Ok(Some(EventValidationStatus::Valid { stream_id })); + } + + let errors_array = list_array.value(0); + let string_array = errors_array + .as_any() + .downcast_ref::() + .ok_or_else(|| anyhow::anyhow!("validation_errors items should be strings"))?; + + let errors: Vec = (0..string_array.len()) + .map(|i| string_array.value(i).to_string()) + .collect(); + + Ok(Some(EventValidationStatus::Invalid { stream_id, errors })) + } +} + #[async_trait] impl FeedTableSource for AggregatorHandle { fn schema(&self) -> SchemaRef { @@ -1240,6 +1343,106 @@ impl FeedTableSource for AggregatorHandle { } } +impl AggregatorHandle { + /// Wait for an event to be processed and return its validation status. + /// + /// Subscribes to the event_states stream filtered by the event CID and waits + /// for the event to appear (either from historical data or live broadcast). + /// Returns the validation status once the event is processed, or None on timeout. + pub async fn wait_for_event_validation( + &self, + event_cid: Cid, + timeout: std::time::Duration, + ) -> anyhow::Result> { + // Subscribe to event_states with filter for this specific event_cid. + // The subscription combines historical query results with live broadcast updates, + // so we get notified immediately if the event is already processed, or when it arrives. + let filter = col("event_cid").eq(lit(event_cid.to_bytes())); + + let mut stream = self + .send(SubscribeSinceMsg { + projection: None, + filters: Some(vec![filter]), + limit: Some(1), // Only need the first (and only) matching row + }) + .await??; + + // Wait for the event with timeout + // Timeout or stream error returns None + let Some(batch) = tokio::time::timeout(timeout, stream.try_next()) + .await + .ok() + .and_then(|r| r.ok()) + .flatten() + else { + return Ok(None); + }; + + // Parse the batch to extract validation status + Self::parse_validation_status_from_batch(&batch) + } + + fn parse_validation_status_from_batch( + batch: &RecordBatch, + ) -> anyhow::Result> { + use arrow::array::{Array as _, AsArray as _}; + use int_enum::IntEnum as _; + + if batch.num_rows() == 0 { + return Ok(None); + } + + // Get stream_cid + let stream_cid_col = as_binary_array( + batch + .column_by_name("stream_cid") + .ok_or_else(|| anyhow::anyhow!("stream_cid column should exist"))?, + ) + .context("stream_cid as a binary column")?; + let stream_cid = Cid::read_bytes(stream_cid_col.value(0)).context("stream_cid as a CID")?; + + // Get stream_type + let stream_type_col = batch + .column_by_name("stream_type") + .ok_or_else(|| anyhow::anyhow!("stream_type column should exist"))? + .as_primitive::(); + let stream_type_value = stream_type_col.value(0) as u64; + let stream_type = StreamIdType::from_int(stream_type_value) + .map_err(|_| anyhow::anyhow!("Invalid stream type: {stream_type_value}"))?; + + let stream_id = StreamId { + r#type: stream_type, + cid: stream_cid, + }; + + // Get validation_errors + let validation_errors_col = batch + .column_by_name("validation_errors") + .ok_or_else(|| anyhow::anyhow!("validation_errors column should exist"))?; + + let list_array = validation_errors_col + .as_any() + .downcast_ref::() + .ok_or_else(|| anyhow::anyhow!("validation_errors should be a list array"))?; + + if list_array.is_null(0) || list_array.value(0).is_empty() { + return Ok(Some(EventValidationStatus::Valid { stream_id })); + } + + let errors_array = list_array.value(0); + let string_array = errors_array + .as_any() + .downcast_ref::() + .ok_or_else(|| anyhow::anyhow!("validation_errors items should be strings"))?; + + let errors: Vec = (0..string_array.len()) + .map(|i| string_array.value(i).to_string()) + .collect(); + + Ok(Some(EventValidationStatus::Invalid { stream_id, errors })) + } +} + // Small wrapper container around the data/state fields to hold // other mutable metadata for the event. // This is specific to Models and Model Instance Documents. diff --git a/pipeline/src/concluder/mod.rs b/pipeline/src/concluder/mod.rs index 29c2862f2..a746088b3 100644 --- a/pipeline/src/concluder/mod.rs +++ b/pipeline/src/concluder/mod.rs @@ -228,7 +228,7 @@ async fn poll_new_events( metrics: Metrics, mut shutdown: ShutdownSignal, ) -> anyhow::Result<()> { - let mut interval = interval(Duration::from_millis(1_000)); + let mut interval = interval(Duration::from_millis(100)); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); // Poll for new events until shutdown diff --git a/sdk/packages/model-instance-client/src/client.ts b/sdk/packages/model-instance-client/src/client.ts index e2ce480df..be6cdd50b 100644 --- a/sdk/packages/model-instance-client/src/client.ts +++ b/sdk/packages/model-instance-client/src/client.ts @@ -23,6 +23,27 @@ import { } from './events.js' import type { DocumentState, UnknownContent } from './types.js' +function isObject(item: unknown): item is Record { + return item != null && typeof item === 'object' && !Array.isArray(item) +} + +function deepMerge( + target: Record, + source: Record, +): Record { + const result = { ...target } + for (const key of Object.keys(source)) { + const sourceValue = source[key] + const targetValue = target[key] + if (isObject(sourceValue) && isObject(targetValue)) { + result[key] = deepMerge(targetValue, sourceValue) + } else { + result[key] = sourceValue + } + } + return result +} + /** * Parameters for creating a singleton instance of a model. */ @@ -257,8 +278,7 @@ export class ModelInstanceClient extends StreamClient { * @returns A promise that resolves to the updated `DocumentState`. * * @remarks - * This method posts the new content as a data event, updating the document. - * It can optionally take the current document state to avoid re-fetching it. + * Uses deep merge - nested objects are merged recursively, arrays are replaced. */ async updateDocument( params: UpdateDataParams, @@ -274,13 +294,15 @@ export class ModelInstanceClient extends StreamClient { currentState = this.streamStateToDocumentState(params.currentState) } - const { content } = currentState + const { content: currentContent } = currentState const { controller, newContent, shouldIndex, modelVersion } = params + const mergedContent = deepMerge(currentContent ?? {}, newContent) as T + const newCommit = await this.postData({ controller: this.getDID(controller), - currentContent: content ?? undefined, - newContent, + currentContent: currentContent ?? undefined, + newContent: mergedContent, currentID: currentState.commitID, shouldIndex, modelVersion, @@ -288,7 +310,7 @@ export class ModelInstanceClient extends StreamClient { return { commitID: newCommit, - content: newContent, + content: mergedContent, metadata: { model: currentState.metadata.model, controller: currentState.metadata.controller, diff --git a/sdk/packages/model-instance-client/test/lib.test.ts b/sdk/packages/model-instance-client/test/lib.test.ts index ed86c1ba7..1047f028e 100644 --- a/sdk/packages/model-instance-client/test/lib.test.ts +++ b/sdk/packages/model-instance-client/test/lib.test.ts @@ -355,5 +355,55 @@ describe('ModelInstanceClient', () => { expect(postEventType).toHaveBeenCalled() expect(mockGet).not.toHaveBeenCalled() }) + + test('merges partial updates with existing content', async () => { + // Create mock state with multiple fields + const multiFieldMockState = { + id: 'k2t6wyfsu4pfy7r1jdd6jex9oxbqyp4gr2a5kxs8ioxwtisg8nzj3anbckji8g', + event_cid: + 'bafyreib5j4def5a4w4j6sg4upm6nb4cfn752wdjwqtwdzejfladyyymxca', + controller: 'did:key:z6MkiTBz1ymuepAQ4HEHYSF1H8quG5GLVVQR3djdX3mDooWp', + dimensions: { + context: 'u', + controller: + 'uZGlkOmtleTp6Nk1raVRCejF5bXVlcEFRNEhFSFlTRjFIOHF1RzVHTFZWUVIzZGpkWDNtRG9vV3A', + model: 'uzgEAAXESIA8og02Dnbwed_besT8M0YOnaZ-hrmMZaa7mnpdUL8jE', + }, + // Encoded JSON: {"metadata":{"shouldIndex":true},"content":{"field1":"original1","field2":"original2"}} + data: 'ueyJtZXRhZGF0YSI6eyJzaG91bGRJbmRleCI6dHJ1ZX0sImNvbnRlbnQiOnsiZmllbGQxIjoib3JpZ2luYWwxIiwiZmllbGQyIjoib3JpZ2luYWwyIn19', + } + + const streamId = + 'k2t6wyfsu4pfy7r1jdd6jex9oxbqyp4gr2a5kxs8ioxwtisg8nzj3anbckji8g' + + // Mock CeramicClient and its API + const postEventType = jest.fn(() => randomCID()) + const mockGet = jest.fn(() => + Promise.resolve({ + data: multiFieldMockState, + error: null, + }), + ) + + const ceramic = { + api: { GET: mockGet }, + postEventType, + } as unknown as CeramicClient + const client = new ModelInstanceClient({ ceramic, did: authenticatedDID }) + + // Only update field1, leave field2 untouched + const partialUpdate = { field1: 'updated1' } + + const newState = await client.updateDocument({ + streamID: streamId, + newContent: partialUpdate, + }) + + // Verify that field2 is preserved and field1 is updated + expect(newState.content).toEqual({ + field1: 'updated1', + field2: 'original2', + }) + }) }) }) diff --git a/tests/suite/src/__tests__/correctness/fast/flight-sql.test.ts b/tests/suite/src/__tests__/correctness/fast/flight-sql.test.ts index 3f847fdd6..1fd9a58fc 100644 --- a/tests/suite/src/__tests__/correctness/fast/flight-sql.test.ts +++ b/tests/suite/src/__tests__/correctness/fast/flight-sql.test.ts @@ -262,7 +262,7 @@ describe('flight sql', () => { schema: { type: 'object', properties: { - test: { type: 'string', maxLength: 10 }, + test: { type: 'string', maxLength: 100 }, }, additionalProperties: false, }, diff --git a/tests/suite/src/__tests__/correctness/fast/sync-events.test.ts b/tests/suite/src/__tests__/correctness/fast/sync-events.test.ts index e069e6214..6f836f74c 100644 --- a/tests/suite/src/__tests__/correctness/fast/sync-events.test.ts +++ b/tests/suite/src/__tests__/correctness/fast/sync-events.test.ts @@ -37,11 +37,11 @@ async function writeEvents(url: string, events: ReconEventInput[]) { method: 'POST', body: JSON.stringify(event), }) - if (response.status !== 204) { + if (response.status !== 202) { const data = await response.text() console.warn(`writeEvents: node ${url}, result: ${data}`) } - expect(response.status).toEqual(204) + expect(response.status).toEqual(202) } }