Skip to content

Commit

Permalink
Update metadata-store crate to use prost 0.13, tonic 0.12 and hyper 1.4
Browse files Browse the repository at this point in the history
This fixes #1732.
  • Loading branch information
tillrohrmann committed Jul 24, 2024
1 parent f7fc755 commit b70faa7
Show file tree
Hide file tree
Showing 13 changed files with 473 additions and 95 deletions.
234 changes: 181 additions & 53 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,12 @@ opentelemetry_sdk = { version = "0.22.1" }
parking_lot = { version = "0.12" }
paste = "1.0"
pin-project = "1.0"
prost = { version = "0.13.1" }
prost-0-12 = { package = "prost", version = "0.12.1" }
prost-build = { version = "0.13.1" }
prost-build-0-12 = { package = "prost-build", version = "0.12.1" }
prost-dto = { version = "0.0.2" }
prost-types = { version = "0.13.1" }
prost-types-0-12 = { package = "prost-types", version = "0.12.1" }
rand = "0.8.5"
rayon = { version = "1.10" }
Expand Down Expand Up @@ -153,9 +156,13 @@ thiserror = "1.0"
tokio = { version = "1.29", default-features = false, features = ["rt-multi-thread", "signal", "macros", ] }
tokio-stream = "0.1.14"
tokio-util = { version = "0.7.10" }
tonic = { version = "0.12.1", default-features = false }
tonic-0-10 = { package = "tonic", version = "0.10.2", default-features = false }
tonic-reflection = { version = "0.12.1" }
tonic-reflection-0-10 = { package = "tonic-reflection", version = "0.10.2" }
tonic-health = { version = "0.12.1" }
tonic-health-0-10 = { package = "tonic-health", version = "0.10.2" }
tonic-build = { version = "0.12.1" }
tonic-build-0-11 = { package = "tonic-build", version = "0.11.0" }
tower = "0.4"
tower-http = { version = "0.5.2", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/task_center_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub enum TaskKind {
#[strum(props(OnCancel = "abort"))]
MetadataBackgroundSync,
RpcServer,
#[strum(props(OnCancel = "abort", OnError = "log"))]
RpcConnection,
/// A type for ingress until we start enforcing timeouts for inflight requests. This enables us
/// to shutdown cleanly without waiting indefinitely.
#[strum(props(OnCancel = "abort", runtime = "ingress"))]
Expand Down
19 changes: 12 additions & 7 deletions crates/metadata-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,25 @@ bytes = { workspace = true }
bytestring = { workspace = true }
derive_builder = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
humantime = { workspace = true }
prost-0-12 = { workspace = true }
prost-types-0-12 = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
static_assertions = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic-0-10 = { workspace = true, features = ["transport", "codegen", "prost"] }
tonic-reflection-0-10 = { workspace = true }
tonic-health-0-10 = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true, features = ["net"] }
tonic = { workspace = true, features = ["transport", "codegen", "prost"] }
tonic-reflection = { workspace = true }
tonic-health = { workspace = true }
tower = { workspace = true }
tower-http-0-4 = { package = "tower-http", version = "0.4", features = ["trace"] }
tower-http = { workspace = true, features = ["trace"] }
tracing = { workspace = true }

[dev-dependencies]
Expand All @@ -51,4 +56,4 @@ test-log = { workspace = true }
tracing-subscriber = { workspace = true }

[build-dependencies]
tonic-build-0-11 = { workspace = true }
tonic-build = { workspace = true }
2 changes: 1 addition & 1 deletion crates/metadata-store/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::path::PathBuf;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());

tonic_build_0_11::configure()
tonic_build::configure()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("metadata_store_svc.bin"))
// allow older protobuf compiler to be used
Expand Down
5 changes: 2 additions & 3 deletions crates/metadata-store/src/grpc_svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

tonic_0_10::include_proto!("dev.restate.metadata_store_svc");
tonic::include_proto!("dev.restate.metadata_store_svc");

pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic_0_10::include_file_descriptor_set!("metadata_store_svc");
pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("metadata_store_svc");
4 changes: 0 additions & 4 deletions crates/metadata-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

