Pipeline Orchestration: Airflow, Dagster and Prefect
Imagine having a flawless data stack: a lakehouse on Apache Iceberg, elegant transformations in dbt, Airbyte connectors syncing data from a dozen sources. Everything runs perfectly, until someone asks: "But who runs all of this? Who guarantees that the ingestion job starts before the transformations? Who alerts you when something breaks at 3 AM?" The answer is the pipeline orchestrator.
Orchestration is the invisible glue of the data stack: the component that coordinates the execution of hundreds of interdependent tasks, manages retries on failure, monitors pipeline health, and produces the audit trail the business requires. Without an orchestrator, data pipelines are cron scripts held together by hope.
In 2025, the orchestration landscape is dominated by three main platforms: Apache Airflow 3.0, the veteran that defined the very concept of DAG orchestration; Dagster 1.x, the challenger that introduced software-defined assets; and Prefect 3, the Python-first solution built for developer experience. In this article we will analyze all three in depth, with real code and an honest comparison to help you choose the right one for your context.
What You Will Learn in This Article
- Apache Airflow 3.0 internal architecture: Scheduler, Executor, Worker, Metadata DB
- How to write a complete Python DAG for an end-to-end ETL pipeline
- Dagster's software-defined assets paradigm and why it changes how you think about pipelines
- Prefect 3's Flow/Task model and its simplified deployment model
- Detailed comparison between the three tools with a feature matrix for each use case
- Temporal as an alternative for durable and long-running workflows
- Reference architecture for integrating orchestration with dbt, Airbyte, and a data lakehouse
- Monitoring, alerting, and best practices for idempotent pipelines in production
Articles in the Data Warehouse, AI and Digital Transformation Series
| # | Article | Focus |
|---|---|---|
| 1 | Data Warehouse Evolution: from SQL Server to Data Lakehouse | Architectures and platforms |
| 2 | Data Mesh and Decentralized Architecture | Governance and ownership |
| 3 | Modern ETL vs ELT: dbt, Airbyte and Fivetran | Transformation pipelines |
| 4 | You are here - Pipeline Orchestration | Airflow, Dagster, Prefect |
| 5 | AI in Manufacturing: Predictive Maintenance | IoT, ML, Digital Twin |
| 6 | AI in Finance: Fraud Detection and Credit Scoring | Real-time ML |
| 7 | AI in Retail: Demand Forecasting and Recommendations | Applied ML |
Why Orchestration is Fundamental
Before diving into technical details, it is worth understanding exactly what problem an orchestrator solves. A typical data pipeline at a mid-to-large company includes:
- Extraction from CRM, ERP, transactional databases, external APIs (10-50 sources)
- Loading into the data lake (Airflow triggers Airbyte or Fivetran)
- dbt transformations (30-200 models with complex dependencies)
- Data mart and aggregated table updates
- Exports to BI tools (Metabase, Tableau, Power BI)
- ML model updates (feature engineering + retraining)
Each of these steps has dependencies (B cannot start before A completes), SLAs (reports must be ready by 8:00 AM), and quality requirements (if source data is empty, do not launch transformations). Managing all of this with separate cron scripts is a guaranteed disaster.
What a Pipeline Orchestrator Does
| Capability | Description |
|---|---|
| Dependency Management | Defines task execution order and manages cross-pipeline dependencies |
| Scheduling | Cron, event-driven, and data-aware scheduling (trigger on asset update) |
| Retry & Error Handling | Automatic retries with exponential backoff, partial failure management |
| Parallelism | Parallel execution of independent tasks to optimize run time |
| Monitoring | Centralized dashboard, per-task logs, SLA monitoring, alerting |
| Backfill | Re-execution of historical runs when pipeline logic changes |
| Audit Trail | Complete execution history for compliance and debugging |
| Parameterization | Variable configuration for environments (dev, staging, prod) and manual runs |
Apache Airflow 3.0: The Veteran Reinvented
Created by Airbnb in 2014 and donated to the Apache Software Foundation in 2016, Apache Airflow became the de facto standard for data pipeline orchestration. With over 35,000 GitHub stars and a community of hundreds of contributors, Airflow is used by thousands of companies worldwide, from startups to large enterprises.
In 2025, Airflow underwent its most significant evolution with the release of version 3.0, which introduces a client-server architecture via the Task Execution Interface, native asset-aware scheduling (inspired by Dagster), DAG versioning, and a completely redesigned UI. It is no longer just a job scheduler: it is a modern orchestration platform.
Apache Airflow Architecture
Understanding Airflow's architecture is essential to deploy it correctly and diagnose production issues. There are five main components:
Airflow Architectural Components
| Component | Role | Technology |
|---|---|---|
| Webserver | Web UI for monitoring, debugging, manual DAG triggers | Flask + Gunicorn (2.x), FastAPI (3.0) |
| Scheduler | Parses DAGs, schedules tasks, pushes them into the executor queue | Python daemon, HA with multiple instances |
| Executor | Runs tasks: LocalExecutor (single node), CeleryExecutor, KubernetesExecutor | Celery + Redis/RabbitMQ, or K8s |
| Worker | Actually processes tasks (only with CeleryExecutor/KubernetesExecutor) | Celery worker or K8s Pod |
| Metadata Database | State of all DAG runs, task instances, variables, connections | PostgreSQL (recommended in prod), MySQL |
| DAG Processor | New in Airflow 3.0: separate DAG parser from the scheduler | Python process pool |
The DAG Concept
The heart of Airflow is the DAG (Directed Acyclic Graph): a directed graph with no cycles that defines the execution order of tasks. Each node in the graph is a task, and each edge is a dependency. Because the graph must be acyclic, circular dependencies between tasks are impossible by construction.
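The acyclicity requirement is what lets a scheduler compute a valid execution order at all. Conceptually (this is an illustrative sketch, not Airflow's internals), the order comes from a topological sort, such as Kahn's algorithm:

```python
from collections import deque

def topological_order(deps: dict[str, set[str]]) -> list[str]:
    """Kahn's algorithm: returns a valid execution order, or raises on a cycle.

    deps maps each task to the set of tasks it depends on (its upstreams).
    """
    remaining = {t: set(up) for t, up in deps.items()}
    # Tasks with no upstream dependencies are immediately runnable
    ready = deque(sorted(t for t, up in remaining.items() if not up))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        # "Complete" this task: remove it from every downstream's upstream set
        for t, up in remaining.items():
            if task in up:
                up.remove(task)
                if not up and t not in order and t not in ready:
                    ready.append(t)
    if len(order) != len(remaining):
        raise ValueError("Cycle detected: no valid execution order exists")
    return order

# extract must run before transform, which must run before load and report
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"transform"},
}
print(topological_order(pipeline))  # ['extract', 'transform', 'load', 'report']
```

A cyclic graph (A depends on B, B depends on A) never produces a runnable task, which is exactly why orchestrators reject it at parse time.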
Tasks are implemented through Operators: Python classes that abstract a type of work. Airflow includes hundreds of built-in operators (PythonOperator, BashOperator, PostgresOperator, SparkSubmitOperator, dbtCloudOperator...) and the community publishes many more via provider packages.
Code Example: Complete ETL DAG for a Sales Pipeline
Let us look at a complete, realistic DAG that orchestrates an ETL pipeline: extraction from PostgreSQL, staging load, dbt transformation, and team notification.
```python
# dags/sales_pipeline.py
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.task_group import TaskGroup

logger = logging.getLogger(__name__)

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "email": ["data-team@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,  # 5, 10, 20 min
    "max_retry_delay": timedelta(hours=1),
    "execution_timeout": timedelta(hours=2),
}

with DAG(
    dag_id="daily_sales_pipeline",
    default_args=default_args,
    description="Extract sales from ERP, load to DWH, aggregate with dbt",
    schedule="0 5 * * *",  # Every day at 05:00 UTC
    start_date=datetime(2025, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["sales", "etl", "production"],
) as dag:

    # ============ TASK 1: CHECK SOURCE DATA AVAILABILITY ============
    def check_source_data(**context):
        """Verifies that data exists for the execution window."""
        pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
        execution_date = context["ds"]
        count = pg_hook.get_first("""
            SELECT COUNT(*)
            FROM orders
            WHERE DATE(created_at) = %s
        """, parameters=[execution_date])[0]
        logger.info(f"Found {count} orders for {execution_date}")
        if count == 0:
            logger.warning("No data found, skipping pipeline")
            return "notify_no_data"
        # Return BOTH extraction task ids: any direct downstream task
        # not returned by a BranchPythonOperator gets skipped.
        return [
            "extract_task_group.extract_orders",
            "extract_task_group.extract_customers",
        ]

    check_source = BranchPythonOperator(
        task_id="check_source_data",
        python_callable=check_source_data,
    )

    # ============ TASK GROUP: EXTRACTION ============
    with TaskGroup("extract_task_group") as extract_group:

        def extract_orders(**context):
            """Extracts orders from ERP and saves them to staging."""
            execution_date = context["ds"]
            pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
            df = pg_hook.get_pandas_df("""
                SELECT
                    o.id AS order_id,
                    o.customer_id,
                    o.created_at AS order_date,
                    o.status,
                    SUM(oi.qty * oi.unit_price * (1 - COALESCE(oi.discount, 0)))
                        AS total_amount,
                    COUNT(oi.id) AS line_count
                FROM orders o
                JOIN order_items oi ON o.id = oi.order_id
                WHERE DATE(o.created_at) = %(exec_date)s
                  AND o.status IN ('completed', 'shipped')
                GROUP BY o.id, o.customer_id, o.created_at, o.status
            """, parameters={"exec_date": execution_date})
            assert df["total_amount"].ge(0).all(), "Negative amounts found!"
            assert df["order_id"].is_unique, "Duplicate orders in extraction!"
            context["ti"].xcom_push(key="row_count", value=len(df))
            dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
            dwh_hook.insert_rows(
                table="staging.stg_orders_raw",
                rows=df.values.tolist(),
                target_fields=df.columns.tolist(),
                replace=True,
                replace_index=["order_id"],
            )
            logger.info(f"Loaded {len(df)} orders into staging")

        extract_orders_task = PythonOperator(
            task_id="extract_orders",
            python_callable=extract_orders,
        )

        def extract_customers(**context):
            """Extracts updated customer snapshot."""
            pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
            dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
            df_customers = pg_hook.get_pandas_df("""
                SELECT id, email, first_name, last_name, city, segment,
                       DATE(updated_at) AS update_date
                FROM customers
                WHERE DATE(updated_at) >= CURRENT_DATE - INTERVAL '1 day'
            """)
            if len(df_customers) > 0:
                dwh_hook.insert_rows(
                    table="staging.stg_customers_raw",
                    rows=df_customers.values.tolist(),
                    target_fields=df_customers.columns.tolist(),
                    replace=True,
                    replace_index=["id"],
                )

        extract_customers_task = PythonOperator(
            task_id="extract_customers",
            python_callable=extract_customers,
        )

        # No dependency between the two extraction tasks:
        # Airflow runs them in parallel.

    # ============ TASK GROUP: DBT TRANSFORMATIONS ============
    with TaskGroup("dbt_task_group") as dbt_group:
        dbt_staging = BashOperator(
            task_id="dbt_run_staging",
            bash_command="""
                cd /opt/airflow/dbt/dwh_project &&
                dbt run --select tag:staging --target prod \
                    --vars '{"execution_date": "{{ ds }}"}'
            """,
        )
        dbt_marts = BashOperator(
            task_id="dbt_run_marts",
            bash_command="""
                cd /opt/airflow/dbt/dwh_project &&
                dbt run --select tag:marts --target prod \
                    --vars '{"execution_date": "{{ ds }}"}'
            """,
        )
        dbt_test = BashOperator(
            task_id="dbt_test",
            bash_command="""
                cd /opt/airflow/dbt/dwh_project &&
                dbt test --select tag:staging tag:marts --target prod
            """,
        )
        dbt_staging >> dbt_marts >> dbt_test

    # ============ NOTIFICATIONS ============
    notify_success = SlackWebhookOperator(
        task_id="notify_success",
        slack_webhook_conn_id="slack_data_team",
        message="""
            :white_check_mark: *Sales Pipeline Completed*
            Date: {{ ds }}
            Orders processed: {{ ti.xcom_pull(task_ids='extract_task_group.extract_orders', key='row_count') }}
        """,
        trigger_rule="all_success",
    )
    notify_no_data = SlackWebhookOperator(
        task_id="notify_no_data",
        slack_webhook_conn_id="slack_data_team",
        message=":information_source: *Sales Pipeline* - No data for {{ ds }}, execution skipped",
    )
    notify_failure = SlackWebhookOperator(
        task_id="notify_failure",
        slack_webhook_conn_id="slack_data_team",
        message=":x: *Sales Pipeline FAILED* - Date: {{ ds }} - Check Airflow UI!",
        trigger_rule="one_failed",
    )

    # ============ DAG DEPENDENCIES ============
    check_source >> [extract_group, notify_no_data]
    extract_group >> dbt_group >> notify_success
    dbt_group >> notify_failure
```
This DAG demonstrates fundamental Airflow patterns: using TaskGroup to group
related tasks, BranchPythonOperator for conditional logic, XCom
for passing data between tasks, and trigger rules for handling success and failure notifications.
What's New in Apache Airflow 3.0 (2025)
- Task Execution Interface: New client-server API that decouples the worker from the Airflow API server, improving security and scalability
- Asset-Aware Scheduling: DAGs can now be triggered by asset updates, not just cron schedules
- DAG Versioning: Each run is linked to the DAG version at launch time, eliminating inconsistencies during deploys
- Human-in-the-Loop: Airflow 3.1 introduces workflows that wait for human approval before proceeding
- React UI: Completely redesigned interface, faster and more intuitive
- Separate DAG Processor: DAG parsing happens in a dedicated process, reducing scheduler load
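A sketch of what asset-aware scheduling looks like in the new authoring interface (assuming Airflow 3's `airflow.sdk` import path; the asset URI and DAG names are illustrative):

```python
from airflow.sdk import Asset, dag, task

# A logical pointer to a data artifact; the URI is just an identifier
raw_orders = Asset("postgres://dwh/staging/stg_orders_raw")

# Producer DAG: declaring the asset as an outlet emits an update event
@dag(schedule="0 5 * * *")
def ingest_orders():
    @task(outlets=[raw_orders])
    def load():
        ...  # extraction and load logic

    load()

# Consumer DAG: scheduled on the asset, it runs whenever raw_orders
# is updated, with no cron expression of its own
@dag(schedule=[raw_orders])
def transform_orders():
    @task
    def build_marts():
        ...  # downstream transformation logic

    build_marts()

ingest_orders()
transform_orders()
```

This decouples producer and consumer: the downstream DAG no longer needs to guess when upstream data lands.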
Dagster: The Software-Defined Assets Paradigm
Dagster, born in 2018 from Elementl (now Dagster Labs), introduced a radical paradigm shift in orchestration: instead of thinking about tasks to execute, you think about assets to produce. An asset is any data artifact: a table in the DWH, an ML model, a Parquet file, a generated report.
This approach, called Software-Defined Assets (SDA), has revolutionized
the developer experience. In Airflow you define "run this script at 5 AM". In Dagster you
define "I want the gold.monthly_revenue table to always be up-to-date, and it
knows how to update itself." The difference seems subtle but changes everything: automatic
lineage, simple testing, immediate understanding of what exists in the data stack.
In 2025, Dagster 1.9 reached maturity with the Components framework (GA in October 2025), which allows describing entire pipelines as declarative configurations, and the advanced catalog that provides unprecedented visibility into the state of every asset in the system.
Dagster Key Concepts
Dagster Terminology
| Concept | Description | Airflow Equivalent |
|---|---|---|
| Asset | Data artifact the pipeline produces (table, model, file) | No direct equivalent |
| @asset | Python decorator defining how to produce an asset | PythonOperator (more limited) |
| Job | Selection of assets/ops to execute together | DAG |
| Op | Generic task with no explicit data output | Operator |
| IO Manager | Handles how assets are saved and read (S3, BigQuery, Snowflake...) | No equivalent |
| Resource | Shared connections and clients (databases, APIs) | Connection + Hook |
| Sensor | Event-driven trigger (polling filesystem, API, events) | Sensor operator |
| Partition | Asset division by date, category, region | PartitionedSchedule (more complex) |
Code Example: Asset-Based Pipeline with Dagster
```python
# pipeline_sales/assets.py
from pathlib import Path

import pandas as pd
from dagster import (
    asset,
    AssetIn,
    AssetExecutionContext,
    MetadataValue,
    Output,
    DailyPartitionsDefinition,
    MaterializeResult,
)
from dagster_dbt import dbt_assets, DbtCliResource

# PostgresResource is a project-specific ConfigurableResource (defined
# elsewhere in the project) wrapping the database connection
from pipeline_sales.resources import PostgresResource

dbt_manifest_path = Path("dbt_project/target/manifest.json")

daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")

# ============ ASSET 1: RAW ORDERS FROM ERP ============
@asset(
    name="raw_orders",
    group_name="ingestion",
    partitions_def=daily_partitions,
    description="Raw orders extracted from ERP PostgreSQL",
    metadata={
        "source": "ERP PostgreSQL",
        "owner": "data-engineering@company.com",
    },
    compute_kind="python",
)
def raw_orders(
    context: AssetExecutionContext,
    postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
    """Extracts orders for the current partition."""
    partition_date = context.partition_key  # "2025-01-15"
    df = postgres_erp.execute_query(f"""
        SELECT
            o.id AS order_id,
            o.customer_id,
            o.created_at AS order_date,
            o.status,
            SUM(oi.qty * oi.unit_price) AS total_amount
        FROM orders o
        JOIN order_items oi ON o.id = oi.order_id
        WHERE DATE(o.created_at) = '{partition_date}'
          AND o.status IN ('completed', 'shipped')
        GROUP BY o.id, o.customer_id, o.created_at, o.status
    """)
    return Output(
        df,
        metadata={
            "num_records": MetadataValue.int(len(df)),
            "total_amount": MetadataValue.float(df["total_amount"].sum()),
            "preview": MetadataValue.md(df.head(5).to_markdown()),
        },
    )

# raw_customers is defined analogously to raw_orders (omitted for brevity)

# ============ ASSET 2: ENRICHED ORDERS (Silver Layer) ============
@asset(
    name="silver_enriched_orders",
    group_name="silver",
    partitions_def=daily_partitions,
    ins={
        "raw_orders": AssetIn("raw_orders"),
        "raw_customers": AssetIn("raw_customers"),
    },
    description="Orders joined with customer info, validated",
    compute_kind="python",
)
def silver_enriched_orders(
    context: AssetExecutionContext,
    raw_orders: pd.DataFrame,
    raw_customers: pd.DataFrame,
    postgres_dwh: PostgresResource,
) -> MaterializeResult:
    """Enriches and validates orders."""
    df = raw_orders.merge(
        raw_customers[["id", "city", "segment"]],
        left_on="customer_id",
        right_on="id",
        how="left",
    )
    assert df["total_amount"].ge(0).all(), "Negative amounts found!"
    assert df["order_id"].is_unique, "Duplicate orders found!"
    postgres_dwh.load_dataframe(
        df=df,
        table="silver.enriched_orders",
        partition_col="order_date",
        partition_value=context.partition_key,
    )
    return MaterializeResult(
        metadata={
            "num_records": MetadataValue.int(len(df)),
            "unmatched_customers": MetadataValue.int(df["segment"].isna().sum()),
        }
    )

# ============ ASSET 3: DBT MODELS (Gold Layer) ============
@dbt_assets(
    manifest=dbt_manifest_path,
    select="tag:marts",
)
def dbt_marts_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Runs dbt gold layer models."""
    yield from dbt.cli(["run", "--select", "tag:marts"], context=context).stream()
    yield from dbt.cli(["test", "--select", "tag:marts"], context=context).stream()

# ============ JOB AND SCHEDULE ============
from dagster import define_asset_job, ScheduleDefinition, DefaultScheduleStatus

sales_pipeline_job = define_asset_job(
    name="daily_sales_pipeline",
    selection=[
        "raw_orders",
        "raw_customers",
        "silver_enriched_orders",
        "dbt_marts_assets*",
    ],
)

sales_pipeline_schedule = ScheduleDefinition(
    job=sales_pipeline_job,
    cron_schedule="0 5 * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)
```
IO Manager: the Key to Dagster Integration
A Dagster IO Manager defines how assets are saved and loaded between tasks. Dagster includes IO Managers for S3 (Parquet, CSV), Snowflake, BigQuery, DuckDB, Pandas, and many more. You can configure different IO Managers for different environments without changing asset code.
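Conceptually, an IO Manager is a pair of hooks, one to persist an asset and one to retrieve it, that the framework calls around your asset function. A framework-free sketch of the pattern (the class and method names are illustrative, not the Dagster API):

```python
import pickle
import tempfile
from pathlib import Path

class LocalPickleIOManager:
    """Persists each asset to <base_dir>/<asset_name>.pkl.

    Dagster's real IO Managers implement the same two hooks
    (handle_output / load_input) against S3, Snowflake, BigQuery, etc.
    """

    def __init__(self, base_dir: Path):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def handle_output(self, asset_name: str, obj) -> None:
        # Called after an asset function returns: persist its value
        (self.base_dir / f"{asset_name}.pkl").write_bytes(pickle.dumps(obj))

    def load_input(self, asset_name: str):
        # Called before a downstream asset runs: load its upstream value
        return pickle.loads((self.base_dir / f"{asset_name}.pkl").read_bytes())

# Swapping environments (local disk vs warehouse) means swapping the
# IO Manager, not touching the asset code
io = LocalPickleIOManager(Path(tempfile.mkdtemp()))
io.handle_output("raw_orders", [{"order_id": 1, "amount": 150.0}])
print(io.load_input("raw_orders"))  # [{'order_id': 1, 'amount': 150.0}]
```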
Prefect 3: Python-First for Developer Experience
Prefect, founded in 2018, embraced a different philosophy from the start: make orchestration as simple and Pythonic as possible. Version 3, released in 2024 and matured in 2025, brings this vision to fruition: any Python function can become an orchestrated task by adding a single decorator.
Prefect 3's Flow/Task model eliminates most of the configuration required by Airflow and Dagster. No need to explicitly define DAGs, no separate configuration files, no proprietary DSL to learn: write normal Python and Prefect handles the rest.
In 2025, Prefect 3 introduced Prefect Incidents for structured service disruption management, automations with metrics-based triggers (not just execution events), and native Modal integration for scalable serverless execution. Prefect Cloud offers a generous free tier making it accessible even to small teams.
Code Example: Prefect Flow for Sales Pipeline
```python
# flows/sales_pipeline.py
from datetime import date, timedelta
from typing import Optional

import pandas as pd
import sqlalchemy as sa
from prefect import flow, get_run_logger, task
from prefect.blocks.system import Secret
from prefect.tasks import task_input_hash

@task(
    name="Extract Orders ERP",
    description="Extracts completed orders for the specified date",
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    tags=["extraction", "erp"],
)
def extract_orders(execution_date: date) -> pd.DataFrame:
    """Extracts orders from ERP for the specified date."""
    logger = get_run_logger()
    conn_string = Secret.load("erp-db-url").get()
    engine = sa.create_engine(conn_string)
    with engine.connect() as conn:
        # sa.text() ensures the :dt named parameter is bound correctly
        df = pd.read_sql(sa.text("""
            SELECT o.id AS order_id, o.customer_id,
                   o.created_at, o.status,
                   SUM(oi.qty * oi.unit_price) AS total_amount
            FROM orders o
            JOIN order_items oi ON o.id = oi.order_id
            WHERE DATE(o.created_at) = :dt
              AND o.status IN ('completed', 'shipped')
            GROUP BY o.id, o.customer_id, o.created_at, o.status
        """), conn, params={"dt": str(execution_date)})
    logger.info(f"Extracted {len(df)} orders for {execution_date}")
    return df

# extract_customers is defined analogously to extract_orders (omitted here)

@task(
    name="Validate and Enrich Orders",
    retries=1,
)
def validate_and_enrich(
    orders: pd.DataFrame,
    customers: pd.DataFrame,
) -> pd.DataFrame:
    """Validates and enriches orders with customer data."""
    logger = get_run_logger()
    if orders["total_amount"].lt(0).any():
        raise ValueError("Negative amounts found in orders dataset")
    if not orders["order_id"].is_unique:
        raise ValueError("Duplicate orders found")
    df = orders.merge(
        customers[["id", "city", "segment"]],
        left_on="customer_id",
        right_on="id",
        how="left",
    )
    match_rate = (1 - df["segment"].isna().mean()) * 100
    logger.info(f"Customer match rate: {match_rate:.1f}%")
    return df

@task(name="Load to DWH", retries=2)
def load_to_dwh(df: pd.DataFrame, schema: str, table: str) -> int:
    """Loads the dataframe into the DWH."""
    conn_string = Secret.load("dwh-db-url").get()
    engine = sa.create_engine(conn_string)
    with engine.connect() as conn:
        df.to_sql(name=table, con=conn, schema=schema,
                  if_exists="replace", index=False)
    return len(df)

@task(name="Run dbt", retries=1)
def run_dbt(target: str = "prod"):
    """Runs dbt build for marts tag."""
    import subprocess
    result = subprocess.run(
        ["dbt", "build", "--select", "tag:marts", "--target", target],
        capture_output=True, text=True, cwd="/opt/dbt/dwh_project",
    )
    if result.returncode != 0:
        raise RuntimeError(f"dbt build failed:\n{result.stderr}")

# ============ MAIN FLOW ============
@flow(
    name="Daily Sales Pipeline",
    description="ETL orchestration: ERP -> DWH -> dbt marts",
    flow_run_name="sales-{execution_date}",
    timeout_seconds=7200,
    log_prints=True,
)
def sales_pipeline(execution_date: Optional[date] = None):
    """Main flow for the sales pipeline."""
    logger = get_run_logger()
    if execution_date is None:
        execution_date = date.today()
    logger.info(f"Starting sales pipeline for {execution_date}")

    # Parallel extraction (Prefect manages concurrency automatically)
    future_orders = extract_orders.submit(execution_date)
    future_customers = extract_customers.submit(execution_date)
    orders = future_orders.result()
    customers = future_customers.result()

    if len(orders) == 0:
        logger.warning(f"No orders for {execution_date}, skipping processing")
        return {"status": "skipped", "date": str(execution_date)}

    enriched_df = validate_and_enrich(orders, customers)
    record_count = load_to_dwh(enriched_df, schema="silver", table="enriched_orders")
    run_dbt(target="prod")

    logger.info(f"Pipeline completed: {record_count} records processed")
    return {"status": "success", "records": record_count}

# ============ DEPLOYMENT ============
if __name__ == "__main__":
    # Prefect 3 renamed DeploymentImage to DockerImage (prefect.docker)
    from prefect.docker import DockerImage

    sales_pipeline.deploy(
        name="prod-daily",
        cron="0 5 * * *",
        work_pool_name="k8s-work-pool",
        image=DockerImage(
            name="myregistry/sales-pipeline:latest",
            platform="linux/amd64",
        ),
    )
```
Airflow vs Dagster vs Prefect: Which is Best?
The honest answer is: it depends. Each tool has an ideal use case and specific strengths. Here is a structured comparison to guide your decision.
Detailed Comparison: Airflow vs Dagster vs Prefect (2025)
| Criterion | Airflow 3.0 | Dagster 1.9 | Prefect 3 |
|---|---|---|---|
| Paradigm | Task-centric (DAG) | Asset-centric (SDA) | Task-centric (Python-first) |
| Learning curve | High (DAG semantics, providers, hooks) | Medium-High (asset model, IO Manager) | Low (decorator on existing functions) |
| Data Lineage | Limited (task dependencies) | Excellent (native, visual) | Limited (recently added) |
| Local Dev Experience | Moderate (heavy Docker Compose) | Excellent (dagster dev, local UI) | Excellent (pip install, run locally) |
| Testing | Complex (depends on infra) | Excellent (assets as pure functions) | Excellent (tasks as pure functions) |
| dbt integration | Good (dbt Cloud operator) | Excellent (dagster-dbt, native asset) | Good (prefect-dbt) |
| Backfill | Excellent (scheduler-managed in 3.0) | Excellent (native partitioning) | Moderate (manual or automations) |
| Ecosystem | Huge (35k+ stars, hundreds of providers) | Growing (10k+ stars, enterprise focus) | Large (15k+ stars, developer focus) |
| Managed cloud cost | Astronomer from $500/month | Dagster Cloud from $500/month | Prefect Cloud with generous free tier |
| Best for | Teams with Airflow experience, classic ETL pipelines | Data-quality-focused teams, dbt integration, ML pipelines | Python teams, rapid prototyping, event-driven workflows |
Quick Decision Guide
- Choose Airflow if: your team already has Airflow expertise, you work with complex ecosystems (Spark, Hadoop, many providers), you need the huge operator ecosystem, or you are on AWS (MWAA) or GCP (Cloud Composer) which manage it natively.
- Choose Dagster if: data quality and lineage are top priorities, your stack includes dbt and you want deep integration, the team thinks in assets rather than tasks, or you are building ML pipelines where tracking every artifact matters.
- Choose Prefect if: you want the lowest learning curve, the team is Python-first with no prior orchestration experience, you need event-driven and operational workflows (not just data), or you want to start fast with Prefect Cloud's free tier.
Temporal: Durable Execution for Complex Workflows
While Airflow, Dagster, and Prefect focus on orchestrating data pipelines, Temporal solves a different problem: the durable execution of long-running application workflows that must survive crashes, restarts, and network outages.
Developed by former Uber engineers (who had built Cadence, its predecessor), Temporal treats every workflow as a durable stateful function: if the server restarts while a workflow is running, it resumes exactly where it left off, without losing state. This is fundamental for workflows that run for hours, days, or weeks (e.g., user onboarding processes, AI agent orchestration, saga pattern for distributed transactions).
Temporal vs Airflow/Dagster: When to Choose Which
| Scenario | Recommended Tool | Reason |
|---|---|---|
| Daily ETL pipelines | Airflow / Dagster / Prefect | Optimized for batch data orchestration |
| dbt transformations | Dagster (best) / Airflow | Native integration, asset lineage |
| ML training pipelines | Dagster / Prefect | Artifact tracking, simplified testing |
| Multi-step user onboarding | Temporal | Durability, application state management |
| AI agent / LLM orchestration | Temporal / Prefect | Long-running, complex failure handling |
| Saga pattern / distributed transactions | Temporal | Compensating transactions, durability |
Reference Architecture: Orchestration in the Modern Data Stack
How does an orchestrator integrate with the rest of the data stack? Let us look at a complete reference architecture combining all the components we have seen in this series: Airbyte for ingestion, dbt for transformations, Apache Iceberg as the table format, and Dagster as the central orchestrator.
```python
# definitions.py - Complete data stack with Dagster
from pathlib import Path

from dagster import Definitions, EnvVar, load_assets_from_modules
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
from dagster_aws.s3 import S3Resource
from dagster_dbt import DbtCliResource, dbt_assets
from dagster_snowflake_pandas import SnowflakePandasIOManager

from assets import ingestion, silver, gold, ml_features
# Jobs, schedules, and sensors are defined in their own project modules
from jobs import sales_pipeline_job, customers_job, ml_pipeline_job
from schedules import sales_pipeline_schedule, ml_weekly_schedule
from sensors import s3_new_file_sensor, airbyte_sync_sensor

# Airbyte assets: each connection becomes a Dagster asset
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte=AirbyteResource(
        host="http://airbyte-server:8001",
        username="admin",
        password=EnvVar("AIRBYTE_PASSWORD"),
    ),
    connection_filter=lambda meta: meta.name.startswith("prod_"),
)

# dbt assets: every dbt model becomes a Dagster asset with full lineage
@dbt_assets(manifest=Path("dbt_project/target/manifest.json"))
def dbt_all_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

python_assets = load_assets_from_modules([ingestion, silver, gold, ml_features])

resources_prod = {
    "airbyte": AirbyteResource(
        host="http://airbyte-server:8001",
        username="admin",
        password=EnvVar("AIRBYTE_PASSWORD"),
    ),
    "dbt": DbtCliResource(
        project_dir="dbt_project",
        profiles_dir="dbt_project",
        target="prod",
    ),
    "s3": S3Resource(region_name="us-east-1"),
    "io_manager": SnowflakePandasIOManager(
        account=EnvVar("SNOWFLAKE_ACCOUNT"),
        user=EnvVar("SNOWFLAKE_USER"),
        password=EnvVar("SNOWFLAKE_PASSWORD"),
    ),
}

defs = Definitions(
    assets=[*airbyte_assets, dbt_all_assets, *python_assets],
    resources=resources_prod,
    jobs=[sales_pipeline_job, customers_job, ml_pipeline_job],
    schedules=[sales_pipeline_schedule, ml_weekly_schedule],
    sensors=[s3_new_file_sensor, airbyte_sync_sensor],
)
```
Monitoring, Alerting, and Observability
An orchestrator without monitoring is like an aircraft without instruments. Pipeline monitoring is essential for guaranteeing SLAs, diagnosing issues, and communicating the state of the data platform to the business.
Monitoring Stack for Orchestrated Pipelines
| Layer | Tool | What It Monitors |
|---|---|---|
| Infrastructure metrics | Prometheus + Grafana | Worker CPU/RAM, task queue depth, scheduler latency |
| Pipeline metrics | Airflow metrics / Dagster events / Prefect API | Task success rate, duration, SLA violations |
| Centralized logs | ELK Stack / CloudWatch / Loki | Per-task instance logs for debugging |
| On-call alerting | PagerDuty / OpsGenie | Escalation for critical failures, SLA breach |
| Team notifications | Slack / Microsoft Teams | Success/failure notifications, daily digest |
| Data quality | Elementary / dbt tests / Great Expectations | Data anomalies, freshness, row count checks |
Best Practices for Robust Production Pipelines
1. Idempotency: the Golden Rule
Every task must be idempotent: executing it multiple times with the same parameters must produce the same result. This property is fundamental because retries (automatic or manual) are inevitable in production.
```python
# pg_hook is an Airflow PostgresHook; execute_values comes from psycopg2
from psycopg2.extras import execute_values

# WRONG: not idempotent (double insert on retry)
def load_data_wrong(**context):
    df = extract_data()
    pg_hook.insert_rows("staging.orders", df.values.tolist())  # APPEND!

# CORRECT: idempotent (DELETE + INSERT for the specific date)
def load_data_correct(**context):
    execution_date = context["ds"]
    df = extract_data(execution_date)
    with pg_hook.get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "DELETE FROM staging.orders WHERE DATE(order_date) = %s",
                [execution_date],
            )
            execute_values(cur, """
                INSERT INTO staging.orders (order_id, order_date, amount)
                VALUES %s
            """, df[["order_id", "order_date", "amount"]].values.tolist())
        conn.commit()

# OR: use INSERT ... ON CONFLICT for atomic UPSERT
def load_data_upsert(**context):
    execution_date = context["ds"]
    df = extract_data(execution_date)
    pg_hook.insert_rows(
        table="staging.orders",
        rows=df.values.tolist(),
        target_fields=["order_id", "order_date", "amount"],
        replace=True,
        replace_index=["order_id"],
    )
```
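The upsert pattern is easy to verify: run the same load twice and check that the final state is identical. A self-contained demonstration with SQLite standing in for the warehouse (stdlib only; the table and column names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE staging_orders (
        order_id   INTEGER PRIMARY KEY,
        order_date TEXT,
        amount     REAL
    )
""")

def load_orders(rows):
    """Idempotent load: ON CONFLICT turns a retried insert into an update."""
    conn.executemany("""
        INSERT INTO staging_orders (order_id, order_date, amount)
        VALUES (?, ?, ?)
        ON CONFLICT(order_id) DO UPDATE SET
            order_date = excluded.order_date,
            amount = excluded.amount
    """, rows)
    conn.commit()

batch = [(1, "2025-01-15", 150.0), (2, "2025-01-15", 89.99)]
load_orders(batch)
load_orders(batch)  # simulated retry: same input, same final state

count = conn.execute("SELECT COUNT(*) FROM staging_orders").fetchone()[0]
print(count)  # 2
```

With a plain `INSERT`, the second call would either duplicate the rows or fail on the primary key; the upsert makes retries harmless.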
2. Retry Policy and Exponential Backoff
Always configure a reasonable retry policy. Transient failures (network timeouts, database locks, temporarily unavailable services) often resolve with a retry after a few minutes. Exponential backoff avoids overloading systems that are already under pressure.
# Recommended retry policies for different task types
from datetime import timedelta

# Tasks extracting from external APIs: aggressive retries with backoff
default_args_api = {
    "retries": 5,
    "retry_delay": timedelta(minutes=1),
    "retry_exponential_backoff": True,  # 1, 2, 4, 8, 16 min
    "max_retry_delay": timedelta(hours=1),
    "execution_timeout": timedelta(minutes=30),
}

# dbt transformation tasks: conservative retries
default_args_dbt = {
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=2),
}

# Notification tasks: no retry (avoid alert spam)
default_args_notify = {
    "retries": 0,
    "execution_timeout": timedelta(minutes=2),
}
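The schedule of delays these settings produce can be sketched in plain Python. This is a simplified model of `retry_exponential_backoff` (Airflow also applies jitter, which we ignore here), and `backoff_delays` is our own helper name, not an Airflow API:

```python
from datetime import timedelta

def backoff_delays(base: timedelta, retries: int, max_delay: timedelta) -> list:
    """Doubling delays capped at max_delay, mirroring exponential backoff."""
    return [min(base * (2 ** attempt), max_delay) for attempt in range(retries)]

# With the API policy above: base delay 1 min, 5 retries, capped at 1 hour
delays = backoff_delays(timedelta(minutes=1), 5, timedelta(hours=1))
# -> delays of 1, 2, 4, 8 and 16 minutes; the 1-hour cap is never reached
```

The cap matters on long outages: without `max_retry_delay`, the doubling delay would quickly push the next attempt hours into the future.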
3. Testing Pipelines
An untested pipeline is guaranteed technical debt. Here is how to structure tests for Airflow DAGs, Dagster assets, and Prefect tasks.
# tests/test_sales_pipeline.py
import pytest
import pandas as pd
from dagster import materialize
from assets import raw_orders, silver_enriched_orders
from flows.sales_pipeline import validate_and_enrich
from prefect.testing.utilities import prefect_test_harness

# ============ AIRFLOW TESTS ============
from airflow.models import DagBag

def test_dag_integrity():
    """Verifies that DAGs are valid and have no cycles."""
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    assert len(dagbag.import_errors) == 0
    dag = dagbag.get_dag("daily_sales_pipeline")
    assert dag is not None
    assert dag.schedule == "0 5 * * *"

# ============ DAGSTER TESTS ============
def test_raw_orders_asset():
    """Test raw_orders asset with mocked resources."""
    mock_df = pd.DataFrame({
        "order_id": [1, 2, 3],
        "customer_id": [101, 102, 103],
        "total_amount": [150.0, 89.99, 220.5],
    })
    from unittest.mock import patch
    with patch("assets.ingestion.PostgresResource") as mock_pg:
        mock_pg.return_value.execute_query.return_value = mock_df
        result = materialize(
            [raw_orders],
            resources={"postgres_erp": mock_pg.return_value},
            partition_key="2025-01-15",
        )
        assert result.success
        assert len(result.output_for_node("raw_orders")) == 3

# ============ PREFECT TESTS ============
def test_validate_and_enrich():
    """Test of the validation task with test data."""
    orders = pd.DataFrame({
        "order_id": [1, 2],
        "customer_id": [101, 102],
        "total_amount": [100.0, 200.0],
    })
    customers = pd.DataFrame({
        "id": [101, 102],
        "city": ["Milan", "Rome"],
        "segment": ["Premium", "Standard"],
    })
    with prefect_test_harness():
        result = validate_and_enrich.fn(orders, customers)
    assert len(result) == 2
    assert "segment" in result.columns
    assert result["segment"].isna().sum() == 0

def test_negative_amounts_raise_error():
    """Test that negative amounts raise an exception."""
    orders = pd.DataFrame({
        "order_id": [1],
        "total_amount": [-50.0],
        "customer_id": [101],
    })
    customers = pd.DataFrame({"id": [101], "city": ["Milan"], "segment": ["Standard"]})
    with prefect_test_harness():
        with pytest.raises(ValueError, match="Negative amounts found"):
            validate_and_enrich.fn(orders, customers)
Anti-Patterns: Common Mistakes to Avoid
- Mutable global state in DAGs: Do not use Python global variables in DAG files. The DAG is parsed frequently by the scheduler; global variables create unpredictable behavior. Use XCom, Variables, or TaskFlow API instead.
- Business logic in DAG files: DAG files should only contain orchestration definitions. Business logic belongs in separate Python modules, imported by tasks. This facilitates testing and reduces parsing time.
- XCom for large datasets: XCom is designed for small metadata (counts, file paths, flags). Do not use it to pass DataFrames between tasks: use staging tables or S3 files instead.
- Monolithic pipelines without granularity: A single task that does everything (extract + transform + load) makes debugging impossible. Each logically distinct operation should be a separate task.
- Catchup=True with frequent schedules: If you enable catchup on an hourly pipeline and start it after 30 days of downtime, you will launch 720 executions simultaneously. Always default to catchup=False and manage backfill explicitly.
- Hardcoded secrets: Never embed passwords, API keys, or connection strings directly in code. Use Airflow Connections, Dagster Resources, or Prefect Blocks integrated with AWS Secrets Manager or HashiCorp Vault.
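To make the XCom anti-pattern concrete, here is a minimal sketch in plain Python, with no scheduler involved: `extract_orders`, `load_orders`, and the local staging directory are illustrative stand-ins for real tasks and an S3 bucket. The extract task stages the rows in a file and returns only the path, and that small string is the only payload that would travel through XCom:

```python
import csv
import tempfile
from pathlib import Path

STAGING_DIR = Path(tempfile.mkdtemp())  # stand-in for S3 or a staging schema

def extract_orders(ds: str) -> str:
    """Extract task: stages the rows on disk, returns only the path."""
    rows = [{"order_id": "1", "amount": "150.0"},
            {"order_id": "2", "amount": "89.99"}]
    path = STAGING_DIR / f"orders_{ds}.csv"
    with path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["order_id", "amount"])
        writer.writeheader()
        writer.writerows(rows)
    return str(path)  # this small string is what goes through XCom

def load_orders(staging_path: str) -> int:
    """Load task: receives the path and reads the staged rows itself."""
    with open(staging_path, newline="") as f:
        return len(list(csv.DictReader(f)))

staging_path = extract_orders("2025-01-15")
row_count = load_orders(staging_path)  # -> 2
```

The same shape works in any of the three orchestrators: the DataFrame never crosses the task boundary, only its location does.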
Conclusions and Recommendations
Orchestration is the backbone of any mature data stack. Without a reliable orchestrator, even the most sophisticated pipelines become fragile, opaque, and hard to manage. We have explored the three main tools in the 2025 market and seen how each addresses different needs.
Recommendations by Scenario
- Team starting out: Choose Prefect 3 with Prefect Cloud free tier. Time-to-value is minimal, the learning curve is low, and you can move to more enterprise-grade solutions as complexity demands.
- Team with a dbt stack: Dagster is the natural choice. The native dbt integration via dagster-dbt and automatic lineage between dbt models and Dagster assets dramatically reduce team cognitive load.
- Enterprise with existing Airflow expertise: Airflow 3.0 on Astronomer (managed) or MWAA. There is no compelling reason to migrate if the team is already productive with Airflow, and version 3.0 has closed most of the gap with Dagster and Prefect.
- Long-running application workflows: Consider Temporal alongside your preferred data orchestrator. They are complementary tools, not alternatives.
The next article in the series dives into AI applied to Manufacturing: we will see how to build predictive maintenance systems with IoT sensors, Apache Kafka for streaming machine data, and ML models to anticipate failures before they happen. A concrete use case demonstrating how the data stack we are building becomes the foundation of enterprise artificial intelligence.
Checklist for Taking Your Orchestrator to Production
- All tasks are idempotent: re-running them does not produce duplicates
- Retry policy configured with exponential backoff for every task
- Secrets managed via Connections/Blocks/Resources, never hardcoded
- Active monitoring with Grafana or equivalent for infrastructure metrics
- Alerting on Slack or PagerDuty for failures and SLA breach
- DAG/asset integrity tests in the CI/CD pipeline
- Catchup disabled by default, backfill managed explicitly
- Business logic separated from pipeline definition files
- Centralized logs accessible without direct worker access
- Inline documentation: every DAG/asset has a clear description
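As a sketch of the alerting item in the checklist, an Airflow `on_failure_callback` can format a Slack message from the task context. The payload-building part is shown as a plain function over a dict (`notify_failure` and the webhook constant are our names; a real Airflow context carries more fields, and the actual HTTP POST is left as a comment):

```python
import json

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/..."  # load from a secret store

def notify_failure(context: dict) -> str:
    """Build the JSON payload for a Slack alert from the task context."""
    payload = {
        "text": (
            f":rotating_light: Task *{context['task_id']}* in DAG "
            f"*{context['dag_id']}* failed for run {context['ds']} "
            f"(attempt {context['try_number']})"
        )
    }
    # In a real DAG: requests.post(SLACK_WEBHOOK_URL, data=json.dumps(payload))
    # wired up via default_args={"on_failure_callback": notify_failure}
    return json.dumps(payload)

message = notify_failure({
    "task_id": "load_orders",
    "dag_id": "daily_sales_pipeline",
    "ds": "2025-01-15",
    "try_number": 3,
})
```

Keeping the formatting separate from the HTTP call makes the alert text itself unit-testable, in the same spirit as the pipeline tests above.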
Related Articles and Series
- Previous article: Modern ETL vs ELT: dbt, Airbyte and Fivetran
- Next article: AI in Manufacturing: Predictive Maintenance and Digital Twin
- Related series: MLOps for Business - AI Models in Production with MLflow
- Related series: LLM in the Enterprise: RAG, Fine-Tuning and Guardrails