2,099 changes: 1,591 additions & 508 deletions Cargo.lock

Large diffs are not rendered by default.

34 changes: 19 additions & 15 deletions Cargo.toml
@@ -2,8 +2,8 @@
members = ["benchmarks", "cli"]

[workspace.dependencies]
datafusion = { version = "51.0.0", default-features = false }
datafusion-proto = { version = "51.0.0" }
datafusion = { version = "50.0.0", default-features = false }
datafusion-proto = { version = "50.0.0" }

[package]
name = "datafusion-distributed"
@@ -14,16 +14,15 @@ edition = "2024"
chrono = { version = "0.4.42" }
datafusion = { workspace = true, features = [
"parquet",
"sql",
"unicode_expressions",
"datetime_expressions",
] }
datafusion-proto = { workspace = true }
arrow-flight = "57.0.0"
arrow-select = "57.0.0"
arrow-flight = "56.0.0"
arrow-select = "56.0.0"
async-trait = "0.1.88"
tokio = { version = "1.46.1", features = ["full"] }
tonic = { version = "0.14.1", features = ["transport"] }
tonic = { version = "0.13", features = ["transport"] }
tower = "0.5.2"
http = "1.3.1"
itertools = "0.14.0"
@@ -32,7 +31,7 @@ url = "2.5.4"
uuid = "1.17.0"
delegate = "0.13.4"
dashmap = "6.1.0"
prost = "0.14.0"
prost = "0.13"
rand = "0.8.5"
object_store = "0.12.3"
bytes = "1.10.1"
@@ -41,10 +40,10 @@ tokio-stream = "0.1.17"

# integration_tests deps
insta = { version = "1.43.1", features = ["filters"], optional = true }
-tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf", optional = true }
-tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf", optional = true }
-parquet = { version = "57.0.0", optional = true }
-arrow = { version = "57.0.0", optional = true }
+tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
+tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
+parquet = { version = "56.0.0", optional = true }
+arrow = { version = "56.0.0", optional = true }
hyper-util = { version = "0.1.16", optional = true }
pretty_assertions = { version = "1.4", optional = true }

@@ -66,10 +65,15 @@ tpcds = ["integration"]
[dev-dependencies]
structopt = "0.3"
insta = { version = "1.43.1", features = ["filters"] }
-tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf" }
-tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf" }
-parquet = "57.0.0"
-arrow = "57.0.0"
+tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
+tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
+parquet = "56.0.0"
+arrow = "56.0.0"
tokio-stream = "0.1.17"
hyper-util = "0.1.16"
pretty_assertions = "1.4"

+[patch.crates-io]
+datafusion = { git = "https://github.com/DataDog/datafusion", rev = "4cc47d3418ee7dfe8f82394d356560401b950d63", package = "datafusion" }
+datafusion-proto = { git = "https://github.com/DataDog/datafusion", rev = "4cc47d3418ee7dfe8f82394d356560401b950d63", package = "datafusion-proto" }
+datafusion-proto-common = { git = "https://github.com/DataDog/datafusion", rev = "4cc47d3418ee7dfe8f82394d356560401b950d63", package = "datafusion-proto-common" }
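Note: the `[patch.crates-io]` section added above redirects every use of `datafusion`, `datafusion-proto`, and `datafusion-proto-common` across the workspace, including transitive dependencies, to the same pinned revision of the DataDog fork, so the entire build resolves against the patched 50.x sources.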
6 changes: 3 additions & 3 deletions benchmarks/Cargo.toml
@@ -9,7 +9,7 @@ datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-distributed = { path = "..", features = ["integration"] }
tokio = { version = "1.46.1", features = ["full"] }
parquet = { version = "57.0.0" }
parquet = { version = "56.0.0" }
structopt = { version = "0.3.26" }
log = "0.4.27"
serde = "1.0.219"
@@ -19,10 +19,10 @@ async-trait = "0.1.88"
chrono = "0.4.41"
futures = "0.3.31"
dashmap = "6.1.0"
prost = "0.14.0"
prost = "0.13"
url = "2.5.4"
arrow-flight = "57.0.0"
tonic = { version = "0.14.1", features = ["transport"] }
tonic = { version = "0.13", features = ["transport"] }
axum = "0.7"
object_store = { version = "0.12.4", features = ["aws"] }
aws-config = "1"
4 changes: 2 additions & 2 deletions benchmarks/src/util/memory.rs
@@ -4,7 +4,7 @@ use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::{exec_err, extensions_options, plan_err};
use datafusion::config::{ConfigExtension, ConfigOptions};
use datafusion::error::DataFusionError;
-use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::execution::{FunctionRegistry, SendableRecordBatchStream, TaskContext};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
@@ -150,7 +150,7 @@ impl PhysicalExtensionCodec for InMemoryCacheExecCodec {
&self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_registry: &dyn FunctionRegistry,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
let Ok(proto) = InMemoryCacheExecProto::decode(buf) else {
return plan_err!("no InMemoryDataSourceExecProto");
2 changes: 1 addition & 1 deletion cli/Cargo.toml
@@ -12,7 +12,7 @@ clap = { version = "4", features = ["derive"] }
env_logger = "0.11"
dirs = "6"
arrow-flight = "57.0.0"
tonic = { version = "0.14.1", features = ["transport"] }
tonic = { version = "0.13", features = ["transport"] }
tower = "0.5.2"
hyper-util = "0.1.16"
tokio-stream = "0.1.17"
2 changes: 1 addition & 1 deletion src/distributed_ext.rs
@@ -149,7 +149,7 @@ pub trait DistributedExt: Sized {
/// struct CustomExecCodec;
///
/// impl PhysicalExtensionCodec for CustomExecCodec {
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &dyn FunctionRegistry) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
/// todo!()
/// }
///
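For reference, a minimal sketch of a codec against the datafusion-proto 50 trait surface that this doc comment now targets. `NoopCodec` and its pass-through behavior are illustrative only, not part of this PR:

```rust
use std::sync::Arc;

use datafusion::common::internal_datafusion_err;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;

#[derive(Debug)]
struct NoopCodec;

impl PhysicalExtensionCodec for NoopCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        // datafusion-proto 50 passes a function registry here; the 51 API
        // passed a &TaskContext instead.
        _registry: &dyn FunctionRegistry,
    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
        // Pass-through for illustration: a real codec decodes `buf` into
        // its own ExecutionPlan node wrapping `inputs`.
        inputs
            .first()
            .cloned()
            .ok_or_else(|| internal_datafusion_err!("expected one input"))
    }

    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> datafusion::common::Result<()> {
        // Nothing to serialize for this illustrative node.
        Ok(())
    }
}
```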
3 changes: 2 additions & 1 deletion src/flight_service/do_get.rs
@@ -99,7 +99,8 @@ impl ArrowFlightEndpoint {
let stage_data = once
.get_or_try_init(|| async {
let proto_node = PhysicalPlanNode::try_decode(doget.plan_proto.as_ref())?;
let mut plan = proto_node.try_into_physical_plan(&ctx.task_ctx(), &codec)?;
let mut plan =
proto_node.try_into_physical_plan(&ctx, &ctx.runtime_env(), &codec)?;
for hook in self.hooks.on_plan.iter() {
plan = hook(plan)
}
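The new call shape follows the datafusion-proto 50 API, where `try_into_physical_plan` takes a function registry and a runtime environment separately rather than a single `TaskContext`. Below is a standalone sketch of the same decode path; `decode_plan` is a hypothetical helper, not code from this PR, and it leans on `SessionContext` implementing `FunctionRegistry`:

```rust
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
use datafusion_proto::protobuf::PhysicalPlanNode;

/// Decode a protobuf-encoded physical plan, resolving any extension
/// nodes through `codec`.
fn decode_plan(
    ctx: &SessionContext,
    bytes: &[u8],
    codec: &dyn PhysicalExtensionCodec,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
    // Parse the protobuf bytes into a plan node tree.
    let node = PhysicalPlanNode::try_decode(bytes)?;
    // SessionContext coerces to &dyn FunctionRegistry for the first
    // argument; the runtime environment is passed explicitly.
    node.try_into_physical_plan(ctx, &ctx.runtime_env(), codec)
}
```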
2 changes: 1 addition & 1 deletion src/flight_service/session_builder.rs
@@ -34,7 +34,7 @@ pub trait DistributedSessionBuilder {
/// struct CustomExecCodec;
///
/// impl PhysicalExtensionCodec for CustomExecCodec {
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &dyn FunctionRegistry) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
/// todo!()
/// }
///
4 changes: 0 additions & 4 deletions src/metrics/proto.rs
@@ -288,10 +288,6 @@ pub fn df_metric_to_proto(metric: Arc<Metric>) -> Result<MetricProto, DataFusionError>
labels,
}),
MetricValue::Custom { .. } => internal_err!("{}", CUSTOM_METRICS_NOT_SUPPORTED),
-MetricValue::OutputBytes(_) | MetricValue::PruningMetrics { .. } | MetricValue::Ratio { .. } => {
-    // TODO: Support these metrics
-    internal_err!("{}", UNSUPPORTED_METRICS)
-}
}
}

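Note: `MetricValue::OutputBytes`, `MetricValue::PruningMetrics`, and `MetricValue::Ratio` are newer variants that do not exist in the datafusion 50 API this PR pins to, so the arm handling them is removed outright (0 additions) rather than ported.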
63 changes: 36 additions & 27 deletions src/protobuf/distributed_codec.rs
@@ -8,11 +8,11 @@ use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::internal_datafusion_err;
use datafusion::error::DataFusionError;
-use datafusion::execution::TaskContext;
+use datafusion::execution::{FunctionRegistry, SessionStateBuilder};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{ExecutionPlan, Partitioning, PlanProperties};
-use datafusion::prelude::SessionConfig;
+use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
use datafusion_proto::physical_plan::to_proto::serialize_partitioning;
use datafusion_proto::physical_plan::{ComposedPhysicalExtensionCodec, PhysicalExtensionCodec};
@@ -40,7 +40,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
&self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
ctx: &TaskContext,
registry: &dyn FunctionRegistry,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
let DistributedExecProto {
node: Some(distributed_exec_node),
@@ -51,6 +51,20 @@
));
};

+// TODO: The PhysicalExtensionCodec trait doesn't provide access to session state,
+// so we create a new SessionContext which loses any custom UDFs, UDAFs, and other
+// user configurations. This is a limitation of the current trait design.
+let state = SessionStateBuilder::new()
+    .with_scalar_functions(
+        registry
+            .udfs()
+            .iter()
+            .map(|f| registry.udf(f))
+            .collect::<Result<Vec<_>, _>>()?,
+    )
+    .build();
+let ctx = SessionContext::from(state);

fn parse_stage_proto(
proto: Option<StageProto>,
inputs: &[Arc<dyn ExecutionPlan>],
Expand Down Expand Up @@ -100,7 +114,7 @@ impl PhysicalExtensionCodec for DistributedCodec {

let partitioning = parse_protobuf_partitioning(
partitioning.as_ref(),
-ctx,
+&ctx,
&schema,
&DistributedCodec {},
)?
@@ -124,7 +138,7 @@

let partitioning = parse_protobuf_partitioning(
partitioning.as_ref(),
-ctx,
+&ctx,
&schema,
&DistributedCodec {},
)?
@@ -389,12 +403,11 @@
use datafusion::physical_expr::LexOrdering;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::{
+execution::registry::MemoryFunctionRegistry,
physical_expr::{Partitioning, PhysicalSortExpr, expressions::Column, expressions::col},
physical_plan::{ExecutionPlan, displayable, sorts::sort::SortExec, union::UnionExec},
};

-use datafusion::prelude::SessionContext;

fn empty_exec() -> Arc<dyn ExecutionPlan> {
Arc::new(EmptyExec::new(SchemaRef::new(Schema::empty())))
}
@@ -416,14 +429,10 @@
displayable(plan.as_ref()).indent(true).to_string()
}

-fn create_context() -> Arc<TaskContext> {
-    SessionContext::new().task_ctx()
-}

#[test]
fn test_roundtrip_single_flight() -> datafusion::common::Result<()> {
let codec = DistributedCodec;
-let ctx = create_context();
+let registry = MemoryFunctionRegistry::new();

let schema = schema_i32("a");
let part = Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 4);
@@ -433,7 +442,7 @@
let mut buf = Vec::new();
codec.try_encode(plan.clone(), &mut buf)?;

let decoded = codec.try_decode(&buf, &[empty_exec()], &ctx)?;
let decoded = codec.try_decode(&buf, &[empty_exec()], &registry)?;
assert_eq!(repr(&plan), repr(&decoded));

Ok(())
@@ -442,7 +451,7 @@
#[test]
fn test_roundtrip_isolator_flight() -> datafusion::common::Result<()> {
let codec = DistributedCodec;
-let ctx = create_context();
+let registry = MemoryFunctionRegistry::new();

let schema = schema_i32("b");
let flight = Arc::new(new_network_hash_shuffle_exec(
@@ -457,7 +466,7 @@
let mut buf = Vec::new();
codec.try_encode(plan.clone(), &mut buf)?;

let decoded = codec.try_decode(&buf, &[flight], &ctx)?;
let decoded = codec.try_decode(&buf, &[flight], &registry)?;
assert_eq!(repr(&plan), repr(&decoded));

Ok(())
@@ -466,7 +475,7 @@
#[test]
fn test_roundtrip_isolator_union() -> datafusion::common::Result<()> {
let codec = DistributedCodec;
-let ctx = create_context();
+let registry = MemoryFunctionRegistry::new();

let schema = schema_i32("c");
let left = Arc::new(new_network_hash_shuffle_exec(
@@ -480,14 +489,14 @@
dummy_stage(),
));

-let union = UnionExec::try_new(vec![left.clone(), right.clone()])?;
+let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()]));
let plan: Arc<dyn ExecutionPlan> =
Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 1)?);

let mut buf = Vec::new();
codec.try_encode(plan.clone(), &mut buf)?;

let decoded = codec.try_decode(&buf, &[union], &ctx)?;
let decoded = codec.try_decode(&buf, &[union], &registry)?;
assert_eq!(repr(&plan), repr(&decoded));

Ok(())
@@ -496,7 +505,7 @@
#[test]
fn test_roundtrip_isolator_sort_flight() -> datafusion::common::Result<()> {
let codec = DistributedCodec;
-let ctx = create_context();
+let registry = MemoryFunctionRegistry::new();

let schema = schema_i32("d");
let flight = Arc::new(new_network_hash_shuffle_exec(
@@ -520,7 +529,7 @@
let mut buf = Vec::new();
codec.try_encode(plan.clone(), &mut buf)?;

let decoded = codec.try_decode(&buf, &[sort], &ctx)?;
let decoded = codec.try_decode(&buf, &[sort], &registry)?;
assert_eq!(repr(&plan), repr(&decoded));

Ok(())
@@ -529,7 +538,7 @@
#[test]
fn test_roundtrip_single_flight_coalesce() -> datafusion::common::Result<()> {
let codec = DistributedCodec;
-let ctx = create_context();
+let registry = MemoryFunctionRegistry::new();

let schema = schema_i32("e");
let plan: Arc<dyn ExecutionPlan> = Arc::new(new_network_coalesce_tasks_exec(
@@ -541,7 +550,7 @@
let mut buf = Vec::new();
codec.try_encode(plan.clone(), &mut buf)?;

let decoded = codec.try_decode(&buf, &[empty_exec()], &ctx)?;
let decoded = codec.try_decode(&buf, &[empty_exec()], &registry)?;
assert_eq!(repr(&plan), repr(&decoded));

Ok(())
@@ -550,7 +559,7 @@
#[test]
fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> {
let codec = DistributedCodec;
-let ctx = create_context();
+let registry = MemoryFunctionRegistry::new();

let schema = schema_i32("f");
let flight = Arc::new(new_network_coalesce_tasks_exec(
@@ -565,7 +574,7 @@
let mut buf = Vec::new();
codec.try_encode(plan.clone(), &mut buf)?;

let decoded = codec.try_decode(&buf, &[flight], &ctx)?;
let decoded = codec.try_decode(&buf, &[flight], &registry)?;
assert_eq!(repr(&plan), repr(&decoded));

Ok(())
@@ -574,7 +583,7 @@
#[test]
fn test_roundtrip_isolator_union_coalesce() -> datafusion::common::Result<()> {
let codec = DistributedCodec;
-let ctx = create_context();
+let registry = MemoryFunctionRegistry::new();

let schema = schema_i32("g");
let left = Arc::new(new_network_coalesce_tasks_exec(
@@ -588,14 +597,14 @@
dummy_stage(),
));

-let union = UnionExec::try_new(vec![left.clone(), right.clone()])?;
+let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()]));
let plan: Arc<dyn ExecutionPlan> =
Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 3)?);

let mut buf = Vec::new();
codec.try_encode(plan.clone(), &mut buf)?;

let decoded = codec.try_decode(&buf, &[union], &ctx)?;
let decoded = codec.try_decode(&buf, &[union], &registry)?;
assert_eq!(repr(&plan), repr(&decoded));

Ok(())
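The `try_decode` implementation above rebuilds a `SessionContext` from the registry but only carries scalar UDFs across, as the TODO notes. A sketch of how aggregate and window UDFs could be carried over the same way, assuming `SessionStateBuilder` in this datafusion version also exposes `with_aggregate_functions` and `with_window_functions` (`context_from_registry` is a hypothetical helper, not part of this PR):

```rust
use datafusion::execution::{FunctionRegistry, SessionStateBuilder};
use datafusion::prelude::SessionContext;

/// Rebuild a SessionContext from a bare FunctionRegistry, carrying over
/// scalar, aggregate, and window UDFs. Other session state (config,
/// catalogs, optimizer rules) is still lost, as the TODO above notes.
fn context_from_registry(
    registry: &dyn FunctionRegistry,
) -> datafusion::common::Result<SessionContext> {
    let state = SessionStateBuilder::new()
        .with_scalar_functions(
            registry
                .udfs()
                .iter()
                .map(|f| registry.udf(f))
                .collect::<Result<Vec<_>, _>>()?,
        )
        .with_aggregate_functions(
            registry
                .udafs()
                .iter()
                .map(|f| registry.udaf(f))
                .collect::<Result<Vec<_>, _>>()?,
        )
        .with_window_functions(
            registry
                .udwfs()
                .iter()
                .map(|f| registry.udwf(f))
                .collect::<Result<Vec<_>, _>>()?,
        )
        .build();
    Ok(SessionContext::from(state))
}
```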