From 94be2a433ee2956512fdb0b568a129a62c5be494 Mon Sep 17 00:00:00 2001 From: Igal Shilman Date: Fri, 24 Jan 2025 11:59:08 +0100 Subject: [PATCH] [sql] Use JSON contrib functions (#2540) This commit adds few common JSON functions as UDFs, to use the latest version, this commit also updates datafusion to 0.44 --- Cargo.lock | 357 +++++++++++++----- Cargo.toml | 9 +- crates/storage-query-datafusion/Cargo.toml | 1 + .../storage-query-datafusion/src/analyzer.rs | 1 + .../storage-query-datafusion/src/context.rs | 20 +- .../src/partition_filter.rs | 11 +- .../src/physical_optimizer.rs | 13 +- .../src/table_providers.rs | 12 +- .../src/table_util.rs | 8 +- 9 files changed, 321 insertions(+), 111 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 597f4a5e59..76f268c011 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1038,6 +1038,19 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "bigdecimal" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f31f3af01c5c65a07985c804d3366560e6fa7883d640a122819b14ec327482c" +dependencies = [ + "autocfg", + "libm", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "bincode" version = "2.0.0-rc.3" @@ -2047,11 +2060,10 @@ dependencies = [ [[package]] name = "datafusion" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dae5f2abc725737d6e87b6d348a5aa2d0a77e4cf873045f004546da946e6e619" +checksum = "014fc8c384ecacedaabb3bc8359c2a6c6e9d8f7bea65be3434eccacfc37f52d9" dependencies = [ - "ahash 0.8.11", "arrow", "arrow-array", "arrow-ipc", @@ -2067,6 +2079,7 @@ dependencies = [ "datafusion-expr", "datafusion-functions", "datafusion-functions-aggregate", + "datafusion-functions-table", "datafusion-functions-window", "datafusion-optimizer", "datafusion-physical-expr", @@ -2076,17 +2089,12 @@ dependencies = [ "datafusion-sql", "futures", "glob", - "half", - "hashbrown 0.14.5", - "indexmap 2.7.0", "itertools 0.13.0", "log", - "num_cpus", "object_store", "parking_lot", - "paste", - "pin-project-lite", "rand", + "regex", "sqlparser", "tempfile", "tokio", @@ -2096,9 +2104,9 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "998761705551f11ffa4ee692cc285b44eb1def6e0d28c4eaf5041b9e2810dc1e" +checksum = "ee60d33e210ef96070377ae667ece7caa0e959c8387496773d4a1a72f1a5012e" dependencies = [ "arrow-schema", "async-trait", @@ -2111,50 +2119,54 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11986f191e88d950f10a5cc512a598afba27d92e04a0201215ad60785005115a" +checksum = "0b42b7d720fe21ed9cca2ebb635f3f13a12cfab786b41e0fba184fb2e620525b" dependencies = [ "ahash 0.8.11", "arrow", "arrow-array", "arrow-buffer", "arrow-schema", - "chrono", "half", "hashbrown 0.14.5", - "instant", + "indexmap 2.7.0", "libc", - "num_cpus", + "log", "object_store", "paste", "sqlparser", "tokio", + "web-time", ] [[package]] name = "datafusion-common-runtime" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "694c9d7ea1b82f95768215c4cb5c2d5c613690624e832a7ee64be563139d582f" +checksum = "72fbf14d4079f7ce5306393084fe5057dddfdc2113577e0049310afa12e94281" dependencies = [ "log", "tokio", ] +[[package]] +name = "datafusion-doc" +version = "44.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c278dbd64860ed0bb5240fc1f4cb6aeea437153910aea69bcf7d5a8d6d0454f3" + [[package]] name = "datafusion-execution" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b4cedcd98151e0a297f34021b6b232ff0ebc0f2f18ea5e7446b5ebda99b1a1" +checksum = "e22cb02af47e756468b3cbfee7a83e3d4f2278d452deb4b033ba933c75169486" dependencies = [ "arrow", - "chrono", "dashmap", "datafusion-common", "datafusion-expr", "futures", - "hashbrown 0.14.5", "log", "object_store", "parking_lot", @@ -2165,42 +2177,40 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8dd114dc0296cacaee98ad3165724529fcca9a65b2875abcd447b9cc02b2b74" +checksum = "62298eadb1d15b525df1315e61a71519ffc563d41d5c3b2a30fda2d70f77b93c" dependencies = [ - "ahash 0.8.11", "arrow", - "arrow-array", - "arrow-buffer", "chrono", "datafusion-common", + "datafusion-doc", "datafusion-expr-common", "datafusion-functions-aggregate-common", + "datafusion-functions-window-common", "datafusion-physical-expr-common", + "indexmap 2.7.0", "paste", "serde_json", "sqlparser", - "strum", - "strum_macros", ] [[package]] name = "datafusion-expr-common" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d1ba2bb018218d9260bbd7de6a46a20f61b93d4911dba8aa07735625004c4fb" +checksum = "dda7f73c5fc349251cd3dcb05773c5bf55d2505a698ef9d38dfc712161ea2f55" dependencies = [ "arrow", "datafusion-common", - "paste", + "itertools 0.13.0", ] [[package]] name = "datafusion-functions" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "547cb780a4ac51fd8e52c0fb9188bc16cea4e35aebf6c454bda0b82a7a417304" +checksum = "fd197f3b2975424d3a4898ea46651be855a46721a56727515dbd5c9e2fb597da" dependencies = [ "arrow", "arrow-buffer", @@ -2209,8 +2219,11 @@ dependencies = [ "blake3", "chrono", "datafusion-common", + "datafusion-doc", "datafusion-execution", "datafusion-expr", + "datafusion-expr-common", + "datafusion-macros", "hashbrown 0.14.5", "hex", "itertools 0.13.0", @@ -2225,136 +2238,182 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e68cf5aa7ebcac08bd04bb709a9a6d4963eafd227da62b628133bc509c40f5a0" +checksum = "aabbe48fba18f9981b134124381bee9e46f93518b8ad2f9721ee296cef5affb9" dependencies = [ "ahash 0.8.11", "arrow", "arrow-schema", "datafusion-common", + "datafusion-doc", "datafusion-execution", "datafusion-expr", "datafusion-functions-aggregate-common", + "datafusion-macros", "datafusion-physical-expr", "datafusion-physical-expr-common", "half", "log", "paste", - "sqlparser", ] [[package]] name = "datafusion-functions-aggregate-common" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2285d080dfecdfb8605b0ab2f1a41e2473208dc8e9bd6f5d1dbcfe97f517e6f" +checksum = "d7a3fefed9c8c11268d446d924baca8cabf52fe32f73fdaa20854bac6473590c" dependencies = [ "ahash 0.8.11", "arrow", "datafusion-common", "datafusion-expr-common", "datafusion-physical-expr-common", - "rand", +] + +[[package]] +name = "datafusion-functions-json" +version = "0.44.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6eebc05fb387a5f7d6d1ca0996c1f881ccde3811fa00689b036684dee40f62d" +dependencies = [ + "datafusion", + "jiter", + "log", + "paste", +] + +[[package]] +name = "datafusion-functions-table" +version = "44.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c35c070eb705c12795dab399c3809f4dfbc290678c624d3989490ca9b8449c1" +dependencies = [ + "arrow", + "async-trait", + "datafusion-catalog", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-plan", + "parking_lot", + "paste", ] [[package]] name = "datafusion-functions-window" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e78d30ebd6e9f74d4aeddec32744f5a18b5f9584591bc586fb5259c4848bac5" +checksum = "52229bca26b590b140900752226c829f15fc1a99840e1ca3ce1a9534690b82a8" dependencies = [ "datafusion-common", + "datafusion-doc", "datafusion-expr", + "datafusion-functions-window-common", + "datafusion-macros", + "datafusion-physical-expr", "datafusion-physical-expr-common", "log", + "paste", +] + +[[package]] +name = "datafusion-functions-window-common" +version = "44.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "367befc303b64a668a10ae6988a064a9289e1999e71a7f8e526b6e14d6bdd9d6" +dependencies = [ + "datafusion-common", + "datafusion-physical-expr-common", +] + +[[package]] +name = "datafusion-macros" +version = "44.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5de3c8f386ea991696553afe241a326ecbc3c98a12c562867e4be754d3a060c" +dependencies = [ + "quote", + "syn 2.0.90", ] [[package]] name = "datafusion-optimizer" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be172c44bf344df707e0c041fa3f41e6dc5fb0976f539c68bc442bca150ee58c" +checksum = "53b520413906f755910422b016fb73884ae6e9e1b376de4f9584b6c0e031da75" dependencies = [ "arrow", - "async-trait", "chrono", "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown 0.14.5", "indexmap 2.7.0", "itertools 0.13.0", "log", - "paste", + "regex", "regex-syntax 0.8.5", ] [[package]] name = "datafusion-physical-expr" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43b86b7fa0b8161c49b0f005b0df193fc6d9b65ceec675f155422cda5d1583ca" +checksum = "acd6ddc378f6ad19af95ccd6790dec8f8e1264bc4c70e99ddc1830c1a1c78ccd" dependencies = [ "ahash 0.8.11", "arrow", "arrow-array", "arrow-buffer", - "arrow-ord", "arrow-schema", - "arrow-string", - "base64 0.22.1", - "chrono", "datafusion-common", - "datafusion-execution", "datafusion-expr", "datafusion-expr-common", "datafusion-functions-aggregate-common", "datafusion-physical-expr-common", "half", "hashbrown 0.14.5", - "hex", "indexmap 2.7.0", "itertools 0.13.0", "log", "paste", "petgraph", - "regex", ] [[package]] name = "datafusion-physical-expr-common" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "242ba8a26351d9ca16295814c46743b0d1b00ec372174bdfbba991d0953dd596" +checksum = "06e6c05458eccd74b4c77ed6a1fe63d52434240711de7f6960034794dad1caf5" dependencies = [ "ahash 0.8.11", "arrow", "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", - "rand", + "itertools 0.13.0", ] [[package]] name = "datafusion-physical-optimizer" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25ca088eb904bf1cfc9c5e5653110c70a6eaba43164085a9d180b35b77ce3b8b" +checksum = "9dc3a82190f49c37d377f31317e07ab5d7588b837adadba8ac367baad5dc2351" dependencies = [ - "arrow-schema", + "arrow", "datafusion-common", "datafusion-execution", + "datafusion-expr-common", "datafusion-physical-expr", "datafusion-physical-plan", "itertools 0.13.0", + "log", ] [[package]] name = "datafusion-physical-plan" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4989a53b824abc759685eb643f4d604c2fc2fea4e2c309ac3473bea263ecbbeb" +checksum = "6a6608bc9844b4ddb5ed4e687d173e6c88700b1d0482f43894617d18a1fe75da" dependencies = [ "ahash 0.8.11", "arrow", @@ -2368,8 +2427,7 @@ dependencies = [ "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", - "datafusion-functions-aggregate", - "datafusion-functions-aggregate-common", + "datafusion-functions-window-common", "datafusion-physical-expr", "datafusion-physical-expr-common", "futures", @@ -2378,28 +2436,27 @@ dependencies = [ "indexmap 2.7.0", "itertools 0.13.0", "log", - "once_cell", "parking_lot", "pin-project-lite", - "rand", "tokio", ] [[package]] name = "datafusion-sql" -version = "42.2.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66b9b75b9da10ed656073ac0553708f17eb8fa5a7b065ef9848914c93150ab9e" +checksum = "6a884061c79b33d0c8e84a6f4f4be8bdc12c0f53f5af28ddf5d6d95ac0b15fdc" dependencies = [ "arrow", "arrow-array", "arrow-schema", + "bigdecimal", "datafusion-common", "datafusion-expr", + "indexmap 2.7.0", "log", "regex", "sqlparser", - "strum", ] [[package]] @@ -3771,9 +3828,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" dependencies = [ "cfg-if", - "js-sys", - "wasm-bindgen", - "web-sys", ] [[package]] @@ -3887,6 +3941,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "jiter" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8243cf2d026264056bfacf305e54f5bee8866fd46b4c1873adcaebf614a0d306" +dependencies = [ + "ahash 0.8.11", + "bitvec", + "lexical-parse-float 0.8.5", + "num-bigint", + "num-traits", + "pyo3", + "smallvec", +] + [[package]] name = "jobserver" version = "0.1.32" @@ -4031,21 +4100,42 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b765c31809609075565a70b4b71402281283aeda7ecaf4818ac14a7b2ade8958" dependencies = [ - "lexical-parse-float", - "lexical-parse-integer", - "lexical-util", + "lexical-parse-float 1.0.5", + "lexical-parse-integer 1.0.5", + "lexical-util 1.0.6", "lexical-write-float", "lexical-write-integer", ] +[[package]] +name = "lexical-parse-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +dependencies = [ + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", + "static_assertions", +] + [[package]] name = "lexical-parse-float" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de6f9cb01fb0b08060209a057c048fcbab8717b4c1ecd2eac66ebfe39a65b0f2" dependencies = [ - "lexical-parse-integer", - "lexical-util", + "lexical-parse-integer 1.0.5", + "lexical-util 1.0.6", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +dependencies = [ + "lexical-util 0.8.5", "static_assertions", ] @@ -4055,7 +4145,16 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72207aae22fc0a121ba7b6d479e42cbfea549af1479c3f3a4f12c70dd66df12e" dependencies = [ - "lexical-util", + "lexical-util 1.0.6", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +dependencies = [ "static_assertions", ] @@ -4074,7 +4173,7 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5afc668a27f460fb45a81a757b6bf2f43c2d7e30cb5a2dcd3abf294c78d62bd" dependencies = [ - "lexical-util", + "lexical-util 1.0.6", "lexical-write-integer", "static_assertions", ] @@ -4085,7 +4184,7 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "629ddff1a914a836fb245616a7888b62903aae58fa771e1d83943035efa0f978" dependencies = [ - "lexical-util", + "lexical-util 1.0.6", "static_assertions", ] @@ -4256,6 +4355,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "metrics" version = "0.24.1" @@ -5671,6 +5779,70 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "pyo3" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57fe09249128b3173d092de9523eaa75136bf7ba85e0d69eca241c7939c933cc" +dependencies = [ + "cfg-if", + "indoc", + "libc", + "memoffset", + "num-bigint", + "once_cell", + "portable-atomic", + "pyo3-build-config", + "pyo3-ffi", + "pyo3-macros", + "unindent", +] + +[[package]] +name = "pyo3-build-config" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd3927b5a78757a0d71aa9dff669f903b1eb64b54142a9bd9f757f8fde65fd7" +dependencies = [ + "once_cell", + "target-lexicon", +] + +[[package]] +name = "pyo3-ffi" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dab6bb2102bd8f991e7749f130a70d05dd557613e39ed2deeee8e9ca0c4d548d" +dependencies = [ + "libc", + "pyo3-build-config", +] + +[[package]] +name = "pyo3-macros" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91871864b353fd5ffcb3f91f2f703a22a9797c91b9ab497b1acac7b07ae509c7" +dependencies = [ + "proc-macro2", + "pyo3-macros-backend", + "quote", + "syn 2.0.90", +] + +[[package]] +name = "pyo3-macros-backend" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43abc3b80bc20f3facd86cd3c60beed58c3e2aa26213f3cda368de39c60a27e4" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "pyo3-build-config", + "quote", + "syn 2.0.90", +] + [[package]] name = "quanta" version = "0.12.3" @@ -7060,6 +7232,7 @@ dependencies = [ "chrono", "codederror", "datafusion", + "datafusion-functions-json", "derive_more", "futures", "googletest", @@ -8247,9 +8420,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "sqlparser" -version = "0.50.0" +version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2e5b515a2bd5168426033e9efbfd05500114833916f1d5c268f938b4ee130ac" +checksum = "05a528114c392209b3264855ad491fcce534b94a38771b0a0b97a79379275ce8" dependencies = [ "log", "sqlparser_derive", @@ -8257,9 +8430,9 @@ dependencies = [ [[package]] name = "sqlparser_derive" -version = "0.2.2" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" +checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c" dependencies = [ "proc-macro2", "quote", @@ -8430,6 +8603,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "target-lexicon" +version = "0.12.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" + [[package]] name = "tempfile" version = "3.14.0" @@ -9298,6 +9477,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "unindent" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index d77329adb9..db4bfd2686 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,13 +101,13 @@ cling = { version = "0.1", default-features = false, features = ["derive"] } criterion = "0.5" crossterm = { version = "0.27.0" } dashmap = { version = "6" } -datafusion = { version = "42.0.0", default-features = false, features = [ +datafusion = { version = "44.0.0", default-features = false, features = [ "crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", ] } -datafusion-expr = { version = "42.0.0" } +datafusion-expr = { version = "44.0.0" } derive_builder = "0.20.0" derive_more = { version = "1", features = ["full"] } dialoguer = { version = "0.11.0" } @@ -237,3 +237,8 @@ strip = true # Automatically strip symbols from the binary. [profile.bench] # Should be enabled for benchmarking runs; increases binary size debug = true + +[profile.dev.package.tikv-jemalloc-sys] +opt-level = 2 + + diff --git a/crates/storage-query-datafusion/Cargo.toml b/crates/storage-query-datafusion/Cargo.toml index 105fd4572c..cd587a59cd 100644 --- a/crates/storage-query-datafusion/Cargo.toml +++ b/crates/storage-query-datafusion/Cargo.toml @@ -29,6 +29,7 @@ bytestring = { workspace = true } chrono = { workspace = true } codederror = { workspace = true } datafusion = { workspace = true } +datafusion-functions-json = { version = "0.44.1" } derive_more = { workspace = true } futures = { workspace = true } paste = { workspace = true } diff --git a/crates/storage-query-datafusion/src/analyzer.rs b/crates/storage-query-datafusion/src/analyzer.rs index 47532b407d..0933ac77d8 100644 --- a/crates/storage-query-datafusion/src/analyzer.rs +++ b/crates/storage-query-datafusion/src/analyzer.rs @@ -15,6 +15,7 @@ use datafusion::logical_expr::{Join, LogicalPlan}; use datafusion::optimizer::analyzer::AnalyzerRule; use datafusion::prelude::Expr; +#[derive(Debug)] pub(crate) struct UseSymmetricHashJoinWhenPartitionKeyIsPresent; impl UseSymmetricHashJoinWhenPartitionKeyIsPresent { diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs index 335bb9358a..78b061525f 100644 --- a/crates/storage-query-datafusion/src/context.rs +++ b/crates/storage-query-datafusion/src/context.rs @@ -17,13 +17,12 @@ use codederror::CodedError; use datafusion::catalog::TableProvider; use datafusion::error::DataFusionError; use datafusion::execution::context::SQLOptions; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::SessionStateBuilder; use datafusion::physical_optimizer::optimizer::PhysicalOptimizer; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion::sql::TableReference; - use restate_core::Metadata; use restate_invoker_api::StatusHandle; use restate_partition_store::PartitionStoreManager; @@ -34,6 +33,7 @@ use restate_types::live::Live; use restate_types::partition_table::Partition; use restate_types::schema::deployment::DeploymentResolver; use restate_types::schema::service::ServiceMetadataResolver; +use tracing::warn; use crate::remote_query_scanner_manager::RemoteScannerManager; use crate::table_providers::ScanPartition; @@ -224,11 +224,13 @@ impl QueryContext { // // build the runtime // - let mut runtime_config = RuntimeConfig::default().with_memory_limit(memory_limit, 1.0); + let mut runtime_config = RuntimeEnvBuilder::default(); + runtime_config = runtime_config.with_memory_limit(memory_limit, 1.0); + if let Some(folder) = temp_folder { runtime_config = runtime_config.with_temp_file_path(folder); } - let runtime = Arc::new(RuntimeEnv::new(runtime_config).expect("runtime")); + let runtime = runtime_config.build_arc().expect("runtime"); // // build the session // @@ -290,7 +292,15 @@ impl QueryContext { state_builder.with_physical_optimizer_rules(default_physical_optimizer_rules); let state = state_builder.build(); - let ctx = SessionContext::new_with_state(state); + + let mut ctx = SessionContext::new_with_state(state); + + match datafusion_functions_json::register_all(&mut ctx) { + Ok(_) => {} + Err(err) => { + warn!("Unable to register json functions {}", err); + } + }; let sql_options = SQLOptions::new() .with_allow_ddl(false) diff --git a/crates/storage-query-datafusion/src/partition_filter.rs b/crates/storage-query-datafusion/src/partition_filter.rs index c2f9f63c55..69b4f25e0a 100644 --- a/crates/storage-query-datafusion/src/partition_filter.rs +++ b/crates/storage-query-datafusion/src/partition_filter.rs @@ -13,12 +13,14 @@ use datafusion::common::ScalarValue; use datafusion::logical_expr::{col, BinaryExpr, Expr, Operator}; use restate_types::identifiers::partitioner::HashPartitioner; use restate_types::identifiers::{InvocationId, PartitionKey, WithPartitionKey}; +use std::fmt::{Debug, Formatter}; use std::str::FromStr; -pub trait PartitionKeyExtractor: Send + Sync + 'static { +pub trait PartitionKeyExtractor: Send + Sync + 'static + Debug { fn try_extract(&self, filters: &[Expr]) -> anyhow::Result>; } +#[derive(Debug)] pub struct FirstMatchingPartitionKeyExtractor { extractors: Vec>, } @@ -72,6 +74,12 @@ pub(crate) struct MatchingColumnExtractor { extractor: F, } +impl Debug for MatchingColumnExtractor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("MatchingColumnExtractor{:?}", self.column)) + } +} + impl MatchingColumnExtractor { pub(crate) fn new(column_name: impl Into, extractor: F) -> Self { let column = col(column_name.into()); @@ -103,6 +111,7 @@ where } } +#[derive(Debug)] struct IdentityPartitionKeyExtractor(Expr); impl IdentityPartitionKeyExtractor { diff --git a/crates/storage-query-datafusion/src/physical_optimizer.rs b/crates/storage-query-datafusion/src/physical_optimizer.rs index 9808dcb479..5cb687b2fa 100644 --- a/crates/storage-query-datafusion/src/physical_optimizer.rs +++ b/crates/storage-query-datafusion/src/physical_optimizer.rs @@ -19,6 +19,7 @@ use datafusion::physical_plan::joins::{ use datafusion::physical_plan::ExecutionPlan; use std::sync::Arc; +#[derive(Debug)] pub(crate) struct JoinRewrite; impl JoinRewrite { @@ -60,16 +61,8 @@ impl PhysicalOptimizerRule for JoinRewrite { hash_join.filter().cloned(), hash_join.join_type(), hash_join.null_equals_null(), - hash_join - .left() - .properties() - .output_ordering() - .map(|s| s.to_vec()), - hash_join - .right() - .properties() - .output_ordering() - .map(|s| s.to_vec()), + hash_join.left().properties().output_ordering().cloned(), + hash_join.right().properties().output_ordering().cloned(), StreamJoinPartitionMode::Partitioned, ) else { return Ok(Transformed::no(plan)); diff --git a/crates/storage-query-datafusion/src/table_providers.rs b/crates/storage-query-datafusion/src/table_providers.rs index 73e8463524..0cd29d2774 100644 --- a/crates/storage-query-datafusion/src/table_providers.rs +++ b/crates/storage-query-datafusion/src/table_providers.rs @@ -23,8 +23,9 @@ use datafusion::datasource::{TableProvider, TableType}; use datafusion::execution::context::TaskContext; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, SendableRecordBatchStream, }; use restate_types::identifiers::{PartitionId, PartitionKey}; @@ -39,6 +40,7 @@ pub trait ScanPartition: Send + Sync + Debug + 'static { ) -> anyhow::Result; } +#[derive(Debug)] pub(crate) struct PartitionedTableProvider { partition_selector: S, schema: SchemaRef, @@ -136,7 +138,8 @@ where let plan = PlanProperties::new( eq_properties, Partitioning::UnknownPartitioning(required_partitions.len()), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ); Ok(Arc::new(PartitionedExecutionPlan { @@ -248,6 +251,8 @@ pub(crate) trait Scan: Debug + Send + Sync + 'static { } pub(crate) type ScannerRef = Arc; + +#[derive(Debug)] pub(crate) struct GenericTableProvider { schema: SchemaRef, scanner: ScannerRef, @@ -327,7 +332,8 @@ impl GenericExecutionPlan { let plan_properties = PlanProperties::new( eq_properties, Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ); Self { diff --git a/crates/storage-query-datafusion/src/table_util.rs b/crates/storage-query-datafusion/src/table_util.rs index 8149d782d7..9ef55878d6 100644 --- a/crates/storage-query-datafusion/src/table_util.rs +++ b/crates/storage-query-datafusion/src/table_util.rs @@ -11,7 +11,7 @@ use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; use datafusion::physical_expr::expressions::col; -use datafusion::physical_expr::PhysicalSortExpr; +use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; use std::fmt::Write; use tracing::error; @@ -28,11 +28,11 @@ macro_rules! log_data_corruption_error { }; } -pub(crate) fn compute_ordering(schema: SchemaRef) -> Option> { - let ordering = vec![PhysicalSortExpr { +pub(crate) fn compute_ordering(schema: SchemaRef) -> Option { + let ordering = LexOrdering::new(vec![PhysicalSortExpr { expr: col("partition_key", &schema).ok()?, options: Default::default(), - }]; + }]); Some(ordering) }