unify interface for metadata and snapshot persistent storage (#7452)
GitOrigin-RevId: deef6aef58e3f6e2e8e1f802b344ebad60922716
zxqfd555-pw authored and Manul from Pathway committed Oct 9, 2024
1 parent 7174856 commit aee3eee
Showing 18 changed files with 364 additions and 274 deletions.
2 changes: 1 addition & 1 deletion python/pathway/tests/test_io.py
@@ -1071,7 +1071,7 @@ def test_no_pstorage(tmp_path: pathlib.Path):
pw.io.csv.write(table, output_path)
with pytest.raises(
api.EngineError,
match="persistent metadata backend failed: target object should be a directory",
match="persistence backend failed: target object should be a directory",
):
run(
persistence_config=pw.persistence.Config.simple_config(
6 changes: 3 additions & 3 deletions src/engine/error.rs
@@ -7,7 +7,7 @@ use std::result;

use super::ColumnPath;
use super::{Key, Value};
- use crate::persistence::BackendError as MetadataBackendError;
+ use crate::persistence::Error as PersistenceBackendError;

use crate::connectors::data_storage::{ReadError, WriteError};
use crate::persistence::ExternalPersistentId;
@@ -103,8 +103,8 @@ pub enum Error {
#[error("this method cannot extract from key, use extract instead")]
ExtractFromValueNotSupportedForKey,

#[error("persistent metadata backend failed: {0}")]
PersistentStorageError(#[from] MetadataBackendError),
#[error("persistence backend failed: {0}")]
PersistentStorageError(#[from] PersistenceBackendError),

#[error(transparent)]
Other(DynError),
101 changes: 101 additions & 0 deletions src/persistence/backends/file.rs
@@ -0,0 +1,101 @@
// Copyright © 2024 Pathway

use log::{error, warn};
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};

use futures::channel::oneshot;
use futures::channel::oneshot::Receiver as OneShotReceiver;

use crate::fs_helpers::ensure_directory;
use crate::persistence::backends::MetadataBackend;
use crate::persistence::Error;

const TEMPORARY_OBJECT_SUFFIX: &str = ".tmp";

#[derive(Debug)]
pub struct FilesystemKVStorage {
root_path: PathBuf,
}

impl FilesystemKVStorage {
pub fn new(root_path: &Path) -> Result<Self, Error> {
ensure_directory(root_path)?;
Ok(Self {
root_path: root_path.to_path_buf(),
})
}

fn write_file(temp_path: &Path, final_path: &Path, value: &[u8]) -> Result<(), Error> {
let mut output_file = File::create(temp_path)?;
output_file.write_all(value)?;
// Note: if we need Pathway to tolerate not only Pathway failures,
// but also OS crashes or power loss, the line below must be uncommented.
// output_file.sync_all()?;
std::fs::rename(temp_path, final_path)?;
Ok(())
}
}

impl MetadataBackend for FilesystemKVStorage {
fn list_keys(&self) -> Result<Vec<String>, Error> {
let mut keys = Vec::new();

for entry in std::fs::read_dir(&self.root_path)? {
if let Err(e) = entry {
error!("Error while doing the folder scan: {e}. Output may duplicate a part of previous run");
continue;
}
let entry = entry.unwrap();
let file_type = entry.file_type();
match file_type {
Ok(file_type) => {
if !file_type.is_file() {
continue;
}
match entry.file_name().into_string() {
Ok(key) => {
let is_temporary = key.ends_with(TEMPORARY_OBJECT_SUFFIX);
if !is_temporary {
keys.push(key);
}
}
Err(name) => warn!("Non-Unicode file name: {name:?}"),
};
}
Err(e) => {
error!("Couldn't detect file type for {entry:?}: {e}");
}
}
}

Ok(keys)
}

fn get_value(&self, key: &str) -> Result<Vec<u8>, Error> {
Ok(std::fs::read(self.root_path.join(key))?)
}

fn put_value(&mut self, key: &str, value: &[u8]) -> OneShotReceiver<Result<(), Error>> {
let (sender, receiver) = oneshot::channel();

let tmp_path = self
.root_path
.join(key.to_owned() + TEMPORARY_OBJECT_SUFFIX);
let final_path = self.root_path.join(key);
let put_value_result = Self::write_file(&tmp_path, &final_path, value);
let send_result = sender.send(put_value_result);
if let Err(unsent_flush_result) = send_result {
error!(
"The receiver no longer waits for the result of this save: {unsent_flush_result:?}"
);
}
receiver
}

fn remove_key(&self, key: &str) -> Result<(), Error> {
std::fs::remove_file(self.root_path.join(key))?;
Ok(())
}
}
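
For reference, a minimal usage sketch of the new filesystem backend. The storage path and key below are hypothetical, and the example assumes the crate's Error type, the MetadataBackend trait, and futures::executor::block_on are in scope:

    use std::path::Path;
    use futures::executor::block_on;

    fn filesystem_backend_roundtrip() -> Result<(), Error> {
        // Writes land in a ".tmp" file first and are renamed into place,
        // so a crashed Pathway process never leaves a partially written object.
        let mut storage = FilesystemKVStorage::new(Path::new("/tmp/pathway-metadata"))?;
        block_on(storage.put_value("metadata-0", b"frontier: 42"))
            .expect("the backend must not drop the result sender")?;
        assert_eq!(storage.get_value("metadata-0")?, b"frontier: 42".to_vec());
        storage.remove_key("metadata-0")?;
        Ok(())
    }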
38 changes: 38 additions & 0 deletions src/persistence/backends/mock.rs
@@ -0,0 +1,38 @@
// Copyright © 2024 Pathway

use log::error;

use futures::channel::oneshot;
use futures::channel::oneshot::Receiver as OneShotReceiver;

use crate::persistence::backends::MetadataBackend;
use crate::persistence::Error;

#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct MockKVStorage {}

impl MetadataBackend for MockKVStorage {
fn list_keys(&self) -> Result<Vec<String>, Error> {
Ok(vec![])
}

fn get_value(&self, _key: &str) -> Result<Vec<u8>, Error> {
unreachable!()
}

fn put_value(&mut self, _key: &str, _value: &[u8]) -> OneShotReceiver<Result<(), Error>> {
let (sender, receiver) = oneshot::channel();
let send_result = sender.send(Ok(()));
if let Err(unsent_flush_result) = send_result {
error!(
"The receiver no longer waits for the result of this save: {unsent_flush_result:?}"
);
}
receiver
}

fn remove_key(&self, _key: &str) -> Result<(), Error> {
Ok(())
}
}
49 changes: 49 additions & 0 deletions src/persistence/backends/mod.rs
@@ -0,0 +1,49 @@
// Copyright © 2024 Pathway

use std::fmt::Debug;
use std::io::Error as IoError;
use std::str::Utf8Error;

use ::s3::error::S3Error;
use futures::channel::oneshot::Receiver as OneShotReceiver;
use serde_json::Error as JsonParseError;

pub use file::FilesystemKVStorage;
pub use mock::MockKVStorage;
pub use s3::S3KVStorage;

pub mod file;
pub mod mock;
pub mod s3;

#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
#[error(transparent)]
FileSystem(#[from] IoError),

#[error(transparent)]
S3(#[from] S3Error),

#[error(transparent)]
Utf8(#[from] Utf8Error),

#[error("metadata entry {0:?} incorrectly formatted: {1}")]
IncorrectMetadataFormat(String, #[source] JsonParseError),
}

/// A persistence backend can be implemented on top of any key-value
/// storage that provides the following interface.
pub trait MetadataBackend: Send + Debug {
/// List all keys present in the storage.
fn list_keys(&self) -> Result<Vec<String>, Error>;

/// Get the value corresponding to the `key`.
fn get_value(&self, key: &str) -> Result<Vec<u8>, Error>;

/// Set the value corresponding to the `key` to `value`.
fn put_value(&mut self, key: &str, value: &[u8]) -> OneShotReceiver<Result<(), Error>>;

/// Remove the value corresponding to the `key`.
fn remove_key(&self, key: &str) -> Result<(), Error>;
}
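
Since snapshot and metadata persistence now share this single trait, calling code can be written against any backend. A small illustrative helper (the function name and arguments are invented for the example, and futures::executor::block_on is assumed to be available):

    use futures::executor::block_on;

    // Persist one value through any backend and read it back, waiting on the
    // oneshot receiver returned by put_value before issuing the read.
    fn roundtrip(backend: &mut dyn MetadataBackend, key: &str, value: &[u8]) -> Result<Vec<u8>, Error> {
        block_on(backend.put_value(key, value))
            .expect("the backend must not drop the result sender")?;
        backend.get_value(key)
    }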
144 changes: 144 additions & 0 deletions src/persistence/backends/s3.rs
@@ -0,0 +1,144 @@
// Copyright © 2024 Pathway

use log::error;
use std::mem::take;
use std::sync::mpsc;
use std::sync::mpsc::Sender;
use std::thread;

use futures::channel::oneshot;
use futures::channel::oneshot::Receiver as OneShotReceiver;
use futures::channel::oneshot::Sender as OneShotSender;
use s3::bucket::Bucket as S3Bucket;

use crate::deepcopy::DeepCopy;
use crate::persistence::backends::MetadataBackend;
use crate::persistence::Error;
use crate::retry::{execute_with_retries, RetryConfig};

const MAX_S3_RETRIES: usize = 2;

#[derive(Debug)]
enum S3UploaderEvent {
UploadObject {
key: String,
value: Vec<u8>,
result_sender: OneShotSender<Result<(), Error>>,
},
Finish,
}

#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct S3KVStorage {
bucket: S3Bucket,
root_path: String,
upload_event_sender: Sender<S3UploaderEvent>,
uploader_thread: Option<std::thread::JoinHandle<()>>,
}

impl S3KVStorage {
pub fn new(bucket: S3Bucket, root_path: &str) -> Self {
let mut root_path_prepared = root_path.to_string();
if !root_path.ends_with('/') {
root_path_prepared += "/";
}

let (upload_event_sender, upload_event_receiver) = mpsc::channel();
let uploader_bucket = bucket.deep_copy();
let uploader_thread = thread::Builder::new()
.name("pathway:s3_snapshot-bg-writer".to_string())
.spawn(move || {
loop {
let event = upload_event_receiver.recv().expect("unexpected termination for s3 objects sender");
match event {
S3UploaderEvent::UploadObject { key, value, result_sender } => {
let upload_result = execute_with_retries(
|| uploader_bucket.put_object(&key, &value),
RetryConfig::default(),
MAX_S3_RETRIES,
).map_err(Error::S3).map(|_| ());
let send_result = result_sender.send(upload_result);
if let Err(unsent_flush_result) = send_result {
error!("The receiver no longer waits for the result of this save: {unsent_flush_result:?}");
}
},
S3UploaderEvent::Finish => break,
};
}
})
.expect("s3 uploader failed");

Self {
bucket,
upload_event_sender,
uploader_thread: Some(uploader_thread),
root_path: root_path_prepared,
}
}

fn full_key_path(&self, key: &str) -> String {
self.root_path.clone() + key
}
}

impl Drop for S3KVStorage {
fn drop(&mut self) {
self.upload_event_sender
.send(S3UploaderEvent::Finish)
.expect("failed to submit the graceful shutdown event");
if let Some(uploader_thread) = take(&mut self.uploader_thread) {
if let Err(e) = uploader_thread.join() {
// there is no formatter for std::any::Any
error!("Failed to join s3 snapshot uploader thread: {e:?}");
}
}
}
}

impl MetadataBackend for S3KVStorage {
fn list_keys(&self) -> Result<Vec<String>, Error> {
let mut keys = Vec::new();

let object_lists = self.bucket.list(self.root_path.clone(), None)?;
let prefix_len = self.root_path.len();

for list in &object_lists {
for object in &list.contents {
let key: &str = &object.key;
assert!(key.len() > self.root_path.len());
keys.push(key[prefix_len..].to_string());
}
}

Ok(keys)
}

fn get_value(&self, key: &str) -> Result<Vec<u8>, Error> {
let full_key_path = self.full_key_path(key);
let response_data = execute_with_retries(
|| self.bucket.get_object(&full_key_path), // returns Err on incorrect status code because fail-on-err feature is enabled
RetryConfig::default(),
MAX_S3_RETRIES,
)?;
Ok(response_data.bytes().to_vec())
}

fn put_value(&mut self, key: &str, value: &[u8]) -> OneShotReceiver<Result<(), Error>> {
let (sender, receiver) = oneshot::channel();
self.upload_event_sender
.send(S3UploaderEvent::UploadObject {
key: self.full_key_path(key),
value: value.to_vec(),
result_sender: sender,
})
.expect("chunk queue submission should not fail");
receiver
}

fn remove_key(&self, key: &str) -> Result<(), Error> {
let full_key_path = self.full_key_path(key);
let _ = self.bucket.delete_object(full_key_path)?;
Ok(())
}
}
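
Unlike the filesystem backend, put_value here only enqueues the object: the returned receiver resolves once the background uploader thread has finished the (retried) put_object call. A hedged sketch of that contract, assuming an already configured S3Bucket and an illustrative prefix:

    use futures::executor::block_on;

    fn upload_checkpoint(bucket: S3Bucket) -> Result<(), Error> {
        // "persistence/run-1" is a made-up prefix; a trailing '/' is appended automatically.
        let mut storage = S3KVStorage::new(bucket, "persistence/run-1");
        let ack = storage.put_value("checkpoint-0", b"serialized state");
        // Block until the uploader thread reports the outcome of the upload.
        block_on(ack).expect("the uploader must not drop the result sender")?;
        Ok(())
    } // dropping the storage sends Finish and joins the uploader thread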
6 changes: 3 additions & 3 deletions src/persistence/config.rs
@@ -22,11 +22,11 @@ use crate::connectors::{PersistenceMode, SnapshotAccess};
use crate::deepcopy::DeepCopy;
use crate::engine::{Timestamp, TotalFrontier};
use crate::fs_helpers::ensure_directory;
- use crate::persistence::metadata_backends::{
+ use crate::persistence::backends::{
FilesystemKVStorage, MetadataBackend, MockKVStorage, S3KVStorage,
};
use crate::persistence::state::MetadataAccessor;
- use crate::persistence::BackendError as MetadataBackendError;
+ use crate::persistence::Error as PersistenceBackendError;
use crate::persistence::{PersistentId, SharedSnapshotWriter};

const STREAMS_DIRECTORY_NAME: &str = "streams";
@@ -162,7 +162,7 @@ impl PersistenceManagerConfig {
}
}

pub fn create_metadata_storage(&self) -> Result<MetadataAccessor, MetadataBackendError> {
pub fn create_metadata_storage(&self) -> Result<MetadataAccessor, PersistenceBackendError> {
let backend: Box<dyn MetadataBackend> = match &self.metadata_storage {
MetadataStorageConfig::Filesystem(root_path) => {
Box::new(FilesystemKVStorage::new(root_path)?)