파이프라인 오케스트레이션: Airflow, Dagster 및 Prefect
완벽한 데이터 스택이 있다고 상상해 보십시오: Apache Iceberg의 호숫가, 변환 수십 개의 소스에서 데이터를 동기화하는 우아한 DBT, Airbyte 커넥터입니다. 모든 것 완벽합니다. 누군가가 "그런데 이 모든 것을 운영하는 사람은 누구입니까? 누가 보장합니까?"라고 묻습니다. 변환 전에 수집 작업이 시작됩니까? 문제가 발생했을 때 알려주는 사람 새벽 3시에요?" 대답은파이프라인 조정자.
오케스트레이션과 데이터 스택의 보이지 않는 접착제: 구성 요소 수백 개의 상호 의존적인 작업 실행을 조정하고, 다음과 같은 경우 재시도를 관리합니다. 장애가 발생하면 파이프라인 상태를 모니터링하고 비즈니스에 필요한 감사 추적을 생성합니다. 오케스트레이터가 없으면 데이터 파이프라인은 희망으로 함께 묶인 크론 스크립트입니다.
2025년 오케스트레이터 환경은 세 가지 주요 플랫폼이 지배합니다. 아파치 에어플로우 3.0, DAG의 개념을 정의한 베테랑 오케스트레이션; 대그스터 1.x, 그 개념을 도입한 도전자 소프트웨어 정의 자산 그리고 반장 3, Python 최초 솔루션 설계 개발자 경험을 위해. 이 기사에서는 코드를 사용하여 심층적으로 분석합니다. 귀하의 상황에 맞는 것을 선택하는 데 도움이 되는 실제적이고 정직한 비교입니다.
이 기사에서 배울 내용
- Apache Airflow 3.0 내부 아키텍처: 스케줄러, 실행자, 작업자, 메타데이터 DB
- 엔드투엔드 ETL 파이프라인을 위한 완전한 Python DAG를 작성하는 방법
- Dagster의 소프트웨어 정의 자산 패러다임과 이것이 파이프라인에 대한 생각을 바꾸는 이유
- Prefect 3의 흐름/작업 모델과 단순화된 배포 모델
- 사용 사례별 비교표를 통해 세 가지 도구 간의 자세한 비교
- 지속적이고 장기 실행되는 워크플로우를 위한 대안으로서의 임시적
- dbt, Airbyte 및 데이터 레이크하우스와 오케스트레이션을 통합하기 위한 참조 아키텍처
- 프로덕션의 멱등성 파이프라인에 대한 모니터링, 경고 및 모범 사례
데이터 웨어하우스, AI 및 디지털 혁신 시리즈 기사
| # | Articolo | 집중하다 |
|---|---|---|
| 1 | 데이터 웨어하우스의 진화: SQL Server에서 Data Lakehouse로 | 아키텍처 및 플랫폼 |
| 2 | 데이터 메시 및 분산형 아키텍처 | 거버넌스 및 소유권 |
| 3 | ETL 대 최신 ELT: dbt, Airbyte 및 Fivetran | 변환 파이프라인 |
| 4 | 현재 위치 - 파이프라인 오케스트레이션 | 에어플로우, 대그스터, 프리펙트 |
| 5 | 제조 분야의 AI: 예측 유지 관리 | IoT, ML, 디지털 트윈 |
| 6 | 금융 분야의 AI: 사기 탐지 및 신용 평가 | 실시간 ML |
| 7 | 소매업의 AI: 수요 예측 및 추천 | ML이 적용됨 |
오케스트레이션은 기본이기 때문에
기술적인 세부 사항을 살펴보기 전에 이 기술이 어떤 문제를 해결하는지 정확히 이해하는 것이 좋습니다. 오케스트레이터. 중소기업에서 대기업까지의 일반적인 데이터 파이프라인은 다음과 같습니다.
- CRM, ERP, 트랜잭션 데이터베이스, 외부 API(10-50개 소스)에서 추출
- 데이터 레이크에 업로드(Airflow가 Airbyte 또는 Fivetran을 트리거함)
- DBT 변환(복잡한 종속성을 갖는 30-200개 모델)
- 데이터 마트 및 집계 테이블 업데이트
- BI 도구(Metabase, Tableau, Power BI)로 내보내기
- ML 모델 업데이트(특성 추출 + 재학습)
이 각 단계에는 중독 (B는 A가 오기 전에 떠날 수 없다. 완료), 루게릭병 (보고서는 08:00까지 준비되어야 함), e 품질 요구 사항 (if the source data is empty, do not launch the 변환). 별도의 cron 스크립트와 보장된 재해로 이 모든 것을 처리하십시오.
파이프라인 오케스트레이터가 하는 일
| 기능성 | 설명 |
|---|---|
| 종속성 관리 | 작업 실행 순서를 정의하고 파이프라인 종속성을 관리합니다. |
| 스케줄링 | 크론 스케줄링, 이벤트 기반, 데이터 인식(자산 업데이트 트리거) |
| 재시도 및 오류 처리 | 지수 백오프를 통한 자동 재시도, 부분 실패 관리 |
| 병행 | 시간 최적화를 위한 독립적인 작업의 병렬 실행 |
| 모니터링 | 중앙 집중식 대시보드, 작업 로그, SLA 모니터링, 경고 |
| 백필 | 파이프라인의 논리가 변경되면 기록 실행 재실행 |
| 감사 추적 | 규정 준수 및 디버깅을 위한 각 실행의 전체 기록 |
| 매개변수화 | 환경(개발, 스테이징, 프로덕션) 및 수동 실행을 위한 변수 구성 |
Apache Airflow 3.0: 개선된 베테랑
2014년 에어비앤비가 만들고 2016년 아파치 소프트웨어 재단에 기부한 이 앱은 아파치 기류 데이터 파이프라인 조정의 사실상 표준이 되었습니다. 그 이상으로 GitHub의 별 35,000개 수백 명의 커뮤니티가 있습니다. Airflow의 기여자이며 스타트업부터 전 세계 수천 개의 회사에서 사용됩니다. 대기업에.
2025년에 Airflow는 다음 출시와 함께 가장 중요한 발전을 이루었습니다. 버전 3.0은 클라이언트-서버 아키텍처를 도입했습니다. 작업 실행 인터페이스, 기본 자산 인식 스케줄링(Dagster에서 상속됨), DAG 버전 관리 및 완전히 새롭게 디자인된 UI. 그리고 더 이상 단순한 작업 스케줄러가 아닙니다. 현대적인 오케스트레이션 플랫폼.
Apache Airflow 아키텍처
Airflow의 아키텍처를 이해하는 것은 Airflow를 올바르게 배포하고 진단하는 데 중요합니다. 생산상의 문제. 다섯 가지 주요 구성 요소가 있습니다.
공기 흐름 아키텍처 구성요소
| 요소 | 역할 | 기술 |
|---|---|---|
| 웹서버 | DAG 모니터링, 디버깅, 수동 트리거를 위한 웹 UI | 플라스크 + Gunicorn(2.x), FastAPI(3.0) |
| 스케줄러 | DAG를 분석하고, 작업을 예약하고, 실행자 대기열에 배치합니다. | 여러 인스턴스가 있는 Python 데몬, HA |
| 집행자 | 작업 실행: LocalExecutor(단일 노드), CeleryExecutor, KubernetesExecutor | Celery + Redis/RabbitMQ 또는 K8s |
| 노동자 | 실제로 작업 처리(CeleryExecutor/KubernetesExecutor만 사용) | 셀러리 작업자 또는 K8s 포드 |
| 메타데이터 데이터베이스 | 모든 DAG 실행, 작업 인스턴스, 변수, 연결 상태 | PostgreSQL(프로덕션에서 권장), MySQL |
| DAG 프로세서 | Airflow 3.0의 새로운 기능: 스케줄러에서 분리된 DAG 파서 | Python 프로세스 풀 |
DAG 개념
Airflow의 핵심과 DAG(방향성 비순환 그래프): 유향 그래프 작업 실행 순서를 정의하는 주기가 없습니다. 그래프의 각 노드는 작업입니다. 각 호는 종속성입니다. 그래프는 다음과 같아야합니다. 비순환 (사이클 없음) 따라서 순환 종속성을 만들 수 없습니다.
작업은 다음을 통해 구현됩니다. 연산자: 추상화하는 Python 클래스 일의 일종. Airflow에는 수백 개의 내장 연산자(PythonOperator, BashOperator, PostgresOperator, SparkSubmitOperator, dbtCloudOperator...) 및 커뮤니티 공급자 패키지를 통해 최대한 많이 게시합니다.
코드 예: 판매 파이프라인을 위한 완전한 ETL DAG
ETL 파이프라인을 조정하는 완전하고 현실적인 DAG를 살펴보겠습니다. PostgreSQL, 스테이징 로딩, DBT 변환 및 팀에 알림.
# dags/pipeline_vendite.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import pandas as pd
import logging
logger = logging.getLogger(__name__)
# Configurazione DAG con default_args per tutti i task
default_args = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"email": ["data-team@azienda.it"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True, # 5, 10, 20 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(hours=2),
}
with DAG(
dag_id="pipeline_vendite_giornaliera",
default_args=default_args,
description="Estrai vendite da ERP, carica in DWH, aggrega con dbt",
schedule="0 5 * * *", # Ogni giorno alle 05:00 UTC
start_date=datetime(2025, 1, 1),
end_date=None,
catchup=False, # Non eseguire run passate
max_active_runs=1, # Una sola esecuzione alla volta
tags=["vendite", "etl", "produzione"],
doc_md="""
## Pipeline Vendite Giornaliera
Estrae ordini dall'ERP (PostgreSQL), li carica in staging,
lancia le trasformazioni dbt e notifica il team su Slack.
**SLA**: Completamento entro le 07:30 UTC
**Owner**: Team Data Engineering
""",
) as dag:
# ============ TASK 1: VERIFICA DISPONIBILITA SORGENTE ============
def check_source_data(**context):
"""Verifica che ci siano dati nella finestra di esecuzione."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
execution_date = context["ds"] # "2025-01-15"
count = pg_hook.get_first("""
SELECT COUNT(*)
FROM orders
WHERE DATE(created_at) = %s
""", parameters=[execution_date])[0]
logger.info(f"Trovati {count} ordini per {execution_date}")
if count == 0:
logger.warning("Nessun dato trovato, skip pipeline")
return "notify_no_data" # Branch: salta elaborazione
return "extract_task_group.extract_orders"
check_source = BranchPythonOperator(
task_id="check_source_data",
python_callable=check_source_data,
)
# ============ TASK GROUP: ESTRAZIONE ============
with TaskGroup("extract_task_group") as extract_group:
def extract_orders(**context):
"""Estrae ordini dall'ERP e li salva in staging."""
execution_date = context["ds"]
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
df = pg_hook.get_pandas_df("""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price * (1 - COALESCE(oi.discount, 0)))
AS importo_totale,
COUNT(oi.id) AS num_righe
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = %(exec_date)s
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""", parameters={"exec_date": execution_date})
# Validazione dati estratti
assert df["importo_totale"].ge(0).all(), "Importi negativi trovati!"
assert df["ordine_id"].is_unique, "Ordini duplicati nell'estrazione!"
# Salva in XCom per passare al task successivo
# Per dataset grandi, usa staging table invece di XCom
context["ti"].xcom_push(key="row_count", value=len(df))
# Carica in tabella staging del DWH
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
dwh_hook.insert_rows(
table="staging.stg_ordini_raw",
rows=df.values.tolist(),
target_fields=df.columns.tolist(),
replace=True,
replace_index=["ordine_id"],
)
logger.info(f"Caricati {len(df)} ordini in staging")
extract_orders_task = PythonOperator(
task_id="extract_orders",
python_callable=extract_orders,
)
def extract_customers(**context):
"""Estrae snapshot clienti aggiornati."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
df_customers = pg_hook.get_pandas_df("""
SELECT id, email, nome, cognome, citta, segmento,
DATE(updated_at) AS data_aggiornamento
FROM customers
WHERE DATE(updated_at) >= CURRENT_DATE - INTERVAL '1 day'
""")
if len(df_customers) > 0:
dwh_hook.insert_rows(
table="staging.stg_clienti_raw",
rows=df_customers.values.tolist(),
target_fields=df_customers.columns.tolist(),
replace=True,
replace_index=["id"],
)
extract_customers_task = PythonOperator(
task_id="extract_customers",
python_callable=extract_customers,
)
# I due extract girano in PARALLELO
[extract_orders_task, extract_customers_task]
# ============ TASK GROUP: TRASFORMAZIONI DBT ============
with TaskGroup("dbt_task_group") as dbt_group:
# dbt run sullo staging layer
dbt_staging = BashOperator(
task_id="dbt_run_staging",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:staging --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt run sul marts layer (dipende dallo staging)
dbt_marts = BashOperator(
task_id="dbt_run_marts",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:marts --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt test per validare la qualità
dbt_test = BashOperator(
task_id="dbt_test",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt test --select tag:staging tag:marts --target prod
""",
)
dbt_staging >> dbt_marts >> dbt_test
# ============ NOTIFICHE ============
notify_success = SlackWebhookOperator(
task_id="notify_success",
slack_webhook_conn_id="slack_data_team",
message="""
:white_check_mark: *Pipeline Vendite Completata*
Data: {{ ds }}
Ordini elaborati: {{ ti.xcom_pull(task_ids='extract_task_group.extract_orders', key='row_count') }}
Durata: {{ dag_run.get_task_instance('dbt_task_group.dbt_test').duration | round(1) }}s
""",
trigger_rule="all_success",
)
notify_no_data = SlackWebhookOperator(
task_id="notify_no_data",
slack_webhook_conn_id="slack_data_team",
message=":information_source: *Pipeline Vendite* - Nessun dato per {{ ds }}, esecuzione saltata",
)
notify_failure = SlackWebhookOperator(
task_id="notify_failure",
slack_webhook_conn_id="slack_data_team",
message=":x: *Pipeline Vendite FALLITA* - Data: {{ ds }} - Controlla Airflow UI!",
trigger_rule="one_failed",
)
# ============ DIPENDENZE DAG ============
check_source >> [extract_group, notify_no_data]
extract_group >> dbt_group >> notify_success
dbt_group >> notify_failure
이 DAG는 Airflow의 기본 패턴을 보여줍니다. TaskGroup 에 대한
그룹 관련 업무, BranchPythonOperator 조건부 논리로,
XCom 작업 간에 데이터를 전달하고 관리를 위한 규칙을 트리거합니다.
성공과 실패에 대한 알림.
Apache Airflow 3.0(2025)의 새로운 기능
- 작업 실행 인터페이스: Airflow API 서버에서 작업자를 분리하여 보안과 확장성을 향상시키는 새로운 클라이언트-서버 API
- 자산 인식 스케줄링: 이제 크론 일정뿐만 아니라 자산(데이터 세트) 업데이트에 의해 DAG가 트리거될 수 있습니다.
- DAG 버전 관리: 각 실행은 시작 시 DAG 버전과 연결되어 배포 중 불일치를 제거합니다.
- 인간 참여형: Airflow 3.1에는 진행하기 전에 사람의 승인을 기다리는 워크플로가 도입되었습니다.
- 반응 UI: 완전히 새롭게 디자인된 인터페이스, 더욱 빠르고 직관적
- 별도의 DAG 프로세서: DAG 구문 분석은 전용 프로세스에서 수행되어 스케줄러의 부하를 줄입니다.
Dagster: 소프트웨어 정의 자산 패러다임
2018년 Elementl(현 Dagster Labs)에서 탄생한 Dagster는 패러다임 전환을 도입했습니다. 오케스트레이션의 급진적: 생각하는 대신 수행할 작업, 우리는 생각합니다 생산할 자산. 자산 및 데이터 아티팩트: DWH의 테이블, ML 모델, Parquet 파일, 생성된 보고서.
이 접근법은 소프트웨어 정의 자산(SDA), 혁명을 일으켰다
개발자 경험. Airflow에서 "5:00에 이 스크립트 실행"을 정의합니다. 대그스터에서
정의 "나는 테이블을 원한다 gold.fatturato_mensile 항상 최신 상태이고,
그리고 그녀는 자신을 업데이트하는 방법을 알고 있습니다." 차이는 미묘해 보이지만 자동 혈통,
간단한 테스트, 데이터 스택에 존재하는 내용에 대한 즉각적인 이해.
2025년에 Dagster 1.9는 구성요소 프레임워크(2025년 10월 GA)로 성숙해졌습니다. 전체 파이프라인을 선언적 구성으로 설명할 수 있으며 카탈로그 시스템의 모든 자산 상태에 대한 전례 없는 가시성을 제공하는 고급 기술입니다.
Dagster 주요 개념
Dagster 용어
| 개념 | 설명 | 기류 상당 |
|---|---|---|
| 자산 | 파이프라인이 생성하는 데이터 아티팩트(테이블, 모델, 파일) | 직접적으로 동등한 것이 없습니다 |
| @자산 | 자산 생성 방법을 정의하는 Python 데코레이터 | PythonOperator(더 제한됨) |
| 채용 정보 | 함께 실행할 자산/작업 선택 | 가리비 |
| Op | 명시적인 데이터 출력이 없는 일반 작업 | 연산자 |
| 나 매니저 | 자산을 저장하고 읽는 방법을 관리합니다(S3, BigQuery, Snowflake...) | 이에 상응하는 것이 없습니다 |
| 자원 | 공유 연결 및 클라이언트(데이터베이스, API) | 연결 + 후크 |
| 감지기 | 이벤트 기반 트리거(파일 시스템 폴링, API, 이벤트) | 센서 오퍼레이터 |
| 일정 | 작업에 대한 시간 기반 트리거 | DAG 일정 |
| 파티션 | 날짜, 카테고리, 지역별 자산 분할 | PartitionedSchedule(더 복잡함) |
코드 예: Dagster를 사용한 자산 기반 파이프라인
우리는 Airflow와 동일한 판매 파이프라인을 구현하지만 Dagster 패러다임을 사용합니다. 자산 간의 종속성이 어떻게 명시적이고 테스트가 간단해집니다.
# pipeline_vendite/assets.py
import pandas as pd
from dagster import (
asset,
AssetIn,
AssetExecutionContext,
MetadataValue,
Output,
DailyPartitionsDefinition,
MaterializeResult,
)
from dagster_dbt import dbt_assets, DbtCliResource
from resources import postgres_erp, postgres_dwh, slack_resource
# Partizionamento giornaliero - Dagster gestisce il backfill automaticamente
daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")
# ============ ASSET 1: ORDINI GREZZI DALL'ERP ============
@asset(
name="raw_ordini",
group_name="ingestione",
partitions_def=daily_partitions,
description="Ordini grezzi estratti dall'ERP PostgreSQL",
metadata={
"source": "ERP PostgreSQL",
"owner": "data-engineering@azienda.it",
},
compute_kind="python",
)
def raw_ordini(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae gli ordini per la partizione corrente."""
partition_date = context.partition_key # "2025-01-15"
df = postgres_erp.execute_query(f"""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = '{partition_date}'
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""")
# Metadata allegati all'asset: visibili nella UI Dagster
return Output(
df,
metadata={
"num_records": MetadataValue.int(len(df)),
"importo_totale": MetadataValue.float(df["importo_totale"].sum()),
"preview": MetadataValue.md(df.head(5).to_markdown()),
},
)
# ============ ASSET 2: CLIENTI AGGIORNATI ============
@asset(
name="raw_clienti",
group_name="ingestione",
partitions_def=daily_partitions,
compute_kind="python",
)
def raw_clienti(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae snapshot clienti aggiornati."""
partition_date = context.partition_key
df = postgres_erp.execute_query(f"""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = '{partition_date}'
""")
return Output(df, metadata={"num_records": MetadataValue.int(len(df))})
# ============ ASSET 3: ORDINI ARRICCHITI (Silver Layer) ============
@asset(
name="silver_ordini_arricchiti",
group_name="silver",
partitions_def=daily_partitions,
ins={
"raw_ordini": AssetIn("raw_ordini"),
"raw_clienti": AssetIn("raw_clienti"),
},
description="Ordini joinati con info cliente, validati",
compute_kind="python",
)
def silver_ordini_arricchiti(
context: AssetExecutionContext,
raw_ordini: pd.DataFrame,
raw_clienti: pd.DataFrame,
postgres_dwh: PostgresResource,
) -> MaterializeResult:
"""Arricchisce e valida gli ordini."""
df = raw_ordini.merge(
raw_clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
# Validazioni
assert df["importo_totale"].ge(0).all(), "Importi negativi!"
assert df["ordine_id"].is_unique, "Ordini duplicati!"
# Carica nel DWH
postgres_dwh.load_dataframe(
df=df,
table="silver.ordini_arricchiti",
partition_col="data_ordine",
partition_value=context.partition_key,
)
return MaterializeResult(
metadata={
"num_records": MetadataValue.int(len(df)),
"clienti_senza_match": MetadataValue.int(df["segmento"].isna().sum()),
}
)
# ============ ASSET 4: MODELLI DBT (Gold Layer) ============
# Dagster ha integrazione nativa con dbt via dagster-dbt
@dbt_assets(
manifest=dbt_manifest_path, # manifest.json generato da dbt
select="tag:marts", # Solo i modelli gold
)
def dbt_marts_assets(context: AssetExecutionContext, dbt: DbtCliResource):
"""Esegue i modelli dbt del layer gold."""
yield from dbt.cli(["run", "--select", "tag:marts"], context=context).stream()
yield from dbt.cli(["test", "--select", "tag:marts"], context=context).stream()
# ============ JOB: COMPOSIZIONE ============
# In Dagster, un Job seleziona quali asset materializzare
from dagster import define_asset_job, ScheduleDefinition
pipeline_vendite_job = define_asset_job(
name="pipeline_vendite_giornaliera",
selection=[
"raw_ordini",
"raw_clienti",
"silver_ordini_arricchiti",
"dbt_marts_assets*", # Tutti gli asset dbt marts
],
)
pipeline_vendite_schedule = ScheduleDefinition(
job=pipeline_vendite_job,
cron_schedule="0 5 * * *",
default_status=DefaultScheduleStatus.RUNNING,
)
이 접근 방식의 즉각적인 이점은 눈에 띕니다. 각 자산은 명시적으로 다음을 선언합니다.
귀하의 입력(AssetIn), Dagster는 자동으로 다음 그래프를 작성합니다.
종속성이 있으며 UI는 소스에서 최종 제품까지 완전한 계보를 보여줍니다. 테스트
모든 함수가 순수한 Python 함수이기 때문에 사소해집니다.
IO 관리자: Dagster 통합의 핵심
Dagster의 IO 관리자는 자산 소싱 방법을 정의합니다. 저장하고 로드했습니다 작업 사이. Dagster에는 S3(Parquet, CSV), Snowflake, BigQuery, DuckDB용 IO 관리자가 포함되어 있습니다. 팬더와 더 많은 것. 별도의 설정 없이 다양한 환경에 대해 다양한 IO 관리자를 설정할 수 있습니다. 자산 코드를 변경하세요.
# resources.py - Configurazione Resources e IO Managers
from dagster import EnvVar
from dagster_aws.s3 import S3PickleIOManager, S3Resource
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_snowflake_pandas import SnowflakePandasIOManager
# Configurazione multi-environment
resources_by_env = {
"dev": {
# In dev: salva su DuckDB locale (costo zero)
"io_manager": DuckDBPandasIOManager(
database="./dev_lakehouse.duckdb",
),
"postgres_erp": PostgresResource(
host="localhost",
port=5432,
database="erp_dev",
),
},
"staging": {
# In staging: usa S3 + Parquet
"io_manager": S3PickleIOManager(
s3_resource=S3Resource(),
s3_bucket="my-staging-lakehouse",
s3_prefix="dagster/",
),
},
"prod": {
# In prod: usa Snowflake
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
database="DWH_PROD",
schema="DAGSTER_ASSETS",
),
},
}
Prefect 3: 개발자 경험을 위한 Python 우선
2018년에 설립된 Prefect는 처음부터 다른 철학을 받아들였습니다. 오케스트레이션은 가능한 한 간단하고 Pythonic합니다. 버전 3, 2024년 출시 2025년에 완성되어 이 비전을 실현합니다. 모든 Python 함수 단일 데코레이터를 추가하면 조정된 작업이 될 수 있습니다.
Il 흐름/작업 모델 Prefect는 많은 구성을 제거합니다. Airflow와 Dagster에서 요청했습니다. DAG를 명시적으로 정의할 필요가 없으며 파일도 필요하지 않습니다. 별도의 구성 파일을 사용하면 독점 DSL을 배울 필요가 없습니다. 일반 Python을 작성하세요. Prefect가 나머지 작업을 수행합니다.
2025년에는 Prefect 3이 도입되었습니다. 지사 사건 관리를 위해 구조화된 서비스 중단, 메트릭 기반 트리거를 사용한 자동화 (실행 이벤트뿐만 아니라) 실행을 위한 Modal과의 기본 통합 확장 가능한 서버리스. Prefect Cloud는 넉넉한 무료 요금제를 제공합니다. 소규모 팀에서도 접근 가능합니다.
코드 예: 판매 파이프라인을 위한 Flow Prefect
# flows/pipeline_vendite.py
import pandas as pd
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret
from prefect.filesystems import S3
from prefect_slack import SlackWebhook
from datetime import timedelta, date
from typing import Optional
# ============ TASK (funzioni Python con decoratore) ============
@task(
name="Estrai Ordini ERP",
description="Estrae ordini completati per la data specificata",
retries=3,
retry_delay_seconds=60,
cache_key_fn=task_input_hash, # Cache basata su input
cache_expiration=timedelta(hours=1), # Valida per 1 ora
tags=["estrazione", "erp"],
)
def estrai_ordini(data: date) -> pd.DataFrame:
"""Estrae ordini dall'ERP per la data specificata."""
logger = get_run_logger()
# Leggi credenziali da Prefect Blocks (gestione sicura secrets)
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
query = """
SELECT o.id AS ordine_id, o.customer_id,
o.created_at, o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = :data
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
"""
with engine.connect() as conn:
df = pd.read_sql(query, conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} ordini per {data}")
return df
@task(
name="Estrai Clienti ERP",
retries=2,
retry_delay_seconds=30,
tags=["estrazione", "erp"],
)
def estrai_clienti(data: date) -> pd.DataFrame:
"""Estrae clienti aggiornati."""
logger = get_run_logger()
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df = pd.read_sql("""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = :data
""", conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} clienti aggiornati")
return df
@task(
name="Valida e Arricchisci Ordini",
retries=1,
)
def valida_e_arricchisci(
ordini: pd.DataFrame,
clienti: pd.DataFrame,
) -> pd.DataFrame:
"""Valida e arricchisce gli ordini con dati cliente."""
logger = get_run_logger()
# Validazioni
if ordini["importo_totale"].lt(0).any():
raise ValueError("Trovati importi negativi nel dataset ordini")
if not ordini["ordine_id"].is_unique:
raise ValueError("Trovati ordini duplicati")
# Join
df = ordini.merge(
clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
match_rate = (1 - df["segmento"].isna().mean()) * 100
logger.info(f"Match rate clienti: {match_rate:.1f}%")
return df
@task(name="Carica nel DWH", retries=2)
def carica_dwh(df: pd.DataFrame, schema: str, table: str):
"""Carica il dataframe nel DWH."""
conn_string = Secret.load("dwh-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df.to_sql(
name=table,
con=conn,
schema=schema,
if_exists="replace",
index=False,
)
return len(df)
@task(name="Esegui dbt", retries=1)
def esegui_dbt(target: str = "prod"):
"""Lancia dbt run e test sui modelli marts."""
import subprocess
result = subprocess.run(
["dbt", "run", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if result.returncode != 0:
raise RuntimeError(f"dbt run fallito:\n{result.stderr}")
test_result = subprocess.run(
["dbt", "test", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if test_result.returncode != 0:
raise RuntimeError(f"dbt test fallito:\n{test_result.stderr}")
# ============ FLOW PRINCIPALE ============
@flow(
name="Pipeline Vendite Giornaliera",
description="Orchestrazione ETL vendite: ERP -> DWH -> dbt marts",
flow_run_name="vendite-{data}", # Nome run dinamico
retries=0, # Retry a livello task, non flow
timeout_seconds=7200, # 2 ore max
log_prints=True,
)
def pipeline_vendite(data: Optional[date] = None):
"""
Flow principale per la pipeline vendite.
Può essere lanciato manualmente con una data specifica
o schedulato per girare ogni giorno.
"""
logger = get_run_logger()
if data is None:
data = date.today()
logger.info(f"Avvio pipeline vendite per {data}")
# Estrazione parallela (Prefect gestisce la concorrenza automaticamente)
# submit() esegue il task in modo asincrono
future_ordini = estrai_ordini.submit(data)
future_clienti = estrai_clienti.submit(data)
# Attende entrambi i task e recupera i risultati
ordini = future_ordini.result()
clienti = future_clienti.result()
if len(ordini) == 0:
logger.warning(f"Nessun ordine per {data}, skip elaborazione")
return {"status": "skipped", "data": str(data)}
# Task sequenziali
df_arricchito = valida_e_arricchisci(ordini, clienti)
num_record = carica_dwh(df_arricchito, schema="silver", table="ordini_arricchiti")
esegui_dbt(target="prod")
logger.info(f"Pipeline completata: {num_record} record elaborati")
return {"status": "success", "records": num_record, "data": str(data)}
# ============ DEPLOYMENT ============
# Prefect 3: il deployment definisce schedule e infrastruttura
if __name__ == "__main__":
from prefect.deployments import DeploymentImage
pipeline_vendite.deploy(
name="prod-giornaliero",
cron="0 5 * * *",
work_pool_name="k8s-work-pool", # Esegui su Kubernetes
image=DeploymentImage(
name="myregistry/pipeline-vendite:latest",
platform="linux/amd64",
),
parameters={}, # Parametri default
)
Prefect Blocks: 비밀 및 구성의 중앙 집중식 관리
I 프리펙트 블록 Prefect에 저장된 구성 가능한 개체입니다. 연결 및 비밀 관리를 중앙 집중화하는 백엔드(클라우드 또는 자체 호스팅) 및 인프라 구성. 환경 변수가 필요하지 않습니다. 컨테이너에 분산되어 있으며 코드 재배포 없이 업데이트가 가능합니다.
Airflow vs Dagster vs Prefect 비교: 어느 것이 더 낫습니까?
정직한 대답은 다음과 같습니다. 상황에 따라 다릅니다. 각 도구에는 이상적인 사용 사례와 포인트가 있습니다. 특정 강도. 다음은 귀하의 선택에 도움이 되는 구조화된 비교입니다.
세부 비교: Airflow vs Dagster vs Prefect(2025)
| 표준 | 에어플로우 3.0 | 대그스터 1.9 | 반장 3 |
|---|---|---|---|
| 어형 변화표 | 작업 중심(DAG) | 자산 중심(SDA) | 작업 중심(Python 우선) |
| 학습 곡선 | 높음(DAG 의미 체계, 공급자, 후크) | 중간-높음(자산 모델, IO 관리자) | 낮음(기존 함수의 데코레이터) |
| 데이터 계보 | 제한적(작업 종속성) | 우수함(네이티브, 시각적) | 제한적(최근 추가됨) |
| 개발자 경험(로컬) | Fair(Docker는 Heavy로 구성됨) | 훌륭함(dagster 개발, 로컬 UI) | 훌륭함(pip 설치, 로컬 실행) |
| 테스트 | 복잡함(인프라에 따라 다름) | 우수함(자산 및 기능 모두) | 우수함(업무 및 기능 모두) |
| DBT 통합 | 좋음(dbt클라우드 운영자) | 우수(dagster-dbt, 기본 자산) | 좋음(prefect-dbt) |
| 스케줄링 | 우수(크론, 이벤트, 자산, 마감일) | 우수(크론, 이벤트, 자산, 센서) | 좋음(크론, 이벤트, 자동화) |
| 백필 | 훌륭함(3.0에서 스케줄러로 관리됨) | 우수(네이티브 파티셔닝) | 개별형(수동 또는 자동화) |
| 확장성 | 높음(CeleryExecutor, KubernetesExecutor) | 높음(Kubernetes, ECS, Docker) | 높음(작업풀, K8s, 모달) |
| UI 모니터링 | 훌륭함(3.0의 React UI) | 우수(카탈로그, 계보 그래프) | 우수(Prefect Cloud 대시보드) |
| 배포 모델 | 복잡함(헬름 차트, 작성) | 중간(dagster-webserver, k8s) | 단순(.deploy(), 작업 풀) |
| 커뮤니티/생태계 | 거대함(35,000개 이상의 GitHub 스타, 수백 개의 공급자) | 오름차순(별 10,000개 이상, 기업 중심) | 대형(별 15,000개 이상, 개발자 중심) |
| 관리형 클라우드 | 천문학자, MWAA, 클라우드 컴포저 | Dagster Cloud(서버리스 + 하이브리드) | Prefect Cloud(관대한 무료 등급) |
| 자체 호스팅 비용 | 오픈소스(관리할 인프라) | 오픈소스(관리할 인프라) | 오픈소스(관리할 인프라) |
| 관리 비용 | 월 $500부터 천문학자 | Dagster Cloud(월 $500부터) | Prefect Cloud 넉넉한 무료 등급 |
| 다음에 이상적입니다. | Airflow 경험, 클래식 ETL 파이프라인을 갖춘 팀 | 데이터 기반 팀, DBT 통합, ML 파이프라인 | Team Python, 신속한 프로토타이핑, 이벤트 중심 |
빠른 선택 가이드
- 다음과 같은 경우 공기 흐름을 선택하세요. 귀하의 팀은 이미 Airflow 기술을 보유하고 있습니다. 복잡한 생태계(Spark, Hadoop, 다양한 공급자)를 사용하는 경우 사용 가능한 거대한 운영자 생태계 중 AWS(MWAA) 또는 GCP를 사용하고 있습니다. (Cloud Composer)를 기본적으로 관리합니다.
- 다음과 같은 경우 Dagster를 선택하세요. 데이터 품질과 계보가 우선순위입니다. 스택에 dbt가 포함되어 있고 심층적인 통합을 원할 경우 팀은 다음 작업에 익숙합니다. 작업이 아닌 자산으로 생각하거나 원하는 곳에 ML 파이프라인을 구축하는 경우 각 유물을 추적합니다.
- 다음과 같은 경우 Prefect를 선택하세요. 가장 낮은 학습 곡선을 원하고, 오케스트레이션 경험이 없는 팀과 Python 우선, 워크플로가 필요함 데이터뿐만 아니라 이벤트 중심적이고 운영적이거나 빠르게 시작하고 싶은 경우 Prefect Cloud의 무료 계층.
임시: 복잡한 워크플로를 위한 지속 가능한 실행
Airflow, Dagster 및 Prefect는 다음의 오케스트레이션에 중점을 둡니다. 날짜 파이프라인, 일시적인 다른 문제 해결: 실행 튼튼한 살아남아야 하는 장기 실행 애플리케이션 워크플로우 충돌, 재시작 및 네트워크 중단이 발생합니다.
전 Uber 엔지니어(전임자인 Cadence를 만든 사람)가 개발했습니다. Temporal은 각 워크플로우를 하나로 취급합니다. 지속 가능한 상태 저장 기능: 만약에 워크플로가 실행되는 동안 서버가 다시 시작되면 정확히 해당 위치에서 다시 시작됩니다. 상태를 잃지 않고 중지되었습니다. 이는 몇 시간 동안 지속되는 워크플로에 필수적입니다. 며칠 또는 몇 주(예: 온보딩 프로세스, AI 에이전트 조정, 사가 분산 트랜잭션 패턴).
임시 대 Airflow/Dagster: 선택 시기
| 대본 | 권장 도구 | 이유 |
|---|---|---|
| 일일 ETL 파이프라인 | 에어플로우 / 대그스터 / 프리펙트 | 배치 데이터 조정에 최적화됨 |
| DBT 변환 | Dagster(더 좋음) / Airflow | 기본 통합, 자산 계보 |
| ML 학습 파이프라인 | 대그스터/프리펙트 | 아티팩트 추적, 테스트 용이 |
| 사용자 온보딩 프로세스(다단계) | 일시적인 | 내구성, 애플리케이션 상태 관리 |
| AI/LLM 에이전트 오케스트레이션 | 임시 / 지사 | 장기간 실행되는 복잡한 오류 관리 |
| Saga 패턴 / 분산 트랜잭션 | 일시적인 | 보상거래, 내구성 |
| 실시간 IoT 파이프라인 | Kafka + Flink(비배치 오케스트레이터) | 스트리밍에는 다른 아키텍처가 필요합니다 |
많은 기업이 하이브리드 접근 방식을 취하고 있습니다: 오케스트레이션을 위한 Airflow 또는 Dagster 데이터 파이프라인, 내구성이 필요한 애플리케이션 워크플로우를 위한 임시. 두 가지 도구 그들은 서로를 완벽하게 보완하며 직접적인 경쟁 관계에 있지 않습니다.
참조 아키텍처: 최신 데이터 스택의 오케스트레이션
오케스트레이터는 데이터 스택의 나머지 부분과 어떻게 통합됩니까? 건축물을 살펴보자 이 시리즈에서 본 모든 구성 요소를 결합한 완전한 참조 자료: 수집을 위한 Airbyte, 변환을 위한 dbt, 테이블 형식의 Apache Iceberg, Dagster는 중앙 오케스트레이터입니다.
Dagster를 Orchestrator로 사용하는 데이터 스택 아키텍처
| 레이어 | 요소 | 도구 | Dagster가 편성했다고요? |
|---|---|---|---|
| 음식물 섭취 | 소스 동기화 | 에어바이트/파이브트란 | 예(dagster-airbyte) |
| 저장 | 객체 스토리지 + 테이블 형식 | S3 + 아파치 아이스버그 | 예(IO 관리자) |
| 변환 | 선언적 SQL 모델 | DBT 코어/클라우드 | 예(dagster-dbt, 기본 자산) |
| 품질 | 데이터 테스트 및 검증 | DBT 테스트, 위대한 기대 | 예(자산 확인) |
| 피복재 | 쿼리 엔진 | 트리노/아테나/DuckDB | 아니요(주문형 쿼리) |
| 해석학 | 대시보드 및 보고서 | 메타베이스/태블로 | 아니요(데이터를 소비함) |
| ML | 기능 엔지니어링 + 교육 | Python + MLflow | 예(ML 자산) |
# definitions.py - Dagster Definitions: composizione del data stack completo
from dagster import Definitions, load_assets_from_modules
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
from dagster_dbt import DbtCliResource
from dagster_aws.s3 import S3Resource
# Import dei moduli asset
from assets import ingestione, silver, gold, ml_features
# ============ CARICAMENTO ASSET ============
# Asset Airbyte: ogni connessione Airbyte diventa un Dagster asset
airbyte_assets = load_assets_from_airbyte_instance(
airbyte=AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
connection_filter=lambda meta: meta.name.startswith("prod_"),
)
# Asset dbt: ogni modello dbt diventa un Dagster asset con lineage completa
@dbt_assets(
manifest=Path("dbt_project/target/manifest.json"),
)
def dbt_all_assets(context, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
# Asset Python custom
python_assets = load_assets_from_modules([ingestione, silver, gold, ml_features])
# ============ RISORSE ============
resources_prod = {
"airbyte": AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
"dbt": DbtCliResource(
project_dir="dbt_project",
profiles_dir="dbt_project",
target="prod",
),
"s3": S3Resource(region_name="eu-west-1"),
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
),
"slack": SlackResource(token=EnvVar("SLACK_BOT_TOKEN")),
}
# ============ COMPOSIZIONE FINALE ============
defs = Definitions(
assets=[
*airbyte_assets,
dbt_all_assets,
*python_assets,
],
resources=resources_prod,
jobs=[pipeline_vendite_job, pipeline_clienti_job, pipeline_ml_job],
schedules=[pipeline_vendite_schedule, pipeline_ml_weekly_schedule],
sensors=[nuovo_file_s3_sensor, airbyte_sync_sensor],
)
모니터링, 경고 및 관찰 가능성
모니터링이 없고 탑재된 악기가 없는 비행기와 같은 오케스트레이터입니다. 모니터링 SLA 보장, 문제 진단 및 의사소통에 매우 중요합니다. 비즈니스를 위한 데이터 플랫폼 현황입니다.
권장 모니터링 스택
오케스트레이션된 파이프라인을 위한 모니터링 도구
| 레이어 | 도구 | 모니터링 대상 |
|---|---|---|
| 인프라 측정항목 | 프로메테우스 + 그라파나 | CPU/RAM 작업자, 작업 대기열, 스케줄러 대기 시간 |
| 파이프라인 측정항목 | Airflow 측정항목/Dagster 이벤트/Prefect API | 작업 성공률, 기간, SLA 위반 |
| 중앙 집중식 로그 | ELK 스택/CloudWatch/로키 | 디버깅을 위한 각 작업 인스턴스의 로그 |
| 경고 | PagerDuty / OpsGenie | 중대한 실패, SLA 위반에 대한 에스컬레이션 |
| 팀 알림 | 슬랙/마이크로소프트 팀즈 | 성공/실패 알림, 일일 보고서 |
| 데이터 품질 | 초등/dbt 테스트/큰 기대 | 데이터 이상, 최신성, 행 수 |
# monitoring/alerting_setup.py
# Configurazione alerting Airflow con Slack e PagerDuty
from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime
def task_failure_alert(context):
"""
Callback chiamato su ogni task failure.
Inviamo notifica Slack con dettagli del fallimento.
"""
dag_id = context["dag"].dag_id
task_id = context["task_instance"].task_id
run_id = context["run_id"]
execution_date = context["execution_date"]
log_url = context["task_instance"].log_url
exception = context.get("exception", "N/A")
message = f"""
:x: *Task Fallito*
*DAG:* {dag_id}
*Task:* {task_id}
*Run ID:* {run_id}
*Data:* {execution_date.strftime('%Y-%m-%d %H:%M')}
*Errore:* `{str(exception)[:200]}`
*Log:* <{log_url}|Vedi Log Airflow>
"""
slack_op = SlackWebhookOperator(
task_id="slack_alert",
slack_webhook_conn_id="slack_data_ops",
message=message,
channel="#data-ops-alerts",
dag=context["dag"],
)
slack_op.execute(context)
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Callback per SLA breach: invia alert critico su PagerDuty."""
import requests
sla_tasks = ", ".join([f"{s.task_id}" for s in slas])
message = f"SLA BREACH su DAG {dag.dag_id}: task {sla_tasks} non completati in tempo"
# PagerDuty Events API
payload = {
"routing_key": "YOUR_PAGERDUTY_INTEGRATION_KEY",
"event_action": "trigger",
"payload": {
"summary": message,
"severity": "critical",
"source": "airflow",
"component": dag.dag_id,
"custom_details": {
"sla_tasks": sla_tasks,
"dag_id": dag.dag_id,
},
},
}
requests.post("https://events.pagerduty.com/v2/enqueue", json=payload)
# Utilizzo nel DAG
with DAG(
dag_id="pipeline_critica",
default_args={
"on_failure_callback": task_failure_alert, # Per ogni task
},
sla_miss_callback=sla_miss_callback, # Per SLA violations
schedule="0 5 * * *",
# ...
) as dag:
task_etl = PythonOperator(
task_id="etl_principale",
python_callable=esegui_etl,
sla=timedelta(hours=1), # Deve completare entro 1 ora
)
파이프라인 측정항목을 위한 Grafana 대시보드
Airflow는 StatsD를 통해(또는 airflow-prometheus-exporter를 사용하여 직접 Prometheus) 측정항목을 노출합니다. Grafana 대시보드에서 모니터링해야 할 가장 중요한 측정항목은 다음과 같습니다.
Grafana에서 모니터링할 공기 흐름 측정항목
- airflow.scheduler.tasks.starving: 사용 가능한 작업자가 없는 대기 중인 작업(확장성이 부족함을 나타냄)
- airflow.dagrun.duration.success: DAG 실행 기간 분포(SLA 지표인 P95 백분위수)
- airflow.task_instance.failures: 작업별 실패 추세(이상은 소스 문제를 나타냄)
- airflow.scheduler.heartbeat: 스케줄러 하트비트 빈도(중지되면 작업이 예약되지 않음)
- airflow.pool.open_slots: 풀의 여유 슬롯(포화 상태를 나타냄)
- airflow.dagrun.first_task_scheduling_delay: 일정과 첫 번째 작업 시작 사이의 지연
생산 분야의 강력한 파이프라인을 위한 모범 사례
1. 멱등성: 황금률
모든 업무는 다음과 같아야 합니다. 멱등성: 같은 것으로 여러 번 실행 매개변수는 동일한 결과를 생성해야 합니다. 이 속성은 기본이기 때문에 프로덕션에서는 재시도(자동 또는 수동)가 불가피합니다.
# SBAGLIATO: non idempotente (doppio insert)
def carica_dati_wrong(**context):
df = estrai_dati()
pg_hook.insert_rows("staging.ordini", df.values.tolist()) # APPEND!
# CORRETTO: idempotente (DELETE + INSERT per la data specifica)
def carica_dati_correct(**context):
execution_date = context["ds"] # "2025-01-15"
df = estrai_dati(execution_date)
with pg_hook.get_conn() as conn:
with conn.cursor() as cur:
# 1. Elimina i dati esistenti per questa partizione
cur.execute(
"DELETE FROM staging.ordini WHERE DATE(data_ordine) = %s",
[execution_date]
)
# 2. Inserisci i nuovi dati
execute_values(cur, """
INSERT INTO staging.ordini (ordine_id, data_ordine, importo)
VALUES %s
""", df[["ordine_id", "data_ordine", "importo"]].values.tolist())
conn.commit()
# OPPURE: usa INSERT ... ON CONFLICT per UPSERT atomico
def carica_dati_upsert(**context):
execution_date = context["ds"]
df = estrai_dati(execution_date)
pg_hook.insert_rows(
table="staging.ordini",
rows=df.values.tolist(),
target_fields=["ordine_id", "data_ordine", "importo"],
replace=True,
replace_index=["ordine_id"], # PK: upsert su ordine_id
)
2. 재시도 정책 및 지수 백오프
항상 합리적인 재시도 정책을 설정하세요. 일시적인 오류(네트워크 시간 초과, 데이터베이스 잠금, 서비스 일시적 사용 불가)은 종종 다음을 통해 해결됩니다. 몇 분 후에 다시 시도하세요. 지수 백오프로 기존 시스템의 과부하 방지 어려움에 처해 있습니다.
# Retry policy consigliata per diversi tipi di task
from datetime import timedelta
# Task di estrazione da API esterne: retry aggressivi con backoff
default_args_api = {
"retries": 5,
"retry_delay": timedelta(minutes=1),
"retry_exponential_backoff": True, # 1, 2, 4, 8, 16 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(minutes=30),
}
# Task di trasformazione dbt: retry conservativi
default_args_dbt = {
"retries": 2,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(hours=2),
}
# Task di notifica: no retry (evita spam di alert)
default_args_notify = {
"retries": 0,
"execution_timeout": timedelta(minutes=2),
}
3. 센서를 통한 종속성 관리
고정된 시간으로 파이프라인을 예약하는 것이 항상 가능한 것은 아닙니다. 때로는 기다려야 할 때도 있다 파일이 S3에 도착하거나, 다른 파이프라인이 종료되거나, 외부 데이터가 가능합니다. 그만큼 센서 오퍼레이터 Airflow(및 Dagster)가 솔루션입니다.
# Sensor: attendi che il file di input sia disponibile prima di procedere
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
with DAG("pipeline_dipendente", ...) as dag:
# Attendi file S3 (polling ogni 5 minuti, max 2 ore)
wait_for_file = S3KeySensor(
task_id="wait_for_input_file",
bucket_name="my-data-lake",
bucket_key="inputs/vendite_{{ ds_nodash }}.csv",
aws_conn_id="aws_default",
poke_interval=300, # Controlla ogni 5 min
timeout=7200, # Timeout dopo 2 ore
mode="reschedule", # Libera il worker mentre aspetta
)
# Attendi che un'altra DAG abbia finito
wait_for_upstream = ExternalTaskSensor(
task_id="wait_for_erp_sync",
external_dag_id="sincronizzazione_erp",
external_task_id="verify_sync_complete",
allowed_states=["success"],
failed_states=["failed", "skipped"],
poke_interval=120,
timeout=3600,
mode="reschedule",
)
estrai = PythonOperator(task_id="estrai_dati", ...)
# Pipeline partira solo quando entrambi i sensor sono verdi
[wait_for_file, wait_for_upstream] >> estrai
안티 패턴: 피해야 할 일반적인 실수
- 변경 가능한 전역 상태가 있는 작업: Python 전역 변수를 사용하지 마세요. DAG에서. DAG는 스케줄러에 의해 자주 구문 분석됩니다. 전역 변수 예측할 수 없는 행동을 만들어냅니다. XCom, 변수 또는 TaskFlow API를 사용하세요.
- DAG 파일의 비즈니스 논리: DAG 파일에는 다음만 포함되어야 합니다. 오케스트레이션의 정의. 비즈니스 로직은 별도의 Python 모듈에 들어갑니다. 작업에서 가져왔습니다. 테스트를 용이하게 하고 구문 분석 시간을 단축합니다.
- 대규모 데이터 세트를 위한 XCom: XCom은 작은 메타데이터용으로 설계되었습니다. (개수, 파일 경로, 플래그). 작업 간에 DataFrame을 전달하는 데 사용하지 마세요. 준비 테이블 또는 S3 파일.
- 세분성이 없는 모놀리식 파이프라인: 모든 작업을 수행하는 단일 작업 (추출 + 변환 + 로드)는 디버깅을 불가능하게 만듭니다. 모든 작업 논리적으로 구별되는 작업은 별도의 작업이어야 합니다.
-
일정이 빡빡할 경우 따라잡기=참: 캐치업을 활성화한 경우
시간별 일정이 있는 파이프라인을 사용하고 30일 휴식 후 켜면 720을 시작합니다.
동시에 처형. 항상 사용
catchup=False기본적으로 백필을 명시적으로 관리합니다. - 하드코딩된 비밀: 비밀번호, API 키 또는 연결을 입력하지 마세요. 문자열을 코드에 직접 입력합니다. Airflow 연결, Dagster 리소스 사용 또는 AWS Secrets Manager 또는 HashiCorp Vault와 통합된 Prefect Blocks.
4. 파이프라인 테스트
테스트되지 않은 파이프라인과 확보된 기술 부채. 테스트를 구성하는 방법은 다음과 같습니다. Airflow DAG 및 Dagster 자산의 경우.
# tests/test_pipeline_vendite.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import date
import pandas as pd
# ============ TEST AIRFLOW ============
from airflow.models import DagBag
def test_dag_integrity():
"""Verifica che i DAG siano validi e non abbiano cicli."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
assert len(dagbag.import_errors) == 0, \
f"Errori di import: {dagbag.import_errors}"
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert dag is not None, "DAG non trovato"
assert dag.schedule == "0 5 * * *"
def test_dag_task_count():
"""Verifica il numero di task nel DAG."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert len(dag.tasks) >= 5, "Troppo pochi task nel DAG"
# ============ TEST DAGSTER ============
from dagster import materialize, build_asset_context
from assets import raw_ordini, silver_ordini_arricchiti
def test_raw_ordini_asset():
"""Test dell'asset raw_ordini con mock delle risorse."""
mock_df = pd.DataFrame({
"ordine_id": [1, 2, 3],
"customer_id": [101, 102, 103],
"importo_totale": [150.0, 89.99, 220.5],
"data_ordine": ["2025-01-15"] * 3,
})
with patch("assets.ingestione.PostgresResource") as mock_pg:
mock_pg.return_value.execute_query.return_value = mock_df
result = materialize(
[raw_ordini],
resources={"postgres_erp": mock_pg.return_value},
partition_key="2025-01-15",
)
assert result.success
asset_value = result.output_for_node("raw_ordini")
assert len(asset_value) == 3
assert (asset_value["importo_totale"] >= 0).all()
# ============ TEST PREFECT ============
from flows.pipeline_vendite import estrai_ordini, valida_e_arricchisci
from prefect.testing.utilities import prefect_test_harness
def test_valida_e_arricchisci():
"""Test del task di validazione con dati di test."""
ordini = pd.DataFrame({
"ordine_id": [1, 2],
"customer_id": [101, 102],
"importo_totale": [100.0, 200.0],
})
clienti = pd.DataFrame({
"id": [101, 102],
"citta": ["Milano", "Roma"],
"segmento": ["Premium", "Standard"],
})
with prefect_test_harness():
result = valida_e_arricchisci.fn(ordini, clienti)
assert len(result) == 2
assert "segmento" in result.columns
assert result["segmento"].isna().sum() == 0
def test_valida_importi_negativi():
"""Test che importi negativi generino eccezione."""
ordini = pd.DataFrame({
"ordine_id": [1],
"importo_totale": [-50.0], # NEGATIVO
"customer_id": [101],
})
clienti = pd.DataFrame({"id": [101], "citta": ["Milano"], "segmento": ["Standard"]})
with prefect_test_harness():
with pytest.raises(ValueError, match="Trovati importi negativi"):
valida_e_arricchisci.fn(ordini, clienti)
결론 및 권장사항
성숙한 데이터 스택의 오케스트레이션 및 백본입니다. 오케스트레이터 없이 가장 정교한 파이프라인이라도 부서지기 쉽고 불투명하며 관리하다. 우리는 2025년 시장의 세 가지 주요 도구를 살펴보고 어떻게 각각은 서로 다른 요구에 응답합니다.
시나리오에 대한 권장 사항
- 지금부터 시작하는 PMI: 당신이 선택 반장 3 프리펙트 클라우드와 함께 무료 등급. 가치 창출 시간이 최소화되고 학습 곡선이 낮으며 통과할 수 있습니다. 복잡성이 필요할 때 더 많은 엔터프라이즈 솔루션을 사용하세요.
- DBT 스택을 사용한 팀: 대그스터 그리고 자연선택. dagster-dbt를 통한 dbt와의 기본 통합 및 모델 간의 자동 계보 dbt 및 자산 Dagster는 팀의 인지 부하를 크게 줄여줍니다.
- 기존 Airflow 기술을 갖춘 기업: 에어플로우 3.0 천문학자(관리) 또는 MWAA. 팀이 이미 생산적이라면 마이그레이션할 이유가 없습니다. Airflow 및 버전 3.0은 Dagster 및 Prefect와의 격차를 상당 부분 해소했습니다.
- 장기 실행 애플리케이션 워크플로: 고려하다 일시적인 선호하는 데이터 오케스트레이터와 함께 사용하세요. 그들은 보완적인 도구입니다. 대안이 아닙니다.
시리즈의 다음 기사에서는 다음 내용을 다룹니다.제조업에 적용된 AI: IoT 센서인 Apache Kafka를 사용하여 예측 유지 관리 시스템을 구축하는 방법을 알아봅니다. 스트리밍 머신 데이터 및 ML 모델을 통해 오류가 발생하기 전에 이를 예측합니다. 우리가 구축하고 있는 데이터 스택이 어떻게 데이터 스택이 되는지 보여주는 구체적인 사용 사례입니다. 회사 내 인공지능의 기초.
Orchestrator를 프로덕션에 투입하기 위한 체크리스트
- 모든 작업은 멱등적입니다. 다시 실행해도 중복이 생성되지 않습니다.
- 각 작업에 대해 지수 백오프로 구성된 재시도 정책
- 하드코딩되지 않은 연결/블록/리소스를 통해 관리되는 비밀
- 인프라 지표에 대해 Grafana 또는 이에 상응하는 활성 모니터링
- 실패 및 SLA 위반에 대해 Slack 또는 PagerDuty에 대한 경고
- CI/CD 파이프라인에서 DAG/자산의 무결성 테스트
- 캐치업은 기본적으로 비활성화되어 있으며 백필은 명시적으로 처리됩니다.
- 파이프라인 정의 파일에서 비즈니스 로직을 분리합니다.
- 작업자가 직접 액세스하지 않고도 액세스 가능한 중앙 집중식 로그
- 인라인 문서: 각 DAG/자산에는 명확한 설명이 있습니다.
추가 정보에 대한 유용한 링크
- 이전 기사: ETL 대 최신 ELT: dbt, Airbyte 및 Fivetran
- 다음 기사: 제조 분야의 AI: 예측 유지 관리 및 디지털 트윈
- 관련 시리즈: 비즈니스용 MLOps - MLflow를 사용하여 프로덕션 중인 AI 모델
- 관련 시리즈: 비즈니스 LLM: RAG Enterprise, 미세 조정 및 가드레일







