Skip to content

Commit 9862b46

Browse files
committed
Change log to metadata in task function
1 parent af9c8d6 commit 9862b46

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

README.md

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ OUTLINE
3131
- [Installation](#installation)
3232
- [Usage](#usage)
3333
- [Options](#options)
34-
- [task.callback](#taskcallback)
35-
- [task.result_callback](#taskresult_callback)
34+
- [task.function](#taskfunction)
35+
- [task.callback.on_done](#taskcallbackon_done)
36+
- [task.callback.on_all_done](#taskcallbackon_all_done)
3637
- [Other Methods](#other-methods)
3738
- [get_results()](#get_results)
3839
- [get_logs()](#get_logs)
@@ -81,7 +82,7 @@ Use 20 theads concurrently to dispatch tasks for HTTP reqeusts
8182
import worker_dispatcher
8283
import requests
8384

84-
def each_task(id: int, config, task, log):
85+
def each_task(id: int, config, task, metadata):
8586
response = requests.get(config['my_endpoint'] + task)
8687
return response
8788

@@ -104,7 +105,7 @@ Utilizes all CPU cores on the machine to compute tasks.
104105
```python
105106
import worker_dispatcher
106107

107-
def each_task(id: int, config, task, log):
108+
def each_task(id: int, config, task, metadata):
108109
result = sum(id * i for i in range(10**9))
109110
return result
110111

@@ -209,15 +210,15 @@ results = worker_dispatcher.start({
209210
The main function to execute per task
210211

211212
```python
212-
task_function (id: int, config, task, log: dict) -> Any
213+
task_function (id: int, config, task, metadata: dict) -> Any
213214
```
214215

215216
|Argument |Type |Deafult |Description|
216217
|:-- |:-- |:-- |:-- |
217218
|id |int |(auto) |The sequence number generated by each task starting from 1|
218219
|config |multitype|{} |The custom variable to be passed to the callback function|
219220
|task |multitype|(custom) |Each value from the `task.list`|
220-
|log |dict |{} |The log from each task written by this callback function.|
221+
|metadata |dict |{} |A user-defined dictionary for custom metadata per task, saved in its log.|
221222

222223
> The return value can be `False` to indicate task failure in TPS logs.
223224
> Alternatively, it can be a `requests.Response`, indicating failure if the status code is not 200.
@@ -263,11 +264,12 @@ callback_on_all_done_function (id: int, config, result, log: dict) -> Any
263264
Get all logs in list type after completing `start()`
264265

265266
Each log is of type dict, containing the results of every task processed by the worker:
266-
- task_id
267-
- started_at
268-
- ended_at
269-
- duration
270-
- result
267+
- task_id *(Auto-increased number)*
268+
- started_at *(Unixtime)*
269+
- ended_at *(Unixtime)*
270+
- duration *(Seconds)*
271+
- result *(Boolean or user-defined)*
272+
- metadata *(can be set within each task function)*
271273

272274
- #### get_result_info()
273275
Get a dict with the whole spending time and started/ended timestamps after completing `start()`
@@ -314,7 +316,7 @@ Perform a stress test scenario with 10 requests per second.
314316
```python
315317
import worker_dispatcher
316318

317-
def each_task(id, config, task, log):
319+
def each_task(id, config, task, metadata):
318320
response = None
319321
try:
320322
response = requests.get(config['my_endpoint'], timeout=(5, 10))

src/worker_dispatcher/worker_dispatcher.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import queue as Queue
44

55
# Sample task function
6-
def task_function_sample(id: int, config=None, task=None, log: dict=None):
6+
def task_function_sample(id: int, config=None, task=None, metadata: dict=None):
77
if id == 1:
88
print("Runing sample task function, please customize yours according to the actual usage.")
99
result = {
@@ -536,8 +536,8 @@ def _consume_queue(queue, config, timeout_unixtime, frequency_interval_seconds)
536536
# Single Task function
537537
def _consume_task(data, config) -> dict:
538538
started_at = time.time()
539-
task_rewrite_data = {}
540-
return_value = config['task']['function'](config=config['task']['config'], id=data['id'], task=data['task'], log=task_rewrite_data)
539+
customized_meta = {}
540+
return_value = config['task']['function'](config=config['task']['config'], id=data['id'], task=data['task'], metadata=customized_meta)
541541
ended_at = time.time()
542542
duration = ended_at - started_at
543543
log = {
@@ -546,7 +546,7 @@ def _consume_task(data, config) -> dict:
546546
'ended_at': ended_at,
547547
'duration': duration,
548548
'result': return_value,
549-
'log': task_rewrite_data,
549+
'metadata': customized_meta,
550550
}
551551
# On_done hook (will update the result in the log)
552552
if callable(config['task']['callback']['on_done']):

0 commit comments

Comments
 (0)