Skip to content

Commit 0557c46

Browse files
author
Jiayu Liu
committed
fix 593
1 parent d55a105 commit 0557c46

File tree

14 files changed

+138
-137
lines changed

14 files changed

+138
-137
lines changed

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
6161
.iter()
6262
.map(|expr| expr.try_into())
6363
.collect::<Result<Vec<_>, _>>()?;
64-
LogicalPlanBuilder::from(&input)
64+
LogicalPlanBuilder::from(input)
6565
.project(x)?
6666
.build()
6767
.map_err(|e| e.into())
6868
}
6969
LogicalPlanType::Selection(selection) => {
7070
let input: LogicalPlan = convert_box_required!(selection.input)?;
71-
LogicalPlanBuilder::from(&input)
71+
LogicalPlanBuilder::from(input)
7272
.filter(
7373
selection
7474
.expr
@@ -86,7 +86,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
8686
.iter()
8787
.map(|expr| expr.try_into())
8888
.collect::<Result<Vec<_>, _>>()?;
89-
LogicalPlanBuilder::from(&input)
89+
LogicalPlanBuilder::from(input)
9090
.window(window_expr)?
9191
.build()
9292
.map_err(|e| e.into())
@@ -103,7 +103,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
103103
.iter()
104104
.map(|expr| expr.try_into())
105105
.collect::<Result<Vec<_>, _>>()?;
106-
LogicalPlanBuilder::from(&input)
106+
LogicalPlanBuilder::from(input)
107107
.aggregate(group_expr, aggr_expr)?
108108
.build()
109109
.map_err(|e| e.into())
@@ -162,7 +162,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
162162
.iter()
163163
.map(|expr| expr.try_into())
164164
.collect::<Result<Vec<Expr>, _>>()?;
165-
LogicalPlanBuilder::from(&input)
165+
LogicalPlanBuilder::from(input)
166166
.sort(sort_expr)?
167167
.build()
168168
.map_err(|e| e.into())
@@ -193,7 +193,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
193193
}
194194
};
195195

