feat: add parquet source to python ffi #749

Merged · 19 commits · Sep 17, 2023
Changes from 1 commit
Pass the object registry through
jordanrfrazier committed Sep 15, 2023
commit cb577ec37c6070988ba4612240691a239ac7436a
2 changes: 1 addition & 1 deletion crates/sparrow-merge/src/in_memory_batches.rs
@@ -127,7 +127,7 @@ impl InMemoryBatches {
match recv.recv().await {
Ok((recv_version, batch)) => {
if version < recv_version {
tracing::info!("Recevied version {recv_version}");
tracing::info!("Received version {recv_version}");
yield batch;
version = recv_version;
} else {
16 changes: 15 additions & 1 deletion crates/sparrow-runtime/src/execute.rs
@@ -69,13 +69,16 @@ pub async fn execute(
..ExecutionOptions::default()
};

let object_stores = Arc::new(ObjectStoreRegistry::default());

execute_new(
plan,
destination,
data_context,
options,
None,
HashMap::new(),
object_stores,
)
.await
}
@@ -243,6 +246,7 @@ pub async fn execute_new(
options: ExecutionOptions,
key_hash_inverse: Option<Arc<ThreadSafeKeyHashInverse>>,
udfs: HashMap<Uuid, Arc<dyn Udf>>,
object_stores: Arc<ObjectStoreRegistry>,
) -> error_stack::Result<impl Stream<Item = error_stack::Result<ExecuteResponse, Error>>, Error> {
let object_stores = Arc::new(ObjectStoreRegistry::default());

@@ -353,6 +357,16 @@ pub async fn materialize(

// TODO: Unimplemented feature - UDFs
let udfs = HashMap::new();
let object_stores = Arc::new(ObjectStoreRegistry::default());

execute_new(plan, destination, data_context, options, None, udfs).await
execute_new(
plan,
destination,
data_context,
options,
None,
udfs,
object_stores,
)
.await
}
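
This commit threads an Arc<ObjectStoreRegistry> through the execution entry points: execute() and materialize() now build a default registry and hand it to execute_new(), which gains an object_stores parameter. A minimal sketch of an external caller against the signature shown above, inside an async function — plan, destination, data_context, and options are assumed to be built the same way execute() builds them, and the result stream is drained with futures::StreamExt:

    use std::collections::HashMap;
    use std::sync::Arc;
    use futures::StreamExt;

    // One registry shared by everything that touches object storage; the clients
    // it creates (S3, GCS, local filesystem) are cached inside the registry.
    let object_stores = Arc::new(ObjectStoreRegistry::default());

    let mut results = execute_new(
        plan,
        destination,
        data_context,
        options,
        None,                  // key_hash_inverse: not needed for this sketch
        HashMap::new(),        // udfs: none registered
        object_stores.clone(),
    )
    .await?;

    while let Some(response) = results.next().await {
        let _response = response?; // each item is an ExecuteResponse progress update
    }
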
145 changes: 131 additions & 14 deletions crates/sparrow-runtime/src/prepare/preparer.rs
@@ -6,7 +6,14 @@ use arrow::compute::SortColumn;
use arrow::datatypes::{ArrowPrimitiveType, DataType, SchemaRef, TimestampNanosecondType};
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
use error_stack::{IntoReport, ResultExt};
use error_stack::{IntoReport, IntoReportCompat, ResultExt};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use sparrow_api::kaskada::v1alpha::{PreparedFile, Source, SourceData, TableConfig};

use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl};

use super::prepared_batches;

#[derive(derive_more::Display, Debug)]
pub enum Error {
@@ -24,55 +31,165 @@ pub enum Error {
SortingBatch,
#[display(fmt = "unrecognized time unit")]
UnrecognizedTimeUnit(String),
#[display(fmt = "invalid url '{_0:?}'")]
InvalidUrl(String),
#[display(fmt = "internal error")]
Internal,
}

impl error_stack::Context for Error {}

pub struct Preparer {
prepared_schema: SchemaRef,
time_column_name: String,
subsort_column_name: Option<String>,
// time_column_name: String,
// subsort_column_name: Option<String>,
// key_column_name: String,
table_config: Arc<TableConfig>,
next_subsort: AtomicU64,
key_column_name: String,
time_multiplier: Option<i64>,
object_stores: Arc<ObjectStoreRegistry>,
}

impl Preparer {
/// Create a new preparer to produce data with the given schema.
// pub fn new(
// time_column_name: String,
// subsort_column_name: Option<String>,
// key_column_name: String,
// prepared_schema: SchemaRef,
// prepare_hash: u64,
// time_unit: Option<&str>,
// object_stores: Arc<ObjectStoreRegistry>,
// ) -> error_stack::Result<Self, Error> {
// let time_multiplier = time_multiplier(time_unit)?;
// Ok(Self {
// prepared_schema,
// time_column_name,
// subsort_column_name,
// next_subsort: prepare_hash,
// key_column_name,
// time_multiplier,
// object_stores,
// })
// }

pub fn new(
time_column_name: String,
subsort_column_name: Option<String>,
key_column_name: String,
table_config: Arc<TableConfig>,
prepared_schema: SchemaRef,
prepare_hash: u64,
time_unit: Option<&str>,
object_stores: Arc<ObjectStoreRegistry>,
) -> error_stack::Result<Self, Error> {
let time_multiplier = time_multiplier(time_unit)?;
Ok(Self {
prepared_schema,
time_column_name,
subsort_column_name,
// time_column_name,
// subsort_column_name,
// key_column_name,
table_config,
next_subsort: prepare_hash.into(),
key_column_name,
time_multiplier,
object_stores,
})
}

pub fn schema(&self) -> SchemaRef {
self.prepared_schema.clone()
}

/// Prepare a parquet file.
/// todo; docs
pub async fn prepare_parquet(
&mut self,
path: &str,
) -> error_stack::Result<Vec<PreparedFile>, Error> {
// TODO:
// * Support Slicing

// TODO: What is the output url?
// in wren it is prepared/prep_<version_id>/<sliceplanhash>/file_id (a uuid)/
// file_id is persisted in wren. Don't know if that matters.
let output_path_prefix = "file://prepared/";
let output_file_prefix = "part";

let output_url = ObjectStoreUrl::from_str(output_path_prefix)
.change_context_lazy(|| Error::InvalidUrl(path.to_owned()))?;

let object_store = self
.object_stores
.object_store(&output_url)
.change_context(Error::Internal)?;

let path = PathBuf::from(path);
let source_data = SourceData {
source: Some(
SourceData::try_from_local(&path)
.into_report()
.change_context(Error::Internal)?,
),
};

let mut prepare_stream =
prepared_batches(&self.object_stores, &source_data, &self.table_config, &None)
.await
.change_context(Error::Internal)?
.enumerate();

let mut prepared_files = Vec::new();
let mut uploads = FuturesUnordered::new();
while let Some((n, next)) = prepare_stream.next().await {
let (data, metadata) = next.change_context(Error::Internal)?;

let data_url = output_url
.join(&format!("{output_file_prefix}-{n}.parquet"))
.change_context(Error::Internal)?;
let metadata_url = output_url
.join(&format!("{output_file_prefix}-{n}-metadata.parquet"))
.change_context(Error::Internal)?;

// Create the prepared file via PreparedMetadata.
// TODO: We could probably do this directly, eliminating the PreparedMetadata struct.
let prepared_file: PreparedFile = PreparedMetadata::try_from_data(
data_url.to_string(),
&data,
metadata_url.to_string(),
)
.into_report()
.change_context(Error::Internal)?
.try_into()
.change_context(Error::Internal)?;
prepared_files.push(prepared_file);

uploads.push(write_parquet(data, data_url, object_store.clone()));
uploads.push(write_parquet(metadata, metadata_url, object_store.clone()));
}

// Wait for the uploads.
while let Some(upload) = uploads.try_next().await? {
tracing::info!("Finished uploading {upload}");
}

Ok(prepared_files)
}

/// Prepare a batch of data.
///
/// - This computes and adds the key columns.
/// - This sorts the batch by time, subsort and key hash.
/// - This adds or casts columns as needed.
pub fn prepare_batch(&self, batch: RecordBatch) -> error_stack::Result<RecordBatch, Error> {
let time = get_required_column(&batch, &self.time_column_name)?;
///
/// Self is mutated as necessary to ensure the `subsort` column is increasing, if
/// it is added.
pub fn prepare_batch(&mut self, batch: RecordBatch) -> error_stack::Result<RecordBatch, Error> {
let time_column_name = &self.table_config.time_column_name;
let subsort_column_name = &self.table_config.subsort_column_name;
let key_column_name = &self.table_config.group_column_name;

let time = get_required_column(&batch, &time_column_name)?;
let time = cast_to_timestamp(time, self.time_multiplier)?;

let num_rows = batch.num_rows();
let subsort = if let Some(subsort_column_name) = self.subsort_column_name.as_ref() {
let subsort = if let Some(subsort_column_name) = subsort_column_name.as_ref() {
let subsort = get_required_column(&batch, subsort_column_name)?;
arrow::compute::cast(time.as_ref(), &DataType::UInt64)
.into_report()
@@ -85,7 +202,7 @@ impl Preparer {
Arc::new(subsort)
};

let key = get_required_column(&batch, &self.key_column_name)?;
let key = get_required_column(&batch, &key_column_name)?;
let key_hash =
sparrow_arrow::hash::hash(key.as_ref()).change_context(Error::HashingKeyArray)?;
let key_hash: ArrayRef = Arc::new(key_hash);
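
With this commit the Preparer carries the table's TableConfig and an ObjectStoreRegistry instead of individual column names, and gains prepare_parquet, which runs prepared_batches over a local file and uploads each prepared part plus its metadata under a (currently hard-coded) file://prepared/ prefix. A rough usage sketch against the constructor shown above — table_config, prepared_schema, prepare_hash, object_stores, batch, and the input path are placeholders assumed to come from the surrounding table setup:

    let mut preparer = Preparer::new(
        table_config.clone(),    // Arc<TableConfig>: supplies time/subsort/group column names
        prepared_schema.clone(), // SchemaRef of the prepared output
        prepare_hash,            // u64 seed for the generated subsort column
        None,                    // optional time unit string for numeric time columns
        object_stores.clone(),   // Arc<ObjectStoreRegistry> shared with the rest of the runtime
    )?;

    // Prepare a local Parquet file: each prepared part and its metadata file are
    // written through the registry, and one PreparedFile descriptor is returned per part.
    let prepared_files = preparer.prepare_parquet("/tmp/events.parquet").await?;

    // Or prepare a single in-memory batch: adds the key hash column, casts the time
    // column, and sorts by (time, subsort, key hash). Takes &mut self so the generated
    // subsort counter keeps increasing across batches.
    let prepared_batch = preparer.prepare_batch(batch)?;
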
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/stores/registry.rs
@@ -23,7 +23,7 @@ const SINGLE_PART_UPLOAD_LIMIT_BYTES: u64 = 5_000_000;
///
/// For now, the registry exists as a cache for the clients due to the overhead
/// required to create the cache. The future goal for the registry is to
/// control the number of possibile open connections.
/// control the number of possible open connections.
#[derive(Default, Debug)]
pub struct ObjectStoreRegistry {
object_stores: DashMap<ObjectStoreKey, Arc<dyn ObjectStore>>,
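
The registry's doc comment above describes it as a cache of object-store clients that are expensive to construct. The lookup pattern is the same pair of calls prepare_parquet uses in preparer.rs (the bucket URL here is made up):

    use std::str::FromStr;
    use std::sync::Arc;

    let registry = Arc::new(ObjectStoreRegistry::default());

    // Parsing the URL determines the scheme/bucket key; object_store() returns the
    // cached client for that key, creating and caching it on first use.
    let url = ObjectStoreUrl::from_str("s3://my-bucket/prepared/")?;
    let store = registry.object_store(&url)?;
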
12 changes: 12 additions & 0 deletions crates/sparrow-session/src/session.rs
@@ -1,4 +1,5 @@
use hashbrown::HashMap;
use sparrow_runtime::stores::ObjectStoreRegistry;
use std::borrow::Cow;
use std::sync::Arc;

@@ -32,6 +33,7 @@ pub struct Session {
/// uuid. Once we run on multiple machines, we'll have to serialize/pickle the
/// udf as well.
udfs: HashMap<Uuid, Arc<dyn Udf>>,
object_store_registry: Arc<ObjectStoreRegistry>,
}

#[derive(Default)]
@@ -97,6 +99,13 @@ impl Session {
let schema_proto = sparrow_api::kaskada::v1alpha::Schema::try_from(schema.as_ref())
.into_report()
.change_context_lazy(|| Error::SchemaForTable(name.to_owned()))?;

// TODO: FRAZ: add_table
// 1. Prepare files
// 2. Pass in file_sets
// Maybe not necessary..could just say "add files" in python
// 3. Slicing?
// 4. Source??
let table = ComputeTable {
config: Some(TableConfig {
name: name.to_owned(),
@@ -152,6 +161,7 @@ impl Session {
expr,
queryable,
time_unit,
self.object_store_registry.clone(),
)
}

@@ -458,6 +468,7 @@ impl Session {
let (output_tx, output_rx) = tokio::sync::mpsc::channel(10);

let destination = Destination::Channel(output_tx);
// TODO: FRAZ - Is this empty? How is data context used in session?
let data_context = self.data_context.clone();

let (stop_signal_tx, stop_signal_rx) = tokio::sync::watch::channel(false);
@@ -483,6 +494,7 @@
options,
Some(key_hash_inverse),
self.udfs.clone(),
self.object_store_registry.clone(),
))
.change_context(Error::Execute)?
.map_err(|e| e.change_context(Error::Execute))
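
Session now owns an object_store_registry: Arc<ObjectStoreRegistry> and clones it into both table creation and execution. A small illustration of why a single shared registry is the point — only ObjectStoreRegistry is a real type here, the bindings are illustrative: Arc clones are cheap handle copies, so every component resolves URLs against the same client cache.

    let registry = Arc::new(ObjectStoreRegistry::default());

    let for_preparer = registry.clone(); // handed to Preparer::new(..)
    let for_execute = registry.clone();  // handed to execute_new(..)

    // All handles point at the same underlying DashMap of clients, so an S3 client
    // created while preparing files is reused during query execution rather than
    // being rebuilt on every call.
    assert!(Arc::ptr_eq(&for_preparer, &for_execute));
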