02 - ML Pipeline s CI/CD: GitHub Actions + Docker
V prvním článku série jsme viděli, proč 85 % projektů ML nikdy nedosáhne produkce a jak MLOps tento problém řeší. Z monolitického notebooku jsme udělali potrubí modulární a konfigurovatelné. Nyní je čas udělat další krok: zautomatizovat celek potrubí s CI/CD, takže jakékoli změny kódu, dat nebo konfigurace automaticky spouští školení, ověřování a nasazení modelu.
V tomto článku vytvoříme kompletní kanál CI/CD pro strojové učení Akce GitHubu jako orchestrátor a Přístavní dělník jako runtime of provedení. Nebudeme se omezovat na teorii – vytvoříme pracovní projekt s klasifikátorem sentimentu, doplněný vícestupňovým souborem Dockerfile, pracovním postupem YAML, ověřováním dat, registrem modelů a automatické nasazení.
Co se naučíte
- Proč se CI/CD pro ML liší od tradičního softwarového CI/CD
- Jak navrhnout architekturu end-to-end ML potrubí
- Jak vytvořit vícefázové soubory Dockerfiles optimalizované pro školení a obsluhu
- Jak napsat kompletní pracovní postupy GitHub Actions pro ML
- Jak integrovat ověřování dat, registr modelů a automatické nasazení
- Jak spravovat verzování dat pomocí DVC v procesu
- Jak implementovat testování specifické pro ML (jednotka, integrace, kouř)
- Jak monitorovat model po nasazení
- Jak optimalizovat náklady pomocí ukládání do mezipaměti a samoobslužných běžců
- Jak vybrat správný nástroj CI/CD pro váš tým
proč CI/CD pro ML a Different
Pokud pocházíte ze světa tradičního vývoje softwaru, můžete si myslet, že stačí použít stejné postupy CI/CD jako projekty ML. Ve skutečnosti strojové učení přináší složitost jedinečné, které vyžadují specifický přístup. Zásadní rozdíl je v tom softwaru Tradičně CI/CD spravuje pouze jeden artefakt (kód), zatímco v ML jich musí spravovat několik tři najednou: kód, data a model.
Tři artefakty ML
Pokud se v tradičním softwaru nezmění kód, nezmění se ani výstup. V ML i s identický kód, změna v datech vytváří jiný model. To znamená, že potrubí CI/CD musí sledovat a ověřovat tři nezávislé dimenze.
| Velikost | Tradiční CI/CD | CI/CD pro ML |
|---|---|---|
| Kód | Git push triggery sestavení + test | Git push spouští trénink + vyhodnocení |
| Data | Nelze použít | Nová data spouští rekvalifikaci |
| Model | Nelze použít | Nový model vyžaduje ověření + propagaci |
| Konfigurace | Příznaky funkcí, env vars | Hyperparametry, sady funkcí, metrické prahy |
| Prostředí | OS + knihovny | OS + knihovny + ovladače GPU + verze CUDA |
| Validace | Testy projde/nevyhoví | Nadlimitní/podprahové metriky + srovnání s modelem ve výrobě |
| Nasazení | Nasadit nebo vrátit zpět | Postupné nasazení + A/B testování + sledování driftu |
Kontinuální školení: klíčový koncept
CI/CD pro ML zavádí koncept, který v tradičním softwaru neexistuje: a Průběžný trénink (CT). Stejně jako kontinuální integrace a kontinuální Nasazení, CT zajišťuje, že model je automaticky přeškolen, když:
- Přicházejí nová data: datový soubor je aktualizován o nová pozorování
- Změňte kód: předzpracování nebo algoritmus se změní
- Degradují metriky: monitorování detekuje posun dat nebo pokles výkonu
- Časovač vyprší: je aktivována plánovaná rekvalifikace (např. týdenní).
Běžná chyba: CI/CD bez CT
Mnoho týmů implementuje CI/CD pro kód ML, ale zapomíná na průběžné školení. Výsledkem je model, který je jednou nasazen a poté se již nikdy neaktualizuje, tiše degraduje v průběhu času, jak se data ve výrobě rozcházejí z tréninkových dat. Potrubí bez CT je jako auto bez údržby: Funguje, dokud se nerozbije.
Architektura potrubí ML
Před napsáním kódu navrhneme kompletní architekturu pipeline. Každá fáze má specifické vstupy a výstupy a porucha jedné fáze blokuje ty následující. Toto Přístup „fail fast“ zajišťuje, že se do výroby dostanou pouze ověřené modely.
+------------------+ +------------------+ +------------------+
| DATA INGESTION |---->| PREPROCESSING |---->| TRAINING |
| | | | | |
| - Pull dati DVC | | - Pulizia | | - Train modello |
| - Validazione | | - Feature eng. | | - Log metriche |
| - Schema check | | - Split train/ | | - Log parametri |
| | | test/val | | - Salva artefatti|
+------------------+ +------------------+ +------------------+
| |
| (trigger: dati |
| nuovi/schedule) v
| +------------------+
| | EVALUATION |
| | |
| | - Metriche |
| | - Confronto con |
| | produzione |
| | - Gate: soglie |
| +------------------+
| |
| (se metriche > threshold)
| v
+------------------+ +------------------+ +------------------+
| MONITORING |<----| SMOKE TEST |<----| DEPLOYMENT |
| | | | | |
| - Health check | | - Test endpoint | | - Push registry |
| - Drift detect. | | - Predizione | | - Stage/Prod |
| - Alert | | di prova | | - Rollback ready |
| - Trigger retrain| | - Latenza check | | |
+------------------+ +------------------+ +------------------+
Každý blok odpovídá kroku pracovního postupu GitHub Actions. Podívejme se nyní podrobně popisuje, jak implementovat jednotlivé fáze, počínaje kontejnerizací pomocí Dockeru.
Docker pro strojové učení
Docker řeší jeden z nejvíce frustrujících problémů v ML: "funguje na mém stroji". Kontejnerizací školícího a obslužného prostředí zajistíme, že kód vytvoří stejné výsledky, ať běží kdekoli: na notebooku datového vědce, v CI/CD runneru a ve výrobě. U ML vyžaduje Docker zvláštní pozornost: obrázky mají tendenci být velmi velké (vědecké knihovny + ovladače GPU) a sestavení může být pomalé.
Základní obrázky pro ML
Volba obrazové základny je rozhodující pro velikost a kompatibilitu. Zde jsou možnosti hlavní a kdy je použít.
| Základní obrázek | Velikost | Použití | Kdy si to vybrat |
|---|---|---|---|
| krajta:3.11-tenká | ~120 MB | Školení/obsluhování CPU | Scikit-learn modely, XGBoost, lehké podávání |
| python:3.11-knihomol | ~900 MB | Školení se sestavovacími nástroji | Závislosti, které vyžadují kompilaci C/C++ |
| nvidia/cuda: 12.1-runtime | ~3,5 GB | Inference GPU | Podávat modely hlubokého učení |
| nvidia/cuda: 12.1-vývoj | ~5,2 GB | školení GPU | Školení PyTorch/TensorFlow s CUDA |
| pytorch/pytorch:2.1.0-cuda12.1 | ~6 GB | Školení/obsluhování PyTorch | Projekty PyTorch, které se chtějí vyhnout ručnímu nastavení CUDA |
Dockerfile Multi-Stage pro školení a poskytování
Vícestupňový vzor je základem ML. Pomocí dvou samostatných fází můžeme mít kompletní prostředí pro sestavení (s kompilátory a nástroji pro sestavení) a konečný obraz zjednodušené, které obsahuje pouze nezbytné runtime. Tím se zmenší velikost obrázku konečná až 60 %.
# ============================================
# Stage 1: Builder - installa dipendenze
# ============================================
FROM python:3.11-slim AS builder
WORKDIR /build
# Installa build tools necessari per compilare dipendenze native
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Copia e installa dipendenze in un virtual environment
COPY requirements.txt .
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# ============================================
# Stage 2: Training - esegue il training
# ============================================
FROM python:3.11-slim AS trainer
WORKDIR /app
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia codice sorgente
COPY src/ ./src/
COPY config/ ./config/
COPY train.py .
COPY evaluate.py .
# Entrypoint per training
ENTRYPOINT ["python", "train.py"]
# ============================================
# Stage 3: Serving - API di produzione
# ============================================
FROM python:3.11-slim AS serving
WORKDIR /app
# Utente non-root per sicurezza
RUN useradd --create-home appuser
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia solo il codice necessario per serving
COPY src/serving/ ./src/serving/
COPY src/preprocessing/ ./src/preprocessing/
# Healthcheck endpoint
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
# Switch a utente non-root
USER appuser
# Porta di default
EXPOSE 8000
# Entrypoint per serving
ENTRYPOINT ["uvicorn", "src.serving.app:app", "--host", "0.0.0.0", "--port", "8000"]
Proč Multi-Stage pro ML?
- Bezpečnost: zobrazovaný obraz neobsahuje kompilátory ani nástroje pro vytváření
- Velikost: fáze podávání je mnohem lehčí (~300 MB oproti ~1,2 GB)
- Mezipaměť: závislosti se mění méně často než kód a využívají výhody mezipaměti vrstvy
- Flexibilita: můžete postavit pouze tréninkovou fázi nebo pouze podávací fázi
Optimalizace mezipaměti vrstvy
Pořadí příkazů COPY v Dockerfile je pro mezipaměť zásadní. Závislosti Pythonu
málokdy se mění, zdrojový kód se mění často. Prvním zkopírováním
requirements.txt a poté kód, vyhneme se přeinstalaci závislostí
s každou změnou kódu.
# Dati e modelli (gestiti da DVC, non da Docker)
data/
models/
*.pkl
*.h5
*.pt
# Ambiente di sviluppo
.venv/
__pycache__/
*.pyc
.pytest_cache/
.mypy_cache/
# Git e CI
.git/
.github/
.dvc/cache/
# IDE e editor
.vscode/
.idea/
*.swp
# Documentazione
docs/
*.md
LICENSE
Docker s podporou GPU
Chcete-li trénovat modely hlubokého učení, potřebujete podporu GPU v kontejneru. Docker podporuje GPU NVIDIA prostřednictvím sady NVIDIA Container Toolkit. Nastavení vyžaduje ovladač NVIDIA na hostiteli a nainstalované sadě nástrojů.
# Base image con CUDA runtime
FROM nvidia/cuda:12.1.1-runtime-ubuntu22.04 AS gpu-trainer
WORKDIR /app
# Installa Python e dipendenze di sistema
RUN apt-get update && apt-get install -y --no-install-recommends \
python3.11 \
python3.11-venv \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Virtual environment
RUN python3.11 -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Dipendenze PyTorch con CUDA
COPY requirements-gpu.txt .
RUN pip install --no-cache-dir -r requirements-gpu.txt
COPY src/ ./src/
COPY train.py .
# Variabili ambiente per CUDA
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility
ENTRYPOINT ["python", "train.py"]
# Build dell'immagine GPU
docker build -f Dockerfile.gpu -t ml-trainer:gpu .
# Esecuzione con accesso GPU
docker run --gpus all \
-v $(pwd)/data:/app/data \
-v $(pwd)/models:/app/models \
ml-trainer:gpu \
--config config/training.yaml
Akce GitHubu pro strojové učení
GitHub Actions je služba CI/CD integrovaná do GitHubu, která spouští automatizované pracovní postupy v reakci na události (push, pull request, plán, ruční odeslání). Pro ML to přináší významné výhody: nativní integrace s úložištěm Git, tržiště s akcemi předdefinované, správa tajných údajů pro přihlašovací údaje a až 2 000 volných minut měsíčně pro veřejná úložiště.
Struktura pracovního postupu ML
Pracovní postup akcí GitHub pro ML má specifickou strukturu: více odpovídajících úloh do fází potrubí s explicitními závislostmi mezi úlohami a podmínkami provádění na základě modelových metrik.
name: ML Pipeline - Train, Evaluate, Deploy
on:
# Trigger su push al branch main (codice o config)
push:
branches: [main]
paths:
- 'src/**'
- 'config/**'
- 'requirements.txt'
- 'train.py'
- 'evaluate.py'
# Trigger schedulato per retraining periodico
schedule:
- cron: '0 6 * * 1' # Ogni lunedi alle 6:00 UTC
# Trigger manuale con parametri
workflow_dispatch:
inputs:
force_deploy:
description: 'Forza il deployment anche se le metriche non migliorano'
required: false
default: 'false'
type: choice
options:
- 'true'
- 'false'
training_config:
description: 'File di configurazione per il training'
required: false
default: 'config/training.yaml'
env:
PYTHON_VERSION: '3.11'
DOCKER_REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/ml-model
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
jobs:
# ============================================
# Job 1: Data Validation
# ============================================
data-validation:
name: Validate Data Quality
runs-on: ubuntu-latest
outputs:
data_valid: ${{ steps.validate.outputs.valid }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Setup DVC
uses: iterative/setup-dvc@v2
- name: Pull data from DVC
run: dvc pull
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Validate data quality
id: validate
run: |
python -m src.data.validate_data \
--data-path data/raw/reviews.csv \
--schema-path config/data_schema.yaml
echo "valid=true" >> $GITHUB_OUTPUT
# ============================================
# Job 2: Model Training
# ============================================
training:
name: Train Model
needs: data-validation
if: needs.data-validation.outputs.data_valid == 'true'
runs-on: ubuntu-latest
outputs:
model_version: ${{ steps.train.outputs.model_version }}
run_id: ${{ steps.train.outputs.run_id }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Setup DVC and pull data
run: |
pip install dvc[s3]
dvc pull
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Train model
id: train
run: |
python train.py \
--config ${{ github.event.inputs.training_config || 'config/training.yaml' }} \
--output-dir models/
echo "model_version=$(cat models/version.txt)" >> $GITHUB_OUTPUT
echo "run_id=$(cat models/run_id.txt)" >> $GITHUB_OUTPUT
env:
MLFLOW_TRACKING_URI: ${{ env.MLFLOW_TRACKING_URI }}
MLFLOW_TRACKING_USERNAME: ${{ secrets.MLFLOW_USERNAME }}
MLFLOW_TRACKING_PASSWORD: ${{ secrets.MLFLOW_PASSWORD }}
- name: Upload model artifact
uses: actions/upload-artifact@v4
with:
name: trained-model
path: models/
retention-days: 30
# ============================================
# Job 3: Model Evaluation
# ============================================
evaluation:
name: Evaluate Model
needs: training
runs-on: ubuntu-latest
outputs:
metrics_pass: ${{ steps.evaluate.outputs.metrics_pass }}
accuracy: ${{ steps.evaluate.outputs.accuracy }}
f1_score: ${{ steps.evaluate.outputs.f1_score }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: trained-model
path: models/
- name: Setup DVC and pull test data
run: |
pip install dvc[s3]
dvc pull data/test/
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Evaluate model
id: evaluate
run: |
python evaluate.py \
--model-path models/model.pkl \
--test-data data/test/reviews_test.csv \
--thresholds config/thresholds.yaml \
--output metrics/report.json
echo "accuracy=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"accuracy\"])')" >> $GITHUB_OUTPUT
echo "f1_score=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"f1_score\"])')" >> $GITHUB_OUTPUT
echo "metrics_pass=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"pass\"])')" >> $GITHUB_OUTPUT
- name: Post metrics as PR comment
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const metrics = JSON.parse(fs.readFileSync('metrics/report.json'));
const body = `## Model Evaluation Results
| Metric | Value | Threshold | Status |
|--------|-------|-----------|--------|
| Accuracy | ${metrics.accuracy.toFixed(4)} | ${metrics.thresholds.accuracy} | ${metrics.accuracy >= metrics.thresholds.accuracy ? 'PASS' : 'FAIL'} |
| F1 Score | ${metrics.f1_score.toFixed(4)} | ${metrics.thresholds.f1_score} | ${metrics.f1_score >= metrics.thresholds.f1_score ? 'PASS' : 'FAIL'} |
| AUC-ROC | ${metrics.auc_roc.toFixed(4)} | ${metrics.thresholds.auc_roc} | ${metrics.auc_roc >= metrics.thresholds.auc_roc ? 'PASS' : 'FAIL'} |`;
github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.issue.number,
body: body
});
- name: Upload evaluation report
uses: actions/upload-artifact@v4
with:
name: evaluation-report
path: metrics/
# ============================================
# Job 4: Build and Push Docker Image
# ============================================
build-image:
name: Build Docker Image
needs: [evaluation]
if: |
needs.evaluation.outputs.metrics_pass == 'true' ||
github.event.inputs.force_deploy == 'true'
runs-on: ubuntu-latest
outputs:
image_tag: ${{ steps.meta.outputs.tags }}
steps:
- uses: actions/checkout@v4
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: trained-model
path: models/
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=sha,prefix=
type=raw,value=latest
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.DOCKER_REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push serving image
uses: docker/build-push-action@v5
with:
context: .
target: serving
push: true
tags: ${{ steps.meta.outputs.tags }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ============================================
# Job 5: Deploy to Staging + Smoke Test
# ============================================
deploy:
name: Deploy and Smoke Test
needs: [build-image, training]
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: |
echo "Deploying model version ${{ needs.training.outputs.model_version }}"
# Qui il comando di deploy reale (kubectl, docker-compose, etc.)
- name: Smoke test
run: |
# Attendi che il servizio sia pronto
for i in $(seq 1 30); do
if curl -sf http://staging:8000/health; then
echo "Service is ready"
break
fi
echo "Waiting for service... attempt $i"
sleep 5
done
# Test di predizione
RESPONSE=$(curl -sf -X POST http://staging:8000/predict \
-H "Content-Type: application/json" \
-d '{"text": "This product is amazing, I love it!"}')
echo "Prediction response: $RESPONSE"
# Verifica che la risposta sia valida
echo "$RESPONSE" | python -c "
import sys, json
data = json.load(sys.stdin)
assert 'prediction' in data, 'Missing prediction field'
assert 'confidence' in data, 'Missing confidence field'
assert data['confidence'] > 0.5, 'Low confidence'
print('Smoke test PASSED')
"
Tajemství a bezpečnost v potrubí
Nikdy nezadávejte přihlašovací údaje přímo do pracovního postupu YAML. Vždy používejte GitHub Secrets pro klíče AWS, tokeny MLflow, přihlašovací údaje registru Docker a další citlivé informace. Nakonfigurujte tajemství v Nastavení > Tajemství a proměnné > Akce v úložišti GitHub.
Příklad projektu: Sentiment Classifier
Pojďme vše spojit do konkrétního projektu. Vytvoříme klasifikátor sentimentu pro recenze produktů, s kompletním CI/CD potrubím. Projekt využívá scikit-learn pro jednoduchost, ale architektura platí identicky pro modely PyTorch nebo TensorFlow.
Struktura projektu
sentiment-classifier/
src/
data/
__init__.py
preprocessing.py # Pulizia e trasformazione testi
validate_data.py # Validazione schema e qualità
models/
__init__.py
trainer.py # Training del classificatore
serving/
__init__.py
app.py # FastAPI application
schemas.py # Pydantic schemas
monitoring/
__init__.py
health.py # Health checks
tests/
test_preprocessing.py # Unit test preprocessing
test_trainer.py # Unit test training
test_api.py # Integration test API
config/
training.yaml # Configurazione training
thresholds.yaml # Soglie metriche
data_schema.yaml # Schema validazione dati
train.py # Entrypoint training
evaluate.py # Entrypoint evaluation
Dockerfile # Multi-stage build
requirements.txt # Dipendenze Python
.github/
workflows/
ml-pipeline.yml # Pipeline CI/CD
.dvc/ # Configurazione DVC
dvc.yaml # Pipeline DVC
dvc.lock # Lock file DVC
Školicí skripty
"""Script principale di training per il classificatore di sentiment."""
import argparse
import yaml
import mlflow
import mlflow.sklearn
from pathlib import Path
from datetime import datetime
from src.data.preprocessing import load_and_preprocess_data, split_dataset
from src.models.trainer import create_pipeline, train_model
def parse_args():
"""Parse degli argomenti da riga di comando."""
parser = argparse.ArgumentParser(
description="Train sentiment classifier"
)
parser.add_argument(
"--config",
type=str,
default="config/training.yaml",
help="Path al file di configurazione"
)
parser.add_argument(
"--output-dir",
type=str,
default="models/",
help="Directory per salvare il modello"
)
parser.add_argument(
"--experiment-name",
type=str,
default="sentiment-classifier",
help="Nome dell'esperimento MLflow"
)
return parser.parse_args()
def main():
"""Esegue la pipeline di training completa."""
args = parse_args()
# 1. Carica configurazione
with open(args.config) as f:
config = yaml.safe_load(f)
# 2. Setup MLflow
mlflow.set_experiment(args.experiment_name)
with mlflow.start_run(run_name=f"train-{datetime.now().strftime('%Y%m%d-%H%M%S')}") as run:
# 3. Log dei parametri
mlflow.log_params(config["model"])
mlflow.log_param("data_path", config["data"]["train_path"])
mlflow.log_param("test_size", config["data"]["test_size"])
# 4. Caricamento e preprocessing dati
print("[1/5] Caricamento e preprocessing dati...")
X, y = load_and_preprocess_data(config["data"]["train_path"])
# 5. Split dataset
print("[2/5] Split train/validation...")
X_train, X_val, y_train, y_val = split_dataset(
X, y,
test_size=config["data"]["test_size"],
random_state=config["data"]["random_state"]
)
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("val_size", len(X_val))
# 6. Crea pipeline di preprocessing + modello
print("[3/5] Creazione pipeline ML...")
pipeline = create_pipeline(config["model"])
# 7. Training
print("[4/5] Training in corso...")
trained_pipeline = train_model(pipeline, X_train, y_train)
# 8. Valutazione su validation set
print("[5/5] Valutazione...")
from src.models.trainer import evaluate_model
metrics = evaluate_model(trained_pipeline, X_val, y_val)
# 9. Log metriche in MLflow
mlflow.log_metrics(metrics)
# 10. Salva modello
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
import joblib
model_path = output_dir / "model.pkl"
joblib.dump(trained_pipeline, model_path)
# Log modello in MLflow con signature
from mlflow.models.signature import infer_signature
signature = infer_signature(X_train[:5], trained_pipeline.predict(X_train[:5]))
mlflow.sklearn.log_model(
trained_pipeline,
"model",
signature=signature,
registered_model_name="sentiment-classifier"
)
# Salva versione e run_id per il CI/CD
(output_dir / "version.txt").write_text(run.info.run_id[:8])
(output_dir / "run_id.txt").write_text(run.info.run_id)
print(f"\nTraining completato!")
print(f" Run ID: {run.info.run_id}")
for name, value in metrics.items():
print(f" {name}: {value:.4f}")
if __name__ == "__main__":
main()
Modul předběžného zpracování
"""Modulo per il preprocessing dei dati testuali."""
import re
import pandas as pd
from typing import Tuple
from sklearn.model_selection import train_test_split
def clean_text(text: str) -> str:
"""Pulisce un singolo testo rimuovendo HTML, caratteri speciali e spazi extra."""
if not isinstance(text, str):
return ""
# Rimuovi tag HTML
text = re.sub(r'<[^>]+>', '', text)
# Rimuovi URL
text = re.sub(r'http\S+|www\.\S+', '', text)
# Rimuovi caratteri speciali (mantieni lettere, numeri, spazi)
text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
# Normalizza spazi
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
def load_and_preprocess_data(data_path: str) -> Tuple[pd.Series, pd.Series]:
"""Carica e preprocess il dataset di recensioni."""
df = pd.read_csv(data_path)
# Validazione colonne richieste
required_cols = ["review_text", "sentiment"]
missing = [c for c in required_cols if c not in df.columns]
if missing:
raise ValueError(f"Colonne mancanti nel dataset: {missing}")
# Rimuovi righe con valori mancanti
df = df.dropna(subset=required_cols)
# Pulisci testi
df["clean_text"] = df["review_text"].apply(clean_text)
# Rimuovi testi vuoti dopo pulizia
df = df[df["clean_text"].str.len() > 0]
return df["clean_text"], df["sentiment"]
def split_dataset(
X: pd.Series,
y: pd.Series,
test_size: float = 0.2,
random_state: int = 42
) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series]:
"""Split stratificato del dataset."""
return train_test_split(
X, y,
test_size=test_size,
random_state=random_state,
stratify=y
)
Tréninkový modul
"""Modulo per la creazione, il training e la valutazione del modello."""
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
accuracy_score, f1_score, precision_score,
recall_score, roc_auc_score
)
from typing import Dict, Any
import pandas as pd
MODELS = {
"logistic_regression": LogisticRegression,
"random_forest": RandomForestClassifier,
}
def create_pipeline(model_config: Dict[str, Any]) -> Pipeline:
"""Crea una pipeline sklearn con TF-IDF + classificatore."""
algorithm = model_config.get("algorithm", "logistic_regression")
model_class = MODELS.get(algorithm)
if model_class is None:
raise ValueError(
f"Algoritmo non supportato: {algorithm}. "
f"Supportati: {list(MODELS.keys())}"
)
# Parametri specifici del modello
model_params = {
k: v for k, v in model_config.items()
if k not in ("algorithm", "tfidf")
}
# Parametri TF-IDF
tfidf_params = model_config.get("tfidf", {})
return Pipeline([
("tfidf", TfidfVectorizer(
max_features=tfidf_params.get("max_features", 10000),
ngram_range=tuple(tfidf_params.get("ngram_range", [1, 2])),
min_df=tfidf_params.get("min_df", 2),
max_df=tfidf_params.get("max_df", 0.95),
)),
("classifier", model_class(**model_params)),
])
def train_model(
pipeline: Pipeline,
X_train: pd.Series,
y_train: pd.Series
) -> Pipeline:
"""Addestra la pipeline sul training set."""
pipeline.fit(X_train, y_train)
return pipeline
def evaluate_model(
pipeline: Pipeline,
X_test: pd.Series,
y_test: pd.Series
) -> Dict[str, float]:
"""Valuta il modello e restituisce tutte le metriche."""
y_pred = pipeline.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_score": f1_score(y_test, y_pred, average="weighted"),
"precision": precision_score(y_test, y_pred, average="weighted"),
"recall": recall_score(y_test, y_pred, average="weighted"),
}
# AUC-ROC solo per classificazione binaria
if len(set(y_test)) == 2:
y_proba = pipeline.predict_proba(X_test)[:, 1]
metrics["auc_roc"] = roc_auc_score(y_test, y_proba)
return metrics
Hodnotící skript
"""Script di valutazione del modello con confronto soglie."""
import argparse
import json
import yaml
import joblib
import pandas as pd
from pathlib import Path
from src.data.preprocessing import clean_text
from src.models.trainer import evaluate_model
def parse_args():
parser = argparse.ArgumentParser(description="Evaluate trained model")
parser.add_argument("--model-path", required=True, help="Path al modello .pkl")
parser.add_argument("--test-data", required=True, help="Path ai dati di test")
parser.add_argument("--thresholds", required=True, help="Path al file soglie YAML")
parser.add_argument("--output", required=True, help="Path per il report JSON")
return parser.parse_args()
def main():
args = parse_args()
# 1. Carica modello e dati
pipeline = joblib.load(args.model_path)
df = pd.read_csv(args.test_data)
df["clean_text"] = df["review_text"].apply(clean_text)
X_test = df["clean_text"]
y_test = df["sentiment"]
# 2. Valuta
metrics = evaluate_model(pipeline, X_test, y_test)
# 3. Confronta con soglie
with open(args.thresholds) as f:
thresholds = yaml.safe_load(f)["thresholds"]
all_pass = True
results = {}
for metric_name, threshold_value in thresholds.items():
actual = metrics.get(metric_name, 0.0)
passed = actual >= threshold_value
if not passed:
all_pass = False
results[metric_name] = {
"value": actual,
"threshold": threshold_value,
"pass": passed
}
# 4. Genera report
report = {
**metrics,
"thresholds": thresholds,
"details": results,
"pass": all_pass
}
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(report, f, indent=2)
# 5. Stampa risultati
print("\n=== Evaluation Report ===")
for name, detail in results.items():
status = "PASS" if detail["pass"] else "FAIL"
print(f" {name}: {detail['value']:.4f} (threshold: {detail['threshold']}) [{status}]")
print(f"\nOverall: {'PASS' if all_pass else 'FAIL'}")
# Exit code non-zero se le metriche non passano
if not all_pass:
print("\nWARNING: Le metriche non raggiungono le soglie minime!")
# Non usiamo exit(1) perchè il workflow legge l'output
if __name__ == "__main__":
main()
Konfigurace školení a prahů
# Configurazione pipeline di training
data:
train_path: "data/raw/reviews.csv"
test_size: 0.2
random_state: 42
model:
algorithm: "logistic_regression"
max_iter: 1000
C: 1.0
random_state: 42
tfidf:
max_features: 15000
ngram_range: [1, 2]
min_df: 3
max_df: 0.9
mlflow:
experiment_name: "sentiment-classifier"
registered_model_name: "sentiment-classifier"
# Soglie minime per approvare il deployment
thresholds:
accuracy: 0.85
f1_score: 0.83
precision: 0.80
recall: 0.80
auc_roc: 0.90
# Confronto con modello in produzione
comparison:
# Il nuovo modello deve migliorare almeno dello 0.5%
min_improvement: 0.005
# Metriche su cui e richiesto il miglioramento
compare_metrics:
- f1_score
- auc_roc
Požadavky na soubor
# Core ML
scikit-learn==1.4.0
pandas==2.2.0
numpy==1.26.3
# NLP preprocessing
nltk==3.8.1
# Experiment tracking
mlflow==2.10.0
# Model serving
fastapi==0.109.0
uvicorn==0.27.0
pydantic==2.5.3
# Data validation
pandera==0.18.0
great-expectations==0.18.8
# Data versioning
dvc[s3]==3.42.0
# Configuration
pyyaml==6.0.1
python-dotenv==1.0.1
# Serialization
joblib==1.3.2
# Testing
pytest==7.4.4
httpx==0.26.0
Validace dat v kanálu
Před trénováním modelu se musíme ujistit, že data jsou platná. Model trénovaný na poškozených datech produkuje nepředvídatelné výsledky. Datum ověření a první brána našeho potrubí: pokud data selžou v kontrolách, školení neopouští to.
"""Validazione della qualità dei dati con Pandera."""
import argparse
import sys
import yaml
import pandas as pd
import pandera as pa
from pandera import Column, Check, DataFrameSchema
def build_schema(schema_config: dict) -> DataFrameSchema:
"""Costruisce uno schema Pandera dalla configurazione YAML."""
columns = {}
for col_name, col_spec in schema_config["columns"].items():
checks = []
if "min_length" in col_spec:
checks.append(Check.str_length(min_value=col_spec["min_length"]))
if "max_length" in col_spec:
checks.append(Check.str_length(max_value=col_spec["max_length"]))
if "allowed_values" in col_spec:
checks.append(Check.isin(col_spec["allowed_values"]))
if "min_value" in col_spec:
checks.append(Check.greater_than_or_equal_to(col_spec["min_value"]))
if "max_value" in col_spec:
checks.append(Check.less_than_or_equal_to(col_spec["max_value"]))
columns[col_name] = Column(
dtype=col_spec.get("dtype", "object"),
nullable=col_spec.get("nullable", False),
checks=checks if checks else None
)
return DataFrameSchema(
columns=columns,
coerce=True,
strict=schema_config.get("strict", False)
)
def validate_data(data_path: str, schema_path: str) -> bool:
"""Valida il dataset contro lo schema definito."""
# Carica schema
with open(schema_path) as f:
schema_config = yaml.safe_load(f)
schema = build_schema(schema_config)
# Carica dati
df = pd.read_csv(data_path)
# Controlla dimensione minima
min_rows = schema_config.get("min_rows", 100)
if len(df) < min_rows:
print(f"FAIL: Dataset ha {len(df)} righe, minimo richiesto: {min_rows}")
return False
# Controlla duplicati
max_duplicates_pct = schema_config.get("max_duplicates_pct", 0.05)
duplicates_pct = df.duplicated().mean()
if duplicates_pct > max_duplicates_pct:
print(f"FAIL: {duplicates_pct:.1%} duplicati (max: {max_duplicates_pct:.1%})")
return False
# Valida schema
try:
schema.validate(df, lazy=True)
print(f"PASS: Dataset valido ({len(df)} righe, {len(df.columns)} colonne)")
return True
except pa.errors.SchemaErrors as e:
print(f"FAIL: Schema validation errors:")
print(e.failure_cases.head(20))
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-path", required=True)
parser.add_argument("--schema-path", required=True)
args = parser.parse_args()
valid = validate_data(args.data_path, args.schema_path)
sys.exit(0 if valid else 1)
# Schema per il dataset di recensioni
min_rows: 1000
max_duplicates_pct: 0.05
strict: false
columns:
review_text:
dtype: "object"
nullable: false
min_length: 10
max_length: 5000
sentiment:
dtype: "int64"
nullable: false
allowed_values: [0, 1]
rating:
dtype: "float64"
nullable: true
min_value: 1.0
max_value: 5.0
review_date:
dtype: "object"
nullable: true
Verze dat s DVC v potrubí
Data jsou pro Git příliš velká, ale je třeba je verzovat a synchronizovat
v potrubí CI/CD. DVC (Data Version Control) řeší tento problém:
ukládat data do vzdáleného úložiště (S3, GCS, Azure Blob) a sledovat pouze v Gitu
metadata (hash, velikost). Akce GitHub lze použít dvc pull pro
stáhnout přesně verzi dat souvisejících s aktuálním odevzdáním.
stages:
prepare:
cmd: python -m src.data.preprocessing --config config/training.yaml
deps:
- src/data/preprocessing.py
- config/training.yaml
- data/raw/reviews.csv
outs:
- data/processed/train.csv
- data/processed/test.csv
train:
cmd: python train.py --config config/training.yaml
deps:
- train.py
- src/models/trainer.py
- data/processed/train.csv
- config/training.yaml
outs:
- models/model.pkl
metrics:
- metrics/train_metrics.json:
cache: false
evaluate:
cmd: >-
python evaluate.py
--model-path models/model.pkl
--test-data data/processed/test.csv
--thresholds config/thresholds.yaml
--output metrics/eval_metrics.json
deps:
- evaluate.py
- models/model.pkl
- data/processed/test.csv
- config/thresholds.yaml
metrics:
- metrics/eval_metrics.json:
cache: false
# Inizializza DVC
dvc init
# Configura storage remoto (S3 in questo esempio)
dvc remote add -d myremote s3://my-ml-data-bucket/sentiment-classifier
dvc remote modify myremote region eu-west-1
# Traccia il dataset
dvc add data/raw/reviews.csv
# Committa i file DVC in Git
git add data/raw/reviews.csv.dvc data/raw/.gitignore dvc.yaml dvc.lock
git commit -m "feat: add DVC tracking for training data"
# Push dei dati su S3
dvc push
Akce DVC + GitHub: Jak to funguje
- Git sleduje soubory
.dvc(metadata: SHA256 hash, velikost) - Skutečná data jsou umístěna na S3 (nebo GCS, Azure Blob, Google Drive)
- Spustí se pracovní postup akcí GitHub
dvc pullke stažení dat - Přihlašovací údaje S3 jsou předávány prostřednictvím tajemství GitHub
- Každé potvrzení Git odpovídá přesné verzi dat
Registr modelů s MLflow
Registr modelů je komponenta, která spravuje životní cyklus modelů po školení. Každý trénovaný model je registrován s názvem, verzí a a stav (Staging, Výroba, Archivováno). Kanál CI/CD spolupracuje s registrem propagovat modely, které překračují limity ověření.
"""Script per la promozione del modello nel registry MLflow."""
import mlflow
from mlflow.tracking import MlflowClient
def promote_model(
model_name: str,
run_id: str,
target_stage: str = "Production"
) -> None:
"""Promuove un modello a Production nel registry."""
client = MlflowClient()
# Cerca la versione del modello associata al run
model_versions = client.search_model_versions(
f"name='{model_name}'"
)
target_version = None
for mv in model_versions:
if mv.run_id == run_id:
target_version = mv.version
break
if target_version is None:
raise ValueError(
f"Nessun modello trovato per run_id={run_id}"
)
# Archivia il modello attualmente in Production
for mv in model_versions:
if mv.current_stage == "Production":
client.transition_model_version_stage(
name=model_name,
version=mv.version,
stage="Archived",
archive_existing_versions=False
)
print(f"Archiviato modello v{mv.version} (precedente Production)")
# Promuovi il nuovo modello
client.transition_model_version_stage(
name=model_name,
version=target_version,
stage=target_stage
)
print(f"Promosso modello v{target_version} a {target_stage}")
def load_production_model(model_name: str):
"""Carica il modello attualmente in Production."""
model_uri = f"models:/{model_name}/Production"
return mlflow.sklearn.load_model(model_uri)
Příklad podpisu modelu a vstupu
Podpis modelu dokumentuje vzor vstupů a výstupů modelu. To slouží jako dokumentace a automatické ověření: pokud někdo zkuste předat vstup s jiným schématem, MLflow vyvolá jasnou chybu.
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec
# Definisci la signature esplicita
input_schema = Schema([
ColSpec("string", "text")
])
output_schema = Schema([
ColSpec("long", "prediction")
])
signature = ModelSignature(
inputs=input_schema,
outputs=output_schema
)
# Esempio di input per documentazione
input_example = {
"text": "This product is excellent, highly recommended!"
}
# Registra il modello con signature e esempio
mlflow.sklearn.log_model(
sk_model=trained_pipeline,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name="sentiment-classifier"
)
Testování v ML Pipeline
Testování ML je složitější než tradiční testování softwaru. Nestačí kontrolovat že kód "funguje": musíte otestovat kvalitu dat, správnost předzpracování, stabilita školení a chování rozhraní API. The CI/CD potrubí provádí tři úrovně testování.
Unit Test pro předběžné zpracování
"""Unit test per il modulo di preprocessing."""
import pytest
import pandas as pd
from src.data.preprocessing import clean_text, load_and_preprocess_data
class TestCleanText:
"""Test per la funzione clean_text."""
def test_removes_html_tags(self):
assert clean_text("<p>Hello</p>") == "hello"
def test_removes_urls(self):
assert clean_text("Visit http://example.com for info") == "visit for info"
def test_removes_special_characters(self):
assert clean_text("Hello!!! World???") == "hello world"
def test_normalizes_whitespace(self):
assert clean_text("Hello World") == "hello world"
def test_lowercases(self):
assert clean_text("HELLO WORLD") == "hello world"
def test_empty_string(self):
assert clean_text("") == ""
def test_none_input(self):
assert clean_text(None) == ""
def test_numeric_preserved(self):
assert clean_text("Rating 5 out of 10") == "rating 5 out of 10"
class TestLoadAndPreprocess:
"""Test per il caricamento e preprocessing dei dati."""
def test_missing_columns_raises(self, tmp_path):
"""Verifica che colonne mancanti generino un errore."""
df = pd.DataFrame({"wrong_col": ["text"]})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
with pytest.raises(ValueError, match="Colonne mancanti"):
load_and_preprocess_data(str(csv_path))
def test_drops_na_rows(self, tmp_path):
"""Verifica che le righe con NA vengano rimosse."""
df = pd.DataFrame({
"review_text": ["Good product", None, "Bad product"],
"sentiment": [1, 0, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert len(X) == 2
def test_output_types(self, tmp_path):
"""Verifica i tipi di output."""
df = pd.DataFrame({
"review_text": ["Great product", "Terrible service"],
"sentiment": [1, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert isinstance(X, pd.Series)
assert isinstance(y, pd.Series)
Integrační test pro potrubí
"""Integration test per il training e la valutazione del modello."""
import pytest
import pandas as pd
from src.models.trainer import create_pipeline, train_model, evaluate_model
@pytest.fixture
def sample_data():
"""Crea un dataset di test sintetico."""
texts = [
"amazing product love it", "terrible waste of money",
"great quality recommended", "horrible experience never again",
"excellent value for price", "poor quality disappointed",
"best purchase ever made", "worst product i bought",
"fantastic results happy", "awful terrible regret buying",
] * 10 # Ripetuto per avere abbastanza dati
sentiments = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0] * 10
return pd.Series(texts), pd.Series(sentiments)
class TestPipeline:
"""Test per la pipeline di training."""
def test_create_pipeline_logistic(self):
"""Verifica creazione pipeline con LogisticRegression."""
config = {"algorithm": "logistic_regression", "max_iter": 100}
pipeline = create_pipeline(config)
assert len(pipeline.steps) == 2
def test_create_pipeline_invalid_algorithm(self):
"""Verifica errore con algoritmo non supportato."""
with pytest.raises(ValueError, match="non supportato"):
create_pipeline({"algorithm": "invalid_algo"})
def test_train_and_evaluate(self, sample_data):
"""Test end-to-end: training + evaluation."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
pipeline = create_pipeline(config)
trained = train_model(pipeline, X[:80], y[:80])
metrics = evaluate_model(trained, X[80:], y[80:])
assert "accuracy" in metrics
assert "f1_score" in metrics
assert 0.0 <= metrics["accuracy"] <= 1.0
assert 0.0 <= metrics["f1_score"] <= 1.0
def test_model_deterministic(self, sample_data):
"""Verifica che il training sia deterministico con seed fisso."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
p1 = train_model(create_pipeline(config), X[:80], y[:80])
p2 = train_model(create_pipeline(config), X[:80], y[:80])
m1 = evaluate_model(p1, X[80:], y[80:])
m2 = evaluate_model(p2, X[80:], y[80:])
assert m1["accuracy"] == m2["accuracy"]
Kouřový test pro podávání
"""Smoke test per l'API di serving FastAPI."""
import pytest
from httpx import AsyncClient, ASGITransport
from src.serving.app import app
@pytest.fixture
def client():
"""Client HTTP per testare l'API."""
transport = ASGITransport(app=app)
return AsyncClient(transport=transport, base_url="http://test")
@pytest.mark.asyncio
async def test_health_endpoint(client):
"""Verifica che l'endpoint /health risponda 200."""
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
@pytest.mark.asyncio
async def test_predict_positive(client):
"""Verifica predizione per testo positivo."""
response = await client.post(
"/predict",
json={"text": "This product is amazing, I love it!"}
)
assert response.status_code == 200
data = response.json()
assert "prediction" in data
assert "confidence" in data
assert data["confidence"] > 0.0
@pytest.mark.asyncio
async def test_predict_empty_text(client):
"""Verifica errore con testo vuoto."""
response = await client.post(
"/predict",
json={"text": ""}
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_predict_batch(client):
"""Verifica predizione batch."""
response = await client.post(
"/predict/batch",
json={"texts": [
"Great product",
"Terrible experience"
]}
)
assert response.status_code == 200
data = response.json()
assert len(data["predictions"]) == 2
# Aggiungere questo job prima del training nel workflow
tests:
name: Run Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest-asyncio pytest-cov
- name: Run unit tests
run: pytest tests/ -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v4
with:
file: coverage.xml
Poskytování API s FastAPI
Trénovaný model musí být přístupný přes REST API. FastAPI a výběr ideální pro poskytování ML v Pythonu: a rychlé, má automatickou validaci vstupu přes Pydantic a automaticky generuje dokumentaci OpenAPI.
"""API di serving per il classificatore di sentiment."""
import os
import time
import joblib
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from src.serving.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse,
HealthResponse
)
from src.data.preprocessing import clean_text
# Variabili globali per il modello
model_pipeline = None
model_version = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Carica il modello all'avvio dell'applicazione."""
global model_pipeline, model_version
model_path = os.getenv("MODEL_PATH", "models/model.pkl")
if not Path(model_path).exists():
raise RuntimeError(f"Modello non trovato: {model_path}")
model_pipeline = joblib.load(model_path)
model_version = os.getenv("MODEL_VERSION", "unknown")
print(f"Modello caricato: v{model_version}")
yield
model_pipeline = None
app = FastAPI(
title="Sentiment Classifier API",
version="1.0.0",
lifespan=lifespan
)
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check dell'API."""
return HealthResponse(
status="healthy" if model_pipeline is not None else "unhealthy",
model_version=model_version or "not loaded"
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Predizione singola."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned = clean_text(request.text)
if not cleaned:
raise HTTPException(status_code=422, detail="Text is empty after cleaning")
prediction = model_pipeline.predict([cleaned])[0]
probabilities = model_pipeline.predict_proba([cleaned])[0]
confidence = float(max(probabilities))
latency_ms = (time.time() - start) * 1000
return PredictionResponse(
prediction=int(prediction),
confidence=confidence,
label="positive" if prediction == 1 else "negative",
latency_ms=round(latency_ms, 2)
)
@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
"""Predizione batch su più testi."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned_texts = [clean_text(t) for t in request.texts]
predictions = model_pipeline.predict(cleaned_texts)
probabilities = model_pipeline.predict_proba(cleaned_texts)
latency_ms = (time.time() - start) * 1000
results = []
for i, text in enumerate(request.texts):
results.append(PredictionResponse(
prediction=int(predictions[i]),
confidence=float(max(probabilities[i])),
label="positive" if predictions[i] == 1 else "negative",
latency_ms=0
))
return BatchPredictionResponse(
predictions=results,
total_latency_ms=round(latency_ms, 2)
)
"""Pydantic schemas per l'API di serving."""
from pydantic import BaseModel, Field
from typing import List
class PredictionRequest(BaseModel):
text: str = Field(..., min_length=1, max_length=5000,
description="Testo della recensione")
class PredictionResponse(BaseModel):
prediction: int = Field(..., description="0=negativo, 1=positivo")
confidence: float = Field(..., ge=0.0, le=1.0,
description="Confidenza della predizione")
label: str = Field(..., description="Label leggibile")
latency_ms: float = Field(..., description="Latenza in millisecondi")
class BatchPredictionRequest(BaseModel):
texts: List[str] = Field(..., min_length=1, max_length=100,
description="Lista di testi")
class BatchPredictionResponse(BaseModel):
predictions: List[PredictionResponse]
total_latency_ms: float
class HealthResponse(BaseModel):
status: str
model_version: str
Monitorování po nasazení
Nasazení není koncem procesu, ale začátkem nejkritičtější fáze: sledování ve výrobě. ML model může tiše degradovat, když i skutečná data se liší od tréninkových dat. Potrubí musí zahrnovat zdravotní kontroly průběžné, prediktivní protokolování a automatické spouštěče přeškolování.
"""Monitoring post-deployment per il modello ML."""
import time
import logging
from datetime import datetime, timedelta
from collections import deque
from typing import Dict, Optional
from dataclasses import dataclass, field
@dataclass
class PredictionLog:
"""Log di una singola predizione."""
timestamp: datetime
input_text: str
prediction: int
confidence: float
latency_ms: float
class ModelMonitor:
"""Monitora le performance del modello in produzione."""
def __init__(
self,
window_size: int = 1000,
min_confidence_threshold: float = 0.6,
max_latency_ms: float = 500.0,
drift_check_interval: int = 100
):
self.window_size = window_size
self.min_confidence = min_confidence_threshold
self.max_latency = max_latency_ms
self.drift_check_interval = drift_check_interval
self.predictions: deque = deque(maxlen=window_size)
self.alert_callbacks = []
self.prediction_count = 0
self.logger = logging.getLogger("model_monitor")
def log_prediction(self, log: PredictionLog) -> None:
"""Registra una predizione e verifica le metriche."""
self.predictions.append(log)
self.prediction_count += 1
# Check latenza
if log.latency_ms > self.max_latency:
self._alert(
"HIGH_LATENCY",
f"Latenza {log.latency_ms:.0f}ms supera soglia {self.max_latency}ms"
)
# Check confidenza bassa
if log.confidence < self.min_confidence:
self._alert(
"LOW_CONFIDENCE",
f"Confidenza {log.confidence:.2f} sotto soglia {self.min_confidence}"
)
# Check periodico per drift
if self.prediction_count % self.drift_check_interval == 0:
self._check_distribution_drift()
def get_metrics(self) -> Dict:
"""Restituisce le metriche correnti della finestra."""
if not self.predictions:
return {"status": "no_data"}
recent = list(self.predictions)
confidences = [p.confidence for p in recent]
latencies = [p.latency_ms for p in recent]
predictions = [p.prediction for p in recent]
positive_rate = sum(1 for p in predictions if p == 1) / len(predictions)
return {
"total_predictions": self.prediction_count,
"window_size": len(recent),
"avg_confidence": sum(confidences) / len(confidences),
"min_confidence": min(confidences),
"avg_latency_ms": sum(latencies) / len(latencies),
"p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)],
"positive_rate": positive_rate,
"low_confidence_pct": sum(
1 for c in confidences if c < self.min_confidence
) / len(confidences),
}
def _check_distribution_drift(self) -> None:
"""Verifica se la distribuzione delle predizioni e cambiata."""
if len(self.predictions) < self.window_size:
return
recent = list(self.predictions)
half = len(recent) // 2
first_half = [p.prediction for p in recent[:half]]
second_half = [p.prediction for p in recent[half:]]
rate_first = sum(first_half) / len(first_half)
rate_second = sum(second_half) / len(second_half)
# Se la distribuzione cambia più del 15%, segnala drift
if abs(rate_first - rate_second) > 0.15:
self._alert(
"DISTRIBUTION_DRIFT",
f"Positive rate cambiato: {rate_first:.2f} -> {rate_second:.2f}"
)
def _alert(self, alert_type: str, message: str) -> None:
"""Invia un alert."""
self.logger.warning(f"[{alert_type}] {message}")
for callback in self.alert_callbacks:
callback(alert_type, message)
Klíčové metriky ke sledování
- Latence (p50, p95, p99): doba odezvy API
- propustnost: počet předpovědí za sekundu
- Distribuce předpovědí: změny v poměru kladný/negativní
- Střední důvěra: pokles znamená, že model je „nejistý“
- Míra chyb: Chybovost HTTP 5xx
- Posun data: rozdíl mezi produkčními daty a tréninkovými daty
Náklady a optimalizace
Akce GitHub nabízí 2 000 volných minut měsíčně pro veřejná úložiště a 500 minut pro soukromá úložiště (bezplatný plán). Trénink ML může tyto minuty rychle spotřebovat. Zde je návod, jak optimalizovat.
Strategie ukládání do mezipaměti
# Cache delle dipendenze Python
- name: Cache pip packages
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
# Cache del dataset DVC (evita download ripetuti)
- name: Cache DVC data
uses: actions/cache@v4
with:
path: |
data/
.dvc/cache/
key: dvc-${{ hashFiles('data/*.dvc', 'dvc.lock') }}
restore-keys: |
dvc-
# Cache della Docker layer
- name: Build with cache
uses: docker/build-push-action@v5
with:
context: .
cache-from: type=gha
cache-to: type=gha,mode=max
Samoobslužní běžci s GPU
Pro trénink GPU nestačí běžci hostovaní na GitHubu (nemají GPU). Řešení a self-hosted runner na stroji s GPU. To také eliminuje náklady na minutu od GitHub Actions.
# 1. Sulla macchina con GPU, scarica il runner
mkdir actions-runner && cd actions-runner
curl -o actions-runner.tar.gz -L \
https://github.com/actions/runner/releases/download/v2.311.0/actions-runner-linux-x64-2.311.0.tar.gz
tar xzf actions-runner.tar.gz
# 2. Configura il runner
./config.sh --url https://github.com/YOUR_ORG/YOUR_REPO \
--token YOUR_TOKEN \
--labels gpu,cuda12,ml-training
# 3. Installa come servizio
sudo ./svc.sh install
sudo ./svc.sh start
training:
name: Train Model (GPU)
runs-on: [self-hosted, gpu, cuda12]
# Il job viene eseguito sulla macchina con GPU
steps:
- uses: actions/checkout@v4
- name: Train with GPU
run: |
nvidia-smi # Verifica GPU disponibile
python train.py --config config/training-gpu.yaml --device cuda
Odhad nákladů pro scénář
| Scénář | Minuty/měsíc | Náklady na akce GitHub | Celkové náklady |
|---|---|---|---|
| Prototyp (manuální trénink, scikit-learn) | ~200 | Zdarma (bezplatný plán) | ~0 EUR/měsíc |
| PMI (týdenní školení, střední model) | ~800 | ~12 EUR/měsíc | ~50 EUR/měsíc (s S3) |
| Rozšíření (denní školení, hluboké učení) | ~3000 | ~48 EUR/měsíc | ~200 EUR/měsíc (s cloudovým GPU) |
| Enterprise (multimodelové, průběžné školení) | ~10 000+ | Běžec s vlastním hostitelem | ~500+ EUR/měsíc |
Porovnání nástrojů CI/CD pro ML
Akce GitHub nejsou jedinou možností. Každý nástroj má v závislosti na něm specifické výhody kontextu. Zde je praktické srovnání, které vám pomůže s výběrem.
| Charakteristický | Akce GitHubu | GitLab CI | Jenkins | Pracovní postupy Argo |
|---|---|---|---|---|
| Nastavení | Nula (integrovaná) | Nula (integrovaná) | Dedikovaný server | Klastry Kubernetes |
| Podpora GPU | Běžec s vlastním hostitelem | Běžec s vlastním hostitelem | pluginy NVIDIA | Nativní (GPU K8s) |
| Cena (malý tým) | Volný/nízký | Volný/nízký | Cena serveru | Cena clusteru K8s |
| Rovnoběžnost | Dobrý (matice) | Dobrý | Optimální | Vynikající (DAG) |
| Pipeline jako kód | YAML | YAML | Groovy/YAML | YAML/Python SDK |
| ML ekosystém | Obrovské tržiště | Dobrý | Pluginy | Cloud-nativní |
| Křivka učení | Nízký | Nízký | Průměrný | Vysoký |
| Ideální pro | Malé/střední týmy, repo GitHub | Tým na GitLabu, samostatně spravovaný | Enterprise, on-premise | Tým K8, složité potrubí |
Kterou si vybrat?
- Začátek a prototyp: Akce GitHub – nulové nastavení, nativní integrace, zdarma pro veřejná úložiště
- Tým na GitLabu: GitLab CI - nativní integrace s GitLab, vynikající kontejner registru
- Podnik v místě: Jenkins – maximální flexibilita, vyspělý ekosystém pluginů
- Cloudové nativní týmy s K8: Argo Workflows - DAG pipeline, nativní škálování, skvělé pro komplexní ML
Kompletní nastavení pod 5 000 EUR/rok
Pro SMB, kteří chtějí implementovat kompletní ML CI/CD potrubí, je to možné zůstat pod 5 000 EUR/rok pomocí open-source nástrojů a cloudových služeb s bezplatnými nebo nízkonákladovými úrovněmi. Zde je doporučený zásobník.
| Komponent | Nástroje | Náklady/rok | Poznámky |
|---|---|---|---|
| Úložiště + CI/CD | GitHub (tým) | ~400 EUR | 3 000 min/měsíc Včetně akcí |
| Ukládání dat | AWS S3 | ~120 EUR | ~500 GB datový soubor, včetně přenosu |
| Sledování experimentu | MLflow (samohoštěný) | ~0 EUR | Open source, nasazený na cloudovém virtuálním počítači |
| Registr modelů | Registr modelu MLflow | ~0 EUR | Zahrnuto v MLflow |
| Registr kontejnerů | Registr kontejnerů GitHub | ~0 EUR | Zahrnuje GitHub |
| Model hostování | Cloudový virtuální počítač (e2-medium) | ~500 EUR | Pro obsluhu serveru FastAPI + MLflow |
| Verze dat | DVC | ~0 EUR | Open-source, úložiště již počítáno výše |
| Sledování | Prometheus + Grafana | ~0 EUR | Open-source, na stejném VM |
| Validace dat | Pandera / Velká očekávání | ~0 EUR | Open source |
| Školení GPU (občasné) | Cloudová bodová instance GPU | ~600 EUR | ~50 hodin/měsíc T4 spot |
Odhadovaná celková částka: ~1 620 EUR/rok - rozpočet výrazně pod 5 000 EUR, s rezervou pro změnu měřítka. Největší náklady jsou školení GPU: pokud používáte modely klasika (scikit-learn, XGBoost) cena GPU jde na nulu.
Pozor na skryté náklady
Cena nástrojů je jen část. Nejvýznamnější náklady a čas týmu: Nastavení počátečního potrubí trvá zkušenému inženýrovi přibližně 2-4 týdny. Nepřetržitá údržba trvá přibližně 2-4 hodiny/týden. Spočítejte si také náklady na přenos dat mezi cloudovými službami (výstup), který může rychle růst s velkými datovými sadami.
Závěry a další kroky
V tomto článku jsme vytvořili kompletní CI/CD potrubí pro strojové učení, od vícefázového Dockerfile po pracovní postup GitHub Actions s ověřováním dat, školením, vyhodnocení a automatické nasazení. Klíčové pojmy k zapamatování jsou:
- CI/CD pro ML zvládá tři artefakty (kód, data, model) a zavádí průběžné školení
- Vícestupňový docker samostatné sestavení, školení a poskytování optimalizovaných obrázků
- Akce GitHubu zorganizovat celý kanál pomocí závislých a podmíněných úloh
- Validace dat a první brána: poškozená data vytvářejí nepoužitelné modely
- Registr modelů spravuje vydávání šablon a propagaci
- ML testování pokrývá tři úrovně: jednotka, integrace, kouřový test
- Sledování po nasazení a rozhodující pro detekci posunu a degradace
- Náklady jsou zvládnutelné: s open-source nástroji zůstanete pod 2 000 EUR/rok
Potrubí, které jsme postavili, odpovídá Úroveň 2 modelu vyspělosti Google MLOps: Kompletní automatizace školení a nasazení, s integrované ověřování a monitorování. Počínaje tímto základem v dalším článku ponoříme se hlouběji MLflow pro pokročilé sledování experimentů registr modelů a správa artefaktů.
Plán seriálu
- článek 1: MLOps: Od experimentu k produkci (dokončeno)
- Článek 2: ML kanál s CI/CD: GitHub Actions + Docker (tento článek)
- Článek 3: MLflow Deep Dive - Sledování experimentu a registr modelů
- Článek 4: DVC - verzování dat pro ML
- Článek 5: Škálovatelné poskytování modelů s FastAPI a Docker
- Článek 6: Kubernetes pro ML: Orchestrace a škálování
- Článek 7: Pokročilé monitorování: Data Drift a evidentně AI
- Článek 8: A/B testování pro ML modely ve výrobě
- Článek 9: Správa, dodržování a odpovědné ML
- Článek 10: Případová studie: End-to-End MLOps Pipeline







