참고
https://leeyh0216.github.io/posts/airflow_install_and_tutorial/
Airflow 튜토리얼 실행해보기
Airflow 튜토리얼 실행해보기 Airflow 기본 개념 - DAG와 Operator 개요 Airflow를 사용할 일이 많아질 것 같아 사용법 정리 포스팅을 진행한다. Ubuntu 환경에서 설치, 운영, 활용 실습을 진행하려 했으나,
leeyh0216.github.io
Apache Airflow
Apache Airflow는 데이터 파이프라인을 작성, 스케쥴링, 모니터링하기 위한 오픈 소스 플랫폼이다. 복잡한 데이터 처리 작업을 하는 데 유리하며, ETL(추출, 변환, 적재) 프로세스에 많이 사용된다. 스케쥴링 기능은 cron 표현식을 통해 작업의 실행 주기를 설정할 수 있다. 여러 작업을 병렬로 실행할 수 있으며, 클러스터 환경에서도 작동한다.
Airflow의 작업 흐름은 DAG(Directed, Acyclic Graph, 유향 그래프)로 표현된다. DAG는 작업 간 의존성을 정의하며, 작업은 노드, 의존성은 간선으로 나타낸다. DAG는 순환이 없는 그래프이므로 작업이 순차적으로 실행될 수 있도록 보장한다.
docker pull puckel/docker-airflow
docker를 실행한 다음 터미널에 위의 명령을 입력한다. https://hub.docker.com/r/puckel/docker-airflow 의 이미지를 pull하는 명령이다. 이 이미지는 Airflow를 쉽게 배포하고 실행할 수 있도록 한다.
git clone https://github.com/puckel/docker-airflow.git
위의 리포지토리는 Airflow를 Docker 컨테이너로 실행하기 위한 설정 파일인 docker-compose 파일이 들어있다. 이 파일은 Airflow, PostgreSQL, Redis 등을 설정하는 데 필요한 구성이 포함되어 있다. Airflow는 PostgreSQL과 Redis를 각각 메타데이터 DB와 작업 큐로 사용한다.
정상적으로 실행이 완료되면 docker-airflow 폴더가 생성되며 내부에 docker-compose-CeleryExecutor.yml 파일이 생성된 것을 확인할 수 있다. 이 파일은 docker compose 파일의 버전을 지정하고, redis, postgres, webserver, flower, scheduler, worker 등과 같은 서비스를 정의한다. 또한 환경 변수를 정의하며 필요한 볼륨과 포트를 매핑한다. 서비스의 의존성의 정의하며 재시작 정책을 설정한다.
cd docker-airflow
docker-compose -f docker-compose-CeleryExecutor.yml up -d
위 명령은 Docker Compose를 통해 yml 파일에 정의된 서비스를 백그라운드에서 실행하는 명령이다. -f 플래그는 사용할 compose 파일을 지정하며 up은 정의된 서비스를 시작하고 필요 시 이미지를 빌드하는 명령이다. -d 플래그는 detached 모드로 실행하겠다는 의미인데, 명령을 실행한 후 터미널을 차지하지 않고 백그라운드에서 컨테이너가 실행된다.
위 명령이 정상적으로 실행이 되었다면 http://localhost:8080/ 에 접속하여 Airflow Web UI를 확인할 수 있다.
/dags 내부에 있는 tuto.py 파일을 실행해보자. 이 파일은 Airflow를 통해 DAG를 정의하는 예제 코드이다.
tuto.py
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2015, 6, 1),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG("tutorial", default_args=default_args, schedule_interval=timedelta(1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)
t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id="templated",
bash_command=templated_command,
params={"my_param": "Parameter I passed in"},
dag=dag,
)
t2.set_upstream(t1)
t3.set_upstream(t1)
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2015, 6, 1),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
owner: DAG의 소유자
depends_on_past: 이전 태스크의 성공 여부에 따라 현재 태스크의 실행 여부를 결정할지의 여부
start_date: DAG의 시작 날짜
email: 알림을 받을 이메일의 주소
email_on_failure: 태스크 실패 시 이메일 알림 여부
email_on_retry: 태스크 실패 시 재시도 횟수
retries: 태스크 실패 시 재시도 횟수
retry_delay: 재시도 간격
dag = DAG("tutorial", default_args=default_args, schedule_interval=timedelta(1))
DAG 객체를 생성한다. DAG의 이름, 기본 인자, DAG의 실행 주기를 설정한다.
tuto.py에는 세 개의 태스크가 존재한다. 현재 날짜를 출력하는 태스크, 5초 동안 대기하는 태스크, 정의된 템플릿 명령을 실행하는 태스크가 있다.
t2.set_upstream(t1)
t3.set_upstream(t1)
태스크의 의존성을 설정한다. t2, t3은 t1이 완료된 후 실행된다.
docker-compose -f docker-compose-CeleryExecutor.yml run --rm webserver airflow list_dags
현재 Airflow 인스턴스에 등록된 모든 DAG의 목록을 출력한다.
현재 DAG는 중지된 상태이다. 실행하기 위해서는 아래 명령을 입력하거나 Web UI에서 OFF 버튼을 눌러 ON으로 변경해야 한다.
docker-compose -f docker-compose-CeleryExecutor.yml run --rm webserver airflow unpause tutorial
DAG가 정상적으로 실행이 되고 있음을 확인할 수 있다.
'Databases > Data Engineering' 카테고리의 다른 글
Hadoop과 구성 요소 (0) | 2024.11.03 |
---|---|
[Airflow] DAG 선언 및 실행하기 (2) | 2024.10.01 |
[Airflow] Airflow의 Branching 알아보기 (0) | 2024.09.11 |
[Docker] 도커란? 도커 설치하기 (3) | 2024.09.09 |
데이터 파이프라인이란? (0) | 2024.08.26 |