Skip to content

Commit ffb195c

Browse files
authored
fix 593 (#610)
1 parent f995de5 commit ffb195c

File tree

14 files changed

+137
-136
lines changed

14 files changed

+137
-136
lines changed

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
6161
.iter()
6262
.map(|expr| expr.try_into())
6363
.collect::<Result<Vec<_>, _>>()?;
64-
LogicalPlanBuilder::from(&input)
64+
LogicalPlanBuilder::from(input)
6565
.project(x)?
6666
.build()
6767
.map_err(|e| e.into())
6868
}
6969
LogicalPlanType::Selection(selection) => {
7070
let input: LogicalPlan = convert_box_required!(selection.input)?;
71-
LogicalPlanBuilder::from(&input)
71+
LogicalPlanBuilder::from(input)
7272
.filter(
7373
selection
7474
.expr
@@ -86,7 +86,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
8686
.iter()
8787
.map(|expr| expr.try_into())
8888
.collect::<Result<Vec<_>, _>>()?;
89-
LogicalPlanBuilder::from(&input)
89+
LogicalPlanBuilder::from(input)
9090
.window(window_expr)?
9191
.build()
9292
.map_err(|e| e.into())
@@ -103,7 +103,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
103103
.iter()
104104
.map(|expr| expr.try_into())
105105
.collect::<Result<Vec<_>, _>>()?;
106-
LogicalPlanBuilder::from(&input)
106+
LogicalPlanBuilder::from(input)
107107
.aggregate(group_expr, aggr_expr)?
108108
.build()
109109
.map_err(|e| e.into())
@@ -172,7 +172,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
172172
.iter()
173173
.map(|expr| expr.try_into())
174174
.collect::<Result<Vec<Expr>, _>>()?;
175-
LogicalPlanBuilder::from(&input)
175+
LogicalPlanBuilder::from(input)
176176
.sort(sort_expr)?
177177
.build()
178178
.map_err(|e| e.into())
@@ -203,7 +203,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
203203
}
204204
};
205205

