Skip to content

Commit

Permalink
Migrate nexmark (ArroyoSystems#530)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Feb 16, 2024
1 parent 247fd2a commit 5c60a21
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 295 deletions.
8 changes: 0 additions & 8 deletions crates/arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,6 @@ pub(crate) fn source_field(name: &str, field_type: FieldType) -> SourceField {
}
}

pub(crate) fn nullable_field(name: &str, field_type: SourceFieldType) -> SourceField {
SourceField {
field_name: name.to_string(),
field_type,
nullable: true,
}
}

fn construct_http_client(endpoint: &str, headers: Option<String>) -> anyhow::Result<Client> {
if let Err(e) = reqwest::Url::parse(&endpoint) {
bail!("invalid endpoint '{}': {:?}", endpoint, e)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,91 +1,94 @@
mod operator;
#[cfg(test)]
mod test;

use anyhow::{anyhow, bail};
use arrow::datatypes::{Field, Schema, TimeUnit};
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
use arroyo_rpc::api_types::connections::FieldType::Primitive;
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, FieldType, SourceFieldType, StructType,
TestSourceMessage,
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
};
use arroyo_rpc::OperatorConfig;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use typify::import_types;

use crate::{nullable_field, pull_opt, source_field, EmptyConfig};
use crate::nexmark::operator::NexmarkSourceFunc;
use crate::{pull_opt, EmptyConfig};

const TABLE_SCHEMA: &str = include_str!("./table.json");
const ICON: &str = include_str!("./nexmark.svg");

import_types!(schema = "src/nexmark/table.json");

pub(crate) fn person_fields() -> Vec<Field> {
use arrow::datatypes::DataType::*;

vec![
Field::new("id", Int64, false),
Field::new("name", Utf8, false),
Field::new("email_address", Utf8, false),
Field::new("credit_card", Utf8, false),
Field::new("city", Utf8, false),
Field::new("state", Utf8, false),
Field::new("datetime", Timestamp(TimeUnit::Nanosecond, None), false),
Field::new("extra", Utf8, false),
]
}

pub(crate) fn auction_fields() -> Vec<Field> {
use arrow::datatypes::DataType::*;

vec![
Field::new("id", Int64, false),
Field::new("description", Utf8, false),
Field::new("item_name", Utf8, false),
Field::new("initial_bid", Int64, false),
Field::new("reserve", Int64, false),
Field::new("datetime", Timestamp(TimeUnit::Nanosecond, None), false),
Field::new("expires", Timestamp(TimeUnit::Nanosecond, None), false),
Field::new("seller", Int64, false),
Field::new("category", Int64, false),
Field::new("extra", Utf8, false),
]
}

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/nexmark/table.json");
const ICON: &str = include_str!("../resources/nexmark.svg");
pub(crate) fn bid_fields() -> Vec<Field> {
use arrow::datatypes::DataType::*;

vec![
Field::new("auction", Int64, false),
Field::new("bidder", Int64, false),
Field::new("price", Int64, false),
Field::new("channel", Utf8, false),
Field::new("url", Utf8, false),
Field::new("datetime", Timestamp(TimeUnit::Nanosecond, None), false),
Field::new("extra", Utf8, false),
]
}

import_types!(schema = "../connector-schemas/nexmark/table.json");
fn arrow_schema() -> Schema {
use arrow::datatypes::DataType::Struct;
Schema::new(vec![
Field::new("person", Struct(person_fields().into()), true),
Field::new("auction", Struct(auction_fields().into()), true),
Field::new("bid", Struct(bid_fields().into()), true),
])
}

pub fn nexmark_schema() -> ConnectionSchema {
use arroyo_rpc::api_types::connections::PrimitiveType::*;
ConnectionSchema {
format: None,
bad_data: None,
framing: None,
struct_name: Some("arroyo_types::nexmark::Event".to_string()),
fields: vec![
nullable_field(
"person",
SourceFieldType {
r#type: FieldType::Struct(StructType {
name: Some("arroyo_types::nexmark::Person".to_string()),
fields: vec![
source_field("id", Primitive(Int64)),
source_field("name", Primitive(String)),
source_field("email_address", Primitive(String)),
source_field("credit_card", Primitive(String)),
source_field("city", Primitive(String)),
source_field("state", Primitive(String)),
source_field("datetime", Primitive(UnixMillis)),
source_field("extra", Primitive(String)),
],
}),
sql_name: None,
},
),
nullable_field(
"bid",
SourceFieldType {
r#type: FieldType::Struct(StructType {
name: Some("arroyo_types::nexmark::Bid".to_string()),
fields: vec![
source_field("auction", Primitive(Int64)),
source_field("bidder", Primitive(Int64)),
source_field("price", Primitive(Int64)),
source_field("channel", Primitive(String)),
source_field("url", Primitive(String)),
source_field("datetime", Primitive(UnixMillis)),
source_field("extra", Primitive(String)),
],
}),
sql_name: None,
},
),
nullable_field(
"auction",
SourceFieldType {
r#type: FieldType::Struct(StructType {
name: Some("arroyo_types::nexmark::Auction".to_string()),
fields: vec![
source_field("id", Primitive(Int64)),
source_field("description", Primitive(String)),
source_field("item_name", Primitive(String)),
source_field("initial_bid", Primitive(Int64)),
source_field("reserve", Primitive(Int64)),
source_field("datetime", Primitive(UnixMillis)),
source_field("expires", Primitive(UnixMillis)),
source_field("seller", Primitive(Int64)),
source_field("category", Primitive(Int64)),
source_field("extra", Primitive(String)),
],
}),
sql_name: None,
},
),
],
struct_name: None,
fields: arrow_schema()
.fields
.iter()
.map(|f| (**f).clone().try_into().unwrap())
.collect(),
definition: None,
inferred: None,
}
Expand Down Expand Up @@ -224,9 +227,11 @@ impl Connector for NexmarkConnector {
fn make_operator(
&self,
_: Self::ProfileT,
_: Self::TableT,
table: Self::TableT,
_: OperatorConfig,
) -> anyhow::Result<OperatorNode> {
todo!()
Ok(OperatorNode::from_source(Box::new(
NexmarkSourceFunc::from_config(&table),
)))
}
}
Loading

0 comments on commit 5c60a21

Please sign in to comment.