Pipeline Orchestration: Airflow, Dagster și Prefect
Imaginați-vă că aveți o stivă de date fără cusur: o casă de lac pe Apache Iceberg, transformări dbt elegant, conectori Airbyte care sincronizează datele dintr-o duzină de surse. Totul perfect, până când cineva te întreabă: „Dar cine conduce toate astea? Cine garantează că începe munca de ingerare înainte de transformări? Cine te avertizează când ceva nu merge bine la 3 a.m.?" Răspunsul esteorchestrator de conducte.
Orchestrația și adeziv invizibil al stivei de date: componenta care coordonează executarea a sute de sarcini interdependente, gestionează reîncercări în caz de eșec, monitorizează starea conductelor și produce pista de audit pe care o necesită afacerea. Fără un orchestrator, conductele de date sunt scripturi cron ținute împreună de speranță.
În 2025, peisajul orchestratorului este dominat de trei platforme principale: Apache Airflow 3.0, veteranul care a definit chiar conceptul de DAG orchestrare; Dagster 1.x, provocatorul care a introdus conceptul a activelor definite de software; Şi Prefectul 3, soluția proiectată de Python pentru experiența dezvoltatorului. În acest articol le vom analiza în profunzime, cu cod o comparație reală și sinceră pentru a vă ajuta să alegeți cea potrivită pentru contextul dvs.
Ce veți învăța în acest articol
- Arhitectură internă Apache Airflow 3.0: planificator, executor, lucrător, DB de metadate
- Cum se scrie un DAG Python complet pentru o conductă ETL end-to-end
- Paradigma activelor definite de software a Dagster și de ce schimbă modul în care ne gândim la conducte
- Modelul Flow/Task al Prefectului 3 și modelul său de implementare simplificat
- Comparație detaliată între cele trei instrumente cu tabel de comparație în funcție de caz de utilizare
- Temporal ca alternativă pentru fluxuri de lucru durabile și de lungă durată
- Arhitectură de referință pentru integrarea orchestrației cu dbt, Airbyte și data lakehouse
- Monitorizare, alertă și bune practici pentru conductele idempotente în producție
Articole din seria Data Warehouse, AI și Digital Transformation
| # | Articol | Concentrează-te |
|---|---|---|
| 1 | Evoluția Data Warehouse: de la SQL Server la Data Lakehouse | Arhitecturi și platforme |
| 2 | Mesh de date și arhitectură descentralizată | Guvernare și proprietate |
| 3 | ETL vs ELT modern: dbt, Airbyte și Fivetran | Conducta de transformare |
| 4 | Sunteți aici - Pipeline Orchestration | Flux de aer, Dagster, Prefect |
| 5 | AI în producție: întreținere predictivă | IoT, ML, Digital Twin |
| 6 | AI în finanțe: detectarea fraudelor și scorarea creditelor | ML în timp real |
| 7 | AI în comerțul cu amănuntul: prognoza cererii și recomandare | ML aplicat |
deoarece Orchestrația este fundamentală
Înainte de a intra în detalii tehnice, merită să înțelegeți exact ce problemă rezolvă un orchestrator. O conductă tipică de date pentru o companie mijlocie până la mare include:
- Extragere din CRM, ERP, baze de date tranzacționale, API-uri externe (10-50 surse)
- Încărcați în lacul de date (Airflow declanșează Airbyte sau Fivetran)
- transformări dbt (30-200 de modele cu dependențe complexe)
- Actualizarea marților de date și a tabelelor agregate
- Exportați în instrumente BI (Metabase, Tableau, Power BI)
- Actualizarea modelelor ML (ingineria caracteristicilor + recalificare)
Fiecare dintre acești pași are dependențe (B nu poate pleca înainte ca A să fie finalizat), ALS (rapoartele trebuie să fie gata până la ora 08:00), e cerințe de calitate (dacă datele sursă sunt goale, nu lansați transformări). Gestionați toate acestea cu scripturi cron separate și un dezastru garantat.
Ce face un Pipeline Orchestrator
| Funcționalitate | Descriere |
|---|---|
| Managementul Dependenței | Definește ordinea de execuție a sarcinilor și gestionează dependențele pipeline |
| Programare | Programare Cron, bazată pe evenimente, conștient de date (declanșează actualizarea activelor) |
| Reîncercați și tratarea erorilor | Reîncercare automată cu backoff exponențial, gestionarea eșecurilor parțiale |
| Paralelism | Executarea paralelă a sarcinilor independente pentru optimizarea timpului |
| Monitorizare | Tablou de bord centralizat, jurnal de sarcini, monitorizare SLA, alertă |
| Umpluturi | Redarea rulărilor istorice atunci când logica unei conducte se schimbă |
| Pista de audit | Istoricul complet al fiecărei execuții pentru conformitate și depanare |
| Parametrizare | Configurație variabilă pentru medii (dev, staging, prod) și rulări manuale |
Apache Airflow 3.0: Veteranul reînnoit
Creat de Airbnb în 2014 și donat Apache Software Foundation în 2016, Apache Fluxul de aer și a devenit standardul de facto pentru orchestrarea pipelinei de date. Cu dincolo 35.000 de stele pe GitHub și o comunitate care numără sute de contribuitori, Airflow și folosit de mii de companii din întreaga lume, de la startup-uri la întreprinderile mari.
În 2025, Airflow a trecut prin cea mai semnificativă evoluție odată cu lansarea versiunea 3.0, care introduce o arhitectură client-server cu Executarea sarcinii Interfață, programarea nativă care ține cont de active (moștenită de la Dagster), DAG versiunea și o interfață de utilizare complet reproiectată. Și nu mai este doar un planificator de locuri de muncă: este un platformă modernă de orchestrație.
Arhitectura Apache Airflow
Înțelegerea arhitecturii Airflow este esențială pentru implementarea și diagnosticarea corectă a acesteia probleme în producție. Există cinci componente principale:
Componente arhitecturale ale fluxului de aer
| Componentă | Rol | Tehnologie |
|---|---|---|
| Server web | Interfață de utilizare web pentru monitorizarea, depanarea, declanșarea manuală a DAG-urilor | Balon + Gunicorn (2.x), FastAPI (3.0) |
| Programator | Analizează DAG-urile, programează sarcinile, le plasează în coada executorului | Daemon Python, HA cu mai multe instanțe |
| Executor testamentar | Execută sarcini: LocalExecutor (un singur nod), CeleryExecutor, KubernetesExecutor | Țelină + Redis/RabbitMQ sau K8s |
| Muncitori | De fapt, procesează sarcini (numai cu CeleryExecutor/KubernetesExecutor) | Lucrător în țelină sau K8s Pod |
| Baza de date cu metadate | Starea tuturor rulărilor DAG, instanțelor sarcinilor, variabilelor, conexiunilor | PostgreSQL (recomandat în prod), MySQL |
| Procesor DAG | Nou în Airflow 3.0: analizatorul DAG separat de planificator | Pool de procese Python |
Conceptul DAG
Inima Airflow și DAG (Grafic aciclic direcționat): un grafic orientat fără cicluri care definesc ordinea de executare a sarcinilor. Fiecare nod al graficului este o sarcină, fiecare arc este o dependență. Graficul trebuie să fie aciclic (fără ciclu), prin urmare nu puteți crea dependențe circulare.
Sarcinile sunt implementate prin Operator: clase Python care abstractizează un tip de muncă. Airflow include sute de operatori încorporați (PythonOperator, BashOperator, PostgresOperator, SparkSubmitOperator, dbtCloudOperator...) și comunitatea publică cât mai multe prin pachete de furnizor.
Exemplu de cod: DAG ETL complet pentru conducta de vânzări
Să vedem un DAG complet și realist care orchestrează o conductă ETL: extrage din PostgreSQL, încărcarea în staging, transformarea dbt și notificarea echipei.
# dags/pipeline_vendite.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import pandas as pd
import logging
logger = logging.getLogger(__name__)
# Configurazione DAG con default_args per tutti i task
default_args = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"email": ["data-team@azienda.it"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True, # 5, 10, 20 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(hours=2),
}
with DAG(
dag_id="pipeline_vendite_giornaliera",
default_args=default_args,
description="Estrai vendite da ERP, carica in DWH, aggrega con dbt",
schedule="0 5 * * *", # Ogni giorno alle 05:00 UTC
start_date=datetime(2025, 1, 1),
end_date=None,
catchup=False, # Non eseguire run passate
max_active_runs=1, # Una sola esecuzione alla volta
tags=["vendite", "etl", "produzione"],
doc_md="""
## Pipeline Vendite Giornaliera
Estrae ordini dall'ERP (PostgreSQL), li carica in staging,
lancia le trasformazioni dbt e notifica il team su Slack.
**SLA**: Completamento entro le 07:30 UTC
**Owner**: Team Data Engineering
""",
) as dag:
# ============ TASK 1: VERIFICA DISPONIBILITA SORGENTE ============
def check_source_data(**context):
"""Verifica che ci siano dati nella finestra di esecuzione."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
execution_date = context["ds"] # "2025-01-15"
count = pg_hook.get_first("""
SELECT COUNT(*)
FROM orders
WHERE DATE(created_at) = %s
""", parameters=[execution_date])[0]
logger.info(f"Trovati {count} ordini per {execution_date}")
if count == 0:
logger.warning("Nessun dato trovato, skip pipeline")
return "notify_no_data" # Branch: salta elaborazione
return "extract_task_group.extract_orders"
check_source = BranchPythonOperator(
task_id="check_source_data",
python_callable=check_source_data,
)
# ============ TASK GROUP: ESTRAZIONE ============
with TaskGroup("extract_task_group") as extract_group:
def extract_orders(**context):
"""Estrae ordini dall'ERP e li salva in staging."""
execution_date = context["ds"]
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
df = pg_hook.get_pandas_df("""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price * (1 - COALESCE(oi.discount, 0)))
AS importo_totale,
COUNT(oi.id) AS num_righe
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = %(exec_date)s
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""", parameters={"exec_date": execution_date})
# Validazione dati estratti
assert df["importo_totale"].ge(0).all(), "Importi negativi trovati!"
assert df["ordine_id"].is_unique, "Ordini duplicati nell'estrazione!"
# Salva in XCom per passare al task successivo
# Per dataset grandi, usa staging table invece di XCom
context["ti"].xcom_push(key="row_count", value=len(df))
# Carica in tabella staging del DWH
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
dwh_hook.insert_rows(
table="staging.stg_ordini_raw",
rows=df.values.tolist(),
target_fields=df.columns.tolist(),
replace=True,
replace_index=["ordine_id"],
)
logger.info(f"Caricati {len(df)} ordini in staging")
extract_orders_task = PythonOperator(
task_id="extract_orders",
python_callable=extract_orders,
)
def extract_customers(**context):
"""Estrae snapshot clienti aggiornati."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
df_customers = pg_hook.get_pandas_df("""
SELECT id, email, nome, cognome, citta, segmento,
DATE(updated_at) AS data_aggiornamento
FROM customers
WHERE DATE(updated_at) >= CURRENT_DATE - INTERVAL '1 day'
""")
if len(df_customers) > 0:
dwh_hook.insert_rows(
table="staging.stg_clienti_raw",
rows=df_customers.values.tolist(),
target_fields=df_customers.columns.tolist(),
replace=True,
replace_index=["id"],
)
extract_customers_task = PythonOperator(
task_id="extract_customers",
python_callable=extract_customers,
)
# I due extract girano in PARALLELO
[extract_orders_task, extract_customers_task]
# ============ TASK GROUP: TRASFORMAZIONI DBT ============
with TaskGroup("dbt_task_group") as dbt_group:
# dbt run sullo staging layer
dbt_staging = BashOperator(
task_id="dbt_run_staging",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:staging --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt run sul marts layer (dipende dallo staging)
dbt_marts = BashOperator(
task_id="dbt_run_marts",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:marts --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt test per validare la qualità
dbt_test = BashOperator(
task_id="dbt_test",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt test --select tag:staging tag:marts --target prod
""",
)
dbt_staging >> dbt_marts >> dbt_test
# ============ NOTIFICHE ============
notify_success = SlackWebhookOperator(
task_id="notify_success",
slack_webhook_conn_id="slack_data_team",
message="""
:white_check_mark: *Pipeline Vendite Completata*
Data: {{ ds }}
Ordini elaborati: {{ ti.xcom_pull(task_ids='extract_task_group.extract_orders', key='row_count') }}
Durata: {{ dag_run.get_task_instance('dbt_task_group.dbt_test').duration | round(1) }}s
""",
trigger_rule="all_success",
)
notify_no_data = SlackWebhookOperator(
task_id="notify_no_data",
slack_webhook_conn_id="slack_data_team",
message=":information_source: *Pipeline Vendite* - Nessun dato per {{ ds }}, esecuzione saltata",
)
notify_failure = SlackWebhookOperator(
task_id="notify_failure",
slack_webhook_conn_id="slack_data_team",
message=":x: *Pipeline Vendite FALLITA* - Data: {{ ds }} - Controlla Airflow UI!",
trigger_rule="one_failed",
)
# ============ DIPENDENZE DAG ============
check_source >> [extract_group, notify_no_data]
extract_group >> dbt_group >> notify_success
dbt_group >> notify_failure
Acest DAG demonstrează modelele fundamentale ale fluxului de aer: utilizarea TaskGroup pentru
sarcini legate de grup, BranchPythonOperator prin logica conditionala,
XCom pentru a transmite date între sarcini și pentru a declanșa reguli pentru management
de notificări despre succes și eșec.
Ce este nou în Apache Airflow 3.0 (2025)
- Interfață de execuție a sarcinilor: Nou API client-server care decuplează lucrătorul de serverul API Airflow, îmbunătățind securitatea și scalabilitatea
- Programare în funcție de bunuri: DAG-urile pot fi acum declanșate printr-o actualizare a activelor (setului de date), nu doar printr-o programare cron
- Versiune DAG: Fiecare rulare este asociată cu versiunea DAG la lansare, eliminând inconsecvențele în timpul implementărilor
- Human-in-the-Loop: Airflow 3.1 introduce fluxuri de lucru care așteaptă aprobarea umană înainte de a continua
- React UI: Interfață complet reproiectată, mai rapidă și mai intuitivă
- Procesor DAG separat: Analiza DAG are loc într-un proces dedicat, reducând sarcina pe planificator
Dagster: Paradigma activelor definite de software
Dagster, născut în 2018 din Elementl (acum Dagster Labs), a introdus o schimbare de paradigmă radical în orchestrare: în loc să se gândească la sarcini de îndeplinit, ne gândim la active de produs. Un activ și orice artefacte de date: un tabel în DWH, un model ML, un fișier Parquet, un raport generat.
Această abordare, numită Active definite de software (SDA), revoluționat
experiența dezvoltatorului. În Airflow definiți „execuți acest script la 5:00”. În Dagster
definește „Vreau masa gold.fatturato_mensile este mereu la zi,
și ea știe să se actualizeze". Diferența pare subtilă, dar schimbă totul: descendență automată,
testare simplă, înțelegere imediată a ceea ce există în stiva de date.
În 2025, Dagster 1.9 a ajuns la maturitate cu cadrul Components (GA în octombrie 2025), care permite ca conducte întregi să fie descrise ca configurații declarative și catalog avansat care oferă o vizibilitate fără precedent asupra stării fiecărui activ din sistem.
Concepte cheie Dagster
Terminologia Dagster
| Concept | Descriere | Echivalentul fluxului de aer |
|---|---|---|
| Active | Artefact de date pe care îl produce pipeline (tabel, model, fișier) | Nu are echivalent direct |
| @active | Decorator Python care definește modul de a produce un activ | PythonOperator (mai limitat) |
| Locuri de muncă | Selectarea activelor/operațiunilor pentru a rula împreună | DAG |
| Op | Sarcină generică fără date explicite | Operator |
| I Manager | Gestionează modul în care sunt salvate și citite elementele (S3, BigQuery, Snowflake...) | Nu are echivalent |
| Resurse | Conexiuni și clienți partajați (bază de date, API) | Conexiune + cârlig |
| Senzor | Declanșatoare bazate pe evenimente (interogarea sistemului de fișiere, API, evenimente) | Operator senzor |
| Programa | Declanșatoare bazate pe timp pentru locuri de muncă | Programul DAG |
| Paravane | Împărțirea activului după dată, categorie, regiune | PartitionedSchedule (mai complex) |
Exemplu de cod: Conductă bazată pe active cu Dagster
Implementăm același canal de vânzări ca și Airflow, dar cu paradigma Dagster. Veți observa imediat cum este explicită dependența dintre active și cum este testarea devine simplă.
# pipeline_vendite/assets.py
import pandas as pd
from dagster import (
asset,
AssetIn,
AssetExecutionContext,
MetadataValue,
Output,
DailyPartitionsDefinition,
MaterializeResult,
)
from dagster_dbt import dbt_assets, DbtCliResource
from resources import postgres_erp, postgres_dwh, slack_resource
# Partizionamento giornaliero - Dagster gestisce il backfill automaticamente
daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")
# ============ ASSET 1: ORDINI GREZZI DALL'ERP ============
@asset(
name="raw_ordini",
group_name="ingestione",
partitions_def=daily_partitions,
description="Ordini grezzi estratti dall'ERP PostgreSQL",
metadata={
"source": "ERP PostgreSQL",
"owner": "data-engineering@azienda.it",
},
compute_kind="python",
)
def raw_ordini(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae gli ordini per la partizione corrente."""
partition_date = context.partition_key # "2025-01-15"
df = postgres_erp.execute_query(f"""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = '{partition_date}'
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""")
# Metadata allegati all'asset: visibili nella UI Dagster
return Output(
df,
metadata={
"num_records": MetadataValue.int(len(df)),
"importo_totale": MetadataValue.float(df["importo_totale"].sum()),
"preview": MetadataValue.md(df.head(5).to_markdown()),
},
)
# ============ ASSET 2: CLIENTI AGGIORNATI ============
@asset(
name="raw_clienti",
group_name="ingestione",
partitions_def=daily_partitions,
compute_kind="python",
)
def raw_clienti(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae snapshot clienti aggiornati."""
partition_date = context.partition_key
df = postgres_erp.execute_query(f"""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = '{partition_date}'
""")
return Output(df, metadata={"num_records": MetadataValue.int(len(df))})
# ============ ASSET 3: ORDINI ARRICCHITI (Silver Layer) ============
@asset(
name="silver_ordini_arricchiti",
group_name="silver",
partitions_def=daily_partitions,
ins={
"raw_ordini": AssetIn("raw_ordini"),
"raw_clienti": AssetIn("raw_clienti"),
},
description="Ordini joinati con info cliente, validati",
compute_kind="python",
)
def silver_ordini_arricchiti(
context: AssetExecutionContext,
raw_ordini: pd.DataFrame,
raw_clienti: pd.DataFrame,
postgres_dwh: PostgresResource,
) -> MaterializeResult:
"""Arricchisce e valida gli ordini."""
df = raw_ordini.merge(
raw_clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
# Validazioni
assert df["importo_totale"].ge(0).all(), "Importi negativi!"
assert df["ordine_id"].is_unique, "Ordini duplicati!"
# Carica nel DWH
postgres_dwh.load_dataframe(
df=df,
table="silver.ordini_arricchiti",
partition_col="data_ordine",
partition_value=context.partition_key,
)
return MaterializeResult(
metadata={
"num_records": MetadataValue.int(len(df)),
"clienti_senza_match": MetadataValue.int(df["segmento"].isna().sum()),
}
)
# ============ ASSET 4: MODELLI DBT (Gold Layer) ============
# Dagster ha integrazione nativa con dbt via dagster-dbt
@dbt_assets(
manifest=dbt_manifest_path, # manifest.json generato da dbt
select="tag:marts", # Solo i modelli gold
)
def dbt_marts_assets(context: AssetExecutionContext, dbt: DbtCliResource):
"""Esegue i modelli dbt del layer gold."""
yield from dbt.cli(["run", "--select", "tag:marts"], context=context).stream()
yield from dbt.cli(["test", "--select", "tag:marts"], context=context).stream()
# ============ JOB: COMPOSIZIONE ============
# In Dagster, un Job seleziona quali asset materializzare
from dagster import define_asset_job, ScheduleDefinition
pipeline_vendite_job = define_asset_job(
name="pipeline_vendite_giornaliera",
selection=[
"raw_ordini",
"raw_clienti",
"silver_ordini_arricchiti",
"dbt_marts_assets*", # Tutti gli asset dbt marts
],
)
pipeline_vendite_schedule = ScheduleDefinition(
job=pipeline_vendite_job,
cron_schedule="0 5 * * *",
default_status=DefaultScheduleStatus.RUNNING,
)
Avantajul imediat al acestei abordări este vizibil: fiecare activ declară în mod explicit
intrările dvs. (AssetIn), Dagster construiește automat graficul
dependențe, iar interfața de utilizare arată o descendență completă de la sursă la produsul final. Testarea
devine trivială deoarece fiecare funcție este o funcție pură Python.
Manager IO: cheia integrării Dagster
Un manager IO în Dagster definește modul în care sunt obținute activele salvate și încărcate între sarcini. Dagster include IO Manager pentru S3 (Parquet, CSV), Snowflake, BigQuery, DuckDB, Panda și multe altele. Puteți configura diferiți manageri IO pentru diferite medii fără schimba codul bunului.
# resources.py - Configurazione Resources e IO Managers
from dagster import EnvVar
from dagster_aws.s3 import S3PickleIOManager, S3Resource
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_snowflake_pandas import SnowflakePandasIOManager
# Configurazione multi-environment
resources_by_env = {
"dev": {
# In dev: salva su DuckDB locale (costo zero)
"io_manager": DuckDBPandasIOManager(
database="./dev_lakehouse.duckdb",
),
"postgres_erp": PostgresResource(
host="localhost",
port=5432,
database="erp_dev",
),
},
"staging": {
# In staging: usa S3 + Parquet
"io_manager": S3PickleIOManager(
s3_resource=S3Resource(),
s3_bucket="my-staging-lakehouse",
s3_prefix="dagster/",
),
},
"prod": {
# In prod: usa Snowflake
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
database="DWH_PROD",
schema="DAGSTER_ASSETS",
),
},
}
Prefectul 3: Python-First pentru experiența dezvoltatorului
Prefect, fondată în 2018, a îmbrățișat încă de la început o altă filozofie: a face orchestrația cât se poate de simplă și pitonică. Versiunea 3, lansată în 2024 și maturat în 2025, aduce această viziune la bun sfârșit: orice funcție Python poate deveni o sarcină orchestrată cu adăugarea unui singur decorator.
Il Model de flux/sarcină Prefectul elimină o mare parte din configurație solicitate de Airflow și Dagster. Nu este nevoie să definiți DAG în mod explicit, nu sunt necesare fișiere fișiere de configurare separate, nu trebuie să învățați un DSL proprietar: scrieți simplu Python iar Prefectul face restul.
În 2025, Prefectul 3 a introdus Incidentele Prefectului pentru management întrerupere structurată a serviciului, automatizări cu declanșatoare bazate pe metrici (nu doar la evenimentele de execuție) și integrarea nativă cu Modal pentru execuție scalabil fără server. Prefect Cloud oferă un plan generos gratuit care îl face accesibil chiar si pentru echipele mici.
Exemplu de cod: Flow Prefect for Sales Pipeline
# flows/pipeline_vendite.py
import pandas as pd
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret
from prefect.filesystems import S3
from prefect_slack import SlackWebhook
from datetime import timedelta, date
from typing import Optional
# ============ TASK (funzioni Python con decoratore) ============
@task(
name="Estrai Ordini ERP",
description="Estrae ordini completati per la data specificata",
retries=3,
retry_delay_seconds=60,
cache_key_fn=task_input_hash, # Cache basata su input
cache_expiration=timedelta(hours=1), # Valida per 1 ora
tags=["estrazione", "erp"],
)
def estrai_ordini(data: date) -> pd.DataFrame:
"""Estrae ordini dall'ERP per la data specificata."""
logger = get_run_logger()
# Leggi credenziali da Prefect Blocks (gestione sicura secrets)
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
query = """
SELECT o.id AS ordine_id, o.customer_id,
o.created_at, o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = :data
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
"""
with engine.connect() as conn:
df = pd.read_sql(query, conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} ordini per {data}")
return df
@task(
name="Estrai Clienti ERP",
retries=2,
retry_delay_seconds=30,
tags=["estrazione", "erp"],
)
def estrai_clienti(data: date) -> pd.DataFrame:
"""Estrae clienti aggiornati."""
logger = get_run_logger()
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df = pd.read_sql("""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = :data
""", conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} clienti aggiornati")
return df
@task(
name="Valida e Arricchisci Ordini",
retries=1,
)
def valida_e_arricchisci(
ordini: pd.DataFrame,
clienti: pd.DataFrame,
) -> pd.DataFrame:
"""Valida e arricchisce gli ordini con dati cliente."""
logger = get_run_logger()
# Validazioni
if ordini["importo_totale"].lt(0).any():
raise ValueError("Trovati importi negativi nel dataset ordini")
if not ordini["ordine_id"].is_unique:
raise ValueError("Trovati ordini duplicati")
# Join
df = ordini.merge(
clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
match_rate = (1 - df["segmento"].isna().mean()) * 100
logger.info(f"Match rate clienti: {match_rate:.1f}%")
return df
@task(name="Carica nel DWH", retries=2)
def carica_dwh(df: pd.DataFrame, schema: str, table: str):
"""Carica il dataframe nel DWH."""
conn_string = Secret.load("dwh-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df.to_sql(
name=table,
con=conn,
schema=schema,
if_exists="replace",
index=False,
)
return len(df)
@task(name="Esegui dbt", retries=1)
def esegui_dbt(target: str = "prod"):
"""Lancia dbt run e test sui modelli marts."""
import subprocess
result = subprocess.run(
["dbt", "run", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if result.returncode != 0:
raise RuntimeError(f"dbt run fallito:\n{result.stderr}")
test_result = subprocess.run(
["dbt", "test", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if test_result.returncode != 0:
raise RuntimeError(f"dbt test fallito:\n{test_result.stderr}")
# ============ FLOW PRINCIPALE ============
@flow(
name="Pipeline Vendite Giornaliera",
description="Orchestrazione ETL vendite: ERP -> DWH -> dbt marts",
flow_run_name="vendite-{data}", # Nome run dinamico
retries=0, # Retry a livello task, non flow
timeout_seconds=7200, # 2 ore max
log_prints=True,
)
def pipeline_vendite(data: Optional[date] = None):
"""
Flow principale per la pipeline vendite.
Può essere lanciato manualmente con una data specifica
o schedulato per girare ogni giorno.
"""
logger = get_run_logger()
if data is None:
data = date.today()
logger.info(f"Avvio pipeline vendite per {data}")
# Estrazione parallela (Prefect gestisce la concorrenza automaticamente)
# submit() esegue il task in modo asincrono
future_ordini = estrai_ordini.submit(data)
future_clienti = estrai_clienti.submit(data)
# Attende entrambi i task e recupera i risultati
ordini = future_ordini.result()
clienti = future_clienti.result()
if len(ordini) == 0:
logger.warning(f"Nessun ordine per {data}, skip elaborazione")
return {"status": "skipped", "data": str(data)}
# Task sequenziali
df_arricchito = valida_e_arricchisci(ordini, clienti)
num_record = carica_dwh(df_arricchito, schema="silver", table="ordini_arricchiti")
esegui_dbt(target="prod")
logger.info(f"Pipeline completata: {num_record} record elaborati")
return {"status": "success", "records": num_record, "data": str(data)}
# ============ DEPLOYMENT ============
# Prefect 3: il deployment definisce schedule e infrastruttura
if __name__ == "__main__":
from prefect.deployments import DeploymentImage
pipeline_vendite.deploy(
name="prod-giornaliero",
cron="0 5 * * *",
work_pool_name="k8s-work-pool", # Esegui su Kubernetes
image=DeploymentImage(
name="myregistry/pipeline-vendite:latest",
platform="linux/amd64",
),
parameters={}, # Parametri default
)
Blocuri Prefect: Gestionarea centralizată a secretelor și configurației
I Prefect Blocks sunt obiecte configurabile stocate în Prefect backend-uri (Cloud sau self-hosted) care centralizează gestionarea conexiunilor și a secretelor și configurații de infrastructură. Ele elimină necesitatea variabilelor de mediu dispersate în containere și permit actualizări fără redistribuire a codului.
Comparația fluxului de aer vs Dagster vs Prefect: care este mai bine?
Răspunsul sincer este: depinde. Fiecare instrument are un caz de utilizare ideal și puncte putere specifică. Iată o comparație structurată pentru a vă ajuta în alegerea dvs.
Comparație detaliată: Airflow vs Dagster vs Prefect (2025)
| Criteriu | Flux de aer 3.0 | Dagster 1.9 | Prefectul 3 |
|---|---|---|---|
| Paradigmă | Centrat pe sarcini (DAG) | Centrat pe active (SDA) | Centrat pe sarcini (în primul rând Python) |
| Curba de învățare | Ridicat (semantică DAG, furnizori, cârlige) | Mediu-Ridicat (model de active, Manager IO) | Scăzut (decorator pe funcții existente) |
| Linia de date | Limitat (dependențe de sarcină) | Excelent (nativ, vizual) | Limitat (adăugat recent) |
| Experiență de dezvoltator (local) | Fair (Docker compus grea) | Excelent (dezvoltator dagster, interfață de utilizare locală) | Excelent (instalare pip, rulare local) |
| Testare | Complex (depinde de infra) | Excelent (active, precum și funcții) | Excelent (sarcini, precum și funcții) |
| integrarea dbt | Bun (operator dbt Cloud) | Excelent (dagster-dbt, activ nativ) | Bun (prefect-dbt) |
| Programare | Excelent (cron, eveniment, activ, termen limită) | Excelent (cron, eveniment, activ, senzor) | Bun (cron, eveniment, automatizare) |
| Umpluturi | Excelent (gestionat de planificator în 3.0) | Excelent (partiționare nativă) | discrete (manuale sau automatizări) |
| Scalabilitate | Ridicat (CeleryExecutor, KubernetesExecutor) | Ridicat (Kubernetes, ECS, Docker) | Ridicat (baze de lucru, K8, Modal) |
| Monitorizare UI | Excelent (React UI în 3.0) | Excelent (catalog, grafic de descendență) | Excelent (tabloul de bord Prefect Cloud) |
| Model de implementare | Complex (diagrama de cârmă, compune) | Mediu (dagster-webserver, k8s) | Simplu (.deploy(), pool-uri de lucru) |
| Comunitate/Ecosistem | Uriaș (35k+ stele GitHub, sute de furnizori) | Crescător (10.000 + stele, concentrare pentru întreprinderi) | Mare (15.000 + stele, concentrare pentru dezvoltatori) |
| Cloud gestionat | Astronom, MWAA, Cloud Composer | Dagster Cloud (fără server + hibrid) | Prefect Cloud (nivel gratuit generos) |
| Cost auto-găzduit | Sursă deschisă (infra de gestionat) | Sursă deschisă (infra de gestionat) | Sursă deschisă (infra de gestionat) |
| Cost gestionat | Astronom de la 500 USD/lună | Dagster Cloud de la 500 USD/lună | Prefect Cloud generos nivel gratuit |
| Ideal pentru | Echipa cu experiență Airflow, conducte ETL clasice | Echipa bazată pe date, integrare dbt, conducte ML | Team Python, prototipare rapidă, bazată pe evenimente |
Ghid de alegere rapidă
- Alegeți Flux de aer dacă: echipa ta are deja abilități Airflow, lucrezi cu ecosisteme complexe (Spark, Hadoop, mulți furnizori), ai nevoie din ecosistemul imens de operatori disponibili, sunteți fie pe AWS (MWAA) fie pe GCP (Cloud Composer) care îl gestionează nativ.
- Alegeți Dagster dacă: calitatea datelor și descendența sunt priorități, stiva ta include dbt și vrei o integrare profundă, echipa este obișnuită gândiți-vă după active și nu după sarcini, sau construiți conducte ML oriunde doriți urmăriți fiecare artefact.
- Alegeți Prefect dacă: vrei cea mai joasă curbă de învățare, echipa și Python-în primul rând fără experiență de orchestrare, aveți nevoie de flux de lucru bazat pe evenimente și operațional (nu doar date), sau doriți să începeți rapid nivelul gratuit al Prefect Cloud.
Temporal: execuție durabilă pentru fluxuri de lucru complexe
În timp ce Airflow, Dagster și Prefect se concentrează pe orchestrarea data conducte, Temporal rezolvă o altă problemă: execuția durabil a fluxurilor de lucru de aplicații de lungă durată care trebuie să supraviețuiască la blocări, reporniri și întreruperi ale rețelei.
Dezvoltat de foști ingineri Uber (care au creat Cadence, predecesorul său), Temporal tratează fiecare flux de lucru ca unul singur funcție de stare durabilă: dacă serverul repornește în timp ce un flux de lucru rulează, se reia exact unde se oprise, fără să piardă starea. Acest lucru este esențial pentru fluxurile de lucru care durează ore, zile sau săptămâni (de exemplu, procese de onboarding, orchestrarea agenților AI, saga model pentru tranzacțiile distribuite).
Temporal vs Airflow/Dagster: Când să alegeți
| Scenariu | Instrument recomandat | Motiv |
|---|---|---|
| Conducte ETL zilnice | Flux de aer / Dagster / Prefect | Optimizat pentru orchestrarea datelor în lot |
| transformări dbt | Dagster (mai bine) / Flux de aer | Integrare nativă, descendență a activelor |
| Conducte de antrenament ML | Dagster / Prefect | Urmărirea artefactelor, testare facilitată |
| Procese de integrare a utilizatorilor (în mai mulți pași) | Temporal | Durabilitate, managementul stării aplicației |
| Orchestrare agent AI / LLM | Temporal / Prefect | Gestionarea defecțiunilor complexe și de lungă durată |
| Saga modele / tranzacții distribuite | Temporal | Tranzacții compensatoare, durabilitate |
| Conductă IoT în timp real | Kafka + Flink (orchestratori non-batch) | Streamingul necesită o arhitectură diferită |
Multe companii adoptă o abordare hibridă: Airflow sau Dagster pentru orchestrare conductă de date, Temporal pentru fluxurile de lucru ale aplicațiilor care necesită durabilitate. Cele două instrumente se completează perfect și nu sunt în competiție directă.
Arhitectură de referință: orchestrare în stiva modernă de date
Cum se integrează un orchestrator cu restul stivei de date? Să vedem o arhitectură Referință completă care combină toate componentele pe care le-am văzut în această serie: Airbyte pentru asimilare, dbt pentru transformări, Apache Iceberg ca format de tabel, și Dagster ca orchestrator central.
Arhitectura stivei de date cu Dagster ca orchestrator
| Straturi | Componentă | Instrumente | Orchestrată de Dagster? |
|---|---|---|---|
| Ingestie | Sincronizarea sursei | Airbyte / Fivetran | Da (dagster-airbyte) |
| Depozitare | Depozitare obiecte + format tabel | S3 + Apache Iceberg | Da (Manager IO) |
| Transformare | Modele SQL declarative | dbt Core / Cloud | Da (dagster-dbt, activ nativ) |
| Calitate | Testarea și validarea datelor | teste dbt, mari așteptări | Da (verificări active) |
| Servire | Motor de interogări | Trino / Athena / DuckDB | Nu (interogări la cerere) |
| Analytics | Tablouri de bord și rapoarte | Metabază/Tabel | Nu (ei consumă date) |
| ML | Inginerie caracteristică + instruire | Python + MLflow | Da (materiale ML) |
# definitions.py - Dagster Definitions: composizione del data stack completo
from dagster import Definitions, load_assets_from_modules
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
from dagster_dbt import DbtCliResource
from dagster_aws.s3 import S3Resource
# Import dei moduli asset
from assets import ingestione, silver, gold, ml_features
# ============ CARICAMENTO ASSET ============
# Asset Airbyte: ogni connessione Airbyte diventa un Dagster asset
airbyte_assets = load_assets_from_airbyte_instance(
airbyte=AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
connection_filter=lambda meta: meta.name.startswith("prod_"),
)
# Asset dbt: ogni modello dbt diventa un Dagster asset con lineage completa
@dbt_assets(
manifest=Path("dbt_project/target/manifest.json"),
)
def dbt_all_assets(context, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
# Asset Python custom
python_assets = load_assets_from_modules([ingestione, silver, gold, ml_features])
# ============ RISORSE ============
resources_prod = {
"airbyte": AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
"dbt": DbtCliResource(
project_dir="dbt_project",
profiles_dir="dbt_project",
target="prod",
),
"s3": S3Resource(region_name="eu-west-1"),
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
),
"slack": SlackResource(token=EnvVar("SLACK_BOT_TOKEN")),
}
# ============ COMPOSIZIONE FINALE ============
defs = Definitions(
assets=[
*airbyte_assets,
dbt_all_assets,
*python_assets,
],
resources=resources_prod,
jobs=[pipeline_vendite_job, pipeline_clienti_job, pipeline_ml_job],
schedules=[pipeline_vendite_schedule, pipeline_ml_weekly_schedule],
sensors=[nuovo_file_s3_sensor, airbyte_sync_sensor],
)
Monitorizare, alertă și observabilitate
Un orchestrator fără monitorizare și ca un avion fără instrumente de bord. Monitorizare de conducte și esențial pentru asigurarea SLA, diagnosticarea problemelor și comunicare starea platformei de date pentru afacere.
Stivă de monitorizare recomandată
Instrumente de monitorizare pentru conducte orchestrate
| Straturi | Instrumente | Ceea ce monitorizează |
|---|---|---|
| Măsuri de infrastructură | Prometeu + Grafana | Lucrători CPU/RAM, coadă de sarcini, latența programatorului |
| Valori pipeline | Măsuri de flux de aer / evenimente Dagster / API Prefect | Rata de succes a sarcinii, durata, încălcările SLA |
| Jurnalele centralizate | ELK Stack / CloudWatch / Loki | Jurnalul fiecărei instanțe de activitate pentru depanare |
| Alertarea | PagerDuty / OpsGenie | Escaladare pentru eșecuri critice, încălcare SLA |
| Notificări de echipă | Slack / Microsoft Teams | Notificări de succes/eșec, raport zilnic |
| Calitatea datelor | Elementare / teste dbt / Mari așteptări | Anomalii de date, prospețime, număr de rânduri |
# monitoring/alerting_setup.py
# Configurazione alerting Airflow con Slack e PagerDuty
from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime
def task_failure_alert(context):
"""
Callback chiamato su ogni task failure.
Inviamo notifica Slack con dettagli del fallimento.
"""
dag_id = context["dag"].dag_id
task_id = context["task_instance"].task_id
run_id = context["run_id"]
execution_date = context["execution_date"]
log_url = context["task_instance"].log_url
exception = context.get("exception", "N/A")
message = f"""
:x: *Task Fallito*
*DAG:* {dag_id}
*Task:* {task_id}
*Run ID:* {run_id}
*Data:* {execution_date.strftime('%Y-%m-%d %H:%M')}
*Errore:* `{str(exception)[:200]}`
*Log:* <{log_url}|Vedi Log Airflow>
"""
slack_op = SlackWebhookOperator(
task_id="slack_alert",
slack_webhook_conn_id="slack_data_ops",
message=message,
channel="#data-ops-alerts",
dag=context["dag"],
)
slack_op.execute(context)
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Callback per SLA breach: invia alert critico su PagerDuty."""
import requests
sla_tasks = ", ".join([f"{s.task_id}" for s in slas])
message = f"SLA BREACH su DAG {dag.dag_id}: task {sla_tasks} non completati in tempo"
# PagerDuty Events API
payload = {
"routing_key": "YOUR_PAGERDUTY_INTEGRATION_KEY",
"event_action": "trigger",
"payload": {
"summary": message,
"severity": "critical",
"source": "airflow",
"component": dag.dag_id,
"custom_details": {
"sla_tasks": sla_tasks,
"dag_id": dag.dag_id,
},
},
}
requests.post("https://events.pagerduty.com/v2/enqueue", json=payload)
# Utilizzo nel DAG
with DAG(
dag_id="pipeline_critica",
default_args={
"on_failure_callback": task_failure_alert, # Per ogni task
},
sla_miss_callback=sla_miss_callback, # Per SLA violations
schedule="0 5 * * *",
# ...
) as dag:
task_etl = PythonOperator(
task_id="etl_principale",
python_callable=esegui_etl,
sla=timedelta(hours=1), # Deve completare entro 1 ora
)
Tabloul de bord Grafana pentru Pipeline Metrics
Airflow expune valorile prin StatsD (sau direct Prometheus cu airflow-prometheus-exporter). Iată cele mai importante valori de monitorizat într-un tablou de bord Grafana:
Măsuri de flux de aer de monitorizat în Grafana
- airflow.scheduler.tasks.moving: Sarcinile puse în coadă fără lucrători disponibili (indică o scalabilitate insuficientă)
- airflow.dagrun.duration.success: Distribuția duratei rulării DAG (percentila P95 ca indicator SLA)
- airflow.task_instance.failures: Tendința eșecurilor în funcție de sarcină (anomaliile indică probleme de sursă)
- airflow.scheduler.heartbeat: Frecvența bătăilor inimii programatorului (dacă se oprește, nu sunt programate sarcini)
- airflow.pool.open_slots: Sloturi gratuite în piscine (indică saturația)
- airflow.dagrun.first_task_scheduling_delay: Întârziere între program și prima sarcină începută
Cele mai bune practici pentru conducte robuste în producție
1. Idempotenta: regula de aur
Fiecare sarcină trebuie să fie idempotent: rulați-l de mai multe ori cu aceleași parametrii trebuie să producă același rezultat. Această proprietate este fundamentală deoarece reîncercările (automate sau manuale) sunt inevitabile în producție.
# SBAGLIATO: non idempotente (doppio insert)
def carica_dati_wrong(**context):
df = estrai_dati()
pg_hook.insert_rows("staging.ordini", df.values.tolist()) # APPEND!
# CORRETTO: idempotente (DELETE + INSERT per la data specifica)
def carica_dati_correct(**context):
execution_date = context["ds"] # "2025-01-15"
df = estrai_dati(execution_date)
with pg_hook.get_conn() as conn:
with conn.cursor() as cur:
# 1. Elimina i dati esistenti per questa partizione
cur.execute(
"DELETE FROM staging.ordini WHERE DATE(data_ordine) = %s",
[execution_date]
)
# 2. Inserisci i nuovi dati
execute_values(cur, """
INSERT INTO staging.ordini (ordine_id, data_ordine, importo)
VALUES %s
""", df[["ordine_id", "data_ordine", "importo"]].values.tolist())
conn.commit()
# OPPURE: usa INSERT ... ON CONFLICT per UPSERT atomico
def carica_dati_upsert(**context):
execution_date = context["ds"]
df = estrai_dati(execution_date)
pg_hook.insert_rows(
table="staging.ordini",
rows=df.values.tolist(),
target_fields=["ordine_id", "data_ordine", "importo"],
replace=True,
replace_index=["ordine_id"], # PK: upsert su ordine_id
)
2. Reîncercați Politica și Backoff exponențial
Configurați întotdeauna o politică rezonabilă de reîncercare. Eșecuri tranzitorii (timeout rețelei, blocarea bazei de date, servicii temporar indisponibile) sunt adesea rezolvate cu a reîncercați după câteva minute. Retragerea exponențială evită supraîncărcarea sistemelor existente în dificultate.
# Retry policy consigliata per diversi tipi di task
from datetime import timedelta
# Task di estrazione da API esterne: retry aggressivi con backoff
default_args_api = {
"retries": 5,
"retry_delay": timedelta(minutes=1),
"retry_exponential_backoff": True, # 1, 2, 4, 8, 16 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(minutes=30),
}
# Task di trasformazione dbt: retry conservativi
default_args_dbt = {
"retries": 2,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(hours=2),
}
# Task di notifica: no retry (evita spam di alert)
default_args_notify = {
"retries": 0,
"execution_timeout": timedelta(minutes=2),
}
3. Managementul dependenței cu senzori
Nu este întotdeauna posibil să programați o conductă cu o oră fixă. Uneori trebuie să aștepți că un fișier ajunge în S3, că o altă conductă se termină sau că sunt date externe disponibile. THE Operator senzor de Airflow (și Dagster) sunt soluția.
# Sensor: attendi che il file di input sia disponibile prima di procedere
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
with DAG("pipeline_dipendente", ...) as dag:
# Attendi file S3 (polling ogni 5 minuti, max 2 ore)
wait_for_file = S3KeySensor(
task_id="wait_for_input_file",
bucket_name="my-data-lake",
bucket_key="inputs/vendite_{{ ds_nodash }}.csv",
aws_conn_id="aws_default",
poke_interval=300, # Controlla ogni 5 min
timeout=7200, # Timeout dopo 2 ore
mode="reschedule", # Libera il worker mentre aspetta
)
# Attendi che un'altra DAG abbia finito
wait_for_upstream = ExternalTaskSensor(
task_id="wait_for_erp_sync",
external_dag_id="sincronizzazione_erp",
external_task_id="verify_sync_complete",
allowed_states=["success"],
failed_states=["failed", "skipped"],
poke_interval=120,
timeout=3600,
mode="reschedule",
)
estrai = PythonOperator(task_id="estrai_dati", ...)
# Pipeline partira solo quando entrambi i sensor sono verdi
[wait_for_file, wait_for_upstream] >> estrai
Anti-Pattern: greșeli frecvente de evitat
- Sarcină cu stare globală mutabilă: Nu utilizați variabile globale Python în DAG-uri. DAG-ul este frecvent analizat de către planificator; variabile globale crea un comportament imprevizibil. Utilizați XCom, Variables sau TaskFlow API.
- Logica de afaceri în fișierele DAG: Fișierele DAG trebuie să conțină numai fișierul definiția orchestrației. Logica de afaceri merge în module Python separate, importate din sarcini. Facilitează testarea și reduce timpul de analiză.
- XCom pentru seturi mari de date: XCom este proiectat pentru metadate mici (numărări, căi de fișiere, steaguri). Nu îl utilizați pentru a trece DataFrames între sarcini: utilizați tabele de staging sau fișiere S3.
- Conducte monolitice fără granularitate: O singură sarcină care face totul (extragere + transformare + încărcare) face imposibilă depanarea. Fiecare operație distinct din punct de vedere logic trebuie să fie o sarcină separată.
-
Catchup=Adevărat cu programe strânse: Dacă activați catchup pe a
pipeline cu program orar și îl porniți după o pauză de 30 de zile, veți lansa 720
execuţii în acelaşi timp. Folosiți întotdeauna
catchup=Falseimplicit și gestionați în mod explicit umplerea. - Secrete codificate: Nu introduceți niciodată parola, cheia API sau conexiune șir direct în cod. Utilizați Airflow Connections, Dagster Resources sau Prefect Blocks integrate cu AWS Secrets Manager sau HashiCorp Vault.
4. Testarea conductelor
O conductă netestată și o datorie tehnică garantată. Iată cum să structurați testele pentru un DAG Airflow și pentru un activ Dagster.
# tests/test_pipeline_vendite.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import date
import pandas as pd
# ============ TEST AIRFLOW ============
from airflow.models import DagBag
def test_dag_integrity():
"""Verifica che i DAG siano validi e non abbiano cicli."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
assert len(dagbag.import_errors) == 0, \
f"Errori di import: {dagbag.import_errors}"
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert dag is not None, "DAG non trovato"
assert dag.schedule == "0 5 * * *"
def test_dag_task_count():
"""Verifica il numero di task nel DAG."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert len(dag.tasks) >= 5, "Troppo pochi task nel DAG"
# ============ TEST DAGSTER ============
from dagster import materialize, build_asset_context
from assets import raw_ordini, silver_ordini_arricchiti
def test_raw_ordini_asset():
"""Test dell'asset raw_ordini con mock delle risorse."""
mock_df = pd.DataFrame({
"ordine_id": [1, 2, 3],
"customer_id": [101, 102, 103],
"importo_totale": [150.0, 89.99, 220.5],
"data_ordine": ["2025-01-15"] * 3,
})
with patch("assets.ingestione.PostgresResource") as mock_pg:
mock_pg.return_value.execute_query.return_value = mock_df
result = materialize(
[raw_ordini],
resources={"postgres_erp": mock_pg.return_value},
partition_key="2025-01-15",
)
assert result.success
asset_value = result.output_for_node("raw_ordini")
assert len(asset_value) == 3
assert (asset_value["importo_totale"] >= 0).all()
# ============ TEST PREFECT ============
from flows.pipeline_vendite import estrai_ordini, valida_e_arricchisci
from prefect.testing.utilities import prefect_test_harness
def test_valida_e_arricchisci():
"""Test del task di validazione con dati di test."""
ordini = pd.DataFrame({
"ordine_id": [1, 2],
"customer_id": [101, 102],
"importo_totale": [100.0, 200.0],
})
clienti = pd.DataFrame({
"id": [101, 102],
"citta": ["Milano", "Roma"],
"segmento": ["Premium", "Standard"],
})
with prefect_test_harness():
result = valida_e_arricchisci.fn(ordini, clienti)
assert len(result) == 2
assert "segmento" in result.columns
assert result["segmento"].isna().sum() == 0
def test_valida_importi_negativi():
"""Test che importi negativi generino eccezione."""
ordini = pd.DataFrame({
"ordine_id": [1],
"importo_totale": [-50.0], # NEGATIVO
"customer_id": [101],
})
clienti = pd.DataFrame({"id": [101], "citta": ["Milano"], "segmento": ["Standard"]})
with prefect_test_harness():
with pytest.raises(ValueError, match="Trovati importi negativi"):
valida_e_arricchisci.fn(ordini, clienti)
Concluzii și recomandări
Orchestrarea și coloana vertebrală a oricărei stive de date mature. Fără orchestrator fiabile, chiar și cele mai sofisticate conducte devin fragile, opace și dificil de utilizat gestionează. Am explorat cele trei instrumente principale ale pieței 2025 și am văzut cum fiecare răspunde unor nevoi diferite.
Recomandări pentru scenariu
- PMI începând de acum: Tu alegi Prefectul 3 cu Prefectul Cloud nivel gratuit. Timpul de evaluare este minim, curba de învățare este scăzută și poți trece la mai multe soluții de întreprindere atunci când complexitatea o cere.
- Echipa cu stiva dbt: Dagster și alegere naturală. Integrare nativă cu dbt prin dagster-dbt și descendență automată între modele dbt și asset Dagster reduc foarte mult încărcătura cognitivă a echipei.
- Întreprindere cu abilități existente în fluxul de aer: Flux de aer 3.0 pe Astronomer (gestionat) sau MWAA. Nu există niciun motiv pentru a migra dacă echipa este deja productivă pe Airflow, iar versiunea 3.0 a redus mare parte din decalajul față de Dagster și Prefect.
- Fluxuri de lucru ale aplicațiilor de lungă durată: Luați în considerare Temporal în combinație cu orchestratorul de date preferat. Sunt instrumente complementare, nu alternative.
Următorul articol din serie aprofundeazăIA aplicată în producție: vom vedea cum să construim sisteme de întreținere predictivă cu senzori IoT, pentru Apache Kafka streaming de date ale mașinii și modele ML pentru a anticipa eșecurile înainte ca acestea să se producă. Un caz concret de utilizare care demonstrează modul în care stiva de date pe care o construim devine fundamentul inteligenței artificiale în companie.
Lista de verificare pentru punerea în producție a orchestratorului
- Toate sarcinile sunt idempotente: reluarea lor nu produce duplicate
- Politica din reîncercați configurată cu backoff exponențial pentru fiecare sarcină
- Secrete gestionate prin Conexiuni/Blocuri/Resurse, nu hardcoded
- Monitorizare activă cu Grafana sau echivalent pentru metricile infrastructurii
- Alertă pe Slack sau PagerDuty pentru eșecuri și încălcări SLA
- Testarea integrității DAG-urilor/activelor în pipeline CI/CD
- Catchup dezactivat în mod prestabilit, completarea gestionată în mod explicit
- Separați logica de afaceri de fișierele de definire a conductei
- Jurnalele centralizate accesibile fără acces direct la lucrători
- Documentație în linie: fiecare DAG/activ are o descriere clară
Link-uri utile pentru mai multe informatii
- Articolul precedent: ETL vs ELT modern: dbt, Airbyte și Fivetran
- Articolul următor: AI în producție: întreținere predictivă și digital Twin
- Serii înrudite: MLOps for Business - Modele AI în producție cu MLflow
- Serii înrudite: LLM în afaceri: RAG Enterprise, Fine-Tuning și Guardrails







