Skip to content

fix: capture time created from metadata not filename #1211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 25, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ use std::{
path::{Path, PathBuf},
process,
sync::{Arc, Mutex, RwLock},
time::{SystemTime, UNIX_EPOCH},
};

use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{Field, Fields, Schema};
use chrono::{NaiveDateTime, Timelike, Utc};
use chrono::{NaiveDateTime, Timelike};
use derive_more::{Deref, DerefMut};
use itertools::Itertools;
use parquet::{
Expand Down Expand Up @@ -72,6 +73,14 @@ const ARROW_FILE_EXTENSION: &str = "data.arrows";

pub type StreamRef = Arc<Stream>;

/// Converts a [`SystemTime`] into the number of whole minutes elapsed since the Unix epoch.
///
/// Sub-minute precision is discarded, so any two instants that fall inside the same
/// epoch-aligned minute map to the same value.
///
/// # Panics
///
/// Panics with the message "Legitimate time" if `time` is earlier than [`UNIX_EPOCH`].
fn minute_from_system_time(time: SystemTime) -> u128 {
    const MILLIS_PER_MINUTE: u128 = 60_000;

    let since_epoch = time
        .duration_since(UNIX_EPOCH)
        .expect("Legitimate time");

    // Integer division truncates toward zero, i.e. floors to the start of the minute.
    since_epoch.as_millis() / MILLIS_PER_MINUTE
}

/// All state associated with a single logstream in Parseable.
pub struct Stream {
pub stream_name: String,
Expand Down Expand Up @@ -156,8 +165,7 @@ impl Stream {
hostname.push_str(id);
}
let filename = format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
"{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
Expand Down Expand Up @@ -192,7 +200,7 @@ impl Stream {
/// Only includes ones starting from the previous minute
pub fn arrow_files_grouped_exclude_time(
&self,
exclude: NaiveDateTime,
exclude: SystemTime,
shutdown_signal: bool,
) -> HashMap<PathBuf, Vec<PathBuf>> {
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
Expand All @@ -202,12 +210,13 @@ impl Stream {
// don't keep the ones for the current minute
if !shutdown_signal {
arrow_files.retain(|path| {
!path
.file_name()
.unwrap()
.to_str()
.unwrap()
.starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
let creation = path
.metadata()
.expect("Arrow file should exist on disk")
.created()
.expect("Creation time should be accessible");
// Compare if creation time is actually from previous minute
minute_from_system_time(creation) < minute_from_system_time(exclude)
});
}

Expand Down Expand Up @@ -429,8 +438,8 @@ impl Stream {
) -> Result<Option<Schema>, StagingError> {
let mut schemas = Vec::new();

let time = chrono::Utc::now().naive_utc();
let staging_files = self.arrow_files_grouped_exclude_time(time, shutdown_signal);
let now = SystemTime::now();
let staging_files = self.arrow_files_grouped_exclude_time(now, shutdown_signal);
if staging_files.is_empty() {
metrics::STAGING_FILES
.with_label_values(&[&self.stream_name])
Expand Down Expand Up @@ -757,7 +766,7 @@ mod tests {

use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray};
use arrow_schema::{DataType, Field, TimeUnit};
use chrono::{NaiveDate, TimeDelta};
use chrono::{NaiveDate, TimeDelta, Utc};
use temp_dir::TempDir;
use tokio::time::sleep;

Expand Down Expand Up @@ -874,8 +883,7 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
"{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
Expand Down Expand Up @@ -909,8 +917,7 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
"{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
Expand Down
Loading