diff --git a/docs/docs/core/data_types.mdx b/docs/docs/core/data_types.mdx index d3209b886..daa30f078 100644 --- a/docs/docs/core/data_types.mdx +++ b/docs/docs/core/data_types.mdx @@ -46,6 +46,7 @@ This is the list of all primitive types supported by CocoIndex: | *Bytes* | `bytes` | | | | *Str* | `str` | | | | *Bool* | `bool` | | | +| *Enum* | `str`, `cocoindex.typing.Enum()` | | | | *Int64* | `cocoindex.Int64`, `int`, `numpy.int64` | | | | *Float32* | `cocoindex.Float32`, `numpy.float32` | *Float64* | | | *Float64* | `cocoindex.Float64`, `float`, `numpy.float64` | | | @@ -84,6 +85,9 @@ Notes: In Python, it's represented by `cocoindex.Json`. It's useful to hold data without fixed schema known at flow definition time. +#### Enum Type + +*Enum* represents a string-like enumerated type. In Python, use the helper from `cocoindex.typing`. #### Vector Types diff --git a/docs/docs/examples/examples/docs_to_knowledge_graph.md b/docs/docs/examples/examples/docs_to_knowledge_graph.md index 0c644f416..ad3a99180 100644 --- a/docs/docs/examples/examples/docs_to_knowledge_graph.md +++ b/docs/docs/examples/examples/docs_to_knowledge_graph.md @@ -373,4 +373,4 @@ You can open it at [http://localhost:7474](http://localhost:7474), and run the f MATCH p=()-->() RETURN p ``` -![Neo4j Browser](/img/examples/docs_to_knowledge_graph/neo4j_browser.png) \ No newline at end of file +![Neo4j Browser](/img/examples/docs_to_knowledge_graph/neo4j_browser.png) diff --git a/docs/docs/sources/index.md b/docs/docs/sources/index.md index 09cbe1662..0857b1467 100644 --- a/docs/docs/sources/index.md +++ b/docs/docs/sources/index.md @@ -17,6 +17,6 @@ In CocoIndex, a source is the data origin you import from (e.g., files, database | [Postgres](/docs/sources/postgres) | Relational database (Postgres) | Related: -- [Life cycle of a indexing flow](/docs/core/basics#life-cycle-of-an-indexing-flow) -- [Live Update Tutorial](/docs/tutorials/live_updates) +- [Life cycle of a indexing flow](/docs/core/basics#life-cycle-of-an-indexing-flow) +- [Live Update Tutorial](/docs/tutorials/live_updates) for change capture mechanisms. diff --git a/docs/docs/targets/index.md b/docs/docs/targets/index.md index c90d76545..f90a5c329 100644 --- a/docs/docs/targets/index.md +++ b/docs/docs/targets/index.md @@ -334,6 +334,3 @@ You can find end-to-end examples fitting into any of supported property graphs i * * - - - diff --git a/docs/docs/targets/kuzu.md b/docs/docs/targets/kuzu.md index 441e9e784..dc7410638 100644 --- a/docs/docs/targets/kuzu.md +++ b/docs/docs/targets/kuzu.md @@ -13,7 +13,7 @@ Exports data to a [Kuzu](https://kuzu.com/) graph database. ## Get Started -Read [Property Graph Targets](./index.md#property-graph-targets) for more information to get started on how it works in CocoIndex. +Read [Property Graph Targets](./index.md#property-graph-targets) for more information to get started on how it works in CocoIndex. ## Spec @@ -59,4 +59,4 @@ You can then access the explorer at [http://localhost:8124](http://localhost:812 href="https://github.com/cocoindex-io/cocoindex/tree/main/examples/docs_to_knowledge_graph" text="Docs to Knowledge Graph" margin="16px 0 24px 0" -/> \ No newline at end of file +/> diff --git a/docs/docs/targets/neo4j.md b/docs/docs/targets/neo4j.md index ab9e0d16e..5e4fdb22c 100644 --- a/docs/docs/targets/neo4j.md +++ b/docs/docs/targets/neo4j.md @@ -11,7 +11,7 @@ import { ExampleButton } from '../../src/components/GitHubButton'; ## Get Started -Read [Property Graph Targets](./index.md#property-graph-targets) for more information to get started on how it works in CocoIndex. +Read [Property Graph Targets](./index.md#property-graph-targets) for more information to get started on how it works in CocoIndex. ## Spec @@ -59,4 +59,4 @@ If you are building multiple CocoIndex flows from different projects to neo4j, w This way, you can clean up the data for each flow independently. -In case you need to clean up the data in the same database, you can do it manually by running `cocoindex drop ` from the project you want to clean up. \ No newline at end of file +In case you need to clean up the data in the same database, you can do it manually by running `cocoindex drop ` from the project you want to clean up. diff --git a/examples/product_recommendation/README.md b/examples/product_recommendation/README.md index f3ce29b0a..314464cf1 100644 --- a/examples/product_recommendation/README.md +++ b/examples/product_recommendation/README.md @@ -8,7 +8,7 @@ Please drop [CocoIndex on Github](https://github.com/cocoindex-io/cocoindex) a s ## Prerequisite -* [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) +* [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) * Install [Neo4j](https://cocoindex.io/docs/targets/neo4j) * [Configure your OpenAI API key](https://cocoindex.io/docs/ai/llm#openai). diff --git a/python/cocoindex/typing.py b/python/cocoindex/typing.py index c4b0ef600..007ac9055 100644 --- a/python/cocoindex/typing.py +++ b/python/cocoindex/typing.py @@ -13,6 +13,8 @@ Literal, NamedTuple, Protocol, + Optional, + Sequence, TypeVar, overload, Self, @@ -587,6 +589,7 @@ class BasicValueType: "OffsetDateTime", "TimeDelta", "Json", + "Enum", "Vector", "Union", ] diff --git a/src/base/json_schema.rs b/src/base/json_schema.rs index c7a9756cc..3c833c717 100644 --- a/src/base/json_schema.rs +++ b/src/base/json_schema.rs @@ -1,6 +1,6 @@ use crate::prelude::*; - use crate::utils::immutable::RefList; +use indexmap::IndexMap; use schemars::schema::{ ArrayValidation, InstanceType, ObjectValidation, Schema, SchemaObject, SingleOrVec, SubschemaValidation, @@ -74,6 +74,9 @@ impl JsonSchemaBuilder { schema::BasicValueType::Str => { schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::String))); } + schema::BasicValueType::Enum(_) => { + schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::String))); + } schema::BasicValueType::Bytes => { schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::String))); } @@ -245,15 +248,34 @@ impl JsonSchemaBuilder { field_path.prepend(&f.name), ); if self.options.fields_always_required && f.value_type.nullable { - if let Some(instance_type) = &mut field_schema.instance_type { - let mut types = match instance_type { - SingleOrVec::Single(t) => vec![**t], - SingleOrVec::Vec(t) => std::mem::take(t), + if field_schema.enum_values.is_some() { + // Keep the enum as-is and support null via oneOf + let non_null = Schema::Object(field_schema); + let null_branch = Schema::Object(SchemaObject { + instance_type: Some(SingleOrVec::Single(Box::new( + InstanceType::Null, + ))), + ..Default::default() + }); + field_schema = SchemaObject { + subschemas: Some(Box::new(SubschemaValidation { + one_of: Some(vec![non_null, null_branch]), + ..Default::default() + })), + ..Default::default() }; - types.push(InstanceType::Null); - *instance_type = SingleOrVec::Vec(types); + } else { + if let Some(instance_type) = &mut field_schema.instance_type { + let mut types = match instance_type { + SingleOrVec::Single(t) => vec![**t], + SingleOrVec::Vec(t) => std::mem::take(t), + }; + types.push(InstanceType::Null); + *instance_type = SingleOrVec::Vec(types); + } } } + (f.name.to_string(), field_schema.into()) }) .collect(), @@ -298,9 +320,39 @@ impl JsonSchemaBuilder { enriched_value_type: &schema::EnrichedValueType, field_path: RefList<'_, &'_ spec::FieldName>, ) -> SchemaObject { - self.for_value_type(schema_base, &enriched_value_type.typ, field_path) - } + let mut out = self.for_value_type(schema_base, &enriched_value_type.typ, field_path); + + if let schema::ValueType::Basic(schema::BasicValueType::Enum(enum_t)) = + &enriched_value_type.typ + { + let mut vals: Vec = enum_t + .variants + .iter() + .map(|s| serde_json::Value::String(s.to_string())) + .collect(); + + if vals.is_empty() { + if let Some(a) = enriched_value_type + .attrs + .get("variants") + .and_then(|v| v.as_array()) + { + vals = a + .iter() + .filter_map(|v| { + v.as_str().map(|s| serde_json::Value::String(s.to_string())) + }) + .collect(); + } + } + + if !vals.is_empty() { + out.enum_values = Some(vals); + } + } + out + } fn build_extra_instructions(&self) -> Result> { if self.extra_instructions_per_field.is_empty() { return Ok(None); @@ -458,6 +510,57 @@ mod tests { .assert_eq(&serde_json::to_string_pretty(&json_schema).unwrap()); } + #[test] + fn test_basic_types_enum_without_variants() { + let value_type = EnrichedValueType { + typ: ValueType::Basic(BasicValueType::Enum(EnumTypeSchema::default())), + nullable: false, + attrs: Arc::new(BTreeMap::new()), + }; + + let options = create_test_options(); + let result = build_json_schema(value_type, options).unwrap(); + let json_schema = schema_to_json(&result.schema); + + expect![[r#" + { + "type": "string" + }"#]] + .assert_eq(&serde_json::to_string_pretty(&json_schema).unwrap()); + } + + #[test] + fn test_basic_types_enum_with_variants() { + let mut attrs = BTreeMap::new(); + attrs.insert( + "variants".to_string(), + serde_json::json!(["red", "green", "blue"]), + ); + + let value_type = EnrichedValueType { + typ: ValueType::Basic(BasicValueType::Enum(EnumTypeSchema { + variants: vec!["red".into(), "green".into(), "blue".into()], + })), + nullable: false, + attrs: Arc::new(BTreeMap::new()), + }; + + let options = create_test_options(); + let result = build_json_schema(value_type, options).unwrap(); + let json_schema = schema_to_json(&result.schema); + + expect![[r#" + { + "enum": [ + "red", + "green", + "blue" + ], + "type": "string" + }"#]] + .assert_eq(&serde_json::to_string_pretty(&json_schema).unwrap()); + } + #[test] fn test_basic_types_bool() { let value_type = EnrichedValueType { diff --git a/src/base/schema.rs b/src/base/schema.rs index 37898687c..4bcf54851 100644 --- a/src/base/schema.rs +++ b/src/base/schema.rs @@ -13,6 +13,10 @@ pub struct VectorTypeSchema { pub struct UnionTypeSchema { pub types: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub struct EnumTypeSchema { + pub variants: Vec>, +} #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "kind")] @@ -23,6 +27,9 @@ pub enum BasicValueType { /// String encoded in UTF-8. Str, + /// Enumerated symbolic value. + Enum(EnumTypeSchema), + /// A boolean value. Bool, @@ -71,6 +78,7 @@ impl std::fmt::Display for BasicValueType { match self { BasicValueType::Bytes => write!(f, "Bytes"), BasicValueType::Str => write!(f, "Str"), + BasicValueType::Enum(_) => write!(f, "Enum"), BasicValueType::Bool => write!(f, "Bool"), BasicValueType::Int64 => write!(f, "Int64"), BasicValueType::Float32 => write!(f, "Float32"), diff --git a/src/base/value.rs b/src/base/value.rs index 882097b53..53935e81f 100644 --- a/src/base/value.rs +++ b/src/base/value.rs @@ -202,6 +202,7 @@ impl KeyPart { KeyPart::Bytes(Bytes::from(BASE64_STANDARD.decode(v)?)) } BasicValueType::Str => KeyPart::Str(Arc::from(v)), + BasicValueType::Enum(_) => KeyPart::Str(Arc::from(v)), BasicValueType::Bool => KeyPart::Bool(v.parse()?), BasicValueType::Int64 => KeyPart::Int64(v.parse()?), BasicValueType::Range => { @@ -1136,6 +1137,9 @@ impl BasicValue { BasicValue::Bytes(Bytes::from(BASE64_STANDARD.decode(v)?)) } (serde_json::Value::String(v), BasicValueType::Str) => BasicValue::Str(Arc::from(v)), + (serde_json::Value::String(v), BasicValueType::Enum(_)) => { + BasicValue::Str(Arc::from(v)) + } (serde_json::Value::Bool(v), BasicValueType::Bool) => BasicValue::Bool(v), (serde_json::Value::Number(v), BasicValueType::Int64) => BasicValue::Int64( v.as_i64() diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index 760219cf8..b3fe48293 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -10,6 +10,8 @@ use super::stats; use futures::future::try_join_all; use indicatif::ProgressBar; use sqlx::PgPool; +use std::collections::VecDeque; +use std::time::Instant; use tokio::{sync::watch, task::JoinSet, time::MissedTickBehavior}; pub struct FlowLiveUpdaterUpdates { @@ -95,8 +97,9 @@ struct SourceUpdateTask { status_tx: watch::Sender, num_remaining_tasks_tx: watch::Sender, + start_time: Instant, + rate_history: std::sync::Mutex>, } - impl Drop for SourceUpdateTask { fn drop(&mut self) { self.status_tx.send_modify(|update| { @@ -303,19 +306,56 @@ impl SourceUpdateTask { try_join_all(futs).await?; Ok(()) } + fn rate_bracket(&self) -> String { + let now = std::time::Instant::now(); + let total = self.source_update_stats.num_insertions.get() + + self.source_update_stats.num_deletions.get() + + self.source_update_stats.num_reprocesses.get(); + + let mut hist = self.rate_history.lock().unwrap(); + + // Drop entries older than 60s, but keep at least one + while let Some((t, _)) = hist.front().copied() { + if now.duration_since(t) > std::time::Duration::from_secs(60) && hist.len() > 1 { + hist.pop_front(); + } else { + break; + } + } + + // Push current sample + hist.push_back((now, total)); + + // Baseline is earliest kept sample (or now) + let (base_t, base_c) = hist.front().copied().unwrap_or((now, total)); + let secs = now.duration_since(base_t).as_secs_f64(); + let rate = if secs > 0.0 { + (total - base_c) as f64 / secs + } else { + 0.0 + }; + + format!( + " [elapsed: {:.0}s, {:.3}/s now]", + self.start_time.elapsed().as_secs_f64(), + rate + ) + } fn report_stats(&self, stats: &stats::UpdateStats, update_title: &str) { self.source_update_stats.merge(stats); + let bracket = self.rate_bracket(); + if self.options.print_stats { println!( - "{}.{} ({update_title}): {}", + "{}.{} ({update_title}): {}{bracket}", self.flow.flow_instance.name, self.import_op().name, stats ); } else { trace!( - "{}.{} ({update_title}): {}", + "{}.{} ({update_title}): {}{bracket}", self.flow.flow_instance.name, self.import_op().name, stats @@ -323,6 +363,29 @@ impl SourceUpdateTask { } } + fn report_stats_with_extras( + &self, + stats: &stats::UpdateStats, + update_title: &str, + elapsed_secs: f64, + recent_rate: f64, + ) { + self.source_update_stats.merge(stats); + let line = format!( + "{}.{} ({update_title}): {} [elapsed: {:.3}s, {:.3}/s now]", + self.flow.flow_instance.name, + self.import_op().name, + stats, + elapsed_secs, + recent_rate + ); + if self.options.print_stats { + println!("{line}"); + } else { + trace!("{line}"); + } + } + async fn update_one_pass( &self, source_indexing_context: &Arc, @@ -458,6 +521,8 @@ impl FlowLiveUpdater { options: options.clone(), status_tx: status_tx.clone(), num_remaining_tasks_tx: num_remaining_tasks_tx.clone(), + start_time: Instant::now(), + rate_history: std::sync::Mutex::new(VecDeque::from([(Instant::now(), 0)])), }; join_set.spawn(source_update_task.run()); stats_per_task.push(source_update_stats); diff --git a/src/ops/targets/kuzu.rs b/src/ops/targets/kuzu.rs index d6b0bbc1c..0145c3fa5 100644 --- a/src/ops/targets/kuzu.rs +++ b/src/ops/targets/kuzu.rs @@ -101,6 +101,7 @@ fn basic_type_to_kuzu(basic_type: &BasicValueType) -> Result { Ok(match basic_type { BasicValueType::Bytes => "BLOB".to_string(), BasicValueType::Str => "STRING".to_string(), + BasicValueType::Enum(_) => "STRING".to_string(), BasicValueType::Bool => "BOOL".to_string(), BasicValueType::Int64 => "INT64".to_string(), BasicValueType::Float32 => "FLOAT".to_string(), diff --git a/src/ops/targets/postgres.rs b/src/ops/targets/postgres.rs index ae808361d..0eae29588 100644 --- a/src/ops/targets/postgres.rs +++ b/src/ops/targets/postgres.rs @@ -474,6 +474,7 @@ fn to_column_type_sql(column_type: &ValueType) -> String { ValueType::Basic(basic_type) => match basic_type { BasicValueType::Bytes => "bytea".into(), BasicValueType::Str => "text".into(), + BasicValueType::Enum(_) => "text".into(), BasicValueType::Bool => "boolean".into(), BasicValueType::Int64 => "bigint".into(), BasicValueType::Float32 => "real".into(), diff --git a/src/py/convert.rs b/src/py/convert.rs index 67e254892..0793e0daf 100644 --- a/src/py/convert.rs +++ b/src/py/convert.rs @@ -156,6 +156,9 @@ fn basic_value_from_py_object<'py>( value::BasicValue::Bytes(Bytes::from(v.extract::>()?)) } schema::BasicValueType::Str => value::BasicValue::Str(Arc::from(v.extract::()?)), + schema::BasicValueType::Enum(_) => { + value::BasicValue::Str(Arc::from(v.extract::()?)) + } schema::BasicValueType::Bool => value::BasicValue::Bool(v.extract::()?), schema::BasicValueType::Int64 => value::BasicValue::Int64(v.extract::()?), schema::BasicValueType::Float32 => value::BasicValue::Float32(v.extract::()?),