From Idea to Production: The Complete ML Pipeline
Building an ML model that works in a Jupyter notebook is only 10% of the work. The remaining 90% consists of transforming it into a production-ready system: reliable, monitored, reproducible, and maintainable. An end-to-end ML pipeline automates the entire flow, from raw data to production predictions, ensuring consistency and quality at every execution. This article builds a complete pipeline for a real problem: churn prediction (predicting which customers will leave).
The pipeline covers seven phases: data loading, preprocessing, feature engineering, training, evaluation, model selection, and deployment. Each phase is a modular and testable component, orchestrated by tools like MLflow for experiment tracking and Docker for containerization.
What You Will Learn in This Article
- Architecture of a complete ML pipeline
- Case study: end-to-end churn prediction
- Experiment tracking with MLflow
- Data versioning and reproducibility
- Model serving with FastAPI
- Monitoring and retraining in production
Phase 1: Data Loading and Validation
The first phase loads data from the source and verifies its quality. Checks include: schema validation (expected columns, data types), data quality checks (missing value percentage, valid ranges, anomalous distributions), and data profiling (descriptive statistics). If data fails checks, the pipeline stops with a clear error instead of producing an unreliable model.
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class DataValidationResult:
    """Data validation result."""
    is_valid: bool
    errors: List[str]
    warnings: List[str]
    stats: Dict[str, float]
def load_and_validate(filepath: str, expected_columns: List[str]) -> tuple:
    """Load and validate the dataset."""
    errors = []
    warnings = []

    # Loading
    df = pd.read_csv(filepath)

    # Schema validation
    missing_cols = set(expected_columns) - set(df.columns)
    if missing_cols:
        errors.append(f"Missing columns: {missing_cols}")

    # Data quality checks
    null_pct = df.isnull().mean()
    high_null_cols = null_pct[null_pct > 0.3].index.tolist()
    if high_null_cols:
        warnings.append(f"Columns with >30% null: {high_null_cols}")

    # Duplicates
    n_dupes = df.duplicated().sum()
    if n_dupes > 0:
        warnings.append(f"{n_dupes} duplicate rows found")

    # Statistics
    stats = {
        'n_rows': len(df),
        'n_cols': len(df.columns),
        'null_pct_avg': null_pct.mean(),
        'n_duplicates': n_dupes
    }

    result = DataValidationResult(
        is_valid=len(errors) == 0,
        errors=errors,
        warnings=warnings,
        stats=stats
    )
    return df, result

# Usage
# df, validation = load_and_validate('data/churn.csv', expected_columns)
# if not validation.is_valid:
#     raise ValueError(f"Validation failed: {validation.errors}")

print("Pipeline Stage 1: Data Loading & Validation - OK")
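Range checks can be added in the same spirit as the schema and null checks above. A minimal, self-contained sketch (the column bounds here are illustrative, not taken from a real churn dataset):

```python
import pandas as pd

def check_ranges(df: pd.DataFrame, bounds: dict) -> list:
    """Return error messages for values outside the allowed [lo, hi] per column."""
    errors = []
    for col, (lo, hi) in bounds.items():
        bad = df[(df[col] < lo) | (df[col] > hi)]
        if len(bad):
            errors.append(f"{col}: {len(bad)} values outside [{lo}, {hi}]")
    return errors

df = pd.DataFrame({'tenure': [5, 30, -2], 'monthly_charges': [50.0, 999.0, 70.0]})
errs = check_ranges(df, {'tenure': (0, 72), 'monthly_charges': (0, 200)})
print(errs)  # one violation per column in this toy example
```

Like the other checks, range violations can be routed into `errors` (hard stop) or `warnings` (log and continue), depending on how critical the column is.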
Phases 2-3: Preprocessing and Feature Engineering
Preprocessing and feature engineering are encapsulated in scikit-learn Pipelines and custom transformers. Custom transformers extend BaseEstimator and TransformerMixin to integrate seamlessly into the pipeline. This ensures the same transformations are applied in both training and production, eliminating the risk of training/serving inconsistencies.
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
import numpy as np
import pandas as pd

class FeatureEngineer(BaseEstimator, TransformerMixin):
    """Custom transformer for feature engineering."""
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
        # Derived features (churn prediction example)
        if 'tenure' in df.columns and 'monthly_charges' in df.columns:
            df['total_spent'] = df['tenure'] * df['monthly_charges']
            df['avg_monthly_ratio'] = df['monthly_charges'] / (df['tenure'] + 1)
        if 'tenure' in df.columns:
            df['tenure_group'] = pd.cut(
                df['tenure'], bins=[0, 12, 24, 48, 72],
                labels=['new', 'developing', 'mature', 'loyal']
            )
        return df
def build_preprocessing_pipeline(
    numeric_features: list,
    categorical_features: list
) -> Pipeline:
    """Build the preprocessing pipeline.

    Note: derived columns created by FeatureEngineer (e.g. 'tenure_group')
    must be included in the feature lists, or the ColumnTransformer will
    silently drop them.
    """
    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])
    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(drop='first', handle_unknown='ignore'))
    ])
    preprocessor = ColumnTransformer(transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])
    return Pipeline([
        ('feature_engineer', FeatureEngineer()),
        ('preprocessor', preprocessor)
    ])

print("Pipeline Stage 2-3: Preprocessing & Feature Engineering - OK")
Phases 4-5: Training, Evaluation, and Model Selection
The training phase systematically compares multiple algorithms with cross-validation and selects the best one. Experiment tracking with MLflow automatically records parameters, metrics, and artifacts of every experiment, making the process reproducible and comparable. The selected model is serialized and versioned for deployment.
from sklearn.ensemble import (
    RandomForestClassifier, GradientBoostingClassifier
)
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_validate, StratifiedKFold
import numpy as np
from datetime import datetime

class ExperimentTracker:
    """Simplified ML experiment tracker."""
    def __init__(self):
        self.experiments = []

    def log_experiment(self, name, params, metrics, model):
        self.experiments.append({
            'name': name,
            'params': params,
            'metrics': metrics,
            'model': model,
            'timestamp': datetime.now().isoformat()
        })

    def get_best(self, metric='f1'):
        return max(self.experiments, key=lambda x: x['metrics'].get(metric, 0))
def train_and_evaluate(X, y, tracker):
    """Train and evaluate multiple models."""
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    scoring = {
        'accuracy': 'accuracy',
        'f1': 'f1',
        'roc_auc': 'roc_auc',
        'precision': 'precision',
        'recall': 'recall'
    }
    models = {
        'LogisticRegression': LogisticRegression(max_iter=10000, random_state=42),
        'RandomForest': RandomForestClassifier(n_estimators=200, random_state=42),
        'GradientBoosting': GradientBoostingClassifier(
            n_estimators=200, learning_rate=0.05, max_depth=5, random_state=42
        )
    }
    for name, model in models.items():
        results = cross_validate(
            model, X, y, cv=cv, scoring=scoring, return_train_score=True
        )
        metrics = {
            metric: results[f'test_{metric}'].mean()
            for metric in scoring
        }
        tracker.log_experiment(name, model.get_params(), metrics, model)
        print(f"{name:<25s} F1={metrics['f1']:.3f} AUC={metrics['roc_auc']:.3f}")

    best = tracker.get_best('f1')
    print(f"\nBest model: {best['name']} (F1={best['metrics']['f1']:.3f})")
    return best

# Usage
tracker = ExperimentTracker()
# best_model = train_and_evaluate(X_preprocessed, y, tracker)
print("Pipeline Stage 4-5: Training & Evaluation - OK")
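Once the best model is selected, it is refit on the full training set and serialized for deployment, as described above. A minimal round-trip sketch with joblib (the synthetic data, path, and version tag are illustrative):

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.linear_model import LogisticRegression

# Toy training data standing in for the preprocessed churn features
rng = np.random.default_rng(42)
X = rng.normal(size=(200, 4))
y = (X[:, 0] + X[:, 1] > 0).astype(int)

model = LogisticRegression(max_iter=1000).fit(X, y)

# Serialize under a versioned filename so deployments can roll back
path = os.path.join(tempfile.mkdtemp(), 'model_v1.joblib')
joblib.dump(model, path)

# Reload and verify predictions match the in-memory model
restored = joblib.load(path)
assert (restored.predict(X) == model.predict(X)).all()
print("round-trip OK")
```

joblib is generally preferred over plain pickle for scikit-learn objects because it handles large NumPy arrays efficiently; either way, the loading environment must have compatible library versions.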
Phase 6: Model Serving with FastAPI
Deploying the model as a REST API with FastAPI allows any application to get real-time predictions. The API accepts JSON requests with customer features and returns the churn probability and predicted class. The serialized model (with joblib or pickle) is loaded at server startup.
# api.py - Model deployment with FastAPI
# pip install fastapi uvicorn

from dataclasses import dataclass

@dataclass
class PredictionRequest:
    """Prediction request schema."""
    tenure: int
    monthly_charges: float
    total_charges: float
    contract: str
    payment_method: str

@dataclass
class PredictionResponse:
    """Response schema."""
    churn_probability: float
    prediction: str
    confidence: float
    model_version: str

class ModelServer:
    """ML model server."""
    def __init__(self, model_path: str, version: str = "1.0.0"):
        self.version = version
        # In production: self.model = joblib.load(model_path)
        # self.pipeline = joblib.load(f"{model_path}/pipeline.pkl")
        print(f"Model v{version} loaded from {model_path}")

    def predict(self, request: PredictionRequest) -> PredictionResponse:
        """Generate a prediction for a single customer."""
        # In production:
        # features = self.pipeline.transform(request_to_dataframe(request))
        # proba = self.model.predict_proba(features)[0]
        proba = [0.3, 0.7]  # Simulated probabilities for illustration
        return PredictionResponse(
            churn_probability=round(proba[1], 3),
            prediction="churn" if proba[1] > 0.5 else "no_churn",
            confidence=round(max(proba), 3),
            model_version=self.version
        )

# FastAPI app (in production):
# app = FastAPI(title="Churn Prediction API")
# server = ModelServer("models/best_model")
#
# @app.post("/predict")
# async def predict(request: PredictionRequest):
#     return server.predict(request)
#
# Start: uvicorn api:app --host 0.0.0.0 --port 8000

print("Pipeline Stage 6: Model Serving - OK")
Phase 7: Monitoring and Retraining
A production model degrades over time due to data drift (production data changes compared to training) and concept drift (the relationship between features and target changes). Continuous monitoring tracks model performance, input distributions, and prediction distributions. When metrics drop below a threshold, automatic retraining is triggered.
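Data drift on a single numeric feature can be quantified with the Population Stability Index (PSI), which compares the training distribution against recent production data. A minimal sketch with NumPy (values above roughly 0.2 are commonly treated as significant drift, though the threshold is a convention, not a rule):

```python
import numpy as np

def population_stability_index(expected, actual, bins=10):
    """PSI between training ('expected') and production ('actual') values."""
    # Bin edges from training-set quantiles; keep interior cuts only,
    # so out-of-range production values fall into the outer bins
    cuts = np.quantile(expected, np.linspace(0, 1, bins + 1))[1:-1]
    e_pct = np.bincount(np.digitize(expected, cuts), minlength=bins) / len(expected)
    a_pct = np.bincount(np.digitize(actual, cuts), minlength=bins) / len(actual)
    # Avoid log(0) for empty bins
    e_pct = np.clip(e_pct, 1e-6, None)
    a_pct = np.clip(a_pct, 1e-6, None)
    return float(np.sum((a_pct - e_pct) * np.log(a_pct / e_pct)))

rng = np.random.default_rng(0)
train = rng.normal(0.0, 1.0, 5000)
stable = rng.normal(0.0, 1.0, 5000)    # same distribution: PSI near 0
shifted = rng.normal(0.5, 1.0, 5000)   # mean shift: noticeably higher PSI
print(f"PSI stable:  {population_stability_index(train, stable):.4f}")
print(f"PSI shifted: {population_stability_index(train, shifted):.4f}")
```

In a real monitoring job, this check runs on a schedule per feature; a PSI breach feeds the alerting and retraining triggers described above. Libraries such as Great Expectations (mentioned below) cover the data-quality side of the same problem.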
ML Production Checklist:
- Data and model versioning
- Reproducible pipeline
- Automated tests for preprocessing and predictions
- Production metrics monitoring
- Alerts for data drift and performance degradation
- Quick rollback to previous version
- Complete logging for debugging
- A/B testing for new model versions
MLOps Tools
The MLOps ecosystem offers specialized tools for every phase:
- MLflow for experiment tracking and model registry
- DVC (Data Version Control) for data and pipeline versioning
- Docker for containerization
- GitHub Actions for CI/CD
- Prometheus + Grafana for monitoring
- Great Expectations for data quality
Tool choice depends on project scale and existing infrastructure.
Key Takeaways
- An end-to-end ML pipeline covers 7 phases: from raw data to monitored production model
- Custom transformers in scikit-learn ensure consistency between training and inference
- Experiment tracking (MLflow) makes experiments reproducible and comparable
- FastAPI + Docker is the standard stack for model serving
- Data drift and concept drift require continuous monitoring and automatic retraining
- Reproducibility is the most important requirement in a production ML pipeline