Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow writing to a channel #572

Merged
merged 2 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/sparrow-main/src/serve/compute_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use sparrow_instructions::ComputeStore;
use sparrow_materialize::{Materialization, MaterializationControl};
use sparrow_qfr::kaskada::sparrow::v1alpha::{flight_record_header, FlightRecordHeader};
use sparrow_runtime::execute::error::Error;
use sparrow_runtime::execute::output::Destination;
use sparrow_runtime::stores::{ObjectStoreRegistry, ObjectStoreUrl};
use tempfile::NamedTempFile;

Expand Down Expand Up @@ -301,7 +302,8 @@ fn start_materialization_impl(
let destination = request
.destination
.ok_or(Error::MissingField("destination"))?;

let destination =
Destination::try_from(destination).change_context(Error::InvalidDestination)?;
let materialization = Materialization::new(id, plan, tables, destination);
// TODO: Support lateness
// Spawns the materialization thread and begins execution
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use error_stack::ResultExt;
use sparrow_api::kaskada::v1alpha::{ComputePlan, ComputeTable, Destination, ExecuteResponse};
use sparrow_api::kaskada::v1alpha::{ComputePlan, ComputeTable, ExecuteResponse};
use sparrow_runtime::execute::output::Destination;
use tokio_stream::Stream;

use crate::Error;
Expand Down
6 changes: 4 additions & 2 deletions crates/sparrow-runtime/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use futures::Stream;
use prost_wkt_types::Timestamp;
use sparrow_api::kaskada::v1alpha::execute_request::Limits;
use sparrow_api::kaskada::v1alpha::{
ComputePlan, ComputeTable, Destination, ExecuteRequest, ExecuteResponse, LateBoundValue,
PerEntityBehavior,
ComputePlan, ComputeTable, ExecuteRequest, ExecuteResponse, LateBoundValue, PerEntityBehavior,
};
use sparrow_arrow::scalar_value::ScalarValue;
use sparrow_compiler::{hash_compute_plan_proto, DataContext};
Expand All @@ -18,6 +17,7 @@ use tracing::Instrument;
use crate::execute::error::Error;
use crate::execute::key_hash_inverse::{KeyHashInverse, ThreadSafeKeyHashInverse};
use crate::execute::operation::OperationContext;
use crate::execute::output::Destination;
use crate::stores::ObjectStoreRegistry;
use crate::RuntimeOptions;

Expand Down Expand Up @@ -52,6 +52,8 @@ pub async fn execute(
let destination = request
.destination
.ok_or(Error::MissingField("destination"))?;
let destination =
Destination::try_from(destination).change_context(Error::InvalidDestination)?;

let changed_since_time = request.changed_since.unwrap_or(Timestamp {
seconds: 0,
Expand Down
5 changes: 3 additions & 2 deletions crates/sparrow-runtime/src/execute/compute_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::{FutureExt, Stream, TryFutureExt};
use prost_wkt_types::Timestamp;
use sparrow_api::kaskada::v1alpha::ComputeSnapshot;
use sparrow_api::kaskada::v1alpha::ComputeSnapshotConfig;
use sparrow_api::kaskada::v1alpha::{self, ExecuteResponse, LateBoundValue, PlanHash};
use sparrow_api::kaskada::v1alpha::{ExecuteResponse, LateBoundValue, PlanHash};
use sparrow_arrow::scalar_value::ScalarValue;
use sparrow_instructions::ComputeStore;
use sparrow_qfr::io::writer::FlightRecordWriter;
Expand All @@ -20,6 +20,7 @@ use tokio_stream::StreamExt;
use tracing::{error, info, info_span, Instrument};

use crate::execute::operation::{OperationContext, OperationExecutor};
use crate::execute::output::Destination;
use crate::execute::progress_reporter::{progress_stream, ProgressUpdate};
use crate::execute::spawner::ComputeTaskSpawner;
use crate::execute::Error;
Expand Down Expand Up @@ -55,7 +56,7 @@ impl ComputeExecutor {
late_bindings: &EnumMap<LateBoundValue, Option<ScalarValue>>,
runtime_options: &RuntimeOptions,
progress_updates_rx: tokio::sync::mpsc::Receiver<ProgressUpdate>,
destination: v1alpha::Destination,
destination: Destination,
stop_signal_rx: Option<tokio::sync::watch::Receiver<bool>>,
) -> error_stack::Result<Self, Error> {
let mut spawner = ComputeTaskSpawner::new();
Expand Down
2 changes: 2 additions & 0 deletions crates/sparrow-runtime/src/execute/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub enum Error {
Internal(&'static str),
#[display(fmt = "invalid operation: {_0}")]
InvalidOperation(String),
#[display(fmt = "invalid destination")]
InvalidDestination,
#[display(fmt = "failed to pre-process next input for operation")]
PreprocessNextInput,
#[display(fmt = "output '{output}' is not supported")]
Expand Down
91 changes: 61 additions & 30 deletions crates/sparrow-runtime/src/execute/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use error_stack::{FutureExt as ESFutureExt, IntoReport, Result, ResultExt};
use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use sparrow_api::kaskada::v1alpha::destination::Destination;
use sparrow_api::kaskada::v1alpha::execute_request::Limits;
use sparrow_api::kaskada::v1alpha::{self, data_type};
use sparrow_api::kaskada::v1alpha::{data_type, ObjectStoreDestination, PulsarDestination};
use sparrow_arrow::downcast::{downcast_primitive_array, downcast_struct_array};

use crate::execute::key_hash_inverse::ThreadSafeKeyHashInverse;
Expand All @@ -19,7 +18,6 @@ use crate::execute::progress_reporter::ProgressUpdate;
use crate::Batch;

mod object_store;
mod redis;

pub mod pulsar;

Expand All @@ -28,9 +26,8 @@ pub enum Error {
Schema {
detail: String,
},
WritingToDestination {
dest_name: String,
},
#[display(fmt = "writing to destination '{_0}'")]
WritingToDestination(&'static str),
UnspecifiedDestination,
#[cfg(not(feature = "pulsar"))]
FeatureNotEnabled {
Expand All @@ -40,13 +37,49 @@ pub enum Error {

impl error_stack::Context for Error {}

/// The output destination.
///
/// TODO: Replace the protobuf destinations with pure Rust structs.
#[derive(Debug)]
pub enum Destination {
ObjectStore(ObjectStoreDestination),
#[cfg(feature = "pulsar")]
Pulsar(PulsarDestination),
Channel(tokio::sync::mpsc::Sender<RecordBatch>),
}

impl TryFrom<sparrow_api::kaskada::v1alpha::Destination> for Destination {
type Error = error_stack::Report<Error>;

fn try_from(
value: sparrow_api::kaskada::v1alpha::Destination,
) -> std::result::Result<Self, Self::Error> {
let destination = value.destination.ok_or(Error::UnspecifiedDestination)?;
match destination {
sparrow_api::kaskada::v1alpha::destination::Destination::ObjectStore(destination) => {
Ok(Destination::ObjectStore(destination))
}
#[cfg(not(feature = "pulsar"))]
Destination::Pulsar(_) => {
error_stack::bail!(Error::FeatureNotEnabled {
feature: "pulsar".to_owned()
})
}
#[cfg(feature = "pulsar")]
sparrow_api::kaskada::v1alpha::destination::Destination::Pulsar(pulsar) => {
Ok(Destination::Pulsar(pulsar))
}
}
}
}

/// Write the batches to the given output destination.
pub(super) fn write(
context: &OperationContext,
limits: Limits,
batches: BoxStream<'static, Batch>,
progress_updates_tx: tokio::sync::mpsc::Sender<ProgressUpdate>,
destination: v1alpha::Destination,
destination: Destination,
) -> error_stack::Result<impl Future<Output = Result<(), Error>> + 'static, Error> {
let sink_schema = determine_output_schema(context)?;

Expand Down Expand Up @@ -83,9 +116,6 @@ pub(super) fn write(
}
.boxed();

let destination = destination
.destination
.ok_or(Error::UnspecifiedDestination)?;
match destination {
Destination::ObjectStore(destination) => Ok(object_store::write(
context.object_stores.clone(),
Expand All @@ -94,38 +124,39 @@ pub(super) fn write(
progress_updates_tx,
batches,
)
.change_context(Error::WritingToDestination {
dest_name: "object_store".to_owned(),
})
.change_context(Error::WritingToDestination("object_store"))
.boxed()),
Destination::Redis(redis) => {
Ok(
redis::write(redis, sink_schema, progress_updates_tx, batches)
.change_context(Error::WritingToDestination {
dest_name: "redis".to_owned(),
})
.boxed(),
)
}
#[cfg(not(feature = "pulsar"))]
Destination::Pulsar(_) => {
error_stack::bail!(Error::FeatureNotEnabled {
feature: "pulsar".to_owned()
})
Destination::Channel(channel) => {
Ok(write_to_channel(batches, channel, progress_updates_tx).boxed())
}
#[cfg(feature = "pulsar")]
Destination::Pulsar(pulsar) => {
Ok(
pulsar::write(pulsar, sink_schema, progress_updates_tx, batches)
.change_context(Error::WritingToDestination {
dest_name: "pulsar".to_owned(),
})
.change_context(Error::WritingToDestination("pulsar"))
.boxed(),
)
}
}
}

async fn write_to_channel(
mut batches: BoxStream<'static, RecordBatch>,
channel: tokio::sync::mpsc::Sender<RecordBatch>,
_progress_updates_tx: tokio::sync::mpsc::Sender<ProgressUpdate>,
) -> error_stack::Result<(), Error> {
while let Some(next) = batches.next().await {
channel
.send(next)
.await
.map_err(|_e| error_stack::report!(Error::WritingToDestination("channel")))?;

// progress_updates_tx.send(ProgressUpdate::Output { num_rows })
}

Ok(())
}

/// Adds additional information to an output batch.
async fn post_process_batch(
sink_schema: &SchemaRef,
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/execute/output/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ pub(super) async fn write(
// Inform tracker of destination type
progress_updates_tx
.send(ProgressUpdate::Destination {
destination: Destination::ObjectStore(destination.clone()),
destination: Some(Destination::ObjectStore(destination.clone())),
})
.await
.into_report()
Expand Down
4 changes: 2 additions & 2 deletions crates/sparrow-runtime/src/execute/output/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ pub(super) async fn write(
// Inform tracker of output type
progress_updates_tx
.send(ProgressUpdate::Destination {
destination: destination::Destination::Pulsar(PulsarDestination {
destination: Some(destination::Destination::Pulsar(PulsarDestination {
config: Some(pulsar.clone()),
}),
})),
})
.await
.into_report()
Expand Down
21 changes: 0 additions & 21 deletions crates/sparrow-runtime/src/execute/output/redis.rs

This file was deleted.

7 changes: 2 additions & 5 deletions crates/sparrow-runtime/src/execute/progress_reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct ProgressTracker {
pub(crate) enum ProgressUpdate {
/// Informs the progress tracker of the output destination.
Destination {
destination: destination::Destination,
destination: Option<destination::Destination>,
},
/// Progress update reported for each table indicating total size.
InputMetadata { total_num_rows: usize },
Expand Down Expand Up @@ -90,7 +90,7 @@ impl ProgressTracker {
fn process_update(&mut self, stats: ProgressUpdate) {
match stats {
ProgressUpdate::Destination { destination } => {
self.destination = Some(destination);
self.destination = destination;
}
ProgressUpdate::InputMetadata { total_num_rows } => {
self.progress.total_input_rows += total_num_rows as i64;
Expand Down Expand Up @@ -181,9 +181,6 @@ impl ProgressTracker {
})),
})
}
destination::Destination::Redis(_) => {
error_stack::bail!(Error::UnsupportedOutput { output: "redis" })
}
}
}
}
Expand Down
53 changes: 4 additions & 49 deletions proto/kaskada/kaskada/v1alpha/destinations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import "kaskada/kaskada/v1alpha/pulsar.proto";

// Describes the destination results are materialized to.
message Destination {
// Used by the old (unsupported / unimplemented) redis destination.
reserved 2;

oneof destination {
ObjectStoreDestination object_store = 1;
RedisDestination redis = 2;

PulsarDestination pulsar = 3;
}
}
Expand Down Expand Up @@ -39,54 +42,6 @@ message ObjectStoreDestination {
}
}

// Writes the results directly to a RedisAI instance.
//
// Uses a series of AI.TENSORSET operations
//
// The query expression's type must be a record.
// The record type must include a field named 'key'.
// The value of the 'key' field is used as the AI.TENSORSET key.
// All other fields must be numeric.
//
// See https://redis.io/topics/protocol
message RedisDestination {
// The hostname of the Redis instance.
string host_name = 1;

// The port of the Redis instance.
int32 port = 2;

// When `true`, TLS will be used to connect to Redis.
bool use_tls = 3;

// The Redis database number 0 to 15.
int32 database_number = 4;

// The password to connect to the Redis instance
string password = 5;

// An X.509 certificate to use for authenticating the server
// to connected clients, masters or cluster peers.
// The string should be PEM formatted.
string tls_cert = 6;

// An X.509 private key to use for authenticating the server
// to connected clients, masters or cluster peers.
// The string should be PEM formatted.
string tls_key = 7;

// A PEM encoded CA's certificate.
string tls_ca_cert = 8;

// InsecureSkipVerify controls whether a client verifies the
// server's certificate chain and host name.
// If this field is true, TLS accepts any certificate
// presented by the server and any host name in that certificate.
// In this mode, TLS is susceptible to man-in-the-middle attacks.
// This should be used only for testing.
bool insecure_skip_verify = 9;
}

message PulsarDestination {
PulsarConfig config = 1;
}