Skip to content

Commit

Permalink
Merge branch 'merge_master_3_11' into devy
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Mar 12, 2024
2 parents b38747e + c0b24c6 commit 905d5d4
Show file tree
Hide file tree
Showing 30 changed files with 1,691 additions and 114 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,17 @@ jobs:
curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
- name: Install Kafka
run: |
wget --progress=dot --show-progress https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz
wget --progress=dot --show-progress https://archive.apache.org/dist/kafka/3.5.0/kafka_2.12-3.5.0.tgz
tar xvfz kafka*.tgz
mkdir /tmp/kraft-combined-logs
kafka_*/bin/kafka-storage.sh format -t 9v5PspiySuWU2l5NjTgRuA -c kafka_*/config/kraft/server.properties
kafka_*/bin/kafka-server-start.sh -daemon kafka_*/config/kraft/server.properties
- name: Install mosquitto
run: |
sudo apt-get install -y mosquitto
sudo service mosquitto start
- name: Check Formatting
run: cargo fmt -- --check
- name: Build
run: cargo build --all-features
- name: Validate API
Expand Down
111 changes: 75 additions & 36 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 12 additions & 8 deletions crates/arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,14 +632,18 @@ async fn get_schema(
));
};

let resolver =
ConfluentSchemaRegistry::new(&endpoint, &table.topic, api_key.clone(), api_secret.clone())
.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})?;
let resolver = ConfluentSchemaRegistry::new(
&endpoint,
&table.subject(),
api_key.clone(),
api_secret.clone(),
)
.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
))
})?;

resolver.get_schema_for_version(None).await.map_err(|e| {
bad_request(format!(
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ async fn try_register_confluent_schema(
};

let schema_registry =
ConfluentSchemaRegistry::new(&endpoint, &table.topic, api_key, api_secret)?;
ConfluentSchemaRegistry::new(&endpoint, &table.subject(), api_key, api_secret)?;

match config.format.clone() {
Some(Format::Avro(mut avro)) => {
Expand Down
11 changes: 9 additions & 2 deletions crates/arroyo-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ regex = "1"
##########################

# Kafka
rdkafka = { version = "0.33", features = ["cmake-build"] }
rdkafka = { version = "0.33", features = ["cmake-build", "tracing"] }
rdkafka-sys = "4.5.0"

# SSE
eventsource-client = "0.12.0"
Expand Down Expand Up @@ -77,5 +78,11 @@ object_store = { workspace = true }
deltalake = {version = "0.17", features = ["s3", "datafusion"] }
async-compression = { version = "0.4.3", features = ["tokio", "zstd", "gzip"] }

# MQTT
rumqttc = { version = "0.23.0", features = ["url"] }
rustls-native-certs = "0.6"
rustls-pemfile = "1"
tokio-rustls = "0.24"

[build-dependencies]
glob = "0.3"
glob = "0.3"
Loading

0 comments on commit 905d5d4

Please sign in to comment.