Skip to content

fix: bugs introduced in #1143 #1185

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Feb 14, 2025
Merged
2 changes: 1 addition & 1 deletion src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn storage_info(config: &Parseable) {
Staging Path: \"{}\"",
"Storage:".to_string().bold(),
config.get_storage_mode_string(),
config.staging_dir().to_string_lossy(),
config.options.staging_dir().to_string_lossy(),
);

if let Some(path) = &config.options.hot_tier_storage_path {
Expand Down
72 changes: 71 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use clap::Parser;
use std::path::PathBuf;
use std::{env, fs, path::PathBuf};

use url::Url;

Expand Down Expand Up @@ -385,4 +385,74 @@ impl Options {
pub fn is_default_creds(&self) -> bool {
self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
}

/// Path to staging directory, ensures that it exists or panics
pub fn staging_dir(&self) -> &PathBuf {
fs::create_dir_all(&self.local_staging_path)
.expect("Should be able to create dir if doesn't exist");

&self.local_staging_path
}

/// TODO: refactor and document
Copy link
Contributor Author

@de-sh de-sh Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In another PR we can move this code to using Url over a regular String and being optional, which will significantly simplify and make things more readable

pub fn get_url(&self) -> Url {
if self.ingestor_endpoint.is_empty() {
return format!(
"{}://{}",
self.get_scheme(),
self.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap_or_else(|err| {
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
});
}

let ingestor_endpoint = &self.ingestor_endpoint;

if ingestor_endpoint.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();

if addr_from_env.len() != 2 {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let mut hostname = addr_from_env[0].to_string();
let mut port = addr_from_env[1].to_string();

// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
// fetch the value from the specified env vars
if hostname.starts_with('$') {
let var_hostname = hostname[1..].to_string();
hostname = env::var(&var_hostname).unwrap_or_default();

if hostname.is_empty() {
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
}
if hostname.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
} else {
hostname = format!("{}://{}", self.get_scheme(), hostname);
}
}

if port.starts_with('$') {
let var_port = port[1..].to_string();
port = env::var(&var_port).unwrap_or_default();

if port.is_empty() {
panic!(
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
var_port
);
}
}

format!("{}://{}:{}", self.get_scheme(), hostname, port)
.parse::<Url>()
.expect("Valid URL")
}
}
2 changes: 1 addition & 1 deletion src/handlers/http/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub async fn about() -> Json<Value> {
let staging = if PARSEABLE.options.mode == Mode::Query {
"".to_string()
} else {
PARSEABLE.staging_dir().display().to_string()
PARSEABLE.options.staging_dir().display().to_string()
};
let grpc_port = PARSEABLE.options.grpc_port;

Expand Down
120 changes: 30 additions & 90 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::event::format::override_data_type;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
use crate::metadata::SchemaVersion;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::Mode;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::rbac::role::Action;
use crate::rbac::Users;
Expand All @@ -47,7 +46,8 @@ use tracing::warn;

pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
if !PARSEABLE.streams.contains(&stream_name) {
// Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name).into());
}

Expand Down Expand Up @@ -120,15 +120,11 @@ pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, St
Ok((web::Json(schema), StatusCode::OK))
}

pub async fn schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();

// Ensure parseable is aware of stream in distributed mode
if PARSEABLE.options.mode == Mode::Query
&& PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await?
{
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name.clone()).into());
}

Expand Down Expand Up @@ -164,14 +160,8 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name.clone()).into());
}

let retention = PARSEABLE
Expand All @@ -183,36 +173,24 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,

pub async fn put_retention(
stream_name: Path<String>,
Json(json): Json<Value>,
Json(retention): Json<Retention>,
) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();

// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name).into());
}
let stream = PARSEABLE.get_stream(&stream_name)?;

let retention: Retention = match serde_json::from_value(json) {
Ok(retention) => retention,
Err(err) => return Err(StreamError::InvalidRetentionConfig(err)),
};

PARSEABLE
.storage
.get_object_store()
.put_retention(&stream_name, &retention)
.await?;

stream.set_retention(retention);
PARSEABLE.get_stream(&stream_name)?.set_retention(retention);

Ok((
format!("set retention configuration for log stream {stream_name}"),
Expand Down Expand Up @@ -250,21 +228,11 @@ pub async fn get_stats(
) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();

if !PARSEABLE.streams.contains(&stream_name) {
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
} else {
return Err(StreamNotFound(stream_name).into());
}
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name.clone()).into());
}

let query_string = req.query_string();
Expand Down Expand Up @@ -356,19 +324,13 @@ pub async fn get_stats(

pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
if !PARSEABLE.streams.contains(&stream_name) {
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
} else {
return Err(StreamNotFound(stream_name).into());
}
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name.clone()).into());
}

let storage = PARSEABLE.storage.get_object_store();
// if first_event_at is not found in memory map, check if it exists in the storage
// if it exists in the storage, update the first_event_at in memory map
Expand Down Expand Up @@ -417,14 +379,8 @@ pub async fn put_stream_hot_tier(
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name).into());
}

let stream = PARSEABLE.get_stream(&stream_name)?;
Expand Down Expand Up @@ -467,21 +423,11 @@ pub async fn put_stream_hot_tier(
pub async fn get_stream_hot_tier(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();

if !PARSEABLE.streams.contains(&stream_name) {
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
} else {
return Err(StreamNotFound(stream_name).into());
}
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name.clone()).into());
}

let Some(hot_tier_manager) = HotTierManager::global() else {
Expand All @@ -500,14 +446,8 @@ pub async fn delete_stream_hot_tier(
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name).into());
}

if PARSEABLE.get_stream(&stream_name)?.get_stream_type() == StreamType::Internal {
Expand Down
Loading
Loading