from rq.job import JobStatus

from .queues import filter_connection_params, get_connection, get_queue, get_unique_connection_configs
from .workers import get_worker_class

try:
    from prometheus_client import Summary
    from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily

    class RQCollector:
        """RQ stats collector"""

        summary = Summary('rq_request_processing_seconds', 'Time spent collecting RQ data')

        def collect(self):
            # QUEUES is read at scrape time rather than at module import time.
            from .settings import QUEUES

            with self.summary.time():
                # Fresh metric families are built on every scrape.
                rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues'])
                rq_workers_success = CounterMetricFamily('rq_workers_success', 'RQ workers success count', labels=['name', 'queues'])
                rq_workers_failed = CounterMetricFamily('rq_workers_failed', 'RQ workers fail count', labels=['name', 'queues'])
                rq_workers_working_time = CounterMetricFamily('rq_workers_working_time', 'RQ workers spent seconds', labels=['name', 'queues'])

                rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by state', labels=['queue', 'status'])

                worker_class = get_worker_class()
                unique_configs = get_unique_connection_configs()
                connections = {}
                for queue_name, config in QUEUES.items():
                    index = unique_configs.index(filter_connection_params(config))
                    if index not in connections:
                        connections[index] = connection = get_connection(queue_name)

                        # Workers are enumerated only the first time a unique Redis
                        # connection is seen, so shared connections are not walked twice.
                        for worker in worker_class.all(connection):
                            name = worker.name
                            label_queues = ','.join(worker.queue_names())
                            rq_workers.add_metric([name, worker.get_state(), label_queues], 1)
                            rq_workers_success.add_metric([name, label_queues], worker.successful_job_count)
                            rq_workers_failed.add_metric([name, label_queues], worker.failed_job_count)
                            rq_workers_working_time.add_metric([name, label_queues], worker.total_working_time)
                    else:
                        connection = connections[index]

                    # Per-queue job counts, broken down by status via the queue's job registries.
                    queue = get_queue(queue_name, connection=connection)
                    rq_jobs.add_metric([queue_name, JobStatus.QUEUED], queue.count)
                    rq_jobs.add_metric([queue_name, JobStatus.STARTED], queue.started_job_registry.count)
                    rq_jobs.add_metric([queue_name, JobStatus.FINISHED], queue.finished_job_registry.count)
                    rq_jobs.add_metric([queue_name, JobStatus.FAILED], queue.failed_job_registry.count)
                    rq_jobs.add_metric([queue_name, JobStatus.DEFERRED], queue.deferred_job_registry.count)
                    rq_jobs.add_metric([queue_name, JobStatus.SCHEDULED], queue.scheduled_job_registry.count)

                yield rq_workers
                yield rq_workers_success
                yield rq_workers_failed
                yield rq_workers_working_time
                yield rq_jobs

except ImportError:
    RQCollector = None  # type: ignore[assignment, misc]
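
For context, here is a minimal usage sketch (not part of this change) showing how such a custom collector could be wired into a scrape endpoint with prometheus_client's text exposition format. The import path django_rq.metrics_collector and the view name rq_metrics are assumptions for illustration; a per-request CollectorRegistry is used so metrics are gathered fresh on each scrape.

from django.http import HttpResponse, HttpResponseNotFound
from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, generate_latest

from django_rq.metrics_collector import RQCollector  # assumed module path


def rq_metrics(request):
    # RQCollector is None when prometheus_client is not installed (see the except branch above).
    if RQCollector is None:
        return HttpResponseNotFound('prometheus_client is not installed')
    registry = CollectorRegistry()
    registry.register(RQCollector())  # collect() runs once per generate_latest() call
    return HttpResponse(generate_latest(registry), content_type=CONTENT_TYPE_LATEST)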