Skip to content

Commit

Permalink
Accountsdb plugin write ordering (solana-labs#20948)
Browse files Browse the repository at this point in the history
Use the write_version in the Accounts's meta data so that account write with lower write_version would not overwrite the higher ones.
  • Loading branch information
lijunwangs authored Oct 25, 2021
1 parent cf0fd5b commit bbe3ce3
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct ReplicaAccountInfo<'a> {
pub executable: bool,
pub rent_epoch: u64,
pub data: &'a [u8],
pub write_version: u64,
}

pub enum ReplicaAccountInfoVersions<'a> {
Expand Down
13 changes: 7 additions & 6 deletions accountsdb-plugin-manager/src/accounts_update_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ use {
solana_metrics::*,
solana_runtime::{
accounts_update_notifier_interface::AccountsUpdateNotifierInterface,
append_vec::StoredAccountMeta,
append_vec::{StoredAccountMeta, StoredMeta},
},
solana_sdk::{
account::{AccountSharedData, ReadableAccount},
clock::Slot,
pubkey::Pubkey,
},
std::sync::{Arc, RwLock},
};
Expand All @@ -24,8 +23,8 @@ pub(crate) struct AccountsUpdateNotifierImpl {
}

impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl {
fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) {
if let Some(account_info) = self.accountinfo_from_shared_account_data(pubkey, account) {
fn notify_account_update(&self, slot: Slot, meta: &StoredMeta, account: &AccountSharedData) {
if let Some(account_info) = self.accountinfo_from_shared_account_data(meta, account) {
self.notify_plugins_of_account_update(account_info, slot, false);
}
}
Expand Down Expand Up @@ -108,16 +107,17 @@ impl AccountsUpdateNotifierImpl {

fn accountinfo_from_shared_account_data<'a>(
&self,
pubkey: &'a Pubkey,
meta: &'a StoredMeta,
account: &'a AccountSharedData,
) -> Option<ReplicaAccountInfo<'a>> {
Some(ReplicaAccountInfo {
pubkey: pubkey.as_ref(),
pubkey: meta.pubkey.as_ref(),
lamports: account.lamports(),
owner: account.owner().as_ref(),
executable: account.executable(),
rent_epoch: account.rent_epoch(),
data: account.data(),
write_version: meta.write_version,
})
}

Expand All @@ -132,6 +132,7 @@ impl AccountsUpdateNotifierImpl {
executable: stored_account_meta.account_meta.executable,
rent_epoch: stored_account_meta.account_meta.rent_epoch,
data: stored_account_meta.data,
write_version: stored_account_meta.meta.write_version,
})
}

Expand Down
6 changes: 4 additions & 2 deletions accountsdb-plugin-postgres/scripts/create_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ CREATE TABLE account (
executable BOOL NOT NULL,
rent_epoch BIGINT NOT NULL,
data BYTEA,
write_version BIGINT NOT NULL,
updated_on TIMESTAMP NOT NULL
);

Expand All @@ -35,14 +36,15 @@ CREATE TABLE account_audit (
executable BOOL NOT NULL,
rent_epoch BIGINT NOT NULL,
data BYTEA,
write_version BIGINT NOT NULL,
updated_on TIMESTAMP NOT NULL
);

CREATE FUNCTION audit_account_update() RETURNS trigger AS $audit_account_update$
BEGIN
INSERT INTO account_audit (pubkey, owner, lamports, slot, executable, rent_epoch, data, updated_on)
INSERT INTO account_audit (pubkey, owner, lamports, slot, executable, rent_epoch, data, write_version, updated_on)
VALUES (OLD.pubkey, OLD.owner, OLD.lamports, OLD.slot,
OLD.executable, OLD.rent_epoch, OLD.data, OLD.updated_on);
OLD.executable, OLD.rent_epoch, OLD.data, OLD.write_version, OLD.updated_on);
RETURN NEW;
END;

Expand Down
30 changes: 23 additions & 7 deletions accountsdb-plugin-postgres/src/postgres_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const MAX_ASYNC_REQUESTS: usize = 40960;
const DEFAULT_POSTGRES_PORT: u16 = 5432;
const DEFAULT_THREADS_COUNT: usize = 100;
const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10;
const ACCOUNT_COLUMN_COUNT: usize = 8;
const ACCOUNT_COLUMN_COUNT: usize = 9;

