@@ -14,7 +14,7 @@
 from .util import read_short_string, read_int_string
 from .util import relative_unpack
 from .util import write_short_string, write_int_string
-from .util import group_list_by_key
+from .util import group_by_topic_and_partition
 from .util import BufferUnderflowError, ChecksumError
 
 log = logging.getLogger("kafka")
@@ -33,7 +33,7 @@
 # Response payloads
 ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
 FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
-OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offset"])
+OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
 OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
 OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
 BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
@@ -74,6 +74,9 @@ class KafkaProtocol(object):
     OFFSET_FETCH_KEY = 7
 
     ATTRIBUTE_CODEC_MASK = 0x03
+    CODEC_NONE = 0x00
+    CODEC_GZIP = 0x01
+    CODEC_SNAPPY = 0x02
 
     ###################
     #   Private API   #
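An aside on the new constants: only the low two bits of a message's attribute byte carry the codec id, which is why every comparison and encoder below masks with ATTRIBUTE_CODEC_MASK first. A minimal standalone sketch of that bit arithmetic (constants copied from this diff):

    # Python sketch; constants copied from the diff above
    ATTRIBUTE_CODEC_MASK = 0x03
    CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY = 0x00, 0x01, 0x02

    def codec_of(attributes):
        # Mask off everything except the low two codec-id bits
        return attributes & ATTRIBUTE_CODEC_MASK

    assert codec_of(0x00) == CODEC_NONE
    assert codec_of(0x01) == CODEC_GZIP
    assert codec_of(0x02) == CODEC_SNAPPY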
@@ -171,13 +174,13 @@ def _decode_message(cls, data, offset):
 
         (key, cur) = read_int_string(data, cur)
         (value, cur) = read_int_string(data, cur)
-        if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 0:
+        if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE:
             yield (offset, Message(magic, att, key, value))
-        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 1:
+        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP:
             gz = gzip_decode(value)
             for (offset, message) in KafkaProtocol._decode_message_set_iter(gz):
                 yield (offset, message)
-        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 2:
+        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY:
             snp = snappy_decode(value)
             for (offset, message) in KafkaProtocol._decode_message_set_iter(snp):
                 yield (offset, message)
@@ -214,8 +217,25 @@ def create_gzip_message(cls, payloads, key=None):
         message_set = KafkaProtocol._encode_message_set(
             [KafkaProtocol.create_message(payload) for payload in payloads])
         gzipped = gzip_encode(message_set)
-        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), key, gzipped)
+        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped)
 
+    @classmethod
+    def create_snappy_message(cls, payloads, key=None):
+        """
+        Construct a Snappy Message containing multiple Messages
+
+        The given payloads will be encoded, compressed, and sent as a single atomic
+        message to Kafka.
+
+        Params
+        ======
+        payloads: list(bytes), a list of payloads to be sent to Kafka
+        key: bytes, a key used for partition routing (optional)
+        """
+        message_set = KafkaProtocol._encode_message_set(
+            [KafkaProtocol.create_message(payload) for payload in payloads])
+        snapped = snappy_encode(message_set)
+        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY), key, snapped)
 
     @classmethod
     def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
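A hedged usage sketch for the compressed-message helpers (a hypothetical call site, not from this commit; create_snappy_message additionally assumes the snappy codec is importable so snappy_encode works):

    # Hypothetical usage sketch
    gz_msg = KafkaProtocol.create_gzip_message(["payload1", "payload2"])
    sn_msg = KafkaProtocol.create_snappy_message(["payload1", "payload2"])
    # Each helper returns one Message whose value is a compressed message set;
    # _decode_message above recursively unpacks it into the original payloads.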
@@ -234,14 +254,14 @@ def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1,
             -1: waits for all replicas to be in sync
         timeout: Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
-        message += struct.pack('>hii', acks, timeout, len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
-            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads))
-            for payload in payloads:
+        message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
+            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(topic_payloads))
+            for partition, payload in topic_payloads.items():
                 message_set = KafkaProtocol._encode_message_set(payload.messages)
-                message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set)
+                message += struct.pack('>ii%ds' % len(message_set), partition, len(message_set), message_set)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
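The rewritten encoders iterate a nested mapping, so group_by_topic_and_partition presumably returns {topic: {partition: payload}} with one payload per pair, replacing the flat per-topic lists that group_list_by_key produced. A sketch of that assumed behavior (not the actual util implementation):

    from collections import defaultdict

    def group_by_topic_and_partition(payloads):
        # Assumed shape: {topic: {partition: payload}}; a later payload for
        # the same (topic, partition) pair would overwrite the earlier one
        out = defaultdict(dict)
        for p in payloads:
            out[p.topic][p.partition] = p
        return out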
@@ -276,15 +296,15 @@ def encode_fetch_request(cls, client_id, correlation_id, payloads=[], max_wait_t
         max_wait_time: int, how long to block waiting on min_bytes of data
         min_bytes: int, the minimum number of bytes to accumulate before returning the response
         """
-
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
-        message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(grouped_payloads)) # -1 is the replica id
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.offset, payload.max_bytes)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iqi', partition, payload.offset, payload.max_bytes)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -308,14 +328,14 @@ def decode_fetch_response_iter(cls, data):
 
     @classmethod
     def encode_offset_request(cls, client_id, correlation_id, payloads=[]):
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
-        message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>ii', -1, len(grouped_payloads)) # -1 is the replica id
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.time, payload.max_offsets)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iqi', partition, payload.time, payload.max_offsets)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -332,8 +352,12 @@ def decode_offset_response(cls, data):
         (topic, cur) = read_short_string(data, cur)
         ((num_partitions,), cur) = relative_unpack('>i', data, cur)
         for i in range(num_partitions):
-            ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
-            yield OffsetResponse(topic, partition, error, offset)
+            ((partition, error, num_offsets,), cur) = relative_unpack('>ihi', data, cur)
+            offsets = []
+            for j in range(num_offsets):
+                ((offset,), cur) = relative_unpack('>q', data, cur)
+                offsets.append(offset)
+            yield OffsetResponse(topic, partition, error, tuple(offsets))
 
     @classmethod
     def encode_metadata_request(cls, client_id, correlation_id, topics=[]):
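Because OffsetResponse now carries a tuple of offsets (the offsets API can return several per partition), callers that previously read a single .offset field need updating. A hedged sketch of consuming the decoded responses:

    # Hypothetical caller; `data` is a raw offset response from a broker
    for resp in KafkaProtocol.decode_offset_response(data):
        if resp.error == 0:
            # resp.offsets is a tuple of one or more offsets
            print(resp.topic, resp.partition, resp.offsets)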
@@ -400,15 +424,15 @@ def encode_offset_commit_request(cls, client_id, correlation_id, group, payloads
         group: string, the consumer group you are committing offsets for
         payloads: list of OffsetCommitRequest
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
         message += write_short_string(group)
-        message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>i', len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iq', payload.partition, payload.offset)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iq', partition, payload.offset)
                 message += write_short_string(payload.metadata)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
@@ -421,6 +445,7 @@ def decode_offset_commit_response(cls, data):
         ======
         data: bytes to decode
         """
+        data = data[2:] # TODO remove me when versionId is removed
         ((correlation_id,), cur) = relative_unpack('>i', data, 0)
         (client_id, cur) = read_short_string(data, cur)
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
@@ -443,15 +468,15 @@ def encode_offset_fetch_request(cls, client_id, correlation_id, group, payloads)
         group: string, the consumer group you are fetching offsets for
         payloads: list of OffsetFetchRequest
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
         message += write_short_string(group)
-        message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>i', len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>i', payload.partition)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>i', partition)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -493,6 +518,9 @@ def __init__(self, host, port, bufsize=4096):
         self._sock.connect((host, port))
         self._sock.settimeout(10)
 
+    def __str__(self):
+        return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
+
     ###################
     #   Private API   #
     ###################
@@ -536,6 +564,8 @@ def _consume_response_iter(self):
     #   Public API   #
     ##################
 
+    # TODO multiplex socket communication to allow for multi-threaded clients
+
     def send(self, requestId, payload):
         "Send a request to Kafka"
         sent = self._sock.sendall(payload)
@@ -566,6 +596,10 @@ def __init__(self, host, port, bufsize=4096):
         self.topics_to_brokers = {} # topic_id -> broker_id
         self.load_metadata_for_topics()
 
+    def close(self):
+        for conn in self.conns.values():
+            conn.close()
+
     def get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
@@ -626,20 +660,14 @@ def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
         ======
         list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
         """
-        key_fn = lambda x: (x.topic, x.partition)
-
-        # Note the order of the incoming payloads
-        original_keys = [key_fn(payload) for payload in payloads]
-
-        # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
         # Group the produce requests by which broker they go to
+        original_keys = []
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+        for payload in payloads:
+            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+            original_keys.append((payload.topic, payload.partition))
 
-        # Accumulate the responses in a dictionary, keyed by key_fn
+        # Accumulate the responses in a dictionary
         acc = {}
 
         # For each broker, send the list of request payloads
@@ -657,11 +685,10 @@ def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
                     (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
             # Run the callback
             if callback is not None:
-                acc[key_fn(produce_response)] = callback(produce_response)
+                acc[(produce_response.topic, produce_response.partition)] = callback(produce_response)
             else:
-                acc[key_fn(produce_response)] = produce_response
+                acc[(produce_response.topic, produce_response.partition)] = produce_response
 
-        print(acc)
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys)
 
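The broker-grouping change keeps the same ordering trick: note each payload's (topic, partition) key in input order, accumulate responses keyed the same way, then yield them back in the original order. The pattern in isolation (a toy sketch, detached from the client):

    # Responses may arrive grouped by broker, out of input order
    original_keys = [("t1", 0), ("t2", 1), ("t1", 1)]
    acc = {("t2", 1): "b", ("t1", 1): "c", ("t1", 0): "a"}
    assert [acc[k] for k in original_keys] == ["a", "b", "c"]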
@@ -672,20 +699,14 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
         Payloads are grouped by topic and partition so they can be pipelined to the same
         brokers.
         """
-        key_fn = lambda x: (x.topic, x.partition)
-
-        # Note the order of the incoming payloads
-        original_keys = [key_fn(payload) for payload in payloads]
-
-        # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
         # Group the produce requests by which broker they go to
+        original_keys = []
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+        for payload in payloads:
+            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+            original_keys.append((payload.topic, payload.partition))
 
-        # Accumulate the responses in a dictionary, keyed by key_fn
+        # Accumulate the responses in a dictionary, keyed by topic+partition
         acc = {}
 
         # For each broker, send the list of request payloads
@@ -703,9 +724,9 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
                     (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
             # Run the callback
             if callback is not None:
-                acc[key_fn(fetch_response)] = callback(fetch_response)
+                acc[(fetch_response.topic, fetch_response.partition)] = callback(fetch_response)
             else:
-                acc[key_fn(fetch_response)] = fetch_response
+                acc[(fetch_response.topic, fetch_response.partition)] = fetch_response
 
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys)
@@ -720,11 +741,30 @@ def try_send_request(self, requestId, request):
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
-            except Exception:
-                log.warning("Could not commit offset to server %s, trying next server", conn)
+            except Exception, e:
+                log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
                 continue
         return None
 
+    def send_offset_request(self, payloads=[], fail_on_error=True, callback=None):
+        requestId = self.next_id()
+        request = KafkaProtocol.encode_offset_request(KafkaClient.CLIENT_ID, requestId, payloads)
+        response = self.try_send_request(requestId, request)
+        if response is None:
+            if fail_on_error is True:
+                raise Exception("All servers failed to process request")
+            else:
+                return None
+        out = []
+        for offset_response in KafkaProtocol.decode_offset_response(response):
+            if fail_on_error == True and offset_response.error != 0:
+                raise Exception("OffsetRequest failed with errorcode=%s" % offset_response.error)
+            if callback is not None:
+                out.append(callback(offset_response))
+            else:
+                out.append(offset_response)
+        return out
+
     def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
         requestId = self.next_id()
         request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
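A hedged usage sketch for the new send_offset_request; the OffsetRequest fields (topic, partition, time, max_offsets) are inferred from encode_offset_request above rather than confirmed by this diff, and time=-1 conventionally asks for the latest offset:

    # Hypothetical call site; field order inferred, not confirmed by this diff
    # client = KafkaClient("localhost", 9092)
    reqs = [OffsetRequest("foo8", 0, -1, 1)]
    for resp in client.send_offset_request(payloads=reqs):
        print(resp.topic, resp.partition, resp.offsets)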
@@ -737,6 +777,7 @@ def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, cal
         out = []
         for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
             if fail_on_error == True and offset_commit_response.error != 0:
+                print(offset_commit_response)
                 raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error)
             if callback is not None:
                 out.append(callback(offset_commit_response))
@@ -770,7 +811,7 @@ def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, call
 
     topic = "foo8"
     # Bootstrap connection
-    conn = KafkaClient("localhost", 9092)
+    conn = KafkaClient("localhost", 49720)
 
     # Create some Messages
     messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
@@ -799,7 +840,6 @@ def init_offsets(offset_response):
             return 0
         else:
             return offset_response.offset
-
     # Load offsets
     (offset1, offset2) = conn.send_offset_fetch_request(
         group="group1",