extern crate prost_0_12 as prost;
extern crate prost_types_0_12 as prost_types;
extern crate tonic_0_10 as tonic;

mod grpc_svc;
pub mod local;

Expand Down
8 changes: 4 additions & 4 deletions crates/metadata-store/src/local/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@

use async_trait::async_trait;
use bytestring::ByteString;
use tonic_0_10::transport::Channel;
use tonic_0_10::{Code, Status};
use tonic::transport::Channel;
use tonic::{Code, Status};

use restate_core::metadata_store::{
MetadataStore, Precondition, ReadError, VersionedValue, WriteError,
};
use restate_core::network::grpc_util::create_grpc_channel_from_advertised_address;
use restate_types::net::AdvertisedAddress;
use restate_types::Version;

use crate::grpc_svc::metadata_store_svc_client::MetadataStoreSvcClient;
use crate::grpc_svc::{DeleteRequest, GetRequest, PutRequest};
use crate::local::grpc::net_util::create_tonic_channel_from_advertised_address;
use crate::local::grpc::pb_conversions::ConversionError;

/// Client end to interact with the [`LocalMetadataStore`].
Expand All @@ -31,7 +31,7 @@ pub struct LocalMetadataStoreClient {
}
impl LocalMetadataStoreClient {
pub fn new(metadata_store_address: AdvertisedAddress) -> Self {
let channel = create_grpc_channel_from_advertised_address(metadata_store_address)
let channel = create_tonic_channel_from_advertised_address(metadata_store_address)
.expect("should not fail");

Self {
Expand Down
2 changes: 1 addition & 1 deletion crates/metadata-store/src/local/grpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::local::grpc::pb_conversions::ConversionError;
use crate::local::store::{Error, MetadataStoreRequest, RequestSender};
use async_trait::async_trait;
use tokio::sync::oneshot;
use tonic_0_10::{Request, Response, Status};
use tonic::{Request, Response, Status};

/// Grpc svc handler for the [`LocalMetadataStore`].
#[derive(Debug)]
Expand Down
1 change: 1 addition & 0 deletions crates/metadata-store/src/local/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

pub mod client;
pub mod handler;
pub mod net_util;

pub mod pb_conversions {
use crate::grpc_svc;
Expand Down
238 changes: 238 additions & 0 deletions crates/metadata-store/src/local/grpc/net_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use http::Uri;
use hyper::body::{Body, Incoming};
use hyper::rt::{Read, Write};
use hyper_util::rt::TokioIo;
use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskCenter, TaskKind};
use restate_types::errors::GenericError;
use restate_types::net::{AdvertisedAddress, BindAddress};
use std::fmt::Debug;
use std::future::Future;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;
use tokio::io;
use tokio::net::{TcpListener, UnixListener, UnixStream};
use tokio_util::net::Listener;
use tonic::transport::{Channel, Endpoint};
use tracing::{debug, info, instrument, Span};

pub fn create_tonic_channel_from_advertised_address(
address: AdvertisedAddress,
) -> Result<Channel, http::Error> {
let channel = match address {
AdvertisedAddress::Uds(uds_path) => {
// dummy endpoint required to specify an uds connector, it is not used anywhere
Endpoint::try_from("http://127.0.0.1")
.expect("/ should be a valid Uri")
.connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
let uds_path = uds_path.clone();
async move {
Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?))
}
}))
}
AdvertisedAddress::Http(uri) => {
// todo: Make the channel settings configurable
Channel::builder(uri)
.connect_timeout(Duration::from_secs(5))
// todo: configure the channel from configuration file
.http2_adaptive_window(true)
.connect_lazy()
}
};
Ok(channel)
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed binding to address '{address}': {source}")]
TcpBinding {
address: SocketAddr,
#[source]
source: io::Error,
},
#[error("failed opening uds '{uds_path}': {source}")]
UdsBinding {
uds_path: PathBuf,
#[source]
source: io::Error,
},
#[error("failed handling hyper connection: {0}")]
HandlingConnection(#[from] GenericError),
#[error("failed listening on incoming connections: {0}")]
Listening(#[from] io::Error),
#[error(transparent)]
Shutdown(#[from] ShutdownError),
}

#[instrument(level = "info", skip_all, fields(server_name = %server_name, uds.path = tracing::field::Empty, net.host.addr = tracing::field::Empty, net.host.port = tracing::field::Empty))]
pub async fn run_hyper_server<S, B>(
bind_address: &BindAddress,
service: S,
server_name: &'static str,
) -> Result<(), Error>
where
S: hyper::service::Service<http::Request<Incoming>, Response = hyper::Response<B>>
+ Send
+ Clone
+ 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
S::Future: Send,
B: Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
match bind_address {
BindAddress::Uds(uds_path) => {
let unix_listener = UnixListener::bind(uds_path).map_err(|err| Error::UdsBinding {
uds_path: uds_path.clone(),
source: err,
})?;

Span::current().record("uds.path", uds_path.display().to_string());
info!("Server listening");

run_listener_loop(unix_listener, service, server_name).await?;
}
BindAddress::Socket(socket_addr) => {
let tcp_listener =
TcpListener::bind(socket_addr)
.await
.map_err(|err| Error::TcpBinding {
address: *socket_addr,
source: err,
})?;

let local_addr = tcp_listener.local_addr().map_err(|err| Error::TcpBinding {
address: *socket_addr,
source: err,
})?;

Span::current().record("net.host.addr", local_addr.ip().to_string());
Span::current().record("net.host.port", local_addr.port());
info!("Server listening");

run_listener_loop(tcp_listener, service, server_name).await?;
}
}

debug!("Stopped server");

Ok(())
}

async fn run_listener_loop<L, S, B>(
mut listener: L,
service: S,
server_name: &'static str,
) -> Result<(), Error>
where
L: Listener,
L::Io: Send + Unpin + 'static,
L::Addr: Debug,
S: hyper::service::Service<http::Request<Incoming>, Response = hyper::Response<B>>
+ Send
+ Clone
+ 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
S::Future: Send,
B: Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let mut shutdown = std::pin::pin!(cancellation_watcher());
let tc = task_center();
let executor = TaskCenterExecutor::new(tc.clone(), server_name);
loop {
tokio::select! {
biased;
_ = &mut shutdown => {
break;
}
incoming_connection = listener.accept() => {
let (stream, remote_addr) = incoming_connection?;
let io = TokioIo::new(stream);

debug!("Accepting incoming connection from '{remote_addr:?}'.");

tc.spawn_child(TaskKind::RpcConnection, server_name, None, handle_connection(
io,
service.clone(),
executor.clone(),
))?;
}
}
}

Ok(())
}

