Overview
ETL (Extract, Transform, Load) 파이프라인은 대량의 데이터를 추출(Extract), 변환(Transform), 저장(Load)하는 과정으로, 데이터 분석, 머신러닝, 비즈니스 인사이트를 위한 필수적인 데이터 엔지니어링 프로세스다.
이 글에서는 Hadoop 기반의 ETL 파이프라인 구성 요소와 각 단계별 주요 기술 스택을 설명해보겠다.
📅 관련 글
2024.02.02 - [IaC/CI CD Tool] - Argo Workflow란?
2024.05.20 - [Data Enginnering] - Airflow란? (개념 및 설치)
2025.03.28 - [Data Enginnering] - 대량 데이터 처리와 데이터 아키텍처 설계(OLAP & OLTP)
ETL 파이프라인이란?
ETL 파이프라인은 데이터를 수집하고 변환하여 데이터 저장소로 전송하는 데이터 처리 프로세스다.
대부분 대용량 데이터(Big Data) 처리를 위해 Hadoop Ecosystem을 활용하며, 실시간 스트리밍 데이터의 경우 Kafka, Flink와 같은 기술을 조합한다.
ETL vs ELT 차이점
- ETL → 데이터 변환(Transform)이 데이터 저장(Load) 전에 이루어짐. (전통적 방식)
- ELT → 데이터를 저장(Load)한 후 변환(Transform) (클라우드 데이터 웨어하우스에서 주로 사용)
ETL 파이프라인 주요 구성 요소
ETL 파이프라인은 다음과 같은 3가지 핵심 단계로 구성된다.
1. Extract (데이터 추출)
데이터를 다양한 원천(Source)에서 수집하는 단계다.
주요 데이터 소스
- 데이터베이스 (MySQL, PostgreSQL, Oracle, SQL Server)
- 파일 시스템 (CSV, JSON, Parquet)
- API 및 웹 데이터 (REST API, GraphQL)
- 스트리밍 데이터 (Kafka, Flume)
- 로그 파일 및 센서 데이터 (Fluentd, Logstash, IoT)
추천 기술 스택
기술 | 역할 |
Sqoop | 관계형 DB → Hadoop(HDFS) 데이터 이관 |
Flume | 로그 데이터 수집 및 전송 |
Kafka | 실시간 스트리밍 데이터 수집 |
NiFi | GUI 기반 데이터 흐름 자동화 |
2. Transform (데이터 변환 및 처리)
추출된 데이터를 분석할 수 있도록 가공하는 과정이다.
이 단계에서 중복 제거, 필터링, 조인, 집계, 포맷 변환 등의 작업을 수행한다.
추천 기술 스택
기술 | 역할 |
Apache Spark | In-Memory 분산 처리 (Batch + Stream) |
MapReduce | Hadoop 기본 분산 처리 방식 (배치) |
Hive | SQL 기반 데이터 변환 및 쿼리 |
Pig | 스크립트 기반 데이터 변환 |
Flink | 실시간 데이터 변환 |
Dask | Python 기반 분산 처리 |
전통적인 MapReduce보다 Spark가 훨씬 빠르고 효율적
SQL 기반 처리를 선호하면 Hive, 실시간 처리는 Flink 추천
3. Load (데이터 저장 및 활용)
변환된 데이터를 저장소에 적재하는 과정이다.
주로 데이터 웨어하우스, 데이터 레이크, NoSQL, 검색 엔진에 저장하여 분석과 활용이 가능하도록 한다.
추천 기술 스택
기술 | 역할 |
HDFS | Hadoop 기반 분산 스토리지 |
HBase | 실시간 쿼리가 필요한 NoSQL 스토리지 |
Cassandra | 대량 데이터 저장용 NoSQL |
Elasticsearch | 검색 및 로그 분석 |
Redshift, BigQuery | 클라우드 기반 데이터 웨어하우스 |
Snowflake | 최신 클라우드 데이터 웨어하우스 |
ETL 파이프라인 설계 시 고려해야 할 요소
1. 데이터 볼륨 (Big Data)
- 데이터가 많아질수록 Hadoop, Spark 같은 분산 처리 시스템이 필수
- HDFS, S3 같은 확장 가능한 저장소 필요
2. 실시간 vs 배치 (Batch vs Streaming)
- 배치(Batch) 처리: Spark, MapReduce, Hive
- 실시간(Streaming) 처리: Kafka, Flink, Spark Streaming
3. 확장성 (Scalability)
- 분산 환경을 고려하여 Spark, Kubernetes, Airflow 사용
- 클라우드 환경에서는 AWS Glue, Google Dataflow, Azure Data Factory 활용 가능
4. 오류 처리 및 모니터링
- Airflow, Prefect를 사용해 워크플로우 관리
- Prometheus + Grafana로 실시간 모니터링
- Elasticsearch + Kibana로 로그 분석
ETL 파이프라인 예제 (Hadoop & Spark 기반)
ETL 프로세스 예제
- Extract: MySQL에서 transactions 테이블 데이터를 가져와 HDFS에 저장
- Transform: Spark로 데이터를 정제하고 purchase_amount가 높은 사용자만 필터링
- Load: 변환된 데이터를 HBase에 저장하여 빠른 조회 지원
ETL Workflow
# Extract: MySQL → HDFS
sqoop import --connect jdbc:mysql://dbserver:3306/ecommerce \\
--username user --password pass \\
--table transactions \\
--target-dir /data/transactions \\
--m 1
# Transform: Spark로 데이터 필터링
spark-submit --class MainApp transform.py
# Load: HDFS → HBase 저장
hbase shell <<EOF
create 'high_value_customers', 'info'
put 'high_value_customers', 'user123', 'info:purchase', '1000'
EOF
ETL 프로세스 예제 - 상세 설명 및 추가 케이스
이 ETL 예제는 MySQL에서 데이터를 추출(Extract) → Spark로 변환(Transform) → HBase에 저장(Load) 하는 과정이다.
여기서 데이터를 transactions 테이블에서 가져와, 구매 금액(purchase_amount)이 높은 사용자만 필터링하여 HBase에 저장하는 흐름을 설명하고 있다
Extract 단계 (MySQL → HDFS)
목표
- MySQL의 transactions 테이블 데이터를 HDFS로 이동
- sqoop을 사용하여 관계형 DB에서 데이터를 추출
작업 방식
sqoop import --connect jdbc:mysql://dbserver:3306/ecommerce \\
--username user --password pass \\
--table transactions \\
--target-dir /data/transactions \\
--m 1
설명
- jdbc:mysql://dbserver:3306/ecommerce → MySQL DB에 연결
- transactions → 가져올 테이블
- /data/transactions → HDFS에 저장할 디렉토리
- -m 1 → 단일 맵퍼 사용 (병렬 처리 가능)
추가 케이스
추출 대상 필터링 (최근 30일 데이터만 가져오기)
sqoop import --connect jdbc:mysql://dbserver:3306/ecommerce \\
--username user --password pass \\
--query "SELECT * FROM transactions WHERE purchase_date > DATE_SUB(NOW(), INTERVAL 30 DAY)" \\
--target-dir /data/transactions_last_30_days \\
--m 1
대량 데이터 처리 (병렬 4개 맵퍼 사용)
sqoop import --connect jdbc:mysql://dbserver:3306/ecommerce \\
--username user --password pass \\
--table transactions \\
--target-dir /data/transactions \\
--split-by transaction_id --m 4
- 이렇게 하면 데이터를 빠르게 가져올 수 있음!
Transform 단계 (Spark 데이터 필터링)
목표
- Spark를 사용하여 구매 금액(purchase_amount)이 높은 사용자만 필터링
- 필터링된 데이터를 다시 HDFS에 저장
작업 방식
spark-submit --class MainApp transform.py
`transform.py` (Python Spark 예제)
from pyspark.sql import SparkSession
# Spark 세션 생성
spark = SparkSession.builder.appName("HighValueCustomers").getOrCreate()
# HDFS에서 데이터 읽기
df = spark.read.option("header", "true").csv("hdfs:///data/transactions")
# 데이터 필터링 (purchase_amount > 1000)
high_value_customers = df.filter(df.purchase_amount > 1000)
# 필터링된 데이터 저장
high_value_customers.write.csv("hdfs:///data/high_value_customers", header=True)
spark.stop()
설명
- SparkSession 생성 → spark.read.csv()로 HDFS에서 데이터 읽음
- 필터링 적용 → purchase_amount > 1000인 데이터만 남김
- 결과를 다시 HDFS에 저장
추가 케이스
데이터를 Parquet 포맷으로 변환하여 저장 (성능 향상)
python
복사편집
high_value_customers.write.parquet("hdfs:///data/high_value_customers_parquet")
Spark SQL을 활용한 고급 필터링
python
복사편집
df.createOrReplaceTempView("transactions")
high_value_customers = spark.sql("SELECT * FROM transactions WHERE purchase_amount > 1000 AND region = 'US'")
- 이렇게 하면 성능이 훨씬 최적화됨!
Load 단계 (HDFS → HBase 저장)
목표
- 필터링된 데이터를 HBase에 저장하여 실시간 분석 가능하도록 처리
- HBase는 빠른 조회가 필요한 데이터에 적합
작업 방식
hbase shell <<EOF
create 'high_value_customers', 'info'
put 'high_value_customers', 'user123', 'info:purchase', '1000'
EOF
설명
- create 'high_value_customers', 'info' → HBase 테이블 생성
- put 'high_value_customers', 'user123', 'info:purchase', '1000' → 특정 사용자 데이터 저장
추가 케이스:
Spark에서 직접 HBase에 저장 (Python)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import happybase
# Spark 세션 생성
spark = SparkSession.builder.appName("LoadToHBase").getOrCreate()
# HBase 연결
connection = happybase.Connection('hbase-master')
table = connection.table('high_value_customers')
# HDFS에서 데이터 읽기
df = spark.read.parquet("hdfs:///data/high_value_customers_parquet")
# HBase에 저장
for row in df.collect():
table.put(row['user_id'].encode(), {b'info:purchase': str(row['purchase_amount']).encode()})
connection.close()
- 이렇게 하면 Spark에서 필터링된 데이터를 직접 HBase에 적재할 수 있음!
최종 정리 (ETL Workflow)
단계 | 기술 | 설명 |
Extract | Sqoop | MySQL → HDFS로 데이터 이동 |
Transform | Spark | purchase_amount > 1000 필터링 |
Load | HBase | 필터링된 데이터를 실시간 분석용 DB로 저장 |
- 이제 이 ETL 프로세스를 Kubernetes, Helm, Airflow로 자동화하면 더 강력한 데이터 파이프라인을 구축할 수 있음!
결론
- ETL 파이프라인은 데이터를 수집(Extract) → 변환(Transform) → 저장(Load)하는 필수 프로세스다.
- Hadoop 기반으로 구축할 경우 HDFS, Spark, Hive, Kafka 등의 기술을 활용하면 확장성과 성능을 확보할 수 있다.
- 최근에는 Spark, Flink, Airflow 등 클라우드 친화적인 데이터 파이프라인이 대세.
최신 트렌드
- ELT 방식으로 Snowflake, BigQuery 같은 클라우드 데이터 웨어하우스를 활용
- Spark + Airflow를 이용한 데이터 파이프라인 자동화
ETL 파이프라인 구축을 고민 중이라면?
- 배치 처리: Hadoop + Spark + Airflow
- 실시간 스트리밍: Kafka + Flink + Elasticsearch
- 클라우드 데이터 웨어하우스: Snowflake, Redshift, BigQuery
Reference
https://han-py.tistory.com/361
'Data Enginnering' 카테고리의 다른 글
대량 데이터 처리와 데이터 아키텍처 설계(OLAP & OLTP) (1) | 2025.04.29 |
---|---|
Airflow란? (개념 및 설치) (0) | 2025.04.04 |