-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.py
77 lines (55 loc) · 2.16 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from datetime import datetime, timezone
from types import SimpleNamespace
from worker import fetch_price
from apscheduler.schedulers.base import STATE_RUNNING
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.job import Job
# Suppress timezone warnings
import warnings
# Ignore dateparser warnings regarding pytz
warnings.filterwarnings(
"ignore",
message="The localize method is no longer necessary, as this time zone supports the fold attribute",
)
scheduler = BackgroundScheduler()
def init_jobs(config: SimpleNamespace):
global scheduler
if scheduler.state == STATE_RUNNING:
scheduler.shutdown(wait=True)
scheduler.remove_all_jobs()
scheduler = BackgroundScheduler()
for i, r in enumerate(config.rules):
# Hours are required
if not hasattr(r, 'hours'):
print(f"Rule Error: Rule {r.name} is missing property 'hours'.")
continue
# Mins are optional
mins = '0'
if hasattr(r, 'mins'):
mins = r.mins
# Set cron
t = CronTrigger.from_crontab(f'{mins} {r.hours} * * *')
if hasattr(config, 'scheduler'):
if hasattr(config.scheduler, 'spreadtime'):
if isinstance(config.scheduler.spreadtime, int):
t.jitter = config.scheduler.spreadtime
# Schedule
scheduler.add_job(fetch_price, t, name=r.name, kwargs={'rule': r, 'config': config, 'idx': i})
scheduler.start()
def get_jobs() -> list[dict]:
jobs: list[Job] = scheduler.get_jobs()
return ({
'name': j.name,
'next': j.next_run_time,
'delta': (j.next_run_time.replace(tzinfo=timezone.utc) - datetime.now().replace(tzinfo=timezone.utc))
} for j in jobs)
def run(name: str):
jobs: list[Job] = scheduler.get_jobs()
for j in jobs:
if (j.name == name):
j.modify(next_run_time=datetime.now())
def run_all():
jobs: list[Job] = scheduler.get_jobs()
for j in jobs:
j.modify(next_run_time=datetime.now())