Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logger with Admin Client #1758

Merged
merged 6 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

v2.4.1 is a maintenance release with the following fixes and enhancements:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Version can be changed to 2.5.0 when we merge delete_records

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already done that in delete_records as that is a new feature.


- Added an example to show the usage of the custom logger with `AdminClient`
- Removed usage of `strcpy` to enhance security of the client (#1745)
- Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
- Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when imported through other modules or tools, such as mypy.
- Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object
- Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property.
Copy link
Contributor

@emasab emasab Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify it was possible before:

Suggested change
- Fixed `logger` not working when provided as an argument to `AdminClient`
- Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property.


confluent-kafka-python is based on librdkafka v2.4.1, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)
Expand Down
47 changes: 47 additions & 0 deletions examples/adminapi_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Example: attach a custom ``logging.Logger`` to an AdminClient.

Usage: adminapi_logger.py <broker>
"""
import sys
import logging

from confluent_kafka.admin import AdminClient

if len(sys.argv) != 2:
    sys.stderr.write("Usage: %s <broker>\n" % sys.argv[0])
    sys.exit(1)

broker = sys.argv[1]

# Custom logger: DEBUG level so the 'debug: all' librdkafka traces below
# are actually emitted through it.
logger = logging.getLogger('AdminClient')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
logger.addHandler(handler)

# Create Admin client, passing the logger as a keyword argument.
a = AdminClient({'bootstrap.servers': broker,
                 'debug': 'all'},
                logger=logger)
# Alternatively, pass the logger as a configuration key.
# When passed as an argument, it overwrites the key.
#
# a = AdminClient({'bootstrap.servers': broker,
#                  'debug': 'all',
#                  'logger': logger})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger=logger)
logger=logger)
# Alternatively, pass the logger as a key.
# When passing it as an argument, it overwrites the key.
#
# a = AdminClient({'bootstrap.servers': broker,
# 'debug': 'all',
# 'logger': logger})


# Sample Admin API call: ask the cluster for its consumer groups.
future = a.list_consumer_groups(request_timeout=10)

while not future.done():
    # Serve log callbacks through the custom logger while waiting for the result.
    a.poll(0.1)

# NOTE: the original wrapped this in `try: ... except Exception: raise`,
# which is a no-op (a bare re-raise adds nothing); any error from
# future.result() propagates identically without it.
list_consumer_groups_result = future.result()
print("\n\n\n========================= List consumer groups result Start =========================")
print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
for valid in list_consumer_groups_result.valid:
    print(" id: {} is_simple: {} state: {}".format(
        valid.group_id, valid.is_simple_consumer_group, valid.state))
print("{} errors".format(len(list_consumer_groups_result.errors)))
for error in list_consumer_groups_result.errors:
    print(" error: {}".format(error))
print("========================= List consumer groups result End =========================\n\n\n")

# Serve any final log callbacks before exiting.
a.poll(0)
9 changes: 5 additions & 4 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,18 @@ class AdminClient (_AdminClientImpl):
Requires broker version v0.11.0.0 or later.
"""

def __init__(self, conf):
def __init__(self, conf, **kwargs):
"""
Create a new AdminClient using the provided configuration dictionary.

The AdminClient is a standard Kafka protocol client, supporting
the standard librdkafka configuration properties as specified at
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

At least 'bootstrap.servers' should be configured.
:param dict conf: Configuration properties. At a minimum ``bootstrap.servers`` **should** be set\n"
:param Logger logger: Optional Logger instance to use as a custom log messages handler.
"""
super(AdminClient, self).__init__(conf)
super(AdminClient, self).__init__(conf, **kwargs)

@staticmethod
def _make_topics_result(f, futmap):
Expand Down
55 changes: 43 additions & 12 deletions tests/test_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from io import StringIO
import confluent_kafka
import confluent_kafka.avro
import confluent_kafka.admin
import logging


Expand All @@ -17,6 +18,16 @@ def filter(self, record):
print(record)


def _setup_string_buffer_logger(name):
stringBuffer = StringIO()
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stringBuffer)
handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
logger.addHandler(handler)
return stringBuffer, logger


def test_logging_consumer():
""" Tests that logging works """

Expand Down Expand Up @@ -120,12 +131,7 @@ def test_logging_constructor():
def test_producer_logger_logging_in_given_format():
"""Test that asserts that logging is working by matching part of the log message"""

stringBuffer = StringIO()
logger = logging.getLogger('Producer')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stringBuffer)
handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
logger.addHandler(handler)
stringBuffer, logger = _setup_string_buffer_logger('Producer')

p = confluent_kafka.Producer(
{"bootstrap.servers": "test", "logger": logger, "debug": "msg"})
Expand All @@ -142,12 +148,7 @@ def test_producer_logger_logging_in_given_format():
def test_consumer_logger_logging_in_given_format():
"""Test that asserts that logging is working by matching part of the log message"""

stringBuffer = StringIO()
logger = logging.getLogger('Consumer')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stringBuffer)
handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
logger.addHandler(handler)
stringBuffer, logger = _setup_string_buffer_logger('Consumer')

c = confluent_kafka.Consumer(
{"bootstrap.servers": "test", "group.id": "test", "logger": logger, "debug": "msg"})
Expand All @@ -158,3 +159,33 @@ def test_consumer_logger_logging_in_given_format():
c.close()

assert "Consumer Logger | INIT" in logMessage


def test_admin_logger_logging_in_given_format_when_provided_in_conf():
    """Verify AdminClient log lines flow through a logger supplied via the
    'logger' configuration property, using the configured format."""

    buffer, custom_logger = _setup_string_buffer_logger('Admin')

    conf = {"bootstrap.servers": "test", "logger": custom_logger, "debug": "admin"}
    client = confluent_kafka.admin.AdminClient(conf)
    client.poll(0)

    captured = buffer.getvalue().strip()
    buffer.close()

    assert "Admin Logger | INIT" in captured


def test_admin_logger_logging_in_given_format_when_provided_as_admin_client_argument():
    """Verify AdminClient log lines flow through a logger passed as the
    ``logger`` keyword argument, using the configured format."""

    buffer, custom_logger = _setup_string_buffer_logger('Admin')

    conf = {"bootstrap.servers": "test", "debug": "admin"}
    client = confluent_kafka.admin.AdminClient(conf, logger=custom_logger)
    client.poll(0)

    captured = buffer.getvalue().strip()
    buffer.close()

    assert "Admin Logger | INIT" in captured