@@ -329,7 +329,7 @@ def __init__(self, magic, compression_type, batch_size):
329329 self ._batch_size = batch_size
330330 self ._buffer = bytearray ()
331331
332- def append (self , offset , timestamp , key , value , headers = None ):
332+ def append (self , offset , timestamp , key_bytes , value_bytes , headers = None ):
333333 """ Append message to batch.
334334 """
335335 assert not headers , "Headers not supported in v0/v1"
@@ -344,18 +344,18 @@ def append(self, offset, timestamp, key, value, headers=None):
344344 raise TypeError (
345345 "`timestamp` should be int, but {} provided" .format (
346346 type (timestamp )))
347- if not (key is None or
348- isinstance (key , (bytes , bytearray , memoryview ))):
347+ if not (key_bytes is None or
348+ isinstance (key_bytes , (bytes , bytearray , memoryview ))):
349349 raise TypeError (
350- "Not supported type for key: {}" .format (type (key )))
351- if not (value is None or
352- isinstance (value , (bytes , bytearray , memoryview ))):
350+ "Not supported type for key: {}" .format (type (key_bytes )))
351+ if not (value_bytes is None or
352+ isinstance (value_bytes , (bytes , bytearray , memoryview ))):
353353 raise TypeError (
354- "Not supported type for value: {}" .format (type (value )))
354+ "Not supported type for value: {}" .format (type (value_bytes )))
355355
356356 # Check if we have room for another message
357357 pos = len (self ._buffer )
358- size = self .size_in_bytes (offset , timestamp , key , value )
358+ size = self .size_in_bytes (offset , timestamp , key_bytes , value_bytes )
359359 # We always allow at least one record to be appended
360360 if offset != 0 and pos + size >= self ._batch_size :
361361 return None
@@ -364,11 +364,11 @@ def append(self, offset, timestamp, key, value, headers=None):
364364 self ._buffer .extend (bytearray (size ))
365365
366366 # Encode message
367- crc = self ._encode_msg (pos , offset , timestamp , key , value )
367+ crc = self ._encode_msg (pos , offset , timestamp , key_bytes , value_bytes )
368368
369369 return LegacyRecordMetadata (offset , crc , size , timestamp )
370370
371- def _encode_msg (self , start_pos , offset , timestamp , key , value ,
371+ def _encode_msg (self , start_pos , offset , timestamp , key_bytes , value_bytes ,
372372 attributes = 0 ):
373373 """ Encode msg data into the `msg_buffer`, which should be allocated
374374 to at least the size of this message.
@@ -380,24 +380,24 @@ def _encode_msg(self, start_pos, offset, timestamp, key, value,
380380 # Write key and value
381381 pos += self .KEY_OFFSET_V0 if magic == 0 else self .KEY_OFFSET_V1
382382
383- if key is None :
383+ if key_bytes is None :
384384 struct .pack_into (">i" , buf , pos , - 1 )
385385 pos += self .KEY_LENGTH
386386 else :
387- key_size = len (key )
387+ key_size = len (key_bytes )
388388 struct .pack_into (">i" , buf , pos , key_size )
389389 pos += self .KEY_LENGTH
390- buf [pos : pos + key_size ] = key
390+ buf [pos : pos + key_size ] = key_bytes
391391 pos += key_size
392392
393- if value is None :
393+ if value_bytes is None :
394394 struct .pack_into (">i" , buf , pos , - 1 )
395395 pos += self .VALUE_LENGTH
396396 else :
397- value_size = len (value )
397+ value_size = len (value_bytes )
398398 struct .pack_into (">i" , buf , pos , value_size )
399399 pos += self .VALUE_LENGTH
400- buf [pos : pos + value_size ] = value
400+ buf [pos : pos + value_size ] = value_bytes
401401 pos += value_size
402402 length = (pos - start_pos ) - self .LOG_OVERHEAD
403403
@@ -430,15 +430,15 @@ def _maybe_compress(self):
430430 else :
431431 compressed = lz4_encode (data )
432432 size = self .size_in_bytes (
433- 0 , timestamp = 0 , key = None , value = compressed )
433+ 0 , timestamp = 0 , key_bytes = None , value_bytes = compressed )
434434 # We will try to reuse the same buffer if we have enough space
435435 if size > len (self ._buffer ):
436436 self ._buffer = bytearray (size )
437437 else :
438438 del self ._buffer [size :]
439439 self ._encode_msg (
440440 start_pos = 0 ,
441- offset = 0 , timestamp = 0 , key = None , value = compressed ,
441+ offset = 0 , timestamp = 0 , key_bytes = None , value_bytes = compressed ,
442442 attributes = self ._compression_type )
443443 return True
444444 return False
@@ -455,20 +455,20 @@ def size(self):
455455
456456 # Size calculations. Just copied Java's implementation
457457
458- def size_in_bytes (self , offset , timestamp , key , value , headers = None ):
458+ def size_in_bytes (self , offset , timestamp , key_bytes , value_bytes , headers = None ):
459459 """ Actual size of message to add
460460 """
461461 assert not headers , "Headers not supported in v0/v1"
462462 magic = self ._magic
463- return self .LOG_OVERHEAD + self .record_size (magic , key , value )
463+ return self .LOG_OVERHEAD + self .record_size (magic , key_bytes , value_bytes )
464464
465465 @classmethod
466- def record_size (cls , magic , key , value ):
466+ def record_size (cls , magic , key_bytes , value_bytes ):
467467 message_size = cls .record_overhead (magic )
468- if key is not None :
469- message_size += len (key )
470- if value is not None :
471- message_size += len (value )
468+ if key_bytes is not None :
469+ message_size += len (key_bytes )
470+ if value_bytes is not None :
471+ message_size += len (value_bytes )
472472 return message_size
473473
474474 @classmethod
@@ -480,17 +480,17 @@ def record_overhead(cls, magic):
480480 return cls .RECORD_OVERHEAD_V1
481481
482482 @classmethod
483- def estimate_size_in_bytes (cls , magic , compression_type , key , value ):
483+ def estimate_size_in_bytes (cls , magic , compression_type , key_bytes , value_bytes ):
484484 """ Upper bound estimate of record size.
485485 """
486486 assert magic in [0 , 1 ], "Not supported magic"
487487 # In case of compression we may need another overhead for inner msg
488488 if compression_type :
489489 return (
490490 cls .LOG_OVERHEAD + cls .record_overhead (magic ) +
491- cls .record_size (magic , key , value )
491+ cls .record_size (magic , key_bytes , value_bytes )
492492 )
493- return cls .LOG_OVERHEAD + cls .record_size (magic , key , value )
493+ return cls .LOG_OVERHEAD + cls .record_size (magic , key_bytes , value_bytes )
494494
495495
496496class LegacyRecordMetadata (object ):
0 commit comments