Skip to content

Commit

Permalink
fix(components): Update GCP component container to Python 3.7. Fixes k…
Browse files Browse the repository at this point in the history
…ubeflow#4959  (kubeflow#4960)

* Update GCP component container to Python 3.7, fix Dataflow client

* fix test

* add back the http patch
  • Loading branch information
chensun authored Jan 7, 2021
1 parent 7540ba5 commit 8463992
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 40 deletions.
10 changes: 6 additions & 4 deletions components/gcp/container/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2018 The Kubeflow Authors
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,19 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM python:2.7-slim-jessie
FROM python:3.7-slim

RUN apt-get update && apt-get install -y --no-install-recommends \
wget patch \
&& rm -rf /var/lib/apt/lists/*

RUN pip2 install apache-beam[gcp]==2.10.0
RUN pip install apache-beam[gcp]
RUN pip install pandas

ADD build /ml
WORKDIR /ml
RUN pip install .

RUN patch /usr/local/lib/python2.7/site-packages/googleapiclient/http.py < /ml/patches/http.patch
# The patch sets User Agent for telemetry purpose.
# It is based on "google-api-python-client==1.7.8", and needs to be updated when upgrading the package.
RUN patch /usr/local/lib/python3.7/site-packages/googleapiclient/http.py < /ml/patches/http.patch

ENTRYPOINT ["python", "-u", "-m", "kfp_component.launcher"]
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2018 Google LLC
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,9 @@

class DataflowClient:
def __init__(self):
self._df = discovery.build('dataflow', 'v1b3')
self._df = discovery.build('dataflow', 'v1b3', cache_discovery=False)

def launch_template(self, project_id, gcs_path, location,
def launch_template(self, project_id, gcs_path, location,
validate_only, launch_parameters):
return self._df.projects().locations().templates().launch(
projectId = project_id,
Expand Down Expand Up @@ -47,7 +47,7 @@ def cancel_job(self, project_id, job_id, location):
}
).execute()

def list_aggregated_jobs(self, project_id, filter=None,
def list_aggregated_jobs(self, project_id, filter=None,
view=None, page_size=None, page_token=None, location=None):
return self._df.projects().jobs().aggregated(
projectId = project_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2018 Google LLC
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -19,12 +19,12 @@
from google.cloud import storage
from kfp_component.core import KfpExecutionContext
from ._client import DataflowClient
from ._common_ops import (wait_and_dump_job, stage_file, get_staging_location,
from ._common_ops import (wait_and_dump_job, stage_file, get_staging_location,
read_job_id_and_location, upload_job_id_and_location)
from ._process import Process
from ..storage import parse_blob_path

def launch_python(python_file_path, project_id, staging_dir=None, requirements_file_path=None,
def launch_python(python_file_path, project_id, region, staging_dir=None, requirements_file_path=None,
args=[], wait_interval=30,
job_id_output_path='/tmp/kfp/output/dataflow/job_id.txt',
job_object_output_path='/tmp/kfp/output/dataflow/job.json',
Expand All @@ -33,12 +33,13 @@ def launch_python(python_file_path, project_id, staging_dir=None, requirements_f
Args:
python_file_path (str): The gcs or local path to the python file to run.
project_id (str): The ID of the parent project.
staging_dir (str): Optional. The GCS directory for keeping staging files.
project_id (str): The ID of the GCP project to run the Dataflow job.
region (str): The GCP region to run the Dataflow job.
staging_dir (str): Optional. The GCS directory for keeping staging files.
A random subdirectory will be created under the directory to keep job info
for resuming the job in case of failure and it will be passed as
for resuming the job in case of failure and it will be passed as
`staging_location` and `temp_location` command line args of the beam code.
requirements_file_path (str): Optional, the gcs or local path to the pip
requirements_file_path (str): Optional, the gcs or local path to the pip
requirements file.
args (list): The list of args to pass to the python file.
wait_interval (int): The wait seconds between polling.
Expand Down Expand Up @@ -70,7 +71,7 @@ def cancel():

_install_requirements(requirements_file_path)
python_file_path = stage_file(python_file_path)
cmd = _prepare_cmd(project_id, python_file_path, args, staging_location)
cmd = _prepare_cmd(project_id, region, python_file_path, args, staging_location)
sub_process = Process(cmd)
for line in sub_process.read_lines():
job_id, location = _extract_job_id_and_location(line)
Expand All @@ -83,35 +84,36 @@ def cancel():
logging.warning('No dataflow job was found when '
'running the python file.')
return None
job = df_client.get_job(project_id, job_id,
job = df_client.get_job(project_id, job_id,
location=location)
return wait_and_dump_job(df_client, project_id, location, job,
wait_interval,
job_id_output_path=job_id_output_path,
job_object_output_path=job_object_output_path,
)

def _prepare_cmd(project_id, python_file_path, args, staging_location):
def _prepare_cmd(project_id, region, python_file_path, args, staging_location):
dataflow_args = [
'--runner', 'dataflow',
'--project', project_id]
'--runner', 'DataflowRunner',
'--project', project_id,
'--region', region]
if staging_location:
dataflow_args += ['--staging_location', staging_location, '--temp_location', staging_location]
return (['python', '-u', python_file_path] +
return (['python', '-u', python_file_path] +
dataflow_args + args)

def _extract_job_id_and_location(line):
"""Returns (job_id, location) from matched log.
"""
job_id_pattern = re.compile(
br'.*console.cloud.google.com/dataflow.*/locations/([a-z|0-9|A-Z|\-|\_]+)/jobs/([a-z|0-9|A-Z|\-|\_]+).*')
br'.*console.cloud.google.com/dataflow/jobs/(?P<location>[a-z|0-9|A-Z|\-|\_]+)/(?P<job_id>[a-z|0-9|A-Z|\-|\_]+).*')
matched_job_id = job_id_pattern.search(line or '')
if matched_job_id:
return (matched_job_id.group(2).decode(), matched_job_id.group(1).decode())
return (matched_job_id.group('job_id').decode(), matched_job_id.group('location').decode())
return (None, None)

