2 changes: 1 addition & 1 deletion rust/benchmarks/src/bin/tpch.rs
@@ -1636,7 +1636,7 @@ mod tests {
.file_extension(".out");
let df = ctx.read_csv(&format!("{}/answers/q{}.out", path, n), options)?;
let df = df.select(
- &get_answer_schema(n)
This is a pretty good example -- the answer schema was created and immediately dropped after calling df.select(); now the exprs don't need to be copied (see the sketch after this hunk).

+ get_answer_schema(n)
.fields()
.iter()
.map(|field| {
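To make the reviewer's point concrete, here is a minimal, self-contained sketch; `Expr`, `select_borrowed`, and `select_owned` are hypothetical stand-ins rather than DataFusion items. With a borrowed slice the callee must clone the expressions to own them, while with an owned `Vec` the caller's temporary vector is simply moved in and nothing is copied.

```rust
// Illustrative sketch only: these types and functions are not the DataFusion API.

#[derive(Debug, Clone)]
struct Expr(String); // toy expression type

// Slice-based API: borrows the exprs, so it must clone to keep them.
fn select_borrowed(exprs: &[Expr]) -> Vec<Expr> {
    exprs.to_vec() // every Expr is cloned here
}

// Vec-based API: takes ownership, so the Vec is reused as-is.
fn select_owned(exprs: Vec<Expr>) -> Vec<Expr> {
    exprs // moved, not cloned
}

fn main() {
    // Mirrors the tpch.rs case: a Vec of exprs built from a schema and used once.
    let exprs = vec![Expr("l_returnflag".into()), Expr("l_linestatus".into())];

    // With &[Expr], the temporary Vec is cloned inside the call and then dropped.
    let _plan = select_borrowed(&exprs);

    // With Vec<Expr>, the same Vec is moved into the call; no copies are made.
    let _plan = select_owned(exprs);
}
```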
2 changes: 1 addition & 1 deletion rust/datafusion/README.md
@@ -100,7 +100,7 @@ async fn main() -> datafusion::error::Result<()> {
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;

let df = df.filter(col("a").lt_eq(col("b")))?
- .aggregate(&[col("a")], &[min(col("b"))])?
+ .aggregate(vec![col("a")], vec![min(col("b"))])?
.limit(100)?;

// execute and print results
2 changes: 1 addition & 1 deletion rust/datafusion/examples/simple_udaf.rs
@@ -148,7 +148,7 @@ async fn main() -> Result<()> {
let df = ctx.table("t")?;

// perform the aggregation
- let df = df.aggregate(&[], &[geometric_mean.call(vec![col("a")])])?;
+ let df = df.aggregate(vec![], vec![geometric_mean.call(vec![col("a")])])?;

// note that "a" is f32, not f64. DataFusion coerces it to match the UDAF's signature.

2 changes: 1 addition & 1 deletion rust/datafusion/examples/simple_udf.rs
@@ -133,7 +133,7 @@ async fn main() -> Result<()> {
let expr1 = pow.call(vec![col("a"), col("b")]);

// equivalent to `'SELECT pow(a, b), pow(a, b) AS pow1 FROM t'`
- let df = df.select(&[
+ let df = df.select(vec![
expr,
// alias so that they have different column names
expr1.alias("pow1"),
20 changes: 10 additions & 10 deletions rust/datafusion/src/dataframe.rs
@@ -44,7 +44,7 @@ use async_trait::async_trait;
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.filter(col("a").lt_eq(col("b")))?
- /// .aggregate(&[col("a")], &[min(col("b"))])?
+ /// .aggregate(vec![col("a")], vec![min(col("b"))])?
/// .limit(100)?;
/// let results = df.collect();
/// # Ok(())
@@ -75,11 +75,11 @@ pub trait DataFrame: Send + Sync {
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
- /// let df = df.select(&[col("a") * col("b"), col("c")])?;
+ /// let df = df.select(vec![col("a") * col("b"), col("c")])?;
/// # Ok(())
/// # }
/// ```
- fn select(&self, expr: &[Expr]) -> Result<Arc<dyn DataFrame>>;
+ fn select(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>>;

/// Filter a DataFrame to only include rows that match the specified filter expression.
///
@@ -105,17 +105,17 @@ pub trait DataFrame: Send + Sync {
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
///
/// // The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
- /// let _ = df.aggregate(&[col("a")], &[min(col("b"))])?;
+ /// let _ = df.aggregate(vec![col("a")], vec![min(col("b"))])?;
///
/// // The following use is the equivalent of "SELECT MIN(b)"
- /// let _ = df.aggregate(&[], &[min(col("b"))])?;
+ /// let _ = df.aggregate(vec![], vec![min(col("b"))])?;
/// # Ok(())
/// # }
/// ```
fn aggregate(
&self,
- group_expr: &[Expr],
- aggr_expr: &[Expr],
+ group_expr: Vec<Expr>,
+ aggr_expr: Vec<Expr>,
) -> Result<Arc<dyn DataFrame>>;

/// Limit the number of rows returned from this DataFrame.
@@ -155,11 +155,11 @@ pub trait DataFrame: Send + Sync {
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
- /// let df = df.sort(&[col("a").sort(true, true), col("b").sort(false, false)])?;
+ /// let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
/// # Ok(())
/// # }
/// ```
- fn sort(&self, expr: &[Expr]) -> Result<Arc<dyn DataFrame>>;
+ fn sort(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>>;

/// Join this DataFrame with another DataFrame using the specified columns as join keys
///
@@ -171,7 +171,7 @@
/// let mut ctx = ExecutionContext::new();
/// let left = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let right = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?
- /// .select(&[
+ /// .select(vec![
/// col("a").alias("a2"),
/// col("b").alias("b2"),
/// col("c").alias("c2")])?;
14 changes: 7 additions & 7 deletions rust/datafusion/src/execution/context.rs
@@ -82,7 +82,7 @@ use parquet::file::properties::WriterProperties;
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.filter(col("a").lt_eq(col("b")))?
- /// .aggregate(&[col("a")], &[min(col("b"))])?
+ /// .aggregate(vec![col("a")], vec![min(col("b"))])?
/// .limit(100)?;
/// let results = df.collect();
/// # Ok(())
@@ -954,7 +954,7 @@ mod tests {

let table = ctx.table("test")?;
let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan())
.project(&[col("c2")])?
.project(vec![col("c2")])?
.build()?;

let optimized_plan = ctx.optimize(&logical_plan)?;
@@ -999,7 +999,7 @@ mod tests {
assert_eq!(schema.field_with_name("c1")?.is_nullable(), false);

let plan = LogicalPlanBuilder::scan_empty("", &schema, None)?
.project(&[col("c1")])?
.project(vec![col("c1")])?
.build()?;

let plan = ctx.optimize(&plan)?;
@@ -1030,7 +1030,7 @@ mod tests {
)?]];

let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)?
.project(&[col("b")])?
.project(vec![col("b")])?
.build()?;
assert_fields_eq(&plan, vec!["b"]);

@@ -1660,8 +1660,8 @@ mod tests {
]));

let plan = LogicalPlanBuilder::scan_empty("", schema.as_ref(), None)?
.aggregate(&[col("c1")], &[sum(col("c2"))])?
.project(&[col("c1"), col("SUM(c2)").alias("total_salary")])?
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.project(vec![col("c1"), col("SUM(c2)").alias("total_salary")])?
.build()?;

let plan = ctx.optimize(&plan)?;
@@ -1886,7 +1886,7 @@ mod tests {
let t = ctx.table("t")?;

let plan = LogicalPlanBuilder::from(&t.to_logical_plan())
- .project(&[
+ .project(vec![
col("a"),
col("b"),
ctx.udf("my_add")?.call(vec![col("a"), col("b")]),
18 changes: 9 additions & 9 deletions rust/datafusion/src/execution/dataframe_impl.rs
@@ -58,11 +58,11 @@ impl DataFrame for DataFrameImpl {
.map(|name| self.plan.schema().field_with_unqualified_name(name))
.collect::<Result<Vec<_>>>()?;
let expr: Vec<Expr> = fields.iter().map(|f| col(f.name())).collect();
- self.select(&expr)
+ self.select(expr)
}

/// Create a projection based on arbitrary expressions
- fn select(&self, expr_list: &[Expr]) -> Result<Arc<dyn DataFrame>> {
+ fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
.project(expr_list)?
.build()?;
@@ -80,8 +80,8 @@ impl DataFrame for DataFrameImpl {
/// Perform an aggregate query
fn aggregate(
&self,
- group_expr: &[Expr],
- aggr_expr: &[Expr],
+ group_expr: Vec<Expr>,
+ aggr_expr: Vec<Expr>,
) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
.aggregate(group_expr, aggr_expr)?
@@ -96,7 +96,7 @@ impl DataFrame for DataFrameImpl {
}

/// Sort by specified sorting expressions
- fn sort(&self, expr: &[Expr]) -> Result<Arc<dyn DataFrame>> {
+ fn sort(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan).sort(expr)?.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}
@@ -204,7 +204,7 @@ mod tests {
fn select_expr() -> Result<()> {
// build plan using Table API
let t = test_table()?;
- let t2 = t.select(&[col("c1"), col("c2"), col("c11")])?;
+ let t2 = t.select(vec![col("c1"), col("c2"), col("c11")])?;
let plan = t2.to_logical_plan();

// build query using SQL
@@ -220,8 +220,8 @@
fn aggregate() -> Result<()> {
// build plan using DataFrame API
let df = test_table()?;
- let group_expr = &[col("c1")];
- let aggr_expr = &[
+ let group_expr = vec![col("c1")];
+ let aggr_expr = vec![
min(col("c12")),
max(col("c12")),
avg(col("c12")),
@@ -322,7 +322,7 @@ mod tests {

let f = df.registry();

- let df = df.select(&[f.udf("my_fn")?.call(vec![col("c12")])])?;
+ let df = df.select(vec![f.udf("my_fn")?.call(vec![col("c12")])])?;
let plan = df.to_logical_plan();

// build query using SQL
2 changes: 1 addition & 1 deletion rust/datafusion/src/lib.rs
@@ -48,7 +48,7 @@
//!
//! // create a plan
//! let df = df.filter(col("a").lt_eq(col("b")))?
- //! .aggregate(&[col("a")], &[min(col("b"))])?
+ //! .aggregate(vec![col("a")], vec![min(col("b"))])?
//! .limit(100)?;
//!
//! // execute the plan