Commit 329eb7f

Eden-D-Zhang, haiqi96, and kirkrodrigues authored
feat(clp-package)!: Refactor S3 log-ingestion configuration to use S3 URL as compression script argument (fixes #842). (#852)
Co-authored-by: haiqi96 <14502009+haiqi96@users.noreply.github.com>
Co-authored-by: Kirk Rodrigues <2454684+kirkrodrigues@users.noreply.github.com>
1 parent 175e8d9 commit 329eb7f

File tree

11 files changed: +151 -89 lines


components/clp-package-utils/clp_package_utils/general.py

Lines changed: 1 addition & 1 deletion
@@ -507,7 +507,7 @@ def validate_results_cache_config(
 def validate_worker_config(clp_config: CLPConfig):
     clp_config.validate_logs_input_config()
     clp_config.validate_archive_output_config()
-    clp_config.validate_stream_output_dir()
+    clp_config.validate_stream_output_config()


 def validate_webui_config(

components/clp-package-utils/clp_package_utils/scripts/compress.py

Lines changed: 4 additions & 2 deletions
@@ -113,7 +113,7 @@ def _validate_s3_input_args(
             f" {StorageEngine.CLP_S}."
         )
     if len(parsed_args.paths) != 1:
-        args_parser.error(f"Only one key prefix can be specified for input type {InputType.S3}.")
+        args_parser.error(f"Only one URL can be specified for input type {InputType.S3}.")
     if parsed_args.path_list is not None:
         args_parser.error(f"Path list file is unsupported for input type {InputType.S3}.")

@@ -141,7 +141,9 @@ def main(argv):
     args_parser.add_argument(
         "--no-progress-reporting", action="store_true", help="Disables progress reporting."
     )
-    args_parser.add_argument("paths", metavar="PATH", nargs="*", help="Paths to compress.")
+    args_parser.add_argument(
+        "paths", metavar="PATH", nargs="*", help="Paths or an S3 URL to compress."
+    )
     args_parser.add_argument(
         "-f", "--path-list", dest="path_list", help="A file listing all paths to compress."
     )
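With this change, the `paths` positional argument carries either filesystem paths or a single S3 URL, and `_validate_s3_input_args` enforces the one-URL rule for S3 input. A minimal sketch of the new argument's behavior, with made-up paths and URL:

    import argparse

    # Trimmed-down parser mirroring the new `paths` argument.
    args_parser = argparse.ArgumentParser(description="Compresses logs.")
    args_parser.add_argument(
        "paths", metavar="PATH", nargs="*", help="Paths or an S3 URL to compress."
    )

    # FS input: any number of paths is accepted.
    fs_args = args_parser.parse_args(["/var/log/app1", "/var/log/app2"])
    assert len(fs_args.paths) == 2

    # S3 input: exactly one URL; zero or several would hit args_parser.error().
    s3_args = args_parser.parse_args(
        ["https://my-bucket.s3.us-east-2.amazonaws.com/logs/"]
    )
    assert len(s3_args.paths) == 1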

components/clp-package-utils/clp_package_utils/scripts/native/compress.py

Lines changed: 15 additions & 11 deletions
@@ -12,6 +12,7 @@
 import msgpack
 from clp_py_utils.clp_config import CLPConfig, COMPRESSION_JOBS_TABLE_NAME
 from clp_py_utils.pretty_size import pretty_size
+from clp_py_utils.s3_utils import parse_s3_url
 from clp_py_utils.sql_adapter import SQL_Adapter
 from job_orchestration.scheduler.constants import (
     CompressionJobCompletionStatus,

@@ -136,31 +137,34 @@ def _generate_clp_io_config(
     input_type = clp_config.logs_input.type

     if InputType.FS == input_type:
+        if len(logs_to_compress) == 0:
+            raise ValueError(f"No input paths given.")
         return FsInputConfig(
             paths_to_compress=logs_to_compress,
             timestamp_key=parsed_args.timestamp_key,
             path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR),
         )
     elif InputType.S3 == input_type:
-        if len(logs_to_compress) != 1:
-            raise ValueError(f"Too many key prefixes: {len(logs_to_compress)} > 1")
-
-        s3_config = clp_config.logs_input.s3_config
+        if len(logs_to_compress) == 0:
+            raise ValueError(f"No URLs given.")
+        elif len(logs_to_compress) != 1:
+            raise ValueError(f"Too many URLs: {len(logs_to_compress)} > 1")
+
+        s3_url = logs_to_compress[0]
+        region_code, bucket_name, key_prefix = parse_s3_url(s3_url)
+        aws_authentication = clp_config.logs_input.aws_authentication
         return S3InputConfig(
-            region_code=s3_config.region_code,
-            bucket=s3_config.bucket,
-            key_prefix=s3_config.key_prefix + logs_to_compress[0],
-            aws_authentication=s3_config.aws_authentication,
+            region_code=region_code,
+            bucket=bucket_name,
+            key_prefix=key_prefix,
+            aws_authentication=aws_authentication,
             timestamp_key=parsed_args.timestamp_key,
         )
     else:
         raise ValueError(f"Unsupported input type: {input_type}")


 def _get_logs_to_compress(logs_list_path: pathlib.Path) -> List[str]:
-    # Define the path processing function based on the input type
-    process_path_func: typing.Callable[[str], str]
-
     # Read logs from the input file
     logs_to_compress = []
     with open(logs_list_path, "r") as f:
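The net effect of the rewritten S3 branch is that the region, bucket, and key prefix are now derived from the URL passed on the command line, and only the authentication block still comes from the package config. A sketch of the equivalent call sequence, assuming a loaded `clp_config` with `logs_input.type = 's3'` and a hypothetical URL (`timestamp_key` is shown as `None` for brevity; the real code passes `parsed_args.timestamp_key`):

    from clp_py_utils.s3_utils import parse_s3_url
    from job_orchestration.scheduler.job_config import S3InputConfig

    logs_to_compress = ["https://my-bucket.s3.us-east-2.amazonaws.com/ingest/2024/"]

    # One URL in, three components out.
    region_code, bucket_name, key_prefix = parse_s3_url(logs_to_compress[0])

    input_config = S3InputConfig(
        region_code=region_code,  # "us-east-2"
        bucket=bucket_name,  # "my-bucket"
        key_prefix=key_prefix,  # "ingest/2024/"
        aws_authentication=clp_config.logs_input.aws_authentication,
        timestamp_key=None,
    )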

components/clp-py-utils/clp_py_utils/clp_config.py

Lines changed: 47 additions & 29 deletions
@@ -393,13 +393,13 @@ def validate_bucket(cls, field):
             raise ValueError("bucket cannot be empty")
         return field

-    @validator("key_prefix")
-    def validate_key_prefix(cls, field):
-        if "" == field:
-            raise ValueError("key_prefix cannot be empty")
-        if not field.endswith("/"):
-            raise ValueError('key_prefix must end with "/"')
-        return field
+
+class S3IngestionConfig(BaseModel):
+    type: Literal[StorageType.S3.value] = StorageType.S3.value
+    aws_authentication: AwsAuthentication
+
+    def dump_to_primitive_dict(self):
+        return self.dict()


 class FsStorage(BaseModel):

@@ -424,13 +424,6 @@ def dump_to_primitive_dict(self):
 class S3Storage(BaseModel):
     type: Literal[StorageType.S3.value] = StorageType.S3.value
     s3_config: S3Config
-
-    def dump_to_primitive_dict(self):
-        d = self.dict()
-        return d
-
-
-class OutputS3Storage(S3Storage):
     staging_directory: pathlib.Path

     @validator("staging_directory")

@@ -439,16 +432,28 @@ def validate_staging_directory(cls, field):
             raise ValueError("staging_directory cannot be empty")
         return field

+    @root_validator
+    def validate_key_prefix(cls, values):
+        s3_config = values.get("s3_config")
+        if not hasattr(s3_config, "key_prefix"):
+            raise ValueError("s3_config must have field key_prefix")
+        key_prefix = s3_config.key_prefix
+        if "" == key_prefix:
+            raise ValueError("s3_config.key_prefix cannot be empty")
+        if not key_prefix.endswith("/"):
+            raise ValueError('s3_config.key_prefix must end with "/"')
+        return values
+
     def make_config_paths_absolute(self, clp_home: pathlib.Path):
         self.staging_directory = make_config_path_absolute(clp_home, self.staging_directory)

     def dump_to_primitive_dict(self):
-        d = super().dump_to_primitive_dict()
+        d = self.dict()
         d["staging_directory"] = str(d["staging_directory"])
         return d


-class InputFsStorage(FsStorage):
+class FsIngestionConfig(FsStorage):
     directory: pathlib.Path = pathlib.Path("/")


@@ -460,16 +465,16 @@ class StreamFsStorage(FsStorage):
     directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "streams"


-class ArchiveS3Storage(OutputS3Storage):
+class ArchiveS3Storage(S3Storage):
     staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-archives"


-class StreamS3Storage(OutputS3Storage):
+class StreamS3Storage(S3Storage):
     staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-streams"


 def _get_directory_from_storage_config(
-    storage_config: Union[FsStorage, OutputS3Storage],
+    storage_config: Union[FsStorage, S3Storage],
 ) -> pathlib.Path:
     storage_type = storage_config.type
     if StorageType.FS == storage_type:

@@ -481,7 +486,7 @@ def _get_directory_from_storage_config(


 def _set_directory_for_storage_config(
-    storage_config: Union[FsStorage, OutputS3Storage], directory
+    storage_config: Union[FsStorage, S3Storage], directory
 ) -> None:
     storage_type = storage_config.type
     if StorageType.FS == storage_type:

@@ -603,7 +608,7 @@ def validate_port(cls, field):
 class CLPConfig(BaseModel):
     execution_container: Optional[str] = None

-    logs_input: Union[InputFsStorage, S3Storage] = InputFsStorage()
+    logs_input: Union[FsIngestionConfig, S3IngestionConfig] = FsIngestionConfig()

     package: Package = Package()
     database: Database = Database()

@@ -638,14 +643,20 @@ def make_config_paths_absolute(self, clp_home: pathlib.Path):
         self._os_release_file_path = make_config_path_absolute(clp_home, self._os_release_file_path)

     def validate_logs_input_config(self):
-        if StorageType.FS == self.logs_input.type:
+        logs_input_type = self.logs_input.type
+        if StorageType.FS == logs_input_type:
             # NOTE: This can't be a pydantic validator since input_logs_dir might be a
             # package-relative path that will only be resolved after pydantic validation
             input_logs_dir = self.logs_input.directory
             if not input_logs_dir.exists():
                 raise ValueError(f"logs_input.directory '{input_logs_dir}' doesn't exist.")
             if not input_logs_dir.is_dir():
                 raise ValueError(f"logs_input.directory '{input_logs_dir}' is not a directory.")
+        if StorageType.S3 == logs_input_type and StorageEngine.CLP_S != self.package.storage_engine:
+            raise ValueError(
+                f"logs_input.type = 's3' is only supported with package.storage_engine"
+                f" = '{StorageEngine.CLP_S}'"
+            )

     def validate_archive_output_config(self):
         if (

@@ -661,7 +672,15 @@ def validate_archive_output_config(self):
         except ValueError as ex:
             raise ValueError(f"archive_output.storage's directory is invalid: {ex}")

-    def validate_stream_output_dir(self):
+    def validate_stream_output_config(self):
+        if (
+            StorageType.S3 == self.stream_output.storage.type
+            and StorageEngine.CLP_S != self.package.storage_engine
+        ):
+            raise ValueError(
+                f"stream_output.storage.type = 's3' is only supported with package.storage_engine"
+                f" = '{StorageEngine.CLP_S}'"
+            )
         try:
             validate_path_could_be_dir(self.stream_output.get_directory())
         except ValueError as ex:

@@ -681,17 +700,16 @@ def validate_logs_dir(self):

     def validate_aws_config_dir(self):
         profile_auth_used = False
-        storage_configs = []
+        auth_configs = []

         if StorageType.S3 == self.logs_input.type:
-            storage_configs.append(self.logs_input)
+            auth_configs.append(self.logs_input.aws_authentication)
         if StorageType.S3 == self.archive_output.storage.type:
-            storage_configs.append(self.archive_output.storage)
+            auth_configs.append(self.archive_output.storage.s3_config.aws_authentication)
         if StorageType.S3 == self.stream_output.storage.type:
-            storage_configs.append(self.stream_output.storage)
+            auth_configs.append(self.stream_output.storage.s3_config.aws_authentication)

-        for storage in storage_configs:
-            auth = storage.s3_config.aws_authentication
+        for auth in auth_configs:
             if AwsAuthType.profile.value == auth.type:
                 profile_auth_used = True
                 break
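Note that the key-prefix rule moved rather than disappeared: it was a field validator on `S3Config` and is now a `root_validator` on the output-storage models, since ingestion no longer carries a prefix. A standalone sketch of the rule it enforces, re-implemented outside pydantic purely for illustration:

    def check_key_prefix(key_prefix: str) -> None:
        # Same checks as the new root_validator, minus the pydantic plumbing.
        if "" == key_prefix:
            raise ValueError("s3_config.key_prefix cannot be empty")
        if not key_prefix.endswith("/"):
            raise ValueError('s3_config.key_prefix must end with "/"')

    check_key_prefix("staged-archives/")  # Passes.
    try:
        check_key_prefix("staged-archives")  # No trailing slash.
    except ValueError as ex:
        print(ex)  # s3_config.key_prefix must end with "/"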

components/clp-py-utils/clp_py_utils/s3_utils.py

Lines changed: 58 additions & 12 deletions
@@ -1,4 +1,5 @@
 import os
+import re
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple, Union

@@ -7,6 +8,7 @@
 from job_orchestration.scheduler.job_config import S3InputConfig

 from clp_py_utils.clp_config import (
+    AwsAuthentication,
     AwsAuthType,
     CLPConfig,
     COMPRESSION_SCHEDULER_COMPONENT_NAME,

@@ -51,16 +53,16 @@ def _get_session_credentials(aws_profile: Optional[str] = None) -> Optional[S3Credentials]:
     )


-def get_credential_env_vars(config: S3Config) -> Dict[str, str]:
+def get_credential_env_vars(auth: AwsAuthentication) -> Dict[str, str]:
     """
     Generates AWS credential environment variables for tasks.
-    :param config: S3Config or S3InputConfig from which to retrieve credentials.
-    :return: A [str, str] Dict which access key pair and session token if applicable.
-    :raise: ValueError if auth type is not supported
+    :param auth: AwsAuthentication
+    :return: A dictionary containing an access key-pair and optionally, a session token; or an empty
+    dictionary if the AWS-credential environment-variables should've been set already.
+    :raise: ValueError if `auth.type` is not a supported type or fails to authenticate with the
+    given `auth`.
     """
     env_vars: Optional[Dict[str, str]] = None
-    auth = config.aws_authentication
-
     aws_credentials: Optional[S3Credentials] = None

     if AwsAuthType.env_vars == auth.type:

@@ -104,20 +106,22 @@ def generate_container_auth_options(
     :return: Tuple of (whether aws config mount is needed, credential env_vars to set).
     :raises: ValueError if environment variables are not set correctly.
     """
-    storages_by_component_type: List[Union[S3Storage, FsStorage]] = []
+    output_storages_by_component_type: List[Union[S3Storage, FsStorage]] = []
+    input_storage_needed = False
+
     if component_type in (
         COMPRESSION_SCHEDULER_COMPONENT_NAME,
         COMPRESSION_WORKER_COMPONENT_NAME,
     ):
-        storages_by_component_type = [clp_config.logs_input, clp_config.archive_output.storage]
+        output_storages_by_component_type = [clp_config.archive_output.storage]
+        input_storage_needed = True
     elif component_type in (LOG_VIEWER_WEBUI_COMPONENT_NAME,):
-        storages_by_component_type = [clp_config.stream_output.storage]
+        output_storages_by_component_type = [clp_config.stream_output.storage]
     elif component_type in (
         QUERY_SCHEDULER_COMPONENT_NAME,
         QUERY_WORKER_COMPONENT_NAME,
     ):
-        storages_by_component_type = [
-            clp_config.logs_input,
+        output_storages_by_component_type = [
             clp_config.archive_output.storage,
             clp_config.stream_output.storage,
         ]

@@ -126,14 +130,21 @@ def generate_container_auth_options(
     config_mount = False
     add_env_vars = False

-    for storage in storages_by_component_type:
+    for storage in output_storages_by_component_type:
         if StorageType.S3 == storage.type:
             auth = storage.s3_config.aws_authentication
             if AwsAuthType.profile == auth.type:
                 config_mount = True
             elif AwsAuthType.env_vars == auth.type:
                 add_env_vars = True

+    if input_storage_needed and StorageType.S3 == clp_config.logs_input.type:
+        auth = clp_config.logs_input.aws_authentication
+        if AwsAuthType.profile == auth.type:
+            config_mount = True
+        elif AwsAuthType.env_vars == auth.type:
+            add_env_vars = True
+
     credentials_env_vars = []

     if add_env_vars:

@@ -186,6 +197,41 @@ def _create_s3_client(s3_config: S3Config, boto3_config: Optional[Config] = None):
     return s3_client


+def parse_s3_url(s3_url: str) -> Tuple[str, str, str]:
+    """
+    Parses the region_code, bucket, and key_prefix from the given S3 URL.
+    :param s3_url: A host-style URL or path-style URL.
+    :return: A tuple of (region_code, bucket, key_prefix).
+    :raise: ValueError if `s3_url` is not a valid host-style URL or path-style URL.
+    """
+
+    host_style_url_regex = re.compile(
+        r"https://(?P<bucket_name>[a-z0-9.-]+)\.s3(\.(?P<region_code>[a-z]+-[a-z]+-[0-9]))"
+        r"\.(?P<endpoint>[a-z0-9.-]+)/(?P<key_prefix>[^?]+).*"
+    )
+    match = host_style_url_regex.match(s3_url)
+
+    if match is None:
+        path_style_url_regex = re.compile(
+            r"https://s3(\.(?P<region_code>[a-z]+-[a-z]+-[0-9]))\.(?P<endpoint>[a-z0-9.-]+)/"
+            r"(?P<bucket_name>[a-z0-9.-]+)/(?P<key_prefix>[^?]+).*"
+        )
+        match = path_style_url_regex.match(s3_url)
+
+    if match is None:
+        raise ValueError(f"Unsupported URL format: {s3_url}")
+
+    region_code = match.group("region_code")
+    bucket_name = match.group("bucket_name")
+    endpoint = match.group("endpoint")
+    key_prefix = match.group("key_prefix")
+
+    if AWS_ENDPOINT != endpoint:
+        raise ValueError(f"Unsupported endpoint: {endpoint}")
+
+    return region_code, bucket_name, key_prefix
+
+
 def generate_s3_virtual_hosted_style_url(
     region_code: str, bucket_name: str, object_key: str
 ) -> str:
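Given the two regexes above, `parse_s3_url` accepts virtual-host-style and path-style HTTPS URLs and rejects everything else, including `s3://` URIs. A sketch with a made-up bucket and prefix, assuming `AWS_ENDPOINT` matches the URLs' endpoint (e.g., `amazonaws.com`):

    from clp_py_utils.s3_utils import parse_s3_url

    # Host-style: the bucket is part of the hostname.
    print(parse_s3_url("https://my-bucket.s3.us-east-2.amazonaws.com/logs/app1/"))
    # ("us-east-2", "my-bucket", "logs/app1/")

    # Path-style: the bucket is the first path segment.
    print(parse_s3_url("https://s3.us-east-2.amazonaws.com/my-bucket/logs/app1/"))
    # ("us-east-2", "my-bucket", "logs/app1/")

    # Anything else raises.
    try:
        parse_s3_url("s3://my-bucket/logs/app1/")
    except ValueError as ex:
        print(ex)  # Unsupported URL format: s3://my-bucket/logs/app1/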

components/job-orchestration/job_orchestration/executor/compress/compression_task.py

Lines changed: 1 addition & 1 deletion
@@ -208,7 +208,7 @@ def _make_clp_s_command_and_env(

     if InputType.S3 == clp_config.input.type:
         compression_env_vars = dict(os.environ)
-        compression_env_vars.update(get_credential_env_vars(clp_config.input))
+        compression_env_vars.update(get_credential_env_vars(clp_config.input.aws_authentication))
         compression_cmd.append("--auth")
         compression_cmd.append("s3")
     else:

components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py

Lines changed: 1 addition & 1 deletion
@@ -112,7 +112,7 @@ def _make_clp_s_command_and_env_vars(
         ))
         # fmt: on
         env_vars = dict(os.environ)
-        env_vars.update(get_credential_env_vars(s3_config))
+        env_vars.update(get_credential_env_vars(s3_config.aws_authentication))
     else:
         # fmt: off
         command.extend((
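Both task changes fall out of the new `get_credential_env_vars(auth: AwsAuthentication)` signature: callers pass the authentication block directly instead of a whole `S3Config`. A sketch of the two call-site patterns, assuming populated config objects are in scope:

    import os

    from clp_py_utils.s3_utils import get_credential_env_vars

    # Compression task: input-side auth now lives directly on the ingestion config.
    compression_env_vars = dict(os.environ)
    compression_env_vars.update(
        get_credential_env_vars(clp_config.input.aws_authentication)
    )

    # Stream-extraction task: output-side auth still hangs off the storage's s3_config.
    env_vars = dict(os.environ)
    env_vars.update(get_credential_env_vars(s3_config.aws_authentication))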
