Merged
71 commits
18970ad
gcs ingest cloud function
Oct 23, 2020
e9dd912
address review feedback
Oct 27, 2020
843f871
remove extra newline
Oct 27, 2020
bf1fd01
remove project files
Oct 27, 2020
1e83107
regex destination details env var
Oct 28, 2020
64c026c
ignore intellij .idea/ configs
Oct 28, 2020
242283f
incremental loads docs
Oct 28, 2020
152f4b7
update TODOs
Oct 28, 2020
db1f2bf
Merge branch 'master' into cloud-functions-ingest
Oct 28, 2020
899203d
remove bad labels
Oct 28, 2020
79665f8
Merge branch 'cloud-functions-ingest' of github.com:jaketf/bigquery-u…
Oct 28, 2020
6335be9
move external query recursive search todo
Oct 28, 2020
36eb155
passing integration tests
Oct 30, 2020
d2d4dc9
add ci for gcs_event_based_ingest cloud function
Oct 31, 2020
4a345c8
handle duplicate pubsub notification
Oct 31, 2020
0d6fdaf
Merge branch 'master' into cloud-functions-ingest
ryanmcdowell Nov 2, 2020
a542f38
fixup ci dockerfile
Nov 2, 2020
c4da1e3
more ci
Nov 2, 2020
bfc9622
Merge branch 'cloud-functions-ingest' of github.com:jaketf/bigquery-u…
Nov 2, 2020
41dc1cd
better defaults, faster ci
Nov 3, 2020
54390f6
add integration test for partitioned tables / data
Nov 3, 2020
7e7272d
parallelize tests 4x+ speed up in ci
Nov 4, 2020
ddc2656
fixup docs
Nov 4, 2020
b0bafe3
add terraform module for deployment
Nov 5, 2020
3da292d
remove useless gitignore
Nov 5, 2020
f1b5659
add pytest.mark.IT
Nov 5, 2020
a88939b
add note on alternatives
Nov 9, 2020
aeab1ad
wip
Nov 11, 2020
c4eaa19
move cloud functions to tools
Nov 11, 2020
ea45ad8
Merge branch 'chore/move-gcf-to-tools' into cloud-functions-ingest
Nov 11, 2020
cae43a4
wip backfill cli
Nov 11, 2020
85a2ec0
backfill cli
Nov 11, 2020
eddbfd6
update docs
Nov 11, 2020
32c2570
mock function name
Nov 11, 2020
d610365
Merge branch 'master' into cloud-functions-ingest
Nov 11, 2020
29d41a6
restructure tests
Nov 11, 2020
324b8e4
add backfill cli test
Nov 11, 2020
f617f91
Merge branch 'cloud-functions-ingest' of github.com:jaketf/bigquery-u…
Nov 11, 2020
13cc042
update test docstring
Nov 11, 2020
e85af56
dry up pytest fixtures
Nov 12, 2020
668cf4a
docs on tests fixtures
Nov 12, 2020
33ff201
fixup docs formatting
Nov 12, 2020
9943926
fix default regex
Nov 12, 2020
2b63418
fixup regex docs
Nov 12, 2020
dbb1612
Update tools/cloud_functions/gcs_event_based_ingest/backfill.py
Nov 13, 2020
321f997
Update tools/cloud_functions/gcs_event_based_ingest/backfill.py
Nov 13, 2020
ff4687f
fixup! tests
Nov 13, 2020
be672fc
add docs example / guidance for overriding destination regex
Nov 13, 2020
8356f93
fixup backfill usage docs
Nov 13, 2020
fe5245a
fixup pylint
Nov 13, 2020
a58088f
fixup regex docs
Nov 13, 2020
50d84e8
fixup document and support more flexible deployment scenarios
Nov 13, 2020
9e3e963
add support for Cloud Functions direct triggering
Nov 17, 2020
5752459
Update tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest…
Nov 17, 2020
98287ef
Update tools/cloud_functions/gcs_event_based_ingest/README.md
Nov 17, 2020
b6d9323
Update tools/cloud_functions/gcs_event_based_ingest/README.md
Nov 17, 2020
f311fcf
Update tools/cloud_functions/gcs_event_based_ingest/tests/conftest.py
Nov 17, 2020
df02536
Update tools/cloud_functions/gcs_event_based_ingest/tests/cli/test_ba…
Nov 17, 2020
bcf3b96
Update tools/cloud_functions/gcs_event_based_ingest/README.md
Nov 17, 2020
72f7a4f
Update tools/cloud_functions/gcs_event_based_ingest/README.md
Nov 17, 2020
17d7356
Update tools/cloud_functions/gcs_event_based_ingest/README.md
Nov 17, 2020
50d9400
Fixup address review feedback and fix linters
Nov 17, 2020
24d380c
Merge branch 'cloud-functions-ingest' of github.com:jaketf/bigquery-u…
Nov 17, 2020
4ae77a2
fixup google style imports everywhere
Nov 17, 2020
ab8eac8
set pytest workers to auto
Nov 17, 2020
6da5ddd
don't alias use full import paths
Nov 17, 2020
eef0256
fixup! move --workers=auto to pytest.ini
Nov 17, 2020
8ccc387
fixup don't reverse implement storage.Client.get_bucket
Nov 17, 2020
348287f
fixup import google style
Nov 18, 2020
ba79922
fixup ci cloudbuild
Nov 18, 2020
91d1dc6
remove trailing whitespace
Nov 18, 2020
2 changes: 1 addition & 1 deletion tools/cloud_functions/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# BigQuery Cloud Functions
This directory holds reusable Google Cloud Functions for
common BigQuery use cases.

2 changes: 1 addition & 1 deletion tools/cloud_functions/gcs_event_based_ingest/.isort.cfg
@@ -1,3 +1,3 @@
[settings]
src_paths=backfill.py,gcs_ocn_bq_ingest,test
skip=terraform_module
6 changes: 4 additions & 2 deletions tools/cloud_functions/gcs_event_based_ingest/.style.yapf
@@ -1,5 +1,7 @@
[style]
based_on_style = google
spaces_before_comment = 2
split_before_logical_operator = true
allow_split_before_default_or_named_assigns = false
[knobs]
blank_lines_between_top_level_imports_and_variables = 1
197 changes: 181 additions & 16 deletions tools/cloud_functions/gcs_event_based_ingest/README.md
@@ -16,6 +16,77 @@ like so:
Note, the table prefix can contain multiple sub-prefixes for handling partitions
or for configuring historical / incremental loads differently.

### Configurable Naming Convention with Regex
By default, we try to read the dataset, table, partition (or yyyy/mm/dd/hh),
and batch ID using the following Python regex:
```python3
DEFAULT_DESTINATION_REGEX = (
r"^(?P<dataset>[\w\-\._0-9]+)/" # dataset (required)
r"(?P<table>[\w\-_0-9]+)/?" # table name (required)
r"(?P<partition>\$[0-9]+)?/?" # partition decorator (optional)
r"(?P<yyyy>[0-9]{4})?/?" # partition year (yyyy) (optional)
r"(?P<mm>[0-9]{2})?/?" # partition month (mm) (optional)
r"(?P<dd>[0-9]{2})?/?" # partition day (dd) (optional)
r"(?P<hh>[0-9]{2})?/?" # partition hour (hh) (optional)
r"(?P<batch>[\w\-_0-9]+)?/" # batch id (optional)
)
```
You can check whether this meets your needs in this [regex playground](https://regex101.com/r/5Y9TDh/2).
Otherwise, you can override the regex by setting the `DESTINATION_REGEX`
environment variable to better fit your naming convention on GCS. Your regex
must include
[Python named capturing groups](https://docs.python.org/3/howto/regex.html#non-capturing-and-named-groups)
for the destination `dataset` and `table`.
Note that `dataset` can optionally specify the destination project explicitly
(i.e. `gs://${BUCKET}/project_id.dataset_id/table/....`); otherwise, the default
project will be inferred from Application Default Credentials (the project in
which the Cloud Function is running, or the ADC configured in the Google Cloud
SDK if invoked locally). This is useful in scenarios where a single deployment of
the Cloud Function is responsible for ingesting data into BigQuery tables in
projects other than the one it is deployed in. In these cases it is crucial to
ensure the service account that Cloud Functions is impersonating has the correct
permissions on all destination projects.

Your regex can optionally include:
- `partition`: a BigQuery partition decorator with a leading `$`
- `yyyy`, `mm`, `dd`, `hh`: partition year, month, day, and hour
(depending on your partition granularity)
- `batch`: an optional batch ID to indicate multiple uploads for the same partition.
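As a quick sanity check, the default regex (copied verbatim from above) applied to a conforming object path yields the expected named groups; the dataset, table, and batch names below are illustrative:

```python
import re

# Default regex, copied verbatim from the docs above.
DEFAULT_DESTINATION_REGEX = (
    r"^(?P<dataset>[\w\-\._0-9]+)/"  # dataset (required)
    r"(?P<table>[\w\-_0-9]+)/?"      # table name (required)
    r"(?P<partition>\$[0-9]+)?/?"    # partition decorator (optional)
    r"(?P<yyyy>[0-9]{4})?/?"         # partition year (yyyy) (optional)
    r"(?P<mm>[0-9]{2})?/?"           # partition month (mm) (optional)
    r"(?P<dd>[0-9]{2})?/?"           # partition day (dd) (optional)
    r"(?P<hh>[0-9]{2})?/?"           # partition hour (hh) (optional)
    r"(?P<batch>[\w\-_0-9]+)?/"      # batch id (optional)
)

# Object path relative to the bucket (names are illustrative).
match = re.match(DEFAULT_DESTINATION_REGEX,
                 "my_dataset/transactions/2020/01/02/03/batch-001/_SUCCESS")
groups = match.groupdict()
```

Here `groups` maps each named capturing group to the matched substring; the unused `partition` decorator group is `None`.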

For example, if your datafiles were laid out like this:
```text
gs://${BUCKET}/${SOURCE_SYSTEM}/${DATASET}/${TABLE}/region=${LOCATION}/yyyy=${YEAR}/mm=${MONTH}/dd=${DAY}/hh=${HOUR}
```
i.e.
```text
gs://my-bucket/on-prem-edw/my_product/transactions/region=US/yyyy=2020/mm=01/dd=02/hh=03/_SUCCESS
```
Then you could use [this regex](https://regex101.com/r/OLpmg4/2):
```text
DESTINATION_REGEX='(?:[\w\-_0-9]+)/(?P<dataset>[\w\-_0-9]+)/(?P<table>[\w\-_0-9]+)/region=(?P<batch>[\w]+)/yyyy=(?P<yyyy>[0-9]{4})/mm=(?P<mm>[0-9]{2})/dd=(?P<dd>[0-9]{2})/hh=(?P<hh>[0-9]{2})/'
```
In this case we can take advantage of a more rigid, known structure, so our
regex is simpler (no optional capturing groups or optional slashes).
Note, we can use the `region=` value (which may have been partitioned on
in an upstream system such as Hive) as a batch ID, because an hourly partition
might have multiple directories uploading to it (e.g. US, GB, etc.).
Because the regex uses only named capturing groups, there are no strict
ordering restrictions on the batch ID appearing before or after the partition
information.
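As a sanity check, the override above (with the path and names taken from the example layout) extracts the expected groups:

```python
import re

# The override regex from the example above, split for readability.
DESTINATION_REGEX = (
    r"(?:[\w\-_0-9]+)/(?P<dataset>[\w\-_0-9]+)/(?P<table>[\w\-_0-9]+)/"
    r"region=(?P<batch>[\w]+)/yyyy=(?P<yyyy>[0-9]{4})/mm=(?P<mm>[0-9]{2})/"
    r"dd=(?P<dd>[0-9]{2})/hh=(?P<hh>[0-9]{2})/"
)

# Object path relative to gs://my-bucket/ from the example layout.
path = ("on-prem-edw/my_product/transactions/"
        "region=US/yyyy=2020/mm=01/dd=02/hh=03/_SUCCESS")
groups = re.match(DESTINATION_REGEX, path).groupdict()
```

Note how the leading non-capturing group swallows the source-system prefix, and `region=US` lands in the `batch` group.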

### Dealing with Different Naming Conventions in the Same Bucket
In most cases, it is recommended to have a separate bucket / deployment of
the Cloud Function for each naming convention, as this typically means that
the upstream systems are governed by different teams.

Sometimes many upstream systems might use different naming conventions
when uploading to the same bucket due to organizational constraints.
In this case, you have two options:
1. Try to write a very flexible regex that handles all of your naming conventions with lots of optional groups.
[Regex101 is your friend!](https://regex101.com/)
1. Create separate Pub/Sub notifications and a separate deployment of the
Cloud Function, with the appropriate `DESTINATION_REGEX` environment variable,
for each GCS prefix with a different naming convention.

### Configuration Files
The ingestion function supports many optional configuration files that should
live in a special `_config/` prefix at the root of the bucket and/or under the dataset
@@ -106,7 +177,7 @@ This can be helpful in reprocessing scenarios where you'd want to `WRITE_TRUNCATE`
a partition that had some data quality issue.

#### Hive Partitioning
If your data will be uploaded to GCS from a hadoop system that uses the
[supported default hive partitioning](https://cloud.google.com/bigquery/docs/hive-partitioned-loads-gcs#supported_data_layouts)
you can specify this in the [`hivePartitioningOptions`](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#hivepartitioningoptions)
key of `load.json` for that table.
@@ -115,12 +186,12 @@ Any non-trivial incremental loading to partitions should usually use the
Transformation SQL to define the `INSERT / MERGE / UPDATE / DELETE` logic into
the target BQ table as these DML semantics are much more flexible than the load
job write dispositions.
Furthermore, using external query has the added benefit of circumventing the
per load job bytes limits (default 15 TB) and committing large partitions
atomically.

## Handling Incremental Loads
This solution introduces the concept of `batch_id` which uniquely identifies
a batch of data committed by an upstream system that needs to be picked up as an
incremental load. You can again set the load job or external query configuration
at any parent folder's `_config` prefix. This allows you to dictate
@@ -130,21 +201,24 @@ or "for that table any new batch should `WRITE_APPEND` to its parent partition/
## Monitoring
Monitoring what data has been loaded by this solution should be done with the
BigQuery [`INFORMATION_SCHEMA` jobs metadata](https://cloud.google.com/bigquery/docs/information-schema-jobs).
If more granular data is needed about a particular job, you can filter on the
job ID prefix and labels described below.

### Job Naming Convention
All load or external query jobs will have a job id with a prefix following this convention:
```python3
job_id_prefix=f"gcf-ingest-{dest_table_ref.dataset_id}-{dest_table_ref.table_id}"
```
Note, the prefix `gcf-ingest-` is configurable with the `JOB_PREFIX` environment
variable.
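An illustrative sketch (not the function's actual source) of how that convention composes with the `JOB_PREFIX` override:

```python
import os

# Defaults to the documented "gcf-ingest-" prefix; overridable via env var.
JOB_PREFIX = os.getenv("JOB_PREFIX", "gcf-ingest-")

def job_id_prefix(dataset_id: str, table_id: str) -> str:
    # Mirrors the documented convention: <prefix><dataset>-<table>
    return f"{JOB_PREFIX}{dataset_id}-{table_id}"
```

With the defaults, a load into `my_dataset.my_table` would produce job IDs beginning with `gcf-ingest-my_dataset-my_table`.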

### Job Labels
All load or external query jobs are labelled with functional component and
cloud function name.
```python3
DEFAULT_JOB_LABELS = {
"component": "event-based-gcs-ingest",
"cloud-function-name": getenv("FUNCTION_NAME"),
"gcs-prefix": "gs://bucket/prefix/for/this/ingest",
"bucket-id": "<bucket-for-this-notification>"
}
```
If the destination regex matches a batch group, there will be a `batch-id` label.
@@ -171,8 +245,15 @@ WHERE
```

## Triggers

### Pub/Sub Storage Notifications `_SUCCESS`
GCS Object Finalize triggers can invoke the Cloud Function directly or via a
Pub/Sub topic. This function supports reading the bucket / object ID from
either the Pub/Sub message attributes or the Cloud Functions event schema.
Compared to direct Cloud Functions triggers, Pub/Sub triggers offer additional
features, including filtering notifications to a prefix within a bucket
(rather than bucket-wide) and controlling the message retention period for the
Pub/Sub topic. More info can be found here:
- [Pub/Sub Storage Notification](https://cloud.google.com/storage/docs/pubsub-notifications)
- [Cloud Functions direct trigger](https://cloud.google.com/functions/docs/tutorials/storage)
1. A trigger on a `_SUCCESS` file loads all other files in that directory.
1. A trigger on a non-`_SUCCESS` file is a no-op.
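A minimal sketch of that filtering behavior (the function name here is illustrative, not the actual source):

```python
SUCCESS_FILENAME = "_SUCCESS"  # the default success filename

def should_start_ingest(object_id: str) -> bool:
    # Only a finalized _SUCCESS object kicks off a load of its directory;
    # any other object-finalize event is a no-op.
    return object_id.split("/")[-1] == SUCCESS_FILENAME
```

For example, finalizing `dataset/table/2020/01/02/03/_SUCCESS` triggers a load, while finalizing a plain data file in the same prefix does not.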

@@ -187,9 +268,9 @@ We run the following CI checks to ensure code quality and avoid common pitfalls:

This CI process is defined in [cloudbuild.yaml](cloudbuild.yaml) and can be run
locally with [cloud-build-local](https://cloud.google.com/cloud-build/docs/build-debug-locally)
from the root of the repo with:
```bash
cloud-build-local --config=tools/cloud_functions/gcs_event_based_ingest/cloudbuild.yaml --dryrun=false .
```

### Optimizations / Philosophy
@@ -225,6 +306,12 @@ Note that integration tests will spin up / tear down cloud resources that can
incur a small cost. These resources will be spun up based on your Google Cloud SDK
[Application Default Credentials](https://cloud.google.com/sdk/gcloud/reference/auth/application-default)

#### Pytest Fixtures
All pytest fixtures are DRY-ed up into `tests/conftest.py`.
This is mostly to share fixtures between the main integration test for the
Cloud Function and the integration tests for the backfill CLI.
See more info on sharing pytest fixtures in the [pytest docs](https://docs.pytest.org/en/stable/fixture.html#conftest-py-sharing-fixture-functions).

#### Running All Tests
```bash
pytest
@@ -240,7 +327,85 @@ pytest -m IT
```

## Deployment
It is suggested to deploy this Cloud Function with the
[accompanying terraform module](terraform_module/gcs_ocn_bq_ingest_function/README.md).

### Google Cloud SDK
Alternatively, you can deploy with Google Cloud SDK:

#### Pub/Sub Notifications
```bash
PROJECT_ID=your-project-id
TOPIC_ID=test-gcs-ocn
PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/${TOPIC_ID}

# Create Pub/Sub Object Change Notifications
gsutil notification create -f json -t ${PUBSUB_TOPIC} -e OBJECT_FINALIZE gs://${INGESTION_BUCKET}

# Deploy Cloud Function
gcloud functions deploy test-gcs-bq-ingest \
--region=us-west4 \
--source=gcs_ocn_bq_ingest \
--entrypoint=main \
--runtime=python38 \
--trigger-topic=${PUBSUB_TOPIC} \
--service-account=${SERVICE_ACCOUNT_EMAIL} \
--timeout=540 \
--set-env-vars='DESTINATION_REGEX=^(?:[\w\-0-9]+)/(?P<dataset>[\w\-_0-9]+)/(?P<table>[\w\-_0-9]+)/?(?:incremental|history)?/?(?P<yyyy>[0-9]{4})?/?(?P<mm>[0-9]{2})?/?(?P<dd>[0-9]{2})?/?(?P<hh>[0-9]{2})?/?(?P<batch>[0-9]+)?/?'
```

#### Cloud Functions Events
```bash
PROJECT_ID=your-project-id

# Deploy Cloud Function
gcloud functions deploy test-gcs-bq-ingest \
--region=us-west4 \
--source=gcs_ocn_bq_ingest \
--entrypoint=main \
--runtime=python38 \
--trigger-resource ${INGESTION_BUCKET} \
--trigger-event google.storage.object.finalize \
--service-account=${SERVICE_ACCOUNT_EMAIL} \
--timeout=540 \
--set-env-vars='DESTINATION_REGEX=^(?:[\w\-0-9]+)/(?P<dataset>[\w\-_0-9]+)/(?P<table>[\w\-_0-9]+)/?(?:incremental|history)?/?(?P<yyyy>[0-9]{4})?/?(?P<mm>[0-9]{2})?/?(?P<dd>[0-9]{2})?/?(?P<hh>[0-9]{2})?/?(?P<batch>[0-9]+)?/?'
```

In theory, one could set up Pub/Sub notifications from multiple GCS Buckets
(owned by different teams but following a common naming convention) to the same
Pub/Sub topic so that data uploaded to any of these buckets could get
automatically loaded to BigQuery by a single deployment of the Cloud Function.

## Backfill
There are some cases where you may have data already copied to GCS according to
the naming convention / with success files before the Object Change
Notifications or Cloud Function have been set up. In these cases, you can use
the `backfill.py` CLI utility to crawl an existing bucket searching for success
files. The utility supports either invoking the Cloud Function main method
locally (in concurrent threads) or publishing notifications for the success
files (for a deployed Cloud Function to pick up).

### Usage
```
python3 -m backfill -h
usage: backfill.py [-h] --gcs-path GCS_PATH [--mode {LOCAL,NOTIFICATIONS}] [--pubsub-topic PUBSUB_TOPIC] [--success-filename SUCCESS_FILENAME] [--destination-regex DESTINATION_REGEX]

utility to backfill success file notifications or run the cloud function locally in concurrent threads.

optional arguments:
-h, --help show this help message and exit
--gcs-path GCS_PATH, -p GCS_PATH
GCS path (e.g. gs://bucket/prefix/to/search/) to search for existing _SUCCESS files
--mode {LOCAL,NOTIFICATIONS}, -m {LOCAL,NOTIFICATIONS}
How to perform the backfill: LOCAL run cloud function main method locally (in concurrent threads) or NOTIFICATIONS just push notifications to Pub/Sub for a deployed
version of the cloud function to pick up. Default is NOTIFICATIONS.
--pubsub-topic PUBSUB_TOPIC, --topic PUBSUB_TOPIC, -t PUBSUB_TOPIC
Pub/Sub notifications topic to post notifications for. i.e. projects/{PROJECT_ID}/topics/{TOPIC_ID} Required if using NOTIFICATIONS mode.
--success-filename SUCCESS_FILENAME, -f SUCCESS_FILENAME
Override the default success filename '_SUCCESS'
--destination-regex DESTINATION_REGEX, -r DESTINATION_REGEX
Override the default destination regex for determining BigQuery destination based on information encoded in the GCS path of the success file
```

## Alternatives
### BQ Tail
@@ -249,12 +414,12 @@ driven ingest to BigQuery from GCS that achieves batching based on a window
in processing time (as driven by Cloud Scheduler). BQ Tail has nice features for
triggering Post actions (BQ queries / GCS file moves or deletes) once the data
is ingested, and slack notifications. bqtail is well suited for use cases where
the atomicity of event partition is not important (e.g. many distributed
publishers uploading logs to GCS). Due to the dependency of certain bqtail
features on Cloud Scheduler, it cannot be used inside VPC-SC perimeters.
This tool might be more appropriate when the publisher is authoritative on the
atomicity of batches (e.g. an upstream hadoop job responsible for committing an
event time hour's worth of data).

### BigQuery Data Transfer Service
[Cloud Storage Transfer](https://cloud.google.com/bigquery-transfer/docs/cloud-storage-transfer)
16 changes: 16 additions & 0 deletions tools/cloud_functions/gcs_event_based_ingest/__init__.py
@@ -0,0 +1,16 @@
# Copyright 2020 Google LLC.
# This software is provided as-is, without warranty or representation
# for any use or purpose.
# Your use of it is subject to your agreement with Google.

# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.