Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 111 additions & 10 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,8 @@
*
*/

use std::{
collections::{HashMap, HashSet},
fs::{self, File, OpenOptions, remove_file, write},
num::NonZeroU32,
path::{Path, PathBuf},
sync::{Arc, Mutex, RwLock},
time::{Instant, SystemTime, UNIX_EPOCH},
};

use arrow_array::RecordBatch;
use arrow_ipc::reader::StreamReader;
use arrow_schema::{Field, Fields, Schema};
use chrono::{NaiveDateTime, Timelike, Utc};
use derive_more::derive::{Deref, DerefMut};
Expand All @@ -41,6 +33,15 @@ use parquet::{
schema::types::ColumnPath,
};
use relative_path::RelativePathBuf;
use std::io::BufReader;
use std::{
collections::{HashMap, HashSet},
fs::{self, File, OpenOptions, remove_file, write},
num::NonZeroU32,
path::{Path, PathBuf},
sync::{Arc, Mutex, RwLock},
time::{Instant, SystemTime, UNIX_EPOCH},
};
use tokio::task::JoinSet;
use tracing::{error, info, trace, warn};
use ulid::Ulid;
Expand All @@ -61,7 +62,7 @@ use crate::{
};

use super::{
ARROW_FILE_EXTENSION, LogStream,
ARROW_FILE_EXTENSION, LogStream, PART_FILE_EXTENSION,
staging::{
StagingError,
reader::{MergedRecordReader, MergedReverseRecordReader},
Expand Down Expand Up @@ -994,12 +995,112 @@ impl Stream {
None
}

/// Recovers orphaned .part files from a previous interrupted run.
/// These are incomplete arrow files that weren't finalized before the server crashed.
/// Valid .part files are renamed to .arrows for processing, invalid ones are removed.
fn recover_orphan_part_files(&self) {
let Ok(dir) = self.data_path.read_dir() else {
return;
};

for entry in dir.flatten() {
let path = entry.path();
if path
.extension()
.is_some_and(|ext| ext == PART_FILE_EXTENSION)
{
info!(
"Found orphaned .part file: {:?} for stream {}",
path, self.stream_name
);

// Check if file is non-empty and potentially valid
match path.metadata() {
Ok(meta) if meta.len() == 0 => {
warn!(
"Removing empty orphaned .part file: {:?} for stream {}",
path, self.stream_name
);
if let Err(e) = remove_file(&path) {
error!("Failed to remove empty .part file {:?}: {e}", path);
}
continue;
}
Ok(_) => {
// Try to validate the arrow file by reading its schema
match File::open(&path) {
Ok(file) => {
match StreamReader::try_new(BufReader::new(file), None) {
Ok(_reader) => {
// File has valid schema, rename to .arrows
let mut arrow_path = path.clone();
arrow_path.set_extension(ARROW_FILE_EXTENSION);

// If arrow file with same name exists, generate a unique name
if arrow_path.exists() {
let file_name =
arrow_path.file_name().unwrap().to_string_lossy();
if let Some(date_pos) = file_name.find(".date") {
let random_suffix = ulid::Ulid::new().to_string();
let new_name = format!(
"{}{}",
random_suffix,
&file_name[date_pos..]
);
arrow_path.set_file_name(new_name);
}
}

info!(
"Recovering orphaned .part file: {:?} -> {:?} for stream {}",
path, arrow_path, self.stream_name
);
if let Err(e) = std::fs::rename(&path, &arrow_path) {
error!(
"Failed to rename .part file {:?} to {:?}: {e}",
path, arrow_path
);
}
}
Err(e) => {
// File is invalid/corrupted, remove it
warn!(
"Removing invalid/corrupted .part file: {:?} for stream {}: {e}",
path, self.stream_name
);
if let Err(e) = remove_file(&path) {
error!(
"Failed to remove invalid .part file {:?}: {e}",
path
);
}
}
}
}
Err(e) => {
error!("Failed to open .part file {:?} for validation: {e}", path);
}
}
}
Err(e) => {
warn!("Could not get metadata for .part file {:?}: {e}", path);
}
}
}
}
}

/// First flushes arrows onto disk and then converts the arrow into parquet
pub fn flush_and_convert(
&self,
init_signal: bool,
shutdown_signal: bool,
) -> Result<(), StagingError> {
// On init, recover any orphaned .part files from previous interrupted runs
if init_signal {
self.recover_orphan_part_files();
}

let start_flush = Instant::now();
// Force flush for init or shutdown signals to convert all .part files to .arrows
// For regular cycles, use false to only flush non-current writers
Expand Down
Loading