Skip to content

Commit

Permalink
protobuf schema editor
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Aug 14, 2024
1 parent 16a61e0 commit 6eb0ba6
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 56 deletions.
34 changes: 12 additions & 22 deletions crates/arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use crate::{
};
use arroyo_formats::proto::schema::{protobuf_to_arrow, schema_file_to_descriptor};
use cornucopia_async::{Database, DatabaseSource};
use prost_reflect::DescriptorPool;

async fn get_and_validate_connector(
req: &ConnectionTablePost,
Expand Down Expand Up @@ -468,17 +467,7 @@ pub(crate) async fn expand_schema(
Format::Parquet(_) => Ok(schema),
Format::RawString(_) => Ok(schema),
Format::RawBytes(_) => Ok(schema),
Format::Protobuf(_) => {
expand_proto_schema(
name,
connector,
connection_type,
schema,
profile_config,
table_config,
)
.await
}
Format::Protobuf(_) => expand_proto_schema(schema).await,
}
}

Expand Down Expand Up @@ -546,14 +535,7 @@ async fn expand_avro_schema(
Ok(schema)
}

async fn expand_proto_schema(
name: &str,
connector: &str,
connection_type: ConnectionType,
mut schema: ConnectionSchema,
profile_config: &Value,
table_config: &Value,
) -> Result<ConnectionSchema, ErrorResp> {
async fn expand_proto_schema(mut schema: ConnectionSchema) -> Result<ConnectionSchema, ErrorResp> {
let Some(Format::Protobuf(ProtobufFormat {
message_name,
compiled_schema,
Expand All @@ -572,7 +554,10 @@ async fn expand_proto_schema(
return Err(bad_request("Schema is not a protobuf schema"));
};

let message_name = message_name.as_ref().expect("no message name provided");
let message_name = message_name
.as_ref()
.filter(|m| !m.is_empty())
.ok_or_else(|| bad_request("message name must be provided for protobuf schemas"))?;

let encoded = schema_file_to_descriptor(protobuf_schema, dependencies)
.await
Expand All @@ -588,6 +573,7 @@ async fn expand_proto_schema(
message_name,
pool.all_messages()
.map(|m| m.full_name().to_string())
.filter(|m| !m.starts_with("google.protobuf."))
.collect::<Vec<_>>()
.join(", ")
))
Expand Down Expand Up @@ -741,7 +727,7 @@ async fn get_schema(
pub(crate) async fn test_schema(
WithRejection(Json(req), _): WithRejection<Json<ConnectionSchema>, ApiError>,
) -> Result<(), ErrorResp> {
let Some(schema_def) = req.definition else {
let Some(schema_def) = &req.definition else {
return Ok(());
};

Expand All @@ -753,6 +739,10 @@ pub(crate) async fn test_schema(
Ok(())
}
}
SchemaDefinition::ProtobufSchema { .. } => {
let _ = expand_proto_schema(req.clone()).await?;
Ok(())
}
_ => {
// TODO: add testing for other schema types
Ok(())
Expand Down
21 changes: 20 additions & 1 deletion crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,26 @@ impl KafkaTester {
// all bytes are valid
}
Format::Protobuf(_) => {
todo!("Validate protobuf schema");
let aschema: ArroyoSchema = schema.clone().into();
let mut deserializer =
ArrowDeserializer::new(format.clone(), aschema.clone(), None, BadData::Fail {});
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.await
.into_iter()
.next();
if let Some(Err(e)) = deserializer.flush_buffer() {
error.replace(e);
}

if let Some(error) = error {
bail!(
"Failed to parse message according to the provided Protobuf schema: {}",
error.details()
);
}
}
};

Expand Down
4 changes: 1 addition & 3 deletions crates/arroyo-formats/src/proto/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use arroyo_rpc::formats::ProtobufFormat;
use arroyo_types::SourceError;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use prost_reflect::{
DescriptorPool, DynamicMessage, FieldDescriptor, Kind, MapKey, MessageDescriptor, Value,
};
use prost_reflect::{DescriptorPool, DynamicMessage, FieldDescriptor, Kind, MapKey, Value};
use serde_json::Value as JsonValue;

pub(crate) fn deserialize_proto(
Expand Down
12 changes: 6 additions & 6 deletions crates/arroyo-formats/src/proto/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;
fn test_basic_types() {
let pool = DescriptorPool::decode(include_bytes!("protos/basic_types.bin").as_ref()).unwrap();
let message = pool.all_messages().next().unwrap();
let arrow_schema = protobuf_to_arrow(message).unwrap();
let arrow_schema = protobuf_to_arrow(&message).unwrap();

assert_eq!(arrow_schema.fields().len(), 7);
assert_field(&arrow_schema, "bool_field", DataType::Boolean, true);
Expand All @@ -25,7 +25,7 @@ fn test_string_and_bytes() {
let pool =
DescriptorPool::decode(include_bytes!("protos/string_and_bytes.bin").as_ref()).unwrap();
let message = pool.all_messages().next().unwrap();
let arrow_schema = protobuf_to_arrow(message).unwrap();
let arrow_schema = protobuf_to_arrow(&message).unwrap();

assert_eq!(arrow_schema.fields().len(), 2);
assert_field(&arrow_schema, "string_field", DataType::Utf8, true);
Expand All @@ -37,7 +37,7 @@ fn test_nested_message() {
let pool =
DescriptorPool::decode(include_bytes!("protos/nested_message.bin").as_ref()).unwrap();
let message = pool.all_messages().next().unwrap();
let arrow_schema = protobuf_to_arrow(message).unwrap();
let arrow_schema = protobuf_to_arrow(&message).unwrap();

assert_eq!(arrow_schema.fields().len(), 2);
assert_field(
Expand Down Expand Up @@ -68,7 +68,7 @@ fn test_repeated_fields() {
let pool =
DescriptorPool::decode(include_bytes!("protos/repeated_fields.bin").as_ref()).unwrap();
let message = pool.all_messages().next().unwrap();
let arrow_schema = protobuf_to_arrow(message).unwrap();
let arrow_schema = protobuf_to_arrow(&message).unwrap();

assert_eq!(arrow_schema.fields().len(), 2);
assert_field(
Expand All @@ -90,7 +90,7 @@ fn test_map_fields() {
let pool = DescriptorPool::decode(include_bytes!("protos/map_fields.bin").as_ref()).unwrap();
let message = pool.all_messages().next().unwrap();

let arrow_schema = protobuf_to_arrow(message).unwrap();
let arrow_schema = protobuf_to_arrow(&message).unwrap();

assert_eq!(arrow_schema.fields().len(), 2);
assert_eq!(
Expand All @@ -115,7 +115,7 @@ fn test_map_fields() {
fn test_enum_fields() {
let pool = DescriptorPool::decode(include_bytes!("protos/enum_fields.bin").as_ref()).unwrap();
let message = pool.all_messages().next().unwrap();
let arrow_schema = protobuf_to_arrow(message).unwrap();
let arrow_schema = protobuf_to_arrow(&message).unwrap();

assert_eq!(arrow_schema.fields().len(), 1);
assert_field(&arrow_schema, "enum_field", DataType::Utf8, true);
Expand Down
16 changes: 5 additions & 11 deletions crates/arroyo-rpc/src/formats.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use prost_reflect::DescriptorPool;
use regex::Regex;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::cmp::Ordering;
Expand Down Expand Up @@ -242,19 +241,14 @@ pub struct ProtobufFormat {

#[serde(default)]
pub compiled_schema: Option<Vec<u8>>,

#[serde(default)]
pub confluent_schema_registry: bool,
}

impl ProtobufFormat {
pub fn from_opts(opts: &mut HashMap<String, String>) -> Result<Self, String> {
// Ok(Self {
// schema: None,
// into_unstructured_json: opts.remove("protobuf.into_unstructured_json")
// .filter(|t| t == "true")
// .is_some(),
// message_name: None,
// compiled_schema: None,
// })
todo!("from opts")
pub fn from_opts(_opts: &mut HashMap<String, String>) -> Result<Self, String> {
Err("Protobuf is not yet supported in CREATE TABLE statements".to_string())
}
}

Expand Down
16 changes: 15 additions & 1 deletion webui/src/gen/api-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ export interface components {
json: components["schemas"]["JsonFormat"];
}, {
avro: components["schemas"]["AvroFormat"];
}, {
protobuf: components["schemas"]["ProtobufFormat"];
}, {
parquet: components["schemas"]["ParquetFormat"];
}, {
Expand Down Expand Up @@ -436,6 +438,13 @@ export interface components {
};
/** @enum {string} */
PrimitiveType: "Int32" | "Int64" | "UInt32" | "UInt64" | "F32" | "F64" | "Bool" | "String" | "Bytes" | "UnixMillis" | "UnixMicros" | "UnixNanos" | "DateTime" | "Json";
ProtobufFormat: {
/** Format: binary */
compiledSchema?: string | null;
confluentSchemaRegistry?: boolean;
intoUnstructuredJson?: boolean;
messageName?: string | null;
};
QueryValidationResult: {
errors: (string)[];
graph?: components["schemas"]["PipelineGraph"] | null;
Expand All @@ -445,7 +454,12 @@ export interface components {
SchemaDefinition: OneOf<[{
json_schema: string;
}, {
protobuf_schema: string;
protobuf_schema: {
dependencies?: {
[key: string]: string | undefined;
};
schema: string;
};
}, {
avro_schema: string;
}, {
Expand Down
21 changes: 17 additions & 4 deletions webui/src/routes/connections/DefineSchema.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const SchemaFormatEditor = ({
state: CreateConnectionState;
setState: Dispatch<CreateConnectionState>;
next: () => void;
format: 'json' | 'avro';
format: 'json' | 'avro' | 'protobuf';
}) => {
type SchemaTypeOption = { name: string; value: string };
let schemaTypeOptions: SchemaTypeOption[] = [
Expand All @@ -61,14 +61,17 @@ const SchemaFormatEditor = ({
schemaTypeOptions.push({ name: 'Confluent Schema Registry', value: 'confluent' });
}

let def_name: 'json_schema' | 'avro_schema';
let def_name: 'json_schema' | 'avro_schema' | 'protobuf_schema';
switch (format) {
case 'json':
def_name = 'json_schema';
break;
case 'avro':
def_name = 'avro_schema';
break;
case 'protobuf':
def_name = 'protobuf_schema';
break;
default:
throw new Error('unknown format: ' + format);
}
Expand Down Expand Up @@ -359,9 +362,19 @@ export const DefineSchema = ({
el: <RawBytesEditor state={state} setState={setState} next={next} />,
},
{
name: 'Protobuf (coming soon)',
name: 'Protobuf',
value: 'protobuf',
disabled: true,
el: (
<SchemaFormatEditor
key="protoeditor"
connector={connector}
connectionProfiles={connectionProfiles!}
state={state}
setState={setState}
next={next}
format={'protobuf'}
/>
),
},
];

Expand Down
Loading

0 comments on commit 6eb0ba6

Please sign in to comment.