Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cargo clippy and allow lint clippy:to_string_trait_impl #737

Merged
merged 4 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ jobs:
run: |
cd webui
pnpm build
- name: Run Clippy
run: cargo clippy --all-targets --workspace -- -D warnings
- name: Build
run: cargo build --all-features
- name: Test
Expand Down
16 changes: 7 additions & 9 deletions crates/arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,10 @@ pub(crate) async fn get_all_connection_tables(
let vec: Vec<ConnectionTable> = tables
.into_iter()
.map(|t| t.try_into())
.map(|result| {
if let Err(err) = &result {
.inspect(|result| {
if let Err(err) = result {
debug!("Error building connection table: {}", err);
}
result
})
.filter_map(Result::ok)
.collect();
Expand Down Expand Up @@ -414,11 +413,10 @@ pub(crate) async fn get_connection_tables(
let tables: Vec<ConnectionTable> = tables
.into_iter()
.map(|t| t.try_into())
.map(|result| {
if let Err(err) = &result {
.inspect(|result| {
if let Err(err) = result {
debug!("Error building connection table: {}", err);
}
result
})
.filter_map(Result::ok)
.collect();
Expand Down Expand Up @@ -580,10 +578,10 @@ async fn expand_proto_schema(
.into_iter()
.map(|(name, s)| {
if s.schema_type != ConfluentSchemaType::Protobuf {
return Err(bad_request(format!(
Err(bad_request(format!(
"Schema reference {} has type {:?}, but must be protobuf",
name, s.schema_type
)));
)))
} else {
Ok((name, s.schema))
}
Expand Down Expand Up @@ -818,7 +816,7 @@ pub(crate) async fn test_schema(

match schema_def {
SchemaDefinition::JsonSchema(schema) => {
if let Err(e) = json::schema::to_arrow("test", &schema) {
if let Err(e) = json::schema::to_arrow("test", schema) {
Err(bad_request(e.to_string()))
} else {
Ok(())
Expand Down
5 changes: 3 additions & 2 deletions crates/arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ async fn register_schemas(compiled_sql: &mut CompiledSql) -> anyhow::Result<()>
Ok(())
}

#[allow(clippy::too_many_arguments)]
pub(crate) async fn create_pipeline_int<'a>(
name: String,
query: String,
Expand Down Expand Up @@ -388,7 +389,7 @@ pub(crate) async fn create_pipeline_int<'a>(
api_queries::fetch_get_pipeline_id(&db.client().await?, &pub_id, &auth.organization_id)
.await
.map_err(log_and_map)?
.get(0)
.first()
.unwrap()
.id;

Expand All @@ -410,7 +411,7 @@ pub(crate) async fn create_pipeline_int<'a>(
checkpoint_interval,
is_preview,
&auth,
&db,
db,
)
.await?;

Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-api/src/rest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ pub enum ApiError {

pub fn map_insert_err(name: &str, error: DbError) -> ErrorResp {
if error == DbError::DuplicateViolation {
return bad_request(format!("{} with that name already exists", name));
bad_request(format!("{} with that name already exists", name))
} else {
error.into()
}
}

pub fn map_delete_err(name: &str, user: &str, error: DbError) -> ErrorResp {
if error == DbError::ForeignKeyViolation {
return bad_request(format!(
bad_request(format!(
"Cannot delete {}; it is still being used by {}",
name, user
));
))
} else {
error.into()
}
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-compiler-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn start_service() -> anyhow::Result<()> {

wrap_start(
"compiler service",
addr.clone(),
addr,
arroyo_server_common::grpc_server()
.add_service(CompilerGrpcServer::new(service))
.serve(addr),
Expand Down
3 changes: 3 additions & 0 deletions crates/arroyo-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ name = "arroyo-connectors"
version = "0.12.0-dev"
edition = "2021"

[lints.clippy]
to_string_trait_impl = "allow"

[features]
default = []

Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl Connector for FileSystemConnector {
let message = TestSourceMessage {
error: failed,
done: true,
message: message,
message,
};
tx.send(message).await.unwrap();
});
Expand Down
10 changes: 5 additions & 5 deletions crates/arroyo-connectors/src/filesystem/sink/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ pub(crate) async fn commit_files_to_delta(
return Ok(None);
}

let add_actions = create_add_actions(&finished_files, &relative_table_path)?;
let table_path = build_table_path(&storage_provider, &relative_table_path);
let storage_options = configure_storage_options(&table_path, &storage_provider).await?;
let add_actions = create_add_actions(finished_files, relative_table_path)?;
let table_path = build_table_path(storage_provider, relative_table_path);
let storage_options = configure_storage_options(&table_path, storage_provider).await?;
let mut table = load_or_create_table(&table_path, storage_options, &schema).await?;

if let Some(new_version) = check_existing_files(
&mut table,
last_version,
&finished_files,
&relative_table_path,
finished_files,
relative_table_path,
)
.await?
{
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/filesystem/sink/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl<V: LocalWriter + Send + 'static> TwoPhaseCommitter for LocalFileSystemWrite
);
tokio::fs::rename(tmp_file, destination).await?;
finished_files.push(FinishedFile {
filename: object_store::path::Path::parse(&destination.to_string_lossy())?
filename: object_store::path::Path::parse(destination.to_string_lossy())?
.to_string(),
partition: None,
size: destination.metadata()?.len() as usize,
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/impulse/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl ImpulseSourceFunc {
return 8192;
}
let batch_size = Duration::from_millis(100).as_micros() / duration_micros;
batch_size.max(1).min(8192) as usize
batch_size.clamp(1, 8192) as usize
}

fn delay(&self, ctx: &mut ArrowContext) -> Duration {
Expand Down
3 changes: 2 additions & 1 deletion crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import_types!(
{type = "string", format = "var-str"} = VarStr
}
);

import_types!(schema = "src/kafka/table.json");

impl KafkaTable {
Expand Down Expand Up @@ -421,7 +422,7 @@ impl KafkaTester {
client_config
.set(
"bootstrap.servers",
&self.connection.bootstrap_servers.to_string(),
self.connection.bootstrap_servers.to_string(),
)
.set("enable.auto.commit", "false")
.set("auto.offset.reset", "earliest")
Expand Down
12 changes: 5 additions & 7 deletions crates/arroyo-connectors/src/kafka/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl KafkaSinkFunc {
rec = rec.timestamp(ts);
}
if let Some(k) = k.as_ref() {
rec = rec.key(&k);
rec = rec.key(k);
}

rec.payload(&v)
Expand Down Expand Up @@ -260,15 +260,13 @@ impl ArrowOperator for KafkaSinkFunc {

for (i, v) in values.enumerate() {
// kafka timestamp as unix millis
let timestamp = if let Some(ts) = timestamps {
Some(if ts.is_null(i) {
let timestamp = timestamps.map(|ts| {
if ts.is_null(i) {
0
} else {
ts.value(i) / 1_000_000
})
} else {
None
};
}
});
// TODO: this copy should be unnecessary but likely needs a custom trait impl
let key = keys.map(|k| k.value(i).as_bytes().to_vec());
self.publish(timestamp, key, v, ctx).await;
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/mqtt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import_types!(
{type = "string", format = "var-str"} = VarStr
}
);

import_types!(schema = "src/mqtt/table.json");
pub struct MqttConnector {}

Expand Down
38 changes: 19 additions & 19 deletions crates/arroyo-connectors/src/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import_types!(
{type = "string", format = "var-str"} = VarStr
}
);

import_types!(schema = "src/nats/table.json");

#[derive(Clone, Debug, Encode, Decode, PartialEq, PartialOrd)]
Expand All @@ -48,7 +49,7 @@ impl NatsConnector {
) -> anyhow::Result<NatsConfig> {
let nats_servers = VarStr::new(pull_opt("servers", options)?);
let nats_auth = options.remove("auth.type");
let nats_auth: NatsConfigAuthentication = match nats_auth.as_ref().map(|t| t.as_str()) {
let nats_auth: NatsConfigAuthentication = match nats_auth.as_deref() {
Some("none") | None => NatsConfigAuthentication::None {},
Some("credentials") => NatsConfigAuthentication::Credentials {
username: VarStr::new(pull_opt("auth.username", options)?),
Expand Down Expand Up @@ -78,71 +79,70 @@ impl NatsConnector {
.remove("consumer.ack_policy")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| AcknowledgmentPolicy::Explicit),
.unwrap_or(AcknowledgmentPolicy::Explicit),
replay_policy: options
.remove("consumer.replay_policy")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| ReplayPolicy::Instant),
.unwrap_or(ReplayPolicy::Instant),
ack_wait: options
.remove("consumer.ack_wait")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 30),
filter_subjects: options.remove("consumer.filter_subjects").map_or_else(
|| Vec::new(),
|s| s.split(',').map(String::from).collect(),
),
.unwrap_or(30),
filter_subjects: options
.remove("consumer.filter_subjects")
.map_or_else(Vec::new, |s| s.split(',').map(String::from).collect()),
sample_frequency: options
.remove("consumer.sample_frequency")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 0),
.unwrap_or(0),
num_replicas: options
.remove("consumer.num_replicas")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 1),
.unwrap_or(1),
inactive_threshold: options
.remove("consumer.inactive_threshold")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 600),
.unwrap_or(600),
rate_limit: options
.remove("consumer.rate_limit")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| -1),
.unwrap_or(-1),
max_ack_pending: options
.remove("consumer.max_ack_pending")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| -1),
.unwrap_or(-1),
max_deliver: options
.remove("consumer.max_deliver")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| -1),
.unwrap_or(-1),
max_waiting: options
.remove("consumer.max_waiting")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 1000000),
.unwrap_or(1000000),
max_batch: options
.remove("consumer.max_batch")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 10000),
.unwrap_or(10000),
max_bytes: options
.remove("consumer.max_bytes")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 104857600),
.unwrap_or(104857600),
max_expires: options
.remove("consumer.max_expires")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 300000),
.unwrap_or(300000),
}),
(None, Some(subject)) => Some(SourceType::Core { subject }),
(Some(_), Some(_)) => bail!("Exactly one of `stream` or `subject` must be set"),
Expand Down Expand Up @@ -325,7 +325,7 @@ impl Connector for NatsConnector {

let table = Self::table_from_options(options)?;

Self::from_config(&self, None, name, connection, table, schema)
Self::from_config(self, None, name, connection, table, schema)
}

fn make_operator(
Expand Down
6 changes: 2 additions & 4 deletions crates/arroyo-connectors/src/nats/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ impl ArrowOperator for NatsSinkFunc {
}

async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut ArrowContext) {
let s = match &self.sink_type {
SinkType::Subject(s) => s,
};
let SinkType::Subject(s) = &self.sink_type;
let nats_subject = async_nats::Subject::from(s.clone());
for msg in self.serializer.serialize(&batch) {
let publisher = self
Expand All @@ -97,7 +95,7 @@ impl ArrowOperator for NatsSinkFunc {
})
.await
.expect("Something went wrong, data will never be received.");
panic!("Panicked while processing element: {}", e.to_string());
panic!("Panicked while processing element: {}", e);
}
}
}
Expand Down
Loading
Loading