From e0010f72b0b0084c24c328962184d455bf2a02c8 Mon Sep 17 00:00:00 2001 From: Alexey Kuzyashin <33540273+Kuzyashin@users.noreply.github.com> Date: Fri, 12 Mar 2021 03:05:28 +0300 Subject: [PATCH] Add app_name to prometheus metrics (#120) --- faust/sensors/prometheus.py | 72 +++++++++++++---------- tests/unit/sensors/test_prometheus.py | 82 ++++++++++++++------------- 2 files changed, 85 insertions(+), 69 deletions(-) diff --git a/faust/sensors/prometheus.py b/faust/sensors/prometheus.py index ce3959ad3..1a805c534 100644 --- a/faust/sensors/prometheus.py +++ b/faust/sensors/prometheus.py @@ -40,14 +40,17 @@ def setup_prometheus_sensors( - app: AppT, pattern: str = "/metrics", registry: CollectorRegistry = REGISTRY + app: AppT, + pattern: str = "/metrics", + registry: CollectorRegistry = REGISTRY, + name_prefix: str = None, ) -> None: if prometheus_client is None: raise ImproperlyConfigured( "prometheus_client requires `pip install prometheus_client`." ) - faust_metrics = FaustMetrics.create(registry) + faust_metrics = FaustMetrics.create(registry, name_prefix or app.conf.name) app.monitor = PrometheusMonitor(metrics=faust_metrics) @app.page(pattern) @@ -105,119 +108,128 @@ class FaustMetrics(NamedTuple): consumer_commit_latency: Histogram @classmethod - def create(cls, registry: CollectorRegistry) -> "FaustMetrics": + def create(cls, registry: CollectorRegistry, app_name: str) -> "FaustMetrics": messages_received = Counter( - "messages_received", "Total messages received", registry=registry + f"{app_name}_messages_received", + "Total messages received", + registry=registry, ) active_messages = Gauge( - "active_messages", "Total active messages", registry=registry + f"{app_name}_active_messages", "Total active messages", registry=registry ) messages_received_per_topics = Counter( - "messages_received_per_topic", + f"{app_name}_messages_received_per_topic", "Messages received per topic", ["topic"], registry=registry, ) messages_received_per_topics_partition = Gauge( - 
"messages_received_per_topics_partition", + f"{app_name}_messages_received_per_topics_partition", "Messages received per topic/partition", ["topic", "partition"], registry=registry, ) events_runtime_latency = Histogram( - "events_runtime_ms", "Events runtime in ms", registry=registry + f"{app_name}_events_runtime_ms", "Events runtime in ms", registry=registry ) total_events = Counter( - "total_events", "Total events received", registry=registry + f"{app_name}_total_events", "Total events received", registry=registry ) total_active_events = Gauge( - "total_active_events", "Total active events", registry=registry + f"{app_name}_total_active_events", "Total active events", registry=registry ) total_events_per_stream = Counter( - "total_events_per_stream", + f"{app_name}_total_events_per_stream", "Events received per Stream", ["stream"], registry=registry, ) table_operations = Counter( - "table_operations", + f"{app_name}_table_operations", "Total table operations", ["table", "operation"], registry=registry, ) topic_messages_sent = Counter( - "topic_messages_sent", + f"{app_name}_topic_messages_sent", "Total messages sent per topic", ["topic"], registry=registry, ) total_sent_messages = Counter( - "total_sent_messages", "Total messages sent", registry=registry + f"{app_name}_total_sent_messages", "Total messages sent", registry=registry ) producer_send_latency = Histogram( - "producer_send_latency", "Producer send latency in ms", registry=registry + f"{app_name}_producer_send_latency", + "Producer send latency in ms", + registry=registry, ) total_error_messages_sent = Counter( - "total_error_messages_sent", "Total error messages sent", registry=registry + f"{app_name}_total_error_messages_sent", + "Total error messages sent", + registry=registry, ) producer_error_send_latency = Histogram( - "producer_error_send_latency", + f"{app_name}_producer_error_send_latency", "Producer error send latency in ms", registry=registry, ) assignment_operations = Counter( - 
"assignment_operations", + f"{app_name}_assignment_operations", "Total assigment operations (completed/error)", ["operation"], registry=registry, ) assign_latency = Histogram( - "assign_latency", "Assignment latency in ms", registry=registry + f"{app_name}_assign_latency", "Assignment latency in ms", registry=registry ) total_rebalances = Gauge( - "total_rebalances", "Total rebalances", registry=registry + f"{app_name}_total_rebalances", "Total rebalances", registry=registry ) total_rebalances_recovering = Gauge( - "total_rebalances_recovering", + f"{app_name}_total_rebalances_recovering", "Total rebalances recovering", registry=registry, ) rebalance_done_consumer_latency = Histogram( - "rebalance_done_consumer_latency", + f"{app_name}_rebalance_done_consumer_latency", "Consumer replying that rebalance is done to broker in ms", registry=registry, ) rebalance_done_latency = Histogram( - "rebalance_done_latency", + f"{app_name}_rebalance_done_latency", "Rebalance finished latency in ms", registry=registry, ) count_metrics_by_name = Gauge( - "metrics_by_name", "Total metrics by name", ["metric"], registry=registry + f"{app_name}_metrics_by_name", + "Total metrics by name", + ["metric"], + registry=registry, ) http_status_codes = Counter( - "http_status_codes", + f"{app_name}_http_status_codes", "Total http_status code", ["status_code"], registry=registry, ) http_latency = Histogram( - "http_latency", "Http response latency in ms", registry=registry + f"{app_name}_http_latency", "Http response latency in ms", registry=registry ) topic_partition_end_offset = Gauge( - "topic_partition_end_offset", + f"{app_name}_topic_partition_end_offset", "Offset ends per topic/partition", ["topic", "partition"], registry=registry, ) topic_partition_offset_commited = Gauge( - "topic_partition_offset_commited", + f"{app_name}_topic_partition_offset_commited", "Offset commited per topic/partition", ["topic", "partition"], registry=registry, ) consumer_commit_latency = Histogram( - 
"consumer_commit_latency", + f"{app_name}_consumer_commit_latency", "Consumer commit latency in ms", registry=registry, ) @@ -295,7 +307,7 @@ class PrometheusMonitor(Monitor): from faust.sensors.prometheus import setup_prometheus_sensors app = faust.App('example', broker='kafka://') - setup_prometheus_sensors(app, pattern='/metrics') + setup_prometheus_sensors(app, pattern='/metrics', name_prefix='example_app_name') """ ERROR = "error" diff --git a/tests/unit/sensors/test_prometheus.py b/tests/unit/sensors/test_prometheus.py index 429fc9e68..3d729df55 100644 --- a/tests/unit/sensors/test_prometheus.py +++ b/tests/unit/sensors/test_prometheus.py @@ -26,7 +26,7 @@ def registry(self) -> CollectorRegistry: @pytest.fixture def metrics(self, registry: CollectorRegistry) -> FaustMetrics: - return FaustMetrics.create(registry) + return FaustMetrics.create(registry, "test") @pytest.fixture def app(self) -> AppT: @@ -67,18 +67,20 @@ def test_on_message_in( monitor.on_message_in(TP1, 400, message) self.assert_has_sample_value( - metrics.messages_received, "messages_received_total", {}, 1 + metrics.messages_received, "test_messages_received_total", {}, 1 + ) + self.assert_has_sample_value( + metrics.active_messages, "test_active_messages", {}, 1 ) - self.assert_has_sample_value(metrics.active_messages, "active_messages", {}, 1) self.assert_has_sample_value( metrics.messages_received_per_topics, - "messages_received_per_topic_total", + "test_messages_received_per_topic_total", {"topic": "foo"}, 1, ) self.assert_has_sample_value( metrics.messages_received_per_topics_partition, - "messages_received_per_topics_partition", + "test_messages_received_per_topics_partition", {"topic": "foo", "partition": "3"}, 400, ) @@ -93,7 +95,7 @@ def test_on_message_out( monitor.on_message_out(TP1, 400, message) self.assert_has_sample_value( - metrics.active_messages, "active_messages", {}, n_messages - 1 + metrics.active_messages, "test_active_messages", {}, n_messages - 1 ) def test_on_stream_event_in( @@
-105,13 +107,15 @@ def test_on_stream_event_in( ) -> None: monitor.on_stream_event_in(TP1, 401, stream, event) - self.assert_has_sample_value(metrics.total_events, "total_events_total", {}, 1) self.assert_has_sample_value( - metrics.total_active_events, "total_active_events", {}, 1 + metrics.total_events, "test_total_events_total", {}, 1 + ) + self.assert_has_sample_value( + metrics.total_active_events, "test_total_active_events", {}, 1 ) self.assert_has_sample_value( metrics.total_events_per_stream, - "total_events_per_stream_total", + "test_total_events_per_stream_total", {"stream": "stream.topic_foo.events"}, 1, ) @@ -131,7 +135,7 @@ def test_on_stream_event_out( self.assert_has_sample_value( metrics.total_active_events, - "total_active_events", + "test_total_active_events", {}, n_events - 1, ) @@ -146,7 +150,7 @@ def test_on_stream_event_out_does_not_measure_latency_without_state( monitor.on_stream_event_out(TP1, 401, stream, event, state=None) self.assert_doesnt_have_sample_values( - metrics.events_runtime_latency, "events_runtime_latency_total", {} + metrics.events_runtime_latency, "test_events_runtime_latency_total", {} ) def test_on_table_get( @@ -156,7 +160,7 @@ def test_on_table_get( self.assert_has_sample_value( metrics.table_operations, - "table_operations_total", + "test_table_operations_total", {"table": f"table.{table.name}", "operation": "keys_retrieved"}, 1, ) @@ -168,7 +172,7 @@ def test_on_table_set( self.assert_has_sample_value( metrics.table_operations, - "table_operations_total", + "test_table_operations_total", {"table": f"table.{table.name}", "operation": "keys_updated"}, 1, ) @@ -180,7 +184,7 @@ def test_on_table_del( self.assert_has_sample_value( metrics.table_operations, - "table_operations_total", + "test_table_operations_total", {"table": f"table.{table.name}", "operation": "keys_deleted"}, 1, ) @@ -195,7 +199,7 @@ def test_on_commit_completed( self.assert_has_sample_value( metrics.consumer_commit_latency, - 
"consumer_commit_latency_sum", + "test_consumer_commit_latency_sum", {}, monitor.ms_since(float(state)), ) @@ -209,17 +213,17 @@ def test_on_send_initiated_completed( monitor.on_send_completed(producer, state, Mock(name="metadata")) self.assert_has_sample_value( - metrics.total_sent_messages, "total_sent_messages_total", {}, 1 + metrics.total_sent_messages, "test_total_sent_messages_total", {}, 1 ) self.assert_has_sample_value( metrics.topic_messages_sent, - "topic_messages_sent_total", + "test_topic_messages_sent_total", {"topic": "topic.topic1"}, 1, ) self.assert_has_sample_value( metrics.producer_send_latency, - "producer_send_latency_sum", + "test_producer_send_latency_sum", {}, monitor.ms_since(float(state)), ) @@ -234,13 +238,13 @@ def test_on_send_error( self.assert_has_sample_value( metrics.total_error_messages_sent, - "total_error_messages_sent_total", + "test_total_error_messages_sent_total", {}, 1, ) self.assert_has_sample_value( metrics.producer_error_send_latency, - "producer_error_send_latency_sum", + "test_producer_error_send_latency_sum", {}, monitor.ms_since(timestamp), ) @@ -255,13 +259,13 @@ def test_on_assignment_start_completed( self.assert_has_sample_value( metrics.assignment_operations, - "assignment_operations_total", + "test_assignment_operations_total", {"operation": monitor.COMPLETED}, 1, ) self.assert_has_sample_value( metrics.assign_latency, - "assign_latency_sum", + "test_assign_latency_sum", {}, monitor.ms_since(state["time_start"]), ) @@ -276,13 +280,13 @@ def test_on_assignment_start_failed( self.assert_has_sample_value( metrics.assignment_operations, - "assignment_operations_total", + "test_assignment_operations_total", {"operation": monitor.ERROR}, 1, ) self.assert_has_sample_value( metrics.assign_latency, - "assign_latency_sum", + "test_assign_latency_sum", {}, monitor.ms_since(state["time_start"]), ) @@ -293,7 +297,7 @@ def test_on_rebalance( monitor.on_rebalance_start(app) self.assert_has_sample_value( - 
metrics.total_rebalances, "total_rebalances", {}, 1 + metrics.total_rebalances, "test_total_rebalances", {}, 1 ) def test_on_rebalance_return( @@ -306,17 +310,17 @@ def test_on_rebalance_return( monitor.on_rebalance_return(app, state) self.assert_has_sample_value( - metrics.total_rebalances, "total_rebalances", {}, n_rebalances - 1 + metrics.total_rebalances, "test_total_rebalances", {}, n_rebalances - 1 ) self.assert_has_sample_value( metrics.total_rebalances_recovering, - "total_rebalances_recovering", + "test_total_rebalances_recovering", {}, 1, ) self.assert_has_sample_value( metrics.rebalance_done_consumer_latency, - "rebalance_done_consumer_latency_sum", + "test_rebalance_done_consumer_latency_sum", {}, monitor.ms_since(state["time_return"]), ) @@ -332,13 +336,13 @@ def test_on_rebalance_end( self.assert_has_sample_value( metrics.total_rebalances_recovering, - "total_rebalances_recovering", + "test_total_rebalances_recovering", {}, n_rebalances - 1, ) self.assert_has_sample_value( metrics.rebalance_done_latency, - "rebalance_done_latency_sum", + "test_rebalance_done_latency_sum", {}, monitor.ms_since(state["time_end"]), ) @@ -384,13 +388,13 @@ def assert_on_web_request( self.assert_has_sample_value( metrics.http_status_codes, - "http_status_codes_total", + "test_http_status_codes_total", {"status_code": str(expected_status)}, 1, ) self.assert_has_sample_value( metrics.http_latency, - "http_latency_sum", + "test_http_latency_sum", {}, monitor.ms_since(state["time_end"]), ) @@ -400,7 +404,7 @@ def test_count(self, monitor: PrometheusMonitor, metrics: FaustMetrics) -> None: self.assert_has_sample_value( metrics.count_metrics_by_name, - "metrics_by_name", + "test_metrics_by_name", {"metric": "metric_name"}, 3, ) @@ -414,19 +418,19 @@ def test_on_tp_commit( self.assert_has_sample_value( metrics.topic_partition_offset_commited, - "topic_partition_offset_commited", + "test_topic_partition_offset_commited", {"topic": "foo", "partition": "0"}, 1001, ) 
self.assert_has_sample_value( metrics.topic_partition_offset_commited, - "topic_partition_offset_commited", + "test_topic_partition_offset_commited", {"topic": "foo", "partition": "1"}, 2002, ) self.assert_has_sample_value( metrics.topic_partition_offset_commited, - "topic_partition_offset_commited", + "test_topic_partition_offset_commited", {"topic": "bar", "partition": "3"}, 3003, ) @@ -438,7 +442,7 @@ def test_track_tp_end_offsets( self.assert_has_sample_value( metrics.topic_partition_end_offset, - "topic_partition_end_offset", + "test_topic_partition_end_offset", {"topic": "foo", "partition": "0"}, 4004, ) @@ -473,14 +477,14 @@ def test_old_labels_are_removed_from_registry_after_rebalance( collected_topics = frozenset( sample.labels["topic"] for metric in registry.collect() - if metric.name == "messages_received_per_topic" + if metric.name == "test_messages_received_per_topic" for sample in metric.samples ) assert collected_topics == frozenset([TP2.topic]) collected_partitions = frozenset( (sample.labels["topic"], sample.labels["partition"]) for metric in registry.collect() - if metric.name == "messages_received_per_topics_partition" + if metric.name == "test_messages_received_per_topics_partition" for sample in metric.samples ) assert collected_partitions == frozenset([(TP2.topic, str(TP2.partition))])