196-
LogicalPlanBuilder::from(&input)
196+
LogicalPlanBuilder::from(input)
197197
.repartition(partitioning_scheme)?
198198
.build()
199199
.map_err(|e| e.into())
@@ -223,14 +223,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
223223
}
224224
LogicalPlanType::Explain(explain) => {
225225
let input: LogicalPlan = convert_box_required!(explain.input)?;
226-
LogicalPlanBuilder::from(&input)
226+
LogicalPlanBuilder::from(input)
227227
.explain(explain.verbose)?
228228
.build()
229229
.map_err(|e| e.into())
230230
}
231231
LogicalPlanType::Limit(limit) => {
232232
let input: LogicalPlan = convert_box_required!(limit.input)?;
233-
LogicalPlanBuilder::from(&input)
233+
LogicalPlanBuilder::from(input)
234234
.limit(limit.limit as usize)?
235235
.build()
236236
.map_err(|e| e.into())
@@ -255,7 +255,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
255255
protobuf::JoinType::Semi => JoinType::Semi,
256256
protobuf::JoinType::Anti => JoinType::Anti,
257257
};
258-
LogicalPlanBuilder::from(&convert_box_required!(join.left)?)
258+
LogicalPlanBuilder::from(convert_box_required!(join.left)?)
259259
.join(
260260
&convert_box_required!(join.right)?,
261261
join_type,

datafusion/src/execution/context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,7 +1073,7 @@ mod tests {
10731073
let ctx = create_ctx(&tmp_dir, partition_count)?;
10741074

10751075
let table = ctx.table("test")?;
1076-
let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan())
1076+
let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
10771077
.project(vec![col("c2")])?
10781078
.build()?;
10791079

@@ -2566,7 +2566,7 @@ mod tests {
25662566

25672567
let t = ctx.table("t")?;
25682568

2569-
let plan = LogicalPlanBuilder::from(&t.to_logical_plan())
2569+
let plan = LogicalPlanBuilder::from(t.to_logical_plan())
25702570
.project(vec![
25712571
col("a"),
25722572
col("b"),

datafusion/src/execution/dataframe_impl.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,15 @@ impl DataFrame for DataFrameImpl {
6363

6464
/// Create a projection based on arbitrary expressions
6565
fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
66-
let plan = LogicalPlanBuilder::from(&self.plan)
66+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
6767
.project(expr_list)?
6868
.build()?;
6969
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
7070
}
7171

7272
/// Create a filter based on a predicate expression
7373
fn filter(&self, predicate: Expr) -> Result<Arc<dyn DataFrame>> {
74-
let plan = LogicalPlanBuilder::from(&self.plan)
74+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
7575
.filter(predicate)?
7676
.build()?;
7777
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
@@ -83,21 +83,25 @@ impl DataFrame for DataFrameImpl {
8383
group_expr: Vec<Expr>,
8484
aggr_expr: Vec<Expr>,
8585
) -> Result<Arc<dyn DataFrame>> {
86-
let plan = LogicalPlanBuilder::from(&self.plan)
86+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
8787
.aggregate(group_expr, aggr_expr)?
8888
.build()?;
8989
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
9090
}
9191

9292
/// Limit the number of rows
9393
fn limit(&self, n: usize) -> Result<Arc<dyn DataFrame>> {
94-
let plan = LogicalPlanBuilder::from(&self.plan).limit(n)?.build()?;
94+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
95+
.limit(n)?
96+
.build()?;
9597
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
9698
}
9799

98100
/// Sort by specified sorting expressions
99101
fn sort(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
100-
let plan = LogicalPlanBuilder::from(&self.plan).sort(expr)?.build()?;
102+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
103+
.sort(expr)?
104+
.build()?;
101105
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
102106
}
103107

@@ -109,7 +113,7 @@ impl DataFrame for DataFrameImpl {
109113
left_cols: &[&str],
110114
right_cols: &[&str],
111115
) -> Result<Arc<dyn DataFrame>> {
112-
let plan = LogicalPlanBuilder::from(&self.plan)
116+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
113117
.join(
114118
&right.to_logical_plan(),
115119
join_type,
@@ -124,7 +128,7 @@ impl DataFrame for DataFrameImpl {
124128
&self,
125129
partitioning_scheme: Partitioning,
126130
) -> Result<Arc<dyn DataFrame>> {
127-
let plan = LogicalPlanBuilder::from(&self.plan)
131+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
128132
.repartition(partitioning_scheme)?
129133
.build()?;
130134
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
@@ -161,7 +165,7 @@ impl DataFrame for DataFrameImpl {
161165
}
162166

163167
fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>> {
164-
let plan = LogicalPlanBuilder::from(&self.plan)
168+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
165169
.explain(verbose)?
166170
.build()?;
167171
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
@@ -173,7 +177,7 @@ impl DataFrame for DataFrameImpl {
173177
}
174178

175179
fn union(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>> {
176-
let plan = LogicalPlanBuilder::from(&self.plan)
180+
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
177181
.union(dataframe.to_logical_plan())?
178182
.build()?;
179183
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))

datafusion/src/logical_plan/builder.rs

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,15 @@ pub struct LogicalPlanBuilder {
8989

9090
impl LogicalPlanBuilder {
9191
/// Create a builder from an existing plan
92-
pub fn from(plan: &LogicalPlan) -> Self {
93-
Self { plan: plan.clone() }
92+
pub fn from(plan: LogicalPlan) -> Self {
93+
Self { plan }
9494
}
9595

9696
/// Create an empty relation.
9797
///
9898
/// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
9999
pub fn empty(produce_one_row: bool) -> Self {
100-
Self::from(&LogicalPlan::EmptyRelation {
100+
Self::from(LogicalPlan::EmptyRelation {
101101
produce_one_row,
102102
schema: DFSchemaRef::new(DFSchema::empty()),
103103
})
@@ -182,7 +182,7 @@ impl LogicalPlanBuilder {
182182
limit: None,
183183
};
184184

185-
Ok(Self::from(&table_scan))
185+
Ok(Self::from(table_scan))
186186
}
187187

188188
/// Apply a projection.
@@ -214,7 +214,7 @@ impl LogicalPlanBuilder {
214214

215215
let schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
216216

217-
Ok(Self::from(&LogicalPlan::Projection {
217+
Ok(Self::from(LogicalPlan::Projection {
218218
expr: projected_expr,
219219
input: Arc::new(self.plan.clone()),
220220
schema: DFSchemaRef::new(schema),
@@ -224,15 +224,15 @@ impl LogicalPlanBuilder {
224224
/// Apply a filter
225225
pub fn filter(&self, expr: Expr) -> Result<Self> {
226226
let expr = normalize_col(expr, &self.plan.all_schemas())?;
227-
Ok(Self::from(&LogicalPlan::Filter {
227+
Ok(Self::from(LogicalPlan::Filter {
228228
predicate: expr,
229229
input: Arc::new(self.plan.clone()),
230230
}))
231231
}
232232

233233
/// Apply a limit
234234
pub fn limit(&self, n: usize) -> Result<Self> {
235-
Ok(Self::from(&LogicalPlan::Limit {
235+
Ok(Self::from(LogicalPlan::Limit {
236236
n,
237237
input: Arc::new(self.plan.clone()),
238238
}))
@@ -241,19 +241,15 @@ impl LogicalPlanBuilder {
241241
/// Apply a sort
242242
pub fn sort(&self, exprs: impl IntoIterator<Item = Expr>) -> Result<Self> {
243243
let schemas = self.plan.all_schemas();
244-
Ok(Self::from(&LogicalPlan::Sort {
244+
Ok(Self::from(LogicalPlan::Sort {
245245
expr: normalize_cols(exprs, &schemas)?,
246246
input: Arc::new(self.plan.clone()),
247247
}))
248248
}
249249

250250
/// Apply a union
251251
pub fn union(&self, plan: LogicalPlan) -> Result<Self> {
252-
Ok(Self::from(&union_with_alias(
253-
self.plan.clone(),
254-
plan,
255-
None,
256-
)?))
252+
Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?))
257253
}
258254

259255
/// Apply a join with on constraint
@@ -287,7 +283,7 @@ impl LogicalPlanBuilder {
287283
&JoinConstraint::On,
288284
)?;
289285

290-
Ok(Self::from(&LogicalPlan::Join {
286+
Ok(Self::from(LogicalPlan::Join {
291287
left: Arc::new(self.plan.clone()),
292288
right: Arc::new(right.clone()),
293289
on,
@@ -323,7 +319,7 @@ impl LogicalPlanBuilder {
323319
&JoinConstraint::Using,
324320
)?;
325321

326-
Ok(Self::from(&LogicalPlan::Join {
322+
Ok(Self::from(LogicalPlan::Join {
327323
left: Arc::new(self.plan.clone()),
328324
right: Arc::new(right.clone()),
329325
on,
@@ -336,7 +332,7 @@ impl LogicalPlanBuilder {
336332
/// Apply a cross join
337333
pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
338334
let schema = self.plan.schema().join(right.schema())?;
339-
Ok(Self::from(&LogicalPlan::CrossJoin {
335+
Ok(Self::from(LogicalPlan::CrossJoin {
340336
left: Arc::new(self.plan.clone()),
341337
right: Arc::new(right.clone()),
342338
schema: DFSchemaRef::new(schema),
@@ -345,7 +341,7 @@ impl LogicalPlanBuilder {
345341

346342
/// Repartition
347343
pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {
348-
Ok(Self::from(&LogicalPlan::Repartition {
344+
Ok(Self::from(LogicalPlan::Repartition {
349345
input: Arc::new(self.plan.clone()),
350346
partitioning_scheme,
351347
}))
@@ -368,7 +364,7 @@ impl LogicalPlanBuilder {
368364
exprlist_to_fields(all_expr, self.plan.schema())?;
369365
window_fields.extend_from_slice(self.plan.schema().fields());
370366

371-
Ok(Self::from(&LogicalPlan::Window {
367+
Ok(Self::from(LogicalPlan::Window {
372368
input: Arc::new(self.plan.clone()),
373369
window_expr,
374370
schema: Arc::new(DFSchema::new(window_fields)?),
@@ -393,7 +389,7 @@ impl LogicalPlanBuilder {
393389
let aggr_schema =
394390
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;
395391

396-
Ok(Self::from(&LogicalPlan::Aggregate {
392+
Ok(Self::from(LogicalPlan::Aggregate {
397393
input: Arc::new(self.plan.clone()),
398394
group_expr,
399395
aggr_expr,
@@ -410,7 +406,7 @@ impl LogicalPlanBuilder {
410406

411407
let schema = LogicalPlan::explain_schema();
412408

413-
Ok(Self::from(&LogicalPlan::Explain {
409+
Ok(Self::from(LogicalPlan::Explain {
414410
verbose,
415411
plan: Arc::new(self.plan.clone()),
416412
stringified_plans,

0 commit comments

Comments
 (0)