PubSub: Subscriber Client stops acking messages after several seconds #4274
Don't have any solutions for you, but I'm also trying out the new API and seeing similar issues. I'm also seeing […]. In any case, that modification of […] EDIT: I see there are several similar issues already open. Between this, #4186, and #3886, upgrading Pub/Sub has been a real pain, but the old API holds back every other client :/ |
I think this issue is a duplicate of #4238, but leaving open just in case (and because this is a really useful bug report). I think I have a handle on the root cause of this and am working on it this week. I actually got a fix in for the exception handling issue last week, but we have not released it to PyPI yet. |
More info on this: what it appears to do is drop the ack rate to between 0/s and 10/s while StreamingPullOperations climb into the thousands. It also has thousands of ModifyAckDeadline events. So it seems to keep re-pulling (?) the same messages and extending their deadlines so they don't get resent to other subscribers. |
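For readers unfamiliar with the mechanics behind that observation: the client library holds leases on pulled messages and periodically sends ModifyAckDeadline for every message that has not yet been acked, so stuck acks show up as a flood of modacks. The following is a toy model of that loop, purely for illustration; `ToyLeaseManager` and all its names are invented here and are not the library's actual code:

```python
# Toy model of client-side lease management: messages that are pulled but
# never successfully acked keep generating ModifyAckDeadline traffic on
# every maintenance tick, while the ack count stays flat.
class Lease:
    def __init__(self, ack_id):
        self.ack_id = ack_id
        self.modacks_sent = 0

class ToyLeaseManager:
    def __init__(self):
        self.leases = {}

    def on_pull(self, ack_id):
        # A pulled message gets a lease until it is acked.
        self.leases[ack_id] = Lease(ack_id)

    def on_ack(self, ack_id):
        # A successful ack releases the lease.
        self.leases.pop(ack_id, None)

    def maintenance_tick(self):
        # Runs periodically; every still-leased message gets its deadline
        # extended so the server does not redeliver it elsewhere.
        for lease in self.leases.values():
            lease.modacks_sent += 1

mgr = ToyLeaseManager()
for i in range(3):
    mgr.on_pull(f"ack-{i}")
mgr.on_ack("ack-0")           # only one ack goes through
for _ in range(1000):         # many maintenance ticks later...
    mgr.maintenance_tick()
total_modacks = sum(l.modacks_sent for l in mgr.leases.values())
```

With two messages stuck un-acked, `total_modacks` reaches 2000 after 1000 ticks, which is the shape of the Stackdriver graph described above: near-zero acks, thousands of modacks.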
Any updates on this? |
@eoltean Still working on it. :-/ |
Let me make sure I understand this correctly. You are saying that your acks cease going through (except in small amounts) while the modacks do? |
Could someone recommend a version of the pubsub client which doesn't have this bug? I've just started using pubsub and this bug makes it unusable. Thanks. Note for those debugging this issue: I only see this problem on GKE and not when running locally. |
This is happening both locally and on GKE for us |
@adamlofts The last version that appears to work correctly was before the API rewrite. So the 0.27.x releases. |
@anorth2 I've downgraded and the subscriber is working nicely. Thanks. |
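For anyone else downgrading, the workaround above amounts to a one-line requirements pin. 0.27.0 is the exact version a later commenter reports working; per @anorth2, any 0.27.x release predates the API rewrite:

```
google-cloud-pubsub==0.27.0
```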
(Status update: Work still in progress trying to fix this.) |
Any updates on this? Our team is trying to decide if the production version of our project can rely on the Python client or if we need to switch to Go/Java. |
@lukesneeringer I see a couple of the other issues are being resolved; do those affect this issue at all? Are there any other updates? It's been over three weeks since the last one. |
We might be running into a related problem. We are currently rewriting the psq package to suit our needs, and I have trouble acking messages after an exception occurs. Is the message somehow auto-nacked after an exception is thrown? If I start a multi-process worker, I can watch the task get redelivered to the other processes just to fail there too.
```python
@classmethod
def restore(cls, message):
    """Restore task from dumped data.

    Args:
        message (google.cloud.pubsub_v1.subscriber.message.Message):

    Returns:
        psq.task.Task: task instance for worker
    """
    try:
        # TODO: psq: implement TaskRegistry and a sane JSON serialization
        # to avoid import problems
        task = unpickle(message.data)  # type: Task
        task.message = message
        return task
    except UnpickleError:
        # import broken, it's dead, Jim.
        message.ack()  # <-- this should discard the poor message, shouldn't it?
        logger.exception('Failed to unpickle task {}.'.format(message.message_id))
```
|
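A self-contained sketch of the ack-on-failure pattern being attempted above, runnable without Pub/Sub. `FakeMessage` is a minimal stand-in for `google.cloud.pubsub_v1.subscriber.message.Message` (assumption: only `.data`, `.message_id`, and `.ack()` matter here), and plain `pickle` stands in for psq's `unpickle` helper:

```python
import logging
import pickle

logger = logging.getLogger("psq.task")

class FakeMessage:
    """Stand-in for a Pub/Sub message, recording whether ack() was called."""
    def __init__(self, data, message_id):
        self.data = data
        self.message_id = message_id
        self.acked = False

    def ack(self):
        self.acked = True

def restore(message):
    """Unpickle a task; ack and drop the message if it cannot be restored."""
    try:
        return pickle.loads(message.data)
    except Exception:
        # Ack before logging so a poison message is discarded even if
        # logging itself were to raise; it will not be redelivered.
        message.ack()
        logger.exception("Failed to unpickle task %s.", message.message_id)
        return None
```

Whether an unhandled exception in the callback auto-nacks the message is exactly the behavior being asked about in this thread; catching the exception and acking explicitly, as above, sidesteps the question for messages that can never be processed.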
psq hasn't yet been updated to use the latest pubsub package. In fact, the latest pubsub package has many of the features that psq implements. Also worth noting that psq isn't an official Google product, just an open-source project.
…On Tue, Dec 5, 2017, 1:34 AM André Cimander ***@***.***> wrote (quoting the report above, with version info and a traceback):

google-cloud==0.30.0
google-cloud-pubsub==0.29.2

```
ERROR 2017-12-05 10:07:15,998 psq.task Failed to unpickle task 101585298217325.
Traceback (most recent call last):
  File "/home/andre/Projects/psq/src/psq/utils.py", line 47, in unpickle
    obj = loads(pickled_string)
AttributeError: Can't get attribute 'mark_done' on <module 'psq.tests.worker_test' from '/home/andre/Projects/psq/src/psq/tests/worker_test.py'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/andre/Projects/psq/src/psq/task.py", line 99, in restore
    task = unpickle(message.data)
  File "/home/andre/Projects/psq/src/psq/utils.py", line 49, in unpickle
    raise UnpickleError('Could not unpickle', pickled_string, e)
psq.utils.UnpickleError: ('Could not unpickle', b'\x80\x04\x95\x96\x00\x00\x00\x00\x00\x00\x00\x8c\x08psq.task\x94\x8c\x04Task\x94\x93\x94)\x81\x94}\x94(\x8c\x06kwargs\x94}\x94\x8c\x01f\x94\x8c\x15psq.tests.worker_test\x94\x8c\tmark_done\x94\x93\x94\x8c\x06result\x94N\x8c\x04args\x94)\x8c\x02id\x94\x8c\x011\x94\x8c\x07retries\x94K\x00\x8c\x07message\x94N\x8c\x06status\x94\x8c\x06queued\x94ub.', AttributeError("Can't get attribute 'mark_done' on <module 'psq.tests.worker_test' from '/home/andre/Projects/psq/src/psq/tests/worker_test.py'>",))
```
|
Any updates on this? |
I might be seeing this issue as well. I have a single subscriber listening to infrequent changes in a GCS bucket via a Pub/Sub topic. My subscriber callback looks like this:

```python
def add_to_queue(message):
    message_queue.put(message)
    message.ack()
```

Old messages keep getting delivered over and over, and eventually I think the un-acked backlog blocks new messages from being processed.

Edit: I rewrote my script using google-cloud-pubsub==0.27.0 as mentioned by @anorth2, and it works correctly. I'll also try the newly released 0.29.4 to see how it behaves.

Edit 2: 0.29.4 seemed to handle message pulling a bit better, but not perfectly (once it hung on receiving messages, though they seemed to be acknowledged when I restarted the app). However, it exhibits the separate behavior of eventually consuming 100% CPU that I reported in #4563, so that forced me back to 0.27.0 anyway. |
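A common variant of the hand-off pattern above is to keep the callback cheap but defer the ack until the message has actually been processed, so a crash between queueing and processing does not silently drop work. A self-contained sketch (`FakeMessage` is an invented stand-in for the real Pub/Sub message; `handle` is a placeholder for application logic):

```python
import queue
import threading

class FakeMessage:
    """Stand-in for a Pub/Sub message, recording whether ack() was called."""
    def __init__(self, data):
        self.data = data
        self.acked = False

    def ack(self):
        self.acked = True

message_queue = queue.Queue()

def add_to_queue(message):
    # Subscriber callback: just hand off, never block the callback thread.
    message_queue.put(message)

def handle(data):
    # Placeholder for real processing.
    pass

def worker():
    while True:
        message = message_queue.get()
        if message is None:   # sentinel to shut the worker down
            break
        handle(message.data)  # do the real work first...
        message.ack()         # ...and ack only once it succeeded
        message_queue.task_done()
```

The trade-off: un-acked messages hold leases while queued, so this pattern needs processing to keep up with delivery; acking immediately (as in the comment above) avoids lease buildup but risks losing messages the process never finishes handling.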
@anorth2, there is a spinlock bug in gRPC that I think may be the culprit. A PR (grpc/grpc#13665) is out as a potential fix, and I am going to install […]. Can you run a […]
I identified […]
The current example I have to reproduce the CPU spike usually takes an hour to appear. I'd love it if I could get a spike within 10 minutes, so I could run a reproducible case more often. Ping me on Hangouts and maybe we can figure something out? |
I have backported that fix to the […] |
So I just ran my "do-nothing" reproducible case (thanks to @dmontag) that reliably thrashes the CPU after 65 minutes. I can confirm that after installing a custom […] I have created a […] Feel free to install, but "buyer beware": this isn't a wheel endorsed by the […]
All of these commands will work on your target machine except for […] |
@anorth2 Would you mind trying with the patched […]? |
I'm closing this issue in favor of #4600. The original report, "PubSub: Subscriber Client stops acking messages after several seconds", has been resolved by the addition of […]. @anorth2 Can we move future discussion there? I'll move my last pending question there for "completeness". |
OS type and version
python:3-alpine docker image running in GKE
Python version and virtual environment information
python --version
google-cloud-python version (pip show google-cloud, pip show google-<service>, or pip freeze)
google-cloud-pubsub==0.28.4
grpcio==1.6.3
Stacktrace if available
Steps to reproduce
Run the example code and watch the Stackdriver metrics for acks vs. modify_ack_deadline
Code example