Skip to content

Commit

Permalink
ensure all workers participate in the threshold timestamp calculation…
Browse files Browse the repository at this point in the history
… (#7554)

GitOrigin-RevId: e89444a63004f299fc547c5d020de861179310a6
  • Loading branch information
zxqfd555-pw authored and Manul from Pathway committed Oct 30, 2024
1 parent 9e2c496 commit 543b094
Show file tree
Hide file tree
Showing 14 changed files with 280 additions and 166 deletions.
4 changes: 4 additions & 0 deletions integration_tests/wordcount/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
(10, 5.0, 10.0),
(3, 3.0, 6.0),
(3, 6.0, 9.0),
(10, 8.0, 16.0),
(10, 5.0, 15.0),
(10, 10.0, 20.0),
(10, 9.0, 18.0),
],
)
def test_integration_failure_recovery(
Expand Down
46 changes: 46 additions & 0 deletions python/pathway/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3338,3 +3338,49 @@ def test_airbyte_persistence_error_message(tmp_path_with_airbyte_config):
snapshot_access=api.SnapshotAccess.OFFSETS_ONLY,
)
)


@needs_multiprocessing_fork
def test_persistence_one_worker_has_no_committed_timestamp(tmp_path):
    """Re-running must still produce the full output after worker 0's files are removed.

    The identity pipeline is executed once with filesystem persistence enabled.
    Then every regular file in the storage root matching ``*-0-[01]`` is deleted
    and the pipeline is executed a second time. After each run the written CSV
    must be equal to the input CSV (compared as frames indexed and sorted by ``k``).
    """
    # This test only makes sense when there are at least two Pathway workers
    input_path = tmp_path / "input.csv"
    output_path = tmp_path / "output.csv"
    persistent_storage_path = tmp_path / "PStorage"
    data = """
k | v
1 | foo
2 | bar
3 | baz
"""
    write_csv(input_path, data)

    def load_sorted(csv_path):
        # Only the `k` and `v` columns are compared; sorting by the index makes
        # the comparison independent of the row order in the file.
        frame = pd.read_csv(csv_path, usecols=["k", "v"], index_col=["k"])
        return frame.sort_index()

    def assert_output_equals_input():
        result = load_sorted(output_path)
        expected = load_sorted(input_path)
        assert result.equals(expected)

    def run_identity_transformation():
        # Start from an empty graph so that both runs build the same pipeline.
        G.clear()

        class InputSchema(pw.Schema):
            k: int = pw.column_definition(primary_key=True)
            v: str

        table = pw.io.csv.read(input_path, schema=InputSchema, mode="static")
        pw.io.csv.write(table, output_path)
        backend = pw.persistence.Backend.filesystem(persistent_storage_path)
        persistence_config = pw.persistence.Config(
            backend,
        )
        run_all(persistence_config=persistence_config)

    run_identity_transformation()
    assert_output_equals_input()

    # Remove metadata saved by the worker `0`
    for entry in persistent_storage_path.iterdir():
        if not entry.is_file():
            continue
        if entry.match("*-0-[01]"):
            entry.unlink()

    # Even though there are other workers that still have the advanced time,
    # we must run the whole process again
    run_identity_transformation()
    assert_output_equals_input()
11 changes: 2 additions & 9 deletions src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use crate::persistence::frontier::OffsetAntichain;
use crate::persistence::input_snapshot::{Event as SnapshotEvent, SnapshotMode};
use crate::persistence::tracker::WorkerPersistentStorage;
use crate::persistence::{ExternalPersistentId, PersistentId, SharedSnapshotWriter};
use crate::timestamp::current_unix_timestamp_ms;

