Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
285658e
add compression scheduler
wraymo Jan 18, 2024
90e5c7f
refactor native/compress.py a little bit
wraymo Jan 19, 2024
c8fb686
some progress
wraymo Jan 19, 2024
88ed8c4
some progress
wraymo Jan 22, 2024
4bbb03c
almost there
wraymo Jan 22, 2024
b786578
clean up
wraymo Jan 22, 2024
39a41a8
further clean up
wraymo Jan 22, 2024
f9e2bd3
fix several bugs
wraymo Jan 22, 2024
caf800a
fix several bugs
wraymo Jan 22, 2024
850fae1
fix several bugs
wraymo Jan 22, 2024
165edc4
fix several bugs
wraymo Jan 22, 2024
54b57c1
fix a bug
wraymo Jan 22, 2024
1b1d5a5
fix a bug
wraymo Jan 22, 2024
9d699a8
fix several bug
wraymo Jan 23, 2024
193d766
fix several bugs
wraymo Jan 23, 2024
756a921
fix a bug
wraymo Jan 23, 2024
cc35ebb
reset logging level to logging.INFO in native/compress.py
wraymo Jan 23, 2024
9d65b66
update failed tasks
wraymo Jan 23, 2024
94b7132
fix a bug
wraymo Jan 23, 2024
7c17a2e
fix a bug
wraymo Jan 23, 2024
276aebd
reformat several files
wraymo Jan 25, 2024
bbd8dcd
clean up and reformat
wraymo Jan 25, 2024
0157791
clean up and reformat
wraymo Jan 25, 2024
c4677fc
fix several bugs
wraymo Jan 25, 2024
ebc8b75
apply review suggestions
wraymo Jan 26, 2024
b7f9291
make json dump simpler
wraymo Jan 26, 2024
1897e33
reduce indentation in compression_scheduler
wraymo Jan 26, 2024
bdf2d79
refactor compression task status
wraymo Jan 26, 2024
a2808e9
refactor native/compress.py
wraymo Jan 26, 2024
a3274c8
use a variable to store clp_io_config
wraymo Jan 26, 2024
0dee35a
update native/compress.py
wraymo Jan 26, 2024
f1f3dbe
fix a bug in compression_scheduler
wraymo Jan 26, 2024
c84491f
update import
wraymo Jan 26, 2024
6c85dda
rename tasks
wraymo Jan 26, 2024
78e12d1
rename CompressionTaskUpdate
wraymo Jan 26, 2024
1a0c7df
refactor scheduler constants
wraymo Jan 26, 2024
dde4ffe
use fetchone() instead of fetchall()
wraymo Jan 26, 2024
e87a2c0
remove comments
wraymo Jan 26, 2024
cba554b
make timestamp fields in database tables consistent
wraymo Jan 26, 2024
50c628d
rename execution_path
wraymo Jan 26, 2024
f733d85
remove a comment in native/comrpess.py
wraymo Jan 26, 2024
92e443a
refactor compression handler
wraymo Jan 26, 2024
fa07fce
undo format changes to two files
wraymo Jan 26, 2024
5d590b4
return values
wraymo Jan 26, 2024
54051dc
move database insertion from PathsToCompressBuffer to compression sch…
wraymo Jan 26, 2024
329686c
bug fix
wraymo Jan 26, 2024
00528c3
bug fix
wraymo Jan 26, 2024
2cf1dee
bug fix
wraymo Jan 26, 2024
e93869e
bug fix
wraymo Jan 26, 2024
76c4c73
bug fix
wraymo Jan 26, 2024
5cefadf
bug fix
wraymo Jan 26, 2024
e8afe93
update docstring
wraymo Jan 26, 2024
c84e4d6
change fetchone back to fetchall
wraymo Jan 27, 2024
c9e7a02
add commit
wraymo Jan 27, 2024
a58f6be
bug fix
wraymo Jan 27, 2024
033ee32
improve readability
wraymo Jan 27, 2024
24404ba
apply comment suggestions
wraymo Jan 27, 2024
c4df646
fix start_clp
wraymo Jan 28, 2024
4e90cf5
make redis work for compression
wraymo Jan 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ directory:
for operating the CLP package.
* [clp-py-utils](components/clp-py-utils) contains Python utilities common to several of the
other components.
* [compression-job-handler](components/compression-job-handler) contains code to submit
compression jobs to a cluster.
* [core](components/core) contains code to compress uncompressed logs, decompress compressed
logs, and search compressed logs.
* [job-orchestration](components/job-orchestration) contains code to schedule compression jobs on
Expand Down
11 changes: 0 additions & 11 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ tasks:
- task: "clean-python-component"
vars:
COMPONENT: "clp-py-utils"
- task: "clean-python-component"
vars:
COMPONENT: "compression-job-handler"
- task: "clean-python-component"
vars:
COMPONENT: "job-orchestration"
Expand Down Expand Up @@ -61,7 +58,6 @@ tasks:
- "core"
- "clp-package-utils"
- "clp-py-utils"
- "compression-job-handler"
- "job-orchestration"
- "package-venv"
cmds:
Expand All @@ -74,7 +70,6 @@ tasks:
pip3 install --upgrade \
components/clp-package-utils/dist/*.whl \
components/clp-py-utils/dist/*.whl \
components/compression-job-handler/dist/*.whl \
components/job-orchestration/dist/*.whl \
-t "{{.PACKAGE_BUILD_DIR}}/lib/python3/site-packages"
- "mkdir -p '{{.PACKAGE_BUILD_DIR}}/bin'"
Expand All @@ -95,7 +90,6 @@ tasks:
- "{{.TASKFILE_DIR}}/Taskfile.yml"
- "components/clp-package-utils/dist/*.whl"
- "components/clp-py-utils/dist/*.whl"
- "components/compression-job-handler/dist/*.whl"
- "components/job-orchestration/dist/*.whl"
- "components/package-template/src/**/*"
status:
Expand Down Expand Up @@ -139,11 +133,6 @@ tasks:
vars:
COMPONENT: "{{.TASK}}"

