Skip to content

Commit

Permalink
Point back at the main fork branch; clean up print statements.
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse committed Apr 18, 2024
1 parent 542d562 commit 1f9dad5
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 97 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '50.0
arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '50.0.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '50.0.0/json'}
object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '0.9.0/put_part_api'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'}
4 changes: 0 additions & 4 deletions crates/arroyo-df/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode};
use petgraph::graph::{DiGraph, NodeIndex};
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tracing::info;

use crate::extension::debezium::{DEBEZIUM_UNROLLING_EXTENSION_NAME, TO_DEBEZIUM_EXTENSION_NAME};
use crate::extension::key_calculation::KeyCalculationExtension;
Expand Down Expand Up @@ -175,8 +174,6 @@ impl<'a> Planner<'a> {
physical_plan_type: Some(PhysicalPlanType::Aggregate(final_aggregate_proto)),
};

info!("partial schema begins as :{:?}", partial_schema);

let (partial_schema, timestamp_index) = if add_timestamp_field {
(
add_timestamp_field_arrow(partial_schema.clone()),
Expand Down Expand Up @@ -371,7 +368,6 @@ impl<'a> TreeNodeVisitor for PlanToGraphVisitor<'a> {
} else {
vec![]
};
info!("building node: {:?}", node);
self.build_extension(input_nodes, arroyo_extension)
.map_err(|e| DataFusionError::Plan(format!("error building extension: {}", e)))?;

Expand Down
18 changes: 0 additions & 18 deletions crates/arroyo-df/src/extension/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use datafusion_common::{
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};

use prost::Message;
use tracing::info;

use crate::{
builder::{NamedNode, Planner},
Expand Down Expand Up @@ -46,23 +45,6 @@ impl SinkExtension {
let input_is_updating = input
.schema()
.has_column_with_unqualified_name(IS_RETRACT_FIELD);
info!(
"input_is_updating: {}\nschema:\n{:?}\ninput schema:\n{:?}",
input_is_updating,
schema
.fields()
.iter()
.map(|f| f.qualified_name())
.collect::<Vec<_>>()
.join(","),
input
.schema()
.fields()
.iter()
.map(|f| f.qualified_name())
.collect::<Vec<_>>()
.join(",")
);
match &table {
Table::ConnectorTable(connector_table) => {
match (input_is_updating, connector_table.is_updating()) {
Expand Down
18 changes: 0 additions & 18 deletions crates/arroyo-df/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,28 +615,10 @@ pub async fn parse_and_get_arrow_program(
};

let plan_rewrite = plan.rewrite(&mut UnnestRewriter {})?;
info!(
"fields before rewrite: {:?}",
plan_rewrite
.schema()
.fields()
.iter()
.map(|f| f.name())
.collect::<Vec<_>>()
);

let plan_rewrite = plan_rewrite.rewrite(&mut ArroyoRewriter {
schema_provider: &schema_provider,
})?;
info!(
"fields after rewrite: {:?}",
plan_rewrite
.schema()
.fields()
.iter()
.map(|f| f.name())
.collect::<Vec<_>>()
);
let mut metadata = SourceMetadataVisitor::new(&schema_provider);
plan_rewrite.visit(&mut metadata)?;
used_connections.extend(metadata.connection_ids.iter());
Expand Down
1 change: 0 additions & 1 deletion crates/arroyo-df/src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ impl<'a> TreeNodeRewriter for ArroyoRewriter<'a> {
type N = LogicalPlan;

fn mutate(&mut self, mut node: Self::N) -> DFResult<Self::N> {
println!("mutating node: {}", node.display());
match node {
LogicalPlan::Projection(ref mut projection) => {
info!(
Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-df/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};

use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, Context, Result};
use arrow_schema::{DataType, Field, FieldRef, Schema};
use arroyo_connectors::connector_for_type;

Expand Down Expand Up @@ -697,8 +697,8 @@ fn infer_sink_schema(
table_name: String,
schema_provider: &mut ArroyoSchemaProvider,
) -> Result<()> {
let plan = produce_optimized_plan(&Statement::Query(source.clone()), schema_provider).unwrap();
//.context("failed to produce optimized plan");
let plan = produce_optimized_plan(&Statement::Query(source.clone()), schema_provider)
.context("failed to produce optimized plan")?;
let table = schema_provider
.get_table_mut(&table_name)
.ok_or_else(|| anyhow!("table {} not found", table_name))?;
Expand Down
29 changes: 4 additions & 25 deletions crates/arroyo-state/src/tables/expiring_time_key_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
TableData,
};
use arroyo_rpc::df::{ArroyoSchema, ArroyoSchemaRef};
use tracing::{debug, warn};
use tracing::debug;

use super::{table_checkpoint_path, CompactionConfig, Table, TableEpochCheckpointer};

Expand Down Expand Up @@ -931,11 +931,6 @@ impl KeyTimeView {

pub fn get_batch(&mut self, row: &[u8]) -> Result<Option<&RecordBatch>> {
if !self.keyed_data.contains_key(row) {
warn!(
"couldn't find data for {:?}, map has {} keys",
row,
self.keyed_data.len()
);
return Ok(None);
}
let Some(value) = self.keyed_data.get_mut(row) else {
Expand Down Expand Up @@ -968,26 +963,10 @@ impl KeyTimeView {
data: TableData::RecordBatch(batch.clone()),
})
.await?;
Ok(self
.insert_internal(batch)?
.into_iter()
.map(|(row, _)| row)
.collect())
}
pub async fn insert_and_report_prior_presence(
&mut self,
batch: RecordBatch,
) -> Result<Vec<(OwnedRow, bool)>> {
self.state_tx
.send(StateMessage::TableData {
table: self.parent.table_name.to_string(),
data: TableData::RecordBatch(batch.clone()),
})
.await?;
self.insert_internal(batch)
Ok(self.insert_internal(batch)?)
}

fn insert_internal(&mut self, batch: RecordBatch) -> Result<Vec<(OwnedRow, bool)>> {
fn insert_internal(&mut self, batch: RecordBatch) -> Result<Vec<OwnedRow>> {
let sorted_batch = self.schema.sort(batch, false)?;
let value_batch = sorted_batch.project(&self.value_indices)?;
let mut rows = vec![];
Expand All @@ -1004,7 +983,7 @@ impl KeyTimeView {
};
let key_row = self.key_converter.convert_columns(&key_columns)?;
let contents = self.keyed_data.get_mut(key_row.as_ref());
rows.push((key_row.clone(), contents.is_some()));
rows.push(key_row.clone());
let batch = match contents {
Some(BatchData::BatchVec(vec)) => {
vec.push(value_batch);
Expand Down
10 changes: 0 additions & 10 deletions crates/arroyo-worker/src/arrow/updating_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
collections::HashMap,
pin::Pin,
sync::{Arc, RwLock},
time::SystemTime,
};

use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -34,7 +33,6 @@ use prost::Message;
use std::time::Duration;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio_stream::StreamExt;
use tracing::info;

pub struct UpdatingAggregatingFunc {
partial_aggregation_plan: Arc<dyn ExecutionPlan>,
Expand All @@ -56,8 +54,6 @@ impl UpdatingAggregatingFunc {
if self.sender.is_none() {
return Ok(());
}
println!("flushing subtask {}", ctx.task_info.task_index);
let flush_start = SystemTime::now();
{
self.sender.take();
}
Expand All @@ -66,10 +62,6 @@ impl UpdatingAggregatingFunc {
while let Some(batch) = flushing_exec.next().await {
partial_batches.push(batch?);
}
info!(
"finished active exec in {:?}",
flush_start.elapsed().unwrap()
);
let new_partial_batch =
concat_batches(&self.state_partial_schema.schema, &partial_batches)?;
let prior_partials = ctx
Expand All @@ -80,7 +72,6 @@ impl UpdatingAggregatingFunc {
if let Some((prior_partial_batch, _filter)) =
prior_partials.get_current_matching_values(&new_partial_batch)?
{
println!("found prior partial batch\n{:?}", prior_partial_batch);
let combining_batches = vec![new_partial_batch, prior_partial_batch];
let combine_batch = concat_batches(&self.partial_schema.schema, &combining_batches)?;
let mut combine_exec = {
Expand Down Expand Up @@ -154,7 +145,6 @@ impl UpdatingAggregatingFunc {
for batch in batches_to_write.into_iter() {
ctx.collect(batch).await;
}
info!("flushed in {:?}", flush_start.elapsed().unwrap());
Ok(())
}

Expand Down

0 comments on commit 1f9dad5

Please sign in to comment.