PubSub subscription.pull() fails silently in multiprocessing #3988

Closed
michaelplaing opened this issue Sep 18, 2017 · 3 comments
Labels
api: pubsub Issues related to the Pub/Sub API. grpc


@michaelplaing

michaelplaing commented Sep 18, 2017

  • Python multiprocessing is broken by grpcio==1.6.0: subscription.pull() fails silently in the child process.
  • Downgrading to grpcio==1.4.0 is a workaround.
  • The 'spawn' start method should also be a workaround, but it is not: the objects sent to the child process (the Process target and everything it references) contain RLock objects, which cannot be pickled.
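A minimal stdlib-only sketch of why 'spawn' does not help: with that start method, multiprocessing pickles the Process target (here the bound method my_class.print_messages, and therefore my_class and everything reachable from it) to send it to the child, and a threading.RLock anywhere in that object graph makes pickling fail. The two dictionaries are hypothetical stand-ins for the client/subscription state, and is_picklable is a hypothetical helper, not part of any library.

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds, False if it raises."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


# Hypothetical stand-ins for the state a client/subscription carries:
# plain values pickle fine, but an RLock anywhere in the graph does not.
plain_state = {'topic': 'my-topic', 'subscription': 'my-sub'}
state_with_lock = dict(plain_state, lock=threading.RLock())

print(is_picklable(plain_state))      # True
print(is_picklable(state_with_lock))  # False
```

The 'fork' start method never pickles the target, which is why the original example gets as far as starting the worker.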
  1. OS type and version
    macOS and Linux (Google App Engine Flex)

  2. Python version and virtual environment information (python --version)
    Python 3.6.2

  3. pip freeze
    cachetools==2.0.1
    certifi==2017.7.27.1
    chardet==3.0.4
    dill==0.2.7.1
    future==0.16.0
    gapic-google-cloud-pubsub-v1==0.15.4
    google-auth==1.1.0
    google-cloud-core==0.26.0
    google-cloud-pubsub==0.27.0
    google-gax==0.15.14
    googleapis-common-protos==1.5.2
    grpc-google-iam-v1==0.11.3
    grpcio==1.6.0
    httplib2==0.10.3
    idna==2.6
    monotonic==1.3
    oauth2client==3.0.0
    ply==3.8
    proto-google-cloud-pubsub-v1==0.15.4
    protobuf==3.4.0
    pyasn1==0.3.5
    pyasn1-modules==0.1.4
    requests==2.18.4
    rsa==3.4.2
    six==1.11.0
    tenacity==4.4.0
    urllib3==1.22

  4. Stacktrace if available
    None: the call fails silently

  5. Steps to reproduce
    • virtualenv -p python3 env
    • . env/bin/activate
    • pip install google-cloud-pubsub==0.27.0
    • Create the topic and subscription used by the example (see below).
    • python <example>.py

  6. Code example

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Works with grpcio==1.4.0
# Fails silently with grpcio==1.6.0 (the default as of 2017-09-18)

from multiprocessing import Process

from google.cloud import pubsub


class MyClass(object):
    def __init__(self):
        c = pubsub.Client()
        t = c.topic('my-topic')  # created previously
        self._sub = t.subscription('my-sub')  # created previously

    def print_messages(self):
        print('starting worker...')
        
        while True:
            try:
                envelopes = self._sub.pull()
                
                if envelopes:
                    ack_id, message = envelopes[0]
                    
                    print(
                        'ack_id: {}\ndata: {}; attributes: {}'.format(
                            ack_id, message.data, message.attributes
                        )
                    )
                    
                    self._sub.acknowledge([ack_id])
            except KeyboardInterrupt:
                break
                
        print('shutdown worker.')
            

if __name__ == '__main__':
    my_class = MyClass()
    p = Process(target=my_class.print_messages)
    p.start()
    
    try:
        p.join()
    except KeyboardInterrupt:
        print('shutdown parent.')
    
@michaelplaing
Author

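  • The root problem appears to be that grpcio==1.6.0 starts threads when it is imported.
  • When multiprocessing.Process.start() forks the parent process, only the calling thread is copied into the child; the threads gRPC started at import time do not exist there, which appears to leave gRPC in a broken state in the child.
  • An alternative workaround is therefore to import google.cloud.pubsub and create the client lazily, inside the worker, so that gRPC starts its threads in the child process.
    -> This works, as illustrated by the reworked example: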
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Works with grpcio==1.6.0 because the import is lazy (it happens in the child process)

from multiprocessing import Process

class MyClass(object):
    def __init__(self):
        pass
        
    def _lazy_init(self):
        from google.cloud import pubsub
        
        c = pubsub.Client()
        t = c.topic('my-topic')  # created previously
        self._sub = t.subscription('my-sub')  # created previously

    def print_messages(self):
        print('starting worker...')
        self._lazy_init()
        
        while True:
            try:
                envelopes = self._sub.pull()
                
                if envelopes:
                    ack_id, message = envelopes[0]
                    
                    print(
                        'ack_id: {}\ndata: {}; attributes: {}'.format(
                            ack_id, message.data, message.attributes
                        )
                    )
                    
                    self._sub.acknowledge([ack_id])
            except KeyboardInterrupt:
                break
                
        print('shutdown worker.')
            

if __name__ == '__main__':
    my_class = MyClass()
    p = Process(target=my_class.print_messages)
    p.start()
    
    try:
        p.join()
    except KeyboardInterrupt:
        print('shutdown parent.')
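A stdlib-only sketch of the fork behaviour described above, assuming a Linux or macOS host where the 'fork' start method is available: a background thread (a hypothetical stand-in for the threads grpcio starts on import) is running in the parent, yet the forked child sees only its own thread. The names _background_worker, count_threads and threads_in_forked_child are illustrative, not part of gRPC or multiprocessing.

```python
import multiprocessing
import threading
import time


def _background_worker(stop):
    # Hypothetical stand-in for a thread a library starts at import time.
    while not stop.is_set():
        time.sleep(0.01)


def count_threads(queue):
    # Runs in the child; the count is taken before queue.put() starts
    # the queue's own feeder thread.
    queue.put(threading.active_count())


def threads_in_forked_child():
    stop = threading.Event()
    worker = threading.Thread(target=_background_worker, args=(stop,), daemon=True)
    worker.start()
    try:
        parent_count = threading.active_count()  # main thread + worker
        ctx = multiprocessing.get_context('fork')
        queue = ctx.Queue()
        child = ctx.Process(target=count_threads, args=(queue,))
        child.start()
        child_count = queue.get(timeout=10)
        child.join()
        return parent_count, child_count
    finally:
        stop.set()
        worker.join()


if __name__ == '__main__':
    parent, child = threads_in_forked_child()
    print('parent threads:', parent, 'child threads:', child)
```

On Python 3.12 and later, forking a process that is already multi-threaded also emits a DeprecationWarning for this reason.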

@lukesneeringer
Contributor

This is definitely caused by grpc/grpc#12455. I am pinning the client libraries' grpcio dependency to < 1.6 to work around this issue for now.
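A minimal sketch of checking at startup whether the installed grpcio falls under that < 1.6 pin. It assumes Python 3.8+ (for importlib.metadata), and version_tuple and grpcio_below are hypothetical helpers with a simplified parser, not full PEP 440 version handling; the (1, 6) limit mirrors the pin mentioned above.

```python
from importlib import metadata  # Python 3.8+


def version_tuple(version):
    """Turn '1.6.0' into (1, 6, 0); stops after a non-numeric suffix like 'rc1'."""
    parts = []
    for piece in version.split('.'):
        digits = ''
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
        if len(digits) < len(piece):
            break  # suffix such as 'rc1' or 'dev0': ignore the rest
    return tuple(parts)


def grpcio_below(limit=(1, 6)):
    """True/False for installed grpcio < limit; None if grpcio is not installed."""
    try:
        installed = metadata.version('grpcio')
    except metadata.PackageNotFoundError:
        return None
    return version_tuple(installed) < limit


if __name__ == '__main__':
    if grpcio_below() is False:
        raise SystemExit('grpcio >= 1.6 is installed; see grpc/grpc#12455')
```

Tuple comparison does the ordering: (1, 4, 0) < (1, 6) is true, while (1, 6, 0) is not, because a longer tuple with an equal prefix compares as greater.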
