Skip to content

Commit

Permalink
Add option to set raw_datums in avro schema editor (#685)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Jul 12, 2024
1 parent 6d2806d commit d127018
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 16 deletions.
10 changes: 5 additions & 5 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, Te
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{BadData, Format, JsonFormat};
use arroyo_rpc::schema_resolver::{
ConfluentSchemaRegistry, ConfluentSchemaRegistryClient, FailingSchemaResolver, SchemaResolver,
ConfluentSchemaRegistry, ConfluentSchemaRegistryClient, SchemaResolver,
};
use arroyo_rpc::{schema_resolver, var_str::VarStr, OperatorConfig};
use arroyo_types::string_to_map;
Expand Down Expand Up @@ -342,24 +342,24 @@ impl Connector for KafkaConnector {
.insert("isolation.level".to_string(), "read_committed".to_string());
}

let schema_resolver: Arc<dyn SchemaResolver + Sync> =
let schema_resolver: Option<Arc<dyn SchemaResolver + Sync>> =
if let Some(SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
api_secret,
}) = &profile.schema_registry_enum
{
Arc::new(
Some(Arc::new(
ConfluentSchemaRegistry::new(
endpoint,
&table.subject(),
api_key.clone(),
api_secret.clone(),
)
.expect("failed to construct confluent schema resolver"),
)
))
} else {
Arc::new(FailingSchemaResolver::new())
None
};

Ok(OperatorNode::from_source(Box::new(KafkaSourceFunc {
Expand Down
22 changes: 15 additions & 7 deletions crates/arroyo-connectors/src/kafka/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct KafkaSourceFunc {
pub format: Format,
pub framing: Option<Framing>,
pub bad_data: Option<BadData>,
pub schema_resolver: Arc<dyn SchemaResolver + Sync>,
pub schema_resolver: Option<Arc<dyn SchemaResolver + Sync>>,
pub client_configs: HashMap<String, String>,
pub messages_per_second: NonZeroU32,
}
Expand Down Expand Up @@ -150,12 +150,20 @@ impl KafkaSourceFunc {
.await;
}

ctx.initialize_deserializer_with_resolver(
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
self.schema_resolver.clone(),
);
if let Some(schema_resolver) = &self.schema_resolver {
ctx.initialize_deserializer_with_resolver(
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
schema_resolver.clone(),
);
} else {
ctx.initialize_deserializer(
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
);
}

let mut flush_ticker = tokio::time::interval(Duration::from_millis(50));
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
Expand Down
3 changes: 1 addition & 2 deletions crates/arroyo-connectors/src/kafka/source/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use arroyo_operator::operator::SourceOperator;
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{Format, RawStringFormat};
use arroyo_rpc::grpc::rpc::{CheckpointMetadata, OperatorCheckpointMetadata, OperatorMetadata};
use arroyo_rpc::schema_resolver::FailingSchemaResolver;
use arroyo_rpc::{CheckpointCompleted, ControlMessage, ControlResp};
use arroyo_types::{
single_item_hash_map, to_micros, ArrowMessage, CheckpointBarrier, SignalMessage, TaskInfo,
Expand Down Expand Up @@ -85,7 +84,7 @@ impl KafkaTopicTester {
format: Format::RawString(RawStringFormat {}),
framing: None,
bad_data: None,
schema_resolver: Arc::new(FailingSchemaResolver::new()),
schema_resolver: None,
client_configs: HashMap::new(),
messages_per_second: NonZeroU32::new(100).unwrap(),
});
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-formats/src/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ fn to_arrow_datatype(schema: &Schema) -> (DataType, bool, Option<ArroyoExtension
),
Schema::Float => (DataType::Float32, false, None),
Schema::Double => (DataType::Float64, false, None),
Schema::Bytes | Schema::Fixed(_) | Schema::Decimal(_) => (DataType::Binary, false, None),
Schema::Bytes | Schema::Fixed(_) | Schema::Decimal(_) => (DataType::Utf8, false, None),
Schema::String | Schema::Enum(_) | Schema::Uuid => (DataType::Utf8, false, None),
Schema::Union(union) => {
// currently just support unions that have [t, null] as variants, which is the
Expand Down
49 changes: 48 additions & 1 deletion webui/src/routes/connections/SchemaEditor.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
import { Dispatch, useEffect, useRef, useState } from 'react';
import { CreateConnectionState } from './CreateConnection';
import { Alert, AlertIcon, Box, Button, List, ListItem, Stack } from '@chakra-ui/react';
import {
Alert,
AlertIcon,
Box,
Button,
Checkbox,
FormControl,
FormHelperText,
List,
ListItem,
Stack,
} from '@chakra-ui/react';
import * as monaco from 'monaco-editor/esm/vs/editor/editor.api';
import { ConnectionSchema, post } from '../../lib/data_fetching';
import { formatError } from '../../lib/util';
Expand All @@ -22,10 +33,22 @@ export function SchemaEditor({
const [errors, setErrors] = useState<Array<string> | null>(null);
const [testing, setTesting] = useState<boolean>(false);
const [tested, setTested] = useState<string | undefined>();
const [rawDatum, setRawDatum] = useState<boolean>(false);

const valid = tested == editor?.getValue() && errors?.length == 0;

const testSchema = async () => {
// if avro and raw datum, then we need to add the raw datum encoding
if (format == 'avro' && rawDatum) {
// @ts-ignore
state.schema!.format['avro']!.rawDatums = rawDatum;
// update the state
setState({
...state,
schema: state.schema,
});
}

setTesting(true);
setErrors(null);
const { error } = await post('/v1/connection_tables/schemas/test', {
Expand Down Expand Up @@ -68,6 +91,28 @@ export function SchemaEditor({
}
}

let avroOptions = null;
if (format == 'avro') {
avroOptions = (
<Box maxW={'lg'}>
<FormControl>
<Checkbox
onChange={e => {
console.log('CHECKED = ', e.target.checked);
setRawDatum(e.target.checked);
}}
>
Raw datum encoding
</Checkbox>
<FormHelperText>
This encoding should be used for streams composed of individual <i>avro datums</i>,
rather than complete Avro documents with embedded schemas
</FormHelperText>
</FormControl>
</Box>
);
}

useEffect(() => {
if (monacoEl && !editor && !created.current) {
let e = monaco.editor.create(monacoEl.current!, {
Expand All @@ -90,6 +135,7 @@ export function SchemaEditor({

// @ts-ignore
schema.format![format] = {};

// @ts-ignore
schema.definition![format + '_schema'] = e.getValue();

Expand All @@ -108,6 +154,7 @@ export function SchemaEditor({

return (
<Stack spacing={4}>
{avroOptions}
<Box marginTop={5} width="100%">
<div className="editor" ref={monacoEl}></div>
</Box>
Expand Down

0 comments on commit d127018

Please sign in to comment.