def _install_requirements(requirements_file_path):
if not requirements_file_path:
return
requirements_file_path = stage_file(requirements_file_path)
subprocess.check_call(['pip', 'install', '-r', requirements_file_path])
subprocess.check_call(['pip', 'install', '-r', requirements_file_path])
2 changes: 0 additions & 2 deletions components/gcp/container/component_sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
'Intended Audience :: Science/Research',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2018 Google LLC
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,27 +29,27 @@
@mock.patch(MODULE + '.subprocess')
class LaunchPythonTest(unittest.TestCase):

def test_launch_python_succeed(self, mock_subprocess, mock_process,
def test_launch_python_succeed(self, mock_subprocess, mock_process,
mock_client, mock_context, mock_stage_file, mock_display, mock_storage):
mock_context().__enter__().context_id.return_value = 'ctx-1'
mock_storage.Client().bucket().blob().exists.return_value = False
mock_process().read_lines.return_value = [
b'https://console.cloud.google.com/dataflow/locations/us-central1/jobs/job-1?project=project-1'
b'https://console.cloud.google.com/dataflow/jobs/us-central1/job-1?project=project-1'
]
expected_job = {
'id': 'job-1',
'currentState': 'JOB_STATE_DONE'
}
mock_client().get_job.return_value = expected_job

result = launch_python('/tmp/test.py', 'project-1', staging_dir='gs://staging/dir')
result = launch_python('/tmp/test.py', 'project-1', 'us-central1', staging_dir='gs://staging/dir')

self.assertEqual(expected_job, result)
mock_storage.Client().bucket().blob().upload_from_string.assert_called_with(
'job-1,us-central1'
)

def test_launch_python_retry_succeed(self, mock_subprocess, mock_process,
def test_launch_python_retry_succeed(self, mock_subprocess, mock_process,
mock_client, mock_context, mock_stage_file, mock_display, mock_storage):
mock_context().__enter__().context_id.return_value = 'ctx-1'
mock_storage.Client().bucket().blob().exists.return_value = True
Expand All @@ -60,20 +60,19 @@ def test_launch_python_retry_succeed(self, mock_subprocess, mock_process,
}
mock_client().get_job.return_value = expected_job

result = launch_python('/tmp/test.py', 'project-1', staging_dir='gs://staging/dir')
result = launch_python('/tmp/test.py', 'project-1', 'us-central1', staging_dir='gs://staging/dir')

self.assertEqual(expected_job, result)
mock_process.assert_not_called()

def test_launch_python_no_job_created(self, mock_subprocess, mock_process,
def test_launch_python_no_job_created(self, mock_subprocess, mock_process,
mock_client, mock_context, mock_stage_file, mock_display, mock_storage):
mock_context().__enter__().context_id.return_value = 'ctx-1'
mock_process().read_lines.return_value = [
b'no job id',
b'no job id'
]

result = launch_python('/tmp/test.py', 'project-1')
result = launch_python('/tmp/test.py', 'project-1', 'us-central1')

self.assertEqual(None, result)

2 changes: 1 addition & 1 deletion components/gcp/container/component_sdk/python/tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = py27,py35,py36,py37,py38
envlist = py35,py36,py37,py38
skip_missing_interpreters = true

[testenv]
Expand Down
8 changes: 6 additions & 2 deletions components/gcp/dataflow/launch_python/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ inputs:
description: 'The gcs or local path to the python file to run.'
type: String
- name: project_id
description: 'The ID of the parent project.'
type: GCPProjectID
description: 'The ID of the GCP project to run the Dataflow job.'
type: String
- name: region
description: 'The GCP region to run the Dataflow job.'
type: String
- name: staging_dir
description: >-
Optional. The GCS directory for keeping staging files.
Expand Down Expand Up @@ -59,6 +62,7 @@ implementation:
kfp_component.google.dataflow, launch_python,
--python_file_path, {inputValue: python_file_path},
--project_id, {inputValue: project_id},
--region, {inputValue: region},
--staging_dir, {inputValue: staging_dir},
--requirements_file_path, {inputValue: requirements_file_path},
--args, {inputValue: args},
Expand Down
8 changes: 6 additions & 2 deletions components/gcp/dataflow/launch_python/sample.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"Name | Description | Optional | Data type| Accepted values | Default |\n",
":--- | :----------| :----------| :----------| :----------| :---------- |\n",
"python_file_path | The path to the Cloud Storage bucket or local directory containing the Python file to be run. | | GCSPath | | |\n",
"project_id | The ID of the Google Cloud Platform (GCP) project containing the Cloud Dataflow job.| | GCPProjectID | | |\n",
"project_id | The ID of the Google Cloud Platform (GCP) project to run the Cloud Dataflow job.| | String | | |\n",
"region | The Google Cloud Platform (GCP) region to run the Cloud Dataflow job.| | String | | |\n",
"staging_dir | The path to the Cloud Storage directory where the staging files are stored. A random subdirectory will be created under the staging directory to keep the job information.This is done so that you can resume the job in case of failure. `staging_dir` is passed as the command line arguments (`staging_location` and `temp_location`) of the Beam code. | Yes | GCSPath | | None |\n",
"requirements_file_path | The path to the Cloud Storage bucket or local directory containing the pip requirements file. | Yes | GCSPath | | None |\n",
"args | The list of arguments to pass to the Python file. | No | List | A list of string arguments | None |\n",
Expand Down Expand Up @@ -263,6 +264,7 @@
"source": [
"# Required Parameters\n",
"PROJECT_ID = '<Please put your project ID here>'\n",
"REGION = '<Please put a GCP region here>'\n"
"GCS_STAGING_DIR = 'gs://<Please put your GCS path here>' # No ending slash"
]
},
Expand Down Expand Up @@ -299,6 +301,7 @@
"def pipeline(\n",
" python_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/wc.py',\n",
" project_id = PROJECT_ID,\n",
" region = REGION,\n",
" staging_dir = GCS_STAGING_DIR,\n",
" requirements_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/requirements.txt',\n",
" args = json.dumps([\n",
Expand All @@ -309,6 +312,7 @@
" dataflow_python_op(\n",
" python_file_path = python_file_path, \n",
" project_id = project_id, \n",
" region = region, \n",
" staging_dir = staging_dir, \n",
" requirements_file_path = requirements_file_path, \n",
" args = args,\n",
Expand Down Expand Up @@ -412,4 +416,4 @@
},
"nbformat": 4,
"nbformat_minor": 2
}
}

0 comments on commit 8463992

Please sign in to comment.