Validating data with Great Expectations in Apache Airflow

To Nha Notes | July 25, 2022, 4:24 p.m.

An introduction to Great Expectations is available here: https://github.com/great-expectations/great_expectations#introduction

Expectations are assertions for data. They are the workhorse abstraction in Great Expectations, covering all kinds of common data issues, including:

  • expect_column_values_to_not_be_null
  • expect_column_values_to_match_regex
  • expect_column_values_to_be_unique
  • expect_column_values_to_match_strftime_format
  • expect_table_row_count_to_be_between
  • expect_column_median_to_be_between
  • ...and many more
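To make the idea concrete, here is a minimal, hand-rolled sketch of what two of the expectations above assert. This is plain Python for illustration only, not the Great Expectations API; the function names mirror the expectation names, and the sample people data is my own:

def expect_column_values_to_not_be_null(rows, column):
    # Collect rows where the column is missing or None
    failures = [r for r in rows if r.get(column) is None]
    return {"success": not failures, "unexpected_count": len(failures)}

def expect_column_values_to_be_unique(rows, column):
    # A column is unique when no value repeats
    values = [r[column] for r in rows]
    duplicates = len(values) - len(set(values))
    return {"success": duplicates == 0, "unexpected_count": duplicates}

people = [
    {"name": "Ann", "age": 34},
    {"name": "Bob", "age": None},
    {"name": "Ann", "age": 28},
]

print(expect_column_values_to_not_be_null(people, "age"))
# {'success': False, 'unexpected_count': 1}  -- Bob's age is null
print(expect_column_values_to_be_unique(people, "name"))
# {'success': False, 'unexpected_count': 1}  -- "Ann" appears twice

The real library returns a much richer result object, but the shape is the same: a success flag plus details about the values that failed.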

In Apache Airflow, you can create a task that runs this validation code. To make Airflow treat a failed validation as a failed task, you raise an exception, which means importing the relevant library in your Airflow code. The following code block shows the libraries you need to import on top of your standard boilerplate:

import sys

from great_expectations import DataContext

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

After importing all of the libraries, you can write your task, as shown in the following code block:

def validateData():
    # Load the Great Expectations project and the suite to validate against
    context = DataContext("/home/paulcrickard/peoplepipeline/great_expectations")
    suite = context.get_expectation_suite("people.validate")
    # Describe the batch of data: which file, which datasource, and how to read it
    batch_kwargs = {
        "path": "/home/paulcrickard/peoplepipeline/people.csv",
        "datasource": "files_datasource",
        "reader_method": "read_csv",
    }
    batch = context.get_batch(batch_kwargs, suite)
    # Run the validation; fail the Airflow task if any expectation was not met
    results = context.run_validation_operator("action_list_operator", [batch])
    if not results["success"]:
        raise AirflowException("Validation Failed")

The preceding code raises an AirflowException if the validation fails and returns normally if it succeeds. How you handle a failure is up to you; all you need to do is check whether results["success"] is True. You can now code the other functions, create the tasks using PythonOperator, and set the downstream relationships as in all the other Airflow examples.
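If you would rather not fail the task outright, the same check on results["success"] can route to different behaviors. Here is a minimal sketch in plain Python: the function name handle_validation_result and the strict flag are my own, and ValueError stands in for AirflowException so the snippet runs outside Airflow:

def handle_validation_result(results, strict=True):
    """Inspect a Great Expectations result dict and decide what to do.

    strict=True mirrors the approach above: raise so the task is marked
    as failed. strict=False records the failure and lets downstream
    tasks continue.
    """
    if results["success"]:
        return "passed"
    if strict:
        # In a real DAG, raise AirflowException here instead
        raise ValueError("Validation Failed")
    return "failed-but-continuing"

print(handle_validation_result({"success": True}))                 # passed
print(handle_validation_result({"success": False}, strict=False))  # failed-but-continuing

Raising keeps bad data from flowing downstream; the non-strict branch is useful when you only want to warn, for example during an initial rollout of a new suite.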

References

Chapter 7, "Features of a Production Pipeline," of the book Data Engineering with Python

Tutorials

Deploying Great Expectations with Airflow

How to create custom Expectations for pandas

Apache Airflow Provider for Great Expectations

Optimize Your Data Pipeline with Apache Airflow and Great Expectations

Data Context Reference

example_dags