Skip to content

Commit

Permalink
Merge pull request #7 from komuw/task-options
Browse files Browse the repository at this point in the history
Task options
  • Loading branch information
komuw authored Feb 28, 2019
2 parents ab0d873 + 05fbc94 commit 65b7a8c
Show file tree
Hide file tree
Showing 9 changed files with 393 additions and 314 deletions.
104 changes: 23 additions & 81 deletions cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,7 @@ async def async_run(self, *args, **kwargs):

subprocess.run(["dd", "if=/dev/zero", "of=/dev/null", "bs=500000", "count=1000000"])

task = BlockingDiskIOTask(
the_broker=the_broker,
queue_name="BlockingDiskIOTask",
eta=60.0,
retries=3,
log_id="BlockingDiskIOTask_LogID",
hook_metadata='{"email": "BlockingDiskIOTask"}',
)
task = BlockingDiskIOTask(the_broker=the_broker, queue_name="BlockingDiskIOTask")
return task


Expand All @@ -100,14 +93,7 @@ async def async_run(self, *args, **kwargs):
resp = requests.get(url)
print("resp: ", resp)

task = MyTask(
the_broker=the_broker,
queue_name="HttpQueue",
eta=60.0,
retries=3,
log_id="myLogID",
hook_metadata='{"email": "example@example.com"}',
)
task = MyTask(the_broker=the_broker, queue_name="BlockingHttp_Queue")
return task


Expand All @@ -123,14 +109,7 @@ async def async_run(self, *args, **kwargs):
res_text = await resp.text()
print(res_text[:50])

task = MyTask(
the_broker=the_broker,
queue_name="HttpQueue",
eta=60.0,
retries=3,
log_id="myLogID",
hook_metadata='{"email": "example@example.com"}',
)
task = MyTask(the_broker=the_broker, queue_name="AsyncHttpQueue")
return task


Expand All @@ -149,14 +128,7 @@ async def async_run(self, *args, **kwargs):
h.hexdigest()
await asyncio.sleep(0.4)

task = MyTask(
the_broker=the_broker,
queue_name="PrintQueue",
eta=60.0,
retries=3,
log_id="myLogID",
hook_metadata='{"email": "example@example.com"}',
)
task = MyTask(the_broker=the_broker, queue_name="PrintQueue")
return task


Expand All @@ -172,15 +144,7 @@ async def async_run(self, a, b):
await asyncio.sleep(2)
return res

task = AdderTask(
the_broker=the_broker,
queue_name="AdderTaskQueue",
eta=60.8,
retries=3,
log_id="adder_task_myLogID",
hook_metadata='{"email": "adder_task"}',
chain=chain,
)
task = AdderTask(the_broker=the_broker, queue_name="AdderTaskQueue", chain=chain)
return task


Expand All @@ -194,15 +158,7 @@ async def async_run(self, a):
print()
return res

task = DividerTask(
the_broker=the_broker,
queue_name="DividerTaskQueue",
eta=60.9,
retries=3,
log_id="divider_task_myLogID",
hook_metadata='{"email": "divider_task"}',
chain=chain,
)
task = DividerTask(the_broker=the_broker, queue_name="DividerTaskQueue", chain=chain)
return task


Expand All @@ -216,15 +172,7 @@ async def async_run(self, bbb, a=5.5):
print()
return res

task = MultiplierTask(
the_broker=the_broker,
queue_name="MultiplierTaskQueue",
eta=60.7,
retries=3,
log_id="multiplier_task_myLogID",
hook_metadata='{"email": "multiplier_task"}',
chain=chain,
)
task = MultiplierTask(the_broker=the_broker, queue_name="MultiplierTaskQueue", chain=chain)
return task


Expand All @@ -240,15 +188,7 @@ async def async_run(self):
await asyncio.sleep(0.5)
raise ValueError("\n Houston We got 99 problems. \n")

