IllegalStateException on seek to offset of a partition that was removed by a rebalance #54

Closed
1 of 2 tasks
bobh66 opened this issue Dec 6, 2020 · 2 comments

Comments

@bobh66
Contributor

bobh66 commented Dec 6, 2020

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

faust-streaming 0.3.0
aiokafka 0.7.0

In this example, partitions 1, 5, and 10 were originally assigned, but after the rebalance partition 1 is no longer assigned. Faust still tries to seek partition 1 to its committed offset and crashes:

[2020-12-06 15:31:12,106] [11] [WARNING] Heartbeat failed for group spe-5 because it is rebalancing 
[2020-12-06 15:31:12,106] [11] [INFO] Revoking previously assigned partitions 
+Topic Partition Set----------+
| topic          | partitions |
+----------------+------------+
| spe_requests_5 | {1, 5, 10} |
+----------------+------------+ for group spe-5 
[2020-12-06 15:31:12,108] [11] [INFO] (Re-)joining group spe-5 
[2020-12-06 15:31:12,108] [11] [INFO] [^---Recovery]: Resuming flow... 
[2020-12-06 15:31:12,109] [11] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2020-12-06 15:31:12,121] [11] [INFO] Joined group 'spe-5' (generation 968) with member_id faust-0.3.0-9126a240-a759-4e9f-bca9-cc0ad3f20f7e 
[2020-12-06 15:31:12,128] [11] [INFO] Successfully synced group spe-5 with generation 968 
[2020-12-06 15:31:12,129] [11] [INFO] Setting newly assigned partitions 
+Topic Partition Set----------+
| topic          | partitions |
+----------------+------------+
| spe_requests_5 | {5, 10}    |
+----------------+------------+ for group spe-5 
[2020-12-06 15:31:12,131] [11] [ERROR] [^---Recovery]: Crashed reason=IllegalStateError("No current assignment for partition TopicPartition(topic='spe_requests_5', partition=1)") 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 802, in _execute_task
    await task
  File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 349, in _restart_recovery
    await T(self._resume_streams)()
  File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 297, in _resume_streams
    consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
  File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 556, in _wait
    wait_result = await self.wait_first(coro, signal, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 715, in wait_first
    f.result()  # propagate exceptions
  File "/usr/local/lib/python3.7/site-packages/faust/transport/consumer.py", line 534, in perform_seek
    _committed_offsets = await self.seek_to_committed()
  File "/usr/local/lib/python3.7/site-packages/faust/transport/consumer.py", line 1315, in seek_to_committed
    return await self._thread.seek_to_committed()
  File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 509, in seek_to_committed
    return await self.call_thread(self._ensure_consumer().seek_to_committed)
  File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.7/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 868, in seek_to_committed
    self._fetcher.seek_to(tp, offset)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/fetcher.py", line 1162, in seek_to
    self._subscriptions.seek(tp, offset)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/subscription_state.py", line 239, in seek
    self._assigned_state(tp).seek(offset)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/subscription_state.py", line 120, in _assigned_state
    "No current assignment for partition {}".format(tp))
kafka.errors.IllegalStateError: IllegalStateError: No current assignment for partition TopicPartition(topic='spe_requests_5', partition=1)
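The log fits a simple race: recovery logs "Seek stream partitions to committed offsets" at 15:31:12,109, the new assignment {5, 10} is only set at 15:31:12,129, and the seek for partition 1 fails at 15:31:12,131. The sketch below models that sequence in plain Python. TopicPartition, IllegalStateError, SubscriptionModel, tp_for, and seek_from_snapshot are hypothetical stand-ins rather than the aiokafka classes, and the offsets are made up; it only illustrates why seeking from a pre-rebalance snapshot of partitions raises.

```python
from typing import Dict, NamedTuple, Set


class TopicPartition(NamedTuple):
    """Stand-in for aiokafka's TopicPartition (topic, partition) pair."""
    topic: str
    partition: int


class IllegalStateError(Exception):
    """Stand-in for kafka.errors.IllegalStateError."""


class SubscriptionModel:
    """Hypothetical, heavily simplified model of a consumer's assignment."""

    def __init__(self, assigned: Set[TopicPartition]) -> None:
        self.positions: Dict[TopicPartition, int] = {tp: 0 for tp in assigned}

    def reassign(self, assigned: Set[TopicPartition]) -> None:
        # A completed rebalance replaces the assignment wholesale.
        self.positions = {tp: 0 for tp in assigned}

    def assigned_partitions(self) -> Set[TopicPartition]:
        return set(self.positions)

    def seek(self, tp: TopicPartition, offset: int) -> None:
        # Seeking a partition that is no longer owned is rejected.
        if tp not in self.positions:
            raise IllegalStateError(f"No current assignment for partition {tp}")
        self.positions[tp] = offset


def tp_for(n: int) -> TopicPartition:
    return TopicPartition("spe_requests_5", n)


def seek_from_snapshot(state: SubscriptionModel,
                       snapshot: Dict[TopicPartition, int]) -> str:
    """Seek every partition in a snapshot; return the error text, if any."""
    try:
        for tp, offset in snapshot.items():
            state.seek(tp, offset)
    except IllegalStateError as exc:
        return str(exc)
    return ""


state = SubscriptionModel({tp_for(1), tp_for(5), tp_for(10)})
# 1. Recovery snapshots the current partitions and their committed offsets.
snapshot = {tp: 100 for tp in state.assigned_partitions()}
# 2. The rebalance completes and partition 1 is revoked.
state.reassign({tp_for(5), tp_for(10)})
# 3. Seeking from the stale snapshot fails on partition 1.
print(seek_from_snapshot(state, snapshot))
# prints: No current assignment for partition TopicPartition(topic='spe_requests_5', partition=1)
```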

Expected behavior

Faust should not attempt to seek a partition that was revoked by the rebalance. Perhaps the exception could be caught and the fetcher restarted?
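One way to get the expected behavior is to re-read the current assignment right before seeking and skip anything that is no longer assigned. This is a minimal sketch, not Faust's or aiokafka's code: seek_assigned_only, the callables it takes, tp_for, and the TopicPartition stand-in are hypothetical, and the offsets are invented.

```python
from typing import Callable, Dict, NamedTuple, Set


class TopicPartition(NamedTuple):
    """Stand-in for aiokafka's TopicPartition (topic, partition) pair."""
    topic: str
    partition: int


def seek_assigned_only(
    committed: Dict[TopicPartition, int],
    current_assignment: Callable[[], Set[TopicPartition]],
    seek: Callable[[TopicPartition, int], None],
) -> Dict[TopicPartition, int]:
    """Seek only partitions that are still assigned; return what was sought."""
    # Re-read the assignment immediately before seeking, because a rebalance
    # may have revoked partitions while the committed offsets were fetched.
    assigned = current_assignment()
    sought: Dict[TopicPartition, int] = {}
    for tp, offset in committed.items():
        if tp not in assigned:
            continue  # revoked by the rebalance: skip instead of crashing
        seek(tp, offset)
        sought[tp] = offset
    return sought


def tp_for(n: int) -> TopicPartition:
    return TopicPartition("spe_requests_5", n)


# Usage with the partitions from the log (offsets are made up).
committed = {tp_for(1): 100, tp_for(5): 200, tp_for(10): 300}
calls = []
result = seek_assigned_only(
    committed,
    current_assignment=lambda: {tp_for(5), tp_for(10)},
    seek=lambda tp, offset: calls.append((tp.partition, offset)),
)
print(sorted(calls))  # [(5, 200), (10, 300)]
```

The check and the seeks run with no await in between, so, assuming the rebalance runs on the same event loop, a partition cannot be revoked between the filter and its seek. Compared with catching the exception, filtering also lets the seeks for partitions that are still assigned complete instead of stopping at the first revoked one.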

Actual behavior

The process crashes with an IllegalStateError.

Versions

  • Python version - 3.7.3
  • Faust version - 0.3.0
  • Operating system - Centos7
  • Kafka version - 4.0.0-5.3.1
  • RocksDB version (if applicable)
@austinnichols101
Contributor

I'm seeing the same error as above, plus the following exception:

During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/mode/services.py", line 802, in _execute_task
    await task
  File "/usr/local/lib/python3.7/dist-packages/faust/transport/consumer.py", line 823, in _commit_handler
    await self.commit()
  File "/usr/local/lib/python3.7/dist-packages/faust/transport/consumer.py", line 865, in commit
    start_new_transaction=start_new_transaction,
  File "/usr/local/lib/python3.7/dist-packages/mode/services.py", line 460, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/faust/transport/consumer.py", line 899, in force_commit
    commit_tps, start_new_transaction=start_new_transaction
  File "/usr/local/lib/python3.7/dist-packages/faust/transport/consumer.py", line 915, in _commit_tps
    commit_offsets, start_new_transaction=start_new_transaction
  File "/usr/local/lib/python3.7/dist-packages/faust/transport/consumer.py", line 983, in _commit_offsets
    did_commit = await self._commit(committable_offsets)
  File "/usr/local/lib/python3.7/dist-packages/faust/transport/consumer.py", line 1354, in _commit
    return await self._thread.commit(offsets)
  File "/usr/local/lib/python3.7/dist-packages/faust/transport/drivers/aiokafka.py", line 513, in commit
    return await self.call_thread(self._commit, offsets)
  File "/usr/local/lib/python3.7/dist-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.7/dist-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.7/dist-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/usr/local/lib/python3.7/dist-packages/faust/transport/drivers/aiokafka.py", line 536, in _commit
    self.supervisor.wakeup()
AttributeError: 'NoneType' object has no attribute 'wakeup'
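This second traceback ends in faust's aiokafka driver calling self.supervisor.wakeup() while supervisor is None. The sketch below assumes (the thread does not confirm it) that the supervisor is cleared once the consumer thread stops after the first crash, and shows a defensive None check on the commit path. Supervisor and ConsumerThreadSketch are hypothetical classes, not faust's. The check only avoids the secondary AttributeError; it does not address the IllegalStateError that triggered it.

```python
from typing import Optional


class Supervisor:
    """Hypothetical stand-in for the object whose wakeup() failed."""

    def __init__(self) -> None:
        self.wakeups = 0

    def wakeup(self) -> None:
        self.wakeups += 1


class ConsumerThreadSketch:
    """Hypothetical model of a commit path that wakes a supervisor."""

    def __init__(self, supervisor: Optional[Supervisor]) -> None:
        self.supervisor = supervisor

    def after_commit(self) -> bool:
        # Read the attribute once, then check it: after a crash or shutdown
        # the supervisor may already have been cleared to None.
        supervisor = self.supervisor
        if supervisor is None:
            return False  # nothing to wake; avoids the AttributeError
        supervisor.wakeup()
        return True


live = ConsumerThreadSketch(Supervisor())
stopped = ConsumerThreadSketch(None)
print(live.after_commit(), stopped.after_commit())  # True False
```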

@austinnichols101
Contributor

@patkivikram I believe this issue has been resolved as of the 0.3.2 beta versions.
