Note: this is not an official Google product, experimental or otherwise, and is provided without support. It is intended as a sample library demonstrating a set of use cases for Google Cloud Pub/Sub. The official Pub/Sub client library should be used for production applications.
psq is an example Python implementation of a simple distributed task queue using Google Cloud Pub/Sub.

psq requires minimal configuration and relies on Cloud Pub/Sub to provide scalable and reliable messaging.

psq is intentionally similar to rq and simpleq, and takes some inspiration from celery and this blog post.
Install via pip:
```shell
pip install psq
```
- A project on the Google Developers Console.
- The Google Cloud SDK installed locally.
- You will need the Cloud Pub/Sub API enabled on your project. The link will walk you through enabling the API.
- You will need to run `gcloud auth` before running these examples so that authentication to Google Cloud Platform services is handled transparently.
First, create a task:
```python
def adder(a, b):
    return a + b
```
Then, create a pubsub client and a queue:
```python
from google.cloud import pubsub_v1

import psq


project = 'your-project-id'

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

q = psq.Queue(publisher, subscriber, project)
```
Now you can enqueue tasks:
```python
from tasks import adder

q.enqueue(adder, 1, 2)
```
In order to get task results, you have to configure storage:
```python
from google.cloud import pubsub_v1
from google.cloud import datastore

import psq


project = 'your-project-id'

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
ds_client = datastore.Client()

q = psq.Queue(
    publisher, subscriber, project,
    storage=psq.DatastoreStorage(ds_client))
```
With storage configured, you can get the result of a task:
```python
r = q.enqueue(adder, 5, 6)
r.result()  # -> 11
```
You can also define multiple queues:
```python
fast = psq.Queue(publisher, subscriber, project, 'fast')
slow = psq.Queue(publisher, subscriber, project, 'slow')
```
Because psq is largely similar to rq, similar rules around tasks apply. You can put any Python function call on a queue, provided:

- The function is importable by the worker. This means the `__module__` that the function lives in must be importable. Notably, you can't enqueue functions that are declared in the main module, such as tasks defined in a file that is run directly with `python` or via the interactive interpreter.
- The function can be a string, but it must be the absolutely importable path to a function that the worker can import. Otherwise, the task will fail.
- The worker and the applications queuing tasks must share exactly the same source code.
- The function can't depend on global context such as global variables, `current_request`, etc. Pass any needed context into the worker at queue time.
Pub/Sub guarantees your tasks will be delivered to the workers, but psq doesn't presently guarantee that a task completes execution or exactly-once semantics, though it does allow you to provide your own mechanisms for this. This is similar to Celery's default configuration.
Task completion guarantees can be provided via late ack support. Late ack is possible with Cloud Pub/Sub, but it is currently not implemented in this library. See CONTRIBUTING.md.
There are many approaches to exactly-once semantics, such as distributed locks. These can be built on systems such as ZooKeeper and Redis.
Execute `psqworker` in the same directory where your tasks are defined:

```shell
psqworker.py config.q
```
`psqworker` only operates on one queue at a time. If you want a server to listen to multiple queues, use something like supervisord to run multiple `psqworker` processes.
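If you use supervisord for this, one `[program:...]` section per queue is a common setup. The paths, module names, and queue names below are placeholders, not values this library prescribes:

```ini
; one psqworker process per queue (paths are illustrative)
[program:psqworker-fast]
command=psqworker.py config.fast
directory=/opt/myapp
autostart=true
autorestart=true

[program:psqworker-slow]
command=psqworker.py config.slow
directory=/opt/myapp
autostart=true
autorestart=true
```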
A normal queue will send a single task to a single worker, spreading your tasks over all workers listening to the same queue. There are also broadcast queues, which will deliver a copy of the task to every worker. This is useful in situations where you want every worker to execute the same task, such as installing or upgrading software on every server.
```python
from subprocess import call

broadcast_q = psq.BroadcastQueue(publisher, subscriber, project)


def restart_apache_task():
    call(["apachectl", "restart"])


broadcast_q.enqueue(restart_apache_task)
```
Broadcast queues provide an implementation of the solution described in Reliable Task Scheduling on Google Compute Engine.
Note: broadcast queues do not currently support any form of storage and do not support return values.
Raising `psq.Retry` in your task will cause it to be retried.

```python
import logging

import requests

from psq import Retry


def retry_if_fail(self):
    try:
        r = requests.get('http://some.flaky.service.com')
    except Exception as e:
        logging.error(e)
        raise Retry()
```
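To make the retry behavior concrete, here is a self-contained simulation of a worker-style retry loop. `Retry` is redefined locally so the sketch runs without psq, and the loop with a `max_attempts` cap is an assumption for illustration, not psq's exact worker implementation:

```python
class Retry(Exception):
    """Local stand-in for psq.Retry so this sketch is self-contained."""


def run_with_retries(task, max_attempts=3):
    """Re-run a task each time it raises Retry, up to max_attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Retry:
            if attempt == max_attempts:
                raise  # give up after the last attempt


attempts = []


def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise Retry()
    return 'ok'


print(run_with_retries(flaky))  # succeeds on the third attempt
```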
You can bind an extra context manager to the queue.
```python
from flask import Flask

import psq


app = Flask(__name__)

# publisher, subscriber, and project are as configured earlier
q = psq.Queue(publisher, subscriber, project,
              extra_context=app.app_context)
```
This will ensure that the context is available in your tasks, which is useful for things such as database connections, etc.:
```python
from flask import current_app


def flasky_task():
    backend = current_app.config['BACKEND']
```
During unit tests you most certainly don't want to spin up workers, but instead execute the enqueued functions immediately and synchronously. To do this, pass `asynchronous=False` to the Queue's constructor (the default is `True`). In this case you also don't have to provide the publisher, subscriber, or project arguments; just pass `None` for all of them.
```python
q = psq.Queue(None, None, project=None, asynchronous=False)

r = q.enqueue(adder, 1, 2)  # Will be run immediately
```
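Conceptually, a synchronous queue just calls the function and wraps the value in an already-resolved result. The toy classes below sketch that behavior under stated assumptions; they are not psq's implementation, and `SyncQueue`/`ImmediateResult` are hypothetical names:

```python
class ImmediateResult:
    """Toy stand-in for a task result that is already computed."""

    def __init__(self, value):
        self._value = value

    def result(self):
        return self._value


class SyncQueue:
    """Minimal sketch of asynchronous=False behavior: enqueue runs now."""

    def enqueue(self, func, *args, **kwargs):
        return ImmediateResult(func(*args, **kwargs))


def adder(a, b):
    return a + b


q = SyncQueue()
r = q.enqueue(adder, 1, 2)
print(r.result())  # -> 3
```

This is why tests can assert on `r.result()` without any workers or Pub/Sub resources.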
Ideas for future improvements:

- Some sort of storage solution for broadcast queues.
- Memcache/Redis value store.
- @task decorator that adds a delay/defer function.
- Task chaining / groups / chords.
- Late ack.
- Gevent worker.
- Batch support for queueing.
- See CONTRIBUTING.md
- Apache 2.0 - See LICENSE