bug: drains output channel when batching results #763

Merged · 7 commits · Sep 27, 2023
2 changes: 0 additions & 2 deletions crates/sparrow-runtime/src/execute/output.rs
@@ -109,7 +109,6 @@ pub(super) fn write(
batch
};


if batch.num_rows() > max_batch_size {
for start in (0..batch.num_rows()).step_by(max_batch_size) {
let end = (start + max_batch_size).min(batch.num_rows());
@@ -121,7 +120,6 @@ pub(super) fn write(
yield post_process_batch(&sink_schema, batch, &key_hash_inverse).await;
}


if limit_rows && remaining == 0 {
break;
}
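The context lines above show the chunking loop this hunk sits in: output batches larger than `max_batch_size` are yielded as row slices. A minimal standalone sketch of that pattern (a hypothetical helper assuming only the arrow-array `RecordBatch::slice` API, not the exact Sparrow code):

```rust
use arrow_array::RecordBatch;

/// Split a batch into zero-copy slices of at most `max_batch_size` rows,
/// mirroring the `step_by` loop in `output.rs`.
fn split_batch(batch: &RecordBatch, max_batch_size: usize) -> Vec<RecordBatch> {
    if batch.num_rows() <= max_batch_size {
        return vec![batch.clone()];
    }
    (0..batch.num_rows())
        .step_by(max_batch_size)
        .map(|start| {
            let end = (start + max_batch_size).min(batch.num_rows());
            // `slice` shares the underlying buffers, so no rows are copied.
            batch.slice(start, end - start)
        })
        .collect()
}
```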
67 changes: 42 additions & 25 deletions crates/sparrow-session/src/execution.rs
@@ -2,17 +2,17 @@ use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
use sparrow_api::kaskada::v1alpha::ExecuteResponse;

use crate::Error;

pub struct Execution {
/// Tokio runtme managing this execution.
rt: tokio::runtime::Runtime,
/// Tokio handle managing this execution.
handle: tokio::runtime::Handle,
/// Channel to receive output on.
output: tokio_stream::wrappers::ReceiverStream<RecordBatch>,
/// Future which resolves to the first error or None.
// Future that resolves to the first error, if one occurred.
status: Status,
/// Stop signal. Send `true` to stop execution.
stop_signal_rx: tokio::sync::watch::Sender<bool>,
@@ -27,21 +27,31 @@ enum Status {

impl Execution {
pub(super) fn new(
rt: tokio::runtime::Runtime,
handle: tokio::runtime::Handle,
output_rx: tokio::sync::mpsc::Receiver<RecordBatch>,
progress: BoxStream<'static, error_stack::Result<ExecuteResponse, Error>>,
stop_signal_rx: tokio::sync::watch::Sender<bool>,
schema: SchemaRef,
) -> Self {
let output = tokio_stream::wrappers::ReceiverStream::new(output_rx);

// Constructs a future that resolves to the first error, if one occurred.
let status = Status::Running(Box::pin(async move {
let mut progress = progress;
while (progress.try_next().await?).is_some() {}
Ok(())
let mut errors = progress
.filter_map(|result| {
futures::future::ready(if let Err(e) = result { Some(e) } else { None })
})
.boxed();
let first_error = errors.next().await;
if let Some(first_error) = first_error {
Err(first_error)
} else {
Ok(())
}
}));

Self {
rt,
handle,
output,
status,
stop_signal_rx,
@@ -56,12 +66,12 @@ impl Execution {
/// status (and return) accordingly.
fn is_done(&mut self) -> error_stack::Result<(), Error> {
let result = match &mut self.status {
Status::Running(future) => {
Status::Running(progress) => {
// Based on the implementation of `FutureExt::now_or_never`:
let noop_waker = futures::task::noop_waker();
let mut cx = std::task::Context::from_waker(&noop_waker);

match future.as_mut().poll(&mut cx) {
match progress.as_mut().poll(&mut cx) {
std::task::Poll::Ready(x) => x,
_ => return Ok(()),
}
@@ -99,29 +109,36 @@ impl Execution {

pub fn next_blocking(&mut self) -> error_stack::Result<Option<RecordBatch>, Error> {
self.is_done()?;
Ok(self.rt.block_on(self.output.next()))
Ok(self.handle.block_on(self.output.next()))
}

pub async fn collect_all(self) -> error_stack::Result<Vec<RecordBatch>, Error> {
// TODO: For large outputs, we likely need to drain the output while waiting for the future.
match self.status {
Status::Running(future) => future.await?,
let progress = match self.status {
Status::Running(progress) => progress,
Status::Failed => error_stack::bail!(Error::ExecutionFailed),
_ => {}
Status::Completed => {
// If the progress channel has completed without error, we know that the output channel
// hasn't filled up, so we can go ahead and collect the output
return Ok(self.output.collect().await);
}
};

Ok(self.output.collect().await)
let output = self.output.collect::<Vec<_>>();

let (first_error, output) = futures::join!(progress, output);
if let Err(e) = first_error {
Err(e)
} else {
Ok(output)
}
}

pub fn collect_all_blocking(self) -> error_stack::Result<Vec<RecordBatch>, Error> {
// TODO: For large outputs, we likely need to drain the output while waiting for the future.
match self.status {
Status::Running(future) => self.rt.block_on(future)?,
Status::Failed => error_stack::bail!(Error::ExecutionFailed),
_ => {}
};

Ok(self.rt.block_on(self.output.collect()))
// In order to check the running status, we have to enter the runtime regardless,
// so there's no reason to check the status prior to entering the runtime
// here.
let handle = self.handle.clone();
handle.block_on(self.collect_all())
}
}

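The behavioral fix is in `collect_all`: rather than awaiting the progress future to completion and only then reading the output channel (which deadlocks once the bounded channel fills), the progress future and the output stream are now driven together with `futures::join!`. A minimal sketch of why that matters, using a hypothetical bounded tokio mpsc channel rather than the Sparrow types:

```rust
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Small buffer so the sender fills it quickly; the real code uses `channel(10)`.
    let (tx, rx) = tokio::sync::mpsc::channel::<usize>(2);

    // Stand-in for the progress future: it produces more items than the buffer
    // holds. Awaiting this alone would hang once the buffer is full, because
    // nothing is draining the receiver.
    let progress = async move {
        for i in 0..100usize {
            tx.send(i).await.expect("receiver dropped");
        }
        Ok::<(), String>(())
    };

    // Stand-in for the output side: collect everything from the channel.
    let output = tokio_stream::wrappers::ReceiverStream::new(rx).collect::<Vec<_>>();

    // Drive both concurrently, as the new `collect_all` does with `futures::join!`.
    let (status, rows) = futures::join!(progress, output);
    status.expect("execution error");
    assert_eq!(rows.len(), 100);
}
```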
28 changes: 18 additions & 10 deletions crates/sparrow-session/src/session.rs
@@ -21,7 +21,6 @@ use uuid::Uuid;
use crate::execution::Execution;
use crate::{Error, Expr, Literal, Table};

#[derive(Default)]
pub struct Session {
data_context: DataContext,
dfg: Dfg,
@@ -34,6 +33,20 @@ pub struct Session {
/// udf as well.
udfs: HashMap<Uuid, Arc<dyn Udf>>,
object_store_registry: Arc<ObjectStoreRegistry>,
rt: tokio::runtime::Runtime,
}

impl Default for Session {
fn default() -> Self {
Self {
data_context: Default::default(),
dfg: Default::default(),
key_hash_inverse: Default::default(),
udfs: Default::default(),
object_store_registry: Default::default(),
rt: tokio::runtime::Runtime::new().expect("tokio runtime"),
}
}
}

#[derive(Default)]
@@ -454,13 +467,6 @@ impl Session {
.into_report()
.change_context(Error::Compile)?;

// Switch to the Tokio async pool. This seems gross.
// Create the runtime.
//
// TODO: Figure out how to asynchronously pass results back to Python?
let rt = tokio::runtime::Runtime::new()
.into_report()
.change_context(Error::Execute)?;
let (output_tx, output_rx) = tokio::sync::mpsc::channel(10);
let destination = Destination::Channel(output_tx);

@@ -481,7 +487,8 @@ impl Session {
});

// Hacky. Use the existing execution logic. This does weird things with downloading checkpoints, etc.
let progress = rt
let progress = self
.rt
.block_on(sparrow_runtime::execute::execute_new(
plan,
destination,
@@ -495,8 +502,9 @@ impl Session {
.map_err(|e| e.change_context(Error::Execute))
.boxed();

let handle = self.rt.handle().clone();
Ok(Execution::new(
rt,
handle,
output_rx,
progress,
stop_signal_tx,
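With this change the `Session` owns a single Tokio runtime for its lifetime and each `Execution` receives a cloned `Handle`, instead of `execute` constructing a fresh runtime per call. A minimal sketch of that ownership pattern (hypothetical structs, not the actual Sparrow API):

```rust
struct Session {
    /// One runtime for the lifetime of the session.
    rt: tokio::runtime::Runtime,
}

struct Execution {
    /// Cheap, cloneable handle onto the session's runtime.
    handle: tokio::runtime::Handle,
}

impl Session {
    fn new() -> Self {
        Self {
            rt: tokio::runtime::Runtime::new().expect("tokio runtime"),
        }
    }

    fn execute(&self) -> Execution {
        Execution {
            // Many executions can share the single runtime.
            handle: self.rt.handle().clone(),
        }
    }
}

impl Execution {
    fn next_blocking(&self) -> u64 {
        // Blocking callers (e.g. Python bindings) enter the shared runtime
        // instead of spinning up a new one per call.
        self.handle.block_on(async { 42 })
    }
}

fn main() {
    let session = Session::new();
    let execution = session.execute();
    assert_eq!(execution.next_blocking(), 42);
}
```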
4 changes: 3 additions & 1 deletion python/pysrc/kaskada/_timestream.py
@@ -565,7 +565,9 @@ def select(self, *args: str) -> Timestream:
"""
return Timestream._call("select_fields", self, *args)

def substring(self, start: Optional[int] = None, end: Optional[int] = None) -> Timestream:
def substring(
self, start: Optional[int] = None, end: Optional[int] = None
) -> Timestream:
"""Return a Timestream with a substring between the start and end indices.

Args:
10 changes: 9 additions & 1 deletion python/pysrc/kaskada/sources/__init__.py
@@ -3,4 +3,12 @@
from .source import Source


__all__ = ["Source", "CsvString", "Pandas", "JsonlFile", "JsonlString", "PyDict", "Parquet"]
__all__ = [
"Source",
"CsvString",
"Pandas",
"JsonlFile",
"JsonlString",
"PyDict",
"Parquet",
]
9 changes: 5 additions & 4 deletions python/pysrc/kaskada/sources/arrow.py
@@ -280,7 +280,9 @@ async def create(
if schema is None:
if csv_string is None:
raise ValueError("Must provide schema or csv_string")
schema = pa.csv.read_csv(csv_string, parse_options=CsvString._parse_options).schema
schema = pa.csv.read_csv(
csv_string, parse_options=CsvString._parse_options
).schema
csv_string.seek(0)

source = CsvString(
@@ -303,7 +305,7 @@ async def add_string(self, csv_string: str | BytesIO) -> None:
content = pa.csv.read_csv(
csv_string,
convert_options=self._convert_options,
parse_options=CsvString._parse_options
parse_options=CsvString._parse_options,
)
for batch in content.to_batches():
await self._ffi_table.add_pyarrow(batch)
@@ -396,8 +398,7 @@ async def create(
async def add_file(self, path: str) -> None:
"""Add data to the source."""
batches = pa.json.read_json(
Source._get_absolute_path(path),
parse_options=self._parse_options
Source._get_absolute_path(path), parse_options=self._parse_options
)
for batch in batches.to_batches():
await self._ffi_table.add_pyarrow(batch)
2 changes: 1 addition & 1 deletion python/pysrc/kaskada/sources/source.py
@@ -1,6 +1,6 @@
"""Provide the base-class for Kaskada sources."""
from typing import Literal, Optional
import os
from typing import Literal, Optional

import kaskada._ffi as _ffi
import pyarrow as pa
6 changes: 1 addition & 5 deletions python/pytests/filter_test.py
@@ -57,10 +57,6 @@ async def test_filter_to_merge_preserves_interpolation(source, golden) -> None:
predicate = n < 9
golden.jsonl(
kd.record(
{
"n": n,
"predicate": predicate,
"filter_sum": n.sum().filter(predicate)
}
{"n": n, "predicate": predicate, "filter_sum": n.sum().filter(predicate)}
)
)
@@ -0,0 +1,18 @@
{"_time":"2022-01-04T14:38:31.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-04T14:38:31.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":499.48,"item":"0da9b3fd-2c92-4b87-92b0-5137eaf6ff75"}
{"_time":"2022-01-05T20:40:03.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-05T20:40:03.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":498.16,"item":"f9cdde05-40f9-48fd-812e-1c3936589184"}
{"_time":"2022-01-06T04:54:59.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-06T04:54:59.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":491.38,"item":"64cd0de2-02b8-4420-8b8a-57ad4d0b9aa2"}
{"_time":"2022-01-06T07:14:07.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-06T07:14:07.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":498.32,"item":"3940b205-50a6-4141-ab68-aa0464ae0f3d"}
{"_time":"2022-01-07T02:32:48.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-07T02:32:48.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":498.4,"item":"3c1f2b17-8bb0-43b9-8a52-e3d8d81fe311"}
{"_time":"2022-01-08T08:17:28.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-08T08:17:28.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":492.91,"item":"b5a8d6e1-9070-410d-bf44-72754b485faa"}
{"_time":"2022-01-09T15:12:23.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-09T15:12:23.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":493.92,"item":"9976f04f-3faf-46bd-80f6-1dc102632ec6"}
{"_time":"2022-01-10T02:11:28.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-10T02:11:28.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":495.04,"item":"ca02d3d3-a309-4b7b-ac12-29fa4a1a8704"}
{"_time":"2022-01-14T15:06:56.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-14T15:06:56.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":492.81,"item":"3940b205-50a6-4141-ab68-aa0464ae0f3d"}
{"_time":"2022-01-16T05:08:53.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-16T05:08:53.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":492.92,"item":"5a86942a-5bcc-41f7-9286-937b248caccc"}
{"_time":"2022-01-20T03:28:47.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-20T03:28:47.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":495.65,"item":"bcfd7a57-f36e-4b37-9b2d-795401f36459"}
{"_time":"2022-01-21T13:25:25.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-21T13:25:25.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":499.51,"item":"d6789f76-7ac6-415b-a2fa-8b56f80eef74"}
{"_time":"2022-01-23T06:10:21.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-23T06:10:21.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":491.33,"item":"d988eedb-2f3c-4ad5-82ab-7b1c25754ea0"}
{"_time":"2022-01-24T16:50:58.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-24T16:50:58.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":494.25,"item":"69718e27-44e6-4cb1-86ff-fc5b5d4c50a1"}
{"_time":"2022-01-26T20:56:58.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-26T20:56:58.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":492.23,"item":"87c91aeb-dba3-431e-bbda-f65f9164c64d"}
{"_time":"2022-01-26T22:57:18.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-26T22:57:18.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":491.84,"item":"804488a1-9724-465d-a596-1b6930510640"}
{"_time":"2022-01-29T08:46:35.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-29T08:46:35.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":490.18,"item":"bc323957-93e4-4aa8-8fc1-c73411e9ca0b"}
{"_time":"2022-01-29T17:21:29.000000000","_key":"5fec83d4-f5c6-4943-ab05-2b6760330daf","time":"2022-01-29T17:21:29.000000000","user":"5fec83d4-f5c6-4943-ab05-2b6760330daf","amount":491.56,"item":"ad811380-ac9c-4f6a-9015-ba2441abbff0"}
8 changes: 1 addition & 7 deletions python/pytests/len_test.py
@@ -22,10 +22,4 @@ async def source() -> kd.sources.CsvString:

async def test_len(source, golden) -> None:
s = source.col("s")
golden.jsonl(
kd.record(
{
"len": s.len()
}
)
)
golden.jsonl(kd.record({"len": s.len()}))
22 changes: 22 additions & 0 deletions python/pytests/parquet_source_test.py
@@ -24,3 +24,25 @@ async def test_read_parquet_with_subsort(golden) -> None:

await source.add_file("../testdata/purchases/purchases_part2.parquet")
golden.jsonl(source)


# Verifies that we drain the output and progress channels correctly.
#
# When the parquet file contains more rows than
# (CHANNEL_SIZE * MAX_BATCH_SIZE), the channels previously filled
# up, causing the sender to block. This test verifies that the
# channels correctly drain, allowing the sender to continue.
# See https://github.com/kaskada-ai/kaskada/issues/775
async def test_large_parquet_file(golden) -> None:
source = await kd.sources.Parquet.create(
"../testdata/parquet/purchases_100k.parquet",
time_column="time",
key_column="user",
)
user = source.col("user")
amount = source.col("amount")

# Add a filter to reduce the output file size while ensuring the entire
# file is still processed
predicate = user.eq("5fec83d4-f5c6-4943-ab05-2b6760330daf").and_(amount.gt(490))
golden.jsonl(source.filter(predicate))
Binary file added testdata/parquet/purchases_100k.parquet
Binary file not shown.