diff --git a/.github/workflows/client_api_check.yml b/.github/workflows/client_api_check.yml index 0e82244a8..8ba9f4fca 100644 --- a/.github/workflows/client_api_check.yml +++ b/.github/workflows/client_api_check.yml @@ -29,10 +29,6 @@ jobs: working-directory: ./libs/client-api run: cargo build --features "enable_brotli" - - name: Build ClientAPI WASM - working-directory: ./libs/client-api - run: wasm-pack build - - name: Check ClientAPI Dependencies working-directory: ./libs/client-api run: bash ../../script/client_api_deps_check.sh diff --git a/Cargo.lock b/Cargo.lock index 4e81bcd06..26a23c590 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,7 +76,7 @@ dependencies = [ "actix-service", "actix-tls", "actix-utils", - "ahash 0.8.7", + "ahash 0.8.11", "base64 0.21.7", "bitflags 2.4.2", "brotli", @@ -247,7 +247,7 @@ dependencies = [ "actix-tls", "actix-utils", "actix-web-codegen", - "ahash 0.8.7", + "ahash 0.8.11", "bytes", "bytestring", "cfg-if 1.0.0", @@ -421,9 +421,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.7" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if 1.0.0", "const-random", @@ -549,7 +549,6 @@ dependencies = [ "bincode", "getrandom 0.2.12", "reqwest 0.11.27", - "rust-s3", "serde", "serde_json", "serde_repr", @@ -605,6 +604,8 @@ dependencies = [ "async-stream", "async-trait", "authentication", + "aws-config", + "aws-sdk-s3", "brotli", "bytes", "chrono", @@ -639,6 +640,7 @@ dependencies = [ "once_cell", "opener", "openssl", + "percent-encoding", "pgvector", "pin-project", "prometheus-client", @@ -647,7 +649,6 @@ dependencies = [ "rcgen", "redis 0.25.2", "reqwest 0.11.27", - "rust-s3", "scraper", "secrecy", "semver", @@ -972,22 +973,6 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41e67cd8309bbd06cd603a9e693a784ac2e5d1e955f11286e355089fcab3047c" -[[package]] -name = "attohttpc" -version = "0.26.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f77d243921b0979fbbd728dd2d5162e68ac8252976797c24eb5b3a6af9090dc" -dependencies = [ - "http 0.2.11", - "log", - "native-tls", - "rustls 0.21.12", - "serde", - "serde_json", - "url", - "webpki-roots 0.25.3", -] - [[package]] name = "authentication" version = "0.1.0" @@ -1012,29 +997,378 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] -name = "aws-creds" -version = "0.36.0" +name = "aws-config" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "390ad3b77f3e21e01a4a0355865853b681daf1988510b0b15e31c0c4ae7eb0f6" +checksum = "2ac9889352d632214df943e26740c46a0f3da6e329fbd28164fe7ae1b061da7b" dependencies = [ - "attohttpc", - "home", - "log", - "quick-xml", - "rust-ini", - "serde", - "thiserror", + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 0.2.11", + "hyper 0.14.28", + "ring 0.17.7", "time", + "tokio", + "tracing", "url", + "zeroize", ] [[package]] -name = "aws-region" -version = "0.25.4" +name = "aws-credential-types" +version = "1.2.0" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42fed2b9fca70f2908268d057a607f2a906f47edbf856ea8587de9038d264e22" +checksum = "e16838e6c9e12125face1c1eff1343c75e3ff540de98ff7ebd61874a89bcfeb9" dependencies = [ - "thiserror", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-runtime" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36978815abdd7297662bf906adff132941a02ecf425bc78fac7d90653ce87560" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.11", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-s3" +version = "1.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99e06a6cd8592e486f29c8af427c4083286cee4ea0e4ae46a164a24d07ee19d5" +dependencies = [ + "ahash 0.8.11", + "aws-credential-types", + "aws-runtime", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "fastrand", + "hex", + "hmac", + "http 0.2.11", + "http-body 0.4.6", + "lru", + "once_cell", + "percent-encoding", + "regex-lite", + "sha2", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7833dd5b061741825b8531360789bbd74fc365674601d3e9a79914310be320f9" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.11", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c531346d4f36874b74ea82978a03011ab413b007b841029a8c30a48f18cc3f37" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.11", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca214135f34b4841050f6466d4a56743e02aa63169f1b5e77161043f20653400" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.11", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31eed8d45759b2c5fe7fd304dd70739060e9e0de509209036eabea14d0720cce" +dependencies = [ + "aws-credential-types", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "crypto-bigint 0.5.5", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.11", + "http 1.0.0", + "once_cell", + 
"p256", + "percent-encoding", + "ring 0.17.7", + "sha2", + "subtle", + "time", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.60.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5b30ea96823b8b25fb6471643a516e1bd475fd5575304e6240aea179f213216" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "crc32c", + "crc32fast", + "hex", + "http 0.2.11", + "http-body 0.4.6", + "md-5", + "pin-project-lite", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.60.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.60.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a7de001a1b9a25601016d8057ea16e31a45fdca3751304c8edf4ad72e706c08" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.11", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db83b08939838d18e33b5dbaf1a0f048f28c10bd28071ab7ce6f245451855414" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand", + "h2 0.3.26", + "http 0.2.11", + "http-body 0.4.6", + "http-body 1.0.0", + "httparse", + "hyper 0.14.28", + "hyper-rustls 0.24.2", + "once_cell", + "pin-project-lite", + "pin-utils", + "rustls 0.21.12", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b570ea39eb95bd32543f6e4032bce172cb6209b9bc8c83c770d08169e875afc" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.11", + "http 1.0.0", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe321a6b21f5d8eabd0ade9c55d3d0335f3c3157fc2b3e87f05f34b539e4df5" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.11", + "http 1.0.0", + "http-body 0.4.6", + "http-body 1.0.0", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + 
+[[package]] +name = "aws-smithy-xml" +version = "0.60.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d123fbc2a4adc3c301652ba8e149bf4bc1d1725affb9784eb20c953ace06bf55" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f734808d43702a67e57d478a12e227d4d038d0b90c9005a78c87890d3805922" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "http 0.2.11", + "rustc_version", + "tracing", ] [[package]] @@ -1174,6 +1508,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base16ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" + [[package]] name = "base64" version = "0.13.1" @@ -1198,6 +1538,16 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -1393,6 +1743,16 @@ dependencies = [ "serde", ] +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "bytestring" version = "1.3.1" @@ -1602,17 +1962,16 @@ dependencies = [ "brotli", "bytes", "chrono", + "client-api-entity", "client-websocket", "collab", - "collab-entity", "collab-rt-entity", "collab-rt-protocol", - "database-entity", "futures-core", "futures-util", "getrandom 0.2.12", "gotrue", - "gotrue-entity", + "infra", "mime", "parking_lot 0.12.1", "prost", @@ -1637,6 +1996,18 @@ dependencies = [ "yrs", ] +[[package]] +name = "client-api-entity" +version = "0.1.0" +dependencies = [ + "collab-entity", + "collab-rt-entity", + "database-entity", + "gotrue-entity", + "infra", + "shared-entity", +] + [[package]] name = "client-api-test" version = "0.1.0" @@ -2021,6 +2392,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32c" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.3.2" @@ -2115,6 +2495,28 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "crypto-bigint" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" +dependencies = [ + "generic-array", + "rand_core 0.6.4", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto-bigint" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" +dependencies = [ + "rand_core 
0.6.4", + "subtle", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -2246,6 +2648,7 @@ dependencies = [ "anyhow", "app-error", "async-trait", + "aws-sdk-s3", "base64 0.21.7", "bincode", "bytes", @@ -2256,7 +2659,6 @@ dependencies = [ "futures-util", "pgvector", "redis 0.25.2", - "rust-s3", "rust_decimal", "serde", "serde_json", @@ -2297,6 +2699,16 @@ dependencies = [ "byteorder", ] +[[package]] +name = "der" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" +dependencies = [ + "const-oid", + "zeroize", +] + [[package]] name = "der" version = "0.7.8" @@ -2368,15 +2780,6 @@ dependencies = [ "syn 2.0.48", ] -[[package]] -name = "dlv-list" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" -dependencies = [ - "const-random", -] - [[package]] name = "dotenvy" version = "0.15.7" @@ -2398,6 +2801,18 @@ dependencies = [ "dtoa", ] +[[package]] +name = "ecdsa" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" +dependencies = [ + "der 0.6.1", + "elliptic-curve", + "rfc6979", + "signature 1.6.4", +] + [[package]] name = "ego-tree" version = "0.6.2" @@ -2413,6 +2828,26 @@ dependencies = [ "serde", ] +[[package]] +name = "elliptic-curve" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" +dependencies = [ + "base16ct", + "crypto-bigint 0.4.9", + "der 0.6.1", + "digest", + "ff", + "generic-array", + "group", + "pkcs8 0.9.0", + "rand_core 0.6.4", + "sec1", + "subtle", + "zeroize", +] + [[package]] name = "email-encoding" version = "0.3.0" @@ -2540,6 +2975,16 @@ dependencies = [ "getrandom 0.2.12", ] +[[package]] +name = "ff" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "fiat-crypto" version = "0.2.5" @@ -2887,6 +3332,17 @@ dependencies = [ "spinning_top", ] +[[package]] +name = "group" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" +dependencies = [ + "ff", + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "h2" version = "0.3.26" @@ -2967,19 +3423,13 @@ dependencies = [ "ahash 0.7.7", ] -[[package]] -name = "hashbrown" -version = "0.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" - [[package]] name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" dependencies = [ - "ahash 0.8.7", + "ahash 0.8.11", "allocator-api2", ] @@ -3219,7 +3669,9 @@ dependencies = [ "futures-util", "http 0.2.11", "hyper 0.14.28", + "log", "rustls 0.21.12", + "rustls-native-certs", "tokio", "tokio-rustls 0.24.1", ] @@ -3419,9 +3871,11 @@ name = "infra" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "reqwest 0.11.27", "serde", "serde_json", + "tokio", "tracing", ] @@ -3660,6 +4114,15 @@ version = "0.4.21" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lru" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "mac" version = "0.1.1" @@ -3695,17 +4158,6 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" -[[package]] -name = "maybe-async" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f1b8c13cb1f814b634a96b2c725449fe7ed464a7b8781de8688be5ffbd3f305" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "md-5" version = "0.10.6" @@ -3765,15 +4217,6 @@ dependencies = [ "triomphe", ] -[[package]] -name = "minidom" -version = "0.15.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f45614075738ce1b77a1768912a60c0227525971b03e09122a05b8a34a2a6278" -dependencies = [ - "rxml", -] - [[package]] name = "minimal-lexical" version = "0.2.1" @@ -4082,14 +4525,10 @@ dependencies = [ ] [[package]] -name = "ordered-multimap" -version = "0.6.0" +name = "outref" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e" -dependencies = [ - "dlv-list", - "hashbrown 0.13.2", -] +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" [[package]] name = "overload" @@ -4097,6 +4536,17 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "p256" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" +dependencies = [ + "ecdsa", + "elliptic-curve", + "sha2", +] + [[package]] name = "parking_lot" version = "0.11.2" @@ -4368,9 +4818,19 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" dependencies = [ - "der", - "pkcs8", - "spki", + "der 0.7.8", + "pkcs8 0.10.2", + "spki 0.7.3", +] + +[[package]] +name = "pkcs8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" +dependencies = [ + "der 0.6.1", + "spki 0.6.0", ] [[package]] @@ -4379,8 +4839,8 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ - "der", - "spki", + "der 0.7.8", + "spki 0.7.3", ] [[package]] @@ -4721,16 +5181,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "quick-xml" -version = "0.30.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "quote" version = "1.0.35" @@ -4965,6 +5415,12 @@ dependencies = [ "regex-syntax 0.8.2", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -5085,13 +5541,24 @@ dependencies = [ "winreg 0.52.0", ] +[[package]] +name = "rfc6979" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb" +dependencies = [ + "crypto-bigint 0.4.9", + "hmac", + "zeroize", +] + [[package]] name = "rhai" version = "1.16.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3625f343d89990133d013e39c46e350915178cf94f1bec9f49b0cbef98a3e3c" dependencies = [ - "ahash 0.8.7", + "ahash 0.8.11", "bitflags 2.4.2", "instant", "num-traits", @@ -5193,61 +5660,14 @@ dependencies = [ "num-integer", "num-traits", "pkcs1", - "pkcs8", + "pkcs8 0.10.2", "rand_core 0.6.4", - "signature", - "spki", + "signature 2.2.0", + "spki 0.7.3", "subtle", "zeroize", ] -[[package]] -name = "rust-ini" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091" -dependencies = [ - "cfg-if 1.0.0", - "ordered-multimap", -] - -[[package]] -name = "rust-s3" -version = "0.34.0-rc4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0533896b025761b23147ca1a168c436e1b87d14e460b1f19a6442882d2a3e07f" -dependencies = [ - "async-trait", - "aws-creds", - "aws-region", - "base64 0.21.7", - "bytes", - "cfg-if 1.0.0", - "futures", - "hex", - "hmac", - "http 0.2.11", - "hyper 0.14.28", - "hyper-tls 0.5.0", - "log", - "maybe-async", - "md5", - "minidom", - "native-tls", - "percent-encoding", - "quick-xml", - "serde", - "serde_derive", - "serde_json", - "sha2", - "thiserror", - "time", - "tokio", - "tokio-native-tls", - "tokio-stream", - "url", -] - [[package]] name = "rust_decimal" version = "1.33.1" @@ -5391,23 +5811,6 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" -[[package]] -name = "rxml" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a98f186c7a2f3abbffb802984b7f1dfd65dac8be1aafdaabbca4137f53f0dff7" -dependencies = [ - "bytes", - "rxml_validation", - "smartstring", -] - -[[package]] -name = "rxml_validation" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22a197350ece202f19a166d1ad6d9d6de145e1d2a8ef47db299abe164dbd7530" - [[package]] name = "ryu" version = "1.0.16" @@ -5456,7 +5859,7 @@ version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c95a930e03325234c18c7071fd2b60118307e025d6fff3e12745ffbf63a3d29c" dependencies = [ - "ahash 0.8.7", + "ahash 0.8.11", "cssparser", "ego-tree", "getopts", @@ -5483,6 +5886,20 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "sec1" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" +dependencies = [ + "base16ct", + "der 0.6.1", + "generic-array", + "pkcs8 0.9.0", + "subtle", + "zeroize", +] + [[package]] name = "secrecy" version = "0.8.0" @@ -5748,7 +6165,6 @@ dependencies = [ "log", "pin-project", "reqwest 0.11.27", - "rust-s3", "serde", "serde_json", "serde_repr", @@ -5766,6 
+6182,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "1.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +dependencies = [ + "digest", + "rand_core 0.6.4", +] + [[package]] name = "signature" version = "2.2.0" @@ -5892,6 +6318,16 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spki" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" +dependencies = [ + "base64ct", + "der 0.6.1", +] + [[package]] name = "spki" version = "0.7.3" @@ -5899,7 +6335,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" dependencies = [ "base64ct", - "der", + "der 0.7.8", ] [[package]] @@ -5932,7 +6368,7 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24ba59a9342a3d9bab6c56c118be528b27c9b60e490080e9711a04dccac83ef6" dependencies = [ - "ahash 0.8.7", + "ahash 0.8.11", "atoi", "byteorder", "bytes", @@ -6700,7 +7136,7 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5c266b9ac83dedf0e0385ad78514949e6d89491269e7065bee51d2bb8ec7373" dependencies = [ - "ahash 0.8.7", + "ahash 0.8.11", "gethostname", "log", "serde", @@ -7031,6 +7467,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "walkdir" version = "2.4.0" @@ -7548,6 +7990,12 @@ dependencies = [ "time", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "xtask" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 8cd4b2d00..8bf2c294d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ tokio = { workspace = true, features = [ "sync", "fs", "time", + "full", ] } tokio-stream.workspace = true tokio-util = { version = "0.7.10", features = ["io"] } @@ -47,7 +48,8 @@ validator = "0.16.1" bytes = "1.5.0" rcgen = { version = "0.10.0", features = ["pem", "x509-parser"] } mime = "0.3.17" -rust-s3 = { version = "0.34.0-rc4", default-features = false, features = ["tokio-rustls-tls", "with-tokio", "no-verify-ssl"] } +aws-sdk-s3 = { version = "1.36.0", features = ["behavior-version-latest", "rt-tokio"] } +aws-config = { version = "1.5.1", features = ["behavior-version-latest"] } redis = { workspace = true, features = ["json", "tokio-comp", "connection-manager"] } tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", features = ["registry", "env-filter", "ansi", "json", "tracing-log"] } @@ -70,6 +72,7 @@ tonic.workspace = true prost.workspace = true tonic-proto.workspace = true appflowy-collaborate = { path = "services/appflowy-collaborate" } +percent-encoding = "2.3.1" # ai appflowy-ai-client = { workspace = true, features = ["dto", "client-api"] } @@ -152,6 +155,7 @@ members = [ "libs/wasm-test", "libs/client-api-wasm", "libs/appflowy-ai-client", + "libs/client-api-entity", # services 
"services/appflowy-history", "services/appflowy-indexer", @@ -206,6 +210,7 @@ prost = "0.12" tonic-proto = { path = "libs/tonic-proto" } appflowy-ai-client = { path = "libs/appflowy-ai-client", default-features = false } pgvector = { version = "0.3", features = ["sqlx"] } +client-api-entity = { path = "libs/client-api-entity" } # collaboration yrs = "0.18.7" diff --git a/libs/app-error/Cargo.toml b/libs/app-error/Cargo.toml index eb57d27e7..008190009 100644 --- a/libs/app-error/Cargo.toml +++ b/libs/app-error/Cargo.toml @@ -15,7 +15,6 @@ anyhow = "1.0.79" uuid = { version = "1.6.1", features = ["v4"] } sqlx = { version = "0.7", default-features = false, features = ["postgres", "json"], optional = true } validator = { version = "0.16", optional = true } -rust-s3 = { version = "0.34.0-rc4", optional = true } url = { version = "2.5.0" } actix-web = { version = "4.4.1", optional = true } reqwest.workspace = true @@ -28,7 +27,6 @@ appflowy-ai-client = { workspace = true, optional = true, default-features = fal default = [] sqlx_error = ["sqlx"] validation_error = ["validator"] -s3_error = ["rust-s3"] actix_web_error = ["actix-web"] tokio_error = ["tokio"] gotrue_error = [] diff --git a/libs/app-error/src/lib.rs b/libs/app-error/src/lib.rs index ab5142544..08e259492 100644 --- a/libs/app-error/src/lib.rs +++ b/libs/app-error/src/lib.rs @@ -65,10 +65,6 @@ pub enum AppError { #[error("{user}: do not have permissions to {action}")] NotEnoughPermissions { user: String, action: String }, - #[cfg(feature = "s3_error")] - #[error(transparent)] - S3Error(#[from] s3::error::S3Error), - #[error("s3 response error:{0}")] S3ResponseError(String), @@ -163,8 +159,6 @@ impl AppError { AppError::InvalidRequest(_) => ErrorCode::InvalidRequest, AppError::NotLoggedIn(_) => ErrorCode::NotLoggedIn, AppError::NotEnoughPermissions { .. 
} => ErrorCode::NotEnoughPermissions, - #[cfg(feature = "s3_error")] - AppError::S3Error(_) => ErrorCode::S3Error, AppError::StorageSpaceNotEnough => ErrorCode::StorageSpaceNotEnough, AppError::PayloadTooLarge(_) => ErrorCode::PayloadTooLarge, AppError::Internal(_) => ErrorCode::Internal, @@ -280,8 +274,6 @@ pub enum ErrorCode { InvalidOAuthProvider = 1009, NotLoggedIn = 1011, NotEnoughPermissions = 1012, - #[cfg(feature = "s3_error")] - S3Error = 1014, StorageSpaceNotEnough = 1015, PayloadTooLarge = 1016, Internal = 1017, diff --git a/libs/client-api-entity/Cargo.toml b/libs/client-api-entity/Cargo.toml new file mode 100644 index 000000000..c4c501736 --- /dev/null +++ b/libs/client-api-entity/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "client-api-entity" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +collab-entity = { workspace = true } +gotrue-entity = { workspace = true } +shared-entity = { workspace = true } +collab-rt-entity = { workspace = true } +database-entity.workspace = true + +infra = { workspace = true, optional = true } + +[features] +file_util = ["infra/file_util"] \ No newline at end of file diff --git a/libs/client-api-entity/src/lib.rs b/libs/client-api-entity/src/lib.rs new file mode 100644 index 000000000..4d2fd53cf --- /dev/null +++ b/libs/client-api-entity/src/lib.rs @@ -0,0 +1,9 @@ +pub use collab_entity::*; +pub use collab_rt_entity::user::*; +pub use database_entity::dto::*; +pub use database_entity::file_dto::*; +pub use gotrue_entity::dto::*; +pub use shared_entity::dto::*; + +#[cfg(feature = "file_util")] +pub use infra::file_util; diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index aca618068..10e53c56d 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -39,16 +39,15 @@ serde.workspace = true app-error = { workspace = true, features = ["tokio_error", "bincode_error"] } scraper = { version = "0.17.1", optional = true } -collab-entity = { workspace = true } -gotrue-entity = { workspace = true } shared-entity = { workspace = true } collab-rt-entity = { workspace = true } -database-entity.workspace = true +client-api-entity.workspace = true serde_urlencoded = "0.7.1" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio-retry = "0.3" tokio-util = "0.7" +infra = { workspace = true, features = ["file_util"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] workspace = true diff --git a/libs/client-api/src/collab_sync/collab_stream.rs b/libs/client-api/src/collab_sync/collab_stream.rs index c482a5a0f..4d0b1e2ff 100644 --- a/libs/client-api/src/collab_sync/collab_stream.rs +++ b/libs/client-api/src/collab_sync/collab_stream.rs @@ -3,9 +3,9 @@ use crate::collab_sync::{ start_sync, CollabSink, MissUpdateReason, SyncError, SyncObject, SyncReason, }; +use client_api_entity::{validate_data_for_folder, CollabType}; use collab::core::collab::MutexCollab; use collab::core::origin::CollabOrigin; -use collab_entity::{validate_data_for_folder, CollabType}; use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, ServerInit, UpdateSync}; use collab_rt_protocol::{ handle_message_follow_protocol, ClientSyncProtocol, Message, MessageReader, SyncMessage, diff --git a/libs/client-api/src/collab_sync/plugin.rs b/libs/client-api/src/collab_sync/plugin.rs index 3cecd90f8..85c8f95d0 100644 --- a/libs/client-api/src/collab_sync/plugin.rs +++ b/libs/client-api/src/collab_sync/plugin.rs @@ -6,10 
+6,10 @@ use anyhow::anyhow; use collab::core::awareness::{AwarenessUpdate, Event}; use collab::core::collab::MutexCollab; +use client_api_entity::{CollabObject, CollabType}; use collab::core::collab_state::SyncState; use collab::core::origin::CollabOrigin; use collab::preclude::{Collab, CollabPlugin}; -use collab_entity::{CollabObject, CollabType}; use collab_rt_entity::{ClientCollabMessage, ServerCollabMessage, UpdateSync}; use collab_rt_protocol::{Message, SyncMessage}; diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index eb8787f19..1d1303ec4 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -1,11 +1,11 @@ use crate::notify::{ClientToken, TokenStateReceiver}; use app_error::AppError; -use collab_entity::CollabType; +use client_api_entity::AuthProvider; +use client_api_entity::CollabType; use gotrue::grant::PasswordGrant; use gotrue::grant::{Grant, RefreshTokenGrant}; use gotrue::params::MagicLinkParams; use gotrue::params::{AdminUserParams, GenerateLinkParams}; -use gotrue_entity::dto::AuthProvider; use shared_entity::dto::workspace_dto::{CreateWorkspaceParam, PatchWorkspaceParam}; use std::fmt::{Display, Formatter}; #[cfg(feature = "enable_brotli")] @@ -15,7 +15,7 @@ use parking_lot::RwLock; use reqwest::Method; use reqwest::RequestBuilder; -use database_entity::dto::{ +use client_api_entity::{ AFSnapshotMeta, AFSnapshotMetas, AFUserProfile, AFUserWorkspaceInfo, AFWorkspace, AFWorkspaces, QuerySnapshotParams, SnapshotData, }; @@ -31,8 +31,8 @@ use tracing::{error, event, info, instrument, trace, warn}; use url::Url; use crate::ws::ConnectInfo; -use gotrue_entity::dto::SignUpResponse::{Authenticated, NotAuthenticated}; -use gotrue_entity::dto::{GotrueTokenResponse, UpdateGotrueUserParams, User}; +use client_api_entity::SignUpResponse::{Authenticated, NotAuthenticated}; +use client_api_entity::{GotrueTokenResponse, UpdateGotrueUserParams, User}; pub const X_COMPRESSION_TYPE: &str = "X-Compression-Type"; pub const X_COMPRESSION_BUFFER_SIZE: &str = "X-Compression-Buffer-Size"; diff --git a/libs/client-api/src/http_blob.rs b/libs/client-api/src/http_blob.rs index 1f87806bc..18e696faf 100644 --- a/libs/client-api/src/http_blob.rs +++ b/libs/client-api/src/http_blob.rs @@ -1,12 +1,14 @@ use crate::http::log_request_id; use crate::Client; + use app_error::AppError; use bytes::Bytes; -use futures_util::StreamExt; +use futures_util::TryStreamExt; use mime::Mime; use reqwest::{header, Method, StatusCode}; use shared_entity::dto::workspace_dto::{BlobMetadata, RepeatedBlobMetaData}; use shared_entity::response::{AppResponse, AppResponseError}; + use tracing::instrument; impl Client { @@ -64,6 +66,66 @@ impl Client { .await? 
.into_data() } + pub fn get_blob_url_v1(&self, workspace_id: &str, parent_dir: &str, file_id: &str) -> String { + format!( + "{}/api/file_storage/{workspace_id}/v1/blob/{parent_dir}/{file_id}", + self.base_url + ) + } + + #[instrument(level = "info", skip_all)] + pub async fn get_blob_v1( + &self, + workspace_id: &str, + parent_dir: &str, + file_id: &str, + ) -> Result<(Mime, Vec<u8>), AppResponseError> { + let url = self.get_blob_url_v1(workspace_id, parent_dir, file_id); + self.get_blob(&url).await + } + + #[instrument(level = "info", skip_all)] + pub async fn delete_blob_v1( + &self, + workspace_id: &str, + parent_dir: &str, + file_id: &str, + ) -> Result<(), AppResponseError> { + let url = format!( + "{}/api/file_storage/{workspace_id}/v1/blob/{parent_dir}/{file_id}", + self.base_url + ); + let resp = self + .http_client_with_auth(Method::DELETE, &url) + .await? + .send() + .await?; + log_request_id(&resp); + AppResponse::<()>::from_response(resp).await?.into_error() + } + + #[instrument(level = "info", skip_all)] + pub async fn get_blob_v1_metadata( + &self, + workspace_id: &str, + parent_dir: &str, + file_id: &str, + ) -> Result<BlobMetadata, AppResponseError> { + let url = format!( + "{}/api/file_storage/{workspace_id}/v1/metadata/{parent_dir}/{file_id}", + self.base_url + ); + let resp = self + .http_client_with_auth(Method::GET, &url) + .await? + .send() + .await?; + + log_request_id(&resp); + AppResponse::<BlobMetadata>::from_response(resp) + .await? + .into_data() + } /// Get the file with the given url. The url should be in the format of /// `https://appflowy.io/api/file_storage/<workspace_id>/<file_id>`. @@ -77,42 +139,37 @@ impl Client { log_request_id(&resp); match resp.status() { - reqwest::StatusCode::OK => { - // get mime from resp header - let mime = { - match resp.headers().get(header::CONTENT_TYPE) { - Some(v) => match v.to_str() { - Ok(v) => match v.parse::<Mime>() { - Ok(v) => v, - Err(e) => { - tracing::error!("failed to parse mime from header: {:?}", e); - mime::TEXT_PLAIN - }, - }, - Err(e) => { - tracing::error!("failed to get mime from header: {:?}", e); - mime::TEXT_PLAIN - }, - }, - None => mime::TEXT_PLAIN, - } - }; - - let mut stream = resp.bytes_stream(); - let mut acc: Vec<u8> = Vec::new(); - while let Some(raw_bytes) = stream.next().await { - acc.extend_from_slice(&raw_bytes?); - } - Ok((mime, acc)) + StatusCode::OK => { + let mime = resp + .headers() + .get(header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::<Mime>().ok()) + .unwrap_or(mime::TEXT_PLAIN); + + let bytes = resp + .bytes_stream() + .try_fold(Vec::new(), |mut acc, chunk| async move { + acc.extend_from_slice(&chunk); + Ok(acc) + }) + .await?; + + Ok((mime, bytes)) }, - reqwest::StatusCode::NOT_FOUND => Err(AppResponseError::from(AppError::RecordNotFound( + StatusCode::NOT_FOUND => Err(AppResponseError::from(AppError::RecordNotFound( url.to_owned(), ))), - c => Err(AppResponseError::from(AppError::Unhandled(format!( - "status code: {}, message: {}", - c, - resp.text().await?
- )))), + status => { + let message = resp + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + Err(AppResponseError::from(AppError::Unhandled(format!( + "status code: {}, message: {}", + status, message + )))) + }, } } diff --git a/libs/client-api/src/http_chat.rs b/libs/client-api/src/http_chat.rs index dcd3013f5..2821547fc 100644 --- a/libs/client-api/src/http_chat.rs +++ b/libs/client-api/src/http_chat.rs @@ -1,7 +1,7 @@ use crate::http::log_request_id; use crate::Client; use bytes::Bytes; -use database_entity::dto::{ +use client_api_entity::{ ChatMessage, CreateAnswerMessageParams, CreateChatMessageParams, CreateChatParams, MessageCursor, RepeatedChatMessage, UpdateChatMessageContentParams, }; diff --git a/libs/client-api/src/http_collab.rs b/libs/client-api/src/http_collab.rs index 21cee54c0..5deaa7e45 100644 --- a/libs/client-api/src/http_collab.rs +++ b/libs/client-api/src/http_collab.rs @@ -1,13 +1,12 @@ use crate::http::log_request_id; use crate::{spawn_blocking_brotli_compress, Client}; use app_error::AppError; -use database_entity::dto::{ +use client_api_entity::{ BatchQueryCollabParams, BatchQueryCollabResult, CreateCollabParams, DeleteCollabParams, QueryCollab, }; use reqwest::Method; use shared_entity::response::{AppResponse, AppResponseError}; -use std::time::Duration; use tracing::instrument; impl Client { @@ -35,7 +34,7 @@ impl Client { #[cfg(not(target_arch = "wasm32"))] { - builder = builder.timeout(Duration::from_secs(60)); + builder = builder.timeout(std::time::Duration::from_secs(60)); } let resp = builder.body(compress_bytes).send().await?; diff --git a/libs/client-api/src/http_history.rs b/libs/client-api/src/http_history.rs index 5de7cd7af..4c3b50372 100644 --- a/libs/client-api/src/http_history.rs +++ b/libs/client-api/src/http_history.rs @@ -1,6 +1,6 @@ use crate::http::log_request_id; use crate::Client; -use collab_entity::CollabType; +use client_api_entity::CollabType; use reqwest::Method; use shared_entity::dto::history_dto::{RepeatedSnapshotMeta, SnapshotInfo}; use shared_entity::response::{AppResponse, AppResponseError}; diff --git a/libs/client-api/src/http_member.rs b/libs/client-api/src/http_member.rs index ebb20652d..b198e9bf1 100644 --- a/libs/client-api/src/http_member.rs +++ b/libs/client-api/src/http_member.rs @@ -1,6 +1,6 @@ use crate::http::log_request_id; use crate::Client; -use database_entity::dto::{ +use client_api_entity::{ AFCollabMember, AFCollabMembers, AFWorkspaceInvitation, AFWorkspaceInvitationStatus, AFWorkspaceMember, CollabMemberIdentify, InsertCollabMemberParams, QueryCollabMembers, QueryWorkspaceMember, UpdateCollabMemberParams, diff --git a/libs/client-api/src/http_publish.rs b/libs/client-api/src/http_publish.rs index 7ed3d7aa3..6ac03828c 100644 --- a/libs/client-api/src/http_publish.rs +++ b/libs/client-api/src/http_publish.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use database_entity::dto::UpdatePublishNamespace; +use client_api_entity::UpdatePublishNamespace; use reqwest::{Body, Method}; use shared_entity::response::{AppResponse, AppResponseError}; diff --git a/libs/client-api/src/http_settings.rs b/libs/client-api/src/http_settings.rs index b62f27238..4ca4f0a6d 100644 --- a/libs/client-api/src/http_settings.rs +++ b/libs/client-api/src/http_settings.rs @@ -1,7 +1,7 @@ use reqwest::Method; use tracing::instrument; -use database_entity::dto::AFWorkspaceSettings; +use client_api_entity::AFWorkspaceSettings; use shared_entity::response::{AppResponse, AppResponseError}; use crate::http::log_request_id; diff --git 
a/libs/client-api/src/lib.rs b/libs/client-api/src/lib.rs index 06f910ed4..006cce819 100644 --- a/libs/client-api/src/lib.rs +++ b/libs/client-api/src/lib.rs @@ -1,6 +1,7 @@ mod http; mod http_ai; mod http_billing; + mod http_blob; mod http_collab; mod http_history; @@ -36,10 +37,7 @@ pub mod error { // Export all dto entities that will be used in the frontend application pub mod entity { - pub use collab_rt_entity::user::*; - pub use database_entity::dto::*; - pub use gotrue_entity::dto::*; - pub use shared_entity::dto::*; + pub use client_api_entity::*; } #[cfg(feature = "template")] diff --git a/libs/client-api/src/native/http_native.rs b/libs/client-api/src/native/http_native.rs index 46a3ce21c..08561d1aa 100644 --- a/libs/client-api/src/native/http_native.rs +++ b/libs/client-api/src/native/http_native.rs @@ -6,21 +6,104 @@ use crate::{RefreshTokenAction, RefreshTokenRetryCondition}; use anyhow::anyhow; use app_error::AppError; use async_trait::async_trait; + +use client_api_entity::{CollabParams, QueryCollabParams}; +use client_api_entity::{ + CompleteUploadRequest, CreateUploadRequest, CreateUploadResponse, UploadPartResponse, +}; use collab_rt_entity::HttpRealtimeMessage; -use database_entity::dto::{CollabParams, QueryCollabParams}; use futures_util::stream; use prost::Message; use reqwest::{Body, Method}; use shared_entity::dto::workspace_dto::CollabResponse; use shared_entity::response::{AppResponse, AppResponseError}; use std::future::Future; + use std::sync::atomic::Ordering; use std::time::Duration; use tokio_retry::strategy::{ExponentialBackoff, FixedInterval}; use tokio_retry::{Retry, RetryIf}; -use tracing::{event, info, instrument}; +use tracing::{event, info, instrument, trace}; + +pub use infra::file_util::ChunkedBytes; impl Client { + pub async fn create_upload( + &self, + workspace_id: &str, + req: CreateUploadRequest, + ) -> Result<CreateUploadResponse, AppResponseError> { + trace!("create_upload: {}", req); + let url = format!( + "{}/api/file_storage/{workspace_id}/create_upload", + self.base_url + ); + let resp = self + .http_client_with_auth(Method::POST, &url) + .await? + .json(&req) + .send() + .await?; + log_request_id(&resp); + AppResponse::<CreateUploadResponse>::from_response(resp) + .await? + .into_data() + } + + /// Upload a part of a file. The part number should be 1-based. + /// + /// In Amazon S3, the minimum chunk size for multipart uploads is 5 MB, except for the last part, + /// which can be smaller (https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html). + pub async fn upload_part( + &self, + workspace_id: &str, + parent_dir: &str, + file_id: &str, + upload_id: &str, + part_number: i32, + body: Vec<u8>, + ) -> Result<UploadPartResponse, AppResponseError> { + if body.is_empty() { + return Err(AppResponseError::from(AppError::InvalidRequest( + "Empty body".to_string(), + ))); + } + + let url = format!( + "{}/api/file_storage/{workspace_id}/upload_part/{parent_dir}/{file_id}/{upload_id}/{part_number}", + self.base_url + ); + let resp = self + .http_client_with_auth(Method::PUT, &url) + .await? + .body(body) + .send() + .await?; + log_request_id(&resp); + AppResponse::<UploadPartResponse>::from_response(resp) + .await? + .into_data() + } + + pub async fn complete_upload( + &self, + workspace_id: &str, + req: CompleteUploadRequest, + ) -> Result<(), AppResponseError> { + let url = format!( + "{}/api/file_storage/{}/complete_upload", + self.base_url, workspace_id + ); + let resp = self + .http_client_with_auth(Method::PUT, &url) + .await?
+ .json(&req) + .send() + .await?; + log_request_id(&resp); + AppResponse::<()>::from_response(resp).await?.into_error() + } + #[instrument(level = "debug", skip_all)] pub async fn get_collab( &self, diff --git a/libs/client-api/src/native/retry.rs b/libs/client-api/src/native/retry.rs index f7d2e797d..3454de4fa 100644 --- a/libs/client-api/src/native/retry.rs +++ b/libs/client-api/src/native/retry.rs @@ -5,8 +5,8 @@ use crate::ws::{ }; use crate::Client; use app_error::gotrue::GoTrueError; +use client_api_entity::QueryCollabParams; use client_websocket::{connect_async, WebSocketStream}; -use database_entity::dto::QueryCollabParams; use gotrue::grant::{Grant, RefreshTokenGrant}; use parking_lot::RwLock; use reqwest::header::HeaderMap; diff --git a/libs/client-api/src/notify.rs b/libs/client-api/src/notify.rs index 3af369f25..6afd86eda 100644 --- a/libs/client-api/src/notify.rs +++ b/libs/client-api/src/notify.rs @@ -1,5 +1,5 @@ use anyhow::Error; -use gotrue_entity::dto::GotrueTokenResponse; +use client_api_entity::GotrueTokenResponse; use std::ops::{Deref, DerefMut}; use tokio::sync::broadcast::{channel, Receiver, Sender}; use tracing::{event, warn}; diff --git a/libs/client-api/src/wasm/http_wasm.rs b/libs/client-api/src/wasm/http_wasm.rs index c0719db2e..1ba015ebf 100644 --- a/libs/client-api/src/wasm/http_wasm.rs +++ b/libs/client-api/src/wasm/http_wasm.rs @@ -4,7 +4,7 @@ use crate::Client; use app_error::gotrue::GoTrueError; use app_error::ErrorCode; use async_trait::async_trait; -use database_entity::dto::{CollabParams, QueryCollabParams}; +use client_api_entity::{CollabParams, QueryCollabParams}; use gotrue::grant::{Grant, RefreshTokenGrant}; use reqwest::Method; use shared_entity::dto::workspace_dto::{CollabResponse, CollabTypeParam}; diff --git a/libs/database-entity/src/file_dto.rs b/libs/database-entity/src/file_dto.rs new file mode 100644 index 000000000..a87076c87 --- /dev/null +++ b/libs/database-entity/src/file_dto.rs @@ -0,0 +1,85 @@ +use serde::{Deserialize, Serialize}; +use std::fmt::Display; + +#[derive(Serialize, Deserialize)] +pub struct CreateUploadRequest { + pub file_id: String, + pub parent_dir: String, + pub content_type: String, +} + +impl Display for CreateUploadRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "CreateUploadRequest: file_id: {}, content_type: {}", + self.file_id, self.content_type + ) + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct CreateUploadResponse { + pub file_id: String, + pub upload_id: String, +} + +#[derive(Serialize, Deserialize)] +pub struct UploadPartData { + pub file_id: String, + pub upload_id: String, + pub part_number: i32, + pub body: Vec, +} + +impl Display for UploadPartData { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "UploadPartRequest: file_id: {}, upload_id: {}, part_number: {}, size:{}", + self.file_id, + self.upload_id, + self.part_number, + self.body.len() + ) + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct UploadPartResponse { + pub e_tag: String, + pub part_num: i32, +} + +#[derive(Serialize, Deserialize)] +pub struct CompleteUploadRequest { + pub file_id: String, + pub parent_dir: String, + pub upload_id: String, + pub parts: Vec, +} + +impl Display for CompleteUploadRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "CompleteUploadRequest: file_id: {}, upload_id: {}, parts: {}", + self.file_id, + self.upload_id, + self.parts.len() + ) + } +} + 
+#[derive(Serialize, Deserialize)] +pub struct CompletedPartRequest { + pub e_tag: String, + pub part_number: i32, +} + +#[derive(Serialize, Deserialize)] +pub struct CompleteUploadResponse { + pub file_id: String, + pub upload_id: String, + pub parts: Vec, +} diff --git a/libs/database-entity/src/lib.rs b/libs/database-entity/src/lib.rs index cfea643e8..0d3d14ec8 100644 --- a/libs/database-entity/src/lib.rs +++ b/libs/database-entity/src/lib.rs @@ -1,2 +1,3 @@ pub mod dto; +pub mod file_dto; mod util; diff --git a/libs/database/Cargo.toml b/libs/database/Cargo.toml index b071a1aae..a63189680 100644 --- a/libs/database/Cargo.toml +++ b/libs/database/Cargo.toml @@ -10,7 +10,7 @@ collab = { workspace = true } collab-entity = { workspace = true } validator = { version = "0.16", features = ["validator_derive", "derive"] } database-entity.workspace = true -app-error = { workspace = true, features = ["sqlx_error", "validation_error", "s3_error"] } +app-error = { workspace = true, features = ["sqlx_error", "validation_error"] } tokio = { workspace = true, features = ["sync"] } async-trait = "0.1.77" @@ -27,7 +27,7 @@ chrono = { version = "0.4", features = ["serde"] } redis.workspace = true futures-util = "0.3.30" bytes = "1.5" -rust-s3 = { version = "0.34.0-rc4", optional = true } +aws-sdk-s3 = { version = "1.36.0", features = ["behavior-version-latest", "rt-tokio"], optional = true } sha2 = "0.10.8" base64 = "0.21.7" rust_decimal = "1.33.1" @@ -35,4 +35,4 @@ bincode.workspace = true [features] default = ["s3"] -s3 = ["rust-s3"] +s3 = ["aws-sdk-s3"] diff --git a/libs/database/src/file/bucket_s3_impl.rs b/libs/database/src/file/bucket_s3_impl.rs deleted file mode 100644 index 8fce9722a..000000000 --- a/libs/database/src/file/bucket_s3_impl.rs +++ /dev/null @@ -1,73 +0,0 @@ -use crate::file::{BucketClient, BucketStorage, ResponseBlob}; -use app_error::AppError; -use async_trait::async_trait; - -pub type S3BucketStorage = BucketStorage; - -impl S3BucketStorage { - pub fn from_s3_bucket(bucket: s3::Bucket, pg_pool: sqlx::PgPool) -> Self { - Self::new(BucketClientS3Impl(bucket), pg_pool) - } -} - -pub struct BucketClientS3Impl(s3::Bucket); - -#[async_trait] -impl BucketClient for BucketClientS3Impl { - type ResponseData = S3ResponseData; - - async fn pub_blob
<P>(&self, id: P, content: &[u8]) -> Result<(), AppError> - where - P: AsRef<str> + Send, - { - let code = self.0.put_object(id, content).await?.status_code(); - check_s3_status_code(code)?; - Ok(()) - } - - async fn delete_blob
<P>(&self, id: P) -> Result<Self::ResponseData, AppError> - where - P: AsRef<str> + Send, - { - let response = self.0.delete_object(id).await?; - check_s3_response_data(&response)?; - Ok(S3ResponseData(response)) - } - - async fn get_blob
<P>(&self, id: P) -> Result<Self::ResponseData, AppError> - where - P: AsRef<str> + Send, - { - let response = self.0.get_object(id).await?; - Ok(S3ResponseData(response)) - } -} - -pub struct S3ResponseData(s3::request::ResponseData); -impl ResponseBlob for S3ResponseData { - fn to_blob(self) -> Vec<u8> { - self.0.to_vec() - } -} - -#[inline] -fn check_s3_response_data(resp: &s3::request::ResponseData) -> Result<(), AppError> { - let status_code = resp.status_code(); - match status_code { - 200..=299 => Ok(()), - error_code => { - let text = resp.bytes(); - let s = String::from_utf8_lossy(text); - let msg = format!("S3 error: {}, code: {}", s, error_code); - Err(AppError::S3ResponseError(msg)) - }, - } -} - -#[inline] -fn check_s3_status_code(status_code: u16) -> Result<(), AppError> { - match status_code { - 200..=299 => Ok(()), - error_code => Err(AppError::S3ResponseError(format!("{}", error_code))), - } -} diff --git a/libs/database/src/file/file_storage.rs b/libs/database/src/file/file_storage.rs index 289397c98..15682beaa 100644 --- a/libs/database/src/file/file_storage.rs +++ b/libs/database/src/file/file_storage.rs @@ -4,9 +4,12 @@ use crate::resource_usage::{ }; use app_error::AppError; use async_trait::async_trait; +use database_entity::file_dto::{ + CompleteUploadRequest, CreateUploadRequest, CreateUploadResponse, UploadPartData, + UploadPartResponse, +}; use sqlx::PgPool; - -use tracing::{instrument, warn}; +use tracing::{info, instrument, warn}; use uuid::Uuid; pub trait ResponseBlob { @@ -17,17 +20,38 @@ pub trait BucketClient { type ResponseData: ResponseBlob; - async fn pub_blob
<P>(&self, id: P, content: &[u8]) -> Result<(), AppError> + async fn pub_blob
<P>(&self, id: &P, content: &[u8]) -> Result<(), AppError> where - P: AsRef<str> + Send; + P: BlobKey; - async fn delete_blob
<P>(&self, id: P) -> Result<Self::ResponseData, AppError> - where - P: AsRef<str> + Send; + async fn delete_blob(&self, object_key: &str) -> Result<Self::ResponseData, AppError>; - async fn get_blob
<P>(&self, id: P) -> Result<Self::ResponseData, AppError> - where - P: AsRef<str> + Send; + async fn get_blob(&self, object_key: &str) -> Result<Self::ResponseData, AppError>; + + async fn create_upload( + &self, + key: impl BlobKey, + req: CreateUploadRequest, + ) -> Result<CreateUploadResponse, AppError>; + async fn upload_part( + &self, + key: &impl BlobKey, + req: UploadPartData, + ) -> Result<UploadPartResponse, AppError>; + async fn complete_upload( + &self, + key: &impl BlobKey, + req: CompleteUploadRequest, + ) -> Result<(usize, String), AppError>; + + async fn remove_dir(&self, dir: &str) -> Result<(), AppError>; +} + +pub trait BlobKey: Send + Sync { + fn workspace_id(&self) -> &Uuid; + fn object_key(&self) -> String; + fn meta_key(&self) -> &str; + fn e_tag(&self) -> &str; } pub struct BucketStorage<C> { @@ -43,30 +67,35 @@ where Self { client, pg_pool } } + pub async fn remove_dir(&self, dir: &str) -> Result<(), AppError> { + info!("removing dir: {}", dir); + self.client.remove_dir(dir).await?; + Ok(()) + } + #[instrument(skip_all, err)] #[inline] - pub async fn put_blob( + pub async fn put_blob<K: BlobKey>( &self, - workspace_id: Uuid, - file_id: String, + key: K, file_data: Vec<u8>, file_type: String, ) -> Result<(), AppError> { - if is_blob_metadata_exists(&self.pg_pool, &workspace_id, &file_id).await? { + if is_blob_metadata_exists(&self.pg_pool, key.workspace_id(), key.meta_key()).await? { warn!( - "file already exists, workspace_id: {}, file_id: {}", - workspace_id, file_id + "file already exists, workspace_id: {}, meta_key: {}", + key.workspace_id(), + key.meta_key() ); return Ok(()); } - let obj_key = format!("{}/{}", workspace_id, file_id); - self.client.pub_blob(obj_key, &file_data).await?; + self.client.pub_blob(&key, &file_data).await?; insert_blob_metadata( &self.pg_pool, - &file_id, - &workspace_id, + key.meta_key(), + key.workspace_id(), &file_type, file_data.len(), ) @@ -74,11 +103,11 @@ where Ok(()) } - pub async fn delete_blob(&self, workspace_id: &Uuid, file_id: &str) -> Result<(), AppError> { - let obj_key = format!("{}/{}", workspace_id, file_id); + pub async fn delete_blob(&self, key: impl BlobKey) -> Result<(), AppError> { + self.client.delete_blob(&key.object_key()).await?; + let mut tx = self.pg_pool.begin().await?; - delete_blob_metadata(&mut tx, workspace_id, file_id).await?; - self.client.delete_blob(obj_key).await?; + delete_blob_metadata(&mut tx, key.workspace_id(), key.meta_key()).await?; tx.commit().await?; Ok(()) } @@ -86,15 +115,56 @@ where pub async fn get_blob_metadata( &self, workspace_id: &Uuid, - file_id: &str, + meta_key: &str, ) -> Result<BlobMetadata, AppError> { - let metadata = get_blob_metadata(&self.pg_pool, workspace_id, file_id).await?; + let metadata = get_blob_metadata(&self.pg_pool, workspace_id, meta_key).await?; Ok(metadata) } - pub async fn get_blob(&self, workspace_id: &Uuid, file_id: &str) -> Result<Vec<u8>, AppError> { - let obj_key = format!("{}/{}", workspace_id, file_id); - let blob = self.client.get_blob(obj_key).await?.to_blob(); + pub async fn get_blob(&self, key: &impl BlobKey) -> Result<Vec<u8>, AppError> { + let blob = self.client.get_blob(&key.object_key()).await?.to_blob(); Ok(blob) } + + pub async fn create_upload( + &self, + key: impl BlobKey, + req: CreateUploadRequest, + ) -> Result<CreateUploadResponse, AppError> { + self.client.create_upload(key, req).await + } + + pub async fn upload_part( + &self, + key: impl BlobKey, + req: UploadPartData, + ) -> Result<UploadPartResponse, AppError> { + self.client.upload_part(&key, req).await + } + + pub async fn complete_upload( + &self, + key: impl BlobKey, + req: CompleteUploadRequest, + ) -> Result<(), AppError> { + if is_blob_metadata_exists(&self.pg_pool, key.workspace_id(), &key.object_key()).await?
diff --git a/libs/database/src/file/mod.rs b/libs/database/src/file/mod.rs
index 16d5b207b..7337d660e 100644
--- a/libs/database/src/file/mod.rs
+++ b/libs/database/src/file/mod.rs
@@ -1,5 +1,5 @@
-pub mod bucket_s3_impl;
 mod file_storage;
+pub mod s3_client_impl;
 mod utils;
 
 pub use file_storage::*;
diff --git a/libs/database/src/file/s3_client_impl.rs b/libs/database/src/file/s3_client_impl.rs
new file mode 100644
index 000000000..1dcddaa81
--- /dev/null
+++ b/libs/database/src/file/s3_client_impl.rs
@@ -0,0 +1,383 @@
+use std::ops::Deref;
+
+use anyhow::anyhow;
+use app_error::AppError;
+use async_trait::async_trait;
+use aws_sdk_s3::error::SdkError;
+use aws_sdk_s3::operation::delete_object::DeleteObjectOutput;
+use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput;
+use aws_sdk_s3::operation::get_object::GetObjectError;
+use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier};
+use aws_sdk_s3::Client;
+use database_entity::file_dto::{
+  CompleteUploadRequest, CreateUploadRequest, CreateUploadResponse, UploadPartData,
+  UploadPartResponse,
+};
+use tracing::{error, trace};
+
+use crate::file::{BlobKey, BucketClient, BucketStorage, ResponseBlob};
+
+pub type S3BucketStorage = BucketStorage<AwsS3BucketClientImpl>;
+
+impl S3BucketStorage {
+  pub fn from_bucket_impl(client: AwsS3BucketClientImpl, pg_pool: sqlx::PgPool) -> Self {
+    Self::new(client, pg_pool)
+  }
+}
+
+pub struct AwsS3BucketClientImpl {
+  client: Client,
+  bucket: String,
+}
+
+impl AwsS3BucketClientImpl {
+  pub fn new(client: Client, bucket: String) -> Self {
+    debug_assert!(!bucket.is_empty());
+    AwsS3BucketClientImpl { client, bucket }
+  }
+
+  async fn complete_upload_and_get_metadata(
+    &self,
+    object_key: &str,
+    upload_id: &str,
+    completed_multipart_upload: CompletedMultipartUpload,
+  ) -> Result<(usize, String), AppError> {
+    // Complete the multipart upload
+    let _ = self
+      .client
+      .complete_multipart_upload()
+      .bucket(&self.bucket)
+      .key(object_key)
+      .upload_id(upload_id)
+      .multipart_upload(completed_multipart_upload)
+      .send()
+      .await
+      .map_err(|e| AppError::Internal(anyhow::anyhow!(e)))?;
+
+    // Retrieve the object metadata using head_object
+    let head_object_result = self
+      .client
+      .head_object()
+      .bucket(&self.bucket)
+      .key(object_key)
+      .send()
+      .await
+      .map_err(|e| AppError::Internal(anyhow::anyhow!(e)))?;
+
+    let content_length = head_object_result
+      .content_length()
+      .ok_or_else(|| AppError::Unhandled("Content-Length not found".to_string()))?;
+    let content_type = head_object_result
+      .content_type()
+      .map(|s| s.to_string())
+      .unwrap_or_else(|| "application/octet-stream".to_string());
+
+    Ok((content_length as usize, content_type))
+  }
+}
+
+#[async_trait]
+impl BucketClient for AwsS3BucketClientImpl {
+  type ResponseData = S3ResponseData;
+
+  async fn pub_blob<P>(&self, id: &P, content: &[u8]) -> Result<(), AppError>
+  where
+    P: BlobKey + Send,
+  {
+    let key = id.object_key();
+    trace!(
+      "Uploading object to S3 bucket:{}, key {}, len: {}",
+      self.bucket,
+      key,
+      content.len()
+    );
+    let body = ByteStream::from(content.to_vec());
+    self
+      .client
+      .put_object()
+      .bucket(&self.bucket)
+      .key(key)
+      .body(body)
+      .send()
+      .await
+      .map_err(|err| anyhow!("Failed to upload object to S3: {}", err))?;
+
+    Ok(())
+  }
+
+  async fn delete_blob(&self, object_key: &str) -> Result<Self::ResponseData, AppError> {
+    let output = self
+      .client
+      .delete_object()
+      .bucket(&self.bucket)
+      .key(object_key)
+      .send()
+      .await
+      .map_err(|err| anyhow!("Failed to delete object from S3: {}", err))?;
+
+    Ok(S3ResponseData::new(output))
+  }
+
+  async fn get_blob(&self, object_key: &str) -> Result<Self::ResponseData, AppError> {
+    match self
+      .client
+      .get_object()
+      .bucket(&self.bucket)
+      .key(object_key)
+      .send()
+      .await
+    {
+      Ok(output) => match output.body.collect().await {
+        Ok(body) => {
+          let data = body.into_bytes().to_vec();
+          Ok(S3ResponseData::new_with_data(data))
+        },
+        Err(err) => Err(AppError::from(anyhow!("Failed to collect body: {}", err))),
+      },
+      Err(SdkError::ServiceError(service_err)) => match service_err.err() {
+        GetObjectError::NoSuchKey(_) => Err(AppError::RecordNotFound(format!(
+          "blob not found for key:{object_key}"
+        ))),
+        _ => Err(AppError::from(anyhow!(
+          "Failed to get object from S3: {:?}",
+          service_err
+        ))),
+      },
+      Err(err) => Err(AppError::from(anyhow!(
+        "Failed to get object from S3: {}",
+        err
+      ))),
+    }
+  }
+
+  /// Create a new upload session.
+  /// https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
+  async fn create_upload(
+    &self,
+    key: impl BlobKey,
+    req: CreateUploadRequest,
+  ) -> Result<CreateUploadResponse, AppError> {
+    let object_key = key.object_key();
+    trace!(
+      "Creating upload to S3 bucket:{}, key {}, request: {}",
+      self.bucket,
+      object_key,
+      req
+    );
+    let multipart_upload_res = self
+      .client
+      .create_multipart_upload()
+      .bucket(&self.bucket)
+      .key(&object_key)
+      .content_type(req.content_type)
+      .send()
+      .await
+      .map_err(|err| anyhow!("Failed to create upload: {}", err))?;
+
+    match multipart_upload_res.upload_id {
+      None => Err(anyhow!("Failed to create upload: upload_id is None").into()),
+      Some(upload_id) => Ok(CreateUploadResponse {
+        file_id: req.file_id,
+        upload_id,
+      }),
+    }
+  }
+
+  async fn upload_part(
+    &self,
+    key: &impl BlobKey,
+    req: UploadPartData,
+  ) -> Result<UploadPartResponse, AppError> {
+    if req.body.is_empty() {
+      return Err(AppError::InvalidRequest("body is empty".to_string()));
+    }
+    let object_key = key.object_key();
+    trace!(
+      "Uploading part to S3 bucket:{}, key {}, request: {}",
+      self.bucket,
+      object_key,
+      req,
+    );
+    let body = ByteStream::from(req.body);
+    let upload_part_res = self
+      .client
+      .upload_part()
+      .bucket(&self.bucket)
+      .key(&object_key)
+      .upload_id(&req.upload_id)
+      .part_number(req.part_number)
+      .body(body)
+      .send()
+      .await
+      .map_err(|err| anyhow!("Failed to upload part: {}", err))?;
+
+    match upload_part_res.e_tag {
+      None => Err(anyhow!("Failed to upload part: e_tag is None").into()),
+      Some(e_tag) => Ok(UploadPartResponse {
+        part_num: req.part_number,
+        e_tag,
+      }),
+    }
+  }
+
+  /// Return the content length and content type of the uploaded object
+  async fn complete_upload(
+    &self,
+    key: &impl BlobKey,
+    req: CompleteUploadRequest,
+  ) -> Result<(usize, String), AppError> {
+    let object_key = key.object_key();
+    trace!(
+      "Completing upload to S3 bucket:{}, key {}, request: {}",
+      self.bucket,
+      object_key,
+      req,
+    );
+    let parts = req
+      .parts
+      .into_iter()
+      .map(|part| {
+        CompletedPart::builder()
+          .e_tag(part.e_tag)
+          .part_number(part.part_number)
+          .build()
+      })
+      .collect::<Vec<_>>();
+    let completed_multipart_upload = CompletedMultipartUpload::builder()
+      .set_parts(Some(parts))
+      .build();
+
+    self
+      .complete_upload_and_get_metadata(&object_key, &req.upload_id, completed_multipart_upload)
+      .await
+  }
+
+  async fn remove_dir(&self, parent_dir: &str) -> Result<(), AppError> {
+    let mut continuation_token = None;
+    loop {
+      let list_objects = self
+        .client
+        .list_objects_v2()
+        .bucket(&self.bucket)
+        .prefix(parent_dir)
+        .set_continuation_token(continuation_token.clone())
+        .send()
+        .await
+        .map_err(|err| anyhow!("Failed to list objects: {}", err))?;
+
+      let mut objects_to_delete: Vec<ObjectIdentifier> = list_objects
+        .contents
+        .unwrap_or_default()
+        .into_iter()
+        .filter_map(|object| {
+          object.key.and_then(|key| {
+            ObjectIdentifier::builder()
+              .key(key)
+              .build()
+              .map_err(|e| {
+                error!("Error building ObjectIdentifier: {:?}", e);
+                e
+              })
+              .ok()
+          })
+        })
+        .collect();
+
+      trace!(
+        "objects_to_delete: {} at directory: {}",
+        objects_to_delete.len(),
+        parent_dir
+      );
+
+      // Delete the listed objects in batches of at most 1000, the S3
+      // DeleteObjects limit. Note that split_off(1000) returns the tail and
+      // leaves the first 1000 entries in place.
+      while !objects_to_delete.is_empty() {
+        let batch = if objects_to_delete.len() > 1000 {
+          objects_to_delete.split_off(1000)
+        } else {
+          Vec::new()
+        };
+
+        trace!(
+          "Deleting objects under {}: {:?}",
+          parent_dir,
+          objects_to_delete
+            .iter()
+            .map(|object| &object.key)
+            .collect::<Vec<_>>()
+        );
+
+        let delete = Delete::builder()
+          .set_objects(Some(objects_to_delete))
+          .build()
+          .map_err(|err| anyhow!("Failed to build delete object: {}", err))?;
+
+        let delete_objects_output: DeleteObjectsOutput = self
+          .client
+          .delete_objects()
+          .bucket(&self.bucket)
+          .delete(delete)
+          .send()
+          .await
+          .map_err(|err| anyhow!("Failed to delete objects: {}", err))?;
+
+        if let Some(errors) = delete_objects_output.errors {
+          for error in errors {
+            error!("Error deleting object: {:?}", error);
+          }
+        }
+
+        objects_to_delete = batch;
+      }
+
+      // `is_truncated` is true while more objects remain under the prefix;
+      // once it is false we have listed everything in the directory.
+      match list_objects.is_truncated {
+        None => break,
+        Some(is_truncated) => {
+          if !is_truncated {
+            break;
+          }
+        },
+      }
+
+      continuation_token = list_objects.next_continuation_token;
+    }
+
+    Ok(())
+  }
+}
+
+#[derive(Debug)]
+pub struct S3ResponseData {
+  data: Vec<u8>,
+}
+
+impl Deref for S3ResponseData {
+  type Target = Vec<u8>;
+
+  fn deref(&self) -> &Self::Target {
+    &self.data
+  }
+}
+
+impl ResponseBlob for S3ResponseData {
+  fn to_blob(self) -> Vec<u8> {
+    self.data
+  }
+}
+
+impl S3ResponseData {
+  pub fn new(_output: DeleteObjectOutput) -> Self {
+    S3ResponseData { data: Vec::new() }
+  }
+
+  pub fn new_with_data(data: Vec<u8>) -> Self {
+    S3ResponseData { data }
+  }
+}
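Editor's note: the `create_upload` / `upload_part` / `complete_upload` trio above maps one-to-one onto S3's multipart protocol. A minimal caller-side sketch of the bookkeeping it demands; `collect_parts` is a hypothetical helper (not part of the PR), and the import paths assume the server crate's layout:

```rust
use app_error::AppError;
use database::file::{BlobKey, BucketClient};
use database_entity::file_dto::{CompletedPartRequest, CreateUploadResponse, UploadPartData};

// Push pre-split chunks through upload_part: part numbers are 1-based, and
// every ETag the service returns must be echoed back on completion.
async fn collect_parts<C, K>(
  client: &C,
  key: &K,
  upload: &CreateUploadResponse,
  chunks: Vec<Vec<u8>>,
) -> Result<Vec<CompletedPartRequest>, AppError>
where
  C: BucketClient + Send + Sync,
  K: BlobKey,
{
  let mut completed_parts = Vec::with_capacity(chunks.len());
  for (index, chunk) in chunks.into_iter().enumerate() {
    let resp = client
      .upload_part(
        key,
        UploadPartData {
          file_id: upload.file_id.clone(),
          upload_id: upload.upload_id.clone(),
          part_number: index as i32 + 1, // S3 part numbers start at 1
          body: chunk,
        },
      )
      .await?;
    completed_parts.push(CompletedPartRequest {
      e_tag: resp.e_tag,
      part_number: resp.part_num,
    });
  }
  Ok(completed_parts)
}
```

The returned list is exactly what `CompleteUploadRequest::parts` expects.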
diff --git a/libs/database/src/resource_usage.rs b/libs/database/src/resource_usage.rs
index cc87fc4c5..3fc8d7dc1 100644
--- a/libs/database/src/resource_usage.rs
+++ b/libs/database/src/resource_usage.rs
@@ -8,7 +8,7 @@ use sqlx::{PgPool, Transaction};
 use tracing::instrument;
 use uuid::Uuid;
 
-#[instrument(level = "trace", skip_all, err)]
+#[instrument(level = "trace", skip_all)]
 #[inline]
 pub async fn is_blob_metadata_exists(
   pool: &PgPool,
diff --git a/libs/gotrue/Cargo.toml b/libs/gotrue/Cargo.toml
index ea67c782f..67c9cd85e 100644
--- a/libs/gotrue/Cargo.toml
+++ b/libs/gotrue/Cargo.toml
@@ -14,9 +14,9 @@ futures-util = "0.3.30"
 anyhow = "1.0.79"
 reqwest = { workspace = true, features = ["json", "rustls-tls", "cookies"] }
 tokio = { workspace = true, features = ["sync", "macros"] }
-infra = { path = "../infra" }
+infra = { path = "../infra", features = ["request_util"] }
 gotrue-entity = { path = "../gotrue-entity" }
 tracing = "0.1"
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
-getrandom = { version = "0.2", features = ["js"]}
\ No newline at end of file
+getrandom = { version = "0.2", features = ["js"] }
\ No newline at end of file
diff --git a/libs/infra/Cargo.toml b/libs/infra/Cargo.toml
index ed1a777cc..88ec41d82 100644
--- a/libs/infra/Cargo.toml
+++ b/libs/infra/Cargo.toml
@@ -6,8 +6,14 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-reqwest = { workspace = true }
+reqwest = { workspace = true, optional = true }
 anyhow = "1.0.79"
 serde.workspace = true
 serde_json.workspace = true
-tracing.workspace = true
\ No newline at end of file
+tracing.workspace = true
+bytes = { workspace = true, optional = true }
+tokio = { workspace = true, optional = true }
+
+[features]
+file_util = ["bytes", "tokio/fs"]
+request_util = ["reqwest"]
\ No newline at end of file
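Editor's note on the 5 MB constant that anchors the new chunking module below: S3 rejects multipart parts smaller than 5 MB, except the final part of an upload, which is presumably why the chunker refuses smaller sizes up front. A quick sanity check of the arithmetic, as a sketch:

```rust
// Mirrors the constant in the module below.
const MIN_CHUNK_SIZE: usize = 5 * 1024 * 1024; // 5 MiB

fn main() {
  // A 15 MB payload splits into exactly three maximal parts...
  assert_eq!((15 * 1024 * 1024) / MIN_CHUNK_SIZE, 3);
  // ...while 8 MB yields one 5 MB part plus a 3 MB tail, which S3 accepts
  // because only the last part may be undersized.
  assert_eq!(8 * 1024 * 1024 % MIN_CHUNK_SIZE, 3 * 1024 * 1024);
}
```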
diff --git a/libs/infra/src/file_util.rs b/libs/infra/src/file_util.rs
new file mode 100644
index 000000000..e492fa006
--- /dev/null
+++ b/libs/infra/src/file_util.rs
@@ -0,0 +1,211 @@
+use anyhow::anyhow;
+use bytes::Bytes;
+use std::ops::Deref;
+use std::path::PathBuf;
+use tokio::io::AsyncReadExt;
+
+pub const MIN_CHUNK_SIZE: usize = 5 * 1024 * 1024; // 5 MB
+
+pub struct ChunkedBytes {
+  pub data: Bytes,
+  pub chunk_size: i32,
+  pub offsets: Vec<(usize, usize)>,
+}
+
+impl Deref for ChunkedBytes {
+  type Target = Bytes;
+
+  fn deref(&self) -> &Self::Target {
+    &self.data
+  }
+}
+
+impl ChunkedBytes {
+  pub fn from_bytes_with_chunk_size(data: Bytes, chunk_size: i32) -> Result<Self, anyhow::Error> {
+    if chunk_size < MIN_CHUNK_SIZE as i32 {
+      return Err(anyhow!(
+        "Chunk size should be greater than or equal to {}",
+        MIN_CHUNK_SIZE
+      ));
+    }
+
+    let offsets = split_into_chunks(&data, chunk_size as usize);
+    Ok(ChunkedBytes {
+      data,
+      offsets,
+      chunk_size,
+    })
+  }
+
+  /// Used to create a `ChunkedBytes` from a `Bytes` object. The default chunk size is 5 MB.
+  pub fn from_bytes(data: Bytes) -> Result<Self, anyhow::Error> {
+    let chunk_size = MIN_CHUNK_SIZE as i32;
+    let offsets = split_into_chunks(&data, MIN_CHUNK_SIZE);
+    Ok(ChunkedBytes {
+      data,
+      offsets,
+      chunk_size,
+    })
+  }
+
+  pub async fn from_file(file_path: &PathBuf, chunk_size: i32) -> Result<Self, anyhow::Error> {
+    let mut file = tokio::fs::File::open(file_path).await?;
+    let mut buffer = Vec::new();
+    file.read_to_end(&mut buffer).await?;
+    let data = Bytes::from(buffer);
+
+    let offsets = split_into_chunks(&data, chunk_size as usize);
+    Ok(ChunkedBytes {
+      data,
+      offsets,
+      chunk_size,
+    })
+  }
+
+  pub fn set_chunk_size(&mut self, chunk_size: i32) -> Result<(), anyhow::Error> {
+    if chunk_size < MIN_CHUNK_SIZE as i32 {
+      return Err(anyhow!(
+        "Chunk size should be greater than or equal to {}",
+        MIN_CHUNK_SIZE
+      ));
+    }
+
+    self.chunk_size = chunk_size;
+    self.offsets = split_into_chunks(&self.data, chunk_size as usize);
+    Ok(())
+  }
+
+  pub fn iter(&self) -> ChunkedBytesIterator {
+    ChunkedBytesIterator {
+      chunked_data: self,
+      current_index: 0,
+    }
+  }
+}
+
+pub struct ChunkedBytesIterator<'a> {
+  chunked_data: &'a ChunkedBytes,
+  current_index: usize,
+}
+
+impl<'a> Iterator for ChunkedBytesIterator<'a> {
+  type Item = Bytes;
+
+  fn next(&mut self) -> Option<Self::Item> {
+    if self.current_index >= self.chunked_data.offsets.len() {
+      None
+    } else {
+      let (start, end) = self.chunked_data.offsets[self.current_index];
+      self.current_index += 1;
+      Some(self.chunked_data.data.slice(start..end))
+    }
+  }
+}
+
+// Function to split input bytes into several chunks and return offsets
+pub fn split_into_chunks(data: &Bytes, chunk_size: usize) -> Vec<(usize, usize)> {
+  let mut offsets = Vec::new();
+  let mut start = 0;
+
+  while start < data.len() {
+    let end = std::cmp::min(start + chunk_size, data.len());
+    offsets.push((start, end));
+    start = end;
+  }
+  offsets
+}
+
+// Function to get chunk data using chunk number
+pub async fn get_chunk(
+  data: Bytes,
+  chunk_number: usize,
+  offsets: &[(usize, usize)],
+) -> Result<Bytes, anyhow::Error> {
+  if chunk_number >= offsets.len() {
+    return Err(anyhow!("Chunk number out of range"));
+  }
+
+  let (start, end) = offsets[chunk_number];
+  let chunk = data.slice(start..end);
+
+  Ok(chunk)
+}
+
+#[cfg(test)]
+mod tests {
+  use crate::file_util::{ChunkedBytes, MIN_CHUNK_SIZE};
+  use bytes::Bytes;
+  use std::env::temp_dir;
+  use tokio::io::AsyncWriteExt;
+
+  #[tokio::test]
+  async fn test_chunked_bytes_less_than_chunk_size() {
+    let data = Bytes::from(vec![0; 1024 * 1024]); // 1 MB of zeroes
+    let chunked_data =
+      ChunkedBytes::from_bytes_with_chunk_size(data.clone(), MIN_CHUNK_SIZE as i32).unwrap();
+
+    // Check if the offsets are correct
+    assert_eq!(chunked_data.offsets.len(), 1); // Should have 1 chunk
+    assert_eq!(chunked_data.offsets[0], (0, 1024 * 1024));
+
+    // Check if the data can be iterated correctly
+    let mut iter = chunked_data.iter();
+    assert_eq!(iter.next().unwrap().len(), 1024 * 1024);
+    assert!(iter.next().is_none());
+  }
+
+  #[tokio::test]
+  async fn test_chunked_bytes_from_bytes() {
+    let data = Bytes::from(vec![0; 15 * 1024 * 1024]); // 15 MB of zeroes
+    let chunked_data =
+      ChunkedBytes::from_bytes_with_chunk_size(data.clone(), MIN_CHUNK_SIZE as i32).unwrap();
+
+    // Check if the offsets are correct
+    assert_eq!(chunked_data.offsets.len(), 3); // Should have 3 chunks
+    assert_eq!(chunked_data.offsets[0], (0, 5 * 1024 * 1024));
+    assert_eq!(chunked_data.offsets[1], (5 * 1024 * 1024, 10 * 1024 * 1024));
+    assert_eq!(
+      chunked_data.offsets[2],
+      (10 * 1024 * 1024, 15 * 1024 * 1024)
+    );
+
+    // Check if the data can be iterated correctly
+    let mut iter = chunked_data.iter();
+    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
+    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
+    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
+    assert!(iter.next().is_none());
+  }
+
+  #[tokio::test]
+  async fn test_chunked_bytes_from_file() {
+    // Create a temporary file with 15 MB of zeroes
+    let mut file_path = temp_dir();
+    file_path.push("test_file");
+
+    let mut file = tokio::fs::File::create(&file_path).await.unwrap();
+    file.write_all(&vec![0; 15 * 1024 * 1024]).await.unwrap();
+    file.flush().await.unwrap();
+
+    // Read the file into ChunkedBytes
+    let chunked_data = ChunkedBytes::from_file(&file_path, MIN_CHUNK_SIZE as i32)
+      .await
+      .unwrap();
+
+    // Check if the offsets are correct
+    assert_eq!(chunked_data.offsets.len(), 3); // Should have 3 chunks
+    assert_eq!(chunked_data.offsets[0], (0, 5 * 1024 * 1024));
+    assert_eq!(chunked_data.offsets[1], (5 * 1024 * 1024, 10 * 1024 * 1024));
+    assert_eq!(
+      chunked_data.offsets[2],
+      (10 * 1024 * 1024, 15 * 1024 * 1024)
+    );
+
+    // Check if the data can be iterated correctly
+    let mut iter = chunked_data.iter();
+    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
+    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
+    assert_eq!(iter.next().unwrap().len(), 5 * 1024 * 1024);
+    assert!(iter.next().is_none());
+
+    // Clean up the temporary file
+    tokio::fs::remove_file(file_path).await.unwrap();
+  }
+}
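Editor's note: a quick sketch of how the chunker above is meant to pair with the multipart API, mirroring the tests later in this diff. The `number_parts` helper is illustrative only; what to do with each part (typically an `upload_part` call) is left to the caller:

```rust
use bytes::Bytes;
use infra::file_util::ChunkedBytes;

// Enumerate 5 MB parts with the 1-based numbering S3 expects.
fn number_parts(payload: Vec<u8>) -> anyhow::Result<Vec<(i32, Bytes)>> {
  let chunked = ChunkedBytes::from_bytes(Bytes::from(payload))?;
  Ok(
    chunked
      .iter()
      .enumerate()
      .map(|(index, part)| (index as i32 + 1, part))
      .collect(),
  )
}
```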
diff --git a/libs/infra/src/lib.rs b/libs/infra/src/lib.rs
index 92dae6c9c..16119205a 100644
--- a/libs/infra/src/lib.rs
+++ b/libs/infra/src/lib.rs
@@ -1,2 +1,6 @@
 pub mod env_util;
+
+#[cfg(feature = "file_util")]
+pub mod file_util;
+#[cfg(feature = "request_util")]
 pub mod reqwest;
diff --git a/libs/shared-entity/Cargo.toml b/libs/shared-entity/Cargo.toml
index 1a0cc04e5..bbc4d8b17 100644
--- a/libs/shared-entity/Cargo.toml
+++ b/libs/shared-entity/Cargo.toml
@@ -25,11 +25,10 @@ pin-project = "1.1.5"
 actix-web = { version = "4.4.1", default-features = false, features = ["http2"], optional = true }
 validator = { version = "0.16", features = ["validator_derive", "derive"], optional = true }
-rust-s3 = { version = "0.34.0-rc4", optional = true }
 futures = "0.3.30"
 bytes = "1.6.0"
 log = "0.4.21"
 
 [features]
-cloud = ["actix-web", "validator", "rust-s3"]
+cloud = ["actix-web", "validator"]
diff --git a/script/client_api_deps_check.sh b/script/client_api_deps_check.sh
index c5311aa49..37d3461fc 100755
--- a/script/client_api_deps_check.sh
+++ b/script/client_api_deps_check.sh
@@ -3,7 +3,7 @@
 # Generate the current dependency list
 cargo tree > current_deps.txt
 
-BASELINE_COUNT=594
+BASELINE_COUNT=595
 CURRENT_COUNT=$(cat current_deps.txt | wc -l)
 
 echo "Expected dependency count (baseline): $BASELINE_COUNT"
diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs
index 34f758a88..2caa1d172 100644
--- a/services/appflowy-collaborate/src/group/manager.rs
+++ b/services/appflowy-collaborate/src/group/manager.rs
@@ -207,6 +207,8 @@ where
       collab_type: collab_type.clone(),
       doc_state: encode_collab.doc_state.to_vec(),
     };
+    trace!("Send control event: {}", open_event);
+
     tokio::spawn(async move {
       if let Err(err) = cloned_control_event_stream
         .lock()
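Editor's note: the file_storage changes below add a v1, multipart-aware REST surface. As a rough sketch of how a client assembles the new URLs (paths taken from the scope definitions below; everything else is an assumption, and the real client-api crate wraps these calls):

```rust
// Hypothetical URL helpers; `base` would be the service root.
fn upload_part_url(
  base: &str,
  workspace_id: &str,
  parent_dir: &str,
  file_id: &str,
  upload_id: &str,
  part_num: i32,
) -> String {
  format!("{base}/{workspace_id}/upload_part/{parent_dir}/{file_id}/{upload_id}/{part_num}")
}

fn blob_v1_url(base: &str, workspace_id: &str, parent_dir: &str, file_id: &str) -> String {
  format!("{base}/{workspace_id}/v1/blob/{parent_dir}/{file_id}")
}
```

Note that `parent_dir` travels as a path segment; the `percent-encoding` dependency added to the server crate in Cargo.lock suggests such segments are decoded server-side, so a real client should percent-encode them (an assumption, not verified here).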
diff --git a/src/api/file_storage.rs b/src/api/file_storage.rs
index cbe4231f7..d4c64fb56 100644
--- a/src/api/file_storage.rs
+++ b/src/api/file_storage.rs
@@ -6,12 +6,19 @@ use actix_web::http::header::{
 use actix_web::web::{Json, Payload};
 use actix_web::{
   web::{self, Data},
-  HttpRequest, Scope,
+  HttpRequest, ResponseError, Scope,
 };
 use actix_web::{HttpResponse, Result};
 use app_error::AppError;
 use chrono::DateTime;
+use database::file::BlobKey;
 use database::resource_usage::{get_all_workspace_blob_metadata, get_workspace_usage_size};
+use database_entity::file_dto::{
+  CompleteUploadRequest, CreateUploadRequest, CreateUploadResponse, UploadPartData,
+  UploadPartResponse,
+};
+
+use serde::Deserialize;
 use shared_entity::dto::workspace_dto::{BlobMetadata, RepeatedBlobMetaData, WorkspaceSpaceUsage};
 use shared_entity::response::{AppResponse, AppResponseError, JsonAppResponse};
 use sqlx::types::Uuid;
@@ -42,17 +49,139 @@ pub fn file_storage_scope() -> Scope {
       web::resource("/{workspace_id}/blobs")
         .route(web::get().to(get_all_workspace_blob_metadata_handler)),
     )
+    .service(web::resource("/{workspace_id}/create_upload").route(web::post().to(create_upload)))
+    .service(
+      web::resource("/{workspace_id}/upload_part/{parent_dir}/{file_id}/{upload_id}/{part_num}")
+        .route(web::put().to(upload_part_handler)),
+    )
+    .service(
+      web::resource("/{workspace_id}/complete_upload")
+        .route(web::put().to(complete_upload_handler)),
+    )
+    .service(
+      web::resource("/{workspace_id}/v1/blob/{parent_dir}/{file_id}")
+        .route(web::get().to(get_blob_v1_handler))
+        .route(web::delete().to(delete_blob_v1_handler)),
+    )
+    .service(
+      web::resource("/{workspace_id}/v1/metadata/{parent_dir}/{file_id}")
+        .route(web::get().to(get_blob_metadata_v1_handler)),
+    )
+}
+
+#[instrument(skip_all, err)]
+async fn create_upload(
+  workspace_id: web::Path<Uuid>,
+  state: web::Data<AppState>,
+  req: web::Json<CreateUploadRequest>,
+) -> Result<JsonAppResponse<CreateUploadResponse>> {
+  let req = req.into_inner();
+  if req.parent_dir.is_empty() {
+    return Err(AppError::InvalidRequest("parent_dir is empty".to_string()).into());
+  }
+
+  if req.file_id.is_empty() {
+    return Err(AppError::InvalidRequest("file_id is empty".to_string()).into());
+  }
+
+  let key = BlobPathV1 {
+    workspace_id: workspace_id.into_inner(),
+    parent_dir: req.parent_dir.clone(),
+    file_id: req.file_id.clone(),
+  };
+  let resp = state
+    .bucket_storage
+    .create_upload(key, req)
+    .await
+    .map_err(AppResponseError::from)?;
+
+  Ok(AppResponse::Ok().with_data(resp).into())
+}
+
+#[derive(Deserialize)]
+struct UploadPartPath {
+  workspace_id: Uuid,
+  parent_dir: String,
+  file_id: String,
+  upload_id: String,
+  part_num: i32,
+}
+
+#[instrument(level = "debug", skip_all, err)]
+async fn upload_part_handler(
+  path: web::Path<UploadPartPath>,
+  state: web::Data<AppState>,
+  content_length: web::Header<ContentLength>,
+  mut payload: Payload,
+) -> Result<JsonAppResponse<UploadPartResponse>> {
+  let path_params = path.into_inner();
+  let content_length = content_length.into_inner().into_inner();
+  let mut content = Vec::with_capacity(content_length);
+  while let Some(chunk) = payload.try_next().await? {
+    content.extend_from_slice(&chunk);
+  }
+
+  if content.len() != content_length {
+    return Err(
+      AppError::InvalidRequest(format!(
+        "Content length is {}, but received {} bytes",
+        content_length,
+        content.len()
+      ))
+      .into(),
+    );
+  }
+
+  let data = UploadPartData {
+    file_id: path_params.file_id.clone(),
+    upload_id: path_params.upload_id,
+    part_number: path_params.part_num,
+    body: content,
+  };
+
+  let key = BlobPathV1 {
+    workspace_id: path_params.workspace_id,
+    parent_dir: path_params.parent_dir,
+    file_id: path_params.file_id,
+  };
+
+  let resp = state
+    .bucket_storage
+    .upload_part(key, data)
+    .await
+    .map_err(AppResponseError::from)?;
+
+  Ok(AppResponse::Ok().with_data(resp).into())
+}
+
+async fn complete_upload_handler(
+  workspace_id: web::Path<Uuid>,
+  state: web::Data<AppState>,
+  req: web::Json<CompleteUploadRequest>,
+) -> Result<JsonAppResponse<()>> {
+  let req = req.into_inner();
+
+  let key = BlobPathV1 {
+    workspace_id: workspace_id.into_inner(),
+    parent_dir: req.parent_dir.clone(),
+    file_id: req.file_id.clone(),
+  };
+  state
+    .bucket_storage
+    .complete_upload(key, req)
+    .await
+    .map_err(AppResponseError::from)?;
+
+  Ok(AppResponse::Ok().into())
 }
 
 #[instrument(skip(state, payload), err)]
 async fn put_blob_handler(
   state: Data<AppState>,
-  path: web::Path<(Uuid, String)>,
+  path: web::Path<BlobPathV0>,
   content_type: web::Header<ContentType>,
   content_length: web::Header<ContentLength>,
   payload: Payload,
 ) -> Result<JsonAppResponse<()>> {
-  let (workspace_id, file_id) = path.into_inner();
+  let path = path.into_inner();
   let content_length = content_length.into_inner().into_inner();
   let content_type = content_type.into_inner().to_string();
   let content = {
@@ -86,14 +215,14 @@ async fn put_blob_handler(
   event!(
     tracing::Level::TRACE,
     "start put blob. workspace_id: {}, file_id: {}, content_length: {}",
-    workspace_id,
-    file_id,
+    path.workspace_id,
+    path.file_id,
     content_length
   );
 
   state
     .bucket_storage
-    .put_blob(workspace_id, file_id.to_string(), content, content_type)
+    .put_blob(path, content, content_type)
     .await
     .map_err(AppResponseError::from)?;
 
@@ -103,29 +232,52 @@ async fn put_blob_handler(
 #[instrument(level = "debug", skip(state), err)]
 async fn delete_blob_handler(
   state: Data<AppState>,
-  path: web::Path<(Uuid, String)>,
+  path: web::Path<BlobPathV0>,
 ) -> Result<JsonAppResponse<()>> {
-  let (workspace_id, file_id) = path.into_inner();
+  let path = path.into_inner();
   state
     .bucket_storage
-    .delete_blob(&workspace_id, &file_id)
+    .delete_blob(path)
     .await
     .map_err(AppResponseError::from)?;
+
   Ok(AppResponse::Ok().into())
 }
 
 #[instrument(level = "debug", skip(state), err)]
-async fn get_blob_handler(
+async fn get_blob_v1_handler(
   state: Data<AppState>,
-  path: web::Path<(Uuid, String)>,
+  path: web::Path<BlobPathV1>,
   req: HttpRequest,
 ) -> Result<HttpResponse> {
-  let (workspace_id, file_id) = path.into_inner();
+  let path = path.into_inner();
+  get_blob_by_object_key(state, &path, req).await
+}
+
+#[instrument(level = "debug", skip(state), err)]
+async fn delete_blob_v1_handler(
+  state: Data<AppState>,
+  path: web::Path<BlobPathV1>,
+) -> Result<JsonAppResponse<()>> {
+  let path = path.into_inner();
+  state
+    .bucket_storage
+    .delete_blob(path)
+    .await
+    .map_err(AppResponseError::from)?;
+
+  Ok(AppResponse::Ok().into())
+}
+
+async fn get_blob_by_object_key(
+  state: Data<AppState>,
+  key: &impl BlobKey,
+  req: HttpRequest,
+) -> Result<HttpResponse> {
   // Get the metadata
   let result = state
     .bucket_storage
-    .get_blob_metadata(&workspace_id, &file_id)
+    .get_blob_metadata(key.workspace_id(), key.meta_key())
     .await;
 
   if let Err(err) = result.as_ref() {
@@ -148,34 +300,75 @@
       return Ok(HttpResponse::NotModified().finish());
     }
   }
-  let blob = state
+
+  let blob_result = state.bucket_storage.get_blob(key).await;
+  match blob_result {
+    Ok(blob) => {
+      let response = HttpResponse::Ok()
+        .append_header((ETAG, key.e_tag()))
+        .append_header((CONTENT_TYPE, metadata.file_type))
+        .append_header((LAST_MODIFIED, metadata.modified_at.to_rfc2822()))
+        .append_header((CONTENT_LENGTH, blob.len()))
+        .append_header((CACHE_CONTROL, "public, immutable, max-age=31536000")) // 31536000 seconds = 1 year
+        .body(blob);
+
+      Ok(response)
+    },
+    Err(err) => {
+      if err.is_record_not_found() {
+        Ok(HttpResponse::NotFound().finish())
+      } else {
+        Ok(AppResponseError::from(err).error_response())
+      }
+    },
+  }
+}
+
+#[instrument(level = "debug", skip(state), err)]
+async fn get_blob_handler(
+  state: Data<AppState>,
+  path: web::Path<BlobPathV0>,
+  req: HttpRequest,
+) -> Result<HttpResponse> {
+  let blob_path = path.into_inner();
+  get_blob_by_object_key(state, &blob_path, req).await
+}
+
+#[instrument(level = "debug", skip(state), err)]
+async fn get_blob_metadata_handler(
+  state: Data<AppState>,
+  path: web::Path<BlobPathV0>,
+) -> Result<JsonAppResponse<BlobMetadata>> {
+  let path = path.into_inner();
+
+  // Get the metadata
+  let metadata = state
+    .bucket_storage
-    .get_blob(&workspace_id, &file_id)
+    .get_blob_metadata(&path.workspace_id, path.meta_key())
+    .await
+    .map(|meta| BlobMetadata {
+      workspace_id: meta.workspace_id,
+      file_id: meta.file_id,
+      file_type: meta.file_type,
+      file_size: meta.file_size,
+      modified_at: meta.modified_at,
+    })
     .map_err(AppResponseError::from)?;
 
-  let response = HttpResponse::Ok()
-    .append_header((ETAG, file_id))
-    .append_header((CONTENT_TYPE, metadata.file_type))
-    .append_header((LAST_MODIFIED, metadata.modified_at.to_rfc2822()))
-    .append_header((CONTENT_LENGTH, blob.len()))
-    .append_header((CACHE_CONTROL, "public, immutable, max-age=31536000"))// 31536000 seconds = 1 year
-    .body(blob);
-
-  Ok(response)
+  Ok(Json(AppResponse::Ok().with_data(metadata)))
 }
 
 #[instrument(level = "debug", skip(state), err)]
-async fn get_blob_metadata_handler(
+async fn get_blob_metadata_v1_handler(
   state: Data<AppState>,
-  path: web::Path<(Uuid, String)>,
+  path: web::Path<BlobPathV1>,
 ) -> Result<JsonAppResponse<BlobMetadata>> {
-  let (workspace_id, file_id) = path.into_inner();
+  let path = path.into_inner();
 
   // Get the metadata
   let metadata = state
     .bucket_storage
-    .get_blob_metadata(&workspace_id, &file_id)
+    .get_blob_metadata(&path.workspace_id, path.meta_key())
    .await
    .map(|meta| BlobMetadata {
      workspace_id: meta.workspace_id,
@@ -233,3 +426,54 @@ fn payload_to_async_read(payload: Payload) -> Pin<Box<dyn AsyncRead>> {
   let reader = StreamReader::new(mapped);
   Box::pin(reader)
 }
+
+/// Use [BlobPathV0] when the object is written or read in a single part.
+#[derive(Deserialize, Debug)]
+struct BlobPathV0 {
+  workspace_id: Uuid,
+  file_id: String,
+}
+
+impl BlobKey for BlobPathV0 {
+  fn workspace_id(&self) -> &Uuid {
+    &self.workspace_id
+  }
+
+  fn object_key(&self) -> String {
+    format!("{}/{}", self.workspace_id, self.file_id)
+  }
+
+  fn meta_key(&self) -> &str {
+    &self.file_id
+  }
+
+  fn e_tag(&self) -> &str {
+    &self.file_id
+  }
+}
+
+/// Use [BlobPathV1] when the object is uploaded in multiple parts.
+#[derive(Deserialize, Debug)]
+pub struct BlobPathV1 {
+  pub workspace_id: Uuid,
+  pub parent_dir: String,
+  pub file_id: String,
+}
+
+impl BlobKey for BlobPathV1 {
+  fn workspace_id(&self) -> &Uuid {
+    &self.workspace_id
+  }
+
+  fn object_key(&self) -> String {
+    format!("{}/{}/{}", self.workspace_id, self.parent_dir, self.file_id)
+  }
+
+  fn meta_key(&self) -> &str {
+    &self.file_id
+  }
+
+  fn e_tag(&self) -> &str {
+    &self.file_id
+  }
+}
diff --git a/src/application.rs b/src/application.rs
index f17b4a598..eb1545ad1
100644 --- a/src/application.rs +++ b/src/application.rs @@ -19,12 +19,13 @@ use crate::middleware::request_id::RequestIdMiddleware; use crate::self_signed::create_self_signed_certificate; use crate::state::{AppMetrics, AppState, GoTrueAdmin, UserCache}; use actix::Supervisor; + use actix_identity::IdentityMiddleware; use actix_session::storage::RedisSessionStore; use actix_session::SessionMiddleware; use actix_web::cookie::Key; use actix_web::middleware::NormalizePath; -use actix_web::{dev::Server, web, web::Data, App, HttpServer}; +use actix_web::{dev::Server, web::Data, App, HttpServer}; use anyhow::{Context, Error}; use appflowy_ai_client::client::AppFlowyAIClient; use appflowy_collaborate::actix_ws::server::RealtimeServerActor; @@ -37,13 +38,17 @@ use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender}; use appflowy_collaborate::shared_state::RealtimeSharedState; use appflowy_collaborate::snapshot::SnapshotControl; use appflowy_collaborate::CollaborationServer; -use database::file::bucket_s3_impl::S3BucketStorage; + +use aws_sdk_s3::config::{Credentials, Region, SharedCredentialsProvider}; +use aws_sdk_s3::operation::create_bucket::CreateBucketError; +use aws_sdk_s3::types::{BucketInfo, BucketType, CreateBucketConfiguration}; use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod}; use openssl::x509::X509; use secrecy::{ExposeSecret, Secret}; use snowflake::Snowflake; use sqlx::{postgres::PgPoolOptions, PgPool}; use std::net::TcpListener; + use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock}; @@ -51,7 +56,8 @@ use tonic_proto::history::history_client::HistoryClient; use crate::api::ai::ai_completion_scope; use crate::api::search::search_scope; -use tracing::{info, warn}; +use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage}; +use tracing::{error, info, trace, warn}; use workspace_access::WorkspaceAccessControlImpl; pub struct Application { @@ -68,7 +74,7 @@ impl Application { let address = format!("{}:{}", config.application.host, config.application.port); let listener = TcpListener::bind(&address)?; let port = listener.local_addr().unwrap().port(); - tracing::info!("Server started at {}", listener.local_addr().unwrap()); + info!("Server started at {}", listener.local_addr().unwrap()); let actix_server = run_actix_server(listener, state, config, rt_cmd_recv).await?; Ok(Self { port, actix_server }) @@ -88,7 +94,7 @@ pub async fn run_actix_server( state: AppState, config: Config, rt_cmd_recv: CLCommandReceiver, -) -> Result { +) -> Result { let redis_store = RedisSessionStore::new(config.redis_uri.expose_secret()) .await .map_err(|e| { @@ -140,7 +146,6 @@ pub async fn run_actix_server( // .wrap(DecryptPayloadMiddleware) .wrap(access_control.clone()) .wrap(RequestIdMiddleware) - .app_data(web::JsonConfig::default().limit(5 * 1024 * 1024)) .service(user_scope()) .service(workspace_scope()) .service(collab_scope()) @@ -191,8 +196,14 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result Result Result { - info!("Connecting to S3 bucket with setting: {:?}", &s3_setting); - let region = { - match s3_setting.use_minio { - true => s3::Region::Custom { - region: s3_setting.region.to_owned(), - endpoint: s3_setting.minio_url.to_owned(), - }, - false => s3_setting - .region - .parse::() - .context("failed to parser s3 setting")?, - } - }; - - let cred = s3::creds::Credentials { - access_key: Some(s3_setting.access_key.to_owned()), - secret_key: 
Some(s3_setting.secret_key.expose_secret().to_owned()), - security_token: None, - session_token: None, - expiration: None, +pub async fn get_aws_s3_client(s3_setting: &S3Setting) -> Result { + trace!("Connecting to S3 with setting: {:?}", &s3_setting); + let credentials = Credentials::new( + s3_setting.access_key.clone(), + s3_setting.secret_key.expose_secret().clone(), + None, + None, + "custom", + ); + let shared_credentials = SharedCredentialsProvider::new(credentials); + + // Configure the AWS SDK + let config_builder = aws_sdk_s3::Config::builder() + .credentials_provider(shared_credentials) + .force_path_style(true) + .region(Region::new(s3_setting.region.clone())); + let config = if s3_setting.use_minio { + config_builder.endpoint_url(&s3_setting.minio_url).build() + } else { + config_builder.build() }; + let client = aws_sdk_s3::Client::from_conf(config); + create_bucket_if_not_exists(&client, &s3_setting.bucket).await?; + Ok(client) +} - match s3::Bucket::create_with_path_style( - &s3_setting.bucket, - region.clone(), - cred.clone(), - s3::BucketConfiguration::default(), - ) - .await +async fn create_bucket_if_not_exists( + client: &aws_sdk_s3::Client, + bucket: &str, +) -> Result<(), Error> { + let bucket_cfg = CreateBucketConfiguration::builder() + .bucket(BucketInfo::builder().r#type(BucketType::Directory).build()) + .build(); + + match client + .create_bucket() + .create_bucket_configuration(bucket_cfg) + .bucket(bucket) + .send() + .await { - Ok(_) => Ok(()), - Err(e) => match e { - s3::error::S3Error::HttpFailWithBody(409, _) => Ok(()), // Bucket already exists - _ => Err(e), + Ok(_) => { + info!("bucket created successfully: {}", bucket); + Ok(()) }, - }?; - - Ok(s3::Bucket::new(&s3_setting.bucket, region.clone(), cred.clone())?.with_path_style()) + Err(err) => { + if let Some(service_error) = err.as_service_error() { + match service_error { + CreateBucketError::BucketAlreadyOwnedByYou(_) + | CreateBucketError::BucketAlreadyExists(_) => { + info!("Bucket already exists"); + Ok(()) + }, + _ => Err(err.into()), + } + } else { + error!("Failed to create bucket: {:?}", err); + Ok(()) + } + }, + } } async fn get_connection_pool(setting: &DatabaseSetting) -> Result { diff --git a/src/biz/workspace/ops.rs b/src/biz/workspace/ops.rs index 05c2530a3..8f1805f4e 100644 --- a/src/biz/workspace/ops.rs +++ b/src/biz/workspace/ops.rs @@ -11,10 +11,9 @@ use access_control::workspace::WorkspaceAccessControl; use app_error::{AppError, ErrorCode}; use appflowy_collaborate::collab::storage::CollabAccessControlStorage; use database::collab::upsert_collab_member_with_txn; -use database::file::bucket_s3_impl::BucketClientS3Impl; -use database::file::BucketStorage; +use database::file::s3_client_impl::S3BucketStorage; use database::pg_row::{AFWorkspaceMemberRow, AFWorkspaceRow}; -use database::resource_usage::get_all_workspace_blob_metadata; + use database::user::select_uid_from_email; use database::workspace::{ change_workspace_icon, delete_from_workspace, delete_published_collab, delete_workspace_members, @@ -47,20 +46,12 @@ use crate::state::GoTrueAdmin; pub async fn delete_workspace_for_user( pg_pool: &PgPool, workspace_id: &Uuid, - bucket_storage: &Arc>, + bucket_storage: &Arc, ) -> Result<(), AppResponseError> { // remove files from s3 - - let blob_metadatas = get_all_workspace_blob_metadata(pg_pool, workspace_id) - .await - .context("Get all workspace blob metadata")?; - - for blob_metadata in blob_metadatas { - bucket_storage - .delete_blob(workspace_id, blob_metadata.file_id.as_str()) - 
.await - .context("Delete blob from s3")?; - } + bucket_storage + .remove_dir(workspace_id.to_string().as_str()) + .await?; // remove from postgres delete_from_workspace(pg_pool, workspace_id).await?; diff --git a/src/state.rs b/src/state.rs index e4049abd0..d03f9b13f 100644 --- a/src/state.rs +++ b/src/state.rs @@ -17,7 +17,8 @@ use appflowy_collaborate::collab::storage::CollabAccessControlStorage; use appflowy_collaborate::metrics::CollabMetrics; use appflowy_collaborate::shared_state::RealtimeSharedState; use appflowy_collaborate::CollabRealtimeMetrics; -use database::file::bucket_s3_impl::S3BucketStorage; +use database::file::s3_client_impl::S3BucketStorage; + use database::user::{select_all_uid_uuid, select_uid_from_uuid}; use gotrue::grant::{Grant, PasswordGrant}; use snowflake::Snowflake; diff --git a/tests/workspace/blob/asset/16kb_logo.png b/tests/file_test/asset/16kb_logo.png similarity index 100% rename from tests/workspace/blob/asset/16kb_logo.png rename to tests/file_test/asset/16kb_logo.png diff --git a/tests/file_test/delete_dir_test.rs b/tests/file_test/delete_dir_test.rs new file mode 100644 index 000000000..40bcfc46b --- /dev/null +++ b/tests/file_test/delete_dir_test.rs @@ -0,0 +1,95 @@ +use crate::collab::util::generate_random_string; +use app_error::ErrorCode; +use bytes::Bytes; +use client_api::ChunkedBytes; +use client_api_test::{generate_unique_registered_user_client, workspace_id_from_client}; +use database_entity::file_dto::{CompleteUploadRequest, CompletedPartRequest, CreateUploadRequest}; +use uuid::Uuid; + +#[tokio::test] +async fn delete_workspace_resource_test() { + let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; + let mime = mime::TEXT_PLAIN_UTF_8; + let data = "hello world"; + let file_id = uuid::Uuid::new_v4().to_string(); + let url = c1.get_blob_url(&workspace_id, &file_id); + c1.put_blob(&url, data, &mime).await.unwrap(); + c1.delete_workspace(&workspace_id).await.unwrap(); + + let error = c1.get_blob(&url).await.unwrap_err(); + assert_eq!(error.code, ErrorCode::RecordNotFound); +} + +#[tokio::test] +async fn delete_workspace_sub_folder_resource_test() { + let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; + let parent_dir = format!("SubFolder:{}", uuid::Uuid::new_v4()); + let mime = mime::TEXT_PLAIN_UTF_8; + let mut file_ids = vec![]; + + for i in 1..5 { + let text = generate_random_string(i * 2 * 1024 * 1024); + let file_id = Uuid::new_v4().to_string(); + file_ids.push(file_id.clone()); + let upload = c1 + .create_upload( + &workspace_id, + CreateUploadRequest { + file_id: file_id.clone(), + parent_dir: parent_dir.clone(), + content_type: mime.to_string(), + }, + ) + .await + .unwrap(); + + let chunked_bytes = ChunkedBytes::from_bytes(Bytes::from(text.clone())).unwrap(); + let mut completed_parts = Vec::new(); + let iter = chunked_bytes.iter().enumerate(); + for (index, next) in iter { + let resp = c1 + .upload_part( + &workspace_id, + &parent_dir, + &file_id, + &upload.upload_id, + index as i32 + 1, + next.to_vec(), + ) + .await + .unwrap(); + + completed_parts.push(CompletedPartRequest { + e_tag: resp.e_tag, + part_number: resp.part_num, + }); + } + + let req = CompleteUploadRequest { + file_id: file_id.clone(), + parent_dir: parent_dir.clone(), + upload_id: upload.upload_id, + parts: completed_parts, + }; + c1.complete_upload(&workspace_id, req).await.unwrap(); + + let blob = c1 + 
.get_blob_v1(&workspace_id, &parent_dir, &file_id) + .await + .unwrap() + .1; + let blob_text = String::from_utf8(blob.to_vec()).unwrap(); + assert_eq!(blob_text, text); + } + c1.delete_workspace(&workspace_id).await.unwrap(); + + for file_id in file_ids { + let error = c1 + .get_blob_v1(&workspace_id, &parent_dir, &file_id) + .await + .unwrap_err(); + assert_eq!(error.code, ErrorCode::RecordNotFound); + } +} diff --git a/tests/file_test/mod.rs b/tests/file_test/mod.rs new file mode 100644 index 000000000..5e18da757 --- /dev/null +++ b/tests/file_test/mod.rs @@ -0,0 +1,64 @@ +use std::borrow::Cow; +use std::ops::Deref; + +mod delete_dir_test; +mod multiple_part_test; +mod put_and_get; +mod usage; + +use appflowy_cloud::application::get_aws_s3_client; +use appflowy_cloud::config::config::S3Setting; +use database::file::s3_client_impl::AwsS3BucketClientImpl; +use lazy_static::lazy_static; +use secrecy::Secret; +use tracing::warn; + +lazy_static! { + pub static ref LOCALHOST_MINIO_URL: Cow<'static, str> = + get_env_var("LOCALHOST_MINIO_URL", "http://localhost:9000"); + pub static ref LOCALHOST_MINIO_ACCESS_KEY: Cow<'static, str> = + get_env_var("LOCALHOST_MINIO_ACCESS_KEY", "minioadmin"); + pub static ref LOCALHOST_MINIO_SECRET_KEY: Cow<'static, str> = + get_env_var("LOCALHOST_MINIO_SECRET_KEY", "minioadmin"); + pub static ref LOCALHOST_MINIO_BUCKET_NAME: Cow<'static, str> = + get_env_var("LOCALHOST_MINIO_BUCKET_NAME", "appflowy"); +} + +pub struct TestBucket(pub AwsS3BucketClientImpl); + +impl Deref for TestBucket { + type Target = AwsS3BucketClientImpl; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl TestBucket { + pub async fn new() -> Self { + let setting = S3Setting { + use_minio: true, + minio_url: LOCALHOST_MINIO_URL.to_string(), + access_key: LOCALHOST_MINIO_ACCESS_KEY.to_string(), + secret_key: Secret::new(LOCALHOST_MINIO_SECRET_KEY.to_string()), + bucket: LOCALHOST_MINIO_BUCKET_NAME.to_string(), + region: "".to_string(), + }; + let client = AwsS3BucketClientImpl::new( + get_aws_s3_client(&setting).await.unwrap(), + setting.bucket.clone(), + ); + Self(client) + } +} + +fn get_env_var<'default>(key: &str, default: &'default str) -> Cow<'default, str> { + dotenvy::dotenv().ok(); + match std::env::var(key) { + Ok(value) => Cow::Owned(value), + Err(_) => { + warn!("could not read env var {}: using default: {}", key, default); + Cow::Borrowed(default) + }, + } +} diff --git a/tests/file_test/multiple_part_test.rs b/tests/file_test/multiple_part_test.rs new file mode 100644 index 000000000..986e6f83f --- /dev/null +++ b/tests/file_test/multiple_part_test.rs @@ -0,0 +1,357 @@ +use super::TestBucket; +use crate::collab::util::{generate_random_bytes, generate_random_string}; +use app_error::ErrorCode; +use appflowy_cloud::api::file_storage::BlobPathV1; +use aws_sdk_s3::types::CompletedPart; +use bytes::Bytes; +use client_api::ChunkedBytes; +use client_api_test::{generate_unique_registered_user_client, workspace_id_from_client}; +use database::file::{BlobKey, BucketClient, ResponseBlob}; +use database_entity::file_dto::{ + CompleteUploadRequest, CompletedPartRequest, CreateUploadRequest, UploadPartData, +}; +use uuid::Uuid; + +#[tokio::test] +async fn multiple_part_put_and_get_test() { + let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; + let parent_dir = workspace_id.clone(); + let mime = mime::TEXT_PLAIN_UTF_8; + let text = generate_random_string(8 * 1024 * 1024); + let file_id = 
Uuid::new_v4().to_string(); + + let upload = c1 + .create_upload( + &workspace_id, + CreateUploadRequest { + file_id: file_id.clone(), + parent_dir: parent_dir.clone(), + content_type: mime.to_string(), + }, + ) + .await + .unwrap(); + let mut chunked_bytes = ChunkedBytes::from_bytes(Bytes::from(text.clone())).unwrap(); + assert_eq!(chunked_bytes.offsets.len(), 2); + chunked_bytes.set_chunk_size(5 * 1024 * 1024).unwrap(); + + let mut completed_parts = Vec::new(); + let iter = chunked_bytes.iter().enumerate(); + for (index, next) in iter { + let resp = c1 + .upload_part( + &workspace_id, + &parent_dir, + &file_id, + &upload.upload_id, + index as i32 + 1, + next.to_vec(), + ) + .await + .unwrap(); + + completed_parts.push(CompletedPartRequest { + e_tag: resp.e_tag, + part_number: resp.part_num, + }); + } + + assert_eq!(completed_parts.len(), 2); + assert_eq!(completed_parts[0].part_number, 1); + assert_eq!(completed_parts[1].part_number, 2); + + let req = CompleteUploadRequest { + file_id: file_id.clone(), + parent_dir: parent_dir.clone(), + upload_id: upload.upload_id, + parts: completed_parts, + }; + c1.complete_upload(&workspace_id, req).await.unwrap(); + + let blob = c1 + .get_blob_v1(&workspace_id, &parent_dir, &file_id) + .await + .unwrap() + .1; + + let blob_text = String::from_utf8(blob.to_vec()).unwrap(); + assert_eq!(blob_text, text); +} + +#[tokio::test] +async fn single_part_put_and_get_test() { + // Test with smaller file (single part) + let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; + let mime = mime::TEXT_PLAIN_UTF_8; + let text = generate_random_string(1024); + let file_id = Uuid::new_v4().to_string(); + + let upload = c1 + .create_upload( + &workspace_id, + CreateUploadRequest { + file_id: file_id.clone(), + parent_dir: workspace_id.clone(), + content_type: mime.to_string(), + }, + ) + .await + .unwrap(); + + let chunked_bytes = ChunkedBytes::from_bytes(Bytes::from(text.clone())).unwrap(); + assert_eq!(chunked_bytes.offsets.len(), 1); + + let mut completed_parts = Vec::new(); + let iter = chunked_bytes.iter().enumerate(); + for (index, next) in iter { + let resp = c1 + .upload_part( + &workspace_id, + &workspace_id, + &file_id, + &upload.upload_id, + index as i32 + 1, + next.to_vec(), + ) + .await + .unwrap(); + + completed_parts.push(CompletedPartRequest { + e_tag: resp.e_tag, + part_number: resp.part_num, + }); + } + assert_eq!(completed_parts.len(), 1); + assert_eq!(completed_parts[0].part_number, 1); + + let req = CompleteUploadRequest { + file_id: file_id.clone(), + parent_dir: workspace_id.clone(), + upload_id: upload.upload_id, + parts: completed_parts, + }; + c1.complete_upload(&workspace_id, req).await.unwrap(); + + let blob = c1 + .get_blob_v1(&workspace_id, &workspace_id, &file_id) + .await + .unwrap() + .1; + let blob_text = String::from_utf8(blob.to_vec()).unwrap(); + assert_eq!(blob_text, text); +} + +#[tokio::test] +async fn empty_part_upload_test() { + // Test with empty part + let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; + let mime = mime::TEXT_PLAIN_UTF_8; + let file_id = Uuid::new_v4().to_string(); + + let upload = c1 + .create_upload( + &workspace_id, + CreateUploadRequest { + file_id: file_id.clone(), + parent_dir: workspace_id.clone(), + content_type: mime.to_string(), + }, + ) + .await + .unwrap(); + + let result = c1 + .upload_part(&workspace_id, "", &file_id, &upload.upload_id, 1, vec![]) + 
.await + .unwrap_err(); + assert_eq!(result.code, ErrorCode::InvalidRequest) +} + +#[tokio::test] +async fn multiple_part_upload_test() { + let test_bucket = TestBucket::new().await; + + // Test with a payload of less than 5MB + let small_file_size = 4 * 1024 * 1024; // 4 MB + let small_blob = generate_random_bytes(small_file_size); + perform_upload_test(&test_bucket, small_blob, small_file_size, "small_file").await; + + // Test with a payload of exactly 10MB + let file_size = 10 * 1024 * 1024; // 10 MB + let blob = generate_random_bytes(file_size); + perform_upload_test(&test_bucket, blob, file_size, "large_file").await; + + // Test with a payload of exactly 20MB + let file_size = 20 * 1024 * 1024; // 20 MB + let blob = generate_random_bytes(file_size); + perform_upload_test(&test_bucket, blob, file_size, "large_file").await; +} + +#[tokio::test] +#[should_panic] +async fn multiple_part_upload_empty_data_test() { + let test_bucket = TestBucket::new().await; + let empty_blob = Vec::new(); + perform_upload_test(&test_bucket, empty_blob, 0, "empty_file").await; +} + +async fn perform_upload_test( + test_bucket: &TestBucket, + blob: Vec, + file_size: usize, + description: &str, +) { + let chunk_size = 5 * 1024 * 1024; // 5 MB + let file_id = Uuid::new_v4().to_string(); + let workspace_id = Uuid::new_v4(); + let parent_dir = workspace_id.to_string(); + + let req = CreateUploadRequest { + file_id: file_id.clone(), + parent_dir: parent_dir.clone(), + content_type: "text".to_string(), + }; + + let key = BlobPathV1 { + workspace_id, + parent_dir: parent_dir.clone(), + file_id, + }; + let upload = test_bucket.create_upload(key, req).await.unwrap(); + + let mut chunk_count = (file_size / chunk_size) + 1; + let mut size_of_last_chunk = file_size % chunk_size; + if size_of_last_chunk == 0 { + size_of_last_chunk = chunk_size; + chunk_count -= 1; + } + + let mut completed_parts = Vec::new(); + for chunk_index in 0..chunk_count { + let start = chunk_index * chunk_size; + let end = start + + if chunk_index == chunk_count - 1 { + size_of_last_chunk + } else { + chunk_size + }; + + let chunk = &blob[start..end]; + let part_number = (chunk_index + 1) as i32; + + let req = UploadPartData { + file_id: upload.file_id.clone(), + upload_id: upload.upload_id.clone(), + part_number, + body: chunk.to_vec(), + }; + let key = BlobPathV1 { + workspace_id, + parent_dir: parent_dir.clone(), + file_id: upload.file_id.clone(), + }; + let resp = test_bucket.upload_part(&key, req).await.unwrap(); + + completed_parts.push( + CompletedPart::builder() + .e_tag(resp.e_tag) + .part_number(resp.part_num) + .build(), + ); + } + + let complete_req = CompleteUploadRequest { + file_id: upload.file_id.clone(), + parent_dir: parent_dir.clone(), + upload_id: upload.upload_id.clone(), + parts: completed_parts + .into_iter() + .map(|p| CompletedPartRequest { + e_tag: p.e_tag().unwrap().to_string(), + part_number: p.part_number.unwrap(), + }) + .collect(), + }; + + let key = BlobPathV1 { + workspace_id, + parent_dir: parent_dir.clone(), + file_id: upload.file_id.clone(), + }; + test_bucket + .complete_upload(&key, complete_req) + .await + .unwrap(); + + // Verify the upload + let object = test_bucket.get_blob(&key.object_key()).await.unwrap(); + assert_eq!(object.len(), file_size, "Failed for {}", description); + assert_eq!(object.to_blob(), blob, "Failed for {}", description); +} + +#[tokio::test] +async fn invalid_test() { + let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = 
workspace_id_from_client(&c1).await; + let parent_dir = workspace_id.clone(); + let file_id = uuid::Uuid::new_v4().to_string(); + let mime = mime::TEXT_PLAIN_UTF_8; + + // test invalid create upload request + for request in [ + CreateUploadRequest { + file_id: "".to_string(), + parent_dir: parent_dir.clone(), + content_type: mime.to_string(), + }, + CreateUploadRequest { + file_id: file_id.clone(), + parent_dir: "".to_string(), + content_type: mime.to_string(), + }, + ] { + let err = c1.create_upload(&workspace_id, request).await.unwrap_err(); + assert_eq!(err.code, ErrorCode::InvalidRequest); + } + + // test invalid upload part request + let upload_id = uuid::Uuid::new_v4().to_string(); + for request in vec![ + // workspace_id, parent_dir, file_id, upload_id, part_number, body + ( + "".to_string(), + parent_dir.clone(), + file_id.clone(), + upload_id.clone(), + 1, + vec![1, 2, 3], + ), + ( + workspace_id.clone(), + "".to_string(), + file_id.clone(), + upload_id.clone(), + 1, + vec![1, 2, 3], + ), + ( + workspace_id.clone(), + parent_dir.clone(), + "".to_string(), + upload_id.clone(), + 1, + vec![1, 2, 3], + ), + ] { + let err = c1 + .upload_part( + &request.0, &request.1, &request.2, &request.3, request.4, request.5, + ) + .await + .unwrap_err(); + assert_eq!(err.code, ErrorCode::Internal); + } +} diff --git a/tests/workspace/blob/put_and_get.rs b/tests/file_test/put_and_get.rs similarity index 93% rename from tests/workspace/blob/put_and_get.rs rename to tests/file_test/put_and_get.rs index 08656498d..04588913c 100644 --- a/tests/workspace/blob/put_and_get.rs +++ b/tests/file_test/put_and_get.rs @@ -1,6 +1,9 @@ use super::TestBucket; + use app_error::ErrorCode; + use client_api_test::{generate_unique_registered_user_client, workspace_id_from_client}; +use database::file::{BucketClient, ResponseBlob}; #[tokio::test] async fn get_but_not_exists() { @@ -115,10 +118,8 @@ async fn put_and_delete_workspace() { { // blob exists in the bucket - let raw_data = test_bucket - .get_object(&workspace_id, &file_id) - .await - .unwrap(); + let obj_key = format!("{}/{}", workspace_id, file_id); + let raw_data = test_bucket.get_blob(&obj_key).await.unwrap().to_blob(); assert_eq!(blob_to_put, String::from_utf8_lossy(&raw_data)); } @@ -127,11 +128,9 @@ async fn put_and_delete_workspace() { { // blob does not exist in the bucket - let is_none = test_bucket - .get_object(&workspace_id, &file_id) - .await - .is_none(); - assert!(is_none); + let obj_key = format!("{}/{}", workspace_id, file_id); + let err = test_bucket.get_blob(&obj_key).await.unwrap_err(); + assert!(err.is_record_not_found()); } } diff --git a/tests/workspace/blob/usage.rs b/tests/file_test/usage.rs similarity index 100% rename from tests/workspace/blob/usage.rs rename to tests/file_test/usage.rs diff --git a/tests/main.rs b/tests/main.rs index 586bf2882..67f9c8b72 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -7,5 +7,6 @@ mod workspace; mod ai_test; mod collab_history; +mod file_test; mod search; mod yrs_version; diff --git a/tests/workspace/blob/mod.rs b/tests/workspace/blob/mod.rs deleted file mode 100644 index 7ae09c4b3..000000000 --- a/tests/workspace/blob/mod.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::borrow::Cow; - -mod put_and_get; -mod usage; -use lazy_static::lazy_static; -use tracing::warn; - -lazy_static! 
{ - pub static ref LOCALHOST_MINIO_URL: Cow<'static, str> = - get_env_var("LOCALHOST_MINIO_URL", "http://localhost:9000"); - pub static ref LOCALHOST_MINIO_ACCESS_KEY: Cow<'static, str> = - get_env_var("LOCALHOST_MINIO_ACCESS_KEY", "minioadmin"); - pub static ref LOCALHOST_MINIO_SECRET_KEY: Cow<'static, str> = - get_env_var("LOCALHOST_MINIO_SECRET_KEY", "minioadmin"); - pub static ref LOCALHOST_MINIO_BUCKET_NAME: Cow<'static, str> = - get_env_var("LOCALHOST_MINIO_BUCKET_NAME", "appflowy"); -} - -pub struct TestBucket(pub s3::Bucket); - -impl TestBucket { - pub async fn new() -> Self { - let region = s3::Region::Custom { - region: "".to_owned(), - endpoint: LOCALHOST_MINIO_URL.to_string(), - }; - - let cred = s3::creds::Credentials { - access_key: Some(LOCALHOST_MINIO_ACCESS_KEY.to_string()), - secret_key: Some(LOCALHOST_MINIO_SECRET_KEY.to_string()), - security_token: None, - session_token: None, - expiration: None, - }; - - match s3::Bucket::create_with_path_style( - &LOCALHOST_MINIO_BUCKET_NAME, - region.clone(), - cred.clone(), - s3::BucketConfiguration::default(), - ) - .await - { - Ok(_) => {}, - Err(e) => match e { - s3::error::S3Error::HttpFailWithBody(409, _) => {}, - _ => panic!("could not create bucket: {}", e), - }, - } - - Self( - s3::Bucket::new(&LOCALHOST_MINIO_BUCKET_NAME, region.clone(), cred.clone()) - .unwrap() - .with_path_style(), - ) - } - - pub async fn get_object(&self, workspace_id: &str, file_id: &str) -> Option { - let object_key = format!("{}/{}", workspace_id, file_id); - match self.0.get_object(&object_key).await { - Ok(resp) => { - assert!(resp.status_code() == 200); - Some(resp.bytes().to_owned()) - }, - Err(err) => match err { - s3::error::S3Error::HttpFailWithBody(404, _) => None, - _ => panic!("could not get object: {}", err), - }, - } - } -} - -fn get_env_var<'default>(key: &str, default: &'default str) -> Cow<'default, str> { - dotenvy::dotenv().ok(); - match std::env::var(key) { - Ok(value) => Cow::Owned(value), - Err(_) => { - warn!("could not read env var {}: using default: {}", key, default); - Cow::Borrowed(default) - }, - } -} diff --git a/tests/workspace/mod.rs b/tests/workspace/mod.rs index f94874902..017a8a42b 100644 --- a/tests/workspace/mod.rs +++ b/tests/workspace/mod.rs @@ -1,4 +1,3 @@ -mod blob; mod edit_workspace; mod invitation_crud; mod member_crud;
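Closing editor's note: for readers skimming the tests above, here is the essential client-side choreography distilled into one test-style sketch. It exercises the same `client_api` surface the tests use (`Client`, `ChunkedBytes`, and the `file_dto` types); the helper itself is hypothetical and, like the tests, unwraps instead of handling errors:

```rust
use bytes::Bytes;
use client_api::{ChunkedBytes, Client};
use database_entity::file_dto::{
  CompleteUploadRequest, CompletedPartRequest, CreateUploadRequest,
};

// Create an upload session, push every 5 MB chunk, then complete it so the
// server persists the blob metadata.
async fn upload_text(
  c1: &Client,
  workspace_id: &str,
  parent_dir: &str,
  file_id: &str,
  text: String,
) {
  let upload = c1
    .create_upload(
      workspace_id,
      CreateUploadRequest {
        file_id: file_id.to_string(),
        parent_dir: parent_dir.to_string(),
        content_type: mime::TEXT_PLAIN_UTF_8.to_string(),
      },
    )
    .await
    .unwrap();

  let chunked = ChunkedBytes::from_bytes(Bytes::from(text)).unwrap();
  let mut parts = Vec::new();
  for (index, chunk) in chunked.iter().enumerate() {
    let resp = c1
      .upload_part(
        workspace_id,
        parent_dir,
        file_id,
        &upload.upload_id,
        index as i32 + 1, // S3 part numbers are 1-based
        chunk.to_vec(),
      )
      .await
      .unwrap();
    parts.push(CompletedPartRequest {
      e_tag: resp.e_tag,
      part_number: resp.part_num,
    });
  }

  c1.complete_upload(
    workspace_id,
    CompleteUploadRequest {
      file_id: file_id.to_string(),
      parent_dir: parent_dir.to_string(),
      upload_id: upload.upload_id,
      parts,
    },
  )
  .await
  .unwrap();
}
```

The blob is then readable via `get_blob_v1(workspace_id, parent_dir, file_id)`, and deleting the workspace removes the whole `{workspace_id}/` prefix in one `remove_dir` call, as the delete_dir tests verify.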