To Nha Notes | Aug. 4, 2022, 8:29 p.m.
Beow is sample lambda code of an API gateway written via chalice app, but we can reference its code inside to trigger an Airflow DAG.
Python lambda:
import base64
import json
import os
from typing import Dict
import boto3
import requests
from chalice import Chalice
app = Chalice(app_name='dags_trigger')
@app.route("/dags_trigger/{dag_name}", methods=["POST"], api_key_required=True)
def dags_trigger(dag_name: str) -> Dict[str, str]:
json_body = app.current_request.json_body
client = boto3.client("mwaa")
cli_token = client.create_cli_token(Name=os.environ.get("MWAA_ENV_NAME"))
auth_token = "Bearer " + cli_token["CliToken"]
webserver_hostname = "https://{0}/aws_mwaa/cli".format(cli_token["WebServerHostname"])
dag_conf = json.dumps(json_body)
raw_data = f"dags trigger {dag_name} -c '{dag_conf}'"
response = requests.post(
webserver_hostname, headers={"Authorization": auth_token, "Content-Type": "text/plain"}, data=raw_data
)
mwaa_std_err_message = base64.b64decode(response.json()["stderr"]).decode("utf8")
mwaa_std_out_message = base64.b64decode(response.json()["stdout"]).decode("utf8")
return {
"status": "OK" if response.status_code == 200 else "NG",
"message": mwaa_std_out_message,
"error": mwaa_std_err_message,
}
Python function:
def dag_trigger(env_name: str, dag_name: str) -> None:
client = boto3.client("mwaa", region_name="ap-northeast-1")
cli_token = client.create_cli_token(Name=env_name)
auth_token = "Bearer " + cli_token["CliToken"]
print(f"auth_token: {auth_token}")
webserver_hostname = "https://{0}/aws_mwaa/cli".format(cli_token["WebServerHostname"])
dag_conf = {}
raw_data = f"dags trigger {dag_name} -c {dag_conf}"
response = requests.post(
webserver_hostname, headers={"Authorization": auth_token, "Content-Type": "text/plain"}, data=raw_data
)
print(f"response.content: {response.content}")
mwaa_std_err_message = base64.b64decode(response.json()["stderr"]).decode("utf8")
mwaa_std_out_message = base64.b64decode(response.json()["stdout"]).decode("utf8")
return {
"status": "OK" if response.status_code == 200 else "NG",
"message": mwaa_std_out_message,
"error": mwaa_std_err_message,
}
dag_trigger("mp-dev-maf", "your_dag_name")
https://docs.aws.amazon.com/mwaa/latest/userguide/call-mwaa-apis-cli.html