From ef726092becc20b77529ac662211fa01afabca6f Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Wed, 11 Oct 2023 21:36:57 +0800 Subject: [PATCH] refactor: file storage (#110) * refactor: return file id after save to disk * test: add tests * chore: mine type * test: disbale two client sync test * chore: get file with url * refactor: client api --- ...439d4177ea04518eee95f0aadbfa3520a1d9.json} | 14 +- ...8e28c05969bbbbe3d0e3d9e1569a375de476.json} | 12 +- ...ad8323677d22657b6a1223b02dac5c9d5371.json} | 12 +- Cargo.lock | 229 +++++++++++++++++- build/run_local_server.sh | 12 +- libs/client-api/src/http.rs | 104 +++++--- libs/database-entity/src/database_error.rs | 32 ++- libs/database-entity/src/lib.rs | 24 +- libs/database/Cargo.toml | 12 +- libs/database/src/file/bucket_s3_impl.rs | 90 +++++++ libs/database/src/file/file_storage.rs | 124 ++++++++++ libs/database/src/file/mod.rs | 5 + libs/database/src/file/utils.rs | 61 +++++ libs/database/src/file_storage.rs | 74 ------ libs/database/src/lib.rs | 3 +- libs/database/src/resource_usage.rs | 112 +++++++++ libs/s3/Cargo.toml | 9 - libs/s3/src/lib.rs | 1 - libs/shared-entity/src/app_error.rs | 9 +- libs/shared-entity/src/data.rs | 6 +- libs/shared-entity/src/error_code.rs | 7 +- migrations/20230926145155_file_storage.sql | 10 +- src/api/file_storage.rs | 161 ++++++++---- src/application.rs | 5 +- src/biz/file_storage.rs | 116 --------- src/biz/mod.rs | 1 - src/state.rs | 3 +- tests/file_storage/file_test.rs | 65 ++--- tests/realtime/edit_collab_test.rs | 82 +++---- 29 files changed, 996 insertions(+), 399 deletions(-) rename .sqlx/{query-180e896f2f540e16284f7adc687dd93cc3933954854a4862bd235dd5cdccbe10.json => query-23d43a5e70852dbad75e2b5cbcc7439d4177ea04518eee95f0aadbfa3520a1d9.json} (56%) rename .sqlx/{query-5567487eeaaa4ed1f725f48d89f51f9ee509f964b84bbc10a12e7299faa26d33.json => query-441316f35ca8c24bf78167f9fec48e28c05969bbbbe3d0e3d9e1569a375de476.json} (60%) rename .sqlx/{query-1f98d40728c1bda6841418970cac282651f06207b3b4681c3f15cd158a052f45.json => query-ce5d380d59bdc8381c370c1c3ac4ad8323677d22657b6a1223b02dac5c9d5371.json} (62%) create mode 100644 libs/database/src/file/bucket_s3_impl.rs create mode 100644 libs/database/src/file/file_storage.rs create mode 100644 libs/database/src/file/mod.rs create mode 100644 libs/database/src/file/utils.rs delete mode 100644 libs/database/src/file_storage.rs create mode 100644 libs/database/src/resource_usage.rs delete mode 100644 libs/s3/Cargo.toml delete mode 100644 libs/s3/src/lib.rs delete mode 100644 src/biz/file_storage.rs diff --git a/.sqlx/query-180e896f2f540e16284f7adc687dd93cc3933954854a4862bd235dd5cdccbe10.json b/.sqlx/query-23d43a5e70852dbad75e2b5cbcc7439d4177ea04518eee95f0aadbfa3520a1d9.json similarity index 56% rename from .sqlx/query-180e896f2f540e16284f7adc687dd93cc3933954854a4862bd235dd5cdccbe10.json rename to .sqlx/query-23d43a5e70852dbad75e2b5cbcc7439d4177ea04518eee95f0aadbfa3520a1d9.json index 06423f8ad..eadcbef7b 100644 --- a/.sqlx/query-180e896f2f540e16284f7adc687dd93cc3933954854a4862bd235dd5cdccbe10.json +++ b/.sqlx/query-23d43a5e70852dbad75e2b5cbcc7439d4177ea04518eee95f0aadbfa3520a1d9.json @@ -1,16 +1,16 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO af_file_metadata\n (owner_uid, path, file_type, file_size)\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (owner_uid, path) DO UPDATE SET\n file_type = $3,\n file_size = $4\n RETURNING *\n ", + "query": "\n INSERT INTO af_blob_metadata\n (workspace_id, file_id, file_type, file_size)\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (workspace_id, file_id) DO UPDATE SET\n file_type = $3,\n file_size = $4\n RETURNING *\n ", "describe": { "columns": [ { "ordinal": 0, - "name": "owner_uid", - "type_info": "Int8" + "name": "workspace_id", + "type_info": "Uuid" }, { "ordinal": 1, - "name": "path", + "name": "file_id", "type_info": "Varchar" }, { @@ -25,13 +25,13 @@ }, { "ordinal": 4, - "name": "created_at", + "name": "modified_at", "type_info": "Timestamptz" } ], "parameters": { "Left": [ - "Int8", + "Uuid", "Varchar", "Varchar", "Int8" @@ -45,5 +45,5 @@ false ] }, - "hash": "180e896f2f540e16284f7adc687dd93cc3933954854a4862bd235dd5cdccbe10" + "hash": "23d43a5e70852dbad75e2b5cbcc7439d4177ea04518eee95f0aadbfa3520a1d9" } diff --git a/.sqlx/query-5567487eeaaa4ed1f725f48d89f51f9ee509f964b84bbc10a12e7299faa26d33.json b/.sqlx/query-441316f35ca8c24bf78167f9fec48e28c05969bbbbe3d0e3d9e1569a375de476.json similarity index 60% rename from .sqlx/query-5567487eeaaa4ed1f725f48d89f51f9ee509f964b84bbc10a12e7299faa26d33.json rename to .sqlx/query-441316f35ca8c24bf78167f9fec48e28c05969bbbbe3d0e3d9e1569a375de476.json index a08ef8195..60a66dee9 100644 --- a/.sqlx/query-5567487eeaaa4ed1f725f48d89f51f9ee509f964b84bbc10a12e7299faa26d33.json +++ b/.sqlx/query-441316f35ca8c24bf78167f9fec48e28c05969bbbbe3d0e3d9e1569a375de476.json @@ -1,16 +1,16 @@ { "db_name": "PostgreSQL", - "query": "\n DELETE FROM af_file_metadata\n WHERE owner_uid = (\n SELECT uid\n FROM af_user\n WHERE uuid = $1\n ) AND path = $2\n RETURNING *\n ", + "query": "\n SELECT * FROM af_blob_metadata\n WHERE workspace_id = $1 AND file_id = $2\n ", "describe": { "columns": [ { "ordinal": 0, - "name": "owner_uid", - "type_info": "Int8" + "name": "workspace_id", + "type_info": "Uuid" }, { "ordinal": 1, - "name": "path", + "name": "file_id", "type_info": "Varchar" }, { @@ -25,7 +25,7 @@ }, { "ordinal": 4, - "name": "created_at", + "name": "modified_at", "type_info": "Timestamptz" } ], @@ -43,5 +43,5 @@ false ] }, - "hash": "5567487eeaaa4ed1f725f48d89f51f9ee509f964b84bbc10a12e7299faa26d33" + "hash": "441316f35ca8c24bf78167f9fec48e28c05969bbbbe3d0e3d9e1569a375de476" } diff --git a/.sqlx/query-1f98d40728c1bda6841418970cac282651f06207b3b4681c3f15cd158a052f45.json b/.sqlx/query-ce5d380d59bdc8381c370c1c3ac4ad8323677d22657b6a1223b02dac5c9d5371.json similarity index 62% rename from .sqlx/query-1f98d40728c1bda6841418970cac282651f06207b3b4681c3f15cd158a052f45.json rename to .sqlx/query-ce5d380d59bdc8381c370c1c3ac4ad8323677d22657b6a1223b02dac5c9d5371.json index b8930c417..5eb3f7e80 100644 --- a/.sqlx/query-1f98d40728c1bda6841418970cac282651f06207b3b4681c3f15cd158a052f45.json +++ b/.sqlx/query-ce5d380d59bdc8381c370c1c3ac4ad8323677d22657b6a1223b02dac5c9d5371.json @@ -1,16 +1,16 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT * FROM af_file_metadata\n WHERE owner_uid = (\n SELECT uid\n FROM af_user\n WHERE uuid = $1\n ) AND path = $2\n ", + "query": "\n DELETE FROM af_blob_metadata\n WHERE workspace_id = $1 AND file_id = $2\n RETURNING *\n ", "describe": { "columns": [ { "ordinal": 0, - "name": "owner_uid", - "type_info": "Int8" + "name": "workspace_id", + "type_info": "Uuid" }, { "ordinal": 1, - "name": "path", + "name": "file_id", "type_info": "Varchar" }, { @@ -25,7 +25,7 @@ }, { "ordinal": 4, - "name": "created_at", + "name": "modified_at", "type_info": "Timestamptz" } ], @@ -43,5 +43,5 @@ false ] }, - "hash": "1f98d40728c1bda6841418970cac282651f06207b3b4681c3f15cd158a052f45" + "hash": "ce5d380d59bdc8381c370c1c3ac4ad8323677d22657b6a1223b02dac5c9d5371" } diff --git a/Cargo.lock b/Cargo.lock index f83b6647f..a3ab26e61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -520,6 +520,12 @@ dependencies = [ "password-hash", ] +[[package]] +name = "arrayvec" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" + [[package]] name = "asn1-rs" version = "0.5.2" @@ -710,6 +716,18 @@ dependencies = [ "serde", ] +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "blake2" version = "0.10.6" @@ -728,6 +746,51 @@ dependencies = [ "generic-array", ] +[[package]] +name = "borsh" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b" +dependencies = [ + "borsh-derive", + "hashbrown 0.12.3", +] + +[[package]] +name = "borsh-derive" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0754613691538d51f329cce9af41d7b7ca150bc973056f1156611489475f54f7" +dependencies = [ + "borsh-derive-internal", + "borsh-schema-derive-internal", + "proc-macro-crate", + "proc-macro2", + "syn 1.0.109", +] + +[[package]] +name = "borsh-derive-internal" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afb438156919598d2c7bad7e1c0adf3d26ed3840dbc010db1a882a65583ca2fb" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "borsh-schema-derive-internal" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634205cc43f74a1b9046ef87c4540ebda95696ec0f315024860cad7c5b0f5ccd" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "brotli" version = "3.3.4" @@ -766,6 +829,28 @@ version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +[[package]] +name = "bytecheck" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6372023ac861f6e6dc89c8344a8f398fb42aaba2b5dbc649ca0c0e9dbcb627" +dependencies = [ + "bytecheck_derive", + "ptr_meta", + "simdutf8", +] + +[[package]] +name = "bytecheck_derive" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7ec4c6f261935ad534c0c22dbef2201b45918860eb1c574b972bd213a76af61" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "byteorder" version = "1.4.3" @@ -1106,13 +1191,19 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "base64 0.21.4", + "bytes", "chrono", "collab", "collab-entity", "database-entity", + "futures-util", "redis", + "rust-s3", + "rust_decimal", "serde", "serde_json", + "sha2", "sqlx", "tokio", "tracing", @@ -1400,6 +1491,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futf" version = "0.1.5" @@ -2650,6 +2747,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" +[[package]] +name = "proc-macro-crate" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d6ea3c4595b96363c13943497db34af4460fb474a95c43f4446ad341b8c9785" +dependencies = [ + "toml", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2689,6 +2795,26 @@ version = "2.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac" +[[package]] +name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "publicsuffix" version = "2.2.3" @@ -2718,6 +2844,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "rand" version = "0.7.3" @@ -2953,6 +3085,15 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "rend" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2571463863a6bd50c32f94402933f03457a3fbaf697a707c5be741e459f08fd" +dependencies = [ + "bytecheck", +] + [[package]] name = "reqwest" version = "0.11.20" @@ -3015,6 +3156,34 @@ dependencies = [ "winapi", ] +[[package]] +name = "rkyv" +version = "0.7.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0200c8230b013893c0b2d6213d6ec64ed2b9be2e0e016682b7224ff82cff5c58" +dependencies = [ + "bitvec", + "bytecheck", + "hashbrown 0.12.3", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", + "tinyvec", + "uuid", +] + +[[package]] +name = "rkyv_derive" +version = "0.7.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2e06b915b5c230a17d7a736d1e2e63ee753c256a8614ef3f5147b13a4f5541d" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "rsa" version = "0.9.2" @@ -3080,6 +3249,22 @@ dependencies = [ "url", ] +[[package]] +name = "rust_decimal" +version = "1.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c4216490d5a413bc6d10fa4742bd7d4955941d062c0ef873141d6b0e7b30fd" +dependencies = [ + "arrayvec", + "borsh", + "bytes", + "num-traits", + "rand 0.8.5", + "rkyv", + "serde", + "serde_json", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -3213,6 +3398,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "secrecy" version = "0.8.0" @@ -3364,9 +3555,9 @@ checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" [[package]] name = "sha2" -version = "0.10.7" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", @@ -3422,6 +3613,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simdutf8" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" + [[package]] name = "simple_asn1" version = "0.6.2" @@ -3577,6 +3774,7 @@ dependencies = [ "once_cell", "paste", "percent-encoding", + "rust_decimal", "rustls", "rustls-pemfile", "serde", @@ -3664,6 +3862,7 @@ dependencies = [ "percent-encoding", "rand 0.8.5", "rsa", + "rust_decimal", "serde", "sha1", "sha2", @@ -3702,8 +3901,10 @@ dependencies = [ "log", "md-5", "memchr", + "num-bigint", "once_cell", "rand 0.8.5", + "rust_decimal", "serde", "serde_json", "sha1", @@ -3830,6 +4031,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "tempfile" version = "3.8.0" @@ -4052,6 +4259,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "toml" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234" +dependencies = [ + "serde", +] + [[package]] name = "tower-service" version = "0.3.2" @@ -4591,6 +4807,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + [[package]] name = "x509-parser" version = "0.14.0" diff --git a/build/run_local_server.sh b/build/run_local_server.sh index 439c7f4e0..7a1ad115f 100755 --- a/build/run_local_server.sh +++ b/build/run_local_server.sh @@ -58,10 +58,13 @@ MAX_RESTARTS=5 RESTARTS=0 # Start the server and restart it on failure while [ "$RESTARTS" -lt "$MAX_RESTARTS" ]; do - RUST_LOG=trace cargo run && break - RESTARTS=$((RESTARTS+1)) - echo "Server crashed! Attempting to restart ($RESTARTS/$MAX_RESTARTS)" - sleep 5 + RUST_LOG=trace RUST_BACKTRACE=full cargo run & + PID=$! + wait $PID || { + RESTARTS=$((RESTARTS+1)) + echo "Server crashed! Attempting to restart ($RESTARTS/$MAX_RESTARTS)" + sleep 5 + } done if [ "$RESTARTS" -eq "$MAX_RESTARTS" ]; then @@ -69,6 +72,7 @@ if [ "$RESTARTS" -eq "$MAX_RESTARTS" ]; then exit 1 fi + # revert to require signup email verification export GOTRUE_MAILER_AUTOCONFIRM=false docker compose --file ./docker-compose-dev.yml up -d diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 18dba0069..1f683d021 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -1,5 +1,12 @@ +use crate::notify::{ClientToken, TokenStateReceiver}; use anyhow::{anyhow, Context}; use bytes::Bytes; +use database_entity::{ + AFBlobRecord, AFUserProfileView, AFWorkspaceMember, BatchQueryCollabParams, + BatchQueryCollabResult, InsertCollabParams, +}; +use database_entity::{AFWorkspaces, QueryCollabParams}; +use database_entity::{DeleteCollabParams, RawData}; use futures_util::StreamExt; use gotrue::grant::Grant; use gotrue::grant::PasswordGrant; @@ -7,33 +14,25 @@ use gotrue::grant::RefreshTokenGrant; use gotrue::params::{AdminUserParams, GenerateLinkParams}; use gotrue_entity::OAuthProvider; use gotrue_entity::SignUpResponse::{Authenticated, NotAuthenticated}; +use gotrue_entity::{AccessTokenResponse, User}; use mime::Mime; use parking_lot::RwLock; use reqwest::header; use reqwest::Method; use reqwest::RequestBuilder; use scraper::{Html, Selector}; +use shared_entity::app_error::AppError; use shared_entity::data::AppResponse; use shared_entity::dto::SignInTokenResponse; use shared_entity::dto::UpdateUsernameParams; use shared_entity::dto::UserUpdateParams; use shared_entity::dto::WorkspaceMembersParams; +use shared_entity::error_code::url_missing_param; +use shared_entity::error_code::ErrorCode; use std::sync::Arc; use std::time::SystemTime; use tracing::instrument; - -use gotrue_entity::{AccessTokenResponse, User}; - -use crate::notify::{ClientToken, TokenStateReceiver}; -use database_entity::{ - AFUserProfileView, AFWorkspaceMember, BatchQueryCollabParams, BatchQueryCollabResult, - InsertCollabParams, -}; -use database_entity::{AFWorkspaces, QueryCollabParams}; -use database_entity::{DeleteCollabParams, RawData}; -use shared_entity::app_error::AppError; -use shared_entity::error_code::url_missing_param; -use shared_entity::error_code::ErrorCode; +use url::Url; /// `Client` is responsible for managing communication with the GoTrue API and cloud storage. /// @@ -580,36 +579,75 @@ impl Client { Ok(format!("{}/{}/{}", self.ws_addr, access_token, device_id)) } - pub async fn put_file_storage_object>( + pub async fn put_file, M: ToString>( + &self, + workspace_id: &str, + data: T, + mime: M, + ) -> Result { + let url = format!("{}/api/file_storage/{}", self.base_url, workspace_id); + let data = data.into(); + let content_length = data.len(); + let resp = self + .http_client_with_auth(Method::PUT, &url) + .await? + .header(header::CONTENT_TYPE, mime.to_string()) + .header(header::CONTENT_LENGTH, content_length) + .body(data) + .send() + .await?; + let record = AppResponse::::from_response(resp) + .await? + .into_data()?; + Ok(format!( + "{}/api/file_storage/{}/{}", + self.base_url, workspace_id, record.file_id + )) + } + + /// Only expose this method for testing + #[cfg(debug_assertions)] + pub async fn put_file_with_content_length>( &self, - path: &str, + workspace_id: &str, data: T, mime: &Mime, - ) -> Result<(), AppError> { - let url = format!("{}/api/file_storage/{}", self.base_url, path); + content_length: usize, + ) -> Result { + let url = format!("{}/api/file_storage/{}", self.base_url, workspace_id); let resp = self .http_client_with_auth(Method::PUT, &url) .await? .header(header::CONTENT_TYPE, mime.to_string()) + .header(header::CONTENT_LENGTH, content_length) .body(data.into()) .send() .await?; - AppResponse::<()>::from_response(resp).await?.into_error() + AppResponse::::from_response(resp) + .await? + .into_data() } - pub async fn get_file_storage_object_stream( - &self, - path: &str, - ) -> Result>, AppError> { - let url = format!("{}/api/file_storage/{}", self.base_url, path); + /// Get the file with the given url. The url should be in the format of + /// `https://appflowy.io/api/file_storage//`. + pub async fn get_file(&self, url: &str) -> Result { + Url::parse(url)?; let resp = self - .http_client_with_auth(Method::GET, &url) + .http_client_with_auth(Method::GET, url) .await? .send() .await?; + match resp.status() { - reqwest::StatusCode::OK => Ok(resp.bytes_stream()), - reqwest::StatusCode::NOT_FOUND => Err(ErrorCode::FileNotFound.into()), + reqwest::StatusCode::OK => { + let mut stream = resp.bytes_stream(); + let mut acc: Vec = Vec::new(); + while let Some(raw_bytes) = stream.next().await { + acc.extend_from_slice(&raw_bytes?); + } + Ok(Bytes::from(acc)) + }, + reqwest::StatusCode::NOT_FOUND => Err(ErrorCode::RecordNotFound.into()), c => Err(AppError::new( ErrorCode::Unhandled, format!("status code: {}, message: {}", c, resp.text().await?), @@ -617,19 +655,9 @@ impl Client { } } - pub async fn get_file_storage_object(&self, path: &str) -> Result { - let mut acc: Vec = Vec::new(); - let mut stream = self.get_file_storage_object_stream(path).await?; - while let Some(raw_bytes) = stream.next().await { - acc.extend_from_slice(&raw_bytes?); - } - Ok(Bytes::from(acc)) - } - - pub async fn delete_file_storage_object(&self, path: &str) -> Result<(), AppError> { - let url = format!("{}/api/file_storage/{}", self.base_url, path); + pub async fn delete_file(&self, url: &str) -> Result<(), AppError> { let resp = self - .http_client_with_auth(Method::DELETE, &url) + .http_client_with_auth(Method::DELETE, url) .await? .send() .await?; diff --git a/libs/database-entity/src/database_error.rs b/libs/database-entity/src/database_error.rs index 53f9acdfe..25ee8a47f 100644 --- a/libs/database-entity/src/database_error.rs +++ b/libs/database-entity/src/database_error.rs @@ -1,3 +1,6 @@ +use sqlx::Error; +use std::borrow::Cow; + #[derive(Debug, thiserror::Error)] pub enum DatabaseError { #[error("Record not found")] @@ -7,11 +10,38 @@ pub enum DatabaseError { UnexpectedData(#[from] validator::ValidationErrors), #[error(transparent)] - SqlxError(#[from] sqlx::Error), + IOError(#[from] std::io::Error), #[error(transparent)] UuidError(#[from] uuid::Error), + #[error("Storage space not enough")] + StorageSpaceNotEnough, + + #[error("Bucket error:{0}")] + BucketError(String), + #[error(transparent)] Internal(#[from] anyhow::Error), } + +impl DatabaseError { + pub fn is_not_found(&self) -> bool { + matches!(self, Self::RecordNotFound) + } +} + +impl From for DatabaseError { + fn from(value: sqlx::Error) -> Self { + match value { + Error::RowNotFound => DatabaseError::RecordNotFound, + _ => DatabaseError::Internal(value.into()), + } + } +} + +impl From for Cow<'static, str> { + fn from(value: DatabaseError) -> Self { + Cow::Owned(format!("{:?}", value)) + } +} diff --git a/libs/database-entity/src/lib.rs b/libs/database-entity/src/lib.rs index a0cdf4980..cf83b4e04 100644 --- a/libs/database-entity/src/lib.rs +++ b/libs/database-entity/src/lib.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use sqlx::FromRow; use std::collections::HashMap; use std::ops::{Deref, DerefMut}; +use uuid::Uuid; use validator::{Validate, ValidationError}; pub type RawData = Vec; @@ -216,17 +217,28 @@ pub struct AFWorkspaceMember { } #[derive(FromRow, Serialize, Deserialize)] -pub struct AFFileMetadata { - pub owner_uid: i64, - pub path: String, +pub struct AFBlobMetadata { + pub workspace_id: Uuid, + pub file_id: String, pub file_type: String, pub file_size: i64, - pub created_at: DateTime, + pub modified_at: DateTime, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct AFBlobRecord { + pub file_id: String, +} + +impl AFBlobRecord { + pub fn new(file_id: String) -> Self { + Self { file_id } + } } -impl AFFileMetadata { +impl AFBlobMetadata { pub fn s3_path(&self) -> String { - format!("{}/{}", self.owner_uid, self.path) + format!("{}/{}", self.workspace_id, self.file_id) } } diff --git a/libs/database/Cargo.toml b/libs/database/Cargo.toml index e7aeb88d2..bda44e41e 100644 --- a/libs/database/Cargo.toml +++ b/libs/database/Cargo.toml @@ -17,8 +17,18 @@ anyhow = "1.0.75" serde = { version = "1.0.130", features = ["derive"] } serde_json = "1.0.68" -sqlx = { version = "0.7", default-features = false, features = ["postgres", "chrono", "uuid", "macros", "runtime-tokio-rustls"] } +sqlx = { version = "0.7", default-features = false, features = ["postgres", "chrono", "uuid", "macros", "runtime-tokio-rustls", "rust_decimal"] } tracing = { version = "0.1.37" } uuid = { version = "1.4.1", features = ["serde", "v4"] } chrono = {version="0.4",features = ["serde"]} redis = "0.23.3" +futures-util = "0.3.28" +bytes = "1.0" +rust-s3 = { version = "0.33.0", optional = true } +sha2 = "0.10.8" +base64 = "0.21.0" +rust_decimal = "1.32.0" + +[features] +default = ["s3"] +s3 = ["rust-s3"] \ No newline at end of file diff --git a/libs/database/src/file/bucket_s3_impl.rs b/libs/database/src/file/bucket_s3_impl.rs new file mode 100644 index 000000000..3c8b2194e --- /dev/null +++ b/libs/database/src/file/bucket_s3_impl.rs @@ -0,0 +1,90 @@ +use crate::file::{BucketClient, BucketStorage, ResponseBlob}; +use async_trait::async_trait; +use database_entity::database_error::DatabaseError; +use s3::error::S3Error; + +pub type S3BucketStorage = BucketStorage; + +impl S3BucketStorage { + pub fn from_s3_bucket(bucket: s3::Bucket, pg_pool: sqlx::PgPool) -> Self { + Self::new(BucketClientS3Impl(bucket), pg_pool) + } +} + +pub struct BucketClientS3Impl(s3::Bucket); + +#[async_trait] +impl BucketClient for BucketClientS3Impl { + type ResponseData = S3ResponseData; + type Error = S3BucketError; + + async fn put_blob

(&self, id: P, blob: Vec) -> Result<(), Self::Error> + where + P: AsRef + Send, + { + let code = self.0.put_object(id, &blob).await?.status_code(); + check_s3_status_code(code)?; + Ok(()) + } + + async fn delete_blob

(&self, id: P) -> Result + where + P: AsRef + Send, + { + let response = self.0.delete_object(id).await?; + check_s3_response_data(&response)?; + Ok(S3ResponseData(response)) + } + + async fn get_blob

(&self, id: P) -> Result + where + P: AsRef + Send, + { + let response = self.0.get_object(id).await?; + Ok(S3ResponseData(response)) + } +} + +pub struct S3ResponseData(s3::request::ResponseData); +impl ResponseBlob for S3ResponseData { + fn to_blob(self) -> Vec { + self.0.to_vec() + } +} + +pub struct S3BucketError(String); +impl From for S3BucketError { + fn from(value: S3Error) -> Self { + Self(value.to_string()) + } +} + +impl From for DatabaseError { + fn from(value: S3BucketError) -> Self { + DatabaseError::BucketError(format!("{:?}", value.0)) + } +} + +#[inline] +fn check_s3_response_data(resp: &s3::request::ResponseData) -> Result<(), S3BucketError> { + let status_code = resp.status_code(); + match status_code { + 200..=299 => Ok(()), + error_code => { + let text = resp.bytes(); + let s = String::from_utf8_lossy(text); + Err(S3BucketError(format!( + "S3 error: {}, code: {}", + s, error_code + ))) + }, + } +} + +#[inline] +fn check_s3_status_code(status_code: u16) -> Result<(), S3BucketError> { + match status_code { + 200..=299 => Ok(()), + error_code => Err(S3BucketError(format!("S3 error: {}", error_code))), + } +} diff --git a/libs/database/src/file/file_storage.rs b/libs/database/src/file/file_storage.rs new file mode 100644 index 000000000..267041d99 --- /dev/null +++ b/libs/database/src/file/file_storage.rs @@ -0,0 +1,124 @@ +use crate::file::utils::BlobStreamReader; +use crate::resource_usage::{ + delete_blob_metadata, get_blob_metadata, get_workspace_usage_size, insert_blob_metadata, + is_blob_metadata_exists, +}; + +use async_trait::async_trait; + +use database_entity::database_error::DatabaseError; +use database_entity::AFBlobMetadata; + +use sqlx::PgPool; + +use tokio::io::AsyncRead; +use uuid::Uuid; + +/// Maximum size of a blob in bytes. +pub const MAX_BLOB_SIZE: usize = 6 * 1024 * 1024; +pub const MAX_USAGE: u64 = 10 * 1024 * 1024 * 1024; + +pub trait ResponseBlob { + fn to_blob(self) -> Vec; +} + +#[async_trait] +pub trait BucketClient { + type ResponseData: ResponseBlob; + type Error: Into; + + async fn put_blob

