Skip to content

Commit

Permalink
[Housekeeping] Upgrade toolchain and deps
Browse files Browse the repository at this point in the history
- Upgrade rocksdb
- Upgrade Rust toolchain to 1.79.0
- Remove unused functions that new clippy complains about (including get_next_message in shuffle that we stopped using recently)
  • Loading branch information
AhmedSoliman committed Jun 20, 2024
1 parent d654872 commit 2d91b67
Show file tree
Hide file tree
Showing 22 changed files with 172 additions and 304 deletions.
238 changes: 126 additions & 112 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 10 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ assert2 = "0.3.11"
async-channel = "2.1.1"
async-trait = "0.1.73"
axum = { version = "0.6.18", default-features = false }
base64 = "0.21"
base64 = "0.22"
bytes = { version = "1.3", features = ["serde"] }
bytes-utils = "0.1.3"
bytestring = { version = "1.2", features = ["serde"] }
Expand All @@ -92,8 +92,8 @@ crossterm = { version = "0.27.0" }
dashmap = { version = "5.5.3" }
datafusion = { version = "35.0.0" }
datafusion-expr = { version = "35.0.0" }
derive-getters = { version = "0.3.0" }
derive_builder = "0.12.0"
derive-getters = { version = "0.4.0" }
derive_builder = "0.20.0"
derive_more = { version = "0.99.17" }
dialoguer = { version = "0.11.0" }
enum-map = { version = "2.7.3" }
Expand All @@ -108,9 +108,9 @@ http = "0.2.9"
humantime = "2.1.0"
hyper = { version = "0.14.24", default-features = false }
hyper-rustls = { version = "0.24.1", features = ["http2"] }
itertools = "0.11.0"
metrics = { version = "0.22" }
metrics-exporter-prometheus = { version = "0.14", default-features = false, features = ["async-runtime"] }
itertools = "0.13.0"
metrics = { version = "0.23" }
metrics-exporter-prometheus = { version = "0.15", default-features = false, features = ["async-runtime"] }
once_cell = "1.18"
opentelemetry = { version = "0.22.0" }
opentelemetry-http = { version = "0.11.1" }
Expand All @@ -124,20 +124,21 @@ prost-dto = { version = "0.0.2" }
prost-types = "0.12.1"
rand = "0.8.5"
rayon = { version = "1.10" }
regress = { version = "0.10" }
rlimit = { version = "0.10.1" }
rocksdb = { version = "0.22.0", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev="64a3c698910380e4fcbd8e56ce459779932cf1ff" }
rocksdb = { version = "0.22.0", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev="c7ccbbcd261bdec011c4976c441676512a1a4841" }
rustls = "0.21.6"
schemars = { version = "0.8", features = ["bytes", "enumset"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_with = "2.2"
serde_with = "3.8"
serde_yaml = "0.9"
sha2 = "0.10.8"
smartstring = { version = "1.0.1" }
static_assertions = { version = "1.1.0" }
strum = { version = "0.26.1" }
strum_macros = { version = "0.26.1" }
sync_wrapper = "0.1.2"
sync_wrapper = "1.0.1"
smallvec = { version = "1.13.2", features = ["serde"] }
tempfile = "3.6.0"
test-log = { version = "0.2.11", default-features = false, features = ["trace"] }
Expand Down
3 changes: 3 additions & 0 deletions cli/src/clients/cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub use self::client::CloudClient;
pub use self::interface::CloudClientInterface;

pub mod generated {
#![allow(clippy::to_string_trait_impl)]

use serde::{Deserialize, Serialize};

typify::import_types!(schema = "src/clients/cloud/schema.json");
}
13 changes: 0 additions & 13 deletions cli/src/clients/datafusion_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,6 @@ pub async fn find_active_invocations_simple(
Ok(rows)
}

#[derive(Debug, Clone)]
pub struct InvocationDetailed {
pub invocation: Invocation,
pub journal: Vec<JournalEntry>,
}

#[derive(Debug, Clone)]
pub struct Invocation {
pub id: String,
Expand All @@ -296,7 +290,6 @@ pub struct Invocation {
// If it **requires** this deployment.
pub pinned_deployment_id: Option<String>,
pub pinned_deployment_exists: bool,
pub deployment_id_at_latest_svc_revision: String,
// Last attempted deployment
pub last_attempt_deployment_id: Option<String>,
pub last_attempt_server: Option<String>,
Expand Down Expand Up @@ -557,7 +550,6 @@ pub struct ServiceHandlerLockedKeysMap {
#[derive(Clone, Default, Debug)]
pub struct LockedKeyInfo {
pub num_pending: i64,
pub oldest_pending: Option<chrono::DateTime<Local>>,
// Who is holding the lock
pub invocation_holding_lock: Option<String>,
pub invocation_method_holding_lock: Option<String>,
Expand Down Expand Up @@ -631,11 +623,9 @@ pub async fn get_locked_keys_status(
let service = batch.column(0).as_string::<i32>().value(i);
let key = value_as_string(&batch, 1, i);
let num_pending = value_as_i64(&batch, 2, i);
let oldest_pending = value_as_dt_opt(&batch, 3, i);

let info = LockedKeyInfo {
num_pending,
oldest_pending,
..LockedKeyInfo::default()
};
key_map.insert(service, key, info);
Expand Down Expand Up @@ -854,9 +844,6 @@ pub async fn find_active_invocations(
next_retry_at: row.next_retry_at.map(Into::into),
pinned_deployment_id: row.pinned_deployment_id,
pinned_deployment_exists: row.known_deployment_id.is_some(),
deployment_id_at_latest_svc_revision: row
.comp_latest_deployment
.expect("comp_latest_deployment"),
last_failure_message: row.last_failure,
last_failure_entry_index: row.last_failure_related_entry_index,
last_failure_entry_name: row.last_failure_related_entry_name,
Expand Down
10 changes: 5 additions & 5 deletions crates/codederror/derive/src/attr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ impl<'a> Attrs<'a> {
}
}

#[derive(Copy, Clone)]
pub struct Error<'a> {
pub original: &'a Attribute,
pub is_transparent: bool,
}
// #[derive(Copy, Clone)]
// pub struct Error<'a> {
// pub original: &'a Attribute,
// pub is_transparent: bool,
// }

#[derive(Clone)]
pub struct Code<'a> {
Expand Down
4 changes: 2 additions & 2 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ humantime = { workspace = true }
hyper = { workspace = true, features = ["full"] }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
metrics-tracing-context = { version = "0.15.0" }
metrics-util = { version = "0.16.0" }
metrics-tracing-context = { version = "0.16.0" }
metrics-util = { version = "0.17.0" }
once_cell = { workspace = true }
prost-types = { workspace = true }
rocksdb = { workspace = true }
Expand Down
7 changes: 6 additions & 1 deletion crates/node/src/cluster_marker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ fn validate_and_update_cluster_marker_inner(
// write the new cluster marker file
let new_cluster_marker_file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(tmp_cluster_marker_filepath.as_path())
.map_err(ClusterValidationError::CreateFile)?;
Expand Down Expand Up @@ -247,7 +248,11 @@ mod tests {
cluster_marker: &ClusterMarker,
path: impl AsRef<Path>,
) -> anyhow::Result<()> {
let file = OpenOptions::new().create(true).write(true).open(path)?;
let file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(path)?;
serde_json::to_writer(&file, cluster_marker)?;
Ok(())
}
Expand Down
9 changes: 3 additions & 6 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,9 @@ impl Node {

let server = NetworkServer::new(
networking.connection_manager(),
worker_role.as_ref().map(|worker| {
WorkerDependencies::new(
worker.storage_query_context().clone(),
worker.subscription_controller(),
)
}),
worker_role
.as_ref()
.map(|worker| WorkerDependencies::new(worker.storage_query_context().clone())),
admin_role.as_ref().map(|cluster_controller| {
AdminDependencies::new(
cluster_controller.cluster_controller_handle(),
Expand Down
3 changes: 2 additions & 1 deletion crates/node/src/network_server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_tracing_context::TracingContextLayer;
use metrics_util::{layers::Layer, MetricKindMask};
use metrics_util::layers::Layer;
use metrics_util::MetricKindMask;

use restate_types::config::CommonOptions;

Expand Down
12 changes: 2 additions & 10 deletions crates/node/src/network_server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use restate_core::{cancellation_watcher, task_center};
use restate_metadata_store::MetadataStoreClient;
use restate_storage_query_datafusion::context::QueryContext;
use restate_types::config::CommonOptions;
use restate_worker::SubscriptionControllerHandle;

use crate::network_server::handler;
use crate::network_server::handler::cluster_ctrl::ClusterCtrlSvcHandler;
Expand Down Expand Up @@ -132,18 +131,11 @@ async fn handler_404() -> (http::StatusCode, &'static str) {

pub struct WorkerDependencies {
pub query_context: QueryContext,
pub subscription_controller: Option<SubscriptionControllerHandle>,
}

impl WorkerDependencies {
pub fn new(
query_context: QueryContext,
subscription_controller: Option<SubscriptionControllerHandle>,
) -> Self {
WorkerDependencies {
query_context,
subscription_controller,
}
pub fn new(query_context: QueryContext) -> Self {
WorkerDependencies { query_context }
}
}

Expand Down
6 changes: 1 addition & 5 deletions crates/node/src/roles/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use restate_types::schema::subscriptions::SubscriptionResolver;
use restate_types::schema::UpdateableSchema;
use restate_types::Version;
use restate_worker::SubscriptionController;
use restate_worker::{SubscriptionControllerHandle, Worker};
use restate_worker::Worker;

#[derive(Debug, thiserror::Error, CodedError)]
pub enum WorkerRoleError {
Expand Down Expand Up @@ -89,10 +89,6 @@ impl WorkerRole {
self.worker.storage_query_context()
}

pub fn subscription_controller(&self) -> Option<SubscriptionControllerHandle> {
Some(self.worker.subscription_controller_handle())
}

pub async fn start(self) -> anyhow::Result<()> {
let tc = task_center();
// todo: only run subscriptions on node 0 once being distributed
Expand Down
2 changes: 1 addition & 1 deletion crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl PartitionStoreManager {

pub async fn has_partition(&self, partition_id: PartitionId) -> bool {
let guard = self.lookup.lock().await;
guard.live.get(&partition_id).is_some()
guard.live.contains_key(&partition_id)
}

pub async fn get_partition_store(&self, partition_id: PartitionId) -> Option<PartitionStore> {
Expand Down
2 changes: 1 addition & 1 deletion crates/service-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ itertools = { workspace = true }
once_cell = { workspace = true }
paste = { workspace = true, optional = true }
prost = { workspace = true }
regress = { version = "0.9", optional = true }
regress = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
size = { version = "0.4.1", optional = true }
Expand Down
1 change: 1 addition & 0 deletions crates/storage-query-datafusion/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl SelectPartitions for MockPartitionSelector {
}
}

#[allow(dead_code)]
pub(crate) struct MockQueryEngine(PartitionStoreManager, PartitionStore, QueryContext);

impl MockQueryEngine {
Expand Down
4 changes: 2 additions & 2 deletions crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ prost = { workspace = true }
prost-dto = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
regress = { version = "0.9" }
regress = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true, features = ["rc"] }
serde_json = { workspace = true }
Expand Down Expand Up @@ -75,5 +75,5 @@ prettyplease = "0.2"
schemars = { workspace = true }
serde_json = { workspace = true }
syn = "2.0"
typify = { version = "0.0.16" }
typify = { version = "0.1.0" }
jsonptr = "0.4.7"
1 change: 1 addition & 0 deletions crates/types/src/endpoint_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

mod generated {
#![allow(clippy::clone_on_copy)]
#![allow(clippy::to_string_trait_impl)]

include!(concat!(env!("OUT_DIR"), "/endpoint_manifest.rs"));
}
Expand Down
40 changes: 1 addition & 39 deletions crates/worker/src/partition/shuffle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use restate_core::cancellation_watcher;
use restate_storage_api::deduplication_table::DedupInformation;
use restate_storage_api::outbox_table::OutboxMessage;
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey};
use restate_types::message::{AckKind, MessageIndex};
use restate_types::message::MessageIndex;
use restate_types::NodeId;
use restate_wal_protocol::{append_envelope_to_bifrost, Destination, Envelope, Header, Source};

Expand Down Expand Up @@ -54,9 +54,6 @@ impl OutboxTruncation {
}
}

#[derive(Debug, Clone)]
pub(crate) struct ShuffleInput(pub(crate) AckKind);

pub(crate) fn wrap_outbox_message_in_envelope(
message: OutboxMessage,
seq_number: MessageIndex,
Expand Down Expand Up @@ -101,11 +98,6 @@ pub(super) trait OutboxReader {
&mut self,
next_sequence_number: MessageIndex,
) -> impl Future<Output = Result<Option<(MessageIndex, OutboxMessage)>, OutboxReaderError>> + Send;

fn get_message(
&mut self,
next_sequence_number: MessageIndex,
) -> impl Future<Output = Result<Option<OutboxMessage>, OutboxReaderError>> + Send;
}

/// The hint sender allows to send hints to the shuffle service. If more hints are sent than the
Expand Down Expand Up @@ -516,20 +508,6 @@ mod tests {
)
}))
}

async fn get_message(
&mut self,
next_sequence_number: MessageIndex,
) -> Result<Option<OutboxMessage>, OutboxReaderError> {
Ok(self
.subslice_from_index(next_sequence_number)
.first()
.and_then(|x| {
x.clone().map(|service_invocation| {
OutboxMessage::ServiceInvocation(service_invocation)
})
}))
}
}

/// Outbox reader which is used to let the shuffler fail in a controlled manner so that we
Expand Down Expand Up @@ -580,22 +558,6 @@ mod tests {
)
}))
}

async fn get_message(
&mut self,
next_sequence_number: MessageIndex,
) -> Result<Option<OutboxMessage>, OutboxReaderError> {
self.check_fail(next_sequence_number)?;

Ok(self
.records
.get(next_sequence_number as usize)
.and_then(|msg| {
msg.clone().map(|service_invocation| {
OutboxMessage::ServiceInvocation(service_invocation)
})
}))
}
}

async fn collect_invoke_commands_until(
Expand Down
12 changes: 0 additions & 12 deletions crates/worker/src/partition/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,18 +727,6 @@ where

Ok(result)
}

async fn get_message(
&mut self,
sequence_number: MessageIndex,
) -> Result<Option<OutboxMessage>, OutboxReaderError> {
let partition_id = self.partition_id;

self.storage
.get_outbox_message(partition_id, sequence_number)
.await
.map_err(OutboxReaderError::Storage)
}
}

impl<Storage> TimerReader<TimerKeyValue> for PartitionStorage<Storage>
Expand Down
Loading

0 comments on commit 2d91b67

Please sign in to comment.