Skip to content

Commit c4415b9

Browse files
leoyvensjonahgao
authored andcommitted
support recursive CTEs logical plans in datafusion-proto (apache#13314)
* support LogicaPlan::RecursiveQuery in datafusion-proto * fixed and failing test roundtrip_recursive_query * fix rebase artifact * add node for CteWorkTableScan in datafusion-proto * Use Arc::clone --------- Co-authored-by: jonahgao <jonahgao@msn.com>
1 parent 939425a commit c4415b9

File tree

8 files changed

+516
-38
lines changed

8 files changed

+516
-38
lines changed

datafusion/core/src/datasource/cte_worktable.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ use crate::datasource::{TableProvider, TableType};
3838
/// See here for more details: www.postgresql.org/docs/11/queries-with.html#id-1.5.6.12.5.4
3939
pub struct CteWorkTable {
4040
/// The name of the CTE work table
41-
// WIP, see https://github.com/apache/datafusion/issues/462
42-
#[allow(dead_code)]
4341
name: String,
4442
/// This schema must be shared across both the static and recursive terms of a recursive query
4543
table_schema: SchemaRef,
@@ -55,6 +53,16 @@ impl CteWorkTable {
5553
table_schema,
5654
}
5755
}
56+
57+
/// The user-provided name of the CTE
58+
pub fn name(&self) -> &str {
59+
&self.name
60+
}
61+
62+
/// The schema of the recursive term of the query
63+
pub fn schema(&self) -> SchemaRef {
64+
Arc::clone(&self.table_schema)
65+
}
5866
}
5967

6068
#[async_trait]

datafusion/proto-common/src/generated/pbjson.rs

Lines changed: 54 additions & 3 deletions
Large diffs are not rendered by default.

