feat: add sqlite as an option to store signing keys (#723)
* fix: updates for light client read

* fix: only increment nonce by 1 on force_change_authorities

* setup sqlx interface

* setup path correctly

* cleanup logs

---------

Co-authored-by: drewstone <drewstone329@gmail.com>
1xstj and drewstone authored Dec 18, 2023
1 parent 125d553 commit 41d0447
Showing 13 changed files with 683 additions and 32 deletions.
447 changes: 433 additions & 14 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -214,6 +214,7 @@
bytes = { version = "1.4.0", default-features = false }
bincode2 = { version = "2.0.1", default-features = false }
structopt = { version = "0.3.26", default-features = false }
hash-db = { version = "0.16.0", default-features = false }
sqlx = { version = "0.7.1", default-features = false, features = ["macros"] }

[profile.release]
panic = 'unwind'
Binary file added alice
Binary file not shown.
3 changes: 2 additions & 1 deletion dkg-gadget/Cargo.toml
@@ -58,7 +58,8 @@
itertools = { workspace = true }
sync_wrapper = { workspace = true }
async-stream = { workspace = true }
lazy_static = { workspace = true }

log = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-native-tls", "sqlite", "json"] }
hash-db = { workspace = true, optional = true }
webb-proposals = { workspace = true }

2 changes: 1 addition & 1 deletion dkg-gadget/src/async_protocols/types.rs
@@ -4,6 +4,7 @@
use curv::elliptic::curves::Secp256k1;
use dkg_runtime_primitives::{SessionId, StoredUnsignedProposalBatch};
use multi_party_ecdsa::protocols::multi_party_ecdsa::gg_2020::state_machine::keygen::LocalKey;
use serde::{Deserialize, Serialize};
use sqlx::{FromRow, Sqlite};
use std::fmt::{Debug, Formatter};
use wsts::{
common::{PolyCommitment, Signature},
@@ -17,7 +18,6 @@
pub enum LocalKeyType {
ECDSA(LocalKey<Secp256k1>),
FROST(Vec<PolyCommitment>, PartyState),
}

impl Debug for LocalKeyType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
2 changes: 2 additions & 0 deletions dkg-gadget/src/db/mod.rs
@@ -2,10 +2,12 @@
use dkg_primitives::{types::DKGError, SessionId};

mod mem;
mod offchain_storage;
mod sql_storage;

use crate::async_protocols::types::LocalKeyType;
pub use mem::DKGInMemoryDb;
pub use offchain_storage::DKGOffchainStorageDb;
pub use sql_storage::{BackendConfig, SqlBackend, SqliteBackendConfig};

/// A Database backend, specifically for the DKG to store and load important state
///
199 changes: 199 additions & 0 deletions dkg-gadget/src/db/sql_storage.rs
@@ -0,0 +1,199 @@
use super::*;
use crate::{async_protocols::types::LocalKeyType, debug_logger::DebugLogger, DKGKeystore};
use dkg_primitives::{
types::DKGError,
utils::{decrypt_data, encrypt_data},
SessionId,
};
use dkg_runtime_primitives::offchain::crypto::{Pair as AppPair, Public};
use sc_client_api::Backend;
use sc_keystore::LocalKeystore;
use sp_core::{offchain::OffchainStorage, Pair};
use sp_runtime::{
generic::BlockId,
traits::{BlakeTwo256, Block as BlockT, Header as HeaderT, UniqueSaturatedInto, Zero},
};
use sqlx::{
query::Query,
sqlite::{
SqliteArguments, SqliteConnectOptions, SqlitePool, SqlitePoolOptions, SqliteQueryResult,
},
ConnectOptions, Error, Execute, QueryBuilder, Row, Sqlite,
};
use std::{cmp::Ordering, collections::HashSet, num::NonZeroU32, str::FromStr, sync::Arc};

/// Represents the Sqlite connection options that are
/// used to establish a database connection.
#[derive(Debug)]
pub struct SqliteBackendConfig<'a> {
pub path: &'a str,
pub create_if_missing: bool,
pub thread_count: u32,
pub cache_size: u64,
}

/// Represents the backend configurations.
#[derive(Debug)]
pub enum BackendConfig<'a> {
Sqlite(SqliteBackendConfig<'a>),
}

#[derive(Clone)]
pub struct SqlBackend {
/// The Sqlite connection.
pool: SqlitePool,

/// The number of allowed operations for the Sqlite filter call.
/// A value of `0` disables the timeout.
num_ops_timeout: i32,
}

impl SqlBackend {
pub fn new(
config: BackendConfig<'_>,
pool_size: u32,
num_ops_timeout: Option<NonZeroU32>,
) -> Result<Self, Error> {
futures::executor::block_on(async {
Self::new_inner(config, pool_size, num_ops_timeout).await
})
}
/// Creates a new instance of the SQL backend.
pub async fn new_inner(
config: BackendConfig<'_>,
pool_size: u32,
num_ops_timeout: Option<NonZeroU32>,
) -> Result<Self, Error> {
let any_pool = SqlitePoolOptions::new()
.max_connections(pool_size)
.connect_lazy_with(Self::connect_options(&config)?.disable_statement_logging());
let _ = Self::create_database_if_not_exists(&any_pool).await?;
let _ = Self::create_indexes_if_not_exist(&any_pool).await?;
Ok(Self {
pool: any_pool,
num_ops_timeout: num_ops_timeout
.map(|n| n.get())
.unwrap_or(0)
.try_into()
.unwrap_or(i32::MAX),
})
}

fn connect_options(config: &BackendConfig) -> Result<SqliteConnectOptions, Error> {
match config {
BackendConfig::Sqlite(config) => {
log::info!(target: "dkg-gadget", "📑 Connection configuration: {config:?}");
let config = sqlx::sqlite::SqliteConnectOptions::from_str(config.path)?
.create_if_missing(config.create_if_missing)
// https://www.sqlite.org/pragma.html#pragma_busy_timeout
.busy_timeout(std::time::Duration::from_secs(8))
// 200MB, https://www.sqlite.org/pragma.html#pragma_cache_size
.pragma("cache_size", format!("-{}", config.cache_size))
// https://www.sqlite.org/pragma.html#pragma_analysis_limit
.pragma("analysis_limit", "1000")
// https://www.sqlite.org/pragma.html#pragma_threads
.pragma("threads", config.thread_count.to_string())
// https://www.sqlite.org/pragma.html#pragma_threads
.pragma("temp_store", "memory")
// https://www.sqlite.org/wal.html
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
// https://www.sqlite.org/pragma.html#pragma_synchronous
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal);
Ok(config)
},
}
}

/// Get the underlying Sqlite pool.
pub fn pool(&self) -> &SqlitePool {
&self.pool
}

/// Create the Sqlite database if it does not already exist.
async fn create_database_if_not_exists(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
sqlx::query(
"BEGIN;
CREATE TABLE IF NOT EXISTS dkg_keys (
id INTEGER PRIMARY KEY,
session_id INTEGER NOT NULL,
local_key BLOB NOT NULL
);
COMMIT;",
)
.execute(pool)
.await
}

/// Create the Sqlite database indices if they do not already exist.
async fn create_indexes_if_not_exist(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
sqlx::query(
"BEGIN;
CREATE INDEX IF NOT EXISTS session_id_index ON dkg_keys (
session_id
);
COMMIT;",
)
.execute(pool)
.await
}

async fn get_local_key(&self, session_id: SessionId) -> Result<Option<LocalKeyType>, DKGError> {
log::info!("{}", format!("SQL Storage : Fetching local keys for session {session_id:?}"));
let session_id: i64 = session_id as i64;
match sqlx::query("SELECT local_key FROM dkg_keys WHERE session_id = ?")
.bind(session_id)
.fetch_optional(self.pool())
.await
{
Ok(result) => {
if let Some(row) = result {
let local_key_json: String = row.get(0);
let local_key: LocalKeyType = serde_json::from_str(&local_key_json).unwrap();
return Ok(Some(local_key))
}
return Err(DKGError::LocalKeyNotFound)
},
Err(err) => {
log::debug!(target: "dkg-gadget", "Failed to retrieve key for session_id: {err:?}");
return Err(DKGError::LocalKeyNotFound)
},
}
}

async fn store_local_key(
&self,
session_id: SessionId,
local_key: LocalKeyType,
) -> Result<(), DKGError> {
log::info!(
"{}",
format!(
"SQL Storage : Store local keys for session {session_id:?}, Key : {local_key:?}"
)
);
let session_id: i64 = session_id as i64;
let mut tx = self.pool().begin().await.map_err(|_| DKGError::StoringLocalKeyFailed)?;
let local_key = serde_json::to_string(&local_key).unwrap();
sqlx::query("INSERT INTO dkg_keys(session_id, local_key) VALUES (?)")
.bind(session_id)
.bind(local_key)
.execute(&mut *tx)
.await
.map_err(|_| DKGError::StoringLocalKeyFailed)?;
tx.commit().await.map_err(|_| DKGError::StoringLocalKeyFailed)
}
}

impl super::DKGDbBackend for SqlBackend {
fn get_local_key(&self, session_id: SessionId) -> Result<Option<LocalKeyType>, DKGError> {
futures::executor::block_on(async { self.get_local_key(session_id).await })
}

fn store_local_key(
&self,
session_id: SessionId,
local_key: LocalKeyType,
) -> Result<(), DKGError> {
futures::executor::block_on(async { self.store_local_key(session_id, local_key).await })
}
}
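
For orientation, a minimal usage sketch (not part of this commit) of driving the new backend through its blocking trait wrappers. The import path, the re-export of the DKGDbBackend trait, the database location, and the session id are assumptions made for illustration only.

// Sketch: construct the SQLite-backed key store and round-trip a key.
use dkg_gadget::db::{BackendConfig, DKGDbBackend, SqlBackend, SqliteBackendConfig};

fn example() -> Result<(), Box<dyn std::error::Error>> {
	let backend = SqlBackend::new(
		BackendConfig::Sqlite(SqliteBackendConfig {
			path: "sqlite:///tmp/dkg/keys.db3", // hypothetical database file
			create_if_missing: true,
			thread_count: 4,
			cache_size: 20480,
		}),
		1,    // sqlx pool size
		None, // no per-call operation limit
	)?;

	// A real LocalKeyType comes out of a completed keygen round; storing and
	// fetching it for, say, session 42 would look like this:
	// backend.store_local_key(42, local_key)?;
	// let restored = backend.get_local_key(42)?;
	let _ = backend;
	Ok(())
}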
40 changes: 30 additions & 10 deletions dkg-gadget/src/lib.rs
@@ -11,9 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{marker::PhantomData, sync::Arc};

use debug_logger::DebugLogger;
use dkg_runtime_primitives::{crypto::AuthorityId, DKGApi, MaxAuthorities, MaxProposalLength};
use parking_lot::RwLock;
@@ -26,6 +23,11 @@
use sp_api::{NumberFor, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::Block;
use std::{
marker::PhantomData,
path::{Path, PathBuf},
sync::Arc,
};

mod error;
/// Stores keypairs for DKG
@@ -107,6 +109,8 @@
where
pub prometheus_registry: Option<Registry>,
/// For logging
pub debug_logger: DebugLogger,
/// database path
pub db_path: PathBuf,
/// Phantom block type
pub _block: PhantomData<B>,
}
@@ -134,6 +138,7 @@
where
local_keystore,
_block,
debug_logger,
db_path,
} = dkg_params;

let dkg_keystore: DKGKeystore = DKGKeystore::new(key_store, debug_logger.clone());
@@ -180,13 +185,28 @@

// In memory backend, not used for now
// let db_backend = Arc::new(db::DKGInMemoryDb::new());
let offchain_db_backend = db::DKGOffchainStorageDb::new(
backend.clone(),
dkg_keystore.clone(),
local_keystore.clone(),
debug_logger.clone(),
);
let db_backend = Arc::new(offchain_db_backend);
// let offchain_db_backend = db::DKGOffchainStorageDb::new(
// backend.clone(),
// dkg_keystore.clone(),
// local_keystore.clone(),
// debug_logger.clone(),
// );

let path = Path::new("sqlite:///").join(db_path).join("frontier.db3");

let sql_backend = db::SqlBackend::new(
db::BackendConfig::Sqlite(db::SqliteBackendConfig {
path: path.to_str().unwrap(),
create_if_missing: true,
cache_size: 20480,
thread_count: 4,
}),
1,
None,
)
.unwrap();

let db_backend = Arc::new(sql_backend);
let worker_params = worker::WorkerParams {
latest_header,
client,
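As a side note, a small sketch of the connection string that the path join above produces, assuming the relative --db-path value used by the local testnet script further down; SqliteConnectOptions::from_str later parses this sqlite:// URL in connect_options.

use std::path::Path;

fn main() {
	// Hypothetical --db-path value, matching the local testnet script below.
	let db_path = Path::new("./tmp/alice/db");
	let path = Path::new("sqlite:///").join(db_path).join("frontier.db3");
	// Prints "sqlite:///./tmp/alice/db/frontier.db3" on Unix-style paths.
	println!("{}", path.display());
}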
4 changes: 4 additions & 0 deletions dkg-primitives/src/types.rs
@@ -216,6 +216,8 @@
pub enum DKGError {
InvalidSigningSet,
InputOutOfBounds,
CannotSign,
LocalKeyNotFound,
StoringLocalKeyFailed,
}

impl fmt::Display for DKGError {
@@ -242,6 +244,8 @@
impl fmt::Display for DKGError {
InvalidSigningSet => "Invalid Signing Set!".to_string(),
InputOutOfBounds => "Input value out of bounds set by runtime".to_string(),
CannotSign => "Could not sign public key".to_string(),
LocalKeyNotFound => "Local key not found!".to_string(),
StoringLocalKeyFailed => "Local key could not be saved!".to_string(),
};
write!(f, "DKGError of type {label}")
}
10 changes: 5 additions & 5 deletions scripts/run-local-testnet.sh
@@ -46,28 +46,28 @@
echo "*** Start Webb DKG Node ***"
# Alice
./target/release/dkg-standalone-node --tmp --chain local --validator -lerror --alice --output-path=./tmp/alice/output.log \
--rpc-cors all --rpc-external --rpc-methods=unsafe \
--port 30333 \
--port 30333 --db-path=./tmp/alice/db \
--rpc-port 9944 --node-key 0000000000000000000000000000000000000000000000000000000000000001 &
# Bob
./target/release/dkg-standalone-node --tmp --chain local --validator -lerror --bob --output-path=./tmp/bob/output.log \
--rpc-cors all --rpc-external --rpc-methods=unsafe \
--port 30305 \
--port 30305 --db-path=./tmp/bob/db \
--rpc-port 9945 --bootnodes /ip4/127.0.0.1/tcp/30333/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp &
# Charlie
./target/release/dkg-standalone-node --tmp --chain local --validator -lerror --charlie --output-path=./tmp/charlie/output.log \
--rpc-cors all --rpc-external --rpc-methods=unsafe \
--port 30308 \
--port 30308 --db-path=./tmp/charlie/db \
--rpc-port 9946 --bootnodes /ip4/127.0.0.1/tcp/30333/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp &
# Dave
./target/release/dkg-standalone-node --tmp --chain local --validator -lerror --dave --output-path=./tmp/dave/output.log \
--rpc-cors all --rpc-external --rpc-methods=unsafe \
--port 30309 \
--port 30309 --db-path=./tmp/dave/db \
--rpc-port 9947 --bootnodes /ip4/127.0.0.1/tcp/30333/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp &
# Eve
./target/release/dkg-standalone-node --tmp --chain local --validator -linfo --eve --output-path=./tmp/eve/output.log \
--rpc-cors all --rpc-external \
--rpc-port 9948 \
--port 30310 \
--port 30310 --db-path=./tmp/eve/db \
-ldkg=debug \
-ldkg_gadget::worker=debug \
-lruntime::dkg_metadata=debug \
2 changes: 2 additions & 0 deletions standalone/node/src/cli.rs
@@ -21,6 +21,8 @@
pub struct Cli {
pub subcommand: Option<Subcommand>,
#[clap(flatten)]
pub run: RunCmd,
#[arg(long, short = 'd')]
pub db_path: std::path::PathBuf,
#[arg(long, short = 'o')]
pub output_path: Option<std::path::PathBuf>,
#[clap(flatten)]
1 change: 1 addition & 0 deletions standalone/node/src/command.rs
@@ -208,6 +208,7 @@
pub fn run() -> sc_cli::Result<()> {
runner.run_node_until_exit(|config| async move {
service::new_full(service::RunFullParams {
config,
db_path: cli.db_path,
debug_output: cli.output_path,
relayer_cmd: cli.relayer_cmd,
})