90 commits
6fd7db9
feat: tf gcs-bq-ingest module sets bq permissions
Nov 23, 2020
d2f00ce
fixup roles
Nov 23, 2020
65d9515
fixup dockerfil ci check
Nov 24, 2020
7fdffd7
docs: add note on unicode delimiters
Nov 24, 2020
537f05d
fix: update nested values in configs
Nov 30, 2020
02458b8
fix: external configs not found in parent dirs
Nov 25, 2020
d951ebf
feat: bq project env-var
Dec 2, 2020
c77908f
Merge branch 'feat/bq-project-env-var'
Dec 3, 2020
1768414
Merge branch 'fix/configDiscovery'
Dec 3, 2020
ddf3391
Merge branch 'docs/delimiters'
Dec 3, 2020
abbf5ea
Merge branch 'tf/multi-project'
Dec 3, 2020
cf23f1b
Move utility methods into a utils module
Dec 4, 2020
b52d291
Fix sorting issues
Dec 4, 2020
3276214
Move out constants into their own file
Dec 4, 2020
117d91b
fixup! pylint
Dec 5, 2020
3770c84
fixup! fixup! gcb pylint issue
Dec 5, 2020
8b1982b
Merge pull request #1 from jaketf/feat/move-utils
Dec 5, 2020
81bb167
feat: sequencing with backlog publisher / subscriber
Dec 4, 2020
3c798f7
fixup! mypy pylint
Dec 9, 2020
c631150
fixup! flake8
Dec 9, 2020
d5fabfa
fixup! mypy tests
Dec 9, 2020
1c26e23
support _config/*.sql for bq tranform sql
Dec 9, 2020
d16fb1b
improve performance of wait_on_bq_job
Dec 9, 2020
c627af0
wip
Dec 10, 2020
35e26d9
fixup! handle race condition
Dec 10, 2020
8c97f5a
ordering docs and isort single line rule
Dec 10, 2020
70d2d2b
docs
Dec 10, 2020
6ec3625
fixup linters
Dec 10, 2020
2d0e5a8
fixup import style
Dec 10, 2020
7cb00e4
typing isort single line exclusion
Dec 10, 2020
0be46f9
fixup gcb no-name-in-module bug
Dec 10, 2020
9a0ee10
add test of subscriber after subscriber exit
Dec 10, 2020
feb867e
chores: tf updates, larger machine type, etc.
Dec 11, 2020
2218212
terraform fmt
Dec 11, 2020
d528d85
handle abandoned _BACKFILL and other review feedback
Dec 11, 2020
a0114e1
improve tests
Dec 12, 2020
def1ddb
fix: handle long running bq jobs
Dec 14, 2020
ddaf280
chore: add e2e test, fixup terraform
Dec 15, 2020
c18e5e9
ignore pylint redherring import errors
Dec 15, 2020
2c4376a
fixup! e2e tf to support builds where short_sha is set to empty string.
Dec 15, 2020
b6690af
fix TF_VAR env var
Dec 15, 2020
36be628
enable resource manager api
Dec 15, 2020
6103743
enable cloud functions api...
Dec 15, 2020
edcdae5
add unit test timeout
Dec 15, 2020
63f480d
explicit local backend
Dec 15, 2020
03d9b79
debug missing state file
Dec 15, 2020
fa82f12
debug
Dec 15, 2020
d1acf9e
relative state path
Dec 15, 2020
b9e741c
typo .[tf]state
Dec 15, 2020
dadacaa
fixup docs
Dec 15, 2020
41f04ae
chore: clean up subscriber
Dec 15, 2020
d8ae3cf
fix: don't try to regex match _backlog/* items
Dec 15, 2020
d9f3482
don't regex match in triage if ordering enabled (this happens later)
Dec 15, 2020
7d2f28f
fix: subscriber monitor get table prefix
Dec 15, 2020
35fe6e3
fix: get_table_prefix issues w/ backlog, backfill and historydone
Dec 15, 2020
d93a2c9
fix: look_for_config_in_parents should return empty string for empty …
Dec 15, 2020
d50fefc
fix table prefix w/ trailing slash
Dec 15, 2020
b16a8b0
use get_table_prefix instead of removesuffix
Dec 15, 2020
f685511
chore: refactor terraform into pytest fixture to always clean up
Dec 16, 2020
905949d
fix don't removesuffix for start backfill file
Dec 16, 2020
675c756
fixup isort
Dec 16, 2020
f0ebcd0
more logging statements fail on untriageable event
Dec 16, 2020
b83fee8
fix pylint
Dec 16, 2020
c9263b7
Merge branch 'master' into sequencing-develop
Dec 16, 2020
eae687f
feat: env-var t numDmlRowsAffected = 0 as a failure
Dec 16, 2020
94136b6
[skip ci] add comment to cloudbuild.yaml
Dec 16, 2020
790abb1
[skip ci] update comment in cloudbuild.yaml
Dec 16, 2020
94ca2f6
chore: clean up unused fixture, init files
Dec 16, 2020
b216d88
chore: improve terraform printint in pytest fixture
Dec 16, 2020
d5fe02b
better bq job ids
Dec 16, 2020
fcb88a0
fixup regex escaping
Dec 16, 2020
85cea34
make pylint happy
Dec 16, 2020
f7af0fb
[skip ci] more docs
Dec 16, 2020
7971bc3
fix default load config return type
Dec 17, 2020
de19c98
fix: fail on failure of children jobs
Jan 7, 2021
61d2c14
chore: add test for child job failing behavior
Jan 8, 2021
fb69a6a
fixup flake8
Jan 8, 2021
1aec908
fixup flake8
Jan 8, 2021
0490217
fixup flake8
Jan 8, 2021
3c3bd3d
feat: separate bq storage and compute project env vars
Jan 11, 2021
14d6cc1
feat: add utility for dry running external queries
Jan 12, 2021
43d3ee7
fixup! module docstring for dry_run_external cli
Jan 13, 2021
9b8e780
fixup! flake8
Jan 13, 2021
14922df
support non-dryrun
Jan 14, 2021
ca70175
fixup dry run default
Jan 15, 2021
9e8e52f
fix: don't require escaping braces in sql, still support {dest_datase…
Jan 22, 2021
854aa68
happy newyear! copyright 2020 -> 2021
Jan 22, 2021
f780171
Merge branch 'sequencing-develop' into dry-run-external
Feb 4, 2021
493426d
fixup! isort
Feb 4, 2021
6db3d5f
fixup! flake8
Feb 11, 2021
1 change: 1 addition & 0 deletions tools/.gitignore
@@ -0,0 +1 @@
__pycache__/
2 changes: 1 addition & 1 deletion tools/cloud_functions/gcs_event_based_ingest/.flake8
@@ -1,6 +1,6 @@
 [flake8]
 max-line-length = 110
 ignore = E731,W504,I001,W503,E402
-exclude = .svn,CVS,.bzr,.hg,.git,__pycache__,.eggs,*.egg,node_modules,.venv
+exclude = .svn,CVS,.bzr,.hg,.git,__pycache__,.eggs,*.egg,node_modules,.venv,.terraform
 # format = ${cyan}%(path)s${reset}:${yellow_bold}%(row)d${reset}:${green_bold}%(col)d${reset}: ${red_bold}%(code)s${reset} %(text)s

2 changes: 2 additions & 0 deletions tools/cloud_functions/gcs_event_based_ingest/.gitignore
@@ -0,0 +1,2 @@
prof/
test.log
2 changes: 2 additions & 0 deletions tools/cloud_functions/gcs_event_based_ingest/.hadolint.yaml
@@ -0,0 +1,2 @@
ignored:
- DL3008
2 changes: 2 additions & 0 deletions tools/cloud_functions/gcs_event_based_ingest/.isort.cfg
@@ -1,3 +1,5 @@
 [settings]
 src_paths=backfill.py,gcs_ocn_bq_ingest,test
 skip=terraform_module
+force_single_line=True
+single_line_exclusions=typing
15 changes: 13 additions & 2 deletions tools/cloud_functions/gcs_event_based_ingest/Dockerfile.ci
@@ -1,4 +1,15 @@
-FROM python:3.8-slim
+FROM python:3.8
+RUN apt-get update \
+    && apt-get install --no-install-recommends -y \
+    apt-transport-https \
+    ca-certificates \
+    curl \
+    sudo \
+    unzip \
+    && apt-get autoremove -yqq --purge \
+    && apt-get clean && rm -rf /var/lib/apt/lists/*
 COPY requirements.txt requirements-dev.txt ./
+COPY scripts/install_terraform.sh ./
+RUN ./install_terraform.sh
 RUN pip3 install --no-cache-dir -r requirements-dev.txt
-ENTRYPOINT ["pytest"]
+ENTRYPOINT ["python3 -m pytest"]
202 changes: 202 additions & 0 deletions tools/cloud_functions/gcs_event_based_ingest/ORDERING.md
@@ -0,0 +1,202 @@
# Ordering Batches
There are use cases where it is important that incremental batches be
applied in order rather than as soon as they are uploaded to GCS (which is the
default behavior of this solution):
1. When using an external query that performs DML other than insert-only
statements (e.g. an `UPDATE` assumes that prior batches have already been
committed).
1. To ensure that there are no time gaps in the data (e.g. that 2020/01/02
data is not committed to BigQuery before 2020/01/01, or similarly that hour 00
is ingested before hour 01, etc.).

This Cloud Function supports serializing the submission of ingestion jobs to
BigQuery by using Google Cloud Storage's consistency guarantees to provide a
pessimistic lock on a table (preventing concurrent jobs) and
[GCS Objects.list](https://cloud.google.com/storage/docs/json_api/v1/objects/list)
lexicographic sorting of results to provide ordering guarantees.
The solution involves a table-level `_backlog/` directory to keep track
of success files whose batches have not yet been committed to BigQuery and
a table-level `_bqlock` file to keep track of which job is currently ingesting to
that table. Because all of this state is stored in GCS, the Cloud Function is
idempotent and can safely be re-triggered to work around the Cloud Functions
timeout.
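
As an illustration of the locking idea, here is a minimal sketch of taking a
table-level lock atomically with a GCS generation precondition. The helper name
`try_acquire_table_lock` is hypothetical and the real function factors this
differently; only the `if_generation_match=0` trick is the point.
```python
from google.api_core import exceptions
from google.cloud import storage


def try_acquire_table_lock(bucket: storage.Bucket, table_prefix: str,
                           job_id: str) -> bool:
    """Atomically create the table-level _bqlock file.

    if_generation_match=0 tells GCS to create the object only if it does
    not already exist, so two concurrent invocations cannot both win.
    """
    lock_blob = bucket.blob(f"{table_prefix}/_bqlock")
    try:
        lock_blob.upload_from_string(job_id, if_generation_match=0)
        return True   # we hold the lock for this table
    except exceptions.PreconditionFailed:
        return False  # another invocation is already ingesting this table
```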

## Assumptions
This ordering solution assumes that you want to apply batches in lexicographic
order. This is usually the case because path names typically contain some sort
of date / hour information.
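For example, zero-padded date/hour paths sort chronologically under plain
string comparison, which is the order in which `Objects.list` returns them:
```python
backlog = [
    "dataset/table/incremental/2020/01/02/10/_SUCCESS",
    "dataset/table/incremental/2020/01/02/03/_SUCCESS",
    "dataset/table/incremental/2020/01/01/23/_SUCCESS",
]
# Lexicographic order == chronological order for zero-padded paths.
print(sorted(backlog))
# ['dataset/table/incremental/2020/01/01/23/_SUCCESS',
#  'dataset/table/incremental/2020/01/02/03/_SUCCESS',
#  'dataset/table/incremental/2020/01/02/10/_SUCCESS']
```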

## Enabling Ordering
### Environment Variable
Ordering can be enabled at the function level by setting the `ORDER_PER_TABLE`
environment variable to `"True"`.
### Config File
Ordering can be configured at any level of your naming convention (e.g. dataset,
table, or some sub-path) by placing a `_config/ORDERME` file. This can be helpful
in scenarios where your historical load can be processed safely in parallel but
incrementals must be ordered.
For example:
```text
gs://${BUCKET}/${DATASET}/${TABLE}/historical/_config/load.json
gs://${BUCKET}/${DATASET}/${TABLE}/incremental/_config/external.json
gs://${BUCKET}/${DATASET}/${TABLE}/incremental/_config/bq_transform.sql
gs://${BUCKET}/${DATASET}/${TABLE}/incremental/_config/ORDERME
```
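
As a rough sketch, such a marker can be resolved by walking up the object path;
the helper name and exact lookup rules below are illustrative rather than the
function's actual implementation.
```python
import posixpath
from typing import Optional

from google.cloud import storage


def find_config_in_parents(client: storage.Client, bucket_name: str,
                           object_path: str,
                           config_name: str = "ORDERME") -> Optional[str]:
    """Walk up from the object's prefix looking for _config/<config_name>."""
    bucket = client.bucket(bucket_name)
    prefix = posixpath.dirname(object_path)
    while prefix:
        candidate = f"{prefix}/_config/{config_name}"
        if bucket.blob(candidate).exists(client):
            return candidate  # ordering applies at this level
        prefix = posixpath.dirname(prefix)
    return None  # no ORDERME marker found; fall back to ORDER_PER_TABLE
```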

## Dealing With Out-of-Order Publishing to GCS During Historical Load
In some use cases, there is a period where incrementals that must be applied in
order are uploaded in parallel (meaning their `_SUCCESS` files are expected to
be out of order). This typically happens during some historical backfill period.
This can be solved by setting the `START_BACKFILL_FILENAME` environment
variable to a file name that indicates that the parallel upload of historical
incrementals is complete (e.g. `_HISTORYDONE`). This causes all success
files for a table to be added to the backlog until the `_HISTORYDONE` file is
uploaded at the table level. At that point the backlog subscriber will begin
processing the batches in order.
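
A possible shape of the gate this creates in the publisher, with a hypothetical
helper name:
```python
import os

from google.cloud import storage


def ready_to_start_subscriber(bucket: storage.Bucket, table_prefix: str) -> bool:
    """Hold back the Backfill Subscriber until the historical upload is done."""
    start_backfill_filename = os.getenv("START_BACKFILL_FILENAME")
    if not start_backfill_filename:
        return True  # no gating configured: start once the backlog is non-empty
    # e.g. gs://<bucket>/dataset/table/_HISTORYDONE
    return bucket.blob(f"{table_prefix}/{start_backfill_filename}").exists()
```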

## Batch Failure Behavior
When ordering is enabled and the BigQuery job for a batch fails, it is not safe
to continue ingesting the next batch. The Cloud Function will leave the
`_bqlock` file in place and stop processing the backlog. It will report an
exception like the one below, which should be alerted on, because ingestion for
the table is deadlocked until a human intervenes to address the failed batch:
```text
f"previous BigQuery job: {job_id} failed or could not "
"be found. This will kill the backfill subscriber for "
f"the table prefix {table_prefix}."
"Once the issue is dealt with by a human, the lock"
"file at: "
f"gs://{lock_blob.bucket.name}/{lock_blob.name} "
"should be manually removed and a new empty _BACKFILL"
"file uploaded to:"
f"gs://{lock_blob.bucket.name}/{table_prefix}/_BACKFILL"
f"to resume the backfill subscriber so it can "
"continue with the next item in the backlog.\n"
"Original Exception:\n"
f"{traceback.format_exc()}")
```
Note that once `_bqlock` is removed and `_BACKFILL` is re-posted, the Cloud
Function will proceed by applying the next batch in the `_backlog`. This means
that if you have applied the failed batch manually, you should remove its
pointer from the `_backlog`. However, if you have patched the data on GCS for
the failed batch and would like the Cloud Function to apply it, leave the
pointer in the `_backlog`.
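
For reference, the manual recovery amounts to two GCS operations, sketched
below with placeholder bucket and table-prefix names:
```python
from google.cloud import storage


def resume_backfill(bucket_name: str, table_prefix: str) -> None:
    """Clear the stale lock and wake the Backfill Subscriber again."""
    bucket = storage.Client().bucket(bucket_name)
    # 1. Remove the lock left behind by the failed job.
    bucket.blob(f"{table_prefix}/_bqlock").delete()
    # 2. Re-post an empty _BACKFILL file to restart the subscriber.
    bucket.blob(f"{table_prefix}/_BACKFILL").upload_from_string("")


resume_backfill("ingestion-bucket", "dataset/table")  # placeholder names
```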

## Ordering Mechanics Explained
We treat ordering incremental commits to a table as a variation on the
[Producer-Consumer Problem](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem),
where there are multiple producers (each invocation of the Backlog Publisher)
and a single consumer (the Backlog Subscriber, which is enforced to be a
singleton per table with a claim file). The GCS `_backlog/` directory serves as
the queue and `_bqlock` as a mutex. There is still a rare race condition, which
we also handle (see the note at the end of this section).

### Backlog Publisher
The Backlog Publisher has two responsibilities:
1. Add incoming success files to the table's `_backlog/` so they are not
"forgotten" by the ingestion system.
1. If the backlog is non-empty, start the Backfill Subscriber (if one is not
already running). This is accomplished by uploading a table-level `_BACKFILL`
file if it does not already exist, as sketched below.
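
A condensed sketch of that flow, using illustrative helper names rather than
the module's actual function signatures:
```python
from google.api_core import exceptions
from google.cloud import storage


def backlog_publisher(bucket: storage.Bucket, success_blob_name: str,
                      table_prefix: str) -> None:
    """Record a success file in the backlog, then wake the subscriber."""
    # 1. Add a pointer to this success file under the table's _backlog/.
    relative_path = success_blob_name[len(table_prefix):].lstrip("/")
    bucket.blob(
        f"{table_prefix}/_backlog/{relative_path}").upload_from_string("")

    # 2. Start the Backfill Subscriber if it is not already running: the
    #    _BACKFILL file doubles as the "subscriber is running" indicator.
    try:
        bucket.blob(f"{table_prefix}/_BACKFILL").upload_from_string(
            "", if_generation_match=0)
    except exceptions.PreconditionFailed:
        pass  # a subscriber is already running for this table
```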

### Backlog Subscriber
The Backlog Subscriber is responsible for keeping track of the BigQuery jobs
running on a table and ensuring that batches are committed in order. Whenever
the backlog for a table is not empty, the Backlog Subscriber should be running
for that table unless a job has failed.
It is either polling a `RUNNING` BigQuery job for completion or submitting
the next batch in the `_backlog`.

The state of which BigQuery job is currently running on a table is kept in a
`_bqlock` file at the table prefix.

To avoid exceeding the maximum nine-minute (540s) Cloud Functions timeout, the
Backfill Subscriber re-triggers itself by posting a new `_BACKFILL` file
until the `_backlog` for the table prefix is empty. When a new success file
arrives, it is the publisher's responsibility to restart the subscriber if
one is not already running.
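
A simplified sketch of that loop is shown below; the destination table and
load-job configuration are placeholders, and unlike this sketch the real
function re-triggers itself instead of blocking in a polling loop.
```python
import time

from google.cloud import bigquery, storage


def backlog_subscriber(gcs: storage.Client, bq: bigquery.Client,
                       bucket: storage.Bucket, table_prefix: str) -> None:
    """Drain the table's _backlog in lexicographic order, one job at a time."""
    while True:
        backlog = sorted(blob.name for blob in gcs.list_blobs(
            bucket, prefix=f"{table_prefix}/_backlog/"))
        if not backlog:
            # Caught up: drop _BACKFILL and exit.
            bucket.blob(f"{table_prefix}/_BACKFILL").delete()
            return
        # The backlog item mirrors the batch prefix, e.g.
        # .../_backlog/incremental/2020/01/02/05/_SUCCESS -> .../incremental/2020/01/02/05
        batch_prefix = backlog[0].replace("/_backlog/", "/", 1).rsplit("/", 1)[0]
        job = bq.load_table_from_uri(            # or an external query, per _config/*
            f"gs://{bucket.name}/{batch_prefix}/*",
            "my-project.my_dataset.my_table",    # placeholder destination
            job_config=bigquery.LoadJobConfig(autodetect=True))
        bucket.blob(f"{table_prefix}/_bqlock").upload_from_string(job.job_id)
        while job.state != "DONE":               # the real function re-triggers itself
            time.sleep(10)                       # rather than blocking past the timeout
            job.reload()
        if job.error_result:
            raise RuntimeError(f"job {job.job_id} failed; leaving _bqlock in place")
        bucket.blob(backlog[0]).delete()         # batch committed: pop it off the backlog
```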

### Example: Life of a Table
The following process explains the triggers (GCS files) and actions of the
Cloud Function for a single table prefix.

1. Source data uploaded to GCS prefix for the destination dataset / table, etc.
- `gs://ingestion-bucket/dataset/table/historical/2020/01/02/03/foo-data-00.csv`
- `gs://ingestion-bucket/dataset/table/historical/2020/01/02/03/foo-data-01.csv`
- `gs://ingestion-bucket/dataset/table/historical/2020/01/02/04/foo-data-00.csv`
- `gs://ingestion-bucket/dataset/table/incremental/2020/01/02/05/foo-data-01.csv`
1. Success file uploaded to GCS (to indicate this atomic batch is ready to be
applied).
- `gs://ingestion-bucket/dataset/table/historical/2020/01/02/03/_SUCCESS`
- `gs://ingestion-bucket/dataset/table/historical/2020/01/02/04/_SUCCESS`
- `gs://ingestion-bucket/dataset/table/incremental/2020/01/02/05/_SUCCESS`
1. Backlog Publisher adds a pointer to each success file in the backlog for the
table.
- `gs://ingestion-bucket/dataset/table/_backlog/historical/2020/01/02/03/_SUCCESS`
- `gs://ingestion-bucket/dataset/table/_backlog/historical/2020/01/02/04/_SUCCESS`
- `gs://ingestion-bucket/dataset/table/_backlog/incremental/2020/01/02/05/_SUCCESS`
1. After adding each item to the backlog, the Backlog Publisher will start the
Backfill Subscriber if it is not already running (as indicated by a `_BACKFILL`
file). If `START_BACKFILL_FILENAME` is set, the subscriber will not be started
until that file exists at the table prefix.
- `gs://ingestion-bucket/dataset/table/_BACKFILL`
1. The Backlog Subscriber will look at the backlog and apply the batches in
order (lexicographic). This process looks like this:
1. Claim this backfill file:
- `gs://ingestion-bucket/dataset/table/_claimed__BACKFILL_created_at_...`
1. Claim first batch in backlog (ensure no duplicate processing):
- `gs://ingestion-bucket/dataset/table/historical/2020/01/02/03/_claimed__SUCCESS_created_at_...`
1. Submit the BigQuery Job for this batch (load job or external query based on the `_config/*` files)
- Ingest the data at the `gs://ingestion-bucket/dataset/table/historical/2020/01/02/03/*` prefix
- Store the job ID in `gs://ingestion-bucket/dataset/table/_bqlock`
1. Wait for this Job to complete successfully and remove this item from the backlog.
- If job is `DONE` with errors:
- Raise exception (do not continue to process any more batches)
- If job is `DONE` without errors remove the pointer from the backlog:
- DELETE `gs://ingestion-bucket/dataset/table/_backlog/historical/2020/01/02/03/_SUCCESS`
1. Repeat from Backlog Subscriber step 2
- Where the first item in the backlog is now
- `gs://ingestion-bucket/dataset/table/_backlog/historical/2020/01/02/04/_SUCCESS`
- And on the next loop:
- `gs://ingestion-bucket/dataset/table/_backlog/incremental/2020/01/02/05/_SUCCESS`
1. The Backlog Subscriber sees that the `_backlog/` for the table is empty; in
other words, the BigQuery table is caught up with the data on GCS.
- DELETE `gs://ingestion-bucket/dataset/table/_BACKFILL` and exit
1. The next day a new incremental arrives.
- `gs://ingestion-bucket/dataset/table/incremental/2020/01/03/05/_SUCCESS`
1. The Backlog Publisher adds this item to the backlog and wakes up the
Backfill Subscriber by posting a new `_BACKFILL` file.
- `gs://ingestion-bucket/dataset/table/_backlog/incremental/2020/01/03/05/_SUCCESS`
- `gs://ingestion-bucket/dataset/table/_BACKFILL`
1. The Backlog Subscriber handles this backlog of just one item
(see Backlog Subscriber steps 5 and 6 above).


### Note on Handling Race Condition
We use `subscribe_monitor` to handle a rare race condition where:

1. The subscriber reads an empty backlog (but has not yet deleted the
`_BACKFILL` blob).
2. A new item is added to the backlog (causing a separate
function invocation).
3. In this new invocation, `start_subscriber_if_not_running` sees the old
`_BACKFILL` and does not create a new one.
4. The subscriber deletes the `_BACKFILL` blob and exits without
processing the new item added to the backlog in step 2.

We handle this as follows (sketched below):

1. When a success file is added to the backlog, a monitor is started that waits
10 seconds and then checks that the backfill file still exists, to catch the
case where the backfill file disappears when it should not. This might trigger
an extra loop of the backfill subscriber, but that loop will not take any
action, and this wasted compute is far better than dropping a batch of data.
1. On the subscriber side, we check whether more than 10 seconds elapsed
between listing the backlog items and deleting the backfill file. If so, the
subscriber double-checks that the backlog is still empty. This way the race
condition is always handled either by the monitor or by the subscriber itself.
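
The monitor side of this check can be sketched as follows; the 10-second window
mirrors the description above, and the body is illustrative rather than the
actual `subscribe_monitor` implementation.
```python
import time

from google.api_core import exceptions
from google.cloud import storage

RACE_WINDOW_SECONDS = 10


def subscribe_monitor(bucket: storage.Bucket, table_prefix: str) -> None:
    """After adding a backlog item, make sure a _BACKFILL file survives.

    If the subscriber deleted _BACKFILL inside the race window, re-post it so
    the new backlog item is not dropped; an extra no-op subscriber loop is a
    small price compared to silently losing a batch.
    """
    time.sleep(RACE_WINDOW_SECONDS)
    backfill = bucket.blob(f"{table_prefix}/_BACKFILL")
    if not backfill.exists():
        try:
            backfill.upload_from_string("", if_generation_match=0)
        except exceptions.PreconditionFailed:
            pass  # someone else re-created it in the meantime
```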


### Visualization of Ordering Triggers in the Cloud Function
![architecture](img/ordering.png)