206-
LogicalPlanBuilder::from(&input)
206+
LogicalPlanBuilder::from(input)
207207
.repartition(partitioning_scheme)?
208208
.build()
209209
.map_err(|e| e.into())
@@ -233,14 +233,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
233233
}
234234
LogicalPlanType::Explain(explain) => {
235235
let input: LogicalPlan = convert_box_required!(explain.input)?;
236-
LogicalPlanBuilder::from(&input)
236+
LogicalPlanBuilder::from(input)
237237
.explain(explain.verbose)?
238238
.build()
239239
.map_err(|e| e.into())
240240
}
241241
LogicalPlanType::Limit(limit) => {
242242
let input: LogicalPlan = convert_box_required!(limit.input)?;
243-
LogicalPlanBuilder::from(&input)
243+
LogicalPlanBuilder::from(input)
244244
.limit(limit.limit as usize)?
245245
.build()
246246
.map_err(|e| e.into())
@@ -265,7 +265,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
265265
protobuf::JoinType::Semi => JoinType::Semi,
266266
protobuf::JoinType::Anti => JoinType::Anti,
267267
};
268-
LogicalPlanBuilder::from(&convert_box_required!(join.left)?)
268+
LogicalPlanBuilder::from(convert_box_required!(join.left)?)
269269
.join(
270270
&convert_box_required!(join.right)?,
271271
join_type,

datafusion/src/execution/context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,7 +1073,7 @@ mod tests {
10731073
let ctx = create_ctx(&tmp_dir, partition_count)?;
10741074

10751075
let table = ctx.table("test")?;
1076-
let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan())
1076+
let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
10771077
.project(vec![col("c2")])?
10781078
.build()?;
10791079

@@ -2566,7 +2566,7 @@ mod tests {
25662566

25672567
let t = ctx.table("t")?;
25682568

2569-
let plan = LogicalPlanBuilder::from(&t.to_logical_plan())
2569+
let plan = LogicalPlanBuilder::from(t.to_logical_plan())
25702570
.project(vec![
25712571
col("a"),
25722572
col("b"),

datafusion/src/execution/dataframe_impl.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,15 @@ impl DataFrame for DataFrameImpl {
6363

6464
/// Create a projection based on arbitrary expressions
6565
fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
66-
let plan = LogicalPlanBuilder::from(&self.plan)
66+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
6767
.project(expr_list)?
6868
.build()?;
6969
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
7070
}
7171

7272
/// Create a filter based on a predicate expression
7373
fn filter(&self, predicate: Expr) -> Result<Arc<dyn DataFrame>> {
74-
let plan = LogicalPlanBuilder::from(&self.plan)
74+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
7575
.filter(predicate)?
7676
.build()?;
7777
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
@@ -83,21 +83,25 @@ impl DataFrame for DataFrameImpl {
8383
group_expr: Vec<Expr>,
8484
aggr_expr: Vec<Expr>,
8585
) -> Result<Arc<dyn DataFrame>> {
86-
let plan = LogicalPlanBuilder::from(&self.plan)
86+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
8787
.aggregate(group_expr, aggr_expr)?
8888
.build()?;
8989
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
9090
}
9191

9292
/// Limit the number of rows
9393
fn limit(&self, n: usize) -> Result<Arc<dyn DataFrame>> {
94-
let plan = LogicalPlanBuilder::from(&self.plan).limit(n)?.build()?;
94+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
95+
.limit(n)?
96+
.build()?;
9597
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
9698
}
9799

98100
/// Sort by specified sorting expressions
99101
fn sort(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
100-
let plan = LogicalPlanBuilder::from(&self.plan).sort(expr)?.build()?;
102+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
103+
.sort(expr)?
104+
.build()?;
101105
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
102106
}
103107

@@ -109,7 +113,7 @@ impl DataFrame for DataFrameImpl {
109113
left_cols: &[&str],
110114
right_cols: &[&str],
111115
) -> Result<Arc<dyn DataFrame>> {
112-
let plan = LogicalPlanBuilder::from(&self.plan)
116+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
113117
.join(
114118
&right.to_logical_plan(),
115119
join_type,
@@ -124,7 +128,7 @@ impl DataFrame for DataFrameImpl {
124128
&self,
125129
partitioning_scheme: Partitioning,
126130
) -> Result<Arc<dyn DataFrame>> {
127-
let plan = LogicalPlanBuilder::from(&self.plan)
131+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
128132
.repartition(partitioning_scheme)?
129133
.build()?;
130134
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
@@ -161,7 +165,7 @@ impl DataFrame for DataFrameImpl {
161165
}
162166

163167
fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>> {
164-
let plan = LogicalPlanBuilder::from(&self.plan)
168+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
165169
.explain(verbose)?
166170
.build()?;
167171
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
@@ -173,7 +177,7 @@ impl DataFrame for DataFrameImpl {
173177
}
174178

175179
fn union(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>> {
176-
let plan = LogicalPlanBuilder::from(&self.plan)
180+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
177181
.union(dataframe.to_logical_plan())?
178182
.build()?;
179183
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))

datafusion/src/logical_plan/builder.rs

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,15 @@ pub struct LogicalPlanBuilder {
8989

9090
impl LogicalPlanBuilder {
9191
/// Create a builder from an existing plan
92-
pub fn from(plan: &LogicalPlan) -> Self {
93-
Self { plan: plan.clone() }
92+
pub fn from(plan: LogicalPlan) -> Self {
93+
Self { plan }
9494
}
9595

9696
/// Create an empty relation.
9797
///
9898
/// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
9999
pub fn empty(produce_one_row: bool) -> Self {
100-
Self::from(&LogicalPlan::EmptyRelation {
100+
Self::from(LogicalPlan::EmptyRelation {
101101
produce_one_row,
102102
schema: DFSchemaRef::new(DFSchema::empty()),
103103
})
@@ -202,7 +202,7 @@ impl LogicalPlanBuilder {
202202
limit: None,
203203
};
204204

205-
Ok(Self::from(&table_scan))
205+
Ok(Self::from(table_scan))
206206
}
207207

208208
/// Apply a projection.
@@ -234,7 +234,7 @@ impl LogicalPlanBuilder {
234234

235235
let schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
236236

237-
Ok(Self::from(&LogicalPlan::Projection {
237+
Ok(Self::from(LogicalPlan::Projection {
238238
expr: projected_expr,
239239
input: Arc::new(self.plan.clone()),
240240
schema: DFSchemaRef::new(schema),
@@ -244,15 +244,15 @@ impl LogicalPlanBuilder {
244244
/// Apply a filter
245245
pub fn filter(&self, expr: Expr) -> Result<Self> {
246246
let expr = normalize_col(expr, &self.plan.all_schemas())?;
247-
Ok(Self::from(&LogicalPlan::Filter {
247+
Ok(Self::from(LogicalPlan::Filter {
248248
predicate: expr,
249249
input: Arc::new(self.plan.clone()),
250250
}))
251251
}
252252

253253
/// Apply a limit
254254
pub fn limit(&self, n: usize) -> Result<Self> {
255-
Ok(Self::from(&LogicalPlan::Limit {
255+
Ok(Self::from(LogicalPlan::Limit {
256256
n,
257257
input: Arc::new(self.plan.clone()),
258258
}))
@@ -261,19 +261,15 @@ impl LogicalPlanBuilder {
261261
/// Apply a sort
262262
pub fn sort(&self, exprs: impl IntoIterator<Item = Expr>) -> Result<Self> {
263263
let schemas = self.plan.all_schemas();
264-
Ok(Self::from(&LogicalPlan::Sort {
264+
Ok(Self::from(LogicalPlan::Sort {
265265
expr: normalize_cols(exprs, &schemas)?,
266266
input: Arc::new(self.plan.clone()),
267267
}))
268268
}
269269

270270
/// Apply a union
271271
pub fn union(&self, plan: LogicalPlan) -> Result<Self> {
272-
Ok(Self::from(&union_with_alias(
273-
self.plan.clone(),
274-
plan,
275-
None,
276-
)?))
272+
Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?))
277273
}
278274

279275
/// Apply a join with on constraint
@@ -307,7 +303,7 @@ impl LogicalPlanBuilder {
307303
&JoinConstraint::On,
308304
)?;
309305

310-
Ok(Self::from(&LogicalPlan::Join {
306+
Ok(Self::from(LogicalPlan::Join {
311307
left: Arc::new(self.plan.clone()),
312308
right: Arc::new(right.clone()),
313309
on,
@@ -343,7 +339,7 @@ impl LogicalPlanBuilder {
343339
&JoinConstraint::Using,
344340
)?;
345341

346-
Ok(Self::from(&LogicalPlan::Join {
342+
Ok(Self::from(LogicalPlan::Join {
347343
left: Arc::new(self.plan.clone()),
348344
right: Arc::new(right.clone()),
349345
on,
@@ -356,7 +352,7 @@ impl LogicalPlanBuilder {
356352
/// Apply a cross join
357353
pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
358354
let schema = self.plan.schema().join(right.schema())?;
359-
Ok(Self::from(&LogicalPlan::CrossJoin {
355+
Ok(Self::from(LogicalPlan::CrossJoin {
360356
left: Arc::new(self.plan.clone()),
361357
right: Arc::new(right.clone()),
362358
schema: DFSchemaRef::new(schema),
@@ -365,7 +361,7 @@ impl LogicalPlanBuilder {
365361

366362
/// Repartition
367363
pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {
368-
Ok(Self::from(&LogicalPlan::Repartition {
364+
Ok(Self::from(LogicalPlan::Repartition {
369365
input: Arc::new(self.plan.clone()),
370366
partitioning_scheme,
371367
}))
@@ -379,7 +375,7 @@ impl LogicalPlanBuilder {
379375
let mut window_fields: Vec<DFField> =
380376
exprlist_to_fields(all_expr, self.plan.schema())?;
381377
window_fields.extend_from_slice(self.plan.schema().fields());
382-
Ok(Self::from(&LogicalPlan::Window {
378+
Ok(Self::from(LogicalPlan::Window {
383379
input: Arc::new(self.plan.clone()),
384380
window_expr,
385381
schema: Arc::new(DFSchema::new(window_fields)?),
@@ -404,7 +400,7 @@ impl LogicalPlanBuilder {
404400
let aggr_schema =
405401
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;
406402

407-
Ok(Self::from(&LogicalPlan::Aggregate {
403+
Ok(Self::from(LogicalPlan::Aggregate {
408404
input: Arc::new(self.plan.clone()),
409405
group_expr,
410406
aggr_expr,
@@ -421,7 +417,7 @@ impl LogicalPlanBuilder {
421417

422418
let schema = LogicalPlan::explain_schema();
423419

424-
Ok(Self::from(&LogicalPlan::Explain {
420+
Ok(Self::from(LogicalPlan::Explain {
425421
verbose,
426422
plan: Arc::new(self.plan.clone()),
427423
stringified_plans,

0 commit comments

Comments
 (0)