Skip to content

Commit 7f37402

Browse files
committed
lint related changes
1 parent a73278f commit 7f37402

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+236
-267
lines changed

src/alerts/alert_structs.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,11 @@ pub struct BasicAlertFields {
7575
pub severity: Severity,
7676
}
7777

78+
pub type AlertMap = HashMap<Ulid, Box<dyn AlertTrait>>;
79+
7880
#[derive(Debug)]
7981
pub struct Alerts {
80-
pub alerts: RwLock<HashMap<String, HashMap<Ulid, Box<dyn AlertTrait>>>>,
82+
pub alerts: RwLock<HashMap<String, AlertMap>>,
8183
pub sender: mpsc::Sender<AlertTask>,
8284
}
8385

src/alerts/alert_types.rs

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use std::{str::FromStr, time::Duration};
2020

21+
use actix_web::http::header::{HeaderMap, HeaderName, HeaderValue};
2122
use chrono::{DateTime, Utc};
2223
use serde_json::Value;
2324
use tonic::async_trait;
@@ -39,11 +40,9 @@ use crate::{
3940
metastore::metastore_traits::MetastoreObject,
4041
parseable::PARSEABLE,
4142
query::resolve_stream_names,
42-
rbac::{
43-
map::{SessionKey, roles, users},
44-
role::model::DefaultPrivilege,
45-
},
43+
rbac::map::SessionKey,
4644
storage::object_storage::alert_json_path,
45+
tenants::TENANT_METADATA,
4746
utils::user_auth_for_query,
4847
};
4948

