From 99313e0baacd61d7d00d6576a22b151c1d8e1a49 Mon Sep 17 00:00:00 2001 From: Mike <34043825+Mlawrence95@users.noreply.github.com> Date: Fri, 9 Dec 2022 13:31:27 -0800 Subject: [PATCH] feat: Adds the temporal fusion transformer (TFT) forecasting job COPYBARA_INTEGRATE_REVIEW=https://github.com/googleapis/python-aiplatform/pull/1817 from mikelawrence-google:mikealawrence-add-tft-model-support dde8ac0569baa8f9245674cb3d9afc1d875b8f04 PiperOrigin-RevId: 494251134 --- google/cloud/aiplatform/__init__.py | 2 + google/cloud/aiplatform/schema.py | 1 + google/cloud/aiplatform/training_jobs.py | 12 +++ .../system/aiplatform/test_e2e_forecasting.py | 5 +- .../test_automl_forecasting_training_jobs.py | 102 ++++-------------- 5 files changed, 36 insertions(+), 86 deletions(-) diff --git a/google/cloud/aiplatform/__init__.py b/google/cloud/aiplatform/__init__.py index d6f12c0e7b..cb7f08b8aa 100644 --- a/google/cloud/aiplatform/__init__.py +++ b/google/cloud/aiplatform/__init__.py @@ -68,6 +68,7 @@ AutoMLTabularTrainingJob, AutoMLForecastingTrainingJob, SequenceToSequencePlusForecastingTrainingJob, + TemporalFusionTransformerForecastingTrainingJob, AutoMLImageTrainingJob, AutoMLTextTrainingJob, AutoMLVideoTrainingJob, @@ -162,6 +163,7 @@ "TensorboardRun", "TensorboardTimeSeries", "TextDataset", + "TemporalFusionTransformerForecastingTrainingJob", "TimeSeriesDataset", "VideoDataset", ) diff --git a/google/cloud/aiplatform/schema.py b/google/cloud/aiplatform/schema.py index 96a7a50bbd..9436283fe1 100644 --- a/google/cloud/aiplatform/schema.py +++ b/google/cloud/aiplatform/schema.py @@ -24,6 +24,7 @@ class definition: automl_tabular = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_tabular_1.0.0.yaml" automl_forecasting = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_time_series_forecasting_1.0.0.yaml" seq2seq_plus_forecasting = "gs://google-cloud-aiplatform/schema/trainingjob/definition/seq2seq_plus_time_series_forecasting_1.0.0.yaml" + tft_forecasting = "gs://google-cloud-aiplatform/schema/trainingjob/definition/temporal_fusion_transformer_time_series_forecasting_1.0.0.yaml" automl_image_classification = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_image_classification_1.0.0.yaml" automl_image_object_detection = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_image_object_detection_1.0.0.yaml" automl_text_classification = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_text_classification_1.0.0.yaml" diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index e8aa9c0f3d..e76cc04465 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -5204,12 +5204,16 @@ class column_data_types: class AutoMLForecastingTrainingJob(_ForecastingTrainingJob): + """Class to train AutoML forecasting models.""" + _model_type = "AutoML" _training_task_definition = schema.training_job.definition.automl_forecasting _supported_training_schemas = (schema.training_job.definition.automl_forecasting,) class SequenceToSequencePlusForecastingTrainingJob(_ForecastingTrainingJob): + """Class to train Sequence to Sequence (Seq2Seq) forecasting models.""" + _model_type = "Seq2Seq" _training_task_definition = schema.training_job.definition.seq2seq_plus_forecasting _supported_training_schemas = ( @@ -5217,6 +5221,14 @@ class SequenceToSequencePlusForecastingTrainingJob(_ForecastingTrainingJob): ) +class TemporalFusionTransformerForecastingTrainingJob(_ForecastingTrainingJob): + """Class to train Temporal Fusion Transformer (TFT) forecasting models.""" + + _model_type = "TFT" + _training_task_definition = schema.training_job.definition.tft_forecasting + _supported_training_schemas = (schema.training_job.definition.tft_forecasting,) + + class AutoMLImageTrainingJob(_TrainingJob): _supported_training_schemas = ( schema.training_job.definition.automl_image_classification, diff --git a/tests/system/aiplatform/test_e2e_forecasting.py b/tests/system/aiplatform/test_e2e_forecasting.py index 024946b91b..6347bd8b8d 100644 --- a/tests/system/aiplatform/test_e2e_forecasting.py +++ b/tests/system/aiplatform/test_e2e_forecasting.py @@ -40,9 +40,10 @@ class TestEndToEndForecasting(e2e_base.TestEndToEnd): "training_job", [ training_jobs.AutoMLForecastingTrainingJob, + training_jobs.SequenceToSequencePlusForecastingTrainingJob, pytest.param( - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - marks=pytest.mark.skip(reason="Seq2Seq not yet released."), + training_jobs.TemporalFusionTransformerForecastingTrainingJob, + marks=pytest.mark.skip(reason="TFT not yet released."), ), ], ) diff --git a/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py b/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py index c2d79d818d..3788a36868 100644 --- a/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py +++ b/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py @@ -183,6 +183,12 @@ _TEST_SPLIT_PREDEFINED_COLUMN_NAME = "split" _TEST_SPLIT_TIMESTAMP_COLUMN_NAME = "timestamp" +_FORECASTING_JOB_MODEL_TYPES = [ + training_jobs.AutoMLForecastingTrainingJob, + training_jobs.SequenceToSequencePlusForecastingTrainingJob, + training_jobs.TemporalFusionTransformerForecastingTrainingJob, +] + @pytest.fixture def mock_pipeline_service_create(): @@ -293,13 +299,7 @@ def teardown_method(self): @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) @pytest.mark.parametrize("sync", [True, False]) - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_run_call_pipeline_service_create( self, mock_pipeline_service_create, @@ -401,13 +401,7 @@ def test_run_call_pipeline_service_create( @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) @pytest.mark.parametrize("sync", [True, False]) - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_run_call_pipeline_service_create_with_timeout( self, mock_pipeline_service_create, @@ -496,13 +490,7 @@ def test_run_call_pipeline_service_create_with_timeout( @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) @pytest.mark.usefixtures("mock_pipeline_service_get") @pytest.mark.parametrize("sync", [True, False]) - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_run_call_pipeline_if_no_model_display_name_nor_model_labels( self, mock_pipeline_service_create, @@ -584,13 +572,7 @@ def test_run_call_pipeline_if_no_model_display_name_nor_model_labels( @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) @pytest.mark.usefixtures("mock_pipeline_service_get") @pytest.mark.parametrize("sync", [True, False]) - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_run_call_pipeline_if_set_additional_experiments( self, mock_pipeline_service_create, @@ -675,13 +657,7 @@ def test_run_call_pipeline_if_set_additional_experiments( "mock_model_service_get", ) @pytest.mark.parametrize("sync", [True, False]) - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_run_called_twice_raises( self, mock_dataset_time_series, @@ -762,13 +738,7 @@ def test_run_called_twice_raises( @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) @pytest.mark.parametrize("sync", [True, False]) - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_run_raises_if_pipeline_fails( self, mock_pipeline_service_create_and_get_with_fail, @@ -823,13 +793,7 @@ def test_run_raises_if_pipeline_fails( with pytest.raises(RuntimeError): job.get_model() - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_raises_before_run_is_called( self, mock_pipeline_service_create, @@ -855,13 +819,7 @@ def test_raises_before_run_is_called( @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) @pytest.mark.parametrize("sync", [True, False]) - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_splits_fraction( self, mock_pipeline_service_create, @@ -960,13 +918,7 @@ def test_splits_fraction( @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) @pytest.mark.parametrize("sync", [True, False]) - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_splits_timestamp( self, mock_pipeline_service_create, @@ -1067,13 +1019,7 @@ def test_splits_timestamp( @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) @pytest.mark.parametrize("sync", [True, False]) - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_splits_predefined( self, mock_pipeline_service_create, @@ -1168,13 +1114,7 @@ def test_splits_predefined( @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) @pytest.mark.parametrize("sync", [True, False]) - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_splits_default( self, mock_pipeline_service_create, @@ -1264,13 +1204,7 @@ def test_splits_default( @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) @pytest.mark.usefixtures("mock_pipeline_service_get") @pytest.mark.parametrize("sync", [True, False]) - @pytest.mark.parametrize( - "training_job", - [ - training_jobs.AutoMLForecastingTrainingJob, - training_jobs.SequenceToSequencePlusForecastingTrainingJob, - ], - ) + @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_run_call_pipeline_if_set_additional_experiments_probabilistic_inference( self, mock_pipeline_service_create,