Orkiestracja rurociągu: przepływ powietrza, dagster i prefekt
Wyobraź sobie, że masz bezbłędny stos danych: domek nad jeziorem na Apache Iceberg, transformacje eleganckie złącza dbt, Airbyte, które synchronizują dane z kilkunastu źródeł. Wszystko idealnie, dopóki ktoś nie zapyta: „Ale kto tym wszystkim steruje? Kto gwarantuje, że czy zadanie pozyskiwania rozpoczyna się przed transformacjami? Kto ostrzega Cię, gdy coś pójdzie nie tak o 3 nad ranem?” Odpowiedź brzmikoordynator rurociągów.
Orkiestracja i niewidoczny klej stosu danych: składnik, który koordynuje wykonanie setek współzależnych zadań, zarządza ponownymi próbami w przypadku awarii, monitoruje stan rurociągów i tworzy ścieżkę audytu wymaganą przez firmę. Bez orkiestratora potoki danych są skryptami cron utrzymywanymi razem przez nadzieję.
W 2025 r. krajobraz orkiestratorów jest zdominowany przez trzy główne platformy: Apache Airflow 3.0, weteran, który zdefiniował samą koncepcję DAG orkiestracja; Dagster 1.x, pretendent, który wprowadził tę koncepcję aktywów zdefiniowanych programowo; I Prefekt 3, rozwiązanie zaprojektowane jako pierwsze w języku Python dla doświadczenia programisty. W tym artykule przeanalizujemy je dogłębnie za pomocą kodu prawdziwe i uczciwe porównanie, które pomoże Ci wybrać właściwe dla Twojego kontekstu.
Czego dowiesz się w tym artykule
- Architektura wewnętrzna Apache Airflow 3.0: harmonogram, moduł wykonawczy, proces roboczy, baza danych metadanych
- Jak napisać kompletny DAG w języku Python dla kompleksowego potoku ETL
- Paradygmat aktywów zdefiniowanych programowo Dagstera i dlaczego zmienia sposób, w jaki myślimy o rurociągach
- Model przepływu/zadań Prefect 3 i jego uproszczony model wdrażania
- Szczegółowe porównanie trzech narzędzi z tabelą porównawczą według przypadków użycia
- Temporal jako alternatywa dla trwałych i długotrwałych przepływów pracy
- Architektura referencyjna integrująca orkiestrację z dbt, Airbyte i data Lakehouse
- Monitorowanie, ostrzeganie i najlepsze praktyki dotyczące idempotentnych potoków w produkcji
Artykuły z serii Hurtownia danych, sztuczna inteligencja i transformacja cyfrowa
| # | Przedmiot | Centrum |
|---|---|---|
| 1 | Ewolucja hurtowni danych: od SQL Server do Data Lakehouse | Architektury i platformy |
| 2 | Siatka danych i zdecentralizowana architektura | Zarządzanie i własność |
| 3 | ETL kontra nowoczesny ELT: dbt, Airbyte i Fivetran | Rurociąg transformacji |
| 4 | Jesteś tutaj — Orkiestracja rurociągów | Przepływ powietrza, Dagster, prefekt |
| 5 | Sztuczna inteligencja w produkcji: konserwacja predykcyjna | IoT, ML, cyfrowy bliźniak |
| 6 | Sztuczna inteligencja w finansach: wykrywanie oszustw i punktacja kredytowa | ML w czasie rzeczywistym |
| 7 | Sztuczna inteligencja w handlu detalicznym: prognozowanie popytu i rekomendacje | Zastosowano ML |
ponieważ orkiestracja jest podstawą
Zanim przejdziemy do szczegółów technicznych, warto dokładnie zrozumieć, jaki problem rozwiązuje orkiestrator. Typowy potok danych dla średniej i dużej firmy obejmuje:
- Ekstrakcja z CRM, ERP, transakcyjnych baz danych, zewnętrznych API (10-50 źródeł)
- Prześlij do Data Lake (Airflow uruchamia Airbyte lub Fivetran)
- transformacje dbt (30-200 modeli ze złożonymi zależnościami)
- Aktualizowanie hurtowni danych i tabel zbiorczych
- Eksport do narzędzi BI (Metabase, Tableau, Power BI)
- Aktualizacja modeli ML (inżynieria funkcji + przekwalifikowanie)
Każdy z tych kroków ma uzależnienia (B nie może odejść, zanim A ukończone), stwardnienie zanikowe boczne (ALS). (raporty muszą być gotowe do godz. 08:00), e wymagania jakościowe (jeśli dane źródłowe są puste, nie uruchamiaj programu transformacje). Zajmij się tym wszystkim za pomocą oddzielnych skryptów cron i gwarantowanej katastrofy.
Co robi Pipeline Orchestrator
| Funkcjonalność | Opis |
|---|---|
| Zarządzanie zależnościami | Definiuje kolejność wykonywania zadań i zarządza zależnościami potoków |
| Planowanie | Planowanie Cron, sterowane zdarzeniami, obsługujące dane (wyzwalanie w przypadku aktualizacji zasobu) |
| Ponowna próba i obsługa błędów | Automatyczne ponawianie prób z wykładniczym wycofywaniem, zarządzanie częściowymi awariami |
| Równoległość | Równoległe wykonywanie niezależnych zadań w celu optymalizacji czasu |
| Monitorowanie | Scentralizowany pulpit nawigacyjny, dziennik zadań, monitorowanie SLA, alerty |
| Uzupełnienia | Ponowne uruchamianie historycznych przebiegów, gdy zmienia się logika potoku |
| Ścieżka audytu | Pełna historia każdego wykonania pod kątem zgodności i debugowania |
| Parametryzacja | Zmienna konfiguracja dla środowisk (dev, staging, prod) i uruchamianie ręczne |
Apache Airflow 3.0: odnowiony weteran
Stworzony przez Airbnb w 2014 r. i przekazany fundacji Apache Software Foundation w 2016 r. Apacz Przepływ powietrza i stał się de facto standardem w zakresie orkiestracji potoków danych. Z poza 35 000 gwiazdek na GitHubie i społeczność licząca setki osób współpracowników, Airflow i używany przez tysiące firm na całym świecie, począwszy od start-upów do dużych przedsiębiorstw.
W 2025 roku Airflow przeszedł najbardziej znaczącą ewolucję wraz z wydaniem wersja 3.0, która wprowadza architekturę klient-serwer z Wykonanie zadania Interfejs, natywne planowanie uwzględniające zasoby (odziedziczone od Dagstera), DAG wersjonowanie i całkowicie przeprojektowany interfejs użytkownika. I to już nie tylko narzędzie do planowania zadań: to narzędzie nowoczesna platforma orkiestracyjna.
Architektura Apache Airflow
Zrozumienie architektury Airflow ma kluczowe znaczenie dla prawidłowego wdrożenia i diagnozowania problemy w produkcji. Istnieje pięć głównych komponentów:
Elementy architektoniczne przepływu powietrza
| Część | Rola | Technologia |
|---|---|---|
| Serwer WWW | Interfejs sieciowy do monitorowania, debugowania i ręcznego wyzwalania DAG | Kolba + Gunicorn (2.x), FastAPI (3.0) |
| Harmonogram | Analizuje DAG, planuje zadania, umieszcza je w kolejce wykonawców | Demon Pythona, HA z wieloma instancjami |
| Wykonawca | Wykonuje zadania: LocalExecutor (pojedynczy węzeł), CeleryExecutor, KubernetesExecutor | Seler + Redis/RabbitMQ lub K8s |
| Pracownicy | Właściwie przetwarzaj zadania (tylko z CeleryExecutor/KubernetesExecutor) | Pracownik selera lub kapsuła K8 |
| Baza danych metadanych | Status wszystkich przebiegów DAG, instancji zadań, zmiennych, połączeń | PostgreSQL (zalecany w prod), MySQL |
| Procesor DAG | Nowość w Airflow 3.0: Parser DAG oddzielony od harmonogramu | Pula procesów Pythona |
Koncepcja DAG
Serce Airflow i DAG (ukierunkowany wykres acykliczny): graf skierowany bez cykli, który określa kolejność wykonywania zadań. Każdy węzeł grafu jest zadaniem, każdy łuk jest zależnością. Wykres musi być acykliczny (brak cyklu) zatem nie można tworzyć zależności cyklicznych.
Zadania realizowane są poprzez Operator: Klasy Pythona, które są abstrakcyjne rodzaj pracy. Airflow zawiera setki wbudowanych operatorów (PythonOperator, BashOperator, PostgresOperator, SparkSubmitOperator, dbtCloudOperator...) i społeczność publikuje tyle samo za pośrednictwem pakietów dostawców.
Przykład kodu: Ukończ ETL DAG dla rurociągu sprzedaży
Zobaczmy kompletny i realistyczny DAG orkiestrujący potok ETL: ekstrakt z PostgreSQL, ładowanie do stagingu, transformacja dbt i powiadamianie zespołu.
# dags/pipeline_vendite.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import pandas as pd
import logging
logger = logging.getLogger(__name__)
# Configurazione DAG con default_args per tutti i task
default_args = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"email": ["data-team@azienda.it"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True, # 5, 10, 20 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(hours=2),
}
with DAG(
dag_id="pipeline_vendite_giornaliera",
default_args=default_args,
description="Estrai vendite da ERP, carica in DWH, aggrega con dbt",
schedule="0 5 * * *", # Ogni giorno alle 05:00 UTC
start_date=datetime(2025, 1, 1),
end_date=None,
catchup=False, # Non eseguire run passate
max_active_runs=1, # Una sola esecuzione alla volta
tags=["vendite", "etl", "produzione"],
doc_md="""
## Pipeline Vendite Giornaliera
Estrae ordini dall'ERP (PostgreSQL), li carica in staging,
lancia le trasformazioni dbt e notifica il team su Slack.
**SLA**: Completamento entro le 07:30 UTC
**Owner**: Team Data Engineering
""",
) as dag:
# ============ TASK 1: VERIFICA DISPONIBILITA SORGENTE ============
def check_source_data(**context):
"""Verifica che ci siano dati nella finestra di esecuzione."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
execution_date = context["ds"] # "2025-01-15"
count = pg_hook.get_first("""
SELECT COUNT(*)
FROM orders
WHERE DATE(created_at) = %s
""", parameters=[execution_date])[0]
logger.info(f"Trovati {count} ordini per {execution_date}")
if count == 0:
logger.warning("Nessun dato trovato, skip pipeline")
return "notify_no_data" # Branch: salta elaborazione
return "extract_task_group.extract_orders"
check_source = BranchPythonOperator(
task_id="check_source_data",
python_callable=check_source_data,
)
# ============ TASK GROUP: ESTRAZIONE ============
with TaskGroup("extract_task_group") as extract_group:
def extract_orders(**context):
"""Estrae ordini dall'ERP e li salva in staging."""
execution_date = context["ds"]
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
df = pg_hook.get_pandas_df("""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price * (1 - COALESCE(oi.discount, 0)))
AS importo_totale,
COUNT(oi.id) AS num_righe
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = %(exec_date)s
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""", parameters={"exec_date": execution_date})
# Validazione dati estratti
assert df["importo_totale"].ge(0).all(), "Importi negativi trovati!"
assert df["ordine_id"].is_unique, "Ordini duplicati nell'estrazione!"
# Salva in XCom per passare al task successivo
# Per dataset grandi, usa staging table invece di XCom
context["ti"].xcom_push(key="row_count", value=len(df))
# Carica in tabella staging del DWH
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
dwh_hook.insert_rows(
table="staging.stg_ordini_raw",
rows=df.values.tolist(),
target_fields=df.columns.tolist(),
replace=True,
replace_index=["ordine_id"],
)
logger.info(f"Caricati {len(df)} ordini in staging")
extract_orders_task = PythonOperator(
task_id="extract_orders",
python_callable=extract_orders,
)
def extract_customers(**context):
"""Estrae snapshot clienti aggiornati."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
df_customers = pg_hook.get_pandas_df("""
SELECT id, email, nome, cognome, citta, segmento,
DATE(updated_at) AS data_aggiornamento
FROM customers
WHERE DATE(updated_at) >= CURRENT_DATE - INTERVAL '1 day'
""")
if len(df_customers) > 0:
dwh_hook.insert_rows(
table="staging.stg_clienti_raw",
rows=df_customers.values.tolist(),
target_fields=df_customers.columns.tolist(),
replace=True,
replace_index=["id"],
)
extract_customers_task = PythonOperator(
task_id="extract_customers",
python_callable=extract_customers,
)
# I due extract girano in PARALLELO
[extract_orders_task, extract_customers_task]
# ============ TASK GROUP: TRASFORMAZIONI DBT ============
with TaskGroup("dbt_task_group") as dbt_group:
# dbt run sullo staging layer
dbt_staging = BashOperator(
task_id="dbt_run_staging",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:staging --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt run sul marts layer (dipende dallo staging)
dbt_marts = BashOperator(
task_id="dbt_run_marts",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:marts --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt test per validare la qualità
dbt_test = BashOperator(
task_id="dbt_test",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt test --select tag:staging tag:marts --target prod
""",
)
dbt_staging >> dbt_marts >> dbt_test
# ============ NOTIFICHE ============
notify_success = SlackWebhookOperator(
task_id="notify_success",
slack_webhook_conn_id="slack_data_team",
message="""
:white_check_mark: *Pipeline Vendite Completata*
Data: {{ ds }}
Ordini elaborati: {{ ti.xcom_pull(task_ids='extract_task_group.extract_orders', key='row_count') }}
Durata: {{ dag_run.get_task_instance('dbt_task_group.dbt_test').duration | round(1) }}s
""",
trigger_rule="all_success",
)
notify_no_data = SlackWebhookOperator(
task_id="notify_no_data",
slack_webhook_conn_id="slack_data_team",
message=":information_source: *Pipeline Vendite* - Nessun dato per {{ ds }}, esecuzione saltata",
)
notify_failure = SlackWebhookOperator(
task_id="notify_failure",
slack_webhook_conn_id="slack_data_team",
message=":x: *Pipeline Vendite FALLITA* - Data: {{ ds }} - Controlla Airflow UI!",
trigger_rule="one_failed",
)
# ============ DIPENDENZE DAG ============
check_source >> [extract_group, notify_no_data]
extract_group >> dbt_group >> notify_success
dbt_group >> notify_failure
Ten DAG demonstruje podstawowe wzorce przepływu powietrza: użycie TaskGroup dla
zadania związane z grupą, BranchPythonOperator według logiki warunkowej,
XCom do przekazywania danych pomiędzy zadaniami i wyzwalania reguł zarządzania
powiadomień o sukcesach i porażkach.
Co nowego w Apache Airflow 3.0 (2025)
- Interfejs wykonywania zadań: Nowy interfejs API klient-serwer, który oddziela proces roboczy od serwera API Airflow, poprawiając bezpieczeństwo i skalowalność
- Planowanie uwzględniające zasoby: DAG można teraz uruchamiać poprzez aktualizację zasobu (zestawu danych), a nie tylko przez harmonogram cron
- Wersja DAG: Każde uruchomienie jest powiązane z wersją DAG w momencie uruchomienia, co eliminuje niespójności podczas wdrożeń
- Człowiek w pętli: Airflow 3.1 wprowadza przepływy pracy, które przed kontynuowaniem oczekują na zatwierdzenie przez człowieka
- Interfejs użytkownika reakcji: Całkowicie przeprojektowany interfejs, szybszy i bardziej intuicyjny
- Oddzielny procesor DAG: Analizowanie DAG odbywa się w dedykowanym procesie, co zmniejsza obciążenie programu planującego
Dagster: paradygmat aktywów zdefiniowanych programowo
Dagster, urodzony w 2018 roku w Elementl (obecnie Dagster Labs), wprowadził zmianę paradygmatu radykalny w orkiestracji: zamiast myśleć o zadania do wykonania, myślimy aktywa do wytworzenia. Zasób i wszelkie artefakty danych: tabela w DWH, model ML, plik Parquet, wygenerowany raport.
Podejście to, tzw Zasoby zdefiniowane programowo (SDA), zrewolucjonizował
doświadczenie programisty. W Airflow zdefiniuj „uruchom ten skrypt o 5:00”. W Dagsterze
zdefiniuj „Chcę stół gold.fatturato_mensile jest zawsze aktualny,
i wie, jak się zaktualizować”. Różnica wydaje się subtelna, ale zmienia wszystko: automatyczny rodowód,
proste testowanie, natychmiastowe zrozumienie tego, co istnieje w stosie danych.
W 2025 r. Dagster 1.9 osiągnie dojrzałość wraz z frameworkiem Components (GA w październiku 2025 r.), co umożliwia opisanie całych potoków jako konfiguracji deklaratywnych oraz katalogu Advanced, który zapewnia bezprecedensowy wgląd w stan każdego zasobu w systemie.
Kluczowe koncepcje Dagstera
Terminologia Dagstera
| Pojęcie | Opis | Odpowiednik przepływu powietrza |
|---|---|---|
| Aktywa | Artefakt danych generowany przez potok (tabela, model, plik) | Nie ma bezpośredniego odpowiednika |
| @aktywa | Dekorator Pythona, który definiuje sposób tworzenia zasobu | PythonOperator (bardziej ograniczony) |
| Praca | Wybieranie zasobów/operacji do wspólnego działania | DAG |
| Op | Zadanie ogólne bez jawnych danych wyjściowych | Operator |
| Ja Menedżer | Zarządza sposobem zapisywania i odczytywania zasobów (S3, BigQuery, Snowflake...) | Nie ma odpowiednika |
| Zasoby | Współdzielone połączenia i klienci (baza danych, API) | Połączenie + hak |
| Transduktor | Wyzwalacze sterowane zdarzeniami (odpytywanie systemu plików, API, zdarzenia) | Operator czujnika |
| Harmonogram | Wyzwalacze oparte na czasie dla zadań | Harmonogram DAG |
| Partycje | Podział majątku według daty, kategorii, regionu | PartitionedSchedule (bardziej złożony) |
Przykład kodu: Rurociąg oparty na zasobach z Dagster
Wdrażamy ten sam rurociąg sprzedaży co Airflow, ale z paradygmatem Dagster. Natychmiast zauważysz, jak wyraźna jest zależność między zasobami i jak testowanie staje się proste.
# pipeline_vendite/assets.py
import pandas as pd
from dagster import (
asset,
AssetIn,
AssetExecutionContext,
MetadataValue,
Output,
DailyPartitionsDefinition,
MaterializeResult,
)
from dagster_dbt import dbt_assets, DbtCliResource
from resources import postgres_erp, postgres_dwh, slack_resource
# Partizionamento giornaliero - Dagster gestisce il backfill automaticamente
daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")
# ============ ASSET 1: ORDINI GREZZI DALL'ERP ============
@asset(
name="raw_ordini",
group_name="ingestione",
partitions_def=daily_partitions,
description="Ordini grezzi estratti dall'ERP PostgreSQL",
metadata={
"source": "ERP PostgreSQL",
"owner": "data-engineering@azienda.it",
},
compute_kind="python",
)
def raw_ordini(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae gli ordini per la partizione corrente."""
partition_date = context.partition_key # "2025-01-15"
df = postgres_erp.execute_query(f"""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = '{partition_date}'
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""")
# Metadata allegati all'asset: visibili nella UI Dagster
return Output(
df,
metadata={
"num_records": MetadataValue.int(len(df)),
"importo_totale": MetadataValue.float(df["importo_totale"].sum()),
"preview": MetadataValue.md(df.head(5).to_markdown()),
},
)
# ============ ASSET 2: CLIENTI AGGIORNATI ============
@asset(
name="raw_clienti",
group_name="ingestione",
partitions_def=daily_partitions,
compute_kind="python",
)
def raw_clienti(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae snapshot clienti aggiornati."""
partition_date = context.partition_key
df = postgres_erp.execute_query(f"""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = '{partition_date}'
""")
return Output(df, metadata={"num_records": MetadataValue.int(len(df))})
# ============ ASSET 3: ORDINI ARRICCHITI (Silver Layer) ============
@asset(
name="silver_ordini_arricchiti",
group_name="silver",
partitions_def=daily_partitions,
ins={
"raw_ordini": AssetIn("raw_ordini"),
"raw_clienti": AssetIn("raw_clienti"),
},
description="Ordini joinati con info cliente, validati",
compute_kind="python",
)
def silver_ordini_arricchiti(
context: AssetExecutionContext,
raw_ordini: pd.DataFrame,
raw_clienti: pd.DataFrame,
postgres_dwh: PostgresResource,
) -> MaterializeResult:
"""Arricchisce e valida gli ordini."""
df = raw_ordini.merge(
raw_clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
# Validazioni
assert df["importo_totale"].ge(0).all(), "Importi negativi!"
assert df["ordine_id"].is_unique, "Ordini duplicati!"
# Carica nel DWH
postgres_dwh.load_dataframe(
df=df,
table="silver.ordini_arricchiti",
partition_col="data_ordine",
partition_value=context.partition_key,
)
return MaterializeResult(
metadata={
"num_records": MetadataValue.int(len(df)),
"clienti_senza_match": MetadataValue.int(df["segmento"].isna().sum()),
}
)
# ============ ASSET 4: MODELLI DBT (Gold Layer) ============
# Dagster ha integrazione nativa con dbt via dagster-dbt
@dbt_assets(
manifest=dbt_manifest_path, # manifest.json generato da dbt
select="tag:marts", # Solo i modelli gold
)
def dbt_marts_assets(context: AssetExecutionContext, dbt: DbtCliResource):
"""Esegue i modelli dbt del layer gold."""
yield from dbt.cli(["run", "--select", "tag:marts"], context=context).stream()
yield from dbt.cli(["test", "--select", "tag:marts"], context=context).stream()
# ============ JOB: COMPOSIZIONE ============
# In Dagster, un Job seleziona quali asset materializzare
from dagster import define_asset_job, ScheduleDefinition
pipeline_vendite_job = define_asset_job(
name="pipeline_vendite_giornaliera",
selection=[
"raw_ordini",
"raw_clienti",
"silver_ordini_arricchiti",
"dbt_marts_assets*", # Tutti gli asset dbt marts
],
)
pipeline_vendite_schedule = ScheduleDefinition(
job=pipeline_vendite_job,
cron_schedule="0 5 * * *",
default_status=DefaultScheduleStatus.RUNNING,
)
Natychmiastowa zaleta takiego podejścia jest widoczna: każdy zasób jest wyraźnie deklarowany
Twoje dane wejściowe (AssetIn), Dagster automatycznie tworzy wykres
zależności, a interfejs użytkownika pokazuje pełny rodowód od źródła do produktu końcowego. Testowanie
staje się trywialne, ponieważ każda funkcja jest czystą funkcją Pythona.
Menedżer IO: klucz do integracji Dagster
Menedżer IO w Dagster definiuje sposób pozyskiwania zasobów zapisane i załadowane pomiędzy zadaniami. Dagster zawiera IO Manager dla S3 (Parquet, CSV), Snowflake, BigQuery, DuckDB, Pandy i wiele innych. Można skonfigurować różnych menedżerów IO dla różnych środowisk bez zmienić kod zasobu.
# resources.py - Configurazione Resources e IO Managers
from dagster import EnvVar
from dagster_aws.s3 import S3PickleIOManager, S3Resource
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_snowflake_pandas import SnowflakePandasIOManager
# Configurazione multi-environment
resources_by_env = {
"dev": {
# In dev: salva su DuckDB locale (costo zero)
"io_manager": DuckDBPandasIOManager(
database="./dev_lakehouse.duckdb",
),
"postgres_erp": PostgresResource(
host="localhost",
port=5432,
database="erp_dev",
),
},
"staging": {
# In staging: usa S3 + Parquet
"io_manager": S3PickleIOManager(
s3_resource=S3Resource(),
s3_bucket="my-staging-lakehouse",
s3_prefix="dagster/",
),
},
"prod": {
# In prod: usa Snowflake
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
database="DWH_PROD",
schema="DAGSTER_ASSETS",
),
},
}
Prefekt 3: Python-First dla programistów
Firma Prefect, założona w 2018 roku, od początku przyjęła inną filozofię: tworzenie orkiestracja tak prosta i Pythoniczna, jak to tylko możliwe. Wersja 3, wydana w 2024 roku i dojrzały w 2025 r., urzeczywistnia tę wizję: dowolną funkcję Pythona może stać się zaaranżowanym zadaniem po dodaniu jednego dekoratora.
Il Model przepływu/zadania Prefect eliminuje większość konfiguracji na wniosek Airflow i Dagster. Nie ma potrzeby jawnego definiowania DAG, nie są potrzebne żadne pliki oddzielnych plików konfiguracyjnych, nie musisz uczyć się zastrzeżonego DSL: napisz zwykły Python a Prefekt zajmie się resztą.
W 2025 roku wprowadzono Prefekta 3 Incydenty prefektów dla zarządzania strukturyzowane przerwy w świadczeniu usług, automatyzacje z wyzwalaczami opartymi na metrykach (nie tylko w przypadku zdarzeń wykonawczych) i natywną integrację z Modalem w celu wykonania skalowalny, bezserwerowy. Prefect Cloud oferuje hojny darmowy plan, który sprawia, że jest to możliwe dostępne nawet dla małych zespołów.
Przykład kodu: Prefekt przepływu dla rurociągu sprzedaży
# flows/pipeline_vendite.py
import pandas as pd
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret
from prefect.filesystems import S3
from prefect_slack import SlackWebhook
from datetime import timedelta, date
from typing import Optional
# ============ TASK (funzioni Python con decoratore) ============
@task(
name="Estrai Ordini ERP",
description="Estrae ordini completati per la data specificata",
retries=3,
retry_delay_seconds=60,
cache_key_fn=task_input_hash, # Cache basata su input
cache_expiration=timedelta(hours=1), # Valida per 1 ora
tags=["estrazione", "erp"],
)
def estrai_ordini(data: date) -> pd.DataFrame:
"""Estrae ordini dall'ERP per la data specificata."""
logger = get_run_logger()
# Leggi credenziali da Prefect Blocks (gestione sicura secrets)
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
query = """
SELECT o.id AS ordine_id, o.customer_id,
o.created_at, o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = :data
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
"""
with engine.connect() as conn:
df = pd.read_sql(query, conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} ordini per {data}")
return df
@task(
name="Estrai Clienti ERP",
retries=2,
retry_delay_seconds=30,
tags=["estrazione", "erp"],
)
def estrai_clienti(data: date) -> pd.DataFrame:
"""Estrae clienti aggiornati."""
logger = get_run_logger()
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df = pd.read_sql("""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = :data
""", conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} clienti aggiornati")
return df
@task(
name="Valida e Arricchisci Ordini",
retries=1,
)
def valida_e_arricchisci(
ordini: pd.DataFrame,
clienti: pd.DataFrame,
) -> pd.DataFrame:
"""Valida e arricchisce gli ordini con dati cliente."""
logger = get_run_logger()
# Validazioni
if ordini["importo_totale"].lt(0).any():
raise ValueError("Trovati importi negativi nel dataset ordini")
if not ordini["ordine_id"].is_unique:
raise ValueError("Trovati ordini duplicati")
# Join
df = ordini.merge(
clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
match_rate = (1 - df["segmento"].isna().mean()) * 100
logger.info(f"Match rate clienti: {match_rate:.1f}%")
return df
@task(name="Carica nel DWH", retries=2)
def carica_dwh(df: pd.DataFrame, schema: str, table: str):
"""Carica il dataframe nel DWH."""
conn_string = Secret.load("dwh-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df.to_sql(
name=table,
con=conn,
schema=schema,
if_exists="replace",
index=False,
)
return len(df)
@task(name="Esegui dbt", retries=1)
def esegui_dbt(target: str = "prod"):
"""Lancia dbt run e test sui modelli marts."""
import subprocess
result = subprocess.run(
["dbt", "run", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if result.returncode != 0:
raise RuntimeError(f"dbt run fallito:\n{result.stderr}")
test_result = subprocess.run(
["dbt", "test", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if test_result.returncode != 0:
raise RuntimeError(f"dbt test fallito:\n{test_result.stderr}")
# ============ FLOW PRINCIPALE ============
@flow(
name="Pipeline Vendite Giornaliera",
description="Orchestrazione ETL vendite: ERP -> DWH -> dbt marts",
flow_run_name="vendite-{data}", # Nome run dinamico
retries=0, # Retry a livello task, non flow
timeout_seconds=7200, # 2 ore max
log_prints=True,
)
def pipeline_vendite(data: Optional[date] = None):
"""
Flow principale per la pipeline vendite.
Può essere lanciato manualmente con una data specifica
o schedulato per girare ogni giorno.
"""
logger = get_run_logger()
if data is None:
data = date.today()
logger.info(f"Avvio pipeline vendite per {data}")
# Estrazione parallela (Prefect gestisce la concorrenza automaticamente)
# submit() esegue il task in modo asincrono
future_ordini = estrai_ordini.submit(data)
future_clienti = estrai_clienti.submit(data)
# Attende entrambi i task e recupera i risultati
ordini = future_ordini.result()
clienti = future_clienti.result()
if len(ordini) == 0:
logger.warning(f"Nessun ordine per {data}, skip elaborazione")
return {"status": "skipped", "data": str(data)}
# Task sequenziali
df_arricchito = valida_e_arricchisci(ordini, clienti)
num_record = carica_dwh(df_arricchito, schema="silver", table="ordini_arricchiti")
esegui_dbt(target="prod")
logger.info(f"Pipeline completata: {num_record} record elaborati")
return {"status": "success", "records": num_record, "data": str(data)}
# ============ DEPLOYMENT ============
# Prefect 3: il deployment definisce schedule e infrastruttura
if __name__ == "__main__":
from prefect.deployments import DeploymentImage
pipeline_vendite.deploy(
name="prod-giornaliero",
cron="0 5 * * *",
work_pool_name="k8s-work-pool", # Esegui su Kubernetes
image=DeploymentImage(
name="myregistry/pipeline-vendite:latest",
platform="linux/amd64",
),
parameters={}, # Parametri default
)
Bloki prefektów: scentralizowane zarządzanie tajemnicami i konfiguracją
I Prefekt Bloki są to konfigurowalne obiekty przechowywane w Prefekcie backendy (w chmurze lub hostowane samodzielnie), które centralizują zarządzanie połączeniami i sekretami i konfiguracje infrastruktury. Eliminują potrzebę stosowania zmiennych środowiskowych rozproszone w kontenerach i umożliwiają aktualizacje bez ponownego wdrażania kodu.
Porównanie przepływu powietrza, Dagstera i prefekta: co jest lepsze?
Szczera odpowiedź brzmi: to zależy. Każde narzędzie ma idealny przypadek użycia i punkty konkretna siła. Oto ustrukturyzowane porównanie, które pomoże Ci w dokonaniu wyboru.
Szczegółowe porównanie: przepływ powietrza vs Dagster vs Prefect (2025)
| Kryterium | Przepływ powietrza 3.0 | Dagster 1.9 | Prefekt 3 |
|---|---|---|---|
| Paradygmat | Skoncentrowany na zadaniach (DAG) | Skoncentrowany na aktywach (SDA) | Skoncentrowany na zadaniach (najpierw w Pythonie) |
| Krzywa uczenia się | Wysoki (semantyka DAG, dostawcy, haki) | Średnio-wysoki (model aktywów, IO Manager) | Niski (dekorator istniejących funkcji) |
| Pochodzenie danych | Ograniczone (zależności zadań) | Doskonały (natywny, wizualny) | Ograniczona (ostatnio dodana) |
| Doświadczenie programisty (lokalnie) | Dostateczny (Docker skomponowany ciężki) | Znakomicie (programista dagster, lokalny interfejs użytkownika) | Doskonale (instalacja pip, uruchamianie lokalne) |
| Testowanie | Złożony (zależy od infrastruktury) | Doskonały (aktywa i funkcje) | Znakomity (zadania i funkcje) |
| integracja dbt | Dobry (operator dbt Cloud) | Znakomity (dagster-dbt, zasób natywny) | Dobrze (prefekt-dbt) |
| Planowanie | Znakomicie (cron, wydarzenie, zasób, termin) | Znakomicie (cron, zdarzenie, zasób, czujnik) | Dobry (cron, zdarzenie, automatyzacja) |
| Uzupełnienia | Znakomity (zarządzany przez program planujący w wersji 3.0) | Doskonały (natywna partycjonowanie) | Dyskretny (ręczny lub automatyczny) |
| Skalowalność | Wysoki (CeleryExecutor, KubernetesExecutor) | Wysoki (Kubernetes, ECS, Docker) | Wysoka (pule robocze, K8, modalne) |
| Monitorowanie interfejsu użytkownika | Znakomicie (UI React w wersji 3.0) | Znakomity (katalog, wykres rodowodu) | Doskonale (pulpit nawigacyjny Prefect Cloud) |
| Model wdrożenia | Złożony (wykres steru, kompozycja) | Średni (serwer Dagster, K8s) | Proste (.deploy(), pule pracy) |
| Społeczność/ekosystem | Ogromny (ponad 35 tys. gwiazd GitHub, setki dostawców) | Rosnąco (ponad 10 tys. gwiazdek, koncentracja na przedsiębiorstwach) | Duży (ponad 15 tys. gwiazdek, skupienie się na programistach) |
| Zarządzana chmura | Astronom, MWAA, twórca chmur | Dagster Cloud (bezserwerowy + hybrydowy) | Prefect Cloud (obfity darmowy poziom) |
| Koszt własnego hostingu | Open source (infra do zarządzania) | Open source (infra do zarządzania) | Open source (infra do zarządzania) |
| Koszt zarządzany | Astronom od 500 $ miesięcznie | Dagster Cloud od 500 USD miesięcznie | Prefect Cloud – hojny, bezpłatny poziom |
| Idealny dla | Zespół z doświadczeniem Airflow, klasycznymi potokami ETL | Zespół oparty na danych, integracja dbt, potoki ML | Team Python, szybkie prototypowanie, sterowane zdarzeniami |
Przewodnik szybkiego wyboru
- Wybierz Przepływ powietrza, jeśli: Twój zespół ma już umiejętności Airflow, pracujesz ze złożonymi ekosystemami (Spark, Hadoop, wielu dostawców), których potrzebujesz z ogromnego ekosystemu dostępnych operatorów, korzystasz albo z AWS (MWAA), albo z GCP (Cloud Composer), które zarządzają nim natywnie.
- Wybierz Dagstera jeśli: jakość i pochodzenie danych są priorytetami, twój stos zawiera dbt i chcesz głębokiej integracji, do czego zespół jest przyzwyczajony myśl o zasobach, a nie o zadaniach, albo budujesz potoki uczenia maszynowego, gdziekolwiek chcesz śledź każdy artefakt.
- Wybierz Prefekta jeśli: chcesz najniższej krzywej uczenia się, zespół i Python-najpierw bez doświadczenia w orkiestracji, potrzebujesz przepływu pracy sterowane zdarzeniami i operacyjne (nie tylko danymi) lub chcesz szybko zacząć bezpłatna warstwa Prefect Cloud.
Tymczasowe: trwałe wykonanie dla złożonych przepływów pracy
Podczas gdy Airflow, Dagster i Prefect skupiają się na orkiestracji data rurociągi, Czasowy rozwiązuje inny problem: wykonanie wytrzymały długotrwałych przepływów pracy aplikacji, które muszą przetrwać do awarii, ponownego uruchomienia i przerw w sieci.
Opracowany przez byłych inżynierów Ubera (którzy stworzyli Cadence, jego poprzedniczkę), Temporal traktuje każdy przepływ pracy jako jeden trwała funkcja stanowa: jeśli serwer uruchamia się ponownie podczas wykonywania przepływu pracy, wznawia pracę dokładnie tam, gdzie jest wykonywana zatrzymał się, nie tracąc stanu. Jest to niezbędne w przypadku przepływów pracy trwających wiele godzin, dni lub tygodni (np. procesy onboardingu, orkiestracja agentów AI, saga wzór dla transakcji rozproszonych).
Czasowy a przepływ powietrza/Dagster: kiedy wybrać
| Scenariusz | Zalecane narzędzie | Powód |
|---|---|---|
| Codzienne rurociągi ETL | Przepływ powietrza / Dagster / Prefekt | Zoptymalizowany pod kątem wsadowej orkiestracji danych |
| transformacje dbt | Dagster (lepszy) / Przepływ powietrza | Integracja natywna, pochodzenie aktywów |
| Potoki szkoleniowe ML | Dagster / Prefekt | Śledzenie artefaktów, ułatwione testowanie |
| Procesy onboardingu użytkowników (wieloetapowe) | Czasowy | Trwałość, zarządzanie stanem aplikacji |
| Orkiestracja agentów AI/LLM | Czasowy / Prefekt | Długotrwałe, złożone zarządzanie awariami |
| Wzorce Saga / transakcje rozproszone | Czasowy | Transakcje kompensacyjne, trwałość |
| Potok IoT w czasie rzeczywistym | Kafka + Flink (orkiestratorzy nie wsadowi) | Przesyłanie strumieniowe wymaga innej architektury |
Wiele firm stosuje podejście hybrydowe: Airflow lub Dagster do orkiestracji potok danych, Temporal dla przepływów pracy aplikacji wymagających trwałości. Dwa narzędzia doskonale się uzupełniają i nie stanowią bezpośredniej konkurencji.
Architektura referencyjna: orkiestracja w nowoczesnym stosie danych
W jaki sposób koordynator integruje się z resztą stosu danych? Zobaczmy architekturę Kompletne odniesienie, które łączy w sobie wszystkie komponenty, które widzieliśmy w tej serii: Airbyte do spożycia, dbt do transformacji, Apache Iceberg jako format tabeli, i Dagster jako główny orkiestrator.
Architektura stosu danych z Dagsterem w roli orkiestratora
| Warstwy | Część | Narzędzia | Zaaranżowany przez Dagstera? |
|---|---|---|---|
| Przyjmowanie pokarmu | Synchronizacja źródła | Airbyte/Fivetran | Tak (dagster-airbyte) |
| Składowanie | Przechowywanie obiektów + format tabeli | S3 + góra lodowa Apache | Tak (menedżer IO) |
| Transformacja | Deklaratywne modele SQL | dbt rdzeń/chmura | Tak (dagster-dbt, zasób natywny) |
| Jakość | Testowanie i walidacja danych | testy dbt, Wielkie nadzieje | Tak (kontrole aktywów) |
| Porcja | Silnik zapytań | Trino / Atena / DuckDB | Nie (zapytania na żądanie) |
| Analityka | Panele i raporty | Metabaza/tabela | Nie (zużywają dane) |
| ML | Inżynieria funkcji + szkolenie | Python + MLflow | Tak (zasoby ML) |
# definitions.py - Dagster Definitions: composizione del data stack completo
from dagster import Definitions, load_assets_from_modules
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
from dagster_dbt import DbtCliResource
from dagster_aws.s3 import S3Resource
# Import dei moduli asset
from assets import ingestione, silver, gold, ml_features
# ============ CARICAMENTO ASSET ============
# Asset Airbyte: ogni connessione Airbyte diventa un Dagster asset
airbyte_assets = load_assets_from_airbyte_instance(
airbyte=AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
connection_filter=lambda meta: meta.name.startswith("prod_"),
)
# Asset dbt: ogni modello dbt diventa un Dagster asset con lineage completa
@dbt_assets(
manifest=Path("dbt_project/target/manifest.json"),
)
def dbt_all_assets(context, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
# Asset Python custom
python_assets = load_assets_from_modules([ingestione, silver, gold, ml_features])
# ============ RISORSE ============
resources_prod = {
"airbyte": AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
"dbt": DbtCliResource(
project_dir="dbt_project",
profiles_dir="dbt_project",
target="prod",
),
"s3": S3Resource(region_name="eu-west-1"),
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
),
"slack": SlackResource(token=EnvVar("SLACK_BOT_TOKEN")),
}
# ============ COMPOSIZIONE FINALE ============
defs = Definitions(
assets=[
*airbyte_assets,
dbt_all_assets,
*python_assets,
],
resources=resources_prod,
jobs=[pipeline_vendite_job, pipeline_clienti_job, pipeline_ml_job],
schedules=[pipeline_vendite_schedule, pipeline_ml_weekly_schedule],
sensors=[nuovo_file_s3_sensor, airbyte_sync_sensor],
)
Monitorowanie, ostrzeganie i obserwowalność
Orkiestrator bez monitorowania i jak samolot bez przyrządów pokładowych. Monitorowanie rurociągów i mają kluczowe znaczenie dla zapewnienia SLA, diagnozowania problemów i komunikacji status platformy danych dla firmy.
Zalecany stos monitorowania
Narzędzia do monitorowania rurociągów zorganizowanych
| Warstwy | Narzędzia | Co monitoruje |
|---|---|---|
| Wskaźniki infrastruktury | Prometeusz + Grafana | Pracownicy procesora/RAM, kolejka zadań, opóźnienie harmonogramu |
| Metryki rurociągu | Metryki przepływu powietrza / Zdarzenia Dagster / Prefect API | Wskaźnik powodzenia zadania, czas trwania, naruszenia SLA |
| Scentralizowane dzienniki | ELK Stack / CloudWatch / Loki | Dziennik każdej instancji zadania na potrzeby debugowania |
| Alarmowanie | PagerDuty/OpsGenie | Eskalacja w przypadku awarii krytycznych, naruszenia SLA |
| Powiadomienia zespołu | Slack/Microsoft Teams | Powiadomienia o sukcesie/porażce, raport dzienny |
| Jakość danych | Testy podstawowe / dbt / Wielkie oczekiwania | Anomalie danych, świeżość, liczba wierszy |
# monitoring/alerting_setup.py
# Configurazione alerting Airflow con Slack e PagerDuty
from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime
def task_failure_alert(context):
"""
Callback chiamato su ogni task failure.
Inviamo notifica Slack con dettagli del fallimento.
"""
dag_id = context["dag"].dag_id
task_id = context["task_instance"].task_id
run_id = context["run_id"]
execution_date = context["execution_date"]
log_url = context["task_instance"].log_url
exception = context.get("exception", "N/A")
message = f"""
:x: *Task Fallito*
*DAG:* {dag_id}
*Task:* {task_id}
*Run ID:* {run_id}
*Data:* {execution_date.strftime('%Y-%m-%d %H:%M')}
*Errore:* `{str(exception)[:200]}`
*Log:* <{log_url}|Vedi Log Airflow>
"""
slack_op = SlackWebhookOperator(
task_id="slack_alert",
slack_webhook_conn_id="slack_data_ops",
message=message,
channel="#data-ops-alerts",
dag=context["dag"],
)
slack_op.execute(context)
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Callback per SLA breach: invia alert critico su PagerDuty."""
import requests
sla_tasks = ", ".join([f"{s.task_id}" for s in slas])
message = f"SLA BREACH su DAG {dag.dag_id}: task {sla_tasks} non completati in tempo"
# PagerDuty Events API
payload = {
"routing_key": "YOUR_PAGERDUTY_INTEGRATION_KEY",
"event_action": "trigger",
"payload": {
"summary": message,
"severity": "critical",
"source": "airflow",
"component": dag.dag_id,
"custom_details": {
"sla_tasks": sla_tasks,
"dag_id": dag.dag_id,
},
},
}
requests.post("https://events.pagerduty.com/v2/enqueue", json=payload)
# Utilizzo nel DAG
with DAG(
dag_id="pipeline_critica",
default_args={
"on_failure_callback": task_failure_alert, # Per ogni task
},
sla_miss_callback=sla_miss_callback, # Per SLA violations
schedule="0 5 * * *",
# ...
) as dag:
task_etl = PythonOperator(
task_id="etl_principale",
python_callable=esegui_etl,
sla=timedelta(hours=1), # Deve completare entro 1 ora
)
Pulpit nawigacyjny Grafana dla wskaźników rurociągu
Airflow udostępnia metryki poprzez StatsD (lub bezpośrednio Prometheus za pomocą airflow-prometheus-exporter). Oto najważniejsze wskaźniki do monitorowania w dashboardzie Grafana:
Wskaźniki przepływu powietrza do monitorowania w Grafanie
- przepływ.harmonogram.zadania.głodny: Zadania w kolejce bez dostępnych procesów roboczych (wskazuje niewystarczającą skalowalność)
- przepływ.dagrun.czas trwania.success: Rozkład czasu trwania DAG (percentyl P95 jako wskaźnik SLA)
- airflow.task_instance.failures: Trend awarii według zadania (anomalie wskazują problemy źródłowe)
- harmonogram.przepływu powietrza.heartbeat: Częstotliwość pulsu harmonogramu (jeśli się zatrzyma, nie zaplanowano żadnych zadań)
- airflow.pool.open_slots: Wolne miejsca w pulach (wskazuje nasycenie)
- airflow.dagrun.first_task_scheduling_delay: Opóźnienie między harmonogramem a rozpoczęciem pierwszego zadania
Najlepsze praktyki dotyczące wytrzymałych rurociągów w produkcji
1. Idempotencja: złota zasada
Każde zadanie musi takie być idempotentny: uruchom go kilka razy z tymi samymi parametry muszą dawać ten sam wynik. Ta właściwość jest fundamentalna, ponieważ ponowne próby (automatyczne lub ręczne) są nieuniknione w produkcji.
# SBAGLIATO: non idempotente (doppio insert)
def carica_dati_wrong(**context):
df = estrai_dati()
pg_hook.insert_rows("staging.ordini", df.values.tolist()) # APPEND!
# CORRETTO: idempotente (DELETE + INSERT per la data specifica)
def carica_dati_correct(**context):
execution_date = context["ds"] # "2025-01-15"
df = estrai_dati(execution_date)
with pg_hook.get_conn() as conn:
with conn.cursor() as cur:
# 1. Elimina i dati esistenti per questa partizione
cur.execute(
"DELETE FROM staging.ordini WHERE DATE(data_ordine) = %s",
[execution_date]
)
# 2. Inserisci i nuovi dati
execute_values(cur, """
INSERT INTO staging.ordini (ordine_id, data_ordine, importo)
VALUES %s
""", df[["ordine_id", "data_ordine", "importo"]].values.tolist())
conn.commit()
# OPPURE: usa INSERT ... ON CONFLICT per UPSERT atomico
def carica_dati_upsert(**context):
execution_date = context["ds"]
df = estrai_dati(execution_date)
pg_hook.insert_rows(
table="staging.ordini",
rows=df.values.tolist(),
target_fields=["ordine_id", "data_ordine", "importo"],
replace=True,
replace_index=["ordine_id"], # PK: upsert su ordine_id
)
2. Zasady ponawiania prób i wykładnicze wycofywanie
Zawsze ustalaj rozsądne zasady ponawiania prób. Przejściowe awarie (przekroczenia limitu czasu sieci, blokada bazy danych, usługi tymczasowo niedostępne) są często rozwiązywane za pomocą a spróbuj ponownie po kilku minutach. Wykładnicze wycofywanie pozwala uniknąć przeciążenia istniejących systemów w trudnej sytuacji.
# Retry policy consigliata per diversi tipi di task
from datetime import timedelta
# Task di estrazione da API esterne: retry aggressivi con backoff
default_args_api = {
"retries": 5,
"retry_delay": timedelta(minutes=1),
"retry_exponential_backoff": True, # 1, 2, 4, 8, 16 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(minutes=30),
}
# Task di trasformazione dbt: retry conservativi
default_args_dbt = {
"retries": 2,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(hours=2),
}
# Task di notifica: no retry (evita spam di alert)
default_args_notify = {
"retries": 0,
"execution_timeout": timedelta(minutes=2),
}
3. Zarządzanie zależnościami za pomocą czujników
Nie zawsze możliwe jest zaplanowanie rurociągu o ustalonym czasie. Czasami trzeba poczekać że plik dociera do S3, że kończy się inny potok lub że dane zewnętrzne są dostępne. TO Operator czujnika of Airflow (i Dagster) są rozwiązaniem.
# Sensor: attendi che il file di input sia disponibile prima di procedere
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
with DAG("pipeline_dipendente", ...) as dag:
# Attendi file S3 (polling ogni 5 minuti, max 2 ore)
wait_for_file = S3KeySensor(
task_id="wait_for_input_file",
bucket_name="my-data-lake",
bucket_key="inputs/vendite_{{ ds_nodash }}.csv",
aws_conn_id="aws_default",
poke_interval=300, # Controlla ogni 5 min
timeout=7200, # Timeout dopo 2 ore
mode="reschedule", # Libera il worker mentre aspetta
)
# Attendi che un'altra DAG abbia finito
wait_for_upstream = ExternalTaskSensor(
task_id="wait_for_erp_sync",
external_dag_id="sincronizzazione_erp",
external_task_id="verify_sync_complete",
allowed_states=["success"],
failed_states=["failed", "skipped"],
poke_interval=120,
timeout=3600,
mode="reschedule",
)
estrai = PythonOperator(task_id="estrai_dati", ...)
# Pipeline partira solo quando entrambi i sensor sono verdi
[wait_for_file, wait_for_upstream] >> estrai
Anty-wzór: typowe błędy, których należy unikać
- Zadanie ze zmiennym stanem globalnym: Nie używaj zmiennych globalnych Pythona w DAG-ach. DAG jest często analizowany przez program planujący; zmienne globalne wywołać nieprzewidywalne zachowanie. Użyj XCom, Variables lub TaskFlow API.
- Logika biznesowa w plikach DAG: Pliki DAG muszą zawierać tylko rozszerzenie Definicja orkiestracji. Logika biznesowa jest podzielona na osobne moduły Pythona, importowane z zadań. Ułatwia testowanie i skraca czas analizy.
- XCom dla dużych zbiorów danych: XCom jest przeznaczony dla małych metadanych (liczba, ścieżki plików, flagi). Nie używaj go do przekazywania ramek danych między zadaniami: użyj tabele pomostowe lub pliki S3.
- Rurociągi monolityczne bez granulacji: Jedno zadanie, które załatwia wszystko (wyodrębnienie + transformacja + ładowanie) uniemożliwia debugowanie. Każda operacja logicznie odrębne, musi być odrębnym zadaniem.
-
Catchup=True w przypadku napiętych harmonogramów: Jeśli włączysz nadrabianie zaległości na a
rurociąg z harmonogramem godzinowym i włączysz go po 30-dniowej przerwie, uruchomisz 720
egzekucje w tym samym czasie. Używaj zawsze
catchup=Falsedomyślnie i zarządzaj jawnie uzupełnianiem. - Zakodowane na stałe sekrety: Nigdy nie wprowadzaj hasła, klucza API ani połączenia ciąg bezpośrednio do kodu. Skorzystaj z połączeń przepływu powietrza, zasobów Dagster lub Bloki prefektów zintegrowane z Menedżerem sekretów AWS lub Vault HashiCorp.
4. Testowanie rurociągów
Nieprzetestowany rurociąg i zabezpieczony dług techniczny. Oto jak ustrukturyzować testy dla Airflow DAG i dla zasobu Dagster.
# tests/test_pipeline_vendite.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import date
import pandas as pd
# ============ TEST AIRFLOW ============
from airflow.models import DagBag
def test_dag_integrity():
"""Verifica che i DAG siano validi e non abbiano cicli."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
assert len(dagbag.import_errors) == 0, \
f"Errori di import: {dagbag.import_errors}"
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert dag is not None, "DAG non trovato"
assert dag.schedule == "0 5 * * *"
def test_dag_task_count():
"""Verifica il numero di task nel DAG."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert len(dag.tasks) >= 5, "Troppo pochi task nel DAG"
# ============ TEST DAGSTER ============
from dagster import materialize, build_asset_context
from assets import raw_ordini, silver_ordini_arricchiti
def test_raw_ordini_asset():
"""Test dell'asset raw_ordini con mock delle risorse."""
mock_df = pd.DataFrame({
"ordine_id": [1, 2, 3],
"customer_id": [101, 102, 103],
"importo_totale": [150.0, 89.99, 220.5],
"data_ordine": ["2025-01-15"] * 3,
})
with patch("assets.ingestione.PostgresResource") as mock_pg:
mock_pg.return_value.execute_query.return_value = mock_df
result = materialize(
[raw_ordini],
resources={"postgres_erp": mock_pg.return_value},
partition_key="2025-01-15",
)
assert result.success
asset_value = result.output_for_node("raw_ordini")
assert len(asset_value) == 3
assert (asset_value["importo_totale"] >= 0).all()
# ============ TEST PREFECT ============
from flows.pipeline_vendite import estrai_ordini, valida_e_arricchisci
from prefect.testing.utilities import prefect_test_harness
def test_valida_e_arricchisci():
"""Test del task di validazione con dati di test."""
ordini = pd.DataFrame({
"ordine_id": [1, 2],
"customer_id": [101, 102],
"importo_totale": [100.0, 200.0],
})
clienti = pd.DataFrame({
"id": [101, 102],
"citta": ["Milano", "Roma"],
"segmento": ["Premium", "Standard"],
})
with prefect_test_harness():
result = valida_e_arricchisci.fn(ordini, clienti)
assert len(result) == 2
assert "segmento" in result.columns
assert result["segmento"].isna().sum() == 0
def test_valida_importi_negativi():
"""Test che importi negativi generino eccezione."""
ordini = pd.DataFrame({
"ordine_id": [1],
"importo_totale": [-50.0], # NEGATIVO
"customer_id": [101],
})
clienti = pd.DataFrame({"id": [101], "citta": ["Milano"], "segmento": ["Standard"]})
with prefect_test_harness():
with pytest.raises(ValueError, match="Trovati importi negativi"):
valida_e_arricchisci.fn(ordini, clienti)
Wnioski i zalecenia
Orkiestracja i szkielet każdego dojrzałego stosu danych. Bez orkiestratora niezawodne, nawet najbardziej wyrafinowane rurociągi stają się kruche, nieprzezroczyste i trudne do ułożenia zarządzać. Zbadaliśmy trzy główne instrumenty rynku 2025 i zobaczyliśmy, jak to zrobić każdy odpowiada na inne potrzeby.
Zalecenia dotyczące scenariusza
- PMI od teraz: Ty wybierasz Prefekt 3 z Prefect Cloud poziom darmowy. Czas osiągnięcia wartości jest minimalny, krzywa uczenia się jest krótka i możesz zdać do większej liczby rozwiązań dla przedsiębiorstw, gdy wymaga tego złożoność.
- Zespół ze stosem dbt: Dagster i naturalny wybór. Natywna integracja z dbt poprzez dagster-dbt i automatyczne powiązanie między modelami dbt i Asset Dagster znacznie zmniejszają obciążenie poznawcze zespołu.
- Przedsiębiorstwo z istniejącymi umiejętnościami w zakresie przepływu powietrza: Przepływ powietrza 3.0 na Astronomer (zarządzany) lub MWAA. Nie ma powodu do migracji, jeśli zespół jest już produktywny na Airflow, a wersja 3.0 wypełniła większość luki w stosunku do Dagster i Prefect.
- Długotrwałe przepływy pracy aplikacji: Rozważać Czasowy w połączeniu z Twoim ulubionym koordynatorem danych. Są to narzędzia uzupełniające, nie alternatywy.
Następny artykuł z serii poświęcony jest temu tematowiSztuczna inteligencja zastosowana w produkcji: zobaczymy, jak budować systemy konserwacji predykcyjnej z czujnikami IoT, dla Apache Kafka strumieniowe przesyłanie danych maszynowych oraz modele uczenia maszynowego umożliwiające przewidywanie awarii, zanim one wystąpią. Konkretny przypadek użycia, który pokazuje, w jaki sposób tworzony przez nas stos danych staje się fundament sztucznej inteligencji w firmie.
Lista kontrolna dotycząca wprowadzenia orkiestratora do produkcji
- Wszystkie zadania są idempotentne: ich ponowne uruchomienie nie powoduje utworzenia duplikatów
- Zasady ponawiania skonfigurowane z wykładniczym wycofywaniem dla każdego zadania
- Sekrety zarządzane poprzez połączenia/bloki/zasoby, a nie zakodowane na stałe
- Aktywne monitorowanie za pomocą Grafany lub odpowiednika dla wskaźników infrastruktury
- Alerty na Slacku lub PagerDuty w przypadku awarii i naruszeń SLA
- Testowanie integralności DAG/zasobów w rurociągu CI/CD
- Przechwytywanie domyślnie wyłączone, uzupełnianie obsługiwane jawnie
- Oddziel logikę biznesową od plików definicji potoku
- Scentralizowane logi dostępne bez bezpośredniego dostępu do pracowników
- Dokumentacja wbudowana: każdy DAG/zasób ma jasny opis
Przydatne linki do dalszych informacji
- Poprzedni artykuł: ETL kontra nowoczesny ELT: dbt, Airbyte i Fivetran
- Następny artykuł: Sztuczna inteligencja w produkcji: konserwacja predykcyjna i cyfrowy bliźniak
- Powiązane serie: MLOps for Business - Modele AI w produkcji z MLflow
- Powiązane serie: LLM w biznesie: przedsiębiorstwo RAG, dostrajanie i poręcze







