Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix errors in seek when flow is not active #264

Merged
merged 3 commits into from
Feb 4, 2022
Merged

Conversation

patkivikram
Copy link
Collaborator

@patkivikram patkivikram commented Feb 4, 2022

Description

Fix this stack trace

2022-02-04 08:04:04 [8] [ERROR] faust.transport.drivers.aiokafka [^---Consumer]: Drain messages raised: IllegalStateError("No current assignment for partition TopicPartition(topic='wfn.fos.internal.cartonreceived', partition=0)") Traceback (most recent call last): File "/pyenv/versions/fos/lib/python3.8/site-packages/faust/transport/consumer.py", line 1127, in _drain_messages async for tp, message in ait: File "/pyenv/versions/fos/lib/python3.8/site-packages/faust/transport/consumer.py", line 740, in getmany await self.perform_seek() File "/pyenv/versions/fos/lib/python3.8/site-packages/faust/transport/consumer.py", line 547, in perform_seek _committed_offsets = await self.seek_to_committed() File "/pyenv/versions/fos/lib/python3.8/site-packages/faust/transport/consumer.py", line 1373, in seek_to_committed return await self._thread.seek_to_committed() File "/pyenv/versions/fos/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 676, in seek_to_committed return await self.call_thread(self._ensure_consumer().seek_to_committed) File "/pyenv/versions/fos/lib/python3.8/site-packages/mode/threads.py", line 436, in call_thread result = await promise File "/pyenv/versions/fos/lib/python3.8/site-packages/mode/threads.py", line 383, in _process_enqueued result = await maybe_async(method(*args, **kwargs)) File "/pyenv/versions/fos/lib/python3.8/site-packages/mode/utils/futures.py", line 134, in maybe_async return await res File "/pyenv/versions/fos/lib/python3.8/site-packages/aiokafka/consumer/consumer.py", line 874, in seek_to_committed self._fetcher.seek_to(tp, offset) File "/pyenv/versions/fos/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 1162, in seek_to self._subscriptions.seek(tp, offset) File "/pyenv/versions/fos/lib/python3.8/site-packages/aiokafka/consumer/subscription_state.py", line 243, in seek self._assigned_state(tp).seek(offset) File "/pyenv/versions/fos/lib/python3.8/site-packages/aiokafka/consumer/subscription_state.py", line 123, in _assigned_state raise IllegalStateError( kafka.errors.IllegalStateError: IllegalStateError: No current assignment for partition TopicPartition(topic='wfn.fos.internal.cartonreceived', partition=0)

@patkivikram patkivikram merged commit a313821 into master Feb 4, 2022
@wbarnha wbarnha deleted the fix_error_on_seek branch July 3, 2023 03:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant