Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dataflow and mlengine samples with recent changes #1006

Merged
merged 1 commit into from
Mar 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 35 additions & 34 deletions components/gcp/dataflow/launch_python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@
A Kubeflow Pipeline component to submit a Apache Beam job authored in python, to Google Cloud Dataflow for execution. The python beam code runs with Google Cloud Dataflow runner.

## Run-Time Parameters:
Name | Description
:--- | :----------
python_file_path | The gcs or local path to the python file to run.
project_id | The ID of the parent project.
requirements_file_path | Optional, the gcs or local path to the pip requirements file.
location | Optional. The regional endpoint to which to direct the request.
job_name_prefix | Optional. The prefix of the genrated job name. If not provided, the method will generated a random name.
args | The list of args to pass to the python file.
wait_interval | Optional wait interval between calls to get job status. Defaults to 30.
Name | Description | Type | Default
:--- | :---------- | :--- | :------
python_file_path | The gcs or local path to the python file to run. | String |
project_id | The ID of the parent project. | GCPProjectID |
staging_dir | Optional. The GCS directory for keeping staging files. A random subdirectory will be created under the directory to keep job info for resuming the job in case of failure and it will be passed as `staging_location` and `temp_location` command line args of the beam code. | GCSPath | ``
requirements_file_path | Optional, the gcs or local path to the pip requirements file. | GCSPath | ``
args | The list of args to pass to the python file. | List | `[]`
wait_interval | Optional wait interval between calls to get job status. Defaults to 30. | Integer | `30`

## Output:
Name | Description
:--- | :----------
job_id | The id of the created dataflow job.
Name | Description | Type
:--- | :---------- | :---
job_id | The id of the created dataflow job. | String

## Sample

Expand All @@ -30,19 +29,19 @@ Note: the sample code below works in both IPython notebook or python code direct
```python
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'
GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash
GCS_STAGING_DIR = 'gs://<Please put your GCS path here>' # No ending slash

# Optional Parameters
EXPERIMENT_NAME = 'Dataflow - Launch Python'
COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/dataflow/launch_python/component.yaml'
COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/07a0dd32c7d12066b2526210b0ae5a9ed9d9a90c/components/gcp/dataflow/launch_python/component.yaml'
```

### Install KFP SDK
Install the SDK (Uncomment the code if the SDK is not installed before)


```python
# Install the SDK (Uncomment the code if the SDK is not installed before)
# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.11/kfp.tar.gz'
# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'
# !pip3 install $KFP_PACKAGE --upgrade
```

Expand All @@ -68,16 +67,22 @@ import json
description='Dataflow launch python pipeline'
)
def pipeline(
python_file_path,
project_id,
requirements_file_path = '',
location = '',
job_name_prefix = '',
args = '',
python_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/wc.py',
project_id = PROJECT_ID,
staging_dir = GCS_STAGING_DIR,
requirements_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/requirements.txt',
args = json.dumps([
'--output', '{}/wc/wordcount.out'.format(GCS_STAGING_DIR)
]),
wait_interval = 30
):
dataflow_python_op(python_file_path, project_id, requirements_file_path, location, job_name_prefix, args,
wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))
dataflow_python_op(
python_file_path = python_file_path,
project_id = project_id,
staging_dir = staging_dir,
requirements_file_path = requirements_file_path,
args = args,
wait_interval = wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))
```

### Compile the pipeline
Expand All @@ -95,16 +100,7 @@ compiler.Compiler().compile(pipeline_func, pipeline_filename)

```python
#Specify pipeline argument values
arguments = {
'python_file_path': 'gs://ml-pipeline-playground/samples/dataflow/wc/wc.py',
'project_id': PROJECT_ID,
'requirements_file_path': 'gs://ml-pipeline-playground/samples/dataflow/wc/requirements.txt',
'args': json.dumps([
'--output', '{}/wc/wordcount.out'.format(GCS_WORKING_DIR),
'--temp_location', '{}/dataflow/wc/tmp'.format(GCS_WORKING_DIR),
'--staging_location', '{}/dataflow/wc/staging'.format(GCS_WORKING_DIR)
])
}
arguments = {}

#Get or create an experiment and submit a pipeline run
import kfp
Expand All @@ -115,3 +111,8 @@ experiment = client.create_experiment(EXPERIMENT_NAME)
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
```


```python

