Set up and run a DAG in Airflow

To Nha Notes | Nov. 2, 2021, 3:49 p.m.

Installation:

pyenv virtualenv 3.8.5 airflow
source ~/.pyenv/versions/airflow/bin/activate
pip install apache-airflow
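To confirm that the install went into the intended virtualenv, a small stdlib-only check like the one below can help. It is a sketch: `describe_environment` is a hypothetical helper, and the virtualenv test assumes a venv-style environment, where `sys.prefix` differs from `sys.base_prefix`.

```python
import shlex  # noqa: F401  (not needed here; kept out of the way of the check below)
import sys
from importlib import metadata


def describe_environment(dist_name: str = "apache-airflow") -> dict:
    """Report the running interpreter and the installed version of dist_name (hypothetical helper)."""
    # In a venv-style environment, sys.prefix points at the env and
    # sys.base_prefix at the base interpreter, so the two differ.
    in_virtualenv = sys.prefix != sys.base_prefix
    try:
        version = metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        version = None  # the distribution is not installed in this interpreter
    return {
        "python": ".".join(map(str, sys.version_info[:3])),
        "in_virtualenv": in_virtualenv,
        "dist_version": version,
    }


if __name__ == "__main__":
    print(describe_environment())
```

Run it with the `airflow` virtualenv activated: `dist_version` should show the installed Airflow version, and `python` should start with 3.8.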

Create workspace:

mkdir -p ~/workspace/airflow

Activate workspace:

cd ~/workspace/airflow
source ~/.pyenv/versions/airflow/bin/activate
export AIRFLOW_HOME="$(pwd)"
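If `AIRFLOW_HOME` is not exported, Airflow falls back to `~/airflow`, so skipping the export puts `airflow.cfg` and the database in a different directory. The sketch below mirrors that fallback in simplified form; `resolve_airflow_home` is a hypothetical helper, not part of Airflow's API.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def resolve_airflow_home(environ: Optional[Mapping[str, str]] = None) -> Path:
    """Return the directory Airflow would use as AIRFLOW_HOME (simplified sketch)."""
    env = os.environ if environ is None else environ
    # Airflow's default when AIRFLOW_HOME is not set
    raw = env.get("AIRFLOW_HOME", "~/airflow")
    return Path(raw).expanduser()


if __name__ == "__main__":
    print(resolve_airflow_home())
```

Running it right after the `export` line should print `~/workspace/airflow` in expanded form.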

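Configure:

Edit airflow.cfg in $AIRFLOW_HOME. Airflow writes a default airflow.cfg there the first time any airflow command runs (for example airflow db init), so the file may not exist before that.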
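Any option in airflow.cfg can also be overridden with an environment variable named `AIRFLOW__{SECTION}__{KEY}` (double underscores), which takes precedence over the file. The sketch below imitates that lookup order with `configparser`; it is a simplified illustration, not Airflow's own configuration loader, and `env_var_name` / `get_option` are hypothetical names.

```python
import configparser
import os
from typing import Mapping, Optional


def env_var_name(section: str, key: str) -> str:
    """Build the environment variable name Airflow checks for [section] key."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def get_option(cfg_text: str, section: str, key: str,
               environ: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Return the env-var override if set, else the value in cfg_text, else None."""
    env = os.environ if environ is None else environ
    override = env.get(env_var_name(section, key))
    if override is not None:
        return override
    # interpolation=None: airflow.cfg contains literal % signs in log format strings
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get(section, key, fallback=None)
```

For example, exporting `AIRFLOW__CORE__LOAD_EXAMPLES=False` before starting Airflow keeps the bundled example DAGs from being loaded, without touching the file.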

Initialize DB:

airflow db init
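With the default settings, airflow db init creates a SQLite database named airflow.db in AIRFLOW_HOME. One way to confirm the metadata tables exist is to list them with Python's `sqlite3` module. `list_tables` and `list_airflow_tables` are hypothetical helpers, and the exact table names vary between Airflow versions.

```python
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import List


def list_tables(conn: sqlite3.Connection) -> List[str]:
    """Return the names of all tables in a SQLite connection, sorted."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    ).fetchall()
    return [name for (name,) in rows]


def list_airflow_tables(airflow_home: Path) -> List[str]:
    """List tables in <airflow_home>/airflow.db, the default SQLite location."""
    db_path = airflow_home / "airflow.db"
    if not db_path.exists():
        # Check first: sqlite3.connect would otherwise create an empty file
        raise FileNotFoundError(db_path)
    with closing(sqlite3.connect(str(db_path))) as conn:
        return list_tables(conn)
```

From ~/workspace/airflow, `list_airflow_tables(Path("."))` should return tables such as `dag` and `dag_run` once the database has been initialized.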

Create admin account:

airflow users create --username tonha --password 123456 --firstname to --lastname nha --role Admin --email admin@example.com
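The password 123456 is acceptable for a throwaway local setup only. One option is to generate a random password and build the same command with the flags shown above as an argument list. `build_create_user_cmd` is a hypothetical helper; it only builds the command and does not run it.

```python
import secrets
import shlex
from typing import List, Tuple


def build_create_user_cmd(username: str, firstname: str, lastname: str,
                          email: str, role: str = "Admin") -> Tuple[List[str], str]:
    """Return (argv, password) for `airflow users create` with a random password."""
    password = secrets.token_urlsafe(16)  # 16 random bytes -> 22 URL-safe characters
    argv = [
        "airflow", "users", "create",
        "--username", username,
        "--password", password,
        "--firstname", firstname,
        "--lastname", lastname,
        "--role", role,
        "--email", email,
    ]
    return argv, password


if __name__ == "__main__":
    argv, password = build_create_user_cmd("tonha", "to", "nha", "admin@example.com")
    print(shlex.join(argv))  # shlex.join requires Python 3.8+
```

To actually create the user, pass `argv` to `subprocess.run(argv, check=True)` and store the returned password somewhere safe.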

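Start server (run the webserver and the scheduler in two separate terminals, each with the workspace activated):

airflow webserver
airflow scheduler

or run all components in a single process for local development:

airflow standalone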
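airflow webserver listens on port 8080 by default and typically fails to start if another process already holds that port, for example a webserver left running from an earlier session. A small socket check can tell you beforehand; `port_in_use` is a hypothetical helper sketched with the standard library.

```python
import socket


def port_in_use(host: str = "127.0.0.1", port: int = 8080, timeout: float = 0.5) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising an exception
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    print("port 8080 busy:", port_in_use())
```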

Browser:

http://127.0.0.1:8080
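Besides the UI, the Airflow 2 webserver serves a /health endpoint that reports the status of the metadata database and the scheduler as JSON. The sketch below fetches it with the standard library; `check_health` and `is_healthy` are hypothetical helpers, and the keys they read (`metadatabase`, `scheduler`, `status`) are the ones the endpoint returns in Airflow 2.x.

```python
import json
import urllib.request


def is_healthy(payload: dict) -> bool:
    """True when both the metadata DB and the scheduler report 'healthy'."""
    return all(
        payload.get(component, {}).get("status") == "healthy"
        for component in ("metadatabase", "scheduler")
    )


def check_health(base_url: str = "http://127.0.0.1:8080", timeout: float = 5.0) -> bool:
    """Fetch <base_url>/health and evaluate the JSON with is_healthy."""
    with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as resp:
        payload = json.load(resp)
    return is_healthy(payload)
```

A scheduler status other than healthy usually means airflow scheduler is not running.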

Implement a new DAG (dags/ under AIRFLOW_HOME is Airflow's default dags_folder):

mkdir -p ~/workspace/airflow/dags
vi ~/workspace/airflow/dags/download_rocket_launches.py

import json
import pathlib

import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="download_rocket_launches",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,  # no schedule: the DAG runs only when triggered
)

# Fetch upcoming launches from the Launch Library 2 API (-L follows redirects)
download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",
    dag=dag,
)


def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Load the launch data written by the download_launches task
    with open("/tmp/launches.json") as f:
        launches = json.load(f)

    # Download all pictures in launches.json
    image_urls = [launch["image"] for launch in launches["results"]]
    for image_url in image_urls:
        try:
            response = requests.get(image_url)
            image_filename = image_url.split("/")[-1]
            target_file = f"/tmp/images/{image_filename}"
            # Separate handle name so the JSON file handle is not shadowed
            with open(target_file, "wb") as image_file:
                image_file.write(response.content)
            print(f"Downloaded {image_url} to {target_file}")
        except requests_exceptions.MissingSchema:
            print(f"{image_url} appears to be an invalid URL.")
        except requests_exceptions.ConnectionError:
            print(f"Could not connect to {image_url}.")


# Run _get_pictures as a Python task
get_pictures = PythonOperator(
    task_id="get_pictures",
    python_callable=_get_pictures,
    dag=dag,
)

# Report how many images were downloaded
notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify
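The last line sets the task order: download_launches runs first, then get_pictures, then notify. In Airflow, `a >> b` makes b downstream of a and returns b, which is why the operator can be chained. The toy class below reproduces only that chaining behaviour to show how the expression is evaluated; `Task` and `execution_order` are hypothetical stand-ins, not Airflow's BaseOperator.

```python
from typing import List


class Task:
    """Toy stand-in for an Airflow operator that only tracks dependencies."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: List["Task"] = []
        self.upstream: List["Task"] = []

    def __rshift__(self, other: "Task") -> "Task":
        # a >> b: record b as downstream of a, then return b so that
        # a >> b >> c is evaluated as (a >> b) >> c
        self.downstream.append(other)
        other.upstream.append(self)
        return other


def execution_order(root: Task) -> List[str]:
    """Follow a simple linear chain from root and return the task_ids in order."""
    order, current = [root.task_id], root
    while current.downstream:
        current = current.downstream[0]
        order.append(current.task_id)
    return order


download_launches = Task("download_launches")
get_pictures = Task("get_pictures")
notify = Task("notify")
download_launches >> get_pictures >> notify
print(execution_order(download_launches))  # ['download_launches', 'get_pictures', 'notify']
```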

Validate DAG code (running the file catches syntax and import errors; no task is executed):

python dags/download_rocket_launches.py
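Running the file imports Airflow, so it only works inside the activated virtualenv. For a quick look at what the file defines without importing Airflow, a stdlib `ast` pass can list the `dag_id` and `task_id` values passed as keyword arguments. `find_ids` is a hypothetical helper that only recognizes string literals; it complements, and does not replace, the import check above.

```python
import ast
from typing import Dict, List


def find_ids(source: str) -> Dict[str, List[str]]:
    """Collect string literals passed as dag_id= or task_id= keyword arguments."""
    tree = ast.parse(source)  # raises SyntaxError if the source does not parse
    found: Dict[str, List[str]] = {"dag_id": [], "task_id": []}
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        for kw in node.keywords:
            value = kw.value
            if kw.arg in found and isinstance(value, ast.Constant) and isinstance(value.value, str):
                found[kw.arg].append(value.value)
    return found
```

Calling `find_ids(open("dags/download_rocket_launches.py").read())` should list download_rocket_launches as the dag_id and the three task_ids.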

Run the DAG once locally for a given execution date (the scheduler does not need to be running):

airflow dags test download_rocket_launches "2021-01-01 11:00:00"
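The notify task reports how many entries ended up in /tmp/images. The same check can be done from Python after the test run finishes. `count_files` is a hypothetical helper, and /tmp/images is simply the directory the DAG above writes to.

```python
from pathlib import Path


def count_files(directory: str = "/tmp/images") -> int:
    """Count regular files in directory; return 0 if it does not exist yet."""
    path = Path(directory)
    if not path.is_dir():
        return 0
    # Unlike `ls | wc -l`, this skips subdirectories and counts files only
    return sum(1 for entry in path.iterdir() if entry.is_file())


if __name__ == "__main__":
    print(f"There are now {count_files()} images.")
```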

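Check logs (the scheduler writes one processing log per DAG file, named after the file, so parse errors for this DAG show up here; task output goes to per-task logs under logs/download_rocket_launches/ by default):

tail -f logs/scheduler/latest/download_rocket_launches.py.log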
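If tail shows nothing, the log may have a different name or location than expected. A small helper can list the most recently modified .log files under the logs directory so you can pick the right one. `newest_logs` is a hypothetical name, and the layout under logs/ depends on the Airflow version and the [logging] settings in airflow.cfg.

```python
from pathlib import Path
from typing import List


def newest_logs(log_root: str = "logs", limit: int = 5) -> List[Path]:
    """Return up to `limit` .log files under log_root, most recently modified first."""
    root = Path(log_root)
    if not root.is_dir():
        return []
    logs = [p for p in root.rglob("*.log") if p.is_file()]
    logs.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    return logs[:limit]


if __name__ == "__main__":
    for path in newest_logs():
        print(path)
```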

See also (Amazon MWAA sample that writes DAG run information to CSV): https://docs.aws.amazon.com/mwaa/latest/userguide/samples-dag-run-info-to-csv.html