From 47e45188b88fd7c0ec704517fedfcc0ad8bda23a Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Wed, 8 Feb 2023 18:25:54 +0800 Subject: [PATCH] feat: upgrade to Datafusion 17 (#601) * feat: upgrade to datafusion 17 * fix integration tests * fix integration test * make CI happy * refactor code * refactor by CR * chore: remove uesless file --- Cargo.lock | 647 +++++++++++++++--- Cargo.toml | 23 +- analytic_engine/Cargo.toml | 1 + .../src/row_iter/record_batch_stream.rs | 4 +- analytic_engine/src/sst/parquet/encoding.rs | 8 +- analytic_engine/src/sst/parquet/hybrid.rs | 11 +- analytic_engine/src/table/mod.rs | 3 +- common_types/Cargo.toml | 3 +- common_types/src/column_schema.rs | 32 +- common_types/src/datum.rs | 24 +- components/arrow_ext/src/display.rs | 4 +- components/message_queue/src/tests/util.rs | 2 +- components/object_store/Cargo.toml | 2 +- components/parquet_ext/src/prune/equal.rs | 6 +- components/parquet_ext/src/prune/min_max.rs | 6 +- df_operator/src/aggregate.rs | 14 +- df_operator/src/udfs/time_bucket.rs | 28 +- .../cases/local/00_dummy/select_1.result | 2 +- .../02_function/thetasketch_distinct.result | 2 +- .../cases/local/03_dml/issue-59.result | 4 +- .../cases/local/03_dml/select_having.result | 25 - .../cases/local/03_dml/select_having.sql | 19 - .../cases/local/04_explain/explain.result | 2 +- .../cases/local/07_optimizer/optimizer.result | 4 +- interpreters/Cargo.toml | 1 + interpreters/src/insert.rs | 31 +- interpreters/src/show_create.rs | 3 - interpreters/src/tests.rs | 11 +- query_engine/src/context.rs | 35 +- .../src/df_execution_extension/prom_align.rs | 36 +- query_engine/src/df_planner_extension/mod.rs | 2 +- .../src/df_planner_extension/prom_align.rs | 2 +- .../table_scan_by_primary_key.rs | 2 +- query_engine/src/logical_optimizer/mod.rs | 6 +- .../logical_optimizer/order_by_primary_key.rs | 62 +- query_engine/src/logical_optimizer/tests.rs | 26 +- .../src/logical_optimizer/type_conversion.rs | 80 ++- .../physical_optimizer/coalesce_batches.rs | 16 +- query_engine/src/physical_optimizer/mod.rs | 1 + .../src/physical_optimizer/repartition.rs | 8 +- server/Cargo.toml | 1 + server/src/limiter.rs | 3 +- sql/Cargo.toml | 3 +- sql/src/parser.rs | 50 +- sql/src/plan.rs | 5 +- sql/src/planner.rs | 79 ++- sql/src/promql/convert.rs | 16 +- sql/src/promql/datafusion_util.rs | 9 +- sql/src/promql/udf.rs | 15 +- sql/src/provider.rs | 18 +- .../partition/rule/df_adapter/extractor.rs | 20 +- table_engine/src/predicate.rs | 17 +- table_engine/src/provider.rs | 104 ++- wal/src/table_kv_impl/encoding.rs | 55 +- wal/src/table_kv_impl/namespace.rs | 42 +- 55 files changed, 1110 insertions(+), 525 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6106d11d44..4f9637ee5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,13 +86,14 @@ version = "1.0.0-alpha02" dependencies = [ "arc-swap 1.5.1", "arena", - "arrow", + "arrow 31.0.0", "async-trait", "base64 0.13.0", "bytes 1.2.1", "common_types", "common_util", "datafusion", + "datafusion-expr", "env_logger", "ethbloom", "futures 0.3.25", @@ -121,6 +122,15 @@ dependencies = [ "wal", ] +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "ansi_term" version = "0.12.1" @@ -192,14 +202,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"fedc767fbaa36ea50f086215f54f1a007d22046fc4754b0448c657bcbe9f8413" dependencies = [ "ahash 0.8.0", - "arrow-buffer", + "arrow-buffer 23.0.0", "bitflags", "chrono", - "comfy-table", "csv", - "flatbuffers", + "flatbuffers 2.1.2", "half 2.1.0", - "hashbrown", + "hashbrown 0.12.3", "indexmap", "lazy_static", "lexical-core 0.8.5", @@ -211,6 +220,60 @@ dependencies = [ "serde_json", ] +[[package]] +name = "arrow" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b556d39f9d19e363833a0fe65d591cd0e2ecc0977589a78179b592bea8dc945" +dependencies = [ + "ahash 0.8.0", + "arrow-arith", + "arrow-array", + "arrow-buffer 31.0.0", + "arrow-cast", + "arrow-csv", + "arrow-data", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", + "comfy-table", +] + +[[package]] +name = "arrow-arith" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85c61b9235694b48f60d89e0e8d6cb478f39c65dd14b0fe1c3f04379b7d50068" +dependencies = [ + "arrow-array", + "arrow-buffer 31.0.0", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.1.0", + "num", +] + +[[package]] +name = "arrow-array" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1e6e839764618a911cc460a58ebee5ad3d42bc12d9a5e96a29b7cc296303aa1" +dependencies = [ + "ahash 0.8.0", + "arrow-buffer 31.0.0", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.1.0", + "hashbrown 0.13.2", + "num", +] + [[package]] name = "arrow-buffer" version = "23.0.0" @@ -221,6 +284,63 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-buffer" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03a21d232b1bc1190a3fdd2f9c1e39b7cd41235e95a0d44dd4f522bc5f495748" +dependencies = [ + "half 2.1.0", + "num", +] + +[[package]] +name = "arrow-cast" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83dcdb1436cac574f1c1b30fda91c53c467534337bef4064bbd4ea2d6fbc6e04" +dependencies = [ + "arrow-array", + "arrow-buffer 31.0.0", + "arrow-data", + "arrow-schema", + "arrow-select", + "chrono", + "lexical-core 0.8.5", + "num", +] + +[[package]] +name = "arrow-csv" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a01677ae9458f5af9e35e1aa6ba97502f539e621db0c6672566403f97edd0448" +dependencies = [ + "arrow-array", + "arrow-buffer 31.0.0", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "csv", + "csv-core", + "lazy_static", + "lexical-core 0.8.5", + "regex", +] + +[[package]] +name = "arrow-data" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14e3e69c9fd98357eeeab4aa0f626ecf7ecf663e68e8fc04eac87c424a414477" +dependencies = [ + "arrow-buffer 31.0.0", + "arrow-schema", + "half 2.1.0", + "num", +] + [[package]] name = "arrow-format" version = "0.6.0" @@ -231,6 +351,101 @@ dependencies = [ "serde", ] +[[package]] +name = "arrow-ipc" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64cac2706acbd796965b6eaf0da30204fe44aacf70273f8cb3c9b7d7f3d4c190" +dependencies = [ + "arrow-array", + "arrow-buffer 31.0.0", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers 22.9.29", +] + +[[package]] +name = "arrow-json" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7790e8b7df2d8ef5ac802377ac256cf2fb80cbf7d44b82d6464e20ace6232a5a" +dependencies = [ + "arrow-array", + "arrow-buffer 31.0.0", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.1.0", + "indexmap", + "num", + "serde_json", +] + +[[package]] +name = "arrow-ord" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7ee6e1b761dfffaaf7b5bbe68c113a576a3a802146c5c0b9fcec781e30d80a3" +dependencies = [ + "arrow-array", + "arrow-buffer 31.0.0", + "arrow-data", + "arrow-schema", + "arrow-select", + "num", +] + +[[package]] +name = "arrow-row" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e65bfedf782fc92721e796fdd26ae7343c98ba9a9243d62def9e4e1c4c1cf0b" +dependencies = [ + "ahash 0.8.0", + "arrow-array", + "arrow-buffer 31.0.0", + "arrow-data", + "arrow-schema", + "half 2.1.0", + "hashbrown 0.13.2", +] + +[[package]] +name = "arrow-schema" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73ca49d010b27e2d73f70c1d1f90c1b378550ed0f4ad379c4dea0c997d97d723" + +[[package]] +name = "arrow-select" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "976cbaeb1a85c09eea81f3f9c149c758630ff422ed0238624c5c3f4704b6a53c" +dependencies = [ + "arrow-array", + "arrow-buffer 31.0.0", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d4882762f8f48a9218946c016553d38b04b4fe8202038dad4141b3b887b7da8" +dependencies = [ + "arrow-array", + "arrow-buffer 31.0.0", + "arrow-data", + "arrow-schema", + "arrow-select", + "regex", + "regex-syntax", +] + [[package]] name = "arrow2" version = "0.12.0" @@ -255,11 +470,27 @@ dependencies = [ name = "arrow_ext" version = "1.0.0-alpha02" dependencies = [ - "arrow", + "arrow 31.0.0", "snafu 0.6.10", "zstd 0.12.1+zstd.1.5.2", ] +[[package]] +name = "async-compression" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" +dependencies = [ + "bzip2", + "flate2", + "futures-core", + "futures-io", + "memchr", + "pin-project-lite", + "tokio 1.25.0", + "xz2", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -358,7 +589,7 @@ dependencies = [ "matchit", "memchr", "mime", - "percent-encoding 2.1.0", + "percent-encoding 2.2.0", "pin-project-lite", "serde", "sync_wrapper", @@ -415,13 +646,19 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "base64" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" + [[package]] name = "benchmarks" version = "1.0.0-alpha02" dependencies = [ "analytic_engine", "arena", - "arrow", + "arrow 31.0.0", "arrow2", "arrow_ext", "base64 0.13.0", @@ -721,6 +958,16 @@ dependencies = [ "snafu 0.6.10", ] +[[package]] +name = "bzip2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8" +dependencies = [ + "bzip2-sys", + "libc", +] + [[package]] name = "bzip2-sys" version = "0.1.11+1.0.8" @@ -807,7 +1054,7 @@ name = "ceresdb-client-rs" version = "0.1.0" source = 
"git+https://github.com/CeresDB/ceresdb-client-rs.git?rev=a72e673103463c7962e01a097592fc7edbcc0b79#a72e673103463c7962e01a097592fc7edbcc0b79" dependencies = [ - "arrow", + "arrow 23.0.0", "async-trait", "ceresdbproto", "dashmap 5.4.0", @@ -873,10 +1120,11 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.20" +version = "0.4.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6127248204b9aba09a362f6c930ef6a78f2c1b2215f8a7b398c06e1083f17af0" +checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" dependencies = [ + "iana-time-zone", "js-sys", "num-integer", "num-traits", @@ -994,6 +1242,16 @@ dependencies = [ "cc", ] +[[package]] +name = "codespan-reporting" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" +dependencies = [ + "termcolor", + "unicode-width", +] + [[package]] name = "comfy-table" version = "6.1.0" @@ -1009,7 +1267,7 @@ dependencies = [ name = "common_types" version = "1.0.0-alpha02" dependencies = [ - "arrow", + "arrow 31.0.0", "arrow_ext", "byteorder", "bytes_ext", @@ -1030,7 +1288,7 @@ dependencies = [ name = "common_util" version = "1.0.0-alpha02" dependencies = [ - "arrow", + "arrow 31.0.0", "avro-rs", "backtrace", "chrono", @@ -1379,6 +1637,50 @@ dependencies = [ "memchr", ] +[[package]] +name = "cxx" +version = "1.0.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322296e2f2e5af4270b54df9e85a02ff037e271af20ba3e7fe1575515dc840b8" +dependencies = [ + "cc", + "cxxbridge-flags", + "cxxbridge-macro", + "link-cplusplus", +] + +[[package]] +name = "cxx-build" +version = "1.0.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "017a1385b05d631e7875b1f151c9f012d37b53491e2a87f65bff5c262b2111d8" +dependencies = [ + "cc", + "codespan-reporting", + "once_cell", + "proc-macro2", + "quote", + "scratch", + "syn", +] + +[[package]] +name = "cxxbridge-flags" +version = "1.0.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c26bbb078acf09bc1ecda02d4223f03bdd28bd4874edcb0379138efc499ce971" + +[[package]] +name = "cxxbridge-macro" +version = "1.0.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357f40d1f06a24b60ae1fe122542c1fb05d28d32acb2aed064e84bc2ad1e252e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "darling" version = "0.14.2" @@ -1432,7 +1734,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" dependencies = [ "cfg-if 1.0.0", - "hashbrown", + "hashbrown 0.12.3", "lock_api 0.4.9", "once_cell", "parking_lot_core 0.9.3", @@ -1440,32 +1742,38 @@ dependencies = [ [[package]] name = "datafusion" -version = "12.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=d84ea9c79c9e83ff0b4dadf8880a4983af59ef48#d84ea9c79c9e83ff0b4dadf8880a4983af59ef48" +version = "17.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6d90cae91414aaeda37ae8022a23ef1124ca8efc08ac7d7770274249f7cf148" dependencies = [ "ahash 0.8.0", - "arrow", + "arrow 31.0.0", + "async-compression", "async-trait", "bytes 1.2.1", + "bzip2", "chrono", + "dashmap 5.4.0", "datafusion-common", "datafusion-expr", "datafusion-optimizer", "datafusion-physical-expr", "datafusion-row", 
"datafusion-sql", + "flate2", "futures 0.3.25", "glob", - "hashbrown", + "hashbrown 0.13.2", + "indexmap", "itertools", "lazy_static", "log", "num_cpus", - "object_store 0.5.1", - "ordered-float 3.0.0", + "object_store 0.5.3", "parking_lot 0.12.1", "parquet", "paste 1.0.8", + "percent-encoding 2.2.0", "pin-project-lite", "rand 0.8.5", "smallvec 1.9.0", @@ -1473,92 +1781,112 @@ dependencies = [ "tempfile", "tokio 1.25.0", "tokio-stream", + "tokio-util", "url 2.2.2", - "uuid 1.1.2", + "uuid 1.2.2", + "xz2", ] [[package]] name = "datafusion-common" -version = "12.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=d84ea9c79c9e83ff0b4dadf8880a4983af59ef48#d84ea9c79c9e83ff0b4dadf8880a4983af59ef48" +version = "17.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b21c4b8e8b7815e86d79d25da16854fee6d4d1b386572e802a248b7d43188e86" dependencies = [ - "arrow", - "object_store 0.5.1", - "ordered-float 3.0.0", + "arrow 31.0.0", + "chrono", + "num_cpus", + "object_store 0.5.3", "parquet", - "serde_json", "sqlparser", ] [[package]] name = "datafusion-expr" -version = "12.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=d84ea9c79c9e83ff0b4dadf8880a4983af59ef48#d84ea9c79c9e83ff0b4dadf8880a4983af59ef48" +version = "17.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db8c07b051fbaf01657a3eb910a76b042ecfed0350a40412f70cf6b949bd5328" dependencies = [ "ahash 0.8.0", - "arrow", + "arrow 31.0.0", "datafusion-common", + "log", "sqlparser", ] [[package]] name = "datafusion-optimizer" -version = "12.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=d84ea9c79c9e83ff0b4dadf8880a4983af59ef48#d84ea9c79c9e83ff0b4dadf8880a4983af59ef48" +version = "17.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2ce4d34a808cd2e4c4864cdc759dd1bd22dcac2b8af38aa570e30fd54577c4d" dependencies = [ - "arrow", + "arrow 31.0.0", "async-trait", "chrono", "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown", + "hashbrown 0.13.2", "log", + "regex-syntax", ] [[package]] name = "datafusion-physical-expr" -version = "12.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=d84ea9c79c9e83ff0b4dadf8880a4983af59ef48#d84ea9c79c9e83ff0b4dadf8880a4983af59ef48" +version = "17.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a38afa11a09505c24bd7e595039d7914ec39329ba490209413ef2d37895c8220" dependencies = [ "ahash 0.8.0", - "arrow", + "arrow 31.0.0", + "arrow-buffer 31.0.0", + "arrow-schema", "blake2", "blake3", "chrono", "datafusion-common", "datafusion-expr", "datafusion-row", - "hashbrown", + "half 2.1.0", + "hashbrown 0.13.2", + "indexmap", + "itertools", "lazy_static", "md-5", - "ordered-float 3.0.0", + "num-traits", "paste 1.0.8", "rand 0.8.5", "regex", "sha2 0.10.2", "unicode-segmentation", + "uuid 1.2.2", ] [[package]] name = "datafusion-proto" -version = "12.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=d84ea9c79c9e83ff0b4dadf8880a4983af59ef48#d84ea9c79c9e83ff0b4dadf8880a4983af59ef48" +version = "17.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e58a6e887a6965f35c4feb5787ae7185803cc5e2fda15d02825194a0a6c1a" dependencies = [ - "arrow", + "arrow 31.0.0", + "chrono", "datafusion", "datafusion-common", "datafusion-expr", + "object_store 0.5.3", + "parking_lot 0.12.1", + "pbjson-build", "prost", "prost-build", ] [[package]] name = 
"datafusion-row" -version = "12.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=d84ea9c79c9e83ff0b4dadf8880a4983af59ef48#d84ea9c79c9e83ff0b4dadf8880a4983af59ef48" +version = "17.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9172411b25ff4aa97f8e99884898595a581636d93cc96c12f96dbe3bf51cd7e5" dependencies = [ - "arrow", + "arrow 31.0.0", "datafusion-common", "paste 1.0.8", "rand 0.8.5", @@ -1566,16 +1894,15 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "12.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=d84ea9c79c9e83ff0b4dadf8880a4983af59ef48#d84ea9c79c9e83ff0b4dadf8880a4983af59ef48" +version = "17.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fbe5e61563ced2f6992a60afea568ff3de69e32940bbf07db06fc5c9d8cd866" dependencies = [ - "ahash 0.8.0", - "arrow", + "arrow-schema", "datafusion-common", "datafusion-expr", - "hashbrown", + "log", "sqlparser", - "tokio 1.25.0", ] [[package]] @@ -1584,7 +1911,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" dependencies = [ - "uuid 1.1.2", + "uuid 1.2.2", ] [[package]] @@ -1635,7 +1962,7 @@ dependencies = [ name = "df_operator" version = "1.0.0-alpha02" dependencies = [ - "arrow", + "arrow 31.0.0", "base64 0.13.0", "bincode", "chrono", @@ -1904,6 +2231,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "flatbuffers" +version = "22.9.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce016b9901aef3579617931fbb2df8fc9a9f7cb95a16eb8acc8148209bb9e70" +dependencies = [ + "bitflags", + "thiserror", +] + [[package]] name = "flate2" version = "1.0.24" @@ -1943,7 +2280,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" dependencies = [ "matches", - "percent-encoding 2.1.0", + "percent-encoding 2.2.0", ] [[package]] @@ -2311,6 +2648,15 @@ dependencies = [ "ahash 0.7.6", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash 0.8.0", +] + [[package]] name = "headers" version = "0.3.7" @@ -2553,6 +2899,30 @@ dependencies = [ "siphasher", ] +[[package]] +name = "iana-time-zone" +version = "0.1.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64c122667b287044802d6ce17ee2ddf13207ed924c712de9a66a5814d5b64765" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "winapi 0.3.9", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca" +dependencies = [ + "cxx", + "cxx-build", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -2601,12 +2971,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.9.1" +version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" dependencies = [ "autocfg 1.1.0", - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -2657,7 
+3027,7 @@ name = "interpreters" version = "1.0.0-alpha02" dependencies = [ "analytic_engine", - "arrow", + "arrow 31.0.0", "async-trait", "catalog", "catalog_impls", @@ -2665,6 +3035,7 @@ dependencies = [ "common_util", "datafusion", "datafusion-expr", + "datafusion-optimizer", "datafusion-proto", "df_operator", "log", @@ -3002,6 +3373,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "link-cplusplus" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5" +dependencies = [ + "cc", +] + [[package]] name = "linux-raw-sys" version = "0.1.4" @@ -3055,7 +3435,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a" dependencies = [ - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -3078,6 +3458,17 @@ dependencies = [ "libc", ] +[[package]] +name = "lzma-sys" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "matchers" version = "0.0.1" @@ -3161,7 +3552,7 @@ dependencies = [ "serde_derive", "snafu 0.6.10", "tokio 1.25.0", - "uuid 1.1.2", + "uuid 1.2.2", ] [[package]] @@ -3336,7 +3727,7 @@ dependencies = [ "named_pipe", "net2", "nix 0.15.0", - "percent-encoding 2.1.0", + "percent-encoding 2.2.0", "regex", "serde", "serde_json", @@ -3408,7 +3799,7 @@ dependencies = [ "subprocess", "thiserror", "time 0.3.15", - "uuid 1.1.2", + "uuid 1.2.2", ] [[package]] @@ -3640,9 +4031,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.5.1" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56ce10a205d9f610ae3532943039c34c145930065ce0c4284134c897fe6073b1" +checksum = "b4201837dc4c27a8670f0363b1255cd3845a4f0c521211cced1ed14c1d0cc6d2" dependencies = [ "async-trait", "bytes 1.2.1", @@ -3650,7 +4041,7 @@ dependencies = [ "futures 0.3.25", "itertools", "parking_lot 0.12.1", - "percent-encoding 2.1.0", + "percent-encoding 2.2.0", "snafu 0.7.1", "tokio 1.25.0", "tracing", @@ -3672,7 +4063,7 @@ dependencies = [ "lazy_static", "log", "lru", - "object_store 0.5.1", + "object_store 0.5.3", "oss-rust-sdk", "prometheus 0.12.0", "prometheus-static-metric", @@ -3807,9 +4198,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "3.0.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96bcbab4bfea7a59c2c0fe47211a1ac4e3e96bea6eb446d704f310bc5c732ae2" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" dependencies = [ "num-traits", ] @@ -3915,28 +4306,35 @@ dependencies = [ [[package]] name = "parquet" -version = "23.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc38abcd826e52e80a45abe46707745e6d12d31031a9b11c2eab112def7efe71" +checksum = "6b4ee1ffc0778395c9783a5c74f2cad2fb1a128ade95a965212d31b7b13e3d45" dependencies = [ "ahash 0.8.0", - "arrow", - "base64 0.13.0", + "arrow-array", + "arrow-buffer 31.0.0", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64 0.21.0", "brotli", "bytes 1.2.1", "chrono", "flate2", "futures 0.3.25", - "hashbrown", + "hashbrown 0.13.2", "lz4", "num", "num-bigint 0.4.3", - "rand 0.8.5", + "paste 1.0.8", "seq-macro", "snap", - "thrift 0.16.0", + 
"thrift 0.17.0", "tokio 1.25.0", - "zstd 0.11.2+zstd.1.5.2", + "twox-hash", + "zstd 0.12.1+zstd.1.5.2", ] [[package]] @@ -3976,7 +4374,7 @@ dependencies = [ name = "parquet_ext" version = "1.0.0-alpha02" dependencies = [ - "arrow", + "arrow 31.0.0", "arrow_ext", "async-trait", "bytes 1.2.1", @@ -4015,6 +4413,18 @@ dependencies = [ "proc-macro-hack", ] +[[package]] +name = "pbjson-build" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdbb7b706f2afc610f3853550cdbbf6372fd324824a087806bd4480ea4996e24" +dependencies = [ + "heck 0.4.0", + "itertools", + "prost", + "prost-types", +] + [[package]] name = "peeking_take_while" version = "0.1.2" @@ -4029,9 +4439,9 @@ checksum = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" [[package]] name = "percent-encoding" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" [[package]] name = "petgraph" @@ -4435,7 +4845,7 @@ dependencies = [ name = "query_engine" version = "1.0.0-alpha02" dependencies = [ - "arrow", + "arrow 31.0.0", "async-trait", "common_types", "common_util", @@ -4763,9 +5173,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.6.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" +checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733" dependencies = [ "aho-corasick", "memchr", @@ -4783,9 +5193,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.27" +version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" +checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" [[package]] name = "remote_engine_client" @@ -4874,7 +5284,7 @@ dependencies = [ "mime", "native-tls", "once_cell", - "percent-encoding 2.1.0", + "percent-encoding 2.2.0", "pin-project-lite", "serde", "serde_json", @@ -5134,6 +5544,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "scratch" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" + [[package]] name = "security-framework" version = "2.3.1" @@ -5263,7 +5679,7 @@ name = "server" version = "1.0.0-alpha02" dependencies = [ "analytic_engine", - "arrow", + "arrow 31.0.0", "arrow_ext", "async-trait", "bytes 1.2.1", @@ -5273,6 +5689,7 @@ dependencies = [ "common_types", "common_util", "datafusion", + "datafusion-expr", "df_operator", "futures 0.3.25", "http 0.2.8", @@ -5594,7 +6011,8 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" name = "sql" version = "1.0.0-alpha02" dependencies = [ - "arrow", + "arrow 31.0.0", + "async-trait", "catalog", "ceresdbproto", "common_types", @@ -5603,7 +6021,7 @@ dependencies = [ "datafusion-expr", "datafusion-proto", "df_operator", - "hashbrown", + "hashbrown 0.12.3", "log", "paste 1.0.8", "regex", @@ -5631,12 +6049,24 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.23.0" +version = "0.30.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "0beb13adabbdda01b63d595f38c8bfd19a361e697fd94ce0098a634077bc5b25" +checksum = "db67dc6ef36edb658196c3fef0464a80b53dbbc194a904e81f9bd4190f9ecc5b" dependencies = [ "log", "serde", + "sqlparser_derive", +] + +[[package]] +name = "sqlparser_derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -5785,7 +6215,7 @@ dependencies = [ "debugid", "memmap2", "stable_deref_trait", - "uuid 1.1.2", + "uuid 1.2.2", ] [[package]] @@ -5832,7 +6262,7 @@ dependencies = [ name = "system_catalog" version = "1.0.0-alpha02" dependencies = [ - "arrow", + "arrow 31.0.0", "async-trait", "catalog", "common_types", @@ -5850,7 +6280,7 @@ dependencies = [ name = "table_engine" version = "1.0.0-alpha02" dependencies = [ - "arrow", + "arrow 31.0.0", "async-trait", "ceresdbproto", "common_types", @@ -6001,13 +6431,13 @@ dependencies = [ [[package]] name = "thrift" -version = "0.16.0" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09678c4cdbb4eed72e18b7c2af1329c69825ed16fcbac62d083fc3e2b0590ff0" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", "integer-encoding 3.0.4", - "ordered-float 1.1.1", + "ordered-float 2.10.0", ] [[package]] @@ -6306,9 +6736,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" +checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" dependencies = [ "bytes 1.2.1", "futures-core", @@ -6345,7 +6775,7 @@ dependencies = [ "http-body 0.4.5", "hyper 0.14.20", "hyper-timeout", - "percent-encoding 2.1.0", + "percent-encoding 2.2.0", "pin-project", "prost", "prost-derive", @@ -6688,7 +7118,7 @@ dependencies = [ "form_urlencoded", "idna 0.2.3", "matches", - "percent-encoding 2.1.0", + "percent-encoding 2.2.0", ] [[package]] @@ -6718,9 +7148,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.1.2" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" +checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" dependencies = [ "getrandom 0.2.7", ] @@ -6795,7 +7225,7 @@ dependencies = [ "table_kv", "tempfile", "tokio 1.25.0", - "uuid 1.1.2", + "uuid 1.2.2", ] [[package]] @@ -6846,7 +7276,7 @@ dependencies = [ "mime", "mime_guess", "multipart", - "percent-encoding 2.1.0", + "percent-encoding 2.2.0", "pin-project", "rustls-pemfile", "scoped-tls", @@ -7146,6 +7576,15 @@ dependencies = [ "tap", ] +[[package]] +name = "xz2" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" +dependencies = [ + "lzma-sys", +] + [[package]] name = "yatp" version = "0.0.1" diff --git a/Cargo.toml b/Cargo.toml index ca8d89bba6..6f55deaae2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,8 +51,8 @@ name = "ceresdb-server" path = "src/bin/ceresdb-server.rs" [workspace.dependencies] -arrow = { version = "23.0.0", features = ["prettyprint"] } -arrow_ipc = { version = "23.0.0" } +arrow = { version = "31.0.0", features = 
["prettyprint"] } +arrow_ipc = { version = "31.0.0" } arrow_ext = { path = "components/arrow_ext" } analytic_engine = { path = "analytic_engine" } arena = { path = "components/arena" } @@ -69,6 +69,10 @@ cluster = { path = "cluster" } criterion = "0.3" common_types = { path = "common_types" } common_util = { path = "common_util" } +datafusion = "17.0.0" +datafusion-expr = "17.0.0" +datafusion-optimizer = "17.0.0" +datafusion-proto = "17.0.0" df_operator = { path = "df_operator" } env_logger = "0.6" ethbloom = "0.13.0" @@ -81,7 +85,7 @@ interpreters = { path = "interpreters" } meta_client = { path = "meta_client" } object_store = { path = "components/object_store" } parquet_ext = { path = "components/parquet_ext" } -parquet = { version = "23.0.0" } +parquet = { version = "31.0.0" } paste = "1.0" profile = { path = "components/profile" } prometheus = "0.12" @@ -100,6 +104,7 @@ server = { path = "server" } smallvec = "1.6" slog = "2.7" sql = { path = "sql" } +sqlparser = { version = "0.30", features = ["serde"] } system_catalog = { path = "system_catalog" } table_engine = { path = "table_engine" } table_kv = { path = "components/table_kv" } @@ -115,18 +120,6 @@ zstd = { version = "0.12", default-features = false } git = "https://github.com/CeresDB/ceresdbproto.git" rev = "55495dd395d12a1f97f6c98e355290671d107c44" -[workspace.dependencies.datafusion] -git = "https://github.com/CeresDB/arrow-datafusion.git" -rev = "d84ea9c79c9e83ff0b4dadf8880a4983af59ef48" - -[workspace.dependencies.datafusion-expr] -git = "https://github.com/CeresDB/arrow-datafusion.git" -rev = "d84ea9c79c9e83ff0b4dadf8880a4983af59ef48" - -[workspace.dependencies.datafusion-proto] -git = "https://github.com/CeresDB/arrow-datafusion.git" -rev = "d84ea9c79c9e83ff0b4dadf8880a4983af59ef48" - [dependencies] analytic_engine = { workspace = true } catalog = { workspace = true } diff --git a/analytic_engine/Cargo.toml b/analytic_engine/Cargo.toml index 59ab911ca5..a213043a5a 100644 --- a/analytic_engine/Cargo.toml +++ b/analytic_engine/Cargo.toml @@ -24,6 +24,7 @@ bytes = { workspace = true } common_types = { workspace = true } common_util = { workspace = true } datafusion = { workspace = true } +datafusion-expr = { workspace = true } ethbloom = { workspace = true } futures = { workspace = true } lazy_static = { workspace = true } diff --git a/analytic_engine/src/row_iter/record_batch_stream.rs b/analytic_engine/src/row_iter/record_batch_stream.rs index 2b34ce0eab..1d8858d0b7 100644 --- a/analytic_engine/src/row_iter/record_batch_stream.rs +++ b/analytic_engine/src/row_iter/record_batch_stream.rs @@ -17,7 +17,7 @@ use common_util::define_result; use datafusion::{ common::ToDFSchema, error::DataFusionError, - logical_expr::expr_fn, + optimizer::utils::conjunction, physical_expr::{self, execution_props::ExecutionProps}, physical_plan::PhysicalExpr, }; @@ -171,7 +171,7 @@ pub fn filter_stream( input_schema: ArrowSchemaRef, predicate: &Predicate, ) -> Result { - let filter = match expr_fn::combine_filters(predicate.exprs()) { + let filter = match conjunction(predicate.exprs().to_owned()) { Some(filter) => filter, None => return Ok(origin_stream), }; diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index 7ea7a64087..c412dc6800 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -3,7 +3,7 @@ use std::{convert::TryFrom, sync::Arc}; use arrow::{ - array::{Array, ArrayData, ArrayRef}, + array::{make_array, Array, ArrayData, 
ArrayRef}, buffer::MutableBuffer, compute, record_batch::RecordBatch as ArrowRecordBatch, @@ -557,7 +557,7 @@ impl HybridRecordDecoder { .map_err(|e| Box::new(e) as _) .context(DecodeRecordBatch)?; - Ok(array_data.into()) + Ok(make_array(array_data)) } /// Like `stretch_variable_length_column`, but array value is fixed-size @@ -601,7 +601,7 @@ impl HybridRecordDecoder { .map_err(|e| Box::new(e) as _) .context(DecodeRecordBatch)?; - Ok(array_data.into()) + Ok(make_array(array_data)) } /// Decode offset slices into Vec @@ -645,7 +645,7 @@ impl RecordDecoder for HybridRecordDecoder { // are collapsed by hybrid storage format, to differentiate // List column in original records DataType::List(_nested_field) => { - Ok(array_ref.data().child_data()[0].clone().into()) + Ok(make_array(array_ref.data().child_data()[0].clone())) } _ => { let datum_kind = DatumKind::from_data_type(data_type).unwrap(); diff --git a/analytic_engine/src/sst/parquet/hybrid.rs b/analytic_engine/src/sst/parquet/hybrid.rs index 985635ca57..f3922d5704 100644 --- a/analytic_engine/src/sst/parquet/hybrid.rs +++ b/analytic_engine/src/sst/parquet/hybrid.rs @@ -8,7 +8,7 @@ use arrow::{ UInt64Array, }, bitmap::Bitmap, - buffer::{Buffer, MutableBuffer}, + buffer::MutableBuffer, datatypes::Schema as ArrowSchema, record_batch::RecordBatch as ArrowRecordBatch, util::bit_util, @@ -153,7 +153,7 @@ trait VariableSizeArray { // Returns the length for the element at index i. fn value_length(&self, index: usize) -> i32; // Returns a clone of the value data buffer. - fn value_data(&self) -> Buffer; + fn value_data(&self) -> &[u8]; } macro_rules! impl_offsets { @@ -167,7 +167,7 @@ macro_rules! impl_offsets { self.0.value_length(index) } - fn value_data(&self) -> Buffer { + fn value_data(&self) -> &[u8] { self.0.value_data() } } @@ -404,9 +404,8 @@ impl ListArrayBuilder { inner_offsets.push(inner_length_so_far); } - inner_values.extend_from_slice( - &array.value_data().as_slice()[start as usize..end as usize], - ); + inner_values + .extend_from_slice(&array.value_data()[start as usize..end as usize]); } } // The data in the arrays belong to the same tsid, so the offsets is the total diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index ce80cac0bb..8c5ea728bb 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -6,7 +6,8 @@ use std::{collections::HashMap, fmt}; use async_trait::async_trait; use common_types::{row::Row, schema::Schema, time::TimeRange}; -use datafusion::logical_plan::{Column, Expr}; +use datafusion::common::Column; +use datafusion_expr::Expr; use futures::TryStreamExt; use snafu::{ensure, OptionExt, ResultExt}; use table_engine::{ diff --git a/common_types/Cargo.toml b/common_types/Cargo.toml index ce37eaf92b..6df34d6c49 100644 --- a/common_types/Cargo.toml +++ b/common_types/Cargo.toml @@ -30,5 +30,4 @@ serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } snafu = { workspace = true } -# TODO(yingwen): Make sqlparser support a feature -sqlparser = { version = "0.23.0", features = ["serde"] } +sqlparser = { workspace = true } diff --git a/common_types/src/column_schema.rs b/common_types/src/column_schema.rs index 6625acaec0..bc51e342d8 100644 --- a/common_types/src/column_schema.rs +++ b/common_types/src/column_schema.rs @@ -2,7 +2,7 @@ //! 
Schema of column -use std::{collections::BTreeMap, convert::TryFrom, str::FromStr}; +use std::{collections::HashMap, convert::TryFrom, str::FromStr}; use arrow::datatypes::{DataType, Field}; use proto::common as common_pb; @@ -281,11 +281,7 @@ impl TryFrom<&Field> for ColumnSchema { id, is_tag, comment, - } = field - .metadata() - .map(decode_arrow_field_meta_data) - .transpose()? - .unwrap_or_default(); + } = decode_arrow_field_meta_data(field.metadata())?; Ok(Self { id, name: field.name().clone(), @@ -311,14 +307,14 @@ impl From<&ColumnSchema> for Field { col_schema.data_type.into(), col_schema.is_nullable, ); - field.set_metadata(Some(metadata)); + field.set_metadata(metadata); field } } fn parse_arrow_field_meta_value( - meta: &BTreeMap, + meta: &HashMap, key: ArrowFieldMetaKey, ) -> Result where @@ -333,16 +329,20 @@ where .context(InvalidArrowFieldMetaValue { key, raw_value }) } -fn decode_arrow_field_meta_data(meta: &BTreeMap) -> Result { - Ok(ArrowFieldMeta { - id: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Id)?, - is_tag: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::IsTag)?, - comment: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Comment)?, - }) +fn decode_arrow_field_meta_data(meta: &HashMap) -> Result { + if meta.is_empty() { + Ok(ArrowFieldMeta::default()) + } else { + Ok(ArrowFieldMeta { + id: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Id)?, + is_tag: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::IsTag)?, + comment: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Comment)?, + }) + } } -fn encode_arrow_field_meta_data(col_schema: &ColumnSchema) -> BTreeMap { - let mut meta = BTreeMap::new(); +fn encode_arrow_field_meta_data(col_schema: &ColumnSchema) -> HashMap { + let mut meta = HashMap::new(); meta.insert(ArrowFieldMetaKey::Id.to_string(), col_schema.id.to_string()); meta.insert( diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index 32b669616d..7efe9f9aa4 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -205,7 +205,7 @@ impl TryFrom<&SqlDataType> for DatumKind { fn try_from(sql_type: &SqlDataType) -> Result { match sql_type { // TODO(yingwen): Consider timezone - SqlDataType::Timestamp => Ok(Self::Timestamp), + SqlDataType::Timestamp(_, _) => Ok(Self::Timestamp), SqlDataType::Real | SqlDataType::Float(_) => Ok(Self::Float), SqlDataType::Double => Ok(Self::Double), SqlDataType::Boolean => Ok(Self::Boolean), @@ -213,7 +213,7 @@ impl TryFrom<&SqlDataType> for DatumKind { SqlDataType::Int(_) => Ok(Self::Int32), SqlDataType::SmallInt(_) => Ok(Self::Int16), SqlDataType::String => Ok(Self::String), - SqlDataType::Custom(objects) if objects.0.len() == 1 => { + SqlDataType::Custom(objects, _) if objects.0.len() == 1 => { match objects.0[0].value.as_str() { "UINT64" | "uint64" => Ok(Self::UInt64), "UINT32" | "uint32" => Ok(Self::UInt32), @@ -538,7 +538,7 @@ impl Datum { pub fn display_string(&self) -> String { match self { Datum::Null => "null".to_string(), - Datum::Timestamp(v) => Local.timestamp_millis(v.as_i64()).to_rfc3339(), + Datum::Timestamp(v) => Local.timestamp_millis_opt(v.as_i64()).unwrap().to_rfc3339(), Datum::Double(v) => v.to_string(), Datum::Float(v) => v.to_string(), Datum::Varbinary(v) => format!("{:?}", v), @@ -887,7 +887,9 @@ pub mod arrow_convert { ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => v .as_ref() .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))), - ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => v + ScalarValue::Binary(v) + | 
ScalarValue::FixedSizeBinary(_, v) + | ScalarValue::LargeBinary(v) => v .as_ref() .map(|v| Datum::Varbinary(Bytes::copy_from_slice(v.as_slice()))), ScalarValue::TimestampMillisecond(v, _) => { @@ -896,7 +898,10 @@ pub mod arrow_convert { ScalarValue::List(_, _) | ScalarValue::Date32(_) | ScalarValue::Date64(_) - | ScalarValue::Time64(_) + | ScalarValue::Time32Second(_) + | ScalarValue::Time32Millisecond(_) + | ScalarValue::Time64Microsecond(_) + | ScalarValue::Time64Nanosecond(_) | ScalarValue::TimestampSecond(_, _) | ScalarValue::TimestampMicrosecond(_, _) | ScalarValue::TimestampNanosecond(_, _) @@ -928,7 +933,9 @@ pub mod arrow_convert { ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { v.as_ref().map(|v| DatumView::String(v.as_str())) } - ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => { + ScalarValue::Binary(v) + | ScalarValue::FixedSizeBinary(_, v) + | ScalarValue::LargeBinary(v) => { v.as_ref().map(|v| DatumView::Varbinary(v.as_slice())) } ScalarValue::TimestampMillisecond(v, _) => { @@ -937,7 +944,10 @@ pub mod arrow_convert { ScalarValue::List(_, _) | ScalarValue::Date32(_) | ScalarValue::Date64(_) - | ScalarValue::Time64(_) + | ScalarValue::Time32Second(_) + | ScalarValue::Time32Millisecond(_) + | ScalarValue::Time64Microsecond(_) + | ScalarValue::Time64Nanosecond(_) | ScalarValue::TimestampSecond(_, _) | ScalarValue::TimestampMicrosecond(_, _) | ScalarValue::TimestampNanosecond(_, _) diff --git a/components/arrow_ext/src/display.rs b/components/arrow_ext/src/display.rs index bc7c1e739a..a0945d829e 100644 --- a/components/arrow_ext/src/display.rs +++ b/components/arrow_ext/src/display.rs @@ -259,7 +259,7 @@ pub fn make_string_from_decimal( ) -> Result { let array = column .as_any() - .downcast_ref::>() + .downcast_ref::>() .unwrap(); let formatted_decimal = array.value_as_string(row); @@ -389,7 +389,7 @@ pub fn array_value_to_string(column: &array::ArrayRef, row: usize) -> Result Vec { let mut messages = Vec::with_capacity(cnt); - let base_ts = Utc.timestamp_millis(1337); + let base_ts = Utc.timestamp_millis_opt(1337).unwrap(); for i in 0..cnt { let key = format!("test_key_{}", i); let val = format!("test_val_{}", i); diff --git a/components/object_store/Cargo.toml b/components/object_store/Cargo.toml index 7736444a77..0d794ca22e 100644 --- a/components/object_store/Cargo.toml +++ b/components/object_store/Cargo.toml @@ -31,7 +31,7 @@ serde_derive = { workspace = true } serde_json = { workspace = true } snafu = { workspace = true } tokio = { workspace = true } -upstream = { package = "object_store", version = "0.5.1" } +upstream = { package = "object_store", version = "0.5.3" } [dev-dependencies] tempfile = { workspace = true } diff --git a/components/parquet_ext/src/prune/equal.rs b/components/parquet_ext/src/prune/equal.rs index 4b3c258ecf..b10fe5fa7a 100644 --- a/components/parquet_ext/src/prune/equal.rs +++ b/components/parquet_ext/src/prune/equal.rs @@ -1,8 +1,8 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. 
use arrow::datatypes::SchemaRef; -use datafusion::{logical_plan::Column, scalar::ScalarValue}; -use datafusion_expr::{Expr, Operator}; +use datafusion::{common::Column, scalar::ScalarValue}; +use datafusion_expr::{self, Expr, Operator}; const MAX_ELEMS_IN_LIST_FOR_FILTER: usize = 100; @@ -149,7 +149,7 @@ fn normalize_predicate_expression(expr: &Expr) -> NormalizedExpr { let unhandled = NormalizedExpr::True; match expr { - Expr::BinaryExpr { left, op, right } => match op { + Expr::BinaryExpr(datafusion_expr::BinaryExpr { left, op, right }) => match op { Operator::And => { let left = normalize_predicate_expression(left); let right = normalize_predicate_expression(right); diff --git a/components/parquet_ext/src/prune/min_max.rs b/components/parquet_ext/src/prune/min_max.rs index 60ab4e2de2..ddf34d627d 100644 --- a/components/parquet_ext/src/prune/min_max.rs +++ b/components/parquet_ext/src/prune/min_max.rs @@ -248,10 +248,10 @@ mod test { let testcases = vec![ // (expr, min, max, schema, expected) ( - col("a").eq(lit(5i32)), // a == 5 + col("a").eq(lit(5i64)), // a == 5 10, 20, - vec![("a", ArrowDataType::Int32)], + vec![("a", ArrowDataType::Int64)], vec![], ), ( @@ -273,7 +273,7 @@ mod test { col("a").in_list(vec![lit(17i64), lit(100i64)], false), // a in (17, 100) 101, 200, - vec![("a", ArrowDataType::Int32)], + vec![("a", ArrowDataType::Int64)], vec![], ), ]; diff --git a/df_operator/src/aggregate.rs b/df_operator/src/aggregate.rs index 245810c7b1..e8526a9fb1 100644 --- a/df_operator/src/aggregate.rs +++ b/df_operator/src/aggregate.rs @@ -11,7 +11,6 @@ use datafusion::{ physical_plan::Accumulator as DfAccumulator, scalar::ScalarValue as DfScalarValue, }; -use datafusion_expr::AggregateState as DfAggregateState; use snafu::Snafu; use crate::functions::{ScalarValue, ScalarValueRef}; @@ -35,8 +34,9 @@ define_result!(Error); pub struct State(Vec); impl State { - fn into_df_aggregate_states(self) -> Vec { - self.0.into_iter().map(DfAggregateState::Scalar).collect() + /// Convert to a set of ScalarValues + fn into_state(self) -> Vec { + self.0 } } @@ -112,11 +112,11 @@ impl ToDfAccumulator { } impl DfAccumulator for ToDfAccumulator { - fn state(&self) -> DfResult> { + fn state(&self) -> DfResult> { let state = self.accumulator.state().map_err(|e| { DataFusionError::Execution(format!("Accumulator failed to get state, err:{}", e)) })?; - Ok(state.into_df_aggregate_states()) + Ok(state.into_state()) } fn update_batch(&mut self, values: &[DfArrayRef]) -> DfResult<()> { @@ -160,4 +160,8 @@ impl DfAccumulator for ToDfAccumulator { Ok(value.into_df_scalar_value()) } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } } diff --git a/df_operator/src/udfs/time_bucket.rs b/df_operator/src/udfs/time_bucket.rs index 40e428ec5a..d2d8ba102c 100644 --- a/df_operator/src/udfs/time_bucket.rs +++ b/df_operator/src/udfs/time_bucket.rs @@ -265,25 +265,25 @@ impl Period { } fn truncate_day(ts: Timestamp, period: u16) -> Option { - let offset = FixedOffset::east(DEFAULT_TIMEZONE_OFFSET_SECS); + let offset = FixedOffset::east_opt(DEFAULT_TIMEZONE_OFFSET_SECS).expect("won't panic"); // Convert to local time. 
- let datetime = offset.timestamp_millis(ts.as_i64()); + let datetime = offset.timestamp_millis_opt(ts.as_i64()).unwrap(); // Truncate day let day = datetime.day(); let day = day - (day % u32::from(period)); let truncated_datetime = offset - .ymd(datetime.year(), datetime.month(), day) - .and_hms(0, 0, 0); + .with_ymd_and_hms(datetime.year(), datetime.month(), day, 0, 0, 0) + .unwrap(); let truncated_ts = truncated_datetime.timestamp_millis(); Some(Timestamp::new(truncated_ts)) } fn truncate_week(ts: Timestamp) -> Timestamp { - let offset = FixedOffset::east(DEFAULT_TIMEZONE_OFFSET_SECS); + let offset = FixedOffset::east_opt(DEFAULT_TIMEZONE_OFFSET_SECS).expect("won't panic"); // Convert to local time. - let datetime = offset.timestamp_millis(ts.as_i64()); + let datetime = offset.timestamp_millis_opt(ts.as_i64()).unwrap(); // Truncate week. let week_offset = datetime.weekday().num_days_from_monday(); @@ -297,26 +297,28 @@ impl Period { } fn truncate_month(ts: Timestamp) -> Timestamp { - let offset = FixedOffset::east(DEFAULT_TIMEZONE_OFFSET_SECS); + let offset = FixedOffset::east_opt(DEFAULT_TIMEZONE_OFFSET_SECS).expect("won't panic"); // Convert to local time. - let datetime = offset.timestamp_millis(ts.as_i64()); + let datetime = offset.timestamp_millis_opt(ts.as_i64()).unwrap(); // Truncate month let truncated_datetime = offset - .ymd(datetime.year(), datetime.month(), 1) - .and_hms(0, 0, 0); + .with_ymd_and_hms(datetime.year(), datetime.month(), 1, 0, 0, 0) + .unwrap(); let truncated_ts = truncated_datetime.timestamp_millis(); Timestamp::new(truncated_ts) } fn truncate_year(ts: Timestamp) -> Timestamp { - let offset = FixedOffset::east(DEFAULT_TIMEZONE_OFFSET_SECS); + let offset = FixedOffset::east_opt(DEFAULT_TIMEZONE_OFFSET_SECS).expect("won't panic"); // Convert to local time. - let datetime = offset.timestamp_millis(ts.as_i64()); + let datetime = offset.timestamp_millis_opt(ts.as_i64()).unwrap(); // Truncate year - let truncated_datetime = offset.ymd(datetime.year(), 1, 1).and_hms(0, 0, 0); + let truncated_datetime = offset + .with_ymd_and_hms(datetime.year(), 1, 1, 0, 0, 0) + .unwrap(); let truncated_ts = truncated_datetime.timestamp_millis(); Timestamp::new(truncated_ts) diff --git a/integration_tests/cases/local/00_dummy/select_1.result b/integration_tests/cases/local/00_dummy/select_1.result index 681ff41187..52be9a215e 100644 --- a/integration_tests/cases/local/00_dummy/select_1.result +++ b/integration_tests/cases/local/00_dummy/select_1.result @@ -6,7 +6,7 @@ Int64(1), SELECT x; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: SELECT x;. Caused by: Failed to create plan, err:Failed to generate datafusion plan, err:Schema error: No field named 'x'. Valid fields are ." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: SELECT x;. Caused by: Failed to create plan, err:Failed to generate datafusion plan, err:Schema error: No field named 'x'." 
}) SELECT 'a'; diff --git a/integration_tests/cases/local/02_function/thetasketch_distinct.result b/integration_tests/cases/local/02_function/thetasketch_distinct.result index 3ff5f72f81..28de6ff5e2 100644 --- a/integration_tests/cases/local/02_function/thetasketch_distinct.result +++ b/integration_tests/cases/local/02_function/thetasketch_distinct.result @@ -440,7 +440,7 @@ ORDER BY arch,thetasketch_distinct(02_function_thetasketch_distinct_table.value), String("x86"),UInt64(115), -String("arm"),UInt64(114), +String("arm"),UInt64(117), DROP TABLE `02_function_thetasketch_distinct_table`; diff --git a/integration_tests/cases/local/03_dml/issue-59.result b/integration_tests/cases/local/03_dml/issue-59.result index 4a9e68759c..50daf26248 100644 --- a/integration_tests/cases/local/03_dml/issue-59.result +++ b/integration_tests/cases/local/03_dml/issue-59.result @@ -24,7 +24,7 @@ FROM issue59 GROUP BY id+1; plan_type,plan, -String("logical_plan"),String("Projection: #issue59.id + Int64(1), #COUNT(DISTINCT issue59.account)\n Projection: #group_alias_0 AS issue59.id + Int64(1), #COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[#group_alias_0]], aggr=[[COUNT(#alias1)]]\n Aggregate: groupBy=[[#issue59.id + Int64(1) AS group_alias_0, #issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59 projection=[id, account]"), -String("physical_plan"),String("ProjectionExec: expr=[issue59.id + Int64(1)@0 as issue59.id + Int64(1), COUNT(DISTINCT issue59.account)@1 as COUNT(DISTINCT issue59.account)]\n ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }], 8)\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }, Column { name: \"alias1\", index: 1 }], 8)\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8, order=None, \n"), +String("logical_plan"),String("Projection: issue59.id + Int64(1), COUNT(DISTINCT issue59.account)\n Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59 projection=[id, account]"), +String("physical_plan"),String("ProjectionExec: expr=[issue59.id + Int64(1)@0 as issue59.id + Int64(1), COUNT(DISTINCT issue59.account)@1 as COUNT(DISTINCT issue59.account)]\n ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n AggregateExec: mode=FinalPartitioned, 
gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }, Column { name: \"alias1\", index: 1 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8, order=None, \n"), diff --git a/integration_tests/cases/local/03_dml/select_having.result b/integration_tests/cases/local/03_dml/select_having.result index a58b464bfd..aad0c1097f 100644 --- a/integration_tests/cases/local/03_dml/select_having.result +++ b/integration_tests/cases/local/03_dml/select_having.result @@ -24,31 +24,6 @@ VALUES affected_rows: 6 -SELECT - `timestamp`, - `value` -FROM - `03_dml_select_having_table1` - having `value` > 500 -ORDER BY - `value` ASC; - -timestamp,value, -Timestamp(2),Int32(1002), -Timestamp(5),Int32(4405), -Timestamp(4),Int32(30004), - - -SELECT - `timestamp`, - `value` -FROM - `03_dml_select_having_table1` -HAVING - 1 = 2; - -affected_rows: 0 - SELECT `value` % 3, MAX(`value`) AS max diff --git a/integration_tests/cases/local/03_dml/select_having.sql b/integration_tests/cases/local/03_dml/select_having.sql index 573cbb9fc1..bb0590b89d 100644 --- a/integration_tests/cases/local/03_dml/select_having.sql +++ b/integration_tests/cases/local/03_dml/select_having.sql @@ -20,25 +20,6 @@ VALUES (6, 406); -SELECT - `timestamp`, - `value` -FROM - `03_dml_select_having_table1` - having `value` > 500 -ORDER BY - `value` ASC; - - -SELECT - `timestamp`, - `value` -FROM - `03_dml_select_having_table1` -HAVING - 1 = 2; - - SELECT `value` % 3, MAX(`value`) AS max diff --git a/integration_tests/cases/local/04_explain/explain.result b/integration_tests/cases/local/04_explain/explain.result index 993b0092bf..7a49cdf8d5 100644 --- a/integration_tests/cases/local/04_explain/explain.result +++ b/integration_tests/cases/local/04_explain/explain.result @@ -9,7 +9,7 @@ affected_rows: 0 EXPLAIN SELECT t FROM `04_explain_t`; plan_type,plan, -String("logical_plan"),String("Projection: #04_explain_t.t\n TableScan: 04_explain_t projection=[t]"), +String("logical_plan"),String("Projection: 04_explain_t.t\n TableScan: 04_explain_t projection=[t]"), String("physical_plan"),String("ProjectionExec: expr=[t@0 as t]\n ScanTable: table=04_explain_t, parallelism=8, order=None, \n"), diff --git a/integration_tests/cases/local/07_optimizer/optimizer.result b/integration_tests/cases/local/07_optimizer/optimizer.result index 140d856ebe..81a1c18054 100644 --- a/integration_tests/cases/local/07_optimizer/optimizer.result +++ b/integration_tests/cases/local/07_optimizer/optimizer.result @@ -9,8 +9,8 @@ affected_rows: 0 EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY name; plan_type,plan, -String("logical_plan"),String("Projection: #MAX(07_optimizer_t.value) AS c1, #AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[#07_optimizer_t.name]], aggr=[[MAX(#07_optimizer_t.value), AVG(#07_optimizer_t.value)]]\n TableScan: 07_optimizer_t projection=[name, value]"), -String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 8)\n AggregateExec: mode=Partial, 
gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n"), +String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n TableScan: 07_optimizer_t projection=[name, value]"), +String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n"), DROP TABLE `07_optimizer_t`; diff --git a/interpreters/Cargo.toml b/interpreters/Cargo.toml index 43e5ada775..fca00b79f2 100644 --- a/interpreters/Cargo.toml +++ b/interpreters/Cargo.toml @@ -19,6 +19,7 @@ common_types = { workspace = true } common_util = { workspace = true } datafusion = { workspace = true } datafusion-expr = { workspace = true } +datafusion-optimizer = { workspace = true } datafusion-proto = { workspace = true } df_operator = { workspace = true } log = { workspace = true } diff --git a/interpreters/src/insert.rs b/interpreters/src/insert.rs index 72837840f7..1b1c384910 100644 --- a/interpreters/src/insert.rs +++ b/interpreters/src/insert.rs @@ -19,15 +19,15 @@ use common_types::{ }; use common_util::codec::{compact::MemCompactEncoder, Encoder}; use datafusion::{ + common::ToDFSchema, error::DataFusionError, logical_expr::ColumnarValue as DfColumnarValue, - logical_plan::ToDFSchema, - optimizer::simplify_expressions::ConstEvaluator, physical_expr::{ create_physical_expr, execution_props::ExecutionProps, expressions::TryCastExpr, }, }; -use datafusion_expr::{expr::Expr as DfLogicalExpr, expr_rewriter::ExprRewritable}; +use datafusion_expr::expr::Expr as DfLogicalExpr; +use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; use df_operator::visitor::find_columns_by_expr; use snafu::{OptionExt, ResultExt, Snafu}; use sql::plan::InsertPlan; @@ -206,19 +206,27 @@ fn fill_default_values( ) -> Result<()> { let mut cached_column_values: HashMap = HashMap::new(); let table_arrow_schema = table.schema().to_arrow_schema_ref(); + let df_schema_ref = table_arrow_schema + .clone() + .to_dfschema_ref() + .context(DatafusionSchema)?; for (column_idx, default_value_expr) in default_value_map.iter() { - // Optimize logical expr let execution_props = ExecutionProps::default(); - let mut const_optimizer = - ConstEvaluator::try_new(&execution_props).context(DatafusionExpr)?; - let evaluated_expr = default_value_expr - .clone() - .rewrite(&mut const_optimizer) + + // Optimize logical expr + let simplifier = ExprSimplifier::new( + SimplifyContext::new(&execution_props).with_schema(df_schema_ref.clone()), + ); + let default_value_expr = simplifier + .coerce(default_value_expr.clone(), df_schema_ref.clone()) + .context(DatafusionExpr)?; + let simplified_expr = simplifier + .simplify(default_value_expr) .context(DatafusionExpr)?; // Find input columns - let required_column_idxes = find_columns_by_expr(&evaluated_expr) + let required_column_idxes = find_columns_by_expr(&simplified_expr) .iter() 
.map(|column_name| { table @@ -236,9 +244,8 @@ fn fill_default_values( .context(DatafusionSchema)?; // Create physical expr - let execution_props = ExecutionProps::default(); let physical_expr = create_physical_expr( - &evaluated_expr, + &simplified_expr, &input_df_schema, &input_arrow_schema, &execution_props, diff --git a/interpreters/src/show_create.rs b/interpreters/src/show_create.rs index 98302bfd33..cfaed7cfdc 100644 --- a/interpreters/src/show_create.rs +++ b/interpreters/src/show_create.rs @@ -159,9 +159,6 @@ impl ShowCreateInterpreter { } } - // TODO: update datafusion to remove `#`. - // Refer to https://github.com/apache/arrow-datafusion/commit/d72eb9a1c4c18bcabbf941541a9c1defa83a592c. - res.remove_matches("#"); res } diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs index 732a7148aa..ef12347d1d 100644 --- a/interpreters/src/tests.rs +++ b/interpreters/src/tests.rs @@ -138,8 +138,8 @@ where "+------------+---------------------+--------+--------+", "| key1 | key2 | field1 | field2 |", "+------------+---------------------+--------+--------+", - "| 7461676b | 2021-12-02 07:00:34 | 100 | hello3 |", - "| 7461676b32 | 2021-12-02 07:00:34 | 100 | hello3 |", + "| 7461676b | 2021-12-02T07:00:34 | 100 | hello3 |", + "| 7461676b32 | 2021-12-02T07:00:34 | 100 | hello3 |", "+------------+---------------------+--------+--------+", ]; common_util::record_batch::assert_record_batches_eq(&expected, records); @@ -254,8 +254,8 @@ where "+------------+---------------------+--------+--------+--------+--------+--------+", "| key1 | key2 | field1 | field2 | field3 | field4 | field5 |", "+------------+---------------------+--------+--------+--------+--------+--------+", - "| 7461676b | 2021-12-02 07:00:34 | 10 | 20 | 3 | 1 | 3 |", - "| 7461676b32 | 2021-12-02 07:00:34 | 10 | 20 | 3 | 10 | 12 |", + "| 7461676b | 2021-12-02T07:00:34 | 10 | 20 | 3 | 1 | 3 |", + "| 7461676b32 | 2021-12-02T07:00:34 | 10 | 20 | 3 | 10 | 12 |", "+------------+---------------------+--------+--------+--------+--------+--------+", ]; common_util::record_batch::assert_record_batches_eq(&expected, records); @@ -334,6 +334,7 @@ where #[tokio::test] async fn test_interpreters_rocks() { + common_util::tests::init_log_for_test(); let rocksdb_ctx = RocksDBEngineContext::default(); test_interpreters(rocksdb_ctx).await; } @@ -362,8 +363,6 @@ async fn test_interpreters(engine_context: T) { env.test_show_create_table().await; env.test_alter_table().await; env.test_drop_table().await; - env.test_insert_table_with_missing_columns().await; - env.test_enable_partition_table_access().await; } diff --git a/query_engine/src/context.rs b/query_engine/src/context.rs index c4f681ea40..52cb4e19fa 100644 --- a/query_engine/src/context.rs +++ b/query_engine/src/context.rs @@ -9,15 +9,14 @@ use datafusion::{ execution::context::default_session_builder, optimizer::{ common_subexpr_eliminate::CommonSubexprEliminate, eliminate_limit::EliminateLimit, - filter_push_down::FilterPushDown, limit_push_down::LimitPushDown, optimizer::OptimizerRule, - projection_push_down::ProjectionPushDown, simplify_expressions::SimplifyExpressions, - single_distinct_to_groupby::SingleDistinctToGroupBy, + optimizer::OptimizerRule, push_down_filter::PushDownFilter, push_down_limit::PushDownLimit, + push_down_projection::PushDownProjection, simplify_expressions::SimplifyExpressions, + single_distinct_to_groupby::SingleDistinctToGroupBy, type_coercion::TypeCoercion, }, physical_optimizer::optimizer::PhysicalOptimizerRule, prelude::{SessionConfig, SessionContext}, 
- scalar::ScalarValue, }; -use table_engine::provider::{CERESDB_REQUEST_ID, CERESDB_REQUEST_TIMEOUT}; +use table_engine::provider::CeresdbOptions; use crate::{ config::Config, @@ -47,23 +46,28 @@ impl Context { ) -> SessionContext { let timeout = deadline.map(|deadline| deadline.duration_since(Instant::now()).as_millis() as u64); - let df_session_config = SessionConfig::new() + let ceresdb_options = CeresdbOptions { + request_id: request_id.as_u64(), + request_timeout: timeout, + }; + let mut df_session_config = SessionConfig::new() .with_default_catalog_and_schema( self.default_catalog.clone(), self.default_schema.clone(), ) - .set_u64(CERESDB_REQUEST_ID, request_id.as_u64()) - .set(CERESDB_REQUEST_TIMEOUT, ScalarValue::UInt64(timeout)) .with_target_partitions(config.read_parallelism); + df_session_config + .config_options_mut() + .extensions + .insert(ceresdb_options); let logical_optimize_rules = Self::logical_optimize_rules(); - let mut state = default_session_builder(df_session_config) + let state = default_session_builder(df_session_config) .with_query_planner(Arc::new(QueryPlannerAdapter)) .with_optimizer_rules(logical_optimize_rules); let physical_optimizer = - Self::apply_adapters_for_physical_optimize_rules(&state.physical_optimizers); - state.physical_optimizers = physical_optimizer; - SessionContext::with_state(state) + Self::apply_adapters_for_physical_optimize_rules(state.physical_optimizers()); + SessionContext::with_state(state.with_physical_optimizer_rules(physical_optimizer)) } fn apply_adapters_for_physical_optimize_rules( @@ -84,9 +88,10 @@ impl Context { Arc::new(SimplifyExpressions::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), - Arc::new(ProjectionPushDown::new()), - Arc::new(FilterPushDown::new()), - Arc::new(LimitPushDown::new()), + Arc::new(PushDownProjection::new()), + Arc::new(PushDownFilter::new()), + Arc::new(PushDownLimit::new()), + Arc::new(TypeCoercion::new()), Arc::new(SingleDistinctToGroupBy::new()), ]; diff --git a/query_engine/src/df_execution_extension/prom_align.rs b/query_engine/src/df_execution_extension/prom_align.rs index a8b3565796..84ba36a497 100644 --- a/query_engine/src/df_execution_extension/prom_align.rs +++ b/query_engine/src/df_execution_extension/prom_align.rs @@ -59,7 +59,7 @@ define_result!(Error); /// Limits Extrapolation range. 
/// Refer to https://github.com/prometheus/prometheus/pull/1295 -const PROMTHEUS_EXTRAPOLATION_THRESHOLD_COEFFICIENT: f64 = 1.1; +const PROMETHEUS_EXTRAPOLATION_THRESHOLD_COEFFICIENT: f64 = 1.1; #[derive(Debug)] struct ExtractTsidExpr {} @@ -70,6 +70,27 @@ impl fmt::Display for ExtractTsidExpr { } } +impl PartialEq for ExtractTsidExpr { + fn eq(&self, other: &dyn Any) -> bool { + down_cast_any_ref(other).downcast_ref::().is_some() + } +} + +// Copy from https://github.com/apache/arrow-datafusion/blob/71353bb9ad99a0688a9ae36a5cda77a5ab6af00b/datafusion/physical-expr/src/physical_expr.rs#L237 +fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { + if any.is::>() { + any.downcast_ref::>() + .unwrap() + .as_any() + } else if any.is::>() { + any.downcast_ref::>() + .unwrap() + .as_any() + } else { + any + } +} + impl PhysicalExpr for ExtractTsidExpr { fn as_any(&self) -> &dyn Any { self @@ -90,6 +111,17 @@ impl PhysicalExpr for ExtractTsidExpr { .expect("checked in plan build"); Ok(ColumnarValue::Array(batch.column(tsid_idx).clone())) } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> ArrowResult> { + Ok(self) + } } /// Note: caller should ensure data[tail_index] is valid @@ -744,7 +776,7 @@ fn extrapolate_fn_helper( } let extrapolation_threshold = - average_duration_between_data * PROMTHEUS_EXTRAPOLATION_THRESHOLD_COEFFICIENT; + average_duration_between_data * PROMETHEUS_EXTRAPOLATION_THRESHOLD_COEFFICIENT; // if lots of data is absent (`range_to_start` or `range_to_end` is longer than // `extrapolation_threshold`), Prometheus will not estimate all time range. Use diff --git a/query_engine/src/df_planner_extension/mod.rs b/query_engine/src/df_planner_extension/mod.rs index ed5fad8102..4f27687181 100644 --- a/query_engine/src/df_planner_extension/mod.rs +++ b/query_engine/src/df_planner_extension/mod.rs @@ -6,12 +6,12 @@ use std::sync::Arc; use datafusion::{ execution::context::{QueryPlanner, SessionState}, - logical_plan::LogicalPlan, physical_plan::{ planner::{DefaultPhysicalPlanner, ExtensionPlanner}, ExecutionPlan, PhysicalPlanner, }, }; +use datafusion_expr::logical_plan::LogicalPlan; pub mod prom_align; pub mod table_scan_by_primary_key; diff --git a/query_engine/src/df_planner_extension/prom_align.rs b/query_engine/src/df_planner_extension/prom_align.rs index 6505cf7125..26276075b0 100644 --- a/query_engine/src/df_planner_extension/prom_align.rs +++ b/query_engine/src/df_planner_extension/prom_align.rs @@ -6,9 +6,9 @@ use async_trait::async_trait; use datafusion::{ error::DataFusionError, execution::context::SessionState, - logical_plan::{LogicalPlan, UserDefinedLogicalNode}, physical_plan::{planner::ExtensionPlanner, ExecutionPlan, PhysicalPlanner}, }; +use datafusion_expr::logical_plan::{LogicalPlan, UserDefinedLogicalNode}; use snafu::Snafu; use sql::promql::PromAlignNode; diff --git a/query_engine/src/df_planner_extension/table_scan_by_primary_key.rs b/query_engine/src/df_planner_extension/table_scan_by_primary_key.rs index 8b2192d22a..2ccdba8f06 100644 --- a/query_engine/src/df_planner_extension/table_scan_by_primary_key.rs +++ b/query_engine/src/df_planner_extension/table_scan_by_primary_key.rs @@ -104,7 +104,7 @@ impl TableScanByPrimaryKey { table_provider .scan_table( session_state, - projection, + projection.as_ref(), &filters, *fetch, ReadOrder::from_is_asc(Some(self.asc)), diff --git a/query_engine/src/logical_optimizer/mod.rs b/query_engine/src/logical_optimizer/mod.rs index 8c7f13c83a..e77d2fe0e5 100644 --- 
a/query_engine/src/logical_optimizer/mod.rs +++ b/query_engine/src/logical_optimizer/mod.rs @@ -51,7 +51,11 @@ impl LogicalOptimizer for LogicalOptimizerImpl { mut df_plan, tables, } = plan; - df_plan = self.ctx.optimize(&df_plan).context(DataFusionOptimize)?; + df_plan = self + .ctx + .state() + .optimize(&df_plan) + .context(DataFusionOptimize)?; Ok(QueryPlan { df_plan, tables }) } diff --git a/query_engine/src/logical_optimizer/order_by_primary_key.rs b/query_engine/src/logical_optimizer/order_by_primary_key.rs index f556be8f50..241e016427 100644 --- a/query_engine/src/logical_optimizer/order_by_primary_key.rs +++ b/query_engine/src/logical_optimizer/order_by_primary_key.rs @@ -4,12 +4,13 @@ use std::{convert::TryFrom, sync::Arc}; use common_types::schema::Schema; use datafusion::{ - logical_plan::{ - plan::{Extension, Filter, Projection, Sort}, - DFSchemaRef, Expr, Limit, LogicalPlan, TableScan, - }, + common::DFSchemaRef, optimizer::{optimizer::OptimizerRule, OptimizerConfig}, }; +use datafusion_expr::{ + expr::{Expr, Sort as ExprSort}, + logical_plan::{Extension, Filter, Limit, LogicalPlan, Projection, Sort, TableScan}, +}; use log::info; use crate::df_planner_extension::table_scan_by_primary_key::TableScanByPrimaryKey; @@ -38,18 +39,20 @@ impl OrderByPrimaryKeyRule { if let LogicalPlan::Sort(Sort { expr: sort_exprs, input: projection_plan, + fetch: sort_fetch, }) = sort_plan.as_ref() { if let LogicalPlan::Projection(Projection { expr: projection_exprs, input: scan_or_filter_plan, schema: projection_schema, - alias, + .. }) = projection_plan.as_ref() { let (scan_plan, filter_predicate) = if let LogicalPlan::Filter(Filter { predicate, input: scan_plan, + .. }) = scan_or_filter_plan.as_ref() { (scan_plan, Some(predicate)) @@ -75,9 +78,9 @@ impl OrderByPrimaryKeyRule { projection: projection_exprs.clone(), filter_predicate: filter_predicate.cloned(), schema: projection_schema.clone(), - alias: alias.clone(), scan_plan: scan_plan.clone(), sort_exprs: sort_exprs.clone(), + sort_fetch: *sort_fetch, sort_in_asc_order, skip: *skip, fetch: *fetch, @@ -111,7 +114,7 @@ impl OrderByPrimaryKeyRule { let mut in_asc_order = None; for (sort_expr, key_col) in sort_exprs.iter().zip(sub_key_cols.iter()) { - if let Expr::Sort { expr, asc, .. } = sort_expr { + if let Expr::Sort(ExprSort { expr, asc, .. 
}) = sort_expr { if let Some(in_asc_order) = in_asc_order.as_mut() { if in_asc_order != asc { return None; @@ -160,20 +163,21 @@ impl OrderByPrimaryKeyRule { })); let filter_plan = if let Some(predicate) = rewrite_ctx.filter_predicate { - Arc::new(LogicalPlan::Filter(Filter { - predicate, - input: order_by_primary_key_scan, - })) + Arc::new(LogicalPlan::Filter( + Filter::try_new(predicate, order_by_primary_key_scan).unwrap(), + )) } else { order_by_primary_key_scan }; - let new_project_plan = Arc::new(LogicalPlan::Projection(Projection { - expr: rewrite_ctx.projection, - input: filter_plan, - schema: rewrite_ctx.schema, - alias: rewrite_ctx.alias, - })); + let new_project_plan = Arc::new(LogicalPlan::Projection( + Projection::try_new_with_schema( + rewrite_ctx.projection, + filter_plan, + rewrite_ctx.schema, + ) + .unwrap(), + )); let new_limit_plan = Arc::new(LogicalPlan::Limit(Limit { skip: rewrite_ctx.skip, @@ -184,7 +188,9 @@ impl OrderByPrimaryKeyRule { let new_sort_plan = Arc::new(LogicalPlan::Sort(Sort { expr: rewrite_ctx.sort_exprs, input: new_limit_plan, + fetch: rewrite_ctx.sort_fetch, })); + LogicalPlan::Limit(Limit { skip: rewrite_ctx.skip, fetch: rewrite_ctx.fetch, @@ -194,20 +200,20 @@ impl OrderByPrimaryKeyRule { } impl OptimizerRule for OrderByPrimaryKeyRule { - fn optimize( + fn try_optimize( &self, plan: &LogicalPlan, - _optimizer_config: &mut OptimizerConfig, - ) -> datafusion::error::Result { + _optimizer_config: &dyn OptimizerConfig, + ) -> datafusion::error::Result> { match self.do_optimize(plan)? { Some(new_plan) => { info!( "optimize plan by OrderByPrimaryKeyRule, original plan:\n{:?}\n optimized plan:\n{:?}", plan, new_plan ); - Ok(new_plan) + Ok(Some(new_plan)) } - None => Ok(plan.clone()), + None => Ok(Some(plan.clone())), } } @@ -216,13 +222,14 @@ impl OptimizerRule for OrderByPrimaryKeyRule { } } +#[derive(Debug)] struct RewriteContext { projection: Vec, filter_predicate: Option, schema: DFSchemaRef, - alias: Option, scan_plan: Arc, sort_exprs: Vec, + sort_fetch: Option, sort_in_asc_order: bool, skip: usize, fetch: Option, @@ -231,7 +238,8 @@ struct RewriteContext { #[cfg(test)] mod tests { use common_types::{column_schema, datum::DatumKind, schema}; - use datafusion::{logical_plan::Column, scalar::ScalarValue}; + use datafusion::{common::Column, scalar::ScalarValue}; + use datafusion_expr::expr::Sort as ExprSort; use super::*; use crate::logical_optimizer::tests::LogicalPlanNodeBuilder; @@ -290,11 +298,11 @@ mod tests { fn build_sort_expr(sort_col: &str, asc: bool) -> Expr { let col_expr = Expr::Column(Column::from(sort_col)); - Expr::Sort { + Expr::Sort(ExprSort { expr: Box::new(col_expr), asc, nulls_first: false, - } + }) } fn build_primary_key_sort_exprs(schema: &Schema, asc: bool) -> Vec { @@ -369,7 +377,7 @@ mod tests { #[test] fn test_optimize_applied_with_filter() { let schema = build_optimized_schema(); - let filter_expr = Expr::Literal(ScalarValue::Int8(None)); + let filter_expr = Expr::Literal(ScalarValue::Boolean(None)); let sort_in_asc_order = false; let sort_exprs = build_primary_key_sort_exprs(&schema, sort_in_asc_order); diff --git a/query_engine/src/logical_optimizer/tests.rs b/query_engine/src/logical_optimizer/tests.rs index 2966f462cc..ece6fcfa7b 100644 --- a/query_engine/src/logical_optimizer/tests.rs +++ b/query_engine/src/logical_optimizer/tests.rs @@ -8,15 +8,15 @@ use arrow::datatypes::SchemaRef; use async_trait::async_trait; use common_types::schema::Schema; use datafusion::{ + common::{DFSchemaRef, ToDFSchema}, 
datasource::TableProvider, execution::context::SessionState, - logical_plan::{ - plan::{Extension, Filter, Projection, Sort}, - DFSchemaRef, Expr, Limit, LogicalPlan, TableScan, ToDFSchema, - }, physical_plan::ExecutionPlan, }; -use datafusion_expr::{TableSource, TableType}; +use datafusion_expr::{ + Expr, Extension, Filter, Limit, LogicalPlan, Projection, Sort, TableScan, TableSource, + TableType, +}; use crate::df_planner_extension::table_scan_by_primary_key::TableScanByPrimaryKey; @@ -49,7 +49,7 @@ impl TableProvider for MockTableProvider { async fn scan( &self, _state: &SessionState, - _projection: &Option>, + _projection: Option<&Vec>, _filters: &[Expr], _limit: Option, ) -> datafusion::error::Result> { @@ -89,10 +89,7 @@ impl LogicalPlanNodeBuilder { } pub fn filter(mut self, predicate: Expr) -> Self { - let plan = LogicalPlan::Filter(Filter { - predicate, - input: self.take_plan(), - }); + let plan = LogicalPlan::Filter(Filter::try_new(predicate, self.take_plan()).unwrap()); self.plan = Some(Arc::new(plan)); @@ -100,12 +97,8 @@ impl LogicalPlanNodeBuilder { } pub fn projection(mut self, proj_exprs: Vec) -> Self { - let plan = LogicalPlan::Projection(Projection { - expr: proj_exprs, - input: self.take_plan(), - schema: self.df_schema_ref(), - alias: None, - }); + let plan = + LogicalPlan::Projection(Projection::try_new(proj_exprs, self.take_plan()).unwrap()); self.plan = Some(Arc::new(plan)); @@ -128,6 +121,7 @@ impl LogicalPlanNodeBuilder { let plan = LogicalPlan::Sort(Sort { expr: sort_exprs, input: self.take_plan(), + fetch: None, }); self.plan = Some(Arc::new(plan)); diff --git a/query_engine/src/logical_optimizer/type_conversion.rs b/query_engine/src/logical_optimizer/type_conversion.rs index d4be958031..3dc33b4bf4 100644 --- a/query_engine/src/logical_optimizer/type_conversion.rs +++ b/query_engine/src/logical_optimizer/type_conversion.rs @@ -5,15 +5,17 @@ use std::{mem, sync::Arc}; use arrow::{compute, compute::kernels::cast_utils::string_to_timestamp_nanos}; use datafusion::{ arrow::datatypes::DataType, + common::DFSchemaRef, error::{DataFusionError, Result}, - logical_plan::{ - plan::Filter, DFSchemaRef, Expr, ExprRewritable, ExprRewriter, LogicalPlan, Operator, - TableScan, - }, optimizer::{optimizer::OptimizerRule, OptimizerConfig}, scalar::ScalarValue, }; -use datafusion_expr::{utils, ExprSchemable}; +use datafusion_expr::{ + expr::Expr, + expr_rewriter::{ExprRewritable, ExprRewriter}, + logical_plan::{Filter, LogicalPlan, TableScan}, + utils, Between, BinaryExpr, ExprSchemable, Operator, +}; use log::debug; /// Optimizer that cast literal value to target column's type @@ -27,20 +29,28 @@ use log::debug; pub struct TypeConversion; impl OptimizerRule for TypeConversion { - fn optimize( + fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, - ) -> Result { + optimizer_config: &dyn OptimizerConfig, + ) -> Result> { let mut rewriter = TypeRewriter { schemas: plan.all_schemas(), }; match plan { - LogicalPlan::Filter(Filter { predicate, input }) => Ok(LogicalPlan::Filter(Filter { - predicate: predicate.clone().rewrite(&mut rewriter)?, - input: Arc::new(self.optimize(input, optimizer_config)?), - })), + LogicalPlan::Filter(Filter { + predicate, input, .. + }) => { + let predicate = predicate.clone().rewrite(&mut rewriter)?; + let input = self + .try_optimize(input, optimizer_config)? 
+ .unwrap_or_else(|| input.as_ref().clone()); + Ok(Some(LogicalPlan::Filter(Filter::try_new( + predicate, + Arc::new(input), + )?))) + } LogicalPlan::TableScan(TableScan { table_name, source, @@ -54,14 +64,14 @@ impl OptimizerRule for TypeConversion { .into_iter() .map(|e| e.rewrite(&mut rewriter)) .collect::>>()?; - Ok(LogicalPlan::TableScan(TableScan { + Ok(Some(LogicalPlan::TableScan(TableScan { table_name: table_name.clone(), source: source.clone(), projection: projection.clone(), projected_schema: projected_schema.clone(), filters: rewrite_filters, fetch: *fetch, - })) + }))) } LogicalPlan::Projection { .. } | LogicalPlan::Window { .. } @@ -80,11 +90,18 @@ impl OptimizerRule for TypeConversion { | LogicalPlan::DropView { .. } | LogicalPlan::Values { .. } | LogicalPlan::Analyze { .. } - | LogicalPlan::Distinct { .. } => { + | LogicalPlan::Distinct { .. } + | LogicalPlan::SetVariable { .. } + | LogicalPlan::Prepare { .. } + | LogicalPlan::DescribeTable { .. } + | LogicalPlan::Dml { .. } => { let inputs = plan.inputs(); let new_inputs = inputs .iter() - .map(|plan| self.optimize(plan, optimizer_config)) + .map(|plan| { + self.try_optimize(plan, optimizer_config) + .map(|v| v.unwrap_or_else(|| (*plan).clone())) + }) .collect::>>()?; let expr = plan @@ -93,15 +110,14 @@ impl OptimizerRule for TypeConversion { .map(|e| e.rewrite(&mut rewriter)) .collect::>>()?; - utils::from_plan(plan, &expr, &new_inputs) + Ok(Some(utils::from_plan(plan, &expr, &new_inputs)?)) } - LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) | LogicalPlan::CreateView(_) | LogicalPlan::CreateCatalogSchema(_) | LogicalPlan::CreateCatalog(_) - | LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()), + | LogicalPlan::EmptyRelation { .. } => Ok(Some(plan.clone())), } } @@ -195,7 +211,7 @@ impl<'a> TypeRewriter<'a> { impl<'a> ExprRewriter for TypeRewriter<'a> { fn mutate(&mut self, expr: Expr) -> Result { let new_expr = match expr { - Expr::BinaryExpr { left, op, right } => match op { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { Operator::Eq | Operator::NotEq | Operator::Lt @@ -203,28 +219,28 @@ impl<'a> ExprRewriter for TypeRewriter<'a> { | Operator::Gt | Operator::GtEq => { let (left, right) = self.convert_type(&left, &right)?; - Expr::BinaryExpr { + Expr::BinaryExpr(BinaryExpr { left: Box::new(left), op, right: Box::new(right), - } + }) } - _ => Expr::BinaryExpr { left, op, right }, + _ => Expr::BinaryExpr(BinaryExpr { left, op, right }), }, - Expr::Between { + Expr::Between(Between { expr, negated, low, high, - } => { + }) => { let (expr, low) = self.convert_type(&expr, &low)?; let (expr, high) = self.convert_type(&expr, &high)?; - Expr::Between { + Expr::Between(Between { expr: Box::new(expr), negated, low: Box::new(low), high: Box::new(high), - } + }) } Expr::InList { expr, @@ -299,7 +315,7 @@ mod tests { use arrow::datatypes::TimeUnit; use datafusion::{ - logical_plan::{DFField, DFSchema}, + common::{DFField, DFSchema}, prelude::col, }; @@ -482,16 +498,16 @@ mod tests { // Timestamp c6 between "2021-09-07 16:00:00" and "2021-09-07 17:00:00" let date_string2 = "2021-09-07 17:00:00".to_string(); - let exp = Expr::Between { + let exp = Expr::Between(Between { expr: Box::new(col("c6")), negated: false, low: Box::new(Expr::Literal(ScalarValue::Utf8(Some(date_string.clone())))), high: Box::new(Expr::Literal(ScalarValue::Utf8(Some(date_string2.clone())))), - }; + }); let rewrite_exp = exp.rewrite(&mut rewriter).unwrap(); assert_eq!( rewrite_exp, - Expr::Between { + Expr::Between(Between { expr: 
Box::new(col("c6")), negated: false, low: Box::new(Expr::Literal(ScalarValue::TimestampMillisecond( @@ -510,7 +526,7 @@ mod tests { ), None ),)) - } + }) ); } } diff --git a/query_engine/src/physical_optimizer/coalesce_batches.rs b/query_engine/src/physical_optimizer/coalesce_batches.rs index a62bc20866..027fc5104f 100644 --- a/query_engine/src/physical_optimizer/coalesce_batches.rs +++ b/query_engine/src/physical_optimizer/coalesce_batches.rs @@ -3,10 +3,9 @@ use std::sync::Arc; use datafusion::{ - config::OPT_BATCH_SIZE, + config::ConfigOptions, physical_optimizer::{coalesce_batches::CoalesceBatches, optimizer::PhysicalOptimizerRule}, physical_plan::{limit::GlobalLimitExec, ExecutionPlan}, - prelude::SessionConfig, }; use crate::physical_optimizer::{Adapter, OptimizeRuleRef}; @@ -30,7 +29,7 @@ impl CoalesceBatchesAdapter { /// `batch_size`). fn detect_small_limit_plan(plan: &dyn ExecutionPlan, batch_size: usize) -> bool { if let Some(limit_plan) = plan.as_any().downcast_ref::() { - return limit_plan.skip() + limit_plan.fetch().copied().unwrap_or(0) < batch_size; + return limit_plan.skip() + limit_plan.fetch().unwrap_or(0) < batch_size; } for child_plan in plan.children() { @@ -48,12 +47,9 @@ impl PhysicalOptimizerRule for CoalesceBatchesAdapter { fn optimize( &self, plan: Arc, - config: &SessionConfig, + config: &ConfigOptions, ) -> datafusion::error::Result> { - if Self::detect_small_limit_plan( - &*plan, - config.config_options.get_u64(OPT_BATCH_SIZE) as usize, - ) { + if Self::detect_small_limit_plan(&*plan, config.execution.batch_size) { Ok(plan) } else { self.original_rule.optimize(plan, config) @@ -63,4 +59,8 @@ impl PhysicalOptimizerRule for CoalesceBatchesAdapter { fn name(&self) -> &str { "custom_coalesce_batches" } + + fn schema_check(&self) -> bool { + true + } } diff --git a/query_engine/src/physical_optimizer/mod.rs b/query_engine/src/physical_optimizer/mod.rs index 2296150b8c..469dcdc6bd 100644 --- a/query_engine/src/physical_optimizer/mod.rs +++ b/query_engine/src/physical_optimizer/mod.rs @@ -61,6 +61,7 @@ impl PhysicalOptimizer for PhysicalOptimizerImpl { async fn optimize(&mut self, logical_plan: QueryPlan) -> Result { let exec_plan = self .ctx + .state() .create_physical_plan(&logical_plan.df_plan) .await .context(DataFusionOptimize)?; diff --git a/query_engine/src/physical_optimizer/repartition.rs b/query_engine/src/physical_optimizer/repartition.rs index c25838791a..92bdebf00c 100644 --- a/query_engine/src/physical_optimizer/repartition.rs +++ b/query_engine/src/physical_optimizer/repartition.rs @@ -5,9 +5,9 @@ use std::sync::Arc; use datafusion::{ + config::ConfigOptions, physical_optimizer::{optimizer::PhysicalOptimizerRule, repartition::Repartition}, physical_plan::ExecutionPlan, - prelude::SessionConfig, }; use log::debug; @@ -31,7 +31,7 @@ impl PhysicalOptimizerRule for RepartitionAdapter { fn optimize( &self, plan: Arc, - config: &SessionConfig, + config: &ConfigOptions, ) -> datafusion::error::Result> { // the underlying plan maybe requires the order of the output. 
if plan.output_partitioning().partition_count() == 1 { @@ -48,4 +48,8 @@ impl PhysicalOptimizerRule for RepartitionAdapter { fn name(&self) -> &str { "custom-repartition" } + + fn schema_check(&self) -> bool { + true + } } diff --git a/server/Cargo.toml b/server/Cargo.toml index 1d7b641beb..c35605af99 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -22,6 +22,7 @@ cluster = { workspace = true } common_types = { workspace = true } common_util = { workspace = true } datafusion = { workspace = true } +datafusion-expr = { workspace = true } df_operator = { workspace = true } futures = { workspace = true } http = "0.2" diff --git a/server/src/limiter.rs b/server/src/limiter.rs index 81f998251e..98d25960ca 100644 --- a/server/src/limiter.rs +++ b/server/src/limiter.rs @@ -2,7 +2,8 @@ use std::{collections::HashSet, sync::RwLock}; -use datafusion::{catalog::TableReference, logical_plan::LogicalPlan}; +use datafusion::catalog::TableReference; +use datafusion_expr::logical_plan::LogicalPlan; use serde::Serialize; use serde_derive::Deserialize; use snafu::{Backtrace, Snafu}; diff --git a/sql/Cargo.toml b/sql/Cargo.toml index 90ef914ede..59ab5566aa 100644 --- a/sql/Cargo.toml +++ b/sql/Cargo.toml @@ -16,6 +16,7 @@ test = [] [dependencies] # In alphabetical order arrow = { workspace = true } +async-trait = { workspace = true } catalog = { workspace = true } ceresdbproto = { workspace = true } common_types = { workspace = true } @@ -29,7 +30,7 @@ log = { workspace = true } paste = { workspace = true } regex = "1" snafu = { workspace = true } -sqlparser = "0.23.0" +sqlparser = { workspace = true } table_engine = { workspace = true } [dev-dependencies] diff --git a/sql/src/parser.rs b/sql/src/parser.rs index 5c71e15e14..06c171fc46 100644 --- a/sql/src/parser.rs +++ b/sql/src/parser.rs @@ -111,8 +111,10 @@ impl<'a> Parser<'a> { let mut tokenizer = Tokenizer::new(dialect, sql); let tokens = tokenizer.tokenize()?; + let parser = SqlParser::new(dialect); + Ok(Parser { - parser: SqlParser::new(tokens, dialect), + parser: parser.with_tokens(tokens), }) } @@ -133,7 +135,7 @@ impl<'a> Parser<'a> { break; } if expecting_statement_delimiter { - return parser.expected("end of statement", parser.parser.peek_token()); + return parser.expected("end of statement", parser.parser.peek_token().token); } let statement = parser.parse_statement()?; @@ -153,7 +155,7 @@ impl<'a> Parser<'a> { // Parse a new expression fn parse_statement(&mut self) -> Result { - match self.parser.peek_token() { + match self.parser.peek_token().token { Token::Word(w) => { match w.keyword { Keyword::CREATE => { @@ -202,9 +204,9 @@ impl<'a> Parser<'a> { } pub fn parse_alter(&mut self) -> Result { - let nth1_token = self.parser.peek_token(); - let nth2_token = self.parser.peek_nth_token(2); - let nth3_token = self.parser.peek_nth_token(3); + let nth1_token = self.parser.peek_token().token; + let nth2_token = self.parser.peek_nth_token(2).token; + let nth3_token = self.parser.peek_nth_token(3).token; if let (Token::Word(nth1_word), Token::Word(nth2_word), Token::Word(nth3_word)) = (nth1_token, nth2_token, nth3_token) { @@ -236,18 +238,18 @@ impl<'a> Parser<'a> { } else if self.consume_token("CREATE") { Ok(self.parse_show_create()?) 
} else { - self.expected("create/tables/databases", self.parser.peek_token()) + self.expected("create/tables/databases", self.parser.peek_token().token) } } fn parse_show_tables(&mut self) -> Result { - let pattern = match self.parser.next_token() { + let pattern = match self.parser.next_token().token { Token::Word(w) => match w.keyword { Keyword::LIKE => Some(self.parser.parse_literal_string()?), - _ => return self.expected("like", self.parser.peek_token()), + _ => return self.expected("like", self.parser.peek_token().token), }, Token::SemiColon | Token::EOF => None, - _ => return self.expected(";", self.parser.peek_token()), + _ => return self.expected(";", self.parser.peek_token().token), }; Ok(Statement::ShowTables(ShowTables { pattern })) } @@ -366,12 +368,12 @@ impl<'a> Parser<'a> { loop { if let Some(constraint) = self.parse_optional_table_constraint()? { constraints.push(constraint); - } else if let Token::Word(_) = self.parser.peek_token() { + } else if let Token::Word(_) = self.parser.peek_token().token { columns.push(self.parse_column_def()?); } else { return self.expected( "column name or constraint definition", - self.parser.peek_token(), + self.parser.peek_token().token, ); } let comma = self.parser.consume_token(&Token::Comma); @@ -381,7 +383,7 @@ impl<'a> Parser<'a> { } else if !comma { return self.expected( "',' or ')' after column definition", - self.parser.peek_token(), + self.parser.peek_token().token, ); } } @@ -400,7 +402,7 @@ impl<'a> Parser<'a> { self.parser.expect_token(&Token::Eq)?; - match self.parser.next_token() { + match self.parser.next_token().token { Token::Word(w) => Ok(w.value), unexpected => self.expected("Engine is missing", unexpected), } @@ -424,7 +426,7 @@ impl<'a> Parser<'a> { } else { return self.expected( "constraint details after CONSTRAINT ", - self.parser.peek_token(), + self.parser.peek_token().token, ); } } else if let Some(option) = self.parse_optional_column_option()? { @@ -448,10 +450,12 @@ impl<'a> Parser<'a> { } else { None }; - match self.parser.next_token() { + match self.parser.next_token().token { Token::Word(w) if w.keyword == Keyword::PRIMARY => { self.parser.expect_keyword(Keyword::KEY)?; - let columns = self.parser.parse_parenthesized_column_list(Mandatory)?; + let columns = self + .parser + .parse_parenthesized_column_list(Mandatory, false)?; Ok(Some(TableConstraint::Unique { name, columns, @@ -460,7 +464,9 @@ impl<'a> Parser<'a> { } Token::Word(w) if w.keyword == Keyword::TIMESTAMP => { self.parser.expect_keyword(Keyword::KEY)?; - let columns = self.parser.parse_parenthesized_column_list(Mandatory)?; + let columns = self + .parser + .parse_parenthesized_column_list(Mandatory, false)?; // TODO(boyan), TableConstraint doesn't support dialect right now // we use unique constraint as TIMESTAMP KEY constraint. 
Ok(Some(TableConstraint::Unique { @@ -591,7 +597,7 @@ impl<'a> Parser<'a> { let key_columns = self .parser - .parse_parenthesized_column_list(Mandatory) + .parse_parenthesized_column_list(Mandatory, false) .map_err(|e| { ParserError::ParserError(format!("Fail to parse partition key, err:{}", e)) })?; @@ -798,7 +804,7 @@ fn maybe_convert_table_name(object_name: &mut ObjectName) { #[cfg(test)] mod tests { use sqlparser::{ - ast::{ColumnOptionDef, DataType, Ident, ObjectName, Value}, + ast::{ColumnOptionDef, DataType, Ident, ObjectName, TimezoneInfo, Value}, parser::ParserError::ParserError, }; @@ -908,7 +914,7 @@ mod tests { if_not_exists: false, table_name: make_table_name("mytbl"), columns: vec![ - make_column_def("c1", DataType::Timestamp), + make_column_def("c1", DataType::Timestamp(None, TimezoneInfo::None)), make_column_def("c2", DataType::Double), make_column_def("c3", DataType::String), ], @@ -925,7 +931,7 @@ mod tests { if_not_exists: false, table_name: make_table_name("mytbl"), columns: vec![ - make_column_def("c1", DataType::Timestamp), + make_column_def("c1", DataType::Timestamp(None, TimezoneInfo::None)), make_comment_column_def("c2", DataType::Double, "id".to_string()), make_comment_column_def("c3", DataType::String, "name".to_string()), ], diff --git a/sql/src/plan.rs b/sql/src/plan.rs index e7f939ba6c..0ee61dcd61 100644 --- a/sql/src/plan.rs +++ b/sql/src/plan.rs @@ -11,8 +11,9 @@ use std::{ use common_types::{column_schema::ColumnSchema, row::RowGroup, schema::Schema}; use common_util::define_result; -use datafusion::logical_plan::LogicalPlan as DataFusionLogicalPlan; -use datafusion_expr::expr::Expr as DfLogicalExpr; +use datafusion_expr::{ + expr::Expr as DfLogicalExpr, logical_plan::LogicalPlan as DataFusionLogicalPlan, +}; use snafu::Snafu; use table_engine::{partition::PartitionInfo, table::TableRef}; diff --git a/sql/src/planner.rs b/sql/src/planner.rs index cd5be673c0..d68098608e 100644 --- a/sql/src/planner.rs +++ b/sql/src/planner.rs @@ -24,10 +24,10 @@ use common_types::{ use datafusion::{ common::{DFField, DFSchema}, error::DataFusionError, + optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}, physical_expr::{create_physical_expr, execution_props::ExecutionProps}, - sql::planner::SqlToRel, + sql::planner::{PlannerContext, SqlToRel}, }; -use hashbrown::HashMap as NoStdHashMap; use log::{debug, trace}; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use sqlparser::ast::{ @@ -670,7 +670,11 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { // This column in schema is not in insert stmt if let Some(expr) = &column.default_value { let expr = df_planner - .sql_to_rex(expr.clone(), &df_schema, &mut NoStdHashMap::new()) + .sql_to_expr( + expr.clone(), + &df_schema, + &mut PlannerContext::new(), + ) .context(DatafusionExpr)?; default_value_map.insert(idx, expr); @@ -794,7 +798,7 @@ fn parse_data_value_from_expr(data_type: DatumKind, expr: &mut Expr) -> Result false, _ => InsertExprNotValue { source_expr: Expr::UnaryOp { - op: op.clone(), + op: *op, expr: child_expr.clone(), }, } @@ -825,10 +829,12 @@ fn build_row_group( ) -> Result { // Build row group by schema match *source.body { - SetExpr::Values(Values(values)) => { - let mut row_group_builder = - RowGroupBuilder::with_capacity(schema.clone(), values.len()); - for mut exprs in values { + SetExpr::Values(Values { + explicit_row: _, + rows, + }) => { + let mut row_group_builder = RowGroupBuilder::with_capacity(schema.clone(), rows.len()); + for mut exprs in rows { // Try to build row let 
mut row_builder = row_group_builder.row_builder(); @@ -915,19 +921,18 @@ fn parse_options(options: Vec) -> Result> { pub fn parse_for_option(value: Value) -> Result> { let value_opt = match value { Value::Number(n, _long) => Some(n), - Value::SingleQuotedString(v) | Value::DoubleQuotedString(v) => Some(v), + Value::SingleQuotedString(v) | Value::DoubleQuotedString(v) | Value::UnQuotedString(v) => { + Some(v) + } Value::NationalStringLiteral(v) | Value::HexStringLiteral(v) => { return UnsupportedOption { value: v }.fail(); } Value::Boolean(v) => Some(v.to_string()), - Value::Interval { value, .. } => { - return UnsupportedOption { - value: value.to_string(), - } - .fail(); - } // Ignore this option if value is null. - Value::Null | Value::Placeholder(_) | Value::EscapedStringLiteral(_) => None, + Value::Null + | Value::Placeholder(_) + | Value::EscapedStringLiteral(_) + | Value::DollarQuotedString(_) => None, }; Ok(value_opt) @@ -996,11 +1001,23 @@ fn ensure_column_default_value_valid<'a, P: MetaProvider>( for column_def in columns.iter() { if let Some(expr) = &column_def.default_value { let df_logical_expr = df_planner - .sql_to_rex(expr.clone(), &df_schema, &mut NoStdHashMap::new()) + .sql_to_expr(expr.clone(), &df_schema, &mut PlannerContext::new()) .context(DatafusionExpr)?; // Create physical expr let execution_props = ExecutionProps::default(); + let df_schema_ref = Arc::new(df_schema.clone()); + let simplifier = ExprSimplifier::new( + SimplifyContext::new(&execution_props).with_schema(df_schema_ref.clone()), + ); + let df_logical_expr = simplifier + .coerce(df_logical_expr, df_schema_ref.clone()) + .context(DatafusionExpr)?; + + let df_logical_expr = simplifier + .simplify(df_logical_expr) + .context(DatafusionExpr)?; + let physical_expr = create_physical_expr( &df_logical_expr, &df_schema, @@ -1041,7 +1058,7 @@ fn ensure_column_default_value_valid<'a, P: MetaProvider>( #[cfg(test)] mod tests { - use sqlparser::ast::{Ident, Value}; + use sqlparser::ast::Value; use super::*; use crate::{ @@ -1089,22 +1106,8 @@ mod tests { true, None, ), - (Value::HexStringLiteral(test_string.clone()), true, None), + (Value::HexStringLiteral(test_string), true, None), (Value::Boolean(true), false, Some("true".to_string())), - ( - Value::Interval { - value: Box::new(Expr::Identifier(Ident { - value: test_string, - quote_style: None, - })), - leading_field: None, - leading_precision: None, - last_field: None, - fractional_seconds_precision: None, - }, - true, - None, - ), (Value::Null, false, None), ]; @@ -1278,12 +1281,16 @@ mod tests { assert!(quick_test(sql, "").is_err()); let sql = "select * from test_table;"; - quick_test(sql, "Query( + quick_test( + sql, + "Query( QueryPlan { - df_plan: Projection: #test_table.key1, #test_table.key2, #test_table.field1, #test_table.field2 + df_plan: Projection: test_table.key1, test_table.key2, test_table.field1, test_table.field2 TableScan: test_table, }, -)").unwrap(); +)", + ) + .unwrap(); } #[test] diff --git a/sql/src/promql/convert.rs b/sql/src/promql/convert.rs index 383ced4e6f..ce600e02f6 100644 --- a/sql/src/promql/convert.rs +++ b/sql/src/promql/convert.rs @@ -11,12 +11,12 @@ use common_types::{ time::{TimeRange, Timestamp}, }; use datafusion::{ - error::DataFusionError, - logical_plan::{ - avg, col, combine_filters, count, lit, max, min, plan::Extension, sum, - Expr as DataFusionExpr, LogicalPlan, LogicalPlanBuilder, - }, - sql::planner::ContextProvider, + error::DataFusionError, optimizer::utils::conjunction, sql::planner::ContextProvider, +}; +use 
datafusion_expr::{ + avg, col, count, lit, + logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder}, + max, min, sum, Expr as DataFusionExpr, }; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; @@ -331,7 +331,7 @@ impl Expr { }; let aggr_expr = Self::aggr_op_expr(&op, &column_name.field, column_name.field.clone())?; - let tag_exprs = groupby_columns.iter().map(|v| col(v)).collect::>(); + let tag_exprs = groupby_columns.iter().map(|v| col(*v)).collect::>(); let udf_args = tag_exprs.clone(); let mut groupby_expr = vec![col(&column_name.timestamp)]; groupby_expr.extend(udf_args); @@ -617,7 +617,7 @@ impl Selector { filter_exprs.push(timerange_to_expr(query_range, ×tamp_column_name)); let builder = LogicalPlanBuilder::scan(table.clone(), table_provider, None)? - .filter(combine_filters(&filter_exprs).expect("at least one filter(timestamp)"))? + .filter(conjunction(filter_exprs).expect("at least one filter(timestamp)"))? .project(projection)? .sort(default_sort_exprs(×tamp_column_name))?; let column_name = Arc::new(ColumnNames { diff --git a/sql/src/promql/datafusion_util.rs b/sql/src/promql/datafusion_util.rs index 95c6a85b8e..b1a28115a5 100644 --- a/sql/src/promql/datafusion_util.rs +++ b/sql/src/promql/datafusion_util.rs @@ -3,8 +3,9 @@ use std::{any::Any, fmt, sync::Arc}; use common_types::{schema::TSID_COLUMN, time::TimeRange}; -use datafusion::logical_plan::{ - col, lit, DFSchemaRef, Expr as DataFusionExpr, Expr, LogicalPlan, UserDefinedLogicalNode, +use datafusion::common::DFSchemaRef; +use datafusion_expr::{ + col, lit, Between, Expr as DataFusionExpr, Expr, LogicalPlan, UserDefinedLogicalNode, }; use crate::promql::pushdown::{AlignParameter, Func}; @@ -19,12 +20,12 @@ pub struct ColumnNames { /// Translate to `column_name BETWEEN start AND end` expr pub fn timerange_to_expr(query_range: TimeRange, column_name: &str) -> DataFusionExpr { - DataFusionExpr::Between { + DataFusionExpr::Between(Between { expr: Box::new(col(column_name)), negated: false, low: Box::new(lit(query_range.inclusive_start().as_i64())), high: Box::new(lit(query_range.exclusive_end().as_i64() + 1)), - } + }) } pub fn default_sort_exprs(timestamp_column: &str) -> Vec { diff --git a/sql/src/promql/udf.rs b/sql/src/promql/udf.rs index b6c1dc9ca1..31ac1bb657 100644 --- a/sql/src/promql/udf.rs +++ b/sql/src/promql/udf.rs @@ -13,10 +13,9 @@ use common_types::hash::hash64; use common_util::codec::{compact::MemCompactEncoder, Encoder}; use datafusion::{ error::{DataFusionError, Result as DataFusionResult}, - logical_plan::{create_udf, Expr}, physical_plan::{functions::make_scalar_function, udf::ScalarUDF}, }; -use datafusion_expr::Volatility; +use datafusion_expr::{create_udf, Expr, Volatility}; /// The name of the regex_match UDF given to DataFusion. 
pub const REGEX_MATCH_UDF_NAME: &str = "RegexMatch"; @@ -155,12 +154,8 @@ mod tests { util::pretty::pretty_format_batches, }; use common_types::schema::{ArrowSchema, ArrowSchemaRef, DataType, Field}; - use datafusion::{ - datasource::MemTable, - error::DataFusionError, - logical_plan::{col, Expr as DataFusionExpr}, - prelude::SessionContext, - }; + use datafusion::{datasource::MemTable, error::DataFusionError, prelude::SessionContext}; + use datafusion_expr::{col, Expr}; #[tokio::test] async fn regex_match_expr() { @@ -272,13 +267,13 @@ mod tests { async fn run_plan( schema: ArrowSchemaRef, rb: Vec>, - op: DataFusionExpr, + op: Expr, ) -> Result, DataFusionError> { let provider = MemTable::try_new(Arc::clone(&schema), rb).unwrap(); let ctx = SessionContext::new(); ctx.register_table("t", Arc::new(provider)).unwrap(); - let df = ctx.table("t").unwrap(); + let df = ctx.table("t").await.unwrap(); let df = df.filter(op).unwrap(); // execute the query diff --git a/sql/src/provider.rs b/sql/src/provider.rs index 77d25d0bc9..31d80705c9 100644 --- a/sql/src/provider.rs +++ b/sql/src/provider.rs @@ -4,10 +4,12 @@ use std::{any::Any, cell::RefCell, collections::HashMap, sync::Arc}; +use async_trait::async_trait; use catalog::manager::ManagerRef; use datafusion::{ catalog::{catalog::CatalogProvider, schema::SchemaProvider}, common::DataFusionError, + config::ConfigOptions, datasource::{DefaultTableSource, TableProvider}, physical_plan::{udaf::AggregateUDF, udf::ScalarUDF}, sql::planner::ContextProvider, @@ -147,7 +149,10 @@ pub struct ContextProviderAdapter<'a, P> { err: RefCell>, meta_provider: &'a P, /// Read parallelism for each table. + // TODO: to remove this parameter, use the config read_parallelism: usize, + /// Read config for each table. + config: ConfigOptions, } impl<'a, P: MetaProvider> ContextProviderAdapter<'a, P> { @@ -155,12 +160,14 @@ impl<'a, P: MetaProvider> ContextProviderAdapter<'a, P> { pub fn new(meta_provider: &'a P, read_parallelism: usize) -> Self { let default_catalog = meta_provider.default_catalog_name().to_string(); let default_schema = meta_provider.default_schema_name().to_string(); - + let mut config = ConfigOptions::default(); + config.execution.target_partitions = read_parallelism; Self { table_cache: RefCell::new(TableContainer::new(default_catalog, default_schema)), err: RefCell::new(None), meta_provider, read_parallelism, + config, } } @@ -282,6 +289,10 @@ impl<'a, P: MetaProvider> ContextProvider for ContextProviderAdapter<'a, P> { ) -> Option { None } + + fn options(&self) -> &ConfigOptions { + &self.config + } } struct SchemaProviderAdapter { @@ -291,6 +302,7 @@ struct SchemaProviderAdapter { read_parallelism: usize, } +#[async_trait] impl SchemaProvider for SchemaProviderAdapter { fn as_any(&self) -> &dyn Any { self @@ -307,7 +319,7 @@ impl SchemaProvider for SchemaProviderAdapter { names } - fn table(&self, name: &str) -> Option> { + async fn table(&self, name: &str) -> Option> { let name_ref = TableReference::Full { catalog: &self.catalog, schema: &self.schema, @@ -320,7 +332,7 @@ impl SchemaProvider for SchemaProviderAdapter { } fn table_exist(&self, name: &str) -> bool { - self.table(name).is_some() + self.tables.get(TableReference::parse_str(name)).is_some() } } diff --git a/table_engine/src/partition/rule/df_adapter/extractor.rs b/table_engine/src/partition/rule/df_adapter/extractor.rs index 2407202326..5b497d017c 100644 --- a/table_engine/src/partition/rule/df_adapter/extractor.rs +++ b/table_engine/src/partition/rule/df_adapter/extractor.rs @@ -59,16 
+59,18 @@ impl FilterExtractor for KeyExtractor { // (Actually, there is type conversion on high-level, but when converted data // is overflow, it may take no effect). let partition_filter = match filter.clone() { - Expr::BinaryExpr { left, op, right } => match (*left, op, *right) { - (Expr::Column(col), Operator::Eq, Expr::Literal(val)) - | (Expr::Literal(val), Operator::Eq, Expr::Column(col)) => { - let datum_opt = Datum::from_scalar_value(&val); - datum_opt.map(|datum| { - PartitionFilter::new(col.name, PartitionCondition::Eq(datum)) - }) + Expr::BinaryExpr(datafusion_expr::BinaryExpr { left, op, right }) => { + match (*left, op, *right) { + (Expr::Column(col), Operator::Eq, Expr::Literal(val)) + | (Expr::Literal(val), Operator::Eq, Expr::Column(col)) => { + let datum_opt = Datum::from_scalar_value(&val); + datum_opt.map(|datum| { + PartitionFilter::new(col.name, PartitionCondition::Eq(datum)) + }) + } + _ => None, } - _ => None, - }, + } Expr::InList { expr, list, diff --git a/table_engine/src/predicate.rs b/table_engine/src/predicate.rs index f17cf3377a..9b5ece2311 100644 --- a/table_engine/src/predicate.rs +++ b/table_engine/src/predicate.rs @@ -9,10 +9,8 @@ use common_types::{ schema::Schema, time::{TimeRange, Timestamp}, }; -use datafusion::{ - logical_plan::{Expr, Operator}, - scalar::ScalarValue, -}; +use datafusion::scalar::ScalarValue; +use datafusion_expr::{Expr, Operator}; use datafusion_proto::bytes::Serializeable; use log::debug; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; @@ -298,8 +296,6 @@ impl<'a> TimeRangeExtractor<'a> { | Operator::Multiply | Operator::Divide | Operator::Modulo - | Operator::Like - | Operator::NotLike | Operator::IsDistinctFrom | Operator::IsNotDistinctFrom | Operator::RegexMatch @@ -360,15 +356,15 @@ impl<'a> TimeRangeExtractor<'a> { /// how to handle it, returns `TimeRange::zero_to_max()`. fn extract_time_range_from_expr(&self, expr: &Expr) -> TimeRange { match expr { - Expr::BinaryExpr { left, op, right } => { + Expr::BinaryExpr(datafusion_expr::BinaryExpr { left, op, right }) => { self.extract_time_range_from_binary_expr(left, right, op) } - Expr::Between { + Expr::Between(datafusion_expr::Between { expr, negated, low, high, - } => { + }) => { if let Expr::Column(column) = expr.as_ref() { if column.name == self.timestamp_column_name { return Self::time_range_from_between_expr(low, high, *negated); @@ -423,7 +419,8 @@ impl<'a> TimeRangeExtractor<'a> { | Expr::ScalarSubquery(_) | Expr::QualifiedWildcard { .. } | Expr::GroupingSet(_) - | Expr::GetIndexedField { .. } => TimeRange::min_to_max(), + | Expr::GetIndexedField { .. } + | Expr::Placeholder { .. 
} => TimeRange::min_to_max(), } } } diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs index 7945dc314c..dbc94a0a6d 100644 --- a/table_engine/src/provider.rs +++ b/table_engine/src/provider.rs @@ -13,19 +13,17 @@ use arrow::datatypes::SchemaRef; use async_trait::async_trait; use common_types::{projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema}; use datafusion::{ - config::OPT_BATCH_SIZE, + config::{ConfigEntry, ConfigExtension, ExtensionOptions}, datasource::datasource::{TableProvider, TableProviderFilterPushDown}, error::{DataFusionError, Result}, execution::context::{SessionState, TaskContext}, - logical_plan::Expr, physical_expr::PhysicalSortExpr, physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics, }, - scalar::ScalarValue, }; -use datafusion_expr::{TableSource, TableType}; +use datafusion_expr::{Expr, TableSource, TableType}; use df_operator::visitor; use log::debug; @@ -35,9 +33,71 @@ use crate::{ table::{self, ReadOptions, ReadOrder, ReadRequest, TableRef}, }; -// Config keys set in Datafusion's SessionConfig -pub const CERESDB_REQUEST_TIMEOUT: &str = "ceresdb_request_timeout"; -pub const CERESDB_REQUEST_ID: &str = "ceresdb_request_id"; +#[derive(Clone, Debug)] +pub struct CeresdbOptions { + pub request_id: u64, + pub request_timeout: Option, +} + +impl ConfigExtension for CeresdbOptions { + const PREFIX: &'static str = "ceresdb"; +} + +impl ExtensionOptions for CeresdbOptions { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn cloned(&self) -> Box { + Box::new(self.clone()) + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + match key { + "request_id" => { + self.request_id = value.parse::().map_err(|e| { + DataFusionError::External( + format!("could not parse request_id, input:{}, err:{:?}", value, e).into(), + ) + })? + } + "request_timeout" => { + self.request_timeout = Some(value.parse::().map_err(|e| { + DataFusionError::External( + format!( + "could not parse request_timeout, input:{}, err:{:?}", + value, e + ) + .into(), + ) + })?) + } + _ => Err(DataFusionError::External( + format!("could not find key, key:{}", key).into(), + ))?, + } + Ok(()) + } + + fn entries(&self) -> Vec { + vec![ + ConfigEntry { + key: "request_id".to_string(), + value: Some(self.request_id.to_string()), + description: "", + }, + ConfigEntry { + key: "request_timeout".to_string(), + value: self.request_timeout.map(|v| v.to_string()), + description: "", + }, + ] + } +} /// An adapter to [TableProvider] with schema snapshot. 
/// @@ -72,16 +132,18 @@ impl TableProviderAdapter { pub async fn scan_table( &self, state: &SessionState, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], limit: Option, read_order: ReadOrder, ) -> Result> { - let request_id = RequestId::from(state.config.config_options().get_u64(CERESDB_REQUEST_ID)); - let deadline = match state.config.config_options().get(CERESDB_REQUEST_TIMEOUT) { - Some(ScalarValue::UInt64(Some(n))) => Some(Instant::now() + Duration::from_millis(n)), - _ => None, - }; + let ceresdb_options = state.config_options().extensions.get::(); + assert!(ceresdb_options.is_some()); + let ceresdb_options = ceresdb_options.unwrap(); + let request_id = RequestId::from(ceresdb_options.request_id); + let deadline = ceresdb_options + .request_timeout + .map(|n| Instant::now() + Duration::from_millis(n)); debug!( "scan table, table:{}, request_id:{}, projection:{:?}, filters:{:?}, limit:{:?}, read_order:{:?}, deadline:{:?}", self.table.name(), @@ -103,13 +165,13 @@ impl TableProviderAdapter { let predicate = self.check_and_build_predicate_from_filters(filters); let mut scan_table = ScanTable { - projected_schema: ProjectedSchema::new(self.read_schema.clone(), projection.clone()) + projected_schema: ProjectedSchema::new(self.read_schema.clone(), projection.cloned()) .map_err(|e| { - DataFusionError::Internal(format!( - "Invalid projection, plan:{:?}, projection:{:?}, err:{:?}", - self, projection, e - )) - })?, + DataFusionError::Internal(format!( + "Invalid projection, plan:{:?}, projection:{:?}, err:{:?}", + self, projection, e + )) + })?, table: self.table.clone(), request_id, read_order, @@ -170,7 +232,7 @@ impl TableProvider for TableProviderAdapter { async fn scan( &self, state: &SessionState, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> Result> { @@ -254,7 +316,7 @@ impl ScanTable { let req = ReadRequest { request_id: self.request_id, opts: ReadOptions { - batch_size: state.config.config_options.get_u64(OPT_BATCH_SIZE) as usize, + batch_size: state.config_options().execution.batch_size, read_parallelism: self.read_parallelism, deadline: self.deadline, }, diff --git a/wal/src/table_kv_impl/encoding.rs b/wal/src/table_kv_impl/encoding.rs index d663ed2fd1..66c128c6d3 100644 --- a/wal/src/table_kv_impl/encoding.rs +++ b/wal/src/table_kv_impl/encoding.rs @@ -4,9 +4,18 @@ use chrono::{TimeZone, Utc}; use common_types::{table::TableId, time::Timestamp}; -use common_util::config::ReadableDuration; +use common_util::{config::ReadableDuration, define_result}; +use snafu::Snafu; use table_kv::{KeyBoundary, ScanRequest}; +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Timestamp is invalid, timestamp:{}", timestamp))] + InValidTimestamp { timestamp: i64 }, +} + +define_result!(Error); + /// Key prefix for namespace in meta table. const META_NAMESPACE_PREFIX: &str = "v1/namespace"; /// Key prefix for bucket in meta table. 
@@ -41,17 +50,22 @@ pub fn format_timed_bucket_key( namespace: &str, bucket_duration: ReadableDuration, gmt_start_ms: Timestamp, -) -> String { +) -> Result { let duration = bucket_duration.to_string(); - - let dt = Utc.timestamp_millis(gmt_start_ms.as_i64()); - format!( + let dt = match Utc.timestamp_millis_opt(gmt_start_ms.as_i64()).single() { + None => InValidTimestamp { + timestamp: gmt_start_ms.as_i64(), + } + .fail()?, + Some(v) => v, + }; + Ok(format!( "{}/{}/{}/{}", META_BUCKET_PREFIX, namespace, duration, dt.format(BUCKET_TIMESTAMP_FORMAT) - ) + )) } pub fn format_permanent_bucket_key(namespace: &str) -> String { @@ -64,15 +78,24 @@ pub fn format_table_unit_meta_name(namespace: &str, shard_id: usize) -> String { } #[inline] -pub fn format_timed_wal_name(namespace: &str, gmt_start_ms: Timestamp, shard_id: usize) -> String { - let dt = Utc.timestamp_millis(gmt_start_ms.as_i64()); - - format!( +pub fn format_timed_wal_name( + namespace: &str, + gmt_start_ms: Timestamp, + shard_id: usize, +) -> Result { + let dt = match Utc.timestamp_millis_opt(gmt_start_ms.as_i64()).single() { + None => InValidTimestamp { + timestamp: gmt_start_ms.as_i64(), + } + .fail()?, + Some(v) => v, + }; + Ok(format!( "wal_{}_{}_{:0>6}", namespace, dt.format(WAL_SHARD_TIMESTAMP_FORMAT), shard_id - ) + )) } #[inline] @@ -109,7 +132,7 @@ mod tests { let ts = Timestamp::new(1648425600000); let bucket_duration = ReadableDuration(Duration::from_millis(namespace::BUCKET_DURATION_MS as u64)); - let key = format_timed_bucket_key(ns, bucket_duration, ts); + let key = format_timed_bucket_key(ns, bucket_duration, ts).unwrap(); assert_eq!("v1/bucket/aabbcc/1d/2022-03-28T00:00:00", key); let key = format_permanent_bucket_key(ns); @@ -152,19 +175,19 @@ mod tests { fn test_format_timed_wal_name() { let ns = "mywal"; - let name = format_timed_wal_name(ns, Timestamp::ZERO, 0); + let name = format_timed_wal_name(ns, Timestamp::ZERO, 0).unwrap(); assert_eq!("wal_mywal_19700101000000_000000", name); // gmt time 2022-03-28T00:00:00 let ts = Timestamp::new(1648425600000); - let name = format_timed_wal_name(ns, ts, 124); + let name = format_timed_wal_name(ns, ts, 124).unwrap(); assert_eq!("wal_mywal_20220328000000_000124", name); - let name = format_timed_wal_name(ns, ts, 999999); + let name = format_timed_wal_name(ns, ts, 999999).unwrap(); assert_eq!("wal_mywal_20220328000000_999999", name); - let name = format_timed_wal_name(ns, ts, 1234567); + let name = format_timed_wal_name(ns, ts, 1234567).unwrap(); assert_eq!("wal_mywal_20220328000000_1234567", name); } diff --git a/wal/src/table_kv_impl/namespace.rs b/wal/src/table_kv_impl/namespace.rs index 12b4e0ddf0..6adc22b52a 100644 --- a/wal/src/table_kv_impl/namespace.rs +++ b/wal/src/table_kv_impl/namespace.rs @@ -56,7 +56,7 @@ pub enum Error { }, #[snafu(display("Failed to open bucket, namespace:{}, err:{}", namespace, source,))] - OpenBucket { + BucketMeta { namespace: String, source: Box, }, @@ -119,7 +119,7 @@ pub enum Error { ValueNotFound { key: String, backtrace: Backtrace }, #[snafu(display("Failed to build namespace, namespace:{}, err:{}", namespace, source,))] - BuildNamepsace { + BuildNamespace { namespace: String, source: crate::table_kv_impl::model::Error, }, @@ -315,7 +315,7 @@ impl NamespaceInner { .context(LoadBuckets { namespace: self.name(), })?; - let bucket = Bucket::new(self.name(), bucket_entry); + let bucket = Bucket::new(self.name(), bucket_entry)?; // Collect the outdated bucket entries for deletion. 
             if let Some(ttl) = self.entry.wal.ttl {
@@ -758,7 +758,7 @@ impl BucketCreator {
             inner.config.ttl,
         );
 
-        let bucket = Bucket::new(inner.name(), bucket_entry);
+        let bucket = Bucket::new(inner.name(), bucket_entry)?;
 
         self.create_bucket(inner, bucket)
     }
@@ -783,7 +783,7 @@ impl BucketCreator {
         bucket: Bucket,
    ) -> Result {
         // Insert bucket record into TableKv.
-        let key = bucket.format_bucket_key(inner.name());
+        let key = bucket.format_bucket_key(inner.name())?;
         let value = bucket.entry.encode().context(Encode {
             namespace: inner.name(),
         })?;
@@ -837,7 +837,7 @@ impl BucketCreator {
         let value = get_value(&inner.table_kv, &inner.meta_table_name, key)?;
         let bucket_entry = BucketEntry::decode(&value).context(Decode { key })?;
 
-        let bucket = Bucket::new(inner.name(), bucket_entry);
+        let bucket = Bucket::new(inner.name(), bucket_entry)?;
 
         Ok(bucket)
     }
@@ -1063,7 +1063,7 @@ impl Namespace {
    ) -> Result> {
         let mut namespace_entry = config
             .new_namespace_entry(namespace)
-            .context(BuildNamepsace { namespace })?;
+            .context(BuildNamespace { namespace })?;
         let key = encoding::format_namespace_key(namespace);
         let value = namespace_entry.encode().context(Encode { namespace })?;
@@ -1193,12 +1193,12 @@ impl TableOperator {
         let table_exists = table_kv
             .table_exists(table_name)
             .map_err(|e| Box::new(e) as _)
-            .context(OpenBucket { namespace })?;
+            .context(BucketMeta { namespace })?;
         if !table_exists {
             table_kv
                 .create_table(table_name)
                 .map_err(|e| Box::new(e) as _)
-                .context(OpenBucket { namespace })?;
+                .context(BucketMeta { namespace })?;
         }
 
         Ok(())
@@ -1289,7 +1289,7 @@ pub struct Bucket {
 }
 
 impl Bucket {
-    fn new(namespace: &str, entry: BucketEntry) -> Self {
+    fn new(namespace: &str, entry: BucketEntry) -> Result<Self> {
         let mut wal_shard_names = Vec::with_capacity(entry.shard_num);
 
         for shard_id in 0..entry.shard_num {
@@ -1297,15 +1297,17 @@ impl Bucket {
                 encoding::format_permanent_wal_name(namespace, shard_id)
             } else {
                 encoding::format_timed_wal_name(namespace, entry.gmt_start_ms(), shard_id)
+                    .map_err(|e| Box::new(e) as _)
+                    .context(BucketMeta { namespace })?
             };
 
             wal_shard_names.push(table_name);
         }
 
-        Self {
+        Ok(Self {
             entry,
             wal_shard_names,
-        }
+        })
     }
 
     #[inline]
@@ -1319,7 +1321,7 @@ impl Bucket {
         &self.wal_shard_names[index]
     }
 
-    fn format_bucket_key(&self, namespace: &str) -> String {
+    fn format_bucket_key(&self, namespace: &str) -> Result<String> {
         match self.entry.bucket_duration() {
             Some(bucket_duration) => {
                 // Timed bucket.
@@ -1328,10 +1330,12 @@ impl Bucket {
                     ReadableDuration(bucket_duration),
                     self.entry.gmt_start_ms(),
                 )
+                .map_err(|e| Box::new(e) as _)
+                .context(BucketMeta { namespace })
             }
             None => {
                 // This is a permanent bucket.
-                encoding::format_permanent_bucket_key(namespace)
+                Ok(encoding::format_permanent_bucket_key(namespace))
             }
         }
     }
@@ -1438,7 +1442,7 @@ fn purge_buckets(
             // All tables of this bucket have been dropped, we can remove the bucket record
             // later.
-            let key = bucket.format_bucket_key(namespace);
+            let key = bucket.format_bucket_key(namespace).map_err(Box::new)?;
 
             batch.delete(key.as_bytes());
 
@@ -1534,7 +1538,7 @@ mod tests {
         let entry = BucketEntry::new_timed(4, gmt_start_ms, BUCKET_DURATION_MS).unwrap();
         assert!(!entry.is_permanent());
 
-        let bucket = Bucket::new("test", entry);
+        let bucket = Bucket::new("test", entry).unwrap();
         assert_eq!(4, bucket.wal_shard_names.len());
         let expect_names = [
             "wal_test_20220328000000_000000",
@@ -1550,7 +1554,7 @@ mod tests {
         let entry = BucketEntry::new_permanent(4);
         assert!(entry.is_permanent());
 
-        let bucket = Bucket::new("test", entry);
+        let bucket = Bucket::new("test", entry).unwrap();
         assert_eq!(4, bucket.wal_shard_names.len());
         let expect_names = [
             "wal_test_permanent_000000",
@@ -1564,7 +1568,7 @@ mod tests {
     #[test]
     fn test_permanent_bucket_set() {
         let entry = BucketEntry::new_permanent(4);
-        let bucket = Arc::new(Bucket::new("test", entry));
+        let bucket = Arc::new(Bucket::new("test", entry).unwrap());
         let mut bucket_set = BucketSet::new(false);
 
         let buckets = bucket_set.buckets();
@@ -1593,7 +1597,7 @@ mod tests {
     fn new_timed_bucket(ts: Timestamp) -> BucketRef {
         let entry = BucketEntry::new_timed(1, ts, BUCKET_DURATION_MS).unwrap();
 
-        Arc::new(Bucket::new("test", entry))
+        Arc::new(Bucket::new("test", entry).unwrap())
     }
 
     #[test]
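Note: with `Bucket::new` and `format_bucket_key` now fallible, callers propagate the encoding error boxed into the source expected by the renamed `BucketMeta` variant, as the hunks above do repeatedly. A sketch of that pattern in isolation; the helper name `timed_wal_table_name` is illustrative only, and the boxed-error source type is assumed from the `map_err(|e| Box::new(e) as _)` calls above:

    use snafu::ResultExt;

    // Assumes namespace.rs's local `Result`/`Error` and the `BucketMeta` context selector.
    fn timed_wal_table_name(namespace: &str, entry: &BucketEntry, shard_id: usize) -> Result<String> {
        encoding::format_timed_wal_name(namespace, entry.gmt_start_ms(), shard_id)
            // Box the concrete encoding error so it fits the `BucketMeta` source field,
            // then attach the namespace for context.
            .map_err(|e| Box::new(e) as _)
            .context(BucketMeta { namespace })
    }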