Skip to content

Commit 14df97b

Browse files
authored
feat: expose intersect distinct/except distinct in dataframe api (#16578)
* feat: expose intersect distinct/except distinct in dataframe api * Update mod.rs
1 parent 8a0227e commit 14df97b

File tree

2 files changed

+100
-0
lines changed

2 files changed

+100
-0
lines changed

datafusion/core/src/dataframe/mod.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1710,6 +1710,40 @@ impl DataFrame {
17101710
})
17111711
}
17121712

1713+
/// Calculate the distinct intersection of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema
1714+
///
1715+
/// ```
1716+
/// # use datafusion::prelude::*;
1717+
/// # use datafusion::error::Result;
1718+
/// # use datafusion_common::assert_batches_sorted_eq;
1719+
/// # #[tokio::main]
1720+
/// # async fn main() -> Result<()> {
1721+
/// let ctx = SessionContext::new();
1722+
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1723+
/// let d2 = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1724+
/// let df = df.intersect_distinct(d2)?;
1725+
/// let expected = vec![
1726+
/// "+---+---+---+",
1727+
/// "| a | b | c |",
1728+
/// "+---+---+---+",
1729+
/// "| 1 | 2 | 3 |",
1730+
/// "+---+---+---+"
1731+
/// ];
1732+
/// # assert_batches_sorted_eq!(expected, &df.collect().await?);
1733+
/// # Ok(())
1734+
/// # }
1735+
/// ```
1736+
pub fn intersect_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
1737+
let left_plan = self.plan;
1738+
let right_plan = dataframe.plan;
1739+
let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, false)?;
1740+
Ok(DataFrame {
1741+
session_state: self.session_state,
1742+
plan,
1743+
projection_requires_validation: true,
1744+
})
1745+
}
1746+
17131747
/// Calculate the exception of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema
17141748
///
17151749
/// ```
@@ -1746,6 +1780,42 @@ impl DataFrame {
17461780
})
17471781
}
17481782

1783+
/// Calculate the distinct exception of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema
1784+
///
1785+
/// ```
1786+
/// # use datafusion::prelude::*;
1787+
/// # use datafusion::error::Result;
1788+
/// # use datafusion_common::assert_batches_sorted_eq;
1789+
/// # #[tokio::main]
1790+
/// # async fn main() -> Result<()> {
1791+
/// let ctx = SessionContext::new();
1792+
/// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1793+
/// let d2 = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1794+
/// let result = df.except_distinct(d2)?;
1795+
/// // those columns are not in example.csv, but in example_long.csv
1796+
/// let expected = vec![
1797+
/// "+---+---+---+",
1798+
/// "| a | b | c |",
1799+
/// "+---+---+---+",
1800+
/// "| 4 | 5 | 6 |",
1801+
/// "| 7 | 8 | 9 |",
1802+
/// "+---+---+---+"
1803+
/// ];
1804+
/// # assert_batches_sorted_eq!(expected, &result.collect().await?);
1805+
/// # Ok(())
1806+
/// # }
1807+
/// ```
1808+
pub fn except_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
1809+
let left_plan = self.plan;
1810+
let right_plan = dataframe.plan;
1811+
let plan = LogicalPlanBuilder::except(left_plan, right_plan, false)?;
1812+
Ok(DataFrame {
1813+
session_state: self.session_state,
1814+
plan,
1815+
projection_requires_validation: true,
1816+
})
1817+
}
1818+
17491819
/// Execute this `DataFrame` and write the results to `table_name`.
17501820
///
17511821
/// Returns a single [RecordBatch] containing a single column and

datafusion/core/tests/dataframe/mod.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,6 +1360,36 @@ async fn except() -> Result<()> {
13601360
Ok(())
13611361
}
13621362

1363+
#[tokio::test]
1364+
async fn except_distinct() -> Result<()> {
1365+
let df = test_table().await?.select_columns(&["c1", "c3"])?;
1366+
let d2 = df.clone();
1367+
let plan = df.except_distinct(d2)?;
1368+
let result = plan.logical_plan().clone();
1369+
let expected = create_plan(
1370+
"SELECT c1, c3 FROM aggregate_test_100
1371+
EXCEPT DISTINCT SELECT c1, c3 FROM aggregate_test_100",
1372+
)
1373+
.await?;
1374+
assert_same_plan(&result, &expected);
1375+
Ok(())
1376+
}
1377+
1378+
#[tokio::test]
1379+
async fn intersect_distinct() -> Result<()> {
1380+
let df = test_table().await?.select_columns(&["c1", "c3"])?;
1381+
let d2 = df.clone();
1382+
let plan = df.intersect_distinct(d2)?;
1383+
let result = plan.logical_plan().clone();
1384+
let expected = create_plan(
1385+
"SELECT c1, c3 FROM aggregate_test_100
1386+
INTERSECT DISTINCT SELECT c1, c3 FROM aggregate_test_100",
1387+
)
1388+
.await?;
1389+
assert_same_plan(&result, &expected);
1390+
Ok(())
1391+
}
1392+
13631393
#[tokio::test]
13641394
async fn register_table() -> Result<()> {
13651395
let df = test_table().await?.select_columns(&["c1", "c12"])?;

0 commit comments

Comments
 (0)