Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add start_time to ProcessUID on StorageLock #8753

Merged
merged 11 commits into from
Jun 12, 2024
1 change: 1 addition & 0 deletions crates/storage/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ mdbx = ["reth-libmdbx"]
bench = []
arbitrary = ["reth-primitives/arbitrary", "reth-db-api/arbitrary"]
optimism = []
disable_lock = []
joshieDo marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this feature is enabled, do we still need the sysinfo dep?

Opting out is a lot harder than opting in, but in this case we expect that this is only disabled for testing, so IMO this is fine.


[[bench]]
name = "hash_keys"
Expand Down
98 changes: 79 additions & 19 deletions crates/storage/db/src/lockfile.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
//! Storage lock utils.

#![cfg_attr(feature = "disable_lock", allow(dead_code))]

use reth_storage_errors::lockfile::StorageLockError;
use reth_tracing::tracing::error;
use std::{
path::{Path, PathBuf},
process,
sync::Arc,
};
use sysinfo::System;
use sysinfo::{ProcessRefreshKind, RefreshKind, System};

/// File lock name.
const LOCKFILE_NAME: &str = "lock";
Expand All @@ -28,15 +30,24 @@ impl StorageLock {
/// Note: In-process exclusivity is not in scope. If called from the same process (or another
/// with the same PID), it will succeed.
pub fn try_acquire(path: &Path) -> Result<Self, StorageLockError> {
let path = path.join(LOCKFILE_NAME);
let file_path = path.join(LOCKFILE_NAME);

if let Some(pid) = parse_lock_file_pid(&path)? {
if pid != (process::id() as usize) && System::new_all().process(pid.into()).is_some() {
return Err(StorageLockError::Taken(pid))
}
#[cfg(feature = "disable_lock")]
{
// Too expensive for ef-tests to write/read lock to/from disk.
Ok(Self(Arc::new(StorageLockInner { file_path })))
}

Ok(Self(Arc::new(StorageLockInner::new(path)?)))
#[cfg(not(feature = "disable_lock"))]
{
if let Some(process_lock) = ProcessUID::parse(&file_path)? {
if process_lock.pid != (process::id() as usize) && process_lock.is_active() {
return Err(StorageLockError::Taken(process_lock.pid))
joshieDo marked this conversation as resolved.
Show resolved Hide resolved
}
}

Ok(Self(Arc::new(StorageLockInner::new(file_path)?)))
}
}
}

Expand Down Expand Up @@ -66,19 +77,61 @@ impl StorageLockInner {
reth_fs_util::create_dir_all(parent)?;
}

reth_fs_util::write(&file_path, format!("{}", process::id()))?;
// Write this process unique identifier (pid & start_time) to file
ProcessUID::own().write(&file_path)?;

Ok(Self { file_path })
}
}

