Skip to content
1 change: 1 addition & 0 deletions documentation/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Infrastructure / Support
----------------------
* Upgraded dependencies [see `PR #1847 <https://www.github.com/FlexMeasures/flexmeasures/pull/1847>`_]
* Improve general description on SwaggerDocs page and add two settings for hosts: FLEXMEASURES_SUPPORT_PAGE and FLEXMEASURES_TOS_PAGE [see `PR #1851 <https://www.github.com/FlexMeasures/flexmeasures/pull/1851>`_]
* Use ``flexmeasures jobs stats`` to show statistics of the job queueing system [see `PR #1838 <https://www.github.com/FlexMeasures/flexmeasures/pull/1838>`_]

Bugfixes
-----------
Expand Down
212 changes: 212 additions & 0 deletions flexmeasures/cli/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import random
import string
import sys
from datetime import datetime, timedelta
from types import TracebackType
from typing import Type

Expand All @@ -33,6 +34,7 @@
from flexmeasures.data.services.forecasting import handle_forecasting_exception
from flexmeasures.cli.utils import MsgStyle
from flexmeasures.utils.flexmeasures_inflection import join_words_into_a_list
from flexmeasures.utils.time_utils import server_now


REGISTRY_MAP = dict(
Expand All @@ -50,6 +52,216 @@ def fm_jobs():
"""FlexMeasures: Job queueing."""


@fm_jobs.command("stats")
@with_appcontext
@click.option(
    "--window",
    default=60,
    show_default=True,
    # A non-positive window would cause a ZeroDivisionError in the rate estimation
    type=click.IntRange(min=1),
    help="Look-back window (minutes) to estimate per-queue arrival rates.",
)
def stats(window: int):
    """
    Show live statistics of the queueing system.

    \b
    Stats overall:
    - ρ = average capacity requirement (consider scaling up the number of workers when close to or higher than 100%)
    - L = average number of required workers = average number of jobs being serviced or in queue
    - k = total number of available workers (capacity to do work)

    \b
    Stats per queue:
    - W = average time until job is finished
    - Ws = average time spent being serviced
    - Wq = average time spent waiting in queue
    - Ls = average number of jobs being worked on at any given time
    - Lq = current queue length
    - λ = arrival rate (estimated from enqueue timestamps over the most recent window)

    Uses Little's law to compute the average waiting times for each queue:

        W = L / λ

    """
    click.echo(f"Estimating arrival rates using a {window}-minute window...")

    now = server_now()
    cutoff = now - timedelta(minutes=window)

    # FlexMeasures makes all queues available under app.queues
    L_i = []  # per-queue L = Lq + Ls; summed below into the system-wide L
    rows = []
    for queue_name, rq_queue in app.queues.items():

        # Lq = current queue length
        Lq = rq_queue.count

        # λ = jobs per second
        lambda_rate = _estimate_arrival_rate_all_registries(rq_queue, cutoff, window)

        if lambda_rate <= 0:
            # Little's law needs a positive arrival rate; show placeholders instead
            click.echo(f"{queue_name}: no recent arrivals → cannot estimate timings.")
            rows.append([queue_name, "—", "—", "—", "—", "—", "—"])
            continue

        # Wq = waiting time in queue (Little's law: Lq / λ)
        Wq = Lq / lambda_rate
        # Ws = time spent being serviced (averaged over recently finished jobs)
        Ws = _estimate_service_time(rq_queue, cutoff)
        # W = total time spent in system (waiting and being serviced)
        W = Wq + Ws if Ws > 0 else Wq

        # Ls = average jobs being worked on at any given time (λ · Ws)
        Ls = lambda_rate * Ws
        L_i.append(Lq + Ls)

        rows.append(
            [
                queue_name,
                f"{lambda_rate:.4f}",
                Lq,
                f"{Ls:.2f}",
                f"{Wq:.2f}",
                f"{Ws:.2f}",
                f"{W:.2f}",
            ]
        )

    # Overall metrics (not per queue)
    # Total number of workers
    k_total = len(Worker.all(connection=app.redis_connection))
    # Required workers
    L_total = sum(L_i)
    # Capacity requirements (∞ when no workers are available at all)
    rho_system = L_total / k_total if k_total > 0 else float("inf")

    headers = [
        "Queue",
        "λ (/s)\narrivals",
        "Lq\nqueue",
        "Ls\nservice",
        "Wq (s)\nwaiting",
        "Ws (s)\nservicing",
        "W (s)\ntotal",
    ]

    # Color-code overall health: green below 68% utilization, yellow up to 95%, red beyond
    click.secho(
        f"\nOverall: k={k_total}, L={L_total:.2f}, ρ={rho_system:.0%}\n",
        **(
            MsgStyle.SUCCESS
            if rho_system < 0.68
            else MsgStyle.WARN if rho_system < 0.95 else MsgStyle.ERROR
        ),
    )
    click.echo(tabulate(rows, headers=headers, tablefmt="simple"))
    click.echo("\n")


def _estimate_arrival_rate_all_registries(
    queue: Queue, cutoff: datetime, window: int
) -> float:
    """
    Estimate arrival rate λ (jobs/sec) by looking at all jobs belonging to the queue
    across all registries (waiting/deferred/scheduled/started/finished/failed/canceled).

    Only jobs with enqueued_at >= cutoff count toward recent arrivals.

    :param queue:   RQ queue whose registries are scanned.
    :param cutoff:  jobs enqueued before this moment are not counted.
    :param window:  look-back window in minutes.
    :returns:       estimated arrivals per second (0.0 when none can be estimated).
    """
    window_seconds = window * 60
    if window_seconds <= 0:
        # Guard against ZeroDivisionError: no rate can be estimated without a window
        return 0.0

    registries = [
        queue,
        queue.deferred_job_registry,
        queue.scheduled_job_registry,
        queue.started_job_registry,
        queue.finished_job_registry,
        queue.failed_job_registry,
        queue.canceled_job_registry,
    ]

    conn = queue.connection
    recent = 0

    for reg in registries:
        try:
            job_ids = reg.get_job_ids()
        except Exception:
            # some registries (rarely) may not implement get_job_ids cleanly
            continue

        # Scan newest → oldest (assumes get_job_ids lists oldest first)
        for job_id in reversed(job_ids):
            raw = conn.hgetall(f"rq:job:{job_id}")
            if not raw:
                continue

            enq = raw.get(b"enqueued_at")
            if not enq:
                continue

            try:
                # RQ may serialize timestamps with a trailing "Z", which
                # datetime.fromisoformat only accepts from Python 3.11 on —
                # normalize it so older interpreters can parse it, too.
                ts = datetime.fromisoformat(enq.decode("utf-8").replace("Z", "+00:00"))
            except Exception:
                continue

            if ts >= cutoff:
                recent += 1
            else:
                # Jobs only get older; stop early for this registry
                break

    return recent / float(window_seconds)


def _estimate_service_time(
    queue: Queue, cutoff: datetime, max_jobs: int = 200
) -> float:
    """
    Estimate the average service time (seconds) using recently finished jobs.

    Walks the queue's finished_job_registry newest → oldest, averaging the
    started_at → ended_at duration of at most ``max_jobs`` jobs that ended
    at or after ``cutoff``. Returns 0.0 when no usable sample is found.
    """
    connection = queue.connection

    try:
        finished_ids = queue.finished_job_registry.get_job_ids()
    except Exception:
        return 0.0

    samples = []
    for job_id in reversed(finished_ids):
        job_data = connection.hgetall(f"rq:job:{job_id}")
        if not job_data:
            continue

        raw_start = job_data.get(b"started_at")
        raw_end = job_data.get(b"ended_at")
        if not (raw_start and raw_end):
            continue

        try:
            start_ts = datetime.fromisoformat(raw_start.decode("utf-8"))
            end_ts = datetime.fromisoformat(raw_end.decode("utf-8"))
        except Exception:
            continue

        if end_ts < cutoff:
            # Older than the look-back window; assume the rest are older still
            break

        elapsed = (end_ts - start_ts).total_seconds()
        if elapsed >= 0:
            samples.append(elapsed)

        if len(samples) >= max_jobs:
            break

    return sum(samples) / len(samples) if samples else 0.0


@fm_jobs.command("run-job")
@with_appcontext
@click.option(
Expand Down