
fix: bugs introduced in #1143 #1185


Merged

merged 16 commits from ingestor-fix into parseablehq:main on Feb 14, 2025

Conversation

@de-sh (Contributor) commented on Feb 12, 2025

Fixes #XXXX.

Description

  • While refactoring, a deadlock was unwittingly introduced; this PR resolves it.
  • It also fixes how missing streams are handled in distributed mode.

As CodeRabbit points out:

The change to pass options and storage to IngestorMetadata::load completes the dependency injection pattern, helping prevent deadlocks by making dependencies explicit.
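
To make the quoted point concrete, here is a minimal, self-contained sketch of the failure mode with stand-in types (the real Parseable, Options, and IngestorMetadata are far more involved): a lazily initialized global that is read again during its own construction deadlocks, while passing dependencies as parameters avoids the re-entrancy entirely.

```rust
use std::sync::LazyLock;

// Stand-ins for the real types; fields are illustrative only.
struct Options {
    staging_dir: &'static str,
}

struct IngestorMetadata {
    staging: String,
}

impl IngestorMetadata {
    // After the fix: dependencies arrive as parameters, so loading metadata
    // never reaches back into the global that is still being built.
    fn load(options: &Options) -> Self {
        IngestorMetadata {
            staging: options.staging_dir.to_string(),
        }
    }
}

struct Parseable {
    options: Options,
    metadata: IngestorMetadata,
}

static PARSEABLE: LazyLock<Parseable> = LazyLock::new(|| {
    let options = Options { staging_dir: "/tmp/staging" };
    // Before the fix, load() read PARSEABLE.options internally, re-entering
    // this LazyLock during its own initialization: a guaranteed deadlock.
    let metadata = IngestorMetadata::load(&options);
    Parseable { options, metadata }
});

fn main() {
    println!("{}", PARSEABLE.metadata.staging);
    println!("{}", PARSEABLE.options.staging_dir);
}
```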


This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • Refactor
    • Centralized configuration for staging directories and URL generation, offering a more consistent and reliable approach across the application.
    • Updated metadata loading to leverage injected settings and storage, reducing dependency on global state.
    • Improved handling of the staging directory across various components for better flexibility and clarity.
    • Enhanced API structure by refining log stream schema retrieval and adding cluster management functionalities.
  • Bug Fixes
    • Enhanced error handling for log stream deletion, ensuring accurate validation against both in-memory and storage data.
  • Chores
    • Removed outdated utilities for URL construction and environment variable retrieval.

@coderabbitai (bot) commented on Feb 12, 2025

Walkthrough

The changes introduce new methods in the Options struct to handle staging directories and URL construction. References across the codebase for accessing the staging directory have been updated to use these methods via the options field. The IngestorMetadata::load method now accepts additional parameters, removing reliance on global state, and a new private from_bytes helper aids JSON updates. Utility functions for URL handling have been removed in favor of the new methods, and error handling in log stream deletion has been enhanced to check for stream existence in both memory and storage.

Changes

File(s) and change summary:

  • src/cli.rs: Added staging_dir and get_url methods to Options for directory creation and URL construction; added an env import.
  • src/handlers/http/modal/mod.rs: Updated IngestorMetadata::load to accept &Options and &dyn ObjectStorageProvider; refactored metadata loading and introduced a private from_bytes helper.
  • src/metrics/prom_utils.rs, src/utils/mod.rs: Removed the get_url and get_from_env utility functions; direct URL construction is replaced with PARSEABLE.options.get_url().
  • src/parseable/mod.rs, src/banner.rs, src/handlers/http/about.rs, src/migration/mod.rs, src/storage/object_storage.rs, src/storage/store_metadata.rs: Refactored staging directory access to use options.staging_dir(); removed the staging_dir method from Parseable and updated related metadata and storage functions.
  • src/handlers/http/logstream.rs: Enhanced error handling in the delete function by verifying stream existence in both memory and storage (in Query mode).
  • src/stats.rs: Added new imports and a delete_with_label_prefix function to improve metric-deletion logic.
  • src/handlers/http/modal/query_server.rs, src/handlers/http/modal/server.rs: Renamed the log stream schema handler from logstream::schema to logstream::get_schema; added a get_cluster_web_scope method to QueryServer.
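
As a rough sketch of the shape the two new Options helpers take (fields simplified; the real get_url returns a parsed url::Url and also honors the P_INGESTOR_ENDPOINT override):

```rust
use std::fs;
use std::path::{Path, PathBuf};

// Simplified stand-in for the real Options struct; only the fields needed
// for this sketch are shown.
struct Options {
    local_staging_path: PathBuf,
    address: String,
    tls_enabled: bool,
}

impl Options {
    /// Ensure the staging directory exists, then hand back its path.
    fn staging_dir(&self) -> &Path {
        fs::create_dir_all(&self.local_staging_path)
            .expect("should be able to create staging directory");
        &self.local_staging_path
    }

    fn get_scheme(&self) -> &'static str {
        if self.tls_enabled { "https" } else { "http" }
    }

    /// Build this server's URL from scheme and address. The real method
    /// returns a parsed `url::Url` and also resolves `P_INGESTOR_ENDPOINT`.
    fn get_url(&self) -> String {
        format!("{}://{}", self.get_scheme(), self.address)
    }
}

fn main() {
    let options = Options {
        local_staging_path: PathBuf::from("/tmp/parseable-staging"),
        address: "0.0.0.0:8000".to_string(),
        tls_enabled: false,
    };
    println!("staging at {:?}", options.staging_dir());
    println!("url: {}", options.get_url());
}
```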

Sequence Diagram(s)

sequenceDiagram
    participant U as User
    participant LS as LogstreamHandler
    participant M as Memory
    participant S as Storage

    U->>LS: Request delete stream
    LS->>M: Check for stream in memory
    alt Stream exists in Memory
        LS->>LS: Proceed with deletion
    else Stream not in Memory
        LS->>LS: Verify mode is "Query"
        LS->>S: Attempt to fetch stream from storage
        alt Stream found or error on creation
            LS->>U: Return StreamNotFound error
        end
    end

Suggested reviewers

  • nitisht

Poem

I'm a code rabbit, hopping through the change,
Staging paths now updated in a rearranged range.
URLs get built with careful art,
Global states now drift apart.
In memory and storage, checks never stray—
Hop along, dear coder, to a brighter day!
🐰✨


📜 Recent review details


📥 Commits

Reviewing files that changed from the base of the PR and between 1e32ad9 and 433a04f.

📒 Files selected for processing (2)
  • src/handlers/http/logstream.rs (9 hunks)
  • src/parseable/mod.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/handlers/http/logstream.rs
  • src/parseable/mod.rs


@de-sh requested a review from nitisht on February 12, 2025 at 12:50
@@ -385,4 +385,66 @@ impl Options {
    pub fn is_default_creds(&self) -> bool {
        self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
    }

    /// TODO: refactor and document
@de-sh (Author) commented on Feb 12, 2025

In another PR we can move this code to use Url instead of a plain String and make it optional, which will significantly simplify things and improve readability.
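
A hypothetical sketch of that follow-up, assuming the url crate: the endpoint is parsed once at startup and carried as a typed, optional value, so downstream code never re-parses strings.

```rust
use url::Url; // requires the `url` crate

// Hypothetical follow-up shape: the endpoint is validated at parse time
// and its absence is represented with Option instead of an empty String.
struct Options {
    ingestor_endpoint: Option<Url>,
}

impl Options {
    fn get_url(&self) -> Url {
        self.ingestor_endpoint
            .clone()
            .unwrap_or_else(|| Url::parse("http://0.0.0.0:8000").expect("static URL is valid"))
    }
}

fn main() {
    let options = Options {
        ingestor_endpoint: Url::parse("http://192.168.1.1:8000").ok(),
    };
    println!("{}", options.get_url());
}
```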

@coderabbitai (bot) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/cli.rs (1)

389-390: Add documentation for the get_url method.

The TODO comment indicates missing documentation. Please add comprehensive documentation explaining:

  • The purpose of the method
  • Parameter requirements
  • Return value
  • Error conditions
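
As one illustration (wording is ours, not the merged docs), the requested doc comment could read:

```rust
/// Constructs the URL this node advertises to the rest of the cluster.
///
/// When `P_INGESTOR_ENDPOINT` is unset, the URL is derived from the
/// configured scheme and `P_ADDR`. Otherwise the endpoint must be
/// `<host>:<port>`, where either part may be a `$VAR` reference that is
/// resolved from the environment.
///
/// # Panics
///
/// Panics if the endpoint carries a scheme, is not of the form
/// `<host>:<port>`, or references an environment variable that is unset.
pub fn get_url(&self) -> Url {
    // body elided; see src/cli.rs
}
```
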
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 20e66a4 and 730d7dd.

📒 Files selected for processing (5)
  • src/cli.rs (2 hunks)
  • src/handlers/http/modal/mod.rs (4 hunks)
  • src/metrics/prom_utils.rs (2 hunks)
  • src/parseable/mod.rs (1 hunks)
  • src/utils/mod.rs (0 hunks)
💤 Files with no reviewable changes (1)
  • src/utils/mod.rs
🔇 Additional comments (5)
src/metrics/prom_utils.rs (1)

64-64: LGTM! URL construction is now handled by the Options struct.

The change to use PARSEABLE.options.get_url() aligns with the new URL construction approach, making the code more maintainable.

src/handlers/http/modal/mod.rs (2)

216-228: Great improvement! Explicit parameter passing reduces global state dependencies.

The changes to accept options and storage parameters make the dependencies explicit and help prevent deadlocks by avoiding global state access.


294-294: LGTM! Consistent use of passed storage parameter.

Using the passed storage parameter instead of accessing global state maintains consistency with the dependency injection approach.

src/cli.rs (1)

391-449: LGTM! Robust URL construction with comprehensive error handling.

The implementation:

  • Handles both empty and non-empty ingestor_endpoint cases
  • Provides clear error messages for invalid configurations
  • Properly validates environment variables
  • Ensures URL construction is consistent
src/parseable/mod.rs (1)

132-132: LGTM! Proper dependency injection completes the deadlock fix.

The change to pass options and storage to IngestorMetadata::load completes the dependency injection pattern, helping prevent deadlocks by making dependencies explicit.

coderabbitai (bot) previously approved these changes Feb 12, 2025
@coderabbitai (bot) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/handlers/http/modal/mod.rs (1)

345-345: Consider passing staging_dir as parameter.

The migrate method still accesses PARSEABLE.options.staging_dir() directly. Consider passing it as a parameter to maintain consistency with the dependency injection pattern.

-    resource.put_on_disk(PARSEABLE.options.staging_dir())?;
+    resource.put_on_disk(options.staging_dir())?;
src/cli.rs (1)

397-457: Consider breaking down the URL construction logic.

The method is quite long and handles multiple responsibilities. Consider splitting it into smaller, focused functions:

  • URL construction from scheme and address
  • Hostname and port parsing
  • Environment variable resolution
 impl Options {
+    fn resolve_env_var(var_name: &str, var_type: &str) -> String {
+        let value = env::var(var_name).unwrap_or_default();
+        if value.is_empty() {
+            panic!(
+                "The environment variable `{}` is not set. Please refer to the documentation: https://logg.ing/env for more details.",
+                var_name
+            );
+        }
+        value
+    }
+
+    fn validate_hostname(hostname: &str, var_name: Option<&str>) -> String {
+        if hostname.starts_with("http") {
+            let err_msg = format!(
+                "Invalid value `{}`, please set {} to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.",
+                hostname,
+                var_name.map_or("the hostname".to_string(), |v| format!("the environment variable `{}`", v))
+            );
+            panic!("{}", err_msg);
+        }
+        hostname.to_string()
+    }
+
     pub fn get_url(&self) -> Url {
         if self.ingestor_endpoint.is_empty() {
             return format!(
                 "{}://{}",
                 self.get_scheme(),
                 self.address
             )
             .parse::<Url>()
             .unwrap_or_else(|err| {
                 panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
             });
         }

         let ingestor_endpoint = &self.ingestor_endpoint;

         if ingestor_endpoint.starts_with("http") {
             panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
         }

         let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();

         if addr_from_env.len() != 2 {
             panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
         }

         let mut hostname = addr_from_env[0].to_string();
         let mut port = addr_from_env[1].to_string();

         if hostname.starts_with('$') {
             let var_hostname = hostname[1..].to_string();
             hostname = Self::resolve_env_var(&var_hostname, "hostname");
             hostname = Self::validate_hostname(&hostname, Some(&var_hostname));
             hostname = format!("{}://{}", self.get_scheme(), hostname);
         }

         if port.starts_with('$') {
             let var_port = port[1..].to_string();
             port = Self::resolve_env_var(&var_port, "port");
         }

         format!("{}://{}:{}", self.get_scheme(), hostname, port)
             .parse::<Url>()
             .expect("Valid URL")
     }
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 730d7dd and acbb892.

📒 Files selected for processing (8)
  • src/banner.rs (1 hunks)
  • src/cli.rs (2 hunks)
  • src/handlers/http/about.rs (1 hunks)
  • src/handlers/http/modal/mod.rs (5 hunks)
  • src/migration/mod.rs (2 hunks)
  • src/parseable/mod.rs (1 hunks)
  • src/storage/object_storage.rs (1 hunks)
  • src/storage/store_metadata.rs (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/parseable/mod.rs
🔇 Additional comments (12)
src/handlers/http/modal/mod.rs (2)

216-228: LGTM! Dependency injection pattern improves code quality.

The refactoring to pass options and storage explicitly as parameters instead of relying on the global PARSEABLE variable is a good improvement. This change:

  • Makes dependencies explicit and clear
  • Reduces coupling with global state
  • Helps prevent potential deadlocks during initialization

294-303: LGTM! Consistent use of passed dependencies.

The code correctly uses the passed storage parameter instead of accessing the global state, maintaining consistency with the dependency injection pattern.

src/storage/store_metadata.rs (3)

73-73: LGTM! Consistent staging directory access.

The change to access staging directory through options aligns with the codebase-wide refactoring pattern.


137-161: LGTM! Consistent error handling and staging directory access.

The code maintains proper error handling while updating the staging directory access pattern. The changes are well-structured and maintain the existing functionality.


263-267: LGTM! Consistent path construction.

The code correctly uses the staging directory from options for constructing the metadata file path.

src/handlers/http/about.rs (1)

69-69: LGTM! Consistent staging directory access.

The change to access staging directory through options aligns with the codebase-wide refactoring pattern.

src/banner.rs (1)

107-107: LGTM! Consistent staging directory access.

The change to access staging directory through options aligns with the codebase-wide refactoring pattern.

src/cli.rs (2)

389-395: LGTM! Clear and focused implementation.

The method ensures the staging directory exists and provides a clean interface to access it.


397-398: Please clarify the TODO comment.

The TODO comment "refactor and document" is too vague. Please specify what aspects need refactoring and what documentation is missing.

src/migration/mod.rs (2)

328-329: LGTM! Consistent with the dependency injection pattern.

The change to use config.options.staging_dir() makes the dependency on options explicit and helps prevent deadlocks.


355-355: LGTM! Consistent with the dependency injection pattern.

The change to use config.options.staging_dir() aligns with the pattern used in get_staging_metadata and helps prevent deadlocks.

src/storage/object_storage.rs (1)

648-648: LGTM! Consistent with the dependency injection pattern.

The change to use PARSEABLE.options.staging_dir() aligns with the pattern used across the codebase and helps prevent deadlocks.

coderabbitai (bot) previously approved these changes Feb 12, 2025
@coderabbitai (bot) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/handlers/http/modal/mod.rs (1)

229-291: Consider adding error recovery for corrupted metadata.

The code assumes the JSON file is not corrupted and uses expect which will panic if the file is invalid. Consider adding graceful error handling.

-                let text = std::fs::read(path).expect("File should be present");
-                let mut meta: Value = serde_json::from_slice(&text).expect("Valid JSON");
+                let text = std::fs::read(path).map_err(|e| {
+                    warn!("Failed to read ingestor metadata file: {}", e);
+                    e
+                })?;
+                let mut meta: Value = serde_json::from_slice(&text).map_err(|e| {
+                    warn!("Invalid JSON in ingestor metadata file: {}", e);
+                    e
+                })?;
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between acbb892 and 11e9a9a.

📒 Files selected for processing (2)
  • src/handlers/http/modal/mod.rs (5 hunks)
  • src/parseable/mod.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/parseable/mod.rs
🔇 Additional comments (5)
src/handlers/http/modal/mod.rs (5)

215-216: LGTM! Improved dependency injection pattern.

The updated method signature now explicitly accepts dependencies (options and storage) instead of relying on global state, which helps prevent deadlocks and makes the code more maintainable.


218-228: LGTM! Consistent use of passed parameters.

The code now correctly uses the passed options parameter to access configuration values instead of relying on the global PARSEABLE state.


294-308: LGTM! Proper initialization of new metadata.

The code correctly initializes new metadata when no existing metadata is found, using the passed parameters.


345-345: LGTM! Consistent staging directory access.

The migrate method now correctly uses PARSEABLE.options.staging_dir() for consistency with other code paths.


372-416: Add test coverage for the deadlock fix.

While the existing tests cover serialization/deserialization, consider adding tests that verify:

  1. The load method correctly uses passed parameters
  2. Error handling for corrupted metadata files
  3. Migration of metadata with missing fields

coderabbitai (bot) previously approved these changes Feb 12, 2025
@coderabbitai (bot) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/handlers/http/modal/mod.rs (2)

218-221: Consider improving error handling by propagating errors instead of using .expect().

Replace .expect() calls with proper error propagation using the ? operator to make the function more robust and maintainable.

-        let entries = options
-            .staging_dir()
-            .read_dir()
-            .expect("Couldn't read from file");
+        let entries = options
+            .staging_dir()
+            .read_dir()
+            .map_err(|e| anyhow::anyhow!("Failed to read staging directory: {}", e))?;

-            let bytes = std::fs::read(path).expect("File should be present");
-            let mut meta = Self::from_bytes(&bytes).expect("Extracted ingestor metadata");
+            let bytes = std::fs::read(path)
+                .map_err(|e| anyhow::anyhow!("Failed to read ingestor file: {}", e))?;
+            let mut meta = Self::from_bytes(&bytes)
+                .map_err(|e| anyhow::anyhow!("Failed to parse ingestor metadata: {}", e))?;

-            meta.put_on_disk(staging_path)
-                .expect("Couldn't write to disk");
+            meta.put_on_disk(staging_path)
+                .map_err(|e| anyhow::anyhow!("Failed to write metadata to disk: {}", e))?;

Also applies to: 244-245, 273-274


316-337: Consider refactoring migrate method in a future PR to follow dependency injection pattern.

The migrate method still heavily relies on global state through PARSEABLE. Consider refactoring it in a future PR to accept explicit parameters, similar to the load method.
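
A sketch of what that refactor might look like, with stand-in types (the real migrate does considerably more than a single write):

```rust
use std::path::{Path, PathBuf};

// Stand-ins for the real types; names follow the discussion above.
struct Options {
    staging: PathBuf,
}

impl Options {
    fn staging_dir(&self) -> &Path {
        &self.staging
    }
}

struct Resource;

impl Resource {
    fn put_on_disk(&self, dir: &Path) -> std::io::Result<()> {
        // The real method serializes metadata into `dir`; stubbed here.
        println!("writing resource under {:?}", dir);
        Ok(())
    }
}

// Suggested shape: `migrate` receives its dependencies explicitly instead
// of reading the global PARSEABLE, mirroring the `load` refactor.
fn migrate(options: &Options, resource: &Resource) -> std::io::Result<()> {
    resource.put_on_disk(options.staging_dir())
}

fn main() -> std::io::Result<()> {
    let options = Options { staging: PathBuf::from("/tmp/staging") };
    migrate(&options, &Resource)
}
```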

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 11e9a9a and cd2861a.

📒 Files selected for processing (1)
  • src/handlers/http/modal/mod.rs (5 hunks)
🔇 Additional comments (2)
src/handlers/http/modal/mod.rs (2)

24-24: LGTM!

The new imports are correctly added to support the refactored token generation and JSON manipulation functionality.

Also applies to: 29-29, 38-38


216-230: LGTM! Dependency injection pattern improves code modularity.

The refactoring to accept explicit parameters (options and storage) instead of accessing global state is a good improvement that:

  • Makes dependencies explicit
  • Helps prevent deadlocks
  • Improves testability

@coderabbitai (bot) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/handlers/http/modal/mod.rs (1)

355-399: Add test coverage for the new from_bytes method.

While the existing tests cover serialization and deserialization, they don't verify the backward compatibility handling in the new from_bytes method. Consider adding tests for:

  • JSON without flight_port field
  • JSON with invalid flight_port value
 #[cfg(test)]
 mod test {
     use actix_web::body::MessageBody;
     use bytes::Bytes;
     use rstest::rstest;

     use super::IngestorMetadata;

+    #[rstest]
+    fn test_from_bytes_adds_flight_port() {
+        let json = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"#;
+        let meta = IngestorMetadata::from_bytes(json, 8002).unwrap();
+        assert_eq!(meta.flight_port, "8002");
+    }
+
+    #[rstest]
+    fn test_from_bytes_preserves_existing_flight_port() {
+        let json = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"9000"}"#;
+        let meta = IngestorMetadata::from_bytes(json, 8002).unwrap();
+        assert_eq!(meta.flight_port, "9000");
+    }
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between cd2861a and e592421.

📒 Files selected for processing (1)
  • src/handlers/http/modal/mod.rs (5 hunks)
🔇 Additional comments (3)
src/handlers/http/modal/mod.rs (3)

216-217: LGTM! Dependency injection pattern properly implemented.

The change to accept options and storage parameters instead of accessing global state helps prevent deadlocks by making dependencies explicit.


308-314: LGTM! Well-structured deserialization with backward compatibility.

The method properly handles JSON deserialization and maintains backward compatibility by adding the flight_port field if not present.
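
The idea behind that compatibility shim, sketched here with serde_json on a raw Value (the real method deserializes into IngestorMetadata afterwards, and the default comes from options.flight_port):

```rust
use serde_json::{json, Value};

// Simplified sketch of the backward-compatibility handling in `from_bytes`.
fn from_bytes(bytes: &[u8], flight_port: u16) -> serde_json::Result<Value> {
    let mut meta: Value = serde_json::from_slice(bytes)?;
    // Older metadata files predate the flight_port field; inject the
    // configured default so they keep deserializing cleanly.
    if meta.get("flight_port").is_none() {
        meta["flight_port"] = json!(flight_port.to_string());
    }
    Ok(meta)
}

fn main() -> serde_json::Result<()> {
    let old = br#"{"version":"v3","port":"8000"}"#;
    let meta = from_bytes(old, 8002)?;
    assert_eq!(meta["flight_port"], "8002");
    Ok(())
}
```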


325-329: LGTM! Consistent with the refactoring pattern.

The changes maintain consistency with the new from_bytes method while preserving the existing error handling pattern.

coderabbitai (bot) previously approved these changes Feb 12, 2025
@coderabbitai (bot) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/handlers/http/modal/mod.rs (1)

231-278: Consider breaking down the metadata update logic.

The metadata update logic in the load method is quite long and handles multiple concerns (file reading, metadata updates, token generation). Consider extracting the update logic into a separate method for better maintainability.

 impl IngestorMetadata {
+    fn update_metadata(&mut self, url: String, port: String, username: &str, password: &str) {
+        if self.domain_name != url {
+            info!(
+                "Domain Name was Updated. Old: {} New: {}",
+                self.domain_name, url
+            );
+            self.domain_name = url;
+        }
+
+        if self.port != port {
+            info!("Port was Updated. Old: {} New: {}", self.port, port);
+            self.port = port;
+        }
+
+        let token = format!(
+            "Basic {}",
+            BASE64_STANDARD.encode(format!("{username}:{password}"))
+        );
+        if self.token != token {
+            warn!(
+                "Credentials were Updated. Tokens updated; Old: {} New: {}",
+                self.token, token
+            );
+            self.token = token;
+        }
+    }

Then update the load method:

             let mut meta =
                 Self::from_bytes(&bytes, options.flight_port).expect("Extracted ingestor metadata");
-
-            // compare url endpoint and port, update
-            if meta.domain_name != url {
-                info!(
-                    "Domain Name was Updated. Old: {} New: {}",
-                    meta.domain_name, url
-                );
-                meta.domain_name = url;
-            }
-
-            if meta.port != port {
-                info!("Port was Updated. Old: {} New: {}", meta.port, port);
-                meta.port = port;
-            }
-
-            let token = format!(
-                "Basic {}",
-                BASE64_STANDARD.encode(format!("{username}:{password}"))
-            );
-            if meta.token != token {
-                warn!(
-                    "Credentials were Updated. Tokens updated; Old: {} New: {}",
-                    meta.token, token
-                );
-                meta.token = token;
-            }
+            meta.update_metadata(url, port, username, password);
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 00a3b1e and 56efae3.

📒 Files selected for processing (1)
  • src/handlers/http/modal/mod.rs (6 hunks)
🧰 Additional context used
🧠 Learnings (1)
src/handlers/http/modal/mod.rs (1)
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/modal/mod.rs:218-221
Timestamp: 2025-02-12T17:39:43.392Z
Learning: In the Parseable codebase, staging directory access failures should trigger a panic rather than being handled as recoverable errors, as it indicates a critical system issue.
🔇 Additional comments (5)
src/handlers/http/modal/mod.rs (5)

216-217: LGTM! Good improvement in dependency injection.

The method signature change to accept options and storage as parameters aligns with the PR's objective of preventing deadlocks by making dependencies explicit.


218-221: Staging directory access correctly triggers panic.

Based on the retrieved learning from @de-sh, staging directory access failures should trigger a panic as it indicates a critical system issue. The current implementation using expect aligns with this requirement.


222-230: LGTM! Clean extraction of configuration.

The code now correctly uses the Options struct methods (get_url(), staging_dir()) instead of accessing global state, which helps prevent deadlocks.


309-315: LGTM! Well-documented and focused helper method.

The new from_bytes method is well-documented and has a single responsibility of handling the flight port field in the JSON data.


381-394: LGTM! Good test coverage for the new functionality.

The new test cases thoroughly verify the behavior of the from_bytes method, including both the addition of missing flight port and preservation of existing flight port values.

coderabbitai (bot) previously approved these changes Feb 13, 2025
@coderabbitai (bot) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/parseable/streams.rs (1)

598-600: LGTM! Debug statement added for retention value.

The debug statement will help trace retention values during deadlock investigation. Consider adding more context to the debug output to differentiate between get/set operations.

-        self.metadata.write().expect(LOCK_EXPECT).retention = Some(dbg!(retention));
+        // dbg! with multiple arguments returns a tuple, so `.1` keeps the value
+        self.metadata.write().expect(LOCK_EXPECT).retention = Some(dbg!("setting retention", retention).1);
src/handlers/http/logstream.rs (1)

262-277: Consider extracting repeated stream validation logic into a helper function.

The pattern of checking stream existence in memory and storage (in query mode) is duplicated across multiple functions. Consider extracting this into a helper function to improve maintainability and reduce code duplication.

+ async fn validate_stream_exists(stream_name: &str) -> Result<(), StreamError> {
+     if !PARSEABLE.streams.contains(stream_name) {
+         if PARSEABLE.options.mode == Mode::Query {
+             match PARSEABLE
+                 .create_stream_and_schema_from_storage(stream_name)
+                 .await
+             {
+                 Ok(true) => Ok(()),
+                 Ok(false) | Err(_) => Err(StreamNotFound(stream_name.to_string()).into()),
+             }
+         } else {
+             Err(StreamNotFound(stream_name.to_string()).into())
+         }
+     } else {
+         Ok(())
+     }
+ }

Then use it in the functions:

- if !PARSEABLE.streams.contains(&stream_name) {
-     if PARSEABLE.options.mode == Mode::Query {
-         match PARSEABLE
-             .create_stream_and_schema_from_storage(&stream_name)
-             .await
-         {
-             Ok(true) => {}
-             Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
-         }
-     } else {
-         return Err(StreamNotFound(stream_name).into());
-     }
- }
+ validate_stream_exists(&stream_name).await?;

Also applies to: 368-380, 479-494, 512-520

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 56efae3 and 75c933e.

📒 Files selected for processing (2)
  • src/handlers/http/logstream.rs (1 hunks)
  • src/parseable/streams.rs (2 hunks)
🔇 Additional comments (3)
src/parseable/streams.rs (1)

571-573: LGTM! Debug statement added for retention value.

The debug statement will help trace retention values during deadlock investigation.

src/handlers/http/logstream.rs (2)

50-61: LGTM! Improved error handling prevents deadlocks.

The enhanced error handling now properly validates stream existence in both memory and storage before deletion, which helps prevent potential deadlocks. The code first checks memory, then storage (in query mode), ensuring all states are properly verified before proceeding.


63-89: LGTM! Well-structured deletion sequence.

The deletion sequence is properly ordered to maintain data consistency:

  1. Storage deletion
  2. Staging directory cleanup
  3. Hot tier removal
  4. Memory and stats cleanup

This order ensures that even if the process fails midway, we don't end up with orphaned data or inconsistent states.
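
Schematically, with stand-in functions for the real storage, staging, hot-tier, and in-memory APIs, the ordering looks like this:

```rust
// Sketch of the deletion order described above; each call stubs a real API.
fn delete_from_storage(stream: &str) -> Result<(), String> {
    println!("1. deleting {stream} from object storage");
    Ok(())
}

fn clear_staging(stream: &str) -> Result<(), String> {
    println!("2. clearing staging directory for {stream}");
    Ok(())
}

fn remove_hot_tier(stream: &str) -> Result<(), String> {
    println!("3. removing hot tier for {stream}");
    Ok(())
}

fn forget_in_memory(stream: &str) {
    println!("4. dropping in-memory state and stats for {stream}");
}

fn delete_stream(stream: &str) -> Result<(), String> {
    delete_from_storage(stream)?; // remove the source of truth first
    clear_staging(stream)?;       // then local staging data
    remove_hot_tier(stream)?;     // then hot-tier copies, if any
    forget_in_memory(stream);     // finally, in-memory state and stats
    Ok(())
}

fn main() {
    delete_stream("demo").unwrap();
}
```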

@coderabbitai (bot) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/stats.rs (1)

191-202: Consider enhancing error handling and performance.

While the implementation is functional, there are a few potential improvements:

  1. The error handling could be more descriptive to aid debugging.
  2. The collection step could be more efficient.

Consider these improvements:

 fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) {
-    let families: Vec<MetricFamily> = metrics.collect().into_iter().collect();
-    for metric in families.iter().flat_map(|m| m.get_metric()) {
+    for metric in metrics.collect().iter().flat_map(|m| m.get_metric()) {
         let label: Vec<&str> = metric.get_label().iter().map(|l| l.get_value()).collect();
         if !label.starts_with(prefix) {
             continue;
         }
         if let Err(err) = metrics.remove_label_values(&label) {
-            warn!("Error = {err}");
+            warn!("Failed to remove label values {:?}: {}", label, err);
         }
     }
 }
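
For context, here is a self-contained sketch of the helper with the fix applied, plus a usage example; the metric and label names are illustrative, not the exact ones in src/stats.rs.

```rust
use prometheus::core::Collector;
use prometheus::{opts, IntGaugeVec};

// Remove every series of `metrics` whose label values start with `prefix`.
fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) {
    for family in metrics.collect() {
        for metric in family.get_metric() {
            let label: Vec<&str> =
                metric.get_label().iter().map(|l| l.get_value()).collect();
            if !label.starts_with(prefix) {
                continue;
            }
            if let Err(err) = metrics.remove_label_values(&label) {
                eprintln!("failed to remove {label:?}: {err}");
            }
        }
    }
}

fn main() {
    let gauge = IntGaugeVec::new(
        opts!("events_ingested_by_date", "events per stream/format/date"),
        &["stream", "format", "date"],
    )
    .unwrap();
    gauge.with_label_values(&["s1", "json", "2025-02-14"]).set(10);
    gauge.with_label_values(&["s2", "json", "2025-02-14"]).set(5);

    // Drop every date-wise series belonging to stream "s1".
    delete_with_label_prefix(&gauge, &["s1", "json"]);
}
```
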
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 74a39ce and b1611e3.

📒 Files selected for processing (2)
  • src/stats.rs (2 hunks)
  • src/storage/object_storage.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/storage/object_storage.rs
🔇 Additional comments (2)
src/stats.rs (2)

18-24: LGTM: Well-organized imports.

The new imports are properly organized and support the new metric deletion functionality.


170-189: LGTM: Improved metric deletion handling.

The changes improve robustness by using a dedicated helper function for date-based metrics deletion, which could help prevent deadlocks by handling metric deletion more systematically.

coderabbitai (bot) previously approved these changes Feb 13, 2025
coderabbitai (bot) previously approved these changes Feb 14, 2025
@coderabbitai (bot) left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
src/handlers/http/logstream.rs (1)

213-219: ⚠️ Potential issue

Handle potential inconsistency in retention update.

If the memory update fails after the storage update succeeds, the system could be left in an inconsistent state where storage and memory have different retention configurations.

Consider wrapping both operations in a transaction or adding rollback logic:

-    PARSEABLE
-        .storage
-        .get_object_store()
-        .put_retention(&stream_name, &retention)
-        .await?;
-
-    PARSEABLE.get_stream(&stream_name)?.set_retention(retention);
+    let stream = PARSEABLE.get_stream(&stream_name)?;
+    let old_retention = stream.retention(); // hypothetical getter; capture the prior value for the rollback below
+    stream.set_retention(retention.clone());
+    match PARSEABLE
+        .storage
+        .get_object_store()
+        .put_retention(&stream_name, &retention)
+        .await
+    {
+        Ok(_) => Ok(()),
+        Err(e) => {
+            // Rollback memory update on storage failure
+            stream.set_retention(old_retention);
+            Err(e)
+        }
+    }?;
🧹 Nitpick comments (2)
src/handlers/http/logstream.rs (2)

135-142: Enhance error handling for distributed mode.

The error handling for distributed mode could be more specific about the failure reason. Currently, it returns a generic StreamNotFound error when create_stream_and_schema_from_storage fails.

Consider adding more specific error types:

-    if PARSEABLE.options.mode == Mode::Query
-        && !PARSEABLE
-            .create_stream_and_schema_from_storage(&stream_name)
-            .await?
-    {
-        return Err(StreamNotFound(stream_name.clone()).into());
-    }
+    if PARSEABLE.options.mode == Mode::Query {
+        match PARSEABLE
+            .create_stream_and_schema_from_storage(&stream_name)
+            .await
+        {
+            Ok(false) => return Err(StreamNotFound(stream_name.clone()).into()),
+            Err(e) => return Err(StreamError::Storage(e)),
+            Ok(true) => {}
+        }
+    }

435-449: Enhance hot tier validation and error handling.

The hot tier validation and update could benefit from more comprehensive validation and better error handling.

Consider adding size validation before updating and handling partial failures:

+    // Validate size before any updates
+    if hottier.size == 0 {
+        return Err(StreamError::HotTierValidation(
+            HotTierValidationError::InvalidSize("Hot tier size cannot be zero".into())
+        ));
+    }
+
     validator::hot_tier(&hottier.size.to_string())?;
 
     stream.set_hot_tier(true);
     let Some(hot_tier_manager) = HotTierManager::global() else {
         return Err(StreamError::HotTierNotEnabled(stream_name));
     };
+
+    // Validate and update atomically
+    let update_result = hot_tier_manager
+        .validate_and_update_hot_tier(&stream_name, &mut hottier)
+        .await;
+
+    match update_result {
+        Ok(_) => Ok(()),
+        Err(e) => {
+            stream.set_hot_tier(false);
+            Err(e)
+        }
+    }?;
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between f8dea56 and 178c982.

📒 Files selected for processing (1)
  • src/handlers/http/logstream.rs (10 hunks)
🔇 Additional comments (1)
src/handlers/http/logstream.rs (1)

51-61: Consider potential race condition in stream existence check.

The current implementation has a potential race condition between checking for stream existence and attempting to create it. If the stream is deleted between these operations, the error handling might be inconsistent.

Consider wrapping the existence check and creation in a transaction or using a more atomic approach:

-    if !PARSEABLE.streams.contains(&stream_name)
-        && PARSEABLE.options.mode == Mode::Query
-        && matches!(
-            PARSEABLE
-                .create_stream_and_schema_from_storage(&stream_name)
-                .await,
-            Ok(true) | Err(_)
-        )
+    let stream_exists = PARSEABLE.streams.contains(&stream_name);
+    let create_result = if !stream_exists && PARSEABLE.options.mode == Mode::Query {
+        PARSEABLE
+            .create_stream_and_schema_from_storage(&stream_name)
+            .await
+    } else {
+        Ok(false)
+    };
+    if !stream_exists && matches!(create_result, Ok(true) | Err(_))

@de-sh changed the title from "fix: deadlock during setup of parseable ingestor node" to "fix: bugs introduced in #1143" on Feb 14, 2025
@coderabbitai (bot) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/handlers/http/logstream.rs (1)

654-670: Add tests for stream existence checks.

The commented-out tests should be fixed and uncommented to ensure proper error handling.

Would you like me to help implement these tests to verify the stream existence checks?

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 178c982 and 1e32ad9.

📒 Files selected for processing (2)
  • src/handlers/http/logstream.rs (9 hunks)
  • src/storage/object_storage.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/storage/object_storage.rs
🔇 Additional comments (1)
src/handlers/http/logstream.rs (1)

191-193: LGTM! Type safety improvement.

The change from Json<Value> to Json<Retention> improves type safety by ensuring the input matches the expected structure.
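
A sketch of the difference, assuming actix-web and serde; the field set on Retention here is hypothetical, the real struct lives in the codebase and may be shaped differently.

```rust
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use serde::Deserialize;

// Hypothetical field set for illustration only.
#[derive(Debug, Deserialize)]
struct Retention {
    description: String,
    duration: String,
    action: String,
}

// With Json<Retention>, malformed bodies are rejected by actix-web before
// the handler runs, instead of being probed field-by-field out of a Value.
async fn put_retention(web::Json(retention): web::Json<Retention>) -> impl Responder {
    HttpResponse::Ok().body(format!("retention set: {retention:?}"))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/retention", web::put().to(put_retention)))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}
```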

@nikhilsinhaparseable (Contributor) left a comment

looks good

@nitisht merged commit 523ecc7 into parseablehq:main on Feb 14, 2025
14 checks passed
@coderabbitai (bot) mentioned this pull request on Feb 18, 2025
parmesant pushed a commit to parmesant/parseable that referenced this pull request on Feb 19, 2025
@de-sh deleted the ingestor-fix branch on February 21, 2025 06:35