Skip to content

Commit 576a16b

Browse files
authored
Merge pull request #316 Fix race condition between recreate stream reader and add new commit …
2 parents faaaf88 + 21a8e69 commit 576a16b

File tree

5 files changed

+36
-9
lines changed

5 files changed

+36
-9
lines changed

ydb/_topic_reader/datatypes.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ def __post_init__(self):
8282
self._loop = None
8383

8484
def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
85+
self._ensure_not_closed()
86+
8587
waiter = PartitionSession.CommitAckWaiter(end_offset, self._create_future())
8688
if end_offset <= self.committed_offset:
8789
waiter._finish_ok()
@@ -121,7 +123,7 @@ def close(self):
121123
return
122124

123125
self.state = PartitionSession.State.Stopped
124-
exception = topic_reader_asyncio.TopicReaderCommitToExpiredPartition()
126+
exception = topic_reader_asyncio.PublicTopicReaderPartitionExpiredError()
125127
for waiter in self._ack_waiters:
126128
waiter._finish_error(exception)
127129

@@ -131,7 +133,7 @@ def closed(self):
131133

132134
def _ensure_not_closed(self):
133135
if self.state == PartitionSession.State.Stopped:
134-
raise topic_reader_asyncio.TopicReaderCommitToExpiredPartition()
136+
raise topic_reader_asyncio.PublicTopicReaderPartitionExpiredError()
135137

136138
class State(enum.Enum):
137139
Active = 1

ydb/_topic_reader/datatypes_test.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,15 @@ async def test_close_notify_waiters(self, session):
192192
waiter = session.add_waiter(session.committed_offset + 1)
193193
session.close()
194194

195-
with pytest.raises(topic_reader_asyncio.TopicReaderCommitToExpiredPartition):
195+
with pytest.raises(topic_reader_asyncio.PublicTopicReaderPartitionExpiredError):
196196
waiter.future.result()
197197

198198
async def test_close_twice(self, session):
199199
session.close()
200200
session.close()
201+
202+
async def test_commit_after_close(self, session):
203+
session.close()
204+
205+
with pytest.raises(topic_reader_asyncio.PublicTopicReaderPartitionExpiredError):
206+
session.add_waiter(session.committed_offset + 1)

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class TopicReaderUnexpectedCodec(YdbError):
3939
pass
4040

4141

42-
class TopicReaderCommitToExpiredPartition(TopicReaderError):
42+
class PublicTopicReaderPartitionExpiredError(TopicReaderError):
4343
"""
4444
Raised when a message is committed after its partition read session has been dropped.
4545
It is ok - the message/batch will not be committed to the server and will be received in another read session
@@ -114,15 +114,22 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
114114
Write commit message to a buffer.
115115
116116
This method provides no way to check the commit result
117-
(for example if lost connection - commits will not re-send and committed messages will receive again)
117+
(for example if lost connection - commits will not re-send and committed messages will receive again).
118118
"""
119-
self._reconnector.commit(batch)
119+
try:
120+
self._reconnector.commit(batch)
121+
except PublicTopicReaderPartitionExpiredError:
122+
pass
120123

121124
async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
122125
"""
123126
write commit message to a buffer and wait ack from the server.
124127
125128
use asyncio.wait_for for wait with timeout.
129+
130+
may raise ydb.TopicReaderPartitionExpiredError; the error means the reader partition was closed by the server
131+
before the commit ack was received. The message may or may not be acked (if not, it will be sent in another read session,
132+
to this or another reader).
126133
"""
127134
waiter = self._reconnector.commit(batch)
128135
await waiter.future
@@ -174,6 +181,14 @@ async def _connection_loop(self):
174181
await asyncio.sleep(retry_info.sleep_timeout_seconds)
175182

176183
attempt += 1
184+
finally:
185+
if self._stream_reader is not None:
186+
# noinspection PyBroadException
187+
try:
188+
await self._stream_reader.close()
189+
except BaseException:
190+
# suppress any error raised while closing the stream reader
191+
pass
177192

178193
async def wait_message(self):
179194
while True:
@@ -369,10 +384,10 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co
369384
raise TopicReaderError("reader can commit only self-produced messages")
370385

371386
if partition_session.reader_stream_id != self._id:
372-
raise TopicReaderCommitToExpiredPartition("commit messages after reconnect to server")
387+
raise PublicTopicReaderPartitionExpiredError("commit messages after reconnect to server")
373388

374389
if partition_session.id not in self._partition_sessions:
375-
raise TopicReaderCommitToExpiredPartition("commit messages after server stop the partition read session")
390+
raise PublicTopicReaderPartitionExpiredError("commit messages after server stop the partition read session")
376391

377392
commit_range = batch._commit_get_offsets_range()
378393
waiter = partition_session.add_waiter(commit_range.end)
@@ -620,6 +635,7 @@ async def flush(self):
620635
async def close(self):
621636
if self._closed:
622637
return
638+
623639
self._closed = True
624640

625641
self._set_first_error(TopicReaderStreamClosedError())
@@ -628,6 +644,7 @@ async def close(self):
628644

629645
for session in self._partition_sessions.values():
630646
session.close()
647+
self._partition_sessions.clear()
631648

632649
for task in self._background_tasks:
633650
task.cancel()

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ async def test_close_ack_waiters_when_close_stream_reader(
374374
waiter = partition_session.add_waiter(self.partition_session_committed_offset + 1)
375375
await wait_for_fast(stream_reader_started.close())
376376

377-
with pytest.raises(topic_reader_asyncio.TopicReaderCommitToExpiredPartition):
377+
with pytest.raises(topic_reader_asyncio.PublicTopicReaderPartitionExpiredError):
378378
waiter.future.result()
379379

380380
async def test_flush(self, stream, stream_reader_started: ReaderStream, partition_session):

ydb/topic.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"TopicReaderAsyncIO",
1414
"TopicReaderSelector",
1515
"TopicReaderSettings",
16+
"TopicReaderPartitionExpiredError",
1617
"TopicStatWindow",
1718
"TopicWriteResult",
1819
"TopicWriter",
@@ -40,6 +41,7 @@
4041

4142
from ._topic_reader.topic_reader_asyncio import (
4243
PublicAsyncIOReader as TopicReaderAsyncIO,
44+
PublicTopicReaderPartitionExpiredError as TopicReaderPartitionExpiredError,
4345
)
4446

4547
from ._topic_writer.topic_writer import ( # noqa: F401

0 commit comments

Comments
 (0)