To Nha Notes | Oct. 26, 2022, 10:59 a.m.
A valuable component of logging and monitoring is the use of task callbacks to act upon changes in state of a given task, or across all tasks in a given DAG. For example, you may wish to alert when certain tasks have failed, or have the last task in your DAG invoke a callback when it succeeds.
I've noticed that the scheduler isn't sending all events when they happen. I have defined some custom functions as callbacks when a DAG is created, so when a job finishes, the scheduler should create an event and push it to Redis to notify a backend I have built.
Reviewing the logs, I've seen that the callbacks are usually invoked in order, but if one of them is out of place, the scheduler skips a notification even when the job has finished successfully. Attached are the scheduler logs. See more detail in the issue.
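For reference, a minimal sketch of what such a callback might look like, assuming a local Redis instance and the redis-py client; the channel name and payload fields are hypothetical and should be adapted to whatever the backend actually consumes:

import json
from datetime import datetime, timezone

import redis

# Hypothetical Redis connection and channel; adjust to your own backend.
REDIS_CLIENT = redis.Redis(host="localhost", port=6379, db=0)
EVENT_CHANNEL = "airflow_task_events"


def notify_backend(context):
    """Publish a task-state event to Redis.

    Airflow passes the task context dict to on_success_callback /
    on_failure_callback; here we pick out a few identifying fields
    and publish them as JSON so the backend can react to the change.
    """
    task_instance = context["task_instance"]
    event = {
        "dag_id": task_instance.dag_id,
        "task_id": task_instance.task_id,
        "run_id": context["run_id"],
        "state": task_instance.state,
        "sent_at": datetime.now(timezone.utc).isoformat(),
    }
    REDIS_CLIENT.publish(EVENT_CHANNEL, json.dumps(event))

A function like this can be attached as on_success_callback / on_failure_callback at the DAG or task level, just like the alert functions in the example below.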
Use the example below as a best practice for using callbacks in an Airflow DAG.
In the following example, failures in any task call the task_failure_alert function, and success in the last task calls the dag_success_alert function:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator


def task_failure_alert(context):
    # Invoked for any failed task, via the DAG-level on_failure_callback.
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    # Invoked when the last task succeeds, via on_success_callback on task3.
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,  # DAG-level: applies to every task
    tags=["example"],
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = DummyOperator(task_id="task2")
    task3 = DummyOperator(task_id="task3", on_success_callback=dag_success_alert)

    task1 >> task2 >> task3
https://airflow.apache.org/docs/apache-airflow/2.2.1/logging-monitoring/callbacks.html