Skip to content

Commit 8c85404

Browse files
committed
Update job service, repo, model structure
1 parent 458f7dd commit 8c85404

File tree

7 files changed

+81
-31
lines changed

7 files changed

+81
-31
lines changed

src/model/job_model.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from marshmallow import Schema, fields
2+
3+
4+
class BatchJob(Schema):
    """Marshmallow schema for a batch-job document stored in Mongo.

    Used by the repository layer (job_repository.create_one) to validate
    and serialize the document before insertion into AtxDataBatchJobs.
    """

    # Primary key of the batch-job document (Mongo `_id`).
    _id = fields.Str(required=True)
    # Human-readable description supplied when the batch job is created.
    description = fields.Str(required=True)
    # True once every job in the batch has completed.
    finished = fields.Bool(required=True)
    # Aggregate counters for the batch; schema-free (fields.Raw) —
    # presumably produced by batch_job_stats(); shape not enforced here.
    stats = fields.Raw(required=True)
    # Array of per-job entries with metadata; schema-free (fields.Raw).
    jobs = fields.Raw(required=True)
    # Timestamps added by get_initial_create_time_dict() /
    # get_update_time_dict(); optional because updates only touch one.
    createdTime = fields.Raw()
    updatedTime = fields.Raw()

    class Meta:
        # Raise on validation errors instead of returning them silently.
        strict = True
        # Serialize date/datetime values in ISO-8601 format.
        dateformat = "iso"

src/repository/job_repository.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from src.app import server
2+
from src.common import Logger
3+
from src.model.job_model import BatchJob
4+
from src.util import (
5+
get_initial_create_time_dict,
6+
get_update_time_dict,
7+
validate_and_dump_in_schema,
8+
)
9+
10+
log = Logger()
11+
12+
13+
def create_one(insert_args):
    """Validate and insert a new batch-job document.

    Merges creation timestamps into a copy of ``insert_args``, validates
    the result against the BatchJob schema, and inserts it into the
    AtxDataBatchJobs collection.

    :param insert_args: dict with the batch-job fields, including '_id'
    :return: pymongo InsertOneResult from insert_one
    """
    insert_data = dict(insert_args)
    insert_data.update(get_initial_create_time_dict())
    log.info(f"job_repository : create_one : insert_id = {insert_data['_id']}")
    # Validate/serialize through the schema before it reaches Mongo.
    obj = validate_and_dump_in_schema(insert_data, BatchJob())
    collection = server.get_mongo_db().AtxDataBatchJobs
    return collection.insert_one(obj)
19+
20+
21+
def find_one(batch_job_id):
    """Fetch one batch-job document by id; returns None when not found."""
    collection = server.get_mongo_db().AtxDataBatchJobs
    return collection.find_one({"_id": batch_job_id})
23+
24+
25+
def update_one(batch_job_id, update_args):
    """Apply a partial update to a batch-job document.

    Merges the update timestamp into a copy of ``update_args`` and issues
    a ``$set`` against the document with the given id.

    :param batch_job_id: '_id' of the document to update
    :param update_args: dict of fields to set (may be partial,
                        e.g. {"finished": False})
    :return: pymongo UpdateResult from update_one
    """
    update_data = {**update_args, **get_update_time_dict()}
    # Log the target id from the parameter, not update_data['_id']:
    # callers pass partial updates without an '_id' key (see
    # restart_batch_job), which made the old f-string raise KeyError.
    # Also fix the copy-pasted "create_one" label.
    log.info(f"job_repository : update_one : update_id = {batch_job_id}")
    return server.get_mongo_db().AtxDataBatchJobs.update_one(
        {"_id": batch_job_id}, {"$set": update_data}
    )

src/routes/health/service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from src.app import server
77
from src.common import Logger
8-
from src.routes.job.service import create_batch_job
8+
from src.services.job_service import create_batch_job
99
from src.util.job import add_job_meta
1010

1111
log = Logger()

