fix errors in seek when flow is not active #264
Merged
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
Fix this stack trace
2022-02-04 08:04:04 [8] [ERROR] faust.transport.drivers.aiokafka [^---Consumer]: Drain messages raised: IllegalStateError("No current assignment for partition TopicPartition(topic='wfn.fos.internal.cartonreceived', partition=0)") Traceback (most recent call last): File "/pyenv/versions/fos/lib/python3.8/site-packages/faust/transport/consumer.py", line 1127, in _drain_messages async for tp, message in ait: File "/pyenv/versions/fos/lib/python3.8/site-packages/faust/transport/consumer.py", line 740, in getmany await self.perform_seek() File "/pyenv/versions/fos/lib/python3.8/site-packages/faust/transport/consumer.py", line 547, in perform_seek _committed_offsets = await self.seek_to_committed() File "/pyenv/versions/fos/lib/python3.8/site-packages/faust/transport/consumer.py", line 1373, in seek_to_committed return await self._thread.seek_to_committed() File "/pyenv/versions/fos/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 676, in seek_to_committed return await self.call_thread(self._ensure_consumer().seek_to_committed) File "/pyenv/versions/fos/lib/python3.8/site-packages/mode/threads.py", line 436, in call_thread result = await promise File "/pyenv/versions/fos/lib/python3.8/site-packages/mode/threads.py", line 383, in _process_enqueued result = await maybe_async(method(*args, **kwargs)) File "/pyenv/versions/fos/lib/python3.8/site-packages/mode/utils/futures.py", line 134, in maybe_async return await res File "/pyenv/versions/fos/lib/python3.8/site-packages/aiokafka/consumer/consumer.py", line 874, in seek_to_committed self._fetcher.seek_to(tp, offset) File "/pyenv/versions/fos/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 1162, in seek_to self._subscriptions.seek(tp, offset) File "/pyenv/versions/fos/lib/python3.8/site-packages/aiokafka/consumer/subscription_state.py", line 243, in seek self._assigned_state(tp).seek(offset) File "/pyenv/versions/fos/lib/python3.8/site-packages/aiokafka/consumer/subscription_state.py", line 123, in _assigned_state raise IllegalStateError( kafka.errors.IllegalStateError: IllegalStateError: No current assignment for partition TopicPartition(topic='wfn.fos.internal.cartonreceived', partition=0)