-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(clp-package): Add support for clp-s s3 ingestion #651
base: main
Are you sure you want to change the base?
Conversation
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
…ss/fs_compression_task.py
Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
WalkthroughThe pull request introduces comprehensive enhancements to the CLP (Compressed Log Processing) package, focusing on expanding support for both filesystem and Amazon S3 input types across multiple components. The changes involve refactoring compression-related scripts, adding new utility functions for S3 URL parsing and credential handling, and updating job configuration classes to support diverse input sources. The modifications improve input validation, error handling, and flexibility in processing compression tasks across different storage environments. Changes
Sequence DiagramsequenceDiagram
participant Scheduler
participant Executor
participant S3/FileSystem
participant CompressionTask
Scheduler->>Scheduler: Determine Input Type
alt Filesystem Input
Scheduler->>Executor: Schedule FS Compression
else S3 Input
Scheduler->>S3/FileSystem: Retrieve Object Metadata
S3/FileSystem-->>Scheduler: Return Metadata
Scheduler->>Executor: Schedule S3 Compression
end
Executor->>CompressionTask: Generate Target List
CompressionTask->>CompressionTask: Validate Inputs
CompressionTask->>CompressionTask: Prepare Compression Command
CompressionTask->>S3/FileSystem: Execute Compression
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🔭 Outside diff range comments (2)
components/clp-package-utils/clp_package_utils/scripts/native/compress.py (1)
Line range hint
223-269
: Return code best practices.
Inmain
, returning a negative value (like-1
) for errors is valid. For improved clarity, consider symbolic constants or enumerations (e.g., EXIT_FAILURE) to document the meaning.components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)
Line range hint
1-36
: Fix import block formatting (I001).
According to the pipeline failure, the import block here is not sorted or formatted as expected. Implement a standard import-sorting tool (e.g., isort) to comply with the style guidelines.
🧹 Nitpick comments (19)
tools/docker-images/clp-execution-base-ubuntu-focal/setup-scripts/install-prebuilt-packages.sh (1)
11-11
: Excellent addition of ca-certificates package!The addition of ca-certificates is essential for secure HTTPS connections to AWS S3 endpoints, ensuring proper SSL/TLS certificate validation during S3 ingestion operations.
tools/docker-images/clp-execution-base-ubuntu-jammy/setup-scripts/install-prebuilt-packages.sh (1)
11-11
: Consistent with Focal image changes!The parallel addition of ca-certificates maintains feature parity between Ubuntu versions, which is crucial for consistent S3 ingestion support across different environments.
Consider creating a shared package list file that both scripts can source to reduce duplication and ensure future package changes remain synchronized.
components/clp-py-utils/clp_py_utils/s3_utils.py (2)
14-15
: Consider supporting alternative endpoints.
Currently,AWS_ENDPOINT
is hard-coded to"amazonaws.com"
. For completeness and future adaptability, consider allowing other endpoint formats (e.g., for private cloud or non-public AWS-compatible services).
48-82
: Validate or normalise the region code.
Inparse_s3_url
, consider normalising or validatingregion_code
for uppercase vs. lowercase inconsistencies. This can curb potential mismatches in subsequent S3 calls.components/clp-package-utils/clp_package_utils/scripts/compress.py (3)
28-46
: Use explicit flush or context manager for file writing.
While Python's file objects flush on close, consider adding a context manager or explicit flush calls to ensure the written targets list is immediately saved, especially if subsequent operations rely on it.
121-133
: Validate overlapping paths.
_validate_fs_input_args
disallows specifying bothpaths
andpath_list
. As an extra safeguard, consider checking for potential duplicates or overlapping paths across multiple lines in the file-based listing.
134-159
: Strengthen messaging on credential conflicts.
When users pass both a credentials file and explicit credentials, the error message is appropriate. One could further clarify which method is recommended or display how these conflicts arose.components/clp-package-utils/clp_package_utils/scripts/native/compress.py (2)
130-157
: Optionally sanitise or normalise object keys.
_generate_clp_io_config
sets S3 path details from user input. Depending on your environment, consider normalising or validatingkey_prefix
to avoid unexpected directory traversals or unsafe characters.
159-193
: Validate empty lines.
_get_targets_to_compress
skips empty lines, which is correct. Recommend logging or collecting line numbers of these empty entries if repeated blank lines could indicate user mistakes.components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)
126-153
: Profile large S3 directory listings.
_process_s3_input
may encounter very large key sets. Evaluate memory usage or time overhead with extremely large S3 prefixes, and consider streaming or chunked processing if required.
180-208
: Log scheduling outcomes.
When a job fails due to an invalid input type or an empty S3 prefix, info-level logs might help operators quickly identify the cause, especially in multi-tenant or large-scale scenarios.components/job-orchestration/job_orchestration/scheduler/job_config.py (2)
10-12
: Add docstrings for the InputType enum.
Documenting the usage context (e.g., “FS for local file system, S3 for Amazon S3-based data”) within the enum can enhance clarity and maintainability for future contributors.
22-28
: Consider optional path normalisation.
WithinFsInputConfig
, consider normalising or expanding user-supplied paths to absolute forms. This pre-processing step can prevent confusion or ambiguous references.components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (6)
7-7
: Ensure consistent import usage.
You have addedDict
,List
,Optional
, andTuple
in addition toAny
. If you are not usingAny
anywhere below, consider removing it to keep imports minimal.
125-125
: Document the new tuple return type.
The updated return typeTuple[List[str], Optional[Dict[str, str]]]
is now more precise. Provide docstring-level details about the returned environment dict to help maintainers understand how to use or modify it.
167-167
: Document environment usage for clp_s.
The new function signature returning(List[str], Optional[Dict[str, str]])
is clear. However, consider highlighting the environment usage forclp_s
specifically, especially regarding AWS credentials or expansions in the future.
281-290
: Validate input type coverage.
You have added logic to generate either a filesystem targets file or an S3 targets file. This branching is thorough, but ensure that future input types or edge cases (e.g., emptypaths_to_compress
for S3) are handled gracefully.
303-305
: Consider controlling concurrency or piping.
subprocess.Popen(..., stdout=subprocess.PIPE, ...)
might be fine for smaller tasks, but large-scale compression tasks can produce large STDOUT logs. Ensure you have a strategy to handle this output efficiently to avoid potential memory issues.
373-374
: Clean up after partial failures.
Unlinking the targets list path is done only if compression is successful. Consider whether you want to remove it in cases of failure or partial success to prevent stale files from accumulating.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
components/clp-package-utils/clp_package_utils/scripts/compress.py
(4 hunks)components/clp-package-utils/clp_package_utils/scripts/native/compress.py
(4 hunks)components/clp-py-utils/clp_py_utils/s3_utils.py
(1 hunks)components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
(9 hunks)components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
(3 hunks)components/job-orchestration/job_orchestration/scheduler/job_config.py
(3 hunks)tools/docker-images/clp-execution-base-ubuntu-focal/setup-scripts/install-prebuilt-packages.sh
(1 hunks)tools/docker-images/clp-execution-base-ubuntu-jammy/setup-scripts/install-prebuilt-packages.sh
(1 hunks)
🧰 Additional context used
🪛 GitHub Actions: clp-lint
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
[error] 1-36: Import block is un-sorted or un-formatted (I001)
🔇 Additional comments (12)
components/clp-py-utils/clp_py_utils/s3_utils.py (2)
18-46
: Ensure credentials file security.
parse_aws_credentials_file
correctly retrieves credentials but relies on a plain-text file. Urge verifying that file permissions are restricted to authorised users to reduce the risk of exposing credentials.
90-131
: Paginate carefully to avoid partial listings.
get_s3_object_metadata
uses the paginator, which is good. Be sure to handle edge cases where large pagination sets might fail or get truncated by S3. Consider tests covering multi-page listing scenarios.components/clp-package-utils/clp_package_utils/scripts/compress.py (1)
55-98
: Expose operation details in logs.
_generate_compress_cmd
elegantly assembles the compression command. If debugging is needed, consider logging the resultant command arguments (excluding sensitive credentials) to assist with troubleshooting.components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)
83-125
: Check for symbolic links or special filesystem objects.
_process_fs_input_paths
currently captures directories and files. Consider also handling symbolic links or special files in a safe manner (e.g., ignoring broken links).components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
50-50
: Praise for consolidated config approach.
ClpIoConfig
unifies FS and S3 input logic under a single property. This design fosters cleaner upstream code. Nicely done!components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py (7)
23-23
: Validate usage of newly imported s3 utilities.
Ensure that the usage ofgenerate_s3_virtual_hosted_style_url
ands3_put
is covered by tests, particularly around error handling and S3 region/bucket configurations.
27-32
: Streamline import structure.
The block of imports forClpIoConfig
,InputType
,PathsToCompress
, andS3InputConfig
is consistent and clear. Ensure that each imported name is actually used within this file to avoid redundant imports.
85-101
: Add tests for empty directory compression logic.
The new_generate_fs_targets_file
function now handlesempty_directories
. This is a notable scenario that might be easily overlooked. Ensure you have test coverage confirming that correct paths for empty directories are added to the targets list.
158-158
: No environment is returned for FS inputs.
ReturningNone
for FS-based compression is correct. If additional environment variables become necessary (for local file workflows), you might easily extend this pattern without changing the function signature.
263-263
: Usage check for make_clp_command_and_env.
You correctly invoke the new function for theCLP
storage engine. Ensure you have adequate integration tests verifying that the returned command is valid and the environment isNone
or used properly.
270-270
: Usage check for make_clp_s_command_and_env.
Similarly, confirm in your tests thatenable_s3_write
flows properly into the environment and command generation for theCLP_S
engine.
293-294
: Clear separation of "files-from" argument.
Appending the--files-from
argument here makes the code straightforward to read, ensuring the target file is always appended at the end of the command. Nicely done.
class S3InputConfig(BaseModel): | ||
type: typing.Literal[InputType.S3.value] = InputType.S3.value | ||
timestamp_key: typing.Optional[str] = None | ||
|
||
region_code: str | ||
bucket: str | ||
key_prefix: str | ||
|
||
aws_access_key_id: typing.Optional[str] = None | ||
aws_secret_access_key: typing.Optional[str] = None | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Handle missing region code more gracefully.
S3InputConfig
currently expects region_code
. Some private S3-like services do not require a formal region. You might consider making region_code
optional or providing a default.
def _generate_s3_targets_file( | ||
output_file_path: pathlib.Path, | ||
paths_to_compress: PathsToCompress, | ||
s3_input_config: S3InputConfig, | ||
) -> None: | ||
# S3 object keys are stored as file_paths in `PathsToCompress` | ||
object_keys = paths_to_compress.file_paths | ||
with open(output_file_path, "w") as file: | ||
if len(object_keys) > 0: | ||
for object_key in object_keys: | ||
s3_virtual_hosted_style_url = generate_s3_virtual_hosted_style_url( | ||
s3_input_config.region_code, s3_input_config.bucket, object_key | ||
) | ||
file.write(s3_virtual_hosted_style_url) | ||
file.write("\n") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider robust error handling for invalid S3 object key generation.
The _generate_s3_targets_file
function writes a list of S3 object URLs to a file. If any failure occurs (e.g., invalid region code or bucket), your code logs no error here. Consider adding checks or exception handling, or ensure such errors are surfaced elsewhere in the pipeline.
if InputType.S3 == clp_config.input.type: | ||
compression_env = { | ||
**os.environ, | ||
"AWS_ACCESS_KEY_ID": clp_config.input.aws_access_key_id, | ||
"AWS_SECRET_ACCESS_KEY": clp_config.input.aws_secret_access_key, | ||
} | ||
compression_cmd.append("--auth") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Protect AWS credentials in logs and error tracing.
Storing credentials in an environment dict is convenient, but there is a potential risk of them being accidentally logged or exposed. Validate that these credentials never get printed in logs or error messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (6)
components/clp-py-utils/clp_py_utils/s3_utils.py (2)
14-16
: Consider making AWS_ENDPOINT configurableThe hardcoded AWS_ENDPOINT constant might limit flexibility when working with different AWS environments or regions. Consider making this configurable through environment variables or configuration files.
# Constants -AWS_ENDPOINT = "amazonaws.com" +AWS_ENDPOINT = os.getenv("AWS_ENDPOINT", "amazonaws.com")
1-134
: Consider implementing S3 operations as a classThe current implementation uses standalone functions. Consider refactoring into a class to:
- Share S3 client configuration
- Reduce duplicate code
- Improve testability
- Enable dependency injection
Example structure:
class S3Utils: def __init__(self, region_code: str, credentials: Optional[Dict[str, str]] = None): self.client = self._create_client(region_code, credentials) def _create_client(self, region_code: str, credentials: Optional[Dict[str, str]]) -> boto3.client: # Shared client configuration pass def get_object_metadata(self, bucket: str, prefix: str) -> Result[List[FileMetadata], str]: # Use self.client pass def put_object(self, bucket: str, key: str, file_path: Path) -> Result[bool, str]: # Use self.client passcomponents/job-orchestration/job_orchestration/executor/compress/compression_task.py (2)
103-119
: S3 target file generator usage.
Consider capturing or logging errors ifgenerate_s3_virtual_hosted_style_url
fails due to invalid bucket names or lack of region information.
189-199
: Storing AWS secrets in environment variables.
While common in containers, be cautious to avoid logging these variables. Evaluate secure vault or IAM role usage if feasible.components/clp-package-utils/clp_package_utils/scripts/compress.py (1)
28-53
: Generating a targets list for FS or S3.
This function elegantly separates path handling by input type. Consider logging a warning if a path is unreadable or if the specified S3 URL is invalid.components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)
83-124
: FS path processing logic.
Recursively enumerating directories is handy. Consider large file counts for scaling or distribution.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
components/clp-package-utils/clp_package_utils/scripts/compress.py
(4 hunks)components/clp-py-utils/clp_py_utils/s3_utils.py
(1 hunks)components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py
(1 hunks)components/job-orchestration/job_orchestration/executor/compress/compression_task.py
(10 hunks)components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
(3 hunks)
🔇 Additional comments (42)
components/clp-py-utils/clp_py_utils/s3_utils.py (1)
48-82
: 🛠️ Refactor suggestionAdd comprehensive S3 URL validation
The current implementation could benefit from additional validation:
- Bucket name constraints (length: 3-63 characters, allowed characters)
- URL-encoded characters in key prefix
- Region code validation
Consider adding these validations:
def parse_s3_url(s3_url: str) -> Tuple[str, str, str]: + from urllib.parse import unquote + + def validate_bucket_name(name: str) -> None: + if not (3 <= len(name) <= 63): + raise ValueError(f"Bucket name length must be between 3 and 63 characters: {name}") + if not re.match(r"^[a-z0-9][a-z0-9.-]*[a-z0-9]$", name): + raise ValueError(f"Invalid bucket name format: {name}") + # ... existing code ... bucket_name = match.group("bucket_name") + validate_bucket_name(bucket_name) # ... existing code ... - key_prefix = match.group("key_prefix") + key_prefix = unquote(match.group("key_prefix")) return region_code, bucket_name, key_prefixLet's verify the bucket naming rules:
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (14)
7-7
: Well-done on extending type hints.
Adding these type hints can improve maintainability and facilitate static analysis.
23-23
: Importing S3 utilities is appropriate.
Ensure consistent handling of S3 exceptions within the workflow to avoid unintended failures.
27-32
: Good re-organization of imports.
ImportingS3InputConfig
andInputType
helps maintain clarity in the code structure.
85-102
: FS target file generation looks correct.
It might be prudent to verify the accessibility of each path before writing, especially if the code is used in multi-tenant environments or with untrusted inputs.
158-158
: Returning no environment variables for FS mode is sensible.
This ensures the default environment does not leak unneeded variables.
161-168
: Enhanced method signature.
This refactor is readable and returns both command and environment, aligning well with the S3 requirement.
249-249
: S3 config retrieval is clearly indicated.
This inline documentation is helpful for future maintainers.
263-263
: Callingmake_clp_command_and_env
.
This integrates FS-based compression efficiently without environment variables.
270-270
: Callingmake_clp_s_command_and_env
.
Seamlessly enables S3-based compression, consistent with the new approach.
281-291
: Correct branching for FS vs. S3 input.
The logic here is straightforward and ensures each input type is handled properly.
293-294
: Appending--files-from
argument.
This is a standard approach that streamlines input specification for the compression tool.
303-305
: Potential subprocess injection risk.
If user inputs are not validated beforehand, injection attacks could occur. Verify or sanitise user-provided strings.
313-313
: Clear comment for combined FS/S3 logic.
The inline explanation clarifies subsequent S3 upload steps.
373-374
: Cleaning up temporary file.
Unlinking the targets file prevents clutter and potential resource leaks.components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py (2)
9-9
: Updated import path is consistent with refactoring.
This ensures that Celery can discover the correct compression task module.
15-15
: Task route updated correctly.
The updated route aligns with the new module name and supports Celery’s queue configuration.components/clp-package-utils/clp_package_utils/scripts/compress.py (18)
7-8
: Import and blank line addition.
UsingList
fromtyping
is appropriate, and the blank line adds readability.
9-9
: New import for S3 utils and config.
This helps unify local and S3 compression logic under one script.
55-76
: Command generator for compression.
Excellent approach to unify parameters across FS and S3. Be mindful of capturing secrets in logs, especially ifaws_access_key_id
oraws_secret_access_key
might appear in command line logs.
77-93
: Handling S3 credential logic.
The fallback to the credentials file is prudent. Ensure that the fallback is tested in real environments.
94-98
: Appending--target-list
argument.
This keeps usage consistent with other compression scripts, simplifying user instructions.
99-99
: Blank line.
No functional changes.
100-102
: Function returns the final command list.
Straightforward approach. The list-based command structure helps avoid shell-specific pitfalls.
116-118
: Added argument to disable progress reporting.
This is helpful for quiet mode in automated pipelines.
121-124
: Validation helper for FS input.
Assuring we do not have contradictory or missing file paths prevents confusion at run time.
125-133
: Exclusive path specification.
Stops the user from mixing a file list with direct paths, avoiding duplication or conflict.
134-159
: Validation helper for S3 input.
Enforces that S3 is used with the correct storage engine and credential specification method. This prevents misconfiguration.
161-164
: Main entrypoint creation.
Assigning a default config file path fosters a consistent user experience.
165-167
: Argument parser creation.
Subparsers for FS and S3: good separation of concerns.
168-174
: FS compressor parser.
Allows the user to specify paths or a path list, aligning with_validate_fs_input_args
.
175-186
: S3 compressor parser.
Correctly includes credentials arguments, consistent with_validate_s3_input_args
.
202-209
: Input type-based branching.
Raising aValueError
for unsupported input types ensures consistency.
219-230
: Creation of target list file.
Your approach of generating a random filename with UUID is a robust way to avoid collisions.
232-237
: Container execution command.
Building the container start command and compression command in tandem is a clean pattern.components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (7)
23-23
: S3 object metadata import.
Centralizing S3 lookups fosters consistency between scheduler and executor layers.
25-25
: Direct import ofcompress
task.
Eliminates unneeded indirection for scheduling compression tasks.
28-28
: Importing new input configs.
Brings S3 and FS config classes into the scheduler, reflecting the broader additions for multi-source support.
34-34
: Use ofresult
library.
ReturningOk
orErr
clarifies success vs. failure states in scheduling logic.
125-153
: S3 input processing method.
Using typedResult
objects for error propagation is a clean approach. Log relevant failures for effective troubleshooting.
180-197
: Flexible scheduling for FS vs. S3.
Splitting out input processing ensures each type is properly handled. The approach fosters clarity and maintainability.
198-208
: Defensive checks for unknown input.
Explicitly marking unrecognised input types as a job failure prevents silent errors.
def generate_s3_virtual_hosted_style_url( | ||
region_code: str, bucket_name: str, object_key: str | ||
) -> str: | ||
return f"https://{bucket_name}.s3.{region_code}.{AWS_ENDPOINT}/{object_key}" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add input validation and URL encoding
The URL generation function should validate inputs and properly encode the object key.
def generate_s3_virtual_hosted_style_url(
region_code: str, bucket_name: str, object_key: str
) -> str:
+ from urllib.parse import quote
+
+ if not region_code:
+ raise ValueError("Region code is required")
+ if not bucket_name:
+ raise ValueError("Bucket name is required")
+ if not object_key:
+ raise ValueError("Object key is required")
+
- return f"https://{bucket_name}.s3.{region_code}.{AWS_ENDPOINT}/{object_key}"
+ encoded_key = quote(object_key, safe="/~")
+ return f"https://{bucket_name}.s3.{region_code}.{AWS_ENDPOINT}/{encoded_key}"
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def generate_s3_virtual_hosted_style_url( | |
region_code: str, bucket_name: str, object_key: str | |
) -> str: | |
return f"https://{bucket_name}.s3.{region_code}.{AWS_ENDPOINT}/{object_key}" | |
def generate_s3_virtual_hosted_style_url( | |
region_code: str, bucket_name: str, object_key: str | |
) -> str: | |
from urllib.parse import quote | |
if not region_code: | |
raise ValueError("Region code is required") | |
if not bucket_name: | |
raise ValueError("Bucket name is required") | |
if not object_key: | |
raise ValueError("Object key is required") | |
encoded_key = quote(object_key, safe="/~") | |
return f"https://{bucket_name}.s3.{region_code}.{AWS_ENDPOINT}/{encoded_key}" |
def parse_aws_credentials_file(credentials_file_path: Path) -> Tuple[str, str]: | ||
""" | ||
Parses the `aws_access_key_id` and `aws_secret_access_key` from the given credentials_file_path. | ||
:param credentials_file_path: path to the file containing aws credentials. | ||
:return: A tuple of (aws_access_key_id, aws_secret_access_key) | ||
:raise: ValueError if the file doesn't exist, or doesn't contain the aws credentials. | ||
""" | ||
|
||
aws_access_key_id = None | ||
aws_secret_access_key = None | ||
|
||
if not credentials_file_path.exists(): | ||
raise ValueError(f"File {credentials_file_path} doesn't exist.") | ||
|
||
with open(credentials_file_path, "r") as f: | ||
for line in f: | ||
line = line.strip() | ||
if line.startswith("aws_access_key_id"): | ||
aws_access_key_id = line.split("=", 1)[1].strip() | ||
elif line.startswith("aws_secret_access_key"): | ||
aws_secret_access_key = line.split("=", 1)[1].strip() | ||
|
||
if aws_access_key_id is None or aws_secret_access_key is None: | ||
raise ValueError( | ||
"The credentials file must contain both 'aws_access_key_id' and 'aws_secret_access_key'." | ||
) | ||
|
||
return aws_access_key_id, aws_secret_access_key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance AWS credentials file parsing
The current implementation has several limitations:
- No support for AWS credential file profiles
- Simple string splitting might break with complex values
- No handling of commented lines or whitespace in values
Consider using the configparser
module for more robust parsing:
def parse_aws_credentials_file(credentials_file_path: Path) -> Tuple[str, str]:
+ import configparser
if not credentials_file_path.exists():
raise ValueError(f"File {credentials_file_path} doesn't exist.")
- aws_access_key_id = None
- aws_secret_access_key = None
- with open(credentials_file_path, "r") as f:
- for line in f:
- line = line.strip()
- if line.startswith("aws_access_key_id"):
- aws_access_key_id = line.split("=", 1)[1].strip()
- elif line.startswith("aws_secret_access_key"):
- aws_secret_access_key = line.split("=", 1)[1].strip()
+ config = configparser.ConfigParser()
+ config.read(credentials_file_path)
+
+ profile = "default"
+ if profile not in config:
+ raise ValueError(f"Profile '{profile}' not found in credentials file")
+
+ try:
+ aws_access_key_id = config[profile]["aws_access_key_id"]
+ aws_secret_access_key = config[profile]["aws_secret_access_key"]
+ except KeyError as e:
+ raise ValueError(f"Missing required credential: {e}")
- if aws_access_key_id is None or aws_secret_access_key is None:
- raise ValueError(
- "The credentials file must contain both 'aws_access_key_id' and 'aws_secret_access_key'."
- )
return aws_access_key_id, aws_secret_access_key
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def parse_aws_credentials_file(credentials_file_path: Path) -> Tuple[str, str]: | |
""" | |
Parses the `aws_access_key_id` and `aws_secret_access_key` from the given credentials_file_path. | |
:param credentials_file_path: path to the file containing aws credentials. | |
:return: A tuple of (aws_access_key_id, aws_secret_access_key) | |
:raise: ValueError if the file doesn't exist, or doesn't contain the aws credentials. | |
""" | |
aws_access_key_id = None | |
aws_secret_access_key = None | |
if not credentials_file_path.exists(): | |
raise ValueError(f"File {credentials_file_path} doesn't exist.") | |
with open(credentials_file_path, "r") as f: | |
for line in f: | |
line = line.strip() | |
if line.startswith("aws_access_key_id"): | |
aws_access_key_id = line.split("=", 1)[1].strip() | |
elif line.startswith("aws_secret_access_key"): | |
aws_secret_access_key = line.split("=", 1)[1].strip() | |
if aws_access_key_id is None or aws_secret_access_key is None: | |
raise ValueError( | |
"The credentials file must contain both 'aws_access_key_id' and 'aws_secret_access_key'." | |
) | |
return aws_access_key_id, aws_secret_access_key | |
def parse_aws_credentials_file(credentials_file_path: Path) -> Tuple[str, str]: | |
""" | |
Parses the `aws_access_key_id` and `aws_secret_access_key` from the given credentials_file_path. | |
:param credentials_file_path: path to the file containing aws credentials. | |
:return: A tuple of (aws_access_key_id, aws_secret_access_key) | |
:raise: ValueError if the file doesn't exist, or doesn't contain the aws credentials. | |
""" | |
import configparser | |
if not credentials_file_path.exists(): | |
raise ValueError(f"File {credentials_file_path} doesn't exist.") | |
config = configparser.ConfigParser() | |
config.read(credentials_file_path) | |
profile = "default" | |
if profile not in config: | |
raise ValueError(f"Profile '{profile}' not found in credentials file") | |
try: | |
aws_access_key_id = config[profile]["aws_access_key_id"] | |
aws_secret_access_key = config[profile]["aws_secret_access_key"] | |
except KeyError as e: | |
raise ValueError(f"Missing required credential: {e}") | |
return aws_access_key_id, aws_secret_access_key |
def get_s3_object_metadata(s3_input_config: S3InputConfig) -> Result[List[FileMetadata], str]: | ||
""" | ||
Gets the metadata of objects under the <bucket>/<key_prefix> specified by s3_input_config. | ||
Note: We reuse FileMetadata class to store the metadata of S3 objects. The object_key is stored | ||
as path in FileMetadata. | ||
|
||
:param s3_input_config: S3 configuration specifying the bucket, key_prefix and credentials. | ||
:return: Result.OK(List[FileMetadata]) containing the object metadata on success, | ||
otherwise Result.Err(str) with the error message. | ||
""" | ||
|
||
file_metadata_list: List[FileMetadata] = list() | ||
|
||
s3_client = boto3.client( | ||
"s3", | ||
region_name=s3_input_config.region_code, | ||
aws_access_key_id=s3_input_config.aws_access_key_id, | ||
aws_secret_access_key=s3_input_config.aws_secret_access_key, | ||
) | ||
|
||
try: | ||
paginator = s3_client.get_paginator("list_objects_v2") | ||
pages = paginator.paginate(Bucket=s3_input_config.bucket, Prefix=s3_input_config.key_prefix) | ||
|
||
for page in pages: | ||
contents = page.get("Contents", None) | ||
if contents is None: | ||
continue | ||
|
||
for obj in contents: | ||
object_key = obj["Key"] | ||
if object_key.endswith("/"): | ||
# Skip any object that resolves to a directory like path | ||
continue | ||
|
||
file_metadata_list.append(FileMetadata(Path(object_key), obj["Size"])) | ||
|
||
except ClientError as e: | ||
error_code = e.response["Error"]["Code"] | ||
error_message = e.response["Error"]["Message"] | ||
return Err(f"ClientError: {error_code} - {error_message}") | ||
except Exception as e: | ||
return Err(f"An unexpected error occurred: {e}") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add retry configuration for S3 client
The S3 client should be configured with retry settings to handle transient failures.
def get_s3_object_metadata(s3_input_config: S3InputConfig) -> Result[List[FileMetadata], str]:
+ config = Config(
+ retries=dict(
+ total_max_attempts=3,
+ mode="adaptive",
+ max_attempts=3
+ )
+ )
s3_client = boto3.client(
"s3",
region_name=s3_input_config.region_code,
aws_access_key_id=s3_input_config.aws_access_key_id,
aws_secret_access_key=s3_input_config.aws_secret_access_key,
+ config=config
)
Also, consider adding rate limiting to handle large buckets:
try:
paginator = s3_client.get_paginator("list_objects_v2")
- pages = paginator.paginate(Bucket=s3_input_config.bucket, Prefix=s3_input_config.key_prefix)
+ pages = paginator.paginate(
+ Bucket=s3_input_config.bucket,
+ Prefix=s3_input_config.key_prefix,
+ PaginationConfig={'PageSize': 1000} # Control page size
+ )
Committable suggestion skipped: line range outside the PR's diff.
_add_common_arguments(s3_compressor_parser, default_config_file_path) | ||
s3_compressor_parser.add_argument("url", metavar="URL", help="URL of object to be compressed") | ||
s3_compressor_parser.add_argument( | ||
"--aws-access-key-id", type=str, default=None, help="AWS access key id." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"--aws-access-key-id", type=str, default=None, help="AWS access key id." | |
"--aws-access-key-id", type=str, default=None, help="AWS access key ID." |
Iterate through all files in fs_input_conf and adds their metadata to the | ||
paths_to_compress_buffer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Iterate through all files in fs_input_conf and adds their metadata to the | |
paths_to_compress_buffer. | |
Iterates through all files in fs_input_conf and adds their metadata to | |
`paths_to_compress_buffer`. |
""" | ||
Iterate through all files in fs_input_conf and adds their metadata to the | ||
paths_to_compress_buffer. | ||
Note: this method skips any files that do not exist. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this method skips any files that do not exist. | |
NOTE: This method skips files that don't exist. |
Note: this method skips any files that do not exist. | ||
:param fs_input_conf: FS configuration specifying the files to compress. | ||
:param paths_to_compress_buffer: PathsToCompressBuffer containing the scheduling information | ||
:return: None. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:return: None. |
""" | ||
|
||
res = get_s3_object_metadata(s3_input_config) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
): | ||
) -> Tuple[List[str], Optional[Dict[str, str]]]: | ||
""" | ||
Generates the command and environment for a clp_s compression job |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generates the command and environment for a clp_s compression job | |
Generates the command and environment variables for a clp_s compression job. |
:param archive_output_dir: | ||
:param clp_config: | ||
:param db_config_file_path: | ||
:param enable_s3_write: Whether to write output to S3 storage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In hindsight, this should probably have been called use_single_file_archive
. What do you think?
:param clp_config: | ||
:param db_config_file_path: | ||
:param enable_s3_write: Whether to write output to S3 storage. | ||
:return: Tuple of (compression_command, compression_env) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:return: Tuple of (compression_command, compression_env) | |
:return: Tuple of (compression_command, compression_env_vars) |
@@ -127,14 +186,25 @@ def make_clp_s_command( | |||
] | |||
# fmt: on | |||
|
|||
if InputType.S3 == clp_config.input.type: | |||
compression_env = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about compression_env_vars
?
@@ -176,7 +246,7 @@ def run_clp( | |||
yaml.safe_dump(clp_metadata_db_connection_config, db_config_file) | |||
db_config_file.close() | |||
|
|||
# Get s3 config | |||
# Get S3 config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring of this function should probably be updated since the source is no longer only FS.
Description
This PR adds s3 ingestion support for clp-s package. Currently, we support compressing with one s3_url(with prefix matching) at a time. User can either specify the credentials through commandline, or via a credentials file. If no credentials are provided, the package will attempt to use the associated IAM if there is any.
The PR contains the following changes:
The design decisions involved in this PR is documented in this doc
Due to time limitation, we leave the following items as todo for future:
If users provide invalid S3 information, such as incorrect credentials or a non-existent bucket, the issue is only detected when the package attempts to schedule the job. Additionally, if the provided S3 credentials lack sufficient privileges (e.g., list_object is allowed, but get_object is not), the package will return unhelpful error messages to the web UI ("see error log.txt blabla"), making it confusing for user.
Ideally, invalid S3 configurations should be detected at package startup to provide immediate and actionable feedback.
Validation performed
Manually verified that both fs compression and s3 compression works from the commandline.
Summary by CodeRabbit
Release Notes
New Features
Improvements
Infrastructure
Bug Fixes