Performance tuning for Apache Airflow on Amazon MWAA

To Nha Notes | Dec. 21, 2022, 11:31 a.m.

There are a few key settings that you should consider modifying as you scale up your data pipelines.

Airflow exposes a number of parameters that are closely related to DAG and task-level performance. These include:

  • Environment-level settings.
  • DAG-level settings.
  • Task-level settings.

Core Settings

Core settings control the number of processes running concurrently and how long processes run across an entire Airflow environment. The associated environment variables for all parameters in this section are formatted as AIRFLOW__CORE__PARAMETER_NAME.

  • parallelism: The maximum number of tasks that can run concurrently on each scheduler within a single Airflow environment. For example, if this setting is set to 32, and there are two schedulers, then no more than 64 tasks can be in a running or queued state at once across all DAGs. If your tasks remain in a scheduled state for an extended period, you might want to increase this value. The default value is 32.

  • max_active_tasks_per_dag (formerly dag_concurrency): The maximum number of tasks that can be scheduled at once, per DAG. Use this setting to prevent any one DAG from taking up too many of the available slots from parallelism or your pools. The default value is 16.

    If you increase the amount of resources available to Airflow (such as Celery workers or Kubernetes resources) and notice that tasks are still not running as expected, you might have to increase the values of both parallelism and max_active_tasks_per_dag.

  • max_active_runs_per_dag: Determines the maximum number of active DAG runs (per DAG) that the Airflow scheduler can create at a time. In Airflow, a DAG run represents an instantiation of a DAG in time, much like a task instance represents an instantiation of a task. This parameter is most relevant if Airflow needs to backfill missed DAG runs. Consider how you want to handle these scenarios when setting this parameter. The default value is 16.

  • dag_file_processor_timeout: How long a DagFileProcessor, which processes a DAG file, can run before timing out. The default value is 50 seconds.

  • dagbag_import_timeout: How long (in seconds) the dagbag can spend importing DAG objects before timing out; this must be lower than the value set for dag_file_processor_timeout. If your DAG processing logs show timeouts, or if your DAG is not appearing in the DAGs list or shows import errors, try increasing this value. You can also try increasing this value if your tasks aren't executing, since workers need to fill the dagbag when tasks execute. The default value is 30 seconds.
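The AIRFLOW__CORE__PARAMETER_NAME convention above generalizes to any configuration section. As a minimal illustration (the helper function here is hypothetical, not part of Airflow):

```python
def to_env_var(section: str, parameter: str) -> str:
    """Map an airflow.cfg section/parameter pair to the
    AIRFLOW__SECTION__PARAMETER environment variable name."""
    return f"AIRFLOW__{section.upper()}__{parameter.upper()}"

# For example, the core-level parallelism setting:
print(to_env_var("core", "parallelism"))  # AIRFLOW__CORE__PARALLELISM
```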

Scheduler settings

Scheduler settings control how the scheduler parses DAG files and creates DAG runs. The associated environment variables for all parameters in this section are formatted as AIRFLOW__SCHEDULER__PARAMETER_NAME.

  • min_file_process_interval: The frequency that each DAG file is parsed, in seconds. Updates to DAGs are reflected after this interval. A low number increases scheduler CPU usage. If you have dynamic DAGs created by complex code, you can increase this value to improve scheduler performance. The default value is 30 seconds.

  • dag_dir_list_interval: The frequency that the DAGs directory is scanned for new files, in seconds. The lower the value, the faster new DAGs are processed and the higher the CPU usage. The default value is 300 seconds (5 minutes).

    It's helpful to know how long it takes to parse your DAGs (dag_processing.total_parse_time) to know what values to choose for min_file_process_interval and dag_dir_list_interval. If your dag_dir_list_interval is less than the amount of time it takes to parse each DAG, performance issues can occur.

    TIP

    If you have fewer than 200 DAGs in a Deployment on Astro, it's safe to set AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=30 (30 seconds) as a Deployment-level environment variable.

  • parsing_processes (formerly max_threads): How many processes the scheduler can run in parallel to parse DAGs. Astronomer recommends setting a value that is twice your available vCPUs. Increasing this value can help serialize a large number of DAGs more efficiently. If you are running multiple schedulers, this value applies to each of them. The default value is 2.

  • file_parsing_sort_mode: Determines how the scheduler lists and sorts DAG files to determine the parsing order. Set to one of modified_time, random_seeded_by_host, or alphabetical. The default value is modified_time.

  • scheduler_heartbeat_sec: Defines how often the scheduler should run (in seconds) to trigger new tasks. The default value is 5 seconds.

  • max_dagruns_to_create_per_loop: The maximum number of DAGs to create DAG runs for per scheduler loop. Decrease the value to free resources for scheduling tasks. The default value is 10.

  • max_tis_per_query: Changes the batch size of queries to the metastore in the main scheduling loop. A higher value allows more task instances to be processed per query, but a query that is too large can cause performance issues. The default value is 512.
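To put the interval guidance above into practice, you can compare your measured dag_processing.total_parse_time metric against dag_dir_list_interval. A sketch of such a sanity check (the helper and its 20% safety margin are illustrative assumptions, not an Airflow rule):

```python
def intervals_are_safe(total_parse_time: float,
                       dag_dir_list_interval: float) -> bool:
    """Return True when the DAGs directory scan interval leaves headroom
    over the measured total parse time (20% margin, an assumed buffer)."""
    return dag_dir_list_interval > total_parse_time * 1.2

# e.g. parsing all DAGs takes 40s; the default 300s scan interval is fine,
# but an aggressive 30s interval would rescan before parsing finishes:
print(intervals_are_safe(40.0, 300.0))  # True
print(intervals_are_safe(40.0, 30.0))   # False
```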

DAG-level Airflow settings

DAG-level settings apply only to specific DAGs and are defined in your DAG code. You should modify DAG-level settings if you want to performance tune a particular DAG, especially in cases where that DAG is hitting an external system such as an API or a database that might cause performance issues if hit too frequently. When a setting exists at both the DAG-level and environment-level, the DAG-level setting takes precedence.

There are three primary DAG-level Airflow settings that you can define in code:

  • max_active_runs: The maximum number of active DAG runs allowed for the DAG. When this limit is exceeded, the scheduler won't create new active DAG runs. If this setting is not defined, the value of the environment-level setting max_active_runs_per_dag is assumed.

    If you're utilizing catchup or backfill for your DAG, consider defining this parameter to ensure that you don't accidentally trigger a high number of DAG runs.

  • max_active_tasks: The total number of tasks that can run at the same time for a given DAG run. It essentially controls the parallelism within your DAG. If this setting is not defined, the value of the environment-level setting max_active_tasks_per_dag is assumed.

  • concurrency: The maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. For example, you can allow one DAG to run 32 tasks at once while another DAG runs only 16 tasks at once. If this setting is not defined, the value of the environment-level setting max_active_tasks_per_dag is assumed.

You can define any DAG-level settings within your DAG definition. For example:

  # Allow a maximum of 10 concurrent tasks across a maximum of 3 active DAG runs
  dag = DAG('my_dag_id', concurrency=10, max_active_runs=3)


Task-level Airflow settings

Task-level settings are defined by task operators that you can use to implement additional performance adjustments. Modify task-level settings when specific types of tasks are causing performance issues.

There are two primary task-level Airflow settings users can define in code:

  • max_active_tis_per_dag (formerly task_concurrency): The maximum number of times that the same task can run concurrently across all DAG runs. For instance, if a task pulls from an external resource, such as a data table, that should not be modified by multiple tasks at once, then you can set this value to 1.
  • pool: Assigns the task to a specific pool. Pools are a way to limit the number of concurrent task instances across an arbitrary group of tasks. This setting is useful if you have a lot of workers or DAG runs in parallel, but you want to avoid an API rate limit or otherwise don't want to overwhelm a data source or destination. For more information, see the Airflow Pools Guide.

The parameters above are inherited from the BaseOperator, so you can set them in any operator definition. For example:

  t1 = PythonOperator(task_id='t1', pool='my_custom_pool', max_active_tis_per_dag=14)
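To build intuition for how a pool caps concurrency, here is a pure-Python sketch using a semaphore. This mimics the slot-limiting behavior only; it is not Airflow's actual pool implementation:

```python
import threading
import time

POOL_SLOTS = 2                                # like an Airflow pool with 2 slots
pool = threading.BoundedSemaphore(POOL_SLOTS)
lock = threading.Lock()
running = 0
peak = 0

def task(n):
    """A toy task that occupies one pool slot while it 'works'."""
    global running, peak
    with pool:                                # blocks until a slot is free
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)                      # stand-in for real work
        with lock:
            running -= 1

threads = [threading.Thread(target=task, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# peak never exceeds POOL_SLOTS, no matter how many tasks are launched
```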


Executors and scaling

Depending on which executor you choose for your Airflow environment, there are additional settings to keep in mind when scaling.

Celery executor

The Celery executor utilizes standing workers to run tasks. Scaling with the Celery executor involves choosing both the number and size of the workers available to Airflow. The more workers you have available in your environment, or the larger your workers are, the more capacity you have to run tasks concurrently.

You can also tune your worker_concurrency (environment variable: AIRFLOW__CELERY__WORKER_CONCURRENCY), which determines how many tasks each Celery worker can run at any given time. By default, the Celery executor runs a maximum of sixteen tasks concurrently. If you increase worker_concurrency, you might also need to provision additional CPU and/or memory for your workers.
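The effective task capacity of a Celery-based environment can be reasoned about roughly as the smaller of parallelism and workers × worker_concurrency. The helper below is a hypothetical simplification that ignores pools, per-DAG limits, and multiple schedulers:

```python
def celery_task_capacity(workers: int,
                         worker_concurrency: int,
                         parallelism: int) -> int:
    """Rough upper bound on concurrently running tasks: total worker
    capacity, capped by the core parallelism setting."""
    return min(workers * worker_concurrency, parallelism)

# 3 workers at the default worker_concurrency of 16, default parallelism of 32:
print(celery_task_capacity(3, 16, 32))  # 32 -- parallelism is the bottleneck
```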

Potential scaling issues

Scaling your Airflow environment is an art and not a science, and it's highly dependent on your supporting infrastructure and your DAGs. The following are some of the most common issues:

  • Task scheduling latency is high.
    • The scheduler may not have enough resources to parse DAGs before scheduling tasks.
    • Increase worker_concurrency (if using Celery), or parallelism.
  • DAGs remain in queued state, but are not running.
    • The number of tasks being scheduled may be beyond the capacity of your Airflow infrastructure.
    • If you're using the Kubernetes executor, check that there are available resources in the namespace and check if worker_pods_creation_batch_size can be increased. If using the Celery executor, check if worker_concurrency can be increased.
  • An individual DAG is having trouble running tasks in parallel, while other DAGs are unaffected.
    • Possible DAG-level bottleneck.
    • Change max_active_tasks_per_dag, pools (if using them), or overall parallelism.

Example high performance use case

The following section describes the type of configurations you can use to enable high performance and parallelism on an environment.

On-premise Apache Airflow

Typically, in an on-premise Apache Airflow platform, you would configure task parallelism, autoscaling, and concurrency settings in your airflow.cfg file:

  • core.parallelism – The maximum number of task instances that can run simultaneously across the entire environment.

  • core.dag_concurrency – The maximum number of tasks that can run concurrently within a single DAG (not per worker).

  • celery.worker_autoscale – The maximum and minimum number of tasks that can run concurrently on any worker.

For example, if core.parallelism was set to 100 and core.dag_concurrency was set to 7, and you had 2 DAGs, you would still only be able to run a total of 14 tasks concurrently. Each DAG is capped at seven concurrent tasks (by core.dag_concurrency), even though overall parallelism is set to 100 (in core.parallelism).
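The arithmetic in this example can be checked with a small helper (a sketch using the example's numbers; the function itself is hypothetical):

```python
def max_concurrent_tasks(parallelism: int,
                         dag_concurrency: int,
                         num_dags: int) -> int:
    """Total concurrent tasks: each DAG is capped by dag_concurrency,
    and the environment as a whole by parallelism."""
    return min(parallelism, dag_concurrency * num_dags)

print(max_concurrent_tasks(100, 7, 2))  # 14 -- dag_concurrency is the bottleneck
```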

On an Amazon MWAA environment

On an Amazon MWAA environment, you can configure these settings directly on the Amazon MWAA console using Apache Airflow configuration options, the Amazon MWAA environment class, and the Maximum worker count autoscaling mechanism. While core.dag_concurrency is not available in the dropdown list as an Apache Airflow configuration option on the Amazon MWAA console, you can add it as a custom Apache Airflow configuration option.

Let's say, when you created your environment, you chose the following settings:

  1. The mw1.small environment class, which controls the maximum number of concurrent tasks each worker can run by default and the vCPU of containers.

  2. The default setting of 10 Workers in Maximum worker count.

  3. An Apache Airflow configuration option of celery.worker_autoscale set to 5,5 (a maximum and minimum of five tasks per worker).

This means you can run 50 concurrent tasks in your environment (10 workers × 5 tasks per worker). Any tasks beyond 50 will be queued and wait for running tasks to complete.

To run more concurrent tasks, you can modify your environment using the following configurations:

  1. Increase the maximum number of concurrent tasks each worker can run by default and the vCPU of containers by choosing the mw1.medium (10 concurrent tasks by default) environment class.

  2. Add celery.worker_autoscale as an Apache Airflow configuration option.

  3. Increase the Maximum worker count. In this example, increasing maximum workers from 10 to 20 would double the number of concurrent tasks the environment can run.
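Under the settings above, environment capacity is maximum worker count multiplied by the tasks-per-worker value from celery.worker_autoscale. A quick sketch of the numbers in this example (the helper is hypothetical):

```python
def mwaa_task_capacity(max_workers: int, tasks_per_worker: int) -> int:
    """Concurrent task capacity of an MWAA environment, assuming every
    worker runs at its celery.worker_autoscale maximum."""
    return max_workers * tasks_per_worker

print(mwaa_task_capacity(10, 5))  # 50 tasks before queuing begins
print(mwaa_task_capacity(20, 5))  # 100 -- doubling workers doubles capacity
```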


Trigger multiple DAG runs of a given DAG

The following script requests a CLI token from the Amazon MWAA API, then POSTs dags trigger commands to the environment's CLI endpoint to launch several runs of the same DAG concurrently:

import base64
import json
import asyncio
import time
from datetime import datetime

import boto3
import requests


def log(txt):
    with open('/tmp/dags_trigger_log.txt', 'a') as f:
        f.write(txt + '\n')


def dag_trigger(env_name: str, dag_name: str, dag_conf: dict, id) -> dict:
    now = datetime.now()
    ts_now = time.time()
    dag_run_id = f"{dag_name}_{id}_{ts_now}"
    client = boto3.client("mwaa", region_name="ap-northeast-1")
    cli_token = client.create_cli_token(Name=env_name)
    auth_token = "Bearer " + cli_token["CliToken"]
    webserver_hostname = "https://{0}/aws_mwaa/cli".format(cli_token["WebServerHostname"])
    # The CLI command is sent as plain text, so serialize the conf dict to JSON
    raw_data = f"dags trigger {dag_name} -c '{json.dumps(dag_conf)}' -r {dag_run_id}"
    response = requests.post(
        webserver_hostname,
        headers={"Authorization": auth_token, "Content-Type": "text/plain"},
        data=raw_data,
    )
    mwaa_std_err_message = base64.b64decode(response.json()["stderr"]).decode("utf8")
    mwaa_std_out_message = base64.b64decode(response.json()["stdout"]).decode("utf8")
    log(f"[{str(now)}] dag_run_id: {dag_run_id}, response code: {response.status_code}")
    return {
        "status": "OK" if response.status_code == 200 else "NG",
        "message": mwaa_std_out_message,
        "error": mwaa_std_err_message,
    }


env_name = "mp-dev-maf"
dag_name = "mp_ma_instant_notification_target"
dag_conf = {}


async def trigger(sem, id):
    # Offload the blocking boto3/requests calls to a worker thread
    loop = asyncio.get_running_loop()
    async with sem:
        await loop.run_in_executor(None, dag_trigger, env_name, dag_name, dag_conf, id)


async def main():
    # Allow at most 3 trigger requests in flight at once
    sem = asyncio.Semaphore(3)
    await asyncio.gather(trigger(sem, 1), trigger(sem, 2), trigger(sem, 3))


asyncio.run(main())

