Skip to content

Commit 58a60c6

Browse files
authored
feat(clp-package)!: Add query engine option to support starting only compression and UI components when using the Presto query engine. (#1095)
1 parent 759f3b0 commit 58a60c6

File tree

4 files changed

+141
-17
lines changed

4 files changed

+141
-17
lines changed

components/clp-package-utils/clp_package_utils/scripts/start_clp.py

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
COMPRESSION_WORKER_COMPONENT_NAME,
2323
CONTROLLER_TARGET_NAME,
2424
DB_COMPONENT_NAME,
25+
get_components_for_target,
2526
QUERY_JOBS_TABLE_NAME,
2627
QUERY_SCHEDULER_COMPONENT_NAME,
2728
QUERY_WORKER_COMPONENT_NAME,
@@ -880,6 +881,7 @@ def start_webui(
880881

881882
client_settings_json_updates = {
882883
"ClpStorageEngine": clp_config.package.storage_engine,
884+
"ClpQueryEngine": clp_config.package.query_engine,
883885
"MongoDbSearchResultsMetadataCollectionName": clp_config.webui.results_metadata_collection_name,
884886
"SqlDbClpArchivesTableName": archives_table_name,
885887
"SqlDbClpDatasetsTableName": get_datasets_table_name(table_prefix),
@@ -1110,6 +1112,15 @@ def main(argv):
11101112
config_file_path = pathlib.Path(parsed_args.config)
11111113
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
11121114

1115+
runnable_components = clp_config.get_runnable_components()
1116+
components_to_start = get_components_for_target(target)
1117+
components_to_start = components_to_start.intersection(runnable_components)
1118+
1119+
# Exit early if no components to start
1120+
if len(components_to_start) == 0:
1121+
logger.error(f"{target} not available with current configuration")
1122+
return -1
1123+
11131124
# Validate and load necessary credentials
11141125
if target in (
11151126
ALL_TARGET_NAME,
@@ -1184,35 +1195,46 @@ def main(argv):
11841195
conf_dir = clp_home / "etc"
11851196

11861197
# Start components
1187-
if target in (ALL_TARGET_NAME, DB_COMPONENT_NAME):
1198+
if DB_COMPONENT_NAME in components_to_start:
11881199
start_db(instance_id, clp_config, conf_dir)
1189-
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, DB_COMPONENT_NAME):
1200+
1201+
if (
1202+
target == CONTROLLER_TARGET_NAME and DB_COMPONENT_NAME in runnable_components
1203+
) or DB_COMPONENT_NAME in components_to_start:
11901204
create_db_tables(instance_id, clp_config, container_clp_config, mounts)
1191-
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, QUEUE_COMPONENT_NAME):
1205+
1206+
if QUEUE_COMPONENT_NAME in components_to_start:
11921207
start_queue(instance_id, clp_config)
1193-
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, REDIS_COMPONENT_NAME):
1208+
1209+
if REDIS_COMPONENT_NAME in components_to_start:
11941210
start_redis(instance_id, clp_config, conf_dir)
1195-
if target in (ALL_TARGET_NAME, RESULTS_CACHE_COMPONENT_NAME):
1211+
1212+
if RESULTS_CACHE_COMPONENT_NAME in components_to_start:
11961213
start_results_cache(instance_id, clp_config, conf_dir)
1197-
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, RESULTS_CACHE_COMPONENT_NAME):
1214+
1215+
if (
1216+
target == CONTROLLER_TARGET_NAME and RESULTS_CACHE_COMPONENT_NAME in runnable_components
1217+
) or RESULTS_CACHE_COMPONENT_NAME in components_to_start:
11981218
create_results_cache_indices(instance_id, clp_config, container_clp_config, mounts)
1199-
if target in (
1200-
ALL_TARGET_NAME,
1201-
CONTROLLER_TARGET_NAME,
1202-
COMPRESSION_SCHEDULER_COMPONENT_NAME,
1203-
):
1219+
1220+
if COMPRESSION_SCHEDULER_COMPONENT_NAME in components_to_start:
12041221
start_compression_scheduler(instance_id, clp_config, container_clp_config, mounts)
1205-
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, QUERY_SCHEDULER_COMPONENT_NAME):
1222+
1223+
if QUERY_SCHEDULER_COMPONENT_NAME in components_to_start:
12061224
start_query_scheduler(instance_id, clp_config, container_clp_config, mounts)
1207-
if target in (ALL_TARGET_NAME, COMPRESSION_WORKER_COMPONENT_NAME):
1225+
1226+
if COMPRESSION_WORKER_COMPONENT_NAME in components_to_start:
12081227
start_compression_worker(
12091228
instance_id, clp_config, container_clp_config, num_workers, mounts
12101229
)
1211-
if target in (ALL_TARGET_NAME, QUERY_WORKER_COMPONENT_NAME):
1230+
1231+
if QUERY_WORKER_COMPONENT_NAME in components_to_start:
12121232
start_query_worker(instance_id, clp_config, container_clp_config, num_workers, mounts)
1213-
if target in (ALL_TARGET_NAME, REDUCER_COMPONENT_NAME):
1233+
1234+
if REDUCER_COMPONENT_NAME in components_to_start:
12141235
start_reducer(instance_id, clp_config, container_clp_config, num_workers, mounts)
1215-
if target in (ALL_TARGET_NAME, WEBUI_COMPONENT_NAME):
1236+
1237+
if WEBUI_COMPONENT_NAME in components_to_start:
12161238
start_webui(instance_id, clp_config, container_clp_config, mounts)
12171239

