Skip to content

Commit

Permalink
Collect version metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
ofek committed Oct 6, 2020
1 parent 109ab42 commit 0c3176f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
11 changes: 11 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ def check(self, instance):
self._report_highwater_offsets(self._context_limit)
self._report_consumer_offsets_and_lag(self._context_limit - len(self._highwater_offsets))

self._collect_broker_metadata()

def _create_kafka_admin_client(self, api_version):
"""Return a KafkaAdminClient."""
kafka_connect_str = self.instance.get('kafka_connect_str')
Expand Down Expand Up @@ -458,6 +460,15 @@ def _send_event(self, title, text, tags, event_type, aggregation_key, severity='
}
self.event(event_dict)

@AgentCheck.metadata_entrypoint
def _collect_broker_metadata(self):
version_data = [str(part) for part in self.kafka_client._client.check_version()]
version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)}

self.set_metadata(
'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts
)

@classmethod
def _determine_kafka_version(cls, init_config, instance):
"""Return the Kafka cluster version as a tuple."""
Expand Down
15 changes: 15 additions & 0 deletions kafka_consumer/tests/test_kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,18 @@ def test_no_partitions(aggregator, kafka_instance):
kafka_consumer_check.check(kafka_instance)

assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})


@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
def test_version_metadata(datadog_agent, kafka_instance):
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
kafka_consumer_check.check_id = 'test:123'

version_data = [str(part) for part in kafka_consumer_check.kafka_client._client.check_version()]
version_parts = {'version.{}'.format(name): part for name, part in zip(('major', 'minor', 'patch'), version_data)}
version_parts['version.scheme'] = 'semver'
version_parts['version.raw'] = '.'.join(version_data)

kafka_consumer_check.check(kafka_instance)
datadog_agent.assert_metadata('test:123', version_parts)

0 comments on commit 0c3176f

Please sign in to comment.