async fn handle_connection<S, B, I>(
io: I,
service: S,
executor: TaskCenterExecutor,
) -> anyhow::Result<()>
where
S: hyper::service::Service<http::Request<Incoming>, Response = hyper::Response<B>>
+ Send
+ Clone
+ 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
S::Future: Send,
B: Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
I: Read + Write + Unpin + 'static,
{
let builder = hyper_util::server::conn::auto::Builder::new(executor);
let connection = builder.serve_connection(io, service);

tokio::select! {
res = connection => {
// propagate errors
res.map_err(Error::HandlingConnection)?;
},
_ = cancellation_watcher() => {}
}

Ok(())
}

#[derive(Clone)]
struct TaskCenterExecutor {
task_center: TaskCenter,
name: &'static str,
}

impl TaskCenterExecutor {
fn new(task_center: TaskCenter, name: &'static str) -> Self {
Self { task_center, name }
}
}

impl<F> hyper::rt::Executor<F> for TaskCenterExecutor
where
F: Future + 'static + Send,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
// ignore shutdown error
let _ =
self.task_center
.spawn_child(TaskKind::RpcConnection, self.name, None, async move {
// ignore the future output
let _ = fut.await;
Ok(())
});
}
}
Loading

0 comments on commit b70faa7

Please sign in to comment.