12181240
except Exception as ex:

components/clp-py-utils/clp_py_utils/clp_config.py

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import pathlib
22
from enum import auto
3-
from typing import Literal, Optional, Union
3+
from typing import Literal, Optional, Set, Union
44

55
from dotenv import dotenv_values
66
from pydantic import BaseModel, PrivateAttr, root_validator, validator
@@ -27,10 +27,41 @@
2727
QUERY_WORKER_COMPONENT_NAME = "query_worker"
2828
WEBUI_COMPONENT_NAME = "webui"
2929

30+
# Component groups
31+
GENERAL_SCHEDULING_COMPONENTS = {
32+
QUEUE_COMPONENT_NAME,
33+
REDIS_COMPONENT_NAME,
34+
}
35+
COMPRESSION_COMPONENTS = GENERAL_SCHEDULING_COMPONENTS | {
36+
DB_COMPONENT_NAME,
37+
COMPRESSION_SCHEDULER_COMPONENT_NAME,
38+
COMPRESSION_WORKER_COMPONENT_NAME,
39+
}
40+
QUERY_COMPONENTS = GENERAL_SCHEDULING_COMPONENTS | {
41+
DB_COMPONENT_NAME,
42+
QUERY_SCHEDULER_COMPONENT_NAME,
43+
QUERY_WORKER_COMPONENT_NAME,
44+
REDUCER_COMPONENT_NAME,
45+
}
46+
UI_COMPONENTS = {
47+
RESULTS_CACHE_COMPONENT_NAME,
48+
WEBUI_COMPONENT_NAME,
49+
}
50+
ALL_COMPONENTS = COMPRESSION_COMPONENTS | QUERY_COMPONENTS | UI_COMPONENTS
51+
3052
# Target names
3153
ALL_TARGET_NAME = ""
3254
CONTROLLER_TARGET_NAME = "controller"
3355

56+
TARGET_TO_COMPONENTS = {
57+
ALL_TARGET_NAME: ALL_COMPONENTS,
58+
CONTROLLER_TARGET_NAME: GENERAL_SCHEDULING_COMPONENTS
59+
| {
60+
COMPRESSION_SCHEDULER_COMPONENT_NAME,
61+
QUERY_SCHEDULER_COMPONENT_NAME,
62+
},
63+
}
64+
3465
QUERY_JOBS_TABLE_NAME = "query_jobs"
3566
QUERY_TASKS_TABLE_NAME = "query_tasks"
3667
COMPRESSION_JOBS_TABLE_NAME = "compression_jobs"
@@ -49,6 +80,12 @@ class StorageEngine(KebabCaseStrEnum):
4980
CLP_S = auto()
5081

5182

83+
class QueryEngine(KebabCaseStrEnum):
84+
CLP = auto()
85+
CLP_S = auto()
86+
PRESTO = auto()
87+
88+
5289
class StorageType(LowercaseStrEnum):
5390
FS = auto()
5491
S3 = auto()
@@ -62,10 +99,12 @@ class AwsAuthType(LowercaseStrEnum):
6299

63100

64101
VALID_STORAGE_ENGINES = [storage_engine.value for storage_engine in StorageEngine]
102+
VALID_QUERY_ENGINES = [query_engine.value for query_engine in QueryEngine]
65103

66104

67105
class Package(BaseModel):
68106
storage_engine: str = "clp"
107+
query_engine: str = "clp"
69108

70109
@validator("storage_engine")
71110
def validate_storage_engine(cls, field):
@@ -76,6 +115,37 @@ def validate_storage_engine(cls, field):
76115
)
77116
return field
78117

