
Commit bcb7f54

Bill-hbrhbr, kirkrodrigues, Marco, and davemarco authored
feat(clp-json): Use dataset-specific tables and archive directories for compression, decompression, and search. (#868)
Co-authored-by: Kirk Rodrigues <2454684+kirkrodrigues@users.noreply.github.com>
Co-authored-by: Marco <david.marcovitch@yscope.com>
Co-authored-by: davemarco <83603688+davemarco@users.noreply.github.com>
1 parent f3ffa53 commit bcb7f54

File tree

11 files changed (+207, -29 lines)

components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py

Lines changed: 25 additions & 2 deletions

@@ -10,8 +10,10 @@
 from clp_py_utils.clp_config import (
     ARCHIVE_TAGS_TABLE_SUFFIX,
     ARCHIVES_TABLE_SUFFIX,
+    CLP_DEFAULT_DATASET_NAME,
     Database,
     FILES_TABLE_SUFFIX,
+    StorageEngine,
 )
 from clp_py_utils.sql_adapter import SQL_Adapter

@@ -182,6 +184,7 @@ def main(argv: typing.List[str]) -> int:
         logger.exception("Failed to load config.")
         return -1

+    storage_engine: StorageEngine = clp_config.package.storage_engine
     database_config: Database = clp_config.database
     archives_dir: Path = clp_config.archive_output.get_directory()
     if not archives_dir.exists():

@@ -192,6 +195,8 @@ def main(argv: typing.List[str]) -> int:
         return _find_archives(
             archives_dir,
             database_config,
+            storage_engine,
+            CLP_DEFAULT_DATASET_NAME,
             parsed_args.begin_ts,
             parsed_args.end_ts,
         )

@@ -202,6 +207,8 @@ def main(argv: typing.List[str]) -> int:
         return _delete_archives(
             archives_dir,
             database_config,
+            storage_engine,
+            CLP_DEFAULT_DATASET_NAME,
             delete_handler,
             parsed_args.dry_run,
         )

@@ -212,6 +219,8 @@ def main(argv: typing.List[str]) -> int:
         return _delete_archives(
             archives_dir,
             database_config,
+            storage_engine,
+            CLP_DEFAULT_DATASET_NAME,
             delete_handler,
             parsed_args.dry_run,
         )

@@ -226,6 +235,8 @@ def main(argv: typing.List[str]) -> int:
 def _find_archives(
     archives_dir: Path,
     database_config: Database,
+    storage_engine: StorageEngine,
+    dataset: str,
     begin_ts: int,
     end_ts: int = typing.Optional[int],
 ) -> int:

@@ -234,6 +245,8 @@ def _find_archives(
     `begin_ts <= archive.begin_timestamp` and `archive.end_timestamp <= end_ts`.
     :param archives_dir:
     :param database_config:
+    :param storage_engine:
+    :param dataset:
     :param begin_ts:
     :param end_ts:
     :return: 0 on success, 1 on failure.

@@ -246,6 +259,9 @@ def _find_archives(
         database_config.get_clp_connection_params_and_type(True)
     )
     table_prefix: str = clp_db_connection_params["table_prefix"]
+    if StorageEngine.CLP_S == storage_engine:
+        table_prefix = f"{table_prefix}{dataset}_"
+
     with closing(sql_adapter.create_connection(True)) as db_conn, closing(
         db_conn.cursor(dictionary=True)
     ) as db_cursor:

@@ -271,7 +287,7 @@ def _find_archives(
         logger.info(f"Found {len(archive_ids)} archives within the specified time range.")
         for archive_id in archive_ids:
             logger.info(archive_id)
-            archive_path: Path = archives_dir / archive_id
+            archive_path: Path = archives_dir / dataset / archive_id
             if not archive_path.is_dir():
                 logger.warning(f"Archive {archive_id} in database not found on disk.")

@@ -286,6 +302,8 @@ def _find_archives(
 def _delete_archives(
     archives_dir: Path,
     database_config: Database,
+    storage_engine: StorageEngine,
+    dataset: str,
     delete_handler: DeleteHandler,
     dry_run: bool = False,
 ) -> int:

@@ -294,6 +312,8 @@ def _delete_archives(

     :param archives_dir:
     :param database_config:
+    :param storage_engine:
+    :param dataset:
     :param delete_handler: Object to handle differences between by-filter and by-ids delete types.
     :param dry_run: If True, no changes will be made to the database or disk.
     :return: 0 on success, -1 otherwise.

@@ -307,6 +327,9 @@ def _delete_archives(
         database_config.get_clp_connection_params_and_type(True)
     )
     table_prefix = clp_db_connection_params["table_prefix"]
+    if StorageEngine.CLP_S == storage_engine:
+        table_prefix = f"{table_prefix}{dataset}_"
+
     with closing(sql_adapter.create_connection(True)) as db_conn, closing(
         db_conn.cursor(dictionary=True)
     ) as db_cursor:

@@ -365,7 +388,7 @@ def _delete_archives(
         logger.info(f"Finished deleting archives from the database.")

         for archive_id in archive_ids:
-            archive_path: Path = archives_dir / archive_id
+            archive_path: Path = archives_dir / dataset / archive_id
             if not archive_path.is_dir():
                 logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.")
                 continue
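
For readers following the change: with the clp-s (clp-json) storage engine, both _find_archives and _delete_archives now resolve dataset-specific metadata tables and a per-dataset archive directory before touching the database or disk. The sketch below restates that derivation in isolation; it is illustrative only. The standalone helper, its name, and the example prefix, paths, dataset, and archive ID are not part of this commit.

from pathlib import Path
from typing import Tuple

from clp_py_utils.clp_config import StorageEngine


def dataset_scoped_locations(
    table_prefix: str,
    archives_dir: Path,
    storage_engine: StorageEngine,
    dataset: str,
    archive_id: str,
) -> Tuple[str, Path]:
    """Hypothetical helper (not part of the commit) restating the scoping above."""
    if StorageEngine.CLP_S == storage_engine:
        # clp-s metadata tables are namespaced per dataset, e.g. "<prefix><dataset>_archives".
        table_prefix = f"{table_prefix}{dataset}_"
    # Archives are looked up under a per-dataset subdirectory of the archives directory.
    archive_path = archives_dir / dataset / archive_id
    return table_prefix, archive_path


# Example with illustrative values:
#   dataset_scoped_locations("clp_", Path("/var/archives"), StorageEngine.CLP_S, "default", "abc-123")
#   -> ("clp_default_", Path("/var/archives/default/abc-123"))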

components/clp-package-utils/clp_package_utils/scripts/native/decompress.py

Lines changed: 4 additions & 1 deletion

@@ -10,6 +10,7 @@

 import yaml
 from clp_py_utils.clp_config import (
+    CLP_DEFAULT_DATASET_NAME,
     CLPConfig,
     Database,
     FILES_TABLE_SUFFIX,

@@ -139,7 +140,9 @@ def handle_extract_stream_cmd(
     elif EXTRACT_JSON_CMD == command:
         job_type = QueryJobType.EXTRACT_JSON
         job_config = ExtractJsonJobConfig(
-            archive_id=parsed_args.archive_id, target_chunk_size=parsed_args.target_chunk_size
+            dataset=CLP_DEFAULT_DATASET_NAME,
+            archive_id=parsed_args.archive_id,
+            target_chunk_size=parsed_args.target_chunk_size,
         )
     else:
         logger.error(f"Unsupported stream extraction command: {command}")

components/clp-package-utils/clp_package_utils/scripts/start_clp.py

Lines changed: 4 additions & 0 deletions

@@ -16,6 +16,7 @@
     ALL_TARGET_NAME,
     ARCHIVES_TABLE_SUFFIX,
     AwsAuthType,
+    CLP_DEFAULT_DATASET_NAME,
     CLPConfig,
     COMPRESSION_JOBS_TABLE_NAME,
     COMPRESSION_SCHEDULER_COMPONENT_NAME,

@@ -31,6 +32,7 @@
     REDIS_COMPONENT_NAME,
     REDUCER_COMPONENT_NAME,
     RESULTS_CACHE_COMPONENT_NAME,
+    StorageEngine,
     StorageType,
     WEBUI_COMPONENT_NAME,
 )

@@ -864,6 +866,8 @@ def start_webui(instance_id: str, clp_config: CLPConfig, mounts: CLPDockerMounts
     # Read and update settings.json
     clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True)
     table_prefix = clp_db_connection_params["table_prefix"]
+    if StorageEngine.CLP_S == clp_config.package.storage_engine:
+        table_prefix = f"{table_prefix}{CLP_DEFAULT_DATASET_NAME}_"
     meteor_settings_updates = {
         "private": {
             "SqlDbHost": clp_config.database.host,

components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py

Lines changed: 51 additions & 6 deletions

@@ -1,12 +1,15 @@
 from __future__ import annotations

+from pathlib import Path
+from typing import Set
+
 from clp_py_utils.clp_config import (
     ARCHIVE_TAGS_TABLE_SUFFIX,
     ARCHIVES_TABLE_SUFFIX,
-    CLP_DEFAULT_DATASET_NAME,
     COLUMN_METADATA_TABLE_SUFFIX,
     DATASETS_TABLE_SUFFIX,
     FILES_TABLE_SUFFIX,
+    StorageType,
     TAGS_TABLE_SUFFIX,
 )

@@ -95,7 +98,7 @@ def _create_column_metadata_table(db_cursor, table_prefix: str) -> None:

 def create_datasets_table(db_cursor, table_prefix: str) -> None:
     """
-    Creates the dataset information table.
+    Creates the datasets information table.

     :param db_cursor: The database cursor to execute the table creation.
     :param table_prefix: A string to prepend to the table name.

@@ -115,6 +118,51 @@ def create_datasets_table(db_cursor, table_prefix: str) -> None:
     )


+def add_dataset(
+    db_conn,
+    db_cursor,
+    table_prefix: str,
+    dataset_name: str,
+    archive_storage_type: StorageType,
+    dataset_archive_storage_directory: Path,
+) -> None:
+    """
+    Inserts a new dataset into the `datasets` table and creates the corresponding standard set of
+    tables for CLP's metadata.
+
+    :param db_conn:
+    :param db_cursor: The database cursor to execute the table row insertion.
+    :param table_prefix: A string to prepend to the table name.
+    :param dataset_name:
+    :param archive_storage_type:
+    :param dataset_archive_storage_directory:
+    """
+    query = f"""INSERT INTO `{table_prefix}{DATASETS_TABLE_SUFFIX}`
+        (name, archive_storage_type, archive_storage_directory)
+        VALUES (%s, %s, %s)
+        """
+    db_cursor.execute(
+        query, (dataset_name, archive_storage_type, str(dataset_archive_storage_directory))
+    )
+    create_metadata_db_tables(db_cursor, table_prefix, dataset_name)
+    db_conn.commit()
+
+
+def fetch_existing_datasets(
+    db_cursor,
+    table_prefix: str,
+) -> Set[str]:
+    """
+    Gets the names of all existing datasets.
+
+    :param db_cursor:
+    :param table_prefix:
+    """
+    db_cursor.execute(f"SELECT name FROM `{table_prefix}{DATASETS_TABLE_SUFFIX}`")
+    rows = db_cursor.fetchall()
+    return {row["name"] for row in rows}
+
+
 def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None = None) -> None:
     """
     Creates the standard set of tables for CLP's metadata.

@@ -125,6 +173,7 @@ def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None
     """
     if dataset is not None:
         table_prefix = f"{table_prefix}{dataset}_"
+        _create_column_metadata_table(db_cursor, table_prefix)

     archives_table_name = f"{table_prefix}{ARCHIVES_TABLE_SUFFIX}"
     tags_table_name = f"{table_prefix}{TAGS_TABLE_SUFFIX}"

@@ -136,7 +185,3 @@ def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None
         db_cursor, archive_tags_table_name, archives_table_name, tags_table_name
     )
     _create_files_table(db_cursor, table_prefix)
-
-    # TODO: Create this table only for the `CLP_S` storage-engine after the dataset feature is
-    # fully implemented.
-    _create_column_metadata_table(db_cursor, f"{table_prefix}{CLP_DEFAULT_DATASET_NAME}_")
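
The two new helpers are meant to be used together: fetch_existing_datasets reports what is already registered, and add_dataset registers a dataset and creates its per-dataset metadata tables in one step. A minimal usage sketch follows, assuming a dictionary=True cursor (fetch_existing_datasets reads row["name"], so a dict-style cursor is expected); the wrapper function, its name, the filesystem storage type, and the archives root are illustrative assumptions, not values taken from the commit.

from pathlib import Path

from clp_py_utils.clp_config import StorageType
from clp_py_utils.clp_metadata_db_utils import add_dataset, fetch_existing_datasets


def ensure_dataset_registered(
    db_conn, db_cursor, table_prefix: str, dataset_name: str, archives_root: Path
) -> None:
    """Hypothetical wrapper: register `dataset_name` only if it is not already present."""
    if dataset_name not in fetch_existing_datasets(db_cursor, table_prefix):
        # Inserts the dataset row and creates its per-dataset metadata tables
        # (e.g. "<table_prefix><dataset_name>_archives").
        add_dataset(
            db_conn,
            db_cursor,
            table_prefix,
            dataset_name,
            StorageType.FS,  # assumed filesystem archive storage for this example
            archives_root / dataset_name,
        )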

components/clp-py-utils/clp_py_utils/initialize-clp-metadata-db.py

Lines changed: 2 additions & 4 deletions

@@ -53,12 +53,10 @@ def main(argv):
         with closing(sql_adapter.create_connection(True)) as metadata_db, closing(
             metadata_db.cursor(dictionary=True)
         ) as metadata_db_cursor:
-            # TODO: After the dataset feature is fully implemented, for clp-json:
-            # 1. Populate the datasets table with the name and path for the "default" dataset.
-            # 2. Change the metadata tables to be specific to the "default" dataset.
             if StorageEngine.CLP_S == storage_engine:
                 create_datasets_table(metadata_db_cursor, table_prefix)
-            create_metadata_db_tables(metadata_db_cursor, table_prefix)
+            else:
+                create_metadata_db_tables(metadata_db_cursor, table_prefix)
             metadata_db.commit()
     except:
         logger.exception("Failed to create clp metadata tables.")

components/job-orchestration/job_orchestration/executor/compress/compression_task.py

Lines changed: 9 additions & 3 deletions

@@ -12,7 +12,6 @@
 from clp_py_utils.clp_config import (
     ARCHIVE_TAGS_TABLE_SUFFIX,
     ARCHIVES_TABLE_SUFFIX,
-    CLP_DEFAULT_DATASET_NAME,
     COMPRESSION_JOBS_TABLE_NAME,
     COMPRESSION_TASKS_TABLE_NAME,
     Database,

@@ -280,6 +279,8 @@ def run_clp(
         s3_config = worker_config.archive_output.storage.s3_config
         enable_s3_write = True

+    table_prefix = clp_metadata_db_connection_config["table_prefix"]
+    input_dataset: str
     if StorageEngine.CLP == clp_storage_engine:
         compression_cmd, compression_env = _make_clp_command_and_env(
             clp_home=clp_home,

@@ -288,6 +289,12 @@ def run_clp(
             db_config_file_path=db_config_file_path,
         )
     elif StorageEngine.CLP_S == clp_storage_engine:
+        input_dataset = clp_config.input.dataset
+        table_prefix = f"{table_prefix}{input_dataset}_"
+        archive_output_dir = archive_output_dir / input_dataset
+        if StorageType.S3 == storage_type:
+            s3_config.key_prefix = f"{s3_config.key_prefix}{input_dataset}/"
+
         compression_cmd, compression_env = _make_clp_s_command_and_env(
             clp_home=clp_home,
             archive_output_dir=archive_output_dir,

@@ -367,7 +374,6 @@ def run_clp(
     with closing(sql_adapter.create_connection(True)) as db_conn, closing(
         db_conn.cursor(dictionary=True)
     ) as db_cursor:
-        table_prefix = clp_metadata_db_connection_config["table_prefix"]
        if StorageEngine.CLP_S == clp_storage_engine:
             update_archive_metadata(db_cursor, table_prefix, last_archive_stats)
             update_job_metadata_and_tags(

@@ -384,7 +390,7 @@ def run_clp(
                 str(clp_home / "bin" / "indexer"),
                 "--db-config-file",
                 str(db_config_file_path),
-                CLP_DEFAULT_DATASET_NAME,
+                input_dataset,
                 archive_path,
             ]
             try:
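
For the clp-s engine, the compression task now scopes three things to the input dataset before invoking the compressor: the metadata table prefix, the archive output directory, and, for S3 storage, the key prefix; the final hunk then passes input_dataset to the indexer in place of CLP_DEFAULT_DATASET_NAME. The sketch below only restates where those destinations end up; every concrete value in it (paths, prefix, dataset name) is an assumption for illustration, not taken from the change.

from pathlib import Path

# Illustrative values only.
archive_output_dir = Path("/var/data/archives")
s3_key_prefix = "clp-archives/"
table_prefix = "clp_"
input_dataset = "default"

# Dataset-specific metadata tables, e.g. "clp_default_archives".
dataset_table_prefix = f"{table_prefix}{input_dataset}_"

# Filesystem storage: archives are written under a per-dataset subdirectory.
dataset_archive_output_dir = archive_output_dir / input_dataset  # /var/data/archives/default

# S3 storage: objects are written under a per-dataset key prefix.
dataset_key_prefix = f"{s3_key_prefix}{input_dataset}/"  # "clp-archives/default/"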

components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py

Lines changed: 4 additions & 1 deletion

@@ -94,8 +94,10 @@ def _make_clp_s_command_and_env_vars(
         "x",
     ]

+    dataset = extract_json_config.dataset
     if StorageType.S3 == storage_type:
         s3_config = worker_config.archive_output.storage.s3_config
+        s3_config.key_prefix = f"{s3_config.key_prefix}{dataset}/"
         try:
             s3_url = generate_s3_virtual_hosted_style_url(
                 s3_config.region_code, s3_config.bucket, f"{s3_config.key_prefix}{archive_id}"

@@ -114,9 +116,10 @@ def _make_clp_s_command_and_env_vars(
         env_vars = dict(os.environ)
         env_vars.update(get_credential_env_vars(s3_config.aws_authentication))
     else:
+        archives_dir = worker_config.archive_output.get_directory() / dataset
         # fmt: off
         command.extend((
-            str(worker_config.archive_output.get_directory()),
+            str(archives_dir),
             str(stream_output_dir),
             "--archive-id",
             archive_id,

components/job-orchestration/job_orchestration/executor/query/fs_search_task.py

Lines changed: 3 additions & 1 deletion

@@ -54,14 +54,15 @@ def _make_core_clp_s_command_and_env_vars(
     archive_id: str,
     search_config: SearchJobConfig,
 ) -> Tuple[Optional[List[str]], Optional[Dict[str, str]]]:
-    archives_dir = worker_config.archive_output.get_directory()
     command = [
         str(clp_home / "bin" / "clp-s"),
         "s",
     ]

+    dataset = search_config.dataset
     if StorageType.S3 == worker_config.archive_output.storage.type:
         s3_config = worker_config.archive_output.storage.s3_config
+        s3_config.key_prefix = f"{s3_config.key_prefix}{dataset}/"
         try:
             s3_url = generate_s3_virtual_hosted_style_url(
                 s3_config.region_code, s3_config.bucket, f"{s3_config.key_prefix}{archive_id}"

@@ -79,6 +80,7 @@ def _make_core_clp_s_command_and_env_vars(
         env_vars = dict(os.environ)
         env_vars.update(get_credential_env_vars(s3_config.aws_authentication))
     else:
+        archives_dir = worker_config.archive_output.get_directory() / dataset
         # fmt: off
         command.extend((
             str(archives_dir),
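
The search task mirrors the stream-extraction task above: the dataset comes from the job's search config, and clp-s is pointed either at a per-dataset archives directory (filesystem storage) or at an archive key built from a per-dataset S3 key prefix. The sketch below shows that derivation with purely illustrative values; the prefix, paths, dataset, and archive ID are assumptions, and the example URL shape is simply the standard virtual-hosted-style form that generate_s3_virtual_hosted_style_url is named after.

from pathlib import Path

# Illustrative values only.
dataset = "default"
archive_id = "abc-123"

# Filesystem storage: clp-s searches the per-dataset archives directory.
archives_dir = Path("/var/data/archives") / dataset  # /var/data/archives/default

# S3 storage: the key prefix gains a dataset segment before the archive key is built,
# so the object addressed by the virtual-hosted-style URL would look like, e.g.,
#   https://my-bucket.s3.us-east-2.amazonaws.com/clp-archives/default/abc-123
key_prefix = "clp-archives/"                # illustrative base prefix
key_prefix = f"{key_prefix}{dataset}/"      # "clp-archives/default/"
archive_key = f"{key_prefix}{archive_id}"   # "clp-archives/default/abc-123"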
