feat: handle larger z-order jobs with streaming output and spilling #1461

Merged · 6 commits · Jul 3, 2023
Changes from 1 commit
add datafusion-based z-order
wjones127 committed Jun 26, 2023
commit 31e50f0432a63b57516acf914731c1be0ea2d587
134 changes: 129 additions & 5 deletions rust/src/operations/optimize.rs
@@ -25,9 +25,7 @@ use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow_array::cast::as_generic_binary_array;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::ArrowError;
use arrow_array::RecordBatch;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{Future, StreamExt, TryStreamExt};
@@ -444,11 +442,17 @@ impl MergePlan {
/// Currently requires loading all the data into memory. This is run for each
/// partition, so it is not a problem for tables where each partition is small.
/// But for large unpartitioned tables, this could be a problem.
#[cfg(not(feature = "datafusion"))]
async fn read_zorder(
columns: Arc<Vec<String>>,
files: MergeBin,
object_store: ObjectStoreRef,
_use_threads: bool,
) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
use arrow_array::cast::as_generic_binary_array;
use arrow_array::ArrayRef;
use arrow_schema::ArrowError;

let object_store_ref = object_store.clone();
// Read all batches into a vec
let batches: Vec<RecordBatch> = futures::stream::iter(files.clone())
@@ -503,6 +507,71 @@ impl MergePlan {
.boxed())
}

/// Datafusion-based z-order read.
#[cfg(feature = "datafusion")]
async fn read_zorder(
columns: Arc<Vec<String>>,
files: MergeBin,
object_store: ObjectStoreRef,
_use_threads: bool,
) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
// TODO: use_threads
use datafusion::prelude::{col, ParquetReadOptions, SessionConfig};

// TODO: make this configurable.
// TODO: push this up and share between bins or maybe an overall runtime.
let memory_pool = FairSpillPool::new(8 * 1024 * 1024 * 1024);
let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool));
let runtime = Arc::new(RuntimeEnv::new(config)?);
runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);
Collaborator (PR author) commented:

I'm a little stumped here. Where should this configuration live?

It makes sense to have the memory pool as a global config, IMO. But the object store should be local to here, since it's specific to this particular table's root.

Collaborator commented:

Had a similar question a little while back. For a while I thought the DeltaOps struct could be a place where session state is stored, but the way we use that right now, it's even shorter-lived than the DeltaTable.

I guess the only thing that "survives" is the DeltaTableState. Maybe a way to go is to have that as an optional field on the DeltaTable. Then, once the first operation is performed, we populate that field in the returned table and re-use it for subsequent operations. This may also give us a way to capture user configuration for such things, taking a similar route through the code as the storage options do right now.
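A minimal sketch of that idea, with hypothetical names (the real delta-rs types have more fields and the state loading is async):

```rust
// Placeholder for the real delta-rs snapshot type.
struct DeltaTableState;

struct DeltaTable {
    /// Populated after the first operation and re-used by later ones.
    state: Option<DeltaTableState>,
}

impl DeltaTable {
    fn new() -> Self {
        Self { state: None }
    }

    // Stand-in for real log replay against the _delta_log.
    fn load_state(&self) -> DeltaTableState {
        DeltaTableState
    }

    /// Return the cached state, loading it lazily on the first call so
    /// subsequent operations re-use the same snapshot.
    fn state(&mut self) -> &DeltaTableState {
        if self.state.is_none() {
            self.state = Some(self.load_state());
        }
        self.state.as_ref().unwrap()
    }
}

fn main() {
    let mut table = DeltaTable::new();
    table.state(); // first call loads and caches the snapshot
    table.state(); // second call re-uses it
}
```

The same field could also carry user configuration (such as a spill memory budget), giving it the same path through the code that storage options take today.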


use datafusion::prelude::SessionContext;
use datafusion_expr::expr::ScalarUDF;
use datafusion_expr::Expr;
use url::Url;
let ctx = SessionContext::with_config_rt(SessionConfig::default(), runtime);
ctx.register_udf(zorder::datafusion::zorder_key_udf());

let locations = files
.iter()
.map(|file| format!("delta-rs:///{}", file.location))
.collect_vec();
let df = ctx
.read_parquet(locations, ParquetReadOptions::default())
.await?;

let original_columns = df
.schema()
.fields()
.iter()
.map(|f| col(f.name()))
.collect_vec();

// Add a temporary z-order column we will sort by, and then drop.
const ZORDER_KEY_COLUMN: &str = "__zorder_key";
let cols = columns.iter().map(col).collect_vec();
let expr = Expr::ScalarUDF(ScalarUDF::new(
Arc::new(zorder::datafusion::zorder_key_udf()),
cols,
));
let df = df.with_column(ZORDER_KEY_COLUMN, expr)?;

let df = df.sort(vec![col(ZORDER_KEY_COLUMN).sort(true, true)])?;
let df = df.select(original_columns)?;

let stream = df
.execute_stream()
.await?
.map_err(|err| {
ParquetError::General(format!("Z-order failed while scanning data: {:?}", err))
})
.boxed();

Ok(stream)
}
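Regarding the TODO above about making the spill budget configurable and sharing it between bins, here is a minimal sketch of one way to thread it through; the `ZOrderExecOptions` type and `max_spill_bytes` field are hypothetical, not existing delta-rs options:

```rust
use std::sync::Arc;

use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

/// Hypothetical knob for the optimize operation; defaults to the 8 GiB used above.
struct ZOrderExecOptions {
    max_spill_bytes: usize,
}

impl Default for ZOrderExecOptions {
    fn default() -> Self {
        Self {
            max_spill_bytes: 8 * 1024 * 1024 * 1024,
        }
    }
}

/// Build one shared runtime from the options so every bin draws from the same pool.
fn build_runtime(opts: &ZOrderExecOptions) -> datafusion::error::Result<Arc<RuntimeEnv>> {
    let pool = FairSpillPool::new(opts.max_spill_bytes);
    let config = RuntimeConfig::new().with_memory_pool(Arc::new(pool));
    Ok(Arc::new(RuntimeEnv::new(config)?))
}
```

The table-specific object store would still be registered per call, since it is tied to this particular table's root.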

/// Perform the operations outlined in the plan.
pub async fn execute(
mut self,
@@ -558,12 +627,18 @@ impl MergePlan {
}
OptimizeOperations::ZOrder(zorder_columns, bins) => {
let zorder_columns = Arc::new(zorder_columns);
// If there aren't enough bins to use all threads, then instead
// use threads within the bins. This is important for the case where
// the table is un-partitioned, in which case the entire table is just
// one big bin.
let use_threads_within_bin = bins.len() <= num_cpus::get();
futures::stream::iter(bins)
.map(|(partition, files)| {
let batch_stream = Self::read_zorder(
zorder_columns.clone(),
files.clone(),
object_store.clone(),
use_threads_within_bin,
);

let object_store = object_store.clone();
@@ -859,17 +934,19 @@ fn build_zorder_plan(

pub(super) mod util {
use super::*;
use arrow_select::interleave::interleave;
use futures::Future;
use itertools::Itertools;
use tokio::task::JoinError;

/// Interleaves a vector of record batches based on a set of indices
#[cfg(not(feature = "datafusion"))]
pub fn interleave_batches(
batches: Vec<RecordBatch>,
indices: Vec<(usize, usize)>,
batch_size: usize,
) -> BoxStream<'static, Result<RecordBatch, DeltaTableError>> {
use arrow_schema::ArrowError;
use arrow_select::interleave::interleave;

if batches.is_empty() {
return futures::stream::empty().boxed();
}
@@ -924,6 +1001,53 @@ pub(super) mod zorder {
use arrow_buffer::bit_util::{get_bit_raw, set_bit_raw, unset_bit_raw};
use arrow_row::{Row, RowConverter, SortField};
use arrow_schema::ArrowError;
#[cfg(feature = "datafusion")]
pub(super) mod datafusion {
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_expr::{ColumnarValue, ScalarUDF, Signature, TypeSignature, Volatility};
use itertools::Itertools;

use super::*;

pub const ZORDER_UDF_NAME: &str = "zorder_key";

/// Get the DataFusion UDF struct for zorder_key
pub fn zorder_key_udf() -> ScalarUDF {
let signature = Signature {
type_signature: TypeSignature::VariadicAny,
volatility: Volatility::Immutable,
};
ScalarUDF {
name: ZORDER_UDF_NAME.to_string(),
signature,
return_type: Arc::new(|_| Ok(Arc::new(DataType::Binary))),
fun: Arc::new(zorder_key_datafusion),
}
}

/// Datafusion zorder UDF body
pub fn zorder_key_datafusion(
columns: &[ColumnarValue],
) -> Result<ColumnarValue, DataFusionError> {
let length = columns
.iter()
.map(|col| match col {
ColumnarValue::Array(array) => array.len(),
ColumnarValue::Scalar(_) => 1,
})
.max()
.ok_or(DataFusionError::NotImplemented(
"z-order on zero columns.".to_string(),
))?;
let columns = columns
.iter()
.map(|col| col.clone().into_array(length))
.collect_vec();
let array = zorder_key(&columns)?;
Ok(ColumnarValue::Array(array))
}
}
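As a usage illustration (not part of this diff), a scalar UDF registered like the one above is also callable by name from SQL; the table and column names in this sketch are placeholders:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_expr::ScalarUDF;

/// Sketch: register a z-order key UDF on a session and invoke it from SQL.
/// Assumes `my_table` has already been registered on `ctx`.
async fn zorder_key_sql_example(ctx: &SessionContext, zorder_udf: ScalarUDF) -> Result<()> {
    ctx.register_udf(zorder_udf);
    let df = ctx
        .sql("SELECT zorder_key(col_a, col_b) AS key FROM my_table ORDER BY key")
        .await?;
    df.show().await?;
    Ok(())
}
```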

/// Creates a new binary array containing the zorder keys for the given columns
///