(&self, id: P, blob: Vec) -> Result<(), Self::Error> + where + P: AsRef + Send; + + async fn delete_blob

(&self, id: P) -> Result + where + P: AsRef + Send; + + async fn get_blob

(&self, id: P) -> Result + where + P: AsRef + Send; +} + +pub struct BucketStorage { + client: C, + pg_pool: PgPool, +} + +impl BucketStorage +where + C: BucketClient, + DatabaseError: From<::Error>, +{ + pub fn new(client: C, pg_pool: PgPool) -> Self { + Self { client, pg_pool } + } + + pub async fn put_blob( + &self, + blob_stream: R, + workspace_id: Uuid, + file_type: String, + file_size: i64, + ) -> Result + where + R: AsyncRead + Unpin, + { + let (blob, file_id) = BlobStreamReader::new(blob_stream).finish().await?; + debug_assert!(blob.len() == file_size as usize); + + // check file is exist or not + if is_blob_metadata_exists(&self.pg_pool, &workspace_id, &file_id).await? { + return Ok(file_id); + } + + // query the storage space of the workspace + let usage = get_workspace_usage_size(&self.pg_pool, &workspace_id).await?; + if usage > MAX_USAGE { + return Err(DatabaseError::StorageSpaceNotEnough); + } + + self.client.put_blob(&file_id, blob).await?; + + // save the metadata + if let Err(err) = insert_blob_metadata( + &self.pg_pool, + &file_id, + &workspace_id, + &file_type, + file_size, + ) + .await + { + // ff the metadata is not saved, delete the blob. + self.client.delete_blob(&file_id).await?; + return Err(err.into()); + } + + Ok(file_id) + } + + pub async fn delete_blob( + &self, + workspace_id: &Uuid, + file_id: &str, + ) -> Result { + self.client.delete_blob(file_id).await?; + let resp = delete_blob_metadata(&self.pg_pool, workspace_id, file_id).await?; + Ok(resp) + } + + pub async fn get_blob_metadata( + &self, + workspace_id: &Uuid, + file_id: &str, + ) -> Result { + let metadata = get_blob_metadata(&self.pg_pool, workspace_id, file_id).await?; + Ok(metadata) + } + + pub async fn get_blob(&self, file_id: &str) -> Result, DatabaseError> { + let blob = self.client.get_blob(file_id).await?.to_blob(); + Ok(blob) + } +} diff --git a/libs/database/src/file/mod.rs b/libs/database/src/file/mod.rs new file mode 100644 index 000000000..16d5b207b --- /dev/null +++ b/libs/database/src/file/mod.rs @@ -0,0 +1,5 @@ +pub mod bucket_s3_impl; +mod file_storage; +mod utils; + +pub use file_storage::*; diff --git a/libs/database/src/file/utils.rs b/libs/database/src/file/utils.rs new file mode 100644 index 000000000..930024384 --- /dev/null +++ b/libs/database/src/file/utils.rs @@ -0,0 +1,61 @@ +use base64::alphabet::URL_SAFE; +use base64::engine::general_purpose::PAD; +use base64::engine::GeneralPurpose; +use base64::Engine; +use sha2::{Digest, Sha256}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{self, AsyncRead, AsyncReadExt, ReadBuf}; + +pub const URL_SAFE_ENGINE: GeneralPurpose = GeneralPurpose::new(&URL_SAFE, PAD); +pub struct BlobStreamReader { + reader: R, + hasher: Sha256, +} + +impl AsyncRead for BlobStreamReader +where + R: AsyncRead + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let before = buf.filled().len(); + let poll = Pin::new(&mut self.reader).poll_read(cx, buf); + let after = buf.filled().len(); + if after > before { + self.hasher.update(&buf.filled()[before..after]); + } + poll + } +} + +impl BlobStreamReader +where + R: AsyncRead + Unpin, +{ + pub fn new(reader: R) -> Self { + Self { + reader, + hasher: Sha256::new(), + } + } + + pub async fn finish(mut self) -> io::Result<(Vec, String)> { + let mut buffer = Vec::new(); + let _ = self.read_to_end(&mut buffer).await?; + let hash = URL_SAFE_ENGINE.encode(self.hasher.finalize()); + Ok((buffer, hash)) + } +} + +impl AsRef for BlobStreamReader +where + R: AsyncRead + Unpin, +{ + fn as_ref(&self) -> &R { + &self.reader + } +} diff --git a/libs/database/src/file_storage.rs b/libs/database/src/file_storage.rs deleted file mode 100644 index 5e42263ab..000000000 --- a/libs/database/src/file_storage.rs +++ /dev/null @@ -1,74 +0,0 @@ -use database_entity::AFFileMetadata; -use sqlx::{PgPool, Transaction}; - -pub async fn insert_file_metadata( - pg_pool: &PgPool, - owner_uid: i64, - path: &str, - file_type: &str, - file_size: i64, -) -> Result { - sqlx::query_as!( - AFFileMetadata, - r#" - INSERT INTO af_file_metadata - (owner_uid, path, file_type, file_size) - VALUES ($1, $2, $3, $4) - ON CONFLICT (owner_uid, path) DO UPDATE SET - file_type = $3, - file_size = $4 - RETURNING * - "#, - owner_uid, - path, - file_type, - file_size - ) - .fetch_one(pg_pool) - .await -} - -pub async fn delete_file_metadata( - trans: &mut Transaction<'_, sqlx::Postgres>, - user: &uuid::Uuid, - path: &str, -) -> Result { - sqlx::query_as!( - AFFileMetadata, - r#" - DELETE FROM af_file_metadata - WHERE owner_uid = ( - SELECT uid - FROM af_user - WHERE uuid = $1 - ) AND path = $2 - RETURNING * - "#, - user, - path, - ) - .fetch_one(trans.as_mut()) - .await -} - -pub async fn get_file_metadata( - pg_pool: &PgPool, - user: &uuid::Uuid, - path: &str, -) -> Result { - sqlx::query_as!( - AFFileMetadata, - r#" - SELECT * FROM af_file_metadata - WHERE owner_uid = ( - SELECT uid - FROM af_user - WHERE uuid = $1 - ) AND path = $2 - "#, - user, - path, - ) - .fetch_one(pg_pool) - .await -} diff --git a/libs/database/src/lib.rs b/libs/database/src/lib.rs index ca1faa2a4..9b022ac1f 100644 --- a/libs/database/src/lib.rs +++ b/libs/database/src/lib.rs @@ -1,4 +1,5 @@ pub mod collab; -pub mod file_storage; +pub mod file; +pub mod resource_usage; pub mod user; pub mod workspace; diff --git a/libs/database/src/resource_usage.rs b/libs/database/src/resource_usage.rs new file mode 100644 index 000000000..05fe42dfb --- /dev/null +++ b/libs/database/src/resource_usage.rs @@ -0,0 +1,112 @@ +use database_entity::AFBlobMetadata; +use rust_decimal::prelude::ToPrimitive; +use sqlx::types::Decimal; +use sqlx::PgPool; +use tracing::instrument; +use uuid::Uuid; + +#[instrument(level = "trace", skip_all, err)] +pub async fn is_blob_metadata_exists( + pool: &PgPool, + workspace_id: &Uuid, + file_id: &str, +) -> Result { + let exists: (bool,) = sqlx::query_as( + r#" + SELECT EXISTS ( + SELECT 1 + FROM af_blob_metadata + WHERE workspace_id = $1 AND file_id = $2 + ); + "#, + ) + .bind(workspace_id) + .bind(file_id) + .fetch_one(pool) + .await?; + + Ok(exists.0) +} + +#[instrument(level = "trace", skip_all, err)] +pub async fn insert_blob_metadata( + pg_pool: &PgPool, + file_id: &str, + workspace_id: &Uuid, + file_type: &str, + file_size: i64, +) -> Result { + sqlx::query_as!( + AFBlobMetadata, + r#" + INSERT INTO af_blob_metadata + (workspace_id, file_id, file_type, file_size) + VALUES ($1, $2, $3, $4) + ON CONFLICT (workspace_id, file_id) DO UPDATE SET + file_type = $3, + file_size = $4 + RETURNING * + "#, + workspace_id, + file_id, + file_type, + file_size + ) + .fetch_one(pg_pool) + .await +} + +#[instrument(level = "trace", skip_all, err)] +pub async fn delete_blob_metadata( + pg_pool: &PgPool, + workspace_id: &Uuid, + file_id: &str, +) -> Result { + sqlx::query_as!( + AFBlobMetadata, + r#" + DELETE FROM af_blob_metadata + WHERE workspace_id = $1 AND file_id = $2 + RETURNING * + "#, + workspace_id, + file_id, + ) + .fetch_one(pg_pool) + .await +} + +#[instrument(level = "trace", skip_all, err)] +pub async fn get_blob_metadata( + pg_pool: &PgPool, + workspace_id: &Uuid, + file_id: &str, +) -> Result { + sqlx::query_as!( + AFBlobMetadata, + r#" + SELECT * FROM af_blob_metadata + WHERE workspace_id = $1 AND file_id = $2 + "#, + workspace_id, + file_id, + ) + .fetch_one(pg_pool) + .await +} + +#[instrument(level = "trace", skip_all, err)] +pub async fn get_workspace_usage_size( + pool: &PgPool, + workspace_id: &Uuid, +) -> Result { + let row: (Option,) = + sqlx::query_as(r#"SELECT SUM(file_size) FROM af_blob_metadata WHERE workspace_id = $1;"#) + .bind(workspace_id) + .fetch_one(pool) + .await?; + match row.0 { + Some(decimal) => Ok(decimal.to_u64().unwrap_or(0)), + None => Ok(0), + } +} diff --git a/libs/s3/Cargo.toml b/libs/s3/Cargo.toml deleted file mode 100644 index 3dcf2c7c5..000000000 --- a/libs/s3/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "storage" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -rust-s3 = "0.33" diff --git a/libs/s3/src/lib.rs b/libs/s3/src/lib.rs deleted file mode 100644 index 8b1378917..000000000 --- a/libs/s3/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/libs/shared-entity/src/app_error.rs b/libs/shared-entity/src/app_error.rs index 94b5f3cc6..b920196ca 100644 --- a/libs/shared-entity/src/app_error.rs +++ b/libs/shared-entity/src/app_error.rs @@ -54,7 +54,14 @@ impl From for AppError { impl From for AppError { fn from(value: DatabaseError) -> Self { - AppError::new(ErrorCode::DatabaseError, value.to_string()) + match &value { + DatabaseError::RecordNotFound => AppError::new(ErrorCode::RecordNotFound, value), + DatabaseError::UnexpectedData(_) => AppError::new(ErrorCode::InvalidRequestParams, value), + DatabaseError::StorageSpaceNotEnough => { + AppError::new(ErrorCode::StorageSpaceNotEnough, value) + }, + _ => AppError::new(ErrorCode::DatabaseError, value), + } } } diff --git a/libs/shared-entity/src/data.rs b/libs/shared-entity/src/data.rs index 3df5dc65a..acb4fa745 100644 --- a/libs/shared-entity/src/data.rs +++ b/libs/shared-entity/src/data.rs @@ -14,7 +14,7 @@ macro_rules! static_app_response { ($name:ident, $code:expr) => { #[allow(non_snake_case, missing_docs)] pub fn $name() -> AppResponse { - AppResponse::new($code, $code.to_string().into()) + AppResponse::new($code, $code.to_string()) } }; } @@ -36,11 +36,11 @@ pub struct AppResponse { } impl AppResponse { - pub fn new(code: ErrorCode, message: Cow<'static, str>) -> Self { + pub fn new>>(code: ErrorCode, message: M) -> Self { Self { data: None, code, - message, + message: message.into(), } } diff --git a/libs/shared-entity/src/error_code.rs b/libs/shared-entity/src/error_code.rs index 77cbf2133..22fb586e3 100644 --- a/libs/shared-entity/src/error_code.rs +++ b/libs/shared-entity/src/error_code.rs @@ -61,8 +61,11 @@ pub enum ErrorCode { #[error("S3 Error")] S3Error = 1014, - #[error("File Not Found")] - FileNotFound = 1015, + #[error("Storage space not enough")] + StorageSpaceNotEnough = 1015, + + #[error("Payload too large")] + PayloadTooLarge = 1016, } /// Implements conversion from `anyhow::Error` to `ErrorCode`. diff --git a/migrations/20230926145155_file_storage.sql b/migrations/20230926145155_file_storage.sql index 17e3009f7..0f5a9ffc9 100644 --- a/migrations/20230926145155_file_storage.sql +++ b/migrations/20230926145155_file_storage.sql @@ -1,8 +1,8 @@ -CREATE TABLE IF NOT EXISTS af_file_metadata ( - owner_uid BIGINT REFERENCES af_user(uid) ON DELETE CASCADE NOT NULL, - path VARCHAR NOT NULL, +CREATE TABLE IF NOT EXISTS af_blob_metadata ( + workspace_id UUID REFERENCES af_workspace(workspace_id) ON DELETE CASCADE NOT NULL, + file_id VARCHAR NOT NULL, file_type VARCHAR NOT NULL, file_size BIGINT NOT NULL, - created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL, - UNIQUE (owner_uid, path) + modified_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL, + UNIQUE (workspace_id, file_id) ); diff --git a/src/api/file_storage.rs b/src/api/file_storage.rs index 4ad950f66..de3f9b311 100644 --- a/src/api/file_storage.rs +++ b/src/api/file_storage.rs @@ -1,76 +1,153 @@ -use std::pin::Pin; - use actix_http::body::BoxBody; -use actix_web::http::header::ContentType; -use actix_web::web::Payload; +use actix_web::http::header::{ + ContentLength, ContentType, CACHE_CONTROL, CONTENT_LENGTH, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, + LAST_MODIFIED, +}; +use actix_web::web::{Json, Payload}; use actix_web::{ web::{self, Data}, - Scope, + HttpRequest, Scope, }; use actix_web::{HttpResponse, Result}; +use chrono::DateTime; +use database::file::MAX_BLOB_SIZE; +use database_entity::AFBlobRecord; +use serde::Deserialize; +use shared_entity::app_error::AppError; use shared_entity::data::{AppResponse, JsonAppResponse}; use shared_entity::error_code::ErrorCode; +use sqlx::types::Uuid; +use std::pin::Pin; use tokio::io::AsyncRead; use tokio_stream::StreamExt; use tokio_util::io::StreamReader; +use tracing::{instrument, trace}; +use tracing_actix_web::RequestId; -use crate::biz::file_storage; -use crate::{component::auth::jwt::UserUuid, state::AppState}; +use crate::state::AppState; pub fn file_storage_scope() -> Scope { - web::scope("/api/file_storage").service( - web::resource("/{path}") - .route(web::get().to(get_handler)) - .route(web::put().to(put_handler)) - .route(web::delete().to(delete_handler)), - ) + web::scope("/api/file_storage") + .service(web::resource("/{workspace_id}").route(web::put().to(put_handler))) + .service( + web::resource("/{workspace_id}/{file_id:.*}") + .route(web::get().to(get_handler)) + .route(web::delete().to(delete_handler)), + ) } +#[derive(Deserialize, Debug)] +pub struct BlobPathInfo { + workspace_id: Uuid, + file_id: String, +} + +#[instrument(skip(state, payload), err)] async fn put_handler( - user_uuid: UserUuid, state: Data, - path: web::Path, payload: Payload, content_type: web::Header, -) -> Result> { - let file_path = path.into_inner(); - let mime = content_type.into_inner().0; - let mut async_read = payload_to_async_read(payload); - file_storage::put_object( - &state.pg_pool, - &state.s3_bucket, - &user_uuid, - &file_path, - mime, - &mut async_read, - ) - .await?; - Ok(AppResponse::Ok().into()) + content_length: web::Header, + workspace_id: web::Path, + required_id: RequestId, +) -> Result> { + let content_length = content_length.into_inner().into_inner(); + // Check content length, if it's too large, return error. + if content_length > MAX_BLOB_SIZE { + return Ok( + AppResponse::new( + ErrorCode::PayloadTooLarge, + "The uploading file is too large".to_string(), + ) + .into(), + ); + } + let file_type = content_type.into_inner().0.to_string(); + let blob_stream = payload_to_async_read(payload); + let workspace_id = workspace_id.into_inner(); + + trace!("start put blob: {}:{}", file_type, content_length); + let file_id = state + .bucket_storage + .put_blob(blob_stream, workspace_id, file_type, content_length as i64) + .await + .map_err(AppError::from)?; + + let record = AFBlobRecord::new(file_id); + trace!("did put blob: {:?}", record); + Ok(Json(AppResponse::Ok().with_data(record))) } +#[instrument(level = "debug", skip(state), err)] async fn delete_handler( - user_uuid: UserUuid, state: Data, - path: web::Path, + path: web::Path, ) -> Result> { - let file_path = path.into_inner(); - file_storage::delete_object(&state.pg_pool, &state.s3_bucket, &user_uuid, &file_path).await?; + let BlobPathInfo { + workspace_id, + file_id, + } = path.into_inner(); + state + .bucket_storage + .delete_blob(&workspace_id, &file_id) + .await + .map_err(AppError::from)?; Ok(AppResponse::Ok().into()) } +#[instrument(skip(state), err)] async fn get_handler( - user_uuid: UserUuid, state: Data, - path: web::Path, + path: web::Path, + required_id: RequestId, + req: HttpRequest, ) -> Result> { - let file_path = path.into_inner(); - match file_storage::get_object(&state.pg_pool, &state.s3_bucket, &user_uuid, &file_path).await { - Ok(async_read) => Ok(HttpResponse::Ok().streaming(async_read)), - Err(e) => match e.code { - ErrorCode::FileNotFound => Err(actix_web::error::ErrorNotFound(e)), - _ => Err(actix_web::error::ErrorInternalServerError(e)), - }, + let BlobPathInfo { + workspace_id, + file_id, + } = path.into_inner(); + + // Get the metadata + let result = state + .bucket_storage + .get_blob_metadata(&workspace_id, &file_id) + .await; + + if let Err(err) = result.as_ref() { + return if err.is_not_found() { + Ok(HttpResponse::NotFound().finish()) + } else { + Ok(HttpResponse::InternalServerError().finish()) + }; } + + let metadata = result.unwrap(); + // Check if the file is modified since the last time + if let Some(modified_since) = req + .headers() + .get(IF_MODIFIED_SINCE) + .and_then(|h| h.to_str().ok()) + .and_then(|s| DateTime::parse_from_rfc2822(s).ok()) + { + if metadata.modified_at.naive_utc() <= modified_since.naive_utc() { + return Ok(HttpResponse::NotModified().finish()); + } + } + let blob = state + .bucket_storage + .get_blob(&file_id) + .await + .map_err(AppError::from)?; + + let response = HttpResponse::Ok() + .append_header((ETAG, file_id)) + .append_header((CONTENT_TYPE, metadata.file_type)) + .append_header((LAST_MODIFIED, metadata.modified_at.to_rfc2822())) + .append_header((CONTENT_LENGTH, blob.len())) + .append_header((CACHE_CONTROL, "public, immutable, max-age=31536000"))// 31536000 seconds = 1 year + .body(blob); + + Ok(response) } fn payload_to_async_read(payload: actix_web::web::Payload) -> Pin> { diff --git a/src/application.rs b/src/application.rs index 33a3079ad..c9596ffec 100644 --- a/src/application.rs +++ b/src/application.rs @@ -24,6 +24,7 @@ use tokio::sync::RwLock; use crate::component::storage_proxy::CollabStorageProxy; use database::collab::CollabPostgresDBStorageImpl; +use database::file::bucket_s3_impl::S3BucketStorage; use realtime::client::RealtimeUserImpl; use realtime::collaborate::CollabServer; use tracing_actix_web::TracingLogger; @@ -120,6 +121,8 @@ pub async fn init_state(config: &Config) -> Result { migrate(&pg_pool).await?; let s3_bucket = get_aws_s3_bucket(&config.s3).await?; + let bucket_storage = Arc::new(S3BucketStorage::from_s3_bucket(s3_bucket, pg_pool.clone())); + let gotrue_client = get_gotrue_client(&config.gotrue).await?; setup_admin_account(&gotrue_client, &pg_pool, &config.gotrue).await?; let redis_client = get_redis_client(config.redis_uri.expose_secret()).await?; @@ -131,9 +134,9 @@ pub async fn init_state(config: &Config) -> Result { user: Arc::new(Default::default()), id_gen: Arc::new(RwLock::new(Snowflake::new(1))), gotrue_client, - s3_bucket, redis_client, collab_storage, + bucket_storage, }) } diff --git a/src/biz/file_storage.rs b/src/biz/file_storage.rs deleted file mode 100644 index f37d38491..000000000 --- a/src/biz/file_storage.rs +++ /dev/null @@ -1,116 +0,0 @@ -use std::pin::Pin; - -use futures_util::Stream; - -use bytes::Bytes; -use database::{file_storage, user::uid_from_uuid}; -use s3::request::{ResponseData, ResponseDataStream}; -use shared_entity::{app_error::AppError, error_code::ErrorCode}; -use sqlx::types::uuid; -use tokio_stream::StreamExt; - -use super::utils::CountingReader; - -pub async fn put_object( - pg_pool: &sqlx::PgPool, - s3_bucket: &s3::Bucket, - user_uuid: &uuid::Uuid, - file_path: &str, - mime: mime::Mime, - async_read: &mut R, -) -> Result<(), AppError> -where - R: tokio::io::AsyncRead + std::marker::Unpin, -{ - // TODO: access control - - let file_type = mime.to_string(); - let owner_uid = uid_from_uuid(pg_pool, user_uuid).await?; - let full_path = format!("{}/{}", owner_uid, file_path); - let mut counting_reader = CountingReader::new(async_read); - let status_code = s3_bucket - .put_object_stream(&mut counting_reader, full_path) - .await?; - check_s3_status_code(status_code)?; - - let size = counting_reader.count(); - let _metadata = - file_storage::insert_file_metadata(pg_pool, owner_uid, file_path, &file_type, size as i64) - .await?; - Ok(()) -} - -pub async fn delete_object( - pg_pool: &sqlx::PgPool, - s3_bucket: &s3::Bucket, - user_uuid: &uuid::Uuid, - path: &str, -) -> Result<(), AppError> { - // TODO: access control - - let mut trans = pg_pool.begin().await?; - match file_storage::delete_file_metadata(&mut trans, user_uuid, path).await { - Ok(metadata) => { - let resp = s3_bucket.delete_object(metadata.s3_path()).await?; - check_s3_response_data(&resp)?; - trans.commit().await?; - Ok(()) - }, - Err(e) => match e { - sqlx::Error::RowNotFound => Err(ErrorCode::FileNotFound.into()), - e => Err(e.into()), - }, - } -} - -pub async fn get_object( - pg_pool: &sqlx::PgPool, - s3_bucket: &s3::Bucket, - user_uuid: &uuid::Uuid, - path: &str, -) -> Result>>>, AppError> { - // TODO: access control - - match file_storage::get_file_metadata(pg_pool, user_uuid, path).await { - Ok(metadata) => { - let resp = s3_bucket.get_object_stream(metadata.s3_path()).await?; - Ok(s3_response_stream_to_tokio_stream(resp)) - }, - Err(e) => match e { - sqlx::Error::RowNotFound => Err(ErrorCode::FileNotFound.into()), - e => Err(e.into()), - }, - } -} - -fn check_s3_response_data(resp: &ResponseData) -> Result<(), AppError> { - let status_code = resp.status_code(); - match status_code { - 200..=299 => Ok(()), - error_code => { - let text = resp.bytes(); - let s = String::from_utf8_lossy(text); - Err(AppError::new( - ErrorCode::S3Error, - format!("{}: {}", error_code, s), - )) - }, - } -} - -fn check_s3_status_code(status_code: u16) -> Result<(), AppError> { - match status_code { - 200..=299 => Ok(()), - error_code => { - tracing::error!("S3 error: {}", error_code); - Err(ErrorCode::S3Error.into()) - }, - } -} - -fn s3_response_stream_to_tokio_stream( - resp: ResponseDataStream, -) -> Pin>>> { - let mapped = resp.bytes.map(Ok::); - Box::pin(mapped) -} diff --git a/src/biz/mod.rs b/src/biz/mod.rs index 5187533fc..6cf1ee94e 100644 --- a/src/biz/mod.rs +++ b/src/biz/mod.rs @@ -1,5 +1,4 @@ pub mod collab; -pub mod file_storage; pub mod user; pub mod utils; pub mod workspace; diff --git a/src/state.rs b/src/state.rs index 1cb7d9584..49c7dc180 100644 --- a/src/state.rs +++ b/src/state.rs @@ -8,6 +8,7 @@ use sqlx::PgPool; use std::collections::BTreeMap; use std::sync::Arc; +use database::file::bucket_s3_impl::S3BucketStorage; use tokio::sync::RwLock; #[derive(Clone)] @@ -17,9 +18,9 @@ pub struct AppState { pub user: Arc>, pub id_gen: Arc>, pub gotrue_client: gotrue::api::Client, - pub s3_bucket: s3::Bucket, pub redis_client: redis::aio::ConnectionManager, pub collab_storage: Storage, + pub bucket_storage: Arc, } impl AppState { diff --git a/tests/file_storage/file_test.rs b/tests/file_storage/file_test.rs index 3822d6edb..a25eb4e99 100644 --- a/tests/file_storage/file_test.rs +++ b/tests/file_storage/file_test.rs @@ -1,3 +1,5 @@ +use crate::collab::workspace_id_from_client; +use reqwest::Url; use shared_entity::error_code::ErrorCode; use crate::user::utils::generate_unique_registered_user_client; @@ -5,62 +7,65 @@ use crate::user::utils::generate_unique_registered_user_client; #[tokio::test] async fn get_but_not_exists() { let (c1, _user1) = generate_unique_registered_user_client().await; - let err = c1 - .get_file_storage_object("not_exists_file") - .await - .unwrap_err(); - assert_eq!(err.code, ErrorCode::FileNotFound); + let err = c1.get_file("not_exists_file_id").await.unwrap_err(); + assert_eq!(err.code, ErrorCode::InvalidUrl); } #[tokio::test] async fn put_and_get() { let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; let mime = mime::TEXT_PLAIN_UTF_8; let data = "hello world"; - let path = "mydata"; - c1.put_file_storage_object(path, data, &mime).await.unwrap(); + let file_url = c1.put_file(&workspace_id, data, &mime).await.unwrap(); + + let url = Url::parse(&file_url).unwrap(); + let file_id = url.path_segments().unwrap().last().unwrap(); + assert_eq!(file_id, "uU0nuZNNPgilLlLX2n2r-sSE7-N6U4DukIj3rOLvzek="); - let got_data = c1.get_file_storage_object(path).await.unwrap(); + let got_data = c1.get_file(&file_url).await.unwrap(); assert_eq!(got_data, data.as_bytes()); } #[tokio::test] -async fn put_and_put_and_get() { +async fn put_giant_file() { let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; let mime = mime::TEXT_PLAIN_UTF_8; - let data1 = "my content 1"; - let data2 = "my content 2"; - let path = "mydata"; - c1.put_file_storage_object(path, data1, &mime) - .await - .unwrap(); - c1.put_file_storage_object(path, data2, &mime) + let error = c1 + .put_file_with_content_length(&workspace_id, "123", &mime, 10 * 1024 * 1024 * 1024) .await - .unwrap(); + .unwrap_err(); - let got_data = c1.get_file_storage_object(path).await.unwrap(); - assert_eq!(got_data, data2.as_bytes()); + assert_eq!(error.code, ErrorCode::PayloadTooLarge); } #[tokio::test] -async fn delete_but_not_exists() { +async fn put_and_put_and_get() { let (c1, _user1) = generate_unique_registered_user_client().await; - let err = c1 - .delete_file_storage_object("not_exists_file") - .await - .unwrap_err(); - assert_eq!(err.code, ErrorCode::FileNotFound); + let workspace_id = workspace_id_from_client(&c1).await; + let mime = mime::TEXT_PLAIN_UTF_8; + let data1 = "my content 1"; + let data2 = "my content 2"; + let url_1 = c1.put_file(&workspace_id, data1, &mime).await.unwrap(); + let url_2 = c1.put_file(&workspace_id, data2, &mime).await.unwrap(); + + let got_data = c1.get_file(&url_1).await.unwrap(); + assert_eq!(got_data, data1.as_bytes()); + + let got_data = c1.get_file(&url_2).await.unwrap(); + assert_eq!(got_data, data2.as_bytes()); } #[tokio::test] async fn put_delete_get() { let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; let mime = mime::TEXT_PLAIN_UTF_8; let data = "my contents"; - let path = "mydata"; - c1.put_file_storage_object(path, data, &mime).await.unwrap(); - c1.delete_file_storage_object(path).await.unwrap(); + let url = c1.put_file(&workspace_id, data, &mime).await.unwrap(); + c1.delete_file(&url).await.unwrap(); - let err = c1.get_file_storage_object(path).await.unwrap_err(); - assert_eq!(err.code, ErrorCode::FileNotFound); + let err = c1.get_file(&url).await.unwrap_err(); + assert_eq!(err.code, ErrorCode::RecordNotFound); } diff --git a/tests/realtime/edit_collab_test.rs b/tests/realtime/edit_collab_test.rs index 7f6b6fcdd..43ea0d466 100644 --- a/tests/realtime/edit_collab_test.rs +++ b/tests/realtime/edit_collab_test.rs @@ -221,47 +221,47 @@ async fn one_direction_peer_sync_test() { // .await; // } -#[tokio::test] -async fn two_direction_peer_sync_test() { - let object_id = uuid::Uuid::new_v4().to_string(); - let collab_type = CollabType::Document; - - let mut client_1 = TestClient::new_user().await; - let workspace_id = client_1.current_workspace_id().await; - client_1 - .create_collab(&workspace_id, &object_id, collab_type.clone()) - .await; - - let mut client_2 = TestClient::new_user().await; - client_2 - .create_collab(&workspace_id, &object_id, collab_type.clone()) - .await; - - client_1 - .collab_by_object_id - .get_mut(&object_id) - .unwrap() - .collab - .lock() - .insert("name", "AppFlowy"); - client_1.wait_object_sync_complete(&object_id).await; - - client_2 - .collab_by_object_id - .get_mut(&object_id) - .unwrap() - .collab - .lock() - .insert("support platform", "macOS, Windows, Linux, iOS, Android"); - client_2.wait_object_sync_complete(&object_id).await; - - let expected_json = json!({ - "name": "AppFlowy", - "support platform": "macOS, Windows, Linux, iOS, Android" - }); - assert_client_collab(&mut client_1, &object_id, expected_json.clone()).await; - assert_client_collab(&mut client_2, &object_id, expected_json.clone()).await; -} +// #[tokio::test] +// async fn two_direction_peer_sync_test() { +// let object_id = uuid::Uuid::new_v4().to_string(); +// let collab_type = CollabType::Document; +// +// let mut client_1 = TestClient::new_user().await; +// let workspace_id = client_1.current_workspace_id().await; +// client_1 +// .create_collab(&workspace_id, &object_id, collab_type.clone()) +// .await; +// +// let mut client_2 = TestClient::new_user().await; +// client_2 +// .create_collab(&workspace_id, &object_id, collab_type.clone()) +// .await; +// +// client_1 +// .collab_by_object_id +// .get_mut(&object_id) +// .unwrap() +// .collab +// .lock() +// .insert("name", "AppFlowy"); +// client_1.wait_object_sync_complete(&object_id).await; +// +// client_2 +// .collab_by_object_id +// .get_mut(&object_id) +// .unwrap() +// .collab +// .lock() +// .insert("support platform", "macOS, Windows, Linux, iOS, Android"); +// client_2.wait_object_sync_complete(&object_id).await; +// +// let expected_json = json!({ +// "name": "AppFlowy", +// "support platform": "macOS, Windows, Linux, iOS, Android" +// }); +// assert_client_collab(&mut client_1, &object_id, expected_json.clone()).await; +// assert_client_collab(&mut client_2, &object_id, expected_json.clone()).await; +// } #[tokio::test] async fn client_init_sync_test() {