diff --git a/crates/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs index 720e9441b..280225602 100644 --- a/crates/arroyo-connectors/src/kafka/mod.rs +++ b/crates/arroyo-connectors/src/kafka/mod.rs @@ -6,7 +6,7 @@ use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, Te use arroyo_rpc::df::ArroyoSchema; use arroyo_rpc::formats::{BadData, Format, JsonFormat}; use arroyo_rpc::schema_resolver::{ - ConfluentSchemaRegistry, ConfluentSchemaRegistryClient, FailingSchemaResolver, SchemaResolver, + ConfluentSchemaRegistry, ConfluentSchemaRegistryClient, SchemaResolver, }; use arroyo_rpc::{schema_resolver, var_str::VarStr, OperatorConfig}; use arroyo_types::string_to_map; @@ -342,14 +342,14 @@ impl Connector for KafkaConnector { .insert("isolation.level".to_string(), "read_committed".to_string()); } - let schema_resolver: Arc = + let schema_resolver: Option> = if let Some(SchemaRegistry::ConfluentSchemaRegistry { endpoint, api_key, api_secret, }) = &profile.schema_registry_enum { - Arc::new( + Some(Arc::new( ConfluentSchemaRegistry::new( endpoint, &table.subject(), @@ -357,9 +357,9 @@ impl Connector for KafkaConnector { api_secret.clone(), ) .expect("failed to construct confluent schema resolver"), - ) + )) } else { - Arc::new(FailingSchemaResolver::new()) + None }; Ok(OperatorNode::from_source(Box::new(KafkaSourceFunc { diff --git a/crates/arroyo-connectors/src/kafka/source/mod.rs b/crates/arroyo-connectors/src/kafka/source/mod.rs index d53ba8af0..d0d03dc63 100644 --- a/crates/arroyo-connectors/src/kafka/source/mod.rs +++ b/crates/arroyo-connectors/src/kafka/source/mod.rs @@ -32,7 +32,7 @@ pub struct KafkaSourceFunc { pub format: Format, pub framing: Option, pub bad_data: Option, - pub schema_resolver: Arc, + pub schema_resolver: Option>, pub client_configs: HashMap, pub messages_per_second: NonZeroU32, } @@ -150,12 +150,20 @@ impl KafkaSourceFunc { .await; } - ctx.initialize_deserializer_with_resolver( - self.format.clone(), - self.framing.clone(), - self.bad_data.clone(), - self.schema_resolver.clone(), - ); + if let Some(schema_resolver) = &self.schema_resolver { + ctx.initialize_deserializer_with_resolver( + self.format.clone(), + self.framing.clone(), + self.bad_data.clone(), + schema_resolver.clone(), + ); + } else { + ctx.initialize_deserializer( + self.format.clone(), + self.framing.clone(), + self.bad_data.clone(), + ); + } let mut flush_ticker = tokio::time::interval(Duration::from_millis(50)); flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); diff --git a/crates/arroyo-connectors/src/kafka/source/test.rs b/crates/arroyo-connectors/src/kafka/source/test.rs index 051c6b27c..0057b7ae8 100644 --- a/crates/arroyo-connectors/src/kafka/source/test.rs +++ b/crates/arroyo-connectors/src/kafka/source/test.rs @@ -18,7 +18,6 @@ use arroyo_operator::operator::SourceOperator; use arroyo_rpc::df::ArroyoSchema; use arroyo_rpc::formats::{Format, RawStringFormat}; use arroyo_rpc::grpc::rpc::{CheckpointMetadata, OperatorCheckpointMetadata, OperatorMetadata}; -use arroyo_rpc::schema_resolver::FailingSchemaResolver; use arroyo_rpc::{CheckpointCompleted, ControlMessage, ControlResp}; use arroyo_types::{ single_item_hash_map, to_micros, ArrowMessage, CheckpointBarrier, SignalMessage, TaskInfo, @@ -85,7 +84,7 @@ impl KafkaTopicTester { format: Format::RawString(RawStringFormat {}), framing: None, bad_data: None, - schema_resolver: Arc::new(FailingSchemaResolver::new()), + schema_resolver: None, client_configs: HashMap::new(), messages_per_second: NonZeroU32::new(100).unwrap(), }); diff --git a/crates/arroyo-formats/src/avro/schema.rs b/crates/arroyo-formats/src/avro/schema.rs index 7a07b6e0c..ee8346407 100644 --- a/crates/arroyo-formats/src/avro/schema.rs +++ b/crates/arroyo-formats/src/avro/schema.rs @@ -134,7 +134,7 @@ fn to_arrow_datatype(schema: &Schema) -> (DataType, bool, Option (DataType::Float32, false, None), Schema::Double => (DataType::Float64, false, None), - Schema::Bytes | Schema::Fixed(_) | Schema::Decimal(_) => (DataType::Binary, false, None), + Schema::Bytes | Schema::Fixed(_) | Schema::Decimal(_) => (DataType::Utf8, false, None), Schema::String | Schema::Enum(_) | Schema::Uuid => (DataType::Utf8, false, None), Schema::Union(union) => { // currently just support unions that have [t, null] as variants, which is the diff --git a/webui/src/routes/connections/SchemaEditor.tsx b/webui/src/routes/connections/SchemaEditor.tsx index 416c9f3dd..3ef346342 100644 --- a/webui/src/routes/connections/SchemaEditor.tsx +++ b/webui/src/routes/connections/SchemaEditor.tsx @@ -1,6 +1,17 @@ import { Dispatch, useEffect, useRef, useState } from 'react'; import { CreateConnectionState } from './CreateConnection'; -import { Alert, AlertIcon, Box, Button, List, ListItem, Stack } from '@chakra-ui/react'; +import { + Alert, + AlertIcon, + Box, + Button, + Checkbox, + FormControl, + FormHelperText, + List, + ListItem, + Stack, +} from '@chakra-ui/react'; import * as monaco from 'monaco-editor/esm/vs/editor/editor.api'; import { ConnectionSchema, post } from '../../lib/data_fetching'; import { formatError } from '../../lib/util'; @@ -22,10 +33,22 @@ export function SchemaEditor({ const [errors, setErrors] = useState | null>(null); const [testing, setTesting] = useState(false); const [tested, setTested] = useState(); + const [rawDatum, setRawDatum] = useState(false); const valid = tested == editor?.getValue() && errors?.length == 0; const testSchema = async () => { + // if avro and raw datum, then we need to add the raw datum encoding + if (format == 'avro' && rawDatum) { + // @ts-ignore + state.schema!.format['avro']!.rawDatums = rawDatum; + // update the state + setState({ + ...state, + schema: state.schema, + }); + } + setTesting(true); setErrors(null); const { error } = await post('/v1/connection_tables/schemas/test', { @@ -68,6 +91,28 @@ export function SchemaEditor({ } } + let avroOptions = null; + if (format == 'avro') { + avroOptions = ( + + + { + console.log('CHECKED = ', e.target.checked); + setRawDatum(e.target.checked); + }} + > + Raw datum encoding + + + This encoding should be used for streams composed of individual avro datums, + rather than complete Avro documents with embedded schemas + + + + ); + } + useEffect(() => { if (monacoEl && !editor && !created.current) { let e = monaco.editor.create(monacoEl.current!, { @@ -90,6 +135,7 @@ export function SchemaEditor({ // @ts-ignore schema.format![format] = {}; + // @ts-ignore schema.definition![format + '_schema'] = e.getValue(); @@ -108,6 +154,7 @@ export function SchemaEditor({ return ( + {avroOptions}