task = ExceptionTask(
the_broker=the_broker,
queue_name="ExceptionTaskQueue",
eta=60.1,
retries=3,
log_id="exception_task_myLogID",
hook_metadata='{"email": "exception_task"}',
chain=chain,
)
task = ExceptionTask(the_broker=the_broker, queue_name="ExceptionTaskQueue", chain=chain)
return task


Expand All @@ -268,7 +208,8 @@ async def async_run(self):
divider = divider_task(the_broker=MY_BROKER, chain=multiplier)

adder = adder_task(the_broker=MY_BROKER, chain=divider)
adder.blocking_delay(3, 7)

adder.blocking_delay(3, 7, task_options=wiji.task.TaskOptions(eta=76.87))
#############################################

# ALTERNATIVE way of chaining
Expand Down Expand Up @@ -298,26 +239,27 @@ async def async_run(self):
exception_task22,
BLOCKING_task,
]
workers = []

workers = [wiji.Worker(the_task=wiji.task.WatchDogTask, use_watchdog=True)]
producers = [produce_tasks_continously(task=wiji.task.WatchDogTask)]

for task in all_tasks:
_worker = wiji.Worker(the_task=task)
workers.append(_worker)

watchie_worker = wiji.Worker(the_task=wiji.task.WatchDogTask)
workers.append(watchie_worker)

consumers = []
for i in workers:
consumers.append(i.consume_forever())

producers = [
produce_tasks_continously(task=http_task1, url="https://httpbin.org/delay/45"),
produce_tasks_continously(task=print_task2, my_KWARGS={"name": "Jay-Z", "age": 4040}),
produce_tasks_continously(task=adder, a=23, b=67),
produce_tasks_continously(task=exception_task22),
produce_tasks_continously(task=BLOCKING_task, url="https://httpbin.org/delay/11"),
produce_tasks_continously(task=wiji.task.WatchDogTask),
]
producers.extend(
[
produce_tasks_continously(task=http_task1, url="https://httpbin.org/delay/45"),
produce_tasks_continously(task=print_task2, my_KWARGS={"name": "Jay-Z", "age": 4040}),
produce_tasks_continously(task=adder, a=23, b=67),
produce_tasks_continously(task=exception_task22),
produce_tasks_continously(task=BLOCKING_task, url="https://httpbin.org/delay/11"),
]
)

# 2.consume tasks
async def async_main():
Expand Down
97 changes: 26 additions & 71 deletions wiji/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import typing
import json

from . import protocol

if typing.TYPE_CHECKING:
from . import task


class BaseBroker(abc.ABC):
"""
Expand All @@ -15,15 +20,16 @@ class BaseBroker(abc.ABC):
"""

@abc.abstractmethod
async def enqueue(self, item: str, queue_name: str) -> None:
async def enqueue(self, item: str, queue_name: str, task_options: "task.TaskOptions") -> None:
"""
enqueue/save an item.
Parameters:
item: The item to be enqueued/saved
queue_name: name of queue to enqueue in
task_options: options for the specific task been enqueued
"""
raise NotImplementedError("enqueue method must be implemented.")
raise NotImplementedError("`enqueue` method must be implemented.")

