Pipeline Orchestration: Airflow, Dagster a Prefect
Představte si, že máte bezchybný zásobník dat: dům u jezera na ledovci Apache, transformace elegantní konektory dbt, Airbyte, které synchronizují data z tuctu zdrojů. Všechno perfektní, dokud se vás někdo nezeptá: „Ale kdo to všechno řídí? začíná úloha příjmu před transformacemi? Kdo vás upozorní, když se něco pokazí ve 3 hodiny ráno?" Odpověď jeorchestrátor potrubí.
Orchestrace a neviditelné lepidlo zásobníku dat: složka, která koordinuje provádění stovek vzájemně závislých úkolů, řídí opakování v případě selhání, monitoruje stav potrubí a vytváří auditní stopu, kterou podnik vyžaduje. Bez orchestrátoru jsou datové kanály cron skripty, které drží pohromadě naděje.
V roce 2025 dominují kraji orchestrátorů tři hlavní platformy: Apache Airflow 3.0, veterán, který definoval samotný koncept DAG orchestrace; Dagster 1.x, vyzyvatel, který koncept představil softwarově definovaných aktiv; A Prefekt 3, řešení navržené jako první v Pythonu pro vývojářskou zkušenost. V tomto článku je analyzujeme do hloubky pomocí kódu skutečné a upřímné srovnání, které vám pomůže vybrat ten správný pro váš kontext.
Co se dozvíte v tomto článku
- Vnitřní architektura Apache Airflow 3.0: Plánovač, Exekutor, Pracovník, Metadata DB
- Jak napsat kompletní Python DAG pro end-to-end ETL potrubí
- Paradigma softwarově definovaných aktiv společnosti Dagster a proč mění způsob, jakým přemýšlíme o kanálech
- Model Flow/Task společnosti Prefect 3 a jeho zjednodušený model nasazení
- Podrobné srovnání mezi třemi nástroji se srovnávací tabulkou podle případu použití
- Temporal jako alternativa pro trvalé a dlouhotrvající pracovní postupy
- Referenční architektura pro integraci orchestrace s dbt, Airbyte a data lakehouse
- Monitorování, upozorňování a osvědčené postupy pro idempotentní potrubí ve výrobě
Články ze série Data Warehouse, AI a Digital Transformation Series
| # | Položka | Soustředit |
|---|---|---|
| 1 | Evoluce datového skladu: od SQL serveru k Data Lakehouse | Architektury a platformy |
| 2 | Data Mesh a decentralizovaná architektura | Správa a vlastnictví |
| 3 | ETL vs moderní ELT: dbt, Airbyte a Fivetran | Transformační potrubí |
| 4 | Jste zde - Pipeline Orchestrace | Proudění vzduchu, Dagster, Prefekt |
| 5 | AI ve výrobě: Prediktivní údržba | IoT, ML, digitální dvojče |
| 6 | AI ve financích: detekce podvodů a hodnocení kreditů | ML v reálném čase |
| 7 | AI v maloobchodě: Prognóza poptávky a doporučení | ML použito |
protože orchestrace je zásadní
Než se dostaneme k technickým detailům, stojí za to přesně pochopit, jaký problém řeší orchestrátor. Typický datový kanál pro střední až velké společnosti zahrnuje:
- Extrakce z CRM, ERP, transakční databáze, externí API (10-50 zdrojů)
- Nahrání do datového jezera (Airflow spouští Airbyte nebo Fivetran)
- dbt transformace (30-200 modelů se složitými závislostmi)
- Aktualizace datových tržišť a agregačních tabulek
- Export do nástrojů BI (Metabase, Tableau, Power BI)
- Aktualizace modelů ML (funkce inženýrství + přeškolení)
Každý z těchto kroků má závislostí (B nemůže odejít dříve, než je A dokončeno), ALS (zprávy musí být připraveny do 8:00), e požadavky na kvalitu (Pokud jsou zdrojová data prázdná, nespouštějte transformace). To vše zvládnete pomocí samostatných cron skriptů a zaručenou katastrofou.
Co dělá Pipeline Orchestrator
| Funkčnost | Popis |
|---|---|
| Správa závislostí | Definuje pořadí provádění úloh a spravuje závislosti kanálu |
| Plánování | Plánování Cron, řízené událostmi, s ohledem na data (spouštění při aktualizaci aktiv) |
| Opakování a zpracování chyb | Automatické opakování s exponenciálním couváním, správa dílčích selhání |
| Rovnoběžnost | Paralelní provádění nezávislých úkolů pro optimalizaci času |
| Sledování | Centralizovaný dashboard, protokol úloh, monitorování SLA, upozornění |
| Zásypy | Opakované spuštění historie se spustí, když se změní logika potrubí |
| Audit Trail | Kompletní historie každého spuštění pro zajištění souladu a ladění |
| Parametrizace | Variabilní konfigurace pro prostředí (dev, staging, prod) a ruční spuštění |
Apache Airflow 3.0: Vylepšený veterán
Vytvořil Airbnb v roce 2014 a daroval Apache Software Foundation v roce 2016, Apache Proudění vzduchu a stal se de facto standardem pro orchestraci datového kanálu. S dál 35 000 hvězdiček na GitHubu a komunita, která čítá stovky přispěvatelů, Airflow a používají je tisíce společností po celém světě, od startupů do velkých podniků.
V roce 2025 prošel Airflow svým nejvýznamnějším vývojem s vydáním verze 3.0, která zavádí architekturu klient-server s Provedení úkolu Rozhraní, nativní plánování s ohledem na aktiva (zděděné od Dagster), DAG verzování a zcela přepracované uživatelské rozhraní. A už to není jen plánovač úloh: je to a moderní orchestrační platforma.
Architektura Apache Airflow
Pochopení architektury Airflow je zásadní pro správné nasazení a diagnostiku problémy ve výrobě. Existuje pět hlavních složek:
Architektonické komponenty proudění vzduchu
| Komponent | Role | Technologie |
|---|---|---|
| webový server | Webové uživatelské rozhraní pro monitorování, ladění, ruční spouštění DAG | Baňka + Gunicorn (2.x), FastAPI (3.0) |
| Plánovač | Analyzuje DAG, naplánuje úkoly, zařadí je do fronty exekutorů | Démon Python, HA s více instancemi |
| Vykonavatel | Provádí úkoly: LocalExecutor (jeden uzel), CeleryExecutor, KubernetesExecutor | Celer + Redis/RabbitMQ nebo K8s |
| Dělníci | Skutečně zpracovávat úkoly (pouze s CeleryExecutor/KubernetesExecutor) | Celer worker nebo K8s Pod |
| Databáze metadat | Stav všech běhů DAG, instancí úloh, proměnných, připojení | PostgreSQL (doporučeno v prod), MySQL |
| Procesor DAG | Novinka v Airflow 3.0: DAG parser oddělený od plánovače | Procesní fond Pythonu |
Koncept DAG
Srdce Airflow a DAG (Směrovaný acyklický graf): orientovaný graf bez cyklů, které definují pořadí provádění úkolů. Každý uzel grafu je úkol, každý oblouk je závislost. Graf musí být acyklický (žádný cyklus), proto nemůžete vytvářet kruhové závislosti.
Úkoly jsou realizovány prostřednictvím Operátor: Python třídy, které jsou abstraktní druh práce. Airflow zahrnuje stovky vestavěných operátorů (PythonOperator, BashOperator, PostgresOperator, SparkSubmitOperator, dbtCloudOperator...) a komunita publikuje tolik prostřednictvím balíčků poskytovatelů.
Příklad kódu: Dokončete ETL DAG pro prodejní kanál
Podívejme se na kompletní a realistický DAG orchestrující ETL potrubí: extrakt z PostgreSQL, načítání do stagingu, transformace dbt a upozornění týmu.
# dags/pipeline_vendite.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import pandas as pd
import logging
logger = logging.getLogger(__name__)
# Configurazione DAG con default_args per tutti i task
default_args = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"email": ["data-team@azienda.it"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True, # 5, 10, 20 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(hours=2),
}
with DAG(
dag_id="pipeline_vendite_giornaliera",
default_args=default_args,
description="Estrai vendite da ERP, carica in DWH, aggrega con dbt",
schedule="0 5 * * *", # Ogni giorno alle 05:00 UTC
start_date=datetime(2025, 1, 1),
end_date=None,
catchup=False, # Non eseguire run passate
max_active_runs=1, # Una sola esecuzione alla volta
tags=["vendite", "etl", "produzione"],
doc_md="""
## Pipeline Vendite Giornaliera
Estrae ordini dall'ERP (PostgreSQL), li carica in staging,
lancia le trasformazioni dbt e notifica il team su Slack.
**SLA**: Completamento entro le 07:30 UTC
**Owner**: Team Data Engineering
""",
) as dag:
# ============ TASK 1: VERIFICA DISPONIBILITA SORGENTE ============
def check_source_data(**context):
"""Verifica che ci siano dati nella finestra di esecuzione."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
execution_date = context["ds"] # "2025-01-15"
count = pg_hook.get_first("""
SELECT COUNT(*)
FROM orders
WHERE DATE(created_at) = %s
""", parameters=[execution_date])[0]
logger.info(f"Trovati {count} ordini per {execution_date}")
if count == 0:
logger.warning("Nessun dato trovato, skip pipeline")
return "notify_no_data" # Branch: salta elaborazione
return "extract_task_group.extract_orders"
check_source = BranchPythonOperator(
task_id="check_source_data",
python_callable=check_source_data,
)
# ============ TASK GROUP: ESTRAZIONE ============
with TaskGroup("extract_task_group") as extract_group:
def extract_orders(**context):
"""Estrae ordini dall'ERP e li salva in staging."""
execution_date = context["ds"]
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
df = pg_hook.get_pandas_df("""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price * (1 - COALESCE(oi.discount, 0)))
AS importo_totale,
COUNT(oi.id) AS num_righe
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = %(exec_date)s
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""", parameters={"exec_date": execution_date})
# Validazione dati estratti
assert df["importo_totale"].ge(0).all(), "Importi negativi trovati!"
assert df["ordine_id"].is_unique, "Ordini duplicati nell'estrazione!"
# Salva in XCom per passare al task successivo
# Per dataset grandi, usa staging table invece di XCom
context["ti"].xcom_push(key="row_count", value=len(df))
# Carica in tabella staging del DWH
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
dwh_hook.insert_rows(
table="staging.stg_ordini_raw",
rows=df.values.tolist(),
target_fields=df.columns.tolist(),
replace=True,
replace_index=["ordine_id"],
)
logger.info(f"Caricati {len(df)} ordini in staging")
extract_orders_task = PythonOperator(
task_id="extract_orders",
python_callable=extract_orders,
)
def extract_customers(**context):
"""Estrae snapshot clienti aggiornati."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
df_customers = pg_hook.get_pandas_df("""
SELECT id, email, nome, cognome, citta, segmento,
DATE(updated_at) AS data_aggiornamento
FROM customers
WHERE DATE(updated_at) >= CURRENT_DATE - INTERVAL '1 day'
""")
if len(df_customers) > 0:
dwh_hook.insert_rows(
table="staging.stg_clienti_raw",
rows=df_customers.values.tolist(),
target_fields=df_customers.columns.tolist(),
replace=True,
replace_index=["id"],
)
extract_customers_task = PythonOperator(
task_id="extract_customers",
python_callable=extract_customers,
)
# I due extract girano in PARALLELO
[extract_orders_task, extract_customers_task]
# ============ TASK GROUP: TRASFORMAZIONI DBT ============
with TaskGroup("dbt_task_group") as dbt_group:
# dbt run sullo staging layer
dbt_staging = BashOperator(
task_id="dbt_run_staging",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:staging --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt run sul marts layer (dipende dallo staging)
dbt_marts = BashOperator(
task_id="dbt_run_marts",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:marts --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt test per validare la qualità
dbt_test = BashOperator(
task_id="dbt_test",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt test --select tag:staging tag:marts --target prod
""",
)
dbt_staging >> dbt_marts >> dbt_test
# ============ NOTIFICHE ============
notify_success = SlackWebhookOperator(
task_id="notify_success",
slack_webhook_conn_id="slack_data_team",
message="""
:white_check_mark: *Pipeline Vendite Completata*
Data: {{ ds }}
Ordini elaborati: {{ ti.xcom_pull(task_ids='extract_task_group.extract_orders', key='row_count') }}
Durata: {{ dag_run.get_task_instance('dbt_task_group.dbt_test').duration | round(1) }}s
""",
trigger_rule="all_success",
)
notify_no_data = SlackWebhookOperator(
task_id="notify_no_data",
slack_webhook_conn_id="slack_data_team",
message=":information_source: *Pipeline Vendite* - Nessun dato per {{ ds }}, esecuzione saltata",
)
notify_failure = SlackWebhookOperator(
task_id="notify_failure",
slack_webhook_conn_id="slack_data_team",
message=":x: *Pipeline Vendite FALLITA* - Data: {{ ds }} - Controlla Airflow UI!",
trigger_rule="one_failed",
)
# ============ DIPENDENZE DAG ============
check_source >> [extract_group, notify_no_data]
extract_group >> dbt_group >> notify_success
dbt_group >> notify_failure
Tento DAG demonstruje základní vzorce proudění vzduchu: použití TaskGroup pro
skupinové úkoly, BranchPythonOperator podle podmíněné logiky,
XCom předávat data mezi úkoly a spouštět pravidla pro správu
upozornění na úspěch a neúspěch.
Co je nového v Apache Airflow 3.0 (2025)
- Rozhraní pro provádění úloh: Nové rozhraní klient-server API, které odděluje pracovníka od serveru Airflow API a zlepšuje zabezpečení a škálovatelnost
- Plánování podle aktiv: DAG mohou být nyní spouštěny aktualizací aktiv (datové sady), nikoli pouze plánem cron
- Verze DAG: Každé spuštění je spojeno s verzí DAG při spuštění, což eliminuje nekonzistence během nasazení
- Human-in-the-Loop: Airflow 3.1 zavádí pracovní postupy, které před pokračováním čekají na schválení člověkem
- React UI: Kompletně přepracované rozhraní, rychlejší a intuitivnější
- Samostatný DAG procesor: Analýza DAG probíhá ve vyhrazeném procesu, což snižuje zatížení plánovače
Dagster: The Software-Defined Assets Paradigm
Dagster, narozený v roce 2018 z Elementl (nyní Dagster Labs), zavedl změnu paradigmatu radikální v orchestraci: místo přemýšlení úkoly plnit, přemýšlíme aktiva vyrábět. Dílo a jakékoli datové artefakty: tabulka v DWH, model ML, soubor Parquet, vygenerovaná zpráva.
Tento přístup, tzv Softwarově definovaná aktiva (SDA), revolucionizoval
zkušenosti vývojáře. V Průtok vzduchu definujte "spustit tento skript v 5:00". V Dagsteru
definovat „Chci stůl gold.fatturato_mensile je vždy aktuální,
a ví, jak se aktualizovat." Rozdíl se zdá nepatrný, ale mění vše: automatická linie,
jednoduché testování, okamžité pochopení toho, co existuje v datovém zásobníku.
V roce 2025 Dagster 1.9 dosáhl zralosti s rámcem komponent (GA v říjnu 2025), který umožňuje popisovat celé potrubí jako deklarativní konfigurace a katalog pokročilé, které nabízí bezprecedentní přehled o stavu každého aktiva v systému.
Klíčové pojmy Dagster
Terminologie Dagster
| Pojem | Popis | Ekvivalent proudění vzduchu |
|---|---|---|
| Aktiva | Datový artefakt, který potrubí produkuje (tabulka, model, soubor) | Nemá přímý ekvivalent |
| @aktiva | Dekorátor Pythonu, který definuje, jak vytvořit aktivum | PythonOperator (omezenější) |
| Práce | Výběr aktiv/operací ke společnému provozu | DAG |
| Op | Obecná úloha bez explicitního výstupu dat | Operátor |
| I Manažer | Spravuje způsob ukládání a čtení podkladů (S3, BigQuery, Snowflake...) | Nemá to ekvivalent |
| Zdroje | Sdílená připojení a klienti (databáze, API) | Připojení + háček |
| Senzor | Spouštěče řízené událostmi (dotazování systému souborů, rozhraní API, události) | Operátor senzoru |
| Naplánovat | Časově založené spouštěče pro úlohy | Harmonogram DAG |
| Příčky | Rozdělení majetku podle data, kategorie, regionu | PartitionedSchedule (složitější) |
Příklad kódu: Asset-Based Pipeline s Dagster
Implementujeme stejný prodejní kanál jako Airflow, ale s paradigmatem Dagster. Okamžitě si všimnete, jak je závislost mezi aktivy explicitní a jak testování se stává jednoduchým.
# pipeline_vendite/assets.py
import pandas as pd
from dagster import (
asset,
AssetIn,
AssetExecutionContext,
MetadataValue,
Output,
DailyPartitionsDefinition,
MaterializeResult,
)
from dagster_dbt import dbt_assets, DbtCliResource
from resources import postgres_erp, postgres_dwh, slack_resource
# Partizionamento giornaliero - Dagster gestisce il backfill automaticamente
daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")
# ============ ASSET 1: ORDINI GREZZI DALL'ERP ============
@asset(
name="raw_ordini",
group_name="ingestione",
partitions_def=daily_partitions,
description="Ordini grezzi estratti dall'ERP PostgreSQL",
metadata={
"source": "ERP PostgreSQL",
"owner": "data-engineering@azienda.it",
},
compute_kind="python",
)
def raw_ordini(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae gli ordini per la partizione corrente."""
partition_date = context.partition_key # "2025-01-15"
df = postgres_erp.execute_query(f"""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = '{partition_date}'
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""")
# Metadata allegati all'asset: visibili nella UI Dagster
return Output(
df,
metadata={
"num_records": MetadataValue.int(len(df)),
"importo_totale": MetadataValue.float(df["importo_totale"].sum()),
"preview": MetadataValue.md(df.head(5).to_markdown()),
},
)
# ============ ASSET 2: CLIENTI AGGIORNATI ============
@asset(
name="raw_clienti",
group_name="ingestione",
partitions_def=daily_partitions,
compute_kind="python",
)
def raw_clienti(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae snapshot clienti aggiornati."""
partition_date = context.partition_key
df = postgres_erp.execute_query(f"""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = '{partition_date}'
""")
return Output(df, metadata={"num_records": MetadataValue.int(len(df))})
# ============ ASSET 3: ORDINI ARRICCHITI (Silver Layer) ============
@asset(
name="silver_ordini_arricchiti",
group_name="silver",
partitions_def=daily_partitions,
ins={
"raw_ordini": AssetIn("raw_ordini"),
"raw_clienti": AssetIn("raw_clienti"),
},
description="Ordini joinati con info cliente, validati",
compute_kind="python",
)
def silver_ordini_arricchiti(
context: AssetExecutionContext,
raw_ordini: pd.DataFrame,
raw_clienti: pd.DataFrame,
postgres_dwh: PostgresResource,
) -> MaterializeResult:
"""Arricchisce e valida gli ordini."""
df = raw_ordini.merge(
raw_clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
# Validazioni
assert df["importo_totale"].ge(0).all(), "Importi negativi!"
assert df["ordine_id"].is_unique, "Ordini duplicati!"
# Carica nel DWH
postgres_dwh.load_dataframe(
df=df,
table="silver.ordini_arricchiti",
partition_col="data_ordine",
partition_value=context.partition_key,
)
return MaterializeResult(
metadata={
"num_records": MetadataValue.int(len(df)),
"clienti_senza_match": MetadataValue.int(df["segmento"].isna().sum()),
}
)
# ============ ASSET 4: MODELLI DBT (Gold Layer) ============
# Dagster ha integrazione nativa con dbt via dagster-dbt
@dbt_assets(
manifest=dbt_manifest_path, # manifest.json generato da dbt
select="tag:marts", # Solo i modelli gold
)
def dbt_marts_assets(context: AssetExecutionContext, dbt: DbtCliResource):
"""Esegue i modelli dbt del layer gold."""
yield from dbt.cli(["run", "--select", "tag:marts"], context=context).stream()
yield from dbt.cli(["test", "--select", "tag:marts"], context=context).stream()
# ============ JOB: COMPOSIZIONE ============
# In Dagster, un Job seleziona quali asset materializzare
from dagster import define_asset_job, ScheduleDefinition
pipeline_vendite_job = define_asset_job(
name="pipeline_vendite_giornaliera",
selection=[
"raw_ordini",
"raw_clienti",
"silver_ordini_arricchiti",
"dbt_marts_assets*", # Tutti gli asset dbt marts
],
)
pipeline_vendite_schedule = ScheduleDefinition(
job=pipeline_vendite_job,
cron_schedule="0 5 * * *",
default_status=DefaultScheduleStatus.RUNNING,
)
Bezprostřední výhoda tohoto přístupu je viditelná: každé aktivum je explicitně deklarováno
vaše příspěvky (AssetIn), Dagster automaticky vytvoří graf
závislosti a uživatelské rozhraní zobrazuje kompletní řadu od zdroje po konečný produkt. Testování
se stává triviální, protože každá funkce je čistě funkce Pythonu.
IO Manager: klíč k integraci Dagster
IO Manager v Dagster definuje, jak jsou zdroje získávány uloženo a načteno mezi úkoly. Dagster obsahuje IO Manager pro S3 (Parquet, CSV), Snowflake, BigQuery, DuckDB, Pandy a mnoho dalších. Můžete nastavit různé správce IO pro různá prostředí bez změnit kód majetku.
# resources.py - Configurazione Resources e IO Managers
from dagster import EnvVar
from dagster_aws.s3 import S3PickleIOManager, S3Resource
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_snowflake_pandas import SnowflakePandasIOManager
# Configurazione multi-environment
resources_by_env = {
"dev": {
# In dev: salva su DuckDB locale (costo zero)
"io_manager": DuckDBPandasIOManager(
database="./dev_lakehouse.duckdb",
),
"postgres_erp": PostgresResource(
host="localhost",
port=5432,
database="erp_dev",
),
},
"staging": {
# In staging: usa S3 + Parquet
"io_manager": S3PickleIOManager(
s3_resource=S3Resource(),
s3_bucket="my-staging-lakehouse",
s3_prefix="dagster/",
),
},
"prod": {
# In prod: usa Snowflake
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
database="DWH_PROD",
schema="DAGSTER_ASSETS",
),
},
}
Prefekt 3: Python-First pro vývojáře
Prefect, založený v roce 2018, se od začátku hlásí k jiné filozofii: vytváření orchestrace co nejjednodušší a Pythonic. Verze 3, vydaná v roce 2024 a dozrála v roce 2025, přináší tuto vizi její uskutečnění: jakoukoli funkci Pythonu může se stát řízeným úkolem s přidáním jediného dekoratéra.
Il Model toku/úkolu Prefekt eliminuje velkou část konfigurace požadoval Airflow a Dagster. Není potřeba explicitně definovat DAG, nejsou potřeba žádné soubory samostatné konfigurační soubory, nemusíte se učit proprietární DSL: napište prostý Python a prefekt udělá zbytek.
V roce 2025 byl představen Prefect 3 Prefektské incidenty pro řízení strukturované přerušení služby, automatizace s metrickými spouštěči (nejen o událostech provádění) a nativní integraci s Modal pro provádění škálovatelné bez serveru. Prefect Cloud nabízí velkorysý bezplatný plán, který to umožňuje přístupné i malým týmům.
Příklad kódu: Flow Prefect for Sales Pipeline
# flows/pipeline_vendite.py
import pandas as pd
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret
from prefect.filesystems import S3
from prefect_slack import SlackWebhook
from datetime import timedelta, date
from typing import Optional
# ============ TASK (funzioni Python con decoratore) ============
@task(
name="Estrai Ordini ERP",
description="Estrae ordini completati per la data specificata",
retries=3,
retry_delay_seconds=60,
cache_key_fn=task_input_hash, # Cache basata su input
cache_expiration=timedelta(hours=1), # Valida per 1 ora
tags=["estrazione", "erp"],
)
def estrai_ordini(data: date) -> pd.DataFrame:
"""Estrae ordini dall'ERP per la data specificata."""
logger = get_run_logger()
# Leggi credenziali da Prefect Blocks (gestione sicura secrets)
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
query = """
SELECT o.id AS ordine_id, o.customer_id,
o.created_at, o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = :data
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
"""
with engine.connect() as conn:
df = pd.read_sql(query, conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} ordini per {data}")
return df
@task(
name="Estrai Clienti ERP",
retries=2,
retry_delay_seconds=30,
tags=["estrazione", "erp"],
)
def estrai_clienti(data: date) -> pd.DataFrame:
"""Estrae clienti aggiornati."""
logger = get_run_logger()
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df = pd.read_sql("""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = :data
""", conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} clienti aggiornati")
return df
@task(
name="Valida e Arricchisci Ordini",
retries=1,
)
def valida_e_arricchisci(
ordini: pd.DataFrame,
clienti: pd.DataFrame,
) -> pd.DataFrame:
"""Valida e arricchisce gli ordini con dati cliente."""
logger = get_run_logger()
# Validazioni
if ordini["importo_totale"].lt(0).any():
raise ValueError("Trovati importi negativi nel dataset ordini")
if not ordini["ordine_id"].is_unique:
raise ValueError("Trovati ordini duplicati")
# Join
df = ordini.merge(
clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
match_rate = (1 - df["segmento"].isna().mean()) * 100
logger.info(f"Match rate clienti: {match_rate:.1f}%")
return df
@task(name="Carica nel DWH", retries=2)
def carica_dwh(df: pd.DataFrame, schema: str, table: str):
"""Carica il dataframe nel DWH."""
conn_string = Secret.load("dwh-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df.to_sql(
name=table,
con=conn,
schema=schema,
if_exists="replace",
index=False,
)
return len(df)
@task(name="Esegui dbt", retries=1)
def esegui_dbt(target: str = "prod"):
"""Lancia dbt run e test sui modelli marts."""
import subprocess
result = subprocess.run(
["dbt", "run", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if result.returncode != 0:
raise RuntimeError(f"dbt run fallito:\n{result.stderr}")
test_result = subprocess.run(
["dbt", "test", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if test_result.returncode != 0:
raise RuntimeError(f"dbt test fallito:\n{test_result.stderr}")
# ============ FLOW PRINCIPALE ============
@flow(
name="Pipeline Vendite Giornaliera",
description="Orchestrazione ETL vendite: ERP -> DWH -> dbt marts",
flow_run_name="vendite-{data}", # Nome run dinamico
retries=0, # Retry a livello task, non flow
timeout_seconds=7200, # 2 ore max
log_prints=True,
)
def pipeline_vendite(data: Optional[date] = None):
"""
Flow principale per la pipeline vendite.
Può essere lanciato manualmente con una data specifica
o schedulato per girare ogni giorno.
"""
logger = get_run_logger()
if data is None:
data = date.today()
logger.info(f"Avvio pipeline vendite per {data}")
# Estrazione parallela (Prefect gestisce la concorrenza automaticamente)
# submit() esegue il task in modo asincrono
future_ordini = estrai_ordini.submit(data)
future_clienti = estrai_clienti.submit(data)
# Attende entrambi i task e recupera i risultati
ordini = future_ordini.result()
clienti = future_clienti.result()
if len(ordini) == 0:
logger.warning(f"Nessun ordine per {data}, skip elaborazione")
return {"status": "skipped", "data": str(data)}
# Task sequenziali
df_arricchito = valida_e_arricchisci(ordini, clienti)
num_record = carica_dwh(df_arricchito, schema="silver", table="ordini_arricchiti")
esegui_dbt(target="prod")
logger.info(f"Pipeline completata: {num_record} record elaborati")
return {"status": "success", "records": num_record, "data": str(data)}
# ============ DEPLOYMENT ============
# Prefect 3: il deployment definisce schedule e infrastruttura
if __name__ == "__main__":
from prefect.deployments import DeploymentImage
pipeline_vendite.deploy(
name="prod-giornaliero",
cron="0 5 * * *",
work_pool_name="k8s-work-pool", # Esegui su Kubernetes
image=DeploymentImage(
name="myregistry/pipeline-vendite:latest",
platform="linux/amd64",
),
parameters={}, # Parametri default
)
Prefect Blocks: Centralizovaná správa tajemství a konfigurace
I Prefektní bloky jsou to konfigurovatelné objekty uložené v Prefektu backendy (cloudové nebo samostatně hostované), které centralizují správu připojení a tajemství a konfigurace infrastruktury. Eliminují potřebu proměnných prostředí rozptýlené v kontejnerech a umožňují aktualizace bez opětovného nasazení kódu.
Airflow vs Dagster vs Prefect Srovnání: Co je lepší?
Upřímná odpověď zní: záleží. Každý nástroj má ideální případ použití a body specifická síla. Zde je strukturované srovnání, které vám pomůže při výběru.
Podrobné srovnání: Airflow vs Dagster vs Prefect (2025)
| Kritérium | Průtok vzduchu 3.0 | Dagster 1.9 | Prefekt 3 |
|---|---|---|---|
| Paradigma | Úkolově zaměřené (DAG) | Asset-centric (SDA) | Úkol zaměřený (Python-first) |
| Křivka učení | Vysoká (sémantika DAG, poskytovatelé, háčky) | Středně vysoká (model aktiv, IO Manager) | Nízká (dekorátor na stávajících funkcích) |
| Data Lineage | Omezené (závislosti na úkolu) | Vynikající (nativní, vizuální) | Omezené (nedávno přidáno) |
| Zkušenosti vývojáře (místní) | Fair (Docker složil těžce) | Vynikající (dagster dev, místní uživatelské rozhraní) | Vynikající (pip nainstalovat, spustit lokálně) |
| Testování | Komplexní (závisí na infrastruktuře) | Vynikající (majetek i funkce) | Vynikající (úkoly i funkce) |
| integrace dbt | Dobré (dbt Cloud operátor) | Vynikající (dagster-dbt, nativní aktivum) | Dobrý (prefekt-dbt) |
| Plánování | Vynikající (cron, událost, aktiva, termín) | Vynikající (cron, událost, aktivum, senzor) | Dobré (cron, událost, automatizace) |
| Zásypy | Vynikající (spravováno plánovačem ve verzi 3.0) | Vynikající (nativní dělení) | Diskrétní (manuální nebo automatizace) |
| Škálovatelnost | Vysoká (CeleryExecutor, KubernetesExecutor) | Vysoká (Kubernetes, ECS, Docker) | Vysoká (pracovní bazény, K8, modální) |
| Monitorování uživatelského rozhraní | Vynikající (uživatelské rozhraní React ve verzi 3.0) | Vynikající (katalog, graf linie) | Vynikající (panel Prefect Cloud) |
| Model nasazení | Komplexní (kormidlový graf, skládání) | Střední (dagster-webový server, k8s) | Jednoduché (.deploy(), pracovní fondy) |
| Společenství/Ekosystém | Obrovský (35 tisíc hvězd GitHub+, stovky poskytovatelů) | Vzestupně (10 000 a více hvězdiček, zaměření na podniky) | Velké (15 tisíc hvězdiček a více, zaměření na vývojáře) |
| Spravovaný cloud | Astronom, MWAA, Cloud Composer | Dagster Cloud (bez serveru + hybridní) | Prefect Cloud (štědrá bezplatná úroveň) |
| Náklady na vlastní hostování | Open source (infra ke správě) | Open source (infra ke správě) | Open source (infra ke správě) |
| Řízené náklady | Astronom od 500 $ měsíčně | Dagster Cloud od 500 $ měsíčně | Prefect Cloud velkorysá bezplatná úroveň |
| Ideální pro | Tým se zkušenostmi Airflow, klasické ETL potrubí | Data-řízený tým, dbt integrace, ML potrubí | Team Python, rychlé prototypování, řízené událostmi |
Průvodce rychlým výběrem
- Proud vzduchu zvolte, pokud: váš tým již má dovednosti proudění vzduchu, pracujete s komplexními ekosystémy (Spark, Hadoop, mnoho poskytovatelů), které potřebujete z obrovského dostupného ekosystému operátorů jste buď na AWS (MWAA) nebo GCP (Cloud Composer), které jej spravují nativně.
- Vyberte Dagster, pokud: kvalita dat a linie jsou prioritami, váš stack obsahuje dbt a chcete hlubokou integraci, na to je tým zvyklý přemýšlejte podle aktiv a ne podle úkolů, nebo budujete ML potrubí, kdekoli chcete sledovat každý artefakt.
- Vyberte Prefekt, pokud: chcete nejnižší křivku učení, tým a Python-nejprve bez zkušeností s orchestrací, potřebujete pracovní postup řízené událostmi a provozní (nejen data), nebo s nimi chcete rychle začít bezplatná úroveň Prefect Cloud.
Temporal: Trvanlivé provádění pro komplexní pracovní postupy
Zatímco Airflow, Dagster a Prefect se zaměřují na orchestraci datum potrubí, Temporální řeší jiný problém: provedení odolný dlouhotrvajících pracovních postupů aplikací, které musí přežít k pádům, restartům a přerušením sítě.
Vyvinutý bývalými inženýry Uber (kteří vytvořili Cadence, jeho předchůdce), Temporal považuje každý pracovní postup za jeden trvanlivá stavová funkce: pokud server se restartuje, zatímco pracovní postup běží, pokračuje přesně tam, kde zastavilo se, aniž by ztratilo stát. To je nezbytné pro pracovní postupy, které trvají hodiny, dny nebo týdny (např. procesy onboardingu, orchestrace agentů AI, sága vzor pro distribuované transakce).
Temporal vs Airflow/Dagster: Kdy zvolit
| Scénář | Doporučený nástroj | Důvod |
|---|---|---|
| Denní ETL potrubí | Proudění vzduchu / Dagster / Prefekt | Optimalizováno pro dávkovou orchestraci dat |
| dbt transformace | Dagster (lepší) / Průtok vzduchu | Nativní integrace, rodová linie aktiv |
| Tréninkové kanály ML | Dagster / prefekt | Sledování artefaktů, usnadnění testování |
| Procesy registrace uživatele (více kroků) | Temporální | Trvanlivost, řízení stavu aplikace |
| Orchestrace agentů AI / LLM | Temporální / prefekt | Dlouhodobé, komplexní řízení poruch |
| Vzory ságy / distribuované transakce | Temporální | Kompenzační transakce, trvanlivost |
| IoT potrubí v reálném čase | Kafka + Flink (nedávkoví orchestrátoři) | Streamování vyžaduje jinou architekturu |
Mnoho společností používá hybridní přístup: Airflow nebo Dagster pro orchestraci datový kanál, Temporal pro aplikační pracovní postupy, které vyžadují trvanlivost. Dva nástroje dokonale se doplňují a nejsou si přímou konkurencí.
Reference Architecture: Orchestrace v moderním datovém zásobníku
Jak se orchestrátor integruje se zbytkem datového zásobníku? Podívejme se na architekturu Kompletní reference, která kombinuje všechny komponenty, které jsme viděli v této sérii: Airbyte pro příjem, dbt pro transformace, Apache Iceberg jako formát tabulky, a Dagster jako ústřední orchestrátor.
Data Stack Architecture s Dagsterem jako orchestrátorem
| Vrstvy | Komponent | Nástroje | Orchestrován Dagsterem? |
|---|---|---|---|
| Požití | Synchronizace zdroje | Airbyte / Fivetran | Ano (dagster-airbyte) |
| Skladování | Ukládání objektů + formát tabulky | S3 + Apache Iceberg | Ano (IO Manager) |
| Transformace | Deklarativní SQL modely | dbt Core / Cloud | Ano (dagster-dbt, nativní aktivum) |
| Kvalitní | Testování a validace dat | dbt testy, velká očekávání | Ano (kontroly aktiv) |
| Porce | Dotazový engine | Trino / Athena / DuckDB | Ne (dotazy na vyžádání) |
| Analytics | Řídicí panely a sestavy | Metabáze / tablo | Ne (spotřebovávají data) |
| ML | Funkce inženýrství + školení | Python + MLflow | Ano (podklady ML) |
# definitions.py - Dagster Definitions: composizione del data stack completo
from dagster import Definitions, load_assets_from_modules
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
from dagster_dbt import DbtCliResource
from dagster_aws.s3 import S3Resource
# Import dei moduli asset
from assets import ingestione, silver, gold, ml_features
# ============ CARICAMENTO ASSET ============
# Asset Airbyte: ogni connessione Airbyte diventa un Dagster asset
airbyte_assets = load_assets_from_airbyte_instance(
airbyte=AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
connection_filter=lambda meta: meta.name.startswith("prod_"),
)
# Asset dbt: ogni modello dbt diventa un Dagster asset con lineage completa
@dbt_assets(
manifest=Path("dbt_project/target/manifest.json"),
)
def dbt_all_assets(context, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
# Asset Python custom
python_assets = load_assets_from_modules([ingestione, silver, gold, ml_features])
# ============ RISORSE ============
resources_prod = {
"airbyte": AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
"dbt": DbtCliResource(
project_dir="dbt_project",
profiles_dir="dbt_project",
target="prod",
),
"s3": S3Resource(region_name="eu-west-1"),
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
),
"slack": SlackResource(token=EnvVar("SLACK_BOT_TOKEN")),
}
# ============ COMPOSIZIONE FINALE ============
defs = Definitions(
assets=[
*airbyte_assets,
dbt_all_assets,
*python_assets,
],
resources=resources_prod,
jobs=[pipeline_vendite_job, pipeline_clienti_job, pipeline_ml_job],
schedules=[pipeline_vendite_schedule, pipeline_ml_weekly_schedule],
sensors=[nuovo_file_s3_sensor, airbyte_sync_sensor],
)
Monitorování, varování a pozorovatelnost
Orchestrátor bez monitorování a jako letadlo bez palubních nástrojů. Sledování potrubí a zásadní pro zajištění SLA, diagnostiku problémů a komunikaci stav datové platformy pro firmu.
Doporučený monitorovací zásobník
Monitorovací nástroje pro organizované potrubí
| Vrstvy | Nástroje | Co sleduje |
|---|---|---|
| Metriky infrastruktury | Prometheus + Grafana | Pracovníci CPU/RAM, fronta úloh, latence plánovače |
| Metriky potrubí | Metriky proudění vzduchu / události Dagster / Prefect API | Úspěšnost úkolu, trvání, porušení SLA |
| Centralizované protokoly | ELK Stack / CloudWatch / Loki | Protokol každé instance úlohy pro ladění |
| Upozornění | PagerDuty / OpsGenie | Eskalace pro kritická selhání, porušení SLA |
| Týmová upozornění | Slack / Microsoft Teams | Upozornění na úspěch/neúspěch, denní zpráva |
| Kvalita dat | Elementární / dbt testy / Velká očekávání | Anomálie dat, čerstvost, počty řádků |
# monitoring/alerting_setup.py
# Configurazione alerting Airflow con Slack e PagerDuty
from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime
def task_failure_alert(context):
"""
Callback chiamato su ogni task failure.
Inviamo notifica Slack con dettagli del fallimento.
"""
dag_id = context["dag"].dag_id
task_id = context["task_instance"].task_id
run_id = context["run_id"]
execution_date = context["execution_date"]
log_url = context["task_instance"].log_url
exception = context.get("exception", "N/A")
message = f"""
:x: *Task Fallito*
*DAG:* {dag_id}
*Task:* {task_id}
*Run ID:* {run_id}
*Data:* {execution_date.strftime('%Y-%m-%d %H:%M')}
*Errore:* `{str(exception)[:200]}`
*Log:* <{log_url}|Vedi Log Airflow>
"""
slack_op = SlackWebhookOperator(
task_id="slack_alert",
slack_webhook_conn_id="slack_data_ops",
message=message,
channel="#data-ops-alerts",
dag=context["dag"],
)
slack_op.execute(context)
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Callback per SLA breach: invia alert critico su PagerDuty."""
import requests
sla_tasks = ", ".join([f"{s.task_id}" for s in slas])
message = f"SLA BREACH su DAG {dag.dag_id}: task {sla_tasks} non completati in tempo"
# PagerDuty Events API
payload = {
"routing_key": "YOUR_PAGERDUTY_INTEGRATION_KEY",
"event_action": "trigger",
"payload": {
"summary": message,
"severity": "critical",
"source": "airflow",
"component": dag.dag_id,
"custom_details": {
"sla_tasks": sla_tasks,
"dag_id": dag.dag_id,
},
},
}
requests.post("https://events.pagerduty.com/v2/enqueue", json=payload)
# Utilizzo nel DAG
with DAG(
dag_id="pipeline_critica",
default_args={
"on_failure_callback": task_failure_alert, # Per ogni task
},
sla_miss_callback=sla_miss_callback, # Per SLA violations
schedule="0 5 * * *",
# ...
) as dag:
task_etl = PythonOperator(
task_id="etl_principale",
python_callable=esegui_etl,
sla=timedelta(hours=1), # Deve completare entro 1 ora
)
Grafana Dashboard pro metriky potrubí
Airflow vystavuje metriky přes StatsD (nebo přímo Prometheus s airflow-prometheus-exporter). Zde jsou nejdůležitější metriky ke sledování na řídicím panelu Grafana:
Metriky proudění vzduchu k monitorování v Grafaně
- airflow.scheduler.tasks.hladovějící: Úlohy ve frontě bez dostupných pracovníků (označuje nedostatečnou škálovatelnost)
- airflow.dagrun.duration.úspěch: Distribuce trvání běhu DAG (percentil P95 jako indikátor SLA)
- airflow.task_instance.failures: Trend selhání podle úkolu (anomálie indikují problémy se zdrojem)
- airflow.scheduler.heartbeat: Frekvence prezenčního signálu plánovače (pokud se zastaví, nejsou naplánovány žádné úkoly)
- airflow.pool.open_slots: Volné sloty v bazénech (označuje nasycení)
- airflow.dagrun.first_task_scheduling_delay: Prodleva mezi plánem a prvním spuštěným úkolem
Nejlepší postupy pro robustní potrubí ve výrobě
1. Idempotence: Zlaté pravidlo
Každý úkol musí být idempotentní: spusťte jej několikrát se stejnými parametry musí přinést stejný výsledek. Tato vlastnost je zásadní, protože opakování (automatické nebo ruční) je ve výrobě nevyhnutelné.
# SBAGLIATO: non idempotente (doppio insert)
def carica_dati_wrong(**context):
df = estrai_dati()
pg_hook.insert_rows("staging.ordini", df.values.tolist()) # APPEND!
# CORRETTO: idempotente (DELETE + INSERT per la data specifica)
def carica_dati_correct(**context):
execution_date = context["ds"] # "2025-01-15"
df = estrai_dati(execution_date)
with pg_hook.get_conn() as conn:
with conn.cursor() as cur:
# 1. Elimina i dati esistenti per questa partizione
cur.execute(
"DELETE FROM staging.ordini WHERE DATE(data_ordine) = %s",
[execution_date]
)
# 2. Inserisci i nuovi dati
execute_values(cur, """
INSERT INTO staging.ordini (ordine_id, data_ordine, importo)
VALUES %s
""", df[["ordine_id", "data_ordine", "importo"]].values.tolist())
conn.commit()
# OPPURE: usa INSERT ... ON CONFLICT per UPSERT atomico
def carica_dati_upsert(**context):
execution_date = context["ds"]
df = estrai_dati(execution_date)
pg_hook.insert_rows(
table="staging.ordini",
rows=df.values.tolist(),
target_fields=["ordine_id", "data_ordine", "importo"],
replace=True,
replace_index=["ordine_id"], # PK: upsert su ordine_id
)
2. Opakujte zásady a exponenciální stažení
Vždy nastavte rozumné zásady opakování. Přechodná selhání (časové limity sítě, zámek databáze, služby dočasně nedostupné) se často řeší pomocí a opakujte po několika minutách. Exponenciální backoff zabraňuje přetížení stávajících systémů v obtížích.
# Retry policy consigliata per diversi tipi di task
from datetime import timedelta
# Task di estrazione da API esterne: retry aggressivi con backoff
default_args_api = {
"retries": 5,
"retry_delay": timedelta(minutes=1),
"retry_exponential_backoff": True, # 1, 2, 4, 8, 16 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(minutes=30),
}
# Task di trasformazione dbt: retry conservativi
default_args_dbt = {
"retries": 2,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(hours=2),
}
# Task di notifica: no retry (evita spam di alert)
default_args_notify = {
"retries": 0,
"execution_timeout": timedelta(minutes=2),
}
3. Správa závislostí pomocí senzorů
Ne vždy je možné naplánovat potrubí s pevným časem. Někdy musíte počkat že soubor dorazí do S3, že končí jiný kanál nebo že jsou externí data k dispozici. THE Operátor senzoru Airflow (a Dagster) jsou řešením.
# Sensor: attendi che il file di input sia disponibile prima di procedere
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
with DAG("pipeline_dipendente", ...) as dag:
# Attendi file S3 (polling ogni 5 minuti, max 2 ore)
wait_for_file = S3KeySensor(
task_id="wait_for_input_file",
bucket_name="my-data-lake",
bucket_key="inputs/vendite_{{ ds_nodash }}.csv",
aws_conn_id="aws_default",
poke_interval=300, # Controlla ogni 5 min
timeout=7200, # Timeout dopo 2 ore
mode="reschedule", # Libera il worker mentre aspetta
)
# Attendi che un'altra DAG abbia finito
wait_for_upstream = ExternalTaskSensor(
task_id="wait_for_erp_sync",
external_dag_id="sincronizzazione_erp",
external_task_id="verify_sync_complete",
allowed_states=["success"],
failed_states=["failed", "skipped"],
poke_interval=120,
timeout=3600,
mode="reschedule",
)
estrai = PythonOperator(task_id="estrai_dati", ...)
# Pipeline partira solo quando entrambi i sensor sono verdi
[wait_for_file, wait_for_upstream] >> estrai
Anti-Pattern: Běžné chyby, kterým je třeba se vyhnout
- Úloha s proměnlivým globálním stavem: Nepoužívejte globální proměnné Pythonu v DAGs. DAG je často analyzován plánovačem; globální proměnné vytvářet nepředvídatelné chování. Použijte XCom, Variables nebo TaskFlow API.
- Obchodní logika v souborech DAG: Soubory DAG musí obsahovat pouze definice orchestrace. Obchodní logika je součástí samostatných modulů Pythonu, importované z úkolů. Usnadňuje testování a zkracuje dobu analýzy.
- XCom pro velké datové sady: XCom je navržen pro malá metadata (počty, cesty k souborům, příznaky). Nepoužívejte jej k předávání datových rámců mezi úkoly: použijte pracovní tabulky nebo soubory S3.
- Monolitická potrubí bez granularity: Jediný úkol, který zvládne vše (extract + transform + load) znemožňuje ladění. Každá operace logicky odlišné musí být samostatným úkolem.
-
Catchup=Pravda s napjatými plány: Pokud povolíte catchup na a
pipeline s hodinovým plánem a po 30denní přestávce jej zapnete, spustíte 720
exekuce ve stejnou dobu. Používejte vždy
catchup=Falsestandardně a explicitně spravovat zásyp. - Pevně zakódovaná tajemství: Nikdy nezadávejte heslo, API klíč nebo připojení řetězec přímo do kódu. Použijte Airflow Connections, Dagster Resources, popř Prefect Blocks integrované s AWS Secrets Manager nebo HashiCorp Vault.
4. Testování potrubí
Nevyzkoušený plynovod a zajištěný technický dluh. Zde je návod, jak strukturovat testy pro Airflow DAG a pro aktivum Dagster.
# tests/test_pipeline_vendite.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import date
import pandas as pd
# ============ TEST AIRFLOW ============
from airflow.models import DagBag
def test_dag_integrity():
"""Verifica che i DAG siano validi e non abbiano cicli."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
assert len(dagbag.import_errors) == 0, \
f"Errori di import: {dagbag.import_errors}"
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert dag is not None, "DAG non trovato"
assert dag.schedule == "0 5 * * *"
def test_dag_task_count():
"""Verifica il numero di task nel DAG."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert len(dag.tasks) >= 5, "Troppo pochi task nel DAG"
# ============ TEST DAGSTER ============
from dagster import materialize, build_asset_context
from assets import raw_ordini, silver_ordini_arricchiti
def test_raw_ordini_asset():
"""Test dell'asset raw_ordini con mock delle risorse."""
mock_df = pd.DataFrame({
"ordine_id": [1, 2, 3],
"customer_id": [101, 102, 103],
"importo_totale": [150.0, 89.99, 220.5],
"data_ordine": ["2025-01-15"] * 3,
})
with patch("assets.ingestione.PostgresResource") as mock_pg:
mock_pg.return_value.execute_query.return_value = mock_df
result = materialize(
[raw_ordini],
resources={"postgres_erp": mock_pg.return_value},
partition_key="2025-01-15",
)
assert result.success
asset_value = result.output_for_node("raw_ordini")
assert len(asset_value) == 3
assert (asset_value["importo_totale"] >= 0).all()
# ============ TEST PREFECT ============
from flows.pipeline_vendite import estrai_ordini, valida_e_arricchisci
from prefect.testing.utilities import prefect_test_harness
def test_valida_e_arricchisci():
"""Test del task di validazione con dati di test."""
ordini = pd.DataFrame({
"ordine_id": [1, 2],
"customer_id": [101, 102],
"importo_totale": [100.0, 200.0],
})
clienti = pd.DataFrame({
"id": [101, 102],
"citta": ["Milano", "Roma"],
"segmento": ["Premium", "Standard"],
})
with prefect_test_harness():
result = valida_e_arricchisci.fn(ordini, clienti)
assert len(result) == 2
assert "segmento" in result.columns
assert result["segmento"].isna().sum() == 0
def test_valida_importi_negativi():
"""Test che importi negativi generino eccezione."""
ordini = pd.DataFrame({
"ordine_id": [1],
"importo_totale": [-50.0], # NEGATIVO
"customer_id": [101],
})
clienti = pd.DataFrame({"id": [101], "citta": ["Milano"], "segmento": ["Standard"]})
with prefect_test_harness():
with pytest.raises(ValueError, match="Trovati importi negativi"):
valida_e_arricchisci.fn(ordini, clienti)
Závěry a doporučení
Orchestrace a páteř jakéhokoli vyspělého zásobníku dat. Bez orchestrátora spolehlivé, i ta nejsofistikovanější potrubí se stávají křehkými, neprůhlednými a obtížně proveditelné spravovat. Prozkoumali jsme tři hlavní nástroje trhu 2025 a viděli jsme, jak na to každý reaguje na jiné potřeby.
Doporučení pro scénář
- PMI začíná nyní: Vy si vyberete Prefekt 3 s Prefect Cloud volná úroveň. Čas do zhodnocení je minimální, křivka učení je nízká a můžete projít k více podnikovým řešením, pokud to vyžaduje složitost.
- Tým s dbt stackem: Dagster a přirozený výběr. Nativní integrace s dbt přes dagster-dbt a automatické řazení mezi modely dbt a asset Dagster výrazně snižují kognitivní zátěž týmu.
- Podnik se stávajícími dovednostmi Airflow: Průtok vzduchu 3.0 na Astronomer (spravováno) nebo MWAA. Není důvod migrovat, pokud je tým již produktivní na Airflow a verze 3.0 uzavřela velkou část mezery s Dagster a Prefect.
- Dlouhotrvající pracovní postupy aplikací: Zvážit Temporální v kombinaci s vaším oblíbeným datovým orchestrátorem. Jsou to doplňkové nástroje, ne alternativy.
Další článek v sérii se ponoří doAI aplikovaná na výrobu: uvidíme, jak vybudovat systémy prediktivní údržby se senzory IoT, pro Apache Kafka streamování strojových dat a modely ML pro předvídání selhání dříve, než k nim dojde. Konkrétní případ použití, který ukazuje, jak se z datového zásobníku, který vytváříme, stává základ umělé inteligence ve firmě.
Kontrolní seznam pro uvedení vašeho orchestra do produkce
- Všechny úlohy jsou idempotentní: jejich opětovné spuštění nevytváří duplikáty
- Zásadu opakování nakonfigurovanou s exponenciálním stažením pro každý úkol
- Tajemství spravované přes Připojení/Bloky/Zdroje, nikoli pevně zakódované
- Aktivní monitorování pomocí Grafany nebo ekvivalentu pro metriky infrastruktury
- Upozornění na Slack nebo PagerDuty na selhání a porušení SLA
- Testování integrity DAG/aktiv v CI/CD potrubí
- Ve výchozím nastavení je Catchup zakázáno, zálohování je řešeno explicitně
- Oddělte obchodní logiku od definičních souborů potrubí
- Centralizované protokoly přístupné bez přímého přístupu k pracovníkům
- Inline dokumentace: Každý DAG/aktiv má jasný popis
Užitečné odkazy pro další informace
- Předchozí článek: ETL vs moderní ELT: dbt, Airbyte a Fivetran
- Další článek: AI ve výrobě: Prediktivní údržba a digitální dvojče
- Související série: MLOps for Business – modely umělé inteligence ve výrobě s MLflow
- Související série: LLM v podnikání: RAG Enterprise, Fine-Tuning a Guardrails







