Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/confluent_kafka/experimental/aio/producer/_AIOProducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,27 @@ def __del__(self) -> None:
if hasattr(self, '_buffer_timeout_manager'):
self._buffer_timeout_manager.stop_timeout_monitoring()

def __len__(self) -> int:
"""Return the total number of pending messages.

This includes:
- Messages in librdkafka's output queue (waiting to be delivered to broker)
- Messages in the async batch buffer (waiting to be sent to librdkafka)

Returns:
int: Total number of pending messages across both queues
"""
if self._is_closed:
return 0

# Count messages in librdkafka queue
librdkafka_count = len(self._producer)

# Count messages in async batch buffer
buffer_count = self._batch_processor.get_buffer_size()

return librdkafka_count + buffer_count

# ========================================================================
# CORE PRODUCER OPERATIONS - Main public API
# ========================================================================
Expand Down
82 changes: 82 additions & 0 deletions tests/test_AIOProducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,3 +534,85 @@ async def test_edge_cases_batching(self, mock_producer, mock_common, basic_confi
assert mock_flush.call_count >= 1 # At least one flush

await producer.close()

@pytest.mark.asyncio
async def test_aio_producer_len_with_buffered_messages(self, mock_producer, mock_common, basic_config):
"""Test that __len__ counts messages in async batch buffer"""
producer = AIOProducer(basic_config, batch_size=10, buffer_timeout=0)
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test patches _flush_buffer to prevent auto-flush, but doesn't verify that the patch was actually called or not called. Consider adding an assertion like mock_flush.assert_not_called() after producing messages to ensure the buffer wasn't inadvertently flushed.

Copilot uses AI. Check for mistakes.

# Produce 5 messages (less than batch_size, so they stay in buffer)
with patch.object(producer, '_flush_buffer'): # Prevent auto-flush
for i in range(5):
await producer.produce('test-topic', value=f'msg-{i}'.encode())

# len() should count messages in buffer
assert len(producer) == 5
assert producer._batch_processor.get_buffer_size() == 5
assert len(producer._producer) == 0 # Nothing in librdkafka yet

await producer.close()

@pytest.mark.asyncio
async def test_aio_producer_len_after_flush(self, mock_producer, mock_common, basic_config):
"""Test that __len__ counts messages after flush to librdkafka"""
producer = AIOProducer(basic_config, batch_size=10, buffer_timeout=0)

# Produce and flush
with patch.object(producer, '_flush_buffer') as mock_flush:
for i in range(5):
await producer.produce('test-topic', value=f'msg-{i}'.encode())

# Mock flush to simulate messages moving to librdkafka
# After flush, buffer should be empty, but messages may be in librdkafka
await producer.flush()

# After flush, messages move to librdkafka queue
# len() should still count them (exact count depends on librdkafka state)
total_len = len(producer)
assert total_len >= 0 # Should be non-negative
# Buffer should be empty after flush
assert producer._batch_processor.get_buffer_size() == 0

await producer.close()

@pytest.mark.asyncio
async def test_aio_producer_len_closed_producer(self, mock_producer, mock_common, basic_config):
"""Test that __len__ returns 0 for closed producer"""
producer = AIOProducer(basic_config, batch_size=10, buffer_timeout=0)

# Produce some messages
with patch.object(producer, '_flush_buffer'): # Prevent auto-flush
for i in range(3):
await producer.produce('test-topic', value=f'msg-{i}'.encode())

# Verify messages are there
assert len(producer) == 3

# Close producer
await producer.close()

# len() should return 0 for closed producer
assert len(producer) == 0

@pytest.mark.asyncio
async def test_aio_producer_len_mixed_state(self, mock_producer, mock_common, basic_config):
"""Test __len__ when messages are in both buffer and librdkafka queue"""
producer = AIOProducer(basic_config, batch_size=5, buffer_timeout=0)

# Produce 7 messages - first 5 should flush (batch_size=5), last 2 stay in buffer
with patch.object(producer, '_flush_buffer') as mock_flush:
for i in range(7):
await producer.produce('test-topic', value=f'msg-{i}'.encode())
Comment on lines +613 to +615
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test patches _flush_buffer but doesn't verify the flushing behavior. With batch_size=5, the buffer should flush after the 5th message. Consider asserting mock_flush.call_count == 1 after the loop to confirm the expected flush occurred.

Copilot uses AI. Check for mistakes.

# After batch_size messages, some may have flushed
# Total should be sum of buffer + librdkafka queue
buffer_count = producer._batch_processor.get_buffer_size()
librdkafka_count = len(producer._producer)
total_count = len(producer)

assert total_count == buffer_count + librdkafka_count
# At least the messages beyond batch_size should be in buffer
# (exact count depends on flush behavior)
assert total_count >= 2 # At least the last 2 should be pending

await producer.close()