From 1f9dad5c2d83bbd43b9eea021f43c18879961ddd Mon Sep 17 00:00:00 2001 From: Jackson Newhouse Date: Thu, 18 Apr 2024 00:26:38 +0000 Subject: [PATCH] point back at main fork branch, cleanup print statements. --- Cargo.lock | 22 +++++++------- Cargo.toml | 14 ++++----- crates/arroyo-df/src/builder.rs | 4 --- crates/arroyo-df/src/extension/sink.rs | 18 ------------ crates/arroyo-df/src/lib.rs | 18 ------------ crates/arroyo-df/src/plan/mod.rs | 1 - crates/arroyo-df/src/tables.rs | 6 ++-- .../src/tables/expiring_time_key_map.rs | 29 +++---------------- .../src/arrow/updating_aggregator.rs | 10 ------- 9 files changed, 25 insertions(+), 97 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 220434259..c326cabcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2823,7 +2823,7 @@ dependencies = [ [[package]] name = "datafusion" version = "36.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=36_combine_partial#56fe262dfba8370dbc602be6ba7112a356d477e9" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=reset_execs_36#d54ac5e555c29a353563474e6650c63a8892a931" dependencies = [ "ahash 0.8.7", "arrow", @@ -2892,7 +2892,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "36.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=36_combine_partial#56fe262dfba8370dbc602be6ba7112a356d477e9" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=reset_execs_36#d54ac5e555c29a353563474e6650c63a8892a931" dependencies = [ "ahash 0.8.7", "arrow", @@ -2932,7 +2932,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "36.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=36_combine_partial#56fe262dfba8370dbc602be6ba7112a356d477e9" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=reset_execs_36#d54ac5e555c29a353563474e6650c63a8892a931" dependencies = [ "arrow", "chrono", @@ -2968,7 +2968,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "36.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=36_combine_partial#56fe262dfba8370dbc602be6ba7112a356d477e9" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=reset_execs_36#d54ac5e555c29a353563474e6650c63a8892a931" dependencies = [ "ahash 0.8.7", "arrow", @@ -2983,7 +2983,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "36.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=36_combine_partial#56fe262dfba8370dbc602be6ba7112a356d477e9" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=reset_execs_36#d54ac5e555c29a353563474e6650c63a8892a931" dependencies = [ "arrow", "base64 0.21.7", @@ -2997,7 +2997,7 @@ dependencies = [ [[package]] name = "datafusion-functions-array" version = "36.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=36_combine_partial#56fe262dfba8370dbc602be6ba7112a356d477e9" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=reset_execs_36#d54ac5e555c29a353563474e6650c63a8892a931" dependencies = [ "arrow", "datafusion-common 36.0.0", @@ -3028,7 +3028,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "36.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=36_combine_partial#56fe262dfba8370dbc602be6ba7112a356d477e9" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=reset_execs_36#d54ac5e555c29a353563474e6650c63a8892a931" dependencies = [ "arrow", "async-trait", @@ -3079,7 +3079,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "36.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=36_combine_partial#56fe262dfba8370dbc602be6ba7112a356d477e9" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=reset_execs_36#d54ac5e555c29a353563474e6650c63a8892a931" dependencies = [ "ahash 0.8.7", "arrow", @@ -3145,7 +3145,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "36.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=36_combine_partial#56fe262dfba8370dbc602be6ba7112a356d477e9" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=reset_execs_36#d54ac5e555c29a353563474e6650c63a8892a931" dependencies = [ "ahash 0.8.7", "arrow", @@ -3190,7 +3190,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "36.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=36_combine_partial#56fe262dfba8370dbc602be6ba7112a356d477e9" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=reset_execs_36#d54ac5e555c29a353563474e6650c63a8892a931" dependencies = [ "arrow", "chrono", @@ -3218,7 +3218,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "36.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=36_combine_partial#56fe262dfba8370dbc602be6ba7112a356d477e9" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=reset_execs_36#d54ac5e555c29a353563474e6650c63a8892a931" dependencies = [ "arrow", "arrow-schema", diff --git a/Cargo.toml b/Cargo.toml index 5b0428f95..54fc41fc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,10 +60,10 @@ arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '50.0 arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '50.0.0/parquet_bytes'} arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '50.0.0/json'} object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '0.9.0/put_part_api'} -datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'} -datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'} -datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'} -datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'} -datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'} -datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'} -datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '36_combine_partial'} +datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'} +datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'} +datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'} +datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'} +datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'} +datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'} +datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'reset_execs_36'} diff --git a/crates/arroyo-df/src/builder.rs b/crates/arroyo-df/src/builder.rs index fa2a595f3..b577138c4 100644 --- a/crates/arroyo-df/src/builder.rs +++ b/crates/arroyo-df/src/builder.rs @@ -27,7 +27,6 @@ use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode}; use petgraph::graph::{DiGraph, NodeIndex}; use tokio::runtime::Runtime; use tokio::sync::oneshot; -use tracing::info; use crate::extension::debezium::{DEBEZIUM_UNROLLING_EXTENSION_NAME, TO_DEBEZIUM_EXTENSION_NAME}; use crate::extension::key_calculation::KeyCalculationExtension; @@ -175,8 +174,6 @@ impl<'a> Planner<'a> { physical_plan_type: Some(PhysicalPlanType::Aggregate(final_aggregate_proto)), }; - info!("partial schema begins as :{:?}", partial_schema); - let (partial_schema, timestamp_index) = if add_timestamp_field { ( add_timestamp_field_arrow(partial_schema.clone()), @@ -371,7 +368,6 @@ impl<'a> TreeNodeVisitor for PlanToGraphVisitor<'a> { } else { vec![] }; - info!("building node: {:?}", node); self.build_extension(input_nodes, arroyo_extension) .map_err(|e| DataFusionError::Plan(format!("error building extension: {}", e)))?; diff --git a/crates/arroyo-df/src/extension/sink.rs b/crates/arroyo-df/src/extension/sink.rs index 72b672da8..675578d72 100644 --- a/crates/arroyo-df/src/extension/sink.rs +++ b/crates/arroyo-df/src/extension/sink.rs @@ -14,7 +14,6 @@ use datafusion_common::{ use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore}; use prost::Message; -use tracing::info; use crate::{ builder::{NamedNode, Planner}, @@ -46,23 +45,6 @@ impl SinkExtension { let input_is_updating = input .schema() .has_column_with_unqualified_name(IS_RETRACT_FIELD); - info!( - "input_is_updating: {}\nschema:\n{:?}\ninput schema:\n{:?}", - input_is_updating, - schema - .fields() - .iter() - .map(|f| f.qualified_name()) - .collect::>() - .join(","), - input - .schema() - .fields() - .iter() - .map(|f| f.qualified_name()) - .collect::>() - .join(",") - ); match &table { Table::ConnectorTable(connector_table) => { match (input_is_updating, connector_table.is_updating()) { diff --git a/crates/arroyo-df/src/lib.rs b/crates/arroyo-df/src/lib.rs index 82a3bbdeb..0ec56e931 100644 --- a/crates/arroyo-df/src/lib.rs +++ b/crates/arroyo-df/src/lib.rs @@ -615,28 +615,10 @@ pub async fn parse_and_get_arrow_program( }; let plan_rewrite = plan.rewrite(&mut UnnestRewriter {})?; - info!( - "fields before rewrite: {:?}", - plan_rewrite - .schema() - .fields() - .iter() - .map(|f| f.name()) - .collect::>() - ); let plan_rewrite = plan_rewrite.rewrite(&mut ArroyoRewriter { schema_provider: &schema_provider, })?; - info!( - "fields after rewrite: {:?}", - plan_rewrite - .schema() - .fields() - .iter() - .map(|f| f.name()) - .collect::>() - ); let mut metadata = SourceMetadataVisitor::new(&schema_provider); plan_rewrite.visit(&mut metadata)?; used_connections.extend(metadata.connection_ids.iter()); diff --git a/crates/arroyo-df/src/plan/mod.rs b/crates/arroyo-df/src/plan/mod.rs index 68cbe590e..7c814a88a 100644 --- a/crates/arroyo-df/src/plan/mod.rs +++ b/crates/arroyo-df/src/plan/mod.rs @@ -255,7 +255,6 @@ impl<'a> TreeNodeRewriter for ArroyoRewriter<'a> { type N = LogicalPlan; fn mutate(&mut self, mut node: Self::N) -> DFResult { - println!("mutating node: {}", node.display()); match node { LogicalPlan::Projection(ref mut projection) => { info!( diff --git a/crates/arroyo-df/src/tables.rs b/crates/arroyo-df/src/tables.rs index a1334b26a..3261e6d53 100644 --- a/crates/arroyo-df/src/tables.rs +++ b/crates/arroyo-df/src/tables.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::sync::Arc; use std::{collections::HashMap, time::Duration}; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail, Context, Result}; use arrow_schema::{DataType, Field, FieldRef, Schema}; use arroyo_connectors::connector_for_type; @@ -697,8 +697,8 @@ fn infer_sink_schema( table_name: String, schema_provider: &mut ArroyoSchemaProvider, ) -> Result<()> { - let plan = produce_optimized_plan(&Statement::Query(source.clone()), schema_provider).unwrap(); - //.context("failed to produce optimized plan"); + let plan = produce_optimized_plan(&Statement::Query(source.clone()), schema_provider) + .context("failed to produce optimized plan")?; let table = schema_provider .get_table_mut(&table_name) .ok_or_else(|| anyhow!("table {} not found", table_name))?; diff --git a/crates/arroyo-state/src/tables/expiring_time_key_map.rs b/crates/arroyo-state/src/tables/expiring_time_key_map.rs index 7632005f5..c4289e02b 100644 --- a/crates/arroyo-state/src/tables/expiring_time_key_map.rs +++ b/crates/arroyo-state/src/tables/expiring_time_key_map.rs @@ -40,7 +40,7 @@ use crate::{ TableData, }; use arroyo_rpc::df::{ArroyoSchema, ArroyoSchemaRef}; -use tracing::{debug, warn}; +use tracing::debug; use super::{table_checkpoint_path, CompactionConfig, Table, TableEpochCheckpointer}; @@ -931,11 +931,6 @@ impl KeyTimeView { pub fn get_batch(&mut self, row: &[u8]) -> Result> { if !self.keyed_data.contains_key(row) { - warn!( - "couldn't find data for {:?}, map has {} keys", - row, - self.keyed_data.len() - ); return Ok(None); } let Some(value) = self.keyed_data.get_mut(row) else { @@ -968,26 +963,10 @@ impl KeyTimeView { data: TableData::RecordBatch(batch.clone()), }) .await?; - Ok(self - .insert_internal(batch)? - .into_iter() - .map(|(row, _)| row) - .collect()) - } - pub async fn insert_and_report_prior_presence( - &mut self, - batch: RecordBatch, - ) -> Result> { - self.state_tx - .send(StateMessage::TableData { - table: self.parent.table_name.to_string(), - data: TableData::RecordBatch(batch.clone()), - }) - .await?; - self.insert_internal(batch) + Ok(self.insert_internal(batch)?) } - fn insert_internal(&mut self, batch: RecordBatch) -> Result> { + fn insert_internal(&mut self, batch: RecordBatch) -> Result> { let sorted_batch = self.schema.sort(batch, false)?; let value_batch = sorted_batch.project(&self.value_indices)?; let mut rows = vec![]; @@ -1004,7 +983,7 @@ impl KeyTimeView { }; let key_row = self.key_converter.convert_columns(&key_columns)?; let contents = self.keyed_data.get_mut(key_row.as_ref()); - rows.push((key_row.clone(), contents.is_some())); + rows.push(key_row.clone()); let batch = match contents { Some(BatchData::BatchVec(vec)) => { vec.push(value_batch); diff --git a/crates/arroyo-worker/src/arrow/updating_aggregator.rs b/crates/arroyo-worker/src/arrow/updating_aggregator.rs index 6f032a2e0..cfe424134 100644 --- a/crates/arroyo-worker/src/arrow/updating_aggregator.rs +++ b/crates/arroyo-worker/src/arrow/updating_aggregator.rs @@ -3,7 +3,6 @@ use std::{ collections::HashMap, pin::Pin, sync::{Arc, RwLock}, - time::SystemTime, }; use anyhow::{anyhow, Result}; @@ -34,7 +33,6 @@ use prost::Message; use std::time::Duration; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_stream::StreamExt; -use tracing::info; pub struct UpdatingAggregatingFunc { partial_aggregation_plan: Arc, @@ -56,8 +54,6 @@ impl UpdatingAggregatingFunc { if self.sender.is_none() { return Ok(()); } - println!("flushing subtask {}", ctx.task_info.task_index); - let flush_start = SystemTime::now(); { self.sender.take(); } @@ -66,10 +62,6 @@ impl UpdatingAggregatingFunc { while let Some(batch) = flushing_exec.next().await { partial_batches.push(batch?); } - info!( - "finished active exec in {:?}", - flush_start.elapsed().unwrap() - ); let new_partial_batch = concat_batches(&self.state_partial_schema.schema, &partial_batches)?; let prior_partials = ctx @@ -80,7 +72,6 @@ impl UpdatingAggregatingFunc { if let Some((prior_partial_batch, _filter)) = prior_partials.get_current_matching_values(&new_partial_batch)? { - println!("found prior partial batch\n{:?}", prior_partial_batch); let combining_batches = vec![new_partial_batch, prior_partial_batch]; let combine_batch = concat_batches(&self.partial_schema.schema, &combining_batches)?; let mut combine_exec = { @@ -154,7 +145,6 @@ impl UpdatingAggregatingFunc { for batch in batches_to_write.into_iter() { ctx.collect(batch).await; } - info!("flushed in {:?}", flush_start.elapsed().unwrap()); Ok(()) }