refactor: associate commit_schema to the appropriate objects #1225

Status: Open · wants to merge 3 commits into main
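
This PR retires two free functions and re-homes their logic on the objects that own the data: `commit_schema` (src/event/mod.rs) becomes `Stream::commit_schema`, which merges new fields into the stream's in-memory schema map, and `commit_schema_to_storage` (src/storage/object_storage.rs) becomes a provided `commit_schema` method on the `ObjectStorage` trait. Call sites in `Event::process`, `update_schema_when_distributed`, and the staged-schema conversion path are updated to go through these methods. Along the way, `ObjectStorageError` gains an `Arrow` variant so the storage-side merge can propagate `Schema::try_merge` failures with `?` instead of `.unwrap()`, and `EventError` gains a `NotFound` variant for the now-fallible stream lookup.
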
50 changes: 17 additions & 33 deletions src/event/mod.rs
```diff
@@ -17,22 +17,17 @@
  *
  */

-pub mod format;
-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;

 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
-use itertools::Itertools;
-
-use self::error::EventError;
-use crate::{
-    metadata::update_stats,
-    parseable::{StagingError, PARSEABLE},
-    storage::StreamType,
-    LOCK_EXPECT,
-};
+use arrow_schema::Field;
 use chrono::NaiveDateTime;
+use std::collections::HashMap;
+use error::EventError;
+use itertools::Itertools;
+
+use crate::{metadata::update_stats, parseable::PARSEABLE, storage::StreamType};
+
+pub mod format;

 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 pub const USER_AGENT_KEY: &str = "p_user_agent";
@@ -67,11 +62,12 @@ impl Event {
             }
         }

+        let stream = PARSEABLE.get_or_create_stream(&self.stream_name);
         if self.is_first_event {
-            commit_schema(&self.stream_name, self.rb.schema())?;
+            stream.commit_schema(self.rb.schema())?;
         }

-        PARSEABLE.get_or_create_stream(&self.stream_name).push(
+        stream.push(
             &key,
             &self.rb,
             self.parsed_timestamp,
@@ -117,32 +113,20 @@ pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
     format!("{hash:x}")
 }

-pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), StagingError> {
-    let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned");
-
-    let map = &mut stream_metadata
-        .get_mut(stream_name)
-        .expect("map has entry for this stream name")
-        .metadata
-        .write()
-        .expect(LOCK_EXPECT)
-        .schema;
-    let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
-    let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
-    map.clear();
-    map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
-    Ok(())
-}
-
 pub mod error {

-    use crate::{parseable::StagingError, storage::ObjectStorageError};
+    use crate::{
+        parseable::{StagingError, StreamNotFound},
+        storage::ObjectStorageError,
+    };

     #[derive(Debug, thiserror::Error)]
     pub enum EventError {
         #[error("Staging Failed: {0}")]
         Staging(#[from] StagingError),
         #[error("ObjectStorage Error: {0}")]
         ObjectStorage(#[from] ObjectStorageError),
+        #[error("{0}")]
+        NotFound(#[from] StreamNotFound),
     }
 }
```
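
The new `NotFound(#[from] StreamNotFound)` variant is what allows later call sites (such as `PARSEABLE.get_stream(table)?` in query.rs below) to bubble a failed stream lookup up as an `EventError`. A minimal, self-contained sketch of the `#[from]` pattern; the `StreamNotFound` here is a stand-in for illustration, not the crate's real type:

```rust
use thiserror::Error;

// Stand-in for the crate's StreamNotFound, for illustration only.
#[derive(Debug, Error)]
#[error("Stream {0} not found")]
pub struct StreamNotFound(pub String);

#[derive(Debug, Error)]
pub enum EventError {
    // #[from] derives From<StreamNotFound>, so `?` converts the
    // error automatically at the call site.
    #[error("{0}")]
    NotFound(#[from] StreamNotFound),
}

fn get_stream(name: &str) -> Result<(), StreamNotFound> {
    Err(StreamNotFound(name.to_owned()))
}

fn process(name: &str) -> Result<(), EventError> {
    // Compiles only because of the #[from] conversion above.
    get_stream(name)?;
    Ok(())
}

fn main() {
    assert!(matches!(process("demo"), Err(EventError::NotFound(_))));
}
```
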
15 changes: 9 additions & 6 deletions src/handlers/http/query.rs
```diff
@@ -35,8 +35,6 @@ use tracing::error;

 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
-
-use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::Mode;
 use crate::parseable::{StreamNotFound, PARSEABLE};
@@ -45,7 +43,6 @@ use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::Users;
 use crate::response::QueryResponse;
-use crate::storage::object_storage::commit_schema_to_storage;
 use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;
 use crate::utils::time::{TimeParseError, TimeRange};
@@ -173,9 +170,15 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
     for table in tables {
         if let Ok(new_schema) = fetch_schema(table).await {
             // commit schema merges the schema internally and updates the schema in storage.
-            commit_schema_to_storage(table, new_schema.clone()).await?;
-
-            commit_schema(table, Arc::new(new_schema))?;
+            PARSEABLE
+                .storage
+                .get_object_store()
+                .commit_schema(table, new_schema.clone())
+                .await?;
+
+            PARSEABLE
+                .get_stream(table)?
+                .commit_schema(Arc::new(new_schema))?;
         }
     }
 }
```
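
The rewritten loop now commits twice per table: first to object storage through the store's new `commit_schema`, then to the in-memory map through `Stream::commit_schema`. A toy sketch of that ordering (synchronous, with stand-in structs; the real storage call is async and goes through `PARSEABLE.storage.get_object_store()`):

```rust
use arrow_schema::{ArrowError, DataType, Field, Schema};

// Stand-ins for the two schema homes, illustration only.
struct ObjectStore { schema: Schema }
struct Stream { schema: Schema }

impl ObjectStore {
    // Storage-side commit: merge the fetched schema into the stored one.
    fn commit_schema(&mut self, new: Schema) -> Result<(), ArrowError> {
        self.schema = Schema::try_merge(vec![new, self.schema.clone()])?;
        Ok(())
    }
}

impl Stream {
    // In-memory commit: the same merge against the cached schema.
    fn commit_schema(&mut self, new: Schema) -> Result<(), ArrowError> {
        self.schema = Schema::try_merge(vec![self.schema.clone(), new])?;
        Ok(())
    }
}

fn main() -> Result<(), ArrowError> {
    let base = || Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let fetched = Schema::new(vec![Field::new("status", DataType::Utf8, true)]);
    let (mut store, mut stream) = (ObjectStore { schema: base() }, Stream { schema: base() });

    // Mirrors the new update_schema_when_distributed body:
    // storage first, then the in-memory copy.
    store.commit_schema(fetched.clone())?;
    stream.commit_schema(fetched)?;

    assert_eq!(store.schema.fields().len(), 2);
    assert_eq!(stream.schema.fields().len(), 2);
    Ok(())
}
```
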
14 changes: 13 additions & 1 deletion src/parseable/streams.rs
```diff
@@ -28,7 +28,7 @@ use std::{
 };

 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
+use arrow_schema::{Field, Fields, Schema, SchemaRef};
 use chrono::{NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
@@ -591,6 +591,18 @@ impl Stream {
         self.metadata.read().expect(LOCK_EXPECT).schema_version
     }

+    /// Stores updated schema in-memory
+    pub fn commit_schema(&self, schema: SchemaRef) -> Result<(), StagingError> {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        let current_schema = Schema::new(metadata.schema.values().cloned().collect::<Fields>());
+        let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
+        metadata.schema.clear();
+        metadata
+            .schema
+            .extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
+        Ok(())
+    }
+
     pub fn get_schema(&self) -> Arc<Schema> {
         let metadata = self.metadata.read().expect(LOCK_EXPECT);
```
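
`Stream::commit_schema` takes the metadata write lock once and holds it across the whole merge, so the read-merge-replace of the schema map is atomic with respect to other writers. The merge itself is plain arrow-schema; here is a standalone sketch of that step with the lock stripped away (the real method wraps the `ArrowError` in `StagingError`):

```rust
use std::{collections::HashMap, sync::Arc};

use arrow_schema::{ArrowError, DataType, Field, Fields, Schema};

// Mirrors the body of Stream::commit_schema, minus the RwLock:
// merge an incoming schema into the name -> field map in place.
fn commit_schema(
    map: &mut HashMap<String, Arc<Field>>,
    incoming: &Schema,
) -> Result<(), ArrowError> {
    let current = Schema::new(map.values().cloned().collect::<Fields>());
    let merged = Schema::try_merge(vec![current, incoming.clone()])?;
    map.clear();
    map.extend(merged.fields.iter().map(|f| (f.name().clone(), f.clone())));
    Ok(())
}

fn main() -> Result<(), ArrowError> {
    let mut map = HashMap::new();
    commit_schema(&mut map, &Schema::new(vec![Field::new("id", DataType::Int64, false)]))?;
    commit_schema(&mut map, &Schema::new(vec![Field::new("msg", DataType::Utf8, true)]))?;
    assert_eq!(map.len(), 2); // both fields survive the merge
    Ok(())
}
```
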
3 changes: 3 additions & 0 deletions src/storage/mod.rs
```diff
@@ -262,6 +262,9 @@ pub enum ObjectStorageError {

     #[error("JoinError: {0}")]
     JoinError(#[from] JoinError),
+
+    #[error("Arrow Error: {0}")]
+    Arrow(#[from] arrow_schema::ArrowError),
 }

 pub fn to_object_store_path(path: &RelativePath) -> Path {
```
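
This variant exists so the storage-side `commit_schema` below can propagate `Schema::try_merge` failures with `?` where the old free function called `.unwrap()`. A small sketch of the conversion (the error enum is a stand-in, mirroring only this variant):

```rust
use arrow_schema::{ArrowError, DataType, Field, Schema};

// Stand-in mirroring only the new variant, for illustration.
#[derive(Debug, thiserror::Error)]
enum ObjectStorageError {
    #[error("Arrow Error: {0}")]
    Arrow(#[from] ArrowError),
}

fn merge(a: Schema, b: Schema) -> Result<Schema, ObjectStorageError> {
    // `?` converts ArrowError into ObjectStorageError::Arrow via #[from],
    // replacing the `.unwrap()` in the old commit_schema_to_storage.
    Ok(Schema::try_merge(vec![a, b])?)
}

fn main() {
    // The same field name with conflicting types makes try_merge fail;
    // the caller now gets an Err instead of a panic.
    let a = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let b = Schema::new(vec![Field::new("id", DataType::Utf8, false)]);
    assert!(matches!(merge(a, b), Err(ObjectStorageError::Arrow(_))));
}
```
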
21 changes: 11 additions & 10 deletions src/storage/object_storage.rs
```diff
@@ -869,7 +869,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         for path in stream.schema_files() {
             let file = File::open(&path)?;
             let schema: Schema = serde_json::from_reader(file)?;
-            commit_schema_to_storage(&stream_name, schema).await?;
+            self.commit_schema(&stream_name, schema).await?;
             if let Err(e) = remove_file(path) {
                 warn!("Failed to remove staged file: {e}");
             }
@@ -878,16 +878,17 @@

         Ok(())
     }
-}
-
-pub async fn commit_schema_to_storage(
-    stream_name: &str,
-    schema: Schema,
-) -> Result<(), ObjectStorageError> {
-    let storage = PARSEABLE.storage().get_object_store();
-    let stream_schema = storage.get_schema(stream_name).await?;
-    let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
-    storage.put_schema(stream_name, &new_schema).await
+    /// Stores updated schema in storage
+    async fn commit_schema(
+        &self,
+        stream_name: &str,
+        schema: Schema,
+    ) -> Result<(), ObjectStorageError> {
+        let stream_schema = self.get_schema(stream_name).await?;
+        let new_schema = Schema::try_merge(vec![schema, stream_schema])?;
+        self.put_schema(stream_name, &new_schema).await
+    }
 }

 #[inline(always)]
```
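
Moving `commit_schema_to_storage` into the trait as a provided method means every backend (local FS, S3, and so on) inherits one implementation that routes through its own `get_schema`/`put_schema`. A toy sketch of that shape; the names and the string "schema" are stand-ins, and the real method is async:

```rust
// A free function becomes a provided trait method: the default body is
// shared by all implementors and calls the backend's own get/put.
trait ObjectStorage {
    fn get_schema(&self, stream: &str) -> String;
    fn put_schema(&mut self, stream: &str, schema: String);

    // Provided method: available on every implementor for free.
    fn commit_schema(&mut self, stream: &str, incoming: &str) {
        let mut merged = self.get_schema(stream);
        merged.push_str(incoming);
        self.put_schema(stream, merged);
    }
}

struct LocalFs { schema: String }

impl ObjectStorage for LocalFs {
    fn get_schema(&self, _stream: &str) -> String {
        self.schema.clone()
    }
    fn put_schema(&mut self, _stream: &str, schema: String) {
        self.schema = schema;
    }
}

fn main() {
    let mut fs = LocalFs { schema: "id:int64;".to_owned() };
    // commit_schema comes for free from the trait's default body.
    fs.commit_schema("demo", "msg:utf8;");
    assert_eq!(fs.schema, "id:int64;msg:utf8;");
}
```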