Skip to content

fix: bugs introduced in #1143 #1185

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Feb 14, 2025
Merged
2 changes: 1 addition & 1 deletion src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn storage_info(config: &Parseable) {
Staging Path: \"{}\"",
"Storage:".to_string().bold(),
config.get_storage_mode_string(),
config.staging_dir().to_string_lossy(),
config.options.staging_dir().to_string_lossy(),
);

if let Some(path) = &config.options.hot_tier_storage_path {
Expand Down
72 changes: 71 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use clap::Parser;
use std::path::PathBuf;
use std::{env, fs, path::PathBuf};

use url::Url;

Expand Down Expand Up @@ -385,4 +385,74 @@ impl Options {
pub fn is_default_creds(&self) -> bool {
self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
}

/// Path to staging directory, ensures that it exists or panics
pub fn staging_dir(&self) -> &PathBuf {
fs::create_dir_all(&self.local_staging_path)
.expect("Should be able to create dir if doesn't exist");

&self.local_staging_path
}

/// TODO: refactor and document
Copy link
Contributor Author

@de-sh de-sh Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In another PR we can move this code to using Url over a regular String and being optional, which will significantly simplify and make things more readable

pub fn get_url(&self) -> Url {
if self.ingestor_endpoint.is_empty() {
return format!(
"{}://{}",
self.get_scheme(),
self.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap_or_else(|err| {
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
});
}

let ingestor_endpoint = &self.ingestor_endpoint;

if ingestor_endpoint.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();

if addr_from_env.len() != 2 {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let mut hostname = addr_from_env[0].to_string();
let mut port = addr_from_env[1].to_string();

// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
// fetch the value from the specified env vars
if hostname.starts_with('$') {
let var_hostname = hostname[1..].to_string();
hostname = env::var(&var_hostname).unwrap_or_default();

if hostname.is_empty() {
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
}
if hostname.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
} else {
hostname = format!("{}://{}", self.get_scheme(), hostname);
}
}

if port.starts_with('$') {
let var_port = port[1..].to_string();
port = env::var(&var_port).unwrap_or_default();

if port.is_empty() {
panic!(
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
var_port
);
}
}

format!("{}://{}:{}", self.get_scheme(), hostname, port)
.parse::<Url>()
.expect("Valid URL")
}
}
2 changes: 1 addition & 1 deletion src/handlers/http/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub async fn about() -> Json<Value> {
let staging = if PARSEABLE.options.mode == Mode::Query {
"".to_string()
} else {
PARSEABLE.staging_dir().display().to_string()
PARSEABLE.options.staging_dir().display().to_string()
};
let grpc_port = PARSEABLE.options.grpc_port;

Expand Down
24 changes: 12 additions & 12 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use crate::{
cli::Options,
oidc::Claims,
parseable::PARSEABLE,
storage::PARSEABLE_ROOT_DIRECTORY,
utils::{get_ingestor_id, get_url},
storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY},
utils::get_ingestor_id,
};

use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION};
Expand Down Expand Up @@ -213,18 +213,18 @@ impl IngestorMetadata {
}

/// Capture metadata information by either loading it from staging or starting fresh
pub fn load() -> Arc<Self> {
pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc<Self> {
// all the files should be in the staging directory root
let entries = std::fs::read_dir(&PARSEABLE.options.local_staging_path)
.expect("Couldn't read from file");
let url = get_url();
let entries =
std::fs::read_dir(&options.local_staging_path).expect("Couldn't read from file");
let url = options.get_url();
let port = url.port().unwrap_or(80).to_string();
let url = url.to_string();
let Options {
username, password, ..
} = PARSEABLE.options.as_ref();
let staging_path = PARSEABLE.staging_dir();
let flight_port = PARSEABLE.options.flight_port.to_string();
} = options;
let staging_path = &options.local_staging_path;
let flight_port = options.flight_port.to_string();

for entry in entries {
// cause the staging directory will have only one file with ingestor in the name
Expand All @@ -250,7 +250,7 @@ impl IngestorMetadata {
if obj.get("flight_port").is_none() {
obj.insert(
"flight_port".to_owned(),
Value::String(PARSEABLE.options.flight_port.to_string()),
Value::String(options.flight_port.to_string()),
);
}

Expand Down Expand Up @@ -291,7 +291,7 @@ impl IngestorMetadata {
}
}

let storage = PARSEABLE.storage.get_object_store();
let storage = storage.get_object_store();
let meta = Self::new(
port,
url,
Expand Down Expand Up @@ -342,7 +342,7 @@ impl IngestorMetadata {
let bytes = Bytes::from(serde_json::to_vec(&json)?);

let resource: IngestorMetadata = serde_json::from_value(json)?;
resource.put_on_disk(PARSEABLE.staging_dir())?;
resource.put_on_disk(PARSEABLE.options.staging_dir())?;

PARSEABLE
.storage
Expand Down
4 changes: 2 additions & 2 deletions src/metrics/prom_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::handlers::http::base_path_without_preceding_slash;
use crate::handlers::http::ingest::PostError;
use crate::handlers::http::modal::IngestorMetadata;
use crate::utils::get_url;
use crate::parseable::PARSEABLE;
use crate::HTTP_CLIENT;
use actix_web::http::header;
use chrono::NaiveDateTime;
Expand Down Expand Up @@ -61,7 +61,7 @@ struct StorageMetrics {

impl Default for Metrics {
fn default() -> Self {
let url = get_url();
let url = PARSEABLE.options.get_url();
let address = format!(
"http://{}:{}",
url.domain()
Expand Down
5 changes: 3 additions & 2 deletions src/migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ pub fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
}

pub fn get_staging_metadata(config: &Parseable) -> anyhow::Result<Option<serde_json::Value>> {
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(config.staging_dir());
let path =
RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(config.options.staging_dir());
let bytes = match std::fs::read(path) {
Ok(bytes) => bytes,
Err(err) => match err.kind() {
Expand All @@ -351,7 +352,7 @@ pub fn put_staging_metadata(
config: &Parseable,
metadata: &serde_json::Value,
) -> anyhow::Result<()> {
let path = config.staging_dir().join(".parseable.json");
let path = config.options.staging_dir().join(".parseable.json");
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
Expand Down
6 changes: 1 addition & 5 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Parseable {
storage: Arc<dyn ObjectStorageProvider>,
) -> Self {
let ingestor_metadata = match &options.mode {
Mode::Ingest => Some(IngestorMetadata::load()),
Mode::Ingest => Some(IngestorMetadata::load(&options, storage.as_ref())),
_ => None,
};
Parseable {
Expand Down Expand Up @@ -217,10 +217,6 @@ impl Parseable {
self.storage.clone()
}

pub fn staging_dir(&self) -> &PathBuf {
&self.options.local_staging_path
}

pub fn hot_tier_dir(&self) -> &Option<PathBuf> {
&self.options.hot_tier_storage_path
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
}

async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> {
if !Path::new(&PARSEABLE.staging_dir()).exists() {
if !Path::new(&PARSEABLE.options.staging_dir()).exists() {
return Ok(());
}

Expand Down
22 changes: 13 additions & 9 deletions src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Default for StorageMetadata {
Self {
version: CURRENT_STORAGE_METADATA_VERSION.to_string(),
mode: PARSEABLE.storage.name().to_owned(),
staging: PARSEABLE.staging_dir().to_path_buf(),
staging: PARSEABLE.options.staging_dir().to_path_buf(),
storage: PARSEABLE.storage.get_endpoint(),
deployment_id: uid::gen(),
server_mode: PARSEABLE.options.mode,
Expand Down Expand Up @@ -134,8 +134,8 @@ pub async fn resolve_parseable_metadata(
if metadata.server_mode== Mode::All && PARSEABLE.options.mode == Mode::Ingest {
Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet")
} else {
create_dir_all(PARSEABLE.staging_dir())?;
metadata.staging = PARSEABLE.staging_dir().canonicalize()?;
create_dir_all(PARSEABLE.options.staging_dir())?;
metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?;
// this flag is set to true so that metadata is copied to staging
overwrite_staging = true;
// overwrite remote in all and query mode
Expand All @@ -151,20 +151,20 @@ pub async fn resolve_parseable_metadata(
Mode::Query => {
overwrite_remote = true;
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.staging_dir().to_path_buf();
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
},
Mode::Ingest => {
// if ingest server is started fetch the metadata from remote
// update the server mode for local metadata
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.staging_dir().to_path_buf();
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
},
}
Ok(metadata)
}
}
EnvChange::CreateBoth => {
create_dir_all(PARSEABLE.staging_dir())?;
create_dir_all(PARSEABLE.options.staging_dir())?;
let metadata = StorageMetadata::default();
// new metadata needs to be set
// if mode is query or all then both staging and remote
Expand Down Expand Up @@ -237,7 +237,8 @@ pub enum EnvChange {
}

pub fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(PARSEABLE.staging_dir());
let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME)
.to_path(PARSEABLE.options.staging_dir());
let bytes = match fs::read(path) {
Ok(bytes) => bytes,
Err(err) => match err.kind() {
Expand All @@ -259,8 +260,11 @@ pub async fn put_remote_metadata(metadata: &StorageMetadata) -> Result<(), Objec
pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> {
let mut staging_metadata = meta.clone();
staging_metadata.server_mode = PARSEABLE.options.mode;
staging_metadata.staging = PARSEABLE.staging_dir().to_path_buf();
let path = PARSEABLE.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
staging_metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
let path = PARSEABLE
.options
.staging_dir()
.join(PARSEABLE_METADATA_FILE_NAME);
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
Expand Down
69 changes: 0 additions & 69 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,14 @@ pub mod uid;
pub mod update;

use crate::handlers::http::rbac::RBACError;
use crate::parseable::PARSEABLE;
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use actix::extract_session_key_from_req;
use actix_web::HttpRequest;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
use regex::Regex;
use sha2::{Digest, Sha256};
use std::env;
use tracing::debug;
use url::Url;

/// Convert minutes to a slot range
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
Expand All @@ -55,72 +52,6 @@ pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
Some(format!("{block_start:02}-{block_end:02}"))
}

pub fn get_url() -> Url {
if PARSEABLE.options.ingestor_endpoint.is_empty() {
return format!(
"{}://{}",
PARSEABLE.options.get_scheme(),
PARSEABLE.options.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap_or_else(|err| {
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", PARSEABLE.options.address)
});
}

let ingestor_endpoint = &PARSEABLE.options.ingestor_endpoint;

if ingestor_endpoint.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();

if addr_from_env.len() != 2 {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let mut hostname = addr_from_env[0].to_string();
let mut port = addr_from_env[1].to_string();

// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
// fetch the value from the specified env vars
if hostname.starts_with('$') {
let var_hostname = hostname[1..].to_string();
hostname = get_from_env(&var_hostname);

if hostname.is_empty() {
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
}
if hostname.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
} else {
hostname = format!("{}://{}", PARSEABLE.options.get_scheme(), hostname);
}
}

if port.starts_with('$') {
let var_port = port[1..].to_string();
port = get_from_env(&var_port);

if port.is_empty() {
panic!(
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
var_port
);
}
}

format!("{}://{}:{}", PARSEABLE.options.get_scheme(), hostname, port)
.parse::<Url>()
.expect("Valid URL")
}

/// util fuction to fetch value from an env var
fn get_from_env(var_to_fetch: &str) -> String {
env::var(var_to_fetch).unwrap_or_else(|_| "".to_string())
}

pub fn get_ingestor_id() -> String {
let now = Utc::now().to_rfc3339();
let id = get_hash(&now).to_string().split_at(15).0.to_string();
Expand Down
Loading