datafusion/proto-common/src/generated/prost.rs

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ pub struct ParquetFormat {
4545
pub options: ::core::option::Option<TableParquetOptions>,
4646
}
4747
#[allow(clippy::derive_partial_eq_without_eq)]
48-
#[derive(Clone, PartialEq, ::prost::Message)]
48+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
4949
pub struct AvroFormat {}
5050
#[allow(clippy::derive_partial_eq_without_eq)]
51-
#[derive(Clone, PartialEq, ::prost::Message)]
51+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
5252
pub struct NdJsonFormat {
5353
#[prost(message, optional, tag = "1")]
5454
pub options: ::core::option::Option<JsonOptions>,
@@ -89,10 +89,10 @@ pub struct Constraints {
8989
pub constraints: ::prost::alloc::vec::Vec<Constraint>,
9090
}
9191
#[allow(clippy::derive_partial_eq_without_eq)]
92-
#[derive(Clone, PartialEq, ::prost::Message)]
92+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
9393
pub struct AvroOptions {}
9494
#[allow(clippy::derive_partial_eq_without_eq)]
95-
#[derive(Clone, PartialEq, ::prost::Message)]
95+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
9696
pub struct ArrowOptions {}
9797
#[allow(clippy::derive_partial_eq_without_eq)]
9898
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -137,15 +137,15 @@ pub struct Timestamp {
137137
pub timezone: ::prost::alloc::string::String,
138138
}
139139
#[allow(clippy::derive_partial_eq_without_eq)]
140-
#[derive(Clone, PartialEq, ::prost::Message)]
140+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
141141
pub struct Decimal {
142142
#[prost(uint32, tag = "3")]
143143
pub precision: u32,
144144
#[prost(int32, tag = "4")]
145145
pub scale: i32,
146146
}
147147
#[allow(clippy::derive_partial_eq_without_eq)]
148-
#[derive(Clone, PartialEq, ::prost::Message)]
148+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
149149
pub struct Decimal256Type {
150150
#[prost(uint32, tag = "3")]
151151
pub precision: u32,
@@ -223,15 +223,15 @@ pub mod scalar_nested_value {
223223
}
224224
}
225225
#[allow(clippy::derive_partial_eq_without_eq)]
226-
#[derive(Clone, PartialEq, ::prost::Message)]
226+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
227227
pub struct ScalarTime32Value {
228228
#[prost(oneof = "scalar_time32_value::Value", tags = "1, 2")]
229229
pub value: ::core::option::Option<scalar_time32_value::Value>,
230230
}
231231
/// Nested message and enum types in `ScalarTime32Value`.
232232
pub mod scalar_time32_value {
233233
#[allow(clippy::derive_partial_eq_without_eq)]
234-
#[derive(Clone, PartialEq, ::prost::Oneof)]
234+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
235235
pub enum Value {
236236
#[prost(int32, tag = "1")]
237237
Time32SecondValue(i32),
@@ -240,15 +240,15 @@ pub mod scalar_time32_value {
240240
}
241241
}
242242
#[allow(clippy::derive_partial_eq_without_eq)]
243-
#[derive(Clone, PartialEq, ::prost::Message)]
243+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
244244
pub struct ScalarTime64Value {
245245
#[prost(oneof = "scalar_time64_value::Value", tags = "1, 2")]
246246
pub value: ::core::option::Option<scalar_time64_value::Value>,
247247
}
248248
/// Nested message and enum types in `ScalarTime64Value`.
249249
pub mod scalar_time64_value {
250250
#[allow(clippy::derive_partial_eq_without_eq)]
251-
#[derive(Clone, PartialEq, ::prost::Oneof)]
251+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
252252
pub enum Value {
253253
#[prost(int64, tag = "1")]
254254
Time64MicrosecondValue(i64),
@@ -267,7 +267,7 @@ pub struct ScalarTimestampValue {
267267
/// Nested message and enum types in `ScalarTimestampValue`.
268268
pub mod scalar_timestamp_value {
269269
#[allow(clippy::derive_partial_eq_without_eq)]
270-
#[derive(Clone, PartialEq, ::prost::Oneof)]
270+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
271271
pub enum Value {
272272
#[prost(int64, tag = "1")]
273273
TimeMicrosecondValue(i64),
@@ -288,15 +288,15 @@ pub struct ScalarDictionaryValue {
288288
pub value: ::core::option::Option<::prost::alloc::boxed::Box<ScalarValue>>,
289289
}
290290
#[allow(clippy::derive_partial_eq_without_eq)]
291-
#[derive(Clone, PartialEq, ::prost::Message)]
291+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
292292
pub struct IntervalDayTimeValue {
293293
#[prost(int32, tag = "1")]
294294
pub days: i32,
295295
#[prost(int32, tag = "2")]
296296
pub milliseconds: i32,
297297
}
298298
#[allow(clippy::derive_partial_eq_without_eq)]
299-
#[derive(Clone, PartialEq, ::prost::Message)]
299+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
300300
pub struct IntervalMonthDayNanoValue {
301301
#[prost(int32, tag = "1")]
302302
pub months: i32,
@@ -558,10 +558,10 @@ pub mod arrow_type {
558558
/// }
559559
/// }
560560
#[allow(clippy::derive_partial_eq_without_eq)]
561-
#[derive(Clone, PartialEq, ::prost::Message)]
561+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
562562
pub struct EmptyMessage {}
563563
#[allow(clippy::derive_partial_eq_without_eq)]
564-
#[derive(Clone, PartialEq, ::prost::Message)]
564+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
565565
pub struct JsonWriterOptions {
566566
#[prost(enumeration = "CompressionTypeVariant", tag = "1")]
567567
pub compression: i32,
@@ -658,7 +658,7 @@ pub struct CsvOptions {
658658
}
659659
/// Options controlling CSV format
660660
#[allow(clippy::derive_partial_eq_without_eq)]
661-
#[derive(Clone, PartialEq, ::prost::Message)]
661+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
662662
pub struct JsonOptions {
663663
/// Compression type
664664
#[prost(enumeration = "CompressionTypeVariant", tag = "1")]
@@ -723,7 +723,7 @@ pub struct ParquetColumnOptions {
723723
/// Nested message and enum types in `ParquetColumnOptions`.
724724
pub mod parquet_column_options {
725725
#[allow(clippy::derive_partial_eq_without_eq)]
726-
#[derive(Clone, PartialEq, ::prost::Oneof)]
726+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
727727
pub enum BloomFilterEnabledOpt {
728728
#[prost(bool, tag = "1")]
729729
BloomFilterEnabled(bool),
@@ -735,7 +735,7 @@ pub mod parquet_column_options {
735735
Encoding(::prost::alloc::string::String),
736736
}
737737
#[allow(clippy::derive_partial_eq_without_eq)]
738-
#[derive(Clone, PartialEq, ::prost::Oneof)]
738+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
739739
pub enum DictionaryEnabledOpt {
740740
#[prost(bool, tag = "3")]
741741
DictionaryEnabled(bool),
@@ -753,19 +753,19 @@ pub mod parquet_column_options {
753753
StatisticsEnabled(::prost::alloc::string::String),
754754
}
755755
#[allow(clippy::derive_partial_eq_without_eq)]
756-
#[derive(Clone, PartialEq, ::prost::Oneof)]
756+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
757757
pub enum BloomFilterFppOpt {
758758
#[prost(double, tag = "6")]
759759
BloomFilterFpp(f64),
760760
}
761761
#[allow(clippy::derive_partial_eq_without_eq)]
762-
#[derive(Clone, PartialEq, ::prost::Oneof)]
762+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
763763
pub enum BloomFilterNdvOpt {
764764
#[prost(uint64, tag = "7")]
765765
BloomFilterNdv(u64),
766766
}
767767
#[allow(clippy::derive_partial_eq_without_eq)]
768-
#[derive(Clone, PartialEq, ::prost::Oneof)]
768+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
769769
pub enum MaxStatisticsSizeOpt {
770770
#[prost(uint32, tag = "8")]
771771
MaxStatisticsSize(u32),
@@ -860,7 +860,7 @@ pub struct ParquetOptions {
860860
/// Nested message and enum types in `ParquetOptions`.
861861
pub mod parquet_options {
862862
#[allow(clippy::derive_partial_eq_without_eq)]
863-
#[derive(Clone, PartialEq, ::prost::Oneof)]
863+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
864864
pub enum MetadataSizeHintOpt {
865865
#[prost(uint64, tag = "4")]
866866
MetadataSizeHint(u64),
@@ -872,7 +872,7 @@ pub mod parquet_options {
872872
Compression(::prost::alloc::string::String),
873873
}
874874
#[allow(clippy::derive_partial_eq_without_eq)]
875-
#[derive(Clone, PartialEq, ::prost::Oneof)]
875+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
876876
pub enum DictionaryEnabledOpt {
877877
#[prost(bool, tag = "11")]
878878
DictionaryEnabled(bool),
@@ -884,13 +884,13 @@ pub mod parquet_options {
884884
StatisticsEnabled(::prost::alloc::string::String),
885885
}
886886
#[allow(clippy::derive_partial_eq_without_eq)]
887-
#[derive(Clone, PartialEq, ::prost::Oneof)]
887+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
888888
pub enum MaxStatisticsSizeOpt {
889889
#[prost(uint64, tag = "14")]
890890
MaxStatisticsSize(u64),
891891
}
892892
#[allow(clippy::derive_partial_eq_without_eq)]
893-
#[derive(Clone, PartialEq, ::prost::Oneof)]
893+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
894894
pub enum ColumnIndexTruncateLengthOpt {
895895
#[prost(uint64, tag = "17")]
896896
ColumnIndexTruncateLength(u64),
@@ -902,13 +902,13 @@ pub mod parquet_options {
902902
Encoding(::prost::alloc::string::String),
903903
}
904904
#[allow(clippy::derive_partial_eq_without_eq)]
905-
#[derive(Clone, PartialEq, ::prost::Oneof)]
905+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
906906
pub enum BloomFilterFppOpt {
907907
#[prost(double, tag = "21")]
908908
BloomFilterFpp(f64),
909909
}
910910
#[allow(clippy::derive_partial_eq_without_eq)]
911-
#[derive(Clone, PartialEq, ::prost::Oneof)]
911+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
912912
pub enum BloomFilterNdvOpt {
913913
#[prost(uint64, tag = "22")]
914914
BloomFilterNdv(u64),

datafusion/proto/proto/datafusion.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ message LogicalPlanNode {
5959
DistinctOnNode distinct_on = 28;
6060
CopyToNode copy_to = 29;
6161
UnnestNode unnest = 30;
62+
RecursiveQueryNode recursive_query = 31;
63+
CteWorkTableScanNode cte_work_table_scan = 32;
6264
}
6365
}
6466

@@ -1214,3 +1216,15 @@ message PartitionStats {
12141216
int64 num_bytes = 3;
12151217
repeated datafusion_common.ColumnStats column_stats = 4;
12161218
}
1219+
1220+
message RecursiveQueryNode {
1221+
string name = 1;
1222+
LogicalPlanNode static_term = 2;
1223+
LogicalPlanNode recursive_term = 3;
1224+
bool is_distinct = 4;
1225+
}
1226+
1227+
message CteWorkTableScanNode {
1228+
string name = 1;
1229+
datafusion_common.Schema schema = 2;
1230+
}

0 commit comments

Comments
 (0)