Skip to content

Commit faaaf88

Browse files
authored
Merge pull request #315 Fix release buffer while read messages one by one
2 parents 6276d3e + fa263d2 commit faaaf88

File tree

2 files changed

+46
-7
lines changed

2 files changed

+46
-7
lines changed

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,14 +348,17 @@ def receive_batch_nowait(self):
348348
return batch
349349

350350
def receive_message_nowait(self):
351+
if self._get_first_error():
352+
raise self._get_first_error()
353+
351354
try:
352355
batch = self._message_batches[0]
353356
message = batch.pop_message()
354357
except IndexError:
355358
return None
356359

357360
if batch.empty():
358-
self._message_batches.popleft()
361+
self.receive_batch_nowait()
359362

360363
return message
361364

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,16 @@ def create_message(
213213
)
214214

215215
async def send_message(self, stream_reader, message: PublicMessage):
216+
await self.send_batch(stream_reader, [message])
217+
218+
async def send_batch(self, stream_reader, batch: typing.List[PublicMessage]):
219+
if len(batch) == 0:
220+
return
221+
222+
first_message = batch[0]
223+
for message in batch:
224+
assert message._partition_session is first_message._partition_session
225+
216226
def batch_count():
217227
return len(stream_reader._message_batches)
218228

@@ -225,7 +235,7 @@ def batch_count():
225235
server_message=StreamReadMessage.ReadResponse(
226236
partition_data=[
227237
StreamReadMessage.ReadResponse.PartitionData(
228-
partition_session_id=message._partition_session.id,
238+
partition_session_id=first_message._partition_session.id,
229239
batches=[
230240
StreamReadMessage.ReadResponse.Batch(
231241
message_data=[
@@ -237,11 +247,12 @@ def batch_count():
237247
uncompresed_size=len(message.data),
238248
message_group_id=message.message_group_id,
239249
)
250+
for message in batch
240251
],
241-
producer_id=message.producer_id,
242-
write_session_meta=message.session_metadata,
252+
producer_id=first_message.producer_id,
253+
write_session_meta=first_message.session_metadata,
243254
codec=Codec.CODEC_RAW,
244-
written_at=message.written_at,
255+
written_at=first_message.written_at,
245256
)
246257
],
247258
)
@@ -1066,13 +1077,15 @@ async def test_read_message(
10661077
async def test_receive_batch_nowait(self, stream, stream_reader, partition_session):
10671078
assert stream_reader.receive_batch_nowait() is None
10681079

1080+
initial_buffer_size = stream_reader._buffer_size_bytes
1081+
10691082
mess1 = self.create_message(partition_session, 1, 1)
10701083
await self.send_message(stream_reader, mess1)
10711084

10721085
mess2 = self.create_message(partition_session, 2, 1)
10731086
await self.send_message(stream_reader, mess2)
10741087

1075-
initial_buffer_size = stream_reader._buffer_size_bytes
1088+
assert stream_reader._buffer_size_bytes == initial_buffer_size - 2 * self.default_batch_size
10761089

10771090
received = stream_reader.receive_batch_nowait()
10781091
assert received == PublicBatch(
@@ -1090,14 +1103,37 @@ async def test_receive_batch_nowait(self, stream, stream_reader, partition_sessi
10901103
_codec=Codec.CODEC_RAW,
10911104
)
10921105

1093-
assert stream_reader._buffer_size_bytes == initial_buffer_size + 2 * self.default_batch_size
1106+
assert stream_reader._buffer_size_bytes == initial_buffer_size
10941107

10951108
assert StreamReadMessage.ReadRequest(self.default_batch_size) == stream.from_client.get_nowait().client_message
10961109
assert StreamReadMessage.ReadRequest(self.default_batch_size) == stream.from_client.get_nowait().client_message
10971110

10981111
with pytest.raises(asyncio.QueueEmpty):
10991112
stream.from_client.get_nowait()
11001113

1114+
async def test_receive_message_nowait(self, stream, stream_reader, partition_session):
1115+
assert stream_reader.receive_batch_nowait() is None
1116+
1117+
initial_buffer_size = stream_reader._buffer_size_bytes
1118+
1119+
await self.send_batch(
1120+
stream_reader, [self.create_message(partition_session, 1, 1), self.create_message(partition_session, 2, 1)]
1121+
)
1122+
await self.send_batch(
1123+
stream_reader,
1124+
[
1125+
self.create_message(partition_session, 10, 1),
1126+
],
1127+
)
1128+
1129+
assert stream_reader._buffer_size_bytes == initial_buffer_size - 2 * self.default_batch_size
1130+
1131+
for expected_seqno in [1, 2, 10]:
1132+
mess = stream_reader.receive_message_nowait()
1133+
assert mess.seqno == expected_seqno
1134+
1135+
assert stream_reader._buffer_size_bytes == initial_buffer_size
1136+
11011137
async def test_update_token(self, stream):
11021138
settings = PublicReaderSettings(
11031139
consumer="test-consumer",

0 commit comments

Comments
 (0)