Merge 0.10-dev branch into master #561

Merged
merged 70 commits into master from 0.10-dev on Mar 12, 2024

Commits (70)
9fb225b
initial changes for migrating to RecordBatch/DataFusion compute.
jacksonrnewhouse Dec 19, 2023
bc23d5b
update docker with arroyo-sql -> arroyo-df
jacksonrnewhouse Dec 21, 2023
282ee2e
bump to datafusion 34.0.0 and start on arroyo-df refactoring.
jacksonrnewhouse Dec 21, 2023
fa0e5fa
switch tumbling aggregator to be a two-parter.
jacksonrnewhouse Dec 22, 2023
a559f35
Arrow dataflow (#471)
mwylde Dec 27, 2023
dd67236
call on_close at the end of run_behavior()
jacksonrnewhouse Dec 27, 2023
a2de4a5
Arrow networking (#473)
mwylde Dec 29, 2023
63c6da5
Raw string format + sse and kafka sources (#474)
mwylde Jan 2, 2024
0ead44e
Arrow json decoding (#477)
mwylde Jan 3, 2024
b3a786d
Record batch table state (#475)
jacksonrnewhouse Jan 9, 2024
10b5127
Add avro support for arrow deserializer (#481)
mwylde Jan 9, 2024
c39d948
operator / source trait redesign (#482)
mwylde Jan 9, 2024
dd63e62
implement smoke tests for record batching. (#485)
jacksonrnewhouse Jan 13, 2024
0011bb4
don't add _timestamp field to ArroyoSchemaProvider schemas, as they a…
jacksonrnewhouse Jan 18, 2024
b3dfc59
windowing and state work. (#497)
jacksonrnewhouse Jan 18, 2024
8a5b869
Add methods in operator trait to allow polling arbitrary futures (#501)
mwylde Jan 19, 2024
e81c92d
Json serialization and kafka sink (#500)
mwylde Jan 19, 2024
124437e
use a better method for global tables so you don't need to repeat the…
jacksonrnewhouse Jan 19, 2024
900800f
Merge master
mwylde Jan 19, 2024
6e7ae2f
Merge pull request #503 from ArroyoSystems/update_dev
mwylde Jan 19, 2024
cde586b
switch github action cache key for record batch work. (#504)
jacksonrnewhouse Jan 19, 2024
f0b9cba
Clean up dev branch (#505)
mwylde Jan 19, 2024
c381cec
Increment version to 0.10.0-dev
mwylde Jan 20, 2024
bb31db2
Migrate json_schema to arrow and remove old sql type system (#509)
mwylde Jan 23, 2024
c2b6986
Record batch session windows (#508)
jacksonrnewhouse Jan 23, 2024
2d0bc10
Rewrite table scans as projections
jbeisen Jan 26, 2024
236d014
Generate watermarks and set event time using expressions (#511)
jbeisen Jan 26, 2024
2c86e00
Support non-windowed inner joins. (#510)
jacksonrnewhouse Jan 26, 2024
ff48816
Remove build cache, as saving/restoring it is slower than just recrea…
mwylde Jan 26, 2024
d2e6cea
Connector migrations and redesign (#512)
mwylde Jan 26, 2024
e954733
bump datafusion to 35. (#516)
jacksonrnewhouse Jan 31, 2024
b6928e9
Add a new binary crate 'arroyo-bin' to package the entire system (#514)
mwylde Jan 31, 2024
01ab966
implement compaction for record batches. (#515)
jacksonrnewhouse Jan 31, 2024
306f3c1
Maintain projections in source rewriter
jbeisen Jan 31, 2024
58085e2
arroyo-df: nested aggregations (#517)
jacksonrnewhouse Feb 1, 2024
783ef47
refactor filesystem source. (#519)
jacksonrnewhouse Feb 2, 2024
c0c1151
Implement unnest in arrow land (#521)
mwylde Feb 2, 2024
967fe2d
remove old code. (#522)
jacksonrnewhouse Feb 2, 2024
966bbc9
Add a new embedded scheduler to run workers within the controller pro…
mwylde Feb 8, 2024
2091c2f
Add JSON functions to datafusion branch (#525)
mwylde Feb 8, 2024
1758a1b
change handle_watermark() method to return Watermark to broadcast. (#…
jacksonrnewhouse Feb 9, 2024
7968a14
Load dynamically linked UDF libraries at runtime
jbeisen Feb 16, 2024
ad0bba4
Filesystem Sink + Two Phase Commits (#524)
jacksonrnewhouse Feb 16, 2024
86d2d93
Fix panic when converting from ArrowProgram to LogicalProgram
jbeisen Feb 16, 2024
247fd2a
Migrate docker and kubernetes to arrow (#529)
mwylde Feb 16, 2024
5c60a21
Migrate nexmark (#530)
mwylde Feb 16, 2024
9f136f2
rework SourceRewriter and TimestampRewriter to preserve table scan al…
jacksonrnewhouse Feb 16, 2024
2ac9d6f
Use hash of UDF definition for saving libraries
jbeisen Feb 19, 2024
9993895
Windowed Joins (#533)
jacksonrnewhouse Feb 21, 2024
7edcfd8
working smoketest (#535)
jacksonrnewhouse Feb 21, 2024
af8538a
Add UDF compilation support to docker images (#538)
mwylde Feb 22, 2024
c8c3c1d
Support vector arguments in UDFs (#540)
jbeisen Feb 27, 2024
fa96654
Avro and raw_string serialization (#541)
mwylde Feb 27, 2024
5b28c66
Update smoke tests (#544)
mwylde Feb 27, 2024
d3321bb
Overhaul DataFrame plannning to use DataFusion Extensions. (#542)
jacksonrnewhouse Feb 28, 2024
1d7ef7b
Implement views for datafusion (#545)
jacksonrnewhouse Feb 29, 2024
b6e582d
Use table name for virtual field qualifier. (#546)
jacksonrnewhouse Feb 29, 2024
5c676c3
Add more SQL plan tests (#547)
mwylde Feb 29, 2024
7b3bb0d
Standardize Arroyo's use of DataFusion's extensions on ArroyoExtensio…
jacksonrnewhouse Mar 1, 2024
e0e0851
Make key_indices optional to distinguish from an empty group by and n…
jacksonrnewhouse Mar 4, 2024
254f8a3
Bump DataFusion to 36.0 (#551)
jacksonrnewhouse Mar 5, 2024
fcdfda5
Report correct metrics for record batches (#554)
mwylde Mar 8, 2024
3a6eeeb
use a fork of datafusion that allows for cheapily reseting execution …
jacksonrnewhouse Mar 8, 2024
7fc492a
Fix json decoding when BATCH_SIZE is greater than 1024 (#555)
mwylde Mar 8, 2024
c0b24c6
Merge master
mwylde Mar 11, 2024
b38747e
Pin openapi version manager (#559)
mwylde Mar 12, 2024
905d5d4
Merge branch 'merge_master_3_11' into devy
mwylde Mar 12, 2024
1e3ec85
Queue based on batch size, rather than batch count (#556)
mwylde Mar 12, 2024
5acfac1
Update docker tag for dev (#557)
mwylde Mar 12, 2024
ad10163
arroyo-sql: Implement better window checks, allow calling window func…
jacksonrnewhouse Mar 12, 2024

Changes from 1 commit: Avro and raw_string serialization (#541)
mwylde authored Feb 27, 2024
commit fa96654b4db7d608a3902a837da6aeab7aabe0f5
3 changes: 2 additions & 1 deletion Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered.)

6 changes: 3 additions & 3 deletions crates/arroyo-api/src/connection_tables.rs
@@ -18,8 +18,8 @@ use tracing::warn;
 use arroyo_connectors::confluent::ConfluentProfile;
 use arroyo_connectors::connector_for_type;
 use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry};
-use arroyo_formats::avro;
-use arroyo_formats::json_schema::to_arrow;
+use arroyo_formats::avro::schema;
+use arroyo_formats::avro::schema::to_arrow;
 use arroyo_operator::connector::ErasedConnector;
 use arroyo_rpc::api_types::connections::{
     ConnectionProfile, ConnectionSchema, ConnectionTable, ConnectionTablePost, ConnectionType,
@@ -526,7 +526,7 @@ async fn expand_avro_schema(
         );
     }

-    let fields: Result<_, String> = avro::to_arrow(&name, &definition)
+    let fields: Result<_, String> = schema::to_arrow(&name, &definition)
         .map_err(|e| bad_request(format!("Invalid avro schema: {}", e)))?
         .fields
         .into_iter()
52 changes: 15 additions & 37 deletions crates/arroyo-api/src/jobs.rs
@@ -12,15 +12,13 @@ use arroyo_rpc::api_types::{
 };
 use arroyo_rpc::grpc;
 use arroyo_rpc::grpc::api::{
-    CreateJobReq, JobStatus, OperatorCheckpointDetail, TaskCheckpointDetail,
-    TaskCheckpointEventType,
+    CreateJobReq, OperatorCheckpointDetail, TaskCheckpointDetail, TaskCheckpointEventType,
 };
 use arroyo_rpc::grpc::controller_grpc_client::ControllerGrpcClient;
 use arroyo_rpc::public_ids::{generate_id, IdTypes};
 use axum::extract::{Path, Query, State};
 use axum::response::sse::{Event, Sse};
 use axum::Json;
-use cornucopia_async::GenericClient;
 use cornucopia_async::Params;
 use deadpool_postgres::Transaction;
 use futures_util::stream::Stream;
@@ -63,10 +61,20 @@ pub(crate) async fn create_job<'a>(
         ));
     }

-    let running_jobs = get_job_statuses(&auth, client)
-        .await?
+    let running_jobs = api_queries::get_jobs()
+        .bind(client, &auth.organization_id)
+        .all()
+        .await
+        .map_err(log_and_map)?
         .iter()
-        .filter(|j| j.running_desired && j.state != "Failed" && j.state != "Finished")
+        .filter(|j| {
+            j.stop == public::StopMode::none
+                && !j
+                    .state
+                    .as_ref()
+                    .map(|s| s == "Failed" || s == "Finished")
+                    .unwrap_or(false)
+        })
         .count();

     if running_jobs > auth.org_metadata.max_running_jobs as usize {
@@ -111,36 +119,6 @@ pub(crate) async fn create_job<'a>(
     Ok(job_id)
 }

-pub(crate) async fn get_job_statuses(
-    auth: &AuthData,
-    client: &impl GenericClient,
-) -> Result<Vec<JobStatus>, ErrorResp> {
-    let res = api_queries::get_jobs()
-        .bind(client, &auth.organization_id)
-        .all()
-        .await
-        .map_err(log_and_map)?;
-
-    res.into_iter()
-        .map(|rec| {
-            Ok(JobStatus {
-                job_id: rec.id,
-                pipeline_name: rec.pipeline_name,
-                running_desired: rec.stop == public::StopMode::none,
-                state: rec.state.unwrap_or_else(|| "Created".to_string()),
-                run_id: rec.run_id.unwrap_or(0) as u64,
-                start_time: rec.start_time.map(to_micros),
-                finish_time: rec.finish_time.map(to_micros),
-                tasks: rec.tasks.map(|t| t as u64),
-                definition: rec.textual_repr,
-                udfs: serde_json::from_value(rec.udfs).map_err(log_and_map)?,
-                pipeline_id: format!("{}", rec.pipeline_id),
-                failure_message: rec.failure_message,
-            })
-        })
-        .collect()
-}
-
 pub(crate) fn get_action(state: &str, running_desired: &bool) -> (String, Option<StopType>, bool) {
     enum Progress {
         InProgress,
@@ -483,7 +461,7 @@ pub async fn get_job_output(
         .graph
         .nodes
         .iter()
-        .any(|n| n.operator.contains("PreviewSink"))
+        .any(|n| n.operator.contains("preview"))
     {
         // TODO: make this check more robust
         return Err(bad_request("Job does not have a preview sink".to_string()));
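Note on the create_job change above: the inlined query now counts a job as running when no stop has been requested and the recorded state, if any, is not terminal. A minimal sketch of that predicate with stand-in types (Job, StopMode, and the state strings are illustrative here, not the cornucopia-generated row type the real filter runs over):

#[derive(PartialEq)]
enum StopMode {
    None,
    Immediate,
}

struct Job {
    stop: StopMode,
    state: Option<String>,
}

fn counts_as_running(j: &Job) -> bool {
    // A job with no recorded state yet (just created) still counts;
    // only an explicit terminal state excludes it.
    j.stop == StopMode::None
        && !j
            .state
            .as_ref()
            .map(|s| s == "Failed" || s == "Finished")
            .unwrap_or(false)
}

fn main() {
    let jobs = [
        Job { stop: StopMode::None, state: None },                        // counted
        Job { stop: StopMode::None, state: Some("Running".into()) },      // counted
        Job { stop: StopMode::None, state: Some("Finished".into()) },     // skipped
        Job { stop: StopMode::Immediate, state: Some("Running".into()) }, // skipped
    ];
    assert_eq!(jobs.iter().filter(|j| counts_as_running(j)).count(), 2);
}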
65 changes: 36 additions & 29 deletions crates/arroyo-api/src/pipelines.rs
@@ -8,30 +8,30 @@ use cornucopia_async::{GenericClient, Params};
 use deadpool_postgres::{Object, Transaction};
 use http::StatusCode;

-use petgraph::Direction;
+use petgraph::{Direction, EdgeDirection};
 use std::collections::HashMap;
 use std::env;

 use petgraph::visit::NodeRef;
 use std::time::Duration;

 use crate::{compiler_service, connection_profiles, jobs, pipelines, types};
-use arroyo_datastream::ConnectorOp;
+use arroyo_datastream::preview_sink;
 use arroyo_rpc::api_types::pipelines::{
     Job, Pipeline, PipelinePatch, PipelinePost, PipelineRestart, QueryValidationResult, StopType,
     ValidateQueryPost,
 };
 use arroyo_rpc::api_types::udfs::{GlobalUdf, Udf};
 use arroyo_rpc::api_types::{JobCollection, PaginationQueryParams, PipelineCollection};
-use arroyo_rpc::grpc::api as api_proto;
 use arroyo_rpc::grpc::api::{
-    create_pipeline_req, ArrowProgram, CreateJobReq, CreatePipelineReq, CreateSqlJob,
+    create_pipeline_req, ArrowProgram, ConnectorOp, CreateJobReq, CreatePipelineReq, CreateSqlJob,
 };
+use arroyo_rpc::grpc::{api as api_proto, api};

 use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry};
 use arroyo_datastream::logical::{LogicalProgram, OperatorName};
 use arroyo_df::{has_duplicate_udf_names, ArroyoSchemaProvider, CompiledSql, ParsedUdf, SqlConfig};
-use arroyo_formats::avro::arrow_to_avro_schema;
-use arroyo_formats::json::arrow_to_json_schema;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_rpc::formats::Format;
 use arroyo_rpc::grpc::compiler_grpc_client::CompilerGrpcClient;
 use arroyo_rpc::public_ids::{generate_id, IdTypes};
@@ -212,7 +212,7 @@ async fn try_register_confluent_schema(
     match config.format.clone() {
         Some(Format::Avro(mut avro)) => {
             if avro.confluent_schema_registry && avro.schema_id.is_none() {
-                let avro_schema = arrow_to_avro_schema("avro_root", schema.fields());
+                let avro_schema = ArrowSerializer::avro_schema(&*schema);

                 let id = schema_registry
                     .write_schema(avro_schema.canonical_form(), ConfluentSchemaType::Avro)
@@ -224,7 +224,7 @@ async fn try_register_confluent_schema(
         }
         Some(Format::Json(mut json)) => {
             if json.confluent_schema_registry && json.schema_id.is_none() {
-                let json_schema = arrow_to_json_schema(schema.fields());
+                let json_schema = ArrowSerializer::json_schema(&*schema);

                 let id = schema_registry
                     .write_schema(json_schema.to_string(), ConfluentSchemaType::Json)
@@ -245,26 +245,35 @@ async fn try_register_confluent_schema(
 }

 async fn register_schemas(compiled_sql: &mut CompiledSql) -> anyhow::Result<()> {
-    for node in compiled_sql.program.graph.node_indices() {
-        let Some(_input) = compiled_sql
+    // register schemas for sinks
+    for idx in compiled_sql
+        .program
+        .graph
+        .externals(Direction::Outgoing)
+        .collect::<Vec<_>>()
+    {
+        let edge = compiled_sql
             .program
             .graph
-            .edges_directed(node, Direction::Incoming)
+            .edges_directed(idx, EdgeDirection::Incoming)
             .next()
-        else {
-            continue;
-        };
+            .ok_or_else(|| anyhow!("no incoming edges for sink node: {:?}", idx.weight()))?;
+
+        let schema = edge.weight().schema.schema.clone();

-        // TODO: schema registration
-        // let Some(value_schema) = compiled_sql.schemas.get(&input.weight().value) else {
-        //     continue;
-        // };
-        //
-        // let node = compiled_sql.program.graph.node_weight_mut(node).unwrap();
-        //
-        // if let Operator::ConnectorSink(connector) = &mut node.operator {
-        //     try_register_confluent_schema(connector, value_schema).await?;
-        // }
+        let node = compiled_sql.program.graph.node_weight_mut(idx).unwrap();
+        if node.operator_name == OperatorName::ConnectorSink {
+            let mut op = ConnectorOp::decode(&node.operator_config[..]).map_err(|_| {
+                anyhow!(
+                    "failed to decode configuration for connector node {:?}",
+                    node
+                )
+            })?;
+
+            try_register_confluent_schema(&mut op, &schema).await?;
+
+            node.operator_config = op.encode_to_vec();
+        }
     }

     Ok(())
@@ -296,7 +305,6 @@ pub(crate) async fn create_pipeline<'a>(
             .try_into()
             .map_err(log_and_map)?,
         connection_ids: vec![],
-        schemas: HashMap::new(),
     };
     text = None;
     udfs = None;
@@ -356,8 +364,7 @@ pub(crate) async fn create_pipeline<'a>(
     for node in compiled.program.graph.node_weights_mut() {
         // replace all sink connectors with websink for preview
         if node.operator_name == OperatorName::ConnectorSink {
-            node.operator_config =
-                api::ConnectorOp::from(ConnectorOp::web_sink()).encode_to_vec();
+            node.operator_config = preview_sink().encode_to_vec();
         }
     }
@@ -455,7 +462,7 @@ impl TryInto<Pipeline> for DbPipeline {
             checkpoint_interval_micros: self.checkpoint_interval_micros as u64,
             stop,
             created_at: to_micros(self.created_at),
-            graph: program.as_job_graph().into(),
+            graph: program.try_into().map_err(log_and_map)?,
             action: action.map(|a| a.into()),
             action_text,
             action_in_progress,
@@ -514,7 +521,7 @@ pub async fn validate_query(
             //optimizations::optimize(&mut program.graph);

             QueryValidationResult {
-                graph: Some(program.as_job_graph().into()),
+                graph: Some(program.try_into().map_err(log_and_map)?),
                 errors: None,
             }
         }
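Note on register_schemas above: the rewrite leans on a petgraph idiom — a graph's "externals" in the Outgoing direction are exactly the nodes with no outgoing edges, i.e. the dataflow sinks. A toy sketch (node and edge payloads here are placeholder strings, not Arroyo's logical graph types):

use petgraph::graph::DiGraph;
use petgraph::Direction;

fn main() {
    // source -> project -> sink
    let mut g = DiGraph::<&str, &str>::new();
    let source = g.add_node("kafka_source");
    let project = g.add_node("project");
    let sink = g.add_node("kafka_sink");
    g.add_edge(source, project, "schema_a");
    g.add_edge(project, sink, "schema_b");

    // Nodes with no outgoing edges are the sinks. Collecting first, as the
    // PR does, ends the immutable borrow so the loop body can mutate g.
    let sinks: Vec<_> = g.externals(Direction::Outgoing).collect();
    assert_eq!(sinks, vec![sink]);

    for idx in sinks {
        // Each sink should have an incoming edge carrying its input schema.
        let edge = g
            .edges_directed(idx, Direction::Incoming)
            .next()
            .expect("sink with no input");
        assert_eq!(*edge.weight(), "schema_b");
    }
}

The decode → mutate → re-encode round-trip on node.operator_config follows the usual prost pattern; a minimal sketch with a stand-in message (the real ConnectorOp is generated into arroyo-rpc and has more fields than this):

use prost::Message;

// Hypothetical stand-in for the generated ConnectorOp message.
#[derive(Clone, PartialEq, Message)]
struct ConnectorOp {
    #[prost(string, tag = "1")]
    config: String,
}

fn main() {
    let bytes = ConnectorOp { config: "{}".into() }.encode_to_vec();

    // Decode the stored config, patch it, and write the bytes back.
    let mut op = ConnectorOp::decode(&bytes[..]).expect("valid protobuf");
    op.config = r#"{"schema_id":42}"#.into();
    let updated = op.encode_to_vec();
    assert_ne!(bytes, updated);
}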
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/filesystem/sink/json.rs
@@ -1,7 +1,7 @@
 use std::{fs::File, io::Write, time::Instant};

 use arrow::record_batch::RecordBatch;
-use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_rpc::{df::ArroyoSchemaRef, formats::Format};

 use super::{
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/fluvio/mod.rs
@@ -1,5 +1,5 @@
 use anyhow::{anyhow, bail};
-use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_operator::connector::{Connection, Connector};
 use arroyo_operator::operator::OperatorNode;
 use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/fluvio/sink.rs
@@ -2,7 +2,7 @@ use arrow::array::RecordBatch;
 use async_trait::async_trait;
 use fluvio::{Fluvio, FluvioConfig, TopicProducer};

-use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_formats::ser::ArrowSerializer;
 use tracing::info;

 use arroyo_operator::context::ArrowContext;
4 changes: 2 additions & 2 deletions crates/arroyo-connectors/src/kafka/mod.rs
@@ -1,6 +1,6 @@
 use anyhow::{anyhow, bail};
-use arroyo_formats::serialize::ArrowSerializer;
-use arroyo_formats::ArrowDeserializer;
+use arroyo_formats::de::ArrowDeserializer;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_operator::connector::Connection;
 use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
 use arroyo_rpc::df::ArroyoSchema;
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/kafka/sink/mod.rs
@@ -13,7 +13,7 @@ use rdkafka::util::Timeout;
 use rdkafka::ClientConfig;

 use arrow::array::RecordBatch;
-use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_operator::context::ArrowContext;
 use arroyo_operator::operator::ArrowOperator;
 use arroyo_types::CheckpointBarrier;
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/kafka/sink/test.rs
@@ -7,7 +7,7 @@ use std::time::{Duration, SystemTime};
 use arrow::array::{RecordBatch, UInt32Array};
 use arrow::datatypes::Field;
 use arrow::datatypes::{DataType, Schema, SchemaRef};
-use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_operator::context::ArrowContext;
 use arroyo_operator::operator::ArrowOperator;
 use arroyo_rpc::df::ArroyoSchema;
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/kinesis/mod.rs
@@ -2,7 +2,7 @@ use anyhow::{anyhow, bail, Result};
 use std::collections::HashMap;
 use typify::import_types;

-use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_operator::connector::Connection;
 use arroyo_rpc::api_types::connections::{ConnectionProfile, TestSourceMessage};
 use arroyo_rpc::{api_types, OperatorConfig};
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/kinesis/sink.rs
@@ -2,7 +2,7 @@ use std::time::{Duration, SystemTime};

 use anyhow::Result;
 use arrow::array::RecordBatch;
-use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_operator::context::ArrowContext;
 use arroyo_operator::operator::ArrowOperator;
 use arroyo_types::CheckpointBarrier;
2 changes: 0 additions & 2 deletions crates/arroyo-connectors/src/preview/operator.rs
@@ -55,7 +55,6 @@ impl ArrowOperator for PreviewSink {
                     .map(|nanos| from_nanos(nanos as u128))
                     .unwrap_or_else(|| SystemTime::now()),
             ),
-            key: value.clone(),
             value,
             done: false,
         })
@@ -72,7 +71,6 @@ impl ArrowOperator for PreviewSink {
             operator_id: ctx.task_info.operator_id.clone(),
             subtask_index: ctx.task_info.task_index as u32,
             timestamp: to_micros(SystemTime::now()),
-            key: "".to_string(),
             value: "".to_string(),
             done: true,
         })
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/redis/mod.rs
@@ -3,7 +3,7 @@ mod operator;
 use std::collections::HashMap;

 use anyhow::{anyhow, bail};
-use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_operator::connector::{Connection, Connector};
 use arroyo_operator::operator::OperatorNode;
 use arroyo_rpc::var_str::VarStr;
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/redis/operator/sink.rs
@@ -1,6 +1,6 @@
 use crate::redis::{ListOperation, RedisClient, RedisTable, TableType, Target};
 use arrow::array::{AsArray, RecordBatch};
-use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_operator::context::{ArrowContext, ErrorReporter};
 use arroyo_operator::operator::ArrowOperator;
 use arroyo_types::CheckpointBarrier;
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/webhook/mod.rs
@@ -7,7 +7,7 @@ use std::time::SystemTime;
 use anyhow::anyhow;
 use arroyo_rpc::OperatorConfig;

-use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_operator::connector::Connection;
 use arroyo_rpc::api_types::connections::{
     ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/webhook/operator.rs
@@ -12,7 +12,7 @@ use tokio::sync::{Mutex, Semaphore};
 use tracing::warn;

 use crate::webhook::MAX_INFLIGHT;
-use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_formats::ser::ArrowSerializer;
 use arroyo_operator::context::ArrowContext;
 use arroyo_operator::operator::ArrowOperator;
 use arroyo_rpc::grpc::TableConfig;