Skip to content

Commit f2c01de

Browse files
author
QP Hou
authored
Support qualified columns in queries (#55)
* support qualified columns in queries * handle coalesced hash join partition in HashJoinStream * implement Into<Column> for &str * add todo for ARROW-10971 * fix cross join handling in projection push down optimizer When a projection is pushed down to cross join inputs, fields from the resulting plan's schema need to be trimmed to only contain projected fields. * maintain field order during plan optimization using projections * change TableScan name from Option<String> to String * WIP: fix ballista * separate logical and physical expressions in proto, fix ballista build * fix join schema handling in projection push down optimizer schema needs to be recalculated based on newly optimized inputs * tpch 7 & 8 are now passing! * fix roundtrip_join test * fix clippy warnings * fix sql planner test error checking with matches `format!("{:?}", err)` yields different results between stable and nightly rust. * address FIXMEs * honor datafusion field name semantic strip qualifier name in physical field names * add more comment * enable more queries in benchmark/run.sh * use unzip to avoid unnecessary iterators * reduce diff by discarding style related changes * simplify hash_join tests * reduce diff for easier review * fix unnecessary reference clippy error * incorporate code review feedback * fix window schema handling in projection pushdown optimizer
1 parent a461e9c commit f2c01de

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+3603
-1880
lines changed

ballista/rust/core/proto/ballista.proto

Lines changed: 155 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,29 @@ option java_outer_classname = "BallistaProto";
2828
// Ballista Logical Plan
2929
///////////////////////////////////////////////////////////////////////////////////////////////////
3030

31+
// Wrapper around a single string, `relation`.
// Used as the type of Column.relation and DfField.qualifier. Because those
// are message-typed fields, they can be left unset.
message ColumnRelation {
32+
string relation = 1;
33+
}
34+
35+
// A column reference: a name plus a ColumnRelation.
// Used by LogicalExprNode.column and by JoinNode.left_join_column /
// JoinNode.right_join_column.
message Column {
36+
string name = 1;
37+
ColumnRelation relation = 2;
38+
}
39+
40+
// Pairs a Field with a ColumnRelation qualifier. Elements of DfSchema.columns.
// NOTE(review): before this commit field 2 was `string qualifier`. A string
// payload and an embedded message are not interchangeable on the wire, so
// confirm that no plans serialized by older builds still need to be read.
message DfField{
41+
Field field = 1;
42+
ColumnRelation qualifier = 2;
43+
}
44+
45+
// A list of DfField entries.
// Used as the type of CreateExternalTableNode.schema (field 5), which was
// `Schema` before this commit.
message DfSchema {
46+
repeated DfField columns = 1;
47+
}
48+
3149
// logical expressions
3250
message LogicalExprNode {
3351
oneof ExprType {
3452
// column references
35-
string column_name = 1;
53+
Column column = 1;
3654

3755
// alias
3856
AliasNode alias = 2;
@@ -295,7 +313,7 @@ message CreateExternalTableNode{
295313
string location = 2;
296314
FileType file_type = 3;
297315
bool has_header = 4;
298-
Schema schema = 5;
316+
DfSchema schema = 5;
299317
}
300318

301319
enum FileType{
@@ -309,11 +327,6 @@ message ExplainNode{
309327
bool verbose = 2;
310328
}
311329

312-
message DfField{
313-
string qualifier = 2;
314-
Field field = 1;
315-
}
316-
317330
message AggregateNode {
318331
LogicalPlanNode input = 1;
319332
repeated LogicalExprNode group_expr = 2;
@@ -369,8 +382,8 @@ message JoinNode {
369382
LogicalPlanNode left = 1;
370383
LogicalPlanNode right = 2;
371384
JoinType join_type = 3;
372-
repeated string left_join_column = 4;
373-
repeated string right_join_column = 5;
385+
repeated Column left_join_column = 4;
386+
repeated Column right_join_column = 5;
374387
}
375388

376389
message LimitNode {
@@ -408,6 +421,119 @@ message PhysicalPlanNode {
408421
}
409422
}
410423

424+
// physical expressions
425+
// One node of a physical expression tree. At most one member of the
// `ExprType` oneof is set; most members wrap further PhysicalExprNode
// children.
// Used by the physical plan nodes in this file (e.g. FilterExecNode.expr,
// ProjectionExecNode.expr, SortExecNode.expr).
// NOTE(review): PhysicalAliasNode is defined in this file but is not a member
// of this oneof as shown here — confirm whether that is intentional.
message PhysicalExprNode {
426+
oneof ExprType {
427+
// column references
428+
PhysicalColumn column = 1;
429+
430+
ScalarValue literal = 2;
431+
432+
// binary expressions
433+
PhysicalBinaryExprNode binary_expr = 3;
434+
435+
// aggregate expressions
436+
PhysicalAggregateExprNode aggregate_expr = 4;
437+
438+
// null checks
439+
PhysicalIsNull is_null_expr = 5;
440+
PhysicalIsNotNull is_not_null_expr = 6;
441+
PhysicalNot not_expr = 7;
442+
443+
// NOTE(review): the trailing underscore in `case_` presumably avoids a
// keyword clash in generated code — confirm.
PhysicalCaseNode case_ = 8;
444+
PhysicalCastNode cast = 9;
445+
PhysicalSortExprNode sort = 10;
446+
PhysicalNegativeNode negative = 11;
447+
PhysicalInListNode in_list = 12;
448+
PhysicalScalarFunctionNode scalar_function = 13;
449+
PhysicalTryCastNode try_cast = 14;
450+
451+
// window expressions
452+
PhysicalWindowExprNode window_expr = 15;
453+
}
454+
}
455+
456+
// Pairs an AggregateFunction with a single PhysicalExprNode (`expr`).
// Presumably `expr` is the function's argument — confirm against the
// (de)serialization code.
message PhysicalAggregateExprNode {
457+
AggregateFunction aggr_function = 1;
458+
PhysicalExprNode expr = 2;
459+
}
460+
461+
// Window expression. The `window_function` oneof holds either an
// AggregateFunction or a BuiltInWindowFunction. `expr` sits outside the oneof
// at field number 4; number 3 is unused and only mentioned in the comment
// below.
message PhysicalWindowExprNode {
461+
oneof window_function {
462+
AggregateFunction aggr_function = 1;
463+
BuiltInWindowFunction built_in_function = 2;
464+
// udaf = 3
465+
}
466+
PhysicalExprNode expr = 4;
467+
}
469+
470+
// Wraps one child expression. Carried by PhysicalExprNode.is_null_expr.
message PhysicalIsNull {
471+
PhysicalExprNode expr = 1;
472+
}
473+
474+
// Wraps one child expression. Carried by PhysicalExprNode.is_not_null_expr.
message PhysicalIsNotNull {
475+
PhysicalExprNode expr = 1;
476+
}
477+
478+
// Wraps one child expression. Carried by PhysicalExprNode.not_expr.
message PhysicalNot {
479+
PhysicalExprNode expr = 1;
480+
}
481+
482+
// A child expression together with an alias string.
// NOTE(review): not referenced by the PhysicalExprNode oneof as shown in this
// file — confirm whether it is used elsewhere.
message PhysicalAliasNode {
483+
PhysicalExprNode expr = 1;
484+
string alias = 2;
485+
}
486+
487+
// Binary expression: two child expressions (`l`, `r`) and an operator.
// NOTE(review): `op` is a free-form string; the accepted operator spellings
// are not defined here — check the code that reads this message.
message PhysicalBinaryExprNode {
488+
PhysicalExprNode l = 1;
489+
PhysicalExprNode r = 2;
490+
string op = 3;
491+
}
492+
493+
// Sort expression: a child expression plus two boolean options.
// Assuming this file is proto3, `asc` and `nulls_first` have implicit
// presence, so an unset flag is indistinguishable from an explicit `false`.
message PhysicalSortExprNode {
494+
PhysicalExprNode expr = 1;
495+
bool asc = 2;
496+
bool nulls_first = 3;
497+
}
498+
499+
// One (when_expr, then_expr) pair. Elements of
// PhysicalCaseNode.when_then_expr.
message PhysicalWhenThen {
500+
PhysicalExprNode when_expr = 1;
501+
PhysicalExprNode then_expr = 2;
502+
}
503+
504+
// IN-list expression: a child expression, a list of expressions, and a
// `negated` flag (presumably distinguishing NOT IN from IN — confirm).
message PhysicalInListNode {
505+
PhysicalExprNode expr = 1;
506+
repeated PhysicalExprNode list = 2;
507+
bool negated = 3;
508+
}
509+
510+
// CASE expression: an `expr`, a list of PhysicalWhenThen pairs, and an
// `else_expr`. `expr` and `else_expr` are message-typed and may be left
// unset; how readers treat an unset value is not shown here.
message PhysicalCaseNode {
511+
PhysicalExprNode expr = 1;
512+
repeated PhysicalWhenThen when_then_expr = 2;
513+
PhysicalExprNode else_expr = 3;
514+
}
515+
516+
// Scalar function call: a name string, a ScalarFunction value, a list of
// argument expressions, and an ArrowType return type.
// NOTE(review): how `name` relates to `fun` is not stated here — confirm
// which one the deserializer treats as authoritative.
message PhysicalScalarFunctionNode {
516+
string name = 1;
517+
ScalarFunction fun = 2;
518+
repeated PhysicalExprNode args = 3;
519+
ArrowType return_type = 4;
520+
}
522+
523+
// A child expression plus an ArrowType. Carried by PhysicalExprNode.try_cast.
// Same shape as PhysicalCastNode.
message PhysicalTryCastNode {
523+
PhysicalExprNode expr = 1;
524+
ArrowType arrow_type = 2;
525+
}
527+
528+
// A child expression plus an ArrowType. Carried by PhysicalExprNode.cast.
// Same shape as PhysicalTryCastNode.
message PhysicalCastNode {
527+
PhysicalExprNode expr = 1;
528+
ArrowType arrow_type = 2;
529+
}
532+
533+
// Wraps one child expression. Carried by PhysicalExprNode.negative.
message PhysicalNegativeNode {
533+
PhysicalExprNode expr = 1;
534+
}
536+
411537
message UnresolvedShuffleExecNode {
412538
repeated uint32 query_stage_ids = 1;
413539
Schema schema = 2;
@@ -416,7 +542,7 @@ message UnresolvedShuffleExecNode {
416542

417543
// Physical filter node: an input plan and one expression.
// This rendered diff shows both the old (-) and new (+) declaration of
// field 2: `expr` changes from LogicalExprNode to PhysicalExprNode.
message FilterExecNode {
418544
PhysicalPlanNode input = 1;
419-
LogicalExprNode expr = 2;
545+
PhysicalExprNode expr = 2;
420546
}
421547

422548
message ParquetScanExecNode {
@@ -447,11 +573,15 @@ message HashJoinExecNode {
447573

448574
}
449575

450-
message JoinOn {
451-
string left = 1;
452-
string right = 2;
576+
// Column reference used in physical expressions: a name plus a numeric index.
// Used by PhysicalExprNode.column and by JoinOn.left / JoinOn.right.
// Presumably `index` is the column's position in the input schema — confirm.
message PhysicalColumn {
577+
string name = 1;
578+
uint32 index = 2;
453579
}
454580

581+
// One pair of join columns, `left` and `right`, each a PhysicalColumn.
// NOTE(review): before this commit both fields were plain strings with the
// same field numbers; string and message payloads are not interchangeable on
// the wire — confirm no older serialized plans must still be read.
message JoinOn {
582+
PhysicalColumn left = 1;
583+
PhysicalColumn right = 2;
584+
}
455585

456586
message EmptyExecNode {
457587
bool produce_one_row = 1;
@@ -460,7 +590,7 @@ message EmptyExecNode {
460590

461591
// Physical projection node: an input plan, a list of expressions, and a list
// of expression names.
// This rendered diff shows both the old (-) and new (+) declaration of
// field 2: `expr` changes from LogicalExprNode to PhysicalExprNode.
// `expr` and `expr_name` are separate repeated fields; presumably they are
// matched up by index — confirm.
message ProjectionExecNode {
462592
PhysicalPlanNode input = 1;
463-
repeated LogicalExprNode expr = 2;
593+
repeated PhysicalExprNode expr = 2;
464594
repeated string expr_name = 3;
465595
}
466596

@@ -472,14 +602,14 @@ enum AggregateMode {
472602

473603
// Physical window-aggregate node: an input plan, window expressions, their
// names, and the input Schema.
// This rendered diff shows both the old (-) and new (+) declaration of
// field 2: `window_expr` changes from LogicalExprNode to PhysicalExprNode.
// `window_expr` and `window_expr_name` are separate repeated fields;
// presumably they are matched up by index — confirm.
message WindowAggExecNode {
474604
PhysicalPlanNode input = 1;
475-
repeated LogicalExprNode window_expr = 2;
605+
repeated PhysicalExprNode window_expr = 2;
476606
repeated string window_expr_name = 3;
477607
Schema input_schema = 4;
478608
}
479609

480610
message HashAggregateExecNode {
481-
repeated LogicalExprNode group_expr = 1;
482-
repeated LogicalExprNode aggr_expr = 2;
611+
repeated PhysicalExprNode group_expr = 1;
612+
repeated PhysicalExprNode aggr_expr = 2;
483613
AggregateMode mode = 3;
484614
PhysicalPlanNode input = 4;
485615
repeated string group_expr_name = 5;
@@ -510,7 +640,7 @@ message LocalLimitExecNode {
510640

511641
// Physical sort node: an input plan and a list of expressions.
// This rendered diff shows both the old (-) and new (+) declaration of
// field 2: `expr` changes from LogicalExprNode to PhysicalExprNode.
// Presumably each entry is expected to be the PhysicalSortExprNode variant
// — confirm.
message SortExecNode {
512642
PhysicalPlanNode input = 1;
513-
repeated LogicalExprNode expr = 2;
643+
repeated PhysicalExprNode expr = 2;
514644
}
515645

516646
message CoalesceBatchesExecNode {
@@ -522,11 +652,16 @@ message MergeExecNode {
522652
PhysicalPlanNode input = 1;
523653
}
524654

655+
// Hash repartitioning parameters: a list of expressions and a partition
// count. Carried by RepartitionExecNode.hash.
message PhysicalHashRepartition {
655+
repeated PhysicalExprNode hash_expr = 1;
656+
uint64 partition_count = 2;
657+
}
659+
525660
// Physical repartition node: an input plan plus one partitioning choice from
// the `partition_method` oneof.
// This rendered diff shows both the old (-) and new (+) declaration of
// field 3: `hash` changes from HashRepartition to PhysicalHashRepartition.
// NOTE(review): `round_robin` and `unknown` are bare uint64 values,
// presumably partition counts — confirm.
message RepartitionExecNode{
526661
PhysicalPlanNode input = 1;
527662
oneof partition_method {
528663
uint64 round_robin = 2;
529-
HashRepartition hash = 3;
664+
PhysicalHashRepartition hash = 3;
530665
uint64 unknown = 4;
531666
}
532667
}
@@ -803,7 +938,7 @@ message ScalarListValue{
803938

804939

805940
message ScalarValue{
806-
oneof value{
941+
oneof value {
807942
bool bool_value = 1;
808943
string utf8_value = 2;
809944
string large_utf8_value = 3;

0 commit comments

Comments
 (0)