Orchestrazione Pipeline: Airflow, Dagster e Prefect
Immagina di avere un data stack impeccabile: un lakehouse su Apache Iceberg, trasformazioni eleganti in dbt, connettori Airbyte che sincronizzano dati da una decina di sorgenti. Tutto perfetto, finchè qualcuno ti chiede: "Ma chi fa girare tutto questo? Chi garantisce che il job di ingestione parta prima delle trasformazioni? Chi ti avvisa quando qualcosa va storto alle 3 di notte?" La risposta e l'orchestratore di pipeline.
L'orchestrazione e il collante invisibile del data stack: il componente che coordina l'esecuzione di centinaia di task interdipendenti, gestisce i retry in caso di fallimento, monitora la salute delle pipeline e produce l'audit trail che il business richiede. Senza un orchestratore, le pipeline di dati sono script cron tenuti insieme dalla speranza.
Nel 2025, il panorama degli orchestratori e dominato da tre piattaforme principali: Apache Airflow 3.0, il veterano che ha definito il concetto stesso di DAG orchestration; Dagster 1.x, il challenger che ha introdotto il concetto di software-defined assets; e Prefect 3, la soluzione Python-first pensata per la developer experience. In questo articolo le analizzeremo in profondità, con codice reale e un confronto onesto per aiutarti a scegliere quella giusta per il tuo contesto.
Cosa Imparerai in Questo Articolo
- Architettura interna di Apache Airflow 3.0: Scheduler, Executor, Worker, Metadata DB
- Come scrivere un DAG Python completo per una pipeline ETL end-to-end
- Il paradigma software-defined assets di Dagster e perchè cambia il modo di pensare le pipeline
- Il modello Flow/Task di Prefect 3 e il suo deployment model semplificato
- Confronto dettagliato tra i tre strumenti con tabella comparativa per caso d'uso
- Temporal come alternativa per workflow durable e long-running
- Architettura di riferimento per integrare orchestrazione con dbt, Airbyte e data lakehouse
- Monitoring, alerting e best practices per pipeline idempotenti in produzione
Articoli della Serie Data Warehouse, AI e Trasformazione Digitale
| # | Articolo | Focus |
|---|---|---|
| 1 | Evoluzione del Data Warehouse: da SQL Server a Data Lakehouse | Architetture e piattaforme |
| 2 | Data Mesh e Architettura Decentralizzata | Governance e ownership |
| 3 | ETL vs ELT Moderno: dbt, Airbyte e Fivetran | Pipeline di trasformazione |
| 4 | Sei qui - Orchestrazione Pipeline | Airflow, Dagster, Prefect |
| 5 | AI nella Manifattura: Predictive Maintenance | IoT, ML, Digital Twin |
| 6 | AI nel Finance: Fraud Detection e Credit Scoring | ML in tempo reale |
| 7 | AI nel Retail: Demand Forecasting e Recommendation | ML applicato |
perchè l'Orchestrazione e Fondamentale
Prima di entrare nei dettagli tecnici, vale la pena capire esattamente quale problema risolve un orchestratore. Una pipeline dati tipica di un'azienda medio-grande comprende:
- Estrazione da CRM, ERP, database transazionali, API esterne (10-50 sorgenti)
- Caricamento nel data lake (Airflow trigghera Airbyte o Fivetran)
- Trasformazioni dbt (30-200 modelli con dipendenze complesse)
- Aggiornamento di data mart e tabelle aggregate
- Export verso strumenti di BI (Metabase, Tableau, Power BI)
- Aggiornamento di modelli ML (feature engineering + retraining)
Ognuno di questi step ha dipendenze (B non può partire prima che A sia completato), SLA (i report devono essere pronti entro le 08:00), e requisiti di qualità (se i dati sorgente sono vuoti, non lanciare le trasformazioni). Gestire tutto questo con script cron separati e un disastro garantito.
Cosa fa un Orchestratore di Pipeline
| Funzionalità | Descrizione |
|---|---|
| Dependency Management | Definisce l'ordine di esecuzione dei task e gestisce le dipendenze tra pipeline |
| Scheduling | Schedulazione cron, event-driven, data-aware (trigger su aggiornamento asset) |
| Retry & Error Handling | Retry automatici con backoff esponenziale, gestione dei fallimenti parziali |
| Parallelismo | Esecuzione parallela di task indipendenti per ottimizzare i tempi |
| Monitoring | Dashboard centralizata, log per task, SLA monitoring, alerting |
| Backfill | Riesecuzione di run storiche quando la logica di una pipeline cambia |
| Audit Trail | Storico completo di ogni esecuzione per compliance e debugging |
| Parameterizzazione | Configurazione variabile per ambienti (dev, staging, prod) e run manuali |
Apache Airflow 3.0: Il Veterano Rinnovato
Creato da Airbnb nel 2014 e donato all'Apache Software Foundation nel 2016, Apache Airflow e diventato lo standard de facto per l'orchestrazione di data pipeline. Con oltre 35.000 stelle su GitHub e una community che conta centinaia di contributori, Airflow e utilizzato da migliaia di aziende in tutto il mondo, da startup alle grandi enterprise.
Nel 2025, Airflow ha attraversato la sua evoluzione più significativa con il rilascio della versione 3.0, che introduce un'architettura client-server con la Task Execution Interface, l'asset-aware scheduling nativo (ereditato da Dagster), il DAG versioning, e una UI completamente ridisegnata. E non più solo un job scheduler: e una piattaforma di orchestrazione moderna.
Architettura di Apache Airflow
Capire l'architettura di Airflow e fondamentale per deployarlo correttamente e diagnosticare problemi in produzione. I componenti principali sono cinque:
Componenti Architetturali di Airflow
| Componente | Ruolo | Tecnologia |
|---|---|---|
| Webserver | UI web per monitoraggio, debug, trigger manuali dei DAG | Flask + Gunicorn (2.x), FastAPI (3.0) |
| Scheduler | Analizza i DAG, pianifica i task, li inserisce nella coda dell'executor | Python daemon, HA con più istanze |
| Executor | Esegue i task: LocalExecutor (singolo nodo), CeleryExecutor, KubernetesExecutor | Celery + Redis/RabbitMQ, oppure K8s |
| Worker | Processa effettivamente i task (solo con CeleryExecutor/KubernetesExecutor) | Celery worker o K8s Pod |
| Metadata Database | Stato di tutte le DAG run, task instance, variabili, connessioni | PostgreSQL (consigliato in prod), MySQL |
| DAG Processor | Nuovo in Airflow 3.0: parser DAG separato dallo scheduler | Python process pool |
Il Concetto di DAG
Il cuore di Airflow e il DAG (Directed Acyclic Graph): un grafo orientato senza cicli che definisce l'ordine di esecuzione dei task. Ogni nodo del grafo e un task, ogni arco e una dipendenza. Il grafo deve essere aciclico (nessun ciclo), quindi non si possono creare dipendenze circolari.
I task sono implementati tramite Operator: classi Python che astraggono un tipo di lavoro. Airflow include centinaia di operatori built-in (PythonOperator, BashOperator, PostgresOperator, SparkSubmitOperator, dbtCloudOperator...) e la community ne pubblica altrettanti tramite provider packages.
Code Example: DAG ETL Completo per Pipeline Vendite
Vediamo un DAG completo e realistico che orchestra una pipeline ETL: estrazione da PostgreSQL, caricamento in staging, trasformazione dbt e notifica al team.
# dags/pipeline_vendite.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import pandas as pd
import logging
logger = logging.getLogger(__name__)
# Configurazione DAG con default_args per tutti i task
default_args = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"email": ["data-team@azienda.it"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True, # 5, 10, 20 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(hours=2),
}
with DAG(
dag_id="pipeline_vendite_giornaliera",
default_args=default_args,
description="Estrai vendite da ERP, carica in DWH, aggrega con dbt",
schedule="0 5 * * *", # Ogni giorno alle 05:00 UTC
start_date=datetime(2025, 1, 1),
end_date=None,
catchup=False, # Non eseguire run passate
max_active_runs=1, # Una sola esecuzione alla volta
tags=["vendite", "etl", "produzione"],
doc_md="""
## Pipeline Vendite Giornaliera
Estrae ordini dall'ERP (PostgreSQL), li carica in staging,
lancia le trasformazioni dbt e notifica il team su Slack.
**SLA**: Completamento entro le 07:30 UTC
**Owner**: Team Data Engineering
""",
) as dag:
# ============ TASK 1: VERIFICA DISPONIBILITA SORGENTE ============
def check_source_data(**context):
"""Verifica che ci siano dati nella finestra di esecuzione."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
execution_date = context["ds"] # "2025-01-15"
count = pg_hook.get_first("""
SELECT COUNT(*)
FROM orders
WHERE DATE(created_at) = %s
""", parameters=[execution_date])[0]
logger.info(f"Trovati {count} ordini per {execution_date}")
if count == 0:
logger.warning("Nessun dato trovato, skip pipeline")
return "notify_no_data" # Branch: salta elaborazione
return "extract_task_group.extract_orders"
check_source = BranchPythonOperator(
task_id="check_source_data",
python_callable=check_source_data,
)
# ============ TASK GROUP: ESTRAZIONE ============
with TaskGroup("extract_task_group") as extract_group:
def extract_orders(**context):
"""Estrae ordini dall'ERP e li salva in staging."""
execution_date = context["ds"]
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
df = pg_hook.get_pandas_df("""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price * (1 - COALESCE(oi.discount, 0)))
AS importo_totale,
COUNT(oi.id) AS num_righe
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = %(exec_date)s
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""", parameters={"exec_date": execution_date})
# Validazione dati estratti
assert df["importo_totale"].ge(0).all(), "Importi negativi trovati!"
assert df["ordine_id"].is_unique, "Ordini duplicati nell'estrazione!"
# Salva in XCom per passare al task successivo
# Per dataset grandi, usa staging table invece di XCom
context["ti"].xcom_push(key="row_count", value=len(df))
# Carica in tabella staging del DWH
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
dwh_hook.insert_rows(
table="staging.stg_ordini_raw",
rows=df.values.tolist(),
target_fields=df.columns.tolist(),
replace=True,
replace_index=["ordine_id"],
)
logger.info(f"Caricati {len(df)} ordini in staging")
extract_orders_task = PythonOperator(
task_id="extract_orders",
python_callable=extract_orders,
)
def extract_customers(**context):
"""Estrae snapshot clienti aggiornati."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
df_customers = pg_hook.get_pandas_df("""
SELECT id, email, nome, cognome, citta, segmento,
DATE(updated_at) AS data_aggiornamento
FROM customers
WHERE DATE(updated_at) >= CURRENT_DATE - INTERVAL '1 day'
""")
if len(df_customers) > 0:
dwh_hook.insert_rows(
table="staging.stg_clienti_raw",
rows=df_customers.values.tolist(),
target_fields=df_customers.columns.tolist(),
replace=True,
replace_index=["id"],
)
extract_customers_task = PythonOperator(
task_id="extract_customers",
python_callable=extract_customers,
)
# I due extract girano in PARALLELO
[extract_orders_task, extract_customers_task]
# ============ TASK GROUP: TRASFORMAZIONI DBT ============
with TaskGroup("dbt_task_group") as dbt_group:
# dbt run sullo staging layer
dbt_staging = BashOperator(
task_id="dbt_run_staging",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:staging --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt run sul marts layer (dipende dallo staging)
dbt_marts = BashOperator(
task_id="dbt_run_marts",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:marts --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt test per validare la qualità
dbt_test = BashOperator(
task_id="dbt_test",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt test --select tag:staging tag:marts --target prod
""",
)
dbt_staging >> dbt_marts >> dbt_test
# ============ NOTIFICHE ============
notify_success = SlackWebhookOperator(
task_id="notify_success",
slack_webhook_conn_id="slack_data_team",
message="""
:white_check_mark: *Pipeline Vendite Completata*
Data: {{ ds }}
Ordini elaborati: {{ ti.xcom_pull(task_ids='extract_task_group.extract_orders', key='row_count') }}
Durata: {{ dag_run.get_task_instance('dbt_task_group.dbt_test').duration | round(1) }}s
""",
trigger_rule="all_success",
)
notify_no_data = SlackWebhookOperator(
task_id="notify_no_data",
slack_webhook_conn_id="slack_data_team",
message=":information_source: *Pipeline Vendite* - Nessun dato per {{ ds }}, esecuzione saltata",
)
notify_failure = SlackWebhookOperator(
task_id="notify_failure",
slack_webhook_conn_id="slack_data_team",
message=":x: *Pipeline Vendite FALLITA* - Data: {{ ds }} - Controlla Airflow UI!",
trigger_rule="one_failed",
)
# ============ DIPENDENZE DAG ============
check_source >> [extract_group, notify_no_data]
extract_group >> dbt_group >> notify_success
dbt_group >> notify_failure
Questo DAG dimostra i pattern fondamentali di Airflow: uso di TaskGroup per
raggruppare task correlati, BranchPythonOperator per logica condizionale,
XCom per passare dati tra task, e trigger rules per la gestione
di notifiche su successo e fallimento.
Novità di Apache Airflow 3.0 (2025)
- Task Execution Interface: Nuova API client-server che disaccoppia il worker dall'Airflow API server, migliorando sicurezza e scalabilità
- Asset-Aware Scheduling: I DAG possono ora essere triggerati dall'aggiornamento di un asset (dataset), non solo da schedule cron
- DAG Versioning: Ogni run e associata alla versione del DAG al momento del lancio, eliminando inconsistenze durante i deploy
- Human-in-the-Loop: Airflow 3.1 introduce workflow che attendono approvazione umana prima di procedere
- UI React: Interfaccia completamente ridisegnata, più veloce e intuitiva
- DAG Processor separato: Il parsing dei DAG avviene in un processo dedicato, riducendo il carico sullo scheduler
Dagster: Il Paradigma Software-Defined Assets
Dagster, nato nel 2018 da Elementl (oggi Dagster Labs), ha introdotto un cambio di paradigma radicale nell'orchestrazione: invece di pensare ai task da eseguire, si pensa agli asset da produrre. Un asset e qualsiasi artefatto dati: una tabella nel DWH, un modello ML, un file Parquet, un report generato.
Questo approccio, chiamato Software-Defined Assets (SDA), ha rivoluzionato
la developer experience. In Airflow definisci "esegui questo script alle 5:00". In Dagster
definisci "voglio che la tabella gold.fatturato_mensile sia sempre aggiornata,
e lei sa come aggiornarsi". La differenza sembra sottile, ma cambia tutto: lineage automatica,
testing semplice, comprensione immediata di cosa esiste nel data stack.
Nel 2025, Dagster 1.9 ha raggiunto la maturita con il framework Components (GA a ottobre 2025), che permette di descrivere intere pipeline come configurazioni dichiarative, e il catalog avanzato che offre visibilità senza precedenti sullo stato di ogni asset nel sistema.
Concetti Chiave di Dagster
Terminologia Dagster
| Concetto | Descrizione | Equivalente Airflow |
|---|---|---|
| Asset | Artefatto dati che la pipeline produce (tabella, modello, file) | Non ha equivalente diretto |
| @asset | Decoratore Python che definisce come produrre un asset | PythonOperator (più limitato) |
| Job | Selezione di asset/op da eseguire insieme | DAG |
| Op | Task generico senza output dati esplicito | Operator |
| IO Manager | Gestisce come gli asset vengono salvati e letti (S3, BigQuery, Snowflake...) | Non ha equivalente |
| Resource | Connessioni e client condivisi (database, API) | Connection + Hook |
| Sensor | Trigger event-driven (polling di filesystem, API, eventi) | Sensor operator |
| Schedule | Trigger time-based per job | DAG schedule |
| Partition | Divisione dell'asset per data, categoria, regione | PartitionedSchedule (più complesso) |
Code Example: Pipeline Asset-Based con Dagster
Implementiamo la stessa pipeline vendite di Airflow, ma con il paradigma Dagster. Noterai immediatamente come la dipendenza tra asset sia esplicita e come il testing diventi semplice.
# pipeline_vendite/assets.py
import pandas as pd
from dagster import (
asset,
AssetIn,
AssetExecutionContext,
MetadataValue,
Output,
DailyPartitionsDefinition,
MaterializeResult,
)
from dagster_dbt import dbt_assets, DbtCliResource
from resources import postgres_erp, postgres_dwh, slack_resource
# Partizionamento giornaliero - Dagster gestisce il backfill automaticamente
daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")
# ============ ASSET 1: ORDINI GREZZI DALL'ERP ============
@asset(
name="raw_ordini",
group_name="ingestione",
partitions_def=daily_partitions,
description="Ordini grezzi estratti dall'ERP PostgreSQL",
metadata={
"source": "ERP PostgreSQL",
"owner": "data-engineering@azienda.it",
},
compute_kind="python",
)
def raw_ordini(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae gli ordini per la partizione corrente."""
partition_date = context.partition_key # "2025-01-15"
df = postgres_erp.execute_query(f"""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = '{partition_date}'
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""")
# Metadata allegati all'asset: visibili nella UI Dagster
return Output(
df,
metadata={
"num_records": MetadataValue.int(len(df)),
"importo_totale": MetadataValue.float(df["importo_totale"].sum()),
"preview": MetadataValue.md(df.head(5).to_markdown()),
},
)
# ============ ASSET 2: CLIENTI AGGIORNATI ============
@asset(
name="raw_clienti",
group_name="ingestione",
partitions_def=daily_partitions,
compute_kind="python",
)
def raw_clienti(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae snapshot clienti aggiornati."""
partition_date = context.partition_key
df = postgres_erp.execute_query(f"""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = '{partition_date}'
""")
return Output(df, metadata={"num_records": MetadataValue.int(len(df))})
# ============ ASSET 3: ORDINI ARRICCHITI (Silver Layer) ============
@asset(
name="silver_ordini_arricchiti",
group_name="silver",
partitions_def=daily_partitions,
ins={
"raw_ordini": AssetIn("raw_ordini"),
"raw_clienti": AssetIn("raw_clienti"),
},
description="Ordini joinati con info cliente, validati",
compute_kind="python",
)
def silver_ordini_arricchiti(
context: AssetExecutionContext,
raw_ordini: pd.DataFrame,
raw_clienti: pd.DataFrame,
postgres_dwh: PostgresResource,
) -> MaterializeResult:
"""Arricchisce e valida gli ordini."""
df = raw_ordini.merge(
raw_clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
# Validazioni
assert df["importo_totale"].ge(0).all(), "Importi negativi!"
assert df["ordine_id"].is_unique, "Ordini duplicati!"
# Carica nel DWH
postgres_dwh.load_dataframe(
df=df,
table="silver.ordini_arricchiti",
partition_col="data_ordine",
partition_value=context.partition_key,
)
return MaterializeResult(
metadata={
"num_records": MetadataValue.int(len(df)),
"clienti_senza_match": MetadataValue.int(df["segmento"].isna().sum()),
}
)
# ============ ASSET 4: MODELLI DBT (Gold Layer) ============
# Dagster ha integrazione nativa con dbt via dagster-dbt
@dbt_assets(
manifest=dbt_manifest_path, # manifest.json generato da dbt
select="tag:marts", # Solo i modelli gold
)
def dbt_marts_assets(context: AssetExecutionContext, dbt: DbtCliResource):
"""Esegue i modelli dbt del layer gold."""
yield from dbt.cli(["run", "--select", "tag:marts"], context=context).stream()
yield from dbt.cli(["test", "--select", "tag:marts"], context=context).stream()
# ============ JOB: COMPOSIZIONE ============
# In Dagster, un Job seleziona quali asset materializzare
from dagster import define_asset_job, ScheduleDefinition
pipeline_vendite_job = define_asset_job(
name="pipeline_vendite_giornaliera",
selection=[
"raw_ordini",
"raw_clienti",
"silver_ordini_arricchiti",
"dbt_marts_assets*", # Tutti gli asset dbt marts
],
)
pipeline_vendite_schedule = ScheduleDefinition(
job=pipeline_vendite_job,
cron_schedule="0 5 * * *",
default_status=DefaultScheduleStatus.RUNNING,
)
Il vantaggio immediato di questo approccio e visibile: ogni asset dichiara esplicitamente
i propri input (AssetIn), Dagster costruisce automaticamente il grafo delle
dipendenze, e la UI mostra una lineage completa da sorgente a prodotto finale. Il testing
diventa triviale perchè ogni funzione e una pura funzione Python.
IO Manager: la Chiave dell'Integrazione Dagster
Un IO Manager in Dagster definisce come gli asset vengono salvati e caricati tra task. Dagster include IO Manager per S3 (Parquet, CSV), Snowflake, BigQuery, DuckDB, Pandas e molti altri. Puoi configurare IO Manager diversi per ambienti diversi senza modificare il codice degli asset.
# resources.py - Configurazione Resources e IO Managers
from dagster import EnvVar
from dagster_aws.s3 import S3PickleIOManager, S3Resource
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_snowflake_pandas import SnowflakePandasIOManager
# Configurazione multi-environment
resources_by_env = {
"dev": {
# In dev: salva su DuckDB locale (costo zero)
"io_manager": DuckDBPandasIOManager(
database="./dev_lakehouse.duckdb",
),
"postgres_erp": PostgresResource(
host="localhost",
port=5432,
database="erp_dev",
),
},
"staging": {
# In staging: usa S3 + Parquet
"io_manager": S3PickleIOManager(
s3_resource=S3Resource(),
s3_bucket="my-staging-lakehouse",
s3_prefix="dagster/",
),
},
"prod": {
# In prod: usa Snowflake
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
database="DWH_PROD",
schema="DAGSTER_ASSETS",
),
},
}
Prefect 3: Python-First per la Developer Experience
Prefect, fondato nel 2018, ha abbracciato sin dall'inizio una filosofia diversa: rendere l'orchestrazione il più semplice e Pythonico possibile. La versione 3, rilasciata nel 2024 e maturata nel 2025, porta questa visione al suo compimento: qualsiasi funzione Python può diventare un task orchestrato con l'aggiunta di un singolo decoratore.
Il modello Flow/Task di Prefect elimina gran parte della configurazione richiesta da Airflow e Dagster. Non serve definire DAG esplicitamente, non servono file di configurazione separati, non serve imparare un DSL proprietario: scrivi Python normale e Prefect fa il resto.
Nel 2025, Prefect 3 ha introdotto Prefect Incidents per la gestione strutturata delle interruzioni di servizio, automazioni con trigger basati su metriche (non solo su eventi di esecuzione), e l'integrazione nativa con Modal per l'esecuzione serverless scalabile. Prefect Cloud offre un piano gratuito generoso che lo rende accessibile anche a team piccoli.
Code Example: Flow Prefect per Pipeline Vendite
# flows/pipeline_vendite.py
import pandas as pd
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret
from prefect.filesystems import S3
from prefect_slack import SlackWebhook
from datetime import timedelta, date
from typing import Optional
# ============ TASK (funzioni Python con decoratore) ============
@task(
name="Estrai Ordini ERP",
description="Estrae ordini completati per la data specificata",
retries=3,
retry_delay_seconds=60,
cache_key_fn=task_input_hash, # Cache basata su input
cache_expiration=timedelta(hours=1), # Valida per 1 ora
tags=["estrazione", "erp"],
)
def estrai_ordini(data: date) -> pd.DataFrame:
"""Estrae ordini dall'ERP per la data specificata."""
logger = get_run_logger()
# Leggi credenziali da Prefect Blocks (gestione sicura secrets)
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
query = """
SELECT o.id AS ordine_id, o.customer_id,
o.created_at, o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = :data
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
"""
with engine.connect() as conn:
df = pd.read_sql(query, conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} ordini per {data}")
return df
@task(
name="Estrai Clienti ERP",
retries=2,
retry_delay_seconds=30,
tags=["estrazione", "erp"],
)
def estrai_clienti(data: date) -> pd.DataFrame:
"""Estrae clienti aggiornati."""
logger = get_run_logger()
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df = pd.read_sql("""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = :data
""", conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} clienti aggiornati")
return df
@task(
name="Valida e Arricchisci Ordini",
retries=1,
)
def valida_e_arricchisci(
ordini: pd.DataFrame,
clienti: pd.DataFrame,
) -> pd.DataFrame:
"""Valida e arricchisce gli ordini con dati cliente."""
logger = get_run_logger()
# Validazioni
if ordini["importo_totale"].lt(0).any():
raise ValueError("Trovati importi negativi nel dataset ordini")
if not ordini["ordine_id"].is_unique:
raise ValueError("Trovati ordini duplicati")
# Join
df = ordini.merge(
clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
match_rate = (1 - df["segmento"].isna().mean()) * 100
logger.info(f"Match rate clienti: {match_rate:.1f}%")
return df
@task(name="Carica nel DWH", retries=2)
def carica_dwh(df: pd.DataFrame, schema: str, table: str):
"""Carica il dataframe nel DWH."""
conn_string = Secret.load("dwh-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df.to_sql(
name=table,
con=conn,
schema=schema,
if_exists="replace",
index=False,
)
return len(df)
@task(name="Esegui dbt", retries=1)
def esegui_dbt(target: str = "prod"):
"""Lancia dbt run e test sui modelli marts."""
import subprocess
result = subprocess.run(
["dbt", "run", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if result.returncode != 0:
raise RuntimeError(f"dbt run fallito:\n{result.stderr}")
test_result = subprocess.run(
["dbt", "test", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if test_result.returncode != 0:
raise RuntimeError(f"dbt test fallito:\n{test_result.stderr}")
# ============ FLOW PRINCIPALE ============
@flow(
name="Pipeline Vendite Giornaliera",
description="Orchestrazione ETL vendite: ERP -> DWH -> dbt marts",
flow_run_name="vendite-{data}", # Nome run dinamico
retries=0, # Retry a livello task, non flow
timeout_seconds=7200, # 2 ore max
log_prints=True,
)
def pipeline_vendite(data: Optional[date] = None):
"""
Flow principale per la pipeline vendite.
Può essere lanciato manualmente con una data specifica
o schedulato per girare ogni giorno.
"""
logger = get_run_logger()
if data is None:
data = date.today()
logger.info(f"Avvio pipeline vendite per {data}")
# Estrazione parallela (Prefect gestisce la concorrenza automaticamente)
# submit() esegue il task in modo asincrono
future_ordini = estrai_ordini.submit(data)
future_clienti = estrai_clienti.submit(data)
# Attende entrambi i task e recupera i risultati
ordini = future_ordini.result()
clienti = future_clienti.result()
if len(ordini) == 0:
logger.warning(f"Nessun ordine per {data}, skip elaborazione")
return {"status": "skipped", "data": str(data)}
# Task sequenziali
df_arricchito = valida_e_arricchisci(ordini, clienti)
num_record = carica_dwh(df_arricchito, schema="silver", table="ordini_arricchiti")
esegui_dbt(target="prod")
logger.info(f"Pipeline completata: {num_record} record elaborati")
return {"status": "success", "records": num_record, "data": str(data)}
# ============ DEPLOYMENT ============
# Prefect 3: il deployment definisce schedule e infrastruttura
if __name__ == "__main__":
from prefect.deployments import DeploymentImage
pipeline_vendite.deploy(
name="prod-giornaliero",
cron="0 5 * * *",
work_pool_name="k8s-work-pool", # Esegui su Kubernetes
image=DeploymentImage(
name="myregistry/pipeline-vendite:latest",
platform="linux/amd64",
),
parameters={}, # Parametri default
)
Prefect Blocks: Gestione Centralizzata di Secrets e Configurazione
I Prefect Blocks sono oggetti configurabili archiviati nel Prefect backend (Cloud o self-hosted) che centralizzano la gestione di connessioni, secrets e configurazioni infrastrutturali. Eliminano la necessità di variabili d'ambiente disperse nei container e permettono aggiornamenti senza rideploy del codice.
Confronto Airflow vs Dagster vs Prefect: Qual è il Migliore?
La risposta onesta e: dipende. Ogni strumento ha un caso d'uso ideale e dei punti di forza specifici. Ecco un confronto strutturato per aiutarti nella scelta.
Confronto Dettagliato: Airflow vs Dagster vs Prefect (2025)
| Criterio | Airflow 3.0 | Dagster 1.9 | Prefect 3 |
|---|---|---|---|
| Paradigma | Task-centric (DAG) | Asset-centric (SDA) | Task-centric (Python-first) |
| Curva di apprendimento | Alta (DAG semantics, provider, hook) | Media-Alta (asset model, IO Manager) | Bassa (decorator su funzioni esistenti) |
| Data Lineage | Limitata (task dependencies) | Eccellente (nativa, visuale) | Limitata (aggiunta recentemente) |
| Developer Experience (local) | Discreta (Docker compose pesante) | Ottima (dagster dev, UI locale) | Eccellente (pip install, run locale) |
| Testing | Complesso (dipende da infra) | Ottimo (asset come pure functions) | Ottimo (task come pure functions) |
| Integrazione dbt | Buona (dbt Cloud operator) | Eccellente (dagster-dbt, asset nativa) | Buona (prefect-dbt) |
| Scheduling | Ottimo (cron, event, asset, deadline) | Ottimo (cron, event, asset, sensor) | Buono (cron, event, automation) |
| Backfill | Ottimo (scheduler-managed in 3.0) | Eccellente (partitioning nativo) | Discreto (manuale o automations) |
| Scalabilità | Alta (CeleryExecutor, KubernetesExecutor) | Alta (Kubernetes, ECS, Docker) | Alta (work pools, K8s, Modal) |
| Monitoring UI | Ottima (React UI in 3.0) | Eccellente (catalog, lineage graph) | Ottima (Prefect Cloud dashboard) |
| Deployment model | Complesso (helm chart, compose) | Medio (dagster-webserver, k8s) | Semplice (.deploy(), work pools) |
| Community/Ecosystem | Enorme (35k+ GitHub stars, centinaia di provider) | Crescente (10k+ stars, enterprise focus) | Grande (15k+ stars, developer focus) |
| Managed cloud | Astronomer, MWAA, Cloud Composer | Dagster Cloud (Serverless + Hybrid) | Prefect Cloud (generous free tier) |
| Costo self-hosted | Open source (infra da gestire) | Open source (infra da gestire) | Open source (infra da gestire) |
| Costo managed | Astronomer da $500/mese | Dagster Cloud da $500/mese | Prefect Cloud free tier generoso |
| Ideale per | Team con esperienza Airflow, pipeline ETL classiche | Team data-driven, integrazione dbt, ML pipelines | Team Python, rapid prototyping, event-driven |
Guida Rapida alla Scelta
- Scegli Airflow se: il tuo team ha già competenze Airflow, lavori con ecosistemi complessi (Spark, Hadoop, molti provider), hai bisogno dell'enorme ecosistema di operatori disponibili, o sei su AWS (MWAA) o GCP (Cloud Composer) che lo gestiscono nativamente.
- Scegli Dagster se: la data quality e la lineage sono priorità, il tuo stack include dbt e vuoi integrazione profonda, il team e abituato a ragionare per asset e non per task, o stai costruendo ML pipelines dove vuoi tracciare ogni artefatto.
- Scegli Prefect se: vuoi la curva di apprendimento più bassa, il team e Python-first senza esperienza di orchestrazione, hai bisogno di workflow event-driven e operazionali (non solo dati), o vuoi partire velocemente con il free tier di Prefect Cloud.
Temporal: Durable Execution per Workflow Complessi
Mentre Airflow, Dagster e Prefect si concentrano sull'orchestrazione di data pipeline, Temporal risolve un problema diverso: l'esecuzione durable di workflow applicativi long-running che devono sopravvivere a crash, restart e interruzioni di rete.
Sviluppato da ex-ingegneri di Uber (che avevano creato Cadence, il suo predecessore), Temporal tratta ogni workflow come una funzione stateful durabile: se il server si riavvia mentre un workflow e in esecuzione, riprende esattamente da dove si era fermato, senza perdere stato. Questo e fondamentale per workflow che durano ore, giorni o settimane (es: processi di onboarding, orchestrazione di agenti AI, saga pattern per transazioni distribuite).
Temporal vs Airflow/Dagster: Quando Scegliere
| Scenario | Strumento Consigliato | Motivo |
|---|---|---|
| Pipeline ETL giornaliere | Airflow / Dagster / Prefect | Ottimizzati per batch data orchestration |
| Trasformazioni dbt | Dagster (migliore) / Airflow | Integrazione nativa, asset lineage |
| ML training pipelines | Dagster / Prefect | Tracking artefatti, testing facilitato |
| Processi di onboarding utente (multi-step) | Temporal | Durability, state management applicativo |
| Orchestrazione agenti AI / LLM | Temporal / Prefect | Long-running, gestione fallimenti complessa |
| Saga pattern / transazioni distribuite | Temporal | Compensating transactions, durability |
| Pipeline IoT real-time | Kafka + Flink (non orchestratori batch) | Streaming richiede architettura diversa |
Molte aziende adottano un approccio ibrido: Airflow o Dagster per l'orchestrazione delle data pipeline, Temporal per i workflow applicativi che richiedono durability. I due strumenti si complementano perfettamente e non sono in competizione diretta.
Architettura di Riferimento: Orchestrazione nel Data Stack Moderno
Come si integra un orchestratore con il resto del data stack? Vediamo un'architettura di riferimento completa che combina tutti i componenti che abbiamo visto in questa serie: Airbyte per l'ingestione, dbt per le trasformazioni, Apache Iceberg come table format, e Dagster come orchestratore centrale.
Architettura Data Stack con Dagster come Orchestratore
| Layer | Componente | Tool | Orchestrato da Dagster? |
|---|---|---|---|
| Ingestione | Sincronizzazione sorgenti | Airbyte / Fivetran | Si (dagster-airbyte) |
| Storage | Object storage + table format | S3 + Apache Iceberg | Si (IO Manager) |
| Trasformazione | Modelli SQL dichiarativi | dbt Core / Cloud | Si (dagster-dbt, asset nativa) |
| Quality | Test e validazione dati | dbt tests, Great Expectations | Si (asset checks) |
| Serving | Query engine | Trino / Athena / DuckDB | No (query on-demand) |
| Analytics | Dashboard e report | Metabase / Tableau | No (consumano dati) |
| ML | Feature engineering + training | Python + MLflow | Si (asset ML) |
# definitions.py - Dagster Definitions: composizione del data stack completo
from dagster import Definitions, load_assets_from_modules
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
from dagster_dbt import DbtCliResource
from dagster_aws.s3 import S3Resource
# Import dei moduli asset
from assets import ingestione, silver, gold, ml_features
# ============ CARICAMENTO ASSET ============
# Asset Airbyte: ogni connessione Airbyte diventa un Dagster asset
airbyte_assets = load_assets_from_airbyte_instance(
airbyte=AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
connection_filter=lambda meta: meta.name.startswith("prod_"),
)
# Asset dbt: ogni modello dbt diventa un Dagster asset con lineage completa
@dbt_assets(
manifest=Path("dbt_project/target/manifest.json"),
)
def dbt_all_assets(context, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
# Asset Python custom
python_assets = load_assets_from_modules([ingestione, silver, gold, ml_features])
# ============ RISORSE ============
resources_prod = {
"airbyte": AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
"dbt": DbtCliResource(
project_dir="dbt_project",
profiles_dir="dbt_project",
target="prod",
),
"s3": S3Resource(region_name="eu-west-1"),
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
),
"slack": SlackResource(token=EnvVar("SLACK_BOT_TOKEN")),
}
# ============ COMPOSIZIONE FINALE ============
defs = Definitions(
assets=[
*airbyte_assets,
dbt_all_assets,
*python_assets,
],
resources=resources_prod,
jobs=[pipeline_vendite_job, pipeline_clienti_job, pipeline_ml_job],
schedules=[pipeline_vendite_schedule, pipeline_ml_weekly_schedule],
sensors=[nuovo_file_s3_sensor, airbyte_sync_sensor],
)
Monitoring, Alerting e Observability
Un orchestratore senza monitoring e come un aereo senza strumenti di bordo. Il monitoring delle pipeline e fondamentale per garantire SLA, diagnosticare problemi e comunicare lo stato della piattaforma dati al business.
Stack di Monitoring Consigliato
Strumenti di Monitoring per Pipeline Orchestrate
| Layer | Tool | Cosa Monitora |
|---|---|---|
| Metriche infrastruttura | Prometheus + Grafana | CPU/RAM workers, coda task, latenza scheduler |
| Metriche pipeline | Airflow metrics / Dagster events / Prefect API | Task success rate, durata, SLA violations |
| Log centralizzati | ELK Stack / CloudWatch / Loki | Log di ogni task instance per debugging |
| Alerting | PagerDuty / OpsGenie | Escalation per fallimenti critici, SLA breach |
| Notifiche team | Slack / Microsoft Teams | Notifiche su successo/fallimento, report giornaliero |
| Data quality | Elementary / dbt tests / Great Expectations | Anomalie nei dati, freshness, row counts |
# monitoring/alerting_setup.py
# Configurazione alerting Airflow con Slack e PagerDuty
from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime
def task_failure_alert(context):
"""
Callback chiamato su ogni task failure.
Inviamo notifica Slack con dettagli del fallimento.
"""
dag_id = context["dag"].dag_id
task_id = context["task_instance"].task_id
run_id = context["run_id"]
execution_date = context["execution_date"]
log_url = context["task_instance"].log_url
exception = context.get("exception", "N/A")
message = f"""
:x: *Task Fallito*
*DAG:* {dag_id}
*Task:* {task_id}
*Run ID:* {run_id}
*Data:* {execution_date.strftime('%Y-%m-%d %H:%M')}
*Errore:* `{str(exception)[:200]}`
*Log:* <{log_url}|Vedi Log Airflow>
"""
slack_op = SlackWebhookOperator(
task_id="slack_alert",
slack_webhook_conn_id="slack_data_ops",
message=message,
channel="#data-ops-alerts",
dag=context["dag"],
)
slack_op.execute(context)
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Callback per SLA breach: invia alert critico su PagerDuty."""
import requests
sla_tasks = ", ".join([f"{s.task_id}" for s in slas])
message = f"SLA BREACH su DAG {dag.dag_id}: task {sla_tasks} non completati in tempo"
# PagerDuty Events API
payload = {
"routing_key": "YOUR_PAGERDUTY_INTEGRATION_KEY",
"event_action": "trigger",
"payload": {
"summary": message,
"severity": "critical",
"source": "airflow",
"component": dag.dag_id,
"custom_details": {
"sla_tasks": sla_tasks,
"dag_id": dag.dag_id,
},
},
}
requests.post("https://events.pagerduty.com/v2/enqueue", json=payload)
# Utilizzo nel DAG
with DAG(
dag_id="pipeline_critica",
default_args={
"on_failure_callback": task_failure_alert, # Per ogni task
},
sla_miss_callback=sla_miss_callback, # Per SLA violations
schedule="0 5 * * *",
# ...
) as dag:
task_etl = PythonOperator(
task_id="etl_principale",
python_callable=esegui_etl,
sla=timedelta(hours=1), # Deve completare entro 1 ora
)
Grafana Dashboard per Pipeline Metrics
Airflow espone metriche via StatsD (o direttamente Prometheus con airflow-prometheus-exporter). Ecco le metriche più importanti da monitorare in una dashboard Grafana:
Metriche Airflow da Monitorare in Grafana
- airflow.scheduler.tasks.starving: Task in coda senza worker disponibili (indica scalabilità insufficiente)
- airflow.dagrun.duration.success: Distribuzione durata delle DAG run (percentile P95 come SLA indicatore)
- airflow.task_instance.failures: Trend dei fallimenti per task (anomalie segnalano problemi sorgente)
- airflow.scheduler.heartbeat: Frequenza heartbeat scheduler (se si ferma, nessun task viene schedulato)
- airflow.pool.open_slots: Slot liberi nei pool (indica saturazione)
- airflow.dagrun.first_task_scheduling_delay: Ritardo tra schedule e primo task avviato
Best Practices per Pipeline Robuste in Produzione
1. Idempotenza: la Regola d'Oro
Ogni task deve essere idempotente: eseguirlo più volte con gli stessi parametri deve produrre lo stesso risultato. Questa proprietà e fondamentale perchè i retry (automatici o manuali) sono inevitabili in produzione.
# SBAGLIATO: non idempotente (doppio insert)
def carica_dati_wrong(**context):
df = estrai_dati()
pg_hook.insert_rows("staging.ordini", df.values.tolist()) # APPEND!
# CORRETTO: idempotente (DELETE + INSERT per la data specifica)
def carica_dati_correct(**context):
execution_date = context["ds"] # "2025-01-15"
df = estrai_dati(execution_date)
with pg_hook.get_conn() as conn:
with conn.cursor() as cur:
# 1. Elimina i dati esistenti per questa partizione
cur.execute(
"DELETE FROM staging.ordini WHERE DATE(data_ordine) = %s",
[execution_date]
)
# 2. Inserisci i nuovi dati
execute_values(cur, """
INSERT INTO staging.ordini (ordine_id, data_ordine, importo)
VALUES %s
""", df[["ordine_id", "data_ordine", "importo"]].values.tolist())
conn.commit()
# OPPURE: usa INSERT ... ON CONFLICT per UPSERT atomico
def carica_dati_upsert(**context):
execution_date = context["ds"]
df = estrai_dati(execution_date)
pg_hook.insert_rows(
table="staging.ordini",
rows=df.values.tolist(),
target_fields=["ordine_id", "data_ordine", "importo"],
replace=True,
replace_index=["ordine_id"], # PK: upsert su ordine_id
)
2. Retry Policy e Backoff Esponenziale
Configura sempre una retry policy ragionevole. I fallimenti transitori (timeout di rete, lock del database, servizi temporaneamente non disponibili) si risolvono spesso con un retry dopo qualche minuto. Il backoff esponenziale evita di sovraccaricare sistemi gia in difficolta.
# Retry policy consigliata per diversi tipi di task
from datetime import timedelta
# Task di estrazione da API esterne: retry aggressivi con backoff
default_args_api = {
"retries": 5,
"retry_delay": timedelta(minutes=1),
"retry_exponential_backoff": True, # 1, 2, 4, 8, 16 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(minutes=30),
}
# Task di trasformazione dbt: retry conservativi
default_args_dbt = {
"retries": 2,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(hours=2),
}
# Task di notifica: no retry (evita spam di alert)
default_args_notify = {
"retries": 0,
"execution_timeout": timedelta(minutes=2),
}
3. Dependency Management con Sensors
Non sempre e possibile schedule una pipeline con un orario fisso. A volte bisogna aspettare che un file arrivi in S3, che un'altra pipeline termini, o che un dato esterno sia disponibile. I Sensor operator di Airflow (e Dagster) sono la soluzione.
# Sensor: attendi che il file di input sia disponibile prima di procedere
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
with DAG("pipeline_dipendente", ...) as dag:
# Attendi file S3 (polling ogni 5 minuti, max 2 ore)
wait_for_file = S3KeySensor(
task_id="wait_for_input_file",
bucket_name="my-data-lake",
bucket_key="inputs/vendite_{{ ds_nodash }}.csv",
aws_conn_id="aws_default",
poke_interval=300, # Controlla ogni 5 min
timeout=7200, # Timeout dopo 2 ore
mode="reschedule", # Libera il worker mentre aspetta
)
# Attendi che un'altra DAG abbia finito
wait_for_upstream = ExternalTaskSensor(
task_id="wait_for_erp_sync",
external_dag_id="sincronizzazione_erp",
external_task_id="verify_sync_complete",
allowed_states=["success"],
failed_states=["failed", "skipped"],
poke_interval=120,
timeout=3600,
mode="reschedule",
)
estrai = PythonOperator(task_id="estrai_dati", ...)
# Pipeline partira solo quando entrambi i sensor sono verdi
[wait_for_file, wait_for_upstream] >> estrai
Anti-Pattern: Errori Comuni da Evitare
- Task con stato globale mutabile: Non usare variabili globali Python nei DAG. Il DAG viene parsato frequentemente dallo scheduler; le variabili globali creano comportamenti imprevedibili. Usa XCom, Variables o TaskFlow API.
- Business logic nei DAG file: I file DAG devono contenere solo la definizione dell'orchestrazione. La logica di business va in moduli Python separati, importati dai task. Facilità il testing e riduce il tempo di parsing.
- XCom per dataset grandi: XCom e pensato per piccoli metadati (conteggi, path di file, flag). Non usarlo per passare DataFrame tra task: usa staging tables o file S3.
- Pipeline monolitiche senza granularità: Un task unico che fa tutto (estrai + trasforma + carica) rende il debugging impossibile. Ogni operazione logicamente distinta deve essere un task separato.
-
Catchup=True con schedule fitte: Se abiliti il catchup su una
pipeline con schedule oraria e la accendi dopo 30 giorni di pausa, lancerai 720
esecuzioni contemporaneamente. Usa always
catchup=Falseper default e gestisci il backfill esplicitamente. - Secrets hardcoded: Non inserire mai password, API key o connection string direttamente nel codice. Usa Airflow Connections, Dagster Resources, o Prefect Blocks integrati con AWS Secrets Manager o HashiCorp Vault.
4. Testing delle Pipeline
Una pipeline non testata e un debito tecnico garantito. Ecco come strutturare i test per un DAG Airflow e per un asset Dagster.
# tests/test_pipeline_vendite.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import date
import pandas as pd
# ============ TEST AIRFLOW ============
from airflow.models import DagBag
def test_dag_integrity():
"""Verifica che i DAG siano validi e non abbiano cicli."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
assert len(dagbag.import_errors) == 0, \
f"Errori di import: {dagbag.import_errors}"
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert dag is not None, "DAG non trovato"
assert dag.schedule == "0 5 * * *"
def test_dag_task_count():
"""Verifica il numero di task nel DAG."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert len(dag.tasks) >= 5, "Troppo pochi task nel DAG"
# ============ TEST DAGSTER ============
from dagster import materialize, build_asset_context
from assets import raw_ordini, silver_ordini_arricchiti
def test_raw_ordini_asset():
"""Test dell'asset raw_ordini con mock delle risorse."""
mock_df = pd.DataFrame({
"ordine_id": [1, 2, 3],
"customer_id": [101, 102, 103],
"importo_totale": [150.0, 89.99, 220.5],
"data_ordine": ["2025-01-15"] * 3,
})
with patch("assets.ingestione.PostgresResource") as mock_pg:
mock_pg.return_value.execute_query.return_value = mock_df
result = materialize(
[raw_ordini],
resources={"postgres_erp": mock_pg.return_value},
partition_key="2025-01-15",
)
assert result.success
asset_value = result.output_for_node("raw_ordini")
assert len(asset_value) == 3
assert (asset_value["importo_totale"] >= 0).all()
# ============ TEST PREFECT ============
from flows.pipeline_vendite import estrai_ordini, valida_e_arricchisci
from prefect.testing.utilities import prefect_test_harness
def test_valida_e_arricchisci():
"""Test del task di validazione con dati di test."""
ordini = pd.DataFrame({
"ordine_id": [1, 2],
"customer_id": [101, 102],
"importo_totale": [100.0, 200.0],
})
clienti = pd.DataFrame({
"id": [101, 102],
"citta": ["Milano", "Roma"],
"segmento": ["Premium", "Standard"],
})
with prefect_test_harness():
result = valida_e_arricchisci.fn(ordini, clienti)
assert len(result) == 2
assert "segmento" in result.columns
assert result["segmento"].isna().sum() == 0
def test_valida_importi_negativi():
"""Test che importi negativi generino eccezione."""
ordini = pd.DataFrame({
"ordine_id": [1],
"importo_totale": [-50.0], # NEGATIVO
"customer_id": [101],
})
clienti = pd.DataFrame({"id": [101], "citta": ["Milano"], "segmento": ["Standard"]})
with prefect_test_harness():
with pytest.raises(ValueError, match="Trovati importi negativi"):
valida_e_arricchisci.fn(ordini, clienti)
Conclusioni e Raccomandazioni
L'orchestrazione e la spina dorsale di qualsiasi data stack maturo. Senza un orchestratore affidabile, anche le pipeline più sofisticate diventano fragili, opache e difficili da gestire. Abbiamo esplorato i tre strumenti principali del mercato 2025 e visto come ciascuno risponda a esigenze diverse.
Raccomandazioni per Scenario
- PMI che inizia ora: Scegli Prefect 3 con Prefect Cloud free tier. Il time-to-value e minimo, la curva di apprendimento bassa, e puoi passare a soluzioni più enterprise quando la complessità lo richiede.
- Team con stack dbt: Dagster e la scelta naturale. L'integrazione nativa con dbt tramite dagster-dbt e la lineage automatica tra modelli dbt e asset Dagster riducono enormemente il cognitive load del team.
- Enterprise con competenze Airflow esistenti: Airflow 3.0 su Astronomer (managed) o MWAA. Non c'è motivo di migrare se il team e già produttivo su Airflow, e la versione 3.0 ha colmato gran parte del gap con Dagster e Prefect.
- Workflow applicativi long-running: Considera Temporal in combinazione con il tuo orchestratore dati preferito. Sono strumenti complementari, non alternativi.
Il prossimo articolo della serie si addentra nell'AI applicata alla Manifattura: vedremo come costruire sistemi di predictive maintenance con sensori IoT, Apache Kafka per lo streaming dei dati macchina, e modelli ML per anticipare i guasti prima che accadano. Un caso d'uso concreto che dimostra come il data stack che stiamo costruendo diventa il fondamento dell'intelligenza artificiale in azienda.
Checklist per Mettere in Produzione il Tuo Orchestratore
- Tutti i task sono idempotenti: rieseguirli non produce duplicati
- Retry policy configurata con backoff esponenziale per ogni task
- Secrets gestiti tramite Connections/Blocks/Resources, non hardcoded
- Monitoring attivo con Grafana o equivalente per metriche infrastrutturali
- Alerting su Slack o PagerDuty per fallimenti e SLA breach
- Test di integrita dei DAG/asset nella CI/CD pipeline
- Catchup disabilitato per default, backfill gestito esplicitamente
- Business logic separata dai file di definizione pipeline
- Log centralizzati accessibili senza accesso diretto ai worker
- Documentazione inline: ogni DAG/asset ha una descrizione chiara
Link Utili per Approfondire
- Articolo precedente: ETL vs ELT Moderno: dbt, Airbyte e Fivetran
- Prossimo articolo: AI nella Manifattura: Predictive Maintenance e Digital Twin
- Serie correlata: MLOps per Business - Modelli AI in Produzione con MLflow
- Serie correlata: LLM in Azienda: RAG Enterprise, Fine-Tuning e Guardrails







