Changing apply_function in AwaitMessageSensor or AwaitMessageTrigger requires trigger restart #46473
Replies: 3 comments
-
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.
-
You should implement restarting the triggerer as part of your workflow when you change the function. Applying changed Python code to a running triggerer generally requires restarting the Python interpreter. This is the nature of the event loop: it achieves its speed and deferrability precisely because it does not need to restart the interpreter or create a new one. This is how Python works.
If you want to update the documentation now that it has been explained to you, feel free. There is a "Suggest a change in this page" button in our docs, and anyone, including you, can click it; a PR will be opened for you where you can update the docs using the GitHub UI, which is as easy as creating the issue above. You are probably one of the best people to know where to update the docs and how to explain this so that people like you understand that it is expected behaviour.
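To illustrate the point about the interpreter, here is a minimal sketch using only the standard library. It does not use Airflow at all; the module name `apply_demo` and the function `apply` are hypothetical stand-ins for a DAG file and its apply_function. A long-running process keeps the code it imported first, while a freshly started interpreter picks up the edited file.

```python
import importlib
import subprocess
import sys
import tempfile
from pathlib import Path

# Avoid writing .pyc files so the demo never depends on bytecode caching.
sys.dont_write_bytecode = True

# Hypothetical stand-in for a DAGs folder that is on the import path.
workdir = tempfile.mkdtemp()
sys.path.insert(0, workdir)
module_file = Path(workdir) / "apply_demo.py"

# First version of the (hypothetical) apply function.
module_file.write_text("def apply(message):\n    return 'old'\n")
importlib.invalidate_caches()
importlib.import_module("apply_demo")

# The file changes on disk while this process keeps running.
module_file.write_text("def apply(message):\n    return 'new code'\n")

# Importing again in the same process returns the already loaded module.
in_running_process = importlib.import_module("apply_demo").apply(None)

# A freshly started interpreter reads the file again and sees the new code.
fresh = subprocess.run(
    [
        sys.executable,
        "-c",
        "import sys; sys.path.insert(0, sys.argv[1]); "
        "import apply_demo; print(apply_demo.apply(None))",
        workdir,
    ],
    capture_output=True,
    text=True,
    check=True,
)
in_fresh_interpreter = fresh.stdout.strip()

print(in_running_process)    # old
print(in_fresh_interpreter)  # new code
```

The running process plays the role of the triggerer here, and the subprocess plays the role of the triggerer after a restart.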
-
I converted this to a discussion in case more is needed, but I am looking forward to your contribution. It might be helpful to other people like you who have a similar question.
-
Apache Airflow Provider(s)
apache-kafka
Versions of Apache Airflow Providers
apache-airflow-providers-apache-kafka==1.7.0
Apache Airflow version
2.10.4
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Docker-Compose
Deployment details
Used "Docker version 27.4.0, build bde2b89", with the official docker-compose file from the Airflow website as a basis, modified to install "apache-airflow[apache-kafka]" and to include Kafka containers in the compose file.
What happened
I am designing a workflow that sends and receives Kafka messages. For reception I am using AwaitMessageSensor, where you have to pass an apply_function in dotted-string notation as used by importlib. Directly after startup of the Airflow system everything works fine, but when the code of apply_function is changed, the changes are not reflected by the trigger that is actually executed. The problem becomes obvious if the function is renamed: the trigger crashes with an ImportError "Module "myDAG" does not define a "my_apply_function" attribute/class", although my_apply_function is defined in the DAG file currently in use. I verified that the most recent version of the file is correctly synced to the triggerer container (mounted into the container as per default) and that it can also be found in the database.
What you think should happen instead
Changing the apply function should be correctly reflected by the triggerer.
How to reproduce
Take a working Kafka-Airflow setup, use an AwaitMessageSensor, and rename its apply_function. (I think this problem is quite general, so I did not provide code. If someone really requires my docker-compose file and a minimal DAG, I can provide them later.)
Anything else
I researched for quite a while to get anywhere near a solution. The TriggererRunner uses a cache to get triggerer classes by their function name (see source code). I read #43253, which was closed with the explanation that it was a CloudComposer issue (which I am not using). Issue #31743 was also closed after updates to the documentation of deferrable operators. I also dug into the source code of AwaitMessageSensor and AwaitMessageTrigger, where the string passed via apply_function is imported by importlib. So I expected my new code to be imported each time I used the sensor.
My understanding of this problem is far from complete, but it seems to me that my previous expectation is not met, because the TriggererRunner caches the triggerers and therefore the import of my apply_function by importlib is only performed once. If that is not true, please correct me.
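The effect described here can be reproduced outside Airflow with a small sketch. The `import_string` helper below is a simplified stand-in written for this example, not the provider's or Airflow's actual code; the module name `my_dag_demo` and both function names are hypothetical. It shows that `importlib.import_module` returns the already loaded module on later calls within one process, so a renamed function is not found even though the file on disk defines it.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def import_string(dotted_path):
    """Resolve "module.attr" from a dotted string (simplified stand-in)."""
    module_path, attr_name = dotted_path.rsplit(".", 1)
    # After the first call this is served from sys.modules, not from disk.
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError:
        raise ImportError(
            f'Module "{module_path}" does not define a "{attr_name}" attribute/class'
        ) from None


# Hypothetical stand-in for a DAGs folder that is on the import path.
workdir = tempfile.mkdtemp()
sys.path.insert(0, workdir)
module_file = Path(workdir) / "my_dag_demo.py"

# Initial DAG file: the function exists under its original name.
module_file.write_text("def my_apply_function(message):\n    return 'v1'\n")
importlib.invalidate_caches()
first = import_string("my_dag_demo.my_apply_function")

# The function is renamed on disk while the process keeps running.
module_file.write_text("def renamed_apply_function(message):\n    return 'v2'\n")

try:
    import_string("my_dag_demo.renamed_apply_function")
    error = None
except ImportError as exc:
    error = str(exc)

# The old name still resolves, because the module object in memory is unchanged.
still_old = import_string("my_dag_demo.my_apply_function")(None)

print(error)
print(still_old)  # v1
```

In this sketch the staleness comes from Python's own module cache (`sys.modules`), independent of any additional caching a long-running process might do on top.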
I propose that at least the documentation be updated to point out explicitly that changing apply_function requires a restart of the triggerer. The aforementioned documentation changes were only in the section about writing your own triggerers, which I did not take into account, since I am not writing my own triggerer but only using one from an official provider. The way it works now is very unexpected and cost me a lot of time to figure out. From a user perspective it would be much more convenient if the apply_function were updated whenever code is updated across all containers, e.g. by sidestepping the caching either for the whole AwaitMessageTrigger or at least for the apply_function.
Are you willing to submit PR?
Code of Conduct