# main.py
import json
import os
import random

import functions_framework
from google.cloud import aiplatform as vertex_ai
from google.cloud import storage
from urllib.parse import urlparse
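
# The imports above imply these pip packages; a plausible requirements.txt
# for this function (package names only; versions are not pinned by this file):
#   functions-framework
#   google-cloud-aiplatform
#   google-cloud-storage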


# Register the HTTP entry point with functions-framework
# (the import above is otherwise unused)
@functions_framework.http
def process_request(request):
    """Processes the incoming HTTP request.

    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text or any set of values that can be turned into a Response
        object using `make_response
        <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    # Decode the HTTP request payload and parse it into a JSON object
    # (the payload is parsed here but not otherwise used below)
    request_str = request.data.decode('utf-8')
    request_json = json.loads(request_str)
    # ........................................
    # Capture and print environment variables
    # ........................................
    # a) Pipeline template file in GCS
    VAI_PIPELINE_JSON_TEMPLATE_GCS_FILE_FQN = os.environ.get("VAI_PIPELINE_JSON_TEMPLATE_GCS_FILE_FQN")
    print("VAI_PIPELINE_JSON_TEMPLATE_GCS_FILE_FQN is {}".format(VAI_PIPELINE_JSON_TEMPLATE_GCS_FILE_FQN))
    # b) Pipeline execution directory in GCS
    VAI_PIPELINE_JSON_EXEC_DIR_URI = os.environ.get("VAI_PIPELINE_JSON_EXEC_DIR_URI")
    print("VAI_PIPELINE_JSON_EXEC_DIR_URI is {}".format(VAI_PIPELINE_JSON_EXEC_DIR_URI))
    # c) Project ID
    PROJECT_ID = os.environ.get("PROJECT_ID")
    print("PROJECT_ID is {}".format(PROJECT_ID))
    # d) GCP location
    GCP_LOCATION = os.environ.get("GCP_LOCATION")
    print("GCP_LOCATION is {}".format(GCP_LOCATION))
    # e) Vertex AI pipeline root for logs
    VAI_PIPELINE_ROOT_LOG_DIR = os.environ.get("VAI_PIPELINE_ROOT_LOG_DIR")
    print("VAI_PIPELINE_ROOT_LOG_DIR is {}".format(VAI_PIPELINE_ROOT_LOG_DIR))
    # f) Dataproc Serverless runtime version
    DATAPROC_RUNTIME_VERSION = os.environ.get("DATAPROC_RUNTIME_VERSION")
    print("DATAPROC_RUNTIME_VERSION is {}".format(DATAPROC_RUNTIME_VERSION))
    # ........................................
    # Create local scratch directory in /tmp
    # ........................................
    LOCAL_SCRATCH_DIR = "/tmp/scratch"
    if not os.path.exists(LOCAL_SCRATCH_DIR):
        os.makedirs(LOCAL_SCRATCH_DIR)
    # ........................................
    # Variables
    # ........................................
    # a) Generate a custom job ID for the Vertex AI pipeline run
    vaiPipelineExecutionInstanceID = random.randint(1, 10000)
    print("VAI_PIPELINE_EXECUTION_INSTANCE_ID is {}".format(vaiPipelineExecutionInstanceID))
    # b) Customized pipeline JSON filename
    pipelineFileName = "pipeline_{}.json".format(vaiPipelineExecutionInstanceID)
    print("PIPELINE_FILE_NM is {}".format(pipelineFileName))
    # c) Local path to the customized pipeline JSON
    localCustomPipelineJsonFileFQN = LOCAL_SCRATCH_DIR + "/" + pipelineFileName
    print("VAI_PIPELINE_JSON_TO_EXECUTE is locally at {}".format(localCustomPipelineJsonFileFQN))
    # d) Local (download) path for the template pipeline JSON
    localTemplatePipelineJsonFileFQN = LOCAL_SCRATCH_DIR + "/customer_churn_template.json"
    # e) GCS URI for the customized pipeline JSON
    PIPELINE_JSON_GCS_URI = VAI_PIPELINE_JSON_EXEC_DIR_URI + "/executions/{}".format(pipelineFileName)
    # ........................................
    # Create the custom VAI pipeline JSON
    # ........................................
    # a) Download the template VAI pipeline JSON
    downloadVaiPipelineTemplateInGCS(VAI_PIPELINE_JSON_TEMPLATE_GCS_FILE_FQN, localTemplatePipelineJsonFileFQN)
    # b) Create the custom VAI pipeline JSON from the template
    createCustomVaiPipelineJson(vaiPipelineExecutionInstanceID, localTemplatePipelineJsonFileFQN, localCustomPipelineJsonFileFQN, DATAPROC_RUNTIME_VERSION)
    # c) Push the custom VAI pipeline JSON to the GCS execution directory.
    #    urlparse(...).netloc extracts the bucket name from the gs:// URI;
    #    note this drops any path component, so VAI_PIPELINE_JSON_EXEC_DIR_URI
    #    is assumed to be a bare gs://<bucket> URI
    pushLocalFileToGCS(urlparse(VAI_PIPELINE_JSON_EXEC_DIR_URI).netloc, localCustomPipelineJsonFileFQN, "executions/{}".format(pipelineFileName))
    # ........................................
    # Vertex AI pipeline execution
    # ........................................
    vertex_ai.init(
        project=PROJECT_ID,
        location=GCP_LOCATION,
        staging_bucket=VAI_PIPELINE_ROOT_LOG_DIR
    )
    job = vertex_ai.PipelineJob(
        display_name='customer-churn-prediction-pipeline',
        template_path=PIPELINE_JSON_GCS_URI,
        pipeline_root=VAI_PIPELINE_ROOT_LOG_DIR,
        enable_caching=False
    )
    job.submit()
    return "Job submitted"
#}} End of entry point


def downloadVaiPipelineTemplateInGCS(gcsFQVaiPipelineTemplateJsonFileUri, fileToDownloadToLocally):
#{{
    # Download the pipeline template JSON from GCS to the given local path
    googleCloudStorageClient = storage.Client()
    with open(fileToDownloadToLocally, 'wb') as fileObject:
        googleCloudStorageClient.download_blob_to_file(
            gcsFQVaiPipelineTemplateJsonFileUri, fileObject)
    print("Downloaded template to {}".format(fileToDownloadToLocally))
#}}


def createCustomVaiPipelineJson(pipelineID, templatePipelineJsonLocalFile, customPipelineJsonLocalFile, dataprocRuntimeVersion):
#{{
    # Replace the placeholder tokens in the template with run-specific values
    searchTextPipelineID = "YOUR_USER_DEFINED_EXECUTION_ID"
    replaceTextPipelineID = str(pipelineID)
    searchTextDataprocRuntimeVersion = "YOUR_DATAPROC_RUNTIME_VERSION"
    replaceTextDataprocRuntimeVersion = str(dataprocRuntimeVersion)
    with open(templatePipelineJsonLocalFile, 'r') as templateFileHandle:
        templateContent = templateFileHandle.read()
    customContent = templateContent.replace(searchTextPipelineID, replaceTextPipelineID)
    customContent = customContent.replace(searchTextDataprocRuntimeVersion, replaceTextDataprocRuntimeVersion)
    with open(customPipelineJsonLocalFile, 'w') as customFileHandle:
        customFileHandle.write(customContent)
    print("Created customPipelineJsonLocalFile at {}".format(customPipelineJsonLocalFile))
#}}
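
# For reference, the pipeline template is expected to contain the two literal
# placeholder tokens substituted above. An illustrative fragment (hypothetical,
# not taken from this repo's actual template) might look like:
#
#   "parameterValues": {
#     "execution_id": "YOUR_USER_DEFINED_EXECUTION_ID",
#     "dataproc_runtime_version": "YOUR_DATAPROC_RUNTIME_VERSION"
#   }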


def pushLocalFileToGCS(bucketName, customPipelineJsonLocalFilePath, customPipelineFileName):
#{{
    # Upload the customized pipeline JSON to the given bucket and object path
    # (bucketName is the bare bucket name extracted by the caller via urlparse)
    googleCloudStorageClient = storage.Client()
    googleCloudStorageBucket = googleCloudStorageClient.bucket(bucketName)
    blob = googleCloudStorageBucket.blob(customPipelineFileName)
    blob.upload_from_filename(customPipelineJsonLocalFilePath)
#}}
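

# ........................................
# Local testing (a minimal sketch)
# ........................................
# One way to exercise this function locally is via the functions-framework
# CLI. The env var values below are placeholders (assumptions), not values
# shipped with this repo:
#
#   export VAI_PIPELINE_JSON_TEMPLATE_GCS_FILE_FQN=gs://<bucket>/templates/customer_churn_template.json
#   export VAI_PIPELINE_JSON_EXEC_DIR_URI=gs://<bucket>
#   export PROJECT_ID=<your-project-id>
#   export GCP_LOCATION=us-central1
#   export VAI_PIPELINE_ROOT_LOG_DIR=gs://<bucket>/pipeline-root
#   export DATAPROC_RUNTIME_VERSION=<runtime-version>
#   functions-framework --target=process_request
#
# Then POST any JSON payload to trigger a pipeline submission:
#   curl -X POST localhost:8080 -H "Content-Type: application/json" -d '{}'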