feat(clp-package): Add support for clp-s s3 ingestion #651

Open

haiqi96 wants to merge 67 commits into main from s3_scheduler

Changes from 1 commit

Commits (67)
ca46dca
First version backup
haiqi96 Dec 11, 2024
b763e8b
Small refactor
haiqi96 Dec 11, 2024
4e9529c
First trial for new config
haiqi96 Dec 11, 2024
e9cdea4
Further refactor and polishing
haiqi96 Dec 11, 2024
9ba0a38
Another small refactor
haiqi96 Dec 12, 2024
58befef
small refactor again
haiqi96 Dec 12, 2024
35ec0c3
Combine s3 utils
haiqi96 Dec 12, 2024
5d57b10
Support handling S3 error message
haiqi96 Dec 12, 2024
9991307
Slight logging modification
haiqi96 Dec 12, 2024
5d23790
Linter
haiqi96 Dec 12, 2024
b4bb2af
Add extra verification
haiqi96 Dec 12, 2024
f41c558
Update components/clp-py-utils/clp_py_utils/clp_config.py
haiqi96 Dec 12, 2024
ce5a667
do nothing for now
haiqi96 Dec 12, 2024
f05dc88
backup changes for worker config
haiqi96 Dec 12, 2024
abf5dde
More support
haiqi96 Dec 13, 2024
7d34456
Remove unnecessary change
haiqi96 Dec 13, 2024
a7afd0d
Linter
haiqi96 Dec 13, 2024
99d3094
Handle mount for fs & S3
haiqi96 Dec 13, 2024
1afed1a
Linter
haiqi96 Dec 13, 2024
1de661a
Remove unused functions
haiqi96 Dec 13, 2024
ce3de98
Update components/job-orchestration/job_orchestration/executor/compre…
haiqi96 Dec 13, 2024
f49664f
simplify worker config
haiqi96 Dec 13, 2024
046cdcb
polishing
haiqi96 Dec 13, 2024
242dec2
linter
haiqi96 Dec 14, 2024
ed280cb
Apply suggestions from code review
haiqi96 Dec 16, 2024
0788e59
Fix easier ones
haiqi96 Dec 16, 2024
c198f27
Backup changes
haiqi96 Dec 16, 2024
4819f76
Small fixes
haiqi96 Dec 16, 2024
e5f43fb
fixes
haiqi96 Dec 16, 2024
1246062
add safeguard for archive update failure
haiqi96 Dec 17, 2024
3b870a4
Add docstrings
haiqi96 Dec 17, 2024
214ae3f
Apply suggestions from code review
haiqi96 Dec 18, 2024
6ff92fc
Clean up
haiqi96 Dec 18, 2024
9e07d37
update pyproject.toml
haiqi96 Dec 18, 2024
915b49d
Add docstrings
haiqi96 Dec 18, 2024
a061a29
Apply suggestions from code review
haiqi96 Dec 18, 2024
8301748
Update name as suggested by the code review
haiqi96 Dec 18, 2024
2ada464
a few small fixes to ensure other scripts still work
haiqi96 Dec 18, 2024
6e5aad5
adding safeguard for empty stdout line from clp.
haiqi96 Dec 18, 2024
55c0f36
add safeguard for search
haiqi96 Dec 18, 2024
2d7443e
Polish error messages.
haiqi96 Dec 18, 2024
6f907b2
Linter
haiqi96 Dec 18, 2024
120ffec
Slightly improve the error message
haiqi96 Dec 18, 2024
d5eae21
Back up
haiqi96 Dec 17, 2024
ce2b440
Backup
haiqi96 Dec 19, 2024
6d2b815
Merge branch 'main' into s3_scheduler
haiqi96 Dec 19, 2024
b8f715d
Update execution image dependency
haiqi96 Dec 19, 2024
57e1912
simplify the code a little bit
haiqi96 Dec 19, 2024
27b8612
fix a previous mistake
haiqi96 Dec 19, 2024
d55f1ad
Keep fixing previous mistake
haiqi96 Dec 19, 2024
4de4fee
add url parsing helper
haiqi96 Dec 19, 2024
4224bd6
Linter
haiqi96 Dec 20, 2024
1cf3d01
Some refactor
haiqi96 Dec 20, 2024
b1655cd
Refactor compress scripts
haiqi96 Dec 20, 2024
d12e173
Initial support for cmdline
haiqi96 Jan 2, 2025
6833ee9
Linter fixes
haiqi96 Jan 2, 2025
a4e92ae
add argument checks
haiqi96 Jan 2, 2025
a638f2d
Polishing
haiqi96 Jan 3, 2025
5685224
Add some docstrings
haiqi96 Jan 3, 2025
fd9dba2
fixes
haiqi96 Jan 3, 2025
f7a175c
Rename task script
haiqi96 Jan 3, 2025
20488a1
fixes
haiqi96 Jan 3, 2025
12d6b97
Some capitalization and updates to the docstrings
haiqi96 Jan 3, 2025
709dfb0
Merge branch 'main' into s3_scheduler
haiqi96 Jan 13, 2025
fa5ace1
Apply suggestions from code review
haiqi96 Jan 15, 2025
a4675fd
First batch of required updates
haiqi96 Jan 15, 2025
642eb81
Changes missing from the first batch
haiqi96 Jan 15, 2025
Initial support for cmdline
haiqi96 committed Jan 2, 2025
commit d12e173ff3694664d6a28d6ddccefe8821de0b6a
143 changes: 112 additions & 31 deletions components/clp-package-utils/clp_package_utils/scripts/compress.py
@@ -4,6 +4,7 @@
import subprocess
import sys
import uuid
from typing import List

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
@@ -16,44 +17,108 @@
load_config_file,
validate_and_load_db_credentials_file,
)
from clp_py_utils.clp_config import StorageEngine
from job_orchestration.scheduler.job_config import InputType

logger = logging.getLogger(__file__)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

args_parser = argparse.ArgumentParser(description="Compresses files/directories")
def generate_targets_list(
input_type: InputType,
container_targets_list_path: pathlib.Path,
parsed_args: argparse.Namespace,
) -> None:
if InputType.FS == input_type:
compression_targets_list_file = parsed_args.path_list
with open(container_targets_list_path, "w") as container_targets_list_file:
if compression_targets_list_file is not None:
with open(compression_targets_list_file, "r") as targets_list_file:
for line in targets_list_file:
resolved_path = pathlib.Path(line.rstrip()).resolve()
Member:

  • We should still check for an empty path; otherwise this will resolve to the current working directory, right? (A sketch of such a guard follows below.)
  • Can we move the operation to add the CONTAINER_INPUT_LOGS_ROOT_DIR prefix back into this script? The native scripts are (or at least were) written in such a way that they shouldn't need to know that they're running in a container, whereas the non-native scripts do have to deal with all the container stuff. So I'd rather keep the CONTAINER_INPUT_LOGS_ROOT_DIR logic in the non-native scripts. Ditto below.
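A minimal sketch of the guard the first point asks for (an editor's illustration, not code from this PR): skip blank lines before resolving, since pathlib.Path("").resolve() silently yields the current working directory.

import pathlib
from typing import Optional


def resolve_non_empty_path(line: str) -> Optional[pathlib.Path]:
    # An empty string resolves to the CWD, so treat blank lines as "no path".
    stripped_line = line.rstrip()
    if "" == stripped_line:
        return None
    return pathlib.Path(stripped_line).resolve()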

container_targets_list_file.write(f"{resolved_path}\n")

for path in parsed_args.paths:
resolved_path = pathlib.Path(path).resolve()
container_targets_list_file.write(f"{resolved_path}\n")

elif InputType.S3 == input_type:
with open(container_targets_list_path, "w") as container_targets_list_file:
container_targets_list_file.write(f"{parsed_args.url}\n")

else:
raise ValueError(f"Unsupported input type: {input_type}.")


def append_input_specific_args(
compress_cmd: List[str],
parsed_args: argparse.Namespace
) -> None:
input_type = parsed_args.input_type

if InputType.FS == input_type:
return
elif InputType.S3 == input_type:
# TODO: also think about credentials from file.
if parsed_args.aws_access_key_id is not None:
compress_cmd.append("--aws-access-key-id")
compress_cmd.append(parsed_args.aws_access_key_id)
if parsed_args.aws_secret_access_key is not None:
compress_cmd.append("--aws-secret-access-key")
compress_cmd.append(parsed_args.aws_secret_access_key)
else:
raise ValueError(f"Unsupported input type: {input_type}.")


def add_common_arguments(
args_parser: argparse.ArgumentParser,
default_config_file_path: pathlib.Path
) -> None:
args_parser.add_argument(
"--config",
"-c",
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument("paths", metavar="PATH", nargs="*", help="Paths to compress.")
args_parser.add_argument(
"-f", "--path-list", dest="path_list", help="A file listing all paths to compress."
)
args_parser.add_argument(
"--timestamp-key",
help="The path (e.g. x.y) for the field containing the log event's timestamp.",
)
args_parser.add_argument(
"-t", "--tags", help="A comma-separated list of tags to apply to the compressed archives."
)
args_parser.add_argument(
"--no-progress-reporting", action="store_true", help="Disables progress reporting."
)

parsed_args = args_parser.parse_args(argv[1:])

paths_to_compress = parsed_args.paths
compression_path_list = parsed_args.path_list
# Validate some input paths were specified
if len(paths_to_compress) == 0 and compression_path_list is None:
args_parser.error("No paths specified.")
def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

# Validate paths were specified using only one method
if len(paths_to_compress) > 0 and compression_path_list is not None:
args_parser.error("Paths cannot be specified on the command line AND through a file.")
args_parser = argparse.ArgumentParser(description="Compresses files from filesystem/s3")
input_type_args_parser = args_parser.add_subparsers(dest="input_type")

fs_compressor_parser = input_type_args_parser.add_parser(InputType.FS)
add_common_arguments(fs_compressor_parser, default_config_file_path)
fs_compressor_parser.add_argument("paths", metavar="PATH", nargs="*", help="Paths to compress.")
fs_compressor_parser.add_argument(
"-f", "--path-list", dest="path_list", help="A file listing all paths to compress."
)

s3_compressor_parser = input_type_args_parser.add_parser(InputType.S3)
add_common_arguments(s3_compressor_parser, default_config_file_path)
s3_compressor_parser.add_argument("url", metavar="URL", help="URL of object to be compressed")
Member:

Suggested change:
- s3_compressor_parser.add_argument("url", metavar="URL", help="URL of object to be compressed")
+ s3_compressor_parser.add_argument("url", metavar="URL", help="URL of the object to be compressed")

Contributor Author:

How about "URL of objects to be compressed", since the URL can point to a path prefix?

Member:

Sure. Let's also make sure errors indicate that it's objects as well.

s3_compressor_parser.add_argument(
"--aws-access-key-id", type=str, default=None, help="AWS access key id."
)
s3_compressor_parser.add_argument(
"--aws-secret-access-key", type=str, default=None, help="AWS secret access key."
)
# args_parser.add_argument(
# "--aws-credentials-file", type=str, default=None, help="Access key id."
# )

parsed_args = args_parser.parse_args(argv[1:])

# Validate and load config file
try:
@@ -67,6 +132,23 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

input_type = parsed_args.input_type
if InputType.FS == input_type:
# Validate some input paths were specified
if len(parsed_args.paths) == 0 and parsed_args.path_list is None:
args_parser.error("No paths specified.")

# Validate paths were specified using only one method
if len(parsed_args.paths) > 0 and parsed_args.path_list is not None:
args_parser.error("Paths cannot be specified on the command line AND through a file.")

elif InputType.S3 == input_type:
if StorageEngine.CLP_S != clp_config.package.storage_engine:
raise ValueError(f"input type {InputType.S3} is only supported with storage engine {StorageEngine.CLP_S}")

else:
raise ValueError(f"Unsupported input type: {input_type}.")

container_name = generate_container_name(str(JobType.COMPRESSION))

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
@@ -83,6 +165,7 @@
compress_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.compress",
input_type,
"--config", str(generated_config_path_on_container)
]
# fmt: on
@@ -92,28 +175,26 @@
if parsed_args.tags is not None:
compress_cmd.append("--tags")
compress_cmd.append(parsed_args.tags)
if parsed_args.no_progress_reporting is True:
compress_cmd.append("--no-progress-reporting")

append_input_specific_args(compress_cmd, parsed_args)

# Write paths to compress to a file
# Write targets to compress to a file
while True:
# Get unused output path
container_path_list_filename = f"{uuid.uuid4()}.txt"
container_path_list_path = clp_config.logs_directory / container_path_list_filename
if not container_path_list_path.exists():
break

with open(container_path_list_path, "w") as container_path_list_file:
if compression_path_list is not None:
with open(parsed_args.path_list, "r") as path_list_file:
for line in path_list_file:
resolved_path = pathlib.Path(line.rstrip()).resolve()
container_path_list_file.write(f"{resolved_path}\n")

for path in paths_to_compress:
resolved_path = pathlib.Path(path).resolve()
container_path_list_file.write(f"{resolved_path}\n")

generate_targets_list(
input_type,
container_path_list_path,
parsed_args
)
compress_cmd.append("--path-list")
compress_cmd.append(container_clp_config.logs_directory / container_path_list_filename)
compress_cmd.append(str(container_clp_config.logs_directory / container_path_list_filename))

cmd = container_start_cmd + compress_cmd
subprocess.run(cmd, check=True)
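With this, the package-level script dispatches on an input-type subcommand and forwards any S3 credentials to the native script. A rough usage sketch (the subcommand names are assumed to match InputType's string values, and sbin/compress.sh is assumed to be the package's usual entry point; neither is shown in this diff):

# Hypothetical invocations; subcommand names, entry point, and URL style are assumptions.
sbin/compress.sh fs /var/log/app.log --timestamp-key ts
sbin/compress.sh s3 --aws-access-key-id <id> --aws-secret-access-key <key> \
    "https://<bucket>.s3.<region>.amazonaws.com/<key-prefix>"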
components/clp-package-utils/clp_package_utils/scripts/native/compress.py
@@ -4,12 +4,15 @@
import pathlib
import sys
import time
import typing
from contextlib import closing
from typing import List

import brotli
import msgpack
from clp_py_utils.clp_config import COMPRESSION_JOBS_TABLE_NAME
from clp_py_utils.pretty_size import pretty_size
from clp_py_utils.s3_utils import parse_s3_url
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import (
CompressionJobCompletionStatus,
@@ -18,7 +21,9 @@
from job_orchestration.scheduler.job_config import (
ClpIoConfig,
FsInputConfig,
S3InputConfig,
OutputConfig,
InputType
)

from clp_package_utils.general import (
@@ -122,11 +127,77 @@ def handle_job(sql_adapter: SQL_Adapter, clp_io_config: ClpIoConfig, no_progress
return CompressionJobCompletionStatus.SUCCEEDED


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH
def generate_clp_io_config(
targets_to_compress: List[str],
parsed_args: argparse.Namespace
) -> typing.Union[S3InputConfig, FsInputConfig]:
input_type = parsed_args.input_type

if InputType.FS == input_type:
return FsInputConfig(
paths_to_compress=targets_to_compress,
timestamp_key=parsed_args.timestamp_key,
path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR),
)
elif InputType.S3 == input_type:
if len(targets_to_compress) != 1:
logger.error(f"Unexpected number of targets: {targets_to_compress}")
exit(-1)
s3_url = targets_to_compress[0]
region_code, bucket_name, key_prefix = parse_s3_url(s3_url)
return S3InputConfig(
region_code=region_code,
bucket=bucket_name,
key_prefix=key_prefix,
aws_access_key_id=parsed_args.aws_access_key_id,
aws_secret_access_key=parsed_args.aws_secret_access_key,
timestamp_key=parsed_args.timestamp_key,
)
else:
raise ValueError(f"Unsupported input type: {input_type}")


def get_targets_to_compress(
compress_path_list_path: pathlib.Path,
input_type: InputType
) -> List[str]:
# Define the path processing function based on the input type
process_path_func: typing.Callable[[str], str]

def process_fs_path(path_str: str) -> str:
stripped_path = pathlib.Path(path_str)
container_file_path = CONTAINER_INPUT_LOGS_ROOT_DIR / pathlib.Path(
stripped_path
).relative_to(stripped_path.anchor)
return str(container_file_path.resolve())

def process_s3_path(path_str: str) -> str:
return path_str

if input_type == InputType.FS:
process_path_func = process_fs_path
elif input_type == InputType.S3:
process_path_func = process_s3_path
else:
raise ValueError(f"Unsupported input type: {input_type}")

targets_to_compress = []
# Read targets from the input file
with open(compress_path_list_path, "r") as f:
for path in f:
stripped_path_str = path.strip()
if "" == stripped_path_str:
# Skip empty paths
continue
targets_to_compress.append(process_path_func(stripped_path_str))

return targets_to_compress

args_parser = argparse.ArgumentParser(description="Compresses log files.")

def add_common_arguments(
args_parser: argparse.ArgumentParser,
default_config_file_path: pathlib.Path
) -> None:
args_parser.add_argument(
"--config",
"-c",
@@ -150,9 +221,29 @@ def main(argv):
args_parser.add_argument(
"-t", "--tags", help="A comma-separated list of tags to apply to the compressed archives."
)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH
args_parser = argparse.ArgumentParser(description="Compresses files from filesystem/s3")
input_type_args_parser = args_parser.add_subparsers(dest="input_type")

fs_compressor_parser = input_type_args_parser.add_parser(InputType.FS)
add_common_arguments(fs_compressor_parser, default_config_file_path)

s3_compressor_parser = input_type_args_parser.add_parser(InputType.S3)
add_common_arguments(s3_compressor_parser, default_config_file_path)
s3_compressor_parser.add_argument(
"--aws-access-key-id", type=str, default=None, help="AWS access key id."
)
s3_compressor_parser.add_argument(
"--aws-secret-access-key", type=str, default=None, help="AWS secret access key."
)

parsed_args = args_parser.parse_args(argv[1:])
compress_path_list_arg = parsed_args.path_list

input_type = parsed_args.input_type
# Validate and load config file
try:
config_file_path = pathlib.Path(parsed_args.config)
@@ -166,35 +257,20 @@
comp_jobs_dir = clp_config.logs_directory / "comp-jobs"
comp_jobs_dir.mkdir(parents=True, exist_ok=True)

paths_to_compress = []
# Read paths from the input file
compress_path_list_path = pathlib.Path(compress_path_list_arg).resolve()
with open(compress_path_list_path, "r") as f:
for path in f:
stripped_path_str = path.strip()
if "" == stripped_path_str:
# Skip empty paths
continue
stripped_path = pathlib.Path(stripped_path_str)
container_file_path = CONTAINER_INPUT_LOGS_ROOT_DIR / pathlib.Path(
stripped_path
).relative_to(stripped_path.anchor)
resolved_path_str = str(container_file_path.resolve())
paths_to_compress.append(resolved_path_str)

mysql_adapter = SQL_Adapter(clp_config.database)
clp_input_config = FsInputConfig(
paths_to_compress=paths_to_compress,
timestamp_key=parsed_args.timestamp_key,
path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR),
targets_to_compress = get_targets_to_compress(
pathlib.Path(parsed_args.path_list).resolve(),
input_type
)

clp_input_config = generate_clp_io_config(targets_to_compress, parsed_args)
clp_output_config = OutputConfig.parse_obj(clp_config.archive_output)
if parsed_args.tags:
tag_list = [tag.strip().lower() for tag in parsed_args.tags.split(",") if tag]
if len(tag_list) > 0:
clp_output_config.tags = tag_list
clp_io_config = ClpIoConfig(input=clp_input_config, output=clp_output_config)

mysql_adapter = SQL_Adapter(clp_config.database)
return handle_job(
sql_adapter=mysql_adapter,
clp_io_config=clp_io_config,
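For context, parse_s3_url is imported from clp_py_utils.s3_utils, but its implementation is outside this diff. A rough sketch of the shape such a helper could take, assuming virtual-hosted-style URLs (an editor's illustration; the real helper may accept more URL styles and differ in details):

import re
from typing import Tuple


def parse_s3_url_sketch(s3_url: str) -> Tuple[str, str, str]:
    # Returns (region_code, bucket, key_prefix) for a URL like
    # https://my-bucket.s3.us-east-1.amazonaws.com/logs/
    match = re.fullmatch(
        r"https://(?P<bucket>[a-z0-9][a-z0-9.\-]*)\.s3\.(?P<region>[a-z0-9\-]+)"
        r"\.amazonaws\.com/(?P<key_prefix>.*)",
        s3_url,
    )
    if match is None:
        raise ValueError(f"Unsupported S3 URL: {s3_url}")
    return match.group("region"), match.group("bucket"), match.group("key_prefix")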