@abc.abstractmethod
async def dequeue(self, queue_name: str) -> str:
Expand All @@ -33,7 +39,7 @@ async def dequeue(self, queue_name: str) -> str:
Returns:
item that was dequeued
"""
raise NotImplementedError("dequeue method must be implemented.")
raise NotImplementedError("`dequeue` method must be implemented.")


class SimpleBroker(BaseBroker):
Expand All @@ -49,23 +55,9 @@ def __init__(self) -> None:
"""
"""
self.store: dict = {}
self._queue_watchdog_task()

WatchDogTask_Queue_name = "WatchDogTask_Queue"
WatchDogTask_Queue_init = {
"version": 1,
"task_id": "3c03f930-3098-44bd-a4e3-fee5162dd0e2",
"eta": "2019-02-24T17:37:06.534478",
"retries": 0,
"queue_name": WatchDogTask_Queue_name,
"log_id": "log_id",
"hook_metadata": "hook_metadata",
"timelimit": 1800,
"args": [],
"kwargs": {},
}
self.store[WatchDogTask_Queue_name] = [json.dumps(WatchDogTask_Queue_init)]

async def enqueue(self, item: str, queue_name: str) -> None:
async def enqueue(self, item: str, queue_name: str, task_options: "task.TaskOptions") -> None:
if self.store.get(queue_name):
self.store[queue_name].append(item)
await asyncio.sleep(delay=-1)
Expand All @@ -84,55 +76,18 @@ async def dequeue(self, queue_name: str) -> str:
else:
raise ValueError("queue with name: {0} does not exist.".format(queue_name))


class YoloBroker(BaseBroker):
"""
This is an in-memory implementation of BaseBroker.
Note: It should only be used for tests and demo purposes.
"""

def __init__(self, maxsize: int = 0) -> None:
"""
Parameters:
maxsize: the maximum number of items(not size) that can be put in the queue.
loop: an event loop
"""
self.queue: asyncio.queues.Queue = asyncio.Queue(maxsize=maxsize)
self.store: dict = {}
self.max_ttl: float = 20 * 60 # 20mins
self.start_timer = time.monotonic()

async def enqueue(self, item: str, queue_name: str) -> None:
if self.store.get(queue_name):
self.store[queue_name].append(item)
else:
self.store[queue_name] = [item]
self.queue.put_nowait(self.store)

# garbage collect
await self.delete_after_ttl()

async def dequeue(self, queue_name: str) -> str:
store = await self.queue.get()
if queue_name in store:
try:
# garbage collect
await self.delete_after_ttl()
return self.store[queue_name].pop(0)
except IndexError:
# queue is empty
await asyncio.sleep(1.5)
else:
raise ValueError("queue with name: {0} does not exist.".format(queue_name))

async def delete_after_ttl(self) -> None:
"""
iterate over all stored items and delete any that are
older than self.max_ttl seconds
"""
now = time.monotonic()
time_diff = now - self.start_timer
if time_diff > self.max_ttl:
for key in list(self.store.keys()):
self.store[key] = []
def _queue_watchdog_task(self):
# queue the first WatchDogTask
_watchDogTask_name = "WatchDogTask"
_proto = protocol.Protocol(
version=1,
task_id="{0}_id_1".format(_watchDogTask_name),
eta=0.00,
current_retries=0,
max_retries=0,
log_id="{0}_log_id".format(_watchDogTask_name),
hook_metadata="",
argsy=(),
kwargsy={},
)
self.store["{0}_Queue".format(_watchDogTask_name)] = [_proto.json()]
4 changes: 2 additions & 2 deletions wiji/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ class BaseHook(abc.ABC):
async def request(self, task_id: str, log_id: str, hook_metadata: str) -> None:
"""
"""
raise NotImplementedError("request method must be implemented.")
raise NotImplementedError("`request` method must be implemented.")

@abc.abstractmethod
async def response(self, task_id: str, log_id: str, hook_metadata: str) -> None:
"""
"""
raise NotImplementedError("response method must be implemented.")
raise NotImplementedError("`response` method must be implemented.")


class SimpleHook(BaseHook):
Expand Down
4 changes: 2 additions & 2 deletions wiji/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def bind(self, loglevel: str, log_metadata: dict) -> None:
loglevel: logging level eg DEBUG
log_metadata: log metadata that can be included in all log statements
"""
raise NotImplementedError("bind method must be implemented.")
raise NotImplementedError("`bind` method must be implemented.")

@abc.abstractmethod
def log(self, level: int, log_data: dict) -> None:
Expand All @@ -29,7 +29,7 @@ def log(self, level: int, log_data: dict) -> None:
level: logging level eg `logging.INFO`
log_data: the message to log
"""
raise NotImplementedError("log method must be implemented.")
raise NotImplementedError("`log` method must be implemented.")


class SimpleBaseLogger(BaseLogger):
Expand Down
Loading

0 comments on commit 65b7a8c

Please sign in to comment.