@@ -26,8 +26,8 @@ use datafusion::logical_plan::window_frames::{
2626} ;
2727use datafusion:: logical_plan:: {
2828 abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin,
29- sqrt, tan, trunc, Column , DFField , DFSchema , Expr , JoinType , LogicalPlan ,
30- LogicalPlanBuilder , Operator ,
29+ sqrt, tan, trunc, Column , DFField , DFSchema , Expr , JoinConstraint , JoinType ,
30+ LogicalPlan , LogicalPlanBuilder , Operator ,
3131} ;
3232use datafusion:: physical_plan:: aggregates:: AggregateFunction ;
3333use datafusion:: physical_plan:: csv:: CsvReadOptions ;
@@ -247,23 +247,34 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
247247 join. join_type
248248 ) )
249249 } ) ?;
250- let join_type = match join_type {
251- protobuf:: JoinType :: Inner => JoinType :: Inner ,
252- protobuf:: JoinType :: Left => JoinType :: Left ,
253- protobuf:: JoinType :: Right => JoinType :: Right ,
254- protobuf:: JoinType :: Full => JoinType :: Full ,
255- protobuf:: JoinType :: Semi => JoinType :: Semi ,
256- protobuf:: JoinType :: Anti => JoinType :: Anti ,
257- } ;
258- LogicalPlanBuilder :: from ( & convert_box_required ! ( join. left) ?)
259- . join (
250+ let join_constraint = protobuf:: JoinConstraint :: from_i32 (
251+ join. join_constraint ,
252+ )
253+ . ok_or_else ( || {
254+ proto_error ( format ! (
255+ "Received a JoinNode message with unknown JoinConstraint {}" ,
256+ join. join_constraint
257+ ) )
258+ } ) ?;
259+
260+ let builder =
261+ LogicalPlanBuilder :: from ( & convert_box_required ! ( join. left) ?) ;
262+
263+ let builder = match join_constraint. into ( ) {
264+ JoinConstraint :: On => builder. join (
260265 & convert_box_required ! ( join. right) ?,
261- join_type,
266+ join_type. into ( ) ,
262267 left_keys,
263268 right_keys,
264- ) ?
265- . build ( )
266- . map_err ( |e| e. into ( ) )
269+ ) ?,
270+ JoinConstraint :: Using => builder. join_using (
271+ & convert_box_required ! ( join. right) ?,
272+ join_type. into ( ) ,
273+ left_keys,
274+ ) ?,
275+ } ;
276+
277+ builder. build ( ) . map_err ( |e| e. into ( ) )
267278 }
268279 }
269280 }
0 commit comments