```
73 changes: 38 additions & 35 deletions components/gcp/dataflow/launch_python/sample.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,19 @@
"A Kubeflow Pipeline component to submit a Apache Beam job authored in python, to Google Cloud Dataflow for execution. The python beam code runs with Google Cloud Dataflow runner.\n",
"\n",
"## Run-Time Parameters:\n",
"Name | Description\n",
":--- | :----------\n",
"python_file_path | The gcs or local path to the python file to run.\n",
"project_id | The ID of the parent project.\n",
"requirements_file_path | Optional, the gcs or local path to the pip requirements file.\n",
"location | Optional. The regional endpoint to which to direct the request.\n",
"job_name_prefix | Optional. The prefix of the genrated job name. If not provided, the method will generated a random name.\n",
"args | The list of args to pass to the python file.\n",
"wait_interval | Optional wait interval between calls to get job status. Defaults to 30.\n",
"Name | Description | Type | Default\n",
":--- | :---------- | :--- | :------\n",
"python_file_path | The gcs or local path to the python file to run. | String |\n",
"project_id | The ID of the parent project. | GCPProjectID |\n",
"staging_dir | Optional. The GCS directory for keeping staging files. A random subdirectory will be created under the directory to keep job info for resuming the job in case of failure and it will be passed as `staging_location` and `temp_location` command line args of the beam code. | GCSPath | ``\n",
"requirements_file_path | Optional, the gcs or local path to the pip requirements file. | GCSPath | ``\n",
"args | The list of args to pass to the python file. | List | `[]`\n",
"wait_interval | Optional wait interval between calls to get job status. Defaults to 30. | Integer | `30`\n",
"\n",
"## Output:\n",
"Name | Description\n",
":--- | :----------\n",
"job_id | The id of the created dataflow job."
"Name | Description | Type\n",
":--- | :---------- | :---\n",
"job_id | The id of the created dataflow job. | String"
]
},
{
Expand Down Expand Up @@ -54,18 +53,19 @@
"source": [
"# Required Parameters\n",
"PROJECT_ID = '<Please put your project ID here>'\n",
"GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash\n",
"GCS_STAGING_DIR = 'gs://<Please put your GCS path here>' # No ending slash\n",
"\n",
"# Optional Parameters\n",
"EXPERIMENT_NAME = 'Dataflow - Launch Python'\n",
"COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/dataflow/launch_python/component.yaml'"
"COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/07a0dd32c7d12066b2526210b0ae5a9ed9d9a90c/components/gcp/dataflow/launch_python/component.yaml'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Install KFP SDK"
"### Install KFP SDK\n",
"Install the SDK (Uncomment the code if the SDK is not installed before)"
]
},
{
Expand All @@ -76,8 +76,7 @@
},
"outputs": [],
"source": [
"# Install the SDK (Uncomment the code if the SDK is not installed before)\n",
"# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.11/kfp.tar.gz'\n",
"# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'\n",
"# !pip3 install $KFP_PACKAGE --upgrade"
]
},
Expand Down Expand Up @@ -121,16 +120,22 @@
" description='Dataflow launch python pipeline'\n",
")\n",
"def pipeline(\n",
" python_file_path,\n",
" project_id,\n",
" requirements_file_path = '',\n",
" location = '',\n",
" job_name_prefix = '',\n",
" args = '',\n",
" python_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/wc.py',\n",
" project_id = PROJECT_ID,\n",
" staging_dir = GCS_STAGING_DIR,\n",
" requirements_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/requirements.txt',\n",
" args = json.dumps([\n",
" '--output', '{}/wc/wordcount.out'.format(GCS_STAGING_DIR)\n",
" ]),\n",
" wait_interval = 30\n",
"):\n",
" dataflow_python_op(python_file_path, project_id, requirements_file_path, location, job_name_prefix, args,\n",
" wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))"
" dataflow_python_op(\n",
" python_file_path = python_file_path, \n",
" project_id = project_id, \n",
" staging_dir = staging_dir, \n",
" requirements_file_path = requirements_file_path, \n",
" args = args,\n",
" wait_interval = wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
{
Expand Down Expand Up @@ -166,16 +171,7 @@
"outputs": [],
"source": [
"#Specify pipeline argument values\n",
"arguments = {\n",
" 'python_file_path': 'gs://ml-pipeline-playground/samples/dataflow/wc/wc.py',\n",
" 'project_id': PROJECT_ID,\n",
" 'requirements_file_path': 'gs://ml-pipeline-playground/samples/dataflow/wc/requirements.txt',\n",
" 'args': json.dumps([\n",
" '--output', '{}/wc/wordcount.out'.format(GCS_WORKING_DIR),\n",
" '--temp_location', '{}/dataflow/wc/tmp'.format(GCS_WORKING_DIR),\n",
" '--staging_location', '{}/dataflow/wc/staging'.format(GCS_WORKING_DIR)\n",
" ])\n",
"}\n",
"arguments = {}\n",
"\n",
"#Get or create an experiment and submit a pipeline run\n",
"import kfp\n",
Expand All @@ -186,6 +182,13 @@
"run_name = pipeline_func.__name__ + ' run'\n",
"run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand Down
74 changes: 40 additions & 34 deletions components/gcp/dataflow/launch_template/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@
A Kubeflow Pipeline component to submit a job from a dataflow template to Google Cloud Dataflow service.

## Runtime Parameters:
Name | Description
:--- | :----------
project_id | Required. The ID of the Cloud Platform project that the job belongs to.
gcs_path | Required. A Cloud Storage path to the template from which to create the job. Must be valid Cloud Storage URL, beginning with 'gs://'.
launch_parameters | Parameters to provide to the template being launched. Schema defined in https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters. `jobName` will be replaced by generated name.
location | Optional. The regional endpoint to which to direct the request.
job_name_prefix | Optional. The prefix of the genrated job name. If not provided, the method will generated a random name.
validate_only | If true, the request is validated but not actually executed. Defaults to false.
wait_interval | Optional wait interval between calls to get job status. Defaults to 30.
Name | Description | Type | Default
:--- | :---------- | :--- | :------
project_id | Required. The ID of the Cloud Platform project that the job belongs to. | GCPProjectID |
gcs_path | Required. A Cloud Storage path to the template from which to create the job. Must be valid Cloud Storage URL, beginning with 'gs://'. | GCSPath |
launch_parameters | Parameters to provide to the template being launched. Schema defined in https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters. `jobName` will be replaced by generated name. | Dict | `{}`
location | Optional. The regional endpoint to which to direct the request. | GCPRegion | ``
validate_only | If true, the request is validated but not actually executed. Defaults to false. | Bool | `False`
staging_dir | Optional. The GCS directory for keeping staging files. A random subdirectory will be created under the directory to keep job info for resuming the job in case of failure. | GCSPath | ``
wait_interval | Optional wait interval between calls to get job status. Defaults to 30. | Integer | `30`

## Output:
Name | Description
:--- | :----------
job_id | The id of the created dataflow job.
Name | Description | Type
:--- | :---------- | :---
job_id | The id of the created dataflow job. | String

## Sample

Expand All @@ -39,11 +39,11 @@ COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/maste
```

