Skip to content

Conversation

@Jasperora
Copy link
Contributor

Why

The method BulkStateFetcher._prepare_state_and_info_by_task_dict used hasattr(task_result, "info") to check for the presence of the "info" field.
However, task_result is a dict, so hasattr always returns False, even if the "info" key exists. This caused the info value to always be None in the returned state info mapping.

How

This PR replaces the hasattr check with task_result.get("info"), ensuring that the "info" value is correctly returned when present in the dict, and return None when "info" field is not present.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix!

How about adding type annotation for all task_results_by_task_id? Then mypy can point out other similar error we missed.

Starting from https://github.com/apache/airflow/blob/main/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py#L345

Since there isn’t type annotation for meta_from_decoded

https://github.com/celery/celery/blob/main/celery/backends/base.py#L518

@Jasperora
Copy link
Contributor Author

Thanks for your suggestion! I've added type annotation.

Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Thanks for the update!
Sorry, I used my phone to review last time and I didn't look through how did we utilize BulkStateFetcher in celery executor well.

How about adding type annotation for all parameters in BulkStateFetcher's methods ?

IMO, key_and_async_results variable is a good entrypoint to get the corresponding type hint.

def _send_tasks_to_celery(self, task_tuples_to_send: Sequence[TaskInstanceInCelery]):
from airflow.providers.celery.executors.celery_executor_utils import send_task_to_executor
if len(task_tuples_to_send) == 1 or self._sync_parallelism == 1:
# One tuple, or max one process -> send it in the main thread.
return list(map(send_task_to_executor, task_tuples_to_send))
# Use chunks instead of a work queue to reduce context switching
# since tasks are roughly uniform in size
chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))
num_processes = min(len(task_tuples_to_send), self._sync_parallelism)
with ProcessPoolExecutor(max_workers=num_processes) as send_pool:
key_and_async_results = list(
send_pool.map(send_task_to_executor, task_tuples_to_send, chunksize=chunksize)
)
return key_and_async_results

We can take return type of send_task_to_executor to propagate the type annotation.

def send_task_to_executor(
task_tuple: TaskInstanceInCelery,
) -> tuple[TaskInstanceKey, CommandType, AsyncResult | ExceptionWithTraceback]:
"""Send task to executor."""

@Jasperora
Copy link
Contributor Author

Jasperora commented Jul 8, 2025

Sure! Adding type annotation for all params in methods of BulkStateFetcher would make it more complete.
But in my current editing, some variables would be checked multiple times.
For example,

task_results_by_task_id: dict[str, dict[str, Any]] = {
            task_result["task_id"]: task_result for task_result in task_results
        }

ensures task_results_by_task_id's type.

@staticmethod
    def _prepare_state_and_info_by_task_dict(
        task_ids, task_results_by_task_id
        task_ids, task_results_by_task_id: dict[str, dict[str, Any]]
    ) -> Mapping[str, EventBufferValueType]:

And its type is checked again when it is used as a parameter.
Is it ok? I think it might cause redundancy and inefficiency. If it is redundant, which type annotation should I remove?

Besides, in current branch. https://github.com/apache/airflow/blob/main/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py#L322C4-L322C47
sync_parallelism is optional.
But I checked its usage in celery_executor.py.
https://github.com/apache/airflow/blob/main/providers/celery/src/airflow/providers/celery/executors/celery_executor.py#L293C5-L307C90
and
https://github.com/apache/airflow/blob/main/providers/celery/src/airflow/providers/celery/executors/celery_executor.py#L312C4-L318C73
I think sync_parallelism should be int instead of optional.

However, adding type annotation to it would cause error since in providers/celery/tests/integration/celery/test_celery_executor.py, some BulkStateFetcher are not provided with this parameter.
For example, https://github.com/apache/airflow/blob/main/providers/celery/tests/integration/celery/test_celery_executor.py#L331C17-L331C67
Do I need to give a fake value to BulkStateFetcher in the test?

@jason810496
Copy link
Member

And its type is checked again when it is used as a parameter.
Is it ok? I think it might cause redundancy and inefficiency. If it is redundant, which type annotation should I remove?

I think the ruff linter will raise error if we add type annotation on both side. ( No sure, maybe I am wrong )
IMO, adding type annotation at parameters level should be enough.

Do I need to give a fake value to BulkStateFetcher in the test?

I prefer to add mock value to BulkStateFetcher in the test, as we always provide sync_parallelism in our real use.
Thanks for pointing out the issue!

@Jasperora
Copy link
Contributor Author

Thanks for your suggestion! I retained the type annotation only in parameter level and add mock values of sync_parallelism.

@jason810496 jason810496 requested a review from eladkal July 9, 2025 06:35
@eladkal eladkal force-pushed the celery-executor-utils branch from 49cb086 to 07e72cb Compare August 5, 2025 07:44
@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Sep 20, 2025
@jason810496 jason810496 merged commit 4ee07a9 into apache:main Sep 20, 2025
71 checks passed
KatalKavya96 pushed a commit to KatalKavya96/airflow that referenced this pull request Sep 22, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Sep 30, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 1, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 2, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 3, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 4, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 7, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 8, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 9, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 10, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 11, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 12, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 14, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 15, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 17, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 19, 2025
…pache#52839)

* Fix: Use get instead of hasattr for task_result in BulkStateFetcher

* add type annotation for task_results_by_task_id

* add type annotation for params in methods of BulkStateFetcher

* retain type annotation only in param level

* add mock value for sync_parallelism in test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:celery stale Stale PRs per the .github/workflows/stale.yml policy file

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants