From 75783b3301e34d24b6743ca22547261ff5c9ce5c Mon Sep 17 00:00:00 2001 From: imotai Date: Sat, 4 Mar 2023 22:38:14 +0800 Subject: [PATCH] Support typed data (#343) * fix: replace tx with mutation * feat: add typed data verify * feat: support typed data * fix: remove install open jdk * fix: fix warning * fix: move doc check to test * fix: update case * fix: add some log * feat: meta mask test passed --- .github/workflows/ci.yml | 33 +- Cargo.toml | 4 +- src/base/src/bson_util.rs | 4 +- src/base/src/test_base.rs | 3 + src/bridge/Cargo.toml | 2 +- src/bridge/src/evm_chain_watcher.rs | 7 +- src/bridge/src/storage_chain_minter.rs | 4 +- src/cmd/Cargo.toml | 2 +- src/cmd/src/command.rs | 17 +- src/cmd/src/deposit.rs | 8 +- src/cmd/src/show_evm_account.rs | 3 +- src/crypto/Cargo.toml | 10 +- src/crypto/src/db3_keypair.rs | 26 +- src/crypto/src/db3_signature.rs | 19 + src/crypto/src/db3_signer.rs | 143 ++++++- src/crypto/src/db3_verifier.rs | 31 +- src/crypto/src/id.rs | 3 +- src/faucet/Cargo.toml | 2 +- src/node/Cargo.toml | 3 +- src/node/src/abci_impl.rs | 441 ++++++++++++---------- src/node/src/auth_storage.rs | 1 + src/node/src/command.rs | 15 +- src/node/src/hash_util.rs | 25 -- src/node/src/lib.rs | 1 - src/node/src/storage_node_impl.rs | 136 +++++-- src/node/tests/command_test.rs | 1 - src/node/tests/node_test.rs | 37 +- src/proto/build.rs | 1 + src/proto/proto/db3_event.proto | 84 +++++ src/proto/proto/db3_mutation.proto | 3 + src/proto/proto/db3_node.proto | 15 + src/proto/proto/db3_session.proto | 32 +- src/proto/src/lib.rs | 3 + src/sdk/Cargo.toml | 4 +- src/sdk/src/faucet_sdk.rs | 8 +- src/sdk/src/mutation_sdk.rs | 159 ++++++-- src/sdk/src/sdk_test.rs | 30 +- src/sdk/src/store_sdk.rs | 302 +++++++++++---- src/session/Cargo.toml | 5 +- src/session/src/query_session_verifier.rs | 269 +++++++++---- src/session/src/session_manager.rs | 1 + src/storage/src/bill_store.rs | 1 - src/storage/src/db3_document.rs | 25 +- src/storage/src/db_key.rs | 3 + src/storage/src/db_store.rs | 18 +- src/storage/src/event_store.rs | 2 +- src/storage/src/faucet_key.rs | 1 + src/storage/src/faucet_store.rs | 2 +- src/types/src/bill_key.rs | 4 +- src/types/src/cost.rs | 2 +- tools/start_localnet.sh | 1 + 51 files changed, 1334 insertions(+), 622 deletions(-) delete mode 100644 src/node/src/hash_util.rs create mode 100644 src/proto/proto/db3_event.proto diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4c04fe6c..babb1fcb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,30 +29,6 @@ jobs: command: fmt args: --all -- --check - docs: - name: Docs - runs-on: ubuntu-latest - steps: - - name: Checkout repository - uses: actions/checkout@v3 - with: - submodules: recursive - - name: Install Rust toolchain - uses: actions-rs/toolchain@v1 - with: - toolchain: nightly - profile: minimal - override: true - components: rustfmt - - uses: Swatinem/rust-cache@v2 - - name: Check documentation - env: - RUSTDOCFLAGS: -D warnings - uses: actions-rs/cargo@v1 - with: - command: doc - args: --no-deps --document-private-items --all-features --workspace --examples - coverage: name: test runs-on: ubuntu-latest @@ -76,9 +52,16 @@ jobs: cd bridge yarn install npx hardhat test + - name: Check documentation + env: + RUSTDOCFLAGS: -D warnings + uses: actions-rs/cargo@v1 + with: + command: doc + args: --no-deps --document-private-items --all-features --workspace --examples - name: Setup test env run: | - sudo apt-get install protobuf-compiler openjdk-8-jdk maven + sudo apt-get install protobuf-compiler protoc --version cargo build cd tools diff --git a/Cargo.toml b/Cargo.toml index f3ea3a65..f9d3f213 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,5 +15,5 @@ members = [ ] [workspace.dependencies] -fastcrypto = {git = "https://github.com/MystenLabs/fastcrypto", rev = "bbb2d02a7a64c27314721748cc4d015b00490dbe"} - +fastcrypto = {git = "https://github.com/MystenLabs/fastcrypto", rev = "306465d4fe04f6c26359d885f3b0a548b661de40"} +ethers = {git="https://github.com/gakonst/ethers-rs", rev="ed47eaadad7d751cff9c0f59517b0fc3fb284cd3"} diff --git a/src/base/src/bson_util.rs b/src/base/src/bson_util.rs index db28788d..85014018 100644 --- a/src/base/src/bson_util.rs +++ b/src/base/src/bson_util.rs @@ -140,7 +140,7 @@ pub fn filter_from_json_value(json_str: &str) -> std::result::Result filter_value_from_bson_value(v)?, @@ -203,9 +203,7 @@ mod tests { bson_document_into_bytes, bson_into_comparison_bytes, bytes_to_bson_document, json_str_to_bson_document, }; - use bson::raw::RawBson; use bson::Bson; - use bson::Document; use chrono::Utc; #[test] fn json_str_to_bson_document_ut() { diff --git a/src/base/src/test_base.rs b/src/base/src/test_base.rs index 0f3ce4f4..0fa5f233 100644 --- a/src/base/src/test_base.rs +++ b/src/base/src/test_base.rs @@ -21,8 +21,10 @@ use ethereum_types::Address as AccountAddress; use hex::FromHex; use rand::Rng; +// // this function is used for testing // +#[allow(dead_code)] pub fn get_a_static_keypair() -> Keypair { let secret_key: &[u8] = b"833fe62409237b9d62ec77587520911e9a759cec1d19755b7da901b96dca3d42"; let public_key: &[u8] = b"ec172b93ad5e563bf4932c70e1245034c35467ef2efd4d64ebf819683467e2bf"; @@ -43,6 +45,7 @@ pub fn get_a_static_address() -> AccountAddress { get_address_from_pk(&kp.public) } +#[allow(dead_code)] pub fn get_a_ts_static_keypair() -> Keypair { let secret_key: &[u8] = b"ea82176302fbf6b10a6c7ff25dc77b4b7dee0126841af0fc3621d7ed0ac7c9c99806d5ba5c35c68ff63850fb3f4c5dfc79135c3c2c76a560eeaee6f2135830d6"; let public_key: &[u8] = b"9806d5ba5c35c68ff63850fb3f4c5dfc79135c3c2c76a560eeaee6f2135830d6"; diff --git a/src/bridge/Cargo.toml b/src/bridge/Cargo.toml index 01c1f14d..70a26f11 100644 --- a/src/bridge/Cargo.toml +++ b/src/bridge/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["database", "web3", "db3"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -ethers = { version = "1.0.0", features = ["ws"] } +ethers = {workspace=true, features=["ws"]} eyre = "0.6" serde = { version = "1.0.144", features = ["derive"] } serde_json = "1.0.64" diff --git a/src/bridge/src/evm_chain_watcher.rs b/src/bridge/src/evm_chain_watcher.rs index 21198527..55881ac8 100644 --- a/src/bridge/src/evm_chain_watcher.rs +++ b/src/bridge/src/evm_chain_watcher.rs @@ -22,15 +22,12 @@ use ethers::abi::RawLog; use ethers::types::Address; use ethers::types::Filter; use ethers::{ - contract::{abigen, Contract, EthEvent}, - core::types::{ - transaction::eip2718::TypedTransaction, Log, Signature, Transaction, ValueOrArray, - }, + contract::{abigen, EthEvent}, + core::types::{transaction::eip2718::TypedTransaction, Log, Signature, Transaction}, providers::{Middleware, Provider, StreamExt, Ws}, }; use redb::Database; -use std::path::Path; use std::sync::atomic::AtomicBool; use std::sync::mpsc::SyncSender; use std::sync::Arc; diff --git a/src/bridge/src/storage_chain_minter.rs b/src/bridge/src/storage_chain_minter.rs index 9954e5a0..1dfa8689 100644 --- a/src/bridge/src/storage_chain_minter.rs +++ b/src/bridge/src/storage_chain_minter.rs @@ -24,7 +24,7 @@ use db3_proto::db3_mutation_proto::MintCreditsMutation; use db3_sdk::mutation_sdk::MutationSDK; use db3_storage::event_store::EventStore; use elliptic_curve::{consts::U32, sec1::ToEncodedPoint}; -use ethers::types::{RecoveryMessage, Signature, H256, U256}; +use ethers::types::{H256, U256}; use generic_array::GenericArray; use hex; use redb::Database; @@ -36,7 +36,7 @@ use tracing::{info, warn}; use k256::{ ecdsa::{ recoverable::{Id as RecoveryId, Signature as RecoverableSignature}, - Error as K256SignatureError, Signature as K256Signature, + Signature as K256Signature, }, PublicKey as K256PublicKey, }; diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index e43f86e6..dcfe7bdd 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -37,7 +37,7 @@ serde = { version = "1.0.144", features = ["derive"] } serde_json = "1.0.88" hex = "0.4.3" fastcrypto = { workspace = true, features = ["copy_key"] } -ethers = { version = "1.0.0", features = ["ws"] } +ethers = {workspace=true} tonic = "0.8.3" http = "0.2" eyre = "0.6" diff --git a/src/cmd/src/command.rs b/src/cmd/src/command.rs index 0f09c663..652712e6 100644 --- a/src/cmd/src/command.rs +++ b/src/cmd/src/command.rs @@ -39,7 +39,7 @@ use http::Uri; use prettytable::{format, Table}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use tonic::transport::{ClientTlsConfig, Endpoint, Server}; +use tonic::transport::{ClientTlsConfig, Endpoint}; pub struct DB3ClientContext { pub mutation_sdk: Option, @@ -210,7 +210,7 @@ impl DB3ClientCommand { fn show_document(documents: Vec) -> std::result::Result { let mut table = Table::new(); table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR); - table.set_titles(row!["id_base64", "owner", "document", "tx_id"]); + table.set_titles(row!["id_base64", "owner", "document", "mutation_id"]); let mut error_cnt = 0; for document in documents { if let Ok(id) = DocumentId::try_from_bytes(document.id.as_slice()) { @@ -263,7 +263,7 @@ impl DB3ClientCommand { table.set_titles(row![ "database address", "sender address", - "related transactions", + "related mutations", "collections" ]); let tx_list: String = database @@ -510,7 +510,7 @@ impl DB3ClientCommand { println!("send add collection done!"); let mut table = Table::new(); table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR); - table.set_titles(row!["tx_id"]); + table.set_titles(row!["mutation_id"]); table.add_row(row![tx_id.to_base64()]); Ok(table) } @@ -615,7 +615,7 @@ impl DB3ClientCommand { Ok((db_id, tx_id)) => { let mut table = Table::new(); table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR); - table.set_titles(row!["database address", "transaction id"]); + table.set_titles(row!["database address", "mutation id"]); table.add_row(row![db_id.to_hex(), tx_id.to_base64()]); Ok(table) } @@ -666,7 +666,7 @@ impl DB3ClientCommand { println!("send add document done"); let mut table = Table::new(); table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR); - table.set_titles(row!["transaction id"]); + table.set_titles(row!["mutation id"]); table.add_row(row![tx_id.to_base64()]); Ok(table) } @@ -722,10 +722,9 @@ impl DB3ClientCommand { .await { Ok((_, tx_id)) => { - println!("send update document done"); let mut table = Table::new(); table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR); - table.set_titles(row!["transaction id"]); + table.set_titles(row!["mutation id"]); table.add_row(row![tx_id.to_base64()]); Ok(table) } @@ -774,7 +773,7 @@ impl DB3ClientCommand { println!("send delete document done"); let mut table = Table::new(); table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR); - table.set_titles(row!["transaction id"]); + table.set_titles(row!["mutation id"]); table.add_row(row![tx_id.to_base64()]); Ok(table) } diff --git a/src/cmd/src/deposit.rs b/src/cmd/src/deposit.rs index 7c70d750..5bc3d85a 100644 --- a/src/cmd/src/deposit.rs +++ b/src/cmd/src/deposit.rs @@ -15,13 +15,12 @@ // limitations under the License. // -use ethers::providers::Middleware; use ethers::{ contract::abigen, core::types::{Address, U256}, middleware::SignerMiddleware, providers::{Provider, Ws}, - signers::{LocalWallet, Signer}, + signers::LocalWallet, }; use eyre::Result; @@ -51,16 +50,15 @@ pub async fn lock_balance( let provider_arc = Arc::new(provider); let token_address = erc20_token_addr.parse::
().unwrap(); let rollup_address = rollup_addr.parse::
().unwrap(); - let my_address = wallet.address(); let signable_client = SignerMiddleware::new(provider_arc.clone(), wallet); let client = Arc::new(signable_client); let token_contract = DB3TokenContract::new(token_address, client.clone()); let approve_amount = U256::from(100_000_000_000 as u64); // 10 db3 let approve_request = token_contract.approve(rollup_address, approve_amount); - let result = approve_request.send().await; + let _result = approve_request.send().await; let rollup_contract = DB3RollupContract::new(rollup_address, client); let deposit_amount = U256::from((amount * 1000_000_000.0) as u64); let deposit_request = rollup_contract.deposit(deposit_amount); - let result = deposit_request.send().await; + let _result = deposit_request.send().await; Ok(()) } diff --git a/src/cmd/src/show_evm_account.rs b/src/cmd/src/show_evm_account.rs index 6bb82c70..01432931 100644 --- a/src/cmd/src/show_evm_account.rs +++ b/src/cmd/src/show_evm_account.rs @@ -17,9 +17,8 @@ use ethers::{ contract::abigen, - core::types::{Address, TransactionRequest, U256}, + core::types::{Address, U256}, providers::{Provider, Ws}, - signers::{LocalWallet, Signer}, }; use eyre::Result; diff --git a/src/crypto/Cargo.toml b/src/crypto/Cargo.toml index d0227bb7..e355bedb 100644 --- a/src/crypto/Cargo.toml +++ b/src/crypto/Cargo.toml @@ -18,15 +18,17 @@ anyhow = "1.0.68" prost = "0.11" prost-types = "0.11" rand = "0.8.5" -bytes = "1" +#bytes = "1" +bytes = { version = "1.3.0", features = ["serde"] } hex = "0.4.3" base64ct = { version = "1.5.3", features = ["alloc"] } schemars ="0.8.10" serde = { version = "1.0.144", features = ["derive"] } serde-name = "0.2.1" thiserror = "1.0.34" -serde_bytes = "0.11.7" -serde_json = "1.0.88" +#serde_bytes = "0.11.7" +#serde_json = "1.0.88" +serde_json = { version = "1.0.64", default-features = false } serde_with = "2.1.0" serde_repr = "0.1" signature = "1.6.0" @@ -38,7 +40,7 @@ slip10_ed25519 = "0.1.3" byteorder = "1.4.3" rust_secp256k1 = { version = "0.24.0", package = "secp256k1", features = ["bitcoin_hashes"] } bip32 = "0.4.0" - +ethers = { workspace = true } [dev-dependencies] tiny-bip39 = "1.0.0" diff --git a/src/crypto/src/db3_keypair.rs b/src/crypto/src/db3_keypair.rs index 0fe860ba..b7aaa3ce 100644 --- a/src/crypto/src/db3_keypair.rs +++ b/src/crypto/src/db3_keypair.rs @@ -52,6 +52,14 @@ impl DB3KeyPair { DB3KeyPair::Secp256k1(kp) => DB3PublicKey::Secp256k1(kp.public().clone()), } } + pub fn try_sign_hashed_message(&self, msg: &[u8]) -> std::result::Result, DB3Error> { + match self { + DB3KeyPair::Ed25519(_) => Err(DB3Error::SignError( + "signing hashed message is not supperted with ed25519".to_string(), + )), + DB3KeyPair::Secp256k1(kp) => Secp256k1DB3Signature::new_hashed(&kp, msg), + } + } } impl Signer for DB3KeyPair { @@ -167,6 +175,7 @@ mod tests { use crate::db3_signature::DB3Signature; use crate::key_derive; use bip39::{Language, Mnemonic, Seed}; + use fastcrypto::hash::{HashFunction, Sha3_256}; use hex; #[test] fn keypair_smoke_test_secp256k1() { @@ -256,21 +265,4 @@ mod tests { let is_ok = ts_signature_ret.unwrap().verify(msg.as_ref()); assert_eq!(true, is_ok.is_ok()); } - - #[test] - fn test_generate_evm_address() { - let private_key_hex = "ad689d9b7751da07b0fb39c5091672cbfe50f59131db015f8a0e76c9790a6fcc"; - let data = hex::decode(private_key_hex).unwrap(); - let result = Secp256k1PrivateKey::from_bytes(data.as_ref()); - assert!(result.is_ok()); - let kp = Secp256k1KeyPair::from(result.unwrap()); - let db3_kp = DB3KeyPair::Secp256k1(kp); - let msg: [u8; 1] = [0; 1]; - let signature = db3_kp.try_sign(&msg).unwrap(); - let db3_address = signature.verify(&msg).unwrap(); - assert_eq!( - "\"0x94fdb38548dca0df72b2ff43ce0d77a8867dd123\"", - serde_json::to_string(&db3_address).unwrap() - ); - } } diff --git a/src/crypto/src/db3_signature.rs b/src/crypto/src/db3_signature.rs index 7577eeb9..6b2478c7 100644 --- a/src/crypto/src/db3_signature.rs +++ b/src/crypto/src/db3_signature.rs @@ -27,6 +27,7 @@ use fastcrypto::secp256k1::{Secp256k1KeyPair, Secp256k1PublicKey, Secp256k1Signa use fastcrypto::traits::KeyPair as KeypairTraits; use fastcrypto::traits::{Authenticator, ToFromBytes, VerifyingKey}; use fastcrypto::Verifier; +use rust_secp256k1::{Message, Secp256k1}; use schemars::JsonSchema; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::{serde_as, Bytes}; @@ -169,6 +170,24 @@ pub struct Secp256k1DB3Signature( [u8; Secp256k1PublicKey::LENGTH + Secp256k1Signature::LENGTH + 1], ); +impl Secp256k1DB3Signature { + pub fn new_hashed(kp: &Secp256k1KeyPair, msg: &[u8]) -> Result> { + let secp = Secp256k1::signing_only(); + let message = Message::from_slice(msg) + .map_err(|e| DB3Error::InvalidSignature(format!("bad message for {e}")))?; + let sig = secp.sign_ecdsa_recoverable(&message, &kp.secret.privkey); + let (recovery_id, sig) = sig.serialize_compact(); + let mut signature_bytes: Vec = + Vec::with_capacity(Secp256k1PublicKey::LENGTH + Secp256k1Signature::LENGTH + 1); + let scheme = SignatureScheme::Secp256k1; + signature_bytes.extend_from_slice(&[scheme.flag()]); + signature_bytes.extend_from_slice(&sig); + signature_bytes.extend_from_slice(&[recovery_id.to_i32() as u8]); + signature_bytes.extend_from_slice(kp.public().as_ref()); + Ok(signature_bytes) + } +} + impl AsRef<[u8]> for Secp256k1DB3Signature { fn as_ref(&self) -> &[u8] { self.0.as_ref() diff --git a/src/crypto/src/db3_signer.rs b/src/crypto/src/db3_signer.rs index 854cbefc..28f1a749 100644 --- a/src/crypto/src/db3_signer.rs +++ b/src/crypto/src/db3_signer.rs @@ -19,6 +19,7 @@ use crate::db3_address::DB3Address; use crate::db3_keypair::DB3KeyPair; use crate::db3_signature::Signature; use db3_error::{DB3Error, Result}; +use ethers::core::types::transaction::eip712::{Eip712, TypedData}; use signature::Signer; pub struct Db3MultiSchemeSigner { @@ -39,6 +40,13 @@ impl Db3MultiSchemeSigner { Ok(signature) } + pub fn sign_typed_data(&self, typed_data: &TypedData) -> Result> { + let hashed = typed_data.encode_eip712().map_err(|e| { + DB3Error::SignError(format!("fail to generate typed data hash for {e}")) + })?; + self.kp.try_sign_hashed_message(&hashed) + } + pub fn get_address(&self) -> Result { let pk = self.kp.public(); Ok(DB3Address::from(&pk)) @@ -49,13 +57,18 @@ impl Db3MultiSchemeSigner { mod tests { use super::*; use crate::db3_signature::DB3Signature; + use crate::db3_verifier::DB3Verifier; use crate::key_derive; use crate::signature_scheme::SignatureScheme; use bytes::BytesMut; use db3_proto::db3_base_proto::{BroadcastMeta, ChainId, ChainRole}; - use db3_proto::db3_mutation_proto::{DatabaseAction, DatabaseMutation}; - + use db3_proto::db3_mutation_proto::{DatabaseAction, DatabaseMutation, PayloadType}; + use ethers::core::types::transaction::eip712::{EIP712Domain, Types}; + use ethers::core::types::Bytes; + use hex; use prost::Message; + use std::collections::BTreeMap; + fn db3_signer_smoke_test(scheme: &SignatureScheme) { let meta = BroadcastMeta { //TODO get from network @@ -93,6 +106,87 @@ mod tests { ); } + fn db3_signer_typed_data(scheme: &SignatureScheme) -> Result<()> { + let meta = BroadcastMeta { + //TODO get from network + nonce: 1, + //TODO use config + chain_id: ChainId::DevNet.into(), + //TODO use config + chain_role: ChainRole::StorageShardChain.into(), + }; + let dm = DatabaseMutation { + meta: Some(meta), + collection_mutations: vec![], + db_address: vec![], + action: DatabaseAction::CreateDb.into(), + document_mutations: vec![], + }; + + let mut payload = BytesMut::with_capacity(1024 * 8); + dm.encode(&mut payload) + .map_err(|e| DB3Error::SignError(format!("{e}"))) + .unwrap(); + let payload = payload.freeze(); + let json = serde_json::json!({ + "EIP712Domain": [ + ], + "Message":[ + {"name":"payload", "type":"bytes"}, + {"name":"payloadType", "type":"string"} + ] + }); + let types: Types = serde_json::from_value(json).unwrap(); + let payload: String = format!("{}", Bytes::from(payload)); + assert_eq!(2, types.len()); + let mut message: BTreeMap = BTreeMap::new(); + message.insert( + "payload".to_string(), + serde_json::Value::from(payload.as_str()), + ); + message.insert( + "payloadType".to_string(), + serde_json::Value::from(format!("{}", PayloadType::DatabasePayload as i32)), + ); + let typed_data = TypedData { + domain: EIP712Domain { + name: None, + version: None, + chain_id: None, + verifying_contract: None, + salt: None, + }, + types, + primary_type: "Message".to_string(), + message, + }; + let seed: [u8; 32] = [2; 32]; + let (address, keypair) = + key_derive::derive_key_pair_from_path(&seed, None, scheme).unwrap(); + println!("keypair pk {:?}", keypair.public().as_ref()); + let signer = Db3MultiSchemeSigner::new(keypair); + let signature = signer.sign_typed_data(&typed_data)?; + let hashed_message: [u8; 32] = typed_data.encode_eip712().unwrap(); + let account_id = DB3Verifier::verify_hashed(&hashed_message, signature.as_ref())?; + if account_id.addr != address { + Err(DB3Error::SignMessageError("bad signature".to_string())) + } else { + Ok(()) + } + } + + #[test] + fn db3_signer_ed25519_typed_data_smoke_test() { + assert!(db3_signer_typed_data(&SignatureScheme::ED25519).is_err()); + } + + #[test] + fn db3_signer_secp256k1_typed_data_smoke_test() { + if !db3_signer_typed_data(&SignatureScheme::Secp256k1).is_ok() { + assert!(false); + } + } + #[test] fn db3_signer_ed25519_smoke_test() { db3_signer_smoke_test(&SignatureScheme::ED25519); @@ -102,4 +196,49 @@ mod tests { fn db3_signer_secp256k1_smoke_test() { db3_signer_smoke_test(&SignatureScheme::Secp256k1); } + + #[test] + fn test_metamask_signature() { + let signature_hex = "0175fc4a2894184c644b0369a9fbea2425c86ffbd7b60a23d07ce3c25a394ebb05673e5970ad937c5d949f2c34e30c69345e345451fe1f40283cd85d2b00220c9e00032d4ebab1b807ed6b326f88fb44c68e674103a9c39c371592eec75c5f7955f419"; + let hashed_msg_hex = "fee42a65dfa333a4f14c957d51bf69518c241b0e16d0df524a0dc0cdce19cf25"; + let json = serde_json::json!({ + "EIP712Domain": [ + ], + "Message":[ + {"name":"payload", "type":"bytes"}, + {"name":"payloadType", "type":"string"} + ] + }); + let types: Types = serde_json::from_value(json).unwrap(); + let mut message: BTreeMap = BTreeMap::new(); + let payload = "0x48656c6c6f20776f726c6421"; + message.insert( + "payload".to_string(), + serde_json::Value::from(payload.to_string()), + ); + message.insert( + "payloadType".to_string(), + serde_json::Value::from("1".to_string()), + ); + let typed_data = TypedData { + domain: EIP712Domain { + name: None, + version: None, + chain_id: None, + verifying_contract: None, + salt: None, + }, + types, + primary_type: "Message".to_string(), + message, + }; + let hashed_message: [u8; 32] = typed_data.encode_eip712().unwrap(); + assert_eq!(hex::encode(&hashed_message).as_str(), hashed_msg_hex); + let signature = hex::decode(signature_hex).unwrap(); + let result = DB3Verifier::verify_hashed(&hashed_message, signature.as_ref()); + assert!(result.is_ok()); + let account = result.unwrap(); + println!("{}", serde_json::to_string(&account.addr).unwrap()); + println!("0x2df74619717c29a7253455e5767f4d992cfb6e3e"); + } } diff --git a/src/crypto/src/db3_verifier.rs b/src/crypto/src/db3_verifier.rs index 8247b297..1a76649a 100644 --- a/src/crypto/src/db3_verifier.rs +++ b/src/crypto/src/db3_verifier.rs @@ -16,9 +16,13 @@ // limitations under the License. // -use crate::account_id::AccountId; +use crate::db3_address::DB3Address; +use crate::db3_public_key::DB3PublicKey; use crate::db3_signature::{DB3Signature, Signature}; +use crate::id::AccountId; +use crate::signature_scheme::SignatureScheme; use db3_error::{DB3Error, Result}; +use fastcrypto::secp256k1::Secp256k1Signature; use signature::Signature as _; pub struct DB3Verifier {} @@ -30,6 +34,31 @@ impl DB3Verifier { let db3_address = signature.verify(&msg)?; Ok(AccountId::new(db3_address)) } + + pub fn verify_hashed(hashed: &[u8], signature_raw: &[u8]) -> Result { + let signature = Signature::from_bytes(signature_raw) + .map_err(|e| DB3Error::InvalidSignature(format!("fail to generate signature {e}")))?; + let spk = + DB3PublicKey::try_from_bytes(SignatureScheme::Secp256k1, signature.public_key_bytes()) + .map_err(|e| DB3Error::InvalidSignature(format!("bad public key {e}")))?; + let sig = Secp256k1Signature::from_bytes(signature.signature_bytes()) + .map_err(|e| DB3Error::InvalidSignature(format!("bad signature scheme {e}")))?; + if let Signature::Secp256k1DB3Signature(_) = signature { + let db3_address = DB3Address::from(&spk); + if let DB3PublicKey::Secp256k1(internal_pk) = spk { + internal_pk.verify_hashed(hashed, &sig).map_err(|e| { + DB3Error::InvalidSignature(format!("invalid hashed message for {e}")) + })?; + Ok(AccountId::new(db3_address)) + } else { + Err(DB3Error::InvalidSignature("bad signature".to_string())) + } + } else { + Err(DB3Error::InvalidSignature( + "bad signature secp256k1 expected".to_string(), + )) + } + } } #[cfg(test)] diff --git a/src/crypto/src/id.rs b/src/crypto/src/id.rs index 3063c1ed..0950ea09 100644 --- a/src/crypto/src/id.rs +++ b/src/crypto/src/id.rs @@ -23,7 +23,7 @@ use db3_error::{DB3Error, Result}; use fastcrypto::hash::{HashFunction, Sha3_256}; use rust_secp256k1::hashes::{sha256, Hash}; use rust_secp256k1::ThirtyTwoByteHash; -use std::{fmt, mem}; +use std::fmt; // it's ethereum compatiable account id #[derive(Eq, Default, PartialEq, Ord, PartialOrd, Copy, Clone)] @@ -296,6 +296,7 @@ impl DocumentId { pub fn to_base64(&self) -> String { base64ct::Base64::encode_string(self.as_ref()) } + pub fn try_from_base64(input: &str) -> std::result::Result { Self::try_from_bytes(base64ct::Base64::decode_vec(input).unwrap().as_slice()) } diff --git a/src/faucet/Cargo.toml b/src/faucet/Cargo.toml index e332ff86..51ceb786 100644 --- a/src/faucet/Cargo.toml +++ b/src/faucet/Cargo.toml @@ -11,7 +11,7 @@ keywords = ["database", "web3", "db3"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -ethers = { version = "1.0.0", features = ["ws"] } +ethers = {workspace=true, features=['ws']} eyre = "0.6" serde = { version = "1.0.144", features = ["derive"] } serde_json = "1.0.64" diff --git a/src/node/Cargo.toml b/src/node/Cargo.toml index 8c00dcef..35f77b6a 100644 --- a/src/node/Cargo.toml +++ b/src/node/Cargo.toml @@ -29,7 +29,7 @@ db3-sdk={path="../sdk", version="0.1.0"} db3-session={path="../session", version="0.1.0"} db3-bridge={path="../bridge", version="0.1.0"} db3-faucet={path="../faucet", version="0.1.0"} -ethers = { version = "1.0.0", features = ["ws"] } +ethers = { workspace = true } tracing = "0.1" tracing-subscriber = "0.3" fastcrypto = { workspace = true, features = ["copy_key"] } @@ -47,6 +47,7 @@ tonic-web = "0.5.0" prost = "0.11" prost-types = "0.11" tokio = { version = "1.17.0", features = ["full"] } +tokio-stream = "0.1.12" clap = { version = "4.0.20", features = ["derive"] } subtle-encoding = { version = "0.5", default-features = false, features = ["bech32-preview"] } http = "0.2" diff --git a/src/node/src/abci_impl.rs b/src/node/src/abci_impl.rs index 98139990..62de0ad9 100644 --- a/src/node/src/abci_impl.rs +++ b/src/node/src/abci_impl.rs @@ -19,25 +19,53 @@ use shadow_rs::shadow; shadow!(build); use crate::node_storage::NodeStorage; use bytes::Bytes; -use db3_crypto::{db3_address::DB3Address as AccountAddress, db3_verifier, id::TxId}; +use db3_crypto::{ + db3_address::DB3Address as AccountAddress, db3_verifier, id::AccountId, id::TxId, +}; +use db3_error::{DB3Error, Result}; use db3_proto::db3_mutation_proto::{ DatabaseMutation, MintCreditsMutation, PayloadType, WriteRequest, }; use db3_proto::db3_session_proto::{QuerySession, QuerySessionInfo}; use db3_session::query_session_verifier; -use fastcrypto::encoding::{Base64, Encoding}; +use ethers::core::types::transaction::eip712::{Eip712, TypedData}; +use ethers::core::types::Bytes as EthersBytes; use hex; use prost::Message; use std::pin::Pin; +use std::str::FromStr; use std::sync::{Arc, Mutex}; use tendermint_abci::Application; use tendermint_proto::abci::{ - Event, RequestBeginBlock, RequestCheckTx, RequestDeliverTx, RequestInfo, RequestQuery, + RequestBeginBlock, RequestCheckTx, RequestDeliverTx, RequestInfo, RequestQuery, ResponseBeginBlock, ResponseCheckTx, ResponseCommit, ResponseDeliverTx, ResponseInfo, ResponseQuery, }; use tracing::{debug, info, span, warn, Level}; +macro_rules! parse_mutation { + ($func:ident, $type:ident) => { + fn $func(&self, payload: &[u8]) -> Result<$type> { + match $type::decode(payload) { + Ok(dm) => match &dm.meta { + Some(_) => Ok(dm), + None => { + warn!("no meta for mutation"); + Err(DB3Error::ApplyMutationError("meta is none".to_string())) + } + }, + Err(e) => { + //TODO add event ? + warn!("invalid mutation data {e}"); + Err(DB3Error::ApplyMutationError( + "invalid mutation data".to_string(), + )) + } + } + } + }; +} + #[derive(Clone)] pub struct AbciImpl { node_store: Arc>>>, @@ -56,6 +84,99 @@ impl AbciImpl { pending_credits: Arc::new(Mutex::new(Vec::new())), } } + + parse_mutation!(parse_database_mutation, DatabaseMutation); + parse_mutation!(parse_mint_credits_mutation, MintCreditsMutation); + parse_mutation!(parse_query_session, QuerySession); + + fn unwrap_and_verify( + &self, + req: WriteRequest, + ) -> Result<(EthersBytes, PayloadType, AccountId)> { + if req.payload_type == 3 { + // typed data + match serde_json::from_slice::(req.payload.as_ref()) { + Ok(data) => { + let hashed_message = data.encode_eip712().map_err(|e| { + DB3Error::ApplyMutationError(format!("invalid payload type for err {e}")) + })?; + let account_id = db3_verifier::DB3Verifier::verify_hashed( + &hashed_message, + req.signature.as_ref(), + )?; + if let (Some(payload), Some(payload_type)) = + (data.message.get("payload"), data.message.get("payloadType")) + { + //TODO advoid data copy + let data: EthersBytes = + serde_json::from_value(payload.clone()).map_err(|e| { + DB3Error::ApplyMutationError(format!( + "invalid payload type for err {e}" + )) + })?; + let internal_data_type = i32::from_str(payload_type.as_str().ok_or( + DB3Error::QuerySessionVerifyError("invalid payload type".to_string()), + )?) + .map_err(|e| { + DB3Error::QuerySessionVerifyError(format!( + "fail to convert payload type to i32 {e}" + )) + })?; + let data_type: PayloadType = PayloadType::from_i32(internal_data_type) + .ok_or(DB3Error::ApplyMutationError( + "invalid payload type".to_string(), + ))?; + info!("account {} apply mutaion check done", account_id.to_hex()); + Ok((data, data_type, account_id)) + } else { + Err(DB3Error::ApplyMutationError("bad typed data".to_string())) + } + } + Err(e) => Err(DB3Error::ApplyMutationError(format!( + "bad typed data for err {e}" + ))), + } + } else { + let account_id = + db3_verifier::DB3Verifier::verify(req.payload.as_ref(), req.signature.as_ref())?; + let data_type: PayloadType = PayloadType::from_i32(req.payload_type).ok_or( + DB3Error::ApplyMutationError("invalid payload type".to_string()), + )?; + let data = Bytes::from(req.payload); + info!("account {} apply mutaion check done", account_id.to_hex()); + Ok((EthersBytes(data), data_type, account_id)) + } + } + + fn build_check_response(&self, ok: bool, msg: &str) -> ResponseCheckTx { + if ok { + ResponseCheckTx { + code: 0, + ..Default::default() + } + } else { + ResponseCheckTx { + code: 1, + log: msg.to_string(), + ..Default::default() + } + } + } + + fn build_delivered_response(&self, ok: bool, msg: &str) -> ResponseDeliverTx { + if ok { + ResponseDeliverTx { + code: 0, + ..Default::default() + } + } else { + ResponseDeliverTx { + code: 1, + log: msg.to_string(), + ..Default::default() + } + } + } } impl Application for AbciImpl { @@ -107,246 +228,162 @@ impl Application for AbciImpl { } fn check_tx(&self, request: RequestCheckTx) -> ResponseCheckTx { - // decode the request - match WriteRequest::decode(request.tx.as_ref()) { - Ok(request) => match db3_verifier::DB3Verifier::verify( - request.payload.as_ref(), - request.signature.as_ref(), - ) { - Ok(_) => { - let payload_type = PayloadType::from_i32(request.payload_type); - match payload_type { - Some(PayloadType::DatabasePayload) => { - match DatabaseMutation::decode(request.payload.as_ref()) { - Ok(dm) => match &dm.meta { - Some(_) => { - return ResponseCheckTx { - code: 0, - data: Bytes::new(), - log: "".to_string(), - info: "".to_string(), - gas_wanted: 1, - gas_used: 0, - events: vec![], - codespace: "".to_string(), - ..Default::default() - }; - } - None => { - //TODO add event - warn!("no meta for database mutation"); - } - }, - Err(_) => { - //TODO add event ? - warn!("invalid database byte data"); - } + let wrequest = WriteRequest::decode(request.tx.as_ref()); + match wrequest { + Ok(req) => match self.unwrap_and_verify(req) { + Ok((data, data_type, _)) => match data_type { + PayloadType::DatabasePayload => { + match self.parse_database_mutation(data.as_ref()) { + Ok(_) => { + return self.build_check_response(true, ""); } - } - - Some(PayloadType::MintCreditsPayload) => { - match MintCreditsMutation::decode(request.payload.as_ref()) { - Ok(_) => { - return ResponseCheckTx { - code: 0, - data: Bytes::new(), - log: "".to_string(), - info: "".to_string(), - gas_wanted: 1, - gas_used: 0, - events: vec![], - codespace: "".to_string(), - ..Default::default() - }; - } - Err(e) => { - warn!("invalid mint credist mutation has been checked for error {}", e); - } + Err(e) => { + warn!("fail to parse mutation for err {e}"); + let msg = format!("{e}"); + return self.build_check_response(false, msg.as_str()); } } - - Some(PayloadType::QuerySessionPayload) => { - match QuerySession::decode(request.payload.as_ref()) { - Ok(query_session) => { - match query_session_verifier::verify_query_session( - &query_session, - ) { - Ok(_) => { - return ResponseCheckTx { - code: 0, - data: Bytes::new(), - log: "".to_string(), - info: "".to_string(), - gas_wanted: 1, - gas_used: 0, - events: vec![], - codespace: "".to_string(), - ..Default::default() - }; - } - Err(e) => { - warn!( - "invalid transaction has been checked for error {}", - e - ); - } + } + PayloadType::QuerySessionPayload => { + match self.parse_query_session(data.as_ref()) { + Ok(qs) => { + match query_session_verifier::verify_query_session( + qs.payload.as_ref(), + qs.payload_type, + qs.client_signature.as_ref(), + ) { + Ok(_) => { + return self.build_check_response(true, ""); + } + Err(e) => { + let msg = format!("{e}"); + return self.build_check_response(false, msg.as_str()); } - } - Err(e) => { - warn!("invalid transaction has been checked for error {}", e); } } + Err(e) => { + warn!("fail to parse query session for err {e}"); + let msg = format!("{e}"); + return self.build_check_response(false, msg.as_str()); + } } - _ => { - warn!("invalid transaction with null payload type"); + } + PayloadType::MintCreditsPayload => { + match self.parse_mint_credits_mutation(data.as_ref()) { + Ok(_) => { + return self.build_check_response(true, ""); + } + Err(e) => { + warn!("fail to parse mint credits for err {e}"); + let msg = format!("{e}"); + return self.build_check_response(false, msg.as_str()); + } } } - } + _ => { + warn!("bad mutaion payload type"); + return self.build_check_response(false, "bad mutation payload"); + } + }, Err(e) => { - let payload: &[u8] = request.payload.as_ref(); - let signature: &[u8] = request.signature.as_ref(); - warn!("invalid transaction has been checked for error {}", e); - warn!( - "payload {}, signature {}", - Base64::encode(payload), - Base64::encode(signature) - ); + let msg = format!("{e}"); + warn!("verify request err {e}"); + return self.build_check_response(false, msg.as_str()); } }, Err(e) => { - warn!("fail to decode WriteRequest for error {}", e); + let msg = format!("{e}"); + warn!("bad request err {e}"); + return self.build_check_response(false, msg.as_str()); } } - // the tx should be removed from mempool - return ResponseCheckTx { - code: 1, - data: Bytes::new(), - log: "bad request".to_string(), - info: "".to_string(), - gas_wanted: 1, - gas_used: 0, - events: vec![], - codespace: "".to_string(), - ..Default::default() - }; } fn deliver_tx(&self, request: RequestDeliverTx) -> ResponseDeliverTx { //TODO match the hash fucntion with tendermint let tx_id = TxId::from(request.tx.as_ref()); - if let Ok(wrequest) = WriteRequest::decode(request.tx.as_ref()) { - if let Ok(account_id) = db3_verifier::DB3Verifier::verify( - wrequest.payload.as_ref(), - wrequest.signature.as_ref(), - ) { - let payload_type = PayloadType::from_i32(wrequest.payload_type); - match payload_type { - Some(PayloadType::MintCreditsPayload) => { - if let Ok(mint_credits) = - MintCreditsMutation::decode(wrequest.payload.as_ref()) - { - match self.pending_credits.lock() { + let wrequest = WriteRequest::decode(request.tx.as_ref()); + match wrequest { + Ok(req) => match self.unwrap_and_verify(req) { + Ok((data, data_type, account_id)) => match data_type { + PayloadType::DatabasePayload => { + match self.parse_database_mutation(data.as_ref()) { + Ok(dm) => match self.pending_databases.lock() { Ok(mut s) => { - info!("put mint credits request to queue"); - s.push((account_id.addr, mint_credits, tx_id)); - return ResponseDeliverTx { - code: 0, - data: Bytes::new(), - log: "".to_string(), - info: "apply_mint_credits".to_string(), - gas_wanted: 0, - gas_used: 0, - events: vec![Event { - r#type: "apply".to_string(), - attributes: vec![], - }], - codespace: "".to_string(), - }; + s.push((account_id.addr, dm, tx_id)); + return self.build_delivered_response(true, ""); } - _ => {} + _ => { + todo!(); + } + }, + Err(e) => { + let msg = format!("{e}"); + return self.build_delivered_response(false, msg.as_str()); } - } else { - warn!("fail to decode mint credits"); } } - Some(PayloadType::DatabasePayload) => { - if let Ok(dr) = DatabaseMutation::decode(wrequest.payload.as_ref()) { - match self.pending_databases.lock() { - Ok(mut s) => { - s.push((account_id.addr, dr, tx_id)); - return ResponseDeliverTx { - code: 0, - data: Bytes::new(), - log: "".to_string(), - info: "apply_database".to_string(), - gas_wanted: 0, - gas_used: 0, - events: vec![Event { - r#type: "apply".to_string(), - attributes: vec![], - }], - codespace: "".to_string(), - }; + PayloadType::QuerySessionPayload => { + match self.parse_query_session(data.as_ref()) { + Ok(qs) => match ( + self.pending_query_session.lock(), + query_session_verifier::verify_query_session( + qs.payload.as_ref(), + qs.payload_type, + qs.client_signature.as_ref(), + ), + ) { + (Ok(mut s), Ok((qsi, client_id))) => { + s.push(( + client_id.addr, // the client address + account_id.addr, // the query service provider addree + tx_id, + qsi, + )); + return self.build_delivered_response(true, ""); } _ => { todo!(); } + }, + Err(e) => { + let msg = format!("{e}"); + return self.build_delivered_response(false, msg.as_str()); } } } - Some(PayloadType::QuerySessionPayload) => { - if let Ok(query_session) = QuerySession::decode(wrequest.payload.as_ref()) { - if let Ok((client_account_id, _)) = - query_session_verifier::verify_query_session(&query_session) - { - match self.pending_query_session.lock() { - Ok(mut s) => { - //TODO check the node query session info - s.push(( - client_account_id.addr, - account_id.addr, - tx_id, - query_session.node_query_session_info.unwrap(), - )); - return ResponseDeliverTx { - code: 0, - data: Bytes::new(), - log: "".to_string(), - info: "deliver_query_session".to_string(), - gas_wanted: 0, - gas_used: 0, - events: vec![Event { - r#type: "deliver".to_string(), - attributes: vec![], - }], - codespace: "".to_string(), - }; - } - Err(_) => todo!(), + PayloadType::MintCreditsPayload => { + match self.parse_mint_credits_mutation(data.as_ref()) { + Ok(mm) => match self.pending_credits.lock() { + Ok(mut s) => { + s.push((account_id.addr, mm, tx_id)); + return self.build_delivered_response(true, ""); + } + Err(e) => { + let msg = format!("{e}"); + return self.build_delivered_response(false, msg.as_str()); } + }, + Err(e) => { + let msg = format!("{e}"); + return self.build_delivered_response(false, msg.as_str()); } } } _ => { - warn!("invalid transaction with null payload type"); + return self.build_delivered_response(false, "bad mutation payload"); } + }, + Err(e) => { + let msg = format!("{e}"); + return self.build_delivered_response(false, msg.as_str()); } + }, + Err(e) => { + let msg = format!("{e}"); + return self.build_delivered_response(false, msg.as_str()); } } - warn!("invalid transaction has been checked"); - ResponseDeliverTx { - code: 1, - data: Bytes::new(), - log: "".to_string(), - info: "".to_string(), - gas_wanted: 0, - gas_used: 0, - events: vec![Event { - r#type: "deliver".to_string(), - attributes: vec![], - }], - codespace: "".to_string(), - } } fn commit(&self) -> ResponseCommit { diff --git a/src/node/src/auth_storage.rs b/src/node/src/auth_storage.rs index 96b0066c..3b13bdd1 100644 --- a/src/node/src/auth_storage.rs +++ b/src/node/src/auth_storage.rs @@ -227,6 +227,7 @@ impl AuthStorage { owner: addr.to_vec(), to: query_addr.to_vec(), }; + //TODO account query service gas fee self.network_state .total_session_count .fetch_add(1, std::sync::atomic::Ordering::Relaxed); diff --git a/src/node/src/command.rs b/src/node/src/command.rs index 5e815080..35c10b5e 100644 --- a/src/node/src/command.rs +++ b/src/node/src/command.rs @@ -228,10 +228,10 @@ impl DB3Command { } let kp = db3_cmd::keystore::KeyStore::get_keypair(None).unwrap(); let signer = Db3MultiSchemeSigner::new(kp); - let mutation_sdk = MutationSDK::new(node.clone(), signer); + let mutation_sdk = MutationSDK::new(node.clone(), signer, true); let kp = db3_cmd::keystore::KeyStore::get_keypair(None).unwrap(); let signer = Db3MultiSchemeSigner::new(kp); - let store_sdk = StoreSDK::new(node, signer); + let store_sdk = StoreSDK::new(node, signer, true); DB3ClientContext { mutation_sdk: Some(mutation_sdk), store_sdk: Some(store_sdk), @@ -666,10 +666,6 @@ mod tests { // r#"{"name": "Mike","age": 44,"phones": ["+44 1234567","+44 2345678"]}"#.to_string(), // r#"{"name": "Bill","age": 44,"phones": ["+44 1234567","+44 2345678"]}"#.to_string(), // r#"{"name": "Bill","age": 45,"phones": ["+44 1234567","+44 2345678"]}"#.to_string(), - let mut doc_id1 = String::new(); - let mut doc_id2 = String::new(); - let mut doc_id3 = String::new(); - let mut doc_id4 = String::new(); let cmd = DB3ClientCommand::ShowDocument { addr: addr.clone(), collection_name: collection_books.to_string(), @@ -678,10 +674,9 @@ mod tests { }; let table = cmd.execute(&mut ctx).await.unwrap(); assert_eq!(4, table.len()); - doc_id1 = table.get_row(0).unwrap().get_cell(0).unwrap().get_content(); - doc_id2 = table.get_row(1).unwrap().get_cell(0).unwrap().get_content(); - doc_id3 = table.get_row(2).unwrap().get_cell(0).unwrap().get_content(); - doc_id4 = table.get_row(3).unwrap().get_cell(0).unwrap().get_content(); + let doc_id2 = table.get_row(1).unwrap().get_cell(0).unwrap().get_content(); + let doc_id3 = table.get_row(2).unwrap().get_cell(0).unwrap().get_content(); + let doc_id4 = table.get_row(3).unwrap().get_cell(0).unwrap().get_content(); // run show document limit 2 let cmd = DB3ClientCommand::ShowDocument { diff --git a/src/node/src/hash_util.rs b/src/node/src/hash_util.rs deleted file mode 100644 index 8da84923..00000000 --- a/src/node/src/hash_util.rs +++ /dev/null @@ -1,25 +0,0 @@ -// -// hash_util.rs -// Copyright (C) 2022 db3.network Author imotai -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use db3_error::{DB3Error, Result}; -use subtle_encoding::base64; -use tendermint::hash::Hash; - -#[warn(dead_code)] -pub fn base64_to_hash(data: &str) -> Result { - let decoded = base64::decode(data).map_err(|_| DB3Error::HashCodecError)?; - Hash::try_from(decoded).map_err(|_| DB3Error::HashCodecError) -} diff --git a/src/node/src/lib.rs b/src/node/src/lib.rs index 41bc770e..8735f093 100644 --- a/src/node/src/lib.rs +++ b/src/node/src/lib.rs @@ -19,7 +19,6 @@ pub mod abci_impl; pub mod auth_storage; pub mod command; pub mod context; -mod hash_util; mod json_rpc; pub mod json_rpc_impl; pub mod node_key; diff --git a/src/node/src/storage_node_impl.rs b/src/node/src/storage_node_impl.rs index dd5722ce..3285e849 100644 --- a/src/node/src/storage_node_impl.rs +++ b/src/node/src/storage_node_impl.rs @@ -19,9 +19,11 @@ use super::context::Context; use db3_crypto::db3_address::DB3Address; use db3_crypto::db3_signer::Db3MultiSchemeSigner; use db3_crypto::{db3_verifier::DB3Verifier, id::DbId, id::DocumentId}; +use ethers::core::types::transaction::eip712::{Eip712, TypedData}; use bytes::BytesMut; -use db3_proto::db3_base_proto::{ChainId, ChainRole}; +use db3_proto::db3_base_proto::{BroadcastMeta, ChainId, ChainRole}; +use db3_proto::db3_event_proto::EventMessage; use db3_proto::db3_mutation_proto::{PayloadType, WriteRequest}; use db3_proto::db3_node_proto::{ storage_node_server::StorageNode, BroadcastRequest, BroadcastResponse, CloseSessionRequest, @@ -29,18 +31,19 @@ use db3_proto::db3_node_proto::{ GetDocumentResponse, GetSessionInfoRequest, GetSessionInfoResponse, NetworkStatus, OpenSessionRequest, OpenSessionResponse, QueryBillRequest, QueryBillResponse, RunQueryRequest, RunQueryResponse, ShowDatabaseRequest, ShowDatabaseResponse, ShowNetworkStatusRequest, + SubscribeRequest, }; -use db3_proto::db3_session_proto::{ - CloseSessionPayload, OpenSessionPayload, QuerySession, QuerySessionInfo, -}; +use db3_proto::db3_session_proto::{OpenSessionPayload, QuerySession, QuerySessionInfo}; use db3_session::query_session_verifier; use db3_session::session_manager::DEFAULT_SESSION_PERIOD; use db3_session::session_manager::DEFAULT_SESSION_QUERY_LIMIT; +use ethers::types::Bytes; use prost::Message; use std::boxed::Box; use std::sync::atomic::Ordering; use std::time::{SystemTime, UNIX_EPOCH}; use tendermint_rpc::Client; +use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; use tracing::info; @@ -60,6 +63,7 @@ impl StorageNodeImpl { #[tonic::async_trait] impl StorageNode for StorageNodeImpl { + type SubscribeStream = ReceiverStream>; async fn show_database( &self, request: Request, @@ -184,15 +188,44 @@ impl StorageNode for StorageNodeImpl { request: Request, ) -> std::result::Result, Status> { let r = request.into_inner(); - let account_id = DB3Verifier::verify(r.payload.as_ref(), r.signature.as_ref()) - .map_err(|e| Status::internal(format!("{:?}", e)))?; - let payload = OpenSessionPayload::decode(r.payload.as_ref()) - .map_err(|_| Status::internal("fail to decode open session request ".to_string()))?; - let header = payload.header; + let (account_id, session) = match r.payload_type { + // Typeddatapayload + 3 => { + info!("get open session request"); + let typed_data = serde_json::from_slice::(r.payload.as_ref()) + .map_err(|e| Status::internal(format!("bad typed data format for {e}")))?; + let hashed_message = typed_data.encode_eip712().map_err(|e| { + Status::internal(format!("encode typed data to hash error {e}")) + })?; + let account_id = DB3Verifier::verify_hashed(&hashed_message, r.signature.as_ref()) + .map_err(|e| Status::internal(format!("bad typed data signature for {e}")))?; + let typed_payload = typed_data + .message + .get("payload") + .ok_or(Status::internal("no typed payload was found".to_string()))?; + let binary_payload: Bytes = serde_json::from_value(typed_payload.to_owned()) + .map_err(|e| Status::internal(format!("invalid payload for err {e}")))?; + let session = OpenSessionPayload::decode(binary_payload.as_ref()).map_err(|e| { + Status::internal(format!("fail to decode open session request for {e} ")) + })?; + info!("session account {}", account_id.to_hex()); + (account_id, session) + } + // Querysessionpayload + _ => { + let account_id = DB3Verifier::verify(r.payload.as_ref(), r.signature.as_ref()) + .map_err(|e| Status::internal(format!("bad signature for {e}")))?; + let payload = OpenSessionPayload::decode(r.payload.as_ref()).map_err(|e| { + Status::internal(format!("fail to decode open session request for {e} ")) + })?; + (account_id, payload) + } + }; + let header = session.header; match self.context.node_store.lock() { Ok(mut node_store) => { let sess_store = node_store.get_session_store(); - match sess_store.add_new_session(&header, payload.start_time, account_id.addr) { + match sess_store.add_new_session(&header, session.start_time, account_id.addr) { Ok((session_token, query_session_info)) => { // Takes a reference and returns Option<&V> Ok(Response::new(OpenSessionResponse { @@ -208,64 +241,94 @@ impl StorageNode for StorageNodeImpl { Err(e) => Err(Status::internal(format!("{}", e))), } } + async fn close_query_session( &self, request: Request, ) -> std::result::Result, Status> { let r = request.into_inner(); - let client_query_session: &[u8] = r.payload.as_ref(); - let client_signature: &[u8] = r.signature.as_ref(); - DB3Verifier::verify(client_query_session, client_signature) - .map_err(|e| Status::internal(format!("{:?}", e)))?; - let payload = CloseSessionPayload::decode(r.payload.as_ref()) - .map_err(|_| Status::internal("fail to decode query_session_info ".to_string()))?; - let mut node_query_session_info: Option = None; - match self.context.node_store.lock() { + let (_, session) = match r.payload_type { + // Typeddatapayload + 3 => { + let typed_data = serde_json::from_slice::(r.payload.as_ref()) + .map_err(|e| Status::internal(format!("bad typed data format for {e}")))?; + let hashed_message = typed_data.encode_eip712().map_err(|e| { + Status::internal(format!("encode typed data to hash error {e}")) + })?; + let account_id = DB3Verifier::verify_hashed(&hashed_message, r.signature.as_ref()) + .map_err(|e| Status::internal(format!("bad typed data signature for {e}")))?; + let typed_payload = typed_data + .message + .get("payload") + .ok_or(Status::internal("no typed payload was found".to_string()))?; + let binary_payload: Bytes = serde_json::from_value(typed_payload.to_owned()) + .map_err(|e| Status::internal(format!("invalid payload for err {e}")))?; + let session = QuerySessionInfo::decode(binary_payload.as_ref()).map_err(|e| { + Status::internal(format!("fail to decode open session request for {e} ")) + })?; + (account_id, session) + } + // Querysessionpayload + _ => { + let account_id = DB3Verifier::verify(r.payload.as_ref(), r.signature.as_ref()) + .map_err(|e| Status::internal(format!("bad signature for {e}")))?; + let payload = QuerySessionInfo::decode(r.payload.as_ref()).map_err(|e| { + Status::internal(format!("fail to decode open session request for {e} ")) + })?; + (account_id, payload) + } + }; + let node_query_session_info = match self.context.node_store.lock() { Ok(mut node_store) => { let sess_store = node_store.get_session_store(); // Verify query session sdk - match sess_store.get_session_mut(&payload.session_token) { + match sess_store.get_session_mut(&r.session_token) { Some(sess) => { - let query_session_info = &payload.session_info.unwrap(); if !query_session_verifier::check_query_session_info( &sess.get_session_info(), - &query_session_info, + &session, ) { return Err(Status::invalid_argument(format!( "query session verify fail. expect query count {} but {}", sess.get_session_query_count(), - query_session_info.query_count + session.query_count ))); } } None => { return Err(Status::not_found(format!( "session {} not found in the session store", - payload.session_token + r.session_token ))); } } // Takes a reference and returns Option<&V> let sess = sess_store - .remove_session(&payload.session_token) + .remove_session(&r.session_token) .map_err(|e| Status::internal(format!("{}", e))) .unwrap(); - node_query_session_info = Some(sess.get_session_info()); + Some(sess.get_session_info()) } Err(e) => return Err(Status::internal(format!("{}", e))), - } + }; // Generate Nonce let nonce = match SystemTime::now().duration_since(UNIX_EPOCH) { Ok(n) => n.as_secs(), Err(_) => 0, }; - let query_session = QuerySession { + let meta = BroadcastMeta { + //TODO get from network nonce, - chain_id: ChainId::MainNet.into(), + //TODO use config + chain_id: ChainId::DevNet.into(), + //TODO use config chain_role: ChainRole::StorageShardChain.into(), - node_query_session_info: node_query_session_info.clone(), - client_query_session: client_query_session.to_vec().to_owned(), - client_signature: client_signature.to_vec().to_owned(), + }; + let query_session = QuerySession { + payload: r.payload.to_vec(), + client_signature: r.signature.to_vec(), + meta: Some(meta), + payload_type: r.payload_type, }; // Submit query session let mut mbuf = BytesMut::with_capacity(1024 * 4); @@ -273,28 +336,23 @@ impl StorageNode for StorageNodeImpl { Status::internal(format!("fail to submit query session with error {}", e)) })?; let mbuf = mbuf.freeze(); - let signature = self.signer.sign(mbuf.as_ref()).map_err(|e| { Status::internal(format!("fail to submit query session with error {e}")) })?; - let request = WriteRequest { signature: signature.as_ref().to_vec().to_owned(), payload: mbuf.as_ref().to_vec().to_owned(), payload_type: PayloadType::QuerySessionPayload.into(), }; - //TODO add the capacity to mutation sdk configuration let mut buf = BytesMut::with_capacity(1024 * 4); request.encode(&mut buf).map_err(|e| { Status::internal(format!("fail to submit query session with error {e}")) })?; - let buf = buf.freeze(); let r = BroadcastRequest { body: buf.as_ref().to_vec(), }; - let request = tonic::Request::new(r); let response = self .broadcast(request) @@ -434,6 +492,12 @@ impl StorageNode for StorageNodeImpl { Err(e) => Err(Status::internal(format!("{}", e))), } } + async fn subscribe( + &self, + _request: Request, + ) -> std::result::Result, Status> { + Err(Status::internal(format!(""))) + } } #[cfg(test)] diff --git a/src/node/tests/command_test.rs b/src/node/tests/command_test.rs index 56485494..445458e3 100644 --- a/src/node/tests/command_test.rs +++ b/src/node/tests/command_test.rs @@ -1,5 +1,4 @@ mod cmd_integration { - use db3_cmd::command::DB3ClientCommand; #[actix_web::test] async fn cmd_smoke_test() {} diff --git a/src/node/tests/node_test.rs b/src/node/tests/node_test.rs index cf965da1..f05d5021 100644 --- a/src/node/tests/node_test.rs +++ b/src/node/tests/node_test.rs @@ -2,46 +2,13 @@ mod node_integration { use bytes::BytesMut; - use db3_base::get_a_random_nonce; use db3_crypto::db3_signer::Db3MultiSchemeSigner; use db3_proto::db3_base_proto::{BroadcastMeta, ChainId, ChainRole}; - use db3_proto::db3_mutation_proto::{ - DatabaseAction, DatabaseMutation, PayloadType, WriteRequest, - }; - use db3_proto::db3_node_proto::storage_node_client::StorageNodeClient; - use db3_sdk::mutation_sdk::MutationSDK; - use db3_sdk::store_sdk::StoreSDK; + use db3_proto::db3_mutation_proto::{DatabaseAction, DatabaseMutation}; + use db3_proto::db3_mutation_proto::{PayloadType, WriteRequest}; use prost::Message; - use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; - use std::{thread, time}; use subtle_encoding::base64; - use tonic::transport::Endpoint; - - fn get_mutation_sdk() -> MutationSDK { - let public_grpc_url = "http://127.0.0.1:26659"; - db3_cmd::keystore::KeyStore::recover_keypair(None).unwrap(); - // create storage node sdk - let kp = db3_cmd::keystore::KeyStore::get_keypair(None).unwrap(); - let signer = Db3MultiSchemeSigner::new(kp); - let rpc_endpoint = Endpoint::new(public_grpc_url).unwrap(); - let channel = rpc_endpoint.connect_lazy(); - let client = Arc::new(StorageNodeClient::new(channel)); - // broadcast client - let sdk = MutationSDK::new(client, signer); - sdk - } - - fn get_store_sdk() -> StoreSDK { - let public_grpc_url = "http://127.0.0.1:26659"; - // create storage node sdk - let kp = db3_cmd::keystore::KeyStore::get_keypair(None).unwrap(); - let signer = Db3MultiSchemeSigner::new(kp); - let rpc_endpoint = Endpoint::new(public_grpc_url).unwrap(); - let channel = rpc_endpoint.connect_lazy(); - let client = Arc::new(StorageNodeClient::new(channel)); - StoreSDK::new(client, signer) - } fn current_seconds() -> u64 { match SystemTime::now().duration_since(UNIX_EPOCH) { diff --git a/src/proto/build.rs b/src/proto/build.rs index 23a38eaf..194f5e9a 100644 --- a/src/proto/build.rs +++ b/src/proto/build.rs @@ -31,6 +31,7 @@ fn main() { "proto/db3_database.proto", "proto/db3_message.proto", "proto/db3_faucet.proto", + "proto/db3_event.proto", ], &["proto"], ) diff --git a/src/proto/proto/db3_event.proto b/src/proto/proto/db3_event.proto new file mode 100644 index 00000000..d4169b57 --- /dev/null +++ b/src/proto/proto/db3_event.proto @@ -0,0 +1,84 @@ +// +// db3_event.proto +// Copyright (C) 2023 db3.network Author imotai +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +syntax = "proto3"; + +package db3_event_proto; + +enum EventType { + Block = 0; + Mutation = 1; + Query = 2; +} + +// the node will dispatch a block event when a new block has been proposed +message BlockEvent { + uint64 height= 1; + bytes block_hash = 2; + bytes app_hash = 3; + string chain_id = 4; + uint64 gas = 5; +} + +// the node will dispatch a mutation event when a mutation has been checked +message MutationEvent { + bytes sender = 1; + enum MutationEventStatus { + // the mutation has been deliveried + Deliveried = 0; + // not enough gas + OutOfGas = 1; + // invalid mutation + Invalid = 2; + } + enum ToAddressType { + // mutation for data + Database = 0; + // mutation for credit + Account = 1; + } + MutationEventStatus status = 2; + // the address that mutation has been send to + bytes to = 3; + uint64 gas = 4; +} + +message EventMessage { + EventType type = 1; + oneof event { + MutationEvent mutation_event = 2; + BlockEvent block_event = 3; + } +} + +message MutationEventFilter { + bytes sender = 1; +} + +message BlockEventFilter {} + +message EventFilter { + oneof filter { + MutationEventFilter mfilter = 1; + BlockEventFilter bfilter = 2; + } +} + + +message Subscribution { + repeated EventType topics = 1; + repeated EventFilter filters = 2; +} diff --git a/src/proto/proto/db3_mutation.proto b/src/proto/proto/db3_mutation.proto index caa7d5a1..e1e23c38 100644 --- a/src/proto/proto/db3_mutation.proto +++ b/src/proto/proto/db3_mutation.proto @@ -19,6 +19,7 @@ syntax = "proto3"; import "db3_base.proto"; import "db3_database.proto"; + package db3_mutation_proto; enum DatabaseAction { @@ -72,6 +73,8 @@ enum PayloadType { QuerySessionPayload =0; DatabasePayload = 1; MintCreditsPayload = 2; + // a evm chain request + TypedDataPayload = 3; } message WriteRequest { diff --git a/src/proto/proto/db3_node.proto b/src/proto/proto/db3_node.proto index 0d14de82..a3524cb5 100644 --- a/src/proto/proto/db3_node.proto +++ b/src/proto/proto/db3_node.proto @@ -20,6 +20,9 @@ import "db3_bill.proto"; import "db3_account.proto"; import "db3_session.proto"; import "db3_database.proto"; +import "db3_event.proto"; +import "db3_mutation.proto"; + package db3_node_proto; message NetworkStatus { @@ -67,6 +70,7 @@ message GetSessionInfoRequest { message OpenSessionRequest { bytes payload = 1; bytes signature = 2; + db3_mutation_proto.PayloadType payload_type = 3; } message OpenSessionResponse { @@ -77,8 +81,12 @@ message OpenSessionResponse { } message CloseSessionRequest { + // the query session info bytes payload = 1; + // the signature of query session info bytes signature = 2; + string session_token = 3; + db3_mutation_proto.PayloadType payload_type = 4; } message CloseSessionResponse { @@ -132,6 +140,11 @@ message RunQueryResponse{ message ShowNetworkStatusRequest {} +message SubscribeRequest { + string session_token = 1; + db3_event_proto.Subscribution sub = 2; +} + service StorageNode { // method for querying bills by height rpc QueryBill(QueryBillRequest) returns (QueryBillResponse) {} @@ -152,4 +165,6 @@ service StorageNode { rpc GetDocument(GetDocumentRequest) returns (GetDocumentResponse) {} // method for show the network status rpc ShowNetworkStatus(ShowNetworkStatusRequest) returns (NetworkStatus){} + // method for subscribution + rpc Subscribe(SubscribeRequest) returns (stream db3_event_proto.EventMessage) {} } diff --git a/src/proto/proto/db3_session.proto b/src/proto/proto/db3_session.proto index bde8a624..83342a2a 100644 --- a/src/proto/proto/db3_session.proto +++ b/src/proto/proto/db3_session.proto @@ -18,36 +18,32 @@ syntax = "proto3"; import "db3_base.proto"; +import "db3_mutation.proto"; + package db3_session_proto; message QuerySessionInfo { // the hex encoded string int32 id = 1; int64 start_time = 2; - int32 query_count = 4; + int32 query_count = 3; + // the meta for client + db3_base_proto.BroadcastMeta meta = 4; } - -message CloseSessionPayload { - QuerySessionInfo session_info = 1; - string session_token = 2; -} message OpenSessionPayload { string header = 1; int64 start_time = 2; } + +// the session for mutation message QuerySession { - // the counter of account - uint64 nonce = 1; - // the chain id of db3 - db3_base_proto.ChainId chain_id = 2; - // the chain role of db3 - db3_base_proto.ChainRole chain_role = 3; - // node query session info - QuerySessionInfo node_query_session_info = 4; - // client query session info - bytes client_query_session = 5; + // the meta for node client + db3_base_proto.BroadcastMeta meta = 1; + // the agreed session info + bytes payload = 2; // client signature - bytes client_signature = 6; + bytes client_signature = 3; + // the type of payload + db3_mutation_proto.PayloadType payload_type = 4; } - diff --git a/src/proto/src/lib.rs b/src/proto/src/lib.rs index 55903502..d1392135 100644 --- a/src/proto/src/lib.rs +++ b/src/proto/src/lib.rs @@ -43,3 +43,6 @@ pub mod db3_message_proto { pub mod db3_faucet_proto { tonic::include_proto!("db3_faucet_proto"); } +pub mod db3_event_proto { + tonic::include_proto!("db3_event_proto"); +} diff --git a/src/sdk/Cargo.toml b/src/sdk/Cargo.toml index 8563477d..6bfabf53 100644 --- a/src/sdk/Cargo.toml +++ b/src/sdk/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["database", "web3", "db3"] [dependencies] -ethers = { version = "1.0.0", features = ["ws"] } +ethers = { workspace = true } db3-proto={path="../proto", version="0.1.0"} db3-error={path="../error", version="0.1.0"} db3-crypto={path="../crypto", version="0.1.0"} @@ -26,8 +26,8 @@ chrono = "0.4.22" enum-primitive-derive = "^0.2" num-traits = "^0.2" rand = "0.8.5" -[dev-dependencies] serde_json = "1.0.88" +[dev-dependencies] db3-base={path="../base", version="0.1.0"} db3-cmd={path="../cmd", version="0.1.0"} criterion = { version = "0.3.4", default-features = false,features = ["async_futures", "async_tokio"]} diff --git a/src/sdk/src/faucet_sdk.rs b/src/sdk/src/faucet_sdk.rs index 9a4f440c..3578dc93 100644 --- a/src/sdk/src/faucet_sdk.rs +++ b/src/sdk/src/faucet_sdk.rs @@ -15,15 +15,9 @@ // limitations under the License. // -use bytes::BytesMut; use db3_error::{DB3Error, Result}; use db3_proto::db3_faucet_proto::{faucet_node_client::FaucetNodeClient, FaucetRequest}; -use ethers::{ - core::types::{Signature, H256}, - core::utils::hash_message, - signers::{LocalWallet, Signer}, -}; -use prost::Message; +use ethers::{core::utils::hash_message, signers::LocalWallet}; use std::sync::Arc; pub struct FaucetSDK { diff --git a/src/sdk/src/mutation_sdk.rs b/src/sdk/src/mutation_sdk.rs index 4669732b..de70186a 100644 --- a/src/sdk/src/mutation_sdk.rs +++ b/src/sdk/src/mutation_sdk.rs @@ -15,6 +15,11 @@ // limitations under the License. // +use ethers::core::types::{ + transaction::eip712::{EIP712Domain, TypedData, Types}, + Bytes, +}; + use bytes::BytesMut; use db3_crypto::{ db3_signer::Db3MultiSchemeSigner, @@ -26,19 +31,37 @@ use db3_proto::db3_mutation_proto::{ }; use db3_proto::db3_node_proto::{storage_node_client::StorageNodeClient, BroadcastRequest}; use prost::Message; +use std::collections::BTreeMap; use std::sync::Arc; pub struct MutationSDK { signer: Db3MultiSchemeSigner, client: Arc>, + types: Types, + use_typed_format: bool, } impl MutationSDK { pub fn new( client: Arc>, signer: Db3MultiSchemeSigner, + use_typed_format: bool, ) -> Self { - Self { client, signer } + let json = serde_json::json!({ + "EIP712Domain": [ + ], + "Message":[ + {"name":"payload", "type":"bytes"}, + {"name":"payloadType", "type":"string"} + ] + }); + let types: Types = serde_json::from_value(json).unwrap(); + Self { + client, + signer, + types, + use_typed_format, + } } pub async fn submit_mint_credit_mutation( @@ -85,16 +108,48 @@ impl MutationSDK { Ok(tx_id) } - pub async fn submit_database_mutation( + fn wrap_typed_database_mutation( &self, database_mutation: &DatabaseMutation, - ) -> Result<(DbId, TxId)> { - let nonce: u64 = match &database_mutation.meta { - Some(m) => Ok(m.nonce), - None => Err(DB3Error::SubmitMutationError( - "meta in mutation is none".to_string(), - )), - }?; + ) -> Result { + let mut mbuf = BytesMut::with_capacity(1024 * 4); + database_mutation + .encode(&mut mbuf) + .map_err(|e| DB3Error::SubmitMutationError(format!("{e}")))?; + let mbuf = Bytes(mbuf.freeze()); + let mut message: BTreeMap = BTreeMap::new(); + message.insert( + "payload".to_string(), + serde_json::Value::from(format!("{mbuf}")), + ); + message.insert("payloadType".to_string(), serde_json::Value::from("1")); + let typed_data = TypedData { + domain: EIP712Domain { + name: None, + version: None, + chain_id: None, + verifying_contract: None, + salt: None, + }, + types: self.types.clone(), + primary_type: "Message".to_string(), + message, + }; + let signature = self.signer.sign_typed_data(&typed_data)?; + let buf = serde_json::to_vec(&typed_data) + .map_err(|e| DB3Error::SubmitMutationError(format!("{e}")))?; + let request = WriteRequest { + signature, + payload: buf, + payload_type: PayloadType::TypedDataPayload.into(), + }; + Ok(request) + } + + fn wrap_proto_database_mutation( + &self, + database_mutation: &DatabaseMutation, + ) -> Result { let mut mbuf = BytesMut::with_capacity(1024 * 4); database_mutation .encode(&mut mbuf) @@ -106,8 +161,29 @@ impl MutationSDK { payload: mbuf.as_ref().to_vec().to_owned(), payload_type: PayloadType::DatabasePayload.into(), }; + Ok(request) + } - // + pub async fn submit_database_mutation( + &self, + database_mutation: &DatabaseMutation, + ) -> Result<(DbId, TxId)> { + let nonce: u64 = match &database_mutation.meta { + Some(m) => Ok(m.nonce), + None => Err(DB3Error::SubmitMutationError( + "meta in mutation is none".to_string(), + )), + }?; + let request = match self.use_typed_format { + true => { + let r = self.wrap_typed_database_mutation(database_mutation)?; + r + } + false => { + let r = self.wrap_proto_database_mutation(database_mutation)?; + r + } + }; //TODO generate the address from local currently // let mut buf = BytesMut::with_capacity(1024 * 4); @@ -142,53 +218,68 @@ mod tests { use crate::mutation_sdk::StorageNodeClient; use crate::sdk_test; use crate::store_sdk::StoreSDK; - use db3_base::get_a_random_nonce; - use db3_proto::db3_base_proto::{ChainId, ChainRole}; - use rand::Rng; use std::sync::Arc; use std::{thread, time}; use tonic::transport::Endpoint; - #[tokio::test] - async fn it_mint_credits_mutation_smoke_test() { - let ep = "http://127.0.0.1:26659"; - let rpc_endpoint = Endpoint::new(ep.to_string()).unwrap(); - let channel = rpc_endpoint.connect_lazy(); - let client = Arc::new(StorageNodeClient::new(channel)); - let (to_address, _signer) = sdk_test::gen_ed25519_signer(127); - let (sender_address, signer) = sdk_test::gen_secp256k1_signer(); - let sdk = MutationSDK::new(client.clone(), signer); + async fn run_mint_credits_mutation_flow( + use_typed_format: bool, + client: Arc>, + counter: i64, + ) { + let (to_address, _signer) = sdk_test::gen_secp256k1_signer(counter + 1); + let (sender_address, signer) = sdk_test::gen_secp256k1_signer(counter + 2); + let sdk = MutationSDK::new(client.clone(), signer, use_typed_format); let dm = sdk_test::create_a_mint_mutation(&sender_address, &to_address); let result = sdk.submit_mint_credit_mutation(&dm).await; assert!(result.is_ok()); let millis = time::Duration::from_millis(2000); thread::sleep(millis); - let (_, signer) = sdk_test::gen_secp256k1_signer(); - let store_sdk = StoreSDK::new(client, signer); + let (_, signer) = sdk_test::gen_secp256k1_signer(counter + 1); + let store_sdk = StoreSDK::new(client, signer, use_typed_format); let account = store_sdk.get_account(&to_address).await.unwrap(); assert_eq!(account.credits, 9 * 1000_000_000); } - #[tokio::test] - async fn it_database_mutation_smoke_test() { - let ep = "http://127.0.0.1:26659"; - let rpc_endpoint = Endpoint::new(ep.to_string()).unwrap(); - let channel = rpc_endpoint.connect_lazy(); - let client = Arc::new(StorageNodeClient::new(channel)); - let (_, signer) = sdk_test::gen_secp256k1_signer(); - let sdk = MutationSDK::new(client.clone(), signer); + async fn run_database_mutation_flow( + use_typed_format: bool, + client: Arc>, + counter: i64, + ) { + let (_, signer) = sdk_test::gen_secp256k1_signer(counter); + let sdk = MutationSDK::new(client.clone(), signer, use_typed_format); let dm = sdk_test::create_a_database_mutation(); let result = sdk.submit_database_mutation(&dm).await; assert!(result.is_ok()); let (db_id, _) = result.unwrap(); let millis = time::Duration::from_millis(2000); thread::sleep(millis); - let (_, signer) = sdk_test::gen_secp256k1_signer(); - let mut store_sdk = StoreSDK::new(client, signer); + let (_, signer) = sdk_test::gen_secp256k1_signer(counter); + let mut store_sdk = StoreSDK::new(client, signer, use_typed_format); let database_ret = store_sdk.get_database(db_id.to_hex().as_str()).await; assert!(database_ret.is_ok()); assert!(database_ret.unwrap().is_some()); let result = store_sdk.close_session().await; assert!(result.is_ok()); } + + #[tokio::test] + async fn smoke_test_run_mint_credits_mutation_flow() { + let ep = "http://127.0.0.1:26659"; + let rpc_endpoint = Endpoint::new(ep.to_string()).unwrap(); + let channel = rpc_endpoint.connect_lazy(); + let client = Arc::new(StorageNodeClient::new(channel)); + run_mint_credits_mutation_flow(false, client.clone(), 10).await; + run_mint_credits_mutation_flow(true, client.clone(), 100).await; + } + + #[tokio::test] + async fn it_database_mutation_smoke_test() { + let ep = "http://127.0.0.1:26659"; + let rpc_endpoint = Endpoint::new(ep.to_string()).unwrap(); + let channel = rpc_endpoint.connect_lazy(); + let client = Arc::new(StorageNodeClient::new(channel)); + run_database_mutation_flow(false, client.clone(), 20).await; + run_database_mutation_flow(true, client.clone(), 30).await; + } } diff --git a/src/sdk/src/sdk_test.rs b/src/sdk/src/sdk_test.rs index bf2b91d5..92e242af 100644 --- a/src/sdk/src/sdk_test.rs +++ b/src/sdk/src/sdk_test.rs @@ -27,19 +27,31 @@ use db3_proto::db3_mutation_proto::DocumentMutation; use db3_proto::db3_mutation_proto::{DatabaseAction, DatabaseMutation, MintCreditsMutation}; use std::time::{SystemTime, UNIX_EPOCH}; -pub fn gen_ed25519_signer(seed_u8: u8) -> (DB3Address, Db3MultiSchemeSigner) { - let seed: [u8; 32] = [seed_u8; 32]; +#[cfg(test)] +pub fn gen_ed25519_signer(seed: i64) -> (DB3Address, Db3MultiSchemeSigner) { + let mut seeds: Vec = vec![]; + seeds.extend_from_slice(&seed.to_be_bytes()); + seeds.extend_from_slice(&seed.to_be_bytes()); + seeds.extend_from_slice(&seed.to_be_bytes()); + seeds.extend_from_slice(&seed.to_be_bytes()); let (addr, kp) = - key_derive::derive_key_pair_from_path(&seed, None, &SignatureScheme::ED25519).unwrap(); + key_derive::derive_key_pair_from_path(&seeds, None, &SignatureScheme::ED25519).unwrap(); (addr, Db3MultiSchemeSigner::new(kp)) } -pub fn gen_secp256k1_signer() -> (DB3Address, Db3MultiSchemeSigner) { - let seed: [u8; 32] = [0; 32]; +#[cfg(test)] +pub fn gen_secp256k1_signer(seed: i64) -> (DB3Address, Db3MultiSchemeSigner) { + let mut seeds: Vec = vec![]; + seeds.extend_from_slice(&seed.to_be_bytes()); + seeds.extend_from_slice(&seed.to_be_bytes()); + seeds.extend_from_slice(&seed.to_be_bytes()); + seeds.extend_from_slice(&seed.to_be_bytes()); let (addr, kp) = - key_derive::derive_key_pair_from_path(&seed, None, &SignatureScheme::Secp256k1).unwrap(); + key_derive::derive_key_pair_from_path(&seeds, None, &SignatureScheme::Secp256k1).unwrap(); (addr, Db3MultiSchemeSigner::new(kp)) } + +#[cfg(test)] fn current_seconds() -> u64 { match SystemTime::now().duration_since(UNIX_EPOCH) { Ok(n) => n.as_secs(), @@ -47,7 +59,8 @@ fn current_seconds() -> u64 { } } -pub fn create_a_mint_mutation(sender: &DB3Address, to: &DB3Address) -> MintCreditsMutation { +#[cfg(test)] +pub fn create_a_mint_mutation(_sender: &DB3Address, to: &DB3Address) -> MintCreditsMutation { let meta = BroadcastMeta { //TODO get from network nonce: current_seconds(), @@ -67,6 +80,7 @@ pub fn create_a_mint_mutation(sender: &DB3Address, to: &DB3Address) -> MintCredi } } +#[cfg(test)] pub fn create_a_database_mutation() -> DatabaseMutation { let meta = BroadcastMeta { //TODO get from network @@ -87,6 +101,7 @@ pub fn create_a_database_mutation() -> DatabaseMutation { dm } +#[cfg(test)] pub fn create_a_collection_mutataion(name: &str, addr: &DB3Address) -> DatabaseMutation { let meta = BroadcastMeta { //TODO get from network @@ -114,6 +129,7 @@ pub fn create_a_collection_mutataion(name: &str, addr: &DB3Address) -> DatabaseM dm } +#[cfg(test)] pub fn add_documents(name: &str, addr: &DB3Address, doc_vec: &Vec<&str>) -> DatabaseMutation { let meta = BroadcastMeta { //TODO get from network diff --git a/src/sdk/src/store_sdk.rs b/src/sdk/src/store_sdk.rs index eea9c030..b97a28f5 100644 --- a/src/sdk/src/store_sdk.rs +++ b/src/sdk/src/store_sdk.rs @@ -19,19 +19,26 @@ use bytes::BytesMut; use chrono::Utc; use db3_crypto::{db3_address::DB3Address, db3_signer::Db3MultiSchemeSigner}; use db3_proto::db3_account_proto::Account; +use db3_proto::db3_base_proto::{BroadcastMeta, ChainId, ChainRole}; use db3_proto::db3_bill_proto::Bill; use db3_proto::db3_database_proto::structured_query::{Limit, Projection}; use db3_proto::db3_database_proto::{Database, Document, StructuredQuery}; +use db3_proto::db3_mutation_proto::PayloadType; use db3_proto::db3_node_proto::{ storage_node_client::StorageNodeClient, CloseSessionRequest, GetAccountRequest, GetDocumentRequest, GetSessionInfoRequest, NetworkStatus, OpenSessionRequest, OpenSessionResponse, QueryBillKey, QueryBillRequest, RunQueryRequest, RunQueryResponse, SessionIdentifier, ShowDatabaseRequest, ShowNetworkStatusRequest, }; -use db3_proto::db3_session_proto::{CloseSessionPayload, OpenSessionPayload, QuerySessionInfo}; +use db3_proto::db3_session_proto::{OpenSessionPayload, QuerySessionInfo}; use db3_session::session_manager::{SessionPool, SessionStatus}; +use ethers::core::types::{ + transaction::eip712::{EIP712Domain, TypedData, Types}, + Bytes, +}; use num_traits::cast::FromPrimitive; use prost::Message; +use std::collections::BTreeMap; use std::sync::Arc; use tonic::Status; use uuid::Uuid; @@ -40,17 +47,32 @@ pub struct StoreSDK { client: Arc>, signer: Db3MultiSchemeSigner, session_pool: SessionPool, + types: Types, + use_typed_format: bool, } impl StoreSDK { pub fn new( client: Arc>, signer: Db3MultiSchemeSigner, + use_typed_format: bool, ) -> Self { + let json = serde_json::json!({ + "EIP712Domain": [ + ], + "Message":[ + {"name":"payload", "type":"bytes"}, + {"name":"payloadType", "type":"string"} + ] + }); + let types: Types = serde_json::from_value(json).unwrap(); + Self { client, signer, session_pool: SessionPool::new(), + types, + use_typed_format, } } @@ -60,7 +82,8 @@ impl StoreSDK { Some(session) => { if session.get_session_query_count() > 2000 { // close session - self.close_session_internal(&token).await?; + self.close_session_internal(&token, self.use_typed_format) + .await?; let response = self.open_session().await?; Ok(response.session_token) } else { @@ -196,18 +219,16 @@ impl StoreSDK { header: Uuid::new_v4().to_string(), start_time: Utc::now().timestamp(), }; - let mut buf = BytesMut::with_capacity(1024 * 8); - payload - .encode(&mut buf) - .map_err(|e| Status::internal(format!("{e}")))?; - let buf = buf.freeze(); - let signature = self - .signer - .sign(buf.as_ref()) - .map_err(|e| Status::internal(format!("{e}")))?; - let r = OpenSessionRequest { - payload: buf.as_ref().to_vec(), - signature: signature.as_ref().to_vec(), + + let r = match self.use_typed_format { + true => { + let r = self.wrap_typed_open_session(&payload)?; + r + } + false => { + let r = self.wrap_proto_open_session(&payload)?; + r + } }; let request = tonic::Request::new(r); let mut client = self.client.as_ref().clone(); @@ -223,35 +244,164 @@ impl StoreSDK { } } + fn wrap_proto_open_session( + &self, + payload: &OpenSessionPayload, + ) -> std::result::Result { + let mut buf = BytesMut::with_capacity(1024 * 8); + payload + .encode(&mut buf) + .map_err(|e| Status::internal(format!("{e}")))?; + let buf = buf.freeze(); + let signature = self + .signer + .sign(buf.as_ref()) + .map_err(|e| Status::internal(format!("{e}")))?; + let r = OpenSessionRequest { + payload: buf.as_ref().to_vec(), + signature: signature.as_ref().to_vec(), + payload_type: PayloadType::QuerySessionPayload.into(), + }; + Ok(r) + } + + fn wrap_typed_open_session( + &self, + payload: &OpenSessionPayload, + ) -> std::result::Result { + let mut mbuf = BytesMut::with_capacity(1024 * 4); + payload + .encode(&mut mbuf) + .map_err(|e| Status::internal(format!("{e}")))?; + let mbuf = Bytes(mbuf.freeze()); + let mut message: BTreeMap = BTreeMap::new(); + message.insert( + "payload".to_string(), + serde_json::Value::from(format!("{mbuf}")), + ); + message.insert("payloadType".to_string(), serde_json::Value::from("0")); + let typed_data = TypedData { + domain: EIP712Domain { + name: None, + version: None, + chain_id: None, + verifying_contract: None, + salt: None, + }, + types: self.types.clone(), + primary_type: "Message".to_string(), + message, + }; + let signature = self + .signer + .sign_typed_data(&typed_data) + .map_err(|e| Status::internal(format!("{e}")))?; + let buf = serde_json::to_vec(&typed_data).map_err(|e| Status::internal(format!("{e}")))?; + let r = OpenSessionRequest { + payload: buf, + signature, + payload_type: PayloadType::TypedDataPayload.into(), + }; + Ok(r) + } + + fn wrap_proto_close_session( + &self, + session: &QuerySessionInfo, + token: &str, + ) -> std::result::Result { + let mut buf = BytesMut::with_capacity(1024 * 8); + session + .encode(&mut buf) + .map_err(|e| Status::internal(format!("{e}")))?; + let buf = buf.freeze(); + let signature = self + .signer + .sign(buf.as_ref()) + .map_err(|e| Status::internal(format!("{e}")))?; + // protobuf payload + let r = CloseSessionRequest { + payload: buf.as_ref().to_vec(), + signature: signature.as_ref().to_vec(), + session_token: token.to_string(), + payload_type: PayloadType::QuerySessionPayload.into(), + }; + Ok(r) + } + fn wrap_typed_close_session( + &self, + session: &QuerySessionInfo, + token: &str, + ) -> std::result::Result { + let mut mbuf = BytesMut::with_capacity(1024 * 4); + session + .encode(&mut mbuf) + .map_err(|e| Status::internal(format!("{e}")))?; + let mbuf = Bytes(mbuf.freeze()); + let mut message: BTreeMap = BTreeMap::new(); + message.insert( + "payload".to_string(), + serde_json::Value::from(format!("{mbuf}")), + ); + message.insert("payloadType".to_string(), serde_json::Value::from("0")); + let typed_data = TypedData { + domain: EIP712Domain { + name: None, + version: None, + chain_id: None, + verifying_contract: None, + salt: None, + }, + types: self.types.clone(), + primary_type: "Message".to_string(), + message, + }; + let signature = self + .signer + .sign_typed_data(&typed_data) + .map_err(|e| Status::internal(format!("{e}")))?; + let buf = serde_json::to_vec(&typed_data).map_err(|e| Status::internal(format!("{e}")))?; + let r = CloseSessionRequest { + payload: buf, + signature, + session_token: token.to_string(), + payload_type: PayloadType::TypedDataPayload.into(), + }; + Ok(r) + } + async fn close_session_internal( &mut self, token: &str, + use_typed_format: bool, ) -> std::result::Result { match self.session_pool.get_session(token) { Some(sess) => { let query_session_info = sess.get_session_info(); - let payload = CloseSessionPayload { - session_info: Some(query_session_info.clone()), - session_token: token.to_string(), + let meta = BroadcastMeta { + //TODO get from network + nonce: Utc::now().timestamp() as u64, + //TODO use config + chain_id: ChainId::DevNet.into(), + //TODO use config + chain_role: ChainRole::StorageShardChain.into(), }; - - let mut buf = BytesMut::with_capacity(1024 * 8); - payload - .encode(&mut buf) - .map_err(|e| Status::internal(format!("{e}")))?; - - let buf = buf.freeze(); - - let signature = self - .signer - .sign(buf.as_ref()) - .map_err(|e| Status::internal(format!("{e}")))?; - - let r = CloseSessionRequest { - payload: buf.as_ref().to_vec(), - signature: signature.as_ref().to_vec(), + let session = QuerySessionInfo { + meta: Some(meta), + id: query_session_info.id, + start_time: query_session_info.start_time, + query_count: query_session_info.query_count, + }; + let r = match use_typed_format { + true => { + let close_request = self.wrap_typed_close_session(&session, token)?; + close_request + } + false => { + let close_request = self.wrap_proto_close_session(&session, token)?; + close_request + } }; - let request = tonic::Request::new(r); let mut client = self.client.as_ref().clone(); match client.close_query_session(request).await { @@ -268,13 +418,15 @@ impl StoreSDK { None => Err(Status::internal(format!("Session {} not exist", token))), } } + /// close session /// 1. verify Account /// 2. request close_query_session /// 3. return node's CloseSessionResponse(query session info and signature) and client's CloseSessionResponse (query session info and signature) pub async fn close_session(&mut self) -> std::result::Result<(), Status> { if let Some(token) = self.session_pool.get_last_token() { - self.close_session_internal(token.as_str()).await?; + self.close_session_internal(token.as_str(), self.use_typed_format) + .await?; } Ok(()) } @@ -352,61 +504,62 @@ mod tests { use bytes::BytesMut; use chrono::Utc; - use db3_proto::db3_base_proto::{ChainId, ChainRole}; use db3_proto::db3_database_proto::structured_query::field_filter::Operator; use db3_proto::db3_database_proto::structured_query::filter::FilterType; use db3_proto::db3_database_proto::structured_query::value::ValueType; - use db3_proto::db3_database_proto::structured_query::{ - FieldFilter, Filter, Limit, Projection, Value, - }; + use db3_proto::db3_database_proto::structured_query::{FieldFilter, Filter, Projection, Value}; use db3_proto::db3_node_proto::storage_node_client::StorageNodeClient; use db3_proto::db3_node_proto::OpenSessionRequest; use db3_proto::db3_session_proto::OpenSessionPayload; - use rand::random; use std::sync::Arc; use std::time; use tonic::transport::Endpoint; use uuid::Uuid; - #[tokio::test] - async fn it_get_bills() { - let ep = "http://127.0.0.1:26659"; - let rpc_endpoint = Endpoint::new(ep.to_string()).unwrap(); - let channel = rpc_endpoint.connect_lazy(); - let client = Arc::new(StorageNodeClient::new(channel)); + async fn run_get_bills_flow( + use_typed_format: bool, + client: Arc>, + counter: i64, + ) { let mclient = client.clone(); - let seed_u8: u8 = random(); { - let (_, signer) = sdk_test::gen_ed25519_signer(seed_u8); - let msdk = MutationSDK::new(mclient, signer); + let (_, signer) = sdk_test::gen_secp256k1_signer(counter); + let msdk = MutationSDK::new(mclient, signer, use_typed_format); let dm = sdk_test::create_a_database_mutation(); let result = msdk.submit_database_mutation(&dm).await; assert!(result.is_ok(), "{:?}", result.err()); let ten_millis = time::Duration::from_millis(2000); std::thread::sleep(ten_millis); } - let (_, signer) = sdk_test::gen_ed25519_signer(seed_u8); - let mut sdk = StoreSDK::new(client, signer); + let (_, signer) = sdk_test::gen_secp256k1_signer(counter); + let mut sdk = StoreSDK::new(client, signer, use_typed_format); let result = sdk.get_block_bills(1).await; if let Err(ref e) = result { println!("{}", e); assert!(false); } assert!(result.is_ok()); + let result = sdk.close_session().await; + assert!(result.is_ok()); } #[tokio::test] - async fn doc_curd_happy_path_smoke_test() { + async fn get_bills_smoke_test() { let ep = "http://127.0.0.1:26659"; let rpc_endpoint = Endpoint::new(ep.to_string()).unwrap(); let channel = rpc_endpoint.connect_lazy(); let client = Arc::new(StorageNodeClient::new(channel)); - let seed_u8: u8 = random(); + run_get_bills_flow(false, client.clone(), 3).await; + run_get_bills_flow(true, client.clone(), 5).await; + } - let (_, signer) = sdk_test::gen_ed25519_signer(seed_u8); - let msdk = MutationSDK::new(client.clone(), signer); - // create a database - // + async fn run_doc_crud_happy_path( + use_typed_format: bool, + client: Arc>, + counter: i64, + ) { + let (_, signer) = sdk_test::gen_secp256k1_signer(counter); + let msdk = MutationSDK::new(client.clone(), signer, use_typed_format); let dm = sdk_test::create_a_database_mutation(); let result = msdk.submit_database_mutation(&dm).await; assert!(result.is_ok(), "{:?}", result.err()); @@ -419,8 +572,8 @@ mod tests { let result = msdk.submit_database_mutation(&cm).await; assert!(result.is_ok()); std::thread::sleep(two_seconds); - let (addr, signer) = sdk_test::gen_ed25519_signer(seed_u8); - let mut sdk = StoreSDK::new(client.clone(), signer); + let (addr, signer) = sdk_test::gen_secp256k1_signer(counter); + let mut sdk = StoreSDK::new(client.clone(), signer, use_typed_format); let database = sdk.get_database(db_id.to_hex().as_str()).await; if let Ok(Some(db)) = database { assert_eq!(&db.address, db_id.address().as_ref()); @@ -480,6 +633,7 @@ mod tests { assert_eq!(documents.documents.len(), 2); let result = sdk.close_session().await; + println!("{:?}", result); assert!(result.is_ok()); std::thread::sleep(two_seconds); @@ -489,6 +643,23 @@ mod tests { assert_eq!(account.total_mutation_count, 3); assert_eq!(account.total_session_count, 1); } + #[tokio::test] + async fn proto_doc_curd_happy_path_smoke_test() { + let ep = "http://127.0.0.1:26659"; + let rpc_endpoint = Endpoint::new(ep.to_string()).unwrap(); + let channel = rpc_endpoint.connect_lazy(); + let client = Arc::new(StorageNodeClient::new(channel)); + run_doc_crud_happy_path(false, client.clone(), 32).await; + } + + #[tokio::test] + async fn typed_data_doc_curd_happy_path_smoke_test() { + let ep = "http://127.0.0.1:26659"; + let rpc_endpoint = Endpoint::new(ep.to_string()).unwrap(); + let channel = rpc_endpoint.connect_lazy(); + let client = Arc::new(StorageNodeClient::new(channel)); + run_doc_crud_happy_path(true, client.clone(), 31).await; + } #[tokio::test] async fn open_session_replay_attack() { @@ -496,8 +667,7 @@ mod tests { let rpc_endpoint = Endpoint::new(ep.to_string()).unwrap(); let channel = rpc_endpoint.connect_lazy(); let mut client = StorageNodeClient::new(channel); - let seed_u8: u8 = random(); - let (_, signer) = sdk_test::gen_ed25519_signer(seed_u8); + let (_, signer) = sdk_test::gen_ed25519_signer(40); let payload = OpenSessionPayload { header: Uuid::new_v4().to_string(), start_time: Utc::now().timestamp(), @@ -512,6 +682,7 @@ mod tests { let r = OpenSessionRequest { payload: buf.as_ref().to_vec(), signature: signature.as_ref().to_vec(), + payload_type: PayloadType::QuerySessionPayload.into(), }; let request = tonic::Request::new(r.clone()); let response = client.open_query_session(request).await; @@ -529,8 +700,7 @@ mod tests { let rpc_endpoint = Endpoint::new(ep.to_string()).unwrap(); let channel = rpc_endpoint.connect_lazy(); let mut client = StorageNodeClient::new(channel); - let seed_u8: u8 = random(); - let (_, signer) = sdk_test::gen_ed25519_signer(seed_u8); + let (_, signer) = sdk_test::gen_ed25519_signer(20); let payload = OpenSessionPayload { header: Uuid::new_v4().to_string(), start_time: Utc::now().timestamp() - 6, @@ -545,6 +715,7 @@ mod tests { let r = OpenSessionRequest { payload: buf.as_ref().to_vec(), signature: signature.as_ref().to_vec(), + payload_type: PayloadType::QuerySessionPayload.into(), }; let request = tonic::Request::new(r.clone()); let response = client.open_query_session(request).await; @@ -557,9 +728,8 @@ mod tests { let rpc_endpoint = Endpoint::new(ep.to_string()).unwrap(); let channel = rpc_endpoint.connect_lazy(); let client = Arc::new(StorageNodeClient::new(channel)); - let seed_u8: u8 = random(); - let (_addr, signer) = sdk_test::gen_ed25519_signer(seed_u8); - let sdk = StoreSDK::new(client.clone(), signer); + let (_addr, signer) = sdk_test::gen_ed25519_signer(50); + let sdk = StoreSDK::new(client.clone(), signer, false); let result = sdk.get_state().await; assert!(result.is_ok()); } diff --git a/src/session/Cargo.toml b/src/session/Cargo.toml index 9a575184..8f59f97c 100644 --- a/src/session/Cargo.toml +++ b/src/session/Cargo.toml @@ -18,6 +18,9 @@ prost = "0.11" prost-types = "0.11" enum-primitive-derive = "^0.2" num-traits = "^0.2" +bytes = "1.0" +serde_json = "1.0" +ethers = {workspace=true} [dependencies.uuid] version = "1.2.2" features = [ @@ -26,7 +29,7 @@ features = [ "macro-diagnostics", # Enable better diagnostics for compile-time UUIDs ] [dev-dependencies] -fastcrypto="0.1.3" +fastcrypto = { workspace = true, features = ["copy_key"] } db3-base={ path ="../base" } bytes = "1" hex = "0.4.3" diff --git a/src/session/src/query_session_verifier.rs b/src/session/src/query_session_verifier.rs index 5f5fe94f..c901d918 100644 --- a/src/session/src/query_session_verifier.rs +++ b/src/session/src/query_session_verifier.rs @@ -1,40 +1,102 @@ -use db3_crypto::account_id::AccountId; +// +// query_session_verifier.rs +// Copyright (C) 2022 db3.network Author imotai +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + use db3_crypto::db3_verifier; +use db3_crypto::id::AccountId; use db3_error::{DB3Error, Result}; -use db3_proto::db3_session_proto::{CloseSessionPayload, QuerySession, QuerySessionInfo}; +use db3_proto::db3_mutation_proto::PayloadType; +use db3_proto::db3_session_proto::QuerySessionInfo; +use ethers::types::{ + transaction::eip712::{Eip712, TypedData}, + Bytes, +}; use prost::Message; +use std::str::FromStr; -pub fn verify_query_session(query_session: &QuerySession) -> Result<(AccountId, QuerySessionInfo)> { - match query_session.node_query_session_info.as_ref() { - Some(node_query_session_info) => match db3_verifier::DB3Verifier::verify( - query_session.client_query_session.as_ref(), - query_session.client_signature.as_ref(), - ) { - Ok(client_account) => { - match CloseSessionPayload::decode(query_session.client_query_session.as_ref()) { - Ok(client_query_session) => { - if check_query_session_info( - &node_query_session_info, - &client_query_session.session_info.as_ref().unwrap(), - ) { - Ok((client_account, node_query_session_info.clone())) - } else { - Err(DB3Error::QuerySessionVerifyError(format!( - "node query count and client query count inconsistent" - ))) - } +fn decode_query_session_info(payload: &[u8]) -> Result { + match QuerySessionInfo::decode(payload) { + Ok(qi) => match &qi.meta { + Some(_) => Ok(qi), + None => Err(DB3Error::QuerySessionVerifyError( + "meta is none".to_string(), + )), + }, + Err(_) => Err(DB3Error::QuerySessionVerifyError( + "invalid mutation data".to_string(), + )), + } +} + +/// +/// +/// verify the query session and return the client account id +/// +/// +pub fn verify_query_session( + payload: &[u8], + payload_type: i32, + signature: &[u8], +) -> Result<(QuerySessionInfo, AccountId)> { + // typeddata + if payload_type as i32 == 3 { + match serde_json::from_slice::(payload) { + Ok(data) => { + let hashed_message: [u8; 32] = data.encode_eip712().map_err(|e| { + DB3Error::QuerySessionVerifyError(format!("invalid payload type for err {e}")) + })?; + + let account_id = + db3_verifier::DB3Verifier::verify_hashed(&hashed_message, signature)?; + + if let (Some(payload), Some(internal_data_type)) = + (data.message.get("payload"), data.message.get("payloadType")) + { + let data: Bytes = serde_json::from_value(payload.clone()).map_err(|e| { + DB3Error::QuerySessionVerifyError(format!( + "invalid payload type for err {e}" + )) + })?; + let internal_data_type = i32::from_str(internal_data_type.as_str().ok_or( + DB3Error::QuerySessionVerifyError("invalid payload type".to_string()), + )?) + .map_err(|e| { + DB3Error::QuerySessionVerifyError(format!( + "fail to convert payload type to i32 {e}" + )) + })?; + if internal_data_type != PayloadType::QuerySessionPayload as i32 { + return Err(DB3Error::QuerySessionVerifyError( + "invalid payload type and query session payload expected".to_string(), + )); } - Err(e) => Err(DB3Error::VerifyFailed(format!( - "invalid client query session info {}", - e - ))), + Ok((decode_query_session_info(data.as_ref())?, account_id)) + } else { + Err(DB3Error::QuerySessionVerifyError( + "bad typed data".to_string(), + )) } } - Err(e) => Err(e), - }, - None => Err(DB3Error::QuerySessionVerifyError(format!( - "node query session info is none" - ))), + Err(e) => Err(DB3Error::QuerySessionVerifyError(format!( + "invalid payload type for err {e}" + ))), + } + } else { + let account_id = db3_verifier::DB3Verifier::verify(payload, signature)?; + Ok((decode_query_session_info(payload)?, account_id)) } } @@ -52,7 +114,10 @@ mod tests { use chrono::Utc; use db3_crypto::db3_signer::Db3MultiSchemeSigner; use db3_crypto::{db3_keypair::DB3KeyPair, key_derive, signature_scheme::SignatureScheme}; - use db3_proto::db3_base_proto::{ChainId, ChainRole}; + use db3_proto::db3_base_proto::{BroadcastMeta, ChainId, ChainRole}; + use ethers::types::transaction::eip712::EIP712Domain; + use ethers::types::transaction::eip712::Types; + use std::collections::BTreeMap; fn get_a_static_keypair() -> DB3KeyPair { let seed: [u8; 32] = [0; 32]; @@ -61,71 +126,121 @@ mod tests { .unwrap(); keypair } + #[test] - fn test_verify_happy_path() -> Result<()> { - let client_query_session_info = QuerySessionInfo { - id: 1, - start_time: Utc::now().timestamp(), - query_count: 10, - }; - let client_query_session = CloseSessionPayload { - session_info: Some(client_query_session_info), - session_token: "DummyToken".to_string(), + fn test_verify_typed_data_happy_path() -> Result<()> { + let meta = BroadcastMeta { + //TODO get from network + nonce: 10, + //TODO use config + chain_id: ChainId::DevNet.into(), + //TODO use config + chain_role: ChainRole::StorageShardChain.into(), }; - let node_query_session_info = QuerySessionInfo { + // the client query session + let query_session_info = QuerySessionInfo { id: 1, start_time: Utc::now().timestamp(), query_count: 10, + meta: Some(meta), }; - // encode and sign client_query_session_info - let kp = get_a_static_keypair(); let mut buf = BytesMut::with_capacity(1024 * 8); - client_query_session.encode(&mut buf).unwrap(); - let buf = buf.freeze(); - let signer = Db3MultiSchemeSigner::new(kp); - let signature_raw = signer.sign(buf.as_ref())?; - let query_session = QuerySession { - nonce: 1, - chain_id: ChainId::MainNet.into(), - chain_role: ChainRole::StorageShardChain.into(), - node_query_session_info: Some(node_query_session_info), - client_query_session: buf.as_ref().to_vec().to_owned(), - client_signature: signature_raw.as_ref().to_vec().to_owned(), + query_session_info.encode(&mut buf).unwrap(); + let payload_session_info = Bytes(buf.freeze()); + let json = serde_json::json!({ + "EIP712Domain": [ + ], + "Message":[ + {"name":"payload", "type":"bytes"}, + {"name":"payloadType", "type":"string"} + ] + }); + let types: Types = serde_json::from_value(json).unwrap(); + assert_eq!(2, types.len()); + let mut message: BTreeMap = BTreeMap::new(); + message.insert( + "payload".to_string(), + serde_json::Value::from(format!("{payload_session_info}")), + ); + message.insert("payloadType".to_string(), serde_json::Value::from("0")); + let typed_data = TypedData { + domain: EIP712Domain { + name: None, + version: None, + chain_id: None, + verifying_contract: None, + salt: None, + }, + types, + primary_type: "Message".to_string(), + message, }; - let res = verify_query_session(&query_session); - assert!(res.is_ok()); + let kp = get_a_static_keypair(); + let signer = Db3MultiSchemeSigner::new(kp); + let sign_account = AccountId::new(signer.get_address().unwrap()); + let signature_raw = signer.sign_typed_data(&typed_data)?; + let typed_data_buf = serde_json::to_vec(&typed_data).unwrap(); + match verify_query_session( + typed_data_buf.as_ref(), + PayloadType::TypedDataPayload as i32, + signature_raw.as_ref(), + ) { + Ok((session_info, account_id)) => { + println!( + "{:?} \n {:?}", + serde_json::to_string(&sign_account.addr).unwrap(), + serde_json::to_string(&account_id.addr).unwrap() + ); + assert!(sign_account.addr == account_id.addr); + assert_eq!(query_session_info, session_info) + } + Err(e) => { + println!("{e}"); + assert!(false) + } + } Ok(()) } #[test] - fn test_verify_fail() -> Result<()> { - let client_query_session_info = QuerySessionInfo { - id: 1, - start_time: Utc::now().timestamp(), - query_count: 100, + fn test_verify_protobuf_happy_path() -> Result<()> { + let meta = BroadcastMeta { + //TODO get from network + nonce: 10, + //TODO use config + chain_id: ChainId::DevNet.into(), + //TODO use config + chain_role: ChainRole::StorageShardChain.into(), }; - let node_query_session_info = QuerySessionInfo { + + // the client query session + let query_session_info = QuerySessionInfo { id: 1, start_time: Utc::now().timestamp(), query_count: 10, + meta: Some(meta), }; - // encode and sign client_query_session_info - let kp = get_a_static_keypair(); + let mut buf = BytesMut::with_capacity(1024 * 8); - client_query_session_info.encode(&mut buf).unwrap(); - let buf = buf.freeze(); + query_session_info.encode(&mut buf).unwrap(); + let payload_session_info = buf.freeze(); + let kp = get_a_static_keypair(); let signer = Db3MultiSchemeSigner::new(kp); - let signature_raw = signer.sign(buf.as_ref())?; - let query_session = QuerySession { - nonce: 1, - chain_id: ChainId::MainNet.into(), - chain_role: ChainRole::StorageShardChain.into(), - node_query_session_info: Some(node_query_session_info), - client_query_session: buf.as_ref().to_vec().to_owned(), - client_signature: signature_raw.as_ref().to_vec().to_owned(), - }; - let res = verify_query_session(&query_session); - assert!(res.is_err()); + let sign_account = AccountId::new(signer.get_address().unwrap()); + let signature_raw = signer.sign(payload_session_info.as_ref())?; + match verify_query_session( + payload_session_info.as_ref(), + PayloadType::QuerySessionPayload as i32, + signature_raw.as_ref(), + ) { + Ok((session_info, account_id)) => { + assert!(sign_account.addr == account_id.addr); + assert_eq!(query_session_info, session_info) + } + Err(_) => { + assert!(false) + } + } Ok(()) } } diff --git a/src/session/src/session_manager.rs b/src/session/src/session_manager.rs index fd4c9712..86bd62d3 100644 --- a/src/session/src/session_manager.rs +++ b/src/session/src/session_manager.rs @@ -267,6 +267,7 @@ impl SessionManager { id, start_time, query_count: 0, + meta: None, }, status: SessionStatus::Running.into(), } diff --git a/src/storage/src/bill_store.rs b/src/storage/src/bill_store.rs index ebd5cc16..e0e186f3 100644 --- a/src/storage/src/bill_store.rs +++ b/src/storage/src/bill_store.rs @@ -85,7 +85,6 @@ mod tests { use super::*; use db3_base::get_a_static_address; use db3_proto::db3_bill_proto::BillType; - use merkdb::proofs::{Decoder, Node}; use std::boxed::Box; use tempdir::TempDir; #[test] diff --git a/src/storage/src/db3_document.rs b/src/storage/src/db3_document.rs index 6d196092..c9b55560 100644 --- a/src/storage/src/db3_document.rs +++ b/src/storage/src/db3_document.rs @@ -1,3 +1,21 @@ +// +// db3_document.rs +// Copyright (C) 2022 db3.network Author imotai +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + use bson::spec::BinarySubtype; use bson::Document; use bson::{Binary, Bson}; @@ -36,6 +54,8 @@ impl DB3Document { Err(err) => Err(DB3Error::DocumentDecodeError(format!("{:?}", err))), } } + + #[cfg(test)] pub fn create_from_json_str( document_json: &str, document_id: &DocumentId, @@ -47,6 +67,7 @@ impl DB3Document { Err(err) => Err(DB3Error::DocumentDecodeError(format!("{:?}", err))), } } + pub fn into_bytes(&self) -> Vec { bson_util::bson_document_into_bytes(&self.root) } @@ -61,6 +82,7 @@ impl DB3Document { .map_err(|e| DB3Error::DocumentDecodeError(format!("{:?}", e)))?; Ok(doc) } + fn add_document_id(&mut self, doc_id: &DocumentId) { self.root.insert( "_doc_id", @@ -71,6 +93,7 @@ impl DB3Document { ); } + #[cfg(test)] pub fn get_document_id(&self) -> std::result::Result { match self.root.get_binary_generic("_doc_id") { Ok(doc_id) => DocumentId::try_from_bytes(doc_id.as_slice()), @@ -159,8 +182,6 @@ impl TryFrom> for DB3Document { #[cfg(test)] mod tests { use super::*; - use bson::spec::ElementType; - use byteorder::ReadBytesExt; use db3_crypto::id::{AccountId, CollectionId, DocumentEntryId}; use db3_proto::db3_database_proto::{ index::index_field::{Order, ValueMode}, diff --git a/src/storage/src/db_key.rs b/src/storage/src/db_key.rs index 8067a309..5e7fd509 100644 --- a/src/storage/src/db_key.rs +++ b/src/storage/src/db_key.rs @@ -35,6 +35,7 @@ impl DbKey { /// /// decode the database key /// + #[allow(dead_code)] pub fn decode(data: &[u8]) -> Result { const MIN_KEY_TOTAL_LEN: usize = DBID_LENGTH + DATABASE.len(); if data.len() < MIN_KEY_TOTAL_LEN { @@ -50,11 +51,13 @@ impl DbKey { Ok(Self(id)) } + #[allow(dead_code)] #[inline] pub fn max() -> Self { DbKey(DbId::max_id()) } + #[allow(dead_code)] #[inline] pub fn min() -> Self { DbKey(DbId::min_id()) diff --git a/src/storage/src/db_store.rs b/src/storage/src/db_store.rs index 51ea9056..bb2995f8 100644 --- a/src/storage/src/db_store.rs +++ b/src/storage/src/db_store.rs @@ -26,10 +26,6 @@ use db3_crypto::{ use db3_error::{DB3Error, Result}; use db3_proto::db3_database_proto::structured_query::field_filter::Operator; use db3_proto::db3_database_proto::structured_query::filter::FilterType; -use db3_proto::db3_database_proto::structured_query::value::ValueType; -use db3_proto::db3_database_proto::structured_query::{ - FieldFilter, Filter, Limit, Projection, Value, -}; use db3_proto::db3_database_proto::{Collection, Database, Document, Index, StructuredQuery}; use db3_proto::db3_mutation_proto::{DatabaseAction, DatabaseMutation}; use db3_types::cost::DbStoreOp; @@ -37,7 +33,7 @@ use itertools::Itertools; use merkdb::proofs::{query::Query, Node, Op as ProofOp}; use merkdb::{tree::Tree, BatchEntry, Merk, Op}; use prost::Message; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::pin::Pin; use tracing::{debug, info, span, warn, Level}; @@ -124,8 +120,8 @@ impl DbStore { .iter() .map(move |x| { idx += 1; - collection_count += 1; - index_count += x.index.len() as u64; + collection_count = collection_count + 1; + index_count = index_count + x.index.len() as u64; ( x.collection_name.to_string(), Collection { @@ -164,6 +160,7 @@ impl DbStore { mutation_id: u16, ) -> Result<(BatchEntry, DbStoreOp)> { let dbid = DbId::try_from((sender, nonce))?; + info!("create a database with id {}", dbid.to_hex()); let (db, mut ops) = Self::new_database(&dbid, sender, tx, mutation, block_id, mutation_id); let (entry, data_in_bytes) = Self::encode_database(dbid, &db)?; ops.update_data_size(data_in_bytes as u64); @@ -490,6 +487,7 @@ impl DbStore { info!("skip update doc when masks fields are empty"); continue; } + info!("document id {}", document_mutation.ids[idx].as_str()); let document_id = DocumentId::try_from_base64(document_mutation.ids[idx].as_str())?; let old_document = if let Some(v) = db @@ -903,6 +901,9 @@ mod tests { use db3_base::bson_util; use db3_crypto::key_derive; use db3_crypto::signature_scheme::SignatureScheme; + use db3_proto::db3_database_proto::structured_query::{ + value::ValueType, FieldFilter, Filter, Limit, Projection, Value, + }; use db3_proto::db3_database_proto::{ index::index_field::{Order, ValueMode}, index::IndexField, @@ -911,7 +912,6 @@ mod tests { use db3_proto::db3_mutation_proto::CollectionMutation; use db3_proto::db3_mutation_proto::DocumentMask; use db3_proto::db3_mutation_proto::DocumentMutation; - use merkdb::rocksdb::merge_operator::delete_callback; use std::boxed::Box; use tempdir::TempDir; @@ -1759,8 +1759,8 @@ mod tests { r#"ApplyDocumentError("invalid update document mutation, ids and masks size different")"#, format!("{:?}", res.err().unwrap()) ); - mutation_id += 1; } + #[test] fn db_store_smoke_test() { let tmp_dir_path = TempDir::new("db_store_test").expect("create temp dir"); diff --git a/src/storage/src/event_store.rs b/src/storage/src/event_store.rs index c3a571fa..5ed73fa8 100644 --- a/src/storage/src/event_store.rs +++ b/src/storage/src/event_store.rs @@ -224,7 +224,7 @@ mod tests { Ok(a) => { assert_eq!(11, a.block_id); } - Err(e) => { + Err(_e) => { assert!(false); } } diff --git a/src/storage/src/faucet_key.rs b/src/storage/src/faucet_key.rs index b4f7fa62..f28b71b6 100644 --- a/src/storage/src/faucet_key.rs +++ b/src/storage/src/faucet_key.rs @@ -28,6 +28,7 @@ pub fn build_faucet_key(addr: &[u8], ts: u32) -> Result> { Ok(buf) } +#[allow(dead_code)] pub fn decode_faucet_key(data: &[u8]) -> Result<(Vec, u32)> { if data.len() != 24 { return Err(DB3Error::KeyCodecError("bad data length".to_string())); diff --git a/src/storage/src/faucet_store.rs b/src/storage/src/faucet_store.rs index 09526879..27c7d46f 100644 --- a/src/storage/src/faucet_store.rs +++ b/src/storage/src/faucet_store.rs @@ -21,7 +21,7 @@ use db3_error::{DB3Error, Result}; use db3_proto::db3_faucet_proto::FaucetRecord; use prost::Message; use redb::ReadableTable; -use redb::{ReadTransaction, TableDefinition, WriteTransaction}; +use redb::{TableDefinition, WriteTransaction}; const FAUCET_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("FAUCET_TABLE"); diff --git a/src/types/src/bill_key.rs b/src/types/src/bill_key.rs index 0601be0e..c99d5747 100644 --- a/src/types/src/bill_key.rs +++ b/src/types/src/bill_key.rs @@ -15,13 +15,11 @@ // limitations under the License. // -use db3_crypto::id::{BillId, BILL_ID_LENGTH}; +use db3_crypto::id::BillId; use db3_error::Result; const BLOCK_BILL: &str = "/bl/"; pub struct BillKey<'a>(pub &'a BillId); -const BILL_KEY_SIZE: usize = BLOCK_BILL.len() + BILL_ID_LENGTH; - impl<'a> BillKey<'a> { pub fn encode(&self) -> Result> { let mut encoded_key = BLOCK_BILL.as_bytes().to_vec(); diff --git a/src/types/src/cost.rs b/src/types/src/cost.rs index 90cccbdc..60b0bd43 100644 --- a/src/types/src/cost.rs +++ b/src/types/src/cost.rs @@ -102,7 +102,6 @@ pub fn estimate_query_session_gas(query_session_info: &QuerySessionInfo) -> u64 mod tests { use super::*; use chrono::Utc; - use db3_proto::db3_base_proto::{ChainId, ChainRole}; use db3_proto::db3_session_proto::QuerySessionInfo; #[test] fn it_estimate_gas_doc_ops() { @@ -136,6 +135,7 @@ mod tests { id: 1, start_time: Utc::now().timestamp(), query_count: 10, + meta: None, }; let gas_fee = estimate_query_session_gas(&node_query_session_info); let target_fee = 1000; diff --git a/tools/start_localnet.sh b/tools/start_localnet.sh index 0b82425a..5ae3e3f7 100644 --- a/tools/start_localnet.sh +++ b/tools/start_localnet.sh @@ -38,6 +38,7 @@ then fi echo "start db3 node..." ./tendermint init > tm.log 2>&1 +export RUST_BACKTRACE=1 ../target/${BUILD_MODE}/db3 start >db3.log 2>&1 & sleep 1 echo "start tendermint node..."