feat: merge finish .arrows and convert to .parquet #1200


Merged · 11 commits · Feb 25, 2025
26 changes: 24 additions & 2 deletions src/handlers/http/health_check.rs
@@ -27,7 +27,8 @@ use actix_web::{
HttpResponse,
};
use http::StatusCode;
use tokio::sync::Mutex;
use tokio::{sync::Mutex, task::JoinSet};
use tracing::{error, info, warn};

use crate::parseable::PARSEABLE;

@@ -60,8 +61,29 @@ pub async fn shutdown() {
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
*shutdown_flag = true;

let mut joinset = JoinSet::new();

// Sync staging
PARSEABLE.flush_all_streams();
PARSEABLE.streams.flush_and_convert(&mut joinset, true);

while let Some(res) = joinset.join_next().await {
match res {
Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
Err(err) => error!("Failed to join async task: {err}"),
}
}

if let Err(e) = PARSEABLE
.storage
.get_object_store()
.upload_files_from_staging()
.await
{
warn!("Failed to sync local data with object store. {:?}", e);
} else {
info!("Successfully synced all data to S3.");
}
}

pub async fn readiness() -> HttpResponse {
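
The shutdown path above fans per-stream flush-and-convert work out onto a `tokio::task::JoinSet` and drains it to completion before uploading anything to the object store. A minimal sketch of that drain pattern, assuming only tokio (the stream names and the error type are illustrative, not Parseable's):

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut joinset: JoinSet<Result<(), String>> = JoinSet::new();

    // One task per stream; each would flush its in-memory buffers
    // and convert the resulting arrow files to parquet.
    for stream in ["app-logs", "metrics"] {
        joinset.spawn(async move {
            println!("converted {stream}");
            Ok(())
        });
    }

    // Drain every task before moving on to the object-store upload,
    // distinguishing task-level failures from join (panic/cancel) errors.
    while let Some(res) = joinset.join_next().await {
        match res {
            Ok(Ok(())) => println!("conversion finished"),
            Ok(Err(err)) => eprintln!("conversion failed: {err}"),
            Err(err) => eprintln!("task panicked or was cancelled: {err}"),
        }
    }
}
```

Blocking on `join_next()` until it returns `None` is what guarantees no arrow file is left unconverted when the upload starts.
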
20 changes: 0 additions & 20 deletions src/handlers/http/modal/mod.rs
@@ -136,26 +136,6 @@ pub trait ParseableServer {

health_check::shutdown().await;

// Perform S3 sync and wait for completion
info!("Starting data sync to S3...");

if let Err(e) = PARSEABLE.streams.prepare_parquet(true) {
warn!("Failed to convert arrow files to parquet. {:?}", e);
} else {
info!("Successfully converted arrow files to parquet.");
}

if let Err(e) = PARSEABLE
.storage
.get_object_store()
.upload_files_from_staging()
.await
{
warn!("Failed to sync local data with object store. {:?}", e);
} else {
info!("Successfully synced all data to S3.");
}

// Initiate graceful shutdown
info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
11 changes: 9 additions & 2 deletions src/lib.rs
@@ -59,8 +59,15 @@ use reqwest::{Client, ClientBuilder};
// It is very unlikely that panic will occur when dealing with locks.
pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock";

pub const STORAGE_CONVERSION_INTERVAL: u64 = 60;
pub const STORAGE_UPLOAD_INTERVAL: u64 = 30;
/// Describes the duration at the end of which in-memory buffers are flushed,
/// arrow files are "finished" and compacted into parquet files.
pub const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);

/// Granularity (in minutes) used to configure prefix generation.
pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;

/// Describes the duration at the end of which parquets are pushed to the object store.
pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);

// A single HTTP client for all outgoing HTTP requests from the parseable server
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
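
In the constants above, `OBJECT_STORE_DATA_GRANULARITY` is derived from `LOCAL_SYNC_INTERVAL` at compile time: 60 s / 60 = 1, i.e. object-store prefixes are generated at one-minute granularity. A standalone check of that arithmetic, with the two constants copied from the diff:

```rust
use std::time::Duration;

const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);
// `Duration::as_secs` is a const fn, so this division happens at compile time.
const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;

fn main() {
    // A 60-second sync interval yields 1-minute prefix granularity.
    assert_eq!(OBJECT_STORE_DATA_GRANULARITY, 1);
    println!("granularity: {OBJECT_STORE_DATA_GRANULARITY} minute(s)");
}
```

Deriving the granularity from the sync interval keeps the two from drifting apart, which was possible when they lived in separate modules.
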
10 changes: 0 additions & 10 deletions src/parseable/mod.rs
@@ -179,16 +179,6 @@ impl Parseable {
.unwrap_or_default())
}

/// Writes all streams in staging onto disk, awaiting conversion into parquet.
/// Deletes all in memory recordbatches, freeing up rows in mem-writer.
pub fn flush_all_streams(&self) {
let streams = self.streams.read().unwrap();

for staging in streams.values() {
staging.flush()
}
}

// validate the storage, if the proper path for staging directory is provided
// if the proper data directory is provided, or s3 bucket is provided etc
pub async fn validate_storage(&self) -> Result<Option<Bytes>, ObjectStorageError> {
29 changes: 18 additions & 11 deletions src/parseable/streams.rs
@@ -41,6 +41,7 @@ use parquet::{
};
use rand::distributions::DistString;
use relative_path::RelativePathBuf;
use tokio::task::JoinSet;
use tracing::{error, info, trace, warn};

use crate::{
@@ -49,11 +50,9 @@ use crate::{
metadata::{LogStreamMetadata, SchemaVersion},
metrics,
option::Mode,
storage::{
object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY,
},
storage::{object_storage::to_bytes, retention::Retention, StreamType},
utils::minute_to_slot,
LOCK_EXPECT,
LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
};

use super::{
@@ -653,6 +652,13 @@ impl Stream {
pub fn get_stream_type(&self) -> StreamType {
self.metadata.read().expect(LOCK_EXPECT).stream_type
}

/// First flushes arrows onto disk and then converts them into parquet
pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
self.flush();

self.prepare_parquet(shutdown_signal)
}
}

#[derive(Deref, DerefMut, Default)]
@@ -719,21 +725,22 @@ impl Streams {
.collect()
}

/// Convert arrow files into parquet, preparing it for upload
pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
/// Asynchronously flushes arrows and compacts them into parquet on all streams in staging,
/// so the data is ready to be pushed to the object store.
pub fn flush_and_convert(
&self,
joinset: &mut JoinSet<Result<(), StagingError>>,
shutdown_signal: bool,
) {
let streams: Vec<Arc<Stream>> = self
.read()
.expect(LOCK_EXPECT)
.values()
.map(Arc::clone)
.collect();
for stream in streams {
stream
.prepare_parquet(shutdown_signal)
.inspect_err(|err| error!("Failed to run conversion task {err:?}"))?;
joinset.spawn(async move { stream.flush_and_convert(shutdown_signal) });
}

Ok(())
}
}

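
One detail worth calling out in `Streams::flush_and_convert`: the read lock on the streams map is held only long enough to clone the per-stream `Arc`s, so the spawned conversion tasks never contend with writers on the map. A simplified sketch of that snapshot-then-spawn shape, assuming tokio and the standard library (the `Stream` type and the error type are stand-ins, not Parseable's):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::task::JoinSet;

struct Stream {
    name: String,
}

impl Stream {
    fn flush_and_convert(&self) -> Result<(), String> {
        println!("flushed and converted {}", self.name);
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let streams: RwLock<HashMap<String, Arc<Stream>>> = RwLock::new(HashMap::from([(
        "app-logs".to_string(),
        Arc::new(Stream { name: "app-logs".to_string() }),
    )]));

    let mut joinset = JoinSet::new();

    // Snapshot: clone the Arcs under the read lock, then drop the guard
    // before any task is spawned.
    let snapshot: Vec<Arc<Stream>> =
        streams.read().unwrap().values().map(Arc::clone).collect();

    for stream in snapshot {
        joinset.spawn(async move { stream.flush_and_convert() });
    }

    while let Some(res) = joinset.join_next().await {
        if let Err(err) = res.expect("task shouldn't panic") {
            eprintln!("conversion failed: {err}");
        }
    }
}
```
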
5 changes: 2 additions & 3 deletions src/query/listing_table_builder.rs
@@ -32,9 +32,8 @@ use itertools::Itertools;
use object_store::{path::Path, ObjectMeta, ObjectStore};

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY},
utils::time::TimeRange,
event::DEFAULT_TIMESTAMP_KEY, storage::ObjectStorage, utils::time::TimeRange,
OBJECT_STORE_DATA_GRANULARITY,
};

use super::PartialTimeFilter;
8 changes: 0 additions & 8 deletions src/storage/mod.rs
@@ -57,14 +57,6 @@ pub const SCHEMA_FILE_NAME: &str = ".schema";
pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts";
pub const MANIFEST_FILE: &str = "manifest.json";

/// local sync interval to move data.records to /tmp dir of that stream.
/// 60 sec is a reasonable value.
pub const LOCAL_SYNC_INTERVAL: u64 = 60;

/// duration used to configure prefix in objectstore and local disk structure
/// used for storage. Defaults to 1 min.
pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60;

// max concurrent request allowed for datafusion object store
const MAX_OBJECT_STORE_REQUESTS: usize = 1000;

2 changes: 1 addition & 1 deletion src/storage/object_storage.rs
@@ -712,7 +712,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
}

async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> {
if !Path::new(&PARSEABLE.options.staging_dir()).exists() {
if !PARSEABLE.options.staging_dir().exists() {
return Ok(());
}

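
The one-line change above drops a redundant wrapper: `staging_dir()` already returns a path type, and `PathBuf` derefs to `&Path`, which provides `exists()` directly, so `Path::new(&...)` added nothing. A tiny illustration (the directory path is made up):

```rust
use std::path::PathBuf;

fn main() {
    let staging_dir = PathBuf::from("/tmp/parseable-staging");

    // `PathBuf` derefs to `&Path`, so `exists()` is available as-is;
    // no `Path::new(&staging_dir)` wrapper is needed.
    if !staging_dir.exists() {
        println!("nothing staged, skipping upload");
    }
}
```
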
110 changes: 24 additions & 86 deletions src/sync.rs
@@ -20,14 +20,14 @@ use chrono::{TimeDelta, Timelike};
use std::future::Future;
use std::panic::AssertUnwindSafe;
use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tokio::time::{interval_at, sleep, Duration, Instant};
use tokio::{select, task};
use tracing::{error, info, trace, warn};

use crate::alerts::{alerts_utils, AlertConfig, AlertError};
use crate::parseable::PARSEABLE;
use crate::storage::LOCAL_SYNC_INTERVAL;
use crate::{STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL};
use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL};

// Calculates the instant that is the start of the next minute
fn next_minute() -> Instant {
@@ -63,7 +63,7 @@ where
warn!(
"Task '{task_name}' took longer than expected: {:?} (threshold: {threshold:?})",
start_time.elapsed() - threshold
start_time.elapsed()
);
}
break res.expect("Task handle shouldn't error");
Expand All @@ -74,28 +74,22 @@ where

/// Flushes arrows onto disk and converts them into parquet every `LOCAL_SYNC_INTERVAL`,
/// and uploads the parquets to object storage every `STORAGE_UPLOAD_INTERVAL`.
#[tokio::main(flavor = "current_thread")]
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
let (localsync_handler, mut localsync_outbox, localsync_inbox) = local_sync();
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
object_store_sync();
let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) =
arrow_conversion();
loop {
select! {
_ = &mut cancel_rx => {
// actix server finished... stop other threads and stop the server
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
remote_conversion_inbox.send(()).unwrap_or(());
if let Err(e) = localsync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
error!("Error joining localsync_handler: {e:?}");
}
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
error!("Error joining remote_sync_handler: {e:?}");
}
return Ok(());
},
@@ -107,17 +101,10 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
_ = &mut remote_sync_outbox => {
// remote_sync failed, this is recoverable by just starting remote_sync thread again
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
error!("Error joining remote_sync_handler: {e:?}");
}
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync();
},
_ = &mut remote_conversion_outbox => {
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = arrow_conversion();
},
}
}
}
@@ -132,8 +119,7 @@ pub fn object_store_sync() -> (

let handle = task::spawn(async move {
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
let mut sync_interval =
interval_at(next_minute(), Duration::from_secs(STORAGE_UPLOAD_INTERVAL));
let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL);

let mut inbox_rx = AssertUnwindSafe(inbox_rx);

@@ -183,64 +169,8 @@
(handle, outbox_rx, inbox_tx)
}

pub fn arrow_conversion() -> (
task::JoinHandle<()>,
oneshot::Receiver<()>,
oneshot::Sender<()>,
) {
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();

let handle = task::spawn(async move {
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
let mut sync_interval = interval_at(
next_minute() + Duration::from_secs(5), // 5 second delay
Duration::from_secs(STORAGE_CONVERSION_INTERVAL),
);

let mut inbox_rx = AssertUnwindSafe(inbox_rx);

loop {
select! {
_ = sync_interval.tick() => {
trace!("Converting Arrow to Parquet... ");
if let Err(e) = monitor_task_duration(
"arrow_conversion",
Duration::from_secs(30),
|| async { PARSEABLE.streams.prepare_parquet(false) },
).await
{
warn!("failed to convert local arrow data to parquet. {e:?}");
}
},
res = &mut inbox_rx => {match res{
Ok(_) => break,
Err(_) => {
warn!("Inbox channel closed unexpectedly");
break;
}}
}
}
}
}));

match result {
Ok(future) => {
future.await;
}
Err(panic_error) => {
error!("Panic in object store sync task: {panic_error:?}");
let _ = outbox_tx.send(());
}
}

info!("Object store sync task ended");
});

(handle, outbox_rx, inbox_tx)
}

pub fn run_local_sync() -> (
/// Flush arrows onto disk and convert them into parquet files
pub fn local_sync() -> (
task::JoinHandle<()>,
oneshot::Receiver<()>,
oneshot::Sender<()>,
@@ -253,15 +183,23 @@
let mut inbox_rx = inbox_rx;

let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
let mut sync_interval =
interval_at(next_minute(), Duration::from_secs(LOCAL_SYNC_INTERVAL));
let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL);
let mut joinset = JoinSet::new();

loop {
select! {
// Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL`
_ = sync_interval.tick() => {
trace!("Flushing Arrows to disk...");
PARSEABLE.flush_all_streams();
PARSEABLE.streams.flush_and_convert(&mut joinset, false)
},
// Joins and logs errors in spawned tasks
Some(res) = joinset.join_next(), if !joinset.is_empty() => {
match res {
Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
Err(err) => error!("Issue joining flush+conversion task: {err}"),
}
}
res = &mut inbox_rx => {match res{
Ok(_) => break,
Err(_) => {
Expand All @@ -278,7 +216,7 @@ pub fn run_local_sync() -> (
future.await;
}
Err(panic_error) => {
error!("Panic in local sync task: {:?}", panic_error);
error!("Panic in local sync task: {panic_error:?}");
}
}

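
The reworked loop in `local_sync` both spawns and reaps work inside a single `select!`: the interval arm pushes a flush+convert task onto the `JoinSet`, while the `join_next()` arm, guarded by `if !joinset.is_empty()`, logs results as tasks complete. A self-contained sketch of that loop shape, with placeholder intervals and work in place of Parseable's:

```rust
use tokio::select;
use tokio::task::JoinSet;
use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() {
    let mut sync_interval = interval(Duration::from_millis(100));
    let mut joinset: JoinSet<Result<(), String>> = JoinSet::new();
    let mut ticks = 0;

    loop {
        select! {
            // Stand-in for spawning flush+convert on every tick.
            _ = sync_interval.tick() => {
                ticks += 1;
                if ticks > 3 { break } // stand-in for the shutdown signal
                joinset.spawn(async { Ok(()) });
            },
            // Disabled while the set is empty, mirroring the loop above.
            Some(res) = joinset.join_next(), if !joinset.is_empty() => {
                match res {
                    Ok(Ok(())) => println!("conversion finished"),
                    Ok(Err(err)) => eprintln!("conversion failed: {err}"),
                    Err(err) => eprintln!("join error: {err}"),
                }
            }
        }
    }
}
```

Folding conversion into this loop is what let the PR delete the separate `arrow_conversion` task and its extra restart arm in `handler`.
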