02 - ML Pipeline cu CI/CD: GitHub Actions + Docker
În primul articol al seriei am văzut de ce 85% dintre proiectele ML nu ajung niciodată la producție și cum MLOps rezolvă această problemă. Am transformat un caiet monolitic într-o conductă modular și configurabil. Acum este momentul să faceți următorul pas: automatiza întregul conductă cu CI/CD, astfel încât orice modificare a codului, datelor sau configurației declanșează automat instruirea, validarea și implementarea modelului.
În acest articol, vom construi o conductă completă CI/CD pentru utilizarea învățării automate Acțiuni GitHub în calitate de orchestrator şi Docher ca timp de rulare a executarea. Nu ne vom limita la teorie - vom crea un proiect de lucru cu un clasificator de sentiment, complet cu Dockerfile în mai multe etape, flux de lucru YAML, validare a datelor, registru de model și desfășurare automată.
Ce vei învăța
- De ce CI/CD pentru ML este diferit de software-ul tradițional CI/CD
- Cum să construiți arhitectura unei conducte de ML end-to-end
- Cum să creați fișiere Dockerfile în mai multe etape optimizate pentru instruire și servire
- Cum să scrieți fluxuri de lucru complete GitHub Actions pentru ML
- Cum se integrează validarea datelor, registrul modelului și implementarea automată
- Cum să gestionați versiunea datelor cu DVC în curs
- Cum se implementează testarea specifică ML (unitate, integrare, fum)
- Cum se monitorizează modelul după implementare
- Cum să optimizați costurile cu memorarea în cache și alergătorii auto-găzduiți
- Cum să alegi instrumentul CI/CD potrivit pentru echipa ta
de ce CI/CD pentru ML și Different
Dacă veniți din lumea dezvoltării tradiționale de software, ați putea crede că doar aplicați aceleași practici CI/CD pentru proiectele ML. În realitate, învățarea automată introduce complexitate cele unice care necesită o abordare specifică. Diferența fundamentală este că în software În mod tradițional, CI/CD gestionează un singur artefact (codul), în timp ce în ML trebuie să gestioneze mai multe trei deodată: cod, date și model.
Cele trei artefacte ale ML
În software-ul tradițional, dacă codul nu se modifică, rezultatul nu se schimbă. În ML, chiar și cu cod identic, o modificare a datelor produce un model diferit. Aceasta înseamnă că conducta CI/CD trebuie să urmărească și să valideze trei dimensiuni independente.
| Dimensiune | CI/CD tradițional | CI/CD pentru ML |
|---|---|---|
| Cod | Git push declanșează construirea + testarea | Git push declanșează antrenamentul + evaluarea |
| Date | Nu se aplică | Datele noi declanșează recalificarea |
| Model | Nu se aplică | Noul model necesită validare + promovare |
| Configurare | Caracteristici steaguri, vars env | Hiperparametri, seturi de caracteristici, praguri de metrice |
| Mediu | OS + biblioteci | OS + biblioteci + drivere GPU + versiunea CUDA |
| Validare | Teste de promovare/recuperare | Valori peste/sub prag + comparație cu modelul în producție |
| Desfăşurare | Implementați sau derulați înapoi | Desfășurare treptată + testare A/B + derive de monitorizare |
Formare continuă: Conceptul cheie
CI/CD pentru ML introduce un concept care nu există în software-ul tradițional: cel Formare continuă (CT). Precum Integrarea Continuă și Continuă Implementare, CT asigură că modelul este reantrenat automat atunci când:
- Sosesc date noi: setul de date este actualizat cu noi observații
- Schimbați codul: se modifică preprocesarea sau algoritmul
- Ele degradează valorile: monitorizarea detectează deriva de date sau scăderea performanței
- Un cronometru expiră: recalificarea programată (de exemplu, săptămânală) este activată
Eroare comună: CI/CD fără CT
Multe echipe implementează CI/CD pentru codul ML, dar uită de Formarea continuă. Rezultatul este un model care este implementat o dată și apoi nu mai actualizat niciodată, degradându-se în tăcere în timp pe măsură ce datele din producție diferă din datele de antrenament. O conductă fără CT este ca o mașină fără întreținere: Funcționează până se rupe.
ML Pipeline Architecture
Înainte de a scrie cod, proiectăm arhitectura completă a conductei. Fiecare fază are intrari si iesiri specifice, iar defectarea unei faze le blocheaza pe cele ulterioare. Aceasta Abordarea „fail fast” asigură că numai modelele validate ajung la producție.
+------------------+ +------------------+ +------------------+
| DATA INGESTION |---->| PREPROCESSING |---->| TRAINING |
| | | | | |
| - Pull dati DVC | | - Pulizia | | - Train modello |
| - Validazione | | - Feature eng. | | - Log metriche |
| - Schema check | | - Split train/ | | - Log parametri |
| | | test/val | | - Salva artefatti|
+------------------+ +------------------+ +------------------+
| |
| (trigger: dati |
| nuovi/schedule) v
| +------------------+
| | EVALUATION |
| | |
| | - Metriche |
| | - Confronto con |
| | produzione |
| | - Gate: soglie |
| +------------------+
| |
| (se metriche > threshold)
| v
+------------------+ +------------------+ +------------------+
| MONITORING |<----| SMOKE TEST |<----| DEPLOYMENT |
| | | | | |
| - Health check | | - Test endpoint | | - Push registry |
| - Drift detect. | | - Predizione | | - Stage/Prod |
| - Alert | | di prova | | - Rollback ready |
| - Trigger retrain| | - Latenza check | | |
+------------------+ +------------------+ +------------------+
Fiecare bloc corespunde unui pas al fluxului de lucru GitHub Actions. Să vedem acum detaliază cum să implementezi fiecare fază, începând de la containerizarea cu Docker.
Docker pentru Machine Learning
Docker rezolvă una dintre cele mai frustrante probleme din ML: "funcționează la mașina mea". Prin containerizarea mediului de instruire și servire, ne asigurăm că codul produce aceleași rezultate oriunde rulează: pe laptopul data scientist, în CI/CD runner și în producție. Pentru ML, Docker necesită o atenție specială: imaginile tind să să fie foarte mare (biblioteci științifice + drivere GPU) și construcția poate fi lentă.
Imagini de bază pentru ML
Alegerea bazei de imagine este critică pentru dimensiune și compatibilitate. Iată opțiunile principalele și când să le folosești.
| Imagine de bază | Dimensiune | Utilizare | Când să-l aleg |
|---|---|---|---|
| python:3.11-slim | ~120 MB | Instruire/Servire CPU | Modele Scikit-learn, XGBoost, servire ușoară |
| python:3.11-bookworm | ~900 MB | Antrenament cu instrumente de construcție | Dependențe care necesită compilare C/C++ |
| nvidia/cuda:12.1-runtime | ~3,5 GB | Inferență GPU | Servirea modelelor de învățare profundă |
| nvidia/cuda:12.1-devel | ~5,2 GB | Antrenament GPU | Antrenament PyTorch/TensorFlow cu CUDA |
| pytorch/pytorch:2.1.0-cuda12.1 | ~6 GB | Antrenarea/Servirea PyTorch | Proiecte PyTorch care doresc să evite configurarea manuală CUDA |
Dockerfile în mai multe etape pentru instruire și servire
Modelul în mai multe etape este fundamental pentru ML. Folosind două etape separate, putem avea un mediu de compilare complet (cu compilatoare și instrumente de compilare) și o imagine finală simplificat care conține doar timpul de rulare necesar. Acest lucru reduce dimensiunea imaginii final până la 60%.
# ============================================
# Stage 1: Builder - installa dipendenze
# ============================================
FROM python:3.11-slim AS builder
WORKDIR /build
# Installa build tools necessari per compilare dipendenze native
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Copia e installa dipendenze in un virtual environment
COPY requirements.txt .
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# ============================================
# Stage 2: Training - esegue il training
# ============================================
FROM python:3.11-slim AS trainer
WORKDIR /app
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia codice sorgente
COPY src/ ./src/
COPY config/ ./config/
COPY train.py .
COPY evaluate.py .
# Entrypoint per training
ENTRYPOINT ["python", "train.py"]
# ============================================
# Stage 3: Serving - API di produzione
# ============================================
FROM python:3.11-slim AS serving
WORKDIR /app
# Utente non-root per sicurezza
RUN useradd --create-home appuser
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia solo il codice necessario per serving
COPY src/serving/ ./src/serving/
COPY src/preprocessing/ ./src/preprocessing/
# Healthcheck endpoint
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
# Switch a utente non-root
USER appuser
# Porta di default
EXPOSE 8000
# Entrypoint per serving
ENTRYPOINT ["uvicorn", "src.serving.app:app", "--host", "0.0.0.0", "--port", "8000"]
De ce Multi-Stage pentru ML?
- Siguranţă: imaginea de servire nu conține compilatoare sau instrumente de compilare
- Dimensiune: etapa de servire este mult mai ușoară (~300 MB față de ~1,2 GB)
- Cache: dependențele se schimbă mai rar decât codul, profitând de cache-ul stratului
- Flexibilitate: poți construi doar etapa de antrenament sau doar etapa de servire
Optimizarea Layer Cache
Ordinea instrucțiunilor COPY în fișierul Docker este crucială pentru cache. Dependențe Python
se schimbă rar, codul sursă se schimbă des. Copiind mai întâi fișierul
requirements.txt și apoi codul, evităm să reinstalăm dependențe
cu fiecare schimbare de cod.
# Dati e modelli (gestiti da DVC, non da Docker)
data/
models/
*.pkl
*.h5
*.pt
# Ambiente di sviluppo
.venv/
__pycache__/
*.pyc
.pytest_cache/
.mypy_cache/
# Git e CI
.git/
.github/
.dvc/cache/
# IDE e editor
.vscode/
.idea/
*.swp
# Documentazione
docs/
*.md
LICENSE
Docker cu suport GPU
Pentru a antrena modele de învățare profundă, aveți nevoie de suport pentru GPU în container. Docker acceptă GPU-uri NVIDIA prin NVIDIA Container Toolkit. Configurarea necesită driver NVIDIA pe gazdă și setul de instrumente instalat.
# Base image con CUDA runtime
FROM nvidia/cuda:12.1.1-runtime-ubuntu22.04 AS gpu-trainer
WORKDIR /app
# Installa Python e dipendenze di sistema
RUN apt-get update && apt-get install -y --no-install-recommends \
python3.11 \
python3.11-venv \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Virtual environment
RUN python3.11 -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Dipendenze PyTorch con CUDA
COPY requirements-gpu.txt .
RUN pip install --no-cache-dir -r requirements-gpu.txt
COPY src/ ./src/
COPY train.py .
# Variabili ambiente per CUDA
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility
ENTRYPOINT ["python", "train.py"]
# Build dell'immagine GPU
docker build -f Dockerfile.gpu -t ml-trainer:gpu .
# Esecuzione con accesso GPU
docker run --gpus all \
-v $(pwd)/data:/app/data \
-v $(pwd)/models:/app/models \
ml-trainer:gpu \
--config config/training.yaml
Acțiuni GitHub pentru învățare automată
GitHub Actions este un serviciu CI/CD integrat în GitHub care rulează fluxuri de lucru automate ca răspuns la evenimente (push, pull request, program, expediere manuală). Pentru ML, oferă avantaje semnificative: integrare nativă cu depozitul Git, piață cu acțiuni predefinit, gestionarea secretelor pentru acreditări și până la 2.000 de minute gratuite/lună pentru depozitele publice.
Structura fluxului de lucru ML
Un flux de lucru GitHub Actions pentru ML are o structură specifică: mai multe joburi de potrivire la etapele pipeline, cu dependențe explicite între joburi și condițiile de execuție bazate pe valorile modelului.
name: ML Pipeline - Train, Evaluate, Deploy
on:
# Trigger su push al branch main (codice o config)
push:
branches: [main]
paths:
- 'src/**'
- 'config/**'
- 'requirements.txt'
- 'train.py'
- 'evaluate.py'
# Trigger schedulato per retraining periodico
schedule:
- cron: '0 6 * * 1' # Ogni lunedi alle 6:00 UTC
# Trigger manuale con parametri
workflow_dispatch:
inputs:
force_deploy:
description: 'Forza il deployment anche se le metriche non migliorano'
required: false
default: 'false'
type: choice
options:
- 'true'
- 'false'
training_config:
description: 'File di configurazione per il training'
required: false
default: 'config/training.yaml'
env:
PYTHON_VERSION: '3.11'
DOCKER_REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/ml-model
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
jobs:
# ============================================
# Job 1: Data Validation
# ============================================
data-validation:
name: Validate Data Quality
runs-on: ubuntu-latest
outputs:
data_valid: ${{ steps.validate.outputs.valid }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Setup DVC
uses: iterative/setup-dvc@v2
- name: Pull data from DVC
run: dvc pull
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Validate data quality
id: validate
run: |
python -m src.data.validate_data \
--data-path data/raw/reviews.csv \
--schema-path config/data_schema.yaml
echo "valid=true" >> $GITHUB_OUTPUT
# ============================================
# Job 2: Model Training
# ============================================
training:
name: Train Model
needs: data-validation
if: needs.data-validation.outputs.data_valid == 'true'
runs-on: ubuntu-latest
outputs:
model_version: ${{ steps.train.outputs.model_version }}
run_id: ${{ steps.train.outputs.run_id }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Setup DVC and pull data
run: |
pip install dvc[s3]
dvc pull
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Train model
id: train
run: |
python train.py \
--config ${{ github.event.inputs.training_config || 'config/training.yaml' }} \
--output-dir models/
echo "model_version=$(cat models/version.txt)" >> $GITHUB_OUTPUT
echo "run_id=$(cat models/run_id.txt)" >> $GITHUB_OUTPUT
env:
MLFLOW_TRACKING_URI: ${{ env.MLFLOW_TRACKING_URI }}
MLFLOW_TRACKING_USERNAME: ${{ secrets.MLFLOW_USERNAME }}
MLFLOW_TRACKING_PASSWORD: ${{ secrets.MLFLOW_PASSWORD }}
- name: Upload model artifact
uses: actions/upload-artifact@v4
with:
name: trained-model
path: models/
retention-days: 30
# ============================================
# Job 3: Model Evaluation
# ============================================
evaluation:
name: Evaluate Model
needs: training
runs-on: ubuntu-latest
outputs:
metrics_pass: ${{ steps.evaluate.outputs.metrics_pass }}
accuracy: ${{ steps.evaluate.outputs.accuracy }}
f1_score: ${{ steps.evaluate.outputs.f1_score }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: trained-model
path: models/
- name: Setup DVC and pull test data
run: |
pip install dvc[s3]
dvc pull data/test/
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Evaluate model
id: evaluate
run: |
python evaluate.py \
--model-path models/model.pkl \
--test-data data/test/reviews_test.csv \
--thresholds config/thresholds.yaml \
--output metrics/report.json
echo "accuracy=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"accuracy\"])')" >> $GITHUB_OUTPUT
echo "f1_score=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"f1_score\"])')" >> $GITHUB_OUTPUT
echo "metrics_pass=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"pass\"])')" >> $GITHUB_OUTPUT
- name: Post metrics as PR comment
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const metrics = JSON.parse(fs.readFileSync('metrics/report.json'));
const body = `## Model Evaluation Results
| Metric | Value | Threshold | Status |
|--------|-------|-----------|--------|
| Accuracy | ${metrics.accuracy.toFixed(4)} | ${metrics.thresholds.accuracy} | ${metrics.accuracy >= metrics.thresholds.accuracy ? 'PASS' : 'FAIL'} |
| F1 Score | ${metrics.f1_score.toFixed(4)} | ${metrics.thresholds.f1_score} | ${metrics.f1_score >= metrics.thresholds.f1_score ? 'PASS' : 'FAIL'} |
| AUC-ROC | ${metrics.auc_roc.toFixed(4)} | ${metrics.thresholds.auc_roc} | ${metrics.auc_roc >= metrics.thresholds.auc_roc ? 'PASS' : 'FAIL'} |`;
github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.issue.number,
body: body
});
- name: Upload evaluation report
uses: actions/upload-artifact@v4
with:
name: evaluation-report
path: metrics/
# ============================================
# Job 4: Build and Push Docker Image
# ============================================
build-image:
name: Build Docker Image
needs: [evaluation]
if: |
needs.evaluation.outputs.metrics_pass == 'true' ||
github.event.inputs.force_deploy == 'true'
runs-on: ubuntu-latest
outputs:
image_tag: ${{ steps.meta.outputs.tags }}
steps:
- uses: actions/checkout@v4
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: trained-model
path: models/
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=sha,prefix=
type=raw,value=latest
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.DOCKER_REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push serving image
uses: docker/build-push-action@v5
with:
context: .
target: serving
push: true
tags: ${{ steps.meta.outputs.tags }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ============================================
# Job 5: Deploy to Staging + Smoke Test
# ============================================
deploy:
name: Deploy and Smoke Test
needs: [build-image, training]
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: |
echo "Deploying model version ${{ needs.training.outputs.model_version }}"
# Qui il comando di deploy reale (kubectl, docker-compose, etc.)
- name: Smoke test
run: |
# Attendi che il servizio sia pronto
for i in $(seq 1 30); do
if curl -sf http://staging:8000/health; then
echo "Service is ready"
break
fi
echo "Waiting for service... attempt $i"
sleep 5
done
# Test di predizione
RESPONSE=$(curl -sf -X POST http://staging:8000/predict \
-H "Content-Type: application/json" \
-d '{"text": "This product is amazing, I love it!"}')
echo "Prediction response: $RESPONSE"
# Verifica che la risposta sia valida
echo "$RESPONSE" | python -c "
import sys, json
data = json.load(sys.stdin)
assert 'prediction' in data, 'Missing prediction field'
assert 'confidence' in data, 'Missing confidence field'
assert data['confidence'] > 0.5, 'Low confidence'
print('Smoke test PASSED')
"
Secrete și securitate în conductă
Nu introduceți niciodată acreditările direct în fluxul de lucru YAML. Folosiți întotdeauna GitHub Secrets pentru cheile AWS, jetoanele MLflow, acreditările de registru Docker și orice altele informatii sensibile. Configurați secretele în Setări > Secrete și variabile > Acțiuni în depozitul GitHub.
Exemplu de proiect: Clasificator de sentimente
Să punem totul împreună cu un proiect concret. Vom construi un clasificator de sentimente pentru recenzii de produse, cu circuit complet CI/CD. Proiectul folosește scikit-learn pentru simplitate, dar arhitectura se aplică în mod identic modelelor PyTorch sau TensorFlow.
Structura proiectului
sentiment-classifier/
src/
data/
__init__.py
preprocessing.py # Pulizia e trasformazione testi
validate_data.py # Validazione schema e qualità
models/
__init__.py
trainer.py # Training del classificatore
serving/
__init__.py
app.py # FastAPI application
schemas.py # Pydantic schemas
monitoring/
__init__.py
health.py # Health checks
tests/
test_preprocessing.py # Unit test preprocessing
test_trainer.py # Unit test training
test_api.py # Integration test API
config/
training.yaml # Configurazione training
thresholds.yaml # Soglie metriche
data_schema.yaml # Schema validazione dati
train.py # Entrypoint training
evaluate.py # Entrypoint evaluation
Dockerfile # Multi-stage build
requirements.txt # Dipendenze Python
.github/
workflows/
ml-pipeline.yml # Pipeline CI/CD
.dvc/ # Configurazione DVC
dvc.yaml # Pipeline DVC
dvc.lock # Lock file DVC
Scripturi de antrenament
"""Script principale di training per il classificatore di sentiment."""
import argparse
import yaml
import mlflow
import mlflow.sklearn
from pathlib import Path
from datetime import datetime
from src.data.preprocessing import load_and_preprocess_data, split_dataset
from src.models.trainer import create_pipeline, train_model
def parse_args():
"""Parse degli argomenti da riga di comando."""
parser = argparse.ArgumentParser(
description="Train sentiment classifier"
)
parser.add_argument(
"--config",
type=str,
default="config/training.yaml",
help="Path al file di configurazione"
)
parser.add_argument(
"--output-dir",
type=str,
default="models/",
help="Directory per salvare il modello"
)
parser.add_argument(
"--experiment-name",
type=str,
default="sentiment-classifier",
help="Nome dell'esperimento MLflow"
)
return parser.parse_args()
def main():
"""Esegue la pipeline di training completa."""
args = parse_args()
# 1. Carica configurazione
with open(args.config) as f:
config = yaml.safe_load(f)
# 2. Setup MLflow
mlflow.set_experiment(args.experiment_name)
with mlflow.start_run(run_name=f"train-{datetime.now().strftime('%Y%m%d-%H%M%S')}") as run:
# 3. Log dei parametri
mlflow.log_params(config["model"])
mlflow.log_param("data_path", config["data"]["train_path"])
mlflow.log_param("test_size", config["data"]["test_size"])
# 4. Caricamento e preprocessing dati
print("[1/5] Caricamento e preprocessing dati...")
X, y = load_and_preprocess_data(config["data"]["train_path"])
# 5. Split dataset
print("[2/5] Split train/validation...")
X_train, X_val, y_train, y_val = split_dataset(
X, y,
test_size=config["data"]["test_size"],
random_state=config["data"]["random_state"]
)
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("val_size", len(X_val))
# 6. Crea pipeline di preprocessing + modello
print("[3/5] Creazione pipeline ML...")
pipeline = create_pipeline(config["model"])
# 7. Training
print("[4/5] Training in corso...")
trained_pipeline = train_model(pipeline, X_train, y_train)
# 8. Valutazione su validation set
print("[5/5] Valutazione...")
from src.models.trainer import evaluate_model
metrics = evaluate_model(trained_pipeline, X_val, y_val)
# 9. Log metriche in MLflow
mlflow.log_metrics(metrics)
# 10. Salva modello
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
import joblib
model_path = output_dir / "model.pkl"
joblib.dump(trained_pipeline, model_path)
# Log modello in MLflow con signature
from mlflow.models.signature import infer_signature
signature = infer_signature(X_train[:5], trained_pipeline.predict(X_train[:5]))
mlflow.sklearn.log_model(
trained_pipeline,
"model",
signature=signature,
registered_model_name="sentiment-classifier"
)
# Salva versione e run_id per il CI/CD
(output_dir / "version.txt").write_text(run.info.run_id[:8])
(output_dir / "run_id.txt").write_text(run.info.run_id)
print(f"\nTraining completato!")
print(f" Run ID: {run.info.run_id}")
for name, value in metrics.items():
print(f" {name}: {value:.4f}")
if __name__ == "__main__":
main()
Modul de preprocesare
"""Modulo per il preprocessing dei dati testuali."""
import re
import pandas as pd
from typing import Tuple
from sklearn.model_selection import train_test_split
def clean_text(text: str) -> str:
"""Pulisce un singolo testo rimuovendo HTML, caratteri speciali e spazi extra."""
if not isinstance(text, str):
return ""
# Rimuovi tag HTML
text = re.sub(r'<[^>]+>', '', text)
# Rimuovi URL
text = re.sub(r'http\S+|www\.\S+', '', text)
# Rimuovi caratteri speciali (mantieni lettere, numeri, spazi)
text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
# Normalizza spazi
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
def load_and_preprocess_data(data_path: str) -> Tuple[pd.Series, pd.Series]:
"""Carica e preprocess il dataset di recensioni."""
df = pd.read_csv(data_path)
# Validazione colonne richieste
required_cols = ["review_text", "sentiment"]
missing = [c for c in required_cols if c not in df.columns]
if missing:
raise ValueError(f"Colonne mancanti nel dataset: {missing}")
# Rimuovi righe con valori mancanti
df = df.dropna(subset=required_cols)
# Pulisci testi
df["clean_text"] = df["review_text"].apply(clean_text)
# Rimuovi testi vuoti dopo pulizia
df = df[df["clean_text"].str.len() > 0]
return df["clean_text"], df["sentiment"]
def split_dataset(
X: pd.Series,
y: pd.Series,
test_size: float = 0.2,
random_state: int = 42
) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series]:
"""Split stratificato del dataset."""
return train_test_split(
X, y,
test_size=test_size,
random_state=random_state,
stratify=y
)
Modul de formare
"""Modulo per la creazione, il training e la valutazione del modello."""
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
accuracy_score, f1_score, precision_score,
recall_score, roc_auc_score
)
from typing import Dict, Any
import pandas as pd
MODELS = {
"logistic_regression": LogisticRegression,
"random_forest": RandomForestClassifier,
}
def create_pipeline(model_config: Dict[str, Any]) -> Pipeline:
"""Crea una pipeline sklearn con TF-IDF + classificatore."""
algorithm = model_config.get("algorithm", "logistic_regression")
model_class = MODELS.get(algorithm)
if model_class is None:
raise ValueError(
f"Algoritmo non supportato: {algorithm}. "
f"Supportati: {list(MODELS.keys())}"
)
# Parametri specifici del modello
model_params = {
k: v for k, v in model_config.items()
if k not in ("algorithm", "tfidf")
}
# Parametri TF-IDF
tfidf_params = model_config.get("tfidf", {})
return Pipeline([
("tfidf", TfidfVectorizer(
max_features=tfidf_params.get("max_features", 10000),
ngram_range=tuple(tfidf_params.get("ngram_range", [1, 2])),
min_df=tfidf_params.get("min_df", 2),
max_df=tfidf_params.get("max_df", 0.95),
)),
("classifier", model_class(**model_params)),
])
def train_model(
pipeline: Pipeline,
X_train: pd.Series,
y_train: pd.Series
) -> Pipeline:
"""Addestra la pipeline sul training set."""
pipeline.fit(X_train, y_train)
return pipeline
def evaluate_model(
pipeline: Pipeline,
X_test: pd.Series,
y_test: pd.Series
) -> Dict[str, float]:
"""Valuta il modello e restituisce tutte le metriche."""
y_pred = pipeline.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_score": f1_score(y_test, y_pred, average="weighted"),
"precision": precision_score(y_test, y_pred, average="weighted"),
"recall": recall_score(y_test, y_pred, average="weighted"),
}
# AUC-ROC solo per classificazione binaria
if len(set(y_test)) == 2:
y_proba = pipeline.predict_proba(X_test)[:, 1]
metrics["auc_roc"] = roc_auc_score(y_test, y_proba)
return metrics
Scriptul de evaluare
"""Script di valutazione del modello con confronto soglie."""
import argparse
import json
import yaml
import joblib
import pandas as pd
from pathlib import Path
from src.data.preprocessing import clean_text
from src.models.trainer import evaluate_model
def parse_args():
parser = argparse.ArgumentParser(description="Evaluate trained model")
parser.add_argument("--model-path", required=True, help="Path al modello .pkl")
parser.add_argument("--test-data", required=True, help="Path ai dati di test")
parser.add_argument("--thresholds", required=True, help="Path al file soglie YAML")
parser.add_argument("--output", required=True, help="Path per il report JSON")
return parser.parse_args()
def main():
args = parse_args()
# 1. Carica modello e dati
pipeline = joblib.load(args.model_path)
df = pd.read_csv(args.test_data)
df["clean_text"] = df["review_text"].apply(clean_text)
X_test = df["clean_text"]
y_test = df["sentiment"]
# 2. Valuta
metrics = evaluate_model(pipeline, X_test, y_test)
# 3. Confronta con soglie
with open(args.thresholds) as f:
thresholds = yaml.safe_load(f)["thresholds"]
all_pass = True
results = {}
for metric_name, threshold_value in thresholds.items():
actual = metrics.get(metric_name, 0.0)
passed = actual >= threshold_value
if not passed:
all_pass = False
results[metric_name] = {
"value": actual,
"threshold": threshold_value,
"pass": passed
}
# 4. Genera report
report = {
**metrics,
"thresholds": thresholds,
"details": results,
"pass": all_pass
}
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(report, f, indent=2)
# 5. Stampa risultati
print("\n=== Evaluation Report ===")
for name, detail in results.items():
status = "PASS" if detail["pass"] else "FAIL"
print(f" {name}: {detail['value']:.4f} (threshold: {detail['threshold']}) [{status}]")
print(f"\nOverall: {'PASS' if all_pass else 'FAIL'}")
# Exit code non-zero se le metriche non passano
if not all_pass:
print("\nWARNING: Le metriche non raggiungono le soglie minime!")
# Non usiamo exit(1) perchè il workflow legge l'output
if __name__ == "__main__":
main()
Configurare antrenament și praguri
# Configurazione pipeline di training
data:
train_path: "data/raw/reviews.csv"
test_size: 0.2
random_state: 42
model:
algorithm: "logistic_regression"
max_iter: 1000
C: 1.0
random_state: 42
tfidf:
max_features: 15000
ngram_range: [1, 2]
min_df: 3
max_df: 0.9
mlflow:
experiment_name: "sentiment-classifier"
registered_model_name: "sentiment-classifier"
# Soglie minime per approvare il deployment
thresholds:
accuracy: 0.85
f1_score: 0.83
precision: 0.80
recall: 0.80
auc_roc: 0.90
# Confronto con modello in produzione
comparison:
# Il nuovo modello deve migliorare almeno dello 0.5%
min_improvement: 0.005
# Metriche su cui e richiesto il miglioramento
compare_metrics:
- f1_score
- auc_roc
Cerințe pentru fișiere
# Core ML
scikit-learn==1.4.0
pandas==2.2.0
numpy==1.26.3
# NLP preprocessing
nltk==3.8.1
# Experiment tracking
mlflow==2.10.0
# Model serving
fastapi==0.109.0
uvicorn==0.27.0
pydantic==2.5.3
# Data validation
pandera==0.18.0
great-expectations==0.18.8
# Data versioning
dvc[s3]==3.42.0
# Configuration
pyyaml==6.0.1
python-dotenv==1.0.1
# Serialization
joblib==1.3.2
# Testing
pytest==7.4.4
httpx==0.26.0
Validarea datelor în pipeline
Înainte de a antrena un model, trebuie să ne asigurăm că datele sunt valide. Un model antrenat pe date corupte produce rezultate imprevizibile. Data validarii și prima poartă a conductei noastre: dacă datele eșuează verificările, antrenamentul nu pleacă.
"""Validazione della qualità dei dati con Pandera."""
import argparse
import sys
import yaml
import pandas as pd
import pandera as pa
from pandera import Column, Check, DataFrameSchema
def build_schema(schema_config: dict) -> DataFrameSchema:
"""Costruisce uno schema Pandera dalla configurazione YAML."""
columns = {}
for col_name, col_spec in schema_config["columns"].items():
checks = []
if "min_length" in col_spec:
checks.append(Check.str_length(min_value=col_spec["min_length"]))
if "max_length" in col_spec:
checks.append(Check.str_length(max_value=col_spec["max_length"]))
if "allowed_values" in col_spec:
checks.append(Check.isin(col_spec["allowed_values"]))
if "min_value" in col_spec:
checks.append(Check.greater_than_or_equal_to(col_spec["min_value"]))
if "max_value" in col_spec:
checks.append(Check.less_than_or_equal_to(col_spec["max_value"]))
columns[col_name] = Column(
dtype=col_spec.get("dtype", "object"),
nullable=col_spec.get("nullable", False),
checks=checks if checks else None
)
return DataFrameSchema(
columns=columns,
coerce=True,
strict=schema_config.get("strict", False)
)
def validate_data(data_path: str, schema_path: str) -> bool:
"""Valida il dataset contro lo schema definito."""
# Carica schema
with open(schema_path) as f:
schema_config = yaml.safe_load(f)
schema = build_schema(schema_config)
# Carica dati
df = pd.read_csv(data_path)
# Controlla dimensione minima
min_rows = schema_config.get("min_rows", 100)
if len(df) < min_rows:
print(f"FAIL: Dataset ha {len(df)} righe, minimo richiesto: {min_rows}")
return False
# Controlla duplicati
max_duplicates_pct = schema_config.get("max_duplicates_pct", 0.05)
duplicates_pct = df.duplicated().mean()
if duplicates_pct > max_duplicates_pct:
print(f"FAIL: {duplicates_pct:.1%} duplicati (max: {max_duplicates_pct:.1%})")
return False
# Valida schema
try:
schema.validate(df, lazy=True)
print(f"PASS: Dataset valido ({len(df)} righe, {len(df.columns)} colonne)")
return True
except pa.errors.SchemaErrors as e:
print(f"FAIL: Schema validation errors:")
print(e.failure_cases.head(20))
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-path", required=True)
parser.add_argument("--schema-path", required=True)
args = parser.parse_args()
valid = validate_data(args.data_path, args.schema_path)
sys.exit(0 if valid else 1)
# Schema per il dataset di recensioni
min_rows: 1000
max_duplicates_pct: 0.05
strict: false
columns:
review_text:
dtype: "object"
nullable: false
min_length: 10
max_length: 5000
sentiment:
dtype: "int64"
nullable: false
allowed_values: [0, 1]
rating:
dtype: "float64"
nullable: true
min_value: 1.0
max_value: 5.0
review_date:
dtype: "object"
nullable: true
Versiunea datelor cu DVC în Pipeline
Datele sunt prea mari pentru Git, dar trebuie versionate și sincronizate
în conducta CI/CD. DVC (control versiuni de date) rezolvă această problemă:
salvați datele în stocarea de la distanță (S3, GCS, Azure Blob) și urmăriți numai în Git
metadate (hash, dimensiune). GitHub Actions poate folosi dvc pull pentru
descărcați exact versiunea datelor asociate cu comiterea curentă.
stages:
prepare:
cmd: python -m src.data.preprocessing --config config/training.yaml
deps:
- src/data/preprocessing.py
- config/training.yaml
- data/raw/reviews.csv
outs:
- data/processed/train.csv
- data/processed/test.csv
train:
cmd: python train.py --config config/training.yaml
deps:
- train.py
- src/models/trainer.py
- data/processed/train.csv
- config/training.yaml
outs:
- models/model.pkl
metrics:
- metrics/train_metrics.json:
cache: false
evaluate:
cmd: >-
python evaluate.py
--model-path models/model.pkl
--test-data data/processed/test.csv
--thresholds config/thresholds.yaml
--output metrics/eval_metrics.json
deps:
- evaluate.py
- models/model.pkl
- data/processed/test.csv
- config/thresholds.yaml
metrics:
- metrics/eval_metrics.json:
cache: false
# Inizializza DVC
dvc init
# Configura storage remoto (S3 in questo esempio)
dvc remote add -d myremote s3://my-ml-data-bucket/sentiment-classifier
dvc remote modify myremote region eu-west-1
# Traccia il dataset
dvc add data/raw/reviews.csv
# Committa i file DVC in Git
git add data/raw/reviews.csv.dvc data/raw/.gitignore dvc.yaml dvc.lock
git commit -m "feat: add DVC tracking for training data"
# Push dei dati su S3
dvc push
Acțiuni DVC + GitHub: Cum funcționează
- Git urmărește fișierele
.dvc(metadate: hash SHA256, dimensiune) - Datele reale se află pe S3 (sau GCS, Azure Blob, Google Drive)
- Se execută fluxul de lucru GitHub Actions
dvc pullpentru a descărca datele - Acreditările S3 sunt transmise prin GitHub Secrets
- Fiecare comitere Git corespunde unei versiuni exacte a datelor
Model Registry cu MLflow
Model Registry este componenta care gestionează ciclul de viață al modelelor după antrenament. Fiecare model antrenat este înregistrat cu un nume, o versiune și un stare (Montare, Producție, Arhivat). Conducta CI/CD interacționează cu registry să promoveze modele care depășesc pragurile de validare.
"""Script per la promozione del modello nel registry MLflow."""
import mlflow
from mlflow.tracking import MlflowClient
def promote_model(
model_name: str,
run_id: str,
target_stage: str = "Production"
) -> None:
"""Promuove un modello a Production nel registry."""
client = MlflowClient()
# Cerca la versione del modello associata al run
model_versions = client.search_model_versions(
f"name='{model_name}'"
)
target_version = None
for mv in model_versions:
if mv.run_id == run_id:
target_version = mv.version
break
if target_version is None:
raise ValueError(
f"Nessun modello trovato per run_id={run_id}"
)
# Archivia il modello attualmente in Production
for mv in model_versions:
if mv.current_stage == "Production":
client.transition_model_version_stage(
name=model_name,
version=mv.version,
stage="Archived",
archive_existing_versions=False
)
print(f"Archiviato modello v{mv.version} (precedente Production)")
# Promuovi il nuovo modello
client.transition_model_version_stage(
name=model_name,
version=target_version,
stage=target_stage
)
print(f"Promosso modello v{target_version} a {target_stage}")
def load_production_model(model_name: str):
"""Carica il modello attualmente in Production."""
model_uri = f"models:/{model_name}/Production"
return mlflow.sklearn.load_model(model_uri)
Semnătura modelului și exemplu de introducere
Semnătura modelului documentează modelul intrărilor și ieșirilor modelului. Aceasta servește atât ca documentare, cât și ca validare automată: dacă cineva încercați să treceți intrarea cu o schemă diferită, MLflow aruncă o eroare clară.
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec
# Definisci la signature esplicita
input_schema = Schema([
ColSpec("string", "text")
])
output_schema = Schema([
ColSpec("long", "prediction")
])
signature = ModelSignature(
inputs=input_schema,
outputs=output_schema
)
# Esempio di input per documentazione
input_example = {
"text": "This product is excellent, highly recommended!"
}
# Registra il modello con signature e esempio
mlflow.sklearn.log_model(
sk_model=trained_pipeline,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name="sentiment-classifier"
)
Testarea în ML Pipeline
Testarea pentru ML este mai complexă decât testarea tradițională a software-ului. Nu este suficient să verifici că codul „funcționează”: trebuie să testați calitatea datelor, corectitudinea preprocesare, stabilitate de instruire și comportament API de servire. The Conducta CI/CD efectuează trei niveluri de testare.
Test unitar pentru preprocesare
"""Unit test per il modulo di preprocessing."""
import pytest
import pandas as pd
from src.data.preprocessing import clean_text, load_and_preprocess_data
class TestCleanText:
"""Test per la funzione clean_text."""
def test_removes_html_tags(self):
assert clean_text("<p>Hello</p>") == "hello"
def test_removes_urls(self):
assert clean_text("Visit http://example.com for info") == "visit for info"
def test_removes_special_characters(self):
assert clean_text("Hello!!! World???") == "hello world"
def test_normalizes_whitespace(self):
assert clean_text("Hello World") == "hello world"
def test_lowercases(self):
assert clean_text("HELLO WORLD") == "hello world"
def test_empty_string(self):
assert clean_text("") == ""
def test_none_input(self):
assert clean_text(None) == ""
def test_numeric_preserved(self):
assert clean_text("Rating 5 out of 10") == "rating 5 out of 10"
class TestLoadAndPreprocess:
"""Test per il caricamento e preprocessing dei dati."""
def test_missing_columns_raises(self, tmp_path):
"""Verifica che colonne mancanti generino un errore."""
df = pd.DataFrame({"wrong_col": ["text"]})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
with pytest.raises(ValueError, match="Colonne mancanti"):
load_and_preprocess_data(str(csv_path))
def test_drops_na_rows(self, tmp_path):
"""Verifica che le righe con NA vengano rimosse."""
df = pd.DataFrame({
"review_text": ["Good product", None, "Bad product"],
"sentiment": [1, 0, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert len(X) == 2
def test_output_types(self, tmp_path):
"""Verifica i tipi di output."""
df = pd.DataFrame({
"review_text": ["Great product", "Terrible service"],
"sentiment": [1, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert isinstance(X, pd.Series)
assert isinstance(y, pd.Series)
Test de integrare pentru conductă
"""Integration test per il training e la valutazione del modello."""
import pytest
import pandas as pd
from src.models.trainer import create_pipeline, train_model, evaluate_model
@pytest.fixture
def sample_data():
"""Crea un dataset di test sintetico."""
texts = [
"amazing product love it", "terrible waste of money",
"great quality recommended", "horrible experience never again",
"excellent value for price", "poor quality disappointed",
"best purchase ever made", "worst product i bought",
"fantastic results happy", "awful terrible regret buying",
] * 10 # Ripetuto per avere abbastanza dati
sentiments = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0] * 10
return pd.Series(texts), pd.Series(sentiments)
class TestPipeline:
"""Test per la pipeline di training."""
def test_create_pipeline_logistic(self):
"""Verifica creazione pipeline con LogisticRegression."""
config = {"algorithm": "logistic_regression", "max_iter": 100}
pipeline = create_pipeline(config)
assert len(pipeline.steps) == 2
def test_create_pipeline_invalid_algorithm(self):
"""Verifica errore con algoritmo non supportato."""
with pytest.raises(ValueError, match="non supportato"):
create_pipeline({"algorithm": "invalid_algo"})
def test_train_and_evaluate(self, sample_data):
"""Test end-to-end: training + evaluation."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
pipeline = create_pipeline(config)
trained = train_model(pipeline, X[:80], y[:80])
metrics = evaluate_model(trained, X[80:], y[80:])
assert "accuracy" in metrics
assert "f1_score" in metrics
assert 0.0 <= metrics["accuracy"] <= 1.0
assert 0.0 <= metrics["f1_score"] <= 1.0
def test_model_deterministic(self, sample_data):
"""Verifica che il training sia deterministico con seed fisso."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
p1 = train_model(create_pipeline(config), X[:80], y[:80])
p2 = train_model(create_pipeline(config), X[:80], y[:80])
m1 = evaluate_model(p1, X[80:], y[80:])
m2 = evaluate_model(p2, X[80:], y[80:])
assert m1["accuracy"] == m2["accuracy"]
Test de fum pentru servire
"""Smoke test per l'API di serving FastAPI."""
import pytest
from httpx import AsyncClient, ASGITransport
from src.serving.app import app
@pytest.fixture
def client():
"""Client HTTP per testare l'API."""
transport = ASGITransport(app=app)
return AsyncClient(transport=transport, base_url="http://test")
@pytest.mark.asyncio
async def test_health_endpoint(client):
"""Verifica che l'endpoint /health risponda 200."""
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
@pytest.mark.asyncio
async def test_predict_positive(client):
"""Verifica predizione per testo positivo."""
response = await client.post(
"/predict",
json={"text": "This product is amazing, I love it!"}
)
assert response.status_code == 200
data = response.json()
assert "prediction" in data
assert "confidence" in data
assert data["confidence"] > 0.0
@pytest.mark.asyncio
async def test_predict_empty_text(client):
"""Verifica errore con testo vuoto."""
response = await client.post(
"/predict",
json={"text": ""}
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_predict_batch(client):
"""Verifica predizione batch."""
response = await client.post(
"/predict/batch",
json={"texts": [
"Great product",
"Terrible experience"
]}
)
assert response.status_code == 200
data = response.json()
assert len(data["predictions"]) == 2
# Aggiungere questo job prima del training nel workflow
tests:
name: Run Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest-asyncio pytest-cov
- name: Run unit tests
run: pytest tests/ -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v4
with:
file: coverage.xml
Servirea API cu FastAPI
Modelul antrenat trebuie să fie accesibil prin intermediul unui API REST. FastAPI și alegere ideal pentru servirea ML în Python: și rapid, are validare automată a intrării prin Pydantic și generează automat documentația OpenAPI.
"""API di serving per il classificatore di sentiment."""
import os
import time
import joblib
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from src.serving.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse,
HealthResponse
)
from src.data.preprocessing import clean_text
# Variabili globali per il modello
model_pipeline = None
model_version = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Carica il modello all'avvio dell'applicazione."""
global model_pipeline, model_version
model_path = os.getenv("MODEL_PATH", "models/model.pkl")
if not Path(model_path).exists():
raise RuntimeError(f"Modello non trovato: {model_path}")
model_pipeline = joblib.load(model_path)
model_version = os.getenv("MODEL_VERSION", "unknown")
print(f"Modello caricato: v{model_version}")
yield
model_pipeline = None
app = FastAPI(
title="Sentiment Classifier API",
version="1.0.0",
lifespan=lifespan
)
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check dell'API."""
return HealthResponse(
status="healthy" if model_pipeline is not None else "unhealthy",
model_version=model_version or "not loaded"
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Predizione singola."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned = clean_text(request.text)
if not cleaned:
raise HTTPException(status_code=422, detail="Text is empty after cleaning")
prediction = model_pipeline.predict([cleaned])[0]
probabilities = model_pipeline.predict_proba([cleaned])[0]
confidence = float(max(probabilities))
latency_ms = (time.time() - start) * 1000
return PredictionResponse(
prediction=int(prediction),
confidence=confidence,
label="positive" if prediction == 1 else "negative",
latency_ms=round(latency_ms, 2)
)
@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
"""Predizione batch su più testi."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned_texts = [clean_text(t) for t in request.texts]
predictions = model_pipeline.predict(cleaned_texts)
probabilities = model_pipeline.predict_proba(cleaned_texts)
latency_ms = (time.time() - start) * 1000
results = []
for i, text in enumerate(request.texts):
results.append(PredictionResponse(
prediction=int(predictions[i]),
confidence=float(max(probabilities[i])),
label="positive" if predictions[i] == 1 else "negative",
latency_ms=0
))
return BatchPredictionResponse(
predictions=results,
total_latency_ms=round(latency_ms, 2)
)
"""Pydantic schemas per l'API di serving."""
from pydantic import BaseModel, Field
from typing import List
class PredictionRequest(BaseModel):
text: str = Field(..., min_length=1, max_length=5000,
description="Testo della recensione")
class PredictionResponse(BaseModel):
prediction: int = Field(..., description="0=negativo, 1=positivo")
confidence: float = Field(..., ge=0.0, le=1.0,
description="Confidenza della predizione")
label: str = Field(..., description="Label leggibile")
latency_ms: float = Field(..., description="Latenza in millisecondi")
class BatchPredictionRequest(BaseModel):
texts: List[str] = Field(..., min_length=1, max_length=100,
description="Lista di testi")
class BatchPredictionResponse(BaseModel):
predictions: List[PredictionResponse]
total_latency_ms: float
class HealthResponse(BaseModel):
status: str
model_version: str
Monitorizare post-implementare
Implementarea nu este sfârșitul conductei, ci începutul celei mai critice faze: monitorizarea in productie. Un model ML se poate degrada în tăcere atunci când i datele reale diferă de datele de antrenament. Conducta trebuie să includă controale de sănătate Înregistrare continuă, predicție și declanșatoare de reantrenare automată.
"""Monitoring post-deployment per il modello ML."""
import time
import logging
from datetime import datetime, timedelta
from collections import deque
from typing import Dict, Optional
from dataclasses import dataclass, field
@dataclass
class PredictionLog:
"""Log di una singola predizione."""
timestamp: datetime
input_text: str
prediction: int
confidence: float
latency_ms: float
class ModelMonitor:
"""Monitora le performance del modello in produzione."""
def __init__(
self,
window_size: int = 1000,
min_confidence_threshold: float = 0.6,
max_latency_ms: float = 500.0,
drift_check_interval: int = 100
):
self.window_size = window_size
self.min_confidence = min_confidence_threshold
self.max_latency = max_latency_ms
self.drift_check_interval = drift_check_interval
self.predictions: deque = deque(maxlen=window_size)
self.alert_callbacks = []
self.prediction_count = 0
self.logger = logging.getLogger("model_monitor")
def log_prediction(self, log: PredictionLog) -> None:
"""Registra una predizione e verifica le metriche."""
self.predictions.append(log)
self.prediction_count += 1
# Check latenza
if log.latency_ms > self.max_latency:
self._alert(
"HIGH_LATENCY",
f"Latenza {log.latency_ms:.0f}ms supera soglia {self.max_latency}ms"
)
# Check confidenza bassa
if log.confidence < self.min_confidence:
self._alert(
"LOW_CONFIDENCE",
f"Confidenza {log.confidence:.2f} sotto soglia {self.min_confidence}"
)
# Check periodico per drift
if self.prediction_count % self.drift_check_interval == 0:
self._check_distribution_drift()
def get_metrics(self) -> Dict:
"""Restituisce le metriche correnti della finestra."""
if not self.predictions:
return {"status": "no_data"}
recent = list(self.predictions)
confidences = [p.confidence for p in recent]
latencies = [p.latency_ms for p in recent]
predictions = [p.prediction for p in recent]
positive_rate = sum(1 for p in predictions if p == 1) / len(predictions)
return {
"total_predictions": self.prediction_count,
"window_size": len(recent),
"avg_confidence": sum(confidences) / len(confidences),
"min_confidence": min(confidences),
"avg_latency_ms": sum(latencies) / len(latencies),
"p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)],
"positive_rate": positive_rate,
"low_confidence_pct": sum(
1 for c in confidences if c < self.min_confidence
) / len(confidences),
}
def _check_distribution_drift(self) -> None:
"""Verifica se la distribuzione delle predizioni e cambiata."""
if len(self.predictions) < self.window_size:
return
recent = list(self.predictions)
half = len(recent) // 2
first_half = [p.prediction for p in recent[:half]]
second_half = [p.prediction for p in recent[half:]]
rate_first = sum(first_half) / len(first_half)
rate_second = sum(second_half) / len(second_half)
# Se la distribuzione cambia più del 15%, segnala drift
if abs(rate_first - rate_second) > 0.15:
self._alert(
"DISTRIBUTION_DRIFT",
f"Positive rate cambiato: {rate_first:.2f} -> {rate_second:.2f}"
)
def _alert(self, alert_type: str, message: str) -> None:
"""Invia un alert."""
self.logger.warning(f"[{alert_type}] {message}")
for callback in self.alert_callbacks:
callback(alert_type, message)
Valori cheie de monitorizat
- Latență (p50, p95, p99): Timp de răspuns API
- Debit: numărul de predicții pe secundă
- Distribuția predicțiilor: modificări ale raportului pozitiv/negativ
- Încredere medie: un declin indică faptul că modelul este „incert”
- Rata de eroare: Rata de eroare HTTP 5xx
- Derivarea datei: diferența dintre datele de producție și datele de antrenament
Costuri și optimizare
GitHub Actions oferă 2.000 de minute gratuite/lună pentru depozitele publice și 500 de minute pentru depozite private (plan gratuit). Antrenamentul ML poate consuma rapid aceste minute. Iată cum să optimizați.
Strategii de stocare în cache
# Cache delle dipendenze Python
- name: Cache pip packages
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
# Cache del dataset DVC (evita download ripetuti)
- name: Cache DVC data
uses: actions/cache@v4
with:
path: |
data/
.dvc/cache/
key: dvc-${{ hashFiles('data/*.dvc', 'dvc.lock') }}
restore-keys: |
dvc-
# Cache della Docker layer
- name: Build with cache
uses: docker/build-push-action@v5
with:
context: .
cache-from: type=gha
cache-to: type=gha,mode=max
Alergători auto-găzduiți cu GPU
Pentru antrenamentul GPU, alergătorii găzduiți de GitHub nu sunt de ajuns (nu au GPU). Solutia și un alergător auto-găzduit pe o mașină cu GPU. Acest lucru elimină și costul pe minut de GitHub Actions.
# 1. Sulla macchina con GPU, scarica il runner
mkdir actions-runner && cd actions-runner
curl -o actions-runner.tar.gz -L \
https://github.com/actions/runner/releases/download/v2.311.0/actions-runner-linux-x64-2.311.0.tar.gz
tar xzf actions-runner.tar.gz
# 2. Configura il runner
./config.sh --url https://github.com/YOUR_ORG/YOUR_REPO \
--token YOUR_TOKEN \
--labels gpu,cuda12,ml-training
# 3. Installa come servizio
sudo ./svc.sh install
sudo ./svc.sh start
training:
name: Train Model (GPU)
runs-on: [self-hosted, gpu, cuda12]
# Il job viene eseguito sulla macchina con GPU
steps:
- uses: actions/checkout@v4
- name: Train with GPU
run: |
nvidia-smi # Verifica GPU disponibile
python train.py --config config/training-gpu.yaml --device cuda
Estimarea costurilor pentru scenariu
| Scenariu | Minute/lună | Costul acțiunilor GitHub | Costul total |
|---|---|---|---|
| Prototip (antrenament manual, scikit-learn) | ~200 | Gratuit (plan gratuit) | ~0 EUR/lună |
| PMI (antrenament săptămânal, model mediu) | ~800 | ~12 EUR/luna | ~50 EUR/lună (cu S3) |
| Creștere (formare zilnică, învățare profundă) | ~3.000 | ~48 EUR/luna | ~200 EUR/lună (cu GPU cloud) |
| Întreprindere (multi-model, formare continuă) | ~10.000+ | alergător auto-găzduit | ~500+ EUR/lună |
Comparația instrumentelor CI/CD pentru ML
GitHub Actions nu este singura opțiune. Fiecare instrument are avantaje specifice în funcție de el a contextului. Iată o comparație practică pentru a vă ajuta să alegeți.
| Caracteristică | Acțiuni GitHub | GitLab CI | Jenkins | Fluxuri de lucru Argo |
|---|---|---|---|---|
| Înființat | Zero (integrat) | Zero (integrat) | Server dedicat | clustere Kubernetes |
| Suport GPU | alergător auto-găzduit | alergător auto-găzduit | Pluginuri NVIDIA | Nativ (GPU K8s) |
| Cost (echipă mică) | Gratuit/scăzut | Gratuit/scăzut | Costul serverului | Costul clusterului K8s |
| Paralelism | Bun (matrice) | Bun | Optimal | Excelent (DAG) |
| Pipeline ca cod | YAML | YAML | Groovy/YAML | SDK YAML/Python |
| Ecosistemul ML | Piață vastă | Bun | Pluginuri | Cloud-nativ |
| Curba de învățare | Scăzut | Scăzut | Medie | Ridicat |
| Ideal pentru | Echipe mici/medii, depozit GitHub | Echipa pe GitLab, autogestionată | Enterprise, on-premise | Echipa K8, conducte complexe |
Pe care să o aleg?
- Început și prototip: Acțiuni GitHub - configurare zero, integrare nativă, gratuită pentru depozitele publice
- Echipa pe GitLab: GitLab CI - integrare nativă cu GitLab, container de registru excelent
- Enterprise on-premise: Jenkins - flexibilitate maximă, ecosistem de plugin matur
- Echipe native din cloud cu K8: Argo Workflows - pipeline DAG, scalare nativă, excelent pentru ML complex
Configurare completă sub 5.000 EUR/an
Pentru un IMM care dorește să implementeze o conductă completă ML CI/CD, este posibil rămâneți sub 5.000 EUR/an folosind instrumente open-source și servicii cloud cu niveluri gratuite sau low-cost. Iată stiva recomandată.
| Componentă | Instrumente | Cost/an | Note |
|---|---|---|---|
| Depozit + CI/CD | GitHub (echipă) | ~400 EUR | 3.000 min/lună Acțiuni incluse |
| Stocarea datelor | AWS S3 | ~120 EUR | ~500 GB set de date, transfer inclus |
| Urmărirea experimentului | MLflow (auto-găzduit) | ~0 EUR | Open-source, implementat pe VM cloud |
| Registrul modelului | Registrul modelului MLflow | ~0 EUR | Inclus în MLflow |
| Registrul containerelor | Registrul Containerului GitHub | ~0 EUR | Inclus cu GitHub |
| Model de gazduire | Cloud VM (e2-mediu) | ~500 EUR | Pentru servirea serverului FastAPI + MLflow |
| Versiunea datelor | DVC | ~0 EUR | Open-source, stocare deja numărată mai sus |
| Monitorizare | Prometeu + Grafana | ~0 EUR | Open-source, pe aceeași VM |
| Validarea datelor | Pandera / Mari așteptări | ~0 EUR | Sursă deschisă |
| Antrenament GPU (ocazional) | Instanță spot GPU în cloud | ~600 EUR | ~50 ore/luna spot T4 |
Total estimat: ~1.620 EUR/an - buget cu mult sub 5.000 EUR, cu margine pentru scalare. Cel mai mare cost este antrenamentul GPU: dacă folosești modele clasice (scikit-learn, XGBoost) costul GPU ajunge la zero.
Atenție la costurile ascunse
Costul instrumentelor este doar o parte. Cel mai important cost și timpul de echipă: Configurarea conductei inițiale durează aproximativ 2-4 săptămâni pentru un inginer cu experiență. Întreținerea continuă durează aproximativ 2-4 ore/săptămână. Calculați și costul transferul de date între serviciile cloud (ieșire), care poate crește rapid cu seturi mari de date.
Concluzii și pașii următori
În acest articol, am construit o conductă completă CI/CD pentru învățarea automată, de la Dockerfile în mai multe etape la fluxul de lucru GitHub Actions cu validare a datelor, instruire, evaluare și desfășurare automată. Conceptele cheie de reținut sunt:
- CI/CD pentru ML gestionează trei artefacte (cod, date, model) și introduce Formarea Continuă
- Docker cu mai multe etape construirea, antrenamentul și difuzarea separată pentru imagini optimizate
- Acțiuni GitHub orchestrați întreaga conductă cu locuri de muncă dependente și condiționate
- Validarea datelor și prima poartă: datele corupte produc modele inutilizabile
- Registrul modelului gestionează lansările de șabloane și promovarea
- Testarea ML acoperă trei niveluri: unitate, integrare, test de fum
- Monitorizare după desfășurare și critică pentru a detecta deriva și degradarea
- Costurile sunt gestionabile: cu instrumente open-source rămâneți sub 2.000 EUR/an
Conducta pe care am construit-o corespunde cu Nivelul 2 al modelului de maturitate Google MLOps: Automatizarea completă a instruirii și implementării, cu validare și monitorizare integrată. Plecând de la această bază, în articolul următor vom aprofunda MLflow pentru urmărirea avansată a experimentelor, registrul modelului și managementul artefactelor.
Foaia de parcurs a seriei
- Articolul 1: MLOps: de la experiment la producție (finalizat)
- Articolul 2: Conductă ML cu CI/CD: GitHub Actions + Docker (acest articol)
- Articolul 3: MLflow Deep Dive - Urmărirea experimentelor și Registrul modelelor
- Articolul 4: DVC - Data Versioning for ML
- Articolul 5: Servire de modele scalabile cu FastAPI și Docker
- Articolul 6: Kubernetes pentru ML: orchestrare și scalare
- Articolul 7: Monitorizare avansată: deriva de date și evident AI
- Articolul 8: Testare A/B pentru modele ML în producție
- Articolul 9: Guvernare, conformitate și ML responsabil
- Articolul 10: Studiu de caz: conductă MLOps end-to-end







