Skip to content

Commit

Permalink
feat(components): Add location validation to preview.llm.rlhf_pipeline
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 616229944
  • Loading branch information
Googler committed Mar 15, 2024
1 parent 4d90770 commit 361c16f
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 44 deletions.
1 change: 1 addition & 0 deletions components/google-cloud/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Update the documentation of `GetModel`.
* Add CMEK support to `preview.model_evaluation.autosxs_pipeline`.
* Updated component and pipeline inputs/outputs to support creating ModelEvaluations for ModelRegistry models in the AutoSxS pipeline.
* Add DRZ-at-rest to `preview.llm.rlhf_pipeline`.

## Release 2.10.0
* Fix the missing output of pipeline remote runner. `AutoMLImageTrainingJobRunOp` now passes the model artifacts correctly to downstream components.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def pipeline(
model_display_name: Optional[str] = None,
deploy_model: bool = True,
encryption_spec_key_name: str = '',
upload_location: str = _placeholders.LOCATION_PLACEHOLDER,
) -> PipelineOutput:
# fmt: off
"""Uploads a tuned language model and (optionally) deploys it to an endpoint.
Expand All @@ -47,13 +48,13 @@ def pipeline(
model_display_name: Name of the fine-tuned model shown in the Model Registry. If not provided, a default name will be created.
deploy_model: Whether to deploy the model to an endpoint in `us-central1`. Default is True.
encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment.
upload_location: Region to upload and deploy the model to. Default is the location used to run the pipeline components.
Returns:
model_resource_name: Path to the model uploaded to the Model Registry. This will be an empty string if the model was not deployed.
endpoint_resource_name: Path the Online Prediction Endpoint. This will be an empty string if the model was not deployed.
"""
# fmt: on
upload_location = 'us-central1'
adapter_artifact = kfp.dsl.importer(
artifact_uri=output_adapter_path,
artifact_class=kfp.dsl.Artifact,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,59 +22,69 @@

@dsl.component(base_image=_image.GCPC_IMAGE_TAG, install_kfp_package=False)
def resolve_machine_spec(
location: str,
accelerator_type: str = '',
use_test_spec: bool = False,
) -> NamedTuple(
'MachineSpec', machine_type=str, accelerator_type=str, accelerator_count=int
'MachineSpec',
machine_type=str,
tuning_location=str,
accelerator_type=str,
accelerator_count=int,
):
"""Returns machine spec to use for a given location.
"""Returns machine spec to use for a given accelerator_type.
Args:
location: Where the machine will run.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning
components run in europe-west4. Otherwise tuning components run in
us-central1 on GPUs. Default is 'GPU'.
use_test_spec: Whether to use a lower resource machine for testing.
Returns:
Machine spec.
tuning_location: Where the machine will run.
Raises:
ValueError: If accelerators are requested in an unsupported location.
"""
outputs = NamedTuple(
'MachineSpec',
machine_type=str,
accelerator_type=str,
accelerator_count=int,
tuning_location=str,
accelerator_type=str,
)
tpu_regions = {'europe-west4'}
gpu_regions = {'us-central1'}
if use_test_spec:
if location in tpu_regions:
if accelerator_type == 'TPU':
return outputs(
machine_type='cloud-tpu',
accelerator_type='TPU_V3',
accelerator_count=32,
tuning_location='europe-west4',
)
else:
return outputs(
machine_type='a2-highgpu-1g',
accelerator_type='NVIDIA_TESLA_A100',
accelerator_count=1,
tuning_location='us-central1',
)
elif location in tpu_regions:
elif accelerator_type == 'TPU':
return outputs(
machine_type='cloud-tpu',
accelerator_type='TPU_V3',
accelerator_count=64,
tuning_location='europe-west4',
)
elif location in gpu_regions:
elif accelerator_type == 'GPU':
return outputs(
machine_type='a2-ultragpu-8g',
accelerator_type='NVIDIA_A100_80GB',
accelerator_count=8,
tuning_location='us-central1',
)
raise ValueError(
f'Unsupported accelerator location {location}. Must be one of'
f' {tpu_regions | gpu_regions}.'
f'Unsupported accelerator type {accelerator_type}. Must be one of'
'TPU or GPU.'
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def pipeline(
kl_coeff: float = 0.1,
instruction: Optional[str] = None,
project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
accelerator_type: str = 'GPU',
location: str = _placeholders.LOCATION_PLACEHOLDER,
tensorboard_resource_id: Optional[str] = None,
encryption_spec_key_name: str = '',
Expand All @@ -73,7 +74,8 @@ def pipeline(
kl_coeff: Coefficient for KL penalty. This regularizes the policy model and penalizes if it diverges from its initial distribution. If set to 0, the reference language model is not loaded into memory. Default value is 0.1.
instruction: This field lets the model know what task it needs to perform. Base models have been trained over a large set of varied instructions. You can give a simple and intuitive description of the task and the model will follow it, e.g. "Classify this movie review as positive or negative" or "Translate this sentence to Danish". Do not specify this if your dataset already prepends the instruction to the inputs field.
project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used.
location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning components run in europe-west4. Otherwise tuning components run in us-central1 on GPUs. Default is 'GPU'.
location: Location used to run non-tuning components, i.e. components that do not require accelerators. If not specified the location used to run the pipeline will be used.
tensorboard_resource_id: Optional tensorboard resource id in format `projects/{project_number}/locations/{location}/tensorboards/{tensorboard_id}`. If provided, tensorboard metrics will be uploaded to this location.
encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment.
Expand All @@ -84,7 +86,8 @@ def pipeline(
# fmt: on
prompt_column = 'input_text'
machine_spec = function_based.resolve_machine_spec(
location=location, use_test_spec=env.get_use_test_machine_spec()
accelerator_type=accelerator_type,
use_test_spec=env.get_use_test_machine_spec(),
).set_display_name('Resolve Machine Spec')

reference_model_metadata = function_based.resolve_reference_model_metadata(
Expand Down Expand Up @@ -126,7 +129,7 @@ def pipeline(
rl_model = (
reinforcer.reinforcer(
project=project,
location=location,
location=machine_spec.outputs['tuning_location'],
input_reference_model_path=reference_model_metadata.outputs[
'reference_model_path'
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def pipeline(
eval_dataset: Optional[str] = None,
instruction: Optional[str] = None,
project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
accelerator_type: str = 'GPU',
location: str = _placeholders.LOCATION_PLACEHOLDER,
tensorboard_resource_id: Optional[str] = None,
encryption_spec_key_name: str = '',
Expand All @@ -66,7 +67,8 @@ def pipeline(
reward_model_train_steps: Number of steps to use when training a reward model. Default value is 1000.
instruction: This field lets the model know what task it needs to perform. Base models have been trained over a large set of varied instructions. You can give a simple and intuitive description of the task and the model will follow it, e.g. "Classify this movie review as positive or negative" or "Translate this sentence to Danish". Do not specify this if your dataset already prepends the instruction to the inputs field.
project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used.
location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning components run in europe-west4. Otherwise tuning components run in us-central1 on GPUs. Default is 'GPU'.
location: Location used to run non-tuning components, i.e. components that do not require accelerators. If not specified the location used to run the pipeline will be used.
tensorboard_resource_id: Optional tensorboard resource id in format `projects/{project_number}/locations/{location}/tensorboards/{tensorboard_id}`. If provided, tensorboard metrics will be uploaded to this location.
encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment.
Expand All @@ -80,7 +82,8 @@ def pipeline(
candidate_columns = ['candidate_0', 'candidate_1']
choice_column = 'choice'
machine_spec = function_based.resolve_machine_spec(
location=location, use_test_spec=env.get_use_test_machine_spec()
accelerator_type=accelerator_type,
use_test_spec=env.get_use_test_machine_spec(),
).set_display_name('Resolve Machine Spec')

reference_model_metadata = function_based.resolve_reference_model_metadata(
Expand Down Expand Up @@ -150,7 +153,7 @@ def pipeline(
reward_model = (
reward_model_trainer.reward_model_trainer(
project=project,
location=location,
location=machine_spec.outputs['tuning_location'],
input_model_path=reference_model_metadata.outputs[
'reward_model_path'
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,22 @@
def validate_pipeline(
location: str,
encryption_spec_key_name: str = '',
machine_type: str = '',
accelerator_type: str = '',
eval_dataset: Optional[str] = None,
) -> NamedTuple('PreprocessedInputs', reward_model_eval_dataset=str):
# fmt: off
"""Validates and preprocesses RLHF pipeline parameters.
Args:
location: Region where all jobs run.
location: Location used to run non-tuning components, i.e. components
that do not require accelerators. If not specified the location used
to run the pipeline will be used.
encryption_spec_key_name: If set, CMEK support will be validated.
machine_type: Machine used to run training jobs.
eval_dataset: Optional Cloud storage path to an evaluation dataset. The format should match that of the preference dataset.
pipeline_location: Region where the pipeline is running.
Returns:
reward_model_eval_dataset: Path to evaluation dataset to use when training a reward model.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning
components run in europe-west4. Otherwise tuning components run in
us-central1 on GPUs. Default is 'GPU'.
eval_dataset: Optional Cloud storage path to an evaluation dataset. The
format should match that of the preference dataset.
"""
# fmt: on
# pylint: disable=g-import-not-at-top,import-outside-toplevel
Expand Down Expand Up @@ -76,15 +77,7 @@ def validate_pipeline(
if not eval_dataset or i >= max_lines_to_check:
break
# ]

# [ Check CMEK
if 'gpu' in machine_type:
accelerator_type = 'GPU'
elif 'tpu' in machine_type:
accelerator_type = 'TPU'
else:
accelerator_type = None

supported_pipeline_regions = {
'europe-west4',
'us-central1',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def infer_pipeline(
sampling_strategy: str = 'greedy',
instruction: Optional[str] = None,
project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
accelerator_type: str = 'GPU',
location: str = _placeholders.LOCATION_PLACEHOLDER,
encryption_spec_key_name: str = '',
) -> PipelineOutput:
Expand All @@ -56,7 +57,8 @@ def infer_pipeline(
sampling_strategy: This field specifies the sampling strategy. The valid options are 'greedy' and 'temperature_sampling'.
instruction: This field lets the model know what task it needs to perform. Base models have been trained over a large set of varied instructions. You can give a simple and intuitive description of the task and the model will follow it, e.g. "Classify this movie review as positive or negative" or "Translate this sentence to Danish". Do not specify this if your dataset already prepends the instruction to the inputs field.
project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used.
location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning components run in europe-west4. Otherwise tuning components run in us-central1 on GPUs. Default is 'GPU'.
location: Location used to run non-tuning components, i.e. components that do not require accelerators. If not specified the location used to run the pipeline will be used.
encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment.
Returns:
Expand All @@ -65,7 +67,7 @@ def infer_pipeline(
# fmt: on
prompt_column = 'input_text'
machine_spec = function_based.resolve_machine_spec(
location=location,
accelerator_type=accelerator_type,
use_test_spec=env.get_use_test_machine_spec(),
).set_display_name('Resolve Machine Spec')
reference_model_metadata = function_based.resolve_reference_model_metadata(
Expand Down Expand Up @@ -107,7 +109,7 @@ def infer_pipeline(
).set_display_name('Resolve Bulk Inferrer Image URI')
bulk_inference = bulk_inferrer.bulk_inferrer(
project=project,
location=location,
location=machine_spec.outputs['tuning_location'],
input_model=reference_model_metadata.outputs['reference_model_path'],
input_dataset_path=prompt_dataset_importer.outputs['imported_data_path'],
dataset_split=env.TRAIN_SPLIT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def rlaif_pipeline(
instruction: Optional[str] = None,
eval_dataset: Optional[str] = None,
project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
accelerator_type: str = 'GPU',
location: str = _placeholders.LOCATION_PLACEHOLDER,
tensorboard_resource_id: Optional[str] = None,
) -> PipelineOutput:
Expand All @@ -79,6 +80,7 @@ def rlaif_pipeline(
instruction: This field lets the model know what task it needs to perform. Base models have been trained over a large set of varied instructions. You can give a simple and intuitive description of the task and the model will follow it, e.g., "Classify this movie review as positive or negative" or "Translate this sentence to Danish". Do not specify this if your dataset already prepends the instruction to the inputs field.
eval_dataset: Optional Cloud storage path to an evaluation dataset. If provided, inference will be performed on this dataset after training. The dataset format is jsonl. Each example in the dataset must contain a field `input_text` that contains the prompt.
project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning components run in europe-west4. Otherwise tuning components run in us-central1 on GPUs. Default is 'GPU'.
location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used.
tensorboard_resource_id: Optional tensorboard resource id in format `projects/{project_number}/locations/{location}/tensorboards/{tensorboard_id}`. If provided, tensorboard metrics will be uploaded to this location.
Expand All @@ -100,6 +102,7 @@ def rlaif_pipeline(
instruction=instruction,
project=project,
location=location,
accelerator_type=accelerator_type,
).set_display_name('Inferrer A')
output_prediction_gcs_path_b = infer.infer_pipeline(
large_model_reference=large_model_b_reference,
Expand All @@ -110,6 +113,7 @@ def rlaif_pipeline(
instruction=instruction,
project=project,
location=location,
accelerator_type=accelerator_type,
).set_display_name('Inferrer B')

inference_output_uri = (
Expand Down Expand Up @@ -155,6 +159,7 @@ def rlaif_pipeline(
project=project,
location=location,
tensorboard_resource_id=tensorboard_resource_id,
accelerator_type=accelerator_type,
)
.set_display_name('Reinforcement Learning From AI Feedback')
.outputs
Expand Down
Loading

0 comments on commit 361c16f

Please sign in to comment.