118+
@validator("query_engine")
119+
def validate_query_engine(cls, field):
120+
if field not in VALID_QUERY_ENGINES:
121+
raise ValueError(
122+
f"package.query_engine must be one of the following"
123+
f" {'|'.join(VALID_QUERY_ENGINES)}"
124+
)
125+
return field
126+
127+
@root_validator
128+
def validate_query_engine_package_compatibility(cls, values):
129+
query_engine = values.get("query_engine")
130+
storage_engine = values.get("storage_engine")
131+
132+
if query_engine in [QueryEngine.CLP, QueryEngine.CLP_S]:
133+
if query_engine != storage_engine:
134+
raise ValueError(
135+
f"query_engine '{query_engine}' is only compatible with "
136+
f"storage_engine '{query_engine}'."
137+
)
138+
elif query_engine == QueryEngine.PRESTO:
139+
if storage_engine != StorageEngine.CLP_S:
140+
raise ValueError(
141+
f"query_engine '{QueryEngine.PRESTO}' is only compatible with "
142+
f"storage_engine '{StorageEngine.CLP_S}'."
143+
)
144+
else:
145+
raise ValueError(f"Unsupported query_engine '{query_engine}'.")
146+
147+
return values
148+
79149

80150
class Database(BaseModel):
81151
type: str = "mariadb"
@@ -757,6 +827,12 @@ def load_redis_credentials_from_file(self):
757827
f"Credentials file '{self.credentials_file_path}' does not contain key '{ex}'."
758828
)
759829

830+
def get_runnable_components(self) -> Set[str]:
831+
if QueryEngine.PRESTO == self.package.query_engine:
832+
return COMPRESSION_COMPONENTS | UI_COMPONENTS
833+
else:
834+
return ALL_COMPONENTS
835+
760836
def dump_to_primitive_dict(self):
761837
d = self.dict()
762838
d["logs_input"] = self.logs_input.dump_to_primitive_dict()
@@ -791,3 +867,12 @@ def dump_to_primitive_dict(self):
791867
d["stream_output"] = self.stream_output.dump_to_primitive_dict()
792868

793869
return d
870+
871+
872+
def get_components_for_target(target: str) -> Set[str]:
873+
if target in TARGET_TO_COMPONENTS:
874+
return TARGET_TO_COMPONENTS[target]
875+
elif target in ALL_COMPONENTS:
876+
return {target}
877+
else:
878+
return set()

components/package-template/src/etc/clp-config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#
1313
#package:
1414
# storage_engine: "clp"
15+
# query_engine: "clp"
1516
#
1617
#database:
1718
# type: "mariadb" # "mariadb" or "mysql"

taskfile.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ tasks:
470470
# NOTE: The trailing slash after the source is necessary so that rsync copies
471471
# `/parents/A` -> `/parents/B` rather than `/parents/A` -> `/parents/B/A`
472472
- "rsync --archive '{{.G_PACKAGE_BUILD_DIR}}/' '{{.OUTPUT_DIR}}'"
473+
473474
# Set the storage engine for the package
474475
- task: "utils:misc:replace-text"
475476
vars:
@@ -481,6 +482,21 @@ tasks:
481482
FILE_PATH: "{{.OUTPUT_DIR}}/etc/clp-config.yml"
482483
SED_EXP: >-
483484
s/(\#[[:space:]]*storage_engine: ")[^"]+"/\1{{.STORAGE_ENGINE}}"/
485+
486+
# Set the query engine for the package
487+
# NOTE: We set `query_engine` to `STORAGE_ENGINE` intentionally since compatible CLP storage
488+
# and query engines have the same name.
489+
- task: "utils:misc:replace-text"
490+
vars:
491+
FILE_PATH: "{{.OUTPUT_DIR}}/lib/python3/site-packages/clp_py_utils/clp_config.py"
492+
SED_EXP: >-
493+
s/([[:space:]]*query_engine: str = ")[^"]+"/\1{{.STORAGE_ENGINE}}"/
494+
- task: "utils:misc:replace-text"
495+
vars:
496+
FILE_PATH: "{{.OUTPUT_DIR}}/etc/clp-config.yml"
497+
SED_EXP: >-
498+
s/(\#[[:space:]]*query_engine: ")[^"]+"/\1{{.STORAGE_ENGINE}}"/
499+
484500
- >-
485501
tar czf '{{.OUTPUT_FILE}}'
486502
--directory '{{.G_BUILD_DIR}}'

0 commit comments

Comments
 (0)