Skip to content

Commit

Permalink
Finish avro support
Browse files: browse the repository at this point in the history
  • Loading branch information
mwylde committed Jan 7, 2024
1 parent 31a3258 commit d778796
Show file tree
Hide file tree
Showing 12 changed files with 503 additions and 291 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use tracing::warn;
use arroyo_connectors::confluent::ConfluentProfile;
use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry};
use arroyo_connectors::{connector_for_type, ErasedConnector};
use arroyo_df::avro;
use arroyo_df::json_schema::convert_json_schema;
use arroyo_df::types::{StructField, TypeDef};
use arroyo_formats::avro;
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionTable, ConnectionTablePost, ConnectionType,
SchemaDefinition,
Expand Down Expand Up @@ -521,8 +521,9 @@ async fn expand_avro_schema(
);
}

let fields: Result<_, String> = avro::convert_avro_schema(&name, &definition)
let fields: Result<_, String> = avro::to_arrow(&name, &definition)
.map_err(|e| bad_request(format!("Invalid avro schema: {}", e)))?
.fields
.into_iter()
.map(|f| (**f).clone().try_into())
.collect();
Expand Down
73 changes: 53 additions & 20 deletions arroyo-connectors/src/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use anyhow::{anyhow, bail};
use arroyo_formats::avro::deserialize_slice_avro;
use arroyo_formats::{avro, ArrowDeserializer};
use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
use arroyo_rpc::formats::{Format, JsonFormat};
use arroyo_rpc::formats::{BadData, Format, JsonFormat};
use arroyo_rpc::schema_resolver::{ConfluentSchemaRegistryClient, FailingSchemaResolver};
use arroyo_rpc::{schema_resolver, var_str::VarStr, OperatorConfig};
use arroyo_rpc::{schema_resolver, var_str::VarStr, ArroyoSchema, OperatorConfig};
use axum::response::sse::Event;
use futures::TryFutureExt;
use rdkafka::{
Expand All @@ -15,7 +15,7 @@ use serde_json::Value;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{oneshot, Mutex};
Expand Down Expand Up @@ -437,6 +437,7 @@ impl KafkaTester {
pub async fn validate_schema(
&self,
table: &KafkaTable,
schema: &ConnectionSchema,
format: &Format,
msg: Vec<u8>,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -483,24 +484,54 @@ impl KafkaTester {
bail!("Message appears to be encoded as normal Avro, rather than SR-Avro, but the schema registry is enabled. Ensure that the format and schema type are correct.");
}

let schema_registry = Arc::new(Mutex::new(HashMap::new()));

let _ = deserialize_slice_avro::<Value>(avro, schema_registry, Arc::new(schema_resolver), &msg)
let aschema: ArroyoSchema = schema.clone().into();
let mut deserializer = ArrowDeserializer::with_schema_resolver(
format.clone(),
None,
aschema.clone(),
BadData::Fail {},
Arc::new(schema_resolver),
);
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.await
.map_err(|e|
anyhow!("Failed to parse message as schema-registry Avro (SR-Avro): {:?}. Ensure that the format and schema type are correct.", e))?;
} else {
let resolver = Arc::new(FailingSchemaResolver::new());
let registry = Arc::new(Mutex::new(HashMap::new()));
.into_iter()
.next();
if let Some(Err(e)) = deserializer.flush_buffer() {
error.replace(e);
}

let _ = deserialize_slice_avro::<Value>(avro, registry, resolver, &msg)
if let Some(error) = error {
bail!("Failed to parse message as schema-registry Avro (SR-Avro): {:?}. Ensure that the format and schema type are correct.", error.details());
}
} else {
let aschema: ArroyoSchema = schema.clone().into();
let mut deserializer = ArrowDeserializer::new(
format.clone(),
aschema.clone(),
None,
BadData::Fail {},
);
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.await
.map_err(|e|
if msg[0] == 0 {
anyhow!("Failed to parse message as regular Avro. It may be encoded as SR-Avro, but the schema registry is not enabled. Ensure that the format and schema type are correct.")
} else {
anyhow!("Failed to parse message as Avro: {:?}. Ensure that the format and schema type are correct.", e)
})?;
.into_iter()
.next();
if let Some(Err(e)) = deserializer.flush_buffer() {
error.replace(e);
}

if let Some(error) = error {
if msg[0] == 0 {
bail!("Failed to parse message as regular Avro. It may be encoded as SR-Avro, but the schema registry is not enabled. Ensure that the format and schema type are correct.");
} else {
bail!("Failed to parse message as Avro: {:?}. Ensure that the format and schema type are correct.", error.details());
};
}
}
}
Format::Parquet(_) => {
Expand All @@ -522,7 +553,8 @@ impl KafkaTester {
mut tx: Sender<Result<Event, Infallible>>,
) -> anyhow::Result<()> {
let format = schema
.and_then(|s| s.format)
.as_ref()
.and_then(|s| s.format.clone())
.ok_or_else(|| anyhow!("No format defined for Kafka connection"))?;

let client = self.connect().await.map_err(|e| anyhow!("{}", e))?;
Expand Down Expand Up @@ -587,6 +619,7 @@ impl KafkaTester {
self.info(&mut tx, "Received message from Kafka").await;
self.validate_schema(
&table,
schema.as_ref().unwrap(),
&format,
message
.detach()
Expand Down
4 changes: 2 additions & 2 deletions arroyo-df/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use arrow_schema::{DataType, Field, FieldRef};
use arroyo_connectors::{connector_for_type, Connection};
use arroyo_datastream::ConnectorOp;
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, SchemaDefinition, SourceField,
ConnectionProfile, ConnectionSchema, ConnectionType, SourceField,
};
use arroyo_rpc::formats::{BadData, Format, Framing};
use arroyo_types::ArroyoExtensionType;
Expand All @@ -27,7 +27,7 @@ use datafusion_expr::{
};
use tracing::info;

use crate::{avro, DEFAULT_IDLE_TIME};
use crate::DEFAULT_IDLE_TIME;
use crate::{
external::{ProcessingMode, SqlSource},
json_schema,
Expand Down
3 changes: 1 addition & 2 deletions arroyo-df/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ use datafusion::sql::sqlparser::ast::{
ArrayElemTypeDef, DataType as SQLDataType, ExactNumberInfo, TimezoneInfo,
};

use crate::avro;
use arroyo_rpc::api_types::connections::{
FieldType, PrimitiveType, SourceField, SourceFieldType, StructType,
};
use arroyo_types::ArroyoExtensionType;
use datafusion_common::{DFField, DFSchemaRef, ScalarValue};
use proc_macro2::{Ident, TokenStream};
use quote::{format_ident, quote};
use quote::quote;
use regex::Regex;
use syn::PathArguments::AngleBracketed;
use syn::{parse_quote, parse_str, GenericArgument, Type};
Expand Down
Loading

0 comments on commit d778796

Please sign in to comment.