Skip to content

Commit 730d7dd

Browse files
author
Devdutt Shenoi
committed
fix: deadlock during setup of parseable ingestor node
1 parent 20e66a4 commit 730d7dd

File tree

5 files changed

+77
-84
lines changed

5 files changed

+77
-84
lines changed

src/cli.rs

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use clap::Parser;
20-
use std::path::PathBuf;
20+
use std::{env, path::PathBuf};
2121

2222
use url::Url;
2323

@@ -385,4 +385,66 @@ impl Options {
385385
pub fn is_default_creds(&self) -> bool {
386386
self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
387387
}
388+
389+
/// TODO: refactor and document
390+
pub fn get_url(&self) -> Url {
391+
if self.ingestor_endpoint.is_empty() {
392+
return format!(
393+
"{}://{}",
394+
self.get_scheme(),
395+
self.address
396+
)
397+
.parse::<Url>() // if the value was improperly set, this will panic before hand
398+
.unwrap_or_else(|err| {
399+
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
400+
});
401+
}
402+
403+
let ingestor_endpoint = &self.ingestor_endpoint;
404+
405+
if ingestor_endpoint.starts_with("http") {
406+
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
407+
}
408+
409+
let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();
410+
411+
if addr_from_env.len() != 2 {
412+
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
413+
}
414+
415+
let mut hostname = addr_from_env[0].to_string();
416+
let mut port = addr_from_env[1].to_string();
417+
418+
// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
419+
// fetch the value from the specified env vars
420+
if hostname.starts_with('$') {
421+
let var_hostname = hostname[1..].to_string();
422+
hostname = env::var(&var_hostname).unwrap_or_default();
423+
424+
if hostname.is_empty() {
425+
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
426+
}
427+
if hostname.starts_with("http") {
428+
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
429+
} else {
430+
hostname = format!("{}://{}", self.get_scheme(), hostname);
431+
}
432+
}
433+
434+
if port.starts_with('$') {
435+
let var_port = port[1..].to_string();
436+
port = env::var(&var_port).unwrap_or_default();
437+
438+
if port.is_empty() {
439+
panic!(
440+
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
441+
var_port
442+
);
443+
}
444+
}
445+
446+
format!("{}://{}:{}", self.get_scheme(), hostname, port)
447+
.parse::<Url>()
448+
.expect("Valid URL")
449+
}
388450
}

src/handlers/http/modal/mod.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ use crate::{
3535
cli::Options,
3636
oidc::Claims,
3737
parseable::PARSEABLE,
38-
storage::PARSEABLE_ROOT_DIRECTORY,
39-
utils::{get_ingestor_id, get_url},
38+
storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY},
39+
utils::get_ingestor_id,
4040
};
4141

4242
use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION};
@@ -213,18 +213,18 @@ impl IngestorMetadata {
213213
}
214214

