@@ -741,16 +741,8 @@ def my_listener(consumer, message):
741741 if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery.
742742 start_message_id_inclusive: bool, default=False
743743 Set the consumer to include the given position of any reset operation like Consumer::seek.
744- batch_receive_policy: class BatchReceivePolicy, Constructor parameters (in order):
745- : param maxNumMessage: Max num message, if less than 0, it means no limit. default: -1
746- : param maxNumBytes: Max num bytes, if less than 0, it means no limit. default: 10 * 1024 * 1024
747- : param timeoutMs: If less than 0, it means no limit. default: 100
748-
749- Batch receive policy can limit the number and bytes of messages in a single batch,
750- and can specify a timeout for waiting for enough messages for this batch.
751-
752- A batch receive action is completed as long as any one of the conditions (the batch has enough number
753- or size of messages, or the waiting timeout is passed) are met.
744+ batch_receive_policy: class ConsumerBatchReceivePolicy
745+ Set the batch collection policy for batch receiving.
754746 """
755747 _check_type (str , subscription_name , 'subscription_name' )
756748 _check_type (ConsumerType , consumer_type , 'consumer_type' )
@@ -770,7 +762,7 @@ def my_listener(consumer, message):
770762 _check_type (int , max_pending_chunked_message , 'max_pending_chunked_message' )
771763 _check_type (bool , auto_ack_oldest_chunked_message_on_queue_full , 'auto_ack_oldest_chunked_message_on_queue_full' )
772764 _check_type (bool , start_message_id_inclusive , 'start_message_id_inclusive' )
773- _check_type_or_none (BatchReceivePolicy , batch_receive_policy , 'batch_receive_policy' )
765+ _check_type_or_none (ConsumerBatchReceivePolicy , batch_receive_policy , 'batch_receive_policy' )
774766
775767 conf = _pulsar .ConsumerConfiguration ()
776768 conf .consumer_type (consumer_type )
@@ -801,7 +793,7 @@ def my_listener(consumer, message):
801793 conf .auto_ack_oldest_chunked_message_on_queue_full (auto_ack_oldest_chunked_message_on_queue_full )
802794 conf .start_message_id_inclusive (start_message_id_inclusive )
803795 if batch_receive_policy :
804- conf .batch_receive_policy (batch_receive_policy )
796+ conf .batch_receive_policy (batch_receive_policy . policy () )
805797
806798 c = Consumer ()
807799 if isinstance (topic , str ):
@@ -1382,6 +1374,32 @@ def get_last_message_id(self):
13821374 """
13831375 return self ._consumer .get_last_message_id ()
13841376
1377+ class ConsumerBatchReceivePolicy :
1378+ """
1379+ Batch receive policy can limit the number and bytes of messages in a single batch,
1380+ and can specify a timeout for waiting for enough messages for this batch.
1381+
1382+ A batch receive action is completed as long as any one of the conditions (the batch has enough number
1383+ or size of messages, or the waiting timeout is passed) are met.
1384+ """
1385+ def __init__ (self , max_num_message , max_num_bytes , timeout_ms ):
1386+ """
1387+ Wrapper BatchReceivePolicy.
1388+
1389+ Parameters
1390+ ----------
1391+
1392+ max_num_message: Max num message, if less than 0, it means no limit. default: -1
1393+ max_num_bytes: Max num bytes, if less than 0, it means no limit. default: 10 * 1024 * 1024
1394+ timeout_ms: If less than 0, it means no limit. default: 100
1395+ """
1396+ self ._policy = BatchReceivePolicy (max_num_message , max_num_bytes , timeout_ms )
1397+
1398+ def policy (self ):
1399+ """
1400+ Returns the actual one BatchReceivePolicy.
1401+ """
1402+ return self ._policy
13851403
13861404class Reader :
13871405 """
0 commit comments