fix: bugs introduced in #1143 #1185
Walkthrough

The changes introduce new methods.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant U as User
    participant LS as LogstreamHandler
    participant M as Memory
    participant S as Storage
    U->>LS: Request delete stream
    LS->>M: Check for stream in memory
    alt Stream exists in Memory
        LS->>LS: Proceed with deletion
    else Stream not in Memory
        LS->>LS: Verify mode is "Query"
        LS->>S: Attempt to fetch stream from storage
        alt Stream found or error on creation
            LS->>U: Return StreamNotFound error
        end
    end
```
```diff
@@ -385,4 +385,66 @@ impl Options {
     pub fn is_default_creds(&self) -> bool {
         self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
     }
 
+    /// TODO: refactor and document
```
In another PR we can move this code to using `Url` over a regular `String`, and make it optional, which will significantly simplify things and make them more readable.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/cli.rs (1)

389-390: Add documentation for the `get_url` method. The TODO comment indicates missing documentation. Please add comprehensive documentation explaining:
- The purpose of the method
- Parameter requirements
- Return value
- Error conditions
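As a sketch of the documentation style being requested, the following stand-in shows the shape such rustdoc might take. Everything here is illustrative: the struct fields are assumptions, and the real `Options::get_url` returns a `url::Url` rather than a `String`.

```rust
// Hypothetical, simplified stand-in for `Options`; field names are
// assumptions, not the actual parseable definitions.
struct Options {
    scheme: String,
    address: String,
}

impl Options {
    /// Returns the externally reachable base URL for this node,
    /// built from the configured scheme and bind address.
    ///
    /// # Panics
    /// The real implementation panics when the configured address
    /// cannot be parsed as a URL; this sketch only concatenates.
    fn get_url(&self) -> String {
        format!("{}://{}", self.scheme, self.address)
    }
}

fn main() {
    let opts = Options {
        scheme: "http".to_string(),
        address: "127.0.0.1:8000".to_string(),
    };
    assert_eq!(opts.get_url(), "http://127.0.0.1:8000");
}
```

The value of documenting the panic conditions explicitly is that callers learn the method is infallible-or-fatal, which matters for a method invoked during startup.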
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
src/cli.rs
(2 hunks)src/handlers/http/modal/mod.rs
(4 hunks)src/metrics/prom_utils.rs
(2 hunks)src/parseable/mod.rs
(1 hunks)src/utils/mod.rs
(0 hunks)
💤 Files with no reviewable changes (1)
- src/utils/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
🔇 Additional comments (5)
src/metrics/prom_utils.rs (1)
64: LGTM! URL construction is now handled by the Options struct. The change to use `PARSEABLE.options.get_url()` aligns with the new URL construction approach, making the code more maintainable.

src/handlers/http/modal/mod.rs (2)
216-228: Great improvement! Explicit parameter passing reduces global state dependencies. The changes to accept `options` and `storage` parameters make the dependencies explicit and help prevent deadlocks by avoiding global state access.
294: LGTM! Consistent use of passed storage parameter. Using the passed `storage` parameter instead of accessing global state maintains consistency with the dependency injection approach.

src/cli.rs (1)
391-449: LGTM! Robust URL construction with comprehensive error handling. The implementation:
- Handles both empty and non-empty ingestor_endpoint cases
- Provides clear error messages for invalid configurations
- Properly validates environment variables
- Ensures URL construction is consistent
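The endpoint handling praised above can be sketched in miniature. This is a std-only illustration of the `<host>:<port>` parsing with `$VAR` environment placeholders described in this review; the function name and error type are illustrative, not the actual parseable code.

```rust
use std::env;

// Sketch: reject schemes, require exactly `<host>:<port>`, and resolve
// `$VAR` tokens from the environment (as the reviewed code does).
fn resolve_endpoint(endpoint: &str) -> Result<(String, String), String> {
    if endpoint.starts_with("http") {
        return Err(format!("`{endpoint}` must not include a scheme"));
    }
    let parts: Vec<&str> = endpoint.split(':').collect();
    if parts.len() != 2 {
        return Err(format!("`{endpoint}` must be `<host>:<port>`"));
    }
    let resolve = |token: &str| -> Result<String, String> {
        if let Some(var) = token.strip_prefix('$') {
            env::var(var).map_err(|_| format!("env var `{var}` is not set"))
        } else {
            Ok(token.to_string())
        }
    };
    Ok((resolve(parts[0])?, resolve(parts[1])?))
}

fn main() {
    let (host, port) = resolve_endpoint("example.com:8000").unwrap();
    assert_eq!((host.as_str(), port.as_str()), ("example.com", "8000"));
    // Scheme prefixes and malformed endpoints are rejected.
    assert!(resolve_endpoint("http://example.com:8000").is_err());
    assert!(resolve_endpoint("example.com").is_err());
}
```

Returning `Result` here is a deliberate contrast with the reviewed code, which panics; either is defensible for startup-time configuration, but `Result` composes better in tests.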
src/parseable/mod.rs (1)

132: LGTM! Proper dependency injection completes the deadlock fix. The change to pass `options` and `storage` to `IngestorMetadata::load` completes the dependency injection pattern, helping prevent deadlocks by making dependencies explicit.
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/handlers/http/modal/mod.rs (1)

345: Consider passing staging_dir as parameter. The `migrate` method still accesses `PARSEABLE.options.staging_dir()` directly. Consider passing it as a parameter to maintain consistency with the dependency injection pattern.

```diff
- resource.put_on_disk(PARSEABLE.options.staging_dir())?;
+ resource.put_on_disk(options.staging_dir())?;
```

src/cli.rs (1)
397-457: Consider breaking down the URL construction logic. The method is quite long and handles multiple responsibilities. Consider splitting it into smaller, focused functions:
- URL construction from scheme and address
- Hostname and port parsing
- Environment variable resolution
```diff
 impl Options {
+    fn resolve_env_var(var_name: &str, var_type: &str) -> String {
+        let value = env::var(var_name).unwrap_or_default();
+        if value.is_empty() {
+            panic!(
+                "The environement variable `{}` is not set. Please refer to the documentation: https://logg.ing/env for more details.",
+                var_name
+            );
+        }
+        value
+    }
+
+    fn validate_hostname(hostname: &str, var_name: Option<&str>) -> String {
+        if hostname.starts_with("http") {
+            let err_msg = format!(
+                "Invalid value `{}`, please set {} to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.",
+                hostname,
+                var_name.map_or("the hostname".to_string(), |v| format!("the environement variable `{}`", v))
+            );
+            panic!("{}", err_msg);
+        }
+        hostname.to_string()
+    }
+
     pub fn get_url(&self) -> Url {
         if self.ingestor_endpoint.is_empty() {
             return format!("{}://{}", self.get_scheme(), self.address)
                 .parse::<Url>()
                 .unwrap_or_else(|err| {
                     panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
                 });
         }
         let ingestor_endpoint = &self.ingestor_endpoint;
         if ingestor_endpoint.starts_with("http") {
             panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
         }
         let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();
         if addr_from_env.len() != 2 {
             panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
         }
         let mut hostname = addr_from_env[0].to_string();
         let mut port = addr_from_env[1].to_string();
         if hostname.starts_with('$') {
             let var_hostname = hostname[1..].to_string();
             hostname = Self::resolve_env_var(&var_hostname, "hostname");
             hostname = Self::validate_hostname(&hostname, Some(&var_hostname));
             hostname = format!("{}://{}", self.get_scheme(), hostname);
         }
         if port.starts_with('$') {
             let var_port = port[1..].to_string();
             port = Self::resolve_env_var(&var_port, "port");
         }
         format!("{}://{}:{}", self.get_scheme(), hostname, port)
             .parse::<Url>()
             .expect("Valid URL")
     }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
src/banner.rs
(1 hunks)src/cli.rs
(2 hunks)src/handlers/http/about.rs
(1 hunks)src/handlers/http/modal/mod.rs
(5 hunks)src/migration/mod.rs
(2 hunks)src/parseable/mod.rs
(1 hunks)src/storage/object_storage.rs
(1 hunks)src/storage/store_metadata.rs
(5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/parseable/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (12)
src/handlers/http/modal/mod.rs (2)
216-228: LGTM! Dependency injection pattern improves code quality. The refactoring to pass `options` and `storage` explicitly as parameters instead of relying on the global `PARSEABLE` variable is a good improvement. This change:
- Makes dependencies explicit and clear
- Reduces coupling with global state
- Helps prevent potential deadlocks during initialization
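The dependency-injection shape endorsed above can be shown in a minimal, self-contained sketch. The types here are stand-ins for illustration only, not the real parseable definitions.

```rust
// Stand-in types: the real `Options` and `Storage` are far richer.
struct Options {
    port: u16,
}
struct Storage {
    bucket: String,
}
struct Metadata {
    port: u16,
    bucket: String,
}

impl Metadata {
    // Dependencies are explicit parameters, so tests can pass fakes and
    // there is no global lock to contend on during initialization.
    fn load(options: &Options, storage: &Storage) -> Metadata {
        Metadata {
            port: options.port,
            bucket: storage.bucket.clone(),
        }
    }
}

fn main() {
    let meta = Metadata::load(
        &Options { port: 8000 },
        &Storage { bucket: "somebucket".to_string() },
    );
    assert_eq!(meta.port, 8000);
    assert_eq!(meta.bucket, "somebucket");
}
```

Contrast with a `static PARSEABLE` lookup inside `load`: the global version couples every caller to process-wide initialization order, which is exactly the deadlock risk this PR addresses.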
294-303: LGTM! Consistent use of passed dependencies. The code correctly uses the passed `storage` parameter instead of accessing the global state, maintaining consistency with the dependency injection pattern.

src/storage/store_metadata.rs (3)
73: LGTM! Consistent staging directory access. The change to access staging directory through `options` aligns with the codebase-wide refactoring pattern.
137-161: LGTM! Consistent error handling and staging directory access. The code maintains proper error handling while updating the staging directory access pattern. The changes are well-structured and maintain the existing functionality.
263-267: LGTM! Consistent path construction. The code correctly uses the staging directory from options for constructing the metadata file path.
src/handlers/http/about.rs (1)
69: LGTM! Consistent staging directory access. The change to access staging directory through `options` aligns with the codebase-wide refactoring pattern.

src/banner.rs (1)
107: LGTM! Consistent staging directory access. The change to access staging directory through `options` aligns with the codebase-wide refactoring pattern.

src/cli.rs (2)
389-395: LGTM! Clear and focused implementation. The method ensures the staging directory exists and provides a clean interface to access it.
397-398: Please clarify the TODO comment. The TODO comment "refactor and document" is too vague. Please specify what aspects need refactoring and what documentation is missing.
src/migration/mod.rs (2)
328-329: LGTM! Consistent with the dependency injection pattern. The change to use `config.options.staging_dir()` makes the dependency on options explicit and helps prevent deadlocks.
355: LGTM! Consistent with the dependency injection pattern. The change to use `config.options.staging_dir()` aligns with the pattern used in `get_staging_metadata` and helps prevent deadlocks.

src/storage/object_storage.rs (1)
648: LGTM! Consistent with the dependency injection pattern. The change to use `PARSEABLE.options.staging_dir()` aligns with the pattern used across the codebase and helps prevent deadlocks.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/handlers/http/modal/mod.rs (1)
229-291: Consider adding error recovery for corrupted metadata. The code assumes the JSON file is not corrupted and uses `expect`, which will panic if the file is invalid. Consider adding graceful error handling.

```diff
- let text = std::fs::read(path).expect("File should be present");
- let mut meta: Value = serde_json::from_slice(&text).expect("Valid JSON");
+ let text = std::fs::read(path).map_err(|e| {
+     warn!("Failed to read ingestor metadata file: {}", e);
+     e
+ })?;
+ let mut meta: Value = serde_json::from_slice(&text).map_err(|e| {
+     warn!("Invalid JSON in ingestor metadata file: {}", e);
+     e
+ })?;
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)

- src/handlers/http/modal/mod.rs (5 hunks)
- src/parseable/mod.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/parseable/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (5)
src/handlers/http/modal/mod.rs (5)
215-216: LGTM! Improved dependency injection pattern. The updated method signature now explicitly accepts dependencies (`options` and `storage`) instead of relying on global state, which helps prevent deadlocks and makes the code more maintainable.
218-228: LGTM! Consistent use of passed parameters. The code now correctly uses the passed `options` parameter to access configuration values instead of relying on the global `PARSEABLE` state.
294-308: LGTM! Proper initialization of new metadata. The code correctly initializes new metadata when no existing metadata is found, using the passed parameters.
345: LGTM! Consistent staging directory access. The `migrate` method now correctly uses `PARSEABLE.options.staging_dir()` for consistency with other code paths.
372-416: Add test coverage for the deadlock fix. While the existing tests cover serialization/deserialization, consider adding tests that verify:
- The `load` method correctly uses passed parameters
- Error handling for corrupted metadata files
- Migration of metadata with missing fields
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/handlers/http/modal/mod.rs (2)
218-221: Consider improving error handling by propagating errors instead of using .expect(). Replace `.expect()` calls with proper error propagation using the `?` operator to make the function more robust and maintainable.

```diff
- let entries = options
-     .staging_dir()
-     .read_dir()
-     .expect("Couldn't read from file");
+ let entries = options
+     .staging_dir()
+     .read_dir()
+     .map_err(|e| anyhow::anyhow!("Failed to read staging directory: {}", e))?;

- let bytes = std::fs::read(path).expect("File should be present");
- let mut meta = Self::from_bytes(&bytes).expect("Extracted ingestor metadata");
+ let bytes = std::fs::read(path)
+     .map_err(|e| anyhow::anyhow!("Failed to read ingestor file: {}", e))?;
+ let mut meta = Self::from_bytes(&bytes)
+     .map_err(|e| anyhow::anyhow!("Failed to parse ingestor metadata: {}", e))?;

- meta.put_on_disk(staging_path)
-     .expect("Couldn't write to disk");
+ meta.put_on_disk(staging_path)
+     .map_err(|e| anyhow::anyhow!("Failed to write metadata to disk: {}", e))?;
```

Also applies to: 244-245, 273-274
316-337: Consider refactoring migrate method in a future PR to follow dependency injection pattern. The `migrate` method still heavily relies on global state through `PARSEABLE`. Consider refactoring it in a future PR to accept explicit parameters, similar to the `load` method.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)

- src/handlers/http/modal/mod.rs (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
🔇 Additional comments (2)
src/handlers/http/modal/mod.rs (2)
24: LGTM! The new imports are correctly added to support the refactored token generation and JSON manipulation functionality.

Also applies to: 29-29, 38-38
216-230: LGTM! Dependency injection pattern improves code modularity. The refactoring to accept explicit parameters (`options` and `storage`) instead of accessing global state is a good improvement that:
- Makes dependencies explicit
- Helps prevent deadlocks
- Improves testability
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/handlers/http/modal/mod.rs (1)
355-399: Add test coverage for the new from_bytes method. While the existing tests cover serialization and deserialization, they don't verify the backward compatibility handling in the new `from_bytes` method. Consider adding tests for:
- JSON without flight_port field
- JSON with invalid flight_port value

```diff
 #[cfg(test)]
 mod test {
     use actix_web::body::MessageBody;
     use bytes::Bytes;
     use rstest::rstest;

     use super::IngestorMetadata;

+    #[rstest]
+    fn test_from_bytes_adds_flight_port() {
+        let json = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"#;
+        let meta = IngestorMetadata::from_bytes(json, 8002).unwrap();
+        assert_eq!(meta.flight_port, "8002");
+    }
+
+    #[rstest]
+    fn test_from_bytes_preserves_existing_flight_port() {
+        let json = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"9000"}"#;
+        let meta = IngestorMetadata::from_bytes(json, 8002).unwrap();
+        assert_eq!(meta.flight_port, "9000");
+    }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)

- src/handlers/http/modal/mod.rs (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (3)
src/handlers/http/modal/mod.rs (3)
216-217: LGTM! Dependency injection pattern properly implemented. The change to accept `options` and `storage` parameters instead of accessing global state helps prevent deadlocks by making dependencies explicit.
308-314: LGTM! Well-structured deserialization with backward compatibility. The method properly handles JSON deserialization and maintains backward compatibility by adding the `flight_port` field if not present.
325-329: LGTM! Consistent with the refactoring pattern. The changes maintain consistency with the new `from_bytes` method while preserving the existing error handling pattern.
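The backward-compatibility idea behind `from_bytes` (fill in the newer `flight_port` field when an older serialized record lacks it) can be sketched without serde by using a plain map as a stand-in for the JSON object; the function name here is illustrative.

```rust
use std::collections::HashMap;

// Sketch: if a previously serialized record lacks the newer
// `flight_port` field, insert the configured default before use.
// (The real code operates on `serde_json::Value`.)
fn upgrade(
    mut meta: HashMap<String, String>,
    default_flight_port: u16,
) -> HashMap<String, String> {
    meta.entry("flight_port".to_string())
        .or_insert_with(|| default_flight_port.to_string());
    meta
}

fn main() {
    // Old record without the field: the default is filled in.
    let old: HashMap<String, String> =
        [("port".to_string(), "8000".to_string())].into_iter().collect();
    assert_eq!(upgrade(old, 8002)["flight_port"], "8002");

    // Newer record: the existing value is preserved.
    let newer: HashMap<String, String> =
        [("flight_port".to_string(), "9000".to_string())].into_iter().collect();
    assert_eq!(upgrade(newer, 8002)["flight_port"], "9000");
}
```

The `entry(...).or_insert_with(...)` idiom expresses "add only if absent" in one step, which is exactly the migration semantics the review describes.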
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/handlers/http/modal/mod.rs (1)
231-278: Consider breaking down the metadata update logic. The metadata update logic in the `load` method is quite long and handles multiple concerns (file reading, metadata updates, token generation). Consider extracting the update logic into a separate method for better maintainability.

```diff
 impl IngestorMetadata {
+    fn update_metadata(&mut self, url: String, port: String, username: &str, password: &str) {
+        if self.domain_name != url {
+            info!(
+                "Domain Name was Updated. Old: {} New: {}",
+                self.domain_name, url
+            );
+            self.domain_name = url;
+        }
+
+        if self.port != port {
+            info!("Port was Updated. Old: {} New: {}", self.port, port);
+            self.port = port;
+        }
+
+        let token = format!(
+            "Basic {}",
+            BASE64_STANDARD.encode(format!("{username}:{password}"))
+        );
+        if self.token != token {
+            warn!(
+                "Credentials were Updated. Tokens updated; Old: {} New: {}",
+                self.token, token
+            );
+            self.token = token;
+        }
+    }
```

Then update the `load` method:

```diff
 let mut meta = Self::from_bytes(&bytes, options.flight_port).expect("Extracted ingestor metadata");
-
- // compare url endpoint and port, update
- if meta.domain_name != url {
-     info!(
-         "Domain Name was Updated. Old: {} New: {}",
-         meta.domain_name, url
-     );
-     meta.domain_name = url;
- }
-
- if meta.port != port {
-     info!("Port was Updated. Old: {} New: {}", meta.port, port);
-     meta.port = port;
- }
-
- let token = format!(
-     "Basic {}",
-     BASE64_STANDARD.encode(format!("{username}:{password}"))
- );
- if meta.token != token {
-     warn!(
-         "Credentials were Updated. Tokens updated; Old: {} New: {}",
-         meta.token, token
-     );
-     meta.token = token;
- }
+ meta.update_metadata(url, port, username, password);
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)

- src/handlers/http/modal/mod.rs (6 hunks)
🧰 Additional context used
🧠 Learnings (1)
src/handlers/http/modal/mod.rs (1)
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/modal/mod.rs:218-221
Timestamp: 2025-02-12T17:39:43.392Z
Learning: In the Parseable codebase, staging directory access failures should trigger a panic rather than being handled as recoverable errors, as it indicates a critical system issue.
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (5)
src/handlers/http/modal/mod.rs (5)
216-217: LGTM! Good improvement in dependency injection. The method signature change to accept `options` and `storage` as parameters aligns with the PR's objective of preventing deadlocks by making dependencies explicit.
218-221: Staging directory access correctly triggers panic. Based on the retrieved learning from @de-sh, staging directory access failures should trigger a panic, as they indicate a critical system issue. The current implementation using `expect` aligns with this requirement.
222-230: LGTM! Clean extraction of configuration. The code now correctly uses the `Options` struct methods (`get_url()`, `staging_dir()`) instead of accessing global state, which helps prevent deadlocks.
309-315: LGTM! Well-documented and focused helper method. The new `from_bytes` method is well-documented and has a single responsibility of handling the flight port field in the JSON data.
381-394: LGTM! Good test coverage for the new functionality. The new test cases thoroughly verify the behavior of the `from_bytes` method, including both the addition of a missing flight port and the preservation of existing flight port values.
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/parseable/streams.rs (1)
598-600: LGTM! Debug statement added for retention value. The debug statement will help trace retention values during deadlock investigation. Consider adding more context to the debug output to differentiate between get/set operations.

```diff
- self.metadata.write().expect(LOCK_EXPECT).retention = Some(dbg!(retention));
+ self.metadata.write().expect(LOCK_EXPECT).retention = Some(dbg!("Setting retention" => retention));
```

src/handlers/http/logstream.rs (1)
262-277: Consider extracting repeated stream validation logic into a helper function. The pattern of checking stream existence in memory and storage (in query mode) is duplicated across multiple functions. Consider extracting this into a helper function to improve maintainability and reduce code duplication.

```diff
+ async fn validate_stream_exists(stream_name: &str) -> Result<(), StreamError> {
+     if !PARSEABLE.streams.contains(stream_name) {
+         if PARSEABLE.options.mode == Mode::Query {
+             match PARSEABLE
+                 .create_stream_and_schema_from_storage(stream_name)
+                 .await
+             {
+                 Ok(true) => Ok(()),
+                 Ok(false) | Err(_) => Err(StreamNotFound(stream_name.to_string()).into()),
+             }
+         } else {
+             Err(StreamNotFound(stream_name.to_string()).into())
+         }
+     } else {
+         Ok(())
+     }
+ }
```

Then use it in the functions:

```diff
- if !PARSEABLE.streams.contains(&stream_name) {
-     if PARSEABLE.options.mode == Mode::Query {
-         match PARSEABLE
-             .create_stream_and_schema_from_storage(&stream_name)
-             .await
-         {
-             Ok(true) => {}
-             Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
-         }
-     } else {
-         return Err(StreamNotFound(stream_name).into());
-     }
- }
+ validate_stream_exists(&stream_name).await?;
```

Also applies to: 368-380, 479-494, 512-520
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)

- src/handlers/http/logstream.rs (1 hunks)
- src/parseable/streams.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (3)
src/parseable/streams.rs (1)
571-573: LGTM! Debug statement added for retention value. The debug statement will help trace retention values during deadlock investigation.

src/handlers/http/logstream.rs (2)
50-61: LGTM! Improved error handling prevents deadlocks. The enhanced error handling now properly validates stream existence in both memory and storage before deletion, which helps prevent potential deadlocks. The code first checks memory, then storage (in query mode), ensuring all states are properly verified before proceeding.
63-89: LGTM! Well-structured deletion sequence. The deletion sequence is properly ordered to maintain data consistency:
- Storage deletion
- Staging directory cleanup
- Hot tier removal
- Memory and stats cleanup
This order ensures that even if the process fails midway, we don't end up with orphaned data or inconsistent states.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/stats.rs (1)
191-202: Consider enhancing error handling and performance. While the implementation is functional, there are a few potential improvements:
- The error handling could be more descriptive to aid debugging.
- The collection step could be more efficient.

Consider these improvements:

```diff
 fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) {
-    let families: Vec<MetricFamily> = metrics.collect().into_iter().collect();
-    for metric in families.iter().flat_map(|m| m.get_metric()) {
+    for metric in metrics.collect::<Vec<MetricFamily>>().iter().flat_map(|m| m.get_metric()) {
         let label: Vec<&str> = metric.get_label().iter().map(|l| l.get_value()).collect();
         if !label.starts_with(prefix) {
             continue;
         }
         if let Err(err) = metrics.remove_label_values(&label) {
-            warn!("Error = {err}");
+            warn!("Failed to remove label values {:?}: {}", label, err);
         }
     }
 }
```
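The core of the prefix matching above is `slice::starts_with` over the metric's ordered label values; a std-only sketch isolates that behavior. The label values shown are illustrative, not actual parseable metric labels.

```rust
// Sketch: a metric's ordered label values are matched against a prefix,
// exactly as `delete_with_label_prefix` does before removing the series.
fn matches_prefix(labels: &[&str], prefix: &[&str]) -> bool {
    labels.starts_with(prefix)
}

fn main() {
    let labels = ["mystream", "2025-02-12", "json"];
    // Any leading subsequence of the label values counts as a match.
    assert!(matches_prefix(&labels, &["mystream"]));
    assert!(matches_prefix(&labels, &["mystream", "2025-02-12"]));
    // A mismatched first element means the series is skipped.
    assert!(!matches_prefix(&labels, &["otherstream"]));
}
```

Because label order is fixed per metric family, prefix matching lets one call delete every date-scoped series for a stream without enumerating dates.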
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)

- src/stats.rs (2 hunks)
- src/storage/object_storage.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/storage/object_storage.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (2)
src/stats.rs (2)
18-24: LGTM: Well-organized imports. The new imports are properly organized and support the new metric deletion functionality.
170-189: LGTM: Improved metric deletion handling. The changes improve robustness by using a dedicated helper function for date-based metrics deletion, which could help prevent deadlocks by handling metric deletion more systematically.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
src/handlers/http/logstream.rs (1)
213-219: ⚠️ Potential issue: Handle potential inconsistency in retention update. If the memory update fails after the storage update succeeds, the system could be left in an inconsistent state where storage and memory have different retention configurations.

Consider wrapping both operations in a transaction or adding rollback logic:

```diff
- PARSEABLE
-     .storage
-     .get_object_store()
-     .put_retention(&stream_name, &retention)
-     .await?;
-
- PARSEABLE.get_stream(&stream_name)?.set_retention(retention);
+ let stream = PARSEABLE.get_stream(&stream_name)?;
+ stream.set_retention(retention.clone());
+ match PARSEABLE
+     .storage
+     .get_object_store()
+     .put_retention(&stream_name, &retention)
+     .await
+ {
+     Ok(_) => Ok(()),
+     Err(e) => {
+         // Rollback memory update on storage failure
+         stream.set_retention(old_retention);
+         Err(e)
+     }
+ }?;
```
🧹 Nitpick comments (2)
src/handlers/http/logstream.rs (2)
135-142: Enhance error handling for distributed mode. The error handling for distributed mode could be more specific about the failure reason. Currently, it returns a generic StreamNotFound error when create_stream_and_schema_from_storage fails.

Consider adding more specific error types:

```diff
- if PARSEABLE.options.mode == Mode::Query
-     && !PARSEABLE
-         .create_stream_and_schema_from_storage(&stream_name)
-         .await?
- {
-     return Err(StreamNotFound(stream_name.clone()).into());
- }
+ if PARSEABLE.options.mode == Mode::Query {
+     match PARSEABLE
+         .create_stream_and_schema_from_storage(&stream_name)
+         .await
+     {
+         Ok(false) => return Err(StreamNotFound(stream_name.clone()).into()),
+         Err(e) => return Err(StreamError::Storage(e)),
+         Ok(true) => {}
+     }
+ }
```
435-449: Enhance hot tier validation and error handling. The hot tier validation and update could benefit from more comprehensive validation and better error handling.
Consider adding size validation before updating and handling partial failures:
```diff
+// Validate size before any updates
+if hottier.size == 0 {
+    return Err(StreamError::HotTierValidation(
+        HotTierValidationError::InvalidSize("Hot tier size cannot be zero".into())
+    ));
+}
+
 validator::hot_tier(&hottier.size.to_string())?;
 stream.set_hot_tier(true);
 let Some(hot_tier_manager) = HotTierManager::global() else {
     return Err(StreamError::HotTierNotEnabled(stream_name));
 };
+
+// Validate and update atomically
+let update_result = hot_tier_manager
+    .validate_and_update_hot_tier(&stream_name, &mut hottier)
+    .await;
+
+match update_result {
+    Ok(_) => Ok(()),
+    Err(e) => {
+        stream.set_hot_tier(false);
+        Err(e)
+    }
+}?;
```
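The "validate before mutating, roll back on failure" idea here can be shown with a minimal sketch. `HotTier`, `Stream`, and `enable_hot_tier` are illustrative names, and the size check standing in for `validate_and_update_hot_tier` is invented for the example:

```rust
struct HotTier {
    size: u64,
}

struct Stream {
    hot_tier_enabled: bool,
}

// Validate before mutating, and roll the flag back if a later step fails,
// so a failed update never leaves the stream half-configured.
fn enable_hot_tier(stream: &mut Stream, tier: &HotTier) -> Result<(), String> {
    // Reject invalid sizes before touching any state.
    if tier.size == 0 {
        return Err("hot tier size cannot be zero".into());
    }
    stream.hot_tier_enabled = true;
    // Stand-in for the manager's validate-and-update call: fail on absurd sizes.
    let update_result: Result<(), String> = if tier.size > (1 << 40) {
        Err("size exceeds available disk".into())
    } else {
        Ok(())
    };
    if let Err(e) = update_result {
        stream.hot_tier_enabled = false; // roll the flag back on failure
        return Err(e);
    }
    Ok(())
}

fn main() {
    let mut s = Stream { hot_tier_enabled: false };

    // Zero size is rejected before any state change.
    assert!(enable_hot_tier(&mut s, &HotTier { size: 0 }).is_err());
    assert!(!s.hot_tier_enabled);

    // A failing update rolls the flag back.
    assert!(enable_hot_tier(&mut s, &HotTier { size: 1 << 41 }).is_err());
    assert!(!s.hot_tier_enabled);

    // A valid size enables the tier.
    assert!(enable_hot_tier(&mut s, &HotTier { size: 1024 }).is_ok());
    assert!(s.hot_tier_enabled);
}
```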
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/handlers/http/logstream.rs (10 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
- GitHub Check: Build Default x86_64-unknown-linux-gnu
🔇 Additional comments (1)
src/handlers/http/logstream.rs (1)
51-61: Consider potential race condition in stream existence check. The current implementation has a potential race condition between checking for stream existence and attempting to create it. If the stream is deleted between these operations, the error handling might be inconsistent.
Consider wrapping the existence check and creation in a transaction or using a more atomic approach:
```diff
-if !PARSEABLE.streams.contains(&stream_name)
-    && PARSEABLE.options.mode == Mode::Query
-    && matches!(
-        PARSEABLE
-            .create_stream_and_schema_from_storage(&stream_name)
-            .await,
-        Ok(true) | Err(_)
-    )
+let stream_exists = PARSEABLE.streams.contains(&stream_name);
+let create_result = if !stream_exists && PARSEABLE.options.mode == Mode::Query {
+    PARSEABLE
+        .create_stream_and_schema_from_storage(&stream_name)
+        .await
+} else {
+    Ok(false)
+};
+if !stream_exists && matches!(create_result, Ok(true) | Err(_))
```
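The restructuring above, hoisting the existence check and the conditional storage lookup into named locals before branching, can be sketched with hypothetical types. `fetch_from_storage` and `should_reject` are illustrative names that mirror the suggested diff's semantics, not Parseable's API:

```rust
use std::collections::HashSet;

// Stand-in for fetching the stream from object storage:
// Ok(true) = found and recovered, Ok(false) = not in storage.
fn fetch_from_storage(name: &str) -> Result<bool, String> {
    Ok(name == "archived")
}

// Capture the existence check once, then run the storage lookup only when
// needed, so both branches see the same snapshot of the decision inputs.
fn should_reject(streams: &HashSet<String>, is_query_mode: bool, name: &str) -> bool {
    let stream_exists = streams.contains(name);
    let create_result = if !stream_exists && is_query_mode {
        fetch_from_storage(name)
    } else {
        Ok(false)
    };
    !stream_exists && matches!(create_result, Ok(true) | Err(_))
}

fn main() {
    let mut streams = HashSet::new();
    streams.insert("live".to_string());

    assert!(!should_reject(&streams, true, "live")); // in memory: never rejected
    assert!(should_reject(&streams, true, "archived")); // recovered from storage
    assert!(!should_reject(&streams, false, "archived")); // lookup skipped outside query mode
}
```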
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/handlers/http/logstream.rs (1)
654-670: Add tests for stream existence checks. The commented-out tests should be fixed and uncommented to ensure proper error handling.
Would you like me to help implement these tests to verify the stream existence checks?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/handlers/http/logstream.rs (9 hunks)
src/storage/object_storage.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/storage/object_storage.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (1)
src/handlers/http/logstream.rs (1)
191-193: LGTM! Type safety improvement. The change from `Json<Value>` to `Json<Retention>` improves type safety by ensuring the input matches the expected structure.
looks good
Fixes #XXXX.
Description
As CodeRabbit points out:
This PR has:
Summary by CodeRabbit