feat(clp-package): Add support for clp-s s3 ingestion #651

Open: wants to merge 67 commits into base: main
Changes from 1 commit
67 commits
ca46dca
First version backup
haiqi96 Dec 11, 2024
b763e8b
Small refactor
haiqi96 Dec 11, 2024
4e9529c
First trial for new config
haiqi96 Dec 11, 2024
e9cdea4
Further refactor and polishing
haiqi96 Dec 11, 2024
9ba0a38
Another small refactor
haiqi96 Dec 12, 2024
58befef
small refactor again
haiqi96 Dec 12, 2024
35ec0c3
Combine s3 utils
haiqi96 Dec 12, 2024
5d57b10
Support handling S3 error message
haiqi96 Dec 12, 2024
9991307
Slight logging modification
haiqi96 Dec 12, 2024
5d23790
Linter
haiqi96 Dec 12, 2024
b4bb2af
Add extra verification
haiqi96 Dec 12, 2024
f41c558
Update components/clp-py-utils/clp_py_utils/clp_config.py
haiqi96 Dec 12, 2024
ce5a667
do nothing for now
haiqi96 Dec 12, 2024
f05dc88
backup changes for worker config
haiqi96 Dec 12, 2024
abf5dde
More support
haiqi96 Dec 13, 2024
7d34456
Remove unnecssary change
haiqi96 Dec 13, 2024
a7afd0d
Linter
haiqi96 Dec 13, 2024
99d3094
Handle mount for fs & S3
haiqi96 Dec 13, 2024
1afed1a
Linter
haiqi96 Dec 13, 2024
1de661a
Remove unused functions
haiqi96 Dec 13, 2024
ce3de98
Update components/job-orchestration/job_orchestration/executor/compre…
haiqi96 Dec 13, 2024
f49664f
simplify worker config
haiqi96 Dec 13, 2024
046cdcb
polishing
haiqi96 Dec 13, 2024
242dec2
linter
haiqi96 Dec 14, 2024
ed280cb
Apply suggestions from code review
haiqi96 Dec 16, 2024
0788e59
Fix easier ones
haiqi96 Dec 16, 2024
c198f27
Backup changes
haiqi96 Dec 16, 2024
4819f76
Small fixes
haiqi96 Dec 16, 2024
e5f43fb
fixes
haiqi96 Dec 16, 2024
1246062
add safeguard for archive update failure
haiqi96 Dec 17, 2024
3b870a4
Add docstrings
haiqi96 Dec 17, 2024
214ae3f
Apply suggestions from code review
haiqi96 Dec 18, 2024
6ff92fc
Clean up
haiqi96 Dec 18, 2024
9e07d37
update pyproject.toml
haiqi96 Dec 18, 2024
915b49d
Add docstrings
haiqi96 Dec 18, 2024
a061a29
Apply suggestions from code review
haiqi96 Dec 18, 2024
8301748
Update name as suggested by the code review
haiqi96 Dec 18, 2024
2ada464
a few small fixes to ensure other scripts still work
haiqi96 Dec 18, 2024
6e5aad5
adding safeguard for empty stdout line from clp.
haiqi96 Dec 18, 2024
55c0f36
add safe guard for search
haiqi96 Dec 18, 2024
2d7443e
Polish error messages.
haiqi96 Dec 18, 2024
6f907b2
Linter
haiqi96 Dec 18, 2024
120ffec
Slighlty improve the error message
haiqi96 Dec 18, 2024
d5eae21
Back up
haiqi96 Dec 17, 2024
ce2b440
Backup
haiqi96 Dec 19, 2024
6d2b815
Merge branch 'main' into s3_scheduler
haiqi96 Dec 19, 2024
b8f715d
Update execution image dependency
haiqi96 Dec 19, 2024
57e1912
simplify the code a little bit
haiqi96 Dec 19, 2024
27b8612
fix a previous mistake
haiqi96 Dec 19, 2024
d55f1ad
Keep fixing previous mistake
haiqi96 Dec 19, 2024
4de4fee
add url parsing helper
haiqi96 Dec 19, 2024
4224bd6
Linter
haiqi96 Dec 20, 2024
1cf3d01
Some refactor
haiqi96 Dec 20, 2024
b1655cd
Refactor compress scripts
haiqi96 Dec 20, 2024
d12e173
Initial support for cmdline
haiqi96 Jan 2, 2025
6833ee9
Linter fixes
haiqi96 Jan 2, 2025
a4e92ae
add argument checks
haiqi96 Jan 2, 2025
a638f2d
Polishing
haiqi96 Jan 3, 2025
5685224
Add some docstrings
haiqi96 Jan 3, 2025
fd9dba2
fixes
haiqi96 Jan 3, 2025
f7a175c
Rename task script
haiqi96 Jan 3, 2025
20488a1
fixes
haiqi96 Jan 3, 2025
12d6b97
Some captilization and update to the docstrings
haiqi96 Jan 3, 2025
709dfb0
Merge branch 'main' into s3_scheduler
haiqi96 Jan 13, 2025
fa5ace1
Apply suggestions from code review
haiqi96 Jan 15, 2025
a4675fd
First batch of required updates
haiqi96 Jan 15, 2025
642eb81
Changes missing from the first batch
haiqi96 Jan 15, 2025
add argument checks
haiqi96 committed Jan 2, 2025
commit a4e92ae0d67a284ecb95d763402bcce6b41bd91a
79 changes: 56 additions & 23 deletions components/clp-package-utils/clp_package_utils/scripts/compress.py
@@ -6,7 +6,8 @@
 import uuid
 from typing import List

-from clp_py_utils.clp_config import StorageEngine
+from clp_py_utils.clp_config import CLPConfig, StorageEngine
+from clp_py_utils.s3_utils import parse_aws_credentials_file
 from job_orchestration.scheduler.job_config import InputType

 from clp_package_utils.general import (
@@ -56,13 +57,17 @@ def append_input_specific_args(compress_cmd: List[str], parsed_args: argparse.Na
     if InputType.FS == input_type:
         return
     elif InputType.S3 == input_type:
-        # TODO: also think about credentials from file.
-        if parsed_args.aws_access_key_id is not None:
+        aws_access_key_id = parsed_args.aws_access_key_id
+        aws_secret_access_key = parsed_args.aws_secret_access_key
+        if parsed_args.aws_credentials_file:
+            aws_access_key_id, aws_secret_access_key = parse_aws_credentials_file(
+                pathlib.Path(parsed_args.aws_credentials_file)
+            )
+        if aws_access_key_id and aws_secret_access_key:
             compress_cmd.append("--aws-access-key-id")
-            compress_cmd.append(parsed_args.aws_access_key_id)
-        if parsed_args.aws_secret_access_key is not None:
+            compress_cmd.append(aws_access_key_id)
             compress_cmd.append("--aws-secret-access-key")
-            compress_cmd.append(parsed_args.aws_secret_access_key)
+            compress_cmd.append(aws_secret_access_key)
     else:
         raise ValueError(f"Unsupported input type: {input_type}.")

@@ -88,6 +93,46 @@ def add_common_arguments(
     )


+def validate_fs_input_args(
+    parsed_args: argparse.Namespace,
+    args_parser: argparse.ArgumentParser,
+) -> None:
+    # Validate some input paths were specified
+    if len(parsed_args.paths) == 0 and parsed_args.path_list is None:
+        args_parser.error("No paths specified.")
+
+    # Validate paths were specified using only one method
+    if len(parsed_args.paths) > 0 and parsed_args.path_list is not None:
+        args_parser.error("Paths cannot be specified on the command line AND through a file.")
+
+
+def validate_s3_input_args(
+    parsed_args: argparse.Namespace, args_parser: argparse.ArgumentParser, clp_config: CLPConfig
+) -> None:
+    if StorageEngine.CLP_S != clp_config.package.storage_engine:
+        raise ValueError(
+            f"input type {InputType.S3} is only supported with storage engine {StorageEngine.CLP_S}"
+        )
+
+    # Validate aws credentials were specified using only one method
+    aws_credential_file = parsed_args.aws_credentials_file
+    aws_access_key_id = parsed_args.aws_access_key_id
+    aws_secret_access_key = parsed_args.aws_secret_access_key
+    if aws_credential_file is not None:
+        if not pathlib.Path(aws_credential_file).exists():
+            raise ValueError(f"{aws_credential_file} doesn't exist.")
+
+        if aws_access_key_id is not None or aws_secret_access_key is not None:
+            args_parser.error(
+                "aws_credentials_file can not be specified together with aws_access_key_id or aws_secret_access_key."
+            )
+
+    elif bool(aws_access_key_id) != bool(aws_secret_access_key):
+        args_parser.error(
+            "aws_access_key_id and aws_secret_access_key must be both set or left unset."
+        )
+
+
 def main(argv):
     clp_home = get_clp_home()
     default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH
@@ -111,10 +156,9 @@ def main(argv):
     s3_compressor_parser.add_argument(
         "--aws-secret-access-key", type=str, default=None, help="AWS secret access key."
     )
-    # args_parser.add_argument(
-    # "--aws-credentials-file", type=str, default=None, help="Access key id."
-    # )
-
+    s3_compressor_parser.add_argument(
+        "--aws-credentials-file", type=str, default=None, help="Path to AWS credentials file."
+    )
     parsed_args = args_parser.parse_args(argv[1:])

     # Validate and load config file
@@ -131,20 +175,9 @@ def main(argv):

     input_type = parsed_args.input_type
     if InputType.FS == input_type:
-        # Validate some input paths were specified
-        if len(parsed_args.paths) == 0 and parsed_args.path_list is None:
-            args_parser.error("No paths specified.")
-
-        # Validate paths were specified using only one method
-        if len(parsed_args.paths) > 0 and parsed_args.path_list is not None:
-            args_parser.error("Paths cannot be specified on the command line AND through a file.")
-
+        validate_fs_input_args(parsed_args, args_parser)
     elif InputType.S3 == input_type:
-        if StorageEngine.CLP_S != clp_config.package.storage_engine:
-            raise ValueError(
-                f"input type {InputType.S3} is only supported with storage engine {StorageEngine.CLP_S}"
-            )
-
+        validate_s3_input_args(parsed_args, args_parser, clp_config)
     else:
         raise ValueError(f"Unsupported input type: {input_type}.")

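The credential rules that validate_s3_input_args enforces in the diff above can be restated as a small pure function, which makes the accepted and rejected combinations easy to check in isolation. This is only a sketch: the name `check_s3_credential_args` and the return-a-message convention are assumptions for illustration and are not part of this PR, and the file-existence and storage-engine checks are left out.

```python
from typing import Optional


def check_s3_credential_args(
    credentials_file: Optional[str],
    access_key_id: Optional[str],
    secret_access_key: Optional[str],
) -> Optional[str]:
    """Returns an error message for an invalid combination, or None if it is valid.

    Hypothetical helper that mirrors the rules in validate_s3_input_args.
    """
    if credentials_file is not None:
        # A credentials file excludes both explicit key arguments.
        if access_key_id is not None or secret_access_key is not None:
            return (
                "aws_credentials_file cannot be specified together with"
                " aws_access_key_id or aws_secret_access_key."
            )
        return None

    # Without a file, the key pair must be given together or not at all.
    if bool(access_key_id) != bool(secret_access_key):
        return "aws_access_key_id and aws_secret_access_key must be both set or left unset."
    return None
```

As in the diff, passing no credentials at all is treated as valid; only mixed or half-specified combinations are rejected.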
23 changes: 23 additions & 0 deletions components/clp-py-utils/clp_py_utils/s3_utils.py
@@ -15,6 +15,29 @@
 AWS_ENDPOINT = "amazonaws.com"


+def parse_aws_credentials_file(credentials_file_path: Path) -> Tuple[str, str]:
+    aws_access_key_id = None
+    aws_secret_access_key = None
+
+    if not credentials_file_path.exists():
+        raise ValueError(f"File {credentials_file_path} doesn't exist.")
+
+    with open(credentials_file_path, "r") as f:
+        for line in f:
+            line = line.strip()
+            if line.startswith("aws_access_key_id"):
+                aws_access_key_id = line.split("=", 1)[1].strip()
+            elif line.startswith("aws_secret_access_key"):
+                aws_secret_access_key = line.split("=", 1)[1].strip()
Comment on lines +32 to +38 (Member):
Can we use configparser instead?


+    if aws_access_key_id is None or aws_secret_access_key is None:
+        raise ValueError(
+            "The credentials file must contain both 'aws_access_key_id' and 'aws_secret_access_key'."
+        )
+
+    return aws_access_key_id, aws_secret_access_key
+
+
 def parse_s3_url(s3_url: str) -> Tuple[str, str, str]:
     host_style_url_regex = re.compile(
         r"https://(?P<bucket_name>[a-z0-9.-]+)\.s3(\.(?P<region_code>[a-z0-9-]+))?"
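To make the review question about configparser concrete, here is one way the same helper might look on top of the standard library's INI parser. It is a sketch under stated assumptions, not the code in this PR: the function name `parse_aws_credentials_file_with_configparser` and the `profile` parameter (defaulting to `default`) are hypothetical, and it assumes the credentials file uses `[profile]` section headers.

```python
import configparser
from pathlib import Path
from typing import Tuple


def parse_aws_credentials_file_with_configparser(
    credentials_file_path: Path, profile: str = "default"
) -> Tuple[str, str]:
    """Hypothetical configparser-based variant of parse_aws_credentials_file."""
    if not credentials_file_path.exists():
        raise ValueError(f"File {credentials_file_path} doesn't exist.")

    # interpolation=None keeps any '%' in a value literal.
    parser = configparser.ConfigParser(interpolation=None)
    try:
        parser.read(credentials_file_path)
    except configparser.Error as e:
        # Raised, for example, when the file has no section header.
        raise ValueError(f"Failed to parse {credentials_file_path}: {e}") from e

    if not parser.has_section(profile):
        raise ValueError(f"Profile '{profile}' not found in {credentials_file_path}.")

    aws_access_key_id = parser.get(profile, "aws_access_key_id", fallback=None)
    aws_secret_access_key = parser.get(profile, "aws_secret_access_key", fallback=None)
    if aws_access_key_id is None or aws_secret_access_key is None:
        raise ValueError(
            "The credentials file must contain both 'aws_access_key_id' and"
            " 'aws_secret_access_key'."
        )

    return aws_access_key_id, aws_secret_access_key
```

The two approaches differ in behavior. The line-based loop in the diff accepts a file with no section header and, when several profiles are present, keeps whichever matching line comes last; the configparser variant requires a header and reads exactly one named profile.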