use data_format::{ParseError, ParseResult, ParsedEvent, ParsedEventWithErrors, Parser};
use data_storage::{
Expand Down Expand Up @@ -118,9 +117,7 @@ impl PersistenceMode {
fn on_before_reading_snapshot(self, sender: &Sender<Entry>) {
// In case of Batch replay we need to start with AdvanceTime to set a new timestamp
if matches!(self, PersistenceMode::Batch) {
let timestamp = u64::try_from(current_unix_timestamp_ms())
.expect("number of milliseconds should fit in 64 bits");
let timestamp = Timestamp(timestamp);
let timestamp = Timestamp::new_from_current_time();
let send_res = sender.send(Entry::Snapshot(SnapshotEvent::AdvanceTime(
timestamp,
OffsetAntichain::new(),
Expand Down Expand Up @@ -205,11 +202,7 @@ impl Connector {
}

fn advance_time(&mut self, input_session: &mut dyn InputAdaptor<Timestamp>) -> Timestamp {
let new_timestamp = u64::try_from(current_unix_timestamp_ms())
.expect("number of milliseconds should fit in 64 bits");
let new_timestamp = (new_timestamp / 2) * 2; //use only even times (required by alt-neu)
let new_timestamp = Timestamp(new_timestamp);

let new_timestamp = Timestamp::new_from_current_time();
let timestamp_updated = self.current_timestamp <= new_timestamp;
if timestamp_updated {
self.current_timestamp = new_timestamp;
Expand Down
10 changes: 3 additions & 7 deletions src/engine/dataflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::persistence::input_snapshot::{Event as SnapshotEvent, SnapshotMode};
use crate::persistence::tracker::WorkerPersistentStorage;
use crate::persistence::{ExternalPersistentId, IntoPersistentId};
use crate::retry::{execute_with_retries, RetryConfig};
use crate::timestamp::current_unix_timestamp_ms;

use std::borrow::{Borrow, Cow};
use std::cell::RefCell;
Expand Down Expand Up @@ -538,10 +537,8 @@ impl ErrorLogInner {
});
if flush {
self.last_flush = Some(now);
let new_timestamp = u64::try_from(current_unix_timestamp_ms())
.expect("number of milliseconds should fit in 64 bits");
let new_timestamp = (new_timestamp / 2) * 2; //use only even times (required by alt-neu)
self.input_session.advance_to(Timestamp(new_timestamp));
let new_timestamp = Timestamp::new_from_current_time();
self.input_session.advance_to(new_timestamp);
self.input_session.flush();
}
self.last_flush.expect("last_flush should be set") + ERROR_LOG_FLUSH_PERIOD
Expand Down Expand Up @@ -3263,7 +3260,6 @@ impl<S: MaybeTotalScope<MaybeTotalTimestamp = Timestamp>> DataflowGraphInner<S>

if realtime_reader_needed || persisted_table {
let persistent_id = reader.persistent_id();
let reader_storage_type = reader.storage_type();
let persistence_mode = self
.persistence_config
.as_ref()
Expand Down Expand Up @@ -3322,7 +3318,7 @@ impl<S: MaybeTotalScope<MaybeTotalTimestamp = Timestamp>> DataflowGraphInner<S>
.unwrap()
.lock()
.unwrap()
.register_input_source(persistent_id, &reader_storage_type);
.register_input_source(persistent_id);
}
self.connector_monitors.push(state.connector_monitor);
}
Expand Down
10 changes: 10 additions & 0 deletions src/engine/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,20 @@ use super::dataflow::maybe_total::MaybeTotalTimestamp;
use super::dataflow::maybe_total::Total;
use super::dataflow::operators::time_column::Epsilon;
use super::dataflow::operators::time_column::MaxTimestamp;
use crate::timestamp::current_unix_timestamp_ms;

#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Timestamp(pub u64);

impl Timestamp {
pub fn new_from_current_time() -> Self {
let new_timestamp = u64::try_from(current_unix_timestamp_ms())
.expect("number of milliseconds should fit in 64 bits");
let new_timestamp = (new_timestamp / 2) * 2; //use only even times (required by alt-neu)
Timestamp(new_timestamp)
}
}

impl PartialOrder for Timestamp {
fn less_equal(&self, other: &Self) -> bool {
self.0 <= other.0
Expand Down
7 changes: 6 additions & 1 deletion src/persistence/backends/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ impl PersistenceBackend for S3KVStorage {
for object in &list.contents {
let key: &str = &object.key;
assert!(key.len() > self.root_path.len());
keys.push(key[prefix_len..].to_string());
let prepared_key = key[prefix_len..].to_string();
if !prepared_key.contains('/') {
// Similarly to the filesystem backend,
// we take only files in the imaginary folder
keys.push(prepared_key);
}
}
}

Expand Down
15 changes: 6 additions & 9 deletions src/persistence/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,16 @@ impl PersistenceManagerConfig {

pub fn create_metadata_storage(&self) -> Result<MetadataAccessor, PersistenceBackendError> {
let backend = self.backend.create()?;
MetadataAccessor::new(backend, self.worker_id)
MetadataAccessor::new(backend, self.worker_id, self.total_workers)
}

pub fn create_snapshot_readers(
&self,
persistent_id: PersistentId,
threshold_times: &HashMap<usize, TotalFrontier<Timestamp>>,
threshold_time: TotalFrontier<Timestamp>,
query_purpose: ReadersQueryPurpose,
) -> Result<Vec<Box<dyn ReadInputSnapshot>>, PersistenceBackendError> {
let min_threshold_time = *threshold_times
.values()
.min()
.unwrap_or(&TotalFrontier::At(Timestamp(0)));
info!("Using threshold time: {threshold_time:?}");
let mut result: Vec<Box<dyn ReadInputSnapshot>> = Vec::new();
match &self.backend {
PersistentStorageConfig::Filesystem(root_path) => {
Expand All @@ -181,7 +178,7 @@ impl PersistenceManagerConfig {
let backend = FilesystemKVStorage::new(&path)?;
let reader = InputSnapshotReader::new(
Box::new(backend),
min_threshold_time,
threshold_time,
query_purpose.truncate_at_end(),
)?;
result.push(Box::new(reader));
Expand All @@ -199,7 +196,7 @@ impl PersistenceManagerConfig {
let backend = S3KVStorage::new(bucket.deep_copy(), &path);
let reader = InputSnapshotReader::new(
Box::new(backend),
min_threshold_time,
threshold_time,
query_purpose.truncate_at_end(),
)?;
result.push(Box::new(reader));
Expand Down Expand Up @@ -236,7 +233,7 @@ impl PersistenceManagerConfig {
}
};
let snapshot_writer = InputSnapshotWriter::new(backend, snapshot_mode);
Ok(Arc::new(Mutex::new(snapshot_writer)))
Ok(Arc::new(Mutex::new(snapshot_writer?)))
}

fn snapshot_writer_path(
Expand Down
57 changes: 36 additions & 21 deletions src/persistence/input_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,25 @@ use crate::engine::{Key, Timestamp, TotalFrontier, Value};
use crate::persistence::backends::PersistenceBackend;
use crate::persistence::frontier::OffsetAntichain;
use crate::persistence::Error;
use crate::timestamp::current_unix_timestamp_ms;

const MAX_ENTRIES_PER_CHUNK: usize = 100_000;
const MAX_CHUNK_LENGTH: usize = 10_000_000;

pub type SnapshotWriterFlushFuture = OneShotReceiver<Result<(), Error>>;
type ChunkId = u64;

/// Collects the ids of all chunks that are present in the given backend.
///
/// Every key reported by `backend.list_keys()` is parsed as a `ChunkId`.
/// Keys that fail to parse are reported with `error!` and left out of the
/// result. The ids are returned in the order the backend listed them; no
/// sorting is done here.
///
/// # Errors
///
/// Returns an error if listing the keys of the backend fails.
fn get_chunk_ids_with_backend(backend: &dyn PersistenceBackend) -> Result<Vec<ChunkId>, Error> {
    let parsed_ids: Vec<ChunkId> = backend
        .list_keys()?
        .into_iter()
        .filter_map(|chunk_key| match chunk_key.parse::<ChunkId>() {
            Ok(chunk_id) => Some(chunk_id),
            Err(_) => {
                error!("Unparsable chunk id: {chunk_key}");
                None
            }
        })
        .collect();
    Ok(parsed_ids)
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Event {
Expand Down Expand Up @@ -60,7 +73,7 @@ pub struct InputSnapshotReader {

reader: Option<BufReader<Cursor<Vec<u8>>>>,
last_frontier: OffsetAntichain,
times_advanced: Vec<Timestamp>,
chunk_ids: Vec<ChunkId>,
next_chunk_idx: usize,
entries_read: usize,
}
Expand Down Expand Up @@ -100,31 +113,23 @@ impl InputSnapshotReader {
threshold_time: TotalFrontier<Timestamp>,
truncate_at_end: bool,
) -> Result<Self, Error> {
let mut times_advanced = Vec::new();
let chunk_keys = backend.list_keys()?;
for chunk_key in chunk_keys {
if let Ok(parsed_time) = chunk_key.parse() {
times_advanced.push(parsed_time);
} else {
error!("Unparsable timestamp: {chunk_key}");
}
}
times_advanced.sort_unstable();
let mut chunk_ids = get_chunk_ids_with_backend(backend.as_ref())?;
chunk_ids.sort_unstable();
Ok(Self {
backend,
threshold_time,
truncate_at_end,
reader: None,
last_frontier: OffsetAntichain::new(),
times_advanced,
chunk_ids,
next_chunk_idx: 0,
entries_read: 0,
})
}

fn truncate(&mut self) -> Result<(), Error> {
if let Some(ref mut reader) = &mut self.reader {
let current_chunk_key = format!("{}", self.times_advanced[self.next_chunk_idx - 1]);
let current_chunk_key = format!("{}", self.chunk_ids[self.next_chunk_idx - 1]);
let stable_position = reader.stream_position()?;
info!("Truncate: Shrink {current_chunk_key:?} to {stable_position} bytes");

Expand All @@ -139,7 +144,7 @@ impl InputSnapshotReader {
})?;
}

for unreachable_part in &self.times_advanced[self.next_chunk_idx..] {
for unreachable_part in &self.chunk_ids[self.next_chunk_idx..] {
info!("Truncate: Remove {unreachable_part:?}");
self.backend.remove_key(&format!("{unreachable_part}"))?;
}
Expand All @@ -163,10 +168,11 @@ impl InputSnapshotReader {
},
},
None => {
if self.next_chunk_idx >= self.times_advanced.len() {
if self.next_chunk_idx >= self.chunk_ids.len() {
break;
}
let next_chunk_key = format!("{}", self.times_advanced[self.next_chunk_idx]);
let next_chunk_key = format!("{}", self.chunk_ids[self.next_chunk_idx]);
info!("Snapshot reader proceeds to the chunk {next_chunk_key} after {} snapshot entries", self.entries_read);
let contents = self.backend.get_value(&next_chunk_key)?;
let cursor = Cursor::new(contents);
self.reader = Some(BufReader::new(cursor));
Expand Down Expand Up @@ -216,17 +222,20 @@ pub struct InputSnapshotWriter {
current_chunk: Vec<u8>,
current_chunk_entries: usize,
chunk_save_futures: Vec<SnapshotWriterFlushFuture>,
next_chunk_id: ChunkId,
}

impl InputSnapshotWriter {
pub fn new(backend: Box<dyn PersistenceBackend>, mode: SnapshotMode) -> Self {
Self {
pub fn new(backend: Box<dyn PersistenceBackend>, mode: SnapshotMode) -> Result<Self, Error> {
let chunk_keys = get_chunk_ids_with_backend(backend.as_ref())?;
Ok(Self {
backend,
mode,
current_chunk: Vec::new(),
current_chunk_entries: 0,
chunk_save_futures: Vec::new(),
}
next_chunk_id: chunk_keys.iter().max().copied().unwrap_or_default() + 1,
})
}

/// A non-blocking call, pushing an entry in the buffer.
Expand Down Expand Up @@ -262,7 +271,13 @@ impl InputSnapshotWriter {
}

fn save_current_chunk(&mut self) -> SnapshotWriterFlushFuture {
let chunk_name = format!("{}", current_unix_timestamp_ms());
info!(
"Persisting a chunk of {} entries ({} bytes)",
self.current_chunk_entries,
self.current_chunk.len()
);
let chunk_name = format!("{}", self.next_chunk_id);
self.next_chunk_id += 1;
self.current_chunk_entries = 0;
self.backend
.put_value(&chunk_name, take(&mut self.current_chunk))
Expand Down
Loading

0 comments on commit 543b094

Please sign in to comment.