Trigger an Airflow DAG from python code

To Nha Notes | Aug. 4, 2022, 8:29 p.m.

Beow is sample lambda code of an API gateway written via chalice app, but we can reference its code inside to trigger an Airflow DAG. 

Python lambda:

import base64
import json
import os
from typing import Dict

import boto3
import requests
from chalice import Chalice

app = Chalice(app_name='dags_trigger')

@app.route("/dags_trigger/{dag_name}", methods=["POST"], api_key_required=True)
def dags_trigger(dag_name: str) -> Dict[str, str]:
    json_body = app.current_request.json_body
    client = boto3.client("mwaa")
    cli_token = client.create_cli_token(Name=os.environ.get("MWAA_ENV_NAME"))
    auth_token = "Bearer " + cli_token["CliToken"]
    webserver_hostname = "https://{0}/aws_mwaa/cli".format(cli_token["WebServerHostname"])
    dag_conf = json.dumps(json_body)
    raw_data = f"dags trigger {dag_name} -c '{dag_conf}'"
    response = requests.post(
        webserver_hostname, headers={"Authorization": auth_token, "Content-Type": "text/plain"}, data=raw_data
    )
    mwaa_std_err_message = base64.b64decode(response.json()["stderr"]).decode("utf8")
    mwaa_std_out_message = base64.b64decode(response.json()["stdout"]).decode("utf8")
    return {
        "status": "OK" if response.status_code == 200 else "NG",
        "message": mwaa_std_out_message,
        "error": mwaa_std_err_message,
    }

 

Python function:

def dag_trigger(env_name: str, dag_name: str) -> None:
    client = boto3.client("mwaa", region_name="ap-northeast-1")
    cli_token = client.create_cli_token(Name=env_name)
    auth_token = "Bearer " + cli_token["CliToken"]
    print(f"auth_token: {auth_token}")
    webserver_hostname = "https://{0}/aws_mwaa/cli".format(cli_token["WebServerHostname"])
    dag_conf = {}
    raw_data = f"dags trigger {dag_name} -c {dag_conf}"
    response = requests.post(
        webserver_hostname, headers={"Authorization": auth_token, "Content-Type": "text/plain"}, data=raw_data
    )
    print(f"response.content: {response.content}")
    mwaa_std_err_message = base64.b64decode(response.json()["stderr"]).decode("utf8")
    mwaa_std_out_message = base64.b64decode(response.json()["stdout"]).decode("utf8")
    return {
        "status": "OK" if response.status_code == 200 else "NG",
        "message": mwaa_std_out_message,
        "error": mwaa_std_err_message,
    }
dag_trigger("mp-dev-maf", "your_dag_name")

 

https://docs.aws.amazon.com/mwaa/latest/userguide/call-mwaa-apis-cli.html