1
1
"""
2
2
Kafka Admin client: create, view, alter, delete topics and resources.
3
3
"""
4
- from ..cimpl import (KafkaException , # noqa
5
- _AdminClientImpl ,
6
- NewTopic ,
7
- NewPartitions ,
8
- CONFIG_SOURCE_UNKNOWN_CONFIG ,
9
- CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG ,
10
- CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG ,
11
- CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG ,
12
- CONFIG_SOURCE_STATIC_BROKER_CONFIG ,
13
- CONFIG_SOURCE_DEFAULT_CONFIG ,
14
- RESOURCE_UNKNOWN ,
15
- RESOURCE_ANY ,
16
- RESOURCE_TOPIC ,
17
- RESOURCE_GROUP ,
18
- RESOURCE_BROKER )
4
+ from confluent_kafka .cimpl import (KafkaException ,
5
+ _AdminClientImpl ,
6
+ NewTopic ,
7
+ NewPartitions ,
8
+ CONFIG_SOURCE_UNKNOWN_CONFIG ,
9
+ CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG ,
10
+ CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG ,
11
+ CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG ,
12
+ CONFIG_SOURCE_STATIC_BROKER_CONFIG ,
13
+ CONFIG_SOURCE_DEFAULT_CONFIG ,
14
+ RESOURCE_UNKNOWN ,
15
+ RESOURCE_ANY ,
16
+ RESOURCE_TOPIC ,
17
+ RESOURCE_GROUP ,
18
+ RESOURCE_BROKER )
19
+
20
+ __all__ = ['CONFIG_SOURCE_DEFAULT_CONFIG' ,
21
+ 'CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG' ,
22
+ 'CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG' ,
23
+ 'CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG' ,
24
+ 'CONFIG_SOURCE_STATIC_BROKER_CONFIG' ,
25
+ 'CONFIG_SOURCE_UNKNOWN_CONFIG' ,
26
+ 'NewTopic' ,
27
+ 'NewPartitions' ]
19
28
20
29
import concurrent .futures
21
30
import functools
@@ -122,7 +131,8 @@ def __init__(self, restype, name,
122
131
try :
123
132
restype = ConfigResource .Type [restype .upper ()]
124
133
except KeyError :
125
- raise ValueError ("Unknown resource type \" %s\" : should be a ConfigResource.Type" % restype )
134
+ raise ValueError (
135
+ "Unknown resource type \" %s\" : should be a ConfigResource.Type" % restype )
126
136
127
137
elif type (restype ) == int :
128
138
# The C-code passes restype as an int, convert to Type.
@@ -182,7 +192,7 @@ def set_config(self, name, value, overwrite=True):
182
192
self .set_config_dict [name ] = value
183
193
184
194
185
- class AdminClient (_AdminClientImpl ):
195
+ class AdminClient (_AdminClientImpl ):
186
196
"""
187
197
AdminClient provides admin operations for Kafka brokers, topics, groups,
188
198
and other resource types supported by the broker.
@@ -203,6 +213,7 @@ class AdminClient (_AdminClientImpl):
203
213
204
214
Requires broker version v0.11.0.0 or later.
205
215
"""
216
+
206
217
def __init__ (self , conf ):
207
218
"""
208
219
Create a new AdminClient using the provided configuration dictionary.
@@ -250,7 +261,8 @@ def _make_resource_result(f, futmap):
250
261
for resource , configs in result .items ():
251
262
fut = futmap .get (resource , None )
252
263
if fut is None :
253
- raise RuntimeError ("Resource {} not found in future-map: {}" .format (resource , futmap ))
264
+ raise RuntimeError (
265
+ "Resource {} not found in future-map: {}" .format (resource , futmap ))
254
266
if resource .error is not None :
255
267
# Resource-level exception
256
268
fut .set_exception (KafkaException (resource .error ))
@@ -462,7 +474,7 @@ def alter_configs(self, resources, **kwargs):
462
474
return futmap
463
475
464
476
465
- class ClusterMetadata (object ):
477
+ class ClusterMetadata (object ):
466
478
"""
467
479
ClusterMetadata as returned by list_topics() contains information
468
480
about the Kafka cluster, brokers, and topics.
@@ -476,6 +488,7 @@ class ClusterMetadata (object):
476
488
:ivar int orig_broker_id: The broker this metadata originated from.
477
489
:ivar str orig_broker_name: Broker name/address this metadata originated from.
478
490
"""
491
+
479
492
def __init__ (self ):
480
493
self .cluster_id = None
481
494
self .controller_id = - 1
@@ -491,7 +504,7 @@ def __str__(self):
491
504
return str (self .cluster_id )
492
505
493
506
494
- class BrokerMetadata (object ):
507
+ class BrokerMetadata (object ):
495
508
"""
496
509
BrokerMetadata contains information about a Kafka broker.
497
510
@@ -501,6 +514,7 @@ class BrokerMetadata (object):
501
514
:ivar str host: Broker hostname.
502
515
:ivar int port: Broker port.
503
516
"""
517
+
504
518
def __init__ (self ):
505
519
self .id = - 1
506
520
self .host = None
@@ -513,7 +527,7 @@ def __str__(self):
513
527
return "{}:{}/{}" .format (self .host , self .port , self .id )
514
528
515
529
516
- class TopicMetadata (object ):
530
+ class TopicMetadata (object ):
517
531
"""
518
532
TopicMetadata contains information about a Kafka topic.
519
533
@@ -523,6 +537,7 @@ class TopicMetadata (object):
523
537
:ivar dict partitions: Map of partitions indexed by partition id. Value is PartitionMetadata object.
524
538
:ivar KafkaError -error: Topic error, or None. Value is a KafkaError object.
525
539
"""
540
+
526
541
# The dash in "-topic" and "-error" is needed to circumvent a
527
542
# Sphinx issue where it tries to reference the same instance variable
528
543
# on other classes which raises a warning/error.
@@ -533,15 +548,16 @@ def __init__(self):
533
548
534
549
def __repr__ (self ):
535
550
if self .error is not None :
536
- return "TopicMetadata({}, {} partitions, {})" .format (self .topic , len (self .partitions ), self .error )
551
+ return "TopicMetadata({}, {} partitions, {})" .format (self .topic , len (self .partitions ),
552
+ self .error )
537
553
else :
538
554
return "TopicMetadata({}, {} partitions)" .format (self .topic , len (self .partitions ))
539
555
540
556
def __str__ (self ):
541
557
return self .topic
542
558
543
559
544
- class PartitionMetadata (object ):
560
+ class PartitionMetadata (object ):
545
561
"""
546
562
PartitionsMetadata contains information about a Kafka partition.
547
563
@@ -558,6 +574,7 @@ class PartitionMetadata (object):
558
574
in ClusterMetadata.brokers. Always check the availability
559
575
of a broker id in the brokers dict.
560
576
"""
577
+
561
578
def __init__ (self ):
562
579
self .id = - 1
563
580
self .leader = - 1
0 commit comments