Skip to content

Commit

Permalink
chore(deps): Bump Rust version to 1.80
Browse files Browse the repository at this point in the history
  • Loading branch information
bruceg committed Jul 26, 2024
1 parent e89661c commit 7bc9dd7
Show file tree
Hide file tree
Showing 33 changed files with 72 additions and 59 deletions.
1 change: 1 addition & 0 deletions .github/actions/spelling/expect.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ centiseconds
cernan
cfactor
CFFI
cfgs
CGP
cgroups
chans
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ debug = false # Do not include debug symbols in the executable.
[profile.bench]
debug = true

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

[package.metadata.deb]
name = "vector"
section = "admin"
Expand Down
2 changes: 1 addition & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ load('ext://helm_resource', 'helm_resource', 'helm_repo')
docker_build(
ref='timberio/vector',
context='.',
build_args={'RUST_VERSION': '1.79.0'},
build_args={'RUST_VERSION': '1.80.0'},
dockerfile='tilt/Dockerfile'
)

Expand Down
2 changes: 1 addition & 1 deletion lib/vector-api-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! 2. A GraphQL subscription client, for long-lived, multiplexed subscriptions over WebSockets
//! 3. GraphQL queries/mutations/subscriptions, defined in `graphql/**/*.graphql` files
//! 4. Extension methods for each client, for executing queries/subscriptions, and returning
//! deserialized JSON responses
//! deserialized JSON responses
//!

#![deny(warnings)]
Expand Down
3 changes: 3 additions & 0 deletions lib/vector-buffers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ authors = ["Vector Contributors <vector@datadoghq.com>"]
edition = "2021"
publish = false

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

