Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 修复在其他文件中使用publish发布任务并指定task_id时获取task_id报错 #110

Closed
wants to merge 1 commit into from

Conversation

aniya105
Copy link

@aniya105 aniya105 commented Apr 2, 2024

在非消费函数的文件中使用publish方法发布任务时,使用的TaskIdLoggger由于无法通过get_current_taskid()获取到task_id报错

@ydf0509
Copy link
Owner

ydf0509 commented Apr 2, 2024

看下你的代码demo例子呢,还有你的funboost版本,

image
image

发一下你报错截图吧,应该不会报错的吧,都try了,如果没传也获取不到,就返回no_task_id.

@ydf0509
Copy link
Owner

ydf0509 commented Apr 2, 2024

还是要发下报错截图呢,因为你在 f'10秒内推送了 {self.count_per_minute} 条消息,累计推送了 {self.publish_msg_num_total} 条消息到 {self._queue_name} 队列中') 这里加taskid是不适合的,这个地方使用过统计,没有和某条任务消息关联.

@ydf0509
Copy link
Owner

ydf0509 commented Apr 2, 2024

image
这就是模拟你说的场景,在单独的另外文件里面publish的,不会报错的.no_task_id取代了.

@aniya105
Copy link
Author

aniya105 commented Apr 3, 2024

funboost版本: 43.0

生产者

async def restart_data_annotation_task(
    task_id: str = Body(),
    app_id: str = Body(),
    begin_time: str = Body(),
    end_time: str = Body(),
    wait_annotation_num: int = Body(),
    startup_mode: str = Body(),
):
    data_annotation_consumer.data_annotation.publish(
        {
            "app_id": app_id,
            "start_time": begin_time,
            "end_time": end_time,
            "wait_annotation_num": wait_annotation_num,
            "startup_mode": "full" if startup_mode == "0" else "increment",
        },
        task_id=task_id,
    )

消费者

@boost(
    BoosterParams(
        queue_name="queue_data_annotation",
        qps=10,
        broker_kind=BrokerEnum.REDIS_STREAM,
        concurrent_mode=ConcurrentModeEnum.ASYNC,
        is_using_distributed_frequency_control=True,
        user_custom_record_process_info_func=save_to_postgresql,
    )
)
async def data_annotation(
    app_id,
    start_time,
    end_time,
    wait_annotation_num,
    startup_mode="full",
):
    task_id = funboost_current_task().task_id

报错信息:

  File "C:\Users\ys\PycharmProjects\power-qa\backend\app\app\api\v1\endpoints\data_annotation.py", line 168, in restart_data_annotation_task
    data_annotation_consumer.data_annotation.publish(
    │                        │               └ <bound method Booster._safe_publish of <funboost.core.booster.Booster object at 0x0000027B7383AAD0>>
    │                        └ <funboost.core.booster.Booster object at 0x0000027B7383AAD0>
    └ <module 'app.core.funboost.data_annotation_consumer' from 'C:\\Users\\ys\\PycharmProjects\\power-qa\\backend\\app\\app\\core\...
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\core\booster.py", line 123, in _safe_publish
    return consumer.publisher_of_same_queue.publish(msg=msg, task_id=task_id, priority_control_config=priority_control_config)
           │        │                                   │            │                                └ None
           │        │                                   │            └ '018e7f2255bf7f489fc908f65ab8fed0'
           │        │                                   └ {'app_id': '018d10cf328e729f8d7455d26f0f2f57', 'start_time': '2023-12-01', 'end_time': '2024-03-29', 'wait_annotation_num': 7...
           │        └ <property object at 0x0000027B7506BAB0>
           └ <funboost.consumers.redis_stream_consumer.RedisStreamConsumer object at 0x0000027B7383A530>
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\publishers\base_publisher.py", line 222, in publish
    self.logger.info(
    │    │      └ <function Logger.info at 0x0000027B58116710>
    │    └ <TaskIdLogger funboost.RedisStreamPublisher--queue_data_annotation (DEBUG)>
    └ <funboost.publishers.redis_stream_publisher.RedisStreamPublisher object at 0x0000027B75D89E40>
  File "C:\soft\Anaconda3\envs\power-qa\lib\logging\__init__.py", line 1477, in info
    self._log(INFO, msg, args, **kwargs)
    │    │    │     │    │       └ {}
    │    │    │     │    └ ()
    │    │    │     └ '10秒内推送了 1 条消息,累计推送了 1 条消息到 queue_data_annotation 队列中'
    │    │    └ 20
    │    └ <function TaskIdLogger._log at 0x0000027B74A28280>
    └ <TaskIdLogger funboost.RedisStreamPublisher--queue_data_annotation (DEBUG)>
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\core\task_id_logger.py", line 11, in _log
    extra['task_id'] = get_current_taskid()
    │                  └ <function get_current_taskid at 0x0000027B74A280D0>
    └ {}
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\core\current_task.py", line 155, in get_current_taskid
    return fct.task_id  # 不在funboost的消费函数里面就获取不到上下文了
           │   └ <property object at 0x0000027B74A1CA40>
           └ <funboost.core.current_task.AsyncioCurrentTask object at 0x0000027B7B58AAD0>
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\core\current_task.py", line 136, in task_id
    return self.function_result_status.task_id
           │    └ <property object at 0x0000027B74A1CB80>
           └ <funboost.core.current_task.AsyncioCurrentTask object at 0x0000027B7B58AAD0>
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\core\current_task.py", line 128, in function_result_status
    return self._function_result_status.get()
           │    │                       └ <method 'get' of '_contextvars.ContextVar' objects>
           │    └ <ContextVar name='function_result_status' at 0x0000027B74A1C9A0>
           └ <funboost.core.current_task.AsyncioCurrentTask object at 0x0000027B7B58AAD0>
LookupError: <ContextVar name='function_result_status' at 0x0000027B74A1C9A0>

@ydf0509
Copy link
Owner

ydf0509 commented Apr 3, 2024

你把这个 funboost/core/current_task.py 的改成

def get_current_taskid():
    fct = funboost_current_task()
    # return fct.function_result_status.task_id
    try:
        return fct.task_id  # 不在funboost的消费函数里面就获取不到上下文了
    except (AttributeError,LookupError) as e:
        # print(e,type(e))
        return 'no_task_id'

就是这行 except (AttributeError,LookupError) as e: 增加 LookupError 试试.

image

@aniya105
Copy link
Author

aniya105 commented Apr 3, 2024

你把这个 funboost/core/current_task.py 的改成

def get_current_taskid():
    fct = funboost_current_task()
    # return fct.function_result_status.task_id
    try:
        return fct.task_id  # 不在funboost的消费函数里面就获取不到上下文了
    except (AttributeError,LookupError) as e:
        # print(e,type(e))
        return 'no_task_id'

就是这行 except (AttributeError,LookupError) as e: 增加 LookupError 试试.

image

这样可以,不报错了,感谢指点

@aniya105 aniya105 closed this Apr 3, 2024
@aniya105 aniya105 deleted the fix branch April 3, 2024 03:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants