Skip to content

Commit

Permalink
Pipeline clusters (#662)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Jun 18, 2024
1 parent f3882e7 commit 758c95a
Show file tree
Hide file tree
Showing 24 changed files with 854 additions and 116 deletions.
80 changes: 80 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 14 additions & 17 deletions crates/arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use anyhow::anyhow;
use axum::response::IntoResponse;
use axum::Json;
use cornucopia_async::DatabaseSource;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::net::SocketAddr;
use std::net::{SocketAddr, TcpListener};
use time::OffsetDateTime;
use tonic::transport::Channel;
use tracing::{error, info};
Expand Down Expand Up @@ -38,6 +36,8 @@ use arroyo_rpc::api_types::{checkpoints::*, connections::*, metrics::*, pipeline
use arroyo_rpc::config::config;
use arroyo_rpc::formats::*;
use arroyo_rpc::grpc::compiler_grpc_client::CompilerGrpcClient;
use arroyo_server_common::shutdown::ShutdownGuard;
use arroyo_server_common::wrap_start;

mod cloud;
mod connection_profiles;
Expand Down Expand Up @@ -118,25 +118,22 @@ pub async fn compiler_service() -> Result<CompilerGrpcClient<Channel>, ErrorResp
})
}

pub async fn start_server(database: DatabaseSource) -> anyhow::Result<()> {
pub fn start_server(database: DatabaseSource, guard: ShutdownGuard) -> anyhow::Result<u16> {
let config = config();
let addr = SocketAddr::new(config.api.bind_address, config.api.http_port);
let listener = TcpListener::bind(addr)?;
let local_addr = listener.local_addr()?;

let app = rest::create_rest_app(database, &config.controller_endpoint());

info!("Starting API server on {:?}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.map_err(|e| {
anyhow!(
"Failed to start API server on {}: {}",
addr,
e.source()
.map(|e| e.to_string())
.unwrap_or_else(|| e.to_string())
)
})
info!("Starting API server on {:?}", local_addr);
guard.into_spawn_task(wrap_start(
"api",
local_addr,
axum::Server::from_tcp(listener)?.serve(app.into_make_service()),
));

Ok(local_addr.port())
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use petgraph::visit::NodeRef;
use std::time::Duration;

use crate::{compiler_service, connection_profiles, jobs, types};
use arroyo_datastream::preview_sink;
use arroyo_datastream::default_sink;
use arroyo_rpc::api_types::pipelines::{
Job, Pipeline, PipelinePatch, PipelinePost, PipelineRestart, QueryValidationResult, StopType,
ValidateQueryPost,
Expand Down Expand Up @@ -297,13 +297,13 @@ pub(crate) async fn create_pipeline_int<'a>(
contact support@arroyo.systems for an increase", auth.org_metadata.max_operators)));
}

set_parallelism(&mut compiled.program, 1);
set_parallelism(&mut compiled.program, req.parallelism as usize);

if is_preview && !config().sinks_in_preview {
for node in compiled.program.graph.node_weights_mut() {
// replace all sink connectors with websink for preview
if node.operator_name == OperatorName::ConnectorSink {
node.operator_config = preview_sink().encode_to_vec();
node.operator_config = default_sink().encode_to_vec();
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::polling_http::PollingHTTPConnector;
use crate::preview::PreviewConnector;
use crate::redis::RedisConnector;
use crate::single_file::SingleFileConnector;
use crate::stdout::StdoutConnector;
use crate::webhook::WebhookConnector;
use anyhow::{anyhow, bail, Context};
use arroyo_operator::connector::ErasedConnector;
Expand Down Expand Up @@ -48,6 +49,7 @@ pub mod preview;
pub mod redis;
pub mod single_file;
pub mod sse;
pub mod stdout;
pub mod webhook;
pub mod websocket;

Expand All @@ -69,6 +71,7 @@ pub fn connectors() -> HashMap<&'static str, Box<dyn ErasedConnector>> {
Box::new(RedisConnector {}),
Box::new(SingleFileConnector {}),
Box::new(SSEConnector {}),
Box::new(StdoutConnector {}),
Box::new(WebhookConnector {}),
Box::new(WebsocketConnector {}),
];
Expand Down
Loading

0 comments on commit 758c95a

Please sign in to comment.