215215
/// Capture metadata information by either loading it from staging or starting fresh
216-
pub fn load() -> Arc<Self> {
216+
pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc<Self> {
217217
// all the files should be in the staging directory root
218-
let entries = std::fs::read_dir(&PARSEABLE.options.local_staging_path)
219-
.expect("Couldn't read from file");
220-
let url = get_url();
218+
let entries =
219+
std::fs::read_dir(&options.local_staging_path).expect("Couldn't read from file");
220+
let url = options.get_url();
221221
let port = url.port().unwrap_or(80).to_string();
222222
let url = url.to_string();
223223
let Options {
224224
username, password, ..
225-
} = PARSEABLE.options.as_ref();
226-
let staging_path = PARSEABLE.staging_dir();
227-
let flight_port = PARSEABLE.options.flight_port.to_string();
225+
} = options;
226+
let staging_path = &options.local_staging_path;
227+
let flight_port = options.flight_port.to_string();
228228

229229
for entry in entries {
230230
// cause the staging directory will have only one file with ingestor in the name
@@ -250,7 +250,7 @@ impl IngestorMetadata {
250250
if obj.get("flight_port").is_none() {
251251
obj.insert(
252252
"flight_port".to_owned(),
253-
Value::String(PARSEABLE.options.flight_port.to_string()),
253+
Value::String(options.flight_port.to_string()),
254254
);
255255
}
256256

@@ -291,7 +291,7 @@ impl IngestorMetadata {
291291
}
292292
}
293293

294-
let storage = PARSEABLE.storage.get_object_store();
294+
let storage = storage.get_object_store();
295295
let meta = Self::new(
296296
port,
297297
url,

src/metrics/prom_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
use crate::handlers::http::base_path_without_preceding_slash;
2020
use crate::handlers::http::ingest::PostError;
2121
use crate::handlers::http::modal::IngestorMetadata;
22-
use crate::utils::get_url;
22+
use crate::parseable::PARSEABLE;
2323
use crate::HTTP_CLIENT;
2424
use actix_web::http::header;
2525
use chrono::NaiveDateTime;
@@ -61,7 +61,7 @@ struct StorageMetrics {
6161

6262
impl Default for Metrics {
6363
fn default() -> Self {
64-
let url = get_url();
64+
let url = PARSEABLE.options.get_url();
6565
let address = format!(
6666
"http://{}:{}",
6767
url.domain()

src/parseable/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ impl Parseable {
129129
storage: Arc<dyn ObjectStorageProvider>,
130130
) -> Self {
131131
let ingestor_metadata = match &options.mode {
132-
Mode::Ingest => Some(IngestorMetadata::load()),
132+
Mode::Ingest => Some(IngestorMetadata::load(&options, storage.as_ref())),
133133
_ => None,
134134
};
135135
Parseable {

src/utils/mod.rs

Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,14 @@ pub mod uid;
2626
pub mod update;
2727

2828
use crate::handlers::http::rbac::RBACError;
29-
use crate::parseable::PARSEABLE;
3029
use crate::rbac::role::{Action, Permission};
3130
use crate::rbac::Users;
3231
use actix::extract_session_key_from_req;
3332
use actix_web::HttpRequest;
3433
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
3534
use regex::Regex;
3635
use sha2::{Digest, Sha256};
37-
use std::env;
3836
use tracing::debug;
39-
use url::Url;
4037

4138
/// Convert minutes to a slot range
4239
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
@@ -55,72 +52,6 @@ pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
5552
Some(format!("{block_start:02}-{block_end:02}"))
5653
}
5754

58-
pub fn get_url() -> Url {
59-
if PARSEABLE.options.ingestor_endpoint.is_empty() {
60-
return format!(
61-
"{}://{}",
62-
PARSEABLE.options.get_scheme(),
63-
PARSEABLE.options.address
64-
)
65-
.parse::<Url>() // if the value was improperly set, this will panic before hand
66-
.unwrap_or_else(|err| {
67-
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", PARSEABLE.options.address)
68-
});
69-
}
70-
71-
let ingestor_endpoint = &PARSEABLE.options.ingestor_endpoint;
72-
73-
if ingestor_endpoint.starts_with("http") {
74-
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
75-
}
76-
77-
let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();
78-
79-
if addr_from_env.len() != 2 {
80-
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
81-
}
82-
83-
let mut hostname = addr_from_env[0].to_string();
84-
let mut port = addr_from_env[1].to_string();
85-
86-
// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
87-
// fetch the value from the specified env vars
88-
if hostname.starts_with('$') {
89-
let var_hostname = hostname[1..].to_string();
90-
hostname = get_from_env(&var_hostname);
91-
92-
if hostname.is_empty() {
93-
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
94-
}
95-
if hostname.starts_with("http") {
96-
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
97-
} else {
98-
hostname = format!("{}://{}", PARSEABLE.options.get_scheme(), hostname);
99-
}
100-
}
101-
102-
if port.starts_with('$') {
103-
let var_port = port[1..].to_string();
104-
port = get_from_env(&var_port);
105-
106-
if port.is_empty() {
107-
panic!(
108-
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
109-
var_port
110-
);
111-
}
112-
}
113-
114-
format!("{}://{}:{}", PARSEABLE.options.get_scheme(), hostname, port)
115-
.parse::<Url>()
116-
.expect("Valid URL")
117-
}
118-
119-
/// Utility function to fetch the value of an env var
120-
fn get_from_env(var_to_fetch: &str) -> String {
121-
env::var(var_to_fetch).unwrap_or_else(|_| "".to_string())
122-
}
123-
12455
pub fn get_ingestor_id() -> String {
12556
let now = Utc::now().to_rfc3339();
12657
let id = get_hash(&now).to_string().split_at(15).0.to_string();

0 commit comments

Comments
 (0)