[dependencies]
async-recursion = "1.1.1"
async-stream = "0.3.5"
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/benches/sized_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl DataDir {

fn next(&mut self) -> PathGuard {
let mut nxt = self.base.clone();
nxt.push(&self.index.to_string());
nxt.push(self.index.to_string());
self.index += 1;
std::fs::create_dir_all(&nxt).expect("could not make next dir");

Expand Down
3 changes: 3 additions & 0 deletions lib/vector-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ authors = ["Vector Contributors <vector@datadoghq.com>"]
edition = "2021"
publish = false

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(ddsketch_extended)'] }

[dependencies]
async-graphql = { version = "7.0.7", default-features = false, features = ["playground" ], optional = true }
async-trait = { version = "0.1", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions lib/vector-core/src/event/discriminant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ mod tests {

#[test]
fn with_hash_map() {
#[allow(clippy::mutable_key_type)]
let mut map: HashMap<Discriminant, usize> = HashMap::new();

let event_stream_1 = {
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ dyn_clone::clone_trait_object!(FunctionTransform);
/// # Invariants
///
/// * It is an illegal invariant to implement `FunctionTransform` for a
/// `TaskTransform` or vice versa.
/// `TaskTransform` or vice versa.
pub trait TaskTransform<T: EventContainer + 'static>: Send + 'static {
fn transform(
self: Box<Self>,
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.79"
channel = "1.80"
profile = "default"
2 changes: 1 addition & 1 deletion src/config/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl TransformContext {
}
}

#[cfg(any(test, feature = "test"))]
#[cfg(test)]
pub fn new_test(
schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion src/config/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::Error;
/// But, config and topology reload logic can handle:
/// - Invalid config, caused either by user or by data race.
/// - Frequent changes, caused by user/editor modifying/saving file in small chunks.
/// so we can use smaller, more responsive delay.
/// so we can use smaller, more responsive delay.
#[cfg(unix)]
const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

Expand Down
2 changes: 1 addition & 1 deletion src/convert_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ mod tests {
let extension = path.extension().unwrap().to_str().unwrap();
if extension == Format::Yaml.to_string() {
// Note that here we read the converted string directly.
let converted_config = fs::read_to_string(&output_dir.join(&path)).unwrap();
let converted_config = fs::read_to_string(output_dir.join(&path)).unwrap();
assert_eq!(converted_config, original_config);
count += 1;
}
Expand Down
8 changes: 4 additions & 4 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,9 @@ impl Default for KeepaliveConfig {
///
/// **Notes:**
/// - This is intended to be used in a Hyper server (or similar) that will automatically close
/// the connection after a response with a `Connection: close` header is sent.
/// the connection after a response with a `Connection: close` header is sent.
/// - This layer assumes that it is instantiated once per connection, which is true within the
/// Hyper framework.
/// Hyper framework.

pub struct MaxConnectionAgeLayer {
start_reference: Instant,
Expand Down Expand Up @@ -496,9 +496,9 @@ where
///
/// **Notes:**
/// - This is intended to be used in a Hyper server (or similar) that will automatically close
/// the connection after a response with a `Connection: close` header is sent.
/// the connection after a response with a `Connection: close` header is sent.
/// - This service assumes that it is instantiated once per connection, which is true within the
/// Hyper framework.
/// Hyper framework.
#[derive(Clone)]
pub struct MaxConnectionAgeService<S> {
service: S,
Expand Down
1 change: 0 additions & 1 deletion src/internal_events/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
feature = "sources-apache_metrics",
feature = "sources-aws_ecs_metrics",
feature = "sources-aws_kinesis_firehose",
feature = "sources-http-client",
feature = "sources-utils-http",
))]
pub(crate) fn http_error_code(code: u16) -> String {
Expand Down
2 changes: 1 addition & 1 deletion src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl KafkaAuthConfig {
if let Some(verify_certificate) = &tls.options.verify_certificate {
client.set(
"enable.ssl.certificate.verification",
&verify_certificate.to_string(),
verify_certificate.to_string(),
);
}

Expand Down
4 changes: 2 additions & 2 deletions src/sinks/aws_kinesis/firehose/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn firehose_put_records_without_partition_key() {
#[allow(clippy::needless_collect)] // https://github.com/rust-lang/rust-clippy/issues/6909
let input = input
.into_iter()
.map(|rec| serde_json::to_value(&rec.into_log()).unwrap())
.map(|rec| serde_json::to_value(rec.into_log()).unwrap())
.collect::<Vec<_>>();
for hit in hits {
let hit = hit
Expand Down Expand Up @@ -235,7 +235,7 @@ async fn firehose_put_records_with_partition_key() {
#[allow(clippy::needless_collect)] // https://github.com/rust-lang/rust-clippy/issues/6909
let input = input
.into_iter()
.map(|rec| serde_json::to_value(&rec.into_log()).unwrap())
.map(|rec| serde_json::to_value(rec.into_log()).unwrap())
.collect::<Vec<_>>();
for hit in hits {
let hit = hit
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/elasticsearch/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ impl Encoder<Vec<ProcessedEvent>> for ElasticsearchEncoder {
written_bytes +=
as_tracked_write::<_, _, io::Error>(writer, &log, |mut writer, log| {
writer.write_all(&[b'\n'])?;
// False positive clippy hit on the following line. Clippy wants us to skip the
// borrow, but then the value is moved for the following line.
#[allow(clippy::needless_borrows_for_generic_args)]
serde_json::to_writer(&mut writer, log)?;
writer.write_all(&[b'\n'])?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/elasticsearch/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ async fn run_insert_tests_with_config(
// https://github.com/rust-lang/rust-clippy/issues/6909
let input = input
.into_iter()
.map(|rec| serde_json::to_value(&rec.into_log()).unwrap())
.map(|rec| serde_json::to_value(rec.into_log()).unwrap())
.collect::<Vec<_>>();

for hit in hits {
Expand Down
8 changes: 4 additions & 4 deletions src/sinks/greptimedb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
//! This sink transforms metrics into GreptimeDB table using following rules:
//!
//! - Table name: `{namespace}_{metric_name}`. If the metric doesn't have a
//! namespace, we will use metric_name for table name.
//! namespace, we will use metric_name for table name.
//! - Timestamp: timestamp is stored as a column called `ts`.
//! - Tags: metric tags are stored as string columns with its name as column
//! name
//! name
//! - Counter and Gauge: the value of counter and gauge are stored in a column
//! called `val`
//! called `val`
//! - Set: the number of set items is stored in a column called `val`.
//! - Distribution, Histogram and Summary, Sketch: Statistical attributes like
//! `sum`, `count`, "max", "min", quantiles and buckets are stored as columns.
//! `sum`, `count`, "max", "min", quantiles and buckets are stored as columns.
//!
use vector_lib::sensitive_string::SensitiveString;

Expand Down
12 changes: 6 additions & 6 deletions src/sinks/kafka/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl KafkaSinkConfig {
.set("bootstrap.servers", &self.bootstrap_servers)
.set(
"socket.timeout.ms",
&self.socket_timeout_ms.as_millis().to_string(),
self.socket_timeout_ms.as_millis().to_string(),
)
.set("statistics.interval.ms", "1000");

Expand All @@ -166,10 +166,10 @@ impl KafkaSinkConfig {
// All batch options are producer only.
if kafka_role == KafkaRole::Producer {
client_config
.set("compression.codec", &to_string(self.compression))
.set("compression.codec", to_string(self.compression))
.set(
"message.timeout.ms",
&self.message_timeout_ms.as_millis().to_string(),
self.message_timeout_ms.as_millis().to_string(),
);

if let Some(value) = self.batch.timeout_secs {
Expand All @@ -190,7 +190,7 @@ impl KafkaSinkConfig {
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &((value * 1000.0).round().to_string()));
client_config.set(key, (value * 1000.0).round().to_string());
}
if let Some(value) = self.batch.max_events {
// Maximum number of messages batched in one MessageSet. The total MessageSet size is
Expand All @@ -208,7 +208,7 @@ impl KafkaSinkConfig {
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
client_config.set(key, value.to_string());
}
if let Some(value) = self.batch.max_bytes {
// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
Expand All @@ -229,7 +229,7 @@ impl KafkaSinkConfig {
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
client_config.set(key, value.to_string());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/kafka/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ mod integration_test {
// read back everything from the beginning
let mut client_config = rdkafka::ClientConfig::new();
client_config.set("bootstrap.servers", server.as_str());
client_config.set("group.id", &random_string(10));
client_config.set("group.id", random_string(10));
client_config.set("enable.partition.eof", "true");
kafka_auth.apply(&mut client_config).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod aws_cloudwatch_metrics;
pub mod aws_kinesis;
#[cfg(feature = "sinks-aws_s3")]
pub mod aws_s3;
#[cfg(any(feature = "sinks-aws_sqs", feature = "sinks-sinks-aws_sns"))]
#[cfg(feature = "sinks-aws_sqs")]
pub mod aws_s_s;
#[cfg(feature = "sinks-axiom")]
pub mod axiom;
Expand Down
6 changes: 4 additions & 2 deletions src/sinks/prometheus/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,9 +943,11 @@ mod tests {
}

/// According to the [spec](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md?plain=1#L115)
///
/// > Label names MUST be unique within a LabelSet.
/// Prometheus itself will reject the metric with an error. Largely to remain backward compatible with older versions of Vector,
/// we only publish the last tag in the list.
///
/// Prometheus itself will reject the metric with an error. Largely to remain backward
/// compatible with older versions of Vector, we only publish the last tag in the list.
#[test]
fn encodes_duplicate_tags() {
let tags = metric_tags!(
Expand Down
24 changes: 12 additions & 12 deletions src/sources/docker_logs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ fn exclude_self() {

#[cfg(all(test, feature = "docker-logs-integration-tests"))]
mod integration_tests {
use bollard::{
container::{
Config as ContainerConfig, CreateContainerOptions, KillContainerOptions,
RemoveContainerOptions, StartContainerOptions, WaitContainerOptions,
},
image::{CreateImageOptions, ListImagesOptions},
};
use futures::{stream::TryStreamExt, FutureExt};
use itertools::Itertools as _;
use similar_asserts::assert_eq;
use vrl::value;

use crate::sources::docker_logs::*;
use crate::sources::docker_logs::{CONTAINER, CREATED_AT, IMAGE, NAME};
use crate::{
Expand All @@ -36,16 +48,6 @@ mod integration_tests {
},
SourceSender,
};
use bollard::{
container::{
Config as ContainerConfig, CreateContainerOptions, KillContainerOptions,
RemoveContainerOptions, StartContainerOptions, WaitContainerOptions,
},
image::{CreateImageOptions, ListImagesOptions},
};
use futures::{stream::TryStreamExt, FutureExt};
use similar_asserts::assert_eq;
use vrl::value;

/// None if docker is not present on the system
async fn source_with<'a, L: Into<Option<&'a str>>>(
Expand Down Expand Up @@ -869,7 +871,6 @@ mod integration_tests {
let command = emitted_messages
.into_iter()
.map(|message| format!("echo {:?}", message))
.collect::<Box<_>>()
.join(" && ");

let id = cmd_container(name, None, vec!["sh", "-c", &command], &docker, false).await;
Expand Down Expand Up @@ -940,7 +941,6 @@ mod integration_tests {
let command = emitted_messages
.into_iter()
.map(|message| format!("echo {:?}", message))
.collect::<Box<_>>()
.join(" && ");

let id = cmd_container(name, None, vec!["sh", "-c", &command], &docker, false).await;
Expand Down
2 changes: 1 addition & 1 deletion src/sources/fluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ impl Decoder for FluentDecoder {
src.advance(byte_size);

let maybe_item = self.handle_message(res, byte_size).map_err(|error| {
let base64_encoded_message = BASE64_STANDARD.encode(&src);
let base64_encoded_message = BASE64_STANDARD.encode(&src[..]);
emit!(FluentMessageDecodeError {
error: &error,
base64_encoded_message
Expand Down
8 changes: 4 additions & 4 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,21 +1202,21 @@ fn create_consumer(
.set("auto.offset.reset", &config.auto_offset_reset)
.set(
"session.timeout.ms",
&config.session_timeout_ms.as_millis().to_string(),
config.session_timeout_ms.as_millis().to_string(),
)
.set(
"socket.timeout.ms",
&config.socket_timeout_ms.as_millis().to_string(),
config.socket_timeout_ms.as_millis().to_string(),
)
.set(
"fetch.wait.max.ms",
&config.fetch_wait_max_ms.as_millis().to_string(),
config.fetch_wait_max_ms.as_millis().to_string(),
)
.set("enable.partition.eof", "false")
.set("enable.auto.commit", "true")
.set(
"auto.commit.interval.ms",
&config.commit_interval_ms.as_millis().to_string(),
config.commit_interval_ms.as_millis().to_string(),
)
.set("enable.auto.offset.store", "false")
.set("statistics.interval.ms", "1000")
Expand Down
4 changes: 2 additions & 2 deletions src/sources/util/framestream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ async fn handle_stream(
frame_handler.insert_tls_client_metadata(certificate_metadata);

let span = info_span!("connection");
span.record("peer_addr", &field::debug(&peer_addr));
span.record("peer_addr", field::debug(&peer_addr));
let received_from: Option<Bytes> = Some(peer_addr.to_string().into());
let active_parsing_task_nums = Arc::new(AtomicU32::new(0));

Expand Down Expand Up @@ -688,7 +688,7 @@ pub fn build_framestream_unix_source(
let span = info_span!("connection");
let path = if let Some(addr) = peer_addr {
if let Some(path) = addr.as_pathname().map(|e| e.to_owned()) {
span.record("peer_path", &field::debug(&path));
span.record("peer_path", field::debug(&path));
Some(path)
} else {
None
Expand Down
Loading

0 comments on commit 7bc9dd7

Please sign in to comment.