The recommended GCP solution for interactively authoring scalable Spark-based ML code is serverless Spark notebooks on Vertex AI Workbench managed notebooks. In this lab module, we will go through typical data science/ML engineering work: preprocessing data, training and testing a model, tuning the model, and batch scoring. Since this lab is focused on demystifying the integration, the notebooks are pre-created for you, so you can quickly understand the integration.
Telco Customer Churn Prediction with a Kaggle dataset and a Spark MLlib Random Forest Classifier
The environment for the module is a Vertex AI Workbench managed notebook instance with a custom container image for serverless Spark, pre-created as part of the Terraform deployment. We will reuse the kernel created in the prior module. The following are the four exercises in the module.
Training and scoring data are available in the GCS data bucket, in CSV format.
We will read the raw customer churn source data from GCS, cleanse/transform it, and persist it to BigQuery for use in the model training step.
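The pre-created notebook contains the authoritative code; purely as orientation, the following is a minimal sketch of the flow, where the bucket names and the transform are assumptions-

```python
# Minimal sketch of the preprocessing flow. Bucket names and the transform
# are illustrative; the pre-created notebook is the source of truth.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("churn-preprocessing").getOrCreate()

# Read the raw churn CSV from the GCS data bucket (path is an assumption)
raw_df = (spark.read.option("header", True).option("inferSchema", True)
          .csv("gs://YOUR_DATA_BUCKET/customer_churn_train_data.csv"))

# Example cleansing: cast TotalCharges to double and drop rows with nulls
cleansed_df = (raw_df
               .withColumn("TotalCharges", F.col("TotalCharges").cast("double"))
               .na.drop())

# Persist to BigQuery via the Spark BigQuery connector
(cleansed_df.write.format("bigquery")
 .option("table", "customer_churn_ds.training_data")
 .option("temporaryGcsBucket", "YOUR_SCRATCH_BUCKET")
 .mode("overwrite")
 .save())
```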
Let's ensure the serverless Spark kernel created in the prior module is attached to the pre-processing notebook. Follow the screenshots below-
Navigate to BigQuery, and run the following query-
SELECT * FROM `customer_churn_ds.training_data` LIMIT 1000
The following is the author's output-
For each notebook, there is complementary code in a PySpark script that will be used to operationalize the model training as a Vertex AI pipeline.
Now that we have preprocessed data, let's train a model (a minimal sketch of the training code follows the steps below).
- Close the preprocessing notebook
- Shut down the kernel, but leave the Spark interactive session active.
- Open the model training notebook
- Review the code and run all cells, as shown in section 4.3 above
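For orientation, here is a minimal sketch of Random Forest training with Spark MLlib, similar in shape to the notebook's code; the feature columns shown are assumptions, and the notebook's actual feature engineering is more complete-

```python
# Minimal sketch of Random Forest training with Spark MLlib. The feature
# columns are illustrative; the notebook's actual pipeline is more complete.
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

spark = SparkSession.builder.getOrCreate()

# Read the preprocessed training data back from BigQuery
train_df = (spark.read.format("bigquery")
            .option("table", "customer_churn_ds.training_data").load())

# Index the string label and assemble numeric features into one vector
label_indexer = StringIndexer(inputCol="churn", outputCol="label")
assembler = VectorAssembler(
    inputCols=["tenure", "MonthlyCharges", "TotalCharges"],  # assumed subset
    outputCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

pipeline = Pipeline(stages=[label_indexer, assembler, rf])
model = pipeline.fit(train_df)
predictions = model.transform(train_df)
```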
This JSON is persisted so it can be visualized in the Vertex AI pipeline. For queryability, we also persist it to BigQuery.
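As a hedged sketch (the bucket path and metric names below are assumptions), persisting such a metrics JSON to GCS can look like this-

```python
# Sketch only: write a metrics dictionary as JSON to GCS so a downstream
# Vertex AI pipeline step can pick it up. Bucket, path, and values are
# assumptions, not the lab's actual artifacts.
import json
from google.cloud import storage

metrics = {"area_under_roc": 0.84, "accuracy": 0.79}  # example values only

client = storage.Client()
blob = client.bucket("YOUR_METRICS_BUCKET").blob("metrics/metrics.json")
blob.upload_from_string(json.dumps(metrics), content_type="application/json")
```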
The following tables are created and written to in append mode.
Notice the columns for the tables. There is a pipeline_id column and a pipeline_execution_dt column for traceability/lineage tracking.
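A minimal sketch of such an append, assuming a metrics DataFrame and the pipeline variables are already defined earlier in the notebook-

```python
# Sketch only: stamp lineage columns onto a results DataFrame and append it
# to BigQuery. metrics_df, pipeline_id, and pipeline_execution_dt are assumed
# to be defined earlier in the notebook.
from pyspark.sql import functions as F

lineage_df = (metrics_df
              .withColumn("pipeline_id", F.lit(pipeline_id))
              .withColumn("pipeline_execution_dt", F.lit(pipeline_execution_dt))
              .withColumn("operation", F.lit("training")))

(lineage_df.write.format("bigquery")
 .option("table", "customer_churn_ds.model_metrics")
 .option("temporaryGcsBucket", "YOUR_SCRATCH_BUCKET")
 .mode("append")
 .save())
```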
Run the below query in BigQuery. Be sure to add pipeline_id to the where clause if you are running the experiments multiple times.
SELECT * FROM `customer_churn_ds.model_feature_importance_scores`
WHERE operation='training'
The following is the author's output-
Run the below query in BigQuery. Be sure to add pipeline_id to the where clause if you are running the experiments multiple times.
SELECT * FROM `customer_churn_ds.model_metrics`
WHERE operation='training'
The following is the author's output-
Run the below queries in BigQuery. Be sure to add pipeline_id to the where clause if you are running the experiments multiple times.
Just the predictions-
SELECT churn, prediction, *
FROM `customer_churn_ds.test_predictions`
WHERE operation='training'
Confusion matrix-
SELECT churn, prediction, count(*) as count
FROM `customer_churn_ds.test_predictions`
WHERE operation='training'
GROUP BY churn, prediction ORDER BY churn
The following is the author's output-
The confusion matrix-
This sub-module demonstrates hyperparameter tuning with Spark MLlib to improve model performance.
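For orientation, a minimal sketch of what such tuning looks like with CrossValidator, reusing the pipeline and rf estimator from the training sketch above; the grid values are illustrative, not the lab's actual search space-

```python
# Sketch only: grid search with CrossValidator. `pipeline`, `rf`, and
# `train_df` come from the training sketch; grid values are illustrative.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

param_grid = (ParamGridBuilder()
              .addGrid(rf.maxDepth, [5, 10])
              .addGrid(rf.numTrees, [20, 50])
              .build())

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=param_grid,
                    evaluator=BinaryClassificationEvaluator(labelCol="label"),
                    numFolds=3)

cv_model = cv.fit(train_df)  # cv_model.bestModel holds the winning pipeline
```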
We need a new session so we can pass additional packages (BQ + MLeap); creating it also demonstrates session creation via the CLI.
PROJECT_ID=`gcloud config list --format "value(core.project)" 2>/dev/null`
PROJECT_NBR=`gcloud projects describe $PROJECT_ID | grep projectNumber | cut -d':' -f2 | tr -d "'" | xargs`
SESSION_NAME="s8s-spark-session-$RANDOM-mleap-included"
REGION="us-central1" # REPLACE WITH YOUR REGION
HISTORY_SERVER_NAME="s8s-sphs-${PROJECT_NBR}"
SUBNET="spark-snet"
NOTEBOOK_BUCKET="gs://s8s_notebook_bucket-${PROJECT_NBR}"
CONTAINER_IMAGE_URI="gcr.io/$PROJECT_ID/customer_churn_image:1.0.0"
DATAPROC_RUNTIME_VERSION="1.1"
gcloud beta dataproc sessions create spark $SESSION_NAME \
--project=${PROJECT_ID} \
--location=${REGION} \
--property=spark.jars.packages="ml.combust.mleap:mleap-spark_2.12:0.20.0" \
--history-server-cluster=projects/$PROJECT_ID/regions/$REGION/clusters/$HISTORY_SERVER_NAME \
--container-image=${CONTAINER_IMAGE_URI} \
--subnet=$SUBNET \
--version $DATAPROC_RUNTIME_VERSION
Pick the serverless Spark interactive kernel created in the previous step, attach it to the hyperparameter tuning notebook, and run the entire notebook. It takes ~30 minutes to complete.
Notice that Spark MLlib creates a bestModel directory and persists the tuned model there. We will use the model in the bestModel directory for batch scoring.
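A hedged sketch of how that directory comes about and how it can be reloaded, continuing from the tuning sketch above (the GCS path is an assumption)-

```python
# Sketch only: saving a CrossValidatorModel writes the winning pipeline under
# a bestModel subdirectory of the save path. The GCS path is an assumption.
cv_model.write().overwrite().save("gs://YOUR_MODEL_BUCKET/churn-model/PIPELINE_ID")

# Batch scoring can later load just the winning pipeline
from pyspark.ml import PipelineModel
best_model = PipelineModel.load(
    "gs://YOUR_MODEL_BUCKET/churn-model/PIPELINE_ID/bestModel")
```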
Again, this is for the Vertex AI pipeline, which we will cover in the module after the next.
Run the below query in BigQuery. Be sure to add pipeline_id to the where clause if you are running the experiments multiple times.
SELECT * FROM `customer_churn_ds.model_metrics`
WHERE operation='hyperparameter-tuning'
The following is the author's output-
Run the below queries in BigQuery. Be sure to add pipeline_id to the where clause if you are running the experiments multiple times.
SELECT churn, prediction, *
FROM `customer_churn_ds.test_predictions`
WHERE operation='hyperparameter-tuning'
SELECT churn, prediction, count(*) as count
FROM `customer_churn_ds.test_predictions`
WHERE operation='hyperparameter-tuning'
GROUP BY churn, prediction ORDER BY churn
The following is the author's output-
For the author, the ID generated in the variables section is 29657. You can locate your artifacts by identifying your PIPELINE_ID.
echo $PIPELINE_ID
This table tracks the latest model artifact specifics across storage systems.
The following is the table definition-
Run the below query in BigQuery to view assets specific to your execution-
SELECT *
FROM `customer_churn_ds.model_asset_tracker`
In this sub-module, we will use the best model from the hyperparameter tuning exercise to complete batch scoring. The source data is in GCS. We will transform it, run predictions, and persist the results to BigQuery.
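As orientation before you open the notebook, a minimal sketch of the flow, assuming illustrative paths and a modelVersion variable set as described below-

```python
# Sketch only: batch scoring with the tuned model. Paths are assumptions;
# modelVersion is set from the hyperparameter tuning step (see below).
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

score_df = (spark.read.option("header", True).option("inferSchema", True)
            .csv("gs://YOUR_DATA_BUCKET/customer_churn_score_data.csv"))

best_model = PipelineModel.load(
    f"gs://YOUR_MODEL_BUCKET/churn-model/{modelVersion}/bestModel")

(best_model.transform(score_df)
 .write.format("bigquery")
 .option("table", "customer_churn_ds.batch_predictions")
 .option("temporaryGcsBucket", "YOUR_SCRATCH_BUCKET")
 .mode("append")
 .save())
```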
Switch the serverless Spark interactive kernel to this notebook. Before running it, set the model version as described below.
You need to get the model version from the hyperparameter tuning step and replace the modelVersion assignment (modelVersion = YOUR_MODEL_VERSION_HERE - 3rd code cell, line 5). You can do so by running this query in BigQuery-
SELECT DISTINCT pipeline_id
FROM `customer_churn_ds.model_metrics`
WHERE operation='hyperparameter-tuning'
AND pipeline_execution_dt=(SELECT max(pipeline_execution_dt) FROM `customer_churn_ds.model_metrics`
WHERE operation='hyperparameter-tuning')
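Alternatively, as a sketch, you could fetch the same value from inside the notebook with the BigQuery client library-

```python
# Sketch only: fetch the latest hyperparameter-tuning pipeline_id with the
# BigQuery client library instead of the console.
from google.cloud import bigquery

sql = """
SELECT DISTINCT pipeline_id
FROM `customer_churn_ds.model_metrics`
WHERE operation='hyperparameter-tuning'
AND pipeline_execution_dt=(SELECT max(pipeline_execution_dt)
                           FROM `customer_churn_ds.model_metrics`
                           WHERE operation='hyperparameter-tuning')
"""
modelVersion = next(iter(bigquery.Client().query(sql).result())).pipeline_id
```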
Now run the entire notebook. It takes <5 minutes to complete.
Run the below queries in BigQuery. Be sure to add pipeline_id to the where clause if you are running the experiments multiple times.
SELECT *
FROM `customer_churn_ds.batch_predictions`
The following is the author's output-
The author has created a pipeline ID and model version for tracking, and the same attributes are added to all datasets, to directories in GCS, and wherever else applicable for traceability.
This concludes the lab module, where you learned to author ML experiments in interactive Spark notebooks. Proceed to the next module, where you will learn to execute the equivalent PySpark scripts via the command line, powered by Dataproc Serverless Spark batches.