-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(components): Added the Google Cloud Optimizer components (#4621)
* Components - Added the Google Cloud Optimizer components * Added the sample pipeline
- Loading branch information
Showing
7 changed files
with
958 additions
and
0 deletions.
There are no files selected for viewing
118 changes: 118 additions & 0 deletions
118
components/google-cloud/Optimizer/Add_measurement_for_trial/component.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
from typing import NamedTuple | ||
|
||
from kfp.components import create_component_from_func | ||
|
||
def add_measurement_for_trial_in_gcp_ai_platform_optimizer( | ||
trial_name: str, | ||
metric_value: float, | ||
complete_trial: bool = True, | ||
step_count: float = None, | ||
gcp_project_id: str = None, | ||
gcp_region: str = "us-central1", | ||
) -> NamedTuple('Outputs', [ | ||
("trial_name", list), | ||
("trial", dict), | ||
("stop_trial", bool), | ||
]): | ||
"""Add measurement for a trial and check whether to continue. | ||
See https://cloud.google.com/ai-platform/optimizer/docs | ||
Annotations: | ||
author: Alexey Volkov <alexey.volkov@ark-kun.com> | ||
Args: | ||
trial_name: Full trial resource name. | ||
metric_value: Result of the trial evaluation. | ||
step_count: Optional. The number of training steps performed with the model. Can be used when checking early stopping. | ||
complete_trial: Whether the trial should be completed. Only completed trials are used to suggest new trials. Default is True. | ||
""" | ||
|
||
import logging | ||
import time | ||
|
||
import google.auth | ||
from googleapiclient import discovery | ||
|
||
logging.getLogger().setLevel(logging.INFO) | ||
|
||
client_id = 'client1' | ||
metric_name = 'metric' | ||
|
||
credentials, default_project_id = google.auth.default() | ||
|
||
# Validating and inferring the arguments | ||
if not gcp_project_id: | ||
gcp_project_id = default_project_id | ||
|
||
# Building the API client. | ||
# The main API does not work, so we need to build from the published discovery document. | ||
def create_caip_optimizer_client(project_id): | ||
from google.cloud import storage | ||
_OPTIMIZER_API_DOCUMENT_BUCKET = 'caip-optimizer-public' | ||
_OPTIMIZER_API_DOCUMENT_FILE = 'api/ml_public_google_rest_v1.json' | ||
client = storage.Client(project_id) | ||
bucket = client.get_bucket(_OPTIMIZER_API_DOCUMENT_BUCKET) | ||
blob = bucket.get_blob(_OPTIMIZER_API_DOCUMENT_FILE) | ||
discovery_document = blob.download_as_string() | ||
return discovery.build_from_document(service=discovery_document) | ||
|
||
# Workaround for the Optimizer bug: Optimizer returns resource names that use project number, but only supports resource names with project IDs when making requests | ||
def get_project_number(project_id): | ||
service = discovery.build('cloudresourcemanager', 'v1', credentials=credentials) | ||
response = service.projects().get(projectId=project_id).execute() | ||
return response['projectNumber'] | ||
|
||
gcp_project_number = get_project_number(gcp_project_id) | ||
|
||
def fix_resource_name(name): | ||
return name.replace(gcp_project_number, gcp_project_id) | ||
|
||
ml_api = create_caip_optimizer_client(gcp_project_id) | ||
trials_api = ml_api.projects().locations().studies().trials() | ||
operations_api = ml_api.projects().locations().operations() | ||
|
||
measurement = { | ||
'measurement': { | ||
'stepCount': step_count, | ||
'metrics': [{ | ||
'metric': metric_name, | ||
'value': metric_value, | ||
}], | ||
}, | ||
} | ||
add_measurement_response = trials_api.addMeasurement( | ||
name=fix_resource_name(trial_name), | ||
body=measurement, | ||
).execute() | ||
|
||
if complete_trial: | ||
should_stop_trial = True | ||
complete_response = trials_api.complete( | ||
name=fix_resource_name(trial_name), | ||
).execute() | ||
return (trial_name, complete_response, should_stop_trial) | ||
else: | ||
check_early_stopping_response = trials_api.checkEarlyStoppingState( | ||
name=fix_resource_name(trial_name), | ||
).execute() | ||
operation_name = check_early_stopping_response['name'] | ||
while True: | ||
get_operation_response = operations_api.get( | ||
name=fix_resource_name(operation_name), | ||
).execute() | ||
if get_operation_response.get('done'): | ||
break | ||
logging.info('Not finished yet: ' + str(get_operation_response)) | ||
time.sleep(10) | ||
operation_response = get_operation_response['response'] | ||
should_stop_trial = operation_response['shouldStop'] | ||
return (trial_name, add_measurement_response, should_stop_trial) | ||
|
||
|
||
if __name__ == '__main__': | ||
add_measurement_for_trial_in_gcp_ai_platform_optimizer_op = create_component_from_func( | ||
add_measurement_for_trial_in_gcp_ai_platform_optimizer, | ||
base_image='python:3.8', | ||
packages_to_install=['google-api-python-client==1.12.3', 'google-cloud-storage==1.31.2', 'google-auth==1.21.3'], | ||
output_component_file='component.yaml', | ||
) |
216 changes: 216 additions & 0 deletions
216
components/google-cloud/Optimizer/Add_measurement_for_trial/component.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
name: Add measurement for trial in gcp ai platform optimizer | ||
description: Add measurement for a trial and check whether to continue. | ||
inputs: | ||
- {name: trial_name, type: String, description: Full trial resource name.} | ||
- {name: metric_value, type: Float, description: Result of the trial evaluation.} | ||
- name: complete_trial | ||
type: Boolean | ||
description: Whether the trial should be completed. Only completed trials are used | ||
to suggest new trials. Default is True. | ||
default: "True" | ||
optional: true | ||
- {name: step_count, type: Float, description: Optional. The number of training steps | ||
performed with the model. Can be used when checking early stopping., optional: true} | ||
- {name: gcp_project_id, type: String, optional: true} | ||
- {name: gcp_region, type: String, default: us-central1, optional: true} | ||
outputs: | ||
- {name: trial_name, type: JsonArray} | ||
- {name: trial, type: JsonObject} | ||
- {name: stop_trial, type: Boolean} | ||
implementation: | ||
container: | ||
image: python:3.8 | ||
command: | ||
- sh | ||
- -c | ||
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location | ||
'google-api-python-client==1.12.3' 'google-cloud-storage==1.31.2' 'google-auth==1.21.3' | ||
|| PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location | ||
'google-api-python-client==1.12.3' 'google-cloud-storage==1.31.2' 'google-auth==1.21.3' | ||
--user) && "$0" "$@" | ||
- python3 | ||
- -u | ||
- -c | ||
- | | ||
def add_measurement_for_trial_in_gcp_ai_platform_optimizer( | ||
trial_name, | ||
metric_value, | ||
complete_trial = True, | ||
step_count = None, | ||
gcp_project_id = None, | ||
gcp_region = "us-central1", | ||
): | ||
"""Add measurement for a trial and check whether to continue. | ||
See https://cloud.google.com/ai-platform/optimizer/docs | ||
Annotations: | ||
author: Alexey Volkov <alexey.volkov@ark-kun.com> | ||
Args: | ||
trial_name: Full trial resource name. | ||
metric_value: Result of the trial evaluation. | ||
step_count: Optional. The number of training steps performed with the model. Can be used when checking early stopping. | ||
complete_trial: Whether the trial should be completed. Only completed trials are used to suggest new trials. Default is True. | ||
""" | ||
import logging | ||
import time | ||
import google.auth | ||
from googleapiclient import discovery | ||
logging.getLogger().setLevel(logging.INFO) | ||
client_id = 'client1' | ||
metric_name = 'metric' | ||
credentials, default_project_id = google.auth.default() | ||
# Validating and inferring the arguments | ||
if not gcp_project_id: | ||
gcp_project_id = default_project_id | ||
# Building the API client. | ||
# The main API does not work, so we need to build from the published discovery document. | ||
def create_caip_optimizer_client(project_id): | ||
from google.cloud import storage | ||
_OPTIMIZER_API_DOCUMENT_BUCKET = 'caip-optimizer-public' | ||
_OPTIMIZER_API_DOCUMENT_FILE = 'api/ml_public_google_rest_v1.json' | ||
client = storage.Client(project_id) | ||
bucket = client.get_bucket(_OPTIMIZER_API_DOCUMENT_BUCKET) | ||
blob = bucket.get_blob(_OPTIMIZER_API_DOCUMENT_FILE) | ||
discovery_document = blob.download_as_string() | ||
return discovery.build_from_document(service=discovery_document) | ||
# Workaround for the Optimizer bug: Optimizer returns resource names that use project number, but only supports resource names with project IDs when making requests | ||
def get_project_number(project_id): | ||
service = discovery.build('cloudresourcemanager', 'v1', credentials=credentials) | ||
response = service.projects().get(projectId=project_id).execute() | ||
return response['projectNumber'] | ||
gcp_project_number = get_project_number(gcp_project_id) | ||
def fix_resource_name(name): | ||
return name.replace(gcp_project_number, gcp_project_id) | ||
ml_api = create_caip_optimizer_client(gcp_project_id) | ||
trials_api = ml_api.projects().locations().studies().trials() | ||
operations_api = ml_api.projects().locations().operations() | ||
measurement = { | ||
'measurement': { | ||
'stepCount': step_count, | ||
'metrics': [{ | ||
'metric': metric_name, | ||
'value': metric_value, | ||
}], | ||
}, | ||
} | ||
add_measurement_response = trials_api.addMeasurement( | ||
name=fix_resource_name(trial_name), | ||
body=measurement, | ||
).execute() | ||
if complete_trial: | ||
should_stop_trial = True | ||
complete_response = trials_api.complete( | ||
name=fix_resource_name(trial_name), | ||
).execute() | ||
return (trial_name, complete_response, should_stop_trial) | ||
else: | ||
check_early_stopping_response = trials_api.checkEarlyStoppingState( | ||
name=fix_resource_name(trial_name), | ||
).execute() | ||
operation_name = check_early_stopping_response['name'] | ||
while True: | ||
get_operation_response = operations_api.get( | ||
name=fix_resource_name(operation_name), | ||
).execute() | ||
if get_operation_response.get('done'): | ||
break | ||
logging.info('Not finished yet: ' + str(get_operation_response)) | ||
time.sleep(10) | ||
operation_response = get_operation_response['response'] | ||
should_stop_trial = operation_response['shouldStop'] | ||
return (trial_name, add_measurement_response, should_stop_trial) | ||
def _serialize_bool(bool_value: bool) -> str: | ||
if isinstance(bool_value, str): | ||
return bool_value | ||
if not isinstance(bool_value, bool): | ||
raise TypeError('Value "{}" has type "{}" instead of bool.'.format(str(bool_value), str(type(bool_value)))) | ||
return str(bool_value) | ||
def _serialize_json(obj) -> str: | ||
if isinstance(obj, str): | ||
return obj | ||
import json | ||
def default_serializer(obj): | ||
if hasattr(obj, 'to_struct'): | ||
return obj.to_struct() | ||
else: | ||
raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__) | ||
return json.dumps(obj, default=default_serializer, sort_keys=True) | ||
def _deserialize_bool(s) -> bool: | ||
from distutils.util import strtobool | ||
return strtobool(s) == 1 | ||
import argparse | ||
_parser = argparse.ArgumentParser(prog='Add measurement for trial in gcp ai platform optimizer', description='Add measurement for a trial and check whether to continue.') | ||
_parser.add_argument("--trial-name", dest="trial_name", type=str, required=True, default=argparse.SUPPRESS) | ||
_parser.add_argument("--metric-value", dest="metric_value", type=float, required=True, default=argparse.SUPPRESS) | ||
_parser.add_argument("--complete-trial", dest="complete_trial", type=_deserialize_bool, required=False, default=argparse.SUPPRESS) | ||
_parser.add_argument("--step-count", dest="step_count", type=float, required=False, default=argparse.SUPPRESS) | ||
_parser.add_argument("--gcp-project-id", dest="gcp_project_id", type=str, required=False, default=argparse.SUPPRESS) | ||
_parser.add_argument("--gcp-region", dest="gcp_region", type=str, required=False, default=argparse.SUPPRESS) | ||
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=3) | ||
_parsed_args = vars(_parser.parse_args()) | ||
_output_files = _parsed_args.pop("_output_paths", []) | ||
_outputs = add_measurement_for_trial_in_gcp_ai_platform_optimizer(**_parsed_args) | ||
_output_serializers = [ | ||
_serialize_json, | ||
_serialize_json, | ||
_serialize_bool, | ||
] | ||
import os | ||
for idx, output_file in enumerate(_output_files): | ||
try: | ||
os.makedirs(os.path.dirname(output_file)) | ||
except OSError: | ||
pass | ||
with open(output_file, 'w') as f: | ||
f.write(_output_serializers[idx](_outputs[idx])) | ||
args: | ||
- --trial-name | ||
- {inputValue: trial_name} | ||
- --metric-value | ||
- {inputValue: metric_value} | ||
- if: | ||
cond: {isPresent: complete_trial} | ||
then: | ||
- --complete-trial | ||
- {inputValue: complete_trial} | ||
- if: | ||
cond: {isPresent: step_count} | ||
then: | ||
- --step-count | ||
- {inputValue: step_count} | ||
- if: | ||
cond: {isPresent: gcp_project_id} | ||
then: | ||
- --gcp-project-id | ||
- {inputValue: gcp_project_id} | ||
- if: | ||
cond: {isPresent: gcp_region} | ||
then: | ||
- --gcp-region | ||
- {inputValue: gcp_region} | ||
- '----output-paths' | ||
- {outputPath: trial_name} | ||
- {outputPath: trial} | ||
- {outputPath: stop_trial} |
Oops, something went wrong.