-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Fix: Use get instead of hasattr for task_result in BulkStateFetcher #52839
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
jason810496
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix!
How about adding type annotation for all task_results_by_task_id? Then mypy can point out other similar error we missed.
Since there isn’t type annotation for meta_from_decoded
https://github.com/celery/celery/blob/main/celery/backends/base.py#L518
|
Thanks for your suggestion! I've added type annotation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Thanks for the update!
Sorry, I used my phone to review last time and I didn't look through how did we utilize BulkStateFetcher in celery executor well.
How about adding type annotation for all parameters in BulkStateFetcher's methods ?
IMO, key_and_async_results variable is a good entrypoint to get the corresponding type hint.
airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
Lines 385 to 401 in 874657d
| def _send_tasks_to_celery(self, task_tuples_to_send: Sequence[TaskInstanceInCelery]): | |
| from airflow.providers.celery.executors.celery_executor_utils import send_task_to_executor | |
| if len(task_tuples_to_send) == 1 or self._sync_parallelism == 1: | |
| # One tuple, or max one process -> send it in the main thread. | |
| return list(map(send_task_to_executor, task_tuples_to_send)) | |
| # Use chunks instead of a work queue to reduce context switching | |
| # since tasks are roughly uniform in size | |
| chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send)) | |
| num_processes = min(len(task_tuples_to_send), self._sync_parallelism) | |
| with ProcessPoolExecutor(max_workers=num_processes) as send_pool: | |
| key_and_async_results = list( | |
| send_pool.map(send_task_to_executor, task_tuples_to_send, chunksize=chunksize) | |
| ) | |
| return key_and_async_results |
We can take return type of send_task_to_executor to propagate the type annotation.
airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
Lines 267 to 270 in 874657d
| def send_task_to_executor( | |
| task_tuple: TaskInstanceInCelery, | |
| ) -> tuple[TaskInstanceKey, CommandType, AsyncResult | ExceptionWithTraceback]: | |
| """Send task to executor.""" |
|
Sure! Adding type annotation for all params in methods of task_results_by_task_id: dict[str, dict[str, Any]] = {
task_result["task_id"]: task_result for task_result in task_results
}ensures @staticmethod
def _prepare_state_and_info_by_task_dict(
task_ids, task_results_by_task_id
task_ids, task_results_by_task_id: dict[str, dict[str, Any]]
) -> Mapping[str, EventBufferValueType]:And its type is checked again when it is used as a parameter. Besides, in current branch. https://github.com/apache/airflow/blob/main/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py#L322C4-L322C47 However, adding type annotation to it would cause error since in |
I think the
I prefer to add mock value to |
|
Thanks for your suggestion! I retained the type annotation only in parameter level and add mock values of |
49cb086 to
07e72cb
Compare
|
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions. |
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
…pache#52839) * Fix: Use get instead of hasattr for task_result in BulkStateFetcher * add type annotation for task_results_by_task_id * add type annotation for params in methods of BulkStateFetcher * retain type annotation only in param level * add mock value for sync_parallelism in test
Why
The method
BulkStateFetcher._prepare_state_and_info_by_task_dictusedhasattr(task_result, "info")to check for the presence of the"info"field.However,
task_resultis a dict, sohasattralways returnsFalse, even if the"info"key exists. This caused the info value to always beNonein the returned state info mapping.How
This PR replaces the
hasattrcheck withtask_result.get("info"), ensuring that the"info"value is correctly returned when present in the dict, and returnNonewhen"info"field is not present.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.