### Install KFP SDK
Install the SDK (Uncomment the code if the SDK is not installed before)


```python
# Install the SDK (Uncomment the code if the SDK is not installed before)
# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.11/kfp.tar.gz'
# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'
# !pip3 install $KFP_PACKAGE --upgrade
```

Expand All @@ -69,16 +69,26 @@ import json
description='Dataflow launch template pipeline'
)
def pipeline(
project_id,
gcs_path,
launch_parameters,
location='',
job_name_prefix='',
validate_only='',
wait_interval = 30
):
dataflow_template_op(project_id, gcs_path, launch_parameters, location, job_name_prefix, validate_only,
wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))
project_id = PROJECT_ID,
gcs_path = 'gs://dataflow-templates/latest/Word_Count',
launch_parameters = json.dumps({
'parameters': {
'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
gaoning777 marked this conversation as resolved.
Show resolved Hide resolved
'output': '{}/output'.format(GCS_WORKING_DIR)
}
}),
location = '',
validate_only = 'False',
staging_dir = GCS_WORKING_DIR,
wait_interval = 30):
dataflow_template_op(
project_id = project_id,
gcs_path = gcs_path,
launch_parameters = launch_parameters,
location = location,
validate_only = validate_only,
staging_dir = staging_dir,
wait_interval = wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))
```

### Compile the pipeline
Expand All @@ -96,16 +106,7 @@ compiler.Compiler().compile(pipeline_func, pipeline_filename)

```python
#Specify pipeline argument values
arguments = {
'project_id': PROJECT_ID,
'gcs_path': 'gs://dataflow-templates/latest/Word_Count',
'launch_parameters': json.dumps({
'parameters': {
'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
'output': '{}/dataflow/launch-template/'.format(GCS_WORKING_DIR)
}
})
}
arguments = {}

#Get or create an experiment and submit a pipeline run
import kfp
Expand All @@ -116,3 +117,8 @@ experiment = client.create_experiment(EXPERIMENT_NAME)
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
```


```python

```
Loading