[FEAT] Dashboard metrics system #1724
base: main
Conversation
- Implement multi-tier metrics aggregation with celery-batches
- Add hourly/daily/monthly storage tables with proper indexing
- Add dedicated metrics Celery worker (worker-metrics)
- Add live query endpoints for immediate data access
- Add production readiness: retry config, rate limiting, health checks
- Add queue routing and broker resilience configuration
- Add comprehensive unit tests for batch processing

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Walkthrough
This pull request introduces a comprehensive dashboard metrics system with batch event processing via Celery, Redis-backed caching, time-bucketed database aggregations (hourly/daily/monthly), REST API endpoints for metrics queries, and core metrics capture infrastructure with pluggable backends. The system enables organizations to track and query key operational metrics across documents, pages, API calls, LLM usage, and ETL executions.
Sequence Diagrams

sequenceDiagram
actor Client
participant API as Metrics API<br/>(ViewSet)
participant Cache as Redis Cache
participant DB as Database
participant Service as MetricsQueryService
Client->>API: GET /metrics/summary?start=...&end=...
API->>API: Validate query params
Note over API: Check cache key
API->>Cache: Check cached response
alt Cache Hit
Cache-->>API: Return cached data
else Cache Miss
API->>Service: Query aggregated metrics
Service->>DB: SELECT from EventMetrics*
DB-->>Service: Aggregated results
Service-->>API: Metric summaries
API->>Cache: Store with TTL<br/>(30s for current hour,<br/>8h for historical)
end
API-->>Client: MetricsResponseSerializer
sequenceDiagram
participant App as Application<br/>(MetricsCapture)
participant Queue as RabbitMQ
participant Celery as Celery Worker
participant BatchProc as Batch Processor
participant DB as Database
App->>App: record_api_request(<br/>org_id, labels)
App->>Queue: Publish metric event<br/>(QueueBackend)
Note over Queue: Batch accumulates<br/>(flush_every=100,<br/>flush_interval=60s)
Celery->>Queue: Poll for batch
Queue-->>Celery: Batch of events
Celery->>BatchProc: process_dashboard_metric_events()
loop For each event
BatchProc->>BatchProc: Truncate to hour/day/month
BatchProc->>BatchProc: Aggregate in memory
end
BatchProc->>DB: Bulk upsert hourly
DB-->>BatchProc: Created/Updated count
BatchProc->>DB: Bulk upsert daily
DB-->>BatchProc: Created/Updated count
BatchProc->>DB: Bulk upsert monthly
DB-->>BatchProc: Created/Updated count
Note over DB: Merge labels, increment counters<br/>via select_for_update
sequenceDiagram
actor User
participant Client as Client
participant API as Live Series<br/>Endpoint
participant Service as MetricsQueryService
participant Sources as Source Tables<br/>(WorkflowFileExecution,<br/>PageUsage, etc.)
participant Client2 as Client
User->>Client: GET /metrics/live_series?metric_name=...
Client->>API: Request live metrics
API->>Service: get_*_metric(org_id,<br/>start_date, end_date)
par Query all metrics
Service->>Sources: Query DOCUMENTS_PROCESSED
Sources-->>Service: Results
Service->>Sources: Query PAGES_PROCESSED
Sources-->>Service: Results
Service->>Sources: Query LLM_CALLS
Sources-->>Service: Results
end
Note over Service: Aggregate by granularity<br/>(hour/day/week)
Service-->>API: Time-series data
API->>API: Serialize to MetricSeriesSerializer
API-->>Client2: JSON response<br/>(series, summary data)
Client2-->>User: Display metrics dashboard
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Test Results Summary
Runner Tests - Full Report
SDK1 Tests - Full Report
Actionable comments posted: 8
Fix all issues with AI Agents 🤖
In @backend/dashboard_metrics/capture.py:
- Around line 51-338: MetricsCapture methods call record() with the metric enum
as the first positional arg and keyword names like org_id= and value=, but core
record signature expects org_id first, metric_name second and metric_value
third, so change every call in record_api_request, record_etl_execution,
record_document_processed (both DOCUMENTS_PROCESSED and PAGES_PROCESSED),
record_llm_call (LLM_CALLS and LLM_USAGE), record_challenge,
record_summarization, and record_prompt_execution to match the core signature:
call record(org_id, MetricName.<X>, <metric_value>, labels=metric_labels,
project=project, tag=None) (use 1 for single-count events, pages for pages, cost
for LLM_USAGE as float), and remove the invalid value= keyword and duplicate
org_id keyword usage.
In @backend/dashboard_metrics/urls.py:
- Around line 9-15: The URL configuration is missing a mapping for the
DashboardMetricsViewSet.health action; add a new view mapping similar to the
others (e.g., create metrics_health = DashboardMetricsViewSet.as_view({"get":
"health"})) and include it in the URL patterns so GET /api/v2/metrics/health/ is
exposed; update any import/urlpatterns list references where other metrics_*
views are registered to include metrics_health.
In @backend/dashboard_metrics/views.py:
- Around line 267-273: Both live_summary and live_series call
UserContext.get_organization() and access .id directly; add the same guard used
in get_queryset(): if organization is None (or falsy) log an explanatory message
and raise PermissionDenied before accessing organization.id, then pass org_id to
MetricsQueryService.get_all_metrics_summary / get_all_metrics_series; apply the
same check for the other occurrence around lines referenced (the block using
UserContext.get_organization() / organization.id at ~310-312).
In @unstract/core/src/unstract/core/metrics/types.py:
- Around line 35-48: Add a unit test that ensures every value of the MetricName
enum has a corresponding key in METRIC_TYPE_MAP to prevent KeyError in
get_metric_type(); specifically, iterate over MetricName (or
MetricName.__members__.values()/MetricName) and assert that each .value is
present in METRIC_TYPE_MAP, referencing METRIC_TYPE_MAP and MetricName; this
complements validate_metric() and prevents missing entries that would cause
get_metric_type() to fail.
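A minimal sketch of the suggested test, assuming the types module is importable as `unstract.core.metrics.types` (the exact import path may differ in the repo):

```python
from unstract.core.metrics.types import METRIC_TYPE_MAP, MetricName


def test_every_metric_name_has_a_type_mapping():
    """Guard against MetricName values missing from METRIC_TYPE_MAP."""
    for metric in MetricName:
        assert metric.value in METRIC_TYPE_MAP, (
            f"{metric.name} has no entry in METRIC_TYPE_MAP"
        )
```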
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/pyproject.toml (1)
15-21: celery-batches version too old—use >=0.9.0 for Celery 5.3.4 and Python 3.12 support
The constraint `celery-batches>=0.8.0` pins to an outdated release (Jun 2023). For Celery 5.3.4 and Python 3.12, use `celery-batches>=0.9.0`, which is documented to support Celery ~5.0 and Python 3.9–3.13. Version 0.8.0 predates Python 3.12 and lacks testing against current Celery 5.x releases.
🧹 Nitpick comments (16)
backend/dashboard_metrics/tasks.py (2)
37-75: Time bucketing helpers look correct; naive datetimes are assumed UTC
The truncation helpers correctly normalize numeric timestamps to UTC and bucket to hour/day/month. Note that naive datetimes are implicitly treated as UTC; ensure all producers either send UTC or explicitly aware datetimes so you don't silently mis-bucket mixed timezones.
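As a rough illustration of the bucketing behaviour described above (the helper name and exact rounding here are illustrative, not the PR's actual code):

```python
from datetime import datetime, timezone


def truncate_to_hour(ts: float | datetime) -> datetime:
    """Normalize a timestamp to an aware UTC datetime truncated to the hour."""
    if isinstance(ts, (int, float)):
        dt = datetime.fromtimestamp(ts, tz=timezone.utc)
    elif ts.tzinfo is None:
        # Naive datetimes are assumed to already be UTC
        dt = ts.replace(tzinfo=timezone.utc)
    else:
        dt = ts.astimezone(timezone.utc)
    return dt.replace(minute=0, second=0, microsecond=0)
```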
77-206: Batched aggregation logic is solid; consider tightening event validation
The batching and in-memory aggregation into hourly/daily/monthly buckets looks good and aligns with celery-batches' request shape. Two minor points to consider:
- `event = request.args[0] if request.args else request.kwargs.get("event", {})` assumes the first arg is always a dict; if other producers appear later, a simple `isinstance(..., dict)` guard could avoid noisy warnings.
- `timestamp` is expected as float/datetime; if there's any chance of ISO strings coming through, you may want to normalize them explicitly rather than relying on a generic `except Exception` to skip them.
Nothing blocking here; more of a future-hardening note.
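A sketch of that hardening (the celery-batches request shape and helper name are assumptions, not the PR's code):

```python
from datetime import datetime


def extract_event(request) -> dict | None:
    """Defensively pull the event payload from a celery-batches SimpleRequest."""
    event = request.args[0] if request.args else request.kwargs.get("event", {})
    if not isinstance(event, dict):
        return None  # skip malformed payloads instead of failing later
    ts = event.get("timestamp")
    if isinstance(ts, str):
        # Normalize ISO strings explicitly rather than relying on a broad except
        try:
            event["timestamp"] = datetime.fromisoformat(ts)
        except ValueError:
            return None
    return event
```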
backend/backend/urls_v2.py (1)
63-67: Metrics URL include is consistent with existing routing
Mounting `dashboard_metrics.urls` at `"metrics/"` follows the same pattern as other feature areas (e.g., `"usage/"`, `"notifications/"`). Just ensure any external docs or clients expecting `/api/v2/metrics/...` are updated to match this base path if needed.

docker/docker-compose.yaml (1)
120-138: worker-metrics service wiring looks consistent with existing workers
The new `worker-metrics` service mirrors the existing Celery worker patterns (image, entrypoint, queue, autoscale, env) and targets the `dashboard_metric_events` queue as intended. If you expect metrics tasks to need shared workflow data or tool configs, consider whether it should also mount the same volumes as other workers; otherwise this setup looks good.

backend/pyproject.toml (1)
145-148: worker-metrics Poe task aligns with docker-compose configuration
The `worker-metrics` Poe task mirrors the docker-compose command (same app, log level, queue, autoscale), which is helpful for running the metrics worker locally outside Docker. Keep this in sync with any future queue or autoscale changes to avoid drift between CLI and compose.

backend/dashboard_metrics/tests/test_tasks.py (1)
149-164: Consider renaming unused loop variable
The loop variable `i` is not used within the loop body. Following Python convention, consider renaming it to `_` to indicate it's intentionally unused.
🔎 Proposed refactor
-    for i in range(5):
+    for _ in range(5):
         mock_request = MagicMock()

unstract/core/src/unstract/core/metrics/__init__.py (1)
113-115: Use `logging.exception` to preserve the traceback
When catching exceptions during event creation, using `logger.exception()` instead of `logger.error()` will include the full traceback, which aids debugging.
🔎 Suggested fix
 except Exception as e:
-    logger.error(f"Failed to create metric event: {e}")
+    logger.exception(f"Failed to create metric event: {e}")
     return False

unstract/core/src/unstract/core/metrics/registry.py (1)
42-48: Non-deterministic ordering in `get_all_metrics()`
The function returns `list(_REGISTERED_METRICS)` where `_REGISTERED_METRICS` is a set. While this works, the order of returned metrics may vary between calls. If consistent ordering is needed for API responses or testing, consider sorting the result.
🔎 Optional fix for deterministic ordering
 def get_all_metrics() -> list[str]:
     """Get all registered metric names.

     Returns:
         List of all registered metric names
     """
-    return list(_REGISTERED_METRICS)
+    return sorted(_REGISTERED_METRICS)

backend/dashboard_metrics/migrations/0004_update_cleanup_periodic_tasks.py (1)
38-57: Clarify naming: task cleans "daily" aggregates on a weekly schedule
The comment says "weekly cleanup task for daily metrics" but the task is named `dashboard_metrics_cleanup_daily`. This is technically correct (it cleans data from the daily aggregation table on a weekly schedule), but the naming could cause confusion. Consider renaming to `dashboard_metrics_cleanup_daily_weekly` or updating the description to clarify it runs weekly to clean daily-aggregated data.

unstract/core/src/unstract/core/metrics/types.py (1)
85-96: Consider defensive error handling in `from_dict`
If required keys (`org_id`, `metric_name`, `metric_value`, `metric_type`) are missing from `data`, a `KeyError` will be raised. While the caller should ensure valid data, adding a brief check or using `.get()` with explicit validation could provide clearer error messages during deserialization failures.
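For example, a hedged sketch of the kind of check meant here (the field names come from the review; the dataclass itself is simplified, not the module's real definition):

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MetricEvent:
    org_id: str
    metric_name: str
    metric_value: float
    metric_type: str
    labels: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MetricEvent":
        required = ("org_id", "metric_name", "metric_value", "metric_type")
        missing = [key for key in required if key not in data]
        if missing:
            raise ValueError(f"MetricEvent payload missing required keys: {missing}")
        return cls(
            org_id=data["org_id"],
            metric_name=data["metric_name"],
            metric_value=data["metric_value"],
            metric_type=data["metric_type"],
            labels=data.get("labels") or {},
        )
```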
unstract/core/src/unstract/core/metrics/backends/queue.py (1)
151-156: Use `logger.exception` for error logging with traceback
Replace `logger.error` with `logger.exception` in the exception handlers to include stack traces, which aids debugging production issues.
🔎 Suggested fix
 except KombuError as e:
-    logger.error(f"Failed to queue metric {event.metric_name}: {e}")
+    logger.exception(f"Failed to queue metric {event.metric_name}: {e}")
     return False
 except Exception as e:
-    logger.error(f"Unexpected error queuing metric: {e}")
+    logger.exception(f"Unexpected error queuing metric: {e}")
     return False

backend/dashboard_metrics/views.py (1)
335-369: Improve error handling and observability in `live_series` metric loop
Catching a blanket `Exception` for each metric is reasonable to keep the endpoint partially available, but the current logging (`logger.error`) loses the traceback and makes debugging failures harder.
Switching to `logger.exception` here will preserve the stack trace while keeping behavior the same.
Proposed logging tweak
-    except Exception as e:
-        logger.error(f"Failed to fetch metric {metric_name}: {e}")
+    except Exception:
+        logger.exception("Failed to fetch metric %s", metric_name)
         errors.append(metric_name)

backend/dashboard_metrics/cache.py (1)
38-69: Time-aware TTL helper is well-designed but currently unused
`get_time_aware_cache_ttl()` correctly distinguishes queries that include the current hour from purely historical ones, which would be useful to avoid over-caching near-real-time data. Right now it isn't wired into `cache_metrics_response`, so all endpoints rely on static TTLs.
Consider integrating `get_time_aware_cache_ttl()` into the decorator (e.g., deriving TTL from a parsed `end_date` param) for summary/series endpoints that query recent data.
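A sketch of how that wiring could look, assuming `get_time_aware_cache_ttl()` accepts the parsed end date (its real signature may differ):

```python
import functools

from django.utils.dateparse import parse_datetime


def cache_metrics_response(default_ttl: int = 300):
    """Decorator sketch: derive the TTL from the requested end_date when present."""

    def decorator(view_method):
        @functools.wraps(view_method)
        def wrapper(self, request, *args, **kwargs):
            raw_end = request.query_params.get("end_date")
            end_date = parse_datetime(raw_end) if raw_end else None
            # get_time_aware_cache_ttl is the existing helper in this module
            ttl = get_time_aware_cache_ttl(end_date) if end_date else default_ttl
            # ...build the org-scoped cache key, try the cache, and on a miss
            # call the view and store the 200 response for `ttl` seconds.
            return view_method(self, request, *args, **kwargs)

        return wrapper

    return decorator
```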
backend/dashboard_metrics/models.py (3)
14-19: Avoid duplicating MetricType that already exists in core metrics
There's already a `MetricType` enum in `unstract/core/src/unstract/core/metrics/types.py` with the same values (`"counter"`, `"histogram"`). Having two different `MetricType` definitions (one `Enum`, one `TextChoices`) increases the chance of divergence or confusion.
Consider either:
- Reusing the core `MetricType` as the single source of truth and mapping its values into a `TextChoices`, or
- Renaming this one (e.g., `DashboardMetricType`) to make the separation explicit.
21-37: Manager inheritance can be simplified
`DefaultOrganizationManagerMixin` already subclasses `models.Manager` (per `backend/utils/models/organization_mixin.py`), so inheriting from both is redundant:

class EventMetricsHourlyManager(DefaultOrganizationManagerMixin): ...

Same applies to `EventMetricsDailyManager` and `EventMetricsMonthlyManager`.
113-130: Address Ruff RUF012 on mutable Meta attributes (indexes, constraints)
Ruff is flagging `indexes` and `constraints` as mutable class attributes (RUF012). In Django `Meta` classes this is common, but if you want to satisfy the linter and improve type hints you can annotate them as `ClassVar`:

from typing import ClassVar
from django.db import models

class Meta:
    indexes: ClassVar[list[models.Index]] = [
        ...
    ]
    constraints: ClassVar[list[models.Constraint]] = [
        ...
    ]

You can apply the same pattern to the `Meta` classes of all three models, or alternatively disable RUF012 for Django `Meta` if you prefer the standard Django style.
Also applies to: 219-236, 325-342
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
⛔ Files ignored due to path filters (1)
`backend/uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (30)
- backend/backend/celery_config.py
- backend/backend/settings/base.py
- backend/backend/urls_v2.py
- backend/dashboard_metrics/__init__.py
- backend/dashboard_metrics/apps.py
- backend/dashboard_metrics/cache.py
- backend/dashboard_metrics/capture.py
- backend/dashboard_metrics/migrations/0001_initial.py
- backend/dashboard_metrics/migrations/0002_setup_cleanup_periodic_task.py
- backend/dashboard_metrics/migrations/0003_add_daily_monthly_tables.py
- backend/dashboard_metrics/migrations/0004_update_cleanup_periodic_tasks.py
- backend/dashboard_metrics/migrations/__init__.py
- backend/dashboard_metrics/models.py
- backend/dashboard_metrics/serializers.py
- backend/dashboard_metrics/services.py
- backend/dashboard_metrics/tasks.py
- backend/dashboard_metrics/tests/__init__.py
- backend/dashboard_metrics/tests/test_tasks.py
- backend/dashboard_metrics/urls.py
- backend/dashboard_metrics/views.py
- backend/pyproject.toml
- docker/docker-compose.yaml
- unstract/core/src/unstract/core/metrics/__init__.py
- unstract/core/src/unstract/core/metrics/backends/__init__.py
- unstract/core/src/unstract/core/metrics/backends/base.py
- unstract/core/src/unstract/core/metrics/backends/noop.py
- unstract/core/src/unstract/core/metrics/backends/queue.py
- unstract/core/src/unstract/core/metrics/config.py
- unstract/core/src/unstract/core/metrics/registry.py
- unstract/core/src/unstract/core/metrics/types.py
🧰 Additional context used
🧬 Code graph analysis (16)
- unstract/core/src/unstract/core/metrics/backends/noop.py (3)
  - unstract/core/src/unstract/core/metrics/types.py (1): MetricEvent (52-96)
  - unstract/core/src/unstract/core/metrics/backends/base.py (3): AbstractMetricBackend (8-40), record (15-24), flush (27-32)
  - unstract/core/src/unstract/core/metrics/__init__.py (1): record (58-119)
- unstract/core/src/unstract/core/metrics/backends/base.py (4)
  - unstract/core/src/unstract/core/metrics/types.py (1): MetricEvent (52-96)
  - unstract/core/src/unstract/core/metrics/__init__.py (1): record (58-119)
  - unstract/core/src/unstract/core/metrics/backends/noop.py (2): record (25-40), flush (42-44)
  - unstract/core/src/unstract/core/metrics/backends/queue.py (3): record (117-156), flush (200-205), close (207-215)
- unstract/core/src/unstract/core/metrics/registry.py (2)
  - unstract/core/src/unstract/core/metrics/types.py (2): MetricName (15-32), MetricType (8-12)
  - backend/dashboard_metrics/models.py (1): MetricType (14-18)
- backend/dashboard_metrics/tasks.py (1)
  - backend/dashboard_metrics/models.py (4): EventMetricsDaily (145-248), EventMetricsHourly (39-142), EventMetricsMonthly (251-354), MetricType (14-18)
- unstract/core/src/unstract/core/metrics/types.py (1)
  - backend/dashboard_metrics/models.py (1): MetricType (14-18)
- backend/dashboard_metrics/urls.py (1)
  - backend/dashboard_metrics/views.py (1): DashboardMetricsViewSet (40-451)
- backend/dashboard_metrics/views.py (4)
  - backend/permissions/permission.py (1): IsOrganizationMember (17-22)
  - backend/utils/user_context.py (1): UserContext (8-32)
  - backend/dashboard_metrics/cache.py (1): cache_metrics_response (88-145)
  - backend/dashboard_metrics/services.py (2): MetricsQueryService (24-447), get_documents_processed (54-86)
- backend/dashboard_metrics/models.py (3)
  - backend/utils/models/organization_mixin.py (2): DefaultOrganizationManagerMixin (26-29), DefaultOrganizationMixin (7-23)
  - unstract/core/src/unstract/core/metrics/types.py (1): MetricType (8-12)
  - backend/dashboard_metrics/serializers.py (1): Meta (110-124)
- backend/dashboard_metrics/capture.py (5)
  - unstract/core/src/unstract/core/metrics/types.py (2): MetricName (15-32), MetricType (8-12)
  - unstract/core/src/unstract/core/metrics/__init__.py (1): record (58-119)
  - unstract/core/src/unstract/core/metrics/backends/base.py (1): record (15-24)
  - unstract/core/src/unstract/core/metrics/backends/noop.py (1): record (25-40)
  - unstract/core/src/unstract/core/metrics/backends/queue.py (1): record (117-156)
- unstract/core/src/unstract/core/metrics/backends/queue.py (3)
  - unstract/core/src/unstract/core/metrics/types.py (1): MetricEvent (52-96)
  - unstract/core/src/unstract/core/metrics/backends/base.py (4): AbstractMetricBackend (8-40), close (34-40), record (15-24), flush (27-32)
  - unstract/core/src/unstract/core/metrics/__init__.py (1): record (58-119)
- unstract/core/src/unstract/core/metrics/config.py (3)
  - unstract/core/src/unstract/core/metrics/backends/base.py (1): AbstractMetricBackend (8-40)
  - unstract/core/src/unstract/core/metrics/backends/noop.py (1): NoopBackend (14-44)
  - unstract/core/src/unstract/core/metrics/backends/queue.py (1): QueueBackend (28-215)
- backend/dashboard_metrics/serializers.py (2)
  - backend/dashboard_metrics/models.py (4): EventMetricsHourly (39-142), Meta (109-142), Meta (215-248), Meta (321-354)
  - backend/dashboard_metrics/views.py (2): series (113-189), summary (70-109)
- backend/dashboard_metrics/migrations/0001_initial.py (1)
  - backend/dashboard_metrics/migrations/0003_add_daily_monthly_tables.py (1): Migration (10-261)
- unstract/core/src/unstract/core/metrics/backends/__init__.py (3)
  - unstract/core/src/unstract/core/metrics/backends/base.py (1): AbstractMetricBackend (8-40)
  - unstract/core/src/unstract/core/metrics/backends/noop.py (1): NoopBackend (14-44)
  - unstract/core/src/unstract/core/metrics/backends/queue.py (1): QueueBackend (28-215)
- backend/dashboard_metrics/migrations/0004_update_cleanup_periodic_tasks.py (2)
  - backend/dashboard_metrics/migrations/0002_setup_cleanup_periodic_task.py (1): Migration (40-51)
  - backend/dashboard_metrics/migrations/0003_add_daily_monthly_tables.py (1): Migration (10-261)
- backend/dashboard_metrics/cache.py (3)
  - backend/utils/user_context.py (1): UserContext (8-32)
  - backend/utils/cache_service.py (1): clear_cache (41-43)
  - backend/dashboard_metrics/views.py (1): overview (193-245)
🪛 Ruff (0.14.10)
unstract/core/src/unstract/core/metrics/backends/noop.py
25-25: Unused method argument: event
(ARG002)
unstract/core/src/unstract/core/metrics/__init__.py
39-53: __all__ is not sorted
Apply an isort-style sorting to __all__
(RUF022)
113-113: Do not catch blind exception: Exception
(BLE001)
114-114: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
backend/backend/celery_config.py
42-49: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
56-66: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
69-73: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
unstract/core/src/unstract/core/metrics/registry.py
38-38: Avoid specifying long messages outside the exception class
(TRY003)
backend/dashboard_metrics/migrations/0002_setup_cleanup_periodic_task.py
6-6: Unused function argument: schema_editor
(ARG001)
33-33: Unused function argument: schema_editor
(ARG001)
41-44: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
46-51: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
backend/dashboard_metrics/tasks.py
184-184: Do not catch blind exception: Exception
(BLE001)
257-257: Do not catch blind exception: Exception
(BLE001)
258-258: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
311-311: Do not catch blind exception: Exception
(BLE001)
312-312: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
365-365: Do not catch blind exception: Exception
(BLE001)
366-366: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
backend/dashboard_metrics/tests/test_tasks.py
149-149: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
backend/dashboard_metrics/views.py
47-47: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
48-48: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
50-50: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
51-51: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
52-52: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
60-60: Abstract raise to an inner function
(TRY301)
60-60: Avoid specifying long messages outside the exception class
(TRY003)
65-65: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
66-66: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
66-66: Avoid specifying long messages outside the exception class
(TRY003)
193-193: Unused method argument: request
(ARG002)
359-359: Do not catch blind exception: Exception
(BLE001)
360-360: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
403-403: Unused method argument: request
(ARG002)
436-436: Consider moving this statement to an else block
(TRY300)
437-437: Do not catch blind exception: Exception
(BLE001)
438-438: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
448-448: Consider moving this statement to an else block
(TRY300)
449-449: Do not catch blind exception: Exception
(BLE001)
450-450: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
backend/dashboard_metrics/models.py
113-130: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
131-142: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
219-236: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
237-248: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
325-342: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
343-354: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
backend/dashboard_metrics/capture.py
45-45: Consider moving this statement to an else block
(TRY300)
unstract/core/src/unstract/core/metrics/backends/queue.py
95-96: try-except-pass detected, consider logging the exception
(S110)
95-95: Do not catch blind exception: Exception
(BLE001)
101-101: Consider moving this statement to an else block
(TRY300)
103-103: Do not catch blind exception: Exception
(BLE001)
113-113: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
115-115: Avoid specifying long messages outside the exception class
(TRY003)
149-149: Consider moving this statement to an else block
(TRY300)
152-152: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
154-154: Do not catch blind exception: Exception
(BLE001)
155-155: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
212-212: Do not catch blind exception: Exception
(BLE001)
backend/dashboard_metrics/migrations/0003_add_daily_monthly_tables.py
11-14: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
16-261: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
backend/dashboard_metrics/serializers.py
112-123: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
backend/dashboard_metrics/migrations/0001_initial.py
13-15: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
17-136: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
backend/dashboard_metrics/migrations/0004_update_cleanup_periodic_tasks.py
9-9: Unused function argument: schema_editor
(ARG001)
60-60: Unused function argument: schema_editor
(ARG001)
69-72: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
74-79: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
backend/dashboard_metrics/cache.py
84-84: Probable use of insecure hash functions in hashlib: md5
(S324)
163-163: Consider moving this statement to an else block
(TRY300)
164-164: Do not catch blind exception: Exception
(BLE001)
165-165: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
252-252: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (28)
backend/dashboard_metrics/__init__.py (1)
1-4: LGTM! Clean package initializer with clear documentation describing the Dashboard Metrics app's purpose.
backend/dashboard_metrics/migrations/__init__.py (1)
1-1: LGTM! Standard migrations package initializer following Django conventions.
backend/backend/settings/base.py (1)
345-345: LGTM! The dashboard_metrics app is correctly registered in SHARED_APPS, making it available across all organizations in the multi-tenant setup. This is appropriate for organization-scoped metrics collection and querying.
unstract/core/src/unstract/core/metrics/backends/__init__.py (1)
1-11: LGTM! Clean public API definition for the metrics backends package. The `__all__` declaration correctly matches the imported backends, providing clear control over the public interface.

backend/dashboard_metrics/apps.py (1)
6-11: LGTM! Standard Django app configuration following Django 4.2+ best practices. The BigAutoField default is appropriate, and the app naming is correct.

backend/dashboard_metrics/tests/__init__.py (1)
1-1: Tests package init is fine. A docstring-only `__init__` is harmless and documents the test package clearly.

unstract/core/src/unstract/core/metrics/backends/base.py (1)
1-40: LGTM! Clean abstract interface design. The abstract base class follows SOLID principles with a minimal, focused interface. The concrete `close()` method with a default implementation calling `flush()` is a good design choice that simplifies backend implementations while allowing customization when needed.

unstract/core/src/unstract/core/metrics/backends/noop.py (1)
21-44: LGTM! Appropriate no-op implementation. The NoopBackend correctly implements the abstract interface and provides helpful one-time logging to inform users that metrics are disabled. The `event` parameter in `record()` is required by the `AbstractMetricBackend` interface, so the static analysis hint (ARG002) is a false positive.

backend/dashboard_metrics/tests/test_tasks.py (1)
1-553: Excellent test coverage! The test suite is comprehensive and well-organized, covering time truncation helpers, batch processing, bulk upsert operations, cleanup tasks, and full integration flows. Good use of fixtures, edge cases, and assertions to verify data integrity across all metrics tables.
backend/dashboard_metrics/migrations/0002_setup_cleanup_periodic_task.py (1)
1-51: LGTM! Migration follows Django conventions. The migration correctly sets up a periodic cleanup task with proper forward and rollback operations. The static analysis hints (ARG001, RUF012) are false positives: the `schema_editor` parameter and migration class attributes follow Django's required conventions for data migrations.

unstract/core/src/unstract/core/metrics/config.py (1)
1-68: LGTM! Well-designed configuration module. The environment-driven backend selection with singleton caching is clean and robust. The fallback to `NoopBackend` when metrics are disabled or misconfigured ensures safe defaults, and the logging at each decision point aids troubleshooting. The `reset_backend()` function is a good addition for testing scenarios.
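The pattern being praised, as a rough sketch (the environment variable names, constructors, and fallback behaviour here are assumptions, not the module's verified code):

```python
import logging
import os

logger = logging.getLogger(__name__)

_backend = None  # cached singleton


def get_backend():
    """Return the configured metric backend, falling back to a no-op when disabled."""
    # NoopBackend and QueueBackend are the backends defined alongside this module
    global _backend
    if _backend is not None:
        return _backend
    if os.getenv("METRICS_ENABLED", "false").lower() != "true":
        logger.info("Metrics disabled; using NoopBackend")
        _backend = NoopBackend()
        return _backend
    try:
        _backend = QueueBackend()
    except Exception:
        logger.exception("Failed to initialise QueueBackend; falling back to NoopBackend")
        _backend = NoopBackend()
    return _backend


def reset_backend() -> None:
    """Clear the cached backend so tests can re-run backend selection."""
    global _backend
    _backend = None
```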
unstract/core/src/unstract/core/metrics/__init__.py (1)
58-119: Well-structured public API. The `record()` function provides a clean entry point with proper validation, enum-to-string conversion, and graceful error handling that returns `False` on failures rather than propagating exceptions. The design appropriately decouples the public API from backend implementation details.

unstract/core/src/unstract/core/metrics/registry.py (1)
1-60: Clean registry implementation. The registry provides efficient O(1) validation via set lookup, clear separation between validation and type retrieval, and appropriate error handling with `ValueError` for unknown metrics.

backend/backend/celery_config.py (2)
36-53: Good production-ready resilience settings. The broker connection retry configuration with exponential backoff (`interval_step`, `interval_max`), socket keepalive, and worker limits (`max_tasks_per_child`) will help maintain stability under broker disruptions and prevent memory leaks from long-running workers.
68-73: Task names in `task_routes` are correctly defined and match registered Celery tasks. The task names (`dashboard_metrics.process_events`, `dashboard_metrics.cleanup_hourly_data`, `dashboard_metrics.cleanup_daily_data`) match their corresponding `@shared_task` definitions in `backend/dashboard_metrics/tasks.py`. Routing is correctly configured.
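For reference, a minimal sketch of how that pairing works in Celery (the task and queue names are taken from the review above; which queues the cleanup tasks use in the repo is an assumption):

```python
from celery import shared_task

# In celery_config.py: route the metrics tasks to the dedicated queue
task_routes = {
    "dashboard_metrics.process_events": {"queue": "dashboard_metric_events"},
    "dashboard_metrics.cleanup_hourly_data": {"queue": "dashboard_metric_events"},
    "dashboard_metrics.cleanup_daily_data": {"queue": "dashboard_metric_events"},
}


# In tasks.py: the explicit task name must match the routing key above
@shared_task(name="dashboard_metrics.process_events")
def process_dashboard_metric_events(*events):
    """Placeholder body; the real task aggregates batched metric events."""
    return len(events)
```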
unstract/core/src/unstract/core/metrics/types.py (1)
8-32: Well-structured metric type definitions. The dual inheritance from `str` and `Enum` enables both type safety and string comparisons. The separation of counters (summed values) and histograms (distributions) is clearly documented and follows established metrics patterns.

backend/dashboard_metrics/urls.py (1)
17-33: Clean URL configuration with proper ordering. The URL patterns are well-organized with specific paths before the UUID capture pattern, preventing routing conflicts. Using `format_suffix_patterns` enables content negotiation for API responses.

backend/dashboard_metrics/migrations/0004_update_cleanup_periodic_tasks.py (1)
60-65: Reverse migration is correct as written. Migration 0002 creates the `dashboard_metrics_cleanup_hourly` task. Migration 0004's forward operation uses `update_or_create` on that existing task (not creating a new one), adds the daily task, and deletes a legacy task. The reverse should return to the state after 0002, which means retaining the hourly task (still managed by 0002's reverse function) and removing only what 0004 added: the daily task. The current implementation is correct.
Likely an incorrect or invalid review comment.
unstract/core/src/unstract/core/metrics/backends/queue.py (1)
178-198: Celery protocol v2 format is compatible with the project's Celery version (5.3.4+). The message construction follows the standard kombu protocol v2 format `[args, kwargs, embed]`, which remains compatible with Celery 5.3.4+ (and 5.5.3 in the workers package). The headers correctly specify the task name and ID for Celery task routing. No compatibility concerns.

backend/dashboard_metrics/views.py (1)
402-450: Health checks look sound and side-effect free. `health`, `_check_database`, and `_check_cache` do minimal, safe operations (an `exists()` call and a short-lived cache set/get) and return structured status with appropriate HTTP 200/503 responses; this is suitable for liveness/readiness probes.

backend/dashboard_metrics/migrations/0003_add_daily_monthly_tables.py (1)
16-260: Schema and indexing for daily/monthly tables look consistent and appropriate. The daily/monthly models mirror the hourly schema (including `labels` default, project/tag, organization FK, unique constraints, and GIN/index coverage), which should give predictable semantics and good query performance across all granularities.

backend/dashboard_metrics/services.py (2)
383-447: `get_all_metrics_summary` aggregates are consistent with individual query methods. The summary method correctly reuses the per-metric query functions and normalizes potential NULL sums (e.g., pages and LLM usage) with `or 0`, producing a clean dict of totals for the live summary endpoint.
230-269: The join logic in `get_deployed_api_requests` is correct. `WorkflowExecution.pipeline_id` is a UUIDField designed to hold IDs of either `Pipeline` or `APIDeployment` entities (as evidenced by the conditional logic in `backend/workflow_manager/internal_serializers.py`). The subquery filtering `pipeline_id__in=Subquery(APIDeployment.objects.filter(...).values("id"))` correctly extracts deployed API request counts by matching executions where the pipeline_id refers to an API deployment.
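Illustratively, that join looks roughly like this (model imports and the date-field name are assumptions based on the review, not verified against the repo):

```python
from django.db.models import Subquery


def count_deployed_api_requests(org_id, start_date, end_date):
    """Count workflow executions whose pipeline_id points at an API deployment."""
    # APIDeployment and WorkflowExecution are the app's existing models (imports omitted)
    deployed_api_ids = APIDeployment.objects.filter(
        organization_id=org_id
    ).values("id")
    return WorkflowExecution.objects.filter(
        pipeline_id__in=Subquery(deployed_api_ids),
        created_at__range=(start_date, end_date),
    ).count()
```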
backend/dashboard_metrics/migrations/0001_initial.py (1)
17-135: Hourly metrics migration aligns with model definition and indexing strategy. Field definitions, indexes, and the `unique_hourly_metric` constraint all match the `EventMetricsHourly` model description, so the initial schema for hourly metrics looks correct and production-ready.
11-124: Query and response serializers are consistent with view usage and enforce sane bounds. `MetricsQuerySerializer.validate()` correctly defaults the date range, enforces `start_date <= end_date`, and caps queries at 90 days, while the nested series/summary serializers match the shapes constructed in `DashboardMetricsViewSet`. The `EventMetricsHourlySerializer` is read-only and exposes the expected fields.
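A sketch of that validation shape (only the ordering check and 90-day cap come from the review; the 7-day default window is an assumption):

```python
from datetime import timedelta

from django.utils import timezone
from rest_framework import serializers


class MetricsQuerySerializer(serializers.Serializer):
    MAX_RANGE_DAYS = 90

    start_date = serializers.DateTimeField(required=False)
    end_date = serializers.DateTimeField(required=False)

    def validate(self, attrs):
        end = attrs.get("end_date") or timezone.now()
        start = attrs.get("start_date") or end - timedelta(days=7)
        if start > end:
            raise serializers.ValidationError("start_date must not be after end_date")
        if end - start > timedelta(days=self.MAX_RANGE_DAYS):
            raise serializers.ValidationError(
                f"Date range cannot exceed {self.MAX_RANGE_DAYS} days"
            )
        attrs["start_date"], attrs["end_date"] = start, end
        return attrs
```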
backend/dashboard_metrics/cache.py (2)
71-145: Caching decorator behavior and keying strategy look solid. The decorator scopes cache entries by organization, endpoint name, and a stable hash of sorted query params, and only caches 200 responses. It also cleanly bypasses caching when no organization context is present, which is important for correctness with per-org data.
148-167: No changes needed – code is correct. `CacheService` is a class in `backend/utils/cache_service.py`, and `clear_cache` is a static method of that class. The import and call in `invalidate_metrics_cache` are correct. The implementation will properly invalidate cached metrics for the organization.
Likely an incorrect or invalid review comment.
backend/dashboard_metrics/models.py (1)
57-85: Confirm `BaseModel` doesn't define `id`, and that `metric_count`'s default is intentional
Two small checks:
- If `BaseModel` already defines an `id` field, redefining `id` here as a primary key can cause confusion or conflicts. If `BaseModel` only adds timestamps and no `id`, you're fine; otherwise, consider centralizing the PK definition in one place.
- `metric_count` defaults to `1`. That makes sense if every row is always created from at least one event, but if you ever create rows with explicit `metric_count` or rely on DB-side aggregation, this default could overcount silently.
Please double-check that `BaseModel` does not define `id`, and that you never rely on the default `metric_count` for anything other than "one event just created this row".
| class MetricsCapture: | ||
| """Helper class for capturing metrics at integration points.""" | ||
|
|
||
| @staticmethod | ||
| def record_api_request( | ||
| org_id: str, | ||
| api_name: str | None = None, | ||
| project: str = "default", | ||
| labels: dict[str, Any] | None = None, | ||
| ) -> bool: | ||
| """Record an API deployment request. | ||
| Args: | ||
| org_id: Organization ID | ||
| api_name: Name of the API deployment | ||
| project: Project identifier | ||
| labels: Additional labels | ||
| Returns: | ||
| True if recorded successfully | ||
| """ | ||
| if not METRICS_ENABLED: | ||
| return False | ||
|
|
||
| record, MetricName, _ = _get_metrics_module() | ||
| if record is None: | ||
| return False | ||
|
|
||
| metric_labels = labels or {} | ||
| if api_name: | ||
| metric_labels["api_name"] = api_name | ||
|
|
||
| return record( | ||
| MetricName.DEPLOYED_API_REQUESTS, | ||
| org_id=org_id, | ||
| value=1, | ||
| project=project, | ||
| labels=metric_labels, | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def record_etl_execution( | ||
| org_id: str, | ||
| pipeline_name: str | None = None, | ||
| project: str = "default", | ||
| labels: dict[str, Any] | None = None, | ||
| ) -> bool: | ||
| """Record an ETL pipeline execution. | ||
| Args: | ||
| org_id: Organization ID | ||
| pipeline_name: Name of the pipeline | ||
| project: Project identifier | ||
| labels: Additional labels | ||
| Returns: | ||
| True if recorded successfully | ||
| """ | ||
| if not METRICS_ENABLED: | ||
| return False | ||
|
|
||
| record, MetricName, _ = _get_metrics_module() | ||
| if record is None: | ||
| return False | ||
|
|
||
| metric_labels = labels or {} | ||
| if pipeline_name: | ||
| metric_labels["pipeline_name"] = pipeline_name | ||
|
|
||
| return record( | ||
| MetricName.ETL_PIPELINE_EXECUTIONS, | ||
| org_id=org_id, | ||
| value=1, | ||
| project=project, | ||
| labels=metric_labels, | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def record_document_processed( | ||
| org_id: str, | ||
| pages: int = 1, | ||
| file_type: str | None = None, | ||
| project: str = "default", | ||
| labels: dict[str, Any] | None = None, | ||
| ) -> bool: | ||
| """Record document and page processing. | ||
| Args: | ||
| org_id: Organization ID | ||
| pages: Number of pages processed | ||
| file_type: File type (pdf, docx, etc.) | ||
| project: Project identifier | ||
| labels: Additional labels | ||
| Returns: | ||
| True if recorded successfully | ||
| """ | ||
| if not METRICS_ENABLED: | ||
| return False | ||
|
|
||
| record, MetricName, _ = _get_metrics_module() | ||
| if record is None: | ||
| return False | ||
|
|
||
| metric_labels = labels or {} | ||
| if file_type: | ||
| metric_labels["file_type"] = file_type | ||
|
|
||
| # Record document | ||
| doc_result = record( | ||
| MetricName.DOCUMENTS_PROCESSED, | ||
| org_id=org_id, | ||
| value=1, | ||
| project=project, | ||
| labels=metric_labels, | ||
| ) | ||
|
|
||
| # Record pages | ||
| pages_result = record( | ||
| MetricName.PAGES_PROCESSED, | ||
| org_id=org_id, | ||
| value=pages, | ||
| project=project, | ||
| labels=metric_labels, | ||
| ) | ||
|
|
||
| return doc_result and pages_result | ||
|
|
||
| @staticmethod | ||
| def record_llm_call( | ||
| org_id: str, | ||
| model: str | None = None, | ||
| cost: float = 0.0, | ||
| tokens: int = 0, | ||
| project: str = "default", | ||
| labels: dict[str, Any] | None = None, | ||
| ) -> bool: | ||
| """Record an LLM API call. | ||
| Args: | ||
| org_id: Organization ID | ||
| model: LLM model name | ||
| cost: Cost in dollars | ||
| tokens: Total tokens used | ||
| project: Project identifier | ||
| labels: Additional labels | ||
| Returns: | ||
| True if recorded successfully | ||
| """ | ||
| if not METRICS_ENABLED: | ||
| return False | ||
|
|
||
| record, MetricName, _ = _get_metrics_module() | ||
| if record is None: | ||
| return False | ||
|
|
||
| metric_labels = labels or {} | ||
| if model: | ||
| metric_labels["model"] = model | ||
| if tokens: | ||
| metric_labels["tokens"] = str(tokens) | ||
|
|
||
| # Record the call | ||
| call_result = record( | ||
| MetricName.LLM_CALLS, | ||
| org_id=org_id, | ||
| value=1, | ||
| project=project, | ||
| labels=metric_labels, | ||
| ) | ||
|
|
||
| # Record usage cost if provided | ||
| if cost > 0: | ||
| record( | ||
| MetricName.LLM_USAGE, | ||
| org_id=org_id, | ||
| value=cost, | ||
| project=project, | ||
| labels=metric_labels, | ||
| ) | ||
|
|
||
| return call_result | ||
|
|
||
| @staticmethod | ||
| def record_challenge( | ||
| org_id: str, | ||
| challenge_type: str | None = None, | ||
| project: str = "default", | ||
| labels: dict[str, Any] | None = None, | ||
| ) -> bool: | ||
| """Record an LLM challenge call. | ||
| Args: | ||
| org_id: Organization ID | ||
| challenge_type: Type of challenge | ||
| project: Project identifier | ||
| labels: Additional labels | ||
| Returns: | ||
| True if recorded successfully | ||
| """ | ||
| if not METRICS_ENABLED: | ||
| return False | ||
|
|
||
| record, MetricName, _ = _get_metrics_module() | ||
| if record is None: | ||
| return False | ||
|
|
||
| metric_labels = labels or {} | ||
| if challenge_type: | ||
| metric_labels["type"] = challenge_type | ||
|
|
||
| return record( | ||
| MetricName.CHALLENGES, | ||
| org_id=org_id, | ||
| value=1, | ||
| project=project, | ||
| labels=metric_labels, | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def record_summarization( | ||
| org_id: str, | ||
| project: str = "default", | ||
| labels: dict[str, Any] | None = None, | ||
| ) -> bool: | ||
| """Record a summarization call. | ||
| Args: | ||
| org_id: Organization ID | ||
| project: Project identifier | ||
| labels: Additional labels | ||
| Returns: | ||
| True if recorded successfully | ||
| """ | ||
| if not METRICS_ENABLED: | ||
| return False | ||
|
|
||
| record, MetricName, _ = _get_metrics_module() | ||
| if record is None: | ||
| return False | ||
|
|
||
| return record( | ||
| MetricName.SUMMARIZATION_CALLS, | ||
| org_id=org_id, | ||
| value=1, | ||
| project=project, | ||
| labels=labels, | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def record_prompt_execution( | ||
| org_id: str, | ||
| prompt_name: str | None = None, | ||
| project: str = "default", | ||
| labels: dict[str, Any] | None = None, | ||
| ) -> bool: | ||
| """Record a prompt studio execution. | ||
| Args: | ||
| org_id: Organization ID | ||
| prompt_name: Name of the prompt | ||
| project: Project identifier | ||
| labels: Additional labels | ||
| Returns: | ||
| True if recorded successfully | ||
| """ | ||
| if not METRICS_ENABLED: | ||
| return False | ||
|
|
||
| record, MetricName, _ = _get_metrics_module() | ||
| if record is None: | ||
| return False | ||
|
|
||
| metric_labels = labels or {} | ||
| if prompt_name: | ||
| metric_labels["prompt_name"] = prompt_name | ||
|
|
||
| return record( | ||
| MetricName.PROMPT_EXECUTIONS, | ||
| org_id=org_id, | ||
| value=1, | ||
| project=project, | ||
| labels=metric_labels, | ||
| ) |
Fix incompatible calls to core record() – current signature will raise at runtime.
All MetricsCapture methods call record() with MetricName as the first positional argument and keyword arguments like org_id= and value=, but the provided core signature is:
def record(
org_id: str,
metric_name: str | MetricName,
metric_value: int | float,
labels: dict[str, str] | None = None,
project: str | None = None,
tag: str | None = None,
) -> bool:
    ...
With the current calls, Python will bind the first positional (MetricName.*) to org_id and then also receive org_id= as a keyword, resulting in TypeError: record() got multiple values for argument 'org_id' (and value is not a valid keyword). This means metrics recording will fail at runtime and can break any call site that uses MetricsCapture.
Proposed fix pattern (apply to all `record_*` methods)
Change each call from the current pattern:
return record(
MetricName.DEPLOYED_API_REQUESTS,
org_id=org_id,
value=1,
project=project,
labels=metric_labels,
)
to using the correct parameter names and order, for example:
- return record(
- MetricName.DEPLOYED_API_REQUESTS,
- org_id=org_id,
- value=1,
- project=project,
- labels=metric_labels,
- )
+ return record(
+ org_id=org_id,
+ metric_name=MetricName.DEPLOYED_API_REQUESTS,
+ metric_value=1,
+ labels=metric_labels,
+ project=project,
+ )
Apply the same pattern to:
- record_etl_execution (MetricName.ETL_PIPELINE_EXECUTIONS)
- record_document_processed (both DOCUMENTS_PROCESSED and PAGES_PROCESSED)
- record_llm_call (LLM_CALLS and LLM_USAGE)
- record_challenge (CHALLENGES)
- record_summarization (SUMMARIZATION_CALLS)
- record_prompt_execution (PROMPT_EXECUTIONS)
🤖 Prompt for AI Agents
In @backend/dashboard_metrics/capture.py around lines 51-338, MetricsCapture
methods call record() with the metric enum as the first positional arg and
keyword names like org_id= and value=, but core record signature expects org_id
first, metric_name second and metric_value third, so change every call in
record_api_request, record_etl_execution, record_document_processed (both
DOCUMENTS_PROCESSED and PAGES_PROCESSED), record_llm_call (LLM_CALLS and
LLM_USAGE), record_challenge, record_summarization, and record_prompt_execution
to match the core signature: call record(org_id, MetricName.<X>, <metric_value>,
labels=metric_labels, project=project, tag=None) (use 1 for single-count events,
pages for pages, cost for LLM_USAGE as float), and remove the invalid value=
keyword and duplicate org_id keyword usage.
| constraints = [ | ||
| models.UniqueConstraint( | ||
| fields=[ | ||
| "organization", | ||
| "timestamp", | ||
| "metric_name", | ||
| "project", | ||
| "tag", | ||
| ], | ||
| name="unique_hourly_metric", | ||
| ) | ||
| ] |
Nullable tag in unique constraint allows duplicate NULL rows in PostgreSQL
The unique constraints in all three metric models include the nullable tag field. In PostgreSQL, NULL values are considered distinct in UNIQUE constraints, allowing multiple rows with identical (organization, timestamp/date/month, metric_name, project) values when tag is NULL.
If you expect exactly one row per combination when no tag is specified, the current constraint will not enforce this.
Solutions:
- Make `tag` non-nullable with a default value (e.g., `"default"` or `""`)
- Use a partial unique index with `WHERE tag IS NOT NULL` plus application-level checks for NULL cases
- Use PostgreSQL's `NULLS NOT DISTINCT` option, exposed as `nulls_distinct` on `UniqueConstraint` (Django 5.0+, PostgreSQL 15+)
Affected constraints:
# EventMetricsHourly (lines 131-142)
models.UniqueConstraint(
fields=["organization", "timestamp", "metric_name", "project", "tag"],
name="unique_hourly_metric",
)
# EventMetricsDaily (lines 237-248)
models.UniqueConstraint(
fields=["organization", "date", "metric_name", "project", "tag"],
name="unique_daily_metric",
)
# EventMetricsMonthly (lines 343-354)
models.UniqueConstraint(
fields=["organization", "month", "metric_name", "project", "tag"],
name="unique_monthly_metric",
)
🧰 Tools
🪛 Ruff (0.14.10)
131-142: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
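If the `NULLS NOT DISTINCT` route is taken, the hourly constraint would look roughly like this; note that `nulls_distinct` requires Django 5.0+ and PostgreSQL 15+, so treat this as a sketch of the direction rather than a drop-in change:

```python
from django.db import models

hourly_unique = models.UniqueConstraint(
    fields=["organization", "timestamp", "metric_name", "project", "tag"],
    name="unique_hourly_metric",
    # Treat NULL tags as equal so only one untagged row can exist per bucket
    nulls_distinct=False,
)
```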
| def _bulk_upsert_hourly(aggregations: dict) -> tuple[int, int]: | ||
| """Bulk upsert hourly aggregations. | ||
| Args: | ||
| aggregations: Dict of aggregated metric data keyed by | ||
| (org_id, hour_ts_str, metric_name, project, tag) | ||
| Returns: | ||
| Tuple of (created_count, updated_count) | ||
| """ | ||
| created, updated = 0, 0 | ||
| with transaction.atomic(): | ||
| for key, agg in aggregations.items(): | ||
| org_id, hour_ts_str, metric_name, project, tag = key | ||
| hour_ts = datetime.fromisoformat(hour_ts_str) | ||
|
|
||
| try: | ||
| obj, was_created = ( | ||
| EventMetricsHourly.objects.select_for_update().get_or_create( | ||
| organization_id=org_id, | ||
| timestamp=hour_ts, | ||
| metric_name=metric_name, | ||
| project=project, | ||
| tag=tag, | ||
| defaults={ | ||
| "metric_type": agg["metric_type"], | ||
| "metric_value": agg["value"], | ||
| "metric_count": agg["count"], | ||
| "labels": agg["labels"], | ||
| }, | ||
| ) | ||
| ) | ||
| if was_created: | ||
| created += 1 | ||
| else: | ||
| obj.metric_value = F("metric_value") + agg["value"] | ||
| obj.metric_count = F("metric_count") + agg["count"] | ||
| if agg["labels"]: | ||
| obj.labels = {**obj.labels, **agg["labels"]} | ||
| obj.save( | ||
| update_fields=[ | ||
| "metric_value", | ||
| "metric_count", | ||
| "labels", | ||
| "modified_at", | ||
| ] | ||
| ) | ||
| updated += 1 | ||
| except Exception as e: | ||
| logger.error(f"Error upserting hourly metric {key}: {e}") | ||
|
|
||
| return created, updated |
DB errors are swallowed inside transactions, defeating Celery autoretry and risking silent data loss
In _bulk_upsert_hourly/_daily/_monthly and in the cleanup tasks:
- Each function wraps DB work in `transaction.atomic()` but also catches a broad `Exception` inside the block and only logs it.
- For the Celery tasks (`process_dashboard_metric_events`, `cleanup_hourly_metrics`, `cleanup_daily_metrics`) you've configured `autoretry_for=(DatabaseError, OperationalError)`, but those DB exceptions are caught and converted into logs/return dicts, so Celery never sees them and won't retry.
- Catching DB errors inside a single `atomic()` block without calling `transaction.set_rollback(True)` or re-raising can also leave the transaction marked for rollback and cause subsequent operations in the same block to fail, leading to partial or fully rolled-back writes while the task still reports success.
This combination can make metrics silently incomplete under DB issues (locks, timeouts, transient failures) and is at odds with the production-readiness goal.
I’d recommend:
- In the upsert helpers: let `DatabaseError`/`OperationalError` bubble up so Celery can apply its retry/backoff policy. If you want per-key resilience, move the try/except outside the `atomic()` block or drastically narrow the except clause and re-raise DB errors.
- In the cleanup tasks: don't catch `DatabaseError`/`OperationalError` at all (or catch, log with `logger.exception(...)`, then `raise`) so `autoretry_for` actually takes effect. Keep a separate `except Exception` handler only for truly unexpected non-DB errors if you still want to return a failure dict.
with transaction.atomic():
for key, agg in aggregations.items():
org_id, hour_ts_str, metric_name, project, tag = key
hour_ts = datetime.fromisoformat(hour_ts_str)
obj, was_created = (
EventMetricsHourly.objects.select_for_update().get_or_create(
organization_id=org_id,
timestamp=hour_ts,
metric_name=metric_name,
project=project,
tag=tag,
defaults={
"metric_type": agg["metric_type"],
"metric_value": agg["value"],
"metric_count": agg["count"],
"labels": agg["labels"],
},
)
)
            # same update logic, no broad except here
and for cleanup:
try:
deleted_count, _ = EventMetricsHourly.objects.filter(
timestamp__lt=cutoff
).delete()
except (DatabaseError, OperationalError):
logger.exception("Error during hourly cleanup")
raise
except Exception:
logger.exception("Unexpected error during hourly cleanup")
return {"success": False, "retention_days": retention_days}This keeps the resilience you want while letting Celery handle transient DB failures correctly.
Also applies to: 263-315, 317-369, 371-414, 424-458
🧰 Tools
🪛 Ruff (0.14.10)
257-257: Do not catch blind exception: Exception
(BLE001)
258-258: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
```python
metrics_list = DashboardMetricsViewSet.as_view({"get": "list"})
metrics_detail = DashboardMetricsViewSet.as_view({"get": "retrieve"})
metrics_summary = DashboardMetricsViewSet.as_view({"get": "summary"})
metrics_series = DashboardMetricsViewSet.as_view({"get": "series"})
metrics_overview = DashboardMetricsViewSet.as_view({"get": "overview"})
metrics_live_summary = DashboardMetricsViewSet.as_view({"get": "live_summary"})
metrics_live_series = DashboardMetricsViewSet.as_view({"get": "live_series"})
```
Missing health endpoint mapping.
The PR description mentions a health endpoint (GET /api/v2/metrics/health/) and the DashboardMetricsViewSet has a health action (per the relevant code snippets), but it's not wired in the URL patterns. Add the health endpoint if it should be exposed.
🔎 Add health endpoint
```diff
 metrics_live_summary = DashboardMetricsViewSet.as_view({"get": "live_summary"})
 metrics_live_series = DashboardMetricsViewSet.as_view({"get": "live_series"})
+metrics_health = DashboardMetricsViewSet.as_view({"get": "health"})

 urlpatterns = format_suffix_patterns(
     [
         # Main list endpoint
         path("", metrics_list, name="metrics-list"),
         # Summary statistics
         path("summary/", metrics_summary, name="metrics-summary"),
         # Time series data
         path("series/", metrics_series, name="metrics-series"),
         # Quick overview (last 7 days)
         path("overview/", metrics_overview, name="metrics-overview"),
         # Live data from source tables
         path("live-summary/", metrics_live_summary, name="metrics-live-summary"),
         path("live-series/", metrics_live_series, name="metrics-live-series"),
+        # Health check
+        path("health/", metrics_health, name="metrics-health"),
         # Individual metric detail
         path("<uuid:pk>/", metrics_detail, name="metrics-detail"),
     ]
 )
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In @backend/dashboard_metrics/urls.py around lines 9-15, The URL configuration
is missing a mapping for the DashboardMetricsViewSet.health action; add a new
view mapping similar to the others (e.g., create metrics_health =
DashboardMetricsViewSet.as_view({"get": "health"})) and include it in the URL
patterns so GET /api/v2/metrics/health/ is exposed; update any
import/urlpatterns list references where other metrics_* views are registered to
include metrics_health.
```python
        organization = UserContext.get_organization()
        org_id = str(organization.id)

        summary = MetricsQueryService.get_all_metrics_summary(
            org_id,
            params["start_date"],
            params["end_date"],
```
🛠️ Refactor suggestion | 🟠 Major
Guard against missing organization context in live_summary / live_series.
Both live_summary and live_series assume UserContext.get_organization() returns a valid object and immediately access .id. If the organization context is not set or lookup fails, this will raise an AttributeError and return a 500 instead of a clean 4xx error, unlike get_queryset() which explicitly handles the no‑org case.
Consider mirroring the get_queryset() pattern here (log + PermissionDenied) before using organization.id.
Proposed fix (pattern for both live endpoints)
```diff
@@
-        organization = UserContext.get_organization()
-        org_id = str(organization.id)
+        organization = UserContext.get_organization()
+        if not organization:
+            logger.warning("No organization context for live metrics request")
+            raise PermissionDenied("No organization context")
+
+        org_id = str(organization.id)
@@
-        organization = UserContext.get_organization()
-        org_id = str(organization.id)
+        organization = UserContext.get_organization()
+        if not organization:
+            logger.warning("No organization context for live metrics request")
+            raise PermissionDenied("No organization context")
+
+        org_id = str(organization.id)
```

Also applies to: 310-312
🤖 Prompt for AI Agents
In @backend/dashboard_metrics/views.py around lines 267-273, Both live_summary
and live_series call UserContext.get_organization() and access .id directly; add
the same guard used in get_queryset(): if organization is None (or falsy) log an
explanatory message and raise PermissionDenied before accessing organization.id,
then pass org_id to MetricsQueryService.get_all_metrics_summary /
get_all_metrics_series; apply the same check for the other occurrence around
lines referenced (the block using UserContext.get_organization() /
organization.id at ~310-312).
```python
    def _get_connection(self) -> Connection:
        """Get or create a connection to the broker with retry logic."""
        if self._connection is not None and self._connection.connected:
            return self._connection
        return self._get_connection_with_retry()
```
Connection connected property may return stale state.
The _connection.connected check at line 65 may return True even if the broker has disconnected (e.g., broker restart). Consider wrapping the connection reuse in a try-except that falls back to _get_connection_with_retry() on first publish failure, or use Kombu's ensure_connection() pattern.
🔎 Suggested improvement
```diff
 def _get_connection(self) -> Connection:
     """Get or create a connection to the broker with retry logic."""
-    if self._connection is not None and self._connection.connected:
-        return self._connection
-    return self._get_connection_with_retry()
+    if self._connection is not None:
+        try:
+            # Verify connection is actually usable
+            self._connection.ensure_connection(max_retries=1)
+            return self._connection
+        except Exception:
+            pass  # Fall through to reconnect
+    return self._get_connection_with_retry()
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
    def _get_connection(self) -> Connection:
        """Get or create a connection to the broker with retry logic."""
        if self._connection is not None:
            try:
                # Verify connection is actually usable
                self._connection.ensure_connection(max_retries=1)
                return self._connection
            except Exception:
                pass  # Fall through to reconnect
        return self._get_connection_with_retry()
```
```python
    def record(self, event: MetricEvent) -> bool:
        """Send a metric event as a Celery task.

        Args:
            event: The MetricEvent to queue

        Returns:
            True if the event was queued successfully, False otherwise
        """
        try:
            event_data = self._create_event_data(event)
            task_message = self._create_celery_message(event_data)
            conn = self._get_connection()

            with conn.Producer() as producer:
                producer.publish(
                    task_message,
                    exchange=self.exchange,
                    routing_key=self.queue_name,
                    declare=[self.queue],
                    delivery_mode=2,  # Persistent
                    content_type="application/json",
                    content_encoding="utf-8",
                    headers={
                        "task": self.task_name,
                        "id": str(uuid.uuid4()),
                    },
                )

            logger.debug(
                f"Queued metric task: {event.metric_name} for org {event.org_id}"
            )
            return True

        except KombuError as e:
            logger.error(f"Failed to queue metric {event.metric_name}: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error queuing metric: {e}")
            return False
```
🧩 Analysis chain
🌐 Web query:
Kombu Connection thread-safe publishing messages concurrent
💡 Result:
Short answer: Kombu (and the underlying py-amqp transport) is not safe for concurrent publishing from multiple threads using the same Connection/Channel objects. Share a Connection only if you ensure each thread uses its own Channel (or a thread-safe wrapper/pool) or protect access with a lock; better: give each thread its own Connection/Channel or use a connection pool or a thread-safe transport implementation. [1][2][3][4]
Recommendations
- Do not share Channel objects across threads — channels are channel-bound and can interleave AMQP frames if used concurrently. Use one channel per thread. [3][4]
- Prefer one producer Connection/Channel per thread, or use a Connection/Channel pool to reuse resources safely. [1][4]
- If you must share a Connection, serialize all operations on it with a threading.Lock (but this reduces concurrency). [3]
- Consider using a project that provides a thread-safe kombu transport/wrapper (e.g., kombu-pyamqp-threadsafe) if you need shared connections with multithreading. [1]
Sources
- kombu-pyamqp-threadsafe project (thread-safe wrapper for kombu/pyamqp). [1]
- StackOverflow discussion on publishing from consumer/threads with Kombu (issues when mixing consuming/publishing on same thread). [2]
- Kombu/py-amqp mailing-list / discussion about thread-safety (channels bound to threads; beware interleaved writes). [3]
- rabbitpy documentation on multi-threaded use (recommend one channel per thread). [4]
🏁 Script executed:

```bash
# Find and examine the _get_connection method and class initialization
cd unstract/core/src/unstract/core/metrics/backends && grep -n "_get_connection\|__init__\|_connection" queue.py | head -20
```

Repository: Zipstack/unstract
Length of output: 828

🏁 Script executed:

```bash
cd unstract/core/src/unstract/core/metrics/backends && sed -n '35,110p' queue.py
```

Repository: Zipstack/unstract
Length of output: 2738
Kombu Connection is not thread-safe—add locking or use connection pooling.
The code caches self._connection as an instance variable without synchronization. Concurrent calls to record() from multiple threads will race on self._connection state (the check on line 65 is not atomic), and Kombu does not support concurrent publishing from the same Connection. For multi-threaded metrics recording, protect the connection with a threading.Lock, use thread-local connections, or implement a connection pool.
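A sketch of the per-thread-connection option (not the PR's code; it assumes the backend's existing `_get_connection_with_retry()` helper is available on the class):

```python
import threading

from kombu import Connection


class ThreadLocalConnectionMixin:
    """Give each thread its own Kombu Connection so publishes never share a channel."""

    def __init__(self) -> None:
        self._local = threading.local()

    def _get_connection(self) -> Connection:
        conn = getattr(self._local, "connection", None)
        if conn is not None and conn.connected:
            return conn
        # Reuse the backend's existing retry helper (assumed to exist on subclasses)
        conn = self._get_connection_with_retry()
        self._local.connection = conn
        return conn
```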
🧰 Tools
🪛 Ruff (0.14.10)
149-149: Consider moving this statement to an else block
(TRY300)
152-152: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
154-154: Do not catch blind exception: Exception
(BLE001)
155-155: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
```python
# Mapping of metric names to their types
METRIC_TYPE_MAP: dict[str, MetricType] = {
    # Counters
    MetricName.DOCUMENTS_PROCESSED.value: MetricType.COUNTER,
    MetricName.PAGES_PROCESSED.value: MetricType.COUNTER,
    MetricName.PROMPT_EXECUTIONS.value: MetricType.COUNTER,
    MetricName.LLM_CALLS.value: MetricType.COUNTER,
    MetricName.CHALLENGES.value: MetricType.COUNTER,
    MetricName.SUMMARIZATION_CALLS.value: MetricType.COUNTER,
    # Histograms
    MetricName.DEPLOYED_API_REQUESTS.value: MetricType.HISTOGRAM,
    MetricName.ETL_PIPELINE_EXECUTIONS.value: MetricType.HISTOGRAM,
    MetricName.LLM_USAGE.value: MetricType.HISTOGRAM,
}
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Check if tests exist to verify METRIC_TYPE_MAP completeness
rg -n 'METRIC_TYPE_MAP' --type=py -g '**/test*'
```

Repository: Zipstack/unstract
Length of output: 43

🏁 Script executed:

```bash
#!/bin/bash
# Find MetricName enum definition
rg -n 'class MetricName|enum MetricName' --type=py -A 20
```

Repository: Zipstack/unstract
Length of output: 1805

🏁 Script executed:

```bash
#!/bin/bash
# Find get_metric_type() implementation
rg -n 'def get_metric_type' --type=py -A 10
```

Repository: Zipstack/unstract
Length of output: 933

🏁 Script executed:

```bash
#!/bin/bash
# Check the full types.py file to understand the structure
cat -n unstract/core/src/unstract/core/metrics/types.py
```

Repository: Zipstack/unstract
Length of output: 3933

🏁 Script executed:

```bash
#!/bin/bash
# Get the full get_metric_type() implementation
cat -n unstract/core/src/unstract/core/metrics/registry.py | head -50
```

Repository: Zipstack/unstract
Length of output: 1566

🏁 Script executed:

```bash
#!/bin/bash
# Search for test files in metrics module
fd '.*test.*\.py$' --type f | grep -i metric
```

Repository: Zipstack/unstract
Length of output: 106

🏁 Script executed:

```bash
#!/bin/bash
# Search for any validation or assertions related to METRIC_TYPE_MAP
rg -n 'assert.*METRIC_TYPE_MAP|METRIC_TYPE_MAP.*in|in.*METRIC_TYPE_MAP' --type=py
```

Repository: Zipstack/unstract
Length of output: 200
METRIC_TYPE_MAP must stay in sync with MetricName.
While validate_metric() checks if a metric exists in the enum, it does not ensure the metric is in METRIC_TYPE_MAP. If a new metric is added to MetricName without updating METRIC_TYPE_MAP, get_metric_type() will raise a KeyError when accessing the map on line 39 of registry.py. Add a unit test to verify all MetricName values have corresponding entries in METRIC_TYPE_MAP.
🤖 Prompt for AI Agents
In @unstract/core/src/unstract/core/metrics/types.py around lines 35-48, Add a
unit test that ensures every value of the MetricName enum has a corresponding
key in METRIC_TYPE_MAP to prevent KeyError in get_metric_type(); specifically,
iterate over MetricName (or MetricName.__members__.values()/MetricName) and
assert that each .value is present in METRIC_TYPE_MAP, referencing
METRIC_TYPE_MAP and MetricName; this complements validate_metric() and prevents
missing entries that would cause get_metric_type() to fail.
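A minimal version of that test might look like this (a sketch; the import path follows the file locations shown in the scripts above):

```python
from unstract.core.metrics.types import METRIC_TYPE_MAP, MetricName


def test_every_metric_name_has_a_type() -> None:
    """Guards get_metric_type() against KeyError when a new MetricName is added."""
    missing = [m.value for m in MetricName if m.value not in METRIC_TYPE_MAP]
    assert not missing, f"MetricName values missing from METRIC_TYPE_MAP: {missing}"
```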
chandrasekharan-zipstack
left a comment
Changes mostly LGTM. Left some minor comments.
One main question I have is around cache invalidation of the cached, aggregated data: does it happen only when the respective TTLs expire? I hope possibly receiving stale data for that duration is okay in this use case.
```python
worker_prefetch_multiplier = 4
worker_max_tasks_per_child = 1000
```
@athul-rs any reason why this or other config was added? I hope it doesn't have any side effects on other workers
@athul-rs do these tests run on every PR as part of the existing unit tests? If not, please ensure they're integrated there as well, otherwise your effort on these tests won't be realised.
```python
CACHE_TTL_CURRENT_HOUR = 30  # 30 seconds for current hour (updating frequently)
CACHE_TTL_HISTORICAL = 8 * 60 * 60  # 8 hours for historical data (stable)

# Legacy TTLs for compatibility with existing endpoints
```
@athul-rs what existing endpoints do we have that require such legacy TTLs? Check and correct this comment
```python
# Cache TTL values (in seconds)
# Time-aware TTLs per documentation
CACHE_TTL_CURRENT_HOUR = 30  # 30 seconds for current hour (updating frequently)
```
NIT: @athul-rs I would suggest defining these values in settings/base.py so that they can optionally be configured via an env variable too, with some sane defaults. Make sure the names are updated / prefixed with DASHBOARD_ to group them separately.
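For illustration only, such settings could look like this in settings/base.py (names and defaults are suggestions, not existing config):

```python
import os

# Dashboard metrics cache TTLs (seconds), overridable via environment variables
DASHBOARD_METRICS_CACHE_TTL_CURRENT_HOUR = int(
    os.environ.get("DASHBOARD_METRICS_CACHE_TTL_CURRENT_HOUR", 30)
)
DASHBOARD_METRICS_CACHE_TTL_HISTORICAL = int(
    os.environ.get("DASHBOARD_METRICS_CACHE_TTL_HISTORICAL", 8 * 60 * 60)
)
```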
| """ | ||
| # Sort params for consistent hashing | ||
| sorted_params = json.dumps(params, sort_keys=True, default=str) | ||
| params_hash = hashlib.md5(sorted_params.encode()).hexdigest()[:12] |
NIT: There's a possibility of cache collisions since we only take the first 12 characters of the digest, but this should mostly suffice since keys are separated per endpoint. Do check if a change is needed here.
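If collisions ever become a concern, one option (a sketch, not a required change) is to keep the full digest of a stronger hash, at the cost of a longer cache key:

```python
import hashlib
import json


def build_params_hash(params: dict) -> str:
    """Full SHA-256 digest of the sorted params; longer key, negligible collision risk."""
    sorted_params = json.dumps(params, sort_keys=True, default=str)
    return hashlib.sha256(sorted_params.encode()).hexdigest()
```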
```python
        except KeyError as e:
            logger.warning(f"Skipping event with missing required field: {e}")
            errors += 1
        except Exception as e:
            logger.warning(f"Error processing event: {e}")
```
NIT: Consider adding a stack trace
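For example, `logger.exception` captures the traceback automatically (a sketch of the same handler shape; `process_event` is a hypothetical stand-in for the per-event processing above):

```python
try:
    process_event(event)  # hypothetical stand-in for the real per-event processing
except KeyError as e:
    logger.warning(f"Skipping event with missing required field: {e}")
    errors += 1
except Exception:
    # Logs at ERROR level and appends the current stack trace
    logger.exception("Error processing event")
    errors += 1
```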
```python
        organization = UserContext.get_organization()
        if not organization:
            logger.warning("No organization context for metrics request")
            raise PermissionDenied("No organization context")
        return EventMetricsHourly.objects.filter(organization=organization)
```
@athul-rs check if this is required. This would usually be handled by the model manager itself.
NIT: This is a cool idea, and if refactored a bit to make it generic, we can easily extend the same to any other API / DB call that we wish to cache. Consider this suggestion if it's easy to make it generic; see the sketch below.
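As an illustration of that idea, a generic cache-aside decorator could look roughly like this (a sketch; the name `cached_call` and the key scheme are made up, and it assumes Django's configured cache backend):

```python
import functools
import hashlib
import json

from django.core.cache import cache


def cached_call(key_prefix: str, ttl: int):
    """Cache any function's return value, keyed by its JSON-serializable arguments."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            raw = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str)
            key = f"{key_prefix}:{hashlib.sha256(raw.encode()).hexdigest()}"
            hit = cache.get(key)
            if hit is not None:
                return hit
            result = func(*args, **kwargs)
            cache.set(key, result, timeout=ttl)
            return result

        return wrapper

    return decorator
```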
```python
from ..types import MetricEvent
from .base import AbstractMetricBackend
```
NIT: Consider absolute imports instead
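i.e. roughly the following, assuming the `unstract.core.metrics` package root seen elsewhere in this PR:

```python
from unstract.core.metrics.backends.base import AbstractMetricBackend
from unstract.core.metrics.types import MetricEvent
```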
```python
            return self._connection
        return self._get_connection_with_retry()

    def _get_connection_with_retry(
```
@athul-rs why do we need to implement this? Isn't it available / handled within celery / kombu?
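For context, Kombu already exposes reconnect/backoff hooks; a sketch of leaning on them instead of a hand-rolled loop (the broker URL and retry values are illustrative):

```python
from kombu import Connection

conn = Connection("amqp://guest:guest@localhost:5672//")  # illustrative broker URL
conn.ensure_connection(max_retries=3, interval_start=1, interval_step=2)

with conn.Producer() as producer:
    producer.publish(
        {"metric_name": "documents_processed"},  # illustrative payload
        routing_key="dashboard_metric_events",
        retry=True,
        retry_policy={"max_retries": 3, "interval_start": 1, "interval_step": 2},
    )
```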
Test Results

Summary
Runner Tests - Full Report
SDK1 Tests - Full Report
What
Why
How
- backend/dashboard_metrics/: Django app with models, views, services, and tasks
- unstract/core/src/unstract/core/metrics/: library with queue backend using Kombu
- dashboard_metric_events queue

Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
Database Migrations
Env Config
Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
- pytest backend/dashboard_metrics/tests/
- docker compose up worker-metrics
- GET /api/v2/metrics/health/

Screenshots
N/A
Checklist
I have read and understood the Contribution Guidelines.