Skip to content

Commit b594736

Browse files
authored
Add processor utils for easier definition of script processor (#8)
Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
1 parent 994e38b commit b594736

File tree

2 files changed

+225
-122
lines changed

2 files changed

+225
-122
lines changed

examples/03_dataflow_with_vineyard/vineyard.ipynb

Lines changed: 108 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
},
7070
{
7171
"cell_type": "code",
72-
"execution_count": 1,
72+
"execution_count": null,
7373
"metadata": {},
7474
"outputs": [],
7575
"source": [
@@ -80,27 +80,9 @@
8080
},
8181
{
8282
"cell_type": "code",
83-
"execution_count": 2,
83+
"execution_count": null,
8484
"metadata": {},
85-
"outputs": [
86-
{
87-
"name": "stdout",
88-
"output_type": "stream",
89-
"text": [
90-
"DataFrame initialized.\n"
91-
]
92-
},
93-
{
94-
"data": {
95-
"text/plain": [
96-
"<oss2.models.PutObjectResult at 0x7f8e7b9b1de0>"
97-
]
98-
},
99-
"execution_count": 2,
100-
"metadata": {},
101-
"output_type": "execute_result"
102-
}
103-
],
85+
"outputs": [],
10486
"source": [
10587
"import numpy as np\n",
10688
"import pandas as pd\n",
@@ -164,7 +146,7 @@
164146
"# 请将您的 OSS accessKeyID 和 accessKeySecret 分别设置成环境变量 OSS_ACCESS_KEY_ID 和 OSS_ACCESS_KEY_SECRET\n",
165147
"auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())\n",
166148
"# 请将 OSS_ENDPOINT 和 BUCKET_NAME 替换为您的 OSS Endpoint 和 Bucket\n",
167-
"bucket = oss2.Bucket(auth, 'oss-cn-beijing.aliyuncs.com', 'fluid-demo')\n",
149+
"bucket = oss2.Bucket(auth, '<OSS_ENDPOINT>', '<OSS_BUCKET_NAME>')\n",
168150
"\n",
169151
"bytes_buffer = io.BytesIO()\n",
170152
"df.to_pickle(bytes_buffer)\n",
@@ -180,9 +162,33 @@
180162
},
181163
{
182164
"cell_type": "code",
183-
"execution_count": 3,
165+
"execution_count": 1,
184166
"metadata": {},
185167
"outputs": [],
168+
"source": [
169+
"# Setting fluidsdk logger level to DEBUG for detailed messages\n",
170+
"import logging\n",
171+
"import sys\n",
172+
"logger = logging.getLogger(\"fluidsdk\")\n",
173+
"stream_handler = logging.StreamHandler(sys.stdout)\n",
174+
"stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))\n",
175+
"logger.addHandler(stream_handler)\n",
176+
"logger.setLevel(logging.DEBUG)"
177+
]
178+
},
179+
{
180+
"cell_type": "code",
181+
"execution_count": 2,
182+
"metadata": {},
183+
"outputs": [
184+
{
185+
"name": "stdout",
186+
"output_type": "stream",
187+
"text": [
188+
"2024-03-08 12:08:26,145 - fluidsdk - DEBUG - Dataset \"default/vineyard\" created\n"
189+
]
190+
}
191+
],
186192
"source": [
187193
"import fluid\n",
188194
"\n",
@@ -230,39 +236,37 @@
230236
},
231237
{
232238
"cell_type": "code",
233-
"execution_count": 4,
239+
"execution_count": 3,
234240
"metadata": {},
235241
"outputs": [],
236242
"source": [
237243
"from kubernetes.client import models as k8s_models\n",
238244
"# 定义任务运行模版,并挂载OSS Volume\n",
239-
"def create_processor(script):\n",
240-
" return models.Processor(\n",
241-
" script=models.ScriptProcessor(\n",
242-
" command=[\"bash\"],\n",
243-
" source=script,\n",
244-
" image=\"python\",\n",
245-
" image_tag=\"3.10\",\n",
246-
" volumes=[k8s_models.V1Volume(\n",
245+
"def create_processor(process_func, packages_to_install, pip_index_url):\n",
246+
" extra_volumes = k8s_models.V1Volume(\n",
247247
" name=\"data\",\n",
248248
" persistent_volume_claim=k8s_models.V1PersistentVolumeClaimVolumeSource(\n",
249249
" claim_name=\"pvc-oss\"\n",
250250
" )\n",
251-
" )],\n",
252-
" volume_mounts=[k8s_models.V1VolumeMount(\n",
251+
" )\n",
252+
" extra_volume_mount = k8s_models.V1VolumeMount(\n",
253253
" name=\"data\",\n",
254254
" mount_path=\"/data\"\n",
255-
" )],\n",
256-
" ) \n",
257-
" )"
255+
" )\n",
256+
" \n",
257+
" from fluid.utils import processor as processor_utils\n",
258+
" debug_mode = True # Setting debug_mode to True for verbose\n",
259+
" processor = processor_utils.make_processor_from_func(process_func, packages_to_install=packages_to_install, pip_index_url=pip_index_url, volumes=[extra_volumes], volume_mounts=[extra_volume_mount], debug_mode=debug_mode)\n",
260+
"\n",
261+
" return processor"
258262
]
259263
},
260264
{
261265
"cell_type": "markdown",
262266
"metadata": {},
263267
"source": [
264268
"在上述代码片段中:\n",
265-
"- **创建任务模版:** 代码中封装了一个名为`create_processor`的任务模板函数,该函数接收一个bash脚本并把它传入作为某个容器的启动命令。该容器中定义了Python 3.10的运行环境,并在`/data`目录下挂载了OSS存储数据源"
269+
"- **创建任务模版:** 代码中封装了一个名为`create_processor`的任务模板函数,该函数接收一个Python函数对象,并自动解析Python函数对象中的代码内容,最后将代码传入作为某个容器的启动命令。`create_processor`函数还接收`packages_to_install`和`pip_index_url`两个参数,用于设置运行该函数所需的PyPI依赖及PyPI索引源地址;运行该函数的Python版本默认为3.10版本,如需修改可参考`processor_utils.make_processor_from_func`的方法签名。容器还将在`/data`目录下挂载OSS存储数据源。"
266270
]
267271
},
268272
{
@@ -274,118 +278,90 @@
274278
},
275279
{
276280
"cell_type": "code",
277-
"execution_count": 5,
281+
"execution_count": 4,
278282
"metadata": {},
279283
"outputs": [],
280284
"source": [
281285
"# 定义数据预处理脚本\n",
282-
"preprocess_data_script = \"\"\"\n",
283-
"# pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple\n",
284-
"pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2\n",
285-
"#!/bin/bash\n",
286-
"set -ex\n",
287-
"\n",
288-
"cat <<EOF > ./preprocess.py\n",
289-
"from sklearn.model_selection import train_test_split\n",
290-
"\n",
291-
"import pandas as pd\n",
292-
"import vineyard\n",
293-
"\n",
294-
"df = pd.read_pickle('/data/df.pkl')\n",
286+
"def preprocess():\n",
287+
" from sklearn.model_selection import train_test_split\n",
295288
"\n",
296-
"# Preprocess Data\n",
297-
"df = df.drop(df[(df['GrLivArea']>4800)].index)\n",
298-
"X = df.drop('SalePrice', axis=1) # Features\n",
299-
"y = df['SalePrice'] # Target variable\n",
289+
" import pandas as pd\n",
290+
" import vineyard\n",
291+
" \n",
292+
" df = pd.read_pickle('/data/df.pkl')\n",
293+
" \n",
294+
" # Preprocess Data\n",
295+
" df = df.drop(df[(df['GrLivArea']>4800)].index)\n",
296+
" X = df.drop('SalePrice', axis=1) # Features\n",
297+
" y = df['SalePrice'] # Target variable\n",
298+
" \n",
299+
" del df\n",
300+
" \n",
301+
" X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)\n",
302+
" \n",
303+
" del X, y\n",
304+
" \n",
305+
" vineyard.put(X_train, name=\"x_train\", persist=True)\n",
306+
" vineyard.put(X_test, name=\"x_test\", persist=True)\n",
307+
" vineyard.put(y_train, name=\"y_train\", persist=True)\n",
308+
" vineyard.put(y_test, name=\"y_test\", persist=True)\n",
300309
"\n",
301-
"del df\n",
302310
"\n",
303-
"X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)\n",
311+
"def train():\n",
312+
" from sklearn.linear_model import LinearRegression\n",
304313
"\n",
305-
"del X, y\n",
314+
" import joblib\n",
315+
" import pandas as pd\n",
316+
" import vineyard\n",
306317
"\n",
307-
"vineyard.put(X_train, name=\"x_train\", persist=True)\n",
308-
"vineyard.put(X_test, name=\"x_test\", persist=True)\n",
309-
"vineyard.put(y_train, name=\"y_train\", persist=True)\n",
310-
"vineyard.put(y_test, name=\"y_test\", persist=True)\n",
318+
" x_train_data = vineyard.get(name=\"x_train\", fetch=True)\n",
319+
" y_train_data = vineyard.get(name=\"y_train\", fetch=True)\n",
311320
"\n",
312-
"EOF\n",
321+
" model = LinearRegression()\n",
322+
" model.fit(x_train_data, y_train_data)\n",
313323
"\n",
314-
"python3 ./preprocess.py\n",
315-
"\"\"\"\n",
324+
" joblib.dump(model, '/data/model.pkl')\n",
316325
"\n",
317-
"# 定义模型训练脚本\n",
318-
"train_data_script = \"\"\"\n",
319-
"# pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple\n",
320-
"pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2\n",
321-
"#!/bin/bash\n",
322-
"set -ex\n",
323326
"\n",
324-
"cat <<EOF > ./train.py\n",
325-
"from sklearn.linear_model import LinearRegression\n",
327+
"def test():\n",
328+
" from sklearn.linear_model import LinearRegression\n",
329+
" from sklearn.metrics import mean_squared_error\n",
326330
"\n",
327-
"import joblib\n",
328-
"import pandas as pd\n",
329-
"import vineyard\n",
330-
"\n",
331-
"x_train_data = vineyard.get(name=\"x_train\", fetch=True)\n",
332-
"y_train_data = vineyard.get(name=\"y_train\", fetch=True)\n",
333-
"\n",
334-
"model = LinearRegression()\n",
335-
"model.fit(x_train_data, y_train_data)\n",
336-
"\n",
337-
"joblib.dump(model, '/data/model.pkl')\n",
331+
" import vineyard\n",
332+
" import joblib\n",
333+
" import pandas as pd\n",
338334
"\n",
339-
"EOF\n",
340-
"python3 ./train.py\n",
341-
"\"\"\"\n",
342-
"\n",
343-
"# 定义模型测试脚本\n",
344-
"test_data_script = \"\"\"\n",
345-
"# pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple\n",
346-
"pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2\n",
347-
"#!/bin/bash\n",
348-
"set -ex\n",
349-
"\n",
350-
"cat <<EOF > ./test.py\n",
351-
"from sklearn.linear_model import LinearRegression\n",
352-
"from sklearn.metrics import mean_squared_error\n",
353-
"\n",
354-
"import vineyard\n",
355-
"import joblib\n",
356-
"import pandas as pd\n",
335+
" x_test_data = vineyard.get(name=\"x_test\", fetch=True)\n",
336+
" y_test_data = vineyard.get(name=\"y_test\", fetch=True)\n",
357337
"\n",
358-
"x_test_data = vineyard.get(name=\"x_test\", fetch=True)\n",
359-
"y_test_data = vineyard.get(name=\"y_test\", fetch=True)\n",
338+
" model = joblib.load(\"/data/model.pkl\")\n",
339+
" y_pred = model.predict(x_test_data)\n",
360340
"\n",
361-
"model = joblib.load(\"/data/model.pkl\")\n",
362-
"y_pred = model.predict(x_test_data)\n",
341+
" err = mean_squared_error(y_test_data, y_pred)\n",
363342
"\n",
364-
"err = mean_squared_error(y_test_data, y_pred)\n",
343+
" with open('/data/output.txt', 'a') as f:\n",
344+
" f.write(str(err))\n",
365345
"\n",
366-
"with open('/data/output.txt', 'a') as f:\n",
367-
" f.write(str(err))\n",
368346
"\n",
369-
"EOF\n",
347+
"packages_to_install = [\"numpy\", \"pandas\", \"pyarrow\", \"requests\", \"vineyard\", \"scikit-learn==1.4.0\", \"joblib==1.3.2\"]\n",
348+
"pip_index_url = \"https://pypi.tuna.tsinghua.edu.cn/simple\"\n",
370349
"\n",
371-
"python3 ./test.py\n",
372-
"\"\"\"\n",
373-
"\n",
374-
"preprocess_processor = create_processor(preprocess_data_script)\n",
375-
"train_processor = create_processor(train_data_script)\n",
376-
"test_processor = create_processor(test_data_script)"
350+
"preprocess_processor = create_processor(preprocess, packages_to_install, pip_index_url)\n",
351+
"train_processor = create_processor(train, packages_to_install, pip_index_url)\n",
352+
"test_processor = create_processor(test, packages_to_install, pip_index_url)"
377353
]
378354
},
379355
{
380356
"cell_type": "markdown",
381357
"metadata": {},
382358
"source": [
383-
"上述代码片段分别定义了数据处理流水线中的三个步骤:数据预处理、模型训练和模型测试。这三个步骤对应的Bash脚本传入`create_processor`函数以被封装为三个processor。"
359+
"上述代码片段分别定义了数据处理流水线中的三个步骤:数据预处理、模型训练和模型测试。这三个步骤对应的Python函数传入`create_processor`函数以被封装为三个processor。"
384360
]
385361
},
386362
{
387363
"cell_type": "code",
388-
"execution_count": 6,
364+
"execution_count": 5,
389365
"metadata": {},
390366
"outputs": [],
391367
"source": [
@@ -398,9 +374,19 @@
398374
},
399375
{
400376
"cell_type": "code",
401-
"execution_count": 7,
377+
"execution_count": 6,
402378
"metadata": {},
403-
"outputs": [],
379+
"outputs": [
380+
{
381+
"name": "stdout",
382+
"output_type": "stream",
383+
"text": [
384+
"2024-03-08 12:13:09,983 - fluidsdk - INFO - DataProcess linear-regression-with-vineyard-step1 completed\n",
385+
"2024-03-08 12:15:26,417 - fluidsdk - INFO - DataProcess linear-regression-with-vineyard-step2 completed\n",
386+
"2024-03-08 12:17:39,682 - fluidsdk - INFO - DataProcess linear-regression-with-vineyard-step3 completed\n"
387+
]
388+
}
389+
],
404390
"source": [
405391
"# 将线性回归模型的数据处理任务工作流提交,并等待其运行完成\n",
406392
"run = flow.run(run_id=\"linear-regression-with-vineyard\")\n",
@@ -416,7 +402,7 @@
416402
},
417403
{
418404
"cell_type": "code",
419-
"execution_count": 8,
405+
"execution_count": 7,
420406
"metadata": {},
421407
"outputs": [],
422408
"source": [

0 commit comments

Comments
 (0)