Components - Added more GCP BigQuery components (3914)
* Update _client.py

* updated the gcp components

* Update the GCP BigQuery Components

* update the readme and component

* updated components
Niklas Hansson committed Jun 26, 2020
1 parent 1bbd82c commit c52a73e
Showing 8 changed files with 537 additions and 26 deletions.
188 changes: 188 additions & 0 deletions components/gcp/bigquery/query/to_CSV/README.md
@@ -0,0 +1,188 @@
# Name

Gather data by querying BigQuery and save it in a CSV file.


# Labels

GCP, BigQuery, Kubeflow, Pipeline


# Summary

A Kubeflow Pipeline component to submit a query to BigQuery and store the result in a CSV file that other components can use.


# Details


## Intended use

Use this Kubeflow component to:
* Select training data by submitting a query to BigQuery.
* Output the training data into a CSV file.


## Runtime arguments:

| Argument | Description | Optional | Data type | Accepted values | Default |
|----------|-------------|----------|-----------|-----------------|---------|
| query | The query used by BigQuery to fetch the results. | No | String | | |
| project_id | The project ID of the Google Cloud Platform (GCP) project to use to execute the query. | No | GCPProjectID | | |
| output_filename | The file name of the output file. | Yes | String | | bq_results.csv |
| job_config | The full configuration specification for the query job. See [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig) for details. | Yes | Dict | A JSON object which has the same structure as [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig) | None |
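
For illustration, `job_config` is passed as a plain dictionary. The sketch below is hypothetical; the exact key names and nesting the component accepts mirror [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig) and are assumptions here.

```python
# Hypothetical job_config dictionary (structure assumed from the BigQuery
# query job configuration; adjust the fields to your own query).
JOB_CONFIG = {
    'query': {
        'useLegacySql': False,               # run the query with standard SQL
        'maximumBytesBilled': '1000000000',  # cap the bytes the query may bill
    }
}
```
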
## Input data schema

The input data is a BigQuery job containing a query that pulls data from various sources.


## Output:

Name | Description | Type
:--- | :---------- | :---
output_path | The path to the file containing the query output in CSV format. | OutputPath


## Cautions & requirements

To use the component, the following requirements must be met:

* The BigQuery API is enabled.
* The component can authenticate to GCP. Refer to [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/) for details.
* The Kubeflow user service account is a member of the `roles/bigquery.admin` role of the project.
* The Kubeflow user service account is a member of the `roles/storage.objectCreator` role of the Cloud Storage output bucket.

## Detailed description
This Kubeflow Pipeline component is used to:
* Submit a query to BigQuery.
* The query results are extracted and stored locally as a CSV file that is available to other Kubeflow components.

Use the code below as an example of how to run your BigQuery job.

## Sample

Note: The following sample code works in an IPython notebook or directly in Python code.

#### Install the Kubeflow Pipeline SDK


```python
%%capture --no-stderr

KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'
!pip3 install $KFP_PACKAGE --upgrade
```

#### Load the component using KFP SDK


```python
import kfp.components as comp

bigquery_query_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/bigquery/query/to_gcs/component.yaml')
help(bigquery_query_op)
```

### Query

In this sample, we send a query to get the top questions from the Stack Overflow public dataset and output the data to a CSV file that other components can access. Here is the query:


```python
QUERY = 'SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 10'
```

#### Set sample parameters


```python
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'
```


```python
# Optional Parameters
EXPERIMENT_NAME = 'Bigquery - Query to CSV'  # experiment used when submitting the run below
FILE_NAME = 'test.csv'
```

#### Run the component as a single pipeline


```python
import kfp.dsl as dsl
import json

@dsl.pipeline(
    name='Bigquery query pipeline',
    description='Bigquery query pipeline'
)
def pipeline(
    query=QUERY,
    project_id=PROJECT_ID,
    output_filename=FILE_NAME,
    job_config=''
):
    bigquery_query_op(
        query=query,
        project_id=project_id,
        output_filename=output_filename,
        job_config=job_config)
```

#### Compile the pipeline


```python
pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)
```

#### Submit the pipeline for execution


```python
#Specify pipeline argument values
arguments = {}

#Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
```
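
Optionally, wait for the run to finish before looking at its output. This is a minimal sketch using the SDK's `wait_for_run_completion`; the attribute that carries the run ID (`run_result.id` below) varies across KFP SDK versions, so treat it as an assumption.

```python
# Block until the submitted run completes (or the one-hour timeout expires).
# `run_result.id` is assumed here; some SDK versions expose `run_result.run_id` instead.
run_detail = client.wait_for_run_completion(run_result.id, timeout=60 * 60)
print(run_detail.run.status)
```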

#### Use the output in a pipeline

A small example of how to use the output from the component; here `read_csv` is any component of interest that can consume a CSV file.

```python
def pipeline(
    query=QUERY,
    project_id=PROJECT_ID,
    job_config=''
):
    bq_out = bigquery_query_op(
        query=query,
        project_id=project_id,
        output_filename=FILE_NAME,
        job_config=job_config)
    read_csv(input_path=bq_out.outputs["table"] + "/" + FILE_NAME)
```



## References
* [Component python code](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/component_sdk/python/kfp_component/google/bigquery/_query.py)
* [Component docker file](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/Dockerfile)
* [BigQuery query REST API](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query)

## License
By deploying or using this software you agree to comply with the [AI Hub Terms of Service](https://aihub.cloud.google.com/u/0/aihub-tos) and the [Google APIs Terms of Service](https://developers.google.com/terms/). To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.
61 changes: 61 additions & 0 deletions components/gcp/bigquery/query/to_CSV/component.yaml
@@ -0,0 +1,61 @@
# Export to a file for the next processing step in the pipeline

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Bigquery - Query
description: |
  A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery and
  store the results to a csv file.
metadata:
  labels:
    add-pod-env: 'true'
inputs:
  - name: query
    description: 'The query used by Bigquery service to fetch the results.'
    type: String
  - name: project_id
    description: 'The project to execute the query job.'
    type: GCPProjectID
  - name: job_config
    description: >-
      The full config spec for the query job. See
      [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig)
      for details.
    default: ''
    type: Dict
  - name: output_filename
    description: 'The output file name'
    default: 'bq_results.csv'
    type: String
outputs:
  - name: MLPipeline UI metadata
    type: UI metadata
  - name: table
    description: 'The path to the result from BigQuery'
    type: CSV
implementation:
  container:
    image: gcr.io/ml-pipeline/ml-pipeline-gcp
    args: [
      --ui_metadata_path, {outputPath: MLPipeline UI metadata},
      kfp_component.google.bigquery, query,
      --query, {inputValue: query},
      --project_id, {inputValue: project_id},
      --output_path, {outputPath: table},
      --output_filename, {inputValue: output_filename},
      --job_config, {inputValue: job_config},
    ]
    env:
      KFP_POD_NAME: "{{pod.name}}"
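
If you are working from a local checkout of the repository, a minimal sketch of loading this component definition with the KFP SDK is shown below; the relative path is an assumption about where the checkout lives.

```python
import kfp.components as comp

# Load the component from a local copy of the YAML above; adjust the path to
# wherever the file actually lives in your checkout.
bigquery_to_csv_op = comp.load_component_from_file(
    'components/gcp/bigquery/query/to_CSV/component.yaml')
help(bigquery_to_csv_op)
```
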
@@ -1,7 +1,7 @@

# Name

Gather training data by querying BigQuery
Gather data by querying BigQuery and save it to GCS.


# Labels
@@ -11,7 +11,7 @@ GCP, BigQuery, Kubeflow, Pipeline

# Summary

A Kubeflow Pipeline component to submit a query to BigQuery and store the result in a Cloud Storage bucket.
A Kubeflow Pipeline component to submit a query to BigQuery and store the result in a table on BigQuery.


# Details
@@ -20,8 +20,8 @@ A Kubeflow Pipeline component to submit a query to BigQuery and store the result
## Intended use

Use this Kubeflow component to:
* Select training data by submitting a query to BigQuery.
* Output the training data into a Cloud Storage bucket as CSV files.
* Select data by submitting a query to BigQuery.
* Output the data into a table on BigQuery.


## Runtime arguments:
@@ -33,12 +33,12 @@ Use this Kubeflow component to:
| project_id | The project ID of the Google Cloud Platform (GCP) project to use to execute the query. | No | GCPProjectID | | |
| dataset_id | The ID of the persistent BigQuery dataset to store the results of the query. If the dataset does not exist, the operation will create a new one. | Yes | String | | None |
| table_id | The ID of the BigQuery table to store the results of the query. If the table ID is absent, the operation will generate a random ID for the table. | Yes | String | | None |
| output_gcs_path | The path to the Cloud Storage bucket to store the query output. | Yes | GCSPath | | None |
| dataset_location | The location where the dataset is created. Defaults to US. | Yes | String | | US |
| job_config | The full configuration specification for the query job. See [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig) for details. | Yes | Dict | A JSON object which has the same structure as [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig) | None |

## Input data schema

The input data is a BigQuery job containing a query that pulls data f rom various sources.
The input data is a BigQuery job containing a query that pulls data from various sources.


## Output:
@@ -47,6 +47,7 @@ Name | Description | Type
:--- | :---------- | :---
output_gcs_path | The path to the Cloud Storage bucket containing the query output in CSV format. | GCSPath


## Cautions & requirements

To use the component, the following requirements must be met:
@@ -60,7 +61,7 @@ To use the component, the following requirements must be met:
This Kubeflow Pipeline component is used to:
* Submit a query to BigQuery.
* The query results are persisted in a dataset table in BigQuery.
* An extract job is created in BigQuery to extract the data from the dataset table and output it to a Cloud Storage bucket as CSV files.
* The data is extracted locally and stored as a CSV file.

Use the code below as an example of how to run your BigQuery job.

@@ -85,13 +86,11 @@ KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar
import kfp.components as comp

bigquery_query_op = comp.load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/bigquery/query/component.yaml')
'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/bigquery/query/to_gcs/component.yaml')
help(bigquery_query_op)
```

### Sample

Note: The following sample code works in IPython notebook or directly in Python code.
### Query

In this sample, we send a query to get the top questions from the Stack Overflow public dataset and output the data to a Cloud Storage bucket. Here is the query:

@@ -112,7 +111,7 @@ GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash

```python
# Optional Parameters
EXPERIMENT_NAME = 'Bigquery -Query'
EXPERIMENT_NAME = 'Bigquery-Query'
OUTPUT_PATH = '{}/bigquery/query/questions.csv'.format(GCS_WORKING_DIR)
```

@@ -186,4 +185,4 @@ run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arg
* [BigQuery query REST API](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query)

## License
By deploying or using this software you agree to comply with the [AI Hub Terms of Service](https://aihub.cloud.google.com/u/0/aihub-tos) and the [Google APIs Terms of Service](https://developers.google.com/terms/). To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.
By deploying or using this software you agree to comply with the [AI Hub Terms of Service](https://aihub.cloud.google.com/u/0/aihub-tos) and the [Google APIs Terms of Service](https://developers.google.com/terms/). To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Copyright 2018 Google LLC
# Export to a bucket in GCS

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,7 +17,7 @@
name: Bigquery - Query
description: |
A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery
service and dump outputs to a Google Cloud Storage blob.
service and dump outputs to a Google Cloud Storage blob.
metadata:
labels:
add-pod-env: 'true'
@@ -51,6 +53,10 @@ inputs:
for details.
default: ''
type: Dict
- name: output_kfp_path
description: 'The path to where the file should be stored.'
default: ''
type: String
outputs:
- name: output_gcs_path
description: 'The path to the Cloud Storage bucket containing the query output in CSV format.'
@@ -59,7 +65,7 @@ outputs:
type: UI metadata
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-gcp:ad9bd5648dd0453005225779f25d8cebebc7ca00
image: gcr.io/ml-pipeline/ml-pipeline-gcp
args: [
--ui_metadata_path, {outputPath: MLPipeline UI metadata},
kfp_component.google.bigquery, query,
