Pub/Sub 0.28: batch support? #4141
Hi @krelst, I am afraid I am unsure what you mean by "batch pulling". Can you explain your use case?
Thanks for the response! In earlier versions of the library we could pull and process messages in batches. I hope this clarifies my question!
It does clarify my question, thank you. I see how this is a useful feature in principle, although when I will have time to implement it is a separate question. The Pub/Sub team is primarily concerned with high-throughput situations -- they explicitly wanted "polling" pull removed so that people would not use it by accident.

There are a few options here.

First: You could use the old polling-based `pull`.

Second: You could implement the batching yourself. In fact, it would probably look pretty similar to the batching class we provide on the publisher side. Here is a quick pass at a plausible implementation (note: written in the GitHub text editor; may have some silly bugs):

```python
import threading
import time


class Batch(object):
    def __init__(self, max_messages=250, max_seconds=30):
        self._max_messages = max_messages
        self._max_seconds = max_seconds
        self._accepting_messages = True
        self._accepting_messages_check_lock = threading.Lock()
        self._messages = []

        # Fire off a timer to commit when time elapses.
        self._countdown_thread = threading.Thread(target=self.countdown)
        self._countdown_thread.start()

    @property
    def accepting_messages(self):
        return self._accepting_messages

    def countdown(self):
        time.sleep(self._max_seconds)
        self._commit()

    def commit(self):
        threading.Thread(target=self._commit).start()

    def _commit(self):
        # Sanity check: Do not multiply-commit.
        with self._accepting_messages_check_lock:
            if not self._accepting_messages:
                return
            self._accepting_messages = False

        # Actually do the work and then ack the messages.
        do_something_with(self._messages)  # <-- YOUR PROCESSING HERE.
        for message in self._messages:
            message.ack()

    def add_message(self, message):
        self._messages.append(message)
        if len(self._messages) >= self._max_messages:
            self._commit()
```

And then, your callback becomes something like:

```python
import threading

from (somewhere) import Batch

lock = threading.Lock()
batch = None


def my_callback(message):
    global batch  # Rebind the module-level batch, not a function local.
    with lock:
        if batch is None or not batch.accepting_messages:
            batch = Batch()
        batch.add_message(message)
```
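For illustration, the pattern above can be exercised end to end with plain threading and stand-in objects. This is a minimal sketch, not the library's API: `FakeMessage` and the `processed` list are hypothetical stand-ins for real Pub/Sub messages and for your processing step.

```python
import threading

processed = []  # Stand-in for "do_something_with": collects flushed batches.


class FakeMessage(object):
    """Hypothetical stand-in for a Pub/Sub message."""

    def __init__(self, data):
        self.data = data
        self.acked = False

    def ack(self):
        self.acked = True


class Batch(object):
    """Flush after max_messages messages or max_seconds, whichever comes first."""

    def __init__(self, max_messages=3, max_seconds=0.5):
        self._max_messages = max_messages
        self._accepting_messages = True
        self._lock = threading.Lock()
        self._messages = []
        # A Timer plays the role of the countdown thread; unlike a bare
        # Thread + sleep, a Timer could also be cancelled on early flush.
        timer = threading.Timer(max_seconds, self._commit)
        timer.daemon = True
        timer.start()

    @property
    def accepting_messages(self):
        return self._accepting_messages

    def _commit(self):
        with self._lock:
            if not self._accepting_messages:
                return  # Do not multiply-commit.
            self._accepting_messages = False
        processed.append([m.data for m in self._messages])
        for m in self._messages:
            m.ack()

    def add_message(self, message):
        self._messages.append(message)
        if len(self._messages) >= self._max_messages:
            self._commit()


msgs = [FakeMessage(i) for i in range(3)]
batch = Batch(max_messages=3)
for m in msgs:
    batch.add_message(m)

print(processed)                   # → [[0, 1, 2]]  (flushed on reaching max_messages)
print(all(m.acked for m in msgs))  # → True
```

Here the third `add_message` call triggers the flush synchronously, so the timer's later `_commit` finds `accepting_messages` already `False` and returns without re-committing.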
Indeed this would solve our issue :) As I don't think we are the only ones relying on batching of messages, we really hope this will be included in one of the next releases of the Pub/Sub library.
Hi @krelst, We are committed to the new Pub/Sub surface, so you can implement the above without worrying about the surface breaking. If we do add built-in batching, it will be in an additive way that does not break this code. :-) As you have said this is a workable solution for you, I am going to close this issue. Thanks for reaching out!
Hi, this thread seems to solve the issue based on message count; however, according to the documentation there is also a way to batch by byte size: https://cloud.google.com/pubsub/docs/publisher#batching-configuration

What object would I pass as the batch configuration object? Thanks.
Hello, I'm currently working through the problem that messages can still be batched in sizes larger than the specified byte limit. But this would be the setting you would use (the original snippet was truncated; example values shown):

```python
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=100,  # flush after 100 messages...
    max_bytes=1024,    # ...or ~1 KB of message data...
    max_latency=1,     # ...or 1 second, whichever comes first
)
```
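The size-overshoot concern above can be illustrated with a client-side guard in plain Python: flush the pending batch before adding a message that would push the running byte total past the limit. This is only a sketch of the idea, not the Pub/Sub library's behavior; `byte_limit`, `pending`, `flushed`, `add`, and `flush` are illustrative names.

```python
pending = []      # messages queued for the current batch
flushed = []      # completed batches
byte_limit = 10   # tiny limit, purely for illustration
current_bytes = 0


def flush():
    """Close out the current batch, if any."""
    global current_bytes
    if pending:
        flushed.append(list(pending))
        del pending[:]
        current_bytes = 0


def add(data):
    """Queue a message, flushing first if it would exceed byte_limit."""
    global current_bytes
    if current_bytes + len(data) > byte_limit and pending:
        flush()
    pending.append(data)
    current_bytes += len(data)


for payload in (b"aaaa", b"bbbb", b"cccc"):
    add(payload)
flush()

print(flushed)  # → [[b'aaaa', b'bbbb'], [b'cccc']]
```

The third payload would bring the batch to 12 bytes, so the first two are flushed as one batch and the third starts a new one.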
It seems that the new google-cloud-pubsub library version 0.28 does not include any support for pulling and processing messages in batches. Several of our production systems rely on processing Pub/Sub messages in batches in order to reduce the load on other systems (e.g. to lower the number of queries made against our databases). Is there any ETA on re-adding batch pulling to the library?