Overview
오늘은 데이터 파이프라인 및 워크플로 오케스트레이션 도구인 Apache Airflow에 대해 알아본다.
Airflow는 데이터 엔지니어링, DevOps, MLOps 등 다양한 분야에서 복잡한 태스크 간의 의존성 관리와 자동화된 실행을 돕는 강력한 도구이다.
Airbnb에서 시작되어 현재는 Apache Software Foundation에서 관리되고 있으며, 다양한 Operator 및 확장성을 통해 다양한 클라우드 및 온프레미스 환경에서 유연하게 사용 가능하다.
이번 글에서는 Airflow의 핵심 개념과 구성 요소, 그리고 Kubernetes 기반 오케스트레이션 도구인 Argo Workflow와의 비교를 통해 어떤 환경에 적합한지 이해해보자.
또한 Airflow를 Docker와 Kubernetes 환경에서 설치하는 방법에 대해서 알아본다.
📅 관련 글
2024.02.02 - [IaC/CI CD Tool] - Argo Workflow란?
Airflow란?
Apache Airflow는 복잡한 계산 워크플로와 데이터 처리 파이프라인을 조정하기 위한 오픈 소스 도구이다. Airflow는 2014년 Airbnb에 의해 시작되었으며 이후 2016년에 Apache Software Foundation의 일부가 되었다. Airflow는 작업 실행을 관리하고 지정된 워크플로 내에서 올바른 순서로 실행되도록 하는 데 사용된다.
Airflow 주요개념
- DAGs (Directed Acyclic Graphs)
- Airflow의 핵심 개념은 DAG이다. 이는 실행하려는 모든 작업의 모음을 나타내며 관계 및 종속성을 반영하는 방식으로 구성된다.
- DAG는 순환이 없는 노드와 방향성 간선이 있는 그래프이다. 이는 자체적으로 루프백할 수 없음을 의미한다.
- Tasks
- Task는 DAG 내의 작업 단위를 나타낸다. 각 작업은 일반적으로 수행할 특정 유형의 작업을 나타내는 "Operator"의 인스턴스이다.
- 작업의 예로는 Python 함수 실행, SQL 명령 실행, Hadoop 클러스터에서 작업 수행 등이 있다.
- Operators
- Operator는 작업에 의해 실제로 수행되는 작업을 결정한다. Airflow는 Bash 명령 실행을 위한 BashOperator, Python 코드 실행을 위한 PythonOperator 또는 PostgresOperator 또는 KubernetesPodOperator와 같은 보다 전문화된 연산자와 같은 여러 유형의 연산자를 제공한다.
- 각 Operator는 특정 유형의 작업을 처리하므로 사용자는 워크플로의 세부 사항에 따라 DAG를 맞춤 설정할 수 있다.
- Schedulers
- Scheduler는 모든 작업과 DAG를 모니터링한 다음 종속성이 충족된 작업 인스턴스를 트리거한다.
- Scheduler는 '무엇을 실행해야 하는지'와 '언제 실행해야 하는지'를 결정하는 Airflow의 구성요소이다.
- Executors
- Executor는 실제로 작업을 실행하는 메커니즘이다. Airflow는 여러 실행기를 지원하며 가장 주목할만한 것은 LocalExecutor, CeleryExecutor 및 KubernetesExecutor이다.
- Executor의 선택은 워크플로의 규모와 요구 사항에 따라 작업이 실행되는 방법과 장소에 영향을 미친다.
- Hooks
- Hook는 MySQL, PostgreSQL, HDFS 등과 같은 외부 플랫폼 및 데이터베이스에 대한 인터페이스 이다.
- 외부 데이터와 인터페이스를 가져오는 데 사용되며, 운영자는 이를 사용하여 작업을 수행할 수 있다.
Airflow의 구조 아키텍처 다이어그램
- Scheduler: DAG 파일을 읽고 작업 스케줄링
- Web Server: UI 제공 (모니터링, 트리거, 히스토리 조회 등)
- Executor: 실제 작업 실행 (Local, Celery, Kubernetes 등)
- Metadata DB: 상태 저장 (작업 성공 여부, DAG 이력 등)
- Worker: Executor가 사용하는 작업 실행 단위 (특히 CeleryExecutor에서 중요)
Apache Airflow vs Argo Workflow
2024.02.02 - [IaC/CI CD Tool] - Argo Workflow란?
Feature | Airflow | Argo Worklfow |
워크플로 정의 | Python 스크립트를 사용하여 워크플로를 정의하므로 복잡한 논리와 통합이 가능 | Kubernetes 리소스와 직접 통합할 수 있는 YAML을 사용하여 정의 |
DAG 기능 | 작업 종속성 및 조정을 관리하기 위한 DAG에 대한 기본 지원 | Kubernetes 내 작업의 종속성과 실행 순서를 관리하기 위한 DAG를 지원 |
실행 환경 | 일반적으로 Celery, Kubernetes 등으로 관리되는 독립형 서버 또는 클러스터에서 실행 | Pod를 사용하여 워크플로 단계를 실행하여 Kubernetes에서 기본적으로 실행 |
확장성 | Celery, Kubernetes 등과 같은 실행 프로그램을 통해 확장 가능합니다. 작업은 작업자 가용성에 따라 확장 | Kubernetes 통합으로 인해 확장성이 뛰어나고 Pod의 동적 할당을 통해 확장 |
사용자 인터페이스 |
워크플로 모니터링, 재시도 및 시각화를 위한 풍부한 UI | 주로 워크플로를 시각화하고 Kubernetes 개체를 직접 관리하기 위한 단순화된 UI |
커뮤니티 및 지원 | 다양한 플러그인과 타사 도구를 통한 광범위한 커뮤니티 지원 | Kubernetes 기반 지원 및 통합을 통해 커뮤니티가 성장 |
- Apache Airflow는 Python을 사용하여 프로그래밍 방식으로 작업을 정의하고 관리할 수 있는 복잡한 데이터 파이프라인 조정에 더 적합하다.
- Argo Workflows는 워크플로가 컨테이너화된 Kubernetes 환경에서 빛을 발하며 Kubernetes가 이미 사용 중인 DevOps 및 MLOps 파이프라인에 이상적이다.
Airflow 설치
Helm Chart로 Airflow 설치 예시
# Helm repo 추가
helm repo add apache-airflow https://airflow.apache.org
# Helm chart 업데이트
helm repo update
# values.yaml 수정 후 설치
helm install airflow apache-airflow/airflow -n airflow --create-namespace -f values.yaml
- `values.yaml` 에는 PostgreSQL, Redis 등의 설정이 포함되며, 공식 차트에서 제공하는 예제를 참고할 수 있다.
- helm 링크
어떤 Executor를 사용할까?
Executor 유형 | 특징 | 추천 대상 |
SequentialExecutor | 기본 설정. 한 번에 하나의 Task만 실행 | 테스트용, 로컬 환경 |
LocalExecutor | 병렬 실행 가능. 멀티프로세싱 기반 | 소규모 워크플로 |
CeleryExecutor | 분산 환경 지원. Task를 Worker가 소비 | 복잡한 워크플로, 병렬성 중요 |
KubernetesExecutor | Pod 단위로 Task 실행. 완전 분산 환경 | 클라우드, 쿠버네티스 네이티브 환경 |
Airflow 설치 실습
실습 1: LocalExecutor로 Airflow 실행하기 (Docker 기반)
설치 전 요구사항
- Docker
- Docker Compose
디렉토리 구조
airflow-local/
├── docker-compose.yaml
└── dags/
└── sample_dag.py
`docker-compose.yaml`
version: '3'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
airflow-init:
image: apache/airflow:2.8.1
depends_on:
- postgres
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
entrypoint: >
bash -c "airflow db init && airflow users create
--username admin --firstname somaz --lastname admin --role Admin
--email somaz@email.com --password somaz1234"
airflow-webserver:
image: apache/airflow:2.8.1
depends_on:
- airflow-init
ports:
- "8080:8080"
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
command: webserver
airflow-scheduler:
image: apache/airflow:2.8.1
depends_on:
- airflow-webserver
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
command: scheduler
`dags/sample_dag.py`
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG("sample_dag", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
t1 = BashOperator(task_id="hello", bash_command="echo 'Hello Airflow'")
t2 = BashOperator(task_id="bye", bash_command="echo 'Bye Airflow'")
t1 >> t2
실행
# 1. docker-compose up -d 로 실행
docker-compose up -d
# 2. Web UI 접속 확인
open http://localhost:8080
# 3. admin 계정
username: admin
password: somaz1234
확인하고 로그인 해본다.
내가 생성한 DAG 실행해본다.
아래와 같이 Graph도 확인 가능하다.
CLI로 확인도 가능하다.
# Webserver 컨테이너에 접속
docker exec -it airflow-local-airflow-webserver-1 bash
# DAG 목록 확인
airflow dags list
...
dag_id | filepath | owner | paused
===========+===========+=========+=======
sample_dag | sample.py | airflow | False
# DAG 실행
airflow dags trigger sample_dag
...
[2025-04-04T05:55:56.784+0000] {__init__.py:42} INFO - Loaded API auth backend: airflow.api.auth.backend.session
| | | data_interval_star | | | | last_scheduling_de | | | |
conf | dag_id | dag_run_id | t | data_interval_end | end_date | external_trigger | cision | logical_date | run_type | start_date | state
=====+============+====================+====================+=====================+==========+==================+====================+=====================+==========+============+=======
{} | sample_dag | manual__2025-04-04 | 2025-04-03 | 2025-04-04 | None | True | None | 2025-04-04 | manual | None | queued
| | T05:55:56+00:00 | 00:00:00+00:00 | 00:00:00+00:00 | | | | 05:55:56+00:00 | | |
# 상태 확인
airflow tasks list sample_dag
...
bye
hello
airflow tasks list sample_dag --tree
...
<Task(BashOperator): hello>
<Task(BashOperator): bye>
airflow tasks states-for-dag-run sample_dag <dag_run_id>
삭제해준다.
docker compose down
WARN[0000] /Users/somaz/PrivateWork-sub/airflow-local/docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion
[+] Running 5/5
✔ Container airflow-local-airflow-scheduler-1 Removed 1.5s
✔ Container airflow-local-airflow-webserver-1 Removed 2.4s
✔ Container airflow-local-airflow-init-1 Removed 0.1s
✔ Container airflow-local-postgres-1 Removed 0.1s
✔ Network airflow-local_default Removed 0.1s
실습 2: KubernetesExecutor로 Airflow 실행하기
전제 조건
- Kubernetes 클러스터 (Minikube, Kind, GKE 등)
- Helm 설치
- kubectl CLI 설치
Git Clone 후 파일을 확인한다.
# git clone
git clone https://github.com/airflow-helm/charts.git
# 디렉토리 확인
cd charts/charts/airflow
ls
CHANGELOG.md ci sample-values-CeleryKubernetesExecutor.yaml
CONTRIBUTING.md docs sample-values-KubernetesExecutor.yaml
Chart.lock examples templates
Chart.yaml files values.yaml
README.md sample-values-CeleryExecutor.yaml
# sample KuberntesExcutor.yaml 복사
cp sample-values-KubernetesExecutor.yaml airflow.yaml
`airflow.yaml` 예시
# airflow-values.yaml
########################################
## Airflow 기본 설정
########################################
airflow:
image:
repository: apache/airflow
tag: 2.8.4-python3.9
executor: KubernetesExecutor
fernetKey: "$(openssl rand -base64 32)"
webserverSecretKey: "$(openssl rand -hex 16)"
config:
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "False"
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
users:
- username: admin
password: admin
role: Admin
email: admin@example.com
firstName: Admin
lastName: User
clusterDomain: "concrit-cluster.local"
########################################
## CONFIG | Airflow DAGs
###################################
dags:
persistence:
enabled: true
storageClass: "nfs-client-nopath"
gitSync:
enabled: false
########################################
## Webserver
########################################
web:
replicas: 1
service:
type: NodePort
sessionAffinity: "None"
sessionAffinityConfig: {}
externalPort: 8080
nodePort:
http: 30080
loadBalancerIP: ""
loadBalancerSourceRanges: []
webserverConfig:
stringOverride: |
from airflow import configuration as conf
from flask_appbuilder.security.manager import AUTH_DB
SQLALCHEMY_DATABASE_URI = conf.get("core", "SQL_ALCHEMY_CONN")
AUTH_TYPE = AUTH_DB
webserver:
service:
type: NodePort
ports:
- name: airflow-ui
port: 8080
targetPort: 8080
nodePort: 30080
########################################
## Scheduler
########################################
scheduler:
replicas: 1
########################################
## Triggerer
########################################
triggerer:
enabled: true
replicas: 1
########################################
## Postgres
########################################
postgresql:
enabled: true
persistence:
enabled: true
size: 8Gi
storageClass: "nfs-client-nopath"
########################################
## 기타
########################################
persistence:
enabled: false
- StorageClass 는 각자 상황에 맞게 설정해야 한다.
- 필자는 nfs-provisioner 사용 중이다.
k get sc
NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE ALLOWVOLUMEEXPANSION AGE
nfs-client (default) nfs-provisioner Delete WaitForFirstConsumer true 155d
nfs-client-nopath nfs-provisioner Delete WaitForFirstConsumer true 113d
nfs-client-sts nfs-provisioner Delete WaitForFirstConsumer true 127d
설치해본다.
# Helm repo 추가
helm repo add apache-airflow https://airflow.apache.org
# Helm chart 업데이트
helm repo update
# values.yaml 수정 후 설치
helm install airflow apache-airflow/airflow -n airflow --create-namespace -f airflow.yaml
Service NodePort 확인
# helm template 으로 확인
helm template airflow apache-airflow/airflow -f airflow.yaml -n airflow | grep -A 20 "kind: Service"
# sevice 확인
k get svc -n airflow
- 동일하게 접속이 된다.
DAG가 보이지 않거나 작동 안 할 때 체크리스트
체크 항목 | 해결 방법 |
DAG가 나타나지 않음 | dags 폴더가 volumes에 잘 마운트되어 있는지 확인 |
DAG 인식 안 됨 | 컨테이너 또는 파드 재시작 `docker-compose restart or airflow dags list` `kubectl rollout restart deployment airflow-scheduler -n airflow` `kubectl rollout restart deployment airflow-webserver -n airflow` |
DAG에서 에러 발생 | Web UI → DAG 클릭 → "Log" 탭 확인 |
스케줄러 실행 안 됨 | `docker ps` 또는 `kubectl get po -n airflow` 로 scheduler 컨테이너가 살아있는지 확인 |
추가로 확인할 것
확인 포인트 | 설명 |
`dags.path` & `dags.persistence` | Helm values.yaml에서 DAG 경로가 올바른지, PVC가 제대로 mount되었는지 |
DAG 파일 오너 및 권한 | airflow 유저가 읽을 수 있는지 확인 (chmod, chown) |
DAG 내 Python 문법 오류 | airflow-scheduler pod 로그에서 DAG 파싱 에러가 없는지 확인 |
`gitSync.enabled` 여부 | GitSync를 쓰고 있다면 Git 리포지토리와 동기화되는지 확인 |
DAG 파일 위치 | `/opt/airflow/dags` 경로인지? 다른 경로면 `AIRFLOW__CORE__DAGS_FOLDER` 수정 필요 |
마무리
Airflow는 복잡한 데이터 파이프라인의 흐름을 DAG 구조로 명시적으로 정의하고, 직관적인 UI와 다양한 Operator를 통해 운영 관리가 용이한 워크플로 도구이다.
데이터 엔지니어링, 배치 처리, ETL, 머신러닝 파이프라인 등 다양한 분야에서 광범위하게 사용되고 있으며,
특히 Python 기반의 유연한 DAG 정의와 다양한 외부 시스템 연동 Hook이 강력한 장점이다.
또한 Kubernetes 환경을 사용한다면 Argo Workflows처럼 클라우드 네이티브한 대안도 고려해볼 수 있다.
각 도구는 사용 목적과 환경에 따라 장단점이 다르므로, 유스케이스에 맞는 도입이 중요하다.
Airflow의 개념과 구성 요소를 제대로 이해하고 나면, 데이터 중심의 자동화 및 오케스트레이션 환경을 보다 체계적으로 구축할 수 있을 것이다.
Reference
https://coffeewhale.com/kubernetes/workflow/argo/2020/02/14/argo-wf/
https://developnote-blog.tistory.com/176
https://zerohertz.github.io/k8s-airflow/
'Data Enginnering' 카테고리의 다른 글
대량 데이터 처리와 데이터 아키텍처 설계(OLAP & OLTP) (1) | 2025.04.29 |
---|---|
Data ETL Pipeline 구성 요소 및 설명 (0) | 2025.04.14 |