Skip to content

Commit f42bb1b

Browse files
authored
Initial implementation for task runner in python (#1706)
1 parent 3c0a272 commit f42bb1b

File tree

3 files changed

+85
-7
lines changed

3 files changed

+85
-7
lines changed

pkg/workloads/cortex/serve/init/bootloader.sh

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,17 @@ if [ "$CORTEX_KIND" = "RealtimeAPI" ]; then
138138
# generate nginx conf
139139
/opt/conda/envs/env/bin/python -c 'from cortex.lib import util; import os; generated = util.render_jinja_template("/src/cortex/serve/nginx.conf.j2", os.environ); print(generated);' > /run/nginx.conf
140140

141-
# prepare batch otherwise
142-
else
141+
# create the python initialization service
142+
create_s6_service "py_init" "cd /mnt/project && /opt/conda/envs/env/bin/python /src/cortex/serve/init/script.py"
143+
elif [ "$CORTEX_KIND" = "BatchAPI" ]; then
143144
create_s6_service "batch" "cd /mnt/project && $source_env_file_cmd && exec env PYTHONUNBUFFERED=TRUE env PYTHONPATH=$PYTHONPATH:$CORTEX_PYTHON_PATH /opt/conda/envs/env/bin/python /src/cortex/serve/start/batch.py"
145+
146+
# create the python initialization service
147+
create_s6_service "py_init" "cd /mnt/project && /opt/conda/envs/env/bin/python /src/cortex/serve/init/script.py"
148+
elif [ "$CORTEX_KIND" = "TaskAPI" ]; then
149+
create_s6_service "task" "cd /mnt/project && $source_env_file_cmd && exec env PYTHONUNBUFFERED=TRUE env PYTHONPATH=$PYTHONPATH:$CORTEX_PYTHON_PATH /opt/conda/envs/env/bin/python /src/cortex/serve/start/task.py"
144150
fi
145151

146-
# create the python initialization service
147-
create_s6_service "py_init" "cd /mnt/project && /opt/conda/envs/env/bin/python /src/cortex/serve/init/script.py"
152+
153+
154+

pkg/workloads/cortex/serve/start/batch.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@
4141
"job_spec": None,
4242
"provider": None,
4343
"predictor_impl": None,
44-
"predict_route": None,
45-
"client": None,
46-
"class_set": set(),
4744
"sqs_client": None,
4845
}
4946

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# Copyright 2020 Cortex Labs, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import sys
16+
import os
17+
import argparse
18+
import inspect
19+
import time
20+
import json
21+
import threading
22+
import math
23+
import pathlib
24+
25+
import boto3
26+
import botocore
27+
28+
from cortex import consts
29+
from cortex.lib import util
30+
from cortex.lib.api import API, get_spec, get_api
31+
from cortex.lib.log import cx_logger as logger
32+
from cortex.lib.concurrency import LockedFile
33+
from cortex.lib.storage import S3, LocalStorage
34+
from cortex.lib.exceptions import UserRuntimeException
35+
36+
37+
local_cache = {
38+
"api_spec": None,
39+
"task_spec": None,
40+
}
41+
42+
43+
def get_task_spec(storage, cache_dir, job_spec_path):
44+
local_spec_path = os.path.join(cache_dir, "task_spec.json")
45+
_, key = S3.deconstruct_s3_path(job_spec_path)
46+
storage.download_file(key, local_spec_path)
47+
with open(local_spec_path) as f:
48+
return json.load(f)
49+
50+
51+
def start():
52+
cache_dir = os.environ["CORTEX_CACHE_DIR"]
53+
provider = os.environ["CORTEX_PROVIDER"]
54+
api_spec_path = os.environ["CORTEX_API_SPEC"]
55+
task_spec_path = os.environ["CORTEX_TASK_SPEC"]
56+
project_dir = os.environ["CORTEX_PROJECT_DIR"]
57+
58+
region = os.getenv("AWS_REGION")
59+
60+
storage, api_spec = get_spec(provider, api_spec_path, cache_dir, region)
61+
task_spec = get_task_spec(storage, cache_dir, task_spec_path)
62+
63+
logger().info("loading the task definition from {}".format(api_spec["definition"]["path"]))
64+
65+
# TODO validate the task definition and execute the task
66+
67+
local_cache["api_spec"] = api_spec
68+
local_cache["provider"] = provider
69+
local_cache["task_spec"] = task_spec
70+
print(task_spec)
71+
72+
73+
if __name__ == "__main__":
74+
start()

0 commit comments

Comments
 (0)