@@ -39,7 +39,7 @@ class TopicReaderUnexpectedCodec(YdbError):
39
39
pass
40
40
41
41
42
- class TopicReaderCommitToExpiredPartition (TopicReaderError ):
42
+ class PublicTopicReaderPartitionExpiredError (TopicReaderError ):
43
43
"""
44
44
Commit message when partition read session are dropped.
45
45
It is ok - the message/batch will not commit to server and will receive in other read session
@@ -114,15 +114,22 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
114
114
Write commit message to a buffer.
115
115
116
116
For the method no way check the commit result
117
- (for example if lost connection - commits will not re-send and committed messages will receive again)
117
+ (for example if lost connection - commits will not re-send and committed messages will receive again).
118
118
"""
119
- self ._reconnector .commit (batch )
119
+ try :
120
+ self ._reconnector .commit (batch )
121
+ except PublicTopicReaderPartitionExpiredError :
122
+ pass
120
123
121
124
async def commit_with_ack (self , batch : typing .Union [datatypes .PublicMessage , datatypes .PublicBatch ]):
122
125
"""
123
126
write commit message to a buffer and wait ack from the server.
124
127
125
128
use asyncio.wait_for for wait with timeout.
129
+
130
+ may raise ydb.TopicReaderPartitionExpiredError, the error mean reader partition closed from server
131
+ before receive commit ack. Message may be acked or not (if not - it will send in other read session,
132
+ to this or other reader).
126
133
"""
127
134
waiter = self ._reconnector .commit (batch )
128
135
await waiter .future
@@ -174,6 +181,14 @@ async def _connection_loop(self):
174
181
await asyncio .sleep (retry_info .sleep_timeout_seconds )
175
182
176
183
attempt += 1
184
+ finally :
185
+ if self ._stream_reader is not None :
186
+ # noinspection PyBroadException
187
+ try :
188
+ await self ._stream_reader .close ()
189
+ except BaseException :
190
+ # supress any error on close stream reader
191
+ pass
177
192
178
193
async def wait_message (self ):
179
194
while True :
@@ -366,10 +381,10 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co
366
381
raise TopicReaderError ("reader can commit only self-produced messages" )
367
382
368
383
if partition_session .reader_stream_id != self ._id :
369
- raise TopicReaderCommitToExpiredPartition ("commit messages after reconnect to server" )
384
+ raise PublicTopicReaderPartitionExpiredError ("commit messages after reconnect to server" )
370
385
371
386
if partition_session .id not in self ._partition_sessions :
372
- raise TopicReaderCommitToExpiredPartition ("commit messages after server stop the partition read session" )
387
+ raise PublicTopicReaderPartitionExpiredError ("commit messages after server stop the partition read session" )
373
388
374
389
commit_range = batch ._commit_get_offsets_range ()
375
390
waiter = partition_session .add_waiter (commit_range .end )
@@ -617,6 +632,7 @@ async def flush(self):
617
632
async def close (self ):
618
633
if self ._closed :
619
634
return
635
+
620
636
self ._closed = True
621
637
622
638
self ._set_first_error (TopicReaderStreamClosedError ())
@@ -625,6 +641,7 @@ async def close(self):
625
641
626
642
for session in self ._partition_sessions .values ():
627
643
session .close ()
644
+ self ._partition_sessions .clear ()
628
645
629
646
for task in self ._background_tasks :
630
647
task .cancel ()
0 commit comments