Airflow testing

To Nha Notes | April 12, 2022, 11:46 p.m.

Testing a DAG

Airflow users should treat DAGs as production-level code. Each DAG should have tests that verify it produces the expected results. You can write several kinds of tests for a DAG. Let’s take a look at some of them.

DAG Loader Test

This test ensures that your DAG file does not contain code that raises an error while loading. You do not need to write any additional code to run it.

python your-dag-file.py

If the above command runs without errors, your DAG file has no uninstalled dependencies, syntax errors, or similar load-time problems.
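The same load check can be scripted, for example to run it over many DAG files. The sketch below is only an illustration: the helper name check_dag_file_loads is hypothetical, and it imports the file instead of running it as __main__, so code behind an `if __name__ == "__main__":` guard is not executed.

```python
import importlib.util
from pathlib import Path


def check_dag_file_loads(path):
    """Load a Python file and report any error raised while loading it.

    Hypothetical helper; assumes `path` ends in .py.
    Returns None on success, or the exception that was raised.
    """
    spec = importlib.util.spec_from_file_location(Path(path).stem, path)
    module = importlib.util.module_from_spec(spec)
    try:
        # Executes the top-level code of the file, as loading a DAG file does
        spec.loader.exec_module(module)
    except Exception as exc:  # e.g. SyntaxError, ImportError, NameError
        return exc
    return None
```

A return value of None means the file loaded cleanly. Anything else is the exception to investigate.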

Unit tests

Unit tests verify that the code in your DAG behaves as expected. You can write unit tests for individual tasks as well as for the DAG as a whole.

Unit test for loading a DAG:

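import unittest

from airflow.models import DagBag

class TestHelloWorldDAG(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Parse the DAG folder once for all tests; skip Airflow's bundled example DAGs
        cls.dagbag = DagBag(include_examples=False)

    def test_dag_loaded(self):
        # No DAG file should fail to import
        self.assertDictEqual(self.dagbag.import_errors, {})
        dag = self.dagbag.get_dag(dag_id='hello_world')
        self.assertIsNotNone(dag)
        # hello_world is expected to define exactly one task
        self.assertEqual(len(dag.tasks), 1)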

Unit test for custom operator:

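import unittest

from airflow import DAG
from airflow.models import TaskInstance
from airflow.utils import timezone
from airflow.utils.state import State

# MyCustomOperator is the operator under test; import it from your own plugin or module.

# Use a timezone-aware datetime rather than a string for the dates
DEFAULT_DATE = timezone.datetime(2019, 10, 3)
TEST_DAG_ID = 'test_my_custom_operator'

class MyCustomOperatorTest(unittest.TestCase):
    def setUp(self):
        self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date': DEFAULT_DATE})
        self.op = MyCustomOperator(
            dag=self.dag,
            task_id='test',
            prefix='s3://bucket/some/prefix',
        )
        # Constructor form from the Airflow 1.10 / 2.0 docs; newer releases may require an existing DAG run
        self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE)

    def test_execute_no_trigger(self):
        self.ti.run(ignore_ti_state=True)
        self.assertEqual(self.ti.state, State.SUCCESS)
        # Assert something related to the task's results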
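Task logic that lives in a plain Python function can also be unit tested without Airflow at all. The sketch below assumes a hypothetical helper, build_s3_key, that a task might use to build its output location. Neither the function nor the key layout comes from a real DAG.

```python
import unittest


def build_s3_key(prefix, ds):
    """Hypothetical task helper: build the output key for a given run date."""
    return f"{prefix.rstrip('/')}/{ds}/data.csv"


class BuildS3KeyTest(unittest.TestCase):
    def test_key_contains_run_date(self):
        key = build_s3_key('s3://bucket/some/prefix', '2019-10-03')
        self.assertEqual(key, 's3://bucket/some/prefix/2019-10-03/data.csv')

    def test_trailing_slash_is_not_duplicated(self):
        key = build_s3_key('s3://bucket/some/prefix/', '2019-10-03')
        self.assertEqual(key, 's3://bucket/some/prefix/2019-10-03/data.csv')
```

Keeping such logic in small functions means most assertions about task results can run quickly, and the TaskInstance-based test only has to cover the operator wiring.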

 

Notes:

To run the unit tests locally, PYTHONPATH should point to the root folder of the Airflow project.

export PYTHONPATH=/home/tonha/workspace/my-mwaa

python -m unittest discover

 

References:

https://learning.oreilly.com/library/view/data-pipelines-with/9781617296901/OEBPS/Text/09.htm

https://airflowsummit.org/slides/j2-Ensuring-your-DAGs-work-before-going-to-production.pdf

https://github.com/godatadriven/airflow-testing-examples

https://airflow.readthedocs.io/en/1.10.7/best-practices.html

https://www.astronomer.io/guides/testing-airflow/

https://github.com/apache/airflow/tree/main/tests

 

Mocking
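
When a task calls an external service, the call can be replaced with a mock so the test runs offline. The sketch below uses mock.patch.object from the standard library. ReportClient and check_table_not_empty are hypothetical stand-ins for a real client and task callable.

```python
import unittest
from unittest import mock


class ReportClient:
    """Hypothetical client standing in for an external service."""

    def fetch_row_count(self, table):
        raise RuntimeError("would call a real service")


def check_table_not_empty(client, table):
    """Hypothetical task callable: fail when the table has no rows."""
    count = client.fetch_row_count(table)
    if count == 0:
        raise ValueError(f"{table} is empty")
    return count


class CheckTableTest(unittest.TestCase):
    def test_returns_count(self):
        client = ReportClient()
        # Replace the network call with a canned return value for this block only
        with mock.patch.object(client, "fetch_row_count", return_value=42) as fake:
            self.assertEqual(check_table_not_empty(client, "events"), 42)
        fake.assert_called_once_with("events")

    def test_empty_table_raises(self):
        client = ReportClient()
        with mock.patch.object(client, "fetch_row_count", return_value=0):
            with self.assertRaises(ValueError):
                check_table_not_empty(client, "events")
```

The patch is undone when the with block exits, so other tests still see the original method.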

 

https://www.programcreek.com/python/example/63694/mock.patch.object

https://changhsinlee.com/pytest-mock/

https://gist.github.com/mmasashi/d45d2fc5f32fba9ae2b91506976099a8

https://linuxtut.com/en/0547f6c898ea2d3b7f0b/

https://docs.python.org/3/library/unittest.mock-examples.html

https://airflow.apache.org/docs/apache-airflow/2.0.1/best-practices.html