feat(clp-package): Add support for clp-s s3 ingestion #651

Open
wants to merge 64 commits into
base: main

Conversation

haiqi96
Contributor

@haiqi96 haiqi96 commented Jan 3, 2025

Description

This PR adds S3 ingestion support to the clp-s package. Currently, we support compressing from one s3_url (with prefix matching) at a time. Users can specify credentials either on the command line or via a credentials file. If no credentials are provided, the package will attempt to use the associated IAM role, if there is one.
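
The credential precedence described above could be sketched roughly as follows. This is illustrative only, not the PR's implementation: `resolve_s3_credentials`, its parameters, and the assumption that the credentials file uses the standard AWS INI layout with a `default` profile are all hypothetical.

```python
import configparser
from pathlib import Path
from typing import Optional, Tuple


def resolve_s3_credentials(
    access_key_id: Optional[str],
    secret_access_key: Optional[str],
    credentials_file: Optional[Path],
    profile: str = "default",  # assumed profile name
) -> Optional[Tuple[str, str]]:
    """Return (access_key_id, secret_access_key), or None to fall back to IAM."""
    explicit = access_key_id is not None or secret_access_key is not None
    if explicit and credentials_file is not None:
        raise ValueError("Specify credentials on the command line or via a file, not both.")
    if explicit:
        if access_key_id is None or secret_access_key is None:
            raise ValueError("Both the access key ID and the secret access key are required.")
        return access_key_id, secret_access_key
    if credentials_file is not None:
        parser = configparser.ConfigParser()
        with open(credentials_file) as f:
            parser.read_file(f)
        if not parser.has_section(profile):
            raise ValueError(f"Profile '{profile}' not found in {credentials_file}")
        section = parser[profile]
        return section["aws_access_key_id"], section["aws_secret_access_key"]
    # No credentials given: the caller relies on the environment's IAM identity.
    return None
```

Returning `None` rather than raising keeps the IAM fallback an explicit, testable branch for the caller.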

The PR contains the following changes:

  1. Introduces S3InputConfig and FsInputConfig for the different ingestion paths, and updates the compression execution script to handle each input type.
  2. Updates the compression scheduler to handle scheduling for S3 objects. Note that, to avoid excessive changes, S3 objects reuse the existing metadata class for FilePath.
  3. Updates the compress.sh interface to allow specifying whether the input comes from the local filesystem (fs) or S3 (s3). Simplifies the native/compress.sh script's interface to always accept a target_list.
  4. Updates the execution container to install the dependency required by the curl library.

The design decisions involved in this PR are documented in this doc.

Due to time limitations, we leave the following items as future TODOs:

If users provide invalid S3 information, such as incorrect credentials or a non-existent bucket, the issue is only detected when the package attempts to schedule the job. Additionally, if the provided S3 credentials lack sufficient privileges (e.g., list_object is allowed but get_object is not), the package returns unhelpful error messages to the web UI ("see error log.txt blabla"), which is confusing for users.

Ideally, invalid S3 configurations should be detected at package startup to provide immediate and actionable feedback.
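
One possible shape for such a startup check is sketched below. It is only a sketch under assumptions: `check_s3_access` is a hypothetical helper, and `s3_client` is assumed to be a boto3 S3 client (or anything exposing `list_objects_v2` and `head_object` with the same keyword arguments).

```python
from typing import Any, Optional


def check_s3_access(s3_client: Any, bucket: str, key_prefix: str) -> Optional[str]:
    """Return None if the prefix is listable and readable, else an error message."""
    try:
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=key_prefix, MaxKeys=1)
    except Exception as e:  # e.g. botocore.exceptions.ClientError when using boto3
        return f"Cannot list s3://{bucket}/{key_prefix}: {e}"

    contents = response.get("Contents", [])
    if not contents:
        return f"No objects found under s3://{bucket}/{key_prefix}"

    first_key = contents[0]["Key"]
    try:
        # HeadObject requires the s3:GetObject permission, so this catches
        # credentials that are allowed to list but not to read.
        s3_client.head_object(Bucket=bucket, Key=first_key)
    except Exception as e:
        return f"Cannot read s3://{bucket}/{first_key}: {e}"
    return None
```

Taking the client as a parameter keeps the check testable with a stub and leaves client construction (region, credentials) to the caller.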

Validation performed

Manually verified that both fs compression and s3 compression work from the command line.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added support for compressing files from both filesystem and Amazon S3 sources
    • Enhanced configuration management for compression tasks
    • Improved input validation and error handling for compression processes
  • Improvements

    • Refined task routing and execution for compression jobs
    • Added robust metadata handling for different input types
    • Implemented more flexible command generation for compression tasks
  • Infrastructure

    • Updated Docker base images with additional certificate packages
    • Improved utility functions for S3 URL parsing and credential management
  • Bug Fixes

    • Resolved issues with path resolution and input type handling

Contributor

coderabbitai bot commented Jan 3, 2025

Walkthrough

The pull request introduces comprehensive enhancements to the CLP (Compressed Log Processor) package, focusing on expanding support for both filesystem and Amazon S3 input types across multiple components. The changes involve refactoring compression-related scripts, adding new utility functions for S3 URL parsing and credential handling, and updating job configuration classes to support diverse input sources. The modifications improve input validation, error handling, and flexibility in processing compression tasks across different storage environments.

Changes

File | Change Summary
--- | ---
clp-package-utils/clp_package_utils/scripts/compress.py | Added functions for generating target lists, validating input arguments, and constructing compression commands with improved error handling
clp-package-utils/clp_package_utils/scripts/native/compress.py | Introduced new functions for generating I/O configurations and processing compression targets with enhanced modularity
clp-py-utils/clp_py_utils/s3_utils.py | Added new functions for AWS credential parsing, S3 URL handling, metadata retrieval, and URL generation
job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py | Implemented methods for processing filesystem and S3 input paths with dynamic task scheduling
job-orchestration/job_orchestration/scheduler/job_config.py | Created new input type enumeration and configuration classes for filesystem and S3 inputs
job-orchestration/job_orchestration/executor/compress/compression_task.py | Enhanced compression task handling with new target file generation and improved command creation
Docker setup scripts | Added ca-certificates package installation
job-orchestration/job_orchestration/executor/compress/celeryconfig.py | Updated import paths and task routing for compression tasks

Sequence Diagram

sequenceDiagram
    participant Scheduler
    participant Executor
    participant S3/FileSystem
    participant CompressionTask

    Scheduler->>Scheduler: Determine Input Type
    alt Filesystem Input
        Scheduler->>Executor: Schedule FS Compression
    else S3 Input
        Scheduler->>S3/FileSystem: Retrieve Object Metadata
        S3/FileSystem-->>Scheduler: Return Metadata
        Scheduler->>Executor: Schedule S3 Compression
    end

    Executor->>CompressionTask: Generate Target List
    CompressionTask->>CompressionTask: Validate Inputs
    CompressionTask->>CompressionTask: Prepare Compression Command
    CompressionTask->>S3/FileSystem: Execute Compression


@haiqi96
Contributor Author

haiqi96 commented Jan 3, 2025

@coderabbitai review

Contributor

coderabbitai bot commented Jan 3, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🔭 Outside diff range comments (2)
components/clp-package-utils/clp_package_utils/scripts/native/compress.py (1)

Line range hint 223-269: Return code best practices.
In main, returning a negative value (like -1) for errors is valid. For improved clarity, consider symbolic constants or enumerations (e.g., EXIT_FAILURE) to document the meaning.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

Line range hint 1-36: Fix import block formatting (I001).
According to the pipeline failure, the import block here is not sorted or formatted as expected. Implement a standard import-sorting tool (e.g., isort) to comply with the style guidelines.

🧹 Nitpick comments (19)
tools/docker-images/clp-execution-base-ubuntu-focal/setup-scripts/install-prebuilt-packages.sh (1)

11-11: Excellent addition of ca-certificates package!

The addition of ca-certificates is essential for secure HTTPS connections to AWS S3 endpoints, ensuring proper SSL/TLS certificate validation during S3 ingestion operations.

tools/docker-images/clp-execution-base-ubuntu-jammy/setup-scripts/install-prebuilt-packages.sh (1)

11-11: Consistent with Focal image changes!

The parallel addition of ca-certificates maintains feature parity between Ubuntu versions, which is crucial for consistent S3 ingestion support across different environments.

Consider creating a shared package list file that both scripts can source to reduce duplication and ensure future package changes remain synchronized.

components/clp-py-utils/clp_py_utils/s3_utils.py (2)

14-15: Consider supporting alternative endpoints.
Currently, AWS_ENDPOINT is hard-coded to "amazonaws.com". For completeness and future adaptability, consider allowing other endpoint formats (e.g., for private cloud or non-public AWS-compatible services).


48-82: Validate or normalise the region code.
In parse_s3_url, consider normalising or validating region_code for uppercase vs. lowercase inconsistencies. This can curb potential mismatches in subsequent S3 calls.

components/clp-package-utils/clp_package_utils/scripts/compress.py (3)

28-46: Use explicit flush or context manager for file writing.
While Python's file objects flush on close, consider adding a context manager or explicit flush calls to ensure the written targets list is immediately saved, especially if subsequent operations rely on it.
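
A minimal sketch of the suggestion, assuming a hypothetical helper named `write_targets_list` (the name and signature are not from the PR):

```python
from pathlib import Path
from typing import Iterable


def write_targets_list(targets: Iterable[str], output_path: Path) -> None:
    """Write one target per line; the file is closed when the block exits."""
    with open(output_path, "w") as f:
        for target in targets:
            f.write(f"{target}\n")
        # Push Python's buffer to the OS before the file is handed to another process.
        f.flush()
```

The `with` block already closes (and therefore flushes) the file on exit, so the explicit `flush()` only matters if the file is read while still open.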


121-133: Validate overlapping paths.
_validate_fs_input_args disallows specifying both paths and path_list. As an extra safeguard, consider checking for potential duplicates or overlapping paths across multiple lines in the file-based listing.


134-159: Strengthen messaging on credential conflicts.
When users pass both a credentials file and explicit credentials, the error message is appropriate. One could further clarify which method is recommended or display how these conflicts arose.

components/clp-package-utils/clp_package_utils/scripts/native/compress.py (2)

130-157: Optionally sanitise or normalise object keys.
_generate_clp_io_config sets S3 path details from user input. Depending on your environment, consider normalising or validating key_prefix to avoid unexpected directory traversals or unsafe characters.


159-193: Validate empty lines.
_get_targets_to_compress skips empty lines, which is correct. Recommend logging or collecting line numbers of these empty entries if repeated blank lines could indicate user mistakes.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)

126-153: Profile large S3 directory listings.
_process_s3_input may encounter very large key sets. Evaluate memory usage or time overhead with extremely large S3 prefixes, and consider streaming or chunked processing if required.


180-208: Log scheduling outcomes.
When a job fails due to an invalid input type or an empty S3 prefix, info-level logs might help operators quickly identify the cause, especially in multi-tenant or large-scale scenarios.

components/job-orchestration/job_orchestration/scheduler/job_config.py (2)

10-12: Add docstrings for the InputType enum.
Documenting the usage context (e.g., “FS for local file system, S3 for Amazon S3-based data”) within the enum can enhance clarity and maintainability for future contributors.


22-28: Consider optional path normalisation.
Within FsInputConfig, consider normalising or expanding user-supplied paths to absolute forms. This pre-processing step can prevent confusion or ambiguous references.

components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (6)

7-7: Ensure consistent import usage.
You have added Dict, List, Optional, and Tuple in addition to Any. If you are not using Any anywhere below, consider removing it to keep imports minimal.


125-125: Document the new tuple return type.
The updated return type Tuple[List[str], Optional[Dict[str, str]]] is now more precise. Provide docstring-level details about the returned environment dict to help maintainers understand how to use or modify it.


167-167: Document environment usage for clp_s.
The new function signature returning (List[str], Optional[Dict[str, str]]) is clear. However, consider highlighting the environment usage for clp_s specifically, especially regarding AWS credentials or expansions in the future.


281-290: Validate input type coverage.
You have added logic to generate either a filesystem targets file or an S3 targets file. This branching is thorough, but ensure that future input types or edge cases (e.g., empty paths_to_compress for S3) are handled gracefully.


303-305: Consider controlling concurrency or piping.
subprocess.Popen(..., stdout=subprocess.PIPE, ...) might be fine for smaller tasks, but large-scale compression tasks can produce large STDOUT logs. Ensure you have a strategy to handle this output efficiently to avoid potential memory issues.
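
One way to keep memory bounded is to consume the pipe line by line instead of buffering it all. The sketch below is an assumption about how that could look; `run_and_stream` and the `handle_line` callback are hypothetical names, not part of the PR.

```python
import subprocess
from typing import Callable, List


def run_and_stream(cmd: List[str], handle_line: Callable[[str], None]) -> int:
    """Run cmd, passing each output line to handle_line as it arrives."""
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so a single pipe is drained
        text=True,
    ) as proc:
        for line in proc.stdout:
            handle_line(line.rstrip("\n"))
    # Leaving the with-block waits for the process, so returncode is set here.
    return proc.returncode
```

Merging stderr into stdout avoids the deadlock that can occur when one pipe fills up while only the other is being read.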


373-374: Clean up after partial failures.
Unlinking the targets list path is done only if compression is successful. Consider whether you want to remove it in cases of failure or partial success to prevent stale files from accumulating.
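
If unconditional cleanup is wanted, a `try`/`finally` would cover failures as well. This is a sketch with hypothetical names (`run_with_targets_file`, `run_compression`); keeping the file on failure for debugging is an equally defensible choice.

```python
from pathlib import Path
from typing import Callable


def run_with_targets_file(targets_path: Path, run_compression: Callable[[Path], int]) -> int:
    """Run the compression step and always remove the targets file afterwards."""
    try:
        return run_compression(targets_path)
    finally:
        # missing_ok (Python 3.8+) avoids an error if the file was already removed.
        targets_path.unlink(missing_ok=True)
```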

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 61f9902 and fd9dba2.

📒 Files selected for processing (8)
  • components/clp-package-utils/clp_package_utils/scripts/compress.py (4 hunks)
  • components/clp-package-utils/clp_package_utils/scripts/native/compress.py (4 hunks)
  • components/clp-py-utils/clp_py_utils/s3_utils.py (1 hunks)
  • components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (9 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3 hunks)
  • components/job-orchestration/job_orchestration/scheduler/job_config.py (3 hunks)
  • tools/docker-images/clp-execution-base-ubuntu-focal/setup-scripts/install-prebuilt-packages.sh (1 hunks)
  • tools/docker-images/clp-execution-base-ubuntu-jammy/setup-scripts/install-prebuilt-packages.sh (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: clp-lint
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

[error] 1-36: Import block is un-sorted or un-formatted (I001)

🔇 Additional comments (12)
components/clp-py-utils/clp_py_utils/s3_utils.py (2)

18-46: Ensure credentials file security.
parse_aws_credentials_file correctly retrieves credentials but relies on a plain-text file. Urge verifying that file permissions are restricted to authorised users to reduce the risk of exposing credentials.


90-131: Paginate carefully to avoid partial listings.
get_s3_object_metadata uses the paginator, which is good. Be sure to handle edge cases where large pagination sets might fail or get truncated by S3. Consider tests covering multi-page listing scenarios.
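
For reference, a paginated listing that yields objects lazily might look like the sketch below. `iter_s3_objects` is a hypothetical helper, `s3_client` is assumed to be a boto3 S3 client, and skipping keys that end in `/` is an assumption about how folder placeholders should be treated.

```python
from typing import Any, Iterator, Tuple


def iter_s3_objects(s3_client: Any, bucket: str, key_prefix: str) -> Iterator[Tuple[str, int]]:
    """Yield (key, size) for every object under key_prefix, one page at a time."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=key_prefix):
        # A page has no "Contents" entry when nothing matches the prefix.
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):
                continue  # assumed: skip folder placeholder objects
            yield key, obj["Size"]
```

Because this is a generator, a multi-page test only needs a stub paginator that returns two or more pages.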

components/clp-package-utils/clp_package_utils/scripts/compress.py (1)

55-98: Expose operation details in logs.
_generate_compress_cmd elegantly assembles the compression command. If debugging is needed, consider logging the resultant command arguments (excluding sensitive credentials) to assist with troubleshooting.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

83-125: Check for symbolic links or special filesystem objects.
_process_fs_input_paths currently captures directories and files. Consider also handling symbolic links or special files in a safe manner (e.g., ignoring broken links).
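
A rough sketch of such filtering, using only the standard library; `iter_regular_files` is a hypothetical name and does not reflect how `_process_fs_input_paths` is actually written.

```python
import os
from pathlib import Path
from typing import Iterator


def iter_regular_files(root: Path) -> Iterator[Path]:
    """Yield regular files under root without descending into symlinked directories."""
    for dirpath, _dirnames, filenames in os.walk(root, followlinks=False):
        for name in filenames:
            path = Path(dirpath) / name
            # is_file() follows symlinks, so it is False for broken links
            # and for special files such as FIFOs and sockets.
            if path.is_file():
                yield path
```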

components/job-orchestration/job_orchestration/scheduler/job_config.py (1)

50-50: Praise for consolidated config approach.
ClpIoConfig unifies FS and S3 input logic under a single property. This design fosters cleaner upstream code. Nicely done!

components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (7)

23-23: Validate usage of newly imported s3 utilities.
Ensure that the usage of generate_s3_virtual_hosted_style_url and s3_put is covered by tests, particularly around error handling and S3 region/bucket configurations.


27-32: Streamline import structure.
The block of imports for ClpIoConfig, InputType, PathsToCompress, and S3InputConfig is consistent and clear. Ensure that each imported name is actually used within this file to avoid redundant imports.


85-101: Add tests for empty directory compression logic.
The new _generate_fs_targets_file function now handles empty_directories. This is a notable scenario that might be easily overlooked. Ensure you have test coverage confirming that correct paths for empty directories are added to the targets list.


158-158: No environment is returned for FS inputs.
Returning None for FS-based compression is correct. If additional environment variables become necessary (for local file workflows), you might easily extend this pattern without changing the function signature.


263-263: Usage check for make_clp_command_and_env.
You correctly invoke the new function for the CLP storage engine. Ensure you have adequate integration tests verifying that the returned command is valid and the environment is None or used properly.


270-270: Usage check for make_clp_s_command_and_env.
Similarly, confirm in your tests that enable_s3_write flows properly into the environment and command generation for the CLP_S engine.


293-294: Clear separation of "files-from" argument.
Appending the --files-from argument here makes the code straightforward to read, ensuring the target file is always appended at the end of the command. Nicely done.

Comment on lines +29 to +39
class S3InputConfig(BaseModel):
    type: typing.Literal[InputType.S3.value] = InputType.S3.value
    timestamp_key: typing.Optional[str] = None

    region_code: str
    bucket: str
    key_prefix: str

    aws_access_key_id: typing.Optional[str] = None
    aws_secret_access_key: typing.Optional[str] = None


🛠️ Refactor suggestion

Handle missing region code more gracefully.
S3InputConfig currently expects region_code. Some private S3-like services do not require a formal region. You might consider making region_code optional or providing a default.

Comment on lines +103 to +118
def _generate_s3_targets_file(
    output_file_path: pathlib.Path,
    paths_to_compress: PathsToCompress,
    s3_input_config: S3InputConfig,
) -> None:
    # S3 object keys are stored as file_paths in `PathsToCompress`
    object_keys = paths_to_compress.file_paths
    with open(output_file_path, "w") as file:
        if len(object_keys) > 0:
            for object_key in object_keys:
                s3_virtual_hosted_style_url = generate_s3_virtual_hosted_style_url(
                    s3_input_config.region_code, s3_input_config.bucket, object_key
                )
                file.write(s3_virtual_hosted_style_url)
                file.write("\n")


🛠️ Refactor suggestion

Consider robust error handling for invalid S3 object key generation.
The _generate_s3_targets_file function writes a list of S3 object URLs to a file. If any failure occurs (e.g., invalid region code or bucket), your code logs no error here. Consider adding checks or exception handling, or ensure such errors are surfaced elsewhere in the pipeline.

Comment on lines +189 to +195
if InputType.S3 == clp_config.input.type:
    compression_env = {
        **os.environ,
        "AWS_ACCESS_KEY_ID": clp_config.input.aws_access_key_id,
        "AWS_SECRET_ACCESS_KEY": clp_config.input.aws_secret_access_key,
    }
    compression_cmd.append("--auth")

⚠️ Potential issue

Protect AWS credentials in logs and error tracing.
Storing credentials in an environment dict is convenient, but there is a potential risk of them being accidentally logged or exposed. Validate that these credentials never get printed in logs or error messages.
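
One defensive pattern is to redact known secret keys before an environment dict is ever logged. The helper below is a sketch; `redact_env` and `SENSITIVE_ENV_KEYS` are hypothetical names, and the key set is an assumption that would need extending for other secrets.

```python
from typing import Dict, Mapping

# Standard AWS credential environment variables; extend as needed.
SENSITIVE_ENV_KEYS = frozenset(
    {"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN"}
)


def redact_env(env: Mapping[str, str]) -> Dict[str, str]:
    """Return a copy of env that is safe to log, with secret values masked."""
    return {
        key: ("<redacted>" if key in SENSITIVE_ENV_KEYS else value)
        for key, value in env.items()
    }
```

The original mapping is left untouched, so the unredacted dict can still be passed to the subprocess while only the copy goes to the logger.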

@haiqi96 haiqi96 changed the title from "feat(clp-package): Support s3 ingestion(Draft)" to "feat(clp-package): Add support for clp-s s3 ingestion" on Jan 3, 2025
@haiqi96 haiqi96 marked this pull request as ready for review January 3, 2025 21:10
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (6)
components/clp-py-utils/clp_py_utils/s3_utils.py (2)

14-16: Consider making AWS_ENDPOINT configurable

The hardcoded AWS_ENDPOINT constant might limit flexibility when working with different AWS environments or regions. Consider making this configurable through environment variables or configuration files.

# Constants
-AWS_ENDPOINT = "amazonaws.com"
+AWS_ENDPOINT = os.getenv("AWS_ENDPOINT", "amazonaws.com")

1-134: Consider implementing S3 operations as a class

The current implementation uses standalone functions. Consider refactoring into a class to:

  1. Share S3 client configuration
  2. Reduce duplicate code
  3. Improve testability
  4. Enable dependency injection

Example structure:

class S3Utils:
    def __init__(self, region_code: str, credentials: Optional[Dict[str, str]] = None):
        self.client = self._create_client(region_code, credentials)
    
    def _create_client(self, region_code: str, credentials: Optional[Dict[str, str]]) -> boto3.client:
        # Shared client configuration
        pass
    
    def get_object_metadata(self, bucket: str, prefix: str) -> Result[List[FileMetadata], str]:
        # Use self.client
        pass
    
    def put_object(self, bucket: str, key: str, file_path: Path) -> Result[bool, str]:
        # Use self.client
        pass
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2)

103-119: S3 target file generator usage.
Consider capturing or logging errors if generate_s3_virtual_hosted_style_url fails due to invalid bucket names or lack of region information.


189-199: Storing AWS secrets in environment variables.
While common in containers, be cautious to avoid logging these variables. Evaluate secure vault or IAM role usage if feasible.

components/clp-package-utils/clp_package_utils/scripts/compress.py (1)

28-53: Generating a targets list for FS or S3.
This function elegantly separates path handling by input type. Consider logging a warning if a path is unreadable or if the specified S3 URL is invalid.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

83-124: FS path processing logic.
Recursively enumerating directories is handy. Consider large file counts for scaling or distribution.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fd9dba2 and 12d6b97.

📒 Files selected for processing (5)
  • components/clp-package-utils/clp_package_utils/scripts/compress.py (4 hunks)
  • components/clp-py-utils/clp_py_utils/s3_utils.py (1 hunks)
  • components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py (1 hunks)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py (10 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3 hunks)
🔇 Additional comments (42)
components/clp-py-utils/clp_py_utils/s3_utils.py (1)

48-82: 🛠️ Refactor suggestion

Add comprehensive S3 URL validation

The current implementation could benefit from additional validation:

  1. Bucket name constraints (length: 3-63 characters, allowed characters)
  2. URL-encoded characters in key prefix
  3. Region code validation

Consider adding these validations:

def parse_s3_url(s3_url: str) -> Tuple[str, str, str]:
+    from urllib.parse import unquote
+    
+    def validate_bucket_name(name: str) -> None:
+        if not (3 <= len(name) <= 63):
+            raise ValueError(f"Bucket name length must be between 3 and 63 characters: {name}")
+        if not re.match(r"^[a-z0-9][a-z0-9.-]*[a-z0-9]$", name):
+            raise ValueError(f"Invalid bucket name format: {name}")
+
     # ... existing code ...
     
     bucket_name = match.group("bucket_name")
+    validate_bucket_name(bucket_name)
     
     # ... existing code ...
     
-    key_prefix = match.group("key_prefix")
+    key_prefix = unquote(match.group("key_prefix"))
     
     return region_code, bucket_name, key_prefix

Let's verify the bucket naming rules:

components/job-orchestration/job_orchestration/executor/compress/compression_task.py (14)

7-7: Well-done on extending type hints.
Adding these type hints can improve maintainability and facilitate static analysis.


23-23: Importing S3 utilities is appropriate.
Ensure consistent handling of S3 exceptions within the workflow to avoid unintended failures.


27-32: Good re-organization of imports.
Importing S3InputConfig and InputType helps maintain clarity in the code structure.


85-102: FS target file generation looks correct.
It might be prudent to verify the accessibility of each path before writing, especially if the code is used in multi-tenant environments or with untrusted inputs.


158-158: Returning no environment variables for FS mode is sensible.
This ensures the default environment does not leak unneeded variables.


161-168: Enhanced method signature.
This refactor is readable and returns both command and environment, aligning well with the S3 requirement.


249-249: S3 config retrieval is clearly indicated.
This inline documentation is helpful for future maintainers.


263-263: Calling make_clp_command_and_env.
This integrates FS-based compression efficiently without environment variables.


270-270: Calling make_clp_s_command_and_env.
Seamlessly enables S3-based compression, consistent with the new approach.


281-291: Correct branching for FS vs. S3 input.
The logic here is straightforward and ensures each input type is handled properly.


293-294: Appending --files-from argument.
This is a standard approach that streamlines input specification for the compression tool.


303-305: Potential subprocess injection risk.
If user inputs are not validated beforehand, injection attacks could occur. Verify or sanitise user-provided strings.


313-313: Clear comment for combined FS/S3 logic.
The inline explanation clarifies subsequent S3 upload steps.


373-374: Cleaning up temporary file.
Unlinking the targets file prevents clutter and potential resource leaks.

components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py (2)

9-9: Updated import path is consistent with refactoring.
This ensures that Celery can discover the correct compression task module.


15-15: Task route updated correctly.
The updated route aligns with the new module name and supports Celery’s queue configuration.

components/clp-package-utils/clp_package_utils/scripts/compress.py (18)

7-8: Import and blank line addition.
Using List from typing is appropriate, and the blank line adds readability.


9-9: New import for S3 utils and config.
This helps unify local and S3 compression logic under one script.


55-76: Command generator for compression.
Excellent approach to unify parameters across FS and S3. Be mindful of capturing secrets in logs, especially if aws_access_key_id or aws_secret_access_key might appear in command line logs.


77-93: Handling S3 credential logic.
The fallback to the credentials file is prudent. Ensure that the fallback is tested in real environments.


94-98: Appending --target-list argument.
This keeps usage consistent with other compression scripts, simplifying user instructions.


99-99: Blank line.
No functional changes.


100-102: Function returns the final command list.
Straightforward approach. The list-based command structure helps avoid shell-specific pitfalls.


116-118: Added argument to disable progress reporting.
This is helpful for quiet mode in automated pipelines.


121-124: Validation helper for FS input.
Assuring we do not have contradictory or missing file paths prevents confusion at run time.


125-133: Exclusive path specification.
Stops the user from mixing a file list with direct paths, avoiding duplication or conflict.


134-159: Validation helper for S3 input.
Enforces that S3 is used with the correct storage engine and credential specification method. This prevents misconfiguration.


161-164: Main entrypoint creation.
Assigning a default config file path fosters a consistent user experience.


165-167: Argument parser creation.
Subparsers for FS and S3: good separation of concerns.


168-174: FS compressor parser.
Allows the user to specify paths or a path list, aligning with _validate_fs_input_args.


175-186: S3 compressor parser.
Correctly includes credentials arguments, consistent with _validate_s3_input_args.


202-209: Input type-based branching.
Raising a ValueError for unsupported input types ensures consistency.


219-230: Creation of target list file.
Your approach of generating a random filename with UUID is a robust way to avoid collisions.


232-237: Container execution command.
Building the container start command and compression command in tandem is a clean pattern.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (7)

23-23: S3 object metadata import.
Centralizing S3 lookups fosters consistency between scheduler and executor layers.


25-25: Direct import of compress task.
Eliminates unneeded indirection for scheduling compression tasks.


28-28: Importing new input configs.
Brings S3 and FS config classes into the scheduler, reflecting the broader additions for multi-source support.


34-34: Use of result library.
Returning Ok or Err clarifies success vs. failure states in scheduling logic.


125-153: S3 input processing method.
Using typed Result objects for error propagation is a clean approach. Log relevant failures for effective troubleshooting.
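
The Ok/Err pattern can be sketched as follows. To keep the example self-contained it defines small stand-in `Ok` and `Err` classes rather than importing the `result` library the PR uses, so the attribute names here are assumptions and may differ from that library's API. The `list_objects` and `schedule` functions are likewise hypothetical.

```python
from dataclasses import dataclass
from typing import Generic, List, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T

    def is_ok(self) -> bool:
        return True


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E

    def is_ok(self) -> bool:
        return False


Result = Union[Ok[T], Err[E]]


def list_objects(bucket: str) -> Result[List[str], str]:
    """Stand-in for an S3 listing call; fails on an empty bucket name."""
    if not bucket:
        return Err("bucket name is empty")
    return Ok([f"{bucket}/a.log"])


def schedule(bucket: str) -> str:
    res = list_objects(bucket)
    if not res.is_ok():
        # The error travels as a value, so the caller decides how to log or report it.
        return f"job failed: {res.error}"
    return f"scheduled {len(res.value)} object(s)"
```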


180-197: Flexible scheduling for FS vs. S3.
Splitting out input processing ensures each type is properly handled. The approach fosters clarity and maintainability.


198-208: Defensive checks for unknown input.
Explicitly marking unrecognised input types as a job failure prevents silent errors.

Comment on lines +84 to +88
def generate_s3_virtual_hosted_style_url(
    region_code: str, bucket_name: str, object_key: str
) -> str:
    return f"https://{bucket_name}.s3.{region_code}.{AWS_ENDPOINT}/{object_key}"

🛠️ Refactor suggestion

Add input validation and URL encoding

The URL generation function should validate inputs and properly encode the object key.

def generate_s3_virtual_hosted_style_url(
    region_code: str, bucket_name: str, object_key: str
) -> str:
+    from urllib.parse import quote
+    
+    if not region_code:
+        raise ValueError("Region code is required")
+    if not bucket_name:
+        raise ValueError("Bucket name is required")
+    if not object_key:
+        raise ValueError("Object key is required")
+    
-    return f"https://{bucket_name}.s3.{region_code}.{AWS_ENDPOINT}/{object_key}"
+    encoded_key = quote(object_key, safe="/~")
+    return f"https://{bucket_name}.s3.{region_code}.{AWS_ENDPOINT}/{encoded_key}"
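
To show why encoding the key matters, here is a small sketch using only the standard library. The bucket name, region, key, and the `amazonaws.com` host suffix are made-up example values, not taken from the PR.

```python
from urllib.parse import quote, unquote, urlsplit

# Hypothetical object key containing spaces and a '#'.
key = "logs/2025-01-03/app server #1.log"
base = "https://example-bucket.s3.us-east-1.amazonaws.com"

unencoded = f"{base}/{key}"
encoded = f"{base}/{quote(key, safe='/~')}"

# Without encoding, everything after '#' is parsed as a URL fragment
# and never reaches the server as part of the path.
lost_fragment = urlsplit(unencoded).fragment

# With encoding, the full key round-trips through the path.
round_tripped = unquote(urlsplit(encoded).path).lstrip("/")
```

Keeping `/` in `safe` preserves the key's prefix structure in the URL path, while spaces and `#` are percent-encoded.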

Comment on lines +18 to +45
def parse_aws_credentials_file(credentials_file_path: Path) -> Tuple[str, str]:
    """
    Parses the `aws_access_key_id` and `aws_secret_access_key` from the given credentials_file_path.
    :param credentials_file_path: path to the file containing aws credentials.
    :return: A tuple of (aws_access_key_id, aws_secret_access_key)
    :raise: ValueError if the file doesn't exist, or doesn't contain the aws credentials.
    """

    aws_access_key_id = None
    aws_secret_access_key = None

    if not credentials_file_path.exists():
        raise ValueError(f"File {credentials_file_path} doesn't exist.")

    with open(credentials_file_path, "r") as f:
        for line in f:
            line = line.strip()
            if line.startswith("aws_access_key_id"):
                aws_access_key_id = line.split("=", 1)[1].strip()
            elif line.startswith("aws_secret_access_key"):
                aws_secret_access_key = line.split("=", 1)[1].strip()

    if aws_access_key_id is None or aws_secret_access_key is None:
        raise ValueError(
            "The credentials file must contain both 'aws_access_key_id' and 'aws_secret_access_key'."
        )

    return aws_access_key_id, aws_secret_access_key

🛠️ Refactor suggestion

Enhance AWS credentials file parsing

The current implementation has several limitations:

  1. No support for AWS credential file profiles
  2. Simple string splitting might break with complex values
  3. No handling of commented lines or whitespace in values

Consider using the configparser module for more robust parsing:

def parse_aws_credentials_file(credentials_file_path: Path) -> Tuple[str, str]:
+    import configparser
     if not credentials_file_path.exists():
         raise ValueError(f"File {credentials_file_path} doesn't exist.")
 
-    aws_access_key_id = None
-    aws_secret_access_key = None
-    with open(credentials_file_path, "r") as f:
-        for line in f:
-            line = line.strip()
-            if line.startswith("aws_access_key_id"):
-                aws_access_key_id = line.split("=", 1)[1].strip()
-            elif line.startswith("aws_secret_access_key"):
-                aws_secret_access_key = line.split("=", 1)[1].strip()
+    config = configparser.ConfigParser()
+    config.read(credentials_file_path)
+    
+    profile = "default"
+    if profile not in config:
+        raise ValueError(f"Profile '{profile}' not found in credentials file")
+    
+    try:
+        aws_access_key_id = config[profile]["aws_access_key_id"]
+        aws_secret_access_key = config[profile]["aws_secret_access_key"]
+    except KeyError as e:
+        raise ValueError(f"Missing required credential: {e}")
 
-    if aws_access_key_id is None or aws_secret_access_key is None:
-        raise ValueError(
-            "The credentials file must contain both 'aws_access_key_id' and 'aws_secret_access_key'."
-        )
     return aws_access_key_id, aws_secret_access_key
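
As a runnable illustration of the configparser approach, the sketch below parses an in-memory credentials file with two profiles. The sample contents, the `profile` parameter, and the function name are assumptions for the example. Interpolation is disabled on the assumption that credential values should be read literally, which avoids errors if a value ever contains `%`.

```python
import configparser
from typing import Tuple

# Made-up sample in the usual INI layout of an AWS credentials file.
SAMPLE_CREDENTIALS = """\
# Comment lines are ignored by configparser.
[default]
aws_access_key_id = AKIAEXAMPLE
aws_secret_access_key = abc/def+ghi=

[staging]
aws_access_key_id = AKIASTAGING
aws_secret_access_key = xyz
"""


def parse_credentials(text: str, profile: str = "default") -> Tuple[str, str]:
    """Returns (aws_access_key_id, aws_secret_access_key) for the given profile."""
    config = configparser.ConfigParser(interpolation=None)
    config.read_string(text)
    if not config.has_section(profile):
        raise ValueError(f"Profile '{profile}' not found in credentials file")
    section = config[profile]
    try:
        return section["aws_access_key_id"], section["aws_secret_access_key"]
    except KeyError as e:
        raise ValueError(f"Missing required credential: {e}") from e
```

Note that only the first `=` on a line separates key from value, so a secret ending in `=` is preserved.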

Comment on lines +90 to +133
def get_s3_object_metadata(s3_input_config: S3InputConfig) -> Result[List[FileMetadata], str]:
    """
    Gets the metadata of objects under the <bucket>/<key_prefix> specified by s3_input_config.
    Note: We reuse FileMetadata class to store the metadata of S3 objects. The object_key is stored
    as path in FileMetadata.

    :param s3_input_config: S3 configuration specifying the bucket, key_prefix and credentials.
    :return: Result.OK(List[FileMetadata]) containing the object metadata on success,
    otherwise Result.Err(str) with the error message.
    """

    file_metadata_list: List[FileMetadata] = list()

    s3_client = boto3.client(
        "s3",
        region_name=s3_input_config.region_code,
        aws_access_key_id=s3_input_config.aws_access_key_id,
        aws_secret_access_key=s3_input_config.aws_secret_access_key,
    )

    try:
        paginator = s3_client.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=s3_input_config.bucket, Prefix=s3_input_config.key_prefix)

        for page in pages:
            contents = page.get("Contents", None)
            if contents is None:
                continue

            for obj in contents:
                object_key = obj["Key"]
                if object_key.endswith("/"):
                    # Skip any object that resolves to a directory like path
                    continue

                file_metadata_list.append(FileMetadata(Path(object_key), obj["Size"]))

    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        return Err(f"ClientError: {error_code} - {error_message}")
    except Exception as e:
        return Err(f"An unexpected error occurred: {e}")

🛠️ Refactor suggestion

Add retry configuration for S3 client

The S3 client should be configured with retry settings to handle transient failures.

+from botocore.config import Config
+
def get_s3_object_metadata(s3_input_config: S3InputConfig) -> Result[List[FileMetadata], str]:
+    config = Config(
+        retries=dict(
+            total_max_attempts=3,
+            mode="adaptive",
+        )
+    )
     
     s3_client = boto3.client(
         "s3",
         region_name=s3_input_config.region_code,
         aws_access_key_id=s3_input_config.aws_access_key_id,
         aws_secret_access_key=s3_input_config.aws_secret_access_key,
+        config=config
     )

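Also, consider setting the page size explicitly when listing large buckets (this bounds the number of keys returned per request; it does not rate-limit requests):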

     try:
         paginator = s3_client.get_paginator("list_objects_v2")
-        pages = paginator.paginate(Bucket=s3_input_config.bucket, Prefix=s3_input_config.key_prefix)
+        pages = paginator.paginate(
+            Bucket=s3_input_config.bucket,
+            Prefix=s3_input_config.key_prefix,
+            PaginationConfig={'PageSize': 1000}  # Control page size
+        )

Committable suggestion skipped: line range outside the PR's diff.
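
For readers unfamiliar with what a bounded retry policy does, the following library-independent sketch retries an operation a fixed number of times with exponential backoff. It is not botocore's implementation; the function name, parameters, and default values are assumptions chosen for illustration.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    operation: Callable[[], T],
    total_max_attempts: int = 3,
    base_delay_s: float = 0.1,
    retryable: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Calls operation, retrying on transient errors up to total_max_attempts in total."""
    for attempt in range(1, total_max_attempts + 1):
        try:
            return operation()
        except retryable:
            if attempt == total_max_attempts:
                # Out of attempts: surface the last error to the caller.
                raise
            # Exponential backoff: base, 2x base, 4x base, ...
            sleep(base_delay_s * 2 ** (attempt - 1))
    raise ValueError("total_max_attempts must be at least 1")
```

Passing `sleep` as a parameter makes the backoff observable in tests without actually waiting.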

_add_common_arguments(s3_compressor_parser, default_config_file_path)
s3_compressor_parser.add_argument("url", metavar="URL", help="URL of object to be compressed")
s3_compressor_parser.add_argument(
"--aws-access-key-id", type=str, default=None, help="AWS access key id."

Suggested change
- "--aws-access-key-id", type=str, default=None, help="AWS access key id."
+ "--aws-access-key-id", type=str, default=None, help="AWS access key ID."

Comment on lines +87 to +88
Iterate through all files in fs_input_conf and adds their metadata to the
paths_to_compress_buffer.

Suggested change
- Iterate through all files in fs_input_conf and adds their metadata to the
- paths_to_compress_buffer.
+ Iterates through all files in fs_input_conf and adds their metadata to
+ `paths_to_compress_buffer`.

"""
Iterate through all files in fs_input_conf and adds their metadata to the
paths_to_compress_buffer.
Note: this method skips any files that do not exist.

Suggested change
- Note: this method skips any files that do not exist.
+ NOTE: This method skips files that don't exist.

Note: this method skips any files that do not exist.
:param fs_input_conf: FS configuration specifying the files to compress.
:param paths_to_compress_buffer: PathsToCompressBuffer containing the scheduling information
:return: None.

Suggested change
- :return: None.

"""

res = get_s3_object_metadata(s3_input_config)

Nit:

Suggested change

):
) -> Tuple[List[str], Optional[Dict[str, str]]]:
"""
Generates the command and environment for a clp_s compression job

Suggested change
- Generates the command and environment for a clp_s compression job
+ Generates the command and environment variables for a clp_s compression job.

:param archive_output_dir:
:param clp_config:
:param db_config_file_path:
:param enable_s3_write: Whether to write output to S3 storage.

In hindsight, this should probably have been called use_single_file_archive. What do you think?

:param clp_config:
:param db_config_file_path:
:param enable_s3_write: Whether to write output to S3 storage.
:return: Tuple of (compression_command, compression_env)

Suggested change
- :return: Tuple of (compression_command, compression_env)
+ :return: Tuple of (compression_command, compression_env_vars)

@@ -127,14 +186,25 @@ def make_clp_s_command(
]
# fmt: on

if InputType.S3 == clp_config.input.type:
compression_env = {

How about compression_env_vars?

@@ -176,7 +246,7 @@ def run_clp(
yaml.safe_dump(clp_metadata_db_connection_config, db_config_file)
db_config_file.close()

- # Get s3 config
+ # Get S3 config

The docstring of this function should probably be updated since the source is no longer only FS.
