# feat(clp-json): Use dataset-specific tables and archive directories for compression, decompression, and search. #868
## Walkthrough
This set of changes introduces dataset-aware logic across several components, primarily by dynamically adjusting SQL table prefixes and archive storage paths based on the storage engine and dataset name. Function signatures and internal logic are updated to propagate `storage_engine` and `dataset` parameters, especially for the `CLP_S` storage engine, ensuring that database operations and archive management are correctly namespaced per dataset. New helper functions are added for dataset registration and caching, and command construction for compression and search tasks is adjusted to use dataset-specific paths and prefixes. No changes are made to error handling or overall control flow beyond these parameterizations.
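In sketch form, the naming convention this introduces (the helper function below is hypothetical; the `clp_` base prefix comes from the PR description):

```python
CLP_METADATA_TABLE_PREFIX = "clp_"  # base prefix, per the PR description

def dataset_table_prefix(storage_engine: str, dataset: str) -> str:
    """Namespace metadata tables per dataset when using the CLP_S engine."""
    table_prefix = CLP_METADATA_TABLE_PREFIX
    if "CLP_S" == storage_engine:
        table_prefix += f"{dataset}_"
    return table_prefix

# Example: dataset_table_prefix("CLP_S", "web_logs") -> "clp_web_logs_"
```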
## Changes
| File(s) | Change Summary |
|---------|----------------|
| `components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py` | Added `storage_engine` and `dataset` parameters to main, `_find_archives`, and `_delete_archives`. Table prefix is now conditionally suffixed with dataset name for `CLP_S` engine. |
| `components/clp-package-utils/clp_package_utils/scripts/native/decompress.py` | Updated `get_orig_file_id` to accept and use `storage_engine` and `dataset` for table prefixing. Adjusted `handle_extract_stream_cmd` to pass these parameters. |
| `components/clp-package-utils/clp_package_utils/scripts/start_clp.py` | Modified `start_webui` to dynamically adjust `table_prefix` with dataset name for `CLP_S` storage engine when constructing Meteor settings. |
| `components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py` | Added `insert_new_datasets_table_entry` for dataset registration. Adjusted table creation logic to use dataset-aware prefixes. Cleaned up imports and docstrings. |
| `components/clp-py-utils/clp_py_utils/initialize-clp-metadata-db.py` | Changed logic so only one of `create_datasets_table` or `create_metadata_db_tables` is called based on storage engine, making them mutually exclusive. |
| `components/job-orchestration/job_orchestration/executor/compress/compression_task.py` | Updated `run_clp` to adjust archive output directory, table prefix, and S3 key prefix with dataset name for `CLP_S`. Passes dataset name to indexer command. |
| `components/job-orchestration/job_orchestration/executor/query/fs_search_task.py` | Modified `_make_core_clp_s_command_and_env_vars` to append dataset name to archive directory and S3 key prefix. |
| `components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py` | Added `_fetch_existing_datasets` for dataset caching. Updated `search_and_schedule_new_tasks` and `main` to handle datasets, adjust table prefixes, and register new datasets as needed for `CLP_S`. |
| `components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py` | Added `StorageEngine` parameter to key functions. Adjusted table prefixing in `handle_pending_query_jobs` to include dataset name for `CLP_S`. Propagated storage engine parameter through function calls. |
## Sequence Diagram(s)
```mermaid
sequenceDiagram
participant Scheduler
participant MetadataDB
participant DatasetCache
Scheduler->>MetadataDB: Fetch existing datasets
MetadataDB-->>Scheduler: Return set of dataset names
loop For each new compression job
Scheduler->>DatasetCache: Check if dataset is registered
alt Dataset not in cache
Scheduler->>MetadataDB: Insert new dataset entry
Scheduler->>MetadataDB: Create metadata tables for dataset
MetadataDB-->>Scheduler: Acknowledge
Scheduler->>DatasetCache: Add dataset to cache
end
Scheduler->>MetadataDB: Use dataset-aware table prefix for job metadata
end
```

```mermaid
sequenceDiagram
participant Executor
participant Config
participant ArchiveStorage
participant DB
Executor->>Config: Get storage_engine and dataset
alt storage_engine == CLP_S
Executor->>ArchiveStorage: Set archive dir and S3 key prefix with dataset name
Executor->>DB: Use table_prefix with dataset name
else
Executor->>ArchiveStorage: Use default archive dir and key prefix
Executor->>DB: Use default table_prefix
end
Executor->>Indexer: Run with dataset argument
```
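A rough Python sketch of the first flow above; the two helpers are named in the walkthrough, while the cache variable, the wrapper function, and the argument lists are assumptions:

```python
from clp_py_utils.clp_metadata_db_utils import (
    create_metadata_db_tables,
    insert_new_datasets_table_entry,
)

existing_datasets: set[str] = set()  # cache primed by _fetch_existing_datasets

def ensure_dataset_registered(db_conn, db_cursor, table_prefix, archives_dir,
                              dataset: str) -> None:
    """Register a dataset and create its metadata tables exactly once.
    The helpers' argument lists shown here are assumptions."""
    if dataset in existing_datasets:
        return
    insert_new_datasets_table_entry(db_cursor, dataset, archives_dir)
    create_metadata_db_tables(db_cursor, table_prefix, dataset=dataset)
    db_conn.commit()
    existing_datasets.add(dataset)
```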
Actionable comments posted: 4
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (9)
- `components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py` (11 hunks)
- `components/clp-package-utils/clp_package_utils/scripts/native/decompress.py` (4 hunks)
- `components/clp-package-utils/clp_package_utils/scripts/start_clp.py` (3 hunks)
- `components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py` (4 hunks)
- `components/clp-py-utils/clp_py_utils/initialize-clp-metadata-db.py` (1 hunk)
- `components/job-orchestration/job_orchestration/executor/compress/compression_task.py` (2 hunks)
- `components/job-orchestration/job_orchestration/executor/query/fs_search_task.py` (1 hunk)
- `components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py` (4 hunks)
- `components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py` (6 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (6)
- `components/clp-package-utils/clp_package_utils/scripts/start_clp.py` (1)
  - `components/clp-py-utils/clp_py_utils/clp_config.py` (1): `StorageEngine` (55-57)
- `components/clp-package-utils/clp_package_utils/scripts/native/decompress.py` (1)
  - `components/clp-py-utils/clp_py_utils/clp_config.py` (3): `CLPConfig` (608-794), `Database` (88-167), `StorageEngine` (55-57)
- `components/job-orchestration/job_orchestration/executor/compress/compression_task.py` (1)
  - `components/clp-py-utils/clp_py_utils/clp_config.py` (2): `StorageEngine` (55-57), `StorageType` (60-62)
- `components/job-orchestration/job_orchestration/executor/query/fs_search_task.py` (1)
  - `components/clp-py-utils/clp_py_utils/clp_config.py` (3): `get_directory` (541-542), `get_directory` (563-564), `StorageType` (60-62)
- `components/clp-py-utils/clp_py_utils/initialize-clp-metadata-db.py` (1)
  - `components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py` (1): `create_metadata_db_tables` (133-154)
- `components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py` (1)
  - `components/clp-py-utils/clp_py_utils/clp_config.py` (2): `Database` (88-167), `StorageEngine` (55-57)
🔇 Additional comments (26)
**components/clp-package-utils/clp_package_utils/scripts/start_clp.py (2)**

`19-19`: **Added necessary imports for dataset-specific functionality.**

The addition of the `CLP_DEFAULT_DATASET_NAME` and `StorageEngine` imports supports the new dataset-specific table prefixing logic.

Also applies to: 35-35

`869-870`: **Correctly implements dataset-specific table prefixes for CLP_S.**

The conditional logic appropriately modifies the `table_prefix` by appending the default dataset name when using the `CLP_S` storage engine. This ensures all table references in the Meteor settings are properly namespaced per the PR objectives.
**components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (2)**

`57-58`: **Correctly implements dataset-specific archive paths.**

The code now extracts the dataset from the search configuration and appends it to the archives directory path when constructing the search command. This ensures search operations use the correct dataset-specific archive location.

`66-66`: **Properly modifies the S3 key prefix for dataset isolation.**

The S3 key prefix is modified to include the dataset name, ensuring physical isolation of archives in S3 storage. This change maintains consistency with the filesystem-based approach and aligns with the PR objectives for dataset isolation.
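A minimal sketch of this pattern, with hypothetical names (the actual change lives in `_make_core_clp_s_command_and_env_vars`):

```python
from pathlib import Path

def dataset_search_locations(archives_root: Path, s3_key_prefix: str,
                             dataset: str) -> tuple[Path, str]:
    """Point the search command at the dataset's own archive location."""
    archives_dir = archives_root / dataset       # fs: .../archives/<dataset>
    key_prefix = f"{s3_key_prefix}{dataset}/"    # s3: <prefix><dataset>/
    return archives_dir, key_prefix
```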
**components/clp-py-utils/clp_py_utils/initialize-clp-metadata-db.py (1)**

`57-59`: **Correctly implements mutually exclusive table-creation logic.**

The conditional logic ensures that for the `CLP_S` storage engine, only the `create_datasets_table` function is called, while for other storage engines, only the `create_metadata_db_tables` function is called. This approach appropriately sets up the database structure based on the storage engine type.
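A condensed sketch of that conditional; the two creation helpers are the names cited above, while the wrapper function and argument lists are assumptions:

```python
from clp_py_utils.clp_config import StorageEngine
from clp_py_utils.clp_metadata_db_utils import (
    create_datasets_table,
    create_metadata_db_tables,
)

def initialize_tables(db_cursor, table_prefix: str, storage_engine: str) -> None:
    """Exactly one branch runs: CLP_S deployments start with only the central
    clp_datasets table; per-dataset tables are created later, on demand."""
    if StorageEngine.CLP_S == storage_engine:
        create_datasets_table(db_cursor, table_prefix)
    else:
        create_metadata_db_tables(db_cursor, table_prefix)
```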
**components/clp-package-utils/clp_package_utils/scripts/native/decompress.py (4)**

`13-13`: **Added necessary imports for dataset-specific functionality.**

The imports of `CLP_DEFAULT_DATASET_NAME` and `StorageEngine` support the new dataset-specific table prefixing logic needed for decompress operations.

Also applies to: 17-17

`44-46`: **Updated function signature to support dataset-specific tables.**

The `get_orig_file_id` function signature now includes `storage_engine` and `dataset` parameters, allowing it to work with dataset-specific table names. This change is necessary to support the dataset isolation features described in the PR objectives.

`59-61`: **Correctly implements dataset-specific table prefixes.**

The conditional logic properly modifies the `table_prefix` for the `CLP_S` storage engine by appending the dataset name, ensuring queries target the correct dataset-specific tables.

`139-144`: **Updated function call with required dataset parameters.**

The call to `get_orig_file_id` has been updated to include the storage engine and the default dataset name, ensuring that file lookups work correctly with the updated function signature. This maintains compatibility with the dataset-aware architecture.
**components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py (3)**

`3-3`: **Appropriate import added for the Path object.**

The addition of the `Path` import from `pathlib` is appropriate, as it's used for type hinting in the newly added `insert_new_datasets_table_entry` function.

`99-99`: **Corrected docstring to use the plural form.**

Minor improvement in docstring accuracy, changing "dataset information table" to "datasets information table" to better reflect the table's purpose.

`143-143`: **Appropriate relocation of column metadata table creation.**

Moving the `_create_column_metadata_table` call inside the dataset conditional block ensures dataset-specific column metadata tables are only created when a dataset is specified, which aligns with the dataset isolation approach.
**components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2)**

`283-292`: **Good implementation of dataset-specific handling for the CLP_S storage engine.**

This addition properly modifies both the table prefix and archive paths based on the dataset name when using the CLP_S storage engine. The code follows a consistent pattern:

- For database operations: appends the dataset name to the table prefix
- For filesystem operations: places dataset archives in dataset-specific subdirectories
- For S3 operations: adds the dataset name to the key prefix path

The implementation maintains proper isolation between datasets, which aligns with the PR objective.

`396-396`: **Correctly using a dynamic dataset name for the indexer.**

Replacing the hardcoded `CLP_DEFAULT_DATASET_NAME` with the dynamically determined `input_dataset` ensures the indexer correctly associates archives with their specific datasets.
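The same three adjustments in sketch form (variable names are hypothetical, not the PR's exact code):

```python
from pathlib import Path

def apply_dataset_isolation(storage_engine: str, dataset: str,
                            table_prefix: str, archive_output_dir: Path,
                            s3_key_prefix: str):
    """Apply the three per-dataset adjustments described in the review."""
    if "CLP_S" == storage_engine:
        table_prefix += f"{dataset}_"                      # db namespacing
        archive_output_dir = archive_output_dir / dataset  # fs subdirectory
        s3_key_prefix = f"{s3_key_prefix}{dataset}/"       # s3 key prefix
    return table_prefix, archive_output_dir, s3_key_prefix
```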
13-13
: Appropriate imports added to support dataset-aware functionality.The addition of
CLP_DEFAULT_DATASET_NAME
andStorageEngine
imports enables the archive manager to incorporate dataset-specific logic.Also applies to: 16-17
187-187
: Correctly fetching storage engine from configuration.Storage engine detection is properly implemented by extracting it from the configuration object.
198-200
: Consistently passing dataset parameters to archive functions.The code now correctly passes the storage engine and default dataset name to the archive management functions. This maintains consistency with the dataset-specific approach implemented throughout the codebase.
Also applies to: 210-212, 222-224
238-239
: Properly updated function signature to support dataset-aware logic.The
_find_archives
function signature now includesstorage_engine
anddataset
parameters, making the function capable of handling dataset-specific archives.
262-264
: Correct implementation of dataset-specific table prefix.The conditional modification of the table prefix when using the CLP_S storage engine follows the established pattern and ensures archives are queried from the correct dataset-specific tables.
305-306
: Properly updated function signature to support dataset-aware logic.The
_delete_archives
function signature now includesstorage_engine
anddataset
parameters, making the function capable of handling dataset-specific archive deletion.
330-332
: Correct implementation of dataset-specific table prefix for deletion.The conditional modification of the table prefix for deletion operations when using the CLP_S storage engine ensures that archives are deleted from the correct dataset-specific tables.
**components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (5)**

`40-40`: **Appropriate import added for the storage engine enum.**

The addition of the `StorageEngine` import enables dataset-aware functionality in the query scheduler.

`618-618`: **Properly updated function signature to support dataset-aware logic.**

The `handle_pending_query_jobs` function signature now includes the `clp_storage_engine` parameter, making the function capable of handling dataset-specific queries.

`638-644`: **Good implementation of dataset-specific table prefix logic.**

The code correctly extracts the dataset from the search configuration and modifies the table prefix to include the dataset name when using the CLP_S storage engine. This ensures that queries target the correct dataset-specific tables.

`1057-1057`: **Correctly propagating the storage engine through the function call chain.**

The `clp_storage_engine` parameter is properly propagated through the function call chain, ensuring dataset-aware behavior is consistently applied throughout the query processing workflow.

Also applies to: 1072-1072

`1157-1157`: **Properly passing the storage engine from configuration.**

The code correctly passes the storage engine from the configuration to the job handling function, ensuring dataset-aware behavior is implemented based on the system's configuration.
Minor suggestion.
For the PR title, how about:
feat(clp-json): Use dataset-specific tables and archive directories for compression, decompression, and search.
## Description
This PR updates the compression, decompression, and search workflows to support dataset-specific operations when using the `CLP_S` storage engine:

- Metadata tables are prefixed with `clp_{dataset_name}_`, with the exception of the following:
  - `clp_datasets`: the central table where all dataset info is stored
  - `compression_jobs`
  - `compression_tasks`
  - `query_jobs`
  - `query_tasks`
- Archive output directories (for both `fs` and `s3` storage) and stream output directories are suffixed with the dataset name to ensure physical isolation.
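As a concrete example of the naming scheme (the dataset name and the `archives` table suffix below are illustrative, not taken from the PR):

```python
dataset = "web_logs"
prefix = f"clp_{dataset}_"  # per-dataset prefix from the rule above
print(prefix + "archives")  # -> clp_web_logs_archives (hypothetical table)
# clp_datasets, compression_jobs, compression_tasks, query_jobs, and
# query_tasks keep their shared, non-dataset-prefixed names.
```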
This PR succeeds:

- … `clp-s` to compression task workers. #819
- … `dataset` fields to input configs of compression and search jobs. #839
- … `CLP_S` storage engine. #831
- … `table-name` to `dataset-name` for consistency; update the name of the column metadata table to put the dataset name first. #846
- … `table_prefix` from connection params and table suffix constants. #864

and covers step no. 3 of the dataset feature implementation plan.
## Checklist

- … breaking change.
## Validation performed
For `clp-s`, the initial db table set:

![image](https://github.com/user-attachments/assets/…)

CMD with interface changes taking effect: