task processing rate falls with number of unique tasks passed to wiji eventloop #73

Open
komuw opened this issue May 22, 2019 · 2 comments · May be fixed by #76

komuw commented May 22, 2019

Look at the benchmarks in wiji/tests/test_worker.py, lines 534 to 847 at commit a315334:

class TestWorkerBenchmark(TestCase):
    """
    run tests as:
        python -m unittest discover -v -s .
    run one testcase as:
        python -m unittest -v tests.test_worker.TestWorkerBenchmark.test_something
    """

    def setUp(self):
        self.BROKER = wiji.broker.InMemoryBroker()

    def tearDown(self):
        pass

    def broker_path(self):
        return self.BROKER.__module__ + "." + self.BROKER.__class__.__name__

    @staticmethod
    def _run(*coros):
        loop = asyncio.get_event_loop()
        async_tasks = asyncio.gather(*coros, loop=loop)
        return loop.run_until_complete(async_tasks)

    def test_one_task(self):
        """
        processing rate for 1 task using an InMemoryBroker.
        results: 7520 tasks/second
        """

        class AdderTask(wiji.task.Task):
            the_broker = self.BROKER
            queue_name = "AdderTask"
            loglevel = "NOTSET"
            now = time.monotonic()
            num_per_tasks = 0

            async def run(self, a, b):
                res = a + b
                if self.num_per_tasks == 0:
                    self.now = time.monotonic()
                self.num_per_tasks = self.num_per_tasks + 1
                end = time.monotonic()
                time_taken = end - self.now
                process_rate = self.num_per_tasks / time_taken
                print("\n\t process_rate:{0}".format(self.queue_name))
                print(process_rate)
                return res

        t = AdderTask()
        worker = wiji.Worker(the_task=t)
        for i in range(0, 10_000):
            t.synchronous_delay(a=21, b=i)
        self._run(worker.consume_tasks())

    def test_two_tasks(self):
        """
        processing rate for 2 tasks using an InMemoryBroker.
        results: 4237 tasks/second
        """

        class TaskOne(wiji.task.Task):
            the_broker = self.BROKER
            queue_name = "TaskOne"
            loglevel = "NOTSET"
            now = time.monotonic()
            num_per_tasks = 0

            async def run(self, a, b):
                res = a + b
                if self.num_per_tasks == 0:
                    self.now = time.monotonic()
                self.num_per_tasks = self.num_per_tasks + 1
                end = time.monotonic()
                time_taken = end - self.now
                process_rate = self.num_per_tasks / time_taken
                print("\n\t process_rate:{0}".format(self.queue_name))
                print(process_rate)
                return res

        class TaskTwo(TaskOne):
            queue_name = "TaskTwo"

        t1 = TaskOne()
        w1 = wiji.Worker(the_task=t1)
        for i in range(0, 10_000):
            t1.synchronous_delay(a=21, b=i)

        t2 = TaskTwo()
        w2 = wiji.Worker(the_task=t2)
        for i in range(0, 10_000):
            t2.synchronous_delay(a=21, b=i)

        self._run(w1.consume_tasks(), w2.consume_tasks())

    def test_three_tasks(self):
        """
        processing rate for 3 tasks using an InMemoryBroker.
        results: 2816 tasks/second
        """

        class TaskOne(wiji.task.Task):
            the_broker = self.BROKER
            queue_name = "TaskOne"
            loglevel = "NOTSET"
            now = time.monotonic()
            num_per_tasks = 0

            async def run(self, a, b):
                res = a + b
                if self.num_per_tasks == 0:
                    self.now = time.monotonic()
                self.num_per_tasks = self.num_per_tasks + 1
                end = time.monotonic()
                time_taken = end - self.now
                process_rate = self.num_per_tasks / time_taken
                print("\n\t process_rate:{0}".format(self.queue_name))
                print(process_rate)
                return res

        class TaskTwo(TaskOne):
            queue_name = "TaskTwo"

        class TaskThree(TaskOne):
            queue_name = "TaskThree"

        t1 = TaskOne()
        w1 = wiji.Worker(the_task=t1)
        for i in range(0, 10_000):
            t1.synchronous_delay(a=21, b=i)

        t2 = TaskTwo()
        w2 = wiji.Worker(the_task=t2)
        for i in range(0, 10_000):
            t2.synchronous_delay(a=21, b=i)

        t3 = TaskThree()
        w3 = wiji.Worker(the_task=t3)
        for i in range(0, 10_000):
            t3.synchronous_delay(a=21, b=i)

        self._run(w1.consume_tasks(), w2.consume_tasks(), w3.consume_tasks())

    def test_four_tasks(self):
        """
        processing rate for 4 tasks using an InMemoryBroker.
        results: 2157 tasks/second
        """

        class TaskOne(wiji.task.Task):
            the_broker = self.BROKER
            queue_name = "TaskOne"
            loglevel = "NOTSET"
            now = time.monotonic()
            num_per_tasks = 0

            async def run(self, a, b):
                res = a + b
                if self.num_per_tasks == 0:
                    self.now = time.monotonic()
                self.num_per_tasks = self.num_per_tasks + 1
                end = time.monotonic()
                time_taken = end - self.now
                process_rate = self.num_per_tasks / time_taken
                print("\n\t process_rate:{0}".format(self.queue_name))
                print(process_rate)
                return res

        class TaskTwo(TaskOne):
            queue_name = "TaskTwo"

        class TaskThree(TaskOne):
            queue_name = "TaskThree"

        class TaskFour(TaskOne):
            queue_name = "TaskFour"

        t1 = TaskOne()
        w1 = wiji.Worker(the_task=t1)
        for i in range(0, 10_000):
            t1.synchronous_delay(a=21, b=i)

        t2 = TaskTwo()
        w2 = wiji.Worker(the_task=t2)
        for i in range(0, 10_000):
            t2.synchronous_delay(a=21, b=i)

        t3 = TaskThree()
        w3 = wiji.Worker(the_task=t3)
        for i in range(0, 10_000):
            t3.synchronous_delay(a=21, b=i)

        t4 = TaskFour()
        w4 = wiji.Worker(the_task=t4)
        for i in range(0, 10_000):
            t4.synchronous_delay(a=21, b=i)

        self._run(w1.consume_tasks(), w2.consume_tasks(), w3.consume_tasks(), w4.consume_tasks())

    def test_eight_tasks(self):
        """
        processing rate for 8 tasks using an InMemoryBroker.
        results: 1120 tasks/second
        """

        class TaskOne(wiji.task.Task):
            the_broker = self.BROKER
            queue_name = "TaskOne"
            loglevel = "NOTSET"
            now = time.monotonic()
            num_per_tasks = 0

            async def run(self, a, b):
                res = a + b
                if self.num_per_tasks == 0:
                    self.now = time.monotonic()
                self.num_per_tasks = self.num_per_tasks + 1
                end = time.monotonic()
                time_taken = end - self.now
                process_rate = self.num_per_tasks / time_taken
                print("\n\t process_rate:{0}".format(self.queue_name))
                print(process_rate)
                return res

        class TaskTwo(TaskOne):
            queue_name = "TaskTwo"

        class TaskThree(TaskOne):
            queue_name = "TaskThree"

        class TaskFour(TaskOne):
            queue_name = "TaskFour"

        class TaskFive(TaskOne):
            queue_name = "TaskFive"

        class TaskSix(TaskOne):
            queue_name = "TaskSix"

        class TaskSeven(TaskOne):
            queue_name = "TaskSeven"

        class TaskEight(TaskOne):
            queue_name = "TaskEight"

        t1 = TaskOne()
        w1 = wiji.Worker(the_task=t1)
        for i in range(0, 10_000):
            t1.synchronous_delay(a=21, b=i)

        t2 = TaskTwo()
        w2 = wiji.Worker(the_task=t2)
        for i in range(0, 10_000):
            t2.synchronous_delay(a=21, b=i)

        t3 = TaskThree()
        w3 = wiji.Worker(the_task=t3)
        for i in range(0, 10_000):
            t3.synchronous_delay(a=21, b=i)

        t4 = TaskFour()
        w4 = wiji.Worker(the_task=t4)
        for i in range(0, 10_000):
            t4.synchronous_delay(a=21, b=i)

        t5 = TaskFive()
        w5 = wiji.Worker(the_task=t5)
        for i in range(0, 10_000):
            t5.synchronous_delay(a=21, b=i)

        t6 = TaskSix()
        w6 = wiji.Worker(the_task=t6)
        for i in range(0, 10_000):
            t6.synchronous_delay(a=21, b=i)

        t7 = TaskSeven()
        w7 = wiji.Worker(the_task=t7)
        for i in range(0, 10_000):
            t7.synchronous_delay(a=21, b=i)

        t8 = TaskEight()
        w8 = wiji.Worker(the_task=t8)
        for i in range(0, 10_000):
            t8.synchronous_delay(a=21, b=i)

        self._run(
            w1.consume_tasks(),
            w2.consume_tasks(),
            w3.consume_tasks(),
            w4.consume_tasks(),
            w5.consume_tasks(),
            w6.consume_tasks(),
            w7.consume_tasks(),
            w8.consume_tasks(),
        )

When we have only one unique task class in the event loop, the processing rate using an InMemoryBroker is 7520 tasks/second.

When we have two unique task classes, the rate drops to 4237 tasks/second.

For three task classes the rate is 2816 tasks/second;
for four, 2157 tasks/second;
for eight, 1120 tasks/second.
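
A quick calculation from the figures above suggests the aggregate rate across all task classes stays roughly flat, so the per-class rate falls roughly in proportion to the number of classes sharing the loop:

# Per-class rates reported above (tasks/second), keyed by number of task classes.
per_class_rate = {1: 7520, 2: 4237, 3: 2816, 4: 2157, 8: 1120}
for n, rate in per_class_rate.items():
    # the aggregate stays in the ~7,500-9,000 tasks/second range for every n
    print(f"{n} task class(es): {rate}/s per class, ~{n * rate}/s in total")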


komuw commented May 22, 2019

Thus it appears that the throughput of each task class drops as we pack more task classes into a single event loop.

Maybe we need to use multiple event loops, one per task class (perhaps up to some maximum number of event loops); see the sketch below.
If we go down this road, we also need to benchmark the added memory consumption per event loop.
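
A minimal sketch of the "one event loop per task class" idea: run each worker on its own loop, one loop per thread. This is only an illustration of the approach, not wiji's current behaviour; the workers w1, w2, w3 are assumed to be wiji.Worker instances built the same way as in the benchmarks above.

import asyncio
import threading

def run_worker_in_own_loop(worker):
    # each thread gets a fresh event loop that only ever serves one task class
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(worker.consume_tasks())
    finally:
        loop.close()

threads = [
    threading.Thread(target=run_worker_in_own_loop, args=(w,), daemon=True)
    for w in (w1, w2, w3)  # hypothetical: one wiji.Worker per task class
]
for t in threads:
    t.start()
for t in threads:
    t.join()

Threads keep everything in one process, so a shared InMemoryBroker instance still works, but they also still contend for the GIL; truly parallel processing would need separate processes, which is where the logging caveat in the comment below comes in.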

komuw linked a pull request May 24, 2019 that will close this issue

komuw commented Jun 1, 2019

NB: logging is not multiprocess-safe, but it is thread-safe.
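
If the one-eventloop-per-task-class idea ends up using separate processes, one common workaround (a sketch only, not something wiji provides) is to route all log records through a multiprocessing queue so that only one process touches the real handlers, using the standard library's QueueHandler/QueueListener:

import logging
import logging.handlers
import multiprocessing

# Parent process: a single QueueListener owns the real handler(s).
log_queue = multiprocessing.Queue(-1)
listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
listener.start()

# Each worker process: attach only a QueueHandler, never the real handlers.
logger = logging.getLogger("wiji.worker")
logger.setLevel(logging.INFO)
logger.addHandler(logging.handlers.QueueHandler(log_queue))
logger.info("logged from a worker process, written out by the parent")

listener.stop()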