/// Parses the PID from the lock file if it exists.
fn parse_lock_file_pid(path: &Path) -> Result<Option<usize>, StorageLockError> {
if path.exists() {
let contents = reth_fs_util::read_to_string(path)?;
return Ok(contents.trim().parse().ok())
/// Identifies a process by the pair of its PID and its start time.
///
/// Two values refer to the same process only when both fields match, so a PID alone
/// is not treated as sufficient identification.
#[derive(Debug)]
struct ProcessUID {
/// OS process identifier.
pid: usize,
/// Process start time, as reported by `sysinfo` (NOTE(review): unit/epoch not visible
/// here — confirm against the `sysinfo` documentation).
start_time: u64,
}

impl ProcessUID {
/// Builds a [`Self`] for the given `pid` by looking the process up through `sysinfo`.
///
/// The lookup uses a [`System`] created with a [`RefreshKind`] that requests process
/// information. Returns `None` when the lookup yields no process for `pid`; otherwise
/// the returned value carries `pid` and the start time reported for that process.
fn new(pid: usize) -> Option<Self> {
    let refresh = RefreshKind::new().with_processes(ProcessRefreshKind::new());
    let system = System::new_with_specifics(refresh);
    let found = system.process(pid.into())?;
    Some(Self { pid, start_time: found.start_time() })
}

/// Builds a [`Self`] describing the currently running process.
///
/// # Panics
///
/// Panics with the message `own process` if [`Self::new`] returns `None` for the
/// current PID.
fn own() -> Self {
    let current_pid = process::id() as usize;
    Self::new(current_pid).expect("own process")
}

/// Reads a [`Self`] back from the file at `path`.
///
/// The first line is parsed as the `pid` and the second line as the `start_time`, each
/// after trimming surrounding whitespace.
///
/// Returns `Ok(None)` when the file does not exist, cannot be read, or does not contain
/// two parseable leading lines. Read and parse failures are deliberately not surfaced
/// as errors here.
fn parse(path: &Path) -> Result<Option<Self>, StorageLockError> {
    if !path.exists() {
        return Ok(None);
    }

    // An unreadable file is treated the same as a missing one.
    let Ok(contents) = reth_fs_util::read_to_string(path) else {
        return Ok(None);
    };

    let mut lines = contents.lines();
    let pid = lines.next().map(str::trim).map(str::parse);
    let start_time = lines.next().map(str::trim).map(str::parse);

    Ok(match (pid, start_time) {
        (Some(Ok(pid)), Some(Ok(start_time))) => Some(Self { pid, start_time }),
        _ => None,
    })
}

/// Reports whether a process matching both this `pid` and this `start_time` exists.
///
/// Returns `false` when no process is found for `pid`, or when one is found but its
/// start time differs from the stored `start_time`.
fn is_active(&self) -> bool {
    let refresh = RefreshKind::new().with_processes(ProcessRefreshKind::new());
    let system = System::new_with_specifics(refresh);
    match system.process(self.pid.into()) {
        Some(running) => running.start_time() == self.start_time,
        None => false,
    }
}

/// Persists this identifier to the file at `path`.
///
/// The content is the `pid`, a newline, then the `start_time` — the two-line layout
/// that [`Self::parse`] reads back.
///
/// # Errors
///
/// Propagates any error from `reth_fs_util::write`, converted into
/// [`StorageLockError`].
fn write(&self, path: &Path) -> Result<(), StorageLockError> {
    let contents = format!("{}\n{}", self.pid, self.start_time);
    reth_fs_util::write(path, contents)?;
    Ok(())
}
Ok(None)
}

#[cfg(test)]
Expand All @@ -101,12 +154,19 @@ mod tests {
while system.process(fake_pid.into()).is_some() {
fake_pid += 1;
}
reth_fs_util::write(&lock_file, format!("{}", fake_pid)).unwrap();
assert_eq!(Ok(lock), StorageLock::try_acquire(temp_dir.path()));
ProcessUID { pid: fake_pid, start_time: u64::MAX }.write(&lock_file).unwrap();
assert_eq!(Ok(lock.clone()), StorageLock::try_acquire(temp_dir.path()));

let mut pid_1 = ProcessUID::new(1).unwrap();

// A lock of a different but existing PID cannot be acquired.
reth_fs_util::write(&lock_file, "1").unwrap();
// If a parsed `ProcessUID` exists, the lock can NOT be acquired.
pid_1.write(&lock_file).unwrap();
assert_eq!(Err(StorageLockError::Taken(1)), StorageLock::try_acquire(temp_dir.path()));

// A lock of a different but existing PID can be acquired ONLY IF the start_time differs.
pid_1.start_time += 1;
pid_1.write(&lock_file).unwrap();
assert_eq!(Ok(lock), StorageLock::try_acquire(temp_dir.path()));
}

#[test]
Expand All @@ -116,8 +176,8 @@ mod tests {

let lock = StorageLock::try_acquire(temp_dir.path()).unwrap();

assert!(lock_file.exists());
drop(lock);

assert!(!lock_file.exists());
}
}
2 changes: 1 addition & 1 deletion testing/ef-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ asm-keccak = ["reth-primitives/asm-keccak"]

[dependencies]
reth-primitives.workspace = true
reth-db = { workspace = true, features = ["mdbx", "test-utils"] }
reth-db = { workspace = true, features = ["mdbx", "test-utils", "disable_lock"] }
reth-db-api.workspace = true
reth-provider = { workspace = true, features = ["test-utils"] }
reth-stages.workspace = true
Expand Down
Loading