
Commit ec1720b

Omega359 authored and findepi committed
Add drop_columns to dataframe api (apache#11010)
* Add drop_columns to dataframe api apache#11007
* Prettier cleanup
* Added additional drop_columns tests and fixed issue with nonexistent columns.
1 parent fc4e602 commit ec1720b

File tree

2 files changed: 170 additions & 0 deletions


datafusion/core/src/dataframe/mod.rs

Lines changed: 169 additions & 0 deletions
@@ -244,6 +244,42 @@ impl DataFrame {
         })
     }
 
+    /// Returns a new DataFrame containing all columns except the specified columns.
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let ctx = SessionContext::new();
+    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    /// let df = df.drop_columns(&["a"])?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame> {
+        let fields_to_drop = columns
+            .iter()
+            .map(|name| {
+                self.plan
+                    .schema()
+                    .qualified_field_with_unqualified_name(name)
+            })
+            .filter(|r| r.is_ok())
+            .collect::<Result<Vec<_>>>()?;
+        let expr: Vec<Expr> = self
+            .plan
+            .schema()
+            .fields()
+            .into_iter()
+            .enumerate()
+            .map(|(idx, _)| self.plan.schema().qualified_field(idx))
+            .filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f)))
+            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
+            .collect();
+        self.select(expr)
+    }
+
     /// Expand each list element of a column to multiple rows.
     #[deprecated(since = "37.0.0", note = "use unnest_columns instead")]
     pub fn unnest_column(self, column: &str) -> Result<DataFrame> {
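Note on the approach: requested names that do not resolve against the schema are filtered out (via .filter(|r| r.is_ok())) rather than raising an error, and the remaining columns are re-selected. Under that reading, for the aggregate_test_100 table used in the tests below, dropping "c2" and "c11" should amount to the explicit projection sketched here; the helper name is made up for illustration, not part of the commit.

use datafusion::dataframe::DataFrame;
use datafusion::error::Result;

// Illustrative sketch only: the hand-written projection that
// drop_columns(&["c2", "c11"]) is expected to produce for a table with
// columns c1..c13 (mirrors the SQL used in the drop_columns test below).
fn drop_c2_and_c11(df: DataFrame) -> Result<DataFrame> {
    df.select_columns(&[
        "c1", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c12", "c13",
    ])
}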
@@ -1799,6 +1835,139 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn drop_columns() -> Result<()> {
+        // build plan using Table API
+        let t = test_table().await?;
+        let t2 = t.drop_columns(&["c2", "c11"])?;
+        let plan = t2.plan.clone();
+
+        // build query using SQL
+        let sql_plan = create_plan(
+            "SELECT c1,c3,c4,c5,c6,c7,c8,c9,c10,c12,c13 FROM aggregate_test_100",
+        )
+        .await?;
+
+        // the two plans should be identical
+        assert_same_plan(&plan, &sql_plan);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn drop_columns_with_duplicates() -> Result<()> {
+        // build plan using Table API
+        let t = test_table().await?;
+        let t2 = t.drop_columns(&["c2", "c11", "c2", "c2"])?;
+        let plan = t2.plan.clone();
+
+        // build query using SQL
+        let sql_plan = create_plan(
+            "SELECT c1,c3,c4,c5,c6,c7,c8,c9,c10,c12,c13 FROM aggregate_test_100",
+        )
+        .await?;
+
+        // the two plans should be identical
+        assert_same_plan(&plan, &sql_plan);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn drop_columns_with_nonexistent_columns() -> Result<()> {
+        // build plan using Table API
+        let t = test_table().await?;
+        let t2 = t.drop_columns(&["canada", "c2", "rocks"])?;
+        let plan = t2.plan.clone();
+
+        // build query using SQL
+        let sql_plan = create_plan(
+            "SELECT c1,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 FROM aggregate_test_100",
+        )
+        .await?;
+
+        // the two plans should be identical
+        assert_same_plan(&plan, &sql_plan);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn drop_columns_with_empty_array() -> Result<()> {
+        // build plan using Table API
+        let t = test_table().await?;
+        let t2 = t.drop_columns(&[])?;
+        let plan = t2.plan.clone();
+
+        // build query using SQL
+        let sql_plan = create_plan(
+            "SELECT c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 FROM aggregate_test_100",
+        )
+        .await?;
+
+        // the two plans should be identical
+        assert_same_plan(&plan, &sql_plan);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn drop_with_quotes() -> Result<()> {
+        // define data with a column name that has a quote in it:
+        let array1: Int32Array = [1, 10].into_iter().collect();
+        let array2: Int32Array = [2, 11].into_iter().collect();
+        let batch = RecordBatch::try_from_iter(vec![
+            ("f\"c1", Arc::new(array1) as _),
+            ("f\"c2", Arc::new(array2) as _),
+        ])?;
+
+        let ctx = SessionContext::new();
+        ctx.register_batch("t", batch)?;
+
+        let df = ctx.table("t").await?.drop_columns(&["f\"c1"])?;
+
+        let df_results = df.collect().await?;
+
+        assert_batches_sorted_eq!(
+            [
+                "+------+",
+                "| f\"c2 |",
+                "+------+",
+                "| 2    |",
+                "| 11   |",
+                "+------+"
+            ],
+            &df_results
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn drop_with_periods() -> Result<()> {
+        // define data with a column name that has a "." in it:
+        let array1: Int32Array = [1, 10].into_iter().collect();
+        let array2: Int32Array = [2, 11].into_iter().collect();
+        let batch = RecordBatch::try_from_iter(vec![
+            ("f.c1", Arc::new(array1) as _),
+            ("f.c2", Arc::new(array2) as _),
+        ])?;
+
+        let ctx = SessionContext::new();
+        ctx.register_batch("t", batch)?;
+
+        let df = ctx.table("t").await?.drop_columns(&["f.c1"])?;
+
+        let df_results = df.collect().await?;
+
+        assert_batches_sorted_eq!(
+            ["+------+", "| f.c2 |", "+------+", "| 2    |", "| 11   |", "+------+"],
+            &df_results
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn aggregate() -> Result<()> {
         // build plan using DataFrame API
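
For reference, a self-contained usage sketch of the new API, adapted from the doc comment and the nonexistent-column test above; the CSV path and the extra column name are illustrative, not part of the commit:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Read any CSV file; this path is the one used in the doc comment above.
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?;
    // Drop column "a"; a name that does not exist in the schema ("no_such_column")
    // is silently ignored, as exercised by drop_columns_with_nonexistent_columns.
    let df = df.drop_columns(&["a", "no_such_column"])?;
    df.show().await?;
    Ok(())
}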

docs/source/user-guide/dataframe.md

Lines changed: 1 addition & 0 deletions
@@ -64,6 +64,7 @@ execution. The plan is evaluated (executed) when an action method is invoked, su
 | ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ |
 | aggregate | Perform an aggregate query with optional grouping expressions. |
 | distinct | Filter out duplicate rows. |
+| drop_columns | Create a projection with all but the provided column names. |
 | except | Calculate the exception of two DataFrames. The two DataFrames must have exactly the same schema |
 | filter | Filter a DataFrame to only include rows that match the specified filter expression. |
 | intersect | Calculate the intersection of two DataFrames. The two DataFrames must have exactly the same schema |

0 commit comments