src/routes/job/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from src.common.logger import Logger
22
from src.core.namespace import Namespace
3-
from src.routes.job.service import (
3+
from src.services.job_service import (
44
view_or_update_batch_job,
55
clear_all_failed_jobs,
66
clean_queue,

src/routes/job/service.py renamed to src/services/job_service.py

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,35 @@
11
import calendar
2-
import hashlib
3-
import uuid
42
from datetime import datetime, timedelta
53

64
from rq import Worker
75
from rq.job import Job
86

9-
from src.common import Logger
107
from src.app import server
11-
from src.util import get_initial_create_time_dict, get_update_time_dict
8+
from src.common import Logger
9+
from src.repository import job_repository
1210
from src.util.job import (
1311
update_job_array_with_meta,
1412
add_job_meta,
1513
is_batch_job_finished,
1614
batch_job_stats,
1715
)
16+
from src.util.sequence import fetch_unique_uuid_md5_id
1817

1918
log = Logger()
2019

2120

2221
def create_batch_job(desc, job_array):
23-
rand_unique_id = str(datetime.utcnow()) + str(uuid.uuid4())
24-
seq_id = hashlib.md5(rand_unique_id.encode("utf-8")).hexdigest()
22+
seq_id = fetch_unique_uuid_md5_id()
2523
log.info(f"create_batch_job : creating new batch job for {desc} with id {seq_id}")
2624

2725
job_array_with_meta = update_job_array_with_meta(job_array)
28-
insert_resp = server.get_mongo_db().AtxDataBatchJobs.insert_one(
26+
insert_resp = job_repository.create_one(
2927
{
3028
"_id": seq_id,
3129
"description": desc,
3230
"finished": is_batch_job_finished(job_array_with_meta),
3331
"stats": batch_job_stats(job_array_with_meta),
3432
"jobs": job_array_with_meta,
35-
**get_initial_create_time_dict(),
3633
}
3734
)
3835
if insert_resp.acknowledged:
@@ -78,10 +75,6 @@ def clean_queue(queue_name):
7875
return {"success": True}
7976

8077

81-
def fetch_batch_job(job_id):
82-
return server.get_mongo_db().AtxDataBatchJobs.find_one({"_id": job_id})
83-
84-
8578
def get_all_queue_stats():
8679
all_stats = {}
8780
jq = server.get_job_queue()
@@ -128,7 +121,7 @@ def kill_all_zombie_workers():
128121

129122

130123
def view_or_update_batch_job(batch_job_id):
131-
batch_job = fetch_batch_job(batch_job_id)
124+
batch_job = job_repository.find_one(batch_job_id)
132125
if batch_job is None:
133126
return {
134127
"success": False,
@@ -152,23 +145,20 @@ def view_or_update_batch_job(batch_job_id):
152145
kwargs={"job_id": batch_job_id},
153146
)
154147

155-
update_resp = server.get_mongo_db().AtxDataBatchJobs.update_one(
156-
{"_id": batch_job_id},
148+
update_resp = job_repository.update_one(
149+
batch_job_id,
157150
{
158-
"$set": {
159-
"finished": job_finished,
160-
"stats": batch_job_stats(job_array_with_meta),
161-
"jobs": job_array_with_meta,
162-
**get_update_time_dict(),
163-
}
151+
"finished": job_finished,
152+
"stats": batch_job_stats(job_array_with_meta),
153+
"jobs": job_array_with_meta,
164154
},
165155
)
166156

167157
if update_resp.acknowledged:
168158
server.get_job_queue().enqueue_job(
169159
poll_batch_job, priority="low", args=(tuple([batch_job_id]))
170160
)
171-
updated_batch_job = fetch_batch_job(batch_job_id)
161+
updated_batch_job = job_repository.find_one(batch_job_id)
172162
return {"success": True, "body": updated_batch_job}
173163
else:
174164
log.error(
@@ -193,7 +183,7 @@ def post_batch_job(batch_job_id):
193183

194184

195185
def restart_batch_job(batch_job_id):
196-
batch_job = fetch_batch_job(batch_job_id)
186+
batch_job = job_repository.find_one(batch_job_id)
197187
redis_conn = server.get_redis().get_redis_conn()
198188
if batch_job is None:
199189
return {
@@ -207,9 +197,7 @@ def restart_batch_job(batch_job_id):
207197
log.info(f"For batch job {batch_job_id}, requeue job {job_meta['_id']}")
208198
job = Job.fetch(job_meta["_id"], redis_conn)
209199
job.requeue()
210-
server.get_mongo_db().AtxDataBatchJobs.update_one(
211-
{"_id": batch_job_id}, {"$set": {"finished": False, **get_update_time_dict()}}
212-
)
200+
job_repository.update_one(batch_job_id, {"finished": False})
213201
server.get_job_queue().enqueue_job(
214202
poll_batch_job, priority="low", args=(tuple([batch_job_id]))
215203
)

src/util/__init__.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,18 @@ def parse_request_using_schema(request, schema):
2626
func_name = inspect.stack()[1][3]
2727
log.info(f"Request received : {func_name}")
2828
request_body = request.get_json()
29-
validation_errors_dict = schema.validate(request_body)
30-
if len(validation_errors_dict) > 0:
31-
raise ValidationError(validation_errors_dict)
29+
validate_schema(request_body, schema)
3230
data = schema.dump(request_body)
3331
log.info(f"{func_name} : {data}")
3432
return data
33+
34+
35+
def validate_and_dump_in_schema(obj, schema):
    """Validate ``obj`` against ``schema`` and return its serialized form.

    Raises ValidationError (via validate_schema) when validation fails;
    otherwise returns ``schema.dump(obj)``.
    """
    validate_schema(obj, schema)
    dumped = schema.dump(obj)
    return dumped
38+
39+
40+
def validate_schema(obj, schema):
    """Validate ``obj`` against a marshmallow ``schema``.

    Raises ValidationError carrying the error dict when validation
    reports any problems; returns None on success.
    """
    errors = schema.validate(obj)
    # schema.validate returns an empty dict when the object is valid.
    if errors:
        raise ValidationError(errors)

src/util/sequence.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import hashlib
2+
import uuid
3+
from datetime import datetime
4+
5+
6+
def fetch_unique_uuid_md5_id():
    """Return a 32-character hex id.

    The id is the MD5 digest of the current UTC timestamp concatenated
    with a random UUID4 — used as a collision-resistant sequence id,
    not for any security purpose.
    """
    seed = f"{datetime.utcnow()}{uuid.uuid4()}"
    return hashlib.md5(seed.encode("utf-8")).hexdigest()

0 commit comments

Comments
 (0)