compression-job-handler:
- task: "python-component"
vars:
COMPONENT: "{{.TASK}}"

job-orchestration:
- task: "python-component"
vars:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,31 @@
import pathlib
import shutil
import sys
import time
import uuid
from contextlib import closing

import msgpack
import zstandard as zstd

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
validate_and_load_config_file,
get_clp_home
)
from clp_py_utils.clp_config import COMPRESSION_JOBS_TABLE_NAME
from clp_py_utils.pretty_size import pretty_size
from clp_py_utils.sql_adapter import SQL_Adapter
from compression_job_handler.compression_job_handler import handle_jobs
from job_orchestration.job_config import (

from job_orchestration.scheduler.job_config import (
ClpIoConfig,
InputConfig,
OutputConfig
)
from job_orchestration.scheduler.constants import (
CompressionJobStatus,
CompressionJobCompletionStatus
)

# Setup logging
# Create logger
Expand All @@ -29,6 +40,91 @@
logger.addHandler(logging_console_handler)


def handle_job_update(db, db_cursor, job_id, no_progress_reporting):
    """
    Polls the compression job's row in the jobs table until the job reaches a
    terminal status (SUCCEEDED or FAILED), logging progress along the way.

    :param db: Open database connection; committed after each poll so
        subsequent reads observe rows committed by other connections.
    :param db_cursor: Dictionary cursor created from `db`.
    :param job_id: ID of the compression job to monitor.
    :param no_progress_reporting: If True, only poll the job's status and skip
        size/speed statistics.
    :raises Exception: If no row with `job_id` exists in the jobs table.
    :raises NotImplementedError: If the job reaches an unrecognized status.
    """
    if no_progress_reporting:
        polling_query = f"SELECT status, status_msg FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"
    else:
        polling_query = f"SELECT status, status_msg, uncompressed_size, compressed_size " \
                        f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"

    completion_query = f"SELECT duration, uncompressed_size, compressed_size " \
                       f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"

    job_last_uncompressed_size = 0
    while True:
        db_cursor.execute(polling_query)
        results = db_cursor.fetchall()
        # Commit so the next poll sees fresh data from other transactions
        db.commit()
        if len(results) > 1:
            # FIX: was `logging.error(...)`, which bypasses this module's
            # configured logger and handler
            logger.error("Duplicated job_id")
        if len(results) == 0:
            raise Exception(f"Job with id={job_id} not found in database")

        job_row = results[0]
        job_status = job_row['status']

        if not no_progress_reporting:
            job_uncompressed_size = job_row['uncompressed_size']
            job_compressed_size = job_row['compressed_size']
            # FIX: also require compressed_size > 0 — a transiently-zero
            # compressed_size would otherwise raise ZeroDivisionError
            if job_uncompressed_size > 0 and job_compressed_size > 0:
                compression_ratio = float(job_uncompressed_size) / job_compressed_size
                if job_last_uncompressed_size < job_uncompressed_size:
                    logger.info(
                        f'Compressed {pretty_size(job_uncompressed_size)} into '
                        f'{pretty_size(job_compressed_size)} ({compression_ratio:.2f})')
                    job_last_uncompressed_size = job_uncompressed_size

        if CompressionJobStatus.SUCCEEDED == job_status:
            # All tasks in the job are done
            speed = 0
            if not no_progress_reporting:
                db_cursor.execute(completion_query)
                job_row = db_cursor.fetchone()
                if job_row['duration'] and job_row['duration'] > 0:
                    speed = job_row['uncompressed_size'] / job_row['duration']
                logger.info(f"Compression finished. Runtime: {job_row['duration']}s. "
                            f"Speed: {pretty_size(speed)}/s.")
            break  # Done
        elif CompressionJobStatus.FAILED == job_status:
            # One or more tasks in the job has failed
            logger.error(f"Compression failed. {job_row['status_msg']}")
            break  # Done
        elif CompressionJobStatus.RUNNING == job_status or \
                CompressionJobStatus.PENDING == job_status:
            pass  # Simply wait another iteration
        else:
            error_msg = f"Unhandled CompressionJobStatus: {job_status}"
            raise NotImplementedError(error_msg)

        time.sleep(0.5)


def handle_job(sql_adapter: SQL_Adapter, clp_io_config: ClpIoConfig, no_progress_reporting: bool):
    """
    Submits a compression job to the jobs table and waits for it to complete.

    The job's I/O config is serialized with msgpack and compressed with zstd
    before being inserted into the `clp_config` column.

    :param sql_adapter: Adapter used to open the database connection.
    :param clp_io_config: Input/output configuration for the compression job.
    :param no_progress_reporting: If True, suppress size/speed progress logs
        while waiting for the job.
    :return: CompressionJobCompletionStatus.SUCCEEDED on success,
        CompressionJobCompletionStatus.FAILED if any step raises.
    """
    zstd_cctx = zstd.ZstdCompressor(level=3)

    with closing(sql_adapter.create_connection(True)) as db, \
            closing(db.cursor(dictionary=True)) as db_cursor:
        try:
            compressed_clp_io_config = zstd_cctx.compress(
                msgpack.packb(clp_io_config.dict(exclude_none=True, exclude_unset=True)))
            db_cursor.execute(
                f'INSERT INTO {COMPRESSION_JOBS_TABLE_NAME} (clp_config) VALUES (%s)',
                (compressed_clp_io_config,)
            )
            # Commit the insert so the scheduler (on another connection) can
            # see the new job
            db.commit()
            job_id = db_cursor.lastrowid
            logger.info(f"Compression job {job_id} submitted.")

            handle_job_update(db, db_cursor, job_id, no_progress_reporting)
        except Exception as ex:
            # FIX: was `logger.error(ex)`, which discards the traceback;
            # logger.exception logs the message plus the full stack trace
            logger.exception(ex)
            return CompressionJobCompletionStatus.FAILED

        logger.debug(f'Finished job {job_id}')

    return CompressionJobCompletionStatus.SUCCEEDED


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH
Expand Down Expand Up @@ -81,8 +177,6 @@ def main(argv):
log_list_path = pathlib.Path(parsed_args.input_list).resolve()
shutil.copy(log_list_path, comp_jobs_dir / log_list_path.name)

logger.info("Compression job submitted to compression-job-handler.")

mysql_adapter = SQL_Adapter(clp_config.database)
clp_input_config = InputConfig(list_path=str(log_list_path))
if parsed_args.remove_path_prefix:
Expand All @@ -92,13 +186,8 @@ def main(argv):
output=OutputConfig.parse_obj(clp_config.archive_output)
)

