diff --git a/Cargo.lock b/Cargo.lock index 435fd017..8b46e96e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,7 +56,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" dependencies = [ "cfg-if", - "getrandom", "once_cell", "version_check", "zerocopy", @@ -71,12 +70,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "allocator-api2" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" - [[package]] name = "amf0" version = "0.0.1" @@ -105,9 +98,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cd2405b3ac1faab2990b74d728624cd9fd115651fcecc7c2d8daf01376275ba" +checksum = "628a8f9bd1e24b4e0db2b4bc2d000b001e7dd032d54afa60a68836aeec5aa54a" dependencies = [ "anstyle", "anstyle-parse", @@ -213,7 +206,7 @@ checksum = "71938f30533e4d95a6d17aa530939da3842c2ab6f4f84b9dae68447e4129f74a" [[package]] name = "async-graphql" version = "7.0.0" -source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#1e83b73f2a909b7707050c0fea822419c2424c6a" +source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#548beefa0b891aefda4a2f23505afccf3a8a565d" dependencies = [ "async-graphql-derive", "async-graphql-parser", @@ -250,7 +243,7 @@ dependencies = [ [[package]] name = "async-graphql-derive" version = "7.0.0" -source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#1e83b73f2a909b7707050c0fea822419c2424c6a" +source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#548beefa0b891aefda4a2f23505afccf3a8a565d" dependencies = [ "Inflector", "async-graphql-parser", @@ -266,7 +259,7 @@ dependencies = [ [[package]] name = "async-graphql-parser" version = "7.0.0" -source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#1e83b73f2a909b7707050c0fea822419c2424c6a" +source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#548beefa0b891aefda4a2f23505afccf3a8a565d" dependencies = [ "async-graphql-value", "pest", @@ -277,7 +270,7 @@ dependencies = [ [[package]] name = "async-graphql-value" version = "7.0.0" -source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#1e83b73f2a909b7707050c0fea822419c2424c6a" +source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#548beefa0b891aefda4a2f23505afccf3a8a565d" dependencies = [ "bytes", "indexmap 2.1.0", @@ -301,7 +294,7 @@ dependencies = [ "once_cell", "rand", "regex", - "ring", + "ring 0.17.7", "rustls 0.21.10", "rustls-native-certs 0.6.3", "rustls-pemfile 1.0.4", @@ -352,25 +345,6 @@ dependencies = [ "syn 2.0.48", ] -[[package]] -name = "atoi" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" -dependencies = [ - "num-traits", -] - -[[package]] -name = "atomic-write-file" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edcdbedc2236483ab103a53415653d6b4442ea6141baf1ffa85df29635e88436" -dependencies = [ - "nix", - "rand", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -425,7 +399,7 @@ dependencies = [ "hex", "http 0.2.11", "hyper 0.14.28", - "ring", + "ring 0.17.7", "time", "tokio", "tracing", @@ -602,7 +576,7 @@ dependencies = [ "once_cell", "p256 0.11.1", "percent-encoding", - "ring", + "ring 0.17.7", "sha2", "subtle", "time", @@ -705,7 +679,7 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2 0.3.23", + "h2 0.3.24", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -896,6 +870,16 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bcder" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c627747a6774aab38beb35990d88309481378558875a41da1a4b2e373c906ef0" +dependencies = [ + "bytes", + "smallvec", +] + [[package]] name = "binary-helper" version = "0.0.1" @@ -912,9 +896,9 @@ dependencies = [ "rustls 0.22.2", "rustls-pemfile 2.0.0", "serde", - "sqlx", "thiserror", "tokio", + "tokio-postgres-rustls", "tonic", "tower-layer", "tracing", @@ -955,12 +939,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.1" +version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" -dependencies = [ - "serde", -] +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" [[package]] name = "bitmask-enum" @@ -1203,6 +1184,7 @@ dependencies = [ "bytes", "config", "const_format", + "deadpool-postgres", "dotenvy", "fnv", "fred", @@ -1218,14 +1200,15 @@ dependencies = [ "path-tree", "pin-project", "portpicker", + "postgres-from-row", + "postgres-types", "prost", "serde", "serde_json", - "sqlx", - "sqlx-postgres", "tempfile", "thiserror", "tokio", + "tokio-postgres", "tokio-util", "tonic", "tonic-build", @@ -1351,21 +1334,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "crc16" version = "0.4.0" @@ -1541,6 +1509,39 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" +[[package]] +name = "deadpool" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" +dependencies = [ + "async-trait", + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-postgres" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda39fa1cfff190d8924d447ad04fd22772c250438ca5ce1dfb3c80621c05aaa" +dependencies = [ + "deadpool", + "tokio", + "tokio-postgres", + "tracing", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49" +dependencies = [ + "tokio", +] + [[package]] name = "default-net" version = "0.21.0" @@ -1677,9 +1678,6 @@ name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" -dependencies = [ - "serde", -] [[package]] name = "elliptic-curve" @@ -1759,23 +1757,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "etcetera" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" -dependencies = [ - "cfg-if", - "home", - "windows-sys 0.48.0", -] - -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "exp_golomb" version = "0.0.1" @@ -1791,7 +1772,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "279d3efcc55e19917fff7ab3ddd6c14afb6a90881a0078465196fe2f99d08c56" dependencies = [ "bit_field", - "flume 0.10.14", + "flume", "half", "lebe", "miniz_oxide", @@ -1800,6 +1781,12 @@ dependencies = [ "zune-inflate", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fallible_collections" version = "0.4.9" @@ -1836,9 +1823,9 @@ checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "fdeflate" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "209098dd6dfc4445aa6111f0e98653ac323eaa4dfd212c9ca3931bf9955c31bd" +checksum = "4f9bfee30e4dedf0ab8b422f03af778d9612b63f502710fc500a334ebe2de645" dependencies = [ "simd-adler32", ] @@ -1958,17 +1945,6 @@ dependencies = [ "spin 0.9.8", ] -[[package]] -name = "flume" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" -dependencies = [ - "futures-core", - "futures-sink", - "spin 0.9.8", -] - [[package]] name = "flv" version = "0.0.1" @@ -2003,7 +1979,7 @@ dependencies = [ [[package]] name = "fred" version = "8.0.0" -source = "git+https://github.com/aembke/fred.rs.git?branch=feat/unix-sockets#32c380e1ab384196422e2d0716a02cba5e9891a4" +source = "git+https://github.com/aembke/fred.rs.git?branch=feat/unix-sockets#073a6d2eec4bf5ce1995df464fe26a76f16c5030" dependencies = [ "arc-swap", "async-trait", @@ -2073,17 +2049,6 @@ dependencies = [ "futures-util", ] -[[package]] -name = "futures-intrusive" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" -dependencies = [ - "futures-core", - "lock_api", - "parking_lot", -] - [[package]] name = "futures-io" version = "0.3.30" @@ -2271,9 +2236,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b553656127a00601c8ae5590fcfdc118e4083a7924b6cf4ffc1ea4b99dc429d7" +checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" dependencies = [ "bytes", "fnv", @@ -2290,9 +2255,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "991910e35c615d8cab86b5ab04be67e6ad24d2bf5f4f11fdbbed26da999bbeab" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" dependencies = [ "bytes", "fnv", @@ -2374,34 +2339,18 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" -dependencies = [ - "ahash 0.8.7", - "allocator-api2", -] - -[[package]] -name = "hashlink" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" -dependencies = [ - "hashbrown 0.14.3", -] [[package]] name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -dependencies = [ - "unicode-segmentation", -] [[package]] name = "hermit-abi" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" [[package]] name = "hex" @@ -2531,7 +2480,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.3.23", + "h2 0.3.24", "http 0.2.11", "http-body 0.4.6", "httparse", @@ -2554,7 +2503,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.1", + "h2 0.4.2", "http 1.0.0", "http-body 1.0.0", "httparse", @@ -2762,15 +2711,6 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" -[[package]] -name = "ipnetwork" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf466541e9d546596ee94f9f69590f89473455f88372423e0008fc1a7daf100e" -dependencies = [ - "serde", -] - [[package]] name = "itertools" version = "0.10.5" @@ -2844,7 +2784,7 @@ dependencies = [ "hmac", "p256 0.13.2", "p384", - "pem", + "pem 3.0.3", "rsa", "serde", "serde_json", @@ -2926,17 +2866,6 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" -[[package]] -name = "libsqlite3-sys" -version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716" -dependencies = [ - "cc", - "pkg-config", - "vcpkg", -] - [[package]] name = "libwebp-sys2" version = "0.1.9" @@ -2958,9 +2887,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "linux-raw-sys" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" @@ -3222,17 +3151,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" -[[package]] -name = "nix" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" -dependencies = [ - "bitflags 2.4.1", - "cfg-if", - "libc", -] - [[package]] name = "nkeys" version = "0.3.2" @@ -3562,6 +3480,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pem" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b13fe415cdf3c8e44518e18a7c95a13431d9bdf6d15367d82b23c377fdd441a" +dependencies = [ + "base64 0.21.7", + "serde", +] + [[package]] name = "pem" version = "3.0.3" @@ -3642,6 +3570,24 @@ dependencies = [ "indexmap 2.1.0", ] +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.1.3" @@ -3707,9 +3653,9 @@ dependencies = [ [[package]] name = "pkg-config" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" +checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" [[package]] name = "platform-api" @@ -3731,7 +3677,6 @@ dependencies = [ "chrono", "common", "config", - "dotenvy", "futures", "futures-util", "hmac", @@ -3741,12 +3686,13 @@ dependencies = [ "hyper 1.1.0", "hyper-tungstenite", "hyper-util", - "itertools 0.12.0", "jwt-next", "multer", "path-tree", "pb", "pin-project", + "postgres-from-row", + "postgres-types", "prost", "rand", "reqwest", @@ -3755,8 +3701,6 @@ dependencies = [ "serde", "serde_json", "sha2", - "sqlx", - "sqlx-postgres", "tempfile", "thiserror", "tokio", @@ -3783,19 +3727,18 @@ dependencies = [ "bytes", "common", "config", - "dotenvy", "fast_image_resize", "ffmpeg", "file-format", "futures", "gifski", "imgref", - "itertools 0.12.0", "libavif-sys", "libwebp-sys2", "num_cpus", "pb", "png", + "postgres-from-row", "prost", "reqwest", "rgb", @@ -3803,7 +3746,6 @@ dependencies = [ "serde", "serde_json", "sha2", - "sqlx", "thiserror", "tokio", "tonic", @@ -3839,6 +3781,71 @@ dependencies = [ "rand", ] +[[package]] +name = "postgres-derive" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83145eba741b050ef981a9a1838c843fa7665e154383325aa8b440ae703180a2" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "postgres-from-row" +version = "0.5.2" +source = "git+https://github.com/ScuffleTV/postgres-from-row.git?branch=troy/from_fn#7c7bf7553a5a7dc2b21ccd777434e426eff4625f" +dependencies = [ + "postgres-from-row-derive", + "tokio-postgres", +] + +[[package]] +name = "postgres-from-row-derive" +version = "0.5.2" +source = "git+https://github.com/ScuffleTV/postgres-from-row.git?branch=troy/from_fn#7c7bf7553a5a7dc2b21ccd777434e426eff4625f" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "postgres-protocol" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" +dependencies = [ + "base64 0.21.7", + "byteorder", + "bytes", + "fallible-iterator", + "hmac", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c" +dependencies = [ + "bytes", + "chrono", + "fallible-iterator", + "postgres-derive", + "postgres-protocol", + "serde", + "serde_json", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -4079,9 +4086,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" +checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051" dependencies = [ "either", "rayon-core", @@ -4089,9 +4096,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.12.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" dependencies = [ "crossbeam-deque", "crossbeam-utils", @@ -4181,7 +4188,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2 0.3.23", + "h2 0.3.24", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -4260,6 +4267,21 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin 0.5.2", + "untrusted 0.7.1", + "web-sys", + "winapi", +] + [[package]] name = "ring" version = "0.17.7" @@ -4270,7 +4292,7 @@ dependencies = [ "getrandom", "libc", "spin 0.9.8", - "untrusted", + "untrusted 0.9.0", "windows-sys 0.48.0", ] @@ -4367,7 +4389,7 @@ version = "0.38.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "errno", "libc", "linux-raw-sys", @@ -4381,7 +4403,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ "log", - "ring", + "ring 0.17.7", "rustls-webpki 0.101.7", "sct", ] @@ -4393,7 +4415,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" dependencies = [ "log", - "ring", + "ring 0.17.7", "rustls-pki-types", "rustls-webpki 0.102.1", "subtle", @@ -4456,8 +4478,8 @@ version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ - "ring", - "untrusted", + "ring 0.17.7", + "untrusted 0.9.0", ] [[package]] @@ -4466,9 +4488,9 @@ version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef4ca26037c909dedb327b48c3327d0ba91d3dd3c4e05dad328f210ffb68e95b" dependencies = [ - "ring", + "ring 0.17.7", "rustls-pki-types", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -4519,8 +4541,8 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ - "ring", - "untrusted", + "ring 0.17.7", + "untrusted 0.9.0", ] [[package]] @@ -4812,6 +4834,12 @@ dependencies = [ "quote", ] +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "slab" version = "0.4.9" @@ -4872,227 +4900,6 @@ dependencies = [ "der 0.7.8", ] -[[package]] -name = "sqlformat" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c" -dependencies = [ - "itertools 0.12.0", - "nom", - "unicode_categories", -] - -[[package]] -name = "sqlx" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dba03c279da73694ef99763320dea58b51095dfe87d001b1d4b5fe78ba8763cf" -dependencies = [ - "sqlx-core", - "sqlx-macros", - "sqlx-mysql", - "sqlx-postgres", - "sqlx-sqlite", -] - -[[package]] -name = "sqlx-core" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d84b0a3c3739e220d94b3239fd69fb1f74bc36e16643423bd99de3b43c21bfbd" -dependencies = [ - "ahash 0.8.7", - "atoi", - "byteorder", - "bytes", - "chrono", - "crc", - "crossbeam-queue", - "dotenvy", - "either", - "event-listener", - "futures-channel", - "futures-core", - "futures-intrusive", - "futures-io", - "futures-util", - "hashlink", - "hex", - "indexmap 2.1.0", - "ipnetwork", - "log", - "memchr", - "once_cell", - "paste", - "percent-encoding", - "rustls 0.21.10", - "rustls-pemfile 1.0.4", - "serde", - "serde_json", - "sha2", - "smallvec", - "sqlformat", - "thiserror", - "tokio", - "tokio-stream", - "tracing", - "url", - "uuid", - "webpki-roots", -] - -[[package]] -name = "sqlx-macros" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89961c00dc4d7dffb7aee214964b065072bff69e36ddb9e2c107541f75e4f2a5" -dependencies = [ - "proc-macro2", - "quote", - "sqlx-core", - "sqlx-macros-core", - "syn 1.0.109", -] - -[[package]] -name = "sqlx-macros-core" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0bd4519486723648186a08785143599760f7cc81c52334a55d6a83ea1e20841" -dependencies = [ - "atomic-write-file", - "dotenvy", - "either", - "heck", - "hex", - "once_cell", - "proc-macro2", - "quote", - "serde", - "serde_json", - "sha2", - "sqlx-core", - "sqlx-mysql", - "sqlx-postgres", - "sqlx-sqlite", - "syn 1.0.109", - "tempfile", - "tokio", - "url", -] - -[[package]] -name = "sqlx-mysql" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e37195395df71fd068f6e2082247891bc11e3289624bbc776a0cdfa1ca7f1ea4" -dependencies = [ - "atoi", - "base64 0.21.7", - "bitflags 2.4.1", - "byteorder", - "bytes", - "chrono", - "crc", - "digest", - "dotenvy", - "either", - "futures-channel", - "futures-core", - "futures-io", - "futures-util", - "generic-array", - "hex", - "hkdf", - "hmac", - "itoa", - "log", - "md-5", - "memchr", - "once_cell", - "percent-encoding", - "rand", - "rsa", - "serde", - "sha1", - "sha2", - "smallvec", - "sqlx-core", - "stringprep", - "thiserror", - "tracing", - "uuid", - "whoami", -] - -[[package]] -name = "sqlx-postgres" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6ac0ac3b7ccd10cc96c7ab29791a7dd236bd94021f31eec7ba3d46a74aa1c24" -dependencies = [ - "atoi", - "base64 0.21.7", - "bitflags 2.4.1", - "byteorder", - "chrono", - "crc", - "dotenvy", - "etcetera", - "futures-channel", - "futures-core", - "futures-io", - "futures-util", - "hex", - "hkdf", - "hmac", - "home", - "ipnetwork", - "itoa", - "log", - "md-5", - "memchr", - "once_cell", - "rand", - "serde", - "serde_json", - "sha1", - "sha2", - "smallvec", - "sqlx-core", - "stringprep", - "thiserror", - "tracing", - "uuid", - "whoami", -] - -[[package]] -name = "sqlx-sqlite" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "210976b7d948c7ba9fced8ca835b11cbb2d677c59c79de41ac0d397e14547490" -dependencies = [ - "atoi", - "chrono", - "flume 0.11.0", - "futures-channel", - "futures-core", - "futures-executor", - "futures-intrusive", - "futures-util", - "libsqlite3-sys", - "log", - "percent-encoding", - "serde", - "sqlx-core", - "tracing", - "url", - "urlencoding", - "uuid", -] - [[package]] name = "static_assertions_next" version = "1.1.2" @@ -5350,6 +5157,47 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "tokio-postgres" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d340244b32d920260ae7448cb72b6e238bddc3d4f7603394e7dd46ed8e48f5b8" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures-channel", + "futures-util", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "rand", + "socket2", + "tokio", + "tokio-util", + "whoami", +] + +[[package]] +name = "tokio-postgres-rustls" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23ca59f99c85e77d01626fa504cd56bcd8ece31c1c9bc218460e3c526690a09f" +dependencies = [ + "futures", + "ring 0.17.7", + "rustls 0.22.2", + "tokio", + "tokio-postgres", + "tokio-rustls 0.25.0", + "x509-certificate", +] + [[package]] name = "tokio-retry" version = "0.3.0" @@ -5486,7 +5334,7 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", - "h2 0.3.23", + "h2 0.3.24", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -5783,6 +5631,8 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e37c4b6cbcc59a8dcd09a6429fbc7890286bcbb79215cea7b38a3c4c0921d93" dependencies = [ + "bytes", + "postgres-types", "rand", "serde", "uuid", @@ -5790,9 +5640,9 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" @@ -5821,18 +5671,18 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" -[[package]] -name = "unicode_categories" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" - [[package]] name = "unsafe-libyaml" version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "untrusted" version = "0.9.0" @@ -5941,15 +5791,13 @@ dependencies = [ "itertools 0.12.0", "jwt-next", "pb", + "postgres-from-row", "prost", "rand", "rand_chacha", "serde", "serde_json", "sha2", - "sqlx", - "sqlx-core", - "sqlx-postgres", "tokio", "tokio-stream", "tonic", @@ -5981,7 +5829,6 @@ dependencies = [ "serde", "serde_json", "serde_yaml", - "sqlx", "tokio", "tonic", "ulid", @@ -6001,11 +5848,12 @@ dependencies = [ "futures", "futures-util", "pb", + "postgres-from-row", + "postgres-types", "prost", "serde", - "sqlx", - "sqlx-postgres", "tokio", + "tokio-postgres", "tracing", "ulid", "uuid", @@ -6024,7 +5872,6 @@ dependencies = [ "chrono", "common", "config", - "dotenvy", "futures", "futures-util", "hmac", @@ -6034,14 +5881,13 @@ dependencies = [ "itertools 0.12.0", "jwt-next", "pb", + "postgres-from-row", "prost", "rustls 0.22.2", "rustls-pemfile 2.0.0", "serde", "serde_json", "sha2", - "sqlx", - "sqlx-postgres", "thiserror", "tokio", "tokio-rustls 0.25.0", @@ -6081,14 +5927,13 @@ dependencies = [ "mp4", "pb", "portpicker", + "postgres-from-row", "prost", "rtmp", "rustls 0.22.2", "rustls-pemfile 2.0.0", "serde", "serde_json", - "sqlx", - "sqlx-postgres", "tokio", "tokio-rustls 0.25.0", "tokio-stream", @@ -6169,8 +6014,6 @@ dependencies = [ "serde", "serde_json", "sha2", - "sqlx", - "sqlx-postgres", "tempfile", "thiserror", "tokio", @@ -6320,6 +6163,10 @@ name = "whoami" version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" +dependencies = [ + "wasm-bindgen", + "web-sys", +] [[package]] name = "widestring" @@ -6536,6 +6383,24 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "x509-certificate" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5d27c90840e84503cf44364de338794d5d5680bdd1da6272d13f80b0769ee0" +dependencies = [ + "bcder", + "bytes", + "chrono", + "der 0.7.8", + "hex", + "pem 2.0.1", + "ring 0.16.20", + "signature 2.2.0", + "spki 0.7.3", + "thiserror", +] + [[package]] name = "xmlparser" version = "0.13.6" diff --git a/Cargo.toml b/Cargo.toml index 73b79843..66c93ebf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,8 @@ ffmpeg = { path = "ffmpeg" } # These patches are pending PRs to the upstream crates # TODO: Remove these once the PRs are merged [patch.crates-io] +# https://github.com/remkop22/postgres-from-row/pull/9 +postgres-from-row = { git = "https://github.com/ScuffleTV/postgres-from-row.git", branch = "troy/from_fn" } # https://github.com/async-graphql/async-graphql/pull/1424 async-graphql = { git = "https://github.com/ScuffleTV/async-graphql.git", branch = "troy/union-generics" } # https://github.com/aembke/fred.rs/pull/199 diff --git a/binary-helper/Cargo.toml b/binary-helper/Cargo.toml index ff15379f..a0102a55 100644 --- a/binary-helper/Cargo.toml +++ b/binary-helper/Cargo.toml @@ -10,7 +10,6 @@ tokio = { version = "1.35", features = ["full"] } serde = { version = "1.0.1", features = ["derive"] } async-nats = "0.33" ulid = "1.1" -sqlx = { version = "0.7", features = ["postgres"] } async-trait = "0.1" tonic = { version = "0.10", features = ["tls"] } anyhow = "1.0" @@ -20,6 +19,7 @@ futures-util = "0.3" rustls = "0.22" rustls-pemfile = "2.0" fred = { version = "8.0.0", features = ["enable-rustls", "sentinel-client", "dns"] } +tokio-postgres-rustls = "0.11" config = { workspace = true } common = { workspace = true, features = ["default"] } diff --git a/binary-helper/src/global.rs b/binary-helper/src/global.rs index de7436f8..3c42f0a9 100644 --- a/binary-helper/src/global.rs +++ b/binary-helper/src/global.rs @@ -1,16 +1,16 @@ use std::io; -use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use anyhow::Context as _; use async_nats::ServerAddr; use common::config::{DatabaseConfig, NatsConfig, RedisConfig}; +use common::database::deadpool_postgres::{ManagerConfig, PoolConfig, RecyclingMethod, Runtime}; +use common::database::tokio_postgres::NoTls; +use common::database::Pool; use fred::interfaces::ClientLike; use fred::types::ServerConfig; use rustls::RootCertStore; -use sqlx::postgres::PgConnectOptions; -use sqlx::ConnectOptions; #[macro_export] macro_rules! impl_global_traits { @@ -36,7 +36,7 @@ macro_rules! impl_global_traits { impl common::global::GlobalDb for $struct { #[inline(always)] - fn db(&self) -> &Arc { + fn db(&self) -> &Arc { &self.db } } @@ -89,16 +89,69 @@ pub async fn setup_nats( Ok((nats, jetstream)) } -pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result> { - Ok(Arc::new( - sqlx::PgPool::connect_with( - PgConnectOptions::from_str(&config.uri) - .context("failed to parse database uri")? - .disable_statement_logging() - .to_owned(), +pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result> { + let mut pg_config = config + .uri + .parse::() + .context("invalid database uri")?; + + pg_config.ssl_mode(if config.tls.is_some() { + common::database::tokio_postgres::config::SslMode::Require + } else { + common::database::tokio_postgres::config::SslMode::Disable + }); + + let manager = if let Some(tls) = &config.tls { + let cert = tokio::fs::read(&tls.cert).await.context("failed to read redis client cert")?; + let key = tokio::fs::read(&tls.key) + .await + .context("failed to read redis client private key")?; + + let key = rustls_pemfile::pkcs8_private_keys(&mut io::BufReader::new(io::Cursor::new(key))) + .next() + .ok_or_else(|| anyhow::anyhow!("failed to find private key in redis client private key file"))?? + .into(); + + let certs = rustls_pemfile::certs(&mut io::BufReader::new(io::Cursor::new(cert))).collect::, _>>()?; + + let mut cert_store = RootCertStore::empty(); + if let Some(ca_cert) = &tls.ca_cert { + let ca_cert = tokio::fs::read(ca_cert).await.context("failed to read redis ca cert")?; + let ca_certs = + rustls_pemfile::certs(&mut io::BufReader::new(io::Cursor::new(ca_cert))).collect::, _>>()?; + for cert in ca_certs { + cert_store.add(cert).context("failed to add redis ca cert")?; + } + } + + let tls = rustls::ClientConfig::builder() + .with_root_certificates(cert_store) + .with_client_auth_cert(certs, key) + .context("failed to create redis tls config")?; + + common::database::deadpool_postgres::Manager::from_config( + pg_config, + tokio_postgres_rustls::MakeRustlsConnect::new(tls), + ManagerConfig { + recycling_method: RecyclingMethod::Fast, + }, + ) + } else { + common::database::deadpool_postgres::Manager::from_config( + pg_config, + NoTls, + ManagerConfig { + recycling_method: RecyclingMethod::Fast, + }, ) - .await - .context("failed to connect to database")?, + }; + + Ok(Arc::new( + Pool::builder(manager) + .config(PoolConfig::default()) + .runtime(Runtime::Tokio1) + .build() + .context("failed to create database pool")?, )) } diff --git a/common/Cargo.toml b/common/Cargo.toml index c05ad4aa..3e2e1a9e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -12,11 +12,11 @@ context = ["dep:tokio", "dep:tokio-util"] prelude = ["dep:tokio"] signal = ["tokio/signal", "tokio/process"] macros = [] -database = ["dep:sqlx", "dep:sqlx-postgres", "dep:prost", "dep:uuid", "dep:ulid"] +database = ["dep:tokio-postgres", "dep:postgres-types", "dep:deadpool-postgres", "dep:postgres-from-row", "dep:prost", "ulid/postgres"] dataloader = ["dep:fnv", "dep:futures-util", "dep:futures-channel"] config = ["dep:config", "dep:serde", "logging"] ratelimiter = ["dep:fred"] -global = ["context", "dep:fred", "dep:sqlx", "dep:async-nats"] +global = ["context", "dep:fred", "database", "dep:async-nats"] http = ["dep:hyper", "dep:serde_json", "dep:bytes", "dep:http-body-util", "dep:pin-project", "dep:path-tree"] task = ["dep:tokio", "dep:thiserror"] s3 = ["dep:aws-sdk-s3", "dep:aws-credential-types", "dep:aws-config", "dep:aws-smithy-types", "dep:http-body"] @@ -56,8 +56,10 @@ futures-channel = { version = "0.3", optional = true } const_format = { version = "0.2" } -sqlx = { version = "0.7", features = ["postgres", "json", "chrono", "uuid"], optional = true } -sqlx-postgres = { version = "0.7", optional = true } +tokio-postgres = { version = "0.7", optional = true } +postgres-types = { version = "0.2", optional = true, features = ["with-serde_json-1", "with-chrono-0_4", "derive"] } +deadpool-postgres = { version = "0.12", optional = true } +postgres-from-row = { version = "0.5", optional = true } prost = { version = "0.12", optional = true } uuid = { version = "1.6", features = ["v4"], optional = true } ulid = { version = "1.1", features = ["uuid"], optional = true} diff --git a/common/src/config.rs b/common/src/config.rs index b6d0eb1c..e33de0a5 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -134,12 +134,16 @@ impl Default for RedisConfig { pub struct DatabaseConfig { /// The database URL to use pub uri: String, + + /// The TLS configuration + pub tls: Option, } impl Default for DatabaseConfig { fn default() -> Self { Self { uri: "postgres://localhost:5432".to_string(), + tls: None, } } } diff --git a/common/src/database/mod.rs b/common/src/database/mod.rs index e853438d..e1ab8704 100644 --- a/common/src/database/mod.rs +++ b/common/src/database/mod.rs @@ -1,7 +1,19 @@ -mod non_null; mod protobuf; -mod ulid; +mod query_builder; -pub use non_null::*; +pub use deadpool_postgres::Pool; +pub use postgres_from_row::FromRow; +pub use postgres_types::Json; pub use protobuf::*; -pub use ulid::*; +pub use query_builder::*; +pub use {deadpool_postgres, postgres_from_row, postgres_types, tokio_postgres}; + +#[inline] +pub fn json(row: Json) -> T { + row.0 +} + +#[inline] +pub fn non_null_vec(vec: Vec>) -> Vec { + vec.into_iter().flatten().collect() +} diff --git a/common/src/database/non_null.rs b/common/src/database/non_null.rs deleted file mode 100644 index 5c254007..00000000 --- a/common/src/database/non_null.rs +++ /dev/null @@ -1,74 +0,0 @@ -#[derive(Default)] -pub struct PgNonNullVec(Vec); - -impl<'r, T> sqlx::Decode<'r, sqlx::Postgres> for PgNonNullVec -where - Vec>: sqlx::Decode<'r, sqlx::Postgres> + sqlx::Type, -{ - fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result> { - let vec: Vec> = sqlx::Decode::decode(value)?; - Ok(PgNonNullVec(vec.into_iter().flatten().collect())) - } -} - -impl Clone for PgNonNullVec { - fn clone(&self) -> Self { - PgNonNullVec(self.0.clone()) - } -} - -impl std::fmt::Debug for PgNonNullVec { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} - -impl<'r, T> sqlx::Type for PgNonNullVec -where - Vec: sqlx::Decode<'r, sqlx::Postgres> + sqlx::Type, -{ - fn type_info() -> sqlx::postgres::PgTypeInfo { - as sqlx::Type>::type_info() - } -} - -impl std::ops::Deref for PgNonNullVec { - type Target = Vec; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl std::ops::DerefMut for PgNonNullVec { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl std::iter::FromIterator for PgNonNullVec { - fn from_iter>(iter: I) -> Self { - PgNonNullVec(iter.into_iter().collect()) - } -} - -impl std::iter::IntoIterator for PgNonNullVec { - type IntoIter = std::vec::IntoIter; - type Item = T; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -impl std::iter::Extend for PgNonNullVec { - fn extend>(&mut self, iter: I) { - self.0.extend(iter) - } -} - -impl PgNonNullVec { - pub fn into_inner(self) -> Vec { - self.0 - } -} diff --git a/common/src/database/protobuf.rs b/common/src/database/protobuf.rs index 8e94af3a..c815a209 100644 --- a/common/src/database/protobuf.rs +++ b/common/src/database/protobuf.rs @@ -1,101 +1,49 @@ -#[repr(transparent)] -pub struct Protobuf(pub T); - -impl Clone for Protobuf -where - T: Clone, -{ - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -pub trait TraitProtobuf { - fn into_inner(self) -> T; -} - -pub trait TraitProtobufVec { - fn into_vec(self) -> Vec; -} - -impl TraitProtobuf for Protobuf { - fn into_inner(self) -> T { - self.0 - } -} - -impl TraitProtobufVec for Vec> { - fn into_vec(self) -> Vec { - self.into_iter().map(|a| a.into_inner()).collect() - } -} - -impl std::fmt::Debug for Protobuf { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} +use bytes::BytesMut; +use postgres_types::{accepts, to_sql_checked, IsNull}; -impl Default for Protobuf { - fn default() -> Self { - Self(T::default()) - } -} +#[derive(Debug, Clone)] +pub struct Protobuf(pub T); -impl sqlx::Type for Protobuf { - fn type_info() -> sqlx::postgres::PgTypeInfo { - as sqlx::Type>::type_info() - } -} +impl postgres_types::FromSql<'_> for Protobuf { + accepts!(BYTEA); -impl sqlx::postgres::PgHasArrayType for Protobuf { - fn array_type_info() -> sqlx::postgres::PgTypeInfo { - as sqlx::postgres::PgHasArrayType>::array_type_info() + fn from_sql(_ty: &postgres_types::Type, raw: &[u8]) -> Result> { + Ok(Self(T::decode(raw)?)) } } -impl sqlx::Encode<'_, sqlx::Postgres> for Protobuf { - fn encode_by_ref(&self, buf: &mut sqlx::postgres::PgArgumentBuffer) -> sqlx::encode::IsNull { - as sqlx::Encode>::encode_by_ref(&self.0.encode_to_vec(), buf) - } -} +impl postgres_types::ToSql for Protobuf { + to_sql_checked!(); -impl sqlx::Decode<'_, sqlx::Postgres> for Protobuf { - fn decode(value: sqlx::postgres::PgValueRef<'_>) -> Result> { - let bytes = as sqlx::Decode>::decode(value)?; - let inner = T::decode(bytes.as_slice())?; - Ok(Self(inner)) + fn to_sql( + &self, + ty: &postgres_types::Type, + w: &mut BytesMut, + ) -> Result> { + <&[u8] as postgres_types::ToSql>::to_sql(&&*self.0.encode_to_vec(), ty, w) } -} -impl AsRef for Protobuf { - fn as_ref(&self) -> &T { - &self.0 + fn accepts(ty: &postgres_types::Type) -> bool { + <&[u8] as postgres_types::ToSql>::accepts(ty) } } -impl AsMut for Protobuf { - fn as_mut(&mut self) -> &mut T { - &mut self.0 - } +#[inline] +pub fn protobuf(row: Protobuf) -> T { + row.0 } -impl std::ops::Deref for Protobuf { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.0 - } +#[inline] +pub fn protobuf_vec(row: Vec>) -> Vec { + row.into_iter().map(|protobuf| protobuf.0).collect() } -impl std::ops::DerefMut for Protobuf { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } +#[inline] +pub fn protobuf_opt(row: Option>) -> Option { + row.map(protobuf) } -impl From for Protobuf { - fn from(inner: T) -> Self { - Self(inner) - } +#[inline] +pub fn protobuf_vec_opt(row: Option>>) -> Option> { + row.map(protobuf_vec) } diff --git a/common/src/database/query_builder.rs b/common/src/database/query_builder.rs new file mode 100644 index 00000000..8b96b2b7 --- /dev/null +++ b/common/src/database/query_builder.rs @@ -0,0 +1,433 @@ +use std::sync::Arc; + +use futures_util::{Stream, StreamExt}; +use postgres_from_row::FromRow; +use postgres_types::{FromSql, ToSql}; +use tokio_postgres::{Error, Row}; + +pub fn query<'a>(query: impl ToString) -> QueryBuilder<'a> { + QueryBuilder::new(query) +} + +#[derive(Default)] +pub struct QueryBuilder<'a> { + query: String, + params: Vec>, +} + +impl<'args> QueryBuilder<'args> { + pub fn new(query: impl ToString) -> Self { + Self { + query: query.to_string(), + params: Vec::new(), + } + } + + pub fn push_bind(&mut self, param: impl ToSql + Send + Sync + 'args) -> &mut Self { + self.params.push(Box::new(param)); + self.query.push_str(format!("${}", self.params.len()).as_str()); + self + } + + pub fn bind(&mut self, param: impl ToSql + Send + Sync + 'args) -> &mut Self { + self.params.push(Box::new(param)); + self + } + + pub fn push(&mut self, query: impl AsRef) -> &mut Self { + self.query.push_str(query.as_ref()); + self + } + + pub fn separated(&mut self, sep: &'args str) -> Separated<'_, 'args> { + Separated { + sep, + first: true, + query_builder: self, + } + } + + pub fn push_tuples( + &mut self, + tuples: impl IntoIterator, + mut f: impl FnMut(Separated<'_, 'args>, T), + ) -> &mut Self { + self.push(" ("); + + let mut separated = self.separated(","); + + for tuple in tuples { + separated.push("("); + + f(separated.query_builder.separated(", "), tuple); + + separated.push_unseparated(")"); + } + + separated.push_unseparated(")"); + + separated.query_builder + } + + pub fn push_values( + &mut self, + values: impl IntoIterator, + mut f: impl FnMut(Separated<'_, 'args>, T), + ) -> &mut Self { + self.push("VALUES "); + + let mut separated = self.separated(","); + + for value in values { + separated.push("("); + + f(separated.query_builder.separated(", "), value); + + separated.push_unseparated(")"); + } + + separated.query_builder + } + + pub fn build(&self) -> Query<'_, NoParse, Row> { + Query { + query: &self.query, + params: &self.params, + _marker: std::marker::PhantomData, + } + } + + pub fn build_query_as(&self) -> Query<'_, FromRowParse, T> { + Query { + query: &self.query, + params: &self.params, + _marker: std::marker::PhantomData, + } + } + + pub fn build_query_scalar(&self) -> Query<'_, ScalarParse, T> { + Query { + query: &self.query, + params: &self.params, + _marker: std::marker::PhantomData, + } + } + + pub fn build_query_single_scalar FromSql<'a>>(&self) -> Query<'_, SingleScalarParse, T> { + Query { + query: &self.query, + params: &self.params, + _marker: std::marker::PhantomData, + } + } + + pub fn sql(&self) -> &str { + self.query.as_str() + } +} + +pub struct ScalarParse(std::marker::PhantomData); + +pub struct SingleScalarParse(std::marker::PhantomData); + +pub struct FromRowParse(std::marker::PhantomData); +pub struct NoParse; + +impl RowParse for ScalarParse { + type Item = T; + + #[inline] + fn try_from_row(row: Row) -> Result { + T::from_row(&row) + } +} + +impl RowParse for SingleScalarParse +where + T: for<'a> FromSql<'a>, +{ + type Item = T; + + #[inline] + fn try_from_row(row: Row) -> Result { + row.try_get(0) + } +} + +impl RowParse for FromRowParse { + type Item = T; + + #[inline] + fn try_from_row(row: Row) -> Result { + T::try_from_row(&row) + } +} + +impl RowParse for NoParse { + type Item = Row; + + #[inline] + fn try_from_row(row: Row) -> Result { + Ok(row) + } +} + +pub trait RowParse { + type Item; + + fn try_from_row(row: Row) -> Result; +} + +pub struct Query<'a, T: RowParse, O> { + query: &'a str, + params: &'a [Box], + _marker: std::marker::PhantomData<(T, O)>, +} + +fn params<'a>(params: &'a [Box]) -> Vec<&'a (dyn ToSql + Sync)> { + params.iter().map(|param| param.as_ref() as _).collect() +} + +pub enum Client<'a, C> { + Owned(C), + Borrowed(&'a C), +} + +impl> std::ops::Deref for Client<'_, C> { + type Target = tokio_postgres::Client; + + fn deref(&self) -> &Self::Target { + match self { + Client::Owned(client) => client.as_ref(), + Client::Borrowed(client) => client.as_ref(), + } + } +} + +#[allow(async_fn_in_trait)] +pub trait IntoClient { + async fn get_client(&self) -> Result + '_, deadpool_postgres::PoolError>; +} + +struct ClientWrapper<'a>(&'a tokio_postgres::Client); + +impl AsRef for ClientWrapper<'_> { + fn as_ref(&self) -> &tokio_postgres::Client { + self.0 + } +} + +struct TransactionWrapper<'a>(&'a tokio_postgres::Transaction<'a>); + +impl AsRef for TransactionWrapper<'_> { + fn as_ref(&self) -> &tokio_postgres::Client { + self.0.client() + } +} + +struct PoolClientWrapperOwned(deadpool_postgres::Client); + +impl AsRef for PoolClientWrapperOwned { + fn as_ref(&self) -> &tokio_postgres::Client { + self.0.as_ref() + } +} + +struct PoolClientWrapperBorrowed<'a>(&'a deadpool_postgres::Client); + +impl AsRef for PoolClientWrapperBorrowed<'_> { + fn as_ref(&self) -> &tokio_postgres::Client { + self.0.as_ref() + } +} + +struct PoolTransactionWrapper<'a>(&'a deadpool_postgres::Transaction<'a>); + +impl AsRef for PoolTransactionWrapper<'_> { + fn as_ref(&self) -> &tokio_postgres::Client { + self.0.client() + } +} + +impl IntoClient for tokio_postgres::Client { + async fn get_client(&self) -> Result + '_, deadpool_postgres::PoolError> { + Ok(ClientWrapper(self)) + } +} + +impl IntoClient for tokio_postgres::Transaction<'_> { + async fn get_client(&self) -> Result + '_, deadpool_postgres::PoolError> { + Ok(TransactionWrapper(self)) + } +} + +impl IntoClient for deadpool_postgres::Pool { + async fn get_client(&self) -> Result + '_, deadpool_postgres::PoolError> { + Ok(PoolClientWrapperOwned(self.get().await?)) + } +} + +impl IntoClient for deadpool_postgres::Client { + async fn get_client(&self) -> Result + '_, deadpool_postgres::PoolError> { + Ok(PoolClientWrapperBorrowed(self)) + } +} + +impl IntoClient for deadpool_postgres::Transaction<'_> { + async fn get_client(&self) -> Result + '_, deadpool_postgres::PoolError> { + Ok(PoolTransactionWrapper(self)) + } +} + +impl IntoClient for Arc { + async fn get_client(&self) -> Result + '_, deadpool_postgres::PoolError> { + self.as_ref().get_client().await + } +} + +impl IntoClient for &T { + async fn get_client(&self) -> Result + '_, deadpool_postgres::PoolError> { + (*self).get_client().await + } +} + +impl, O> Query<'_, T, O> { + pub async fn execute(self, conn: impl IntoClient) -> Result { + Ok(conn + .get_client() + .await? + .as_ref() + .execute(self.query, ¶ms(self.params)) + .await?) + } + + pub async fn fetch_all(self, conn: impl IntoClient) -> Result, deadpool_postgres::PoolError> { + Ok(conn + .get_client() + .await? + .as_ref() + .query(self.query, ¶ms(self.params)) + .await? + .into_iter() + .map(T::try_from_row) + .collect::>()?) + } + + pub async fn fetch_one(self, conn: impl IntoClient) -> Result { + Ok(T::try_from_row( + conn.get_client() + .await? + .as_ref() + .query_one(self.query, ¶ms(self.params)) + .await?, + )?) + } + + pub async fn fetch_optional(self, conn: impl IntoClient) -> Result, deadpool_postgres::PoolError> { + Ok(conn + .get_client() + .await? + .as_ref() + .query_opt(self.query, ¶ms(self.params)) + .await? + .map(T::try_from_row) + .transpose()?) + } + + pub async fn fetch_many( + self, + conn: impl IntoClient, + ) -> Result> + Send + Sync, deadpool_postgres::PoolError> { + Ok(conn + .get_client() + .await? + .as_ref() + .query_raw(self.query, params(self.params).into_iter()) + .await? + .map(|row| Ok(T::try_from_row(row?)?))) + } +} + +pub trait SqlScalar { + fn from_row(row: &Row) -> Result + where + Self: Sized; +} + +macro_rules! impl_sql_scalar { + ($($ty:ident),*) => { + #[allow(unused_parens)] + impl<$($ty),*> SqlScalar for ($($ty),*,) + where + $($ty: for<'a> FromSql<'a>),* + { + #[allow(non_snake_case)] + #[allow(unused_assignments)] + fn from_row(row: &Row) -> Result { + let mut i = 0; + $( + let $ty = row.try_get::<_, $ty>(i)?; + i += 1; + )* + + Ok(($($ty),*,)) + } + } + }; +} + +macro_rules! impl_recursive { + // Match for a single type + ($ty:ident) => { + impl_sql_scalar!($ty); + }; + + // Match for multiple types + ($first:ident, $($rest:ident),+) => { + // Recursively call for the rest of the types + impl_sql_scalar!($first, $($rest),*); + impl_recursive!($($rest),*); + }; +} + +impl_recursive!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T16); + +pub struct Separated<'b, 'args> { + sep: &'b str, + first: bool, + query_builder: &'b mut QueryBuilder<'args>, +} + +impl<'args> Separated<'_, 'args> { + pub fn push_bind(&mut self, param: impl ToSql + Send + Sync + 'args) -> &mut Self { + if self.first { + self.first = false; + } else { + self.query_builder.push(self.sep); + } + + self.query_builder.push_bind(param); + self + } + + pub fn push(&mut self, query: impl AsRef) -> &mut Self { + if self.first { + self.first = false; + } else { + self.query_builder.push(self.sep); + } + + self.query_builder.push(query.as_ref()); + self + } + + pub fn push_unseparated(&mut self, query: impl AsRef) -> &mut Self { + self.query_builder.push(query.as_ref()); + self + } + + pub fn push_bind_unseparated(&mut self, param: impl ToSql + Send + Sync + 'args) -> &mut Self { + self.query_builder.push_bind(param); + self + } +} diff --git a/common/src/database/ulid.rs b/common/src/database/ulid.rs deleted file mode 100644 index 8055fb1f..00000000 --- a/common/src/database/ulid.rs +++ /dev/null @@ -1,74 +0,0 @@ -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -#[repr(transparent)] -pub struct Ulid(pub ulid::Ulid); - -impl sqlx::postgres::PgHasArrayType for Ulid { - fn array_compatible(ty: &sqlx_postgres::PgTypeInfo) -> bool { - ::array_compatible(ty) - } - - fn array_type_info() -> sqlx_postgres::PgTypeInfo { - ::array_type_info() - } -} - -impl Default for Ulid { - fn default() -> Self { - Self(ulid::Ulid::nil()) - } -} - -impl std::fmt::Display for Ulid { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.to_string().fmt(f) - } -} - -impl std::fmt::Debug for Ulid { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} - -impl sqlx::Decode<'_, sqlx::Postgres> for Ulid { - fn decode(value: sqlx::postgres::PgValueRef<'_>) -> Result> { - let id = >::decode(value)?; - Ok(Ulid(ulid::Ulid::from(id))) - } -} - -impl sqlx::Encode<'_, sqlx::Postgres> for Ulid { - fn encode_by_ref(&self, buf: &mut sqlx::postgres::PgArgumentBuffer) -> sqlx::encode::IsNull { - >::encode_by_ref(&self.0.into(), buf) - } -} - -impl sqlx::Type for Ulid { - fn type_info() -> sqlx::postgres::PgTypeInfo { - >::type_info() - } -} - -impl From for ulid::Ulid { - fn from(id: Ulid) -> Self { - id.0 - } -} - -impl From for Ulid { - fn from(id: ulid::Ulid) -> Self { - Ulid(id) - } -} - -impl From for Ulid { - fn from(id: uuid::Uuid) -> Self { - Ulid(ulid::Ulid::from(id)) - } -} - -impl From for uuid::Uuid { - fn from(id: Ulid) -> Self { - id.0.into() - } -} diff --git a/common/src/dataloader/mod.rs b/common/src/dataloader/mod.rs index da9c32c8..8af86ce7 100644 --- a/common/src/dataloader/mod.rs +++ b/common/src/dataloader/mod.rs @@ -129,14 +129,14 @@ impl, S: Send + Sync + Default + BuildHasher + 'static> DataLoader< } #[inline(always)] - pub async fn load_many(&self, keys: impl Iterator) -> LoaderOutput { + pub async fn load_many(&self, keys: impl IntoIterator) -> LoaderOutput { self.load_many_with_cache(NoCache, keys).await } pub async fn load_many_with_cache>( &self, mut cache: C, - keys: impl Iterator, + keys: impl IntoIterator, ) -> LoaderOutput { let mut results = HashMap::default(); diff --git a/common/src/global.rs b/common/src/global.rs index 4a996bbd..ccb91fc8 100644 --- a/common/src/global.rs +++ b/common/src/global.rs @@ -25,8 +25,9 @@ pub trait GlobalNats { fn jetstream(&self) -> &async_nats::jetstream::Context; } +#[allow(async_fn_in_trait)] pub trait GlobalDb { - fn db(&self) -> &Arc; + fn db(&self) -> &Arc; } pub trait GlobalRedis { diff --git a/platform/api/Cargo.toml b/platform/api/Cargo.toml index d6388878..a48b2766 100644 --- a/platform/api/Cargo.toml +++ b/platform/api/Cargo.toml @@ -14,8 +14,6 @@ common = { workspace = true, features = ["default"] } rustls = "0.22" rustls-pemfile = "2.0" tokio-rustls = "0.25" -sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls", "json", "chrono", "uuid"] } -sqlx-postgres = "0.7" path-tree = "0.7" serde_json = "1.0" reqwest = { version = "0.11", features = ["json", "rustls-tls"], default-features = false} @@ -42,7 +40,6 @@ async-trait = "0.1" bytes = "1.5" totp-rs = { version = "5.4", features = ["qr"] } thiserror = "1.0" -itertools = "0.12" anyhow = "1.0" multer = "3.0" aws-config = "1.1" @@ -52,6 +49,8 @@ http-body-util = "0.1" hyper-util = "0.1" pin-project = "1.1" base64 = "0.21" +postgres-from-row = "0.5" +postgres-types = "0.2" config = { workspace = true } pb = { workspace = true } @@ -59,5 +58,4 @@ binary-helper = { workspace = true } [dev-dependencies] tempfile = "3.8" -dotenvy = "0.15" http = "1.0" diff --git a/platform/api/src/api/auth.rs b/platform/api/src/api/auth.rs index ae01d8e3..a8189782 100644 --- a/platform/api/src/api/auth.rs +++ b/platform/api/src/api/auth.rs @@ -64,7 +64,7 @@ impl AuthData { let mut user_roles: Vec = global .role_by_id_loader() - .load_many(user.roles.iter().map(|i| i.0)) + .load_many(user.roles.clone()) .await .map_err(|_| AuthError::FetchRoles)? .into_values() @@ -97,7 +97,7 @@ impl AuthData { pub async fn from_session(global: &Arc, session: Session) -> Result { let user = global .user_by_id_loader() - .load(session.user_id.0) + .load(session.user_id) .await .map_err(|_| AuthError::FetchUser)? .ok_or(AuthError::UserNotFound)?; diff --git a/platform/api/src/api/error.rs b/platform/api/src/api/error.rs index f7347e1c..dc1d3dc7 100644 --- a/platform/api/src/api/error.rs +++ b/platform/api/src/api/error.rs @@ -18,5 +18,11 @@ pub enum ApiError { #[error("failed to query turnstile: {0}")] Turnstile(#[from] TurnstileError), #[error("failed to query database: {0}")] - Database(#[from] sqlx::Error), + Database(#[from] common::database::deadpool_postgres::PoolError), +} + +impl From for ApiError { + fn from(value: common::database::tokio_postgres::Error) -> Self { + Self::Database(value.into()) + } } diff --git a/platform/api/src/api/jwt.rs b/platform/api/src/api/jwt.rs index 3ab2c114..4f95013b 100644 --- a/platform/api/src/api/jwt.rs +++ b/platform/api/src/api/jwt.rs @@ -121,10 +121,10 @@ impl JwtState for AuthJwtPayload { impl From for AuthJwtPayload { fn from(session: Session) -> Self { AuthJwtPayload { - user_id: session.user_id.0, - session_id: session.id.0, + user_id: session.user_id, + session_id: session.id, expiration: Some(session.expires_at), - issued_at: Ulid::from(session.id).datetime().into(), + issued_at: session.id.datetime().into(), not_before: None, audience: None, } diff --git a/platform/api/src/api/request_context.rs b/platform/api/src/api/request_context.rs index 6ee388dc..733f6a42 100644 --- a/platform/api/src/api/request_context.rs +++ b/platform/api/src/api/request_context.rs @@ -38,7 +38,7 @@ impl RequestContext { if !auth.session.is_valid() { Err(AuthError::SessionExpired) } else if inner.websocket { - let auth = AuthData::from_session_id(global, auth.session.id.0).await?; + let auth = AuthData::from_session_id(global, auth.session.id).await?; if auth.session.is_valid() { self.set_auth(auth.clone()).await; Ok(Some(auth)) diff --git a/platform/api/src/api/v1/gql/error.rs b/platform/api/src/api/v1/gql/error.rs index 56a26a17..22e90f28 100644 --- a/platform/api/src/api/v1/gql/error.rs +++ b/platform/api/src/api/v1/gql/error.rs @@ -37,7 +37,7 @@ pub enum GqlError { InternalServerError(&'static str), /// A database error occurred. #[error("database error: {0}")] - Sqlx(#[from] Arc), + Database(#[from] Arc), /// The input was invalid. #[error("invalid input for {fields:?}: {message}")] InvalidInput { @@ -73,6 +73,12 @@ pub enum GqlError { Subscription(#[from] Arc), } +impl From for GqlError { + fn from(value: common::database::tokio_postgres::Error) -> Self { + Self::Database(Arc::new(value.into())) + } +} + macro_rules! impl_arc_from { ($err:ty) => { impl From<$err> for GqlError { @@ -83,7 +89,7 @@ macro_rules! impl_arc_from { }; } -impl_arc_from!(sqlx::Error); +impl_arc_from!(common::database::deadpool_postgres::PoolError); impl_arc_from!(turnstile::TurnstileError); impl_arc_from!(async_nats::PublishError); impl_arc_from!(SubscriptionManagerError); @@ -93,7 +99,7 @@ impl GqlError { matches!( self, GqlError::InternalServerError(_) - | GqlError::Sqlx(_) + | GqlError::Database(_) | GqlError::Turnstile(_) | GqlError::Publish(_) | GqlError::Subscription(_) @@ -122,7 +128,7 @@ impl GqlError { GqlError::Turnstile(_) => "Turnstile", GqlError::Publish(_) => "Publish", GqlError::InternalServerError(_) => "InternalServerError", - GqlError::Sqlx(_) => "Sqlx", + GqlError::Database(_) => "Database", GqlError::VideoApi(_) => "VideoApi", GqlError::Subscription(_) => "Subscription", } diff --git a/platform/api/src/api/v1/gql/guards.rs b/platform/api/src/api/v1/gql/guards.rs index 435cd86c..e3a5847e 100644 --- a/platform/api/src/api/v1/gql/guards.rs +++ b/platform/api/src/api/v1/gql/guards.rs @@ -21,7 +21,7 @@ pub async fn auth_guard( let auth = request_context.auth(global).await?; if let Some(auth) = auth { - if Ulid::from(auth.session.user_id) == user_id || auth.user_permissions.has_permission(RolePermission::Admin) { + if auth.session.user_id == user_id || auth.user_permissions.has_permission(RolePermission::Admin) { return Ok(field_value); } } diff --git a/platform/api/src/api/v1/gql/models/category.rs b/platform/api/src/api/v1/gql/models/category.rs index a64f493b..1dabc809 100644 --- a/platform/api/src/api/v1/gql/models/category.rs +++ b/platform/api/src/api/v1/gql/models/category.rs @@ -15,7 +15,7 @@ pub struct Category { impl From for Category { fn from(value: database::Category) -> Self { Self { - id: value.id.0.into(), + id: value.id.into(), name: value.name, revision: value.revision, updated_at: value.updated_at.into(), diff --git a/platform/api/src/api/v1/gql/models/channel.rs b/platform/api/src/api/v1/gql/models/channel.rs index c62f6664..1e53f1b5 100644 --- a/platform/api/src/api/v1/gql/models/channel.rs +++ b/platform/api/src/api/v1/gql/models/channel.rs @@ -1,6 +1,5 @@ use async_graphql::{ComplexObject, Context, SimpleObject}; use chrono::Utc; -use common::database::Ulid; use jwt_next::SignWithKey; use super::category::Category; @@ -58,7 +57,7 @@ impl Channel { async fn followers_count(&self, ctx: &Context<'_>) -> Result { let global = ctx.get_global::(); - let (followers,) = sqlx::query_as( + let followers = common::database::query( r#" SELECT COUNT(*) @@ -69,8 +68,9 @@ impl Channel { AND following = true "#, ) - .bind(self.id.to_uuid()) - .fetch_one(global.db().as_ref()) + .bind(self.id.to_ulid()) + .build_query_single_scalar() + .fetch_one(global.db()) .await .map_err_gql("failed to fetch followers")?; @@ -125,12 +125,13 @@ impl ChannelLive { .await .map_err_gql("failed to fetch playback session count")?; - sqlx::query( + common::database::query( "UPDATE users SET channel_live_viewer_count = $1, channel_live_viewer_count_updated_at = NOW() WHERE id = $2", ) .bind(live_viewer_count) - .bind(Ulid::from(self.channel_id)) - .execute(global.db().as_ref()) + .bind(self.channel_id) + .build() + .execute(global.db()) .await .map_err_gql("failed to update live viewer count")?; @@ -197,18 +198,18 @@ impl From for Channel { fn from(value: database::Channel) -> Self { let stream_key_ = value.get_stream_key(); Self { - id: value.id.0.into(), + id: value.id.into(), title: value.title, description: value.description, - links: value.links.0, - custom_thumbnail_id: value.custom_thumbnail_id.map(|v| Into::into(v.0)), - offline_banner_id: value.offline_banner_id.map(|v| Into::into(v.0)), - category_id: value.category_id.map(|v| Into::into(v.0)), + links: value.links, + custom_thumbnail_id: value.custom_thumbnail_id.map(Into::into), + offline_banner_id: value.offline_banner_id.map(Into::into), + category_id: value.category_id.map(Into::into), live: value.active_connection_id.map(|_| ChannelLive { - room_id: value.room_id.0.into(), + room_id: value.room_id.into(), live_viewer_count_: value.live_viewer_count, live_viewer_count_updated_at_: value.live_viewer_count_updated_at.map(DateRFC3339), - channel_id: value.id.0, + channel_id: value.id, _phantom: std::marker::PhantomData, }), last_live_at: value.last_live_at.map(DateRFC3339), diff --git a/platform/api/src/api/v1/gql/models/chat_message.rs b/platform/api/src/api/v1/gql/models/chat_message.rs index 3d896650..b51cd1e4 100644 --- a/platform/api/src/api/v1/gql/models/chat_message.rs +++ b/platform/api/src/api/v1/gql/models/chat_message.rs @@ -64,9 +64,9 @@ impl ChatMessage { impl From for ChatMessage { fn from(model: database::ChatMessage) -> Self { Self { - id: model.id.0.into(), - channel_id: model.channel_id.0.into(), - user_id: model.user_id.0.into(), + id: model.id.into(), + channel_id: model.channel_id.into(), + user_id: model.user_id.into(), content: model.content, r#type: MessageType::User, _phantom: std::marker::PhantomData, diff --git a/platform/api/src/api/v1/gql/models/image_upload.rs b/platform/api/src/api/v1/gql/models/image_upload.rs index c44c053f..6023f197 100644 --- a/platform/api/src/api/v1/gql/models/image_upload.rs +++ b/platform/api/src/api/v1/gql/models/image_upload.rs @@ -55,8 +55,8 @@ impl ImageUpload { return Ok(None); } - if let Some(uploaded_file_metadata::Metadata::Image(image)) = uploaded_file.metadata.0.metadata { - Ok(Some(Self::new(uploaded_file.id.0, image))) + if let Some(uploaded_file_metadata::Metadata::Image(image)) = uploaded_file.metadata.metadata { + Ok(Some(Self::new(uploaded_file.id, image))) } else { Err(GqlError::InternalServerError("uploaded file is not an image").into()) } diff --git a/platform/api/src/api/v1/gql/models/role.rs b/platform/api/src/api/v1/gql/models/role.rs index 1b9969a4..de1049c6 100644 --- a/platform/api/src/api/v1/gql/models/role.rs +++ b/platform/api/src/api/v1/gql/models/role.rs @@ -16,8 +16,8 @@ pub struct Role { impl From for Role { fn from(value: database::Role) -> Self { Self { - id: value.id.0.into(), - channel_id: value.channel_id.map(|v| v.0.into()), + id: value.id.into(), + channel_id: value.channel_id.map(Into::into), name: value.name, description: value.description, allowed_permissions: value.allowed_permissions.bits(), diff --git a/platform/api/src/api/v1/gql/models/user.rs b/platform/api/src/api/v1/gql/models/user.rs index 0afd3357..2803c83b 100644 --- a/platform/api/src/api/v1/gql/models/user.rs +++ b/platform/api/src/api/v1/gql/models/user.rs @@ -75,17 +75,17 @@ impl User { impl From for User { fn from(value: database::User) -> Self { Self { - id: value.id.0.into(), + id: value.id.into(), username: value.username, display_name: value.display_name, display_color: value.display_color.into(), channel: value.channel.into(), - pending_profile_picture_id: value.pending_profile_picture_id.map(|u| u.0.into()), + pending_profile_picture_id: value.pending_profile_picture_id.map(Into::into), email_: value.email, email_verified_: value.email_verified, last_login_at_: value.last_login_at.into(), totp_enabled_: value.totp_enabled, - profile_picture_: value.profile_picture_id.map(|u| u.0), + profile_picture_: value.profile_picture_id, } } } diff --git a/platform/api/src/api/v1/gql/mutations/auth.rs b/platform/api/src/api/v1/gql/mutations/auth.rs index 891abb0d..33a79e16 100644 --- a/platform/api/src/api/v1/gql/mutations/auth.rs +++ b/platform/api/src/api/v1/gql/mutations/auth.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use async_graphql::{Context, Object, Union}; use chrono::{Duration, Utc}; -use common::database::{TraitProtobuf, Ulid}; use pb::scuffle::platform::internal::two_fa::two_fa_request_action::{Action, Login}; use pb::scuffle::platform::internal::two_fa::TwoFaRequestAction; use prost::Message; @@ -91,7 +90,7 @@ impl AuthMutation { if user.totp_enabled { let request_id = ulid::Ulid::new(); - sqlx::query( + common::database::query( r#" INSERT INTO two_fa_requests ( id, @@ -103,7 +102,7 @@ impl AuthMutation { $3 )"#, ) - .bind(Ulid::from(request_id)) + .bind(request_id) .bind(user.id) .bind( TwoFaRequestAction { @@ -111,7 +110,8 @@ impl AuthMutation { } .encode_to_vec(), ) - .execute(global.db().as_ref()) + .build() + .execute(global.db()) .await?; Ok(TwoFaResponse::TwoFaRequest(TwoFaRequest { id: request_id.into() })) } else { @@ -129,9 +129,9 @@ impl AuthMutation { } Ok(TwoFaResponse::Success(Session { - id: session.id.0.into(), + id: session.id.into(), token, - user_id: session.user_id.0.into(), + user_id: session.user_id.into(), expires_at: session.expires_at.into(), last_used_at: session.last_used_at.into(), })) @@ -149,7 +149,7 @@ impl AuthMutation { let request_context = ctx.get_req_context(); // TODO: Make this a dataloader - let request: database::TwoFaRequest = sqlx::query_as( + let request: database::TwoFaRequest = common::database::query( r#" SELECT * @@ -159,14 +159,15 @@ impl AuthMutation { id = $1 "#, ) - .bind(Ulid::from(id.to_ulid())) - .fetch_optional(global.db().as_ref()) + .bind(id.to_ulid()) + .build_query_as() + .fetch_optional(global.db()) .await? .ok_or(GqlError::NotFound("2fa request"))?; let user = global .user_by_id_loader() - .load(request.user_id.into()) + .load(request.user_id) .await .map_err_ignored_gql("failed to fetch user")? .ok_or(GqlError::NotFound("user"))?; @@ -179,7 +180,7 @@ impl AuthMutation { .into()); } - sqlx::query( + common::database::query( r#" DELETE FROM two_fa_requests @@ -188,10 +189,11 @@ impl AuthMutation { "#, ) .bind(request.id) - .execute(global.db().as_ref()) + .build() + .execute(global.db()) .await?; - match request.action.into_inner().action { + match request.action.action { Some(Action::Login(action)) => { let update_context = action.update_context; let session = action.execute(global, user.id).await?; @@ -207,9 +209,9 @@ impl AuthMutation { } Ok(Some(TwoFaRequestFulfillResponse::Login(Session { - id: session.id.0.into(), + id: session.id.into(), token, - user_id: session.user_id.0.into(), + user_id: session.user_id.into(), expires_at: session.expires_at.into(), last_used_at: session.last_used_at.into(), }))) @@ -240,7 +242,7 @@ impl AuthMutation { })?; // TODO: maybe look to batch this - let session: database::Session = sqlx::query_as( + let session: database::Session = common::database::query( r#" UPDATE user_sessions @@ -252,8 +254,9 @@ impl AuthMutation { * "#, ) - .bind(Ulid::from(jwt.session_id)) - .fetch_optional(global.db().as_ref()) + .bind(jwt.session_id) + .build_query_as() + .fetch_optional(global.db()) .await? .map_err_gql(GqlError::InvalidInput { fields: vec!["sessionToken"], @@ -271,9 +274,9 @@ impl AuthMutation { } Ok(Session { - id: session.id.0.into(), + id: session.id.into(), token: session_token, - user_id: session.user_id.0.into(), + user_id: session.user_id.into(), expires_at: session.expires_at.into(), last_used_at: session.last_used_at.into(), }) @@ -347,10 +350,12 @@ impl AuthMutation { // TODO: what do we do when the next step fails? delete the room again? - let mut tx = global.db().begin().await?; + let mut client = global.db().get().await?; + + let tx = client.transaction().await?; // TODO: maybe look to batch this - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" INSERT INTO users ( id, @@ -373,22 +378,23 @@ impl AuthMutation { ) RETURNING * "#, ) - .bind(Ulid::from(ulid::Ulid::new())) + .bind(ulid::Ulid::new()) .bind(username) .bind(display_name) .bind(database::User::generate_display_color()) .bind(database::User::hash_password(&password)) .bind(email) - .bind(Ulid::from(channel_room_id)) + .bind(channel_room_id) .bind(res.stream_key) - .fetch_one(&mut *tx) + .build_query_as() + .fetch_one(&tx) .await?; let login_duration = validity.unwrap_or(60 * 60 * 24 * 7); // 7 days let expires_at = Utc::now() + Duration::seconds(login_duration as i64); // TODO: maybe look to batch this - let session: database::Session = sqlx::query_as( + let session: database::Session = common::database::query( r#" INSERT INTO user_sessions ( id, @@ -401,10 +407,11 @@ impl AuthMutation { ) RETURNING * "#, ) - .bind(Ulid::from(ulid::Ulid::new())) + .bind(ulid::Ulid::new()) .bind(user.id) .bind(expires_at) - .fetch_one(&mut *tx) + .build_query_as() + .fetch_one(&tx) .await?; let jwt = AuthJwtPayload::from(session.clone()); @@ -412,6 +419,7 @@ impl AuthMutation { let token = jwt.serialize(global).map_err_gql("failed to serialize JWT")?; tx.commit().await?; + drop(client); // We need to update the request context with the new session if update_context.unwrap_or(true) { @@ -431,9 +439,9 @@ impl AuthMutation { } Ok(Session { - id: session.id.0.into(), + id: session.id.into(), token, - user_id: session.user_id.0.into(), + user_id: session.user_id.into(), expires_at: session.expires_at.into(), last_used_at: session.last_used_at.into(), }) @@ -465,11 +473,10 @@ impl AuthMutation { .map_err_gql(GqlError::Auth(AuthError::NotLoggedIn))? .session .id - .0 }; // TODO: maybe look to batch this - sqlx::query( + common::database::query( r#" DELETE FROM user_sessions @@ -477,8 +484,9 @@ impl AuthMutation { id = $1 "#, ) - .bind(Ulid::from(session_id)) - .execute(global.db().as_ref()) + .bind(session_id) + .build() + .execute(global.db()) .await?; if session_token.is_none() { diff --git a/platform/api/src/api/v1/gql/mutations/channel.rs b/platform/api/src/api/v1/gql/mutations/channel.rs index ff25c2a2..8acacf78 100644 --- a/platform/api/src/api/v1/gql/mutations/channel.rs +++ b/platform/api/src/api/v1/gql/mutations/channel.rs @@ -29,7 +29,7 @@ impl ChannelMutation { .await? .map_err_gql(GqlError::Auth(AuthError::NotLoggedIn))?; - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users SET @@ -42,17 +42,16 @@ impl ChannelMutation { ) .bind(title.clone()) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; - let channel_id = user.id.0; - global .nats() .publish( - SubscriptionTopic::ChannelTitle(channel_id), + SubscriptionTopic::ChannelTitle(user.id), pb::scuffle::platform::internal::events::ChannelTitle { - channel_id: Some(channel_id.into()), + channel_id: Some(user.id.into()), title, } .encode_to_vec() diff --git a/platform/api/src/api/v1/gql/mutations/chat.rs b/platform/api/src/api/v1/gql/mutations/chat.rs index 73ae9a4a..b5360963 100644 --- a/platform/api/src/api/v1/gql/mutations/chat.rs +++ b/platform/api/src/api/v1/gql/mutations/chat.rs @@ -2,7 +2,6 @@ use async_graphql::{Context, Object}; use prost::Message; use tracing::error; use ulid::Ulid; -use uuid::Uuid; use crate::api::auth::AuthError; use crate::api::v1::gql::error::ext::*; @@ -42,7 +41,7 @@ impl ChatMutation { // TODO: Check if the user is allowed to send messages in this chat let message_id = Ulid::new(); - let chat_message: database::ChatMessage = sqlx::query_as( + let chat_message: database::ChatMessage = common::database::query( r#" INSERT INTO chat_messages ( id, @@ -57,11 +56,12 @@ impl ChatMutation { ) RETURNING * "#, ) - .bind(Uuid::from(message_id)) + .bind(message_id) .bind(auth.session.user_id) - .bind(channel_id.to_uuid()) + .bind(channel_id.to_ulid()) .bind(content.clone()) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; match global diff --git a/platform/api/src/api/v1/gql/mutations/user.rs b/platform/api/src/api/v1/gql/mutations/user.rs index 2b232b20..9d544d59 100644 --- a/platform/api/src/api/v1/gql/mutations/user.rs +++ b/platform/api/src/api/v1/gql/mutations/user.rs @@ -1,6 +1,5 @@ use async_graphql::{ComplexObject, Context, SimpleObject}; use bytes::Bytes; -use common::database::Ulid; use pb::scuffle::platform::internal::two_fa::two_fa_request_action::{Action, ChangePassword}; use pb::scuffle::platform::internal::two_fa::TwoFaRequestAction; use prost::Message; @@ -51,7 +50,7 @@ impl UserMutation { .await? .map_err_gql(GqlError::Auth(AuthError::NotLoggedIn))?; - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users SET @@ -65,7 +64,8 @@ impl UserMutation { ) .bind(email) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; Ok(user.into()) @@ -88,7 +88,7 @@ impl UserMutation { // TDOD: Can we combine the two queries into one? let user: database::User = global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await .map_err_ignored_gql("failed to fetch user")? .map_err_gql(GqlError::NotFound("user"))?; @@ -102,7 +102,7 @@ impl UserMutation { .into()); } - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users SET @@ -117,15 +117,16 @@ impl UserMutation { .bind(display_name.clone()) .bind(auth.session.user_id) .bind(user.username) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; global .nats() .publish( - SubscriptionTopic::UserDisplayName(user.id.0), + SubscriptionTopic::UserDisplayName(user.id), pb::scuffle::platform::internal::events::UserDisplayName { - user_id: Some(user.id.0.into()), + user_id: Some(user.id.into()), display_name, } .encode_to_vec() @@ -151,7 +152,7 @@ impl UserMutation { .await? .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users SET @@ -164,15 +165,16 @@ impl UserMutation { ) .bind(*color) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; global .nats() .publish( - SubscriptionTopic::UserDisplayColor(user.id.0), + SubscriptionTopic::UserDisplayColor(user.id), pb::scuffle::platform::internal::events::UserDisplayColor { - user_id: Some(user.id.0.into()), + user_id: Some(user.id.into()), display_color: *color, } .encode_to_vec() @@ -194,19 +196,20 @@ impl UserMutation { .await? .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( "UPDATE users SET profile_picture_id = NULL, pending_profile_picture_id = NULL WHERE id = $1 RETURNING *", ) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; global .nats() .publish( - SubscriptionTopic::UserProfilePicture(user.id.0), + SubscriptionTopic::UserProfilePicture(user.id), pb::scuffle::platform::internal::events::UserProfilePicture { - user_id: Some(user.id.0.into()), + user_id: Some(user.id.into()), profile_picture_id: None, } .encode_to_vec() @@ -234,7 +237,7 @@ impl UserMutation { let user = global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await .map_err_ignored_gql("failed to fetch user")? .map_err_gql(GqlError::NotFound("user"))?; @@ -249,12 +252,12 @@ impl UserMutation { let change_password = ChangePassword { new_password_hash: database::User::hash_password(&new_password), - current_session_id: Some(auth.session.id.0.into()), + current_session_id: Some(auth.session.id.into()), }; if user.totp_enabled { let request_id = ulid::Ulid::new(); - sqlx::query( + common::database::query( r#" INSERT INTO two_fa_requests ( id, @@ -267,7 +270,7 @@ impl UserMutation { ) "#, ) - .bind(Ulid::from(request_id)) + .bind(request_id) .bind(user.id) .bind( TwoFaRequestAction { @@ -275,7 +278,8 @@ impl UserMutation { } .encode_to_vec(), ) - .execute(global.db().as_ref()) + .build() + .execute(global.db()) .await?; Ok(TwoFaResponse::TwoFaRequest(TwoFaRequest { id: request_id.into() })) } else { @@ -299,7 +303,7 @@ impl UserMutation { .await? .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; - if auth.session.user_id.0 == channel_id.to_ulid() { + if auth.session.user_id == channel_id.to_ulid() { return Err(GqlError::InvalidInput { fields: vec!["channelId"], message: "Cannot follow yourself", @@ -307,7 +311,7 @@ impl UserMutation { .into()); } - sqlx::query( + common::database::query( r#" UPSERT INTO channel_user ( user_id, @@ -321,18 +325,19 @@ impl UserMutation { "#, ) .bind(auth.session.user_id) - .bind(channel_id.to_uuid()) + .bind(channel_id.to_ulid()) .bind(follow) - .execute(global.db().as_ref()) + .build() + .execute(global.db()) .await?; let channel_id = channel_id.to_ulid(); - let user_subject = SubscriptionTopic::UserFollows(auth.session.user_id.0); + let user_subject = SubscriptionTopic::UserFollows(auth.session.user_id); let channel_subject = SubscriptionTopic::ChannelFollows(channel_id); let msg = Bytes::from( pb::scuffle::platform::internal::events::UserFollowChannel { - user_id: Some(auth.session.user_id.0.into()), + user_id: Some(auth.session.user_id.into()), channel_id: Some(channel_id.into()), following: follow, } diff --git a/platform/api/src/api/v1/gql/mutations/user/two_fa.rs b/platform/api/src/api/v1/gql/mutations/user/two_fa.rs index 21d71b0c..c498ab60 100644 --- a/platform/api/src/api/v1/gql/mutations/user/two_fa.rs +++ b/platform/api/src/api/v1/gql/mutations/user/two_fa.rs @@ -34,7 +34,7 @@ impl TwoFaMutation { let user: database::User = global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await .map_err_ignored_gql("failed to fetch user")? .map_err_gql(GqlError::NotFound("user"))?; @@ -67,7 +67,7 @@ impl TwoFaMutation { let hex_backup_codes = backup_codes.iter().map(|c| format!("{:08x}", c)).collect(); // Save secret and backup codes to database. - sqlx::query( + common::database::query( r#" UPDATE users @@ -82,7 +82,8 @@ impl TwoFaMutation { .bind(secret) .bind(backup_codes) .bind(auth.session.user_id) - .execute(global.db().as_ref()) + .build() + .execute(global.db()) .await?; let qr_code = totp.get_qr_base64().map_err_ignored_gql("failed generate qr code")?; @@ -105,7 +106,7 @@ impl TwoFaMutation { let user: database::User = global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await .map_err_ignored_gql("failed to fetch user")? .map_err_gql(GqlError::NotFound("user"))?; @@ -129,7 +130,7 @@ impl TwoFaMutation { } // Enable 2fa - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users @@ -142,7 +143,8 @@ impl TwoFaMutation { "#, ) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; // TODO: Log out all other sessions? @@ -162,7 +164,7 @@ impl TwoFaMutation { let user: database::User = global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await .map_err_ignored_gql("failed to fetch user")? .map_err_gql(GqlError::NotFound("user"))?; @@ -177,7 +179,7 @@ impl TwoFaMutation { } // Disable 2fa, remove secret and backup codes. - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users SET @@ -191,7 +193,8 @@ impl TwoFaMutation { "#, ) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; Ok(user.into()) diff --git a/platform/api/src/api/v1/gql/queries/category.rs b/platform/api/src/api/v1/gql/queries/category.rs index 303778a5..5016495b 100644 --- a/platform/api/src/api/v1/gql/queries/category.rs +++ b/platform/api/src/api/v1/gql/queries/category.rs @@ -3,10 +3,10 @@ use async_graphql::{Context, Object, SimpleObject}; use crate::api::v1::gql::error::ext::*; use crate::api::v1::gql::error::Result; use crate::api::v1::gql::ext::ContextExt; +use crate::api::v1::gql::models; use crate::api::v1::gql::models::category::Category; use crate::api::v1::gql::models::search_result::SearchResult; use crate::api::v1::gql::models::ulid::GqlUlid; -use crate::api::v1::gql::models::{self}; use crate::database; use crate::global::ApiGlobal; @@ -61,11 +61,12 @@ impl CategoryQuery { ) -> Result { let global = ctx.get_global::(); - let categories: Vec> = sqlx::query_as("SELECT categories.*, similarity(name, $1), COUNT(*) OVER() AS total_count FROM categories WHERE name % $1 ORDER BY similarity DESC LIMIT $2 OFFSET $3") + let categories: Vec> = common::database::query("SELECT categories.*, similarity(name, $1), COUNT(*) OVER() AS total_count FROM categories WHERE name % $1 ORDER BY similarity DESC LIMIT $2 OFFSET $3") .bind(query) .bind(limit.unwrap_or(5)) .bind(offset.unwrap_or(0)) - .fetch_all(global.db().as_ref()) + .build_query_as() + .fetch_all(global.db()) .await .map_err_gql("failed to search categories")?; diff --git a/platform/api/src/api/v1/gql/queries/mod.rs b/platform/api/src/api/v1/gql/queries/mod.rs index 972deb8a..e354e31a 100644 --- a/platform/api/src/api/v1/gql/queries/mod.rs +++ b/platform/api/src/api/v1/gql/queries/mod.rs @@ -1,6 +1,6 @@ use async_graphql::{ComplexObject, Context, SimpleObject}; -use common::database::Ulid; -use sqlx::FromRow; +use postgres_from_row::FromRow; +use ulid::Ulid; use super::error::ext::*; use super::ext::ContextExt; @@ -48,7 +48,7 @@ impl Query { ) -> Result> { let global = ctx.get_global::(); - let query_results: Vec = sqlx::query_as( + let query_results: Vec = common::database::query( r#" WITH CombinedResults AS ( SELECT @@ -84,7 +84,8 @@ impl Query { .bind(query) .bind(limit.unwrap_or(5)) .bind(offset.unwrap_or(0)) - .fetch_all(global.db().as_ref()) + .build_query_as() + .fetch_all(global.db()) .await .map_err_gql("failed to search")?; @@ -96,7 +97,7 @@ impl Query { 1 => &mut store.1, _ => unreachable!(), } - .push(item.id.0); + .push(item.id); store }); @@ -111,8 +112,8 @@ impl Query { .iter() .filter_map(|r| { let object = match r.r#type { - 0 => SearchAllResultData::User(Box::new(User::from(users.get(&r.id.0)?.clone()))), - 1 => SearchAllResultData::Category(categories.get(&r.id.0)?.clone().into()), + 0 => SearchAllResultData::User(Box::new(User::from(users.get(&r.id)?.clone()))), + 1 => SearchAllResultData::Category(categories.get(&r.id)?.clone().into()), _ => unreachable!(), }; diff --git a/platform/api/src/api/v1/gql/queries/user.rs b/platform/api/src/api/v1/gql/queries/user.rs index 9d4d42d0..a38eaab1 100644 --- a/platform/api/src/api/v1/gql/queries/user.rs +++ b/platform/api/src/api/v1/gql/queries/user.rs @@ -48,7 +48,7 @@ impl UserQuery { global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await .map_err_ignored_gql("failed to fetch user")? .map_err_gql(GqlError::NotFound("user")) @@ -93,16 +93,17 @@ impl UserQuery { &self, ctx: &Context<'_>, #[graphql(desc = "The search query.")] query: String, - #[graphql(desc = "The result limit, default: 5", validator(minimum = 0, maximum = 50))] limit: Option, - #[graphql(desc = "The result offset, default: 0", validator(minimum = 0, maximum = 950))] offset: Option, + #[graphql(desc = "The result limit, default: 5", validator(minimum = 0, maximum = 50))] limit: Option, + #[graphql(desc = "The result offset, default: 0", validator(minimum = 0, maximum = 950))] offset: Option, ) -> Result> { let global = ctx.get_global::(); - let users: Vec> = sqlx::query_as("SELECT users.*, similarity(username, $1), COUNT(*) OVER() AS total_count FROM users WHERE username % $1 ORDER BY similarity DESC LIMIT $2 OFFSET $3") + let users: Vec> = common::database::query("SELECT users.*, similarity(username, $1), COUNT(*) OVER() AS total_count FROM users WHERE username % $1 ORDER BY similarity DESC LIMIT $2 OFFSET $3") .bind(query) .bind(limit.unwrap_or(5)) .bind(offset.unwrap_or(0)) - .fetch_all(global.db().as_ref()) + .build_query_as() + .fetch_all(global.db()) .await .map_err_gql("failed to search users")?; @@ -119,7 +120,7 @@ impl UserQuery { .await? .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; - let (is_following,): (bool,) = sqlx::query_as( + let is_following = common::database::query( r#" SELECT following @@ -131,10 +132,11 @@ impl UserQuery { "#, ) .bind(auth.session.user_id) - .bind(channel_id.to_uuid()) - .fetch_optional(global.db().as_ref()) + .bind(channel_id.to_ulid()) + .build_query_single_scalar::() + .fetch_optional(global.db()) .await? - .unwrap_or((false,)); + .unwrap_or_default(); Ok(is_following) } @@ -154,12 +156,12 @@ impl UserQuery { .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; // TODO: Also allow users with permission - if id.to_ulid() != auth.session.user_id.0 { + if id.to_ulid() != auth.session.user_id { return Err(GqlError::Unauthorized { field: "following" }.into()); } // This query is not very good, we should have some paging mechinsm with ids. - let channels: Vec = sqlx::query_as( + let channels: Vec = common::database::query( r#" SELECT users.* @@ -178,9 +180,10 @@ impl UserQuery { LIMIT $2 "#, ) - .bind(id.to_uuid()) + .bind(id.to_ulid()) .bind(limit.map(|l| l as i64)) - .fetch_all(global.db().as_ref()) + .build_query_as() + .fetch_all(global.db()) .await?; Ok(channels.into_iter().map(Into::into).collect()) diff --git a/platform/api/src/api/v1/gql/subscription/channel.rs b/platform/api/src/api/v1/gql/subscription/channel.rs index 732fd999..61516e8f 100644 --- a/platform/api/src/api/v1/gql/subscription/channel.rs +++ b/platform/api/src/api/v1/gql/subscription/channel.rs @@ -49,7 +49,7 @@ impl ChannelSubscription { .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; // TODO: allow other users with permissions - if auth.session.user_id.0 != channel_id.to_ulid() { + if auth.session.user_id != channel_id.to_ulid() { return Err(GqlError::Unauthorized { field: "channel_follows", } @@ -88,7 +88,7 @@ impl ChannelSubscription { let stream = self.channel_follows(ctx, channel_id).await?; - let (mut followers,) = sqlx::query_as( + let mut followers = common::database::query( r#" SELECT COUNT(*) @@ -99,8 +99,9 @@ impl ChannelSubscription { AND following = true "#, ) - .bind(channel_id.to_uuid()) - .fetch_one(global.db().as_ref()) + .bind(channel_id.to_ulid()) + .build_query_single_scalar() + .fetch_one(global.db()) .await?; Ok(stream.map(move |value| { diff --git a/platform/api/src/api/v1/gql/subscription/chat.rs b/platform/api/src/api/v1/gql/subscription/chat.rs index 086cba5e..1b60acaa 100644 --- a/platform/api/src/api/v1/gql/subscription/chat.rs +++ b/platform/api/src/api/v1/gql/subscription/chat.rs @@ -52,12 +52,13 @@ impl ChatSubscription { // load old messages not older than 10 minutes, max 100 messages let not_older_than = chrono::Utc::now() - chrono::Duration::minutes(10); let not_older_than = ulid::Ulid::from_parts(not_older_than.timestamp() as u64, u128::MAX); - let messages: Vec = sqlx::query_as( + let messages: Vec = common::database::query( "SELECT * FROM chat_messages WHERE channel_id = $1 AND deleted_at IS NULL AND id >= $2 ORDER BY id LIMIT 100", ) - .bind(common::database::Ulid::from(channel_id.to_ulid())) - .bind(common::database::Ulid::from(not_older_than)) - .fetch_all(global.db().as_ref()) + .bind(channel_id.to_ulid()) + .bind(not_older_than) + .build_query_as() + .fetch_all(global.db()) .await .map_err_gql("failed to fetch chat messages")?; diff --git a/platform/api/src/api/v1/gql/subscription/file.rs b/platform/api/src/api/v1/gql/subscription/file.rs index 2d335e0d..38b4fa2f 100644 --- a/platform/api/src/api/v1/gql/subscription/file.rs +++ b/platform/api/src/api/v1/gql/subscription/file.rs @@ -71,7 +71,7 @@ impl FileSubscription { FileStatus::Success }; yield Ok(FileStatusStream { - file_id: file.id.0.into(), + file_id: file.id.into(), status, reason: file.failed, // TODO: we don't have access to the friendly message here because it isn't in the db diff --git a/platform/api/src/api/v1/gql/subscription/user.rs b/platform/api/src/api/v1/gql/subscription/user.rs index 78bbeed0..c987fd27 100644 --- a/platform/api/src/api/v1/gql/subscription/user.rs +++ b/platform/api/src/api/v1/gql/subscription/user.rs @@ -164,7 +164,7 @@ impl UserSubscription { let profile_picture = if let Some(profile_picture_id) = profile_picture_id { global .uploaded_file_by_id_loader() - .load(profile_picture_id.0) + .load(profile_picture_id) .await .map_err_ignored_gql("failed to fetch profile picture")? .map(ImageUpload::from_uploaded_file) @@ -221,7 +221,7 @@ impl UserSubscription { .await? .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; - let user_id: Ulid = auth.session.user_id.into(); + let user_id: Ulid = auth.session.user_id; let mut subscription = global .subscription_manager() @@ -231,7 +231,7 @@ impl UserSubscription { Ok(async_stream::stream!({ if let Some(channel_id) = channel_id { - let (is_following,): (bool,) = sqlx::query_as( + let is_following = common::database::query( r#" SELECT following @@ -243,11 +243,12 @@ impl UserSubscription { "#, ) .bind(auth.session.user_id) - .bind(channel_id.to_uuid()) - .fetch_optional(global.db().as_ref()) + .bind(channel_id.to_ulid()) + .build_query_single_scalar::() + .fetch_optional(global.db()) .await .map_err_gql("failed to fetch channel_user")? - .unwrap_or((false,)); + .unwrap_or_default(); yield Ok(FollowStream { user_id: user_id.into(), diff --git a/platform/api/src/api/v1/upload/profile_picture.rs b/platform/api/src/api/v1/upload/profile_picture.rs index 2a455890..cca332a8 100644 --- a/platform/api/src/api/v1/upload/profile_picture.rs +++ b/platform/api/src/api/v1/upload/profile_picture.rs @@ -173,27 +173,32 @@ impl UploadType for ProfilePicture { image_format.ext() ); - let mut tx = global + let mut client = global .db() - .begin() + .get() .await - .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to begin transaction"))?; + .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to get database connection"))?; + let tx = client + .transaction() + .await + .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to start transaction"))?; - sqlx::query("INSERT INTO image_jobs (id, priority, task) VALUES ($1, $2, $3)") - .bind(common::database::Ulid(file_id)) + common::database::query("INSERT INTO image_jobs (id, priority, task) VALUES ($1, $2, $3)") + .bind(file_id) .bind(config.profile_picture_task_priority) .bind(common::database::Protobuf(create_task( file_id, &input_path, config, - auth.session.user_id.0, + auth.session.user_id, ))) - .execute(tx.as_mut()) + .build() + .execute(&tx) .await .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to insert image job"))?; - sqlx::query("INSERT INTO uploaded_files(id, owner_id, uploader_id, name, type, metadata, total_size, pending, path) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)") - .bind(common::database::Ulid(file_id)) // id + common::database::query("INSERT INTO uploaded_files(id, owner_id, uploader_id, name, type, metadata, total_size, pending, path) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)") + .bind(file_id) // id .bind(auth.session.user_id) // owner_id .bind(auth.session.user_id) // uploader_id .bind(name.unwrap_or_else(|| format!("untitled.{}", image_format.ext()))) // name @@ -206,15 +211,17 @@ impl UploadType for ProfilePicture { .bind(file.len() as i64) // total_size .bind(true) // pending .bind(&input_path) // path - .execute(tx.as_mut()) + .build() + .execute(&tx) .await .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to insert uploaded file"))?; if self.set_active { - sqlx::query("UPDATE users SET pending_profile_picture_id = $1 WHERE id = $2") - .bind(common::database::Ulid(file_id)) + common::database::query("UPDATE users SET pending_profile_picture_id = $1 WHERE id = $2") + .bind(file_id) .bind(auth.session.user_id) - .execute(tx.as_mut()) + .build() + .execute(&tx) .await .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to update user"))?; } diff --git a/platform/api/src/config.rs b/platform/api/src/config.rs index 6accc1b8..c470e3b7 100644 --- a/platform/api/src/config.rs +++ b/platform/api/src/config.rs @@ -74,7 +74,7 @@ pub struct ImageUploaderConfig { pub profile_picture_callback_subject: String, /// Profile picture task priority, higher number means higher priority - pub profile_picture_task_priority: i32, + pub profile_picture_task_priority: i64, /// Public endpoint for downloads pub public_endpoint: String, diff --git a/platform/api/src/database/category.rs b/platform/api/src/database/category.rs index be26d2da..a2fdc1d0 100644 --- a/platform/api/src/database/category.rs +++ b/platform/api/src/database/category.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; -use common::database::Ulid; +use ulid::Ulid; -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct Category { pub id: Ulid, pub name: String, diff --git a/platform/api/src/database/channel.rs b/platform/api/src/database/channel.rs index f1e24ad8..597a46fe 100644 --- a/platform/api/src/database/channel.rs +++ b/platform/api/src/database/channel.rs @@ -1,55 +1,56 @@ use async_graphql::SimpleObject; use chrono::{DateTime, Utc}; -use common::database::Ulid; +use common::database::json; +use ulid::Ulid; -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct Channel { /// Ulid of the channel pub id: Ulid, /// Video room id - #[sqlx(rename = "channel_room_id")] + #[from_row(rename = "channel_room_id")] pub room_id: Ulid, /// Active connection id - #[sqlx(rename = "channel_active_connection_id")] + #[from_row(rename = "channel_active_connection_id")] pub active_connection_id: Option, /// The current stream's title - #[sqlx(rename = "channel_title")] + #[from_row(rename = "channel_title")] pub title: Option, /// The current stream's live viewer count - #[sqlx(rename = "channel_live_viewer_count")] + #[from_row(rename = "channel_live_viewer_count")] pub live_viewer_count: Option, /// The time the current stream's live viewer count was last updated - #[sqlx(rename = "channel_live_viewer_count_updated_at")] + #[from_row(rename = "channel_live_viewer_count_updated_at")] pub live_viewer_count_updated_at: Option>, /// The current stream's description - #[sqlx(rename = "channel_description")] + #[from_row(rename = "channel_description")] pub description: Option, /// The social links - #[sqlx(rename = "channel_links")] - pub links: sqlx::types::Json>, + #[from_row(rename = "channel_links", from_fn = "json")] + pub links: Vec, /// The current stream's thumbnail - #[sqlx(rename = "channel_custom_thumbnail_id")] + #[from_row(rename = "channel_custom_thumbnail_id")] pub custom_thumbnail_id: Option, /// The offline banner of the channel - #[sqlx(rename = "channel_offline_banner_id")] + #[from_row(rename = "channel_offline_banner_id")] pub offline_banner_id: Option, /// The current stream's category - #[sqlx(rename = "channel_category_id")] + #[from_row(rename = "channel_category_id")] pub category_id: Option, /// Channel stream key - #[sqlx(rename = "channel_stream_key")] + #[from_row(rename = "channel_stream_key")] pub stream_key: Option, /// Channel roles order - #[sqlx(rename = "channel_role_order")] + #[from_row(rename = "channel_role_order")] pub role_order: Vec, /// Channel default permissions - #[sqlx(rename = "channel_default_permissions")] + #[from_row(rename = "channel_default_permissions")] pub default_permissions: i64, /// Channel permissions for followers - #[sqlx(rename = "channel_following_permission")] + #[from_row(rename = "channel_following_permission")] pub following_permission: i64, /// The time the channel was last live - #[sqlx(rename = "channel_last_live_at")] + #[from_row(rename = "channel_last_live_at")] pub last_live_at: Option>, } diff --git a/platform/api/src/database/chat_message.rs b/platform/api/src/database/chat_message.rs index b6186c2a..5f27c08e 100644 --- a/platform/api/src/database/chat_message.rs +++ b/platform/api/src/database/chat_message.rs @@ -1,6 +1,6 @@ -use common::database::Ulid; +use ulid::Ulid; -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct ChatMessage { /// The unique identifier for the chat message. pub id: Ulid, @@ -17,9 +17,9 @@ pub struct ChatMessage { impl ChatMessage { pub fn to_protobuf(&self) -> pb::scuffle::platform::internal::events::ChatMessage { pb::scuffle::platform::internal::events::ChatMessage { - id: Some(self.id.0.into()), - channel_id: Some(self.channel_id.0.into()), - user_id: Some(self.user_id.0.into()), + id: Some(self.id.into()), + channel_id: Some(self.channel_id.into()), + user_id: Some(self.user_id.into()), content: self.content.clone(), } } diff --git a/platform/api/src/database/file_type.rs b/platform/api/src/database/file_type.rs index af05e759..fc90e04b 100644 --- a/platform/api/src/database/file_type.rs +++ b/platform/api/src/database/file_type.rs @@ -1,13 +1,16 @@ -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, sqlx::Type)] +use postgres_types::{FromSql, ToSql}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ToSql, FromSql)] +#[postgres(name = "file_type")] pub enum FileType { - #[sqlx(rename = "custom_thumbnail")] + #[postgres(name = "custom_thumbnail")] CustomThumbnail, - #[sqlx(rename = "profile_picture")] + #[postgres(name = "profile_picture")] ProfilePicture, - #[sqlx(rename = "offline_banner")] + #[postgres(name = "offline_banner")] OfflineBanner, - #[sqlx(rename = "role_badge")] + #[postgres(name = "role_badge")] RoleBadge, - #[sqlx(rename = "channel_role_badge")] + #[postgres(name = "channel_role_badge")] ChannelRoleBadge, } diff --git a/platform/api/src/database/global_state.rs b/platform/api/src/database/global_state.rs index ba0c6122..0def5554 100644 --- a/platform/api/src/database/global_state.rs +++ b/platform/api/src/database/global_state.rs @@ -1,8 +1,8 @@ -use common::database::Ulid; +use ulid::Ulid; use super::RolePermission; -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct GlobalState { pub role_order: Vec, pub default_permissions: RolePermission, diff --git a/platform/api/src/database/role.rs b/platform/api/src/database/role.rs index 82554d17..e93fc253 100644 --- a/platform/api/src/database/role.rs +++ b/platform/api/src/database/role.rs @@ -1,7 +1,7 @@ use bitmask_enum::bitmask; -use common::database::Ulid; +use ulid::Ulid; -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] /// A role that can be granted to a user. /// Roles can allow or deny permissions to a user. pub struct Role { @@ -33,15 +33,29 @@ pub enum RolePermission { UploadProfilePicture, } -impl sqlx::Decode<'_, sqlx::Postgres> for RolePermission { - fn decode(value: sqlx::postgres::PgValueRef<'_>) -> Result> { - >::decode(value).map(Self::from) +impl<'a> postgres_types::FromSql<'a> for RolePermission { + fn accepts(ty: &postgres_types::Type) -> bool { + ::accepts(ty) + } + + fn from_sql(ty: &postgres_types::Type, raw: &'a [u8]) -> Result> { + ::from_sql(ty, raw).map(Self::from) } } -impl sqlx::Type for RolePermission { - fn type_info() -> sqlx::postgres::PgTypeInfo { - >::type_info() +impl postgres_types::ToSql for RolePermission { + postgres_types::to_sql_checked!(); + + fn to_sql( + &self, + ty: &postgres_types::Type, + out: &mut bytes::BytesMut, + ) -> Result> { + ::to_sql(&self.bits(), ty, out) + } + + fn accepts(ty: &postgres_types::Type) -> bool { + ::accepts(ty) } } diff --git a/platform/api/src/database/search_result.rs b/platform/api/src/database/search_result.rs index f0035ee2..0ea00096 100644 --- a/platform/api/src/database/search_result.rs +++ b/platform/api/src/database/search_result.rs @@ -1,7 +1,7 @@ -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct SearchResult { /// The category. - #[sqlx(flatten)] + #[from_row(flatten)] pub object: T, /// The similarity of the search query to the category's name. pub similarity: f64, diff --git a/platform/api/src/database/session.rs b/platform/api/src/database/session.rs index 19b50164..f6c09c0b 100644 --- a/platform/api/src/database/session.rs +++ b/platform/api/src/database/session.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; -use common::database::Ulid; +use ulid::Ulid; -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct Session { /// The unique identifier for the session. pub id: Ulid, diff --git a/platform/api/src/database/two_fa_request.rs b/platform/api/src/database/two_fa_request.rs index b086a246..ee06f0be 100644 --- a/platform/api/src/database/two_fa_request.rs +++ b/platform/api/src/database/two_fa_request.rs @@ -1,19 +1,22 @@ use std::sync::Arc; use chrono::{Duration, Utc}; -use common::database::{Protobuf, Ulid}; +use common::database::protobuf; use pb::ext::UlidExt; use pb::scuffle::platform::internal::two_fa::two_fa_request_action::{ChangePassword, Login}; use pb::scuffle::platform::internal::two_fa::TwoFaRequestAction; +use ulid::Ulid; use super::{Session, User}; use crate::global::ApiGlobal; -#[derive(Debug, Clone, sqlx::FromRow)] +#[derive(Debug, Clone, postgres_from_row::FromRow)] pub struct TwoFaRequest { pub id: Ulid, pub user_id: Ulid, - pub action: Protobuf, + + #[from_row(from_fn = "protobuf")] + pub action: TwoFaRequestAction, } #[allow(async_fn_in_trait)] @@ -24,15 +27,16 @@ pub trait TwoFaRequestActionTrait { } impl TwoFaRequestActionTrait for Login { - type Result = sqlx::Result; + type Result = Result; async fn execute(self, global: &Arc, user_id: Ulid) -> Self::Result { let expires_at = Utc::now() + Duration::seconds(self.login_duration as i64); // TODO: maybe look to batch this - let mut tx = global.db().begin().await?; + let mut client = global.db().get().await?; + let tx = client.transaction().await?; - let session: Session = sqlx::query_as( + let session = common::database::query( r#" INSERT INTO user_sessions ( id, @@ -45,13 +49,14 @@ impl TwoFaRequestActionTrait for Login { ) RETURNING * "#, ) - .bind(Ulid::from(ulid::Ulid::new())) + .bind(ulid::Ulid::new()) .bind(user_id) .bind(expires_at) - .fetch_one(tx.as_mut()) + .build_query_as() + .fetch_one(&tx) .await?; - sqlx::query( + common::database::query( r#" UPDATE users SET @@ -60,7 +65,8 @@ impl TwoFaRequestActionTrait for Login { "#, ) .bind(user_id) - .execute(tx.as_mut()) + .build() + .execute(&tx) .await?; tx.commit().await?; @@ -70,12 +76,13 @@ impl TwoFaRequestActionTrait for Login { } impl TwoFaRequestActionTrait for ChangePassword { - type Result = sqlx::Result<()>; + type Result = Result<(), common::database::deadpool_postgres::PoolError>; - async fn execute(self, global: &Arc, user_id: Ulid) -> sqlx::Result<()> { - let mut tx = global.db().begin().await?; + async fn execute(self, global: &Arc, user_id: Ulid) -> Self::Result { + let mut client = global.db().get().await?; + let tx = client.transaction().await?; - let user: User = sqlx::query_as( + let user: User = common::database::query( r#" UPDATE users @@ -88,11 +95,12 @@ impl TwoFaRequestActionTrait for ChangePassword { ) .bind(self.new_password_hash) .bind(user_id) - .fetch_one(tx.as_mut()) + .build_query_as() + .fetch_one(&tx) .await?; // Delete all sessions except current - sqlx::query( + common::database::query( r#" DELETE FROM user_sessions @@ -102,8 +110,9 @@ impl TwoFaRequestActionTrait for ChangePassword { "#, ) .bind(user.id) - .bind(Ulid::from(self.current_session_id.into_ulid())) - .execute(tx.as_mut()) + .bind(self.current_session_id.into_ulid()) + .build() + .execute(&tx) .await?; tx.commit().await?; diff --git a/platform/api/src/database/uploaded_file.rs b/platform/api/src/database/uploaded_file.rs index 37c923e0..6534c448 100644 --- a/platform/api/src/database/uploaded_file.rs +++ b/platform/api/src/database/uploaded_file.rs @@ -1,16 +1,18 @@ -use common::database::{Protobuf, Ulid}; +use common::database::protobuf; +use ulid::Ulid; use super::FileType; -#[derive(Debug, Clone, sqlx::FromRow)] +#[derive(Debug, Clone, postgres_from_row::FromRow)] pub struct UploadedFile { pub id: Ulid, pub owner_id: Ulid, pub uploader_id: Ulid, pub name: String, - #[sqlx(rename = "type")] + #[from_row(rename = "type")] pub ty: FileType, - pub metadata: Protobuf, + #[from_row(from_fn = "protobuf")] + pub metadata: pb::scuffle::platform::internal::types::UploadedFileMetadata, pub total_size: i64, pub pending: bool, pub path: String, diff --git a/platform/api/src/database/user.rs b/platform/api/src/database/user.rs index f2e230d3..8967b6f3 100644 --- a/platform/api/src/database/user.rs +++ b/platform/api/src/database/user.rs @@ -2,8 +2,8 @@ use argon2::password_hash::rand_core::OsRng; use argon2::password_hash::SaltString; use argon2::{Argon2, PasswordHash, PasswordHasher, PasswordVerifier}; use chrono::{DateTime, Utc}; -use common::database::Ulid; use rand::Rng; +use ulid::Ulid; use super::Channel; @@ -17,7 +17,7 @@ pub enum TotpError { Generate, } -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct User { /// The unique identifier for the user. pub id: Ulid, @@ -52,7 +52,7 @@ pub struct User { pub roles: Vec, /// Channel - #[sqlx(flatten)] + #[from_row(flatten)] pub channel: Channel, } diff --git a/platform/api/src/dataloader/category.rs b/platform/api/src/dataloader/category.rs index 4a461bb2..3b092555 100644 --- a/platform/api/src/dataloader/category.rs +++ b/platform/api/src/dataloader/category.rs @@ -2,16 +2,15 @@ use std::sync::Arc; use common::dataloader::{DataLoader, Loader, LoaderOutput}; use ulid::Ulid; -use uuid::Uuid; use crate::database::Category; pub struct CategoryByIdLoader { - db: Arc, + db: Arc, } impl CategoryByIdLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -22,14 +21,15 @@ impl Loader for CategoryByIdLoader { type Value = Category; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM categories WHERE id = ANY($1)") - .bind(keys.iter().copied().map(Uuid::from).collect::>()) - .fetch_all(self.db.as_ref()) + let results: Vec = common::database::query("SELECT * FROM categories WHERE id = ANY($1)") + .bind(keys) + .build_query_as() + .fetch_all(&self.db) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch categories by id"); })?; - Ok(results.into_iter().map(|r| (r.id.0, r)).collect()) + Ok(results.into_iter().map(|r| (r.id, r)).collect()) } } diff --git a/platform/api/src/dataloader/global_state.rs b/platform/api/src/dataloader/global_state.rs index 5282bc21..5548a1b4 100644 --- a/platform/api/src/dataloader/global_state.rs +++ b/platform/api/src/dataloader/global_state.rs @@ -6,11 +6,11 @@ use common::dataloader::{DataLoader, Loader, LoaderOutput}; use crate::database::GlobalState; pub struct GlobalStateLoader { - db: Arc, + db: Arc, } impl GlobalStateLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -21,8 +21,9 @@ impl Loader for GlobalStateLoader { type Value = GlobalState; async fn load(&self, _: &[Self::Key]) -> LoaderOutput { - let state = sqlx::query_as("SELECT * FROM global_state") - .fetch_one(self.db.as_ref()) + let state = common::database::query("SELECT * FROM global_state") + .build_query_as() + .fetch_one(&self.db) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch global state"); diff --git a/platform/api/src/dataloader/role.rs b/platform/api/src/dataloader/role.rs index 7eff9da8..bfb1c92b 100644 --- a/platform/api/src/dataloader/role.rs +++ b/platform/api/src/dataloader/role.rs @@ -2,16 +2,15 @@ use std::sync::Arc; use common::dataloader::{DataLoader, Loader, LoaderOutput}; use ulid::Ulid; -use uuid::Uuid; use crate::database::Role; pub struct RoleByIdLoader { - db: Arc, + db: Arc, } impl RoleByIdLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -22,14 +21,15 @@ impl Loader for RoleByIdLoader { type Value = Role; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM roles WHERE id = ANY($1)") - .bind(keys.iter().copied().map(Uuid::from).collect::>()) + let results: Vec = common::database::query("SELECT * FROM roles WHERE id = ANY($1)") + .bind(keys) + .build_query_as() .fetch_all(self.db.as_ref()) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch roles"); })?; - Ok(results.into_iter().map(|r| (r.id.0, r)).collect()) + Ok(results.into_iter().map(|r| (r.id, r)).collect()) } } diff --git a/platform/api/src/dataloader/session.rs b/platform/api/src/dataloader/session.rs index 4c95ddcd..024e5d01 100644 --- a/platform/api/src/dataloader/session.rs +++ b/platform/api/src/dataloader/session.rs @@ -2,16 +2,15 @@ use std::sync::Arc; use common::dataloader::{DataLoader, Loader, LoaderOutput}; use ulid::Ulid; -use uuid::Uuid; use crate::database::Session; pub struct SessionByIdLoader { - db: Arc, + db: Arc, } impl SessionByIdLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -22,14 +21,15 @@ impl Loader for SessionByIdLoader { type Value = Session; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM user_sessions WHERE id = ANY($1)") - .bind(keys.iter().copied().map(Uuid::from).collect::>()) + let results: Vec = common::database::query("SELECT * FROM user_sessions WHERE id = ANY($1)") + .bind(keys) + .build_query_as() .fetch_all(self.db.as_ref()) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch sessions"); })?; - Ok(results.into_iter().map(|r| (r.id.0, r)).collect()) + Ok(results.into_iter().map(|r| (r.id, r)).collect()) } } diff --git a/platform/api/src/dataloader/uploaded_file.rs b/platform/api/src/dataloader/uploaded_file.rs index 7df07f14..9820e23a 100644 --- a/platform/api/src/dataloader/uploaded_file.rs +++ b/platform/api/src/dataloader/uploaded_file.rs @@ -6,11 +6,11 @@ use ulid::Ulid; use crate::database::UploadedFile; pub struct UploadedFileByIdLoader { - db: Arc, + db: Arc, } impl UploadedFileByIdLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -21,14 +21,15 @@ impl Loader for UploadedFileByIdLoader { type Value = UploadedFile; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM uploaded_files WHERE id = ANY($1)") - .bind(keys.iter().copied().map(common::database::Ulid).collect::>()) + let results: Vec = common::database::query("SELECT * FROM uploaded_files WHERE id = ANY($1)") + .bind(keys) + .build_query_as() .fetch_all(self.db.as_ref()) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch users by username"); })?; - Ok(results.into_iter().map(|r| (r.id.0, r)).collect()) + Ok(results.into_iter().map(|r| (r.id, r)).collect()) } } diff --git a/platform/api/src/dataloader/user.rs b/platform/api/src/dataloader/user.rs index b59adc2b..591d9f12 100644 --- a/platform/api/src/dataloader/user.rs +++ b/platform/api/src/dataloader/user.rs @@ -2,16 +2,15 @@ use std::sync::Arc; use common::dataloader::{DataLoader, Loader, LoaderOutput}; use ulid::Ulid; -use uuid::Uuid; use crate::database::User; pub struct UserByUsernameLoader { - db: Arc, + db: Arc, } impl UserByUsernameLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -22,8 +21,9 @@ impl Loader for UserByUsernameLoader { type Value = User; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM users WHERE username = ANY($1)") + let results: Vec = common::database::query("SELECT * FROM users WHERE username = ANY($1)") .bind(keys) + .build_query_as() .fetch_all(self.db.as_ref()) .await .map_err(|e| { @@ -35,11 +35,11 @@ impl Loader for UserByUsernameLoader { } pub struct UserByIdLoader { - db: Arc, + db: Arc, } impl UserByIdLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -50,14 +50,15 @@ impl Loader for UserByIdLoader { type Value = User; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM users WHERE id = ANY($1)") - .bind(keys.iter().copied().map(Uuid::from).collect::>()) + let results: Vec = common::database::query("SELECT * FROM users WHERE id = ANY($1)") + .bind(keys) + .build_query_as() .fetch_all(self.db.as_ref()) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch users by id"); })?; - Ok(results.into_iter().map(|r| (r.id.0, r)).collect()) + Ok(results.into_iter().map(|r| (r.id, r)).collect()) } } diff --git a/platform/api/src/image_upload_callback.rs b/platform/api/src/image_upload_callback.rs index 213d3e3d..9fa43de0 100644 --- a/platform/api/src/image_upload_callback.rs +++ b/platform/api/src/image_upload_callback.rs @@ -68,18 +68,20 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { }; tracing::debug!("received profile picture job result: {:?}", job_result); - let mut tx = global.db().begin().await.context("failed to begin transaction")?; + let mut client = global.db().get().await.context("failed to get db connection")?; + let tx = client.transaction().await.context("failed to start transaction")?; match job_result { processed_image::Result::Success(processed_image::Success { variants }) => { - let uploaded_file: UploadedFile = match sqlx::query_as("UPDATE uploaded_files SET pending = FALSE, metadata = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") + let uploaded_file: UploadedFile = match common::database::query("UPDATE uploaded_files SET pending = FALSE, metadata = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") .bind(common::database::Protobuf(UploadedFileMetadata { metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image { versions: variants, })), })) - .bind(common::database::Ulid(job_id.into_ulid())) - .fetch_optional(tx.as_mut()) + .bind(job_id.into_ulid()) + .build_query_as() + .fetch_optional(&tx) .await .context("failed to get uploaded file")? { Some(uploaded_file) => uploaded_file, @@ -93,21 +95,22 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { global .nats() .publish( - SubscriptionTopic::UploadedFileStatus(uploaded_file.id.0), + SubscriptionTopic::UploadedFileStatus(uploaded_file.id), pb::scuffle::platform::internal::events::UploadedFileStatus { - file_id: Some(uploaded_file.id.0.into()), + file_id: Some(uploaded_file.id.into()), status: Some(pb::scuffle::platform::internal::events::uploaded_file_status::Status::Success(pb::scuffle::platform::internal::events::uploaded_file_status::Success {})), }.encode_to_vec().into(), ) .await .context("failed to publish file update event")?; - let user_updated = sqlx::query("UPDATE users SET profile_picture_id = $1, pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $2 AND pending_profile_picture_id = $1") + let user_updated = common::database::query("UPDATE users SET profile_picture_id = $1, pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $2 AND pending_profile_picture_id = $1") .bind(uploaded_file.id) .bind(uploaded_file.owner_id) - .execute(tx.as_mut()) + .build() + .execute(&tx) .await - .context("failed to update user")?.rows_affected() == 1; + .context("failed to update user")? == 1; if let Err(err) = tx.commit().await.context("failed to commit transaction") { tracing::warn!(error = %err, "failed to commit transaction"); @@ -119,10 +122,10 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { global .nats() .publish( - SubscriptionTopic::UserProfilePicture(uploaded_file.owner_id.0), + SubscriptionTopic::UserProfilePicture(uploaded_file.owner_id), pb::scuffle::platform::internal::events::UserProfilePicture { - user_id: Some(uploaded_file.owner_id.0.into()), - profile_picture_id: Some(uploaded_file.id.0.into()), + user_id: Some(uploaded_file.owner_id.into()), + profile_picture_id: Some(uploaded_file.id.into()), }.encode_to_vec().into(), ) .await @@ -130,10 +133,11 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { } }, processed_image::Result::Failure(processed_image::Failure { reason, friendly_message }) => { - let uploaded_file: UploadedFile = match sqlx::query_as("UPDATE uploaded_files SET pending = FALSE, failed = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") + let uploaded_file: UploadedFile = match common::database::query("UPDATE uploaded_files SET pending = FALSE, failed = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") .bind(reason.clone()) - .bind(common::database::Ulid(job_id.into_ulid())) - .fetch_optional(tx.as_mut()) + .bind(job_id.into_ulid()) + .build_query_as() + .fetch_optional(&tx) .await .context("failed to get uploaded file")? { Some(uploaded_file) => uploaded_file, @@ -147,9 +151,9 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { global .nats() .publish( - SubscriptionTopic::UploadedFileStatus(uploaded_file.id.0), + SubscriptionTopic::UploadedFileStatus(uploaded_file.id), pb::scuffle::platform::internal::events::UploadedFileStatus { - file_id: Some(uploaded_file.id.0.into()), + file_id: Some(uploaded_file.id.into()), status: Some(pb::scuffle::platform::internal::events::uploaded_file_status::Status::Failure(pb::scuffle::platform::internal::events::uploaded_file_status::Failure { reason, friendly_message, @@ -159,10 +163,11 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { .await .context("failed to publish file update event")?; - sqlx::query("UPDATE users SET pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $1 AND pending_profile_picture_id = $2") + common::database::query("UPDATE users SET pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $1 AND pending_profile_picture_id = $2") .bind(uploaded_file.owner_id) .bind(uploaded_file.id) - .execute(tx.as_mut()) + .build() + .execute(&tx) .await .context("failed to update user")?; diff --git a/platform/api/src/main.rs b/platform/api/src/main.rs index a3e06a2e..4d356a3c 100644 --- a/platform/api/src/main.rs +++ b/platform/api/src/main.rs @@ -80,7 +80,7 @@ struct GlobalState { config: AppConfig, nats: async_nats::Client, jetstream: async_nats::jetstream::Context, - db: Arc, + db: Arc, category_by_id_loader: DataLoader, global_state_loader: DataLoader, diff --git a/platform/api/src/video_event_handler.rs b/platform/api/src/video_event_handler.rs index 70726d07..0b9a729e 100644 --- a/platform/api/src/video_event_handler.rs +++ b/platform/api/src/video_event_handler.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use anyhow::Context; -use common::database::Ulid; use pb::scuffle::video::v1::types::{event, Event}; use pb::scuffle::video::v1::{EventsAckRequest, EventsFetchRequest}; use prost::Message; @@ -63,19 +62,21 @@ async fn handle_room_event(global: &Arc, event: event::Room, ti .await .context("failed to fetch playback session count")?; - let (channel_id,): (common::database::Ulid,) = sqlx::query_as("UPDATE users SET channel_active_connection_id = $1, channel_live_viewer_count = $2, channel_live_viewer_count_updated_at = NOW(), channel_last_live_at = $3 WHERE channel_room_id = $4 RETURNING id") - .bind(Ulid::from(connection_id.into_ulid())) + let channel_id = common::database::query("UPDATE users SET channel_active_connection_id = $1, channel_live_viewer_count = $2, channel_live_viewer_count_updated_at = NOW(), channel_last_live_at = $3 WHERE channel_room_id = $4 RETURNING id") + .bind(connection_id.into_ulid()) .bind(live_viewer_count) .bind(chrono::NaiveDateTime::from_timestamp_millis(timestamp)) - .bind(Ulid::from(room_id.into_ulid())) - .fetch_one(global.db().as_ref()) + .bind(room_id.into_ulid()) + .build_query_single_scalar() + .fetch_one(global.db()) .await?; + global .nats() .publish( - SubscriptionTopic::ChannelLive(channel_id.0), + SubscriptionTopic::ChannelLive(channel_id), pb::scuffle::platform::internal::events::ChannelLive { - channel_id: Some(channel_id.0.into()), + channel_id: Some(channel_id.into()), live: true, } .encode_to_vec() @@ -88,18 +89,20 @@ async fn handle_room_event(global: &Arc, event: event::Room, ti connection_id: Some(connection_id), .. }) => { - let res: Option<(common::database::Ulid,)> = sqlx::query_as("UPDATE users SET channel_active_connection_id = NULL, channel_live_viewer_count = 0, channel_live_viewer_count_updated_at = NOW() WHERE channel_room_id = $1 AND channel_active_connection_id = $2 RETURNING id") - .bind(Ulid::from(room_id.into_ulid())) - .bind(Ulid::from(connection_id.into_ulid())) - .fetch_optional(global.db().as_ref()) + let res = common::database::query("UPDATE users SET channel_active_connection_id = NULL, channel_live_viewer_count = 0, channel_live_viewer_count_updated_at = NOW() WHERE channel_room_id = $1 AND channel_active_connection_id = $2 RETURNING id") + .bind(room_id.into_ulid()) + .bind(connection_id.into_ulid()) + .build_query_single_scalar() + .fetch_optional(global.db()) .await?; - if let Some((channel_id,)) = res { + + if let Some(channel_id) = res { global .nats() .publish( - SubscriptionTopic::ChannelLive(channel_id.0), + SubscriptionTopic::ChannelLive(channel_id), pb::scuffle::platform::internal::events::ChannelLive { - channel_id: Some(channel_id.0.into()), + channel_id: Some(channel_id.into()), live: false, } .encode_to_vec() diff --git a/platform/image_processor/Cargo.toml b/platform/image_processor/Cargo.toml index ed5805e5..db84f3ea 100644 --- a/platform/image_processor/Cargo.toml +++ b/platform/image_processor/Cargo.toml @@ -10,8 +10,8 @@ tracing = "0.1" tokio = { version = "1.34", features = ["full"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls", "json", "chrono", "uuid"] } ulid = { version = "1.1", features = ["uuid"] } +postgres-from-row = "0.5" prost = "0.12" aws-config = "1.1" aws-sdk-s3 = "1.12" @@ -22,7 +22,6 @@ tonic = "0.10" futures = "0.3" thiserror = "1.0" file-format = "0.23" -itertools = "0.12" scopeguard = "1.2" rgb = "0.8" imgref = "1.10" @@ -42,6 +41,3 @@ config = { workspace = true } pb = { workspace = true } binary-helper = { workspace = true } ffmpeg = { workspace = true, features = ["task-abort", "tracing"] } - -[dev-dependencies] -dotenvy = "0.15.7" diff --git a/platform/image_processor/src/database.rs b/platform/image_processor/src/database.rs index b70b4e12..2a4b292c 100644 --- a/platform/image_processor/src/database.rs +++ b/platform/image_processor/src/database.rs @@ -1,11 +1,13 @@ -use common::database::{Protobuf, Ulid}; +use common::database::protobuf; use pb::scuffle::platform::internal::image_processor::Task; +use ulid::Ulid; // The actual table has more columns but we only need id and task to process a // job -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct Job { pub id: Ulid, - pub task: Protobuf, + #[from_row(from_fn = "protobuf")] + pub task: Task, } diff --git a/platform/image_processor/src/main.rs b/platform/image_processor/src/main.rs index 4fd2b501..ad191f85 100644 --- a/platform/image_processor/src/main.rs +++ b/platform/image_processor/src/main.rs @@ -23,7 +23,7 @@ type AppConfig = binary_helper::config::AppConfig; struct GlobalState { ctx: Context, - db: Arc, + db: Arc, config: AppConfig, nats: async_nats::Client, jetstream: async_nats::jetstream::Context, diff --git a/platform/image_processor/src/processor/error.rs b/platform/image_processor/src/processor/error.rs index c34c021d..dfca20c5 100644 --- a/platform/image_processor/src/processor/error.rs +++ b/platform/image_processor/src/processor/error.rs @@ -21,8 +21,11 @@ pub enum ProcessorError { #[error("semaphore ticket acquire: {0}")] SemaphoreAcquire(#[from] tokio::sync::AcquireError), - #[error("sqlx: {0}")] - Sqlx(#[from] sqlx::Error), + #[error("database: {0}")] + Database(#[from] common::database::tokio_postgres::Error), + + #[error("database pool: {0}")] + DatabasePool(#[from] common::database::deadpool_postgres::PoolError), #[error("lost job")] LostJob, diff --git a/platform/image_processor/src/processor/job/mod.rs b/platform/image_processor/src/processor/job/mod.rs index deea821c..575a144a 100644 --- a/platform/image_processor/src/processor/job/mod.rs +++ b/platform/image_processor/src/processor/job/mod.rs @@ -86,7 +86,7 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> { .publish( self.job.task.callback_subject.clone(), pb::scuffle::platform::internal::events::ProcessedImage { - job_id: Some(self.job.id.0.into()), + job_id: Some(self.job.id.into()), result: Some(pb::scuffle::platform::internal::events::processed_image::Result::Failure( pb::scuffle::platform::internal::events::processed_image::Failure { reason: e.to_string(), @@ -106,7 +106,7 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> { } // delete job - utils::delete_job(self.global, self.job.id.0).await?; + utils::delete_job(self.global, self.job.id).await?; Ok(()) } @@ -114,7 +114,7 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> { async fn process_with_timeout(&self) -> Result<()> { let mut interval = tokio::time::interval(std::time::Duration::from_secs(15)); - let job_id = self.job.id.0; + let job_id = self.job.id; let max_processing_time_ms = self.job.task.limits.as_ref().map(|l| l.max_processing_time_ms); let time_limit = async { @@ -208,7 +208,7 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> { .publish( self.job.task.callback_subject.clone(), pb::scuffle::platform::internal::events::ProcessedImage { - job_id: Some(self.job.id.0.into()), + job_id: Some(self.job.id.into()), result: Some(pb::scuffle::platform::internal::events::processed_image::Result::Success( pb::scuffle::platform::internal::events::processed_image::Success { variants: images diff --git a/platform/image_processor/src/processor/utils.rs b/platform/image_processor/src/processor/utils.rs index f573242a..9d187e09 100644 --- a/platform/image_processor/src/processor/utils.rs +++ b/platform/image_processor/src/processor/utils.rs @@ -8,7 +8,7 @@ use crate::global::ImageProcessorGlobal; use crate::processor::error::Result; pub async fn query_job(global: &Arc) -> Result> { - Ok(sqlx::query_as( + Ok(common::database::query( "UPDATE image_jobs SET claimed_by = $1, hold_until = NOW() + INTERVAL '30 seconds' @@ -23,33 +23,32 @@ pub async fn query_job(global: &Arc) -> Result