diff --git a/python/pathway/tests/test_io.py b/python/pathway/tests/test_io.py index 71d74dd3..f9abb24f 100644 --- a/python/pathway/tests/test_io.py +++ b/python/pathway/tests/test_io.py @@ -1071,7 +1071,7 @@ def test_no_pstorage(tmp_path: pathlib.Path): pw.io.csv.write(table, output_path) with pytest.raises( api.EngineError, - match="persistent metadata backend failed: target object should be a directory", + match="persistence backend failed: target object should be a directory", ): run( persistence_config=pw.persistence.Config.simple_config( diff --git a/src/engine/error.rs b/src/engine/error.rs index dccae2e9..d2195133 100644 --- a/src/engine/error.rs +++ b/src/engine/error.rs @@ -7,7 +7,7 @@ use std::result; use super::ColumnPath; use super::{Key, Value}; -use crate::persistence::BackendError as MetadataBackendError; +use crate::persistence::Error as PersistenceBackendError; use crate::connectors::data_storage::{ReadError, WriteError}; use crate::persistence::ExternalPersistentId; @@ -103,8 +103,8 @@ pub enum Error { #[error("this method cannot extract from key, use extract instead")] ExtractFromValueNotSupportedForKey, - #[error("persistent metadata backend failed: {0}")] - PersistentStorageError(#[from] MetadataBackendError), + #[error("persistence backend failed: {0}")] + PersistentStorageError(#[from] PersistenceBackendError), #[error(transparent)] Other(DynError), diff --git a/src/persistence/backends/file.rs b/src/persistence/backends/file.rs new file mode 100644 index 00000000..90943780 --- /dev/null +++ b/src/persistence/backends/file.rs @@ -0,0 +1,101 @@ +// Copyright © 2024 Pathway + +use log::{error, warn}; +use std::fs::File; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use futures::channel::oneshot; +use futures::channel::oneshot::Receiver as OneShotReceiver; + +use crate::fs_helpers::ensure_directory; +use crate::persistence::backends::MetadataBackend; +use crate::persistence::Error; + +const TEMPORARY_OBJECT_SUFFIX: &str = ".tmp"; + +#[derive(Debug)] +pub struct FilesystemKVStorage { + root_path: PathBuf, +} + +impl FilesystemKVStorage { + pub fn new(root_path: &Path) -> Result { + ensure_directory(root_path)?; + Ok(Self { + root_path: root_path.to_path_buf(), + }) + } + + fn write_file(temp_path: &Path, final_path: &Path, value: &[u8]) -> Result<(), Error> { + let mut output_file = File::create(temp_path)?; + output_file.write_all(value)?; + // Note: if we need Pathway to tolerate not only Pathway failures, + // but only OS crash or power loss, the below line must be uncommented. + // output_file.sync_all()?; + std::fs::rename(temp_path, final_path)?; + Ok(()) + } +} + +impl MetadataBackend for FilesystemKVStorage { + fn list_keys(&self) -> Result, Error> { + let mut keys = Vec::new(); + + for entry in std::fs::read_dir(&self.root_path)? { + if let Err(e) = entry { + error!("Error while doing the folder scan: {e}. Output may duplicate a part of previous run"); + continue; + } + let entry = entry.unwrap(); + let file_type = entry.file_type(); + match file_type { + Ok(file_type) => { + if !file_type.is_file() { + continue; + } + match entry.file_name().into_string() { + Ok(key) => { + let is_temporary = key.ends_with(TEMPORARY_OBJECT_SUFFIX); + if !is_temporary { + keys.push(key); + } + } + Err(name) => warn!("Non-Unicode file name: {name:?}"), + }; + } + Err(e) => { + error!("Couldn't detect file type for {entry:?}: {e}"); + } + } + } + + Ok(keys) + } + + fn get_value(&self, key: &str) -> Result, Error> { + Ok(std::fs::read(self.root_path.join(key))?) 
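+        // The stored object is returned as raw bytes; decoding is left to the
+        // caller (StoredMetadata::parse in state.rs now takes `&[u8]` and runs
+        // `std::str::from_utf8`, mapping failures to `Error::Utf8`).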
+ } + + fn put_value(&mut self, key: &str, value: &[u8]) -> OneShotReceiver> { + let (sender, receiver) = oneshot::channel(); + + let tmp_path = self + .root_path + .join(key.to_owned() + TEMPORARY_OBJECT_SUFFIX); + let final_path = self.root_path.join(key); + let put_value_result = Self::write_file(&tmp_path, &final_path, value); + let send_result = sender.send(put_value_result); + if let Err(unsent_flush_result) = send_result { + error!( + "The receiver no longer waits for the result of this save: {unsent_flush_result:?}" + ); + } + receiver + } + + fn remove_key(&self, key: &str) -> Result<(), Error> { + std::fs::remove_file(self.root_path.join(key))?; + Ok(()) + } +} diff --git a/src/persistence/backends/mock.rs b/src/persistence/backends/mock.rs new file mode 100644 index 00000000..7ea6399c --- /dev/null +++ b/src/persistence/backends/mock.rs @@ -0,0 +1,38 @@ +// Copyright © 2024 Pathway + +use log::error; + +use futures::channel::oneshot; +use futures::channel::oneshot::Receiver as OneShotReceiver; + +use crate::persistence::backends::MetadataBackend; +use crate::persistence::Error; + +#[derive(Debug)] +#[allow(clippy::module_name_repetitions)] +pub struct MockKVStorage {} + +impl MetadataBackend for MockKVStorage { + fn list_keys(&self) -> Result, Error> { + Ok(vec![]) + } + + fn get_value(&self, _key: &str) -> Result, Error> { + unreachable!() + } + + fn put_value(&mut self, _key: &str, _value: &[u8]) -> OneShotReceiver> { + let (sender, receiver) = oneshot::channel(); + let send_result = sender.send(Ok(())); + if let Err(unsent_flush_result) = send_result { + error!( + "The receiver no longer waits for the result of this save: {unsent_flush_result:?}" + ); + } + receiver + } + + fn remove_key(&self, _key: &str) -> Result<(), Error> { + Ok(()) + } +} diff --git a/src/persistence/backends/mod.rs b/src/persistence/backends/mod.rs new file mode 100644 index 00000000..08dd7c65 --- /dev/null +++ b/src/persistence/backends/mod.rs @@ -0,0 +1,49 @@ +// Copyright © 2024 Pathway + +use std::fmt::Debug; +use std::io::Error as IoError; +use std::str::Utf8Error; + +use ::s3::error::S3Error; +use futures::channel::oneshot::Receiver as OneShotReceiver; +use serde_json::Error as JsonParseError; + +pub use file::FilesystemKVStorage; +pub use mock::MockKVStorage; +pub use s3::S3KVStorage; + +pub mod file; +pub mod mock; +pub mod s3; + +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + #[error(transparent)] + FileSystem(#[from] IoError), + + #[error(transparent)] + S3(#[from] S3Error), + + #[error(transparent)] + Utf8(#[from] Utf8Error), + + #[error("metadata entry {0:?} incorrectly formatted: {1}")] + IncorrectMetadataFormat(String, #[source] JsonParseError), +} + +/// The persistence backend can be implemented over a Key-Value +/// storage that implements the following interface. +pub trait MetadataBackend: Send + Debug { + /// List all keys present in the storage. + fn list_keys(&self) -> Result, Error>; + + /// Get the value corresponding to the `key`. + fn get_value(&self, key: &str) -> Result, Error>; + + /// Set the value corresponding to the `key` to `value`. + fn put_value(&mut self, key: &str, value: &[u8]) -> OneShotReceiver>; + + /// Remove the value corresponding to the `key`. 
+ fn remove_key(&self, key: &str) -> Result<(), Error>; +} diff --git a/src/persistence/backends/s3.rs b/src/persistence/backends/s3.rs new file mode 100644 index 00000000..1b8752c2 --- /dev/null +++ b/src/persistence/backends/s3.rs @@ -0,0 +1,144 @@ +// Copyright © 2024 Pathway + +use log::error; +use std::mem::take; +use std::sync::mpsc; +use std::sync::mpsc::Sender; +use std::thread; + +use futures::channel::oneshot; +use futures::channel::oneshot::Receiver as OneShotReceiver; +use futures::channel::oneshot::Sender as OneShotSender; +use s3::bucket::Bucket as S3Bucket; + +use crate::deepcopy::DeepCopy; +use crate::persistence::backends::MetadataBackend; +use crate::persistence::Error; +use crate::retry::{execute_with_retries, RetryConfig}; + +const MAX_S3_RETRIES: usize = 2; + +#[derive(Debug)] +enum S3UploaderEvent { + UploadObject { + key: String, + value: Vec, + result_sender: OneShotSender>, + }, + Finish, +} + +#[derive(Debug)] +#[allow(clippy::module_name_repetitions)] +pub struct S3KVStorage { + bucket: S3Bucket, + root_path: String, + upload_event_sender: Sender, + uploader_thread: Option>, +} + +impl S3KVStorage { + pub fn new(bucket: S3Bucket, root_path: &str) -> Self { + let mut root_path_prepared = root_path.to_string(); + if !root_path.ends_with('/') { + root_path_prepared += "/"; + } + + let (upload_event_sender, upload_event_receiver) = mpsc::channel(); + let uploader_bucket = bucket.deep_copy(); + let uploader_thread = thread::Builder::new() + .name("pathway:s3_snapshot-bg-writer".to_string()) + .spawn(move || { + loop { + let event = upload_event_receiver.recv().expect("unexpected termination for s3 objects sender"); + match event { + S3UploaderEvent::UploadObject { key, value, result_sender } => { + let upload_result = execute_with_retries( + || uploader_bucket.put_object(&key, &value), + RetryConfig::default(), + MAX_S3_RETRIES, + ).map_err(Error::S3).map(|_| ()); + let send_result = result_sender.send(upload_result); + if let Err(unsent_flush_result) = send_result { + error!("The receiver no longer waits for the result of this save: {unsent_flush_result:?}"); + } + }, + S3UploaderEvent::Finish => break, + }; + } + }) + .expect("s3 uploader failed"); + + Self { + bucket, + upload_event_sender, + uploader_thread: Some(uploader_thread), + root_path: root_path_prepared, + } + } + + fn full_key_path(&self, key: &str) -> String { + self.root_path.clone() + key + } +} + +impl Drop for S3KVStorage { + fn drop(&mut self) { + self.upload_event_sender + .send(S3UploaderEvent::Finish) + .expect("failed to submit the graceful shutdown event"); + if let Some(uploader_thread) = take(&mut self.uploader_thread) { + if let Err(e) = uploader_thread.join() { + // there is no formatter for std::any::Any + error!("Failed to join s3 snapshot uploader thread: {e:?}"); + } + } + } +} + +impl MetadataBackend for S3KVStorage { + fn list_keys(&self) -> Result, Error> { + let mut keys = Vec::new(); + + let object_lists = self.bucket.list(self.root_path.clone(), None)?; + let prefix_len = self.root_path.len(); + + for list in &object_lists { + for object in &list.contents { + let key: &str = &object.key; + assert!(key.len() > self.root_path.len()); + keys.push(key[prefix_len..].to_string()); + } + } + + Ok(keys) + } + + fn get_value(&self, key: &str) -> Result, Error> { + let full_key_path = self.full_key_path(key); + let response_data = execute_with_retries( + || self.bucket.get_object(&full_key_path), // returns Err on incorrect status code because fail-on-err feature is enabled + 
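+            // Reads run synchronously on the calling thread, retried up to
+            // MAX_S3_RETRIES times, unlike put_value, which hands the upload to
+            // the background uploader thread and reports the result through the
+            // returned one-shot channel.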
RetryConfig::default(), + MAX_S3_RETRIES, + )?; + Ok(response_data.bytes().to_vec()) + } + + fn put_value(&mut self, key: &str, value: &[u8]) -> OneShotReceiver> { + let (sender, receiver) = oneshot::channel(); + self.upload_event_sender + .send(S3UploaderEvent::UploadObject { + key: self.full_key_path(key), + value: value.to_vec(), + result_sender: sender, + }) + .expect("chunk queue submission should not fail"); + receiver + } + + fn remove_key(&self, key: &str) -> Result<(), Error> { + let full_key_path = self.full_key_path(key); + let _ = self.bucket.delete_object(full_key_path)?; + Ok(()) + } +} diff --git a/src/persistence/config.rs b/src/persistence/config.rs index 57fc8887..0f375764 100644 --- a/src/persistence/config.rs +++ b/src/persistence/config.rs @@ -22,11 +22,11 @@ use crate::connectors::{PersistenceMode, SnapshotAccess}; use crate::deepcopy::DeepCopy; use crate::engine::{Timestamp, TotalFrontier}; use crate::fs_helpers::ensure_directory; -use crate::persistence::metadata_backends::{ +use crate::persistence::backends::{ FilesystemKVStorage, MetadataBackend, MockKVStorage, S3KVStorage, }; use crate::persistence::state::MetadataAccessor; -use crate::persistence::BackendError as MetadataBackendError; +use crate::persistence::Error as PersistenceBackendError; use crate::persistence::{PersistentId, SharedSnapshotWriter}; const STREAMS_DIRECTORY_NAME: &str = "streams"; @@ -162,7 +162,7 @@ impl PersistenceManagerConfig { } } - pub fn create_metadata_storage(&self) -> Result { + pub fn create_metadata_storage(&self) -> Result { let backend: Box = match &self.metadata_storage { MetadataStorageConfig::Filesystem(root_path) => { Box::new(FilesystemKVStorage::new(root_path)?) diff --git a/src/persistence/input_snapshot.rs b/src/persistence/input_snapshot.rs index c498c920..b38b844e 100644 --- a/src/persistence/input_snapshot.rs +++ b/src/persistence/input_snapshot.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use crate::engine::{Key, Timestamp, Value}; use crate::persistence::frontier::OffsetAntichain; -use crate::persistence::BackendError; +use crate::persistence::Error; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum Event { @@ -27,7 +27,7 @@ pub struct InputSnapshotReader {} impl InputSnapshotReader { /// This method will be called every so often to read the persisted snapshot. /// When there are no entries left, it must return `Event::Finished`. - pub fn read(&mut self) -> Result { + pub fn read(&mut self) -> Result { todo!() } @@ -50,7 +50,7 @@ pub struct InputSnapshotWriter {} impl InputSnapshotWriter { /// A non-blocking call, pushing an entry in the buffer. /// The buffer should not be flushed in the same thread. - pub fn write(&mut self, _event: &Event) -> Result<(), BackendError> { + pub fn write(&mut self, _event: &Event) -> Result<(), Error> { todo!() } @@ -59,7 +59,7 @@ impl InputSnapshotWriter { /// /// We use `futures::channel::oneshot::channel` here instead of Future/Promise /// because it uses modern Rust Futures that are also used by `async`. 
- pub fn flush(&mut self) -> OneShotReceiver> { + pub fn flush(&mut self) -> OneShotReceiver> { todo!() } } diff --git a/src/persistence/metadata_backends/file.rs b/src/persistence/metadata_backends/file.rs deleted file mode 100644 index b6daab24..00000000 --- a/src/persistence/metadata_backends/file.rs +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright © 2024 Pathway - -use log::{error, warn}; - -use std::path::{Path, PathBuf}; - -use crate::fs_helpers::ensure_directory; -use crate::persistence::metadata_backends::MetadataBackend; -use crate::persistence::BackendError as Error; - -#[derive(Debug)] -pub struct FilesystemKVStorage { - root_path: PathBuf, -} - -impl FilesystemKVStorage { - pub fn new(root_path: &Path) -> Result { - ensure_directory(root_path)?; - Ok(Self { - root_path: root_path.to_path_buf(), - }) - } -} - -impl MetadataBackend for FilesystemKVStorage { - fn list_keys(&self) -> Result, Error> { - let mut keys = Vec::new(); - - for entry in std::fs::read_dir(&self.root_path)? { - if let Err(e) = entry { - error!("Error while doing the folder scan: {e}. Output may duplicate a part of previous run"); - continue; - } - let entry = entry.unwrap(); - let file_type = entry.file_type(); - match file_type { - Ok(file_type) => { - if !file_type.is_file() { - continue; - } - match entry.file_name().into_string() { - Ok(key) => keys.push(key), - Err(name) => warn!("Non-Unicode file name: {name:?}"), - }; - } - Err(e) => { - error!("Couldn't detect file type for {entry:?}: {e}"); - } - } - } - - Ok(keys) - } - - fn get_value(&self, key: &str) -> Result { - Ok(std::fs::read_to_string(self.root_path.join(key))?) - } - - fn put_value(&mut self, key: &str, value: &str) -> Result<(), Error> { - std::fs::write(self.root_path.join(key), value)?; - Ok(()) - } - - fn remove_key(&self, key: &str) -> Result<(), Error> { - std::fs::remove_file(self.root_path.join(key))?; - Ok(()) - } -} diff --git a/src/persistence/metadata_backends/mock.rs b/src/persistence/metadata_backends/mock.rs deleted file mode 100644 index cf4bbd60..00000000 --- a/src/persistence/metadata_backends/mock.rs +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright © 2024 Pathway - -use crate::persistence::metadata_backends::MetadataBackend; -use crate::persistence::BackendError as Error; - -#[derive(Debug)] -#[allow(clippy::module_name_repetitions)] -pub struct MockKVStorage {} - -impl MetadataBackend for MockKVStorage { - fn list_keys(&self) -> Result, Error> { - Ok(vec![]) - } - - fn get_value(&self, _key: &str) -> Result { - unreachable!() - } - - fn put_value(&mut self, _key: &str, _value: &str) -> Result<(), Error> { - Ok(()) - } - - fn remove_key(&self, _key: &str) -> Result<(), Error> { - Ok(()) - } -} diff --git a/src/persistence/metadata_backends/mod.rs b/src/persistence/metadata_backends/mod.rs deleted file mode 100644 index 9a3ccf51..00000000 --- a/src/persistence/metadata_backends/mod.rs +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright © 2024 Pathway - -use std::fmt::Debug; - -pub use file::FilesystemKVStorage; -pub use mock::MockKVStorage; -pub use s3::S3KVStorage; - -pub mod file; -pub mod mock; -pub mod s3; -use crate::persistence::BackendError; - -/// The metadata backend works with key-value (KV) storages. -/// -/// It stores essential information such as data source types, -/// the last finalized Pathway timestamp, and more. 
This is used -/// during the second phase of the two-phase state commit: once the -/// snapshot is up-to-date, the relevant information about the -/// frontiers of the processed data is saved in the metadata storage. -pub trait MetadataBackend: Send + Debug { - /// List all keys present in the storage. - fn list_keys(&self) -> Result, BackendError>; - - /// Get the value corresponding to the `key`. - fn get_value(&self, key: &str) -> Result; - - /// Set the value corresponding to the `key` to `value`. - fn put_value(&mut self, key: &str, value: &str) -> Result<(), BackendError>; - - /// Remove the value corresponding to the `key`. - fn remove_key(&self, key: &str) -> Result<(), BackendError>; -} diff --git a/src/persistence/metadata_backends/s3.rs b/src/persistence/metadata_backends/s3.rs deleted file mode 100644 index 32f61f42..00000000 --- a/src/persistence/metadata_backends/s3.rs +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright © 2024 Pathway - -use s3::bucket::Bucket as S3Bucket; - -use crate::persistence::metadata_backends::MetadataBackend; -use crate::persistence::BackendError as Error; - -#[derive(Debug)] -#[allow(clippy::module_name_repetitions)] -pub struct S3KVStorage { - bucket: S3Bucket, - root_path: String, -} - -impl S3KVStorage { - pub fn new(bucket: S3Bucket, root_path: &str) -> Self { - let mut root_path_prepared = root_path.to_string(); - if !root_path.ends_with('/') { - root_path_prepared += "/"; - } - Self { - bucket, - root_path: root_path_prepared, - } - } - - fn full_key_path(&self, key: &str) -> String { - self.root_path.clone() + key - } -} - -impl MetadataBackend for S3KVStorage { - fn list_keys(&self) -> Result, Error> { - let mut keys = Vec::new(); - - let object_lists = self.bucket.list(self.root_path.clone(), None)?; - let prefix_len = self.root_path.len(); - - for list in &object_lists { - for object in &list.contents { - let key: &str = &object.key; - assert!(key.len() > self.root_path.len()); - keys.push(key[prefix_len..].to_string()); - } - } - - Ok(keys) - } - - fn get_value(&self, key: &str) -> Result { - let full_key_path = self.full_key_path(key); - let response_data = self.bucket.get_object(full_key_path)?; - Ok(response_data.to_string()?) 
- } - - fn put_value(&mut self, key: &str, value: &str) -> Result<(), Error> { - let full_key_path = self.full_key_path(key); - let _ = self.bucket.put_object(full_key_path, value.as_bytes())?; - Ok(()) - } - - fn remove_key(&self, key: &str) -> Result<(), Error> { - let full_key_path = self.full_key_path(key); - let _ = self.bucket.delete_object(full_key_path)?; - Ok(()) - } -} diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs index f64a57ae..ba27f48a 100644 --- a/src/persistence/mod.rs +++ b/src/persistence/mod.rs @@ -1,21 +1,14 @@ // Copyright © 2024 Pathway -use std::fmt::Debug; -use std::io::Error as IoError; -use std::str::Utf8Error; use std::sync::{Arc, Mutex}; use crate::connectors::snapshot::WriteSnapshotEvent; use xxhash_rust::xxh3::Xxh3 as Hasher; -use ::s3::error::S3Error; -use serde_json::Error as JsonParseError; - +pub mod backends; pub mod config; pub mod frontier; pub mod input_snapshot; -pub mod metadata_backends; -pub mod snapshot_backends; pub mod state; pub mod tracker; @@ -23,21 +16,7 @@ pub type PersistentId = u128; pub type ExternalPersistentId = String; pub type SharedSnapshotWriter = Arc>>; -#[derive(Debug, thiserror::Error)] -#[non_exhaustive] -pub enum BackendError { - #[error(transparent)] - FileSystem(#[from] IoError), - - #[error(transparent)] - S3(#[from] S3Error), - - #[error(transparent)] - Utf8(#[from] Utf8Error), - - #[error("metadata entry {0:?} incorrectly formatted: {1}")] - IncorrectMetadataFormat(String, #[source] JsonParseError), -} +pub use backends::Error; pub trait IntoPersistentId { fn into_persistent_id(self) -> PersistentId; diff --git a/src/persistence/snapshot_backends/mod.rs b/src/persistence/snapshot_backends/mod.rs deleted file mode 100644 index 7358b9d3..00000000 --- a/src/persistence/snapshot_backends/mod.rs +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright © 2024 Pathway - -use std::fmt::Debug; -use std::io::Read; - -use futures::channel::oneshot::Receiver as OneShotReceiver; - -use crate::persistence::BackendError; - -/// Snapshot backend operates with chunks. -/// A chunk - is a binary large object that has the name and the contents. -/// -/// Below is the interface that any backend must implement -/// to be able to store persistence chunks. -pub trait SnapshotBackend: Send + Debug { - /// Append `contents` to the end of the current chunk. - fn write(&self, contents: &[u8]) -> Result<(), BackendError>; - - /// Ensure that all data is written and available in the - /// backend storage. - fn flush(&self) -> OneShotReceiver>; - - /// Finalize writing the current chunk and name it `chunk_name`. - fn finalize_chunk(&self, chunk_name: &str) -> OneShotReceiver>; - - /// List all chunks available in the storage. - fn list_chunks(&self) -> Result, BackendError>; - - /// Read the full contents of a chunk `chunk_name`. 
- fn read(&self, chunk_name: &str) -> Result, BackendError>; -} diff --git a/src/persistence/state.rs b/src/persistence/state.rs index 9a25d86e..d7d166fb 100644 --- a/src/persistence/state.rs +++ b/src/persistence/state.rs @@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize}; use crate::connectors::data_storage::StorageType; use crate::engine::{Timestamp, TotalFrontier}; -use crate::persistence::metadata_backends::MetadataBackend; -use crate::persistence::{BackendError as Error, PersistentId}; +use crate::persistence::backends::MetadataBackend; +use crate::persistence::{Error, PersistentId}; const EXPECTED_KEY_PARTS: usize = 3; @@ -40,7 +40,8 @@ impl StoredMetadata { } } - pub fn parse(data: &str) -> Result { + pub fn parse(bytes: &[u8]) -> Result { + let data = std::str::from_utf8(bytes)?; let result = serde_json::from_str::(data.trim_end()) .map_err(|e| Error::IncorrectMetadataFormat(data.to_string(), e))?; Ok(result) @@ -222,8 +223,12 @@ impl MetadataAccessor { pub fn save_current_state(&mut self) -> Result<(), Error> { let serialized_state = self.internal_state.serialize(); - self.backend - .put_value(&self.current_key_to_use, &serialized_state)?; + futures::executor::block_on(async { + self.backend + .put_value(&self.current_key_to_use, serialized_state.as_bytes()) + .await + .expect("unexpected future cancelling") + })?; swap(&mut self.current_key_to_use, &mut self.next_key_to_use); Ok(()) } diff --git a/src/persistence/tracker.rs b/src/persistence/tracker.rs index 278229e9..19f12cad 100644 --- a/src/persistence/tracker.rs +++ b/src/persistence/tracker.rs @@ -11,7 +11,7 @@ use crate::connectors::PersistenceMode; use crate::engine::{Timestamp, TotalFrontier}; use crate::persistence::config::{PersistenceManagerConfig, ReadersQueryPurpose}; use crate::persistence::state::MetadataAccessor; -use crate::persistence::BackendError as MetadataBackendError; +use crate::persistence::Error as MetadataBackendError; use crate::persistence::{PersistentId, SharedSnapshotWriter}; /// The main coordinator for state persistence within a worker diff --git a/tests/integration/test_file_kv.rs b/tests/integration/test_file_kv.rs index 3b4e23b0..d0460c58 100644 --- a/tests/integration/test_file_kv.rs +++ b/tests/integration/test_file_kv.rs @@ -2,8 +2,7 @@ use tempfile::tempdir; -use pathway_engine::persistence::metadata_backends::file::FilesystemKVStorage; -use pathway_engine::persistence::metadata_backends::MetadataBackend; +use pathway_engine::persistence::backends::{FilesystemKVStorage, MetadataBackend}; #[test] fn test_simple_kv_operations() -> eyre::Result<()> { @@ -13,18 +12,18 @@ fn test_simple_kv_operations() -> eyre::Result<()> { let mut storage = FilesystemKVStorage::new(test_storage_path)?; assert_eq!(storage.list_keys()?, Vec::::new()); - storage.put_value("1", "one")?; + futures::executor::block_on(async { storage.put_value("1", b"one").await.unwrap() }).unwrap(); assert_eq!(storage.list_keys()?, vec!["1"]); - storage.put_value("2", "two")?; + futures::executor::block_on(async { storage.put_value("2", b"two").await.unwrap() }).unwrap(); assert_eq!(storage.list_keys()?, vec!["1", "2"]); - assert_eq!(storage.get_value("1")?, "one"); - assert_eq!(storage.get_value("2")?, "two"); + assert_eq!(storage.get_value("1")?, b"one"); + assert_eq!(storage.get_value("2")?, b"two"); - storage.put_value("1", "three")?; + futures::executor::block_on(async { storage.put_value("1", b"three").await.unwrap() }).unwrap(); assert_eq!(storage.list_keys()?, vec!["1", "2"]); - assert_eq!(storage.get_value("1")?, 
"three"); + assert_eq!(storage.get_value("1")?, b"three"); Ok(()) } diff --git a/tests/integration/test_offsets_storage.rs b/tests/integration/test_offsets_storage.rs index 67e392a3..022dd4e5 100644 --- a/tests/integration/test_offsets_storage.rs +++ b/tests/integration/test_offsets_storage.rs @@ -13,8 +13,8 @@ use pathway_engine::connectors::data_storage::StorageType; use pathway_engine::connectors::{Connector, Entry, PersistenceMode}; use pathway_engine::connectors::{OffsetKey, OffsetValue}; use pathway_engine::engine::{Timestamp, TotalFrontier}; +use pathway_engine::persistence::backends::FilesystemKVStorage; use pathway_engine::persistence::frontier::OffsetAntichain; -use pathway_engine::persistence::metadata_backends::FilesystemKVStorage; use pathway_engine::persistence::state::MetadataAccessor; fn assert_frontiers_equal(