diff --git a/.asf.yaml b/.asf.yaml index f27975c8..de776871 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -22,7 +22,7 @@ notifications: jira_options: link label worklog github: description: "Apache Arrow DataFusion Python Bindings" - homepage: https://arrow.apache.org/datafusion + homepage: https://arrow.apache.org/datafusion-python enabled_merge_buttons: squash: true merge: false diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..411e6029 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,12 @@ +.cargo +.github +.pytest_cache +ci +conda +dev +docs +examples +parquet +target +testing +venv \ No newline at end of file diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index c69205be..f672c812 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -101,7 +101,7 @@ jobs: if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }} run: | source venv/bin/activate - flake8 --exclude venv --ignore=E501,W503 + flake8 --exclude venv,benchmarks/db-benchmark --ignore=E501,W503 black --line-length 79 --diff --check . - name: Run tests diff --git a/.gitignore b/.gitignore index 4e445008..1d0a84a4 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,6 @@ venv apache-rat-*.jar *rat.txt .env -CHANGELOG.md.bak \ No newline at end of file +CHANGELOG.md.bak + +docs/mdbook/book \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 26c4eb18..22d7c0f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,12 +19,35 @@ # Changelog -## [23.0.0](https://github.com/apache/arrow-datafusion-python/tree/23.0.0) (2023-04-23) +## [24.0.0](https://github.com/apache/arrow-datafusion-python/tree/24.0.0) (2023-05-09) -[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/22.0.0...23.0.0) +[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/23.0.0...24.0.0) + +**Documentation updates:** + +- Fix link to user guide [#354](https://github.com/apache/arrow-datafusion-python/pull/354) (andygrove) **Merged pull requests:** +- Add interface to serialize Substrait plans to Python Bytes. [#344](https://github.com/apache/arrow-datafusion-python/pull/344) (kylebrooks-8451) +- Add partition_count property to ExecutionPlan. [#346](https://github.com/apache/arrow-datafusion-python/pull/346) (kylebrooks-8451) +- Remove unsendable from all Rust pyclass types. [#348](https://github.com/apache/arrow-datafusion-python/pull/348) (kylebrooks-8451) +- Fix link to user guide [#354](https://github.com/apache/arrow-datafusion-python/pull/354) (andygrove) +- Fix SessionContext execute. 
[#353](https://github.com/apache/arrow-datafusion-python/pull/353) (kylebrooks-8451) +- Pub mod expr in lib.rs [#357](https://github.com/apache/arrow-datafusion-python/pull/357) (jdye64) +- Add benchmark derived from TPC-H [#355](https://github.com/apache/arrow-datafusion-python/pull/355) (andygrove) +- Add db-benchmark [#365](https://github.com/apache/arrow-datafusion-python/pull/365) (andygrove) +- First pass of documentation in mdBook [#364](https://github.com/apache/arrow-datafusion-python/pull/364) (MrPowers) +- Add 'pub' and '#[pyo3(get, set)]' to DataTypeMap [#371](https://github.com/apache/arrow-datafusion-python/pull/371) (jdye64) +- Fix db-benchmark [#369](https://github.com/apache/arrow-datafusion-python/pull/369) (andygrove) +- Docs explaining how to view query plans [#373](https://github.com/apache/arrow-datafusion-python/pull/373) (andygrove) +- Improve db-benchmark [#372](https://github.com/apache/arrow-datafusion-python/pull/372) (andygrove) +- Make expr member of PyExpr public [#375](https://github.com/apache/arrow-datafusion-python/pull/375) (jdye64) + +## [23.0.0](https://github.com/apache/arrow-datafusion-python/tree/23.0.0) (2023-04-23) + +[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/22.0.0...23.0.0) + **Merged pull requests:** - Improve API docs, README, and examples for configuring context [#321](https://github.com/apache/arrow-datafusion-python/pull/321) (andygrove) diff --git a/Cargo.lock b/Cargo.lock index 2622625c..17acaf7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,9 +113,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aea9fcb25bbb70f7f922f95b99ca29c1013dab47f6df61a6f24861842dd7f2e" +checksum = "c107a57b5913d852da9d5a40e280e4695f2258b5b87733c13b770c63a7117287" dependencies = [ "ahash", "arrow-arith", @@ -136,9 +136,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d967b42f7b12c91fd78acd396b20c2973b184c8866846674abbb00c963e93ab" +checksum = "ace6aa3d5617c5d03041a05e01c6819428a8ddf49dd0b055df9b40fef9d96094" dependencies = [ "arrow-array", "arrow-buffer", @@ -151,9 +151,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3190f208ee7aa0f3596fa0098d42911dec5e123ca88c002a08b24877ad14c71e" +checksum = "104a04520692cc674e6afd7682f213ca41f9b13ff1873f63a5a2857a590b87b3" dependencies = [ "ahash", "arrow-buffer", @@ -168,9 +168,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d33c733c5b6c44a0fc526f29c09546e04eb56772a7a21e48e602f368be381f6" +checksum = "72c875bcb9530ec403998fb0b2dc6d180a7c64563ca4bc22b90eafb84b113143" dependencies = [ "half", "num", @@ -178,9 +178,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abd349520b6a1ed4924ae2afc9d23330a3044319e4ec3d5b124c09e4d440ae87" +checksum = "d6d6e18281636c8fc0b93be59834da6bf9a72bb70fd0c98ddfdaf124da466c28" dependencies = [ "arrow-array", "arrow-buffer", @@ -195,9 +195,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version 
= "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c80af3c3e290a2a7e1cc518f1471dff331878cb4af9a5b088bf030b89debf649" +checksum = "3197dab0963a236ff8e7c82e2272535745955ac1321eb740c29f2f88b353f54e" dependencies = [ "arrow-array", "arrow-buffer", @@ -214,9 +214,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c8361947aaa96d331da9df3f7a08bdd8ab805a449994c97f5c4d24c4b7e2cf" +checksum = "eb68113d6ecdbe8bba48b2c4042c151bf9e1c61244e45072a50250a6fc59bafe" dependencies = [ "arrow-buffer", "arrow-schema", @@ -226,9 +226,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a46ee000b9fbd1e8db6e8b26acb8c760838512b39d8c9f9d73892cb55351d50" +checksum = "eab4bbf2dd3078facb5ce0a9641316a64f42bfd8cf357e6775c8a5e6708e3a8d" dependencies = [ "arrow-array", "arrow-buffer", @@ -240,9 +240,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bf2366607be867ced681ad7f272371a5cf1fc2941328eef7b4fee14565166fb" +checksum = "48c5b650d23746a494665d914a7fa3d21d939153cff9d53bdebe39bffa88f263" dependencies = [ "arrow-array", "arrow-buffer", @@ -260,9 +260,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "304069901c867200e21ec868ae7521165875470ef2f1f6d58f979a443d63997e" +checksum = "68c6fce28e5011e30acc7466b5efcb8ed0197c396240bd2b10e167f275a3c208" dependencies = [ "arrow-array", "arrow-buffer", @@ -275,9 +275,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d57fe8ceef3392fdd493269d8a2d589de17bafce151aacbffbddac7a57f441a" +checksum = "f20a421f19799d8b93eb8edde5217e910fa1e2d6ceb3c529f000e57b6db144c0" dependencies = [ "ahash", "arrow-array", @@ -290,18 +290,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a16b88a93ac8350f0200b1cd336a1f887315925b8dd7aa145a37b8bdbd8497a4" +checksum = "bc85923d8d6662cc66ac6602c7d1876872e671002d60993dfdf492a6badeae92" dependencies = [ "bitflags 2.2.1", ] [[package]] name = "arrow-select" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98e8a4d6ca37d5212439b24caad4d80743fcbb706706200dd174bb98e68fe9d8" +checksum = "f6ab6613ce65b61d85a3410241744e84e48fbab0fe06e1251b4429d21b3470fd" dependencies = [ "arrow-array", "arrow-buffer", @@ -312,9 +312,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbb594efa397eb6a546f42b1f8df3d242ea84dbfda5232e06035dc2b2e2c8459" +checksum = "f3008641239e884aefba66d8b8532da6af40d14296349fcc85935de4ba67b89e" dependencies = [ "arrow-array", "arrow-buffer", @@ -736,9 +736,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "23.0.0" +version = "24.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a7d4b334f4512ff2fdbce87f511f570ae895af1ac7c729e77c12583253b22a" 
+checksum = "0404a559d5a6d8320369bb0a290b43bbc4f8622d0ef6f04bd095ace9a663f439" dependencies = [ "ahash", "apache-avro", @@ -788,9 +788,9 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "23.0.0" +version = "24.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80abfcb1dbc6390f952f21de9069e6177ad6318fcae5fbceabb50666d96533dd" +checksum = "4653b79a55161852973760db69ea6dcd05c9966a1b588fd83028f625536a1d7f" dependencies = [ "apache-avro", "arrow", @@ -805,9 +805,9 @@ dependencies = [ [[package]] name = "datafusion-execution" -version = "23.0.0" +version = "24.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df2524f1b4b58319895b112809d2a59e54fa662d0e46330a455f22882c2cb7b9" +checksum = "53481c334b73c6759697919d1d05690392381145fa1890849a65b5a71a24a1ec" dependencies = [ "dashmap", "datafusion-common", @@ -823,9 +823,9 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "23.0.0" +version = "24.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af8040b7a75b04685f4db0a1b11ffa93cd163c1bc13751df3f5cf76baabaf5a1" +checksum = "a8ecd7c6605d0b4269346d03289e2ced1715a303e75e6d313dba0bafb1f823f2" dependencies = [ "ahash", "arrow", @@ -835,9 +835,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "23.0.0" +version = "24.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74ceae25accc0f640a4238283f55f3a9fd181d55398703a4330fb2c46261e6a2" +checksum = "70a7c04e94cb4aa9c323993856e18b91f690dda0358a34ab07a3fe0f14bc6600" dependencies = [ "arrow", "async-trait", @@ -848,14 +848,14 @@ dependencies = [ "hashbrown 0.13.2", "itertools", "log", - "regex-syntax 0.6.29", + "regex-syntax 0.7.1", ] [[package]] name = "datafusion-physical-expr" -version = "23.0.0" +version = "24.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df4cf228b312f2758cb78e93fe3d2dc602345028efdf7cfa5b338cb370d0a347" +checksum = "9e34eb8668fee1443965fff41ba73b2956d50a07ed8dd929cfa2e839ab91da5a" dependencies = [ "ahash", "arrow", @@ -886,7 +886,7 @@ dependencies = [ [[package]] name = "datafusion-python" -version = "23.0.0" +version = "24.0.0" dependencies = [ "async-trait", "datafusion", @@ -899,6 +899,8 @@ dependencies = [ "mimalloc", "object_store", "parking_lot", + "prost", + "prost-types", "pyo3", "pyo3-build-config", "rand", @@ -911,9 +913,9 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "23.0.0" +version = "24.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b52b486fb3d81bb132e400304be01af5aba0ad6737e3518045bb98944991fe32" +checksum = "efa800ae88dfd62ea6c58c24a1154d92937c755672f522b84e8ea6539fad369b" dependencies = [ "arrow", "datafusion-common", @@ -923,9 +925,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "23.0.0" +version = "24.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773e985c182e41cfd68f7a7b483ab6bfb68beaac241c348cd4b1bf9f9d61b762" +checksum = "556642ef90073e39af721362353ccce4e1f418da7a8e31c23510ed9de6eb71f2" dependencies = [ "arrow", "arrow-schema", @@ -937,9 +939,9 @@ dependencies = [ [[package]] name = "datafusion-substrait" -version = "23.0.0" +version = "24.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836e9b1c0ea430199c9bd4b88024cb8d617e3768ffdb412064169e2504a850ed" +checksum = "0d7643a77bb446047095ec21b913adb900b71c7a2ae600f8062906dd2e5642b9" dependencies = [ 
"async-recursion", "chrono", @@ -962,26 +964,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dirs" -version = "4.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" -dependencies = [ - "dirs-sys", -] - -[[package]] -name = "dirs-sys" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" -dependencies = [ - "libc", - "redox_users", - "winapi", -] - [[package]] name = "doc-comment" version = "0.3.3" @@ -1210,9 +1192,9 @@ dependencies = [ [[package]] name = "gix" -version = "0.43.1" +version = "0.44.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c256ea71cc1967faaefdaad15f334146b7c806f12460dcafd3afed845c8c78dd" +checksum = "6bf41b61f7df395284f7a579c0fa1a7e012c5aede655174d4e91299ef1cac643" dependencies = [ "gix-actor", "gix-attributes", @@ -1222,9 +1204,11 @@ dependencies = [ "gix-diff", "gix-discover", "gix-features", + "gix-fs", "gix-glob", "gix-hash", "gix-hashtable", + "gix-ignore", "gix-index", "gix-lock", "gix-mailmap", @@ -1240,6 +1224,7 @@ dependencies = [ "gix-tempfile", "gix-traverse", "gix-url", + "gix-utils", "gix-validate", "gix-worktree", "log", @@ -1252,9 +1237,9 @@ dependencies = [ [[package]] name = "gix-actor" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc22b0cdc52237667c301dd7cdc6ead8f8f73c9f824e9942c8ebd6b764f6c0bf" +checksum = "848efa0f1210cea8638f95691c82a46f98a74b9e3524f01d4955ebc25a8f84f3" dependencies = [ "bstr", "btoi", @@ -1266,24 +1251,26 @@ dependencies = [ [[package]] name = "gix-attributes" -version = "0.10.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2231a25934a240d0a4b6f4478401c73ee81d8be52de0293eedbc172334abf3e1" +checksum = "3015baa01ad2122fbcaab7863c857a603eb7b7ec12ac8141207c42c6439805e2" dependencies = [ "bstr", - "gix-features", "gix-glob", "gix-path", "gix-quote", + "kstring", + "log", + "smallvec", "thiserror", "unicode-bom", ] [[package]] name = "gix-bitmap" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "024bca0c7187517bda5ea24ab148c9ca8208dd0c3e2bea88cdb2008f91791a6d" +checksum = "55a95f4942360766c3880bdb2b4b57f1ef73b190fc424755e7fdf480430af618" dependencies = [ "thiserror", ] @@ -1308,9 +1295,9 @@ dependencies = [ [[package]] name = "gix-config" -version = "0.20.1" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fbad5ce54a8fc997acc50febd89ec80fa6e97cb7f8d0654cb229936407489d8" +checksum = "1d252a0eddb6df74600d3d8872dc9fe98835a7da43110411d705b682f49d4ac1" dependencies = [ "bstr", "gix-config-value", @@ -1330,11 +1317,11 @@ dependencies = [ [[package]] name = "gix-config-value" -version = "0.10.2" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d09154c0c8677e4da0ec35e896f56ee3e338e741b9599fae06075edd83a4081c" +checksum = "786861e84a5793ad5f863d846de5eb064cd23b87e61ad708c8c402608202e7be" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.2.1", "bstr", "gix-path", "libc", @@ -1343,9 +1330,9 @@ dependencies = [ [[package]] name = "gix-credentials" -version = "0.12.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"750b684197374518ea057e0a0594713e07683faa0a3f43c0f93d97f64130ad8d" +checksum = "4874a4fc11ffa844a3c2b87a66957bda30a73b577ef1acf15ac34df5745de5ff" dependencies = [ "bstr", "gix-command", @@ -1359,9 +1346,9 @@ dependencies = [ [[package]] name = "gix-date" -version = "0.4.3" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b96271912ce39822501616f177dea7218784e6c63be90d5f36322ff3a722aae2" +checksum = "99056f37270715f5c7584fd8b46899a2296af9cae92463bf58b8bd1f5a78e553" dependencies = [ "bstr", "itoa", @@ -1371,9 +1358,9 @@ dependencies = [ [[package]] name = "gix-diff" -version = "0.28.1" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "103a0fa79b0d438f5ecb662502f052e530ace4fe1fe8e1c83c0c6da76d728e67" +checksum = "644a0f2768bc42d7a69289ada80c9e15c589caefc6a315d2307202df83ed1186" dependencies = [ "gix-hash", "gix-object", @@ -1383,9 +1370,9 @@ dependencies = [ [[package]] name = "gix-discover" -version = "0.16.2" +version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eba8ba458cb8f4a6c33409b0fe650b1258655175a7ffd1d24fafd3ed31d880b" +checksum = "1a6b61363e63e7cdaa3e6f96acb0257ebdb3d8883e21eba5930c99f07f0a5fc0" dependencies = [ "bstr", "dunce", @@ -1398,9 +1385,9 @@ dependencies = [ [[package]] name = "gix-features" -version = "0.28.1" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b76f9a80f6dd7be66442ae86e1f534effad9546676a392acc95e269d0c21c22" +checksum = "cf69b0f5c701cc3ae22d3204b671907668f6437ca88862d355eaf9bc47a4f897" dependencies = [ "crc32fast", "flate2", @@ -1413,21 +1400,32 @@ dependencies = [ "walkdir", ] +[[package]] +name = "gix-fs" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b37a1832f691fdc09910bd267f9a2e413737c1f9ec68c6e31f9e802616278a9" +dependencies = [ + "gix-features", +] + [[package]] name = "gix-glob" -version = "0.5.5" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93e43efd776bc543f46f0fd0ca3d920c37af71a764a16f2aebd89765e9ff2993" +checksum = "c07c98204529ac3f24b34754540a852593d2a4c7349008df389240266627a72a" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.2.1", "bstr", + "gix-features", + "gix-path", ] [[package]] name = "gix-hash" -version = "0.10.4" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a258595457bc192d1f1c59d0d168a1e34e2be9b97a614e14995416185de41a7" +checksum = "078eec3ac2808cc03f0bddd2704cb661da5c5dc33b41a9d7947b141d499c7c42" dependencies = [ "hex", "thiserror", @@ -1435,22 +1433,34 @@ dependencies = [ [[package]] name = "gix-hashtable" -version = "0.1.3" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4e55e40dfd694884f0eb78796c5bddcf2f8b295dace47039099dd7e76534973" +checksum = "afebb85691c6a085b114e01a27f4a61364519298c5826cb87a45c304802299bc" dependencies = [ "gix-hash", "hashbrown 0.13.2", "parking_lot", ] +[[package]] +name = "gix-ignore" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba205b6df563e2906768bb22834c82eb46c5fdfcd86ba2c347270bc8309a05b2" +dependencies = [ + "bstr", + "gix-glob", + "gix-path", + "unicode-bom", +] + [[package]] name = "gix-index" -version = "0.15.1" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"717ab601ece7921f59fe86849dbe27d44a46ebb883b5885732c4f30df4996177" +checksum = "f39c1ccc8f1912cbbd5191efc28dbc5f0d0598042aa56bc09427b7c34efab3ba" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.2.1", "bstr", "btoi", "filetime", @@ -1479,9 +1489,9 @@ dependencies = [ [[package]] name = "gix-mailmap" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b66aea5e52875cd4915f4957a6f4b75831a36981e2ec3f5fad9e370e444fe1a" +checksum = "e8856cec3bdc3610c06970d28b6cb20a0c6621621cf9a8ec48cbd23f2630f362" dependencies = [ "bstr", "gix-actor", @@ -1490,9 +1500,9 @@ dependencies = [ [[package]] name = "gix-object" -version = "0.28.0" +version = "0.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8df068db9180ee935fbb70504848369e270bdcb576b05c0faa8b9fd3b86fc017" +checksum = "c9bb30ce0818d37096daa29efe361a4bc6dd0b51a5726598898be7e9a40a01e1" dependencies = [ "bstr", "btoi", @@ -1509,9 +1519,9 @@ dependencies = [ [[package]] name = "gix-odb" -version = "0.43.1" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e83af2e3e36005bfe010927f0dff41fb5acc3e3d89c6f1174135b3a34086bda2" +checksum = "bca2f324aa67672b6d0f2c0fa93f96eb6a7029d260e4c1df5dce3c015f5e5add" dependencies = [ "arc-swap", "gix-features", @@ -1527,9 +1537,9 @@ dependencies = [ [[package]] name = "gix-pack" -version = "0.33.2" +version = "0.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9401911c7fe032ad7b31c6a6b5be59cb283d1d6c999417a8215056efe6d635f3" +checksum = "164a515900a83257ae4aa80e741655bee7a2e39113fb535d7a5ac623b445ff20" dependencies = [ "clru", "gix-chunk", @@ -1549,24 +1559,26 @@ dependencies = [ [[package]] name = "gix-path" -version = "0.7.3" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32370dce200bb951df013e03dff35b4233fc7a89458642b047629b91734a7e19" +checksum = "4fc78f47095a0c15aea0e66103838f0748f4494bf7a9555dfe0f00425400396c" dependencies = [ "bstr", + "home", + "once_cell", "thiserror", ] [[package]] name = "gix-prompt" -version = "0.3.3" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3034d4d935aef2c7bf719aaa54b88c520e82413118d886ae880a31d5bdee57" +checksum = "330d11fdf88fff3366c2491efde2f3e454958efe7d5ddf60272e8fb1d944bb01" dependencies = [ "gix-command", "gix-config-value", - "nix", "parking_lot", + "rustix", "thiserror", ] @@ -1583,12 +1595,13 @@ dependencies = [ [[package]] name = "gix-ref" -version = "0.27.2" +version = "0.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4e909396ed3b176823991ccc391c276ae2a015e54edaafa3566d35123cfac9d" +checksum = "1e03989e9d49954368e1b526578230fc7189d1634acdfbe79e9ba1de717e15d5" dependencies = [ "gix-actor", "gix-features", + "gix-fs", "gix-hash", "gix-lock", "gix-object", @@ -1602,9 +1615,9 @@ dependencies = [ [[package]] name = "gix-refspec" -version = "0.9.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aba332462bda2e8efeae4302b39a6ed01ad56ef772fd5b7ef197cf2798294d65" +checksum = "0a6ea733820df67e4cd7797deb12727905824d8f5b7c59d943c456d314475892" dependencies = [ "bstr", "gix-hash", @@ -1616,9 +1629,9 @@ dependencies = [ [[package]] name = "gix-revision" -version = "0.12.2" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"3c6f6ff53f888858afc24bf12628446a14279ceec148df6194481f306f553ad2" +checksum = "810f35e9afeccca999d5d348b239f9c162353127d2e13ff3240e31b919e35476" dependencies = [ "bstr", "gix-date", @@ -1630,15 +1643,14 @@ dependencies = [ [[package]] name = "gix-sec" -version = "0.6.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8ffa5bf0772f9b01de501c035b6b084cf9b8bb07dec41e3afc6a17336a65f47" +checksum = "794520043d5a024dfeac335c6e520cb616f6963e30dab995892382e998c12897" dependencies = [ - "bitflags 1.3.2", - "dirs", + "bitflags 2.2.1", "gix-path", "libc", - "windows 0.43.0", + "windows", ] [[package]] @@ -1657,9 +1669,9 @@ dependencies = [ [[package]] name = "gix-traverse" -version = "0.24.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd9a4a07bb22168dc79c60e1a6a41919d198187ca83d8a5940ad8d7122a45df3" +checksum = "a5be1e807f288c33bb005075111886cceb43ed8a167b3182a0f62c186e2a0dd1" dependencies = [ "gix-hash", "gix-hashtable", @@ -1669,9 +1681,9 @@ dependencies = [ [[package]] name = "gix-url" -version = "0.16.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6a22b4b32ad14d68f7b7fb6458fa58d44b01797d94c1b8f4db2d9c7b3c366b5" +checksum = "dfc77f89054297cc81491e31f1bab4027e554b5ef742a44bd7035db9a0f78b76" dependencies = [ "bstr", "gix-features", @@ -1681,6 +1693,15 @@ dependencies = [ "url", ] +[[package]] +name = "gix-utils" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c10b69beac219acb8df673187a1f07dde2d74092f974fb3f9eb385aeb667c909" +dependencies = [ + "fastrand", +] + [[package]] name = "gix-validate" version = "0.7.4" @@ -1693,15 +1714,18 @@ dependencies = [ [[package]] name = "gix-worktree" -version = "0.15.2" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54ec9a000b4f24af706c3cc680c7cda235656cbe3216336522f5692773b8a301" +checksum = "a69eaff0ae973a9d37c40f02ae5ae50fa726c8fc2fd3ab79d0a19eb61975aafa" dependencies = [ "bstr", + "filetime", "gix-attributes", "gix-features", + "gix-fs", "gix-glob", "gix-hash", + "gix-ignore", "gix-index", "gix-object", "gix-path", @@ -1877,7 +1901,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows 0.48.0", + "windows", ] [[package]] @@ -2001,6 +2025,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kstring" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3066350882a1cd6d950d055997f379ac37fd39f81cd4d8ed186032eb3c5747" +dependencies = [ + "static_assertions", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -2259,18 +2292,6 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" -[[package]] -name = "nix" -version = "0.26.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" -dependencies = [ - "bitflags 1.3.2", - "cfg-if", - "libc", - "static_assertions", -] - [[package]] name = "nom" version = "7.1.3" @@ -2445,9 +2466,9 @@ dependencies = [ [[package]] name = "parquet" -version = "37.0.0" +version = "38.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5022d98333271f4ca3e87bab760498e61726bf5a6ca919123c80517e20ded29" +checksum = 
"4cbd51311f8d9ff3d2697b1522b18a588782e097d313a1a278b0faf2ccf2d3f6" dependencies = [ "ahash", "arrow-array", @@ -2467,6 +2488,7 @@ dependencies = [ "lz4", "num", "num-bigint", + "object_store", "paste", "seq-macro", "snap", @@ -2794,17 +2816,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "redox_users" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" -dependencies = [ - "getrandom", - "redox_syscall 0.2.16", - "thiserror", -] - [[package]] name = "regex" version = "1.8.1" @@ -3256,9 +3267,9 @@ dependencies = [ [[package]] name = "substrait" -version = "0.7.5" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ae64fb7ad0670c7d6d53d57b1b91beb2212afc30e164cc8edb02d6b2cff32a" +checksum = "54cd43d44620f716d55d46b998b3cf1baab2935aaa8adc14e3d3d9a465ddae15" dependencies = [ "gix", "heck", @@ -3605,9 +3616,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" [[package]] name = "unicode-bom" -version = "1.1.4" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63ec69f541d875b783ca40184d655f2927c95f0bffd486faa83cd3ac3529ec32" +checksum = "98e90c70c9f0d4d1ee6d0a7d04aa06cb9bbd53d8cfbdd62a0269a7c2eb640552" [[package]] name = "unicode-ident" @@ -3857,21 +3868,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "windows" -version = "0.43.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04662ed0e3e5630dfa9b26e4cb823b817f1a9addda855d973a9458c236556244" -dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", -] - [[package]] name = "windows" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 8c205a4c..4c6248a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "datafusion-python" -version = "23.0.0" +version = "24.0.0" homepage = "https://github.com/apache/arrow-datafusion-python" repository = "https://github.com/apache/arrow-datafusion-python" authors = ["Apache Arrow "] @@ -36,12 +36,14 @@ protoc = [ "datafusion-substrait/protoc" ] tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync"] } rand = "0.8" pyo3 = { version = "0.18.1", features = ["extension-module", "abi3", "abi3-py37"] } -datafusion = { version = "23.0.0" , features = ["pyarrow", "avro"] } -datafusion-common = { version = "23.0.0", features = ["pyarrow"] } -datafusion-expr = "23.0.0" -datafusion-optimizer = "23.0.0" -datafusion-sql = "23.0.0" -datafusion-substrait = "23.0.0" +datafusion = { version = "24.0.0" , features = ["pyarrow", "avro"] } +datafusion-common = { version = "24.0.0", features = ["pyarrow"] } +datafusion-expr = "24.0.0" +datafusion-optimizer = "24.0.0" +datafusion-sql = "24.0.0" +datafusion-substrait = "24.0.0" +prost = "0.11" +prost-types = "0.11" uuid = { version = "1.2", features = ["v4"] } mimalloc = { version = "0.1", optional = true, default-features = false } async-trait = "0.1" diff --git a/benchmarks/db-benchmark/README.md b/benchmarks/db-benchmark/README.md new file mode 100644 index 00000000..8ce45344 --- /dev/null +++ b/benchmarks/db-benchmark/README.md 
@@ -0,0 +1,32 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion Implementation of db-benchmark
+
+This directory contains scripts for running [db-benchmark](https://github.com/duckdblabs/db-benchmark) with
+DataFusion's Python bindings.
+
+## Directions
+
+Run the following from the root of this project.
+
+```bash
+docker build -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile .
+docker run --privileged -it db-benchmark
+```
diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile
new file mode 100644
index 00000000..d8842b25
--- /dev/null
+++ b/benchmarks/db-benchmark/db-benchmark.dockerfile
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+FROM ubuntu:22.04
+ARG DEBIAN_FRONTEND=noninteractive
+ARG TARGETPLATFORM
+
+# This section is based on https://github.com/duckdblabs/db-benchmark/blob/master/_utils/repro.sh
+
+RUN apt-get -qq update
+RUN apt-get -qq -y upgrade
+RUN apt-get -qq install -y apt-utils
+
+RUN apt-get -qq install -y lsb-release software-properties-common wget curl vim htop git byobu libcurl4-openssl-dev libssl-dev
+RUN apt-get -qq install -y libfreetype6-dev
+RUN apt-get -qq install -y libfribidi-dev
+RUN apt-get -qq install -y libharfbuzz-dev
+RUN apt-get -qq install -y git
+RUN apt-get -qq install -y libxml2-dev
+RUN apt-get -qq install -y make
+RUN apt-get -qq install -y libfontconfig1-dev
+RUN apt-get -qq install -y libicu-dev pandoc zlib1g-dev libgit2-dev libcurl4-openssl-dev libssl-dev libjpeg-dev libpng-dev libtiff-dev
+# apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9
+RUN add-apt-repository "deb [arch=amd64,i386] https://cloud.r-project.org/bin/linux/ubuntu $(lsb_release -cs)-cran40/"
+
+RUN apt-get -qq install -y r-base-dev virtualenv
+
+RUN cd /usr/local/lib/R && \
+    chmod o+w site-library
+
+RUN cd / && \
+    git clone https://github.com/duckdblabs/db-benchmark.git
+
+WORKDIR /db-benchmark
+
+RUN mkdir -p .R && \
+    echo 'CFLAGS=-O3 -mtune=native' >> .R/Makevars && \
+    echo 'CXXFLAGS=-O3 -mtune=native' >> .R/Makevars
+
+RUN cd pydatatable && \
+    virtualenv py-pydatatable --python=/usr/bin/python3.10
+RUN cd pandas && \
+    virtualenv py-pandas --python=/usr/bin/python3.10
+RUN cd modin && \
+    virtualenv py-modin --python=/usr/bin/python3.10
+
+RUN Rscript -e 'install.packages(c("jsonlite","bit64","devtools","rmarkdown"), dependencies=TRUE, repos="https://cloud.r-project.org")'
+
+SHELL ["/bin/bash", "-c"]
+
+RUN source ./pandas/py-pandas/bin/activate && \
+    python3 -m pip install --upgrade psutil && \
+    python3 -m pip install --upgrade pandas && \
+    deactivate
+
+RUN source ./modin/py-modin/bin/activate && \
+    python3 -m pip install --upgrade modin && \
+    deactivate
+
+RUN source ./pydatatable/py-pydatatable/bin/activate && \
+
python3 -m pip install --upgrade git+https://github.com/h2oai/datatable && \ + deactivate + +## install dplyr +#RUN Rscript -e 'devtools::install_github(c("tidyverse/readr","tidyverse/dplyr"))' + +# install data.table +RUN Rscript -e 'install.packages("data.table", repos="https://rdatatable.gitlab.io/data.table/")' + +## generate data for groupby 0.5GB +RUN Rscript _data/groupby-datagen.R 1e7 1e2 0 0 +RUN #Rscript _data/groupby-datagen.R 1e8 1e2 0 0 +RUN #Rscript _data/groupby-datagen.R 1e9 1e2 0 0 + +RUN mkdir data && \ + mv G1_1e7_1e2_0_0.csv data/ + +# set only groupby task +RUN echo "Changing run.conf and _control/data.csv to run only groupby at 0.5GB" && \ + cp run.conf run.conf.original && \ + sed -i 's/groupby join groupby2014/groupby/g' run.conf && \ + sed -i 's/data.table dplyr pandas pydatatable spark dask clickhouse polars arrow duckdb/data.table dplyr duckdb/g' run.conf && \ + sed -i 's/DO_PUBLISH=true/DO_PUBLISH=false/g' run.conf + +## set sizes +RUN mv _control/data.csv _control/data.csv.original && \ + echo "task,data,nrow,k,na,sort,active" > _control/data.csv && \ + echo "groupby,G1_1e7_1e2_0_0,1e7,1e2,0,0,1" >> _control/data.csv + +RUN #./dplyr/setup-dplyr.sh +RUN #./datatable/setup-datatable.sh +RUN #./duckdb/setup-duckdb.sh + +# END OF SETUP + +RUN python3 -m pip install --upgrade pandas +RUN python3 -m pip install --upgrade polars psutil +RUN python3 -m pip install --upgrade datafusion + +# Now add our solution +RUN rm -rf datafusion-python 2>/dev/null && \ + mkdir datafusion-python +ADD benchmarks/db-benchmark/*.py datafusion-python/ +ADD benchmarks/db-benchmark/run-bench.sh . + +ENTRYPOINT [ "/db-benchmark/run-bench.sh" ] \ No newline at end of file diff --git a/benchmarks/db-benchmark/groupby-datafusion.py b/benchmarks/db-benchmark/groupby-datafusion.py new file mode 100644 index 00000000..2c35259e --- /dev/null +++ b/benchmarks/db-benchmark/groupby-datafusion.py @@ -0,0 +1,530 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
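+#
+# Groupby benchmark derived from db-benchmark: loads the G1 CSV dataset,
+# registers it as table "x", and times ten grouping questions, logging the
+# timing and a result checksum for each via helpers from _helpers/helpers.py.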
+ +import os +import gc +import timeit +import datafusion as df +from datafusion import ( + col, + functions as f, + RuntimeConfig, + SessionConfig, + SessionContext, +) +import pyarrow +from pyarrow import csv as pacsv + + +print("# groupby-datafusion.py", flush=True) + +exec(open("./_helpers/helpers.py").read()) + + +def ans_shape(batches): + rows, cols = 0, 0 + for batch in batches: + rows += batch.num_rows + if cols == 0: + cols = batch.num_columns + else: + assert cols == batch.num_columns + return rows, cols + + +def execute(df): + print(df.execution_plan().display_indent()) + return df.collect() + + +ver = df.__version__ +git = "" +task = "groupby" +solution = "datafusion" +fun = ".groupby" +cache = "TRUE" +on_disk = "FALSE" + +# experimental - support running with both DataFrame and SQL APIs +sql = True + +data_name = os.environ["SRC_DATANAME"] +src_grp = os.path.join("data", data_name + ".csv") +print("loading dataset %s" % src_grp, flush=True) + +schema = pyarrow.schema( + [ + ("id4", pyarrow.int32()), + ("id5", pyarrow.int32()), + ("id6", pyarrow.int32()), + ("v1", pyarrow.int32()), + ("v2", pyarrow.int32()), + ("v3", pyarrow.float64()), + ] +) + +data = pacsv.read_csv( + src_grp, + convert_options=pacsv.ConvertOptions( + auto_dict_encode=True, column_types=schema + ), +) +print("dataset loaded") + +# create a session context with explicit runtime and config settings +runtime = ( + RuntimeConfig() + .with_disk_manager_os() + .with_fair_spill_pool(64 * 1024 * 1024 * 1024) +) +config = ( + SessionConfig() + .with_repartition_joins(False) + .with_repartition_aggregations(False) + .set("datafusion.execution.coalesce_batches", "false") +) +ctx = SessionContext(config, runtime) +print(ctx) + +ctx.register_record_batches("x", [data.to_batches()]) +print("registered record batches") +# cols = ctx.sql("SHOW columns from x") +# ans.show() + +in_rows = data.num_rows +# print(in_rows, flush=True) + +task_init = timeit.default_timer() + +question = "sum v1 by id1" # q1 +gc.collect() +t_start = timeit.default_timer() +if sql: + df = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1") +else: + df = ctx.table("x").aggregate( + [f.col("id1")], [f.sum(f.col("v1")).alias("v1")] + ) +ans = execute(df) + +shape = ans_shape(ans) +print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q1: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +write_log( + task=task, + data=data_name, + in_rows=in_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "sum v1 by id1:id2" # q2 +gc.collect() +t_start = timeit.default_timer() +if sql: + df = ctx.sql("SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2") +else: + df = ctx.table("x").aggregate( + [f.col("id1"), f.col("id2")], [f.sum(f.col("v1")).alias("v1")] + ) +ans = execute(df) +shape = ans_shape(ans) +print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q2: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +write_log( + task=task, + data=data_name, + in_rows=in_rows, + question=question, 
+ out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "sum v1 mean v3 by id3" # q3 +gc.collect() +t_start = timeit.default_timer() +if sql: + df = ctx.sql( + "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3" + ) +else: + df = ctx.table("x").aggregate( + [f.col("id3")], + [ + f.sum(f.col("v1")).alias("v1"), + f.avg(f.col("v3")).alias("v3"), + ], + ) +ans = execute(df) +shape = ans_shape(ans) +print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q3: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = ( + df.aggregate([], [f.sum(col("v1")), f.sum(col("v3"))]) + .collect()[0] + .to_pandas() + .to_numpy()[0] +) +chkt = timeit.default_timer() - t_start +write_log( + task=task, + data=data_name, + in_rows=in_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "mean v1:v3 by id4" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql( + "SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4" +).collect() +shape = ans_shape(ans) +print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q4: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = ( + df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]) + .collect()[0] + .to_pandas() + .to_numpy()[0] +) +chkt = timeit.default_timer() - t_start +write_log( + task=task, + data=data_name, + in_rows=in_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "sum v1:v3 by id6" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql( + "SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6" +).collect() +shape = ans_shape(ans) +print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q5: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = ( + df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]) + .collect()[0] + .to_pandas() + .to_numpy()[0] +) +chkt = timeit.default_timer() - t_start +write_log( + task=task, + data=data_name, + in_rows=in_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "median v3 sd v3 by id4 id5" # q6 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql( + "SELECT id4, id5, approx_percentile_cont(v3, .5) AS median_v3, stddev(v3) AS stddev_v3 FROM x GROUP BY id4, id5" +).collect() +shape = ans_shape(ans) +print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q6: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = ( + df.aggregate([], [f.sum(col("median_v3")), f.sum(col("stddev_v3"))]) + 
.collect()[0] + .to_pandas() + .to_numpy()[0] +) +chkt = timeit.default_timer() - t_start +write_log( + task=task, + data=data_name, + in_rows=in_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "max v1 - min v2 by id3" # q7 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql( + "SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3" +).collect() +shape = ans_shape(ans) +print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q7: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("range_v1_v2"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +write_log( + task=task, + data=data_name, + in_rows=in_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "largest two v3 by id6" # q8 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql( + "SELECT id6, v3 from (SELECT id6, v3, row_number() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS row FROM x) t WHERE row <= 2" +).collect() +shape = ans_shape(ans) +print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q8: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v3"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +write_log( + task=task, + data=data_name, + in_rows=in_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "regression v1 v2 by id2 id4" # q9 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT corr(v1, v2) as corr FROM x GROUP BY id2, id4").collect() +shape = ans_shape(ans) +print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q9: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("corr"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +write_log( + task=task, + data=data_name, + in_rows=in_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "sum v3 count by id1:id6" # q10 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql( + "SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6" +).collect() +shape = ans_shape(ans) +print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q10: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = ( + df.aggregate([], [f.sum(col("v3")), f.sum(col("cnt"))]) + .collect()[0] + .to_pandas() + .to_numpy()[0] +) +chkt = timeit.default_timer() - t_start +write_log( + task=task, + 
data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
+del ans
+gc.collect()
+
+print(
+    "grouping finished, took %0.fs" % (timeit.default_timer() - task_init),
+    flush=True,
+)
+
+exit(0)
diff --git a/benchmarks/db-benchmark/join-datafusion.py b/benchmarks/db-benchmark/join-datafusion.py
new file mode 100755
index 00000000..602cee69
--- /dev/null
+++ b/benchmarks/db-benchmark/join-datafusion.py
@@ -0,0 +1,312 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import gc
+import timeit
+import datafusion as df
+from datafusion import functions as f
+from datafusion import col
+from pyarrow import csv as pacsv
+
+
+print("# join-datafusion.py", flush=True)
+
+exec(open("./_helpers/helpers.py").read())
+
+
+def ans_shape(batches):
+    rows, cols = 0, 0
+    for batch in batches:
+        rows += batch.num_rows
+        if cols == 0:
+            cols = batch.num_columns
+        else:
+            assert cols == batch.num_columns
+    return rows, cols
+
+
+ver = df.__version__
+task = "join"
+git = ""
+solution = "datafusion"
+fun = ".join"
+cache = "TRUE"
+on_disk = "FALSE"
+
+data_name = os.environ["SRC_DATANAME"]
+src_jn_x = os.path.join("data", data_name + ".csv")
+y_data_name = join_to_tbls(data_name)
+src_jn_y = [
+    os.path.join("data", y_data_name[0] + ".csv"),
+    os.path.join("data", y_data_name[1] + ".csv"),
+    os.path.join("data", y_data_name[2] + ".csv"),
+]
+if len(src_jn_y) != 3:
+    raise Exception("Something went wrong in preparing files used for join")
+
+print(
+    "loading datasets "
+    + data_name
+    + ", "
+    + y_data_name[0]
+    + ", "
+    + y_data_name[1]
+    + ", "
+    + y_data_name[2],
+    flush=True,
+)
+
+ctx = df.SessionContext()
+print(ctx)
+
+# TODO we should be applying projections to these table reads to create relations of different sizes
+
+x_data = pacsv.read_csv(
+    src_jn_x, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)
+)
+ctx.register_record_batches("x", [x_data.to_batches()])
+small_data = pacsv.read_csv(
+    src_jn_y[0], convert_options=pacsv.ConvertOptions(auto_dict_encode=True)
+)
+ctx.register_record_batches("small", [small_data.to_batches()])
+medium_data = pacsv.read_csv(
+    src_jn_y[1], convert_options=pacsv.ConvertOptions(auto_dict_encode=True)
+)
+ctx.register_record_batches("medium", [medium_data.to_batches()])
+large_data = pacsv.read_csv(
+    src_jn_y[2], convert_options=pacsv.ConvertOptions(auto_dict_encode=True)
+)
+ctx.register_record_batches("large", [large_data.to_batches()])
+
+print(x_data.num_rows, flush=True)
+print(small_data.num_rows, flush=True)
+print(medium_data.num_rows, flush=True)
+print(large_data.num_rows, flush=True)
+
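+# Each question below follows the db-benchmark protocol: run the join once,
+# time it, compute a checksum aggregate over the result, and record both the
+# query time and checksum time via write_log (defined in _helpers/helpers.py).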
+task_init = timeit.default_timer() +print("joining...", flush=True) + +question = "small inner on int" # q1 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql( + "SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1" +).collect() +# ans = ctx.sql("SELECT * FROM x INNER JOIN small ON x.id1 = small.id1").collect() +# print(set([b.schema for b in ans])) +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q1: {t}") +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +m = memory_usage() +write_log( + task=task, + data=data_name, + in_rows=x_data.num_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "medium inner on int" # q2 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql( + "SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x INNER JOIN medium ON x.id2 = medium.id2" +).collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q2: {t}") +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = ( + df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]) + .collect()[0] + .column(0)[0] +) +chkt = timeit.default_timer() - t_start +m = memory_usage() +write_log( + task=task, + data=data_name, + in_rows=x_data.num_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "medium outer on int" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql( + "SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id2 = medium.id2" +).collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q3: {t}") +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = ( + df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]) + .collect()[0] + .column(0)[0] +) +chkt = timeit.default_timer() - t_start +m = memory_usage() +write_log( + task=task, + data=data_name, + in_rows=x_data.num_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "medium inner on factor" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql( + "SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id5 = medium.id5" +).collect() +shape = ans_shape(ans) +# print(shape) +t = timeit.default_timer() - t_start +print(f"q4: {t}") +t_start = 
timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = ( + df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]) + .collect()[0] + .column(0)[0] +) +chkt = timeit.default_timer() - t_start +m = memory_usage() +write_log( + task=task, + data=data_name, + in_rows=x_data.num_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +question = "big inner on int" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql( + "SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x LEFT JOIN large ON x.id3 = large.id3" +).collect() +shape = ans_shape(ans) +# print(shape) +t = timeit.default_timer() - t_start +print(f"q5: {t}") +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = ( + df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]) + .collect()[0] + .column(0)[0] +) +chkt = timeit.default_timer() - t_start +m = memory_usage() +write_log( + task=task, + data=data_name, + in_rows=x_data.num_rows, + question=question, + out_rows=shape[0], + out_cols=shape[1], + solution=solution, + version=ver, + git=git, + fun=fun, + run=1, + time_sec=t, + mem_gb=m, + cache=cache, + chk=make_chk([chk]), + chk_time_sec=chkt, + on_disk=on_disk, +) +del ans +gc.collect() + +print( + "joining finished, took %0.fs" % (timeit.default_timer() - task_init), + flush=True, +) + +exit(0) diff --git a/benchmarks/db-benchmark/run-bench.sh b/benchmarks/db-benchmark/run-bench.sh new file mode 100755 index 00000000..36a6087d --- /dev/null +++ b/benchmarks/db-benchmark/run-bench.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
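+
+# Entrypoint for the db-benchmark Docker image: run the groupby benchmark
+# against the dataset generated at image build time, then print the timings.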
+set -e + +#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/polars/groupby-polars.py +SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/datafusion-python/groupby-datafusion.py + +# joins need more work still +#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/datafusion-python/join-datafusion.py +#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/polars/join-polars.py + +cat time.csv diff --git a/benchmarks/tpch/.gitignore b/benchmarks/tpch/.gitignore new file mode 100644 index 00000000..4471c6d1 --- /dev/null +++ b/benchmarks/tpch/.gitignore @@ -0,0 +1,2 @@ +data +results.csv \ No newline at end of file diff --git a/benchmarks/tpch/README.md b/benchmarks/tpch/README.md new file mode 100644 index 00000000..a118a744 --- /dev/null +++ b/benchmarks/tpch/README.md @@ -0,0 +1,78 @@ + + +# DataFusion Python Benchmarks Derived from TPC-H + +## Create Release Build + +From repo root: + +```bash +maturin develop --release +``` + +Note that release builds take a long time, so you may want to temporarily comment out this section of the +root Cargo.toml when building frequently. + +```toml +[profile.release] +lto = true +codegen-units = 1 +``` + +## Generate Data + +```bash +./tpch-gen.sh 1 +``` + +## Run Benchmarks + +```bash +python tpch.py ./data ./queries +``` + +A summary of the benchmark timings will be written to `results.csv`. For example: + +```csv +setup,1.4 +q1,2978.6 +q2,679.7 +q3,2943.7 +q4,2894.9 +q5,3592.3 +q6,1691.4 +q7,3003.9 +q8,3818.7 +q9,4237.9 +q10,2344.7 +q11,526.1 +q12,2284.6 +q13,1009.2 +q14,1738.4 +q15,1942.1 +q16,499.8 +q17,5178.9 +q18,4127.7 +q19,2056.6 +q20,2162.5 +q21,8046.5 +q22,754.9 +total,58513.2 +``` \ No newline at end of file diff --git a/benchmarks/tpch/create_tables.sql b/benchmarks/tpch/create_tables.sql new file mode 100644 index 00000000..4b2209c4 --- /dev/null +++ b/benchmarks/tpch/create_tables.sql @@ -0,0 +1,119 @@ +-- Schema derived from TPC-H schema under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. 
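+-- Note: tpch.py replaces the $PATH placeholder in each LOCATION clause below +-- with the benchmark data directory before executing these statements.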
+ +CREATE EXTERNAL TABLE customer ( + c_custkey INT NOT NULL, + c_name VARCHAR NOT NULL, + c_address VARCHAR NOT NULL, + c_nationkey INT NOT NULL, + c_phone VARCHAR NOT NULL, + c_acctbal DECIMAL(15, 2) NOT NULL, + c_mktsegment VARCHAR NOT NULL, + c_comment VARCHAR NOT NULL, + c_extra VARCHAR NOT NULL, +) +STORED AS CSV +WITH HEADER ROW DELIMITER '|' +LOCATION '$PATH/customer.csv'; + +CREATE EXTERNAL TABLE lineitem ( + l_orderkey INT NOT NULL, + l_partkey INT NOT NULL, + l_suppkey INT NOT NULL, + l_linenumber INT NOT NULL, + l_quantity DECIMAL(15, 2) NOT NULL, + l_extendedprice DECIMAL(15, 2) NOT NULL, + l_discount DECIMAL(15, 2) NOT NULL, + l_tax DECIMAL(15, 2) NOT NULL, + l_returnflag VARCHAR NOT NULL, + l_linestatus VARCHAR NOT NULL, + l_shipdate DATE NOT NULL, + l_commitdate DATE NOT NULL, + l_receiptdate DATE NOT NULL, + l_shipinstruct VARCHAR NOT NULL, + l_shipmode VARCHAR NOT NULL, + l_comment VARCHAR NOT NULL, + l_extra VARCHAR NOT NULL, +) +STORED AS CSV +WITH HEADER ROW DELIMITER '|' +LOCATION '$PATH/lineitem.csv'; + +CREATE EXTERNAL TABLE nation ( + n_nationkey INT NOT NULL, + n_name VARCHAR NOT NULL, + n_regionkey INT NOT NULL, + n_comment VARCHAR NOT NULL, + n_extra VARCHAR NOT NULL, +) +STORED AS CSV +WITH HEADER ROW DELIMITER '|' +LOCATION '$PATH/nation.csv'; + +CREATE EXTERNAL TABLE orders ( + o_orderkey INT NOT NULL, + o_custkey INT NOT NULL, + o_orderstatus VARCHAR NOT NULL, + o_totalprice DECIMAL(15, 2) NOT NULL, + o_orderdate DATE NOT NULL, + o_orderpriority VARCHAR NOT NULL, + o_clerk VARCHAR NOT NULL, + o_shippriority INT NULL, + o_comment VARCHAR NOT NULL, + o_extra VARCHAR NOT NULL, +) +STORED AS CSV +WITH HEADER ROW DELIMITER '|' +LOCATION '$PATH/orders.csv'; + +CREATE EXTERNAL TABLE part ( + p_partkey INT NOT NULL, + p_name VARCHAR NOT NULL, + p_mfgr VARCHAR NOT NULL, + p_brand VARCHAR NOT NULL, + p_type VARCHAR NOT NULL, + p_size INT NULL, + p_container VARCHAR NOT NULL, + p_retailprice DECIMAL(15, 2) NOT NULL, + p_comment VARCHAR NOT NULL, + p_extra VARCHAR NOT NULL, +) +STORED AS CSV +WITH HEADER ROW DELIMITER '|' +LOCATION '$PATH/part.csv'; + +CREATE EXTERNAL TABLE partsupp ( + ps_partkey INT NOT NULL, + ps_suppkey INT NOT NULL, + ps_availqty INT NOT NULL, + ps_supplycost DECIMAL(15, 2) NOT NULL, + ps_comment VARCHAR NOT NULL, + ps_extra VARCHAR NOT NULL, +) +STORED AS CSV +WITH HEADER ROW DELIMITER '|' +LOCATION '$PATH/partsupp.csv'; + +CREATE EXTERNAL TABLE region ( + r_regionkey INT NOT NULL, + r_name VARCHAR NOT NULL, + r_comment VARCHAR NOT NULL, + r_extra VARCHAR NOT NULL, +) +STORED AS CSV +WITH HEADER ROW DELIMITER '|' +LOCATION '$PATH/region.csv'; + +CREATE EXTERNAL TABLE supplier ( + s_suppkey INT NOT NULL, + s_name VARCHAR NOT NULL, + s_address VARCHAR NOT NULL, + s_nationkey INT NOT NULL, + s_phone VARCHAR NOT NULL, + s_acctbal DECIMAL(15, 2) NOT NULL, + s_comment VARCHAR NOT NULL, + s_extra VARCHAR NOT NULL, +) +STORED AS CSV +WITH HEADER ROW DELIMITER '|' +LOCATION '$PATH/supplier.csv'; \ No newline at end of file diff --git a/benchmarks/tpch/queries/q1.sql b/benchmarks/tpch/queries/q1.sql new file mode 100644 index 00000000..e7e8e32b --- /dev/null +++ b/benchmarks/tpch/queries/q1.sql @@ -0,0 +1,23 @@ +-- Benchmark Query 1 derived from TPC-H query 1 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. 
+select + l_returnflag, + l_linestatus, + sum(l_quantity) as sum_qty, + sum(l_extendedprice) as sum_base_price, + sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, + sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, + avg(l_quantity) as avg_qty, + avg(l_extendedprice) as avg_price, + avg(l_discount) as avg_disc, + count(*) as count_order +from + lineitem +where + l_shipdate <= date '1998-12-01' - interval '68 days' +group by + l_returnflag, + l_linestatus +order by + l_returnflag, + l_linestatus; diff --git a/benchmarks/tpch/queries/q10.sql b/benchmarks/tpch/queries/q10.sql new file mode 100644 index 00000000..8391f627 --- /dev/null +++ b/benchmarks/tpch/queries/q10.sql @@ -0,0 +1,33 @@ +-- Benchmark Query 10 derived from TPC-H query 10 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + c_custkey, + c_name, + sum(l_extendedprice * (1 - l_discount)) as revenue, + c_acctbal, + n_name, + c_address, + c_phone, + c_comment +from + customer, + orders, + lineitem, + nation +where + c_custkey = o_custkey + and l_orderkey = o_orderkey + and o_orderdate >= date '1993-07-01' + and o_orderdate < date '1993-07-01' + interval '3' month + and l_returnflag = 'R' + and c_nationkey = n_nationkey +group by + c_custkey, + c_name, + c_acctbal, + c_phone, + n_name, + c_address, + c_comment +order by + revenue desc limit 20; diff --git a/benchmarks/tpch/queries/q11.sql b/benchmarks/tpch/queries/q11.sql new file mode 100644 index 00000000..58776d36 --- /dev/null +++ b/benchmarks/tpch/queries/q11.sql @@ -0,0 +1,29 @@ +-- Benchmark Query 11 derived from TPC-H query 11 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + ps_partkey, + sum(ps_supplycost * ps_availqty) as value +from + partsupp, + supplier, + nation +where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'ALGERIA' +group by + ps_partkey having + sum(ps_supplycost * ps_availqty) > ( + select + sum(ps_supplycost * ps_availqty) * 0.0001000000 + from + partsupp, + supplier, + nation + where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'ALGERIA' + ) +order by + value desc; diff --git a/benchmarks/tpch/queries/q12.sql b/benchmarks/tpch/queries/q12.sql new file mode 100644 index 00000000..0b973de9 --- /dev/null +++ b/benchmarks/tpch/queries/q12.sql @@ -0,0 +1,30 @@ +-- Benchmark Query 12 derived from TPC-H query 12 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + l_shipmode, + sum(case + when o_orderpriority = '1-URGENT' + or o_orderpriority = '2-HIGH' + then 1 + else 0 + end) as high_line_count, + sum(case + when o_orderpriority <> '1-URGENT' + and o_orderpriority <> '2-HIGH' + then 1 + else 0 + end) as low_line_count +from + orders, + lineitem +where + o_orderkey = l_orderkey + and l_shipmode in ('FOB', 'SHIP') + and l_commitdate < l_receiptdate + and l_shipdate < l_commitdate + and l_receiptdate >= date '1995-01-01' + and l_receiptdate < date '1995-01-01' + interval '1' year +group by + l_shipmode +order by + l_shipmode; diff --git a/benchmarks/tpch/queries/q13.sql b/benchmarks/tpch/queries/q13.sql new file mode 100644 index 00000000..145dd6f1 --- /dev/null +++ b/benchmarks/tpch/queries/q13.sql @@ -0,0 +1,22 @@ +-- Benchmark Query 13 derived from TPC-H query 13 under the terms of the TPC Fair Use Policy. 
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + c_count, + count(*) as custdist +from + ( + select + c_custkey, + count(o_orderkey) + from + customer left outer join orders on + c_custkey = o_custkey + and o_comment not like '%express%requests%' + group by + c_custkey + ) as c_orders (c_custkey, c_count) +group by + c_count +order by + custdist desc, + c_count desc; diff --git a/benchmarks/tpch/queries/q14.sql b/benchmarks/tpch/queries/q14.sql new file mode 100644 index 00000000..1a91a04d --- /dev/null +++ b/benchmarks/tpch/queries/q14.sql @@ -0,0 +1,15 @@ +-- Benchmark Query 14 derived from TPC-H query 14 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + 100.00 * sum(case + when p_type like 'PROMO%' + then l_extendedprice * (1 - l_discount) + else 0 + end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue +from + lineitem, + part +where + l_partkey = p_partkey + and l_shipdate >= date '1995-02-01' + and l_shipdate < date '1995-02-01' + interval '1' month; diff --git a/benchmarks/tpch/queries/q15.sql b/benchmarks/tpch/queries/q15.sql new file mode 100644 index 00000000..68cc32cb --- /dev/null +++ b/benchmarks/tpch/queries/q15.sql @@ -0,0 +1,33 @@ +-- Benchmark Query 15 derived from TPC-H query 15 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +create view revenue0 (supplier_no, total_revenue) as + select + l_suppkey, + sum(l_extendedprice * (1 - l_discount)) + from + lineitem + where + l_shipdate >= date '1996-08-01' + and l_shipdate < date '1996-08-01' + interval '3' month + group by + l_suppkey; +select + s_suppkey, + s_name, + s_address, + s_phone, + total_revenue +from + supplier, + revenue0 +where + s_suppkey = supplier_no + and total_revenue = ( + select + max(total_revenue) + from + revenue0 + ) +order by + s_suppkey; +drop view revenue0; diff --git a/benchmarks/tpch/queries/q16.sql b/benchmarks/tpch/queries/q16.sql new file mode 100644 index 00000000..098b4f3b --- /dev/null +++ b/benchmarks/tpch/queries/q16.sql @@ -0,0 +1,32 @@ +-- Benchmark Query 16 derived from TPC-H query 16 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + p_brand, + p_type, + p_size, + count(distinct ps_suppkey) as supplier_cnt +from + partsupp, + part +where + p_partkey = ps_partkey + and p_brand <> 'Brand#14' + and p_type not like 'SMALL PLATED%' + and p_size in (14, 6, 5, 31, 49, 15, 41, 47) + and ps_suppkey not in ( + select + s_suppkey + from + supplier + where + s_comment like '%Customer%Complaints%' + ) +group by + p_brand, + p_type, + p_size +order by + supplier_cnt desc, + p_brand, + p_type, + p_size; diff --git a/benchmarks/tpch/queries/q17.sql b/benchmarks/tpch/queries/q17.sql new file mode 100644 index 00000000..ed02d7b7 --- /dev/null +++ b/benchmarks/tpch/queries/q17.sql @@ -0,0 +1,19 @@ +-- Benchmark Query 17 derived from TPC-H query 17 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. 
+select + sum(l_extendedprice) / 7.0 as avg_yearly +from + lineitem, + part +where + p_partkey = l_partkey + and p_brand = 'Brand#42' + and p_container = 'LG BAG' + and l_quantity < ( + select + 0.2 * avg(l_quantity) + from + lineitem + where + l_partkey = p_partkey + ); diff --git a/benchmarks/tpch/queries/q18.sql b/benchmarks/tpch/queries/q18.sql new file mode 100644 index 00000000..cf1f8c89 --- /dev/null +++ b/benchmarks/tpch/queries/q18.sql @@ -0,0 +1,34 @@ +-- Benchmark Query 18 derived from TPC-H query 18 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + c_name, + c_custkey, + o_orderkey, + o_orderdate, + o_totalprice, + sum(l_quantity) +from + customer, + orders, + lineitem +where + o_orderkey in ( + select + l_orderkey + from + lineitem + group by + l_orderkey having + sum(l_quantity) > 313 + ) + and c_custkey = o_custkey + and o_orderkey = l_orderkey +group by + c_name, + c_custkey, + o_orderkey, + o_orderdate, + o_totalprice +order by + o_totalprice desc, + o_orderdate limit 100; diff --git a/benchmarks/tpch/queries/q19.sql b/benchmarks/tpch/queries/q19.sql new file mode 100644 index 00000000..3968f0d2 --- /dev/null +++ b/benchmarks/tpch/queries/q19.sql @@ -0,0 +1,37 @@ +-- Benchmark Query 19 derived from TPC-H query 19 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + sum(l_extendedprice* (1 - l_discount)) as revenue +from + lineitem, + part +where + ( + p_partkey = l_partkey + and p_brand = 'Brand#21' + and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') + and l_quantity >= 8 and l_quantity <= 8 + 10 + and p_size between 1 and 5 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ) + or + ( + p_partkey = l_partkey + and p_brand = 'Brand#13' + and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') + and l_quantity >= 20 and l_quantity <= 20 + 10 + and p_size between 1 and 10 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ) + or + ( + p_partkey = l_partkey + and p_brand = 'Brand#52' + and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') + and l_quantity >= 30 and l_quantity <= 30 + 10 + and p_size between 1 and 15 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ); diff --git a/benchmarks/tpch/queries/q2.sql b/benchmarks/tpch/queries/q2.sql new file mode 100644 index 00000000..46ec5d23 --- /dev/null +++ b/benchmarks/tpch/queries/q2.sql @@ -0,0 +1,45 @@ +-- Benchmark Query 2 derived from TPC-H query 2 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. 
+select + s_acctbal, + s_name, + n_name, + p_partkey, + p_mfgr, + s_address, + s_phone, + s_comment +from + part, + supplier, + partsupp, + nation, + region +where + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and p_size = 48 + and p_type like '%TIN' + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'ASIA' + and ps_supplycost = ( + select + min(ps_supplycost) + from + partsupp, + supplier, + nation, + region + where + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'ASIA' + ) +order by + s_acctbal desc, + n_name, + s_name, + p_partkey limit 100; diff --git a/benchmarks/tpch/queries/q20.sql b/benchmarks/tpch/queries/q20.sql new file mode 100644 index 00000000..5bb16563 --- /dev/null +++ b/benchmarks/tpch/queries/q20.sql @@ -0,0 +1,39 @@ +-- Benchmark Query 20 derived from TPC-H query 20 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + s_name, + s_address +from + supplier, + nation +where + s_suppkey in ( + select + ps_suppkey + from + partsupp + where + ps_partkey in ( + select + p_partkey + from + part + where + p_name like 'blanched%' + ) + and ps_availqty > ( + select + 0.5 * sum(l_quantity) + from + lineitem + where + l_partkey = ps_partkey + and l_suppkey = ps_suppkey + and l_shipdate >= date '1993-01-01' + and l_shipdate < date '1993-01-01' + interval '1' year + ) + ) + and s_nationkey = n_nationkey + and n_name = 'KENYA' +order by + s_name; diff --git a/benchmarks/tpch/queries/q21.sql b/benchmarks/tpch/queries/q21.sql new file mode 100644 index 00000000..6f84b876 --- /dev/null +++ b/benchmarks/tpch/queries/q21.sql @@ -0,0 +1,41 @@ +-- Benchmark Query 21 derived from TPC-H query 21 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + s_name, + count(*) as numwait +from + supplier, + lineitem l1, + orders, + nation +where + s_suppkey = l1.l_suppkey + and o_orderkey = l1.l_orderkey + and o_orderstatus = 'F' + and l1.l_receiptdate > l1.l_commitdate + and exists ( + select + * + from + lineitem l2 + where + l2.l_orderkey = l1.l_orderkey + and l2.l_suppkey <> l1.l_suppkey + ) + and not exists ( + select + * + from + lineitem l3 + where + l3.l_orderkey = l1.l_orderkey + and l3.l_suppkey <> l1.l_suppkey + and l3.l_receiptdate > l3.l_commitdate + ) + and s_nationkey = n_nationkey + and n_name = 'ARGENTINA' +group by + s_name +order by + numwait desc, + s_name limit 100; diff --git a/benchmarks/tpch/queries/q22.sql b/benchmarks/tpch/queries/q22.sql new file mode 100644 index 00000000..65ea49b0 --- /dev/null +++ b/benchmarks/tpch/queries/q22.sql @@ -0,0 +1,39 @@ +-- Benchmark Query 22 derived from TPC-H query 22 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. 
+select + cntrycode, + count(*) as numcust, + sum(c_acctbal) as totacctbal +from + ( + select + substring(c_phone from 1 for 2) as cntrycode, + c_acctbal + from + customer + where + substring(c_phone from 1 for 2) in + ('24', '34', '16', '30', '33', '14', '13') + and c_acctbal > ( + select + avg(c_acctbal) + from + customer + where + c_acctbal > 0.00 + and substring(c_phone from 1 for 2) in + ('24', '34', '16', '30', '33', '14', '13') + ) + and not exists ( + select + * + from + orders + where + o_custkey = c_custkey + ) + ) as custsale +group by + cntrycode +order by + cntrycode; diff --git a/benchmarks/tpch/queries/q3.sql b/benchmarks/tpch/queries/q3.sql new file mode 100644 index 00000000..161f2e1e --- /dev/null +++ b/benchmarks/tpch/queries/q3.sql @@ -0,0 +1,24 @@ +-- Benchmark Query 3 derived from TPC-H query 3 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + l_orderkey, + sum(l_extendedprice * (1 - l_discount)) as revenue, + o_orderdate, + o_shippriority +from + customer, + orders, + lineitem +where + c_mktsegment = 'BUILDING' + and c_custkey = o_custkey + and l_orderkey = o_orderkey + and o_orderdate < date '1995-03-15' + and l_shipdate > date '1995-03-15' +group by + l_orderkey, + o_orderdate, + o_shippriority +order by + revenue desc, + o_orderdate limit 10; diff --git a/benchmarks/tpch/queries/q4.sql b/benchmarks/tpch/queries/q4.sql new file mode 100644 index 00000000..e444dbfc --- /dev/null +++ b/benchmarks/tpch/queries/q4.sql @@ -0,0 +1,23 @@ +-- Benchmark Query 4 derived from TPC-H query 4 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + o_orderpriority, + count(*) as order_count +from + orders +where + o_orderdate >= date '1995-04-01' + and o_orderdate < date '1995-04-01' + interval '3' month + and exists ( + select + * + from + lineitem + where + l_orderkey = o_orderkey + and l_commitdate < l_receiptdate + ) +group by + o_orderpriority +order by + o_orderpriority; diff --git a/benchmarks/tpch/queries/q5.sql b/benchmarks/tpch/queries/q5.sql new file mode 100644 index 00000000..4426bd24 --- /dev/null +++ b/benchmarks/tpch/queries/q5.sql @@ -0,0 +1,26 @@ +-- Benchmark Query 5 derived from TPC-H query 5 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + n_name, + sum(l_extendedprice * (1 - l_discount)) as revenue +from + customer, + orders, + lineitem, + supplier, + nation, + region +where + c_custkey = o_custkey + and l_orderkey = o_orderkey + and l_suppkey = s_suppkey + and c_nationkey = s_nationkey + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'AFRICA' + and o_orderdate >= date '1994-01-01' + and o_orderdate < date '1994-01-01' + interval '1' year +group by + n_name +order by + revenue desc; diff --git a/benchmarks/tpch/queries/q6.sql b/benchmarks/tpch/queries/q6.sql new file mode 100644 index 00000000..3d6e51cf --- /dev/null +++ b/benchmarks/tpch/queries/q6.sql @@ -0,0 +1,11 @@ +-- Benchmark Query 6 derived from TPC-H query 6 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. 
+select + sum(l_extendedprice * l_discount) as revenue +from + lineitem +where + l_shipdate >= date '1994-01-01' + and l_shipdate < date '1994-01-01' + interval '1' year + and l_discount between 0.04 - 0.01 and 0.04 + 0.01 + and l_quantity < 24; diff --git a/benchmarks/tpch/queries/q7.sql b/benchmarks/tpch/queries/q7.sql new file mode 100644 index 00000000..6e36ad61 --- /dev/null +++ b/benchmarks/tpch/queries/q7.sql @@ -0,0 +1,41 @@ +-- Benchmark Query 7 derived from TPC-H query 7 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + supp_nation, + cust_nation, + l_year, + sum(volume) as revenue +from + ( + select + n1.n_name as supp_nation, + n2.n_name as cust_nation, + extract(year from l_shipdate) as l_year, + l_extendedprice * (1 - l_discount) as volume + from + supplier, + lineitem, + orders, + customer, + nation n1, + nation n2 + where + s_suppkey = l_suppkey + and o_orderkey = l_orderkey + and c_custkey = o_custkey + and s_nationkey = n1.n_nationkey + and c_nationkey = n2.n_nationkey + and ( + (n1.n_name = 'GERMANY' and n2.n_name = 'IRAQ') + or (n1.n_name = 'IRAQ' and n2.n_name = 'GERMANY') + ) + and l_shipdate between date '1995-01-01' and date '1996-12-31' + ) as shipping +group by + supp_nation, + cust_nation, + l_year +order by + supp_nation, + cust_nation, + l_year; diff --git a/benchmarks/tpch/queries/q8.sql b/benchmarks/tpch/queries/q8.sql new file mode 100644 index 00000000..e28235ed --- /dev/null +++ b/benchmarks/tpch/queries/q8.sql @@ -0,0 +1,39 @@ +-- Benchmark Query 8 derived from TPC-H query 8 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. +select + o_year, + sum(case + when nation = 'IRAQ' then volume + else 0 + end) / sum(volume) as mkt_share +from + ( + select + extract(year from o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) as volume, + n2.n_name as nation + from + part, + supplier, + lineitem, + orders, + customer, + nation n1, + nation n2, + region + where + p_partkey = l_partkey + and s_suppkey = l_suppkey + and l_orderkey = o_orderkey + and o_custkey = c_custkey + and c_nationkey = n1.n_nationkey + and n1.n_regionkey = r_regionkey + and r_name = 'MIDDLE EAST' + and s_nationkey = n2.n_nationkey + and o_orderdate between date '1995-01-01' and date '1996-12-31' + and p_type = 'LARGE PLATED STEEL' + ) as all_nations +group by + o_year +order by + o_year; diff --git a/benchmarks/tpch/queries/q9.sql b/benchmarks/tpch/queries/q9.sql new file mode 100644 index 00000000..86ae0248 --- /dev/null +++ b/benchmarks/tpch/queries/q9.sql @@ -0,0 +1,34 @@ +-- Benchmark Query 9 derived from TPC-H query 9 under the terms of the TPC Fair Use Policy. +-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council. 
+select + nation, + o_year, + sum(amount) as sum_profit +from + ( + select + n_name as nation, + extract(year from o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount + from + part, + supplier, + lineitem, + partsupp, + orders, + nation + where + s_suppkey = l_suppkey + and ps_suppkey = l_suppkey + and ps_partkey = l_partkey + and p_partkey = l_partkey + and o_orderkey = l_orderkey + and s_nationkey = n_nationkey + and p_name like '%moccasin%' + ) as profit +group by + nation, + o_year +order by + nation, + o_year desc; diff --git a/benchmarks/tpch/tpch-gen.sh b/benchmarks/tpch/tpch-gen.sh new file mode 100755 index 00000000..e27472a3 --- /dev/null +++ b/benchmarks/tpch/tpch-gen.sh @@ -0,0 +1,53 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +mkdir -p data/answers 2>/dev/null + +set -e + +#pushd .. +#. ./dev/build-set-env.sh +#popd + +# Generate data into the ./data directory if it does not already exist +FILE=./data/supplier.tbl +if test -f "$FILE"; then + echo "$FILE exists." +else + docker run -v `pwd`/data:/data -it --rm ghcr.io/databloom-ai/tpch-docker:main -vf -s $1 + + # workaround for https://github.com/apache/arrow-datafusion/issues/6147 + mv data/customer.tbl data/customer.csv + mv data/lineitem.tbl data/lineitem.csv + mv data/nation.tbl data/nation.csv + mv data/orders.tbl data/orders.csv + mv data/part.tbl data/part.csv + mv data/partsupp.tbl data/partsupp.csv + mv data/region.tbl data/region.csv + mv data/supplier.tbl data/supplier.csv + + ls -l data +fi + +# Copy expected answers (at SF=1) into the ./data/answers directory if it does not already exist +FILE=./data/answers/q1.out +if test -f "$FILE"; then + echo "$FILE exists." +else + docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm ghcr.io/databloom-ai/tpch-docker:main -c "cp /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/" +fi \ No newline at end of file diff --git a/benchmarks/tpch/tpch.py b/benchmarks/tpch/tpch.py new file mode 100644 index 00000000..ea830a1f --- /dev/null +++ b/benchmarks/tpch/tpch.py @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +import argparse +from datafusion import SessionContext +import time + + +def bench(data_path, query_path): + with open("results.csv", "w") as results: + # start timing the setup (context creation and table registration) + start = time.time() + total_time_millis = 0 + + # create context + # runtime = ( + # RuntimeConfig() + # .with_disk_manager_os() + # .with_fair_spill_pool(10000000) + # ) + # config = ( + # SessionConfig() + # .with_create_default_catalog_and_schema(True) + # .with_default_catalog_and_schema("datafusion", "tpch") + # .with_information_schema(True) + # ) + # ctx = SessionContext(config, runtime) + + ctx = SessionContext() + print("Configuration:\n", ctx) + + # register tables + with open("create_tables.sql") as f: + sql = "" + for line in f.readlines(): + if line.startswith("--"): + continue + sql = sql + line + if sql.strip().endswith(";"): + sql = sql.strip().replace("$PATH", data_path) + ctx.sql(sql) + sql = "" + + end = time.time() + time_millis = (end - start) * 1000 + total_time_millis += time_millis + print("setup,{}".format(round(time_millis, 1))) + results.write("setup,{}\n".format(round(time_millis, 1))) + results.flush() + + # run queries + for query in range(1, 23): + with open("{}/q{}.sql".format(query_path, query)) as f: + text = f.read() + tmp = text.split(";") + queries = [] + for s in tmp: + if len(s.strip()) > 0: + queries.append(s.strip()) + + try: + start = time.time() + for sql in queries: + print(sql) + df = ctx.sql(sql) + # result_set = df.collect() + df.show() + end = time.time() + time_millis = (end - start) * 1000 + total_time_millis += time_millis + print("q{},{}".format(query, round(time_millis, 1))) + results.write( + "q{},{}\n".format(query, round(time_millis, 1)) + ) + results.flush() + except Exception as e: + print("query", query, "failed", e) + + print("total,{}".format(round(total_time_millis, 1))) + results.write("total,{}\n".format(round(total_time_millis, 1))) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("data_path") + parser.add_argument("query_path") + args = parser.parse_args() + bench(args.data_path, args.query_path) diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py index cd78f3c8..221b0cc0 100644 --- a/datafusion/tests/test_dataframe.py +++ b/datafusion/tests/test_dataframe.py @@ -370,6 +370,9 @@ def test_execution_plan(aggregate_df): assert expected == plan.display() + # Check the number of partitions is as expected. 
+ assert type(plan.partition_count) is int + expected = ( "ProjectionExec: expr=[c1@0 as c1, SUM(test.c2)@1 as SUM(test.c2)]\n" " Aggregate: groupBy=[[test.c1]], aggr=[[SUM(test.c2)]]\n" diff --git a/datafusion/tests/test_substrait.py b/datafusion/tests/test_substrait.py index 9a08b760..01df2d74 100644 --- a/datafusion/tests/test_substrait.py +++ b/datafusion/tests/test_substrait.py @@ -41,6 +41,8 @@ def test_substrait_serialization(ctx): substrait_plan = ss.substrait.serde.serialize_to_plan( "SELECT * FROM t", ctx ) + substrait_bytes = substrait_plan.encode() + assert type(substrait_bytes) is bytes substrait_bytes = ss.substrait.serde.serialize_bytes( "SELECT * FROM t", ctx ) diff --git a/dev/python_lint.sh b/dev/python_lint.sh index 94934629..3bc67fb1 100755 --- a/dev/python_lint.sh +++ b/dev/python_lint.sh @@ -22,5 +22,5 @@ set -e source venv/bin/activate -flake8 --exclude venv --ignore=E501,W503 +flake8 --exclude venv,benchmarks/db-benchmark --ignore=E501,W503 black --line-length 79 . diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt index c7754f35..6d0fee18 100644 --- a/dev/release/rat_exclude_files.txt +++ b/dev/release/rat_exclude_files.txt @@ -42,4 +42,6 @@ Cargo.lock .history *rat.txt */.git -.github/* \ No newline at end of file +.github/* +benchmarks/tpch/queries/q*.sql +benchmarks/tpch/create_tables.sql \ No newline at end of file diff --git a/docs/mdbook/README.md b/docs/mdbook/README.md new file mode 100644 index 00000000..6dae6bc6 --- /dev/null +++ b/docs/mdbook/README.md @@ -0,0 +1,33 @@ + +# DataFusion Book + +This folder builds a DataFusion user guide using [mdBook](https://github.com/rust-lang/mdBook). + +## Build and run book locally + +Build the latest files with `mdbook build`. + +Open the book locally by running `open book/index.html`. + +## Install mdBook + +Download the `mdbook` binary or run `cargo install mdbook`. + +If you downloaded the binary on macOS, open it manually once so the operating system gives it permission to run. + +Then add the binary to your `PATH` with a command like `mv ~/Downloads/mdbook ~/.local/bin` so you can run it from anywhere. diff --git a/docs/mdbook/book.toml b/docs/mdbook/book.toml new file mode 100644 index 00000000..089cb9a9 --- /dev/null +++ b/docs/mdbook/book.toml @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
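+ +# Configuration for `mdbook build`, which renders the Markdown sources in ./src +# into HTML under ./book (see the README in this directory).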
+ +[book] +authors = ["Apache Arrow"] +language = "en" +multilingual = false +src = "src" +title = "DataFusion Book" diff --git a/docs/mdbook/src/SUMMARY.md b/docs/mdbook/src/SUMMARY.md new file mode 100644 index 00000000..23467ed4 --- /dev/null +++ b/docs/mdbook/src/SUMMARY.md @@ -0,0 +1,25 @@ + + +# Summary + +- [Index](./index.md) +- [Installation](./installation.md) +- [Quickstart](./quickstart.md) +- [Usage](./usage/index.md) + - [Create a table](./usage/create-table.md) + - [Query a table](./usage/query-table.md) + - [Viewing Query Plans](./usage/query-plans.md) \ No newline at end of file diff --git a/docs/mdbook/src/images/datafusion-jupyterlab.png b/docs/mdbook/src/images/datafusion-jupyterlab.png new file mode 100644 index 00000000..c4d46884 Binary files /dev/null and b/docs/mdbook/src/images/datafusion-jupyterlab.png differ diff --git a/docs/mdbook/src/images/plan.svg b/docs/mdbook/src/images/plan.svg new file mode 100644 index 00000000..92714798 --- /dev/null +++ b/docs/mdbook/src/images/plan.svg @@ -0,0 +1,111 @@ +[SVG markup not reproduced: a GraphViz rendering of the example query plan, with two clusters, "LogicalPlan" and "Detailed LogicalPlan", each chaining Projection, Aggregate, Filter, and TableScan nodes; the detailed cluster also lists each node's output schema.] diff --git a/docs/mdbook/src/index.md b/docs/mdbook/src/index.md new file mode 100644 index 00000000..3cd0fec1 --- /dev/null +++ b/docs/mdbook/src/index.md @@ -0,0 +1,43 @@ + +# DataFusion Book + +DataFusion is a blazing fast query engine that lets you run data analyses quickly and reliably. + +DataFusion is written in Rust, but also exposes Python and SQL bindings, so you can easily query data in your language of choice. You don't need to know any Rust to be a happy and productive user of DataFusion. + +DataFusion lets you run queries faster than pandas. Let's compare query runtimes for a 5GB CSV file with 100 million rows of data. + +Take a look at a few rows of the data: + +``` ++-------+-------+--------------+-----+-----+-------+----+----+-----------+ +| id1 | id2 | id3 | id4 | id5 | id6 | v1 | v2 | v3 | ++-------+-------+--------------+-----+-----+-------+----+----+-----------+ +| id016 | id016 | id0000042202 | 15 | 24 | 5971 | 5 | 11 | 37.211254 | +| id039 | id045 | id0000029558 | 40 | 49 | 39457 | 5 | 4 | 48.951141 | +| id047 | id023 | id0000071286 | 68 | 20 | 74463 | 2 | 14 | 60.469241 | ++-------+-------+--------------+-----+-----+-------+----+----+-----------+ +``` + +Suppose you'd like to run the following query: `SELECT id1, sum(v1) AS v1 from the_table GROUP BY id1`. + +If you use pandas, then this query will take 43.6 seconds to execute. + +It only takes DataFusion 9.8 seconds to execute the same query. + +DataFusion is easy to use, powerful, and fast. Let's learn more! 
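+ +Here is a minimal sketch of how a query like this can be run end to end. The file name below is just an example; point `register_csv` at any CSV file you have locally. + +```python +from datafusion import SessionContext + +# Create a session and register the CSV file as a table. +ctx = SessionContext() +ctx.register_csv("the_table", "G1_1e8_1e2_0_0.csv") + +# The aggregation described above: sum v1 grouped by id1. +ctx.sql("SELECT id1, sum(v1) AS v1 FROM the_table GROUP BY id1").show() +```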
diff --git a/docs/mdbook/src/installation.md b/docs/mdbook/src/installation.md new file mode 100644 index 00000000..ba00c8b8 --- /dev/null +++ b/docs/mdbook/src/installation.md @@ -0,0 +1,62 @@ + +# Installation + +DataFusion is easy to install, just like any other Python library. + +## Using pip + +``` bash +pip install datafusion +``` + +## Conda & JupyterLab setup + +This section explains how to install DataFusion in a conda environment with other libraries that allow for a nice Jupyter workflow. This setup is completely optional. These steps are only needed if you'd like to run DataFusion in a Jupyter notebook and have an interface like this: + +![DataFusion in Jupyter](https://github.com/MrPowers/datafusion-book/raw/main/src/images/datafusion-jupyterlab.png) + +Create a conda environment with DataFusion, Jupyter, and other useful dependencies in the `datafusion-env.yml` file: + +``` +name: datafusion-env +channels: + - conda-forge + - defaults +dependencies: + - python=3.9 + - ipykernel + - nb_conda + - jupyterlab + - jupyterlab_code_formatter + - isort + - black + - pip + - pip: + - datafusion + +``` + +Create the environment with `conda env create -f datafusion-env.yml`. + +Activate the environment with `conda activate datafusion-env`. + +Run `jupyter lab` or open the [JupyterLab Desktop application](https://github.com/jupyterlab/jupyterlab-desktop) to start running DataFusion in a Jupyter notebook. + +## Examples + +See the [DataFusion Python Examples](https://github.com/apache/arrow-datafusion-python/tree/main/examples) for a variety of Python scripts that show DataFusion in action! diff --git a/docs/mdbook/src/quickstart.md b/docs/mdbook/src/quickstart.md new file mode 100644 index 00000000..bba0b36a --- /dev/null +++ b/docs/mdbook/src/quickstart.md @@ -0,0 +1,77 @@ + +# DataFusion Quickstart + +You can easily query a DataFusion table with the Python API or with pure SQL. + +Let's create a small DataFrame and then run some queries with both APIs. + +Start by creating a DataFrame with four rows of data and two columns: `a` and `b`. + +```python +from datafusion import SessionContext + +ctx = SessionContext() + +df = ctx.from_pydict({"a": [1, 2, 3, 1], "b": [4, 5, 6, 7]}, name="my_table") +``` + +Let's use the SQL API to add a column that sums columns `a` and `b`. + +``` +ctx.sql("select a, b, a + b as sum_a_b from my_table") + ++---+---+---------+ +| a | b | sum_a_b | ++---+---+---------+ +| 1 | 4 | 5 | +| 2 | 5 | 7 | +| 3 | 6 | 9 | +| 1 | 7 | 8 | ++---+---+---------+ +``` + +DataFusion makes it easy to run SQL queries on DataFrames. + +Now let's run the same query with the DataFusion Python API: + +```python +from datafusion import col + +df.select( + col("a"), + col("b"), + col("a") + col("b"), +) +``` + +We get the same result as before: + +``` ++---+---+-------------------------+ +| a | b | my_table.a + my_table.b | ++---+---+-------------------------+ +| 1 | 4 | 5 | +| 2 | 5 | 7 | +| 3 | 6 | 9 | +| 1 | 7 | 8 | ++---+---+-------------------------+ +``` + +DataFusion also allows you to query data with a well-designed Python interface. + +Python users have two great ways to query DataFusion tables. diff --git a/docs/mdbook/src/usage/create-table.md b/docs/mdbook/src/usage/create-table.md new file mode 100644 index 00000000..332863a1 --- /dev/null +++ b/docs/mdbook/src/usage/create-table.md @@ -0,0 +1,59 @@ + +# DataFusion Create Table + +It's easy to create DataFusion tables from a variety of data sources. 
+ +## Create Table from Python Dictionary + +Here's how to create a DataFusion table from a Python dictionary: + +```python +from datafusion import SessionContext + +ctx = SessionContext() + +df = ctx.from_pydict({"a": [1, 2, 3, 1], "b": [4, 5, 6, 7]}, name="my_table") +``` + +Supplying the `name` parameter is optional. You only need to name the table if you'd like to query it with the SQL API. + +You can also create a DataFrame without a name that can be queried with the Python API: + +```python +from datafusion import SessionContext + +ctx = SessionContext() + +df = ctx.from_pydict({"a": [1, 2, 3, 1], "b": [4, 5, 6, 7]}) +``` + +## Create Table from CSV + +You can read a CSV into a DataFusion DataFrame. Here's how to read the `G1_1e8_1e2_0_0.csv` file into a table named `csv_1e8`: + +```python +ctx.register_csv("csv_1e8", "G1_1e8_1e2_0_0.csv") +``` + +## Create Table from Parquet + +You can read a Parquet file into a DataFusion DataFrame. Here's how to read the `yellow_tripdata_2021-01.parquet` file into a table named `taxi`: + +```python +ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet") +``` diff --git a/docs/mdbook/src/usage/index.md b/docs/mdbook/src/usage/index.md new file mode 100644 index 00000000..1ef4406f --- /dev/null +++ b/docs/mdbook/src/usage/index.md @@ -0,0 +1,25 @@ + +# Usage + +This section shows how to create DataFusion DataFrames from a variety of data sources like CSV files and Parquet files. + +You'll learn more about the SQL statements that are supported by DataFusion. + +You'll also learn about DataFusion's Python API for querying data. + +The documentation will wrap up with a variety of real-world data processing tasks that are well suited for DataFusion. The lightning-fast speed and reliable execution make DataFusion the best technology for a variety of data processing tasks. diff --git a/docs/mdbook/src/usage/query-plans.md b/docs/mdbook/src/usage/query-plans.md new file mode 100644 index 00000000..a39aa9e4 --- /dev/null +++ b/docs/mdbook/src/usage/query-plans.md @@ -0,0 +1,170 @@ + + +# DataFusion Query Plans + +DataFusion's `DataFrame` is a wrapper around a query plan. In this chapter we will learn how to view +logical and physical query plans for DataFrames. + +## Sample Data + +Let's go ahead and create a simple DataFrame. You can do this in the Python shell or in a notebook. + +```python +from datafusion import SessionContext + +ctx = SessionContext() + +df = ctx.from_pydict({"a": [1, 2, 3, 1], "b": [4, 5, 6, 7]}, name="my_table") +``` + +## Logical Plan + +Next, let's look at the logical plan for this DataFrame. + +```python +>>> df.logical_plan() +TableScan: my_table +``` + +The logical plan here consists of a single `TableScan` operator. Let's make a more interesting plan by creating a new +`DataFrame` representing an aggregate query with a filter. + +```python +>>> df = ctx.sql("SELECT a, sum(b) FROM my_table WHERE a < 3 GROUP BY a") +``` + +When we view the plan for this `DataFrame` we can see that there are now four operators in the plan, each +representing a logical transformation of the data. We start with a `TableScan` to read the data, followed by +a `Filter` to filter out rows that do not match the filter expression, then an `Aggregate` is performed. Finally, +a `Projection` is applied to ensure that the order of the final columns matches the `SELECT` part of the SQL query. 
+ +```python +>>> df.logical_plan() +Projection: my_table.a, SUM(my_table.b) + Aggregate: groupBy=[[my_table.a]], aggr=[[SUM(my_table.b)]] + Filter: my_table.a < Int64(3) + TableScan: my_table +``` + +## Optimized Logical Plan + +DataFusion has a powerful query optimizer which will rewrite query plans to make them more efficient before they are +executed. We can view the optimizer's output by displaying the optimized logical plan. + +```python +>>> df.optimized_logical_plan() +Aggregate: groupBy=[[my_table.a]], aggr=[[SUM(my_table.b)]] + Filter: my_table.a < Int64(3) + TableScan: my_table projection=[a, b] +``` + +We can see that there are two key differences compared to the unoptimized logical plan: + +- The `Projection` has been removed because it was redundant in this case (the output of the `Aggregate` plan + already had the columns in the correct order). +- The `TableScan` now has a projection pushed down so that it only reads the columns required to be able to execute + the query. In this case the table only has two columns and we referenced them both in the query, but this optimization + can be very effective in real-world queries against large tables. + +## Physical Plan + +Logical plans provide a representation of "what" the query should do. Physical plans explain "how" the query +should be executed. + +We can view the physical plan (also known as an execution plan) using the `execution_plan` method. + +```python +>>> df.execution_plan() +AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[SUM(my_table.b)] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "a", index: 0 }], 48), input_partitions=48 + AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[SUM(my_table.b)] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: a@0 < 3 + RepartitionExec: partitioning=RoundRobinBatch(48), input_partitions=1 + MemoryExec: partitions=1, partition_sizes=[1] +``` + +The `TableScan` has now been replaced by a more specific `MemoryExec` for scanning the in-memory data. If we were +querying a CSV file on disk then we would expect to see a `CsvExec` instead. + +This plan has additional operators that were not in the logical plan: + +- `RepartitionExec` has been added so that the data can be split into partitions and processed in parallel using + multiple cores. +- `CoalesceBatchesExec` will combine small batches into larger batches to ensure that processing remains efficient. + +The `Aggregate` operator now appears twice. This is because aggregates are performed in a two-step process. Data is +aggregated within each partition in parallel and then those results (which could contain duplicate grouping keys) are +combined and the aggregate operation is applied again. + +## Creating Query Plan Diagrams + +DataFusion supports generating query plan diagrams in [DOT format](https://graphviz.org). + +DOT is a language for describing graphs and there are open source tools such as GraphViz that can render diagrams +from DOT files. + +We can use the following code to generate a DOT file for a logical query plan. + +```python +>>> diagram = df.logical_plan().display_graphviz() +>>> with open('plan.dot', 'w') as f: +... f.write(diagram) +``` + +If we view the file, we will see the following content. 
+ +``` +// Begin DataFusion GraphViz Plan (see https://graphviz.org) +digraph { + subgraph cluster_1 + { + graph[label="LogicalPlan"] + 2[shape=box label="Projection: my_table.a, SUM(my_table.b)"] + 3[shape=box label="Aggregate: groupBy=[[my_table.a]], aggr=[[SUM(my_table.b)]]"] + 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back] + 4[shape=box label="Filter: my_table.a < Int64(3)"] + 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back] + 5[shape=box label="TableScan: my_table"] + 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back] + } + subgraph cluster_6 + { + graph[label="Detailed LogicalPlan"] + 7[shape=box label="Projection: my_table.a, SUM(my_table.b)\nSchema: [a:Int64;N, SUM(my_table.b):Int64;N]"] + 8[shape=box label="Aggregate: groupBy=[[my_table.a]], aggr=[[SUM(my_table.b)]]\nSchema: [a:Int64;N, SUM(my_table.b):Int64;N]"] + 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back] + 9[shape=box label="Filter: my_table.a < Int64(3)\nSchema: [a:Int64;N, b:Int64;N]"] + 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back] + 10[shape=box label="TableScan: my_table\nSchema: [a:Int64;N, b:Int64;N]"] + 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back] + } +} +// End DataFusion GraphViz Plan +``` + +We can use GraphViz from the command-line to convert this DOT file into an image. + +```bash +dot -Tsvg plan.dot > plan.svg +``` + +This generates the following diagram: + +![Query Plan Diagram](../images/plan.svg) diff --git a/docs/mdbook/src/usage/query-table.md b/docs/mdbook/src/usage/query-table.md new file mode 100644 index 00000000..5e4e3800 --- /dev/null +++ b/docs/mdbook/src/usage/query-table.md @@ -0,0 +1,125 @@ + +# DataFusion Query Table + +DataFusion tables can be queried with SQL or with the Python API. + +Let's create a small table and show the different types of queries that can be run. 
+ +```python +df = ctx.from_pydict( + { + "first_name": ["li", "wang", "ron", "amanda"], + "age": [25, 75, 68, 18], + "country": ["china", "china", "us", "us"], + }, + name="some_people", +) +``` + +Here's the data in the table: + +``` ++------------+-----+---------+ +| first_name | age | country | ++------------+-----+---------+ +| li | 25 | china | +| wang | 75 | china | +| ron | 68 | us | +| amanda | 18 | us | ++------------+-----+---------+ +``` + +## DataFusion Filter DataFrame + +Here's how to find all individuals in the data that are older than 65 with SQL: + +``` +ctx.sql("select * from some_people where age > 65") + ++------------+-----+---------+ +| first_name | age | country | ++------------+-----+---------+ +| wang | 75 | china | +| ron | 68 | us | ++------------+-----+---------+ +``` + +Here's how to run the same query with Python: + +```python +df.filter(col("age") > lit(65)) +``` + +``` ++------------+-----+---------+ +| first_name | age | country | ++------------+-----+---------+ +| wang | 75 | china | +| ron | 68 | us | ++------------+-----+---------+ +``` + +## DataFusion Select Columns from DataFrame + +Here's how to select the `first_name` and `country` columns from the DataFrame with SQL: + +``` +ctx.sql("select first_name, country from some_people") + + ++------------+---------+ +| first_name | country | ++------------+---------+ +| li | china | +| wang | china | +| ron | us | +| amanda | us | ++------------+---------+ +``` + +Here's how to run the same query with Python: + +```python +df.select(col("first_name"), col("country")) +``` + +``` ++------------+---------+ +| first_name | country | ++------------+---------+ +| li | china | +| wang | china | +| ron | us | +| amanda | us | ++------------+---------+ +``` + +## DataFusion Aggregation Query + +Here's how to run a group by aggregation query: + +``` +ctx.sql("select country, count(*) as num_people from some_people group by country") + ++---------+------------+ +| country | num_people | ++---------+------------+ +| china | 2 | +| us | 2 | ++---------+------------+ +``` diff --git a/examples/substrait.py b/examples/substrait.py index c167f7d9..c579751d 100644 --- a/examples/substrait.py +++ b/examples/substrait.py @@ -23,7 +23,7 @@ ctx = SessionContext() # Register table with context -ctx.register_parquet( +ctx.register_csv( "aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv" ) @@ -32,8 +32,13 @@ ) # type(substrait_plan) -> <class 'datafusion.substrait.plan'> +# Encode it to bytes +substrait_bytes = substrait_plan.encode() +# type(substrait_bytes) -> <class 'bytes'>, at this point the bytes can be distributed to file, network, etc safely +# where they could subsequently be deserialized on the receiving end. + +# Alternative serialization approaches -# type(substrait_bytes) -> , at this point the bytes can be distributed to file, network, etc safely +# type(substrait_bytes) -> <class 'bytes'>, at this point the bytes can be distributed to file, network, etc safely # where they could subsequently be deserialized on the receiving end. 
substrait_bytes = ss.substrait.serde.serialize_bytes( "SELECT * FROM aggregate_test_data", ctx diff --git a/src/common/data_type.rs b/src/common/data_type.rs index a7b79f49..d55a0e86 100644 --- a/src/common/data_type.rs +++ b/src/common/data_type.rs @@ -21,6 +21,17 @@ use pyo3::prelude::*; use crate::errors::py_datafusion_err; +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[pyclass(name = "RexType", module = "datafusion.common")] +pub enum RexType { + Alias, + Literal, + Call, + Reference, + ScalarSubquery, + Other, +} + /// These bindings are tying together several disparate systems. /// You have SQL types for the SQL strings and RDBMS systems itself. /// Rust types for the DataFusion code @@ -33,12 +44,12 @@ use crate::errors::py_datafusion_err; #[derive(Debug, Clone)] #[pyclass(name = "DataTypeMap", module = "datafusion.common", subclass)] pub struct DataTypeMap { - #[allow(dead_code)] - arrow_type: PyDataType, - #[allow(dead_code)] - python_type: PythonType, - #[allow(dead_code)] - sql_type: SqlType, + #[pyo3(get, set)] + pub arrow_type: PyDataType, + #[pyo3(get, set)] + pub python_type: PythonType, + #[pyo3(get, set)] + pub sql_type: SqlType, } impl DataTypeMap { @@ -421,7 +432,7 @@ impl DataTypeMap { #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] #[pyclass(name = "DataType", module = "datafusion.common")] pub struct PyDataType { - data_type: DataType, + pub data_type: DataType, } impl From<PyDataType> for DataType { diff --git a/src/context.rs b/src/context.rs index 3dc8a8fb..b603c010 100644 --- a/src/context.rs +++ b/src/context.rs @@ -55,7 +55,7 @@ use pyo3::types::PyTuple; use tokio::task::JoinHandle; /// Configuration options for a SessionContext -#[pyclass(name = "SessionConfig", module = "datafusion", subclass, unsendable)] +#[pyclass(name = "SessionConfig", module = "datafusion", subclass)] #[derive(Clone, Default)] pub(crate) struct PySessionConfig { pub(crate) config: SessionConfig, @@ -148,7 +148,7 @@ impl PySessionConfig { } /// Runtime options for a SessionContext -#[pyclass(name = "RuntimeConfig", module = "datafusion", subclass, unsendable)] +#[pyclass(name = "RuntimeConfig", module = "datafusion", subclass)] #[derive(Clone)] pub(crate) struct PyRuntimeConfig { pub(crate) config: RuntimeConfig, @@ -210,7 +210,7 @@ impl PyRuntimeConfig { /// `PySessionContext` is able to plan and execute DataFusion plans. /// It has a powerful optimizer, a physical planner for local execution, and a /// multi-threaded execution engine to perform the execution. 
-#[pyclass(name = "SessionContext", module = "datafusion", subclass, unsendable)] +#[pyclass(name = "SessionContext", module = "datafusion", subclass)] #[derive(Clone)] pub(crate) struct PySessionContext { pub(crate) ctx: SessionContext, @@ -224,7 +224,7 @@ impl PySessionContext { let config = if let Some(c) = config { c.config } else { - SessionConfig::default() + SessionConfig::default().with_information_schema(true) }; let runtime_config = if let Some(c) = runtime { c.config @@ -712,14 +712,7 @@ impl PySessionContext { part: usize, py: Python, ) -> PyResult { - let ctx = TaskContext::new( - None, - "session_id".to_string(), - SessionConfig::new(), - HashMap::new(), - HashMap::new(), - Arc::new(RuntimeEnv::default()), - ); + let ctx: TaskContext = TaskContext::from(&self.ctx.state()); // create a Tokio runtime to run the async code let rt = &get_tokio_runtime(py).0; let plan = plan.plan.clone(); diff --git a/src/errors.rs b/src/errors.rs index ce6b3c28..d12b6ade 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -21,6 +21,7 @@ use std::fmt::Debug; use datafusion::arrow::error::ArrowError; use datafusion::error::DataFusionError as InnerDataFusionError; +use prost::EncodeError; use pyo3::{exceptions::PyException, PyErr}; pub type Result = std::result::Result; @@ -31,6 +32,7 @@ pub enum DataFusionError { ArrowError(ArrowError), Common(String), PythonError(PyErr), + EncodeError(EncodeError), } impl fmt::Display for DataFusionError { @@ -40,6 +42,7 @@ impl fmt::Display for DataFusionError { DataFusionError::ArrowError(e) => write!(f, "Arrow error: {e:?}"), DataFusionError::PythonError(e) => write!(f, "Python error {e:?}"), DataFusionError::Common(e) => write!(f, "{e}"), + DataFusionError::EncodeError(e) => write!(f, "Failed to encode substrait plan: {e}"), } } } diff --git a/src/expr.rs b/src/expr.rs index 7c80d0d8..4ada4c16 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -22,6 +22,7 @@ use datafusion::arrow::datatypes::DataType; use datafusion::arrow::pyarrow::PyArrowType; use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField}; +use crate::common::data_type::RexType; use crate::errors::py_runtime_err; use crate::expr::aggregate_expr::PyAggregateFunction; use crate::expr::binary_expr::PyBinaryExpr; @@ -83,7 +84,7 @@ pub mod union; #[pyclass(name = "Expr", module = "datafusion.expr", subclass)] #[derive(Debug, Clone)] pub struct PyExpr { - pub(crate) expr: Expr, + pub expr: Expr, } impl From for Expr { @@ -228,6 +229,51 @@ impl PyExpr { let expr = Expr::Cast(Cast::new(Box::new(self.expr.clone()), to.0)); expr.into() } + + /// A Rex (Row Expression) specifies a single row of data. That specification + /// could include user defined functions or types. RexType identifies the row + /// as one of the possible valid `RexTypes`. + pub fn rex_type(&self) -> PyResult { + Ok(match self.expr { + Expr::Alias(..) => RexType::Alias, + Expr::Column(..) | Expr::QualifiedWildcard { .. } | Expr::GetIndexedField { .. } => { + RexType::Reference + } + Expr::ScalarVariable(..) | Expr::Literal(..) => RexType::Literal, + Expr::BinaryExpr { .. } + | Expr::Not(..) + | Expr::IsNotNull(..) + | Expr::Negative(..) + | Expr::IsNull(..) + | Expr::Like { .. } + | Expr::ILike { .. } + | Expr::SimilarTo { .. } + | Expr::Between { .. } + | Expr::Case { .. } + | Expr::Cast { .. } + | Expr::TryCast { .. } + | Expr::Sort { .. } + | Expr::ScalarFunction { .. } + | Expr::AggregateFunction { .. } + | Expr::WindowFunction { .. } + | Expr::AggregateUDF { .. } + | Expr::InList { .. } + | Expr::Wildcard + | Expr::ScalarUDF { .. 
+            | Expr::Exists { .. }
+            | Expr::InSubquery { .. }
+            | Expr::GroupingSet(..)
+            | Expr::IsTrue(..)
+            | Expr::IsFalse(..)
+            | Expr::IsUnknown(_)
+            | Expr::IsNotTrue(..)
+            | Expr::IsNotFalse(..)
+            | Expr::Placeholder { .. }
+            | Expr::OuterReferenceColumn(_, _)
+            | Expr::IsNotUnknown(_) => RexType::Call,
+            Expr::ScalarSubquery(..) => RexType::ScalarSubquery,
+        })
+    }
 }
 
 /// Initializes the `expr` module to match the pattern of `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/
diff --git a/src/lib.rs b/src/lib.rs
index 0bb4d9a1..2512aefa 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -40,7 +40,7 @@ mod dataset;
 mod dataset_exec;
 pub mod errors;
 #[allow(clippy::borrow_deref_ref)]
-mod expr;
+pub mod expr;
 #[allow(clippy::borrow_deref_ref)]
 mod functions;
 pub mod physical_plan;
diff --git a/src/physical_plan.rs b/src/physical_plan.rs
index 340d527f..6f02cefa 100644
--- a/src/physical_plan.rs
+++ b/src/physical_plan.rs
@@ -53,6 +53,15 @@ impl PyExecutionPlan {
         let d = displayable(self.plan.as_ref());
         format!("{}", d.indent())
     }
+
+    fn __repr__(&self) -> String {
+        self.display_indent()
+    }
+
+    #[getter]
+    pub fn partition_count(&self) -> usize {
+        self.plan.output_partitioning().partition_count()
+    }
 }
 
 impl From<PyExecutionPlan> for Arc<dyn ExecutionPlan> {
diff --git a/src/sql/logical.rs b/src/sql/logical.rs
index a22f269f..a75315d3 100644
--- a/src/sql/logical.rs
+++ b/src/sql/logical.rs
@@ -100,7 +100,7 @@ impl PyLogicalPlan {
     }
 
     fn display_graphviz(&self) -> String {
-        format!("{}", self.plan.display_indent_schema())
+        format!("{}", self.plan.display_graphviz())
     }
 }
diff --git a/src/store.rs b/src/store.rs
index 7d9bb751..542cfa92 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -32,12 +32,7 @@ pub enum StorageContexts {
     LocalFileSystem(PyLocalFileSystemContext),
 }
 
-#[pyclass(
-    name = "LocalFileSystem",
-    module = "datafusion.store",
-    subclass,
-    unsendable
-)]
+#[pyclass(name = "LocalFileSystem", module = "datafusion.store", subclass)]
 #[derive(Debug, Clone)]
 pub struct PyLocalFileSystemContext {
     pub inner: Arc<LocalFileSystem>,
@@ -63,12 +58,7 @@ impl PyLocalFileSystemContext {
     }
 }
 
-#[pyclass(
-    name = "MicrosoftAzure",
-    module = "datafusion.store",
-    subclass,
-    unsendable
-)]
+#[pyclass(name = "MicrosoftAzure", module = "datafusion.store", subclass)]
 #[derive(Debug, Clone)]
 pub struct PyMicrosoftAzureContext {
     pub inner: Arc<MicrosoftAzure>,
@@ -140,12 +130,7 @@ impl PyMicrosoftAzureContext {
     }
 }
 
-#[pyclass(
-    name = "GoogleCloud",
-    module = "datafusion.store",
-    subclass,
-    unsendable
-)]
+#[pyclass(name = "GoogleCloud", module = "datafusion.store", subclass)]
 #[derive(Debug, Clone)]
 pub struct PyGoogleCloudContext {
     pub inner: Arc<GoogleCloudStorage>,
@@ -175,7 +160,7 @@ impl PyGoogleCloudContext {
     }
 }
 
-#[pyclass(name = "AmazonS3", module = "datafusion.store", subclass, unsendable)]
+#[pyclass(name = "AmazonS3", module = "datafusion.store", subclass)]
 #[derive(Debug, Clone)]
 pub struct PyAmazonS3Context {
     pub inner: Arc<AmazonS3>,
diff --git a/src/substrait.rs b/src/substrait.rs
index 2bde0112..5d2e7a48 100644
--- a/src/substrait.rs
+++ b/src/substrait.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
-use pyo3::prelude::*;
+use pyo3::{prelude::*, types::PyBytes};
 
 use crate::context::PySessionContext;
 use crate::errors::{py_datafusion_err, DataFusionError};
@@ -25,13 +25,25 @@ use crate::utils::wait_for_future;
 use datafusion_substrait::logical_plan::{consumer, producer};
 use datafusion_substrait::serializer;
 use datafusion_substrait::substrait::proto::Plan;
+use prost::Message;
 
-#[pyclass(name = "plan", module = "datafusion.substrait", subclass, unsendable)]
+#[pyclass(name = "plan", module = "datafusion.substrait", subclass)]
 #[derive(Debug, Clone)]
 pub(crate) struct PyPlan {
     pub(crate) plan: Plan,
 }
 
+#[pymethods]
+impl PyPlan {
+    fn encode(&self, py: Python) -> PyResult<PyObject> {
+        let mut proto_bytes = Vec::<u8>::new();
+        self.plan
+            .encode(&mut proto_bytes)
+            .map_err(|e| DataFusionError::EncodeError(e))?;
+        Ok(PyBytes::new(py, &proto_bytes).into())
+    }
+}
+
 impl From<PyPlan> for Plan {
     fn from(plan: PyPlan) -> Plan {
         plan.plan
@@ -47,7 +59,7 @@ impl From<Plan> for PyPlan {
 /// A PySubstraitSerializer is a representation of a Serializer that is capable of both serializing
 /// a `LogicalPlan` instance to Substrait Protobuf bytes and also deserialize Substrait Protobuf bytes
 /// to a valid `LogicalPlan` instance.
-#[pyclass(name = "serde", module = "datafusion.substrait", subclass, unsendable)]
+#[pyclass(name = "serde", module = "datafusion.substrait", subclass)]
 #[derive(Debug, Clone)]
 pub(crate) struct PySubstraitSerializer;
 
@@ -63,16 +75,19 @@ impl PySubstraitSerializer {
     #[staticmethod]
     pub fn serialize_to_plan(sql: &str, ctx: PySessionContext, py: Python) -> PyResult<PyPlan> {
         match PySubstraitSerializer::serialize_bytes(sql, ctx, py) {
-            Ok(proto_bytes) => PySubstraitSerializer::deserialize_bytes(proto_bytes, py),
+            Ok(proto_bytes) => {
+                let proto_bytes: &PyBytes = proto_bytes.as_ref(py).downcast().unwrap();
+                PySubstraitSerializer::deserialize_bytes(proto_bytes.as_bytes().to_vec(), py)
+            }
             Err(e) => Err(py_datafusion_err(e)),
         }
     }
 
     #[staticmethod]
-    pub fn serialize_bytes(sql: &str, ctx: PySessionContext, py: Python) -> PyResult<Vec<u8>> {
+    pub fn serialize_bytes(sql: &str, ctx: PySessionContext, py: Python) -> PyResult<PyObject> {
         let proto_bytes: Vec<u8> = wait_for_future(py, serializer::serialize_bytes(sql, &ctx.ctx))
             .map_err(DataFusionError::from)?;
-        Ok(proto_bytes)
+        Ok(PyBytes::new(py, &proto_bytes).into())
     }
 
     #[staticmethod]
@@ -90,12 +105,7 @@ impl PySubstraitSerializer {
     }
 }
 
-#[pyclass(
-    name = "producer",
-    module = "datafusion.substrait",
-    subclass,
-    unsendable
-)]
+#[pyclass(name = "producer", module = "datafusion.substrait", subclass)]
 #[derive(Debug, Clone)]
 pub(crate) struct PySubstraitProducer;
 
@@ -111,12 +121,7 @@ impl PySubstraitProducer {
     }
 }
 
-#[pyclass(
-    name = "consumer",
-    module = "datafusion.substrait",
-    subclass,
-    unsendable
-)]
+#[pyclass(name = "consumer", module = "datafusion.substrait", subclass)]
 #[derive(Debug, Clone)]
 pub(crate) struct PySubstraitConsumer;
 
@@ -136,6 +141,7 @@ impl PySubstraitConsumer {
 }
 
 pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
+    m.add_class::<PyPlan>()?;
     m.add_class::<PySubstraitConsumer>()?;
     m.add_class::<PySubstraitProducer>()?;
     m.add_class::<PySubstraitSerializer>()?;
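
Taken together, the changes above alter the Python-facing API: `serialize_bytes` now returns Python `bytes`, Substrait plan objects gain `encode()`, `ExecutionPlan` gains a `partition_count` property and a useful `__repr__`, and `information_schema` is enabled by default. The sketch below is a rough, untested illustration of that surface under stated assumptions: the CSV path and table name are placeholders, and `register_csv`, `execution_plan()`, and `show()` come from the existing bindings rather than this diff.

```python
from datafusion import SessionContext
from datafusion import substrait as ss

ctx = SessionContext()  # information_schema is now on by default
# hypothetical data file; any registered table works here
ctx.register_csv("aggregate_test_data", "./aggregate_test_100.csv")

# serialize_bytes now hands back Python `bytes` instead of a Vec<u8>
substrait_bytes = ss.substrait.serde.serialize_bytes(
    "SELECT * FROM aggregate_test_data", ctx
)
assert isinstance(substrait_bytes, bytes)

# round-trip the bytes into a Substrait plan; encode() exposes the
# plan's protobuf payload directly
substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)
plan_bytes = substrait_plan.encode()

# partition_count is a property on ExecutionPlan, and printing the
# plan uses the new __repr__ (indented physical plan)
physical = ctx.sql("SELECT * FROM aggregate_test_data").execution_plan()
print(physical.partition_count)
print(physical)

# works because the default SessionConfig enables the information schema
ctx.sql("SHOW TABLES").show()
```

Returning `PyBytes` rather than `Vec<u8>` avoids materializing the payload as a Python list of ints and gives a natural `bytes` round-trip with `encode()` and `deserialize_bytes`.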