
Conversation

haiqi96
Contributor

@haiqi96 haiqi96 commented Aug 15, 2025

Description

Currently, if a compression or search job gets stuck in the running state, we have no way to recover it, and it will never transition to succeeded or failed. This can confuse users and also prevents archives from being properly removed based on the retention period.

This PR proposes a change that marks any running jobs as failed when the scheduler restarts. When marking a running job as failed, we do the following:

  1. Find all associated running tasks under the job and mark them as failed.
  2. Update the duration to be 0.

Note that we directly mark jobs as failed instead of rerunning them because:

  • For compression jobs, rerunning a hanging job can cause the same data to be compressed multiple times.
  • For search jobs, there's no point rerunning the job since the user has very likely already started another search.
    In addition, if the hang is caused by an internal bug, rerunning the job is very unlikely to resolve the issue.
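
In rough pseudocode, the cleanup amounts to the following (an illustrative sketch only; it assumes a DB-API cursor that returns rows as dicts, and the helper name and parameters are placeholders rather than the exact code in this PR):

def fail_hanging_jobs(db_cursor, jobs_table, tasks_table, running_status, failed_status):
    # Find jobs still marked RUNNING from before the restart.
    db_cursor.execute(f"SELECT id FROM {jobs_table} WHERE status = %s", (int(running_status),))
    hanging_job_ids = [row["id"] for row in db_cursor.fetchall()]
    if not hanging_job_ids:
        return []

    placeholders = ",".join(["%s"] * len(hanging_job_ids))
    # 1. Mark all running tasks under the hanging jobs as failed.
    db_cursor.execute(
        f"UPDATE {tasks_table} SET status = %s, duration = 0"
        f" WHERE status = %s AND job_id IN ({placeholders})",
        [int(failed_status), int(running_status)] + hanging_job_ids,
    )
    # 2. Mark the jobs themselves as failed and zero out their duration.
    db_cursor.execute(
        f"UPDATE {jobs_table} SET status = %s, duration = 0 WHERE id IN ({placeholders})",
        [int(failed_status)] + hanging_job_ids,
    )
    return hanging_job_ids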

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Manually modified the code to make a compression job and a search job get stuck in the running stage.
  • Made sure that running jobs are marked as failed when the scheduler restarts.
    Note that the QueryTask statuses are not properly updated. Refer to https://github.com/y-scope/clp/pull/1208/files#r2279708983 for details.

Summary by CodeRabbit

  • New Features
    • Automatic cleanup of hanging compression and query jobs at scheduler startup.
    • System-wide recognition of a new "killed" terminal job state; tooling and schedulers stop polling when observed.
  • Bug Fixes
    • Schedulers abort startup on cleanup failure with clear logs to avoid unstable runs.
  • Refactor
    • Internal routing/constants harmonized for consistency (no behaviour changes).
  • UI
    • Job status UI updated to display a new "killed" state.

Contributor

coderabbitai bot commented Aug 15, 2025

Walkthrough

Renames QueueName → SchedulerType, adds KILLED to multiple job/task enums, updates imports and Celery routing to use SchedulerType, and adds kill_hanging_jobs(sql_adapter, scheduler_type) called at compression and query scheduler startup to mark RUNNING jobs/tasks as KILLED; schedulers abort on cleanup failure.
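
In rough terms, each scheduler's startup now follows this pattern (a sketch distilled from the diffs in this PR, not the verbatim code):

    sql_adapter = SQL_Adapter(clp_config.database)
    try:
        # The query scheduler passes SchedulerType.QUERY instead.
        killed_jobs = kill_hanging_jobs(sql_adapter, SchedulerType.COMPRESSION)
        if killed_jobs is not None:
            logger.info(f"Killed {len(killed_jobs)} hanging jobs.")
    except Exception:
        logger.exception("Failed to kill hanging jobs.")
        return -1  # abort startup if cleanup fails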

Changes

Cohort / File(s) Summary
Constants & enums
components/job-orchestration/job_orchestration/scheduler/constants.py
Renamed QueueName → SchedulerType (COMPRESSION/QUERY); added KILLED = auto() to CompressionJobStatus, CompressionTaskStatus, QueryJobStatus, QueryTaskStatus.
Start scripts & Celery routing
components/clp-package-utils/clp_package_utils/scripts/start_clp.py, components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py, components/job-orchestration/job_orchestration/executor/query/celeryconfig.py
Replaced imports of QueueName with SchedulerType and switched task_routes / celery_route values from QueueName.* → SchedulerType.*. No other logic/config changes.
Scheduler startup cleanup
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py, components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py, components/job-orchestration/job_orchestration/scheduler/utils.py
Added kill_hanging_jobs(sql_adapter, scheduler_type) in utils; schedulers call it after creating SQL_Adapter. If RUNNING jobs exist, tasks and jobs updated to KILLED and duration set to 0; compression also updates update_time; on cleanup error scheduler logs and exits with -1; success logs killed count and continues.
Client & server typings / native scripts
components/webui/client/src/pages/IngestPage/Jobs/typings.tsx, components/webui/server/src/typings/query.ts, components/clp-package-utils/clp_package_utils/scripts/native/compress.py, components/clp-package-utils/clp_package_utils/scripts/native/utils.py
Added KILLED handling: client adds KILLED to CompressionJobStatus and renders a black "killed" badge; server adds KILLED to QUERY_JOB_STATUS; native scripts treat KILLED as terminal (stop polling / return).

Sequence Diagram(s)

sequenceDiagram
    participant Scheduler as Compression / Query Scheduler
    participant SQLA as SQL_Adapter
    participant Utils as kill_hanging_jobs
    participant DB as Database

    Scheduler->>SQLA: create adapter
    Scheduler->>Utils: kill_hanging_jobs(SQLA, SchedulerType)
    Utils->>DB: SELECT job IDs WHERE status = RUNNING (by scheduler type)
    DB-->>Utils: job IDs or none
    alt jobs found
        Utils->>DB: UPDATE tasks SET status=KILLED, duration=0 WHERE status=RUNNING AND job_id IN (...)
        Utils->>DB: UPDATE jobs SET status=KILLED, duration=0 [, update_time=CURRENT_TIMESTAMP() for compression]
        Utils-->>Scheduler: list of killed job IDs
    else no jobs
        Utils-->>Scheduler: None
    end
    alt cleanup failed
        Scheduler-->>Scheduler: log error and exit (-1)
    else
        Scheduler-->>Scheduler: log count and continue startup
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes


Suggested reviewers

  • junhaoliao
  • haiqi96



📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between e7e1b5b and 1f883d3.

📒 Files selected for processing (3)
  • components/clp-package-utils/clp_package_utils/scripts/start_clp.py (3 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3 hunks)
  • components/job-orchestration/job_orchestration/scheduler/utils.py (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.

Applied to files:

  • components/clp-package-utils/clp_package_utils/scripts/start_clp.py
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
🧬 Code graph analysis (3)
components/job-orchestration/job_orchestration/scheduler/utils.py (2)
components/clp-py-utils/clp_py_utils/sql_adapter.py (2)
  • SQL_Adapter (60-131)
  • create_mysql_connection (64-82)
components/job-orchestration/job_orchestration/scheduler/constants.py (5)
  • CompressionJobStatus (27-32)
  • CompressionTaskStatus (40-45)
  • QueryJobStatus (50-61)
  • QueryTaskStatus (64-74)
  • SchedulerType (9-11)
components/clp-package-utils/clp_package_utils/scripts/start_clp.py (1)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  • SchedulerType (9-11)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)
components/job-orchestration/job_orchestration/scheduler/constants.py (3)
  • CompressionJobStatus (27-32)
  • CompressionTaskStatus (40-45)
  • SchedulerType (9-11)
components/job-orchestration/job_orchestration/scheduler/utils.py (1)
  • kill_hanging_jobs (22-81)
🔇 Additional comments (6)
components/clp-package-utils/clp_package_utils/scripts/start_clp.py (3)

42-42: Rename alignment looks good (QueueName → SchedulerType).

Import swap is consistent with repo-wide renaming and downstream Celery routing. No issues.


651-662: Celery route constant: good change.

Passing SchedulerType.COMPRESSION directly reads cleaner than an f-string and remains a plain queue name ("compression") as required by Celery’s -Q. Looks correct.
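
For illustration, the routing presumably now reads along these lines (a sketch; the exact task path below is an assumption, not taken from the diff):

from job_orchestration.scheduler.constants import SchedulerType

# Sketch of the Celery routing after the rename; the task name is illustrative.
task_routes = {
    "job_orchestration.executor.compress.compression_task.compress": {
        "queue": SchedulerType.COMPRESSION,  # resolves to the plain string "compression"
    },
}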


676-691: Celery route constant: good change.

Same here for query worker; the route is the expected string "query". All good.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)

35-39: Enum import expansion is correct.

CompressionJobStatus/TaskStatus and SchedulerType are the right symbols for new KILLED flow.


50-50: Startup cleanup import wired correctly.

kill_hanging_jobs is imported from the right module and used below. No issues.

components/job-orchestration/job_orchestration/scheduler/utils.py (1)

66-71: Good: job status coerced to int and update_time managed for compression only.

The explicit int() avoids enum stringification issues; conditional update_time aligns with current table semantics.
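
As a toy illustration of the stringification issue (not project code):

from enum import IntEnum, auto

class JobStatus(IntEnum):
    RUNNING = auto()
    KILLED = auto()

# Depending on the Python version, str() of an IntEnum member is either
# "JobStatus.KILLED" (pre-3.11) or "2" (3.11+); int() is unambiguous everywhere.
print(str(JobStatus.KILLED))
print(int(JobStatus.KILLED))  # always 2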



@haiqi96 haiqi96 changed the title feat(package): Let schedulers mark running jobs as failed at start up. feat(package): Mark running jobs as failed when scheduler starts. Aug 15, 2025
Comment on lines +68 to +69
        if SchedulerType.COMPRESSION == scheduler_type:
            field_set_expressions.append("update_time = CURRENT_TIMESTAMP()")
Contributor Author

@kirkrodrigues This update_time has made things more complicated.
In the long term, though, it might be worthwhile adding the update time for query_job as well.

Comment on lines 23 to 36
    if SchedulerType.COMPRESSION == scheduler_type:
        jobs_table_name = COMPRESSION_JOBS_TABLE_NAME
        job_status_running = CompressionJobStatus.RUNNING
        job_status_failed = CompressionJobStatus.FAILED
        tasks_table_name = COMPRESSION_TASKS_TABLE_NAME
        task_status_running = CompressionTaskStatus.RUNNING
        task_status_failed = CompressionTaskStatus.FAILED
    elif SchedulerType.QUERY == scheduler_type:
        jobs_table_name = QUERY_JOBS_TABLE_NAME
        job_status_running = QueryJobStatus.RUNNING
        job_status_failed = QueryJobStatus.FAILED
        tasks_table_name = QUERY_TASKS_TABLE_NAME
        task_status_running = QueryTaskStatus.RUNNING
        task_status_failed = QueryTaskStatus.FAILED
Contributor Author

This looks pretty ugly. I have discussed with Zhihao and we both agree that having a class managing the table name and job/task statuses might be the right way to go, but it would require more refactoring.

For now, I still feel this is better than asking the user to pass in all 6 arguments themselves.
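
For reference, such a container could be as small as a frozen dataclass (a purely hypothetical sketch; none of these names exist in this PR):

from dataclasses import dataclass

@dataclass(frozen=True)
class SchedulerTableConfig:
    # Hypothetical bundle of the six per-scheduler values discussed above.
    jobs_table_name: str
    tasks_table_name: str
    job_status_running: int
    job_status_failed: int
    task_status_running: int
    task_status_failed: int

Each scheduler type would then map to one instance, and kill_hanging_jobs would take a single config argument instead of six.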

Comment on lines 55 to 64
        job_id_placeholders_str = ",".join(["%s"] * len(hanging_job_ids))
        db_cursor.execute(
            f"""
            UPDATE {tasks_table_name}
            SET status={task_status_failed}, duration=0
            WHERE status={task_status_running}
            AND job_id IN ({job_id_placeholders_str})
            """,
            hanging_job_ids,
        )
Contributor Author

It seems we don't even update the task status from pending to running, so this won't do anything to the task table unless we hack it by setting task_status_running = QueryTaskStatus.PENDING.

Depending on when we switch to spider, we can either fix the task status reporting properly or live with it.

@haiqi96
Contributor Author

haiqi96 commented Aug 15, 2025

Depending on whether Junhao can add handling for the new Killed state to the webui, this PR may still need slight updates. Nevertheless, it should be good for review.

@haiqi96 haiqi96 marked this pull request as ready for review August 15, 2025 19:11
@haiqi96 haiqi96 requested a review from a team as a code owner August 15, 2025 19:11
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 8

🔭 Outside diff range comments (1)
components/clp-package-utils/clp_package_utils/scripts/start_clp.py (1)

614-629: Use the correct logging level per scheduler (compression vs. query).

Per prior learning, generic_start_scheduler always uses the query scheduler’s logging level. Use the compression scheduler’s level when starting it.

Apply:

@@
-    necessary_env_vars = [
+    # Select appropriate logging level based on scheduler type
+    if component_name == COMPRESSION_SCHEDULER_COMPONENT_NAME:
+        scheduler_logging_level = clp_config.compression_scheduler.logging_level
+    else:
+        scheduler_logging_level = clp_config.query_scheduler.logging_level
+
+    necessary_env_vars = [
         f"PYTHONPATH={clp_site_packages_dir}",
@@
-        f"CLP_LOGS_DIR={container_logs_dir}",
-        f"CLP_LOGGING_LEVEL={clp_config.query_scheduler.logging_level}",
+        f"CLP_LOGS_DIR={container_logs_dir}",
+        f"CLP_LOGGING_LEVEL={scheduler_logging_level}",
     ]
♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/scheduler/utils.py (1)

55-64: Query tasks may never be marked RUNNING; also fail PENDING tasks for those jobs

Per known issue, many QueryTask rows remain PENDING even though the job is RUNNING, so the current filter misses them. Update both RUNNING and PENDING tasks to FAILED for the affected job IDs.

-        job_id_placeholders_str = ",".join(["%s"] * len(hanging_job_ids))
-        db_cursor.execute(
-            f"""
-            UPDATE {tasks_table_name}
-            SET status={task_status_failed}, duration=0
-            WHERE status={task_status_running}
-            AND job_id IN ({job_id_placeholders_str})
-            """,
-            hanging_job_ids,
-        )
+        job_id_placeholders_str = ",".join(["%s"] * len(hanging_job_ids))
+        # For QUERY, some implementations never flip tasks from PENDING to RUNNING.
+        # Fail both RUNNING and PENDING tasks under the affected jobs.
+        if SchedulerType.QUERY == scheduler_type:
+            task_statuses_to_fail = (task_status_running, QueryTaskStatus.PENDING)
+        else:
+            task_statuses_to_fail = (task_status_running,)
+
+        status_placeholders_str = ",".join(["%s"] * len(task_statuses_to_fail))
+        task_update_params = [int(task_status_failed)] + [int(s) for s in task_statuses_to_fail] + hanging_job_ids
+
+        db_cursor.execute(
+            f"""
+            UPDATE {tasks_table_name}
+            SET status=%s, duration=0
+            WHERE status IN ({status_placeholders_str})
+            AND job_id IN ({job_id_placeholders_str})
+            """,
+            task_update_params,
+        )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 0892a1c and 8d00d8c.

📒 Files selected for processing (7)
  • components/clp-package-utils/clp_package_utils/scripts/start_clp.py (3 hunks)
  • components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py (2 hunks)
  • components/job-orchestration/job_orchestration/executor/query/celeryconfig.py (1 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3 hunks)
  • components/job-orchestration/job_orchestration/scheduler/constants.py (1 hunks)
  • components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (3 hunks)
  • components/job-orchestration/job_orchestration/scheduler/utils.py (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.

Applied to files:

  • components/clp-package-utils/clp_package_utils/scripts/start_clp.py
🧬 Code Graph Analysis (5)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)
components/job-orchestration/job_orchestration/scheduler/constants.py (3)
  • CompressionJobStatus (27-31)
  • CompressionTaskStatus (39-43)
  • SchedulerType (9-11)
components/job-orchestration/job_orchestration/scheduler/utils.py (1)
  • kill_hanging_jobs (22-81)
components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py (1)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  • SchedulerType (9-11)
components/job-orchestration/job_orchestration/scheduler/utils.py (1)
components/job-orchestration/job_orchestration/scheduler/constants.py (5)
  • CompressionJobStatus (27-31)
  • CompressionTaskStatus (39-43)
  • QueryJobStatus (48-58)
  • QueryTaskStatus (61-70)
  • SchedulerType (9-11)
components/job-orchestration/job_orchestration/executor/query/celeryconfig.py (1)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  • SchedulerType (9-11)
components/clp-package-utils/clp_package_utils/scripts/start_clp.py (1)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  • SchedulerType (9-11)
🔇 Additional comments (8)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)

9-12: Rename and usage of SchedulerType look good.

Clear constant container, no behavioural change; downstream references align with queue names.

components/clp-package-utils/clp_package_utils/scripts/start_clp.py (1)

44-44: No stale QueueName references found
I ran a repo-wide search for “QueueName” (both precise and broad patterns) and confirmed there are no remaining imports or usages. The rename to SchedulerType is safe to merge.

components/job-orchestration/job_orchestration/executor/query/celeryconfig.py (1)

3-13: Routing updated to SchedulerType. LGTM.

Queue targeting remains “query”, consistent with worker -Q and start scripts.

components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py (1)

3-16: Routing updated to SchedulerType. LGTM.

Compression task correctly maps to the “compression” queue; priority settings unchanged.

components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (2)

52-57: Consolidated imports + SchedulerType use are consistent.

Brings statuses and SchedulerType into scope cleanly.


78-78: Startup cleanup dependency import is appropriate.

Importing kill_hanging_jobs where it’s used keeps intent clear.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)

34-38: Imports update looks good

Bringing in SchedulerType alongside the status enums is consistent with the new utils API.


49-49: Proactive startup cleanup import is appropriate

Importing kill_hanging_jobs here is the right spot, keeping startup concerns localized to the scheduler entrypoint.

Comment on lines 66 to 79
        jobs_update_config = {"status": job_status_failed, "duration": 0}
        field_set_expressions = [f"{k} = %s" for k in jobs_update_config.keys()]
        if SchedulerType.COMPRESSION == scheduler_type:
            field_set_expressions.append("update_time = CURRENT_TIMESTAMP()")

        values = list(jobs_update_config.values()) + hanging_job_ids
        db_cursor.execute(
            f"""
            UPDATE {jobs_table_name}
            SET {", ".join(field_set_expressions)}
            WHERE id in ({job_id_placeholders_str})
            """,
            values,
        )
Contributor

🧹 Nitpick (assertive)

Coerce job status enum to int before parameterizing

Be explicit about integer values passed to the DB and keep the parameterization consistent.

-        jobs_update_config = {"status": job_status_failed, "duration": 0}
+        jobs_update_config = {"status": int(job_status_failed), "duration": 0}
         field_set_expressions = [f"{k} = %s" for k in jobs_update_config.keys()]
         if SchedulerType.COMPRESSION == scheduler_type:
             field_set_expressions.append("update_time = CURRENT_TIMESTAMP()")
 
-        values = list(jobs_update_config.values()) + hanging_job_ids
+        values = list(jobs_update_config.values()) + hanging_job_ids
         db_cursor.execute(
             f"""
             UPDATE {jobs_table_name}
             SET {", ".join(field_set_expressions)}
             WHERE id in ({job_id_placeholders_str})
             """,
             values,
         )
📝 Committable suggestion


Suggested change
        jobs_update_config = {"status": job_status_failed, "duration": 0}
        field_set_expressions = [f"{k} = %s" for k in jobs_update_config.keys()]
        if SchedulerType.COMPRESSION == scheduler_type:
            field_set_expressions.append("update_time = CURRENT_TIMESTAMP()")
        values = list(jobs_update_config.values()) + hanging_job_ids
        db_cursor.execute(
            f"""
            UPDATE {jobs_table_name}
            SET {", ".join(field_set_expressions)}
            WHERE id in ({job_id_placeholders_str})
            """,
            values,
        )
        jobs_update_config = {"status": int(job_status_failed), "duration": 0}
        field_set_expressions = [f"{k} = %s" for k in jobs_update_config.keys()]
        if SchedulerType.COMPRESSION == scheduler_type:
            field_set_expressions.append("update_time = CURRENT_TIMESTAMP()")
        values = list(jobs_update_config.values()) + hanging_job_ids
        db_cursor.execute(
            f"""
            UPDATE {jobs_table_name}
            SET {", ".join(field_set_expressions)}
            WHERE id in ({job_id_placeholders_str})
            """,
            values,
        )
🤖 Prompt for AI Agents
In components/job-orchestration/job_orchestration/scheduler/utils.py around
lines 66 to 79, the job status enum is being passed directly into the
parameterized query; coerce the enum to an integer before creating
jobs_update_config so the DB receives an int (e.g. use int(job_status_failed))
and then build field_set_expressions and values from that config as before to
keep parameter ordering consistent; ensure the cast happens before values =
list(...) so the parameterized values list contains the integer status, not the
enum.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (5)
components/job-orchestration/job_orchestration/scheduler/utils.py (4)

30-36: Query tasks likely never reach RUNNING — consider temporary broadened match

Per your note, QueryTask may remain PENDING; restricting to RUNNING will skip killing their tasks. Two short-term options until proper task status reporting exists:

  • For Query only, set task_status_running = QueryTaskStatus.PENDING
  • Or update with status IN (PENDING, RUNNING) for Query

If you prefer the first approach, minimal change:

     elif SchedulerType.QUERY == scheduler_type:
         jobs_table_name = QUERY_JOBS_TABLE_NAME
         job_status_running = QueryJobStatus.RUNNING
         job_status_killed = QueryJobStatus.KILLED
         tasks_table_name = QUERY_TASKS_TABLE_NAME
-        task_status_running = QueryTaskStatus.RUNNING
+        task_status_running = QueryTaskStatus.PENDING  # TODO: switch back to RUNNING once reporting is fixed
         task_status_killed = QueryTaskStatus.KILLED

Would you like me to instead implement a Query-only IN (PENDING, RUNNING) update for tasks? I can draft that variant as well.


22-22: Fix return type: job IDs are numeric

The function returns DB IDs (ints), not strings.

Apply this diff:

-def kill_hanging_jobs(sql_adapter: SQL_Adapter, scheduler_type: str) -> Optional[List[str]]:
+def kill_hanging_jobs(sql_adapter: SQL_Adapter, scheduler_type: str) -> Optional[List[int]]:

40-42: Use generic adapter connection and disable localhost socket

Align with other schedulers and avoid socket resolution issues in containers.

Apply this diff:

-    with closing(sql_adapter.create_mysql_connection()) as db_conn, closing(
+    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
         db_conn.cursor(dictionary=True)
     ) as db_cursor:

43-51: Parameterize SELECT and coerce IntEnum to int

Avoid relying on IntEnum stringification and ensure robust parameter binding.

Apply this diff:

-        db_cursor.execute(
-            f"""
-            SELECT id
-            FROM {jobs_table_name}
-            WHERE status={job_status_running}
-            """
-        )
+        db_cursor.execute(
+            f"""
+            SELECT id
+            FROM {jobs_table_name}
+            WHERE status=%s
+            """,
+            (int(job_status_running),),
+        )
         hanging_job_ids = [row["id"] for row in db_cursor.fetchall()]
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

427-434: Fix logger message: wrong scheduler in exception text

This is the compression scheduler, but the message says “query”.

Apply this diff:

     try:
         killed_jobs = kill_hanging_jobs(sql_adapter, SchedulerType.COMPRESSION)
         if killed_jobs is not None:
             logger.info(f"Killed {len(killed_jobs)} hanging compression jobs.")
     except Exception:
-        logger.exception("Failed to kill hanging query jobs.")
+        logger.exception("Failed to kill hanging compression jobs.")
         return -1
📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 8d00d8c and 7e25b53.

📒 Files selected for processing (5)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3 hunks)
  • components/job-orchestration/job_orchestration/scheduler/constants.py (5 hunks)
  • components/job-orchestration/job_orchestration/scheduler/utils.py (1 hunks)
  • components/webui/client/src/pages/IngestPage/Jobs/typings.tsx (2 hunks)
  • components/webui/server/src/typings/query.ts (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}

⚙️ CodeRabbit Configuration File

  • Prefer false == <expression> rather than !<expression>.

Files:

  • components/webui/server/src/typings/query.ts
  • components/webui/client/src/pages/IngestPage/Jobs/typings.tsx
🧠 Learnings (2)
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
🧬 Code Graph Analysis (2)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)
components/job-orchestration/job_orchestration/scheduler/constants.py (3)
  • CompressionJobStatus (27-32)
  • CompressionTaskStatus (40-45)
  • SchedulerType (9-11)
components/job-orchestration/job_orchestration/scheduler/utils.py (1)
  • kill_hanging_jobs (22-81)
components/job-orchestration/job_orchestration/scheduler/utils.py (1)
components/job-orchestration/job_orchestration/scheduler/constants.py (5)
  • CompressionJobStatus (27-32)
  • CompressionTaskStatus (40-45)
  • QueryJobStatus (50-61)
  • QueryTaskStatus (64-74)
  • SchedulerType (9-11)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (10)
components/webui/server/src/typings/query.ts (1)

27-35: Add KILLED status for queries — alignment looks correct

KILLED is appended after CANCELLED, preserving enum stability, and it's correctly excluded from QUERY_JOB_STATUS_WAITING_STATES. No issues spotted.

components/webui/client/src/pages/IngestPage/Jobs/typings.tsx (1)

71-76: KILLED badge rendering is clear

Using a black badge for KILLED reads distinctly and is consistent with the new status semantics. No change needed.

components/job-orchestration/job_orchestration/scheduler/constants.py (5)

9-12: SchedulerType rename looks good

Replacing QueueName with SchedulerType reads clearer and keeps the same literal values; downstream imports should be straightforward.
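
Based on the summaries in this review, the container presumably boils down to something like (a sketch, not the source):

class SchedulerType:
    COMPRESSION = "compression"
    QUERY = "query"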


26-33: CompressionJobStatus: appended KILLED preserves wire compatibility

KILLED is added at the end, keeping prior numeric values stable. Good.


40-46: CompressionTaskStatus: appended KILLED preserves wire compatibility

Same positive note as for job status — appended, not inserted.


50-58: QueryJobStatus: appended KILLED after CANCELLED — OK

Order and helper methods remain intact. Matches UI/server typings changes.


64-71: QueryTaskStatus: appended KILLED after CANCELLED — OK

Consistent with job status and broader PR intent.

components/job-orchestration/job_orchestration/scheduler/utils.py (2)

66-79: Good: cast job status to int and parameterize jobs update

jobs_update_config correctly casts status to int and the UPDATE uses bound parameters. Nice.


22-81: QueryTaskStatus.RUNNING transitions verified – no cleanup expansion needed

We found multiple code paths that set query tasks to RUNNING, so the additional PENDING-hack or IN-clause isn’t required:

• components/job-orchestration/job_orchestration/executor/query/utils.py (line 55):
task_status = QueryTaskStatus.RUNNING passed to update_query_task_metadata
• components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (lines 359–364):
QueryTaskStatus.RUNNING used in the SQL update for QUERY_TASKS_TABLE_NAME
• components/job-orchestration/job_orchestration/scheduler/utils.py (line 35):
task_status_running = QueryTaskStatus.RUNNING in kill_hanging_jobs

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

34-38: Imports updated for SchedulerType and kill_hanging_jobs — OK

Importing SchedulerType and pulling in kill_hanging_jobs aligns with the new startup cleanup. Looks good.

Copy link
Member

@kirkrodrigues kirkrodrigues left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're missing two changes:

--- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py
@@ -91,6 +91,9 @@ def handle_job_update(db, db_cursor, job_id, no_progress_reporting):
             # One or more tasks in the job has failed
             logger.error(f"Compression failed. {job_row['status_msg']}")
             break  # Done
+        if CompressionJobStatus.KILLED == job_status:
+            logger.error(f"Compression killed. {job_row['status_msg']}")
+            break

         if CompressionJobStatus.RUNNING == job_status:
             if not no_progress_reporting:
--- a/components/clp-package-utils/clp_package_utils/scripts/native/utils.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/native/utils.py
@@ -111,6 +111,7 @@ def wait_for_query_job(sql_adapter: SQL_Adapter, job_id: int) -> QueryJobStatus:
             if new_status in (
                 QueryJobStatus.SUCCEEDED,
                 QueryJobStatus.FAILED,
                 QueryJobStatus.CANCELLED,
+                QueryJobStatus.KILLED,
             ):
                 return new_status

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
components/clp-package-utils/clp_package_utils/scripts/native/compress.py (5)

131-137: handle_job always returns SUCCEEDED; map the actual terminal job status

Even when the job is FAILED/KILLED, this function returns CompressionJobCompletionStatus.SUCCEEDED, which is incorrect for CLI exit codes and automation.

Refactor to capture the terminal status returned by handle_job_update and map it:

-            handle_job_update(db, db_cursor, job_id, no_progress_reporting)
+            terminal_status = handle_job_update(db, db_cursor, job_id, no_progress_reporting)
         except Exception as ex:
             logger.error(ex)
             return CompressionJobCompletionStatus.FAILED

         logger.debug(f"Finished job {job_id}")

-        return CompressionJobCompletionStatus.SUCCEEDED
+        if terminal_status == CompressionJobStatus.SUCCEEDED:
+            return CompressionJobCompletionStatus.SUCCEEDED
+        # If there's no explicit KILLED completion-status enum, treat as FAILED for now.
+        return CompressionJobCompletionStatus.FAILED

Note: This assumes you also change the SUCCEEDED/FAILED branches in handle_job_update to return CompressionJobStatus.SUCCEEDED/FAILED instead of break. If CompressionJobCompletionStatus now also includes KILLED, return that specifically rather than folding into FAILED.


61-68: Parameterize SQL and avoid committing after SELECT to reduce risk and overhead

  • The polling queries interpolate job_id into SQL. While job_id is an int under our control, parameterizing is safer and consistent with the rest of the codebase.
  • Calling db.commit() after a SELECT is unnecessary and adds overhead under autocommit.

Proposed diff:

-        polling_query = (
-            f"SELECT status, status_msg FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"
-        )
+        polling_query = (
+            f"SELECT status, status_msg FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id=%s"
+        )
...
-        polling_query = (
-            f"SELECT start_time, status, status_msg, uncompressed_size, compressed_size, duration "
-            f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}"
-        )
+        polling_query = (
+            f"SELECT start_time, status, status_msg, uncompressed_size, compressed_size, duration "
+            f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id=%s"
+        )
...
-        db_cursor.execute(polling_query)
-        results = db_cursor.fetchall()
-        db.commit()
+        db_cursor.execute(polling_query, (job_id,))
+        results = db_cursor.fetchall()

(Apply the same parameter tuple for both polling-query shapes.)

If you’d like, I can search the repo and convert similar f-strings to parameterized queries in a follow-up.

Also applies to: 73-76, 99-107


41-56: Guard against division-by-zero and invalid durations in progress logging

compression_ratio = float(job_uncompressed_size) / job_compressed_size will raise if compressed_size is 0. Similarly, computing speed with duration can divide by 0 if duration is not yet populated.

Consider:

  • Only compute ratio when job_compressed_size > 0.
  • Only compute speed on SUCCEEDED when job_row["duration"] > 0, otherwise fall back to elapsed wall-clock as you already do.

Example refactor:

-    compression_ratio = float(job_uncompressed_size) / job_compressed_size
+    compression_ratio = (
+        float(job_uncompressed_size) / job_compressed_size
+        if job_compressed_size > 0
+        else None
+    )
...
-    logger.info(
-        f"Compressed {pretty_size(job_uncompressed_size)} into "
-        f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}x). "
-        f"Speed: {pretty_size(compression_speed)}/s."
-    )
+    ratio_str = f" ({compression_ratio:.2f}x)" if compression_ratio else ""
+    logger.info(
+        f"Compressed {pretty_size(job_uncompressed_size)} into "
+        f"{pretty_size(job_compressed_size)}{ratio_str}. "
+        f"Speed: {pretty_size(compression_speed)}/s."
+    )

This avoids noisy tracebacks during early RUNNING updates.


38-38: Use module name for logger to preserve hierarchical logging

logging.getLogger(__file__) bakes in a file path. Prefer __name__ to participate in hierarchical logger config.

-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)

1-261: Apply Black formatting to compress.py

CI reports that compress.py would be reformatted. Please run Black with the project’s style settings and commit the changes to resolve the lint failure:

black --line-length 100 components/clp-package-utils/clp_package_utils/scripts/native/compress.py
components/clp-package-utils/clp_package_utils/scripts/native/utils.py (2)

105-111: Parameterize the SELECT and avoid commit after SELECT

Minor hardening and perf nit:

  • Replace string interpolation with a parameter placeholder to future-proof this path.
  • Remove db_conn.commit() after a read-only query.
-            db_cursor.execute(
-                f"SELECT `status` FROM `{QUERY_JOBS_TABLE_NAME}` WHERE `id` = {job_id}"
-            )
+            db_cursor.execute(
+                f"SELECT `status` FROM `{QUERY_JOBS_TABLE_NAME}` WHERE `id` = %s",
+                (job_id,),
+            )
...
-            db_conn.commit()
+            # No commit needed after SELECT

111-118: Optional: Document return semantics now that KILLED is terminal

Consider updating the docstring of wait_for_query_job to explicitly state that it can return SUCCEEDED/FAILED/CANCELLED/KILLED, since callers might gate user messaging or exit codes based on this.
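
For example (the signature is taken from the diff above; the docstring wording itself is hypothetical):

def wait_for_query_job(sql_adapter: SQL_Adapter, job_id: int) -> QueryJobStatus:
    """
    Polls the query jobs table until the given job reaches a terminal state.
    :param sql_adapter: Adapter used to open the metadata database connection.
    :param job_id: ID of the job to wait for.
    :return: The terminal status -- one of SUCCEEDED, FAILED, CANCELLED, or
        KILLED (newly terminal as of this PR).
    """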

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 9ea0651 and e750a68.

📒 Files selected for processing (2)
  • components/clp-package-utils/clp_package_utils/scripts/native/compress.py (1 hunks)
  • components/clp-package-utils/clp_package_utils/scripts/native/utils.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
components/clp-package-utils/clp_package_utils/scripts/native/utils.py (1)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  • QueryJobStatus (50-61)
components/clp-package-utils/clp_package_utils/scripts/native/compress.py (1)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  • CompressionJobStatus (27-32)
🪛 GitHub Actions: clp-lint
components/clp-package-utils/clp_package_utils/scripts/native/compress.py

[error] 1-1: lint:check-no-cpp failed: Black formatting would reformat compress.py. Run 'black' to format the file.

🔇 Additional comments (1)
components/clp-package-utils/clp_package_utils/scripts/native/utils.py (1)

115-116: Treat KILLED as terminal in query wait loop — good change

Adding QueryJobStatus.KILLED ensures the waiter returns promptly on scheduler cleanup, keeping CLI behaviour consistent with newly introduced terminal state semantics.

Comment on lines 94 to 97
        if CompressionJobStatus.KILLED == job_status:
            # The job is killed
            logger.error(f"Compression killed. {job_row['status_msg']}")
            break  # Done
Contributor

⚠️ Potential issue

Return a terminal status instead of breaking; otherwise callers will report success for killed jobs

Right now, this branch breaks the loop and handle_job later returns CompressionJobCompletionStatus.SUCCEEDED unconditionally, which will misreport killed jobs as succeeded. Return a terminal status from handle_job_update and let the caller map it to a completion status.

Apply this minimal change here (and make analogous changes in the SUCCEEDED/FAILED branches) so the status is propagated:

-        if CompressionJobStatus.KILLED == job_status:
-            # The job is killed
-            logger.error(f"Compression killed. {job_row['status_msg']}")
-            break # Done
+        if CompressionJobStatus.KILLED == job_status:
+            # The job is killed
+            logger.error(f"Compression killed. {job_row['status_msg']}")
+            return CompressionJobStatus.KILLED

Follow-up: Update handle_job_update to return a CompressionJobStatus for all terminal states and update handle_job to map that to a CompressionJobCompletionStatus (see separate comment below).

📝 Committable suggestion


Suggested change
        if CompressionJobStatus.KILLED == job_status:
            # The job is killed
            logger.error(f"Compression killed. {job_row['status_msg']}")
            break  # Done
        if CompressionJobStatus.KILLED == job_status:
            # The job is killed
            logger.error(f"Compression killed. {job_row['status_msg']}")
            return CompressionJobStatus.KILLED
🤖 Prompt for AI Agents
In components/clp-package-utils/clp_package_utils/scripts/native/compress.py
around lines 94 to 97, the branch for CompressionJobStatus.KILLED currently
breaks the loop which causes handle_job to later report success; change this to
return the terminal CompressionJobStatus (e.g., return
CompressionJobStatus.KILLED) instead of break, and make the analogous change in
the SUCCEEDED and FAILED branches so handle_job_update always returns a terminal
CompressionJobStatus for terminal states; also update the caller handle_job to
accept that return value and map returned CompressionJobStatus ->
CompressionJobCompletionStatus (SUCCEEDED/FAILED/KILLED) before exiting so the
final completion status is propagated correctly.

@haiqi96 haiqi96 requested a review from kirkrodrigues August 21, 2025 03:29
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

429-436: Wrong subsystem in log messages: use “compression”, not “query”

Both the info and exception messages reference query jobs in the compression scheduler context.

Apply this diff:

-        if killed_jobs is not None:
-            logger.info(f"Killed {len(killed_jobs)} hanging query jobs.")
+        if killed_jobs is not None:
+            logger.info(f"Killed {len(killed_jobs)} hanging compression jobs.")
@@
-    except Exception:
-        logger.exception("Failed to kill hanging query jobs.")
+    except Exception:
+        logger.exception("Failed to kill hanging compression jobs.")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between c849848 and e7e1b5b.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
🧬 Code Graph Analysis (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)
components/job-orchestration/job_orchestration/scheduler/constants.py (3)
  • CompressionJobStatus (27-32)
  • CompressionTaskStatus (40-45)
  • SchedulerType (9-11)
components/job-orchestration/job_orchestration/scheduler/utils.py (1)
  • kill_hanging_jobs (22-81)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: antlr-code-committed (macos-15)
🔇 Additional comments (2)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)

35-39: Imports aligned with new enums — LGTM

Bringing in SchedulerType alongside CompressionJobStatus/CompressionTaskStatus is consistent with the new startup cleanup flow. No issues spotted.


50-50: Startup cleanup utility import — LGTM

Importing kill_hanging_jobs here is correct and keeps the scheduler self-contained for its own startup hygiene.

Comment on lines 429 to 436
    try:
        killed_jobs = kill_hanging_jobs(sql_adapter, SchedulerType.COMPRESSION)
        if killed_jobs is not None:
            logger.info(f"Killed {len(killed_jobs)} hanging query jobs.")
    except Exception:
        logger.exception("Failed to kill hanging query jobs.")
        return -1

Contributor

🧹 Nitpick (assertive)

Confirm: should startup abort if cleanup fails?

Current behaviour exits with -1 on any cleanup failure. If DB hiccups or transient auth issues occur, permanently preventing scheduling might be undesirable operationally.

If the product decision is to prefer availability over strict hygiene, consider logging the failure and proceeding, leaving periodic/next-restart cleanup (or scheduling a background retry). Example change:

-    except Exception:
-        logger.exception("Failed to kill hanging compression jobs.")
-        return -1
+    except Exception:
+        logger.exception("Failed to kill hanging compression jobs.")
+        logger.warning("Continuing startup despite cleanup failure.")

If strict abort is intended, ignore this suggestion but please capture the rationale in a short code comment for future maintainers.

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 429 to 433
    try:
        killed_jobs = kill_hanging_jobs(sql_adapter, SchedulerType.COMPRESSION)
        if killed_jobs is not None:
            logger.info(f"Killed {len(killed_jobs)} hanging query jobs.")
    except Exception:
Contributor

🧹 Nitpick (assertive)

Optional: add a debug log when nothing needed to be killed

Improves observability in ops runs where no action was taken.

         killed_jobs = kill_hanging_jobs(sql_adapter, SchedulerType.COMPRESSION)
-        if killed_jobs is not None:
-            logger.info(f"Killed {len(killed_jobs)} hanging compression jobs.")
+        if killed_jobs is not None:
+            logger.info(f"Killed {len(killed_jobs)} hanging compression jobs.")
+        else:
+            logger.debug("No hanging compression jobs to kill.")

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
around lines 429 to 433, the current logic only logs when killed_jobs is truthy;
add a debug log when no jobs needed killing to improve observability. After
calling kill_hanging_jobs(sql_adapter, SchedulerType.COMPRESSION) check if
killed_jobs is None or an empty list and call logger.debug with a concise
message like "No hanging compression jobs to kill." so operator runs show
explicit no-op events; keep the existing info log when jobs were killed and
preserve the try/except structure.

@kirkrodrigues kirkrodrigues changed the title feat(package): Mark running jobs as failed when scheduler starts. feat(package): Mark running jobs as failed when schedulers start. Aug 21, 2025
@kirkrodrigues kirkrodrigues merged commit 8a00bff into y-scope:main Aug 21, 2025
9 checks passed