File tree 2 files changed +48
-12
lines changed
2 files changed +48
-12
lines changed Original file line number Diff line number Diff line change @@ -890,16 +890,8 @@ def example_list_offsets(a, args):
890
890
operation = sys .argv [2 ]
891
891
args = sys .argv [3 :]
892
892
893
- # Custom logger
894
- logger = logging .getLogger ('AdminClient' )
895
- logger .setLevel (logging .INFO )
896
- handler = logging .StreamHandler ()
897
- handler .setFormatter (logging .Formatter ('%(asctime)-15s %(levelname)-8s %(message)s' ))
898
- logger .addHandler (handler )
899
-
900
893
# Create Admin client
901
- a = AdminClient ({'bootstrap.servers' : broker },
902
- logger = logger )
894
+ a = AdminClient ({'bootstrap.servers' : broker })
903
895
904
896
opsmap = {'create_topics' : example_create_topics ,
905
897
'delete_topics' : example_delete_topics ,
@@ -928,6 +920,3 @@ def example_list_offsets(a, args):
928
920
sys .exit (1 )
929
921
930
922
opsmap [operation ](a , args )
931
-
932
- # Log messages through custom logger if provided
933
- a .poll (0 )
Original file line number Diff line number Diff line change
1
+ import sys
2
+ import logging
3
+
4
+ from confluent_kafka .admin import AdminClient
5
+
6
+ if len (sys .argv ) != 2 :
7
+ sys .stderr .write ("Usage: %s <broker>\n " % sys .argv [0 ])
8
+ sys .exit (1 )
9
+
10
+ broker = sys .argv [1 ]
11
+
12
+ # Custom logger
13
+ logger = logging .getLogger ('AdminClient' )
14
+ logger .setLevel (logging .DEBUG )
15
+ handler = logging .StreamHandler ()
16
+ handler .setFormatter (logging .Formatter ('%(asctime)-15s %(levelname)-8s %(message)s' ))
17
+ logger .addHandler (handler )
18
+
19
+ # Create Admin client
20
+ a = AdminClient ({'bootstrap.servers' : broker ,
21
+ 'debug' : 'all' },
22
+ logger = logger )
23
+
24
+ # Sample Admin API call
25
+ future = a .list_consumer_groups (request_timeout = 10 )
26
+
27
+ while not future .done ():
28
+ # Log messages through custom logger while waiting for the result
29
+ a .poll (0.1 )
30
+
31
+ try :
32
+ list_consumer_groups_result = future .result ()
33
+ print ("\n \n \n ========================= List consumer groups result Start =========================" )
34
+ print ("{} consumer groups" .format (len (list_consumer_groups_result .valid )))
35
+ for valid in list_consumer_groups_result .valid :
36
+ print (" id: {} is_simple: {} state: {}" .format (
37
+ valid .group_id , valid .is_simple_consumer_group , valid .state ))
38
+ print ("{} errors" .format (len (list_consumer_groups_result .errors )))
39
+ for error in list_consumer_groups_result .errors :
40
+ print (" error: {}" .format (error ))
41
+ print ("========================= List consumer groups result End =========================\n \n \n " )
42
+
43
+ except Exception :
44
+ raise
45
+
46
+ # Log final log messages
47
+ a .poll (0 )
You can’t perform that action at this time.
0 commit comments