struct PostgresSqlClientWrapper {
client: Client,
Expand Down Expand Up @@ -63,6 +63,7 @@ pub struct DbAccountInfo {
pub rent_epoch: i64,
pub data: Vec<u8>,
pub slot: i64,
pub write_version: i64,
}

impl DbAccountInfo {
Expand All @@ -76,6 +77,7 @@ impl DbAccountInfo {
rent_epoch: account.rent_epoch() as i64,
data,
slot: slot as i64,
write_version: account.write_version(),
}
}
}
Expand All @@ -87,6 +89,7 @@ pub trait ReadableAccountInfo: Sized {
fn executable(&self) -> bool;
fn rent_epoch(&self) -> i64;
fn data(&self) -> &[u8];
fn write_version(&self) -> i64;
}

impl ReadableAccountInfo for DbAccountInfo {
Expand All @@ -113,6 +116,10 @@ impl ReadableAccountInfo for DbAccountInfo {
fn data(&self) -> &[u8] {
&self.data
}

fn write_version(&self) -> i64 {
self.write_version
}
}

impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> {
Expand All @@ -139,6 +146,10 @@ impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> {
fn data(&self) -> &[u8] {
self.data
}

fn write_version(&self) -> i64 {
self.write_version as i64
}
}

pub trait PostgresClient {
Expand Down Expand Up @@ -191,11 +202,11 @@ impl SimplePostgresClient {
let batch_size = config
.batch_size
.unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) VALUES");
let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) VALUES");
for j in 0..batch_size {
let row = j * ACCOUNT_COLUMN_COUNT;
let val_str = format!(
"(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
"(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
row + 1,
row + 2,
row + 3,
Expand All @@ -204,6 +215,7 @@ impl SimplePostgresClient {
row + 6,
row + 7,
row + 8,
row + 9,
);

if j == 0 {
Expand All @@ -214,7 +226,8 @@ impl SimplePostgresClient {
}

let handle_conflict = "ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
data=excluded.data, updated_on=excluded.updated_on WHERE acct.slot <= excluded.slot";
data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\
acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";

stmt = format!("{} {}", stmt, handle_conflict);

Expand All @@ -238,10 +251,11 @@ impl SimplePostgresClient {
client: &mut Client,
config: &AccountsDbPluginPostgresConfig,
) -> Result<Statement, AccountsDbPluginError> {
let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
data=excluded.data, updated_on=excluded.updated_on WHERE acct.slot <= excluded.slot";
data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\
acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";

let stmt = client.prepare(stmt);

Expand Down Expand Up @@ -277,6 +291,7 @@ impl SimplePostgresClient {
&account.executable(),
&rent_epoch,
&account.data(),
&account.write_version(),
&updated_on,
],
);
Expand Down Expand Up @@ -324,6 +339,7 @@ impl SimplePostgresClient {
values.push(&account.executable);
values.push(&account.rent_epoch);
values.push(&account.data);
values.push(&account.write_version);
values.push(&updated_on);
}
measure.stop();
Expand Down
23 changes: 21 additions & 2 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,18 @@ impl AccountsDb {
)
}

pub fn new_for_tests_with_caching(paths: Vec<PathBuf>, cluster_type: &ClusterType) -> Self {
AccountsDb::new_with_config(
paths,
cluster_type,
AccountSecondaryIndexes::default(),
true,
AccountShrinkThreshold::default(),
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None,
)
}

pub fn new_with_config(
paths: Vec<PathBuf>,
cluster_type: &ClusterType,
Expand Down Expand Up @@ -1680,6 +1692,13 @@ impl AccountsDb {
}
}

pub fn new_single_for_tests_with_caching() -> Self {
AccountsDb {
min_num_stores: 0,
..AccountsDb::new_for_tests_with_caching(Vec::new(), &ClusterType::Development)
}
}

fn new_storage_entry(&self, slot: Slot, path: &Path, size: u64) -> AccountStorageEntry {
AccountStorageEntry::new(
path,
Expand Down Expand Up @@ -4833,6 +4852,8 @@ impl AccountsDb {
lamports: account.lamports(),
};

self.notify_account_at_accounts_update(slot, meta, &account);

let cached_account = self.accounts_cache.store(slot, &meta.pubkey, account, hash);
// hash this account in the bg
match &self.sender_bg_hasher {
Expand Down Expand Up @@ -6227,8 +6248,6 @@ impl AccountsDb {

pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) {
self.store(slot, accounts, self.caching_enabled);

self.notify_account_at_accounts_update(slot, accounts);
}

/// Store the account update.
Expand Down
Loading

0 comments on commit bbe3ce3

Please sign in to comment.