@@ -88,34 +87,15 @@ impl MetastoreObject for ThresholdAlert {
8887
impl AlertTrait for ThresholdAlert {
8988
async fn eval_alert(&self) -> Result<Option<String>, AlertError> {
9089
let time_range = extract_time_range(&self.eval_config)?;
91-
let auth = if let Some(tenant) = &self.tenant_id
92-
&& let Some(tenant_users) = users().get(tenant)
93-
&& let Some(tenant_roles) = roles().get(tenant)
94-
&& let Some(user) = tenant_users.iter().find_map(|(_, user)| {
95-
let mut res = None;
96-
for role in &user.roles {
97-
if let Some(role) = tenant_roles.get(role)
98-
&& role.contains(&DefaultPrivilege::Admin)
99-
{
100-
res = Some(user.clone());
101-
break;
102-
}
103-
}
104-
res
105-
}) {
106-
// fetch admin credentials for tenant
107-
match user.ty {
108-
crate::rbac::user::UserType::Native(basic) => {
109-
// Create a protected user whose details can't be edited
110-
// save that user's basic auth
111-
// use that to send request
112-
None
113-
}
114-
crate::rbac::user::UserType::OAuth(_) => {
115-
tracing::warn!("admin user is oauth");
116-
None
117-
}
118-
}
90+
let auth = if let Some(tenant) = self.tenant_id.as_ref()
91+
&& let Some(header) = TENANT_METADATA.get_global_query_auth(tenant)
92+
{
93+
let mut map = HeaderMap::new();
94+
map.insert(
95+
HeaderName::from_static("authorization"),
96+
HeaderValue::from_str(&header).unwrap(),
97+
);
98+
Some(map)
11999
} else {
120100
None
121101
};

src/alerts/alerts_utils.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
use std::{collections::HashMap, fmt::Display};
2020

21-
use actix_web::Either;
21+
use actix_web::{Either, http::header::HeaderMap};
2222
use arrow_array::{Array, Float64Array, Int64Array, RecordBatch};
2323
use datafusion::{
2424
logical_expr::{Literal, LogicalPlan},
@@ -75,7 +75,7 @@ pub fn extract_time_range(eval_config: &super::EvalConfig) -> Result<TimeRange,
7575

7676
/// Execute the alert query based on the current mode and return structured group results
7777
pub async fn execute_alert_query(
78-
auth_token: Option<String>,
78+
auth_token: Option<HeaderMap>,
7979
query: &str,
8080
time_range: &TimeRange,
8181
tenant_id: &Option<String>,
@@ -128,7 +128,7 @@ async fn execute_local_query(
128128

129129
/// Execute alert query remotely (Prism mode)
130130
async fn execute_remote_query(
131-
auth_token: Option<String>,
131+
auth_token: Option<HeaderMap>,
132132
query: &str,
133133
time_range: &TimeRange,
134134
tenant_id: &Option<String>,
@@ -146,7 +146,7 @@ async fn execute_remote_query(
146146
filter_tags: None,
147147
};
148148

149-
let (result_value, _) = send_query_request(None, &query_request, tenant_id)
149+
let (result_value, _) = send_query_request(auth_token, &query_request, tenant_id)
150150
.await
151151
.map_err(|err| AlertError::CustomError(format!("Failed to send query request: {err}")))?;
152152

src/alerts/mod.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -624,9 +624,9 @@ impl AlertConfig {
624624
}
625625

626626
// get active sessions
627-
let active_sessions = sessions().get_active_sessions();
627+
let active_session = sessions().get_active_sessions();
628628
let mut broadcast_to = vec![];
629-
for session in active_sessions {
629+
for (session, _, _) in active_session {
630630
if user_auth_for_query(&session, &self.query).await.is_ok()
631631
&& let SessionKey::SessionId(id) = &session
632632
{
@@ -1150,7 +1150,7 @@ impl AlertManagerTrait for Alerts {
11501150
tags: Vec<String>,
11511151
) -> Result<Vec<AlertConfig>, AlertError> {
11521152
let tenant_id = get_tenant_id_from_key(&session);
1153-
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
1153+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
11541154
// First, collect all alerts without performing auth checks to avoid holding the lock
11551155
let all_alerts: Vec<AlertConfig> = {
11561156
let alerts_guard = self.alerts.read().await;
@@ -1226,7 +1226,7 @@ impl AlertManagerTrait for Alerts {
12261226
id: Ulid,
12271227
tenant_id: &Option<String>,
12281228
) -> Result<Box<dyn AlertTrait>, AlertError> {
1229-
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
1229+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
12301230
let read_access = self.alerts.read().await;
12311231
if let Some(alerts) = read_access.get(tenant)
12321232
&& let Some(alert) = alerts.get(&id)
@@ -1241,7 +1241,7 @@ impl AlertManagerTrait for Alerts {
12411241

12421242
/// Update the in-mem vector of alerts
12431243
async fn update(&self, alert: &dyn AlertTrait) {
1244-
let tenant = alert.get_tenant_id().as_ref().map_or(DEFAULT_TENANT, |v| v);
1244+
let tenant = alert.get_tenant_id().as_deref().unwrap_or(DEFAULT_TENANT);
12451245
self.alerts
12461246
.write()
12471247
.await
@@ -1331,7 +1331,7 @@ impl AlertManagerTrait for Alerts {
13311331
{
13321332
let mut write_access = self.alerts.write().await;
13331333

1334-
let tenant = alert.get_tenant_id().as_ref().map_or(DEFAULT_TENANT, |v| v);
1334+
let tenant = alert.get_tenant_id().as_deref().unwrap_or(DEFAULT_TENANT);
13351335
if let Some(alerts) = write_access.get_mut(tenant) {
13361336
alerts.insert(*alert.get_id(), alert.clone_box());
13371337
}
@@ -1350,7 +1350,7 @@ impl AlertManagerTrait for Alerts {
13501350
) -> Result<(), AlertError> {
13511351
// read and modify alert
13521352
let mut write_access = self.alerts.write().await;
1353-
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
1353+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
13541354
let mut alert: Box<dyn AlertTrait> = if let Some(alerts) = write_access.get(tenant)
13551355
&& let Some(alert) = alerts.get(&alert_id)
13561356
{
@@ -1384,7 +1384,7 @@ impl AlertManagerTrait for Alerts {
13841384

13851385
/// Remove alert and scheduled task from disk and memory
13861386
async fn delete(&self, alert_id: Ulid, tenant_id: &Option<String>) -> Result<(), AlertError> {
1387-
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
1387+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
13881388
if let Some(alerts) = self.alerts.write().await.get_mut(tenant)
13891389
&& let Some(_) = alerts.remove(&alert_id)
13901390
{
@@ -1406,7 +1406,7 @@ impl AlertManagerTrait for Alerts {
14061406
alert_id: Ulid,
14071407
tenant_id: &Option<String>,
14081408
) -> Result<AlertState, AlertError> {
1409-
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
1409+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
14101410
let read_access = self.alerts.read().await;
14111411

14121412
if let Some(alerts) = read_access.get(tenant)
@@ -1441,7 +1441,7 @@ impl AlertManagerTrait for Alerts {
14411441
/// List tags from all alerts
14421442
/// This function returns a list of unique tags from all alerts
14431443
async fn list_tags(&self, tenant_id: &Option<String>) -> Vec<String> {
1444-
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
1444+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
14451445
// let alerts = self.alerts.read().await;
14461446
let mut tags = if let Some(alerts) = self.alerts.read().await.get(tenant) {
14471447
alerts
@@ -1461,7 +1461,7 @@ impl AlertManagerTrait for Alerts {
14611461
&self,
14621462
tenant_id: &Option<String>,
14631463
) -> HashMap<Ulid, Box<dyn AlertTrait>> {
1464-
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
1464+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
14651465
if let Some(alerts) = self.alerts.read().await.get(tenant) {
14661466
alerts.iter().map(|(k, v)| (*k, v.clone_box())).collect()
14671467
} else {

src/alerts/target.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl TargetConfigs {
7878
.put_target(&target, &target.tenant)
7979
.await?;
8080
let mut map = self.target_configs.write().await;
81-
let tenant_id = target.tenant.as_ref().map_or(DEFAULT_TENANT, |v| v);
81+
let tenant_id = target.tenant.as_deref().unwrap_or(DEFAULT_TENANT);
8282
map.entry(tenant_id.to_owned())
8383
.or_default()
8484
.insert(target.id, target);
@@ -87,7 +87,7 @@ impl TargetConfigs {
8787
}
8888

8989
pub async fn list(&self, tenant_id: &Option<String>) -> Result<Vec<Target>, AlertError> {
90-
let tenant_id = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
90+
let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
9191
let targets = if let Some(targets) = self.target_configs.read().await.get(tenant_id) {
9292
targets.values().cloned().collect_vec()
9393
} else {
@@ -108,7 +108,7 @@ impl TargetConfigs {
108108
target_id: &Ulid,
109109
tenant_id: &Option<String>,
110110
) -> Result<Target, AlertError> {
111-
let tenant_id = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
111+
let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
112112
let target = if let Some(targets) = self.target_configs.read().await.get(tenant_id) {
113113
targets
114114
.get(target_id)
@@ -133,7 +133,7 @@ impl TargetConfigs {
133133
target_id: &Ulid,
134134
tenant_id: &Option<String>,
135135
) -> Result<Target, AlertError> {
136-
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
136+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
137137
// ensure that the target is not being used by any alert
138138
let guard = ALERTS.read().await;
139139
let alerts = if let Some(alerts) = guard.as_ref() {

src/analytics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ fn total_event_stats() -> (Stats, Stats, Stats) {
207207
let mut deleted_json_bytes: u64 = 0;
208208

209209
let tenants = if let Some(tenants) = PARSEABLE.list_tenants() {
210-
tenants.into_iter().map(|v| Some(v)).collect()
210+
tenants.into_iter().map(Some).collect()
211211
} else {
212212
vec![None]
213213
};

src/catalog/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ async fn create_manifest(
464464
.get_manifest_path(stream_name, lower_bound, upper_bound, tenant_id)
465465
.await
466466
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?;
467-
tracing::warn!("manifest path_url= {path_url}");
467+
468468
let new_snapshot_entry = snapshot::ManifestItem {
469469
manifest_path: path_url.to_owned(),
470470
time_lower_bound: lower_bound,

src/correlation.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl Correlations {
8484
let mut user_correlations = vec![];
8585
let permissions = Users.get_permissions(session_key);
8686
let tenant_id = get_tenant_id_from_key(session_key);
87-
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
87+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
8888
if let Some(corrs) = self.read().await.get(tenant) {
8989
for correlation in corrs.values() {
9090
let tables = &correlation
@@ -109,17 +109,17 @@ impl Correlations {
109109
correlation_id: &str,
110110
tenant_id: &Option<String>,
111111
) -> Result<CorrelationConfig, CorrelationError> {
112-
let tenant_id = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
112+
let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
113113
if let Some(corrs) = self.read().await.get(tenant_id) {
114114
corrs.get(correlation_id).cloned().ok_or_else(|| {
115115
CorrelationError::AnyhowError(anyhow::Error::msg(format!(
116116
"Unable to find correlation with ID- {correlation_id}"
117117
)))
118118
})
119119
} else {
120-
return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
120+
Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
121121
"Unable to find correlation with ID- {correlation_id}"
122-
))));
122+
))))
123123
}
124124
}
125125

@@ -137,7 +137,7 @@ impl Correlations {
137137
.metastore
138138
.put_correlation(&correlation, &tenant_id)
139139
.await?;
140-
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
140+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
141141
// Update in memory
142142
if let Some(corrs) = self.write().await.get_mut(tenant) {
143143
corrs.insert(correlation.id.to_owned(), correlation.clone());
@@ -173,7 +173,7 @@ impl Correlations {
173173
.put_correlation(&updated_correlation, &tenant_id)
174174
.await?;
175175

176-
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
176+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
177177
// Update in memory
178178
if let Some(corrs) = self.write().await.get_mut(tenant) {
179179
corrs.insert(

src/event/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl Event {
8787
self.stream_type,
8888
)?;
8989

90-
let tenant = self.tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
90+
let tenant = self.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
9191
update_stats(
9292
&self.stream_name,
9393
self.origin_format,
@@ -145,7 +145,7 @@ pub fn commit_schema(
145145
tenant_id: &Option<String>,
146146
) -> Result<(), StagingError> {
147147
let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned");
148-
let tenant_id = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
148+
let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
149149
let map = &mut stream_metadata
150150
.get_mut(tenant_id)
151151
.ok_or_else(|| TenantNotFound(tenant_id.to_owned()))?

src/handlers/http/cluster/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use crate::handlers::http::modal::ingest::SyncRole;
4545
use crate::handlers::http::query::{Query, QueryError, TIME_ELAPSED_HEADER};
4646
use crate::metrics::prom_utils::Metrics;
4747
use crate::option::Mode;
48-
use crate::parseable::{DEFAULT_TENANT, PARSEABLE};
48+
use crate::parseable::PARSEABLE;
4949
use crate::rbac::role::model::DefaultPrivilege;
5050
use crate::rbac::user::User;
5151
use crate::stats::Stats;
@@ -710,7 +710,6 @@ pub async fn sync_password_reset_with_ingestors(
710710

711711
// forward the put role request to all ingestors and queriers to keep them in sync
712712
pub async fn sync_role_update(
713-
req: HttpRequest,
714713
name: String,
715714
privileges: Vec<DefaultPrivilege>,
716715
tenant_id: &Option<String>,
@@ -1902,6 +1901,7 @@ pub async fn send_query_request(
19021901
let mut map = reqwest::header::HeaderMap::new();
19031902

19041903
if let Some(auth) = auth_token {
1904+
// always basic auth
19051905
for (key, value) in auth.iter() {
19061906
if let Ok(name) = reqwest::header::HeaderName::from_bytes(key.as_str().as_bytes())
19071907
&& let Ok(val) = reqwest::header::HeaderValue::from_bytes(value.as_bytes())

0 commit comments

Comments
 (0)