feat(package): Mark running jobs as failed when schedulers start. #1208
Conversation
Walkthrough

Renames QueueName → SchedulerType, adds KILLED to multiple job/task enums, updates imports and Celery routing to use SchedulerType, and adds kill_hanging_jobs(sql_adapter, scheduler_type), called at compression and query scheduler startup, to mark RUNNING jobs/tasks as KILLED; schedulers abort on cleanup failure.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Scheduler as Compression / Query Scheduler
    participant SQLA as SQL_Adapter
    participant Utils as kill_hanging_jobs
    participant DB as Database
    Scheduler->>SQLA: create adapter
    Scheduler->>Utils: kill_hanging_jobs(SQLA, SchedulerType)
    Utils->>DB: SELECT job IDs WHERE status = RUNNING (by scheduler type)
    DB-->>Utils: job IDs or none
    alt jobs found
        Utils->>DB: UPDATE tasks SET status=KILLED, duration=0 WHERE status=RUNNING AND job_id IN (...)
        Utils->>DB: UPDATE jobs SET status=KILLED, duration=0 [, update_time=CURRENT_TIMESTAMP() for compression]
        Utils-->>Scheduler: list of killed job IDs
    else no jobs
        Utils-->>Scheduler: None
    end
    alt cleanup failed
        Scheduler-->>Scheduler: log error and exit (-1)
    else
        Scheduler-->>Scheduler: log count and continue startup
    end
```
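The flow in the diagram can be sketched as a small self-contained function. This is an illustrative sketch, not the package's actual implementation: the real kill_hanging_jobs lives in job_orchestration/scheduler/utils.py and binds MySQL-style %s placeholders through the package's SQL_Adapter, while this version uses sqlite3 with ? placeholders and made-up integer status values (2 for RUNNING, 6 for KILLED) so it runs on its own.

```python
import sqlite3
from contextlib import closing

def kill_hanging_jobs_sketch(db_conn, jobs_table, tasks_table,
                             running_status, killed_status):
    """Mark jobs/tasks left RUNNING by a previous scheduler run as KILLED.

    Returns the affected job IDs, or None if nothing was hanging.
    """
    with closing(db_conn.cursor()) as cur:
        cur.execute(f"SELECT id FROM {jobs_table} WHERE status=?",
                    (int(running_status),))
        job_ids = [row[0] for row in cur.fetchall()]
        if not job_ids:
            return None
        placeholders = ",".join(["?"] * len(job_ids))
        # Kill tasks first, then their parent jobs.
        cur.execute(
            f"UPDATE {tasks_table} SET status=?, duration=0 "
            f"WHERE status=? AND job_id IN ({placeholders})",
            [int(killed_status), int(running_status)] + job_ids,
        )
        cur.execute(
            f"UPDATE {jobs_table} SET status=?, duration=0 "
            f"WHERE id IN ({placeholders})",
            [int(killed_status)] + job_ids,
        )
        db_conn.commit()
    return job_ids

# Demo with an in-memory schema loosely mirroring the jobs/tasks tables.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, status INT, duration INT)")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, job_id INT, status INT, duration INT)")
conn.execute("INSERT INTO jobs VALUES (1, 2, 5), (2, 3, 9)")  # job 1 is RUNNING (2)
conn.execute("INSERT INTO tasks VALUES (1, 1, 2, 5)")
killed = kill_hanging_jobs_sketch(conn, "jobs", "tasks",
                                  running_status=2, killed_status=6)
print(killed)  # [1]
```

Selecting the IDs first (rather than issuing a single UPDATE) lets the caller log exactly which jobs were killed, which is what the schedulers do at startup.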
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
```python
if SchedulerType.COMPRESSION == scheduler_type:
    field_set_expressions.append("update_time = CURRENT_TIMESTAMP()")
```
@kirkrodrigues This update_time makes things more complicated.
In the long term, though, it might be worthwhile to add an update time for query_job as well.
```python
if SchedulerType.COMPRESSION == scheduler_type:
    jobs_table_name = COMPRESSION_JOBS_TABLE_NAME
    job_status_running = CompressionJobStatus.RUNNING
    job_status_failed = CompressionJobStatus.FAILED
    tasks_table_name = COMPRESSION_TASKS_TABLE_NAME
    task_status_running = CompressionTaskStatus.RUNNING
    task_status_failed = CompressionTaskStatus.FAILED
elif SchedulerType.QUERY == scheduler_type:
    jobs_table_name = QUERY_JOBS_TABLE_NAME
    job_status_running = QueryJobStatus.RUNNING
    job_status_failed = QueryJobStatus.FAILED
    tasks_table_name = QUERY_TASKS_TABLE_NAME
    task_status_running = QueryTaskStatus.RUNNING
    task_status_failed = QueryTaskStatus.FAILED
```
This looks pretty ugly. I have discussed this with Zhihao and we both agree that a class managing the table name and job/task statuses might be the right way to go, but it would require more refactoring.
For now, I still feel this is better than asking the user to pass in all six arguments themselves.
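One possible shape for such a class is a small frozen dataclass keyed by scheduler type. Everything below is hypothetical: SchedulerTables, the stand-in enums, the table names, and the numeric status values are invented for illustration, not the package's actual API.

```python
from dataclasses import dataclass
from enum import IntEnum

class CompressionJobStatus(IntEnum):   # stand-in for the real enum
    RUNNING = 2
    KILLED = 5

class CompressionTaskStatus(IntEnum):  # stand-in for the real enum
    RUNNING = 2
    KILLED = 5

@dataclass(frozen=True)
class SchedulerTables:
    """Bundles the table names and status enums one scheduler type needs."""
    jobs_table: str
    tasks_table: str
    job_status_running: IntEnum
    job_status_killed: IntEnum
    task_status_running: IntEnum
    task_status_killed: IntEnum

SCHEDULER_TABLES = {
    "compression": SchedulerTables(
        jobs_table="compression_jobs",
        tasks_table="compression_tasks",
        job_status_running=CompressionJobStatus.RUNNING,
        job_status_killed=CompressionJobStatus.KILLED,
        task_status_running=CompressionTaskStatus.RUNNING,
        task_status_killed=CompressionTaskStatus.KILLED,
    ),
}

tables = SCHEDULER_TABLES["compression"]
print(tables.jobs_table)  # compression_jobs
```

A lookup like SCHEDULER_TABLES[scheduler_type] would then replace the if/elif chain while still sparing callers from passing six separate arguments.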
```python
job_id_placeholders_str = ",".join(["%s"] * len(hanging_job_ids))
db_cursor.execute(
    f"""
    UPDATE {tasks_table_name}
    SET status={task_status_failed}, duration=0
    WHERE status={task_status_running}
    AND job_id IN ({job_id_placeholders_str})
    """,
    hanging_job_ids,
)
```
It seems we don't even update task status from PENDING to RUNNING, so this won't do anything to the task table unless we hack it by setting task_status_running = QueryTaskStatus.PENDING.
Depending on when we switch to Spider, we can either fix the task status reporting properly or live with it.
Depending on whether Junhao can add handling for the new KILLED state to the webui, this PR may still need a slight update. Nevertheless, it should be good for review.
Actionable comments posted: 8
🔭 Outside diff range comments (1)

components/clp-package-utils/clp_package_utils/scripts/start_clp.py (1)

614-629: Use the correct logging level per scheduler (compression vs. query). Per prior learning, generic_start_scheduler always uses the query scheduler's logging level. Use the compression scheduler's level when starting it.

Apply:

```diff
@@
-    necessary_env_vars = [
+    # Select appropriate logging level based on scheduler type
+    if component_name == COMPRESSION_SCHEDULER_COMPONENT_NAME:
+        scheduler_logging_level = clp_config.compression_scheduler.logging_level
+    else:
+        scheduler_logging_level = clp_config.query_scheduler.logging_level
+
+    necessary_env_vars = [
         f"PYTHONPATH={clp_site_packages_dir}",
@@
-        f"CLP_LOGS_DIR={container_logs_dir}",
-        f"CLP_LOGGING_LEVEL={clp_config.query_scheduler.logging_level}",
+        f"CLP_LOGS_DIR={container_logs_dir}",
+        f"CLP_LOGGING_LEVEL={scheduler_logging_level}",
     ]
```
♻️ Duplicate comments (1)

components/job-orchestration/job_orchestration/scheduler/utils.py (1)

55-64: Query tasks may never be marked RUNNING; also fail PENDING tasks for those jobs. Per known issue, many QueryTask rows remain PENDING even though the job is RUNNING, so the current filter misses them. Update both RUNNING and PENDING tasks to FAILED for the affected job IDs.

```diff
-    job_id_placeholders_str = ",".join(["%s"] * len(hanging_job_ids))
-    db_cursor.execute(
-        f"""
-        UPDATE {tasks_table_name}
-        SET status={task_status_failed}, duration=0
-        WHERE status={task_status_running}
-        AND job_id IN ({job_id_placeholders_str})
-        """,
-        hanging_job_ids,
-    )
+    job_id_placeholders_str = ",".join(["%s"] * len(hanging_job_ids))
+    # For QUERY, some implementations never flip tasks from PENDING to RUNNING.
+    # Fail both RUNNING and PENDING tasks under the affected jobs.
+    if SchedulerType.QUERY == scheduler_type:
+        task_statuses_to_fail = (task_status_running, QueryTaskStatus.PENDING)
+    else:
+        task_statuses_to_fail = (task_status_running,)
+
+    status_placeholders_str = ",".join(["%s"] * len(task_statuses_to_fail))
+    task_update_params = (
+        [int(task_status_failed)] + [int(s) for s in task_statuses_to_fail] + hanging_job_ids
+    )
+
+    db_cursor.execute(
+        f"""
+        UPDATE {tasks_table_name}
+        SET status=%s, duration=0
+        WHERE status IN ({status_placeholders_str})
+        AND job_id IN ({job_id_placeholders_str})
+        """,
+        task_update_params,
+    )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (7)

- components/clp-package-utils/clp_package_utils/scripts/start_clp.py (3 hunks)
- components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py (2 hunks)
- components/job-orchestration/job_orchestration/executor/query/celeryconfig.py (1 hunks)
- components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3 hunks)
- components/job-orchestration/job_orchestration/scheduler/constants.py (1 hunks)
- components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (3 hunks)
- components/job-orchestration/job_orchestration/scheduler/utils.py (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.
Applied to files:
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.
Applied to files:
components/clp-package-utils/clp_package_utils/scripts/start_clp.py
🧬 Code Graph Analysis (5)

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)
- components/job-orchestration/job_orchestration/scheduler/constants.py (3)
  - CompressionJobStatus (27-31)
  - CompressionTaskStatus (39-43)
  - SchedulerType (9-11)
- components/job-orchestration/job_orchestration/scheduler/utils.py (1)
  - kill_hanging_jobs (22-81)

components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py (1)
- components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  - SchedulerType (9-11)

components/job-orchestration/job_orchestration/scheduler/utils.py (1)
- components/job-orchestration/job_orchestration/scheduler/constants.py (5)
  - CompressionJobStatus (27-31)
  - CompressionTaskStatus (39-43)
  - QueryJobStatus (48-58)
  - QueryTaskStatus (61-70)
  - SchedulerType (9-11)

components/job-orchestration/job_orchestration/executor/query/celeryconfig.py (1)
- components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  - SchedulerType (9-11)

components/clp-package-utils/clp_package_utils/scripts/start_clp.py (1)
- components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  - SchedulerType (9-11)
🔇 Additional comments (8)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
9-12
: Rename and usage of SchedulerType look good.Clear constant container, no behavioural change; downstream references align with queue names.
components/clp-package-utils/clp_package_utils/scripts/start_clp.py (1)
44-44
: No stale QueueName references found
I ran a repo-wide search for “QueueName” (both precise and broad patterns) and confirmed there are no remaining imports or usages. The rename toSchedulerType
is safe to merge.components/job-orchestration/job_orchestration/executor/query/celeryconfig.py (1)
3-13
: Routing updated to SchedulerType. LGTM.Queue targeting remains “query”, consistent with worker -Q and start scripts.
components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py (1)
3-16
: Routing updated to SchedulerType. LGTM.Compression task correctly maps to the “compression” queue; priority settings unchanged.
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (2)
52-57
: Consolidated imports + SchedulerType use are consistent.Brings statuses and SchedulerType into scope cleanly.
78-78
: Startup cleanup dependency import is appropriate.Importing kill_hanging_jobs where it’s used keeps intent clear.
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)
34-38
: Imports update looks goodBringing in SchedulerType alongside the status enums is consistent with the new utils API.
49-49
: Proactive startup cleanup import is appropriateImporting kill_hanging_jobs here is the right spot, keeping startup concerns localized to the scheduler entrypoint.
```python
jobs_update_config = {"status": job_status_failed, "duration": 0}
field_set_expressions = [f"{k} = %s" for k in jobs_update_config.keys()]
if SchedulerType.COMPRESSION == scheduler_type:
    field_set_expressions.append("update_time = CURRENT_TIMESTAMP()")

values = list(jobs_update_config.values()) + hanging_job_ids
db_cursor.execute(
    f"""
    UPDATE {jobs_table_name}
    SET {", ".join(field_set_expressions)}
    WHERE id in ({job_id_placeholders_str})
    """,
    values,
)
```
🧹 Nitpick (assertive)
Coerce job status enum to int before parameterizing
Be explicit about integer values passed to the DB and keep the parameterization consistent.
```diff
-    jobs_update_config = {"status": job_status_failed, "duration": 0}
+    jobs_update_config = {"status": int(job_status_failed), "duration": 0}
     field_set_expressions = [f"{k} = %s" for k in jobs_update_config.keys()]
     if SchedulerType.COMPRESSION == scheduler_type:
         field_set_expressions.append("update_time = CURRENT_TIMESTAMP()")
-    values = list(jobs_update_config.values()) + hanging_job_ids
+    values = list(jobs_update_config.values()) + hanging_job_ids
     db_cursor.execute(
         f"""
         UPDATE {jobs_table_name}
         SET {", ".join(field_set_expressions)}
         WHERE id in ({job_id_placeholders_str})
         """,
         values,
     )
```
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
jobs_update_config = {"status": int(job_status_failed), "duration": 0}
field_set_expressions = [f"{k} = %s" for k in jobs_update_config.keys()]
if SchedulerType.COMPRESSION == scheduler_type:
    field_set_expressions.append("update_time = CURRENT_TIMESTAMP()")
values = list(jobs_update_config.values()) + hanging_job_ids
db_cursor.execute(
    f"""
    UPDATE {jobs_table_name}
    SET {", ".join(field_set_expressions)}
    WHERE id in ({job_id_placeholders_str})
    """,
    values,
)
```
🤖 Prompt for AI Agents
In components/job-orchestration/job_orchestration/scheduler/utils.py around
lines 66 to 79, the job status enum is being passed directly into the
parameterized query; coerce the enum to an integer before creating
jobs_update_config so the DB receives an int (e.g. use int(job_status_failed))
and then build field_set_expressions and values from that config as before to
keep parameter ordering consistent; ensure the cast happens before values =
list(...) so the parameterized values list contains the integer status, not the
enum.
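The coercion the prompt asks for can be demonstrated in a few lines. This sketch uses sqlite3 and ? placeholders purely so it is self-contained; the real code targets MySQL with %s placeholders, and JobStatus here is a stand-in (with invented values) for the actual CompressionJobStatus enum.

```python
import sqlite3
from enum import IntEnum

class JobStatus(IntEnum):  # stand-in for CompressionJobStatus
    RUNNING = 2
    KILLED = 5

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, status INT, duration INT)")
conn.execute("INSERT INTO jobs VALUES (1, 2, 7)")

# Coerce the enum to a plain int before binding, so the driver receives an
# unambiguous integer rather than relying on enum adaptation/stringification.
jobs_update_config = {"status": int(JobStatus.KILLED), "duration": 0}
field_set_expressions = [f"{k} = ?" for k in jobs_update_config]
values = list(jobs_update_config.values()) + [1]
conn.execute(
    f"UPDATE jobs SET {', '.join(field_set_expressions)} WHERE id IN (?)",
    values,
)
row = conn.execute("SELECT status, duration FROM jobs WHERE id = 1").fetchone()
print(row)  # (5, 0)
```

The cast matters because IntEnum stringification has varied across Python versions, so interpolating or binding the raw enum can behave differently depending on the driver and runtime.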
Actionable comments posted: 2
♻️ Duplicate comments (5)

components/job-orchestration/job_orchestration/scheduler/utils.py (4)

30-36: Query tasks likely never reach RUNNING — consider a temporary broadened match. Per your note, QueryTask may remain PENDING; restricting to RUNNING will skip killing their tasks. Two short-term options until proper task status reporting exists:

- For Query only, set task_status_running = QueryTaskStatus.PENDING
- Or update with status IN (PENDING, RUNNING) for Query

If you prefer the first approach, minimal change:

```diff
 elif SchedulerType.QUERY == scheduler_type:
     jobs_table_name = QUERY_JOBS_TABLE_NAME
     job_status_running = QueryJobStatus.RUNNING
     job_status_killed = QueryJobStatus.KILLED
     tasks_table_name = QUERY_TASKS_TABLE_NAME
-    task_status_running = QueryTaskStatus.RUNNING
+    task_status_running = QueryTaskStatus.PENDING  # TODO: switch back to RUNNING once reporting is fixed
     task_status_killed = QueryTaskStatus.KILLED
```

Would you like me to instead implement a Query-only IN (PENDING, RUNNING) update for tasks? I can draft that variant as well.

22-22: Fix return type: job IDs are numeric. The function returns DB IDs (ints), not strings.

```diff
-def kill_hanging_jobs(sql_adapter: SQL_Adapter, scheduler_type: str) -> Optional[List[str]]:
+def kill_hanging_jobs(sql_adapter: SQL_Adapter, scheduler_type: str) -> Optional[List[int]]:
```

40-42: Use the generic adapter connection and disable the localhost socket. Align with other schedulers and avoid socket resolution issues in containers.

```diff
-    with closing(sql_adapter.create_mysql_connection()) as db_conn, closing(
+    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
         db_conn.cursor(dictionary=True)
     ) as db_cursor:
```

43-51: Parameterize the SELECT and coerce IntEnum to int. Avoid relying on IntEnum stringification and ensure robust parameter binding.

```diff
-        db_cursor.execute(
-            f"""
-            SELECT id
-            FROM {jobs_table_name}
-            WHERE status={job_status_running}
-            """
-        )
+        db_cursor.execute(
+            f"""
+            SELECT id
+            FROM {jobs_table_name}
+            WHERE status=%s
+            """,
+            (int(job_status_running),),
+        )
         hanging_job_ids = [row["id"] for row in db_cursor.fetchall()]
```

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

427-434: Fix logger message: wrong scheduler in exception text. This is the compression scheduler, but the message says "query".

```diff
     try:
         killed_jobs = kill_hanging_jobs(sql_adapter, SchedulerType.COMPRESSION)
         if killed_jobs is not None:
             logger.info(f"Killed {len(killed_jobs)} hanging compression jobs.")
     except Exception:
-        logger.exception("Failed to kill hanging query jobs.")
+        logger.exception("Failed to kill hanging compression jobs.")
         return -1
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)

- components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3 hunks)
- components/job-orchestration/job_orchestration/scheduler/constants.py (5 hunks)
- components/job-orchestration/job_orchestration/scheduler/utils.py (1 hunks)
- components/webui/client/src/pages/IngestPage/Jobs/typings.tsx (2 hunks)
- components/webui/server/src/typings/query.ts (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)

**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}

⚙️ CodeRabbit Configuration File

- Prefer `false == <expression>` rather than `!<expression>`.

Files:
- components/webui/server/src/typings/query.ts
- components/webui/client/src/pages/IngestPage/Jobs/typings.tsx
🧠 Learnings (2)
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.
Applied to files:
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.
Applied to files:
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
🧬 Code Graph Analysis (2)

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)
- components/job-orchestration/job_orchestration/scheduler/constants.py (3)
  - CompressionJobStatus (27-32)
  - CompressionTaskStatus (40-45)
  - SchedulerType (9-11)
- components/job-orchestration/job_orchestration/scheduler/utils.py (1)
  - kill_hanging_jobs (22-81)

components/job-orchestration/job_orchestration/scheduler/utils.py (1)
- components/job-orchestration/job_orchestration/scheduler/constants.py (5)
  - CompressionJobStatus (27-32)
  - CompressionTaskStatus (40-45)
  - QueryJobStatus (50-61)
  - QueryTaskStatus (64-74)
  - SchedulerType (9-11)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (10)

components/webui/server/src/typings/query.ts (1)

27-35: Add KILLED status for queries — alignment looks correct. KILLED is appended after CANCELLED, preserving enum stability, and it's correctly excluded from QUERY_JOB_STATUS_WAITING_STATES. No issues spotted.

components/webui/client/src/pages/IngestPage/Jobs/typings.tsx (1)

71-76: KILLED badge rendering is clear. Using a black badge for KILLED reads distinctly and is consistent with the new status semantics. No change needed.

components/job-orchestration/job_orchestration/scheduler/constants.py (5)

9-12: SchedulerType rename looks good. Replacing QueueName with SchedulerType reads clearer and keeps the same literal values; downstream imports should be straightforward.

26-33: CompressionJobStatus: appended KILLED preserves wire compatibility. KILLED is added at the end, keeping prior numeric values stable. Good.

40-46: CompressionTaskStatus: appended KILLED preserves wire compatibility. Same positive note as for job status — appended, not inserted.

50-58: QueryJobStatus: appended KILLED after CANCELLED — OK. Order and helper methods remain intact. Matches UI/server typings changes.

64-71: QueryTaskStatus: appended KILLED after CANCELLED — OK. Consistent with job status and broader PR intent.

components/job-orchestration/job_orchestration/scheduler/utils.py (2)

66-79: Good: cast job status to int and parameterize the jobs update. jobs_update_config correctly casts status to int and the UPDATE uses bound parameters. Nice.

22-81: QueryTaskStatus.RUNNING transitions verified – no cleanup expansion needed. We found multiple code paths that set query tasks to RUNNING, so the additional PENDING hack or IN clause isn't required:

- components/job-orchestration/job_orchestration/executor/query/utils.py (line 55): task_status = QueryTaskStatus.RUNNING passed to update_query_task_metadata
- components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (lines 359–364): QueryTaskStatus.RUNNING used in the SQL update for QUERY_TASKS_TABLE_NAME
- components/job-orchestration/job_orchestration/scheduler/utils.py (line 35): task_status_running = QueryTaskStatus.RUNNING in kill_hanging_jobs

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

34-38: Imports updated for SchedulerType and kill_hanging_jobs — OK. Importing SchedulerType and pulling in kill_hanging_jobs aligns with the new startup cleanup. Looks good.
I think we're missing two changes:
```diff
--- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py
@@ -91,6 +91,9 @@ def handle_job_update(db, db_cursor, job_id, no_progress_reporting):
             # One or more tasks in the job has failed
             logger.error(f"Compression failed. {job_row['status_msg']}")
             break  # Done
+        if CompressionJobStatus.KILLED == job_status:
+            logger.error(f"Compression killed. {job_row['status_msg']}")
+            break
         if CompressionJobStatus.RUNNING == job_status:
             if not no_progress_reporting:
```

```diff
--- a/components/clp-package-utils/clp_package_utils/scripts/native/utils.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/native/utils.py
@@ -111,6 +111,7 @@ def wait_for_query_job(sql_adapter: SQL_Adapter, job_id: int) -> QueryJobStatus:
         if new_status in (
             QueryJobStatus.SUCCEEDED,
             QueryJobStatus.FAILED,
             QueryJobStatus.CANCELLED,
+            QueryJobStatus.KILLED,
         ):
             return new_status
```
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (7)

components/clp-package-utils/clp_package_utils/scripts/native/compress.py (5)

131-137: handle_job always returns SUCCEEDED; map the actual terminal job status. Even when the job is FAILED/KILLED, this function returns CompressionJobCompletionStatus.SUCCEEDED, which is incorrect for CLI exit codes and automation. Refactor to capture the terminal status returned by handle_job_update and map it:

```diff
-        handle_job_update(db, db_cursor, job_id, no_progress_reporting)
+        terminal_status = handle_job_update(db, db_cursor, job_id, no_progress_reporting)
     except Exception as ex:
         logger.error(ex)
         return CompressionJobCompletionStatus.FAILED

     logger.debug(f"Finished job {job_id}")
-    return CompressionJobCompletionStatus.SUCCEEDED
+    if terminal_status == CompressionJobStatus.SUCCEEDED:
+        return CompressionJobCompletionStatus.SUCCEEDED
+    # If there's no explicit KILLED completion-status enum, treat as FAILED for now.
+    return CompressionJobCompletionStatus.FAILED
```

Note: This assumes you also change the SUCCEEDED/FAILED branches in handle_job_update to return CompressionJobStatus.SUCCEEDED/FAILED instead of break. If CompressionJobCompletionStatus now also includes KILLED, return that specifically rather than folding into FAILED.

61-68: Parameterize SQL and avoid committing after SELECT to reduce risk and overhead.

- The polling queries interpolate job_id into SQL. While job_id is an int under our control, parameterizing is safer and consistent with the rest of the codebase.
- Calling db.commit() after a SELECT is unnecessary and adds overhead under autocommit.

Proposed diff:

```diff
-    polling_query = (
-        f"SELECT status, status_msg FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"
-    )
+    polling_query = (
+        f"SELECT status, status_msg FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id=%s"
+    )
...
-    polling_query = (
-        f"SELECT start_time, status, status_msg, uncompressed_size, compressed_size, duration "
-        f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"
-    )
+    polling_query = (
+        f"SELECT start_time, status, status_msg, uncompressed_size, compressed_size, duration "
+        f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id=%s"
+    )
...
-        db_cursor.execute(polling_query)
-        results = db_cursor.fetchall()
-        db.commit()
+        db_cursor.execute(polling_query, (job_id,))
+        results = db_cursor.fetchall()
```

(Apply the same parameter tuple for both polling-query shapes.)

If you'd like, I can search the repo and convert similar f-strings to parameterized queries in a follow-up.

Also applies to: 73-76, 99-107

41-56: Guard against division-by-zero and invalid durations in progress logging. compression_ratio = float(job_uncompressed_size) / job_compressed_size will raise if compressed_size is 0. Similarly, computing speed with duration can divide by 0 if duration is not yet populated.

Consider:
- Only compute ratio when job_compressed_size > 0.
- Only compute speed on SUCCEEDED when job_row["duration"] > 0, otherwise fall back to elapsed wall-clock as you already do.

Example refactor:

```diff
-    compression_ratio = float(job_uncompressed_size) / job_compressed_size
+    compression_ratio = (
+        float(job_uncompressed_size) / job_compressed_size
+        if job_compressed_size > 0
+        else None
+    )
...
-    logger.info(
-        f"Compressed {pretty_size(job_uncompressed_size)} into "
-        f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}x). "
-        f"Speed: {pretty_size(compression_speed)}/s."
-    )
+    ratio_str = f" ({compression_ratio:.2f}x)" if compression_ratio else ""
+    logger.info(
+        f"Compressed {pretty_size(job_uncompressed_size)} into "
+        f"{pretty_size(job_compressed_size)}{ratio_str}. "
+        f"Speed: {pretty_size(compression_speed)}/s."
+    )
```

This avoids noisy tracebacks during early RUNNING updates.

38-38: Use module name for logger to preserve hierarchical logging. logging.getLogger(__file__) bakes in a file path. Prefer __name__ to participate in hierarchical logger config.

```diff
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
```

1-261: Apply Black formatting to compress.py. CI reports that compress.py would be reformatted. Please run Black with the project's style settings and commit the changes to resolve the lint failure:

```shell
black --line-length 100 components/clp-package-utils/clp_package_utils/scripts/native/compress.py
```

components/clp-package-utils/clp_package_utils/scripts/native/utils.py (2)

105-111: Parameterize the SELECT and avoid commit after SELECT. Minor hardening and perf nit:

- Replace string interpolation with a parameter placeholder to future-proof this path.
- Remove db_conn.commit() after a read-only query.

```diff
-            db_cursor.execute(
-                f"SELECT `status` FROM `{QUERY_JOBS_TABLE_NAME}` WHERE `id` = {job_id}"
-            )
+            db_cursor.execute(
+                f"SELECT `status` FROM `{QUERY_JOBS_TABLE_NAME}` WHERE `id` = %s",
+                (job_id,),
+            )
...
-            db_conn.commit()
+            # No commit needed after SELECT
```

111-118: Optional: Document return semantics now that KILLED is terminal. Consider updating the docstring of wait_for_query_job to explicitly state that it can return SUCCEEDED/FAILED/CANCELLED/KILLED, since callers might gate user messaging or exit codes based on this.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- components/clp-package-utils/clp_package_utils/scripts/native/compress.py (1 hunks)
- components/clp-package-utils/clp_package_utils/scripts/native/utils.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
components/clp-package-utils/clp_package_utils/scripts/native/utils.py (1)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
- QueryJobStatus (50-61)
components/clp-package-utils/clp_package_utils/scripts/native/compress.py (1)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
- CompressionJobStatus (27-32)
🪛 GitHub Actions: clp-lint
components/clp-package-utils/clp_package_utils/scripts/native/compress.py
[error] 1-1: lint:check-no-cpp failed: Black formatting would reformat compress.py. Run 'black' to format the file.
🔇 Additional comments (1)
components/clp-package-utils/clp_package_utils/scripts/native/utils.py (1)
115-116: Treat KILLED as terminal in query wait loop — good change

Adding QueryJobStatus.KILLED ensures the waiter returns promptly on scheduler cleanup, keeping CLI behaviour consistent with newly introduced terminal state semantics.
```python
        if CompressionJobStatus.KILLED == job_status:
            # The job is killed
            logger.error(f"Compression killed. {job_row['status_msg']}")
            break  # Done
```
Return a terminal status instead of breaking; otherwise callers will report success for killed jobs
Right now, this branch breaks the loop, and handle_job later returns CompressionJobCompletionStatus.SUCCEEDED unconditionally, which will misreport killed jobs as succeeded. Return a terminal status from handle_job_update and let the caller map it to a completion status.
Apply this minimal change here (and make analogous changes in the SUCCEEDED/FAILED branches) so the status is propagated:
```diff
-        if CompressionJobStatus.KILLED == job_status:
-            # The job is killed
-            logger.error(f"Compression killed. {job_row['status_msg']}")
-            break  # Done
+        if CompressionJobStatus.KILLED == job_status:
+            # The job is killed
+            logger.error(f"Compression killed. {job_row['status_msg']}")
+            return CompressionJobStatus.KILLED
```
Follow-up: Update handle_job_update
to return a CompressionJobStatus
for all terminal states and update handle_job
to map that to a CompressionJobCompletionStatus
(see separate comment below).
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
         if CompressionJobStatus.KILLED == job_status:
             # The job is killed
             logger.error(f"Compression killed. {job_row['status_msg']}")
-            break  # Done
+            return CompressionJobStatus.KILLED
```
🤖 Prompt for AI Agents
In components/clp-package-utils/clp_package_utils/scripts/native/compress.py
around lines 94 to 97, the branch for CompressionJobStatus.KILLED currently
breaks the loop which causes handle_job to later report success; change this to
return the terminal CompressionJobStatus (e.g., return
CompressionJobStatus.KILLED) instead of break, and make the analogous change in
the SUCCEEDED and FAILED branches so handle_job_update always returns a terminal
CompressionJobStatus for terminal states; also update the caller handle_job to
accept that return value and map returned CompressionJobStatus ->
CompressionJobCompletionStatus (SUCCEEDED/FAILED/KILLED) before exiting so the
final completion status is propagated correctly.
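The status mapping the prompt describes could look roughly like the sketch below. The enum members follow the review thread, but the enum values and the shape of `handle_job` are illustrative assumptions rather than the repository's actual code:

```python
import enum


class CompressionJobStatus(enum.IntEnum):
    PENDING = 0
    RUNNING = 1
    SUCCEEDED = 2
    FAILED = 3
    KILLED = 4


class CompressionJobCompletionStatus(enum.IntEnum):
    SUCCEEDED = 0
    FAILED = 1
    KILLED = 2


# Illustrative mapping from a terminal job status to the completion status
# reported to the caller; the real mapping may differ.
_TERMINAL_TO_COMPLETION = {
    CompressionJobStatus.SUCCEEDED: CompressionJobCompletionStatus.SUCCEEDED,
    CompressionJobStatus.FAILED: CompressionJobCompletionStatus.FAILED,
    CompressionJobStatus.KILLED: CompressionJobCompletionStatus.KILLED,
}


def handle_job(terminal_status: CompressionJobStatus) -> CompressionJobCompletionStatus:
    """Map the terminal status returned by handle_job_update to a completion
    status instead of unconditionally returning SUCCEEDED."""
    return _TERMINAL_TO_COMPLETION[terminal_status]
```

With handle_job_update returning a terminal CompressionJobStatus, the caller can no longer fall through to an unconditional SUCCEEDED.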
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
…ess/compression_scheduler.py
components/clp-package-utils/clp_package_utils/scripts/start_clp.py
components/clp-package-utils/clp_package_utils/scripts/start_clp.py
Actionable comments posted: 2
♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)
429-436: Wrong subsystem in log messages: use "compression", not "query"

Both the info and exception messages reference query jobs in the compression scheduler context.

Apply this diff:

```diff
-        if killed_jobs is not None:
-            logger.info(f"Killed {len(killed_jobs)} hanging query jobs.")
+        if killed_jobs is not None:
+            logger.info(f"Killed {len(killed_jobs)} hanging compression jobs.")
@@
-    except Exception:
-        logger.exception("Failed to kill hanging query jobs.")
+    except Exception:
+        logger.exception("Failed to kill hanging compression jobs.")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
(3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.
Applied to files:
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.
Applied to files:
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
🧬 Code Graph Analysis (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)
components/job-orchestration/job_orchestration/scheduler/constants.py (3)
CompressionJobStatus
(27-32)CompressionTaskStatus
(40-45)SchedulerType
(9-11)components/job-orchestration/job_orchestration/scheduler/utils.py (1)
kill_hanging_jobs
(22-81)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: antlr-code-committed (macos-15)
🔇 Additional comments (2)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)
35-39: Imports aligned with new enums — LGTM

Bringing in SchedulerType alongside CompressionJobStatus/CompressionTaskStatus is consistent with the new startup cleanup flow. No issues spotted.

50-50: Startup cleanup utility import — LGTM

Importing kill_hanging_jobs here is correct and keeps the scheduler self-contained for its own startup hygiene.
```python
    try:
        killed_jobs = kill_hanging_jobs(sql_adapter, SchedulerType.COMPRESSION)
        if killed_jobs is not None:
            logger.info(f"Killed {len(killed_jobs)} hanging query jobs.")
    except Exception:
        logger.exception("Failed to kill hanging query jobs.")
        return -1
```
🧹 Nitpick (assertive)
Confirm: should startup abort if cleanup fails?
Current behaviour exits with -1 on any cleanup failure. If DB hiccups or transient auth issues occur, permanently preventing scheduling might be undesirable operationally.
If the product decision is to prefer availability over strict hygiene, consider logging the failure and proceeding, leaving periodic/next-restart cleanup (or scheduling a background retry). Example change:
```diff
-    except Exception:
-        logger.exception("Failed to kill hanging compression jobs.")
-        return -1
+    except Exception:
+        logger.exception("Failed to kill hanging compression jobs.")
+        logger.warning("Continuing startup despite cleanup failure.")
```
If strict abort is intended, ignore this suggestion but please capture the rationale in a short code comment for future maintainers.
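If availability were preferred over a strict abort, one middle ground is retrying the cleanup a few times before continuing. A rough sketch, where the retry policy, helper name, and `kill_fn` callable are all illustrative assumptions rather than anything in this PR:

```python
import logging
import time

logger = logging.getLogger(__name__)


def cleanup_with_retry(kill_fn, max_attempts: int = 3, base_delay_s: float = 1.0):
    """Attempt startup cleanup a few times before giving up; returns the list
    of killed job IDs, or None if every attempt failed."""
    for attempt in range(1, max_attempts + 1):
        try:
            return kill_fn()
        except Exception:
            logger.exception("Cleanup attempt %d/%d failed.", attempt, max_attempts)
            if attempt < max_attempts:
                # Exponential backoff between attempts: 1s, 2s, 4s, ...
                time.sleep(base_delay_s * 2 ** (attempt - 1))
    logger.warning("Continuing startup despite cleanup failure.")
    return None
```

Whether transient DB errors should block scheduling entirely is the product decision flagged above; this sketch merely shows the "log and proceed after bounded retries" alternative.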
Committable suggestion skipped: line range outside the PR's diff.
```python
    try:
        killed_jobs = kill_hanging_jobs(sql_adapter, SchedulerType.COMPRESSION)
        if killed_jobs is not None:
            logger.info(f"Killed {len(killed_jobs)} hanging query jobs.")
    except Exception:
```
🧹 Nitpick (assertive)
Optional: add a debug log when nothing needed to be killed
Improves observability in ops runs where no action was taken.
```diff
     killed_jobs = kill_hanging_jobs(sql_adapter, SchedulerType.COMPRESSION)
-    if killed_jobs is not None:
-        logger.info(f"Killed {len(killed_jobs)} hanging compression jobs.")
+    if killed_jobs is not None:
+        logger.info(f"Killed {len(killed_jobs)} hanging compression jobs.")
+    else:
+        logger.debug("No hanging compression jobs to kill.")
```
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
around lines 429 to 433, the current logic only logs when killed_jobs is truthy;
add a debug log when no jobs needed killing to improve observability. After
calling kill_hanging_jobs(sql_adapter, SchedulerType.COMPRESSION) check if
killed_jobs is None or an empty list and call logger.debug with a concise
message like "No hanging compression jobs to kill." so operator runs show
explicit no-op events; keep the existing info log when jobs were killed and
preserve the try/except structure.
Description
Currently, if a compression or search job gets stuck in the running state, we have no way to recover it, and it will never transition to succeeded or failed. This can confuse users and also prevents archives from being properly removed based on the retention period.
This PR proposes a change that marks any running jobs as failed when a scheduler restarts. When marking a running job as failed, we do the following:
Note that we directly mark the jobs as failed instead of rerunning them because
In addition, if the hang is caused by an internal bug, rerunning the job is very unlikely to resolve the issue.
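For context, the startup cleanup boils down to roughly the following flow: select the IDs of RUNNING jobs for the scheduler's type, mark their RUNNING tasks as KILLED with zero duration, then mark the jobs themselves. The sketch below is a simplified, SQLite-backed illustration; the real kill_hanging_jobs lives in job_orchestration/scheduler/utils.py and operates on the package's actual job database, so the table layout and column names here are assumptions:

```python
import sqlite3

# Illustrative status codes; the real enums live in scheduler/constants.py.
RUNNING, KILLED = 1, 4


def kill_hanging_jobs(conn: sqlite3.Connection, jobs_table: str, tasks_table: str):
    """Mark every RUNNING job (and its RUNNING tasks) as KILLED; return the
    affected job IDs, or None if there were none."""
    cur = conn.cursor()
    cur.execute(f"SELECT id FROM {jobs_table} WHERE status = ?", (RUNNING,))
    job_ids = [row[0] for row in cur.fetchall()]
    if not job_ids:
        return None
    placeholders = ",".join("?" * len(job_ids))
    # Kill the still-running tasks of those jobs first...
    cur.execute(
        f"UPDATE {tasks_table} SET status = ?, duration = 0 "
        f"WHERE status = ? AND job_id IN ({placeholders})",
        (KILLED, RUNNING, *job_ids),
    )
    # ...then the jobs themselves.
    cur.execute(
        f"UPDATE {jobs_table} SET status = ?, duration = 0 "
        f"WHERE id IN ({placeholders})",
        (KILLED, *job_ids),
    )
    conn.commit()
    return job_ids
```

Returning the killed job IDs (or None when there was nothing to do) lets the scheduler log how many jobs it cleaned up at startup.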
Checklist
breaking change.
Validation performed
Note, the QueryTask are not properly updated. Refer to https://github.com/y-scope/clp/pull/1208/files#r2279708983 for details.
Summary by CodeRabbit