# Execute compression-job-handler.handle_jobs
logs_directory_abs = str(pathlib.Path(clp_config.logs_directory).resolve())
handle_jobs(sql_adapter=mysql_adapter, clp_io_config=clp_io_config, logs_dir_abs=logs_directory_abs,
fs_logs_required_parent_dir=pathlib.Path(clp_config.input_logs_directory),
no_progress_reporting=parsed_args.no_progress_reporting)

return 0
return handle_job(sql_adapter=mysql_adapter, clp_io_config=clp_io_config,
no_progress_reporting=parsed_args.no_progress_reporting)


if '__main__' == __name__:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
ResultsCache
)
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.job_config import SearchConfig
from job_orchestration.search_scheduler.common import JobStatus
from job_orchestration.scheduler.constants import SearchJobStatus
from job_orchestration.scheduler.job_config import SearchConfig

# Setup logging
# Create logger
Expand Down Expand Up @@ -95,7 +95,7 @@ def create_and_monitor_job_in_db(db_config: Database, results_cache: ResultsCach
# There will only ever be one row since it's impossible to have more than one job with the same ID
new_status = db_cursor.fetchall()[0]['status']
db_conn.commit()
if new_status in (JobStatus.SUCCESS, JobStatus.FAILED, JobStatus.CANCELLED):
if new_status in (SearchJobStatus.SUCCEEDED, SearchJobStatus.FAILED, SearchJobStatus.CANCELLED):
break

time.sleep(0.5)
Expand Down
Loading