Skip to content

Conversation

@ferruzzi
Copy link
Contributor

@ferruzzi ferruzzi commented Sep 26, 2025

Adds async support to the existing SQS Notifier so it can be used as an AsyncCallback for DeadlineAlerts, among other things

dags used for manual testing:

from datetime import datetime, timedelta

from airflow.providers.amazon.aws.hooks.sqs import SqsHook
from airflow.providers.amazon.aws.notifications.sqs import SqsNotifier
from airflow.sdk import task, DAG
from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, AsyncCallback

PAST_DATE = datetime(1980, 8, 10, 2)
MSG_PREFIX = "SQS Testing Rnd 0:"

DEFAULTS = {"queue_url": "https://sqs.us-east-1.amazonaws.com/324969868898/async-notifier-testing"}


def _sqs_callback(context):
    SqsHook().send_message(**DEFAULTS, message_body=f"{MSG_PREFIX} Hook as Callback -- {context['dag_run'].dag_id}")


@task
def send_sqs_message(message):
    SqsHook().send_message(**DEFAULTS, message_body=message)


@task.bash(task_id='sleep_task')
def sleep_n_secs(seconds):
    return f'sleep {seconds}'


with DAG(
    "sqs_hook_via_taskflow",
    tags=["sqs"]
):
    send_sqs_message(message=f"{MSG_PREFIX} Taskflow -- {{{{ dag_run.dag_id }}}} ")

with DAG(
    "sqs_callback_on_success",
    on_success_callback=_sqs_callback,
    tags=["sqs"],
):
    sleep_n_secs(1)

with DAG(
    "sqs_notifier_as_callback",
    on_success_callback=SqsNotifier(**DEFAULTS, message_body=f"{MSG_PREFIX} Sync Notifier as callback -- {{{{ dag_run.dag_id }}}} "),
    tags=["sqs"],
):
    sleep_n_secs(1)


with DAG(
    "sqs_notifier_as_deadline",
    deadline=DeadlineAlert(
        reference=DeadlineReference.FIXED_DATETIME(PAST_DATE),
        interval=timedelta(0),
        callback=AsyncCallback(
            SqsNotifier,
            kwargs={**DEFAULTS, "message_body": f"{MSG_PREFIX} Async Notifier as Deadline -- {{{{ dag_run.dag_id }}}} "}
        ),
    ),
    tags=["sqs"],
):
    sleep_n_secs(1)

and proof of delivery with intact templating:

image

cc @ramitkataria @seanghaeli


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@boring-cyborg boring-cyborg bot added area:providers provider:amazon AWS/Amazon - related issues labels Sep 26, 2025
@ferruzzi ferruzzi mentioned this pull request Sep 26, 2025
1 task
MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
@ferruzzi ferruzzi requested a review from vincbeck September 30, 2025 16:19
@eladkal eladkal merged commit 351098e into apache:main Oct 1, 2025
80 checks passed
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 2, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 3, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 4, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 7, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 8, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 9, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 10, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 11, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 12, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
dabla pushed a commit to dabla/airflow that referenced this pull request Oct 12, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 14, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 15, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 17, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 19, 2025
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:amazon AWS/Amazon - related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants