Creo applicazioni web moderne e strumenti digitali personalizzati per aiutare le attività a crescere attraverso l'innovazione tecnologica. La mia passione è unire informatica ed economia per generare valore reale.
La mia passione per l'informatica è nata tra i banchi dell'Istituto Tecnico Commerciale di Maglie, dove ho scoperto il potere della programmazione e il fascino di creare soluzioni digitali. Fin da subito, ho capito che l'informatica non era solo codice, ma uno strumento straordinario per trasformare idee in realtà.
Durante gli studi superiori in Sistemi Informativi Aziendali, ho iniziato a intrecciare informatica ed economia, comprendendo come la tecnologia possa essere il motore della crescita per qualsiasi attività. Questa visione mi ha accompagnato all'Università degli Studi di Bari, dove ho conseguito la Laurea in Informatica, approfondendo le mie competenze tecniche e la mia passione per lo sviluppo software.
Oggi metto questa esperienza al servizio di imprese, professionisti e startup, creando soluzioni digitali su misura che automatizzano processi, ottimizzano risorse e aprono nuove opportunità di business. Perché la vera innovazione inizia quando la tecnologia incontra le esigenze reali delle persone.
Le Mie Competenze
Analisi Dati & Modelli Previsionali
Trasformo i dati in insights strategici con analisi approfondite e modelli predittivi per decisioni informate
Automazione Processi
Creo strumenti personalizzati che automatizzano operazioni ripetitive e liberano tempo per attività a valore aggiunto
Sistemi Custom
Sviluppo sistemi software su misura, dalle integrazioni tra piattaforme alle dashboard personalizzate
Credo fermamente che l'informatica sia lo strumento più potente per trasformare le idee in realtà e migliorare la vita delle persone.
Democratizzare la Tecnologia
La mia missione è rendere l'informatica accessibile a tutti: dalle piccole imprese locali alle startup innovative, fino ai professionisti che vogliono digitalizzare la propria attività. Ogni realtà merita di sfruttare le potenzialità del digitale.
Unire Informatica ed Economia
Non è solo questione di scrivere codice: è capire come la tecnologia possa generare valore reale. Intrecciando competenze informatiche e visione economica, aiuto le attività a crescere, ottimizzare processi e raggiungere nuovi traguardi di efficienza e redditività.
Creare Soluzioni su Misura
Ogni attività è unica, e così devono esserlo le soluzioni. Sviluppo strumenti personalizzati che rispondono alle esigenze specifiche di ciascun cliente, automatizzando processi ripetitivi e liberando tempo per ciò che conta davvero: far crescere il business.
Trasforma la Tua Attività con la Tecnologia
Che tu gestisca un negozio, uno studio professionale o un'azienda, posso aiutarti a sfruttare le potenzialità dell'informatica per lavorare meglio, più velocemente e in modo più intelligente.
Il mio percorso accademico e le tecnologie che padroneggio
Certificazioni Professionali
8 certificazioni conseguite
Nuovo
Visualizza
Reinvention With Agentic AI Learning Program
Anthropic
Dicembre 2024
Nuovo
Visualizza
Agentic AI Fluency
Anthropic
Dicembre 2024
Nuovo
Visualizza
AI Fluency for Students
Anthropic
Dicembre 2024
Nuovo
Visualizza
AI Fluency: Framework and Foundations
Anthropic
Dicembre 2024
Nuovo
Visualizza
Claude with the Anthropic API
Anthropic
Dicembre 2024
Visualizza
Master SQL
RoadMap.sh
Novembre 2024
Visualizza
Oracle Certified Foundations Associate
Oracle
Ottobre 2024
Visualizza
People Leadership Credential
Connect
Settembre 2024
Linguaggi & Tecnologie
Java
Python
JavaScript
Angular
React
TypeScript
SQL
PHP
CSS/SCSS
Node.js
Docker
Git
💼
12/2024 - Presente
Custom Software Engineering Analyst
Accenture
Bari, Puglia, Italia · Ibrida
Analisi e sviluppo di sistemi informatici attraverso l'utilizzo di Java e Quarkus in Health and Public Sector. Formazione continua su tecnologie moderne per la creazione di soluzioni software personalizzate ed efficienti e sugli agenti.
💼
06/2022 - 12/2024
Analista software e Back End Developer Associate Consultant
Links Management and Technology SpA
Esperienza nell'analisi di sistemi software as-is e flussi ETL utilizzando PowerCenter. Formazione completata su Spring Boot per lo sviluppo di applicazioni backend moderne e scalabili. Sviluppatore Backend specializzato in Spring Boot, con esperienza in progettazione di database, analisi, sviluppo e testing dei task assegnati.
💼
02/2021 - 10/2021
Programmatore software
Adesso.it (prima era WebScience srl)
Esperienza nell'analisi AS-IS e TO-BE, evoluzioni SEO ed evoluzioni website per migliorare le performance e l'engagement degli utenti.
🎓
2018 - 2025
Laurea in Informatica
Università degli Studi di Bari Aldo Moro
Bachelor's degree in Computer Science, focusing on software engineering, algorithms, and modern development practices.
📚
2013 - 2018
Diploma - Sistemi Informativi Aziendali
Istituto Tecnico Commerciale di Maglie
Technical diploma specializing in Business Information Systems, combining IT knowledge with business management.
Contattami
Hai un progetto in mente? Parliamone! Compila il form qui sotto e ti risponderò al più presto.
* Campi obbligatori. I tuoi dati saranno utilizzati solo per rispondere alla tua richiesta.
02 - Pipeline ML con CI/CD: GitHub Actions + Docker
Nel primo articolo della serie abbiamo visto perchè l'85% dei progetti ML non arriva in produzione
e come MLOps risolve questo problema. Abbiamo trasformato un notebook monolitico in una pipeline
modulare e configurabile. Ora e il momento di fare il passo successivo: automatizzare l'intera
pipeline con CI/CD, in modo che ogni modifica al codice, ai dati o alla configurazione
inneschi automaticamente training, validazione e deployment del modello.
In questo articolo costruiremo una pipeline CI/CD completa per machine learning utilizzando
GitHub Actions come orchestratore e Docker come runtime di
esecuzione. Non ci limiteremo alla teoria: creeremo un progetto funzionante con un classificatore
di sentiment, completo di Dockerfile multi-stage, workflow YAML, data validation, model registry
e deployment automatico.
Cosa Imparerai
perchè il CI/CD per ML e diverso dal CI/CD software tradizionale
Come progettare l'architettura di una pipeline ML end-to-end
Come creare Dockerfile multi-stage ottimizzati per training e serving
Come scrivere workflow GitHub Actions completi per ML
Come integrare data validation, model registry e deployment automatico
Come gestire il data versioning con DVC nella pipeline
Come implementare testing specifico per ML (unit, integration, smoke)
Come monitorare il modello dopo il deployment
Come ottimizzare i costi con caching e self-hosted runners
Come scegliere il tool CI/CD giusto per il proprio team
perchè CI/CD per ML e Diverso
Se vieni dal mondo dello sviluppo software tradizionale, potresti pensare che basti applicare
le stesse pratiche CI/CD ai progetti ML. In realta, il machine learning introduce complessità
uniche che richiedono un approccio specifico. La differenza fondamentale e che nel software
tradizionale il CI/CD gestisce un solo artefatto (il codice), mentre nel ML deve gestirne
tre contemporaneamente: codice, dati e modello.
I Tre Artefatti del ML
Nel software tradizionale, se il codice non cambia, l'output non cambia. Nel ML, anche con
codice identico, un cambiamento nei dati produce un modello diverso. Questo significa che
la pipeline CI/CD deve tracciare e validare tre dimensioni indipendenti.
Dimensione
CI/CD Tradizionale
CI/CD per ML
Codice
Git push trigghera build + test
Git push trigghera training + evaluation
Dati
Non applicabile
Nuovi dati triggerano retraining
Modello
Non applicabile
Nuovo modello richiede validazione + promozione
Configurazione
Feature flags, env vars
Iperparametri, feature set, soglie metriche
Ambiente
OS + librerie
OS + librerie + driver GPU + versione CUDA
Validazione
Test passano/falliscono
Metriche sopra/sotto soglia + confronto col modello in produzione
Deployment
Deploy o rollback
Deploy graduale + A/B test + monitoring drift
Continuous Training: Il Concetto Chiave
Il CI/CD per ML introduce un concetto che non esiste nel software tradizionale:
il Continuous Training (CT). Oltre a Continuous Integration e Continuous
Deployment, il CT garantisce che il modello venga riaddestrato automaticamente quando:
Arrivano nuovi dati: il dataset viene aggiornato con nuove osservazioni
Cambia il codice: viene modificato il preprocessing o l'algoritmo
Degradano le metriche: il monitoring rileva data drift o calo di performance
Scade un timer: il retraining schedulato (es. settimanale) si attiva
Errore Comune: CI/CD Senza CT
Molti team implementano CI/CD per il codice ML ma dimenticano il Continuous Training.
Il risultato e un modello che viene deployato una volta e poi mai più aggiornato,
degradando silenziosamente nel tempo man mano che i dati in produzione divergono
dai dati di training. Una pipeline senza CT e come un'auto senza manutenzione:
funziona finchè non si rompe.
Architettura della Pipeline ML
Prima di scrivere codice, progettiamo l'architettura completa della pipeline. Ogni fase
ha input e output specifici, e il fallimento di una fase blocca le successive. Questo
approccio "fail fast" garantisce che solo modelli validati raggiungano la produzione.
Ogni blocco corrisponde a uno step del workflow GitHub Actions. Vediamo ora in
dettaglio come implementare ciascuna fase, partendo dalla containerizzazione con Docker.
Docker per Machine Learning
Docker risolve uno dei problemi più frustranti del ML: "funziona sulla mia macchina".
Containerizzando l'ambiente di training e serving, garantiamo che il codice produca gli
stessi risultati ovunque venga eseguito: sul laptop del data scientist, nel CI/CD runner
e in produzione. Per ML, Docker richiede attenzioni particolari: le immagini tendono a
essere molto grandi (librerie scientifiche + driver GPU) e il build può essere lento.
Base Images per ML
La scelta della base image e critica per dimensioni e compatibilità. Ecco le opzioni
principali e quando usarle.
Base Image
Dimensione
Uso
Quando Sceglierla
python:3.11-slim
~120 MB
Training/Serving CPU
Modelli scikit-learn, XGBoost, serving leggero
python:3.11-bookworm
~900 MB
Training con build tools
Dipendenze che richiedono compilazione C/C++
nvidia/cuda:12.1-runtime
~3.5 GB
Inference GPU
Serving di modelli deep learning
nvidia/cuda:12.1-devel
~5.2 GB
Training GPU
Training PyTorch/TensorFlow con CUDA
pytorch/pytorch:2.1.0-cuda12.1
~6 GB
Training/Serving PyTorch
Progetti PyTorch che vogliono evitare setup CUDA manuale
Dockerfile Multi-Stage per Training e Serving
Il pattern multi-stage e fondamentale per ML. Usando due stage separati, possiamo avere
un ambiente di build completo (con compilatori e build tools) e un'immagine finale
snella che contiene solo il runtime necessario. Questo riduce la dimensione dell'immagine
finale fino al 60%.
Dockerfile - Multi-stage per training e serving
# ============================================
# Stage 1: Builder - installa dipendenze
# ============================================
FROM python:3.11-slim AS builder
WORKDIR /build
# Installa build tools necessari per compilare dipendenze native
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Copia e installa dipendenze in un virtual environment
COPY requirements.txt .
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# ============================================
# Stage 2: Training - esegue il training
# ============================================
FROM python:3.11-slim AS trainer
WORKDIR /app
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia codice sorgente
COPY src/ ./src/
COPY config/ ./config/
COPY train.py .
COPY evaluate.py .
# Entrypoint per training
ENTRYPOINT ["python", "train.py"]
# ============================================
# Stage 3: Serving - API di produzione
# ============================================
FROM python:3.11-slim AS serving
WORKDIR /app
# Utente non-root per sicurezza
RUN useradd --create-home appuser
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia solo il codice necessario per serving
COPY src/serving/ ./src/serving/
COPY src/preprocessing/ ./src/preprocessing/
# Healthcheck endpoint
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
# Switch a utente non-root
USER appuser
# Porta di default
EXPOSE 8000
# Entrypoint per serving
ENTRYPOINT ["uvicorn", "src.serving.app:app", "--host", "0.0.0.0", "--port", "8000"]
perchè Multi-Stage per ML?
Sicurezza: l'immagine di serving non contiene compilatori o build tools
Dimensione: lo stage di serving e molto più leggero (~300 MB vs ~1.2 GB)
Cache: le dipendenze cambiano meno spesso del codice, sfruttando la layer cache
Flessibilità: puoi buildare solo lo stage di training o solo quello di serving
Ottimizzazione della Layer Cache
L'ordine delle istruzioni COPY nel Dockerfile e cruciale per la cache. Le dipendenze Python
cambiano raramente, il codice sorgente cambia spesso. Copiando prima il
requirements.txt e poi il codice, evitiamo di reinstallare le dipendenze
a ogni modifica del codice.
.dockerignore - Escludere file non necessari
# Dati e modelli (gestiti da DVC, non da Docker)
data/
models/
*.pkl
*.h5
*.pt
# Ambiente di sviluppo
.venv/
__pycache__/
*.pyc
.pytest_cache/
.mypy_cache/
# Git e CI
.git/
.github/
.dvc/cache/
# IDE e editor
.vscode/
.idea/
*.swp
# Documentazione
docs/
*.md
LICENSE
Docker con GPU Support
Per il training di modelli deep learning, serve il supporto GPU nel container. Docker supporta
le GPU NVIDIA tramite il NVIDIA Container Toolkit. La configurazione richiede il driver NVIDIA
sull'host e il toolkit installato.
Dockerfile.gpu - Dockerfile con supporto GPU
# Base image con CUDA runtime
FROM nvidia/cuda:12.1.1-runtime-ubuntu22.04 AS gpu-trainer
WORKDIR /app
# Installa Python e dipendenze di sistema
RUN apt-get update && apt-get install -y --no-install-recommends \
python3.11 \
python3.11-venv \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Virtual environment
RUN python3.11 -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Dipendenze PyTorch con CUDA
COPY requirements-gpu.txt .
RUN pip install --no-cache-dir -r requirements-gpu.txt
COPY src/ ./src/
COPY train.py .
# Variabili ambiente per CUDA
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility
ENTRYPOINT ["python", "train.py"]
GitHub Actions e un servizio CI/CD integrato in GitHub che esegue workflow automatizzati
in risposta a eventi (push, pull request, schedule, dispatch manuale). Per il ML, offre
vantaggi significativi: integrazione nativa con il repository Git, marketplace con azioni
predefinite, gestione di secrets per credenziali e fino a 2.000 minuti/mese gratuiti
per repository pubblici.
Struttura del Workflow ML
Un workflow GitHub Actions per ML ha una struttura specifica: più job che corrispondono
alle fasi della pipeline, con dipendenze esplicite tra job e condizioni di esecuzione
basate sulle metriche del modello.
Mai inserire credenziali direttamente nel workflow YAML. Utilizza sempre GitHub Secrets
per le chiavi AWS, i token MLflow, le credenziali del registry Docker e qualsiasi altra
informazione sensibile. Configura gli secrets in
Settings > Secrets and variables > Actions nel repository GitHub.
Progetto Esempio: Classificatore di Sentiment
Mettiamo tutto insieme con un progetto concreto. Costruiremo un classificatore di sentiment
per recensioni di prodotti, con pipeline CI/CD completa. Il progetto usa scikit-learn per
semplicità, ma l'architettura si applica identicamente a modelli PyTorch o TensorFlow.
Struttura del Progetto
Struttura directory del progetto
sentiment-classifier/
src/
data/
__init__.py
preprocessing.py # Pulizia e trasformazione testi
validate_data.py # Validazione schema e qualità
models/
__init__.py
trainer.py # Training del classificatore
serving/
__init__.py
app.py # FastAPI application
schemas.py # Pydantic schemas
monitoring/
__init__.py
health.py # Health checks
tests/
test_preprocessing.py # Unit test preprocessing
test_trainer.py # Unit test training
test_api.py # Integration test API
config/
training.yaml # Configurazione training
thresholds.yaml # Soglie metriche
data_schema.yaml # Schema validazione dati
train.py # Entrypoint training
evaluate.py # Entrypoint evaluation
Dockerfile # Multi-stage build
requirements.txt # Dipendenze Python
.github/
workflows/
ml-pipeline.yml # Pipeline CI/CD
.dvc/ # Configurazione DVC
dvc.yaml # Pipeline DVC
dvc.lock # Lock file DVC
Script di Training
train.py - Entrypoint di training
"""Script principale di training per il classificatore di sentiment."""
import argparse
import yaml
import mlflow
import mlflow.sklearn
from pathlib import Path
from datetime import datetime
from src.data.preprocessing import load_and_preprocess_data, split_dataset
from src.models.trainer import create_pipeline, train_model
def parse_args():
"""Parse degli argomenti da riga di comando."""
parser = argparse.ArgumentParser(
description="Train sentiment classifier"
)
parser.add_argument(
"--config",
type=str,
default="config/training.yaml",
help="Path al file di configurazione"
)
parser.add_argument(
"--output-dir",
type=str,
default="models/",
help="Directory per salvare il modello"
)
parser.add_argument(
"--experiment-name",
type=str,
default="sentiment-classifier",
help="Nome dell'esperimento MLflow"
)
return parser.parse_args()
def main():
"""Esegue la pipeline di training completa."""
args = parse_args()
# 1. Carica configurazione
with open(args.config) as f:
config = yaml.safe_load(f)
# 2. Setup MLflow
mlflow.set_experiment(args.experiment_name)
with mlflow.start_run(run_name=f"train-{datetime.now().strftime('%Y%m%d-%H%M%S')}") as run:
# 3. Log dei parametri
mlflow.log_params(config["model"])
mlflow.log_param("data_path", config["data"]["train_path"])
mlflow.log_param("test_size", config["data"]["test_size"])
# 4. Caricamento e preprocessing dati
print("[1/5] Caricamento e preprocessing dati...")
X, y = load_and_preprocess_data(config["data"]["train_path"])
# 5. Split dataset
print("[2/5] Split train/validation...")
X_train, X_val, y_train, y_val = split_dataset(
X, y,
test_size=config["data"]["test_size"],
random_state=config["data"]["random_state"]
)
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("val_size", len(X_val))
# 6. Crea pipeline di preprocessing + modello
print("[3/5] Creazione pipeline ML...")
pipeline = create_pipeline(config["model"])
# 7. Training
print("[4/5] Training in corso...")
trained_pipeline = train_model(pipeline, X_train, y_train)
# 8. Valutazione su validation set
print("[5/5] Valutazione...")
from src.models.trainer import evaluate_model
metrics = evaluate_model(trained_pipeline, X_val, y_val)
# 9. Log metriche in MLflow
mlflow.log_metrics(metrics)
# 10. Salva modello
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
import joblib
model_path = output_dir / "model.pkl"
joblib.dump(trained_pipeline, model_path)
# Log modello in MLflow con signature
from mlflow.models.signature import infer_signature
signature = infer_signature(X_train[:5], trained_pipeline.predict(X_train[:5]))
mlflow.sklearn.log_model(
trained_pipeline,
"model",
signature=signature,
registered_model_name="sentiment-classifier"
)
# Salva versione e run_id per il CI/CD
(output_dir / "version.txt").write_text(run.info.run_id[:8])
(output_dir / "run_id.txt").write_text(run.info.run_id)
print(f"\nTraining completato!")
print(f" Run ID: {run.info.run_id}")
for name, value in metrics.items():
print(f" {name}: {value:.4f}")
if __name__ == "__main__":
main()
Modulo di Preprocessing
src/data/preprocessing.py - Preprocessing dei testi
"""Modulo per il preprocessing dei dati testuali."""
import re
import pandas as pd
from typing import Tuple
from sklearn.model_selection import train_test_split
def clean_text(text: str) -> str:
"""Pulisce un singolo testo rimuovendo HTML, caratteri speciali e spazi extra."""
if not isinstance(text, str):
return ""
# Rimuovi tag HTML
text = re.sub(r'<[^>]+>', '', text)
# Rimuovi URL
text = re.sub(r'http\S+|www\.\S+', '', text)
# Rimuovi caratteri speciali (mantieni lettere, numeri, spazi)
text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
# Normalizza spazi
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
def load_and_preprocess_data(data_path: str) -> Tuple[pd.Series, pd.Series]:
"""Carica e preprocess il dataset di recensioni."""
df = pd.read_csv(data_path)
# Validazione colonne richieste
required_cols = ["review_text", "sentiment"]
missing = [c for c in required_cols if c not in df.columns]
if missing:
raise ValueError(f"Colonne mancanti nel dataset: {missing}")
# Rimuovi righe con valori mancanti
df = df.dropna(subset=required_cols)
# Pulisci testi
df["clean_text"] = df["review_text"].apply(clean_text)
# Rimuovi testi vuoti dopo pulizia
df = df[df["clean_text"].str.len() > 0]
return df["clean_text"], df["sentiment"]
def split_dataset(
X: pd.Series,
y: pd.Series,
test_size: float = 0.2,
random_state: int = 42
) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series]:
"""Split stratificato del dataset."""
return train_test_split(
X, y,
test_size=test_size,
random_state=random_state,
stratify=y
)
Modulo di Training
src/models/trainer.py - Training e valutazione
"""Modulo per la creazione, il training e la valutazione del modello."""
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
accuracy_score, f1_score, precision_score,
recall_score, roc_auc_score
)
from typing import Dict, Any
import pandas as pd
MODELS = {
"logistic_regression": LogisticRegression,
"random_forest": RandomForestClassifier,
}
def create_pipeline(model_config: Dict[str, Any]) -> Pipeline:
"""Crea una pipeline sklearn con TF-IDF + classificatore."""
algorithm = model_config.get("algorithm", "logistic_regression")
model_class = MODELS.get(algorithm)
if model_class is None:
raise ValueError(
f"Algoritmo non supportato: {algorithm}. "
f"Supportati: {list(MODELS.keys())}"
)
# Parametri specifici del modello
model_params = {
k: v for k, v in model_config.items()
if k not in ("algorithm", "tfidf")
}
# Parametri TF-IDF
tfidf_params = model_config.get("tfidf", {})
return Pipeline([
("tfidf", TfidfVectorizer(
max_features=tfidf_params.get("max_features", 10000),
ngram_range=tuple(tfidf_params.get("ngram_range", [1, 2])),
min_df=tfidf_params.get("min_df", 2),
max_df=tfidf_params.get("max_df", 0.95),
)),
("classifier", model_class(**model_params)),
])
def train_model(
pipeline: Pipeline,
X_train: pd.Series,
y_train: pd.Series
) -> Pipeline:
"""Addestra la pipeline sul training set."""
pipeline.fit(X_train, y_train)
return pipeline
def evaluate_model(
pipeline: Pipeline,
X_test: pd.Series,
y_test: pd.Series
) -> Dict[str, float]:
"""Valuta il modello e restituisce tutte le metriche."""
y_pred = pipeline.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_score": f1_score(y_test, y_pred, average="weighted"),
"precision": precision_score(y_test, y_pred, average="weighted"),
"recall": recall_score(y_test, y_pred, average="weighted"),
}
# AUC-ROC solo per classificazione binaria
if len(set(y_test)) == 2:
y_proba = pipeline.predict_proba(X_test)[:, 1]
metrics["auc_roc"] = roc_auc_score(y_test, y_proba)
return metrics
Script di Evaluation
evaluate.py - Valutazione e confronto con produzione
"""Script di valutazione del modello con confronto soglie."""
import argparse
import json
import yaml
import joblib
import pandas as pd
from pathlib import Path
from src.data.preprocessing import clean_text
from src.models.trainer import evaluate_model
def parse_args():
parser = argparse.ArgumentParser(description="Evaluate trained model")
parser.add_argument("--model-path", required=True, help="Path al modello .pkl")
parser.add_argument("--test-data", required=True, help="Path ai dati di test")
parser.add_argument("--thresholds", required=True, help="Path al file soglie YAML")
parser.add_argument("--output", required=True, help="Path per il report JSON")
return parser.parse_args()
def main():
args = parse_args()
# 1. Carica modello e dati
pipeline = joblib.load(args.model_path)
df = pd.read_csv(args.test_data)
df["clean_text"] = df["review_text"].apply(clean_text)
X_test = df["clean_text"]
y_test = df["sentiment"]
# 2. Valuta
metrics = evaluate_model(pipeline, X_test, y_test)
# 3. Confronta con soglie
with open(args.thresholds) as f:
thresholds = yaml.safe_load(f)["thresholds"]
all_pass = True
results = {}
for metric_name, threshold_value in thresholds.items():
actual = metrics.get(metric_name, 0.0)
passed = actual >= threshold_value
if not passed:
all_pass = False
results[metric_name] = {
"value": actual,
"threshold": threshold_value,
"pass": passed
}
# 4. Genera report
report = {
**metrics,
"thresholds": thresholds,
"details": results,
"pass": all_pass
}
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(report, f, indent=2)
# 5. Stampa risultati
print("\n=== Evaluation Report ===")
for name, detail in results.items():
status = "PASS" if detail["pass"] else "FAIL"
print(f" {name}: {detail['value']:.4f} (threshold: {detail['threshold']}) [{status}]")
print(f"\nOverall: {'PASS' if all_pass else 'FAIL'}")
# Exit code non-zero se le metriche non passano
if not all_pass:
print("\nWARNING: Le metriche non raggiungono le soglie minime!")
# Non usiamo exit(1) perchè il workflow legge l'output
if __name__ == "__main__":
main()
config/thresholds.yaml - Soglie metriche per deployment
# Soglie minime per approvare il deployment
thresholds:
accuracy: 0.85
f1_score: 0.83
precision: 0.80
recall: 0.80
auc_roc: 0.90
# Confronto con modello in produzione
comparison:
# Il nuovo modello deve migliorare almeno dello 0.5%
min_improvement: 0.005
# Metriche su cui e richiesto il miglioramento
compare_metrics:
- f1_score
- auc_roc
File Requirements
requirements.txt - Dipendenze Python
# Core ML
scikit-learn==1.4.0
pandas==2.2.0
numpy==1.26.3
# NLP preprocessing
nltk==3.8.1
# Experiment tracking
mlflow==2.10.0
# Model serving
fastapi==0.109.0
uvicorn==0.27.0
pydantic==2.5.3
# Data validation
pandera==0.18.0
great-expectations==0.18.8
# Data versioning
dvc[s3]==3.42.0
# Configuration
pyyaml==6.0.1
python-dotenv==1.0.1
# Serialization
joblib==1.3.2
# Testing
pytest==7.4.4
httpx==0.26.0
Data Validation nella Pipeline
Prima di addestrare un modello, dobbiamo assicurarci che i dati siano validi.
Un modello addestrato su dati corrotti produce risultati imprevedibili. La data validation
e il primo gate della nostra pipeline: se i dati non superano i controlli, il training
non parte.
src/data/validate_data.py - Validazione con Pandera
"""Validazione della qualità dei dati con Pandera."""
import argparse
import sys
import yaml
import pandas as pd
import pandera as pa
from pandera import Column, Check, DataFrameSchema
def build_schema(schema_config: dict) -> DataFrameSchema:
"""Costruisce uno schema Pandera dalla configurazione YAML."""
columns = {}
for col_name, col_spec in schema_config["columns"].items():
checks = []
if "min_length" in col_spec:
checks.append(Check.str_length(min_value=col_spec["min_length"]))
if "max_length" in col_spec:
checks.append(Check.str_length(max_value=col_spec["max_length"]))
if "allowed_values" in col_spec:
checks.append(Check.isin(col_spec["allowed_values"]))
if "min_value" in col_spec:
checks.append(Check.greater_than_or_equal_to(col_spec["min_value"]))
if "max_value" in col_spec:
checks.append(Check.less_than_or_equal_to(col_spec["max_value"]))
columns[col_name] = Column(
dtype=col_spec.get("dtype", "object"),
nullable=col_spec.get("nullable", False),
checks=checks if checks else None
)
return DataFrameSchema(
columns=columns,
coerce=True,
strict=schema_config.get("strict", False)
)
def validate_data(data_path: str, schema_path: str) -> bool:
"""Valida il dataset contro lo schema definito."""
# Carica schema
with open(schema_path) as f:
schema_config = yaml.safe_load(f)
schema = build_schema(schema_config)
# Carica dati
df = pd.read_csv(data_path)
# Controlla dimensione minima
min_rows = schema_config.get("min_rows", 100)
if len(df) < min_rows:
print(f"FAIL: Dataset ha {len(df)} righe, minimo richiesto: {min_rows}")
return False
# Controlla duplicati
max_duplicates_pct = schema_config.get("max_duplicates_pct", 0.05)
duplicates_pct = df.duplicated().mean()
if duplicates_pct > max_duplicates_pct:
print(f"FAIL: {duplicates_pct:.1%} duplicati (max: {max_duplicates_pct:.1%})")
return False
# Valida schema
try:
schema.validate(df, lazy=True)
print(f"PASS: Dataset valido ({len(df)} righe, {len(df.columns)} colonne)")
return True
except pa.errors.SchemaErrors as e:
print(f"FAIL: Schema validation errors:")
print(e.failure_cases.head(20))
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-path", required=True)
parser.add_argument("--schema-path", required=True)
args = parser.parse_args()
valid = validate_data(args.data_path, args.schema_path)
sys.exit(0 if valid else 1)
I dati sono troppo grandi per Git, ma devono essere versionati e sincronizzati
nella pipeline CI/CD. DVC (Data Version Control) risolve questo problema:
salva i dati su storage remoto (S3, GCS, Azure Blob) e traccia in Git solo i
metadata (hash, dimensione). GitHub Actions può usare dvc pull per
scaricare esattamente la versione dei dati associata al commit corrente.
# Inizializza DVC
dvc init
# Configura storage remoto (S3 in questo esempio)
dvc remote add -d myremote s3://my-ml-data-bucket/sentiment-classifier
dvc remote modify myremote region eu-west-1
# Traccia il dataset
dvc add data/raw/reviews.csv
# Committa i file DVC in Git
git add data/raw/reviews.csv.dvc data/raw/.gitignore dvc.yaml dvc.lock
git commit -m "feat: add DVC tracking for training data"
# Push dei dati su S3
dvc push
DVC + GitHub Actions: Come Funziona
Git traccia i file .dvc (metadata: hash SHA256, dimensione)
I dati reali risiedono su S3 (o GCS, Azure Blob, Google Drive)
Il workflow GitHub Actions esegue dvc pull per scaricare i dati
Le credenziali S3 sono passate tramite GitHub Secrets
Ogni commit Git corrisponde a una versione esatta dei dati
Model Registry con MLflow
Il Model Registry e il componente che gestisce il ciclo di vita dei modelli dopo il
training. Ogni modello addestrato viene registrato con un nome, una versione e uno
stato (Staging, Production, Archived). La pipeline CI/CD interagisce con il registry
per promuovere i modelli che superano le soglie di validazione.
Promozione modello nel workflow GitHub Actions
"""Script per la promozione del modello nel registry MLflow."""
import mlflow
from mlflow.tracking import MlflowClient
def promote_model(
model_name: str,
run_id: str,
target_stage: str = "Production"
) -> None:
"""Promuove un modello a Production nel registry."""
client = MlflowClient()
# Cerca la versione del modello associata al run
model_versions = client.search_model_versions(
f"name='{model_name}'"
)
target_version = None
for mv in model_versions:
if mv.run_id == run_id:
target_version = mv.version
break
if target_version is None:
raise ValueError(
f"Nessun modello trovato per run_id={run_id}"
)
# Archivia il modello attualmente in Production
for mv in model_versions:
if mv.current_stage == "Production":
client.transition_model_version_stage(
name=model_name,
version=mv.version,
stage="Archived",
archive_existing_versions=False
)
print(f"Archiviato modello v{mv.version} (precedente Production)")
# Promuovi il nuovo modello
client.transition_model_version_stage(
name=model_name,
version=target_version,
stage=target_stage
)
print(f"Promosso modello v{target_version} a {target_stage}")
def load_production_model(model_name: str):
"""Carica il modello attualmente in Production."""
model_uri = f"models:/{model_name}/Production"
return mlflow.sklearn.load_model(model_uri)
Model Signature e Input Example
La model signature documenta lo schema degli input e degli output del modello.
Questo serve sia come documentazione sia come validazione automatica: se qualcuno
prova a passare input con schema diverso, MLflow genera un errore chiaro.
Registrazione con signature e input example
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec
# Definisci la signature esplicita
input_schema = Schema([
ColSpec("string", "text")
])
output_schema = Schema([
ColSpec("long", "prediction")
])
signature = ModelSignature(
inputs=input_schema,
outputs=output_schema
)
# Esempio di input per documentazione
input_example = {
"text": "This product is excellent, highly recommended!"
}
# Registra il modello con signature e esempio
mlflow.sklearn.log_model(
sk_model=trained_pipeline,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name="sentiment-classifier"
)
Testing nella Pipeline ML
Il testing per ML e più complesso del testing software tradizionale. Non basta verificare
che il codice "funziona": bisogna testare la qualità dei dati, la correttezza del
preprocessing, la stabilità del training e il comportamento dell'API di serving. La
pipeline CI/CD esegue tre livelli di test.
Unit Test per il Preprocessing
tests/test_preprocessing.py
"""Unit test per il modulo di preprocessing."""
import pytest
import pandas as pd
from src.data.preprocessing import clean_text, load_and_preprocess_data
class TestCleanText:
"""Test per la funzione clean_text."""
def test_removes_html_tags(self):
assert clean_text("<p>Hello</p>") == "hello"
def test_removes_urls(self):
assert clean_text("Visit http://example.com for info") == "visit for info"
def test_removes_special_characters(self):
assert clean_text("Hello!!! World???") == "hello world"
def test_normalizes_whitespace(self):
assert clean_text("Hello World") == "hello world"
def test_lowercases(self):
assert clean_text("HELLO WORLD") == "hello world"
def test_empty_string(self):
assert clean_text("") == ""
def test_none_input(self):
assert clean_text(None) == ""
def test_numeric_preserved(self):
assert clean_text("Rating 5 out of 10") == "rating 5 out of 10"
class TestLoadAndPreprocess:
"""Test per il caricamento e preprocessing dei dati."""
def test_missing_columns_raises(self, tmp_path):
"""Verifica che colonne mancanti generino un errore."""
df = pd.DataFrame({"wrong_col": ["text"]})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
with pytest.raises(ValueError, match="Colonne mancanti"):
load_and_preprocess_data(str(csv_path))
def test_drops_na_rows(self, tmp_path):
"""Verifica che le righe con NA vengano rimosse."""
df = pd.DataFrame({
"review_text": ["Good product", None, "Bad product"],
"sentiment": [1, 0, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert len(X) == 2
def test_output_types(self, tmp_path):
"""Verifica i tipi di output."""
df = pd.DataFrame({
"review_text": ["Great product", "Terrible service"],
"sentiment": [1, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert isinstance(X, pd.Series)
assert isinstance(y, pd.Series)
Integration Test per la Pipeline
tests/test_trainer.py - Test della pipeline completa
"""Integration test per il training e la valutazione del modello."""
import pytest
import pandas as pd
from src.models.trainer import create_pipeline, train_model, evaluate_model
@pytest.fixture
def sample_data():
"""Crea un dataset di test sintetico."""
texts = [
"amazing product love it", "terrible waste of money",
"great quality recommended", "horrible experience never again",
"excellent value for price", "poor quality disappointed",
"best purchase ever made", "worst product i bought",
"fantastic results happy", "awful terrible regret buying",
] * 10 # Ripetuto per avere abbastanza dati
sentiments = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0] * 10
return pd.Series(texts), pd.Series(sentiments)
class TestPipeline:
"""Test per la pipeline di training."""
def test_create_pipeline_logistic(self):
"""Verifica creazione pipeline con LogisticRegression."""
config = {"algorithm": "logistic_regression", "max_iter": 100}
pipeline = create_pipeline(config)
assert len(pipeline.steps) == 2
def test_create_pipeline_invalid_algorithm(self):
"""Verifica errore con algoritmo non supportato."""
with pytest.raises(ValueError, match="non supportato"):
create_pipeline({"algorithm": "invalid_algo"})
def test_train_and_evaluate(self, sample_data):
"""Test end-to-end: training + evaluation."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
pipeline = create_pipeline(config)
trained = train_model(pipeline, X[:80], y[:80])
metrics = evaluate_model(trained, X[80:], y[80:])
assert "accuracy" in metrics
assert "f1_score" in metrics
assert 0.0 <= metrics["accuracy"] <= 1.0
assert 0.0 <= metrics["f1_score"] <= 1.0
def test_model_deterministic(self, sample_data):
"""Verifica che il training sia deterministico con seed fisso."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
p1 = train_model(create_pipeline(config), X[:80], y[:80])
p2 = train_model(create_pipeline(config), X[:80], y[:80])
m1 = evaluate_model(p1, X[80:], y[80:])
m2 = evaluate_model(p2, X[80:], y[80:])
assert m1["accuracy"] == m2["accuracy"]
Smoke Test per il Serving
tests/test_api.py - Smoke test dell'API di serving
"""Smoke test per l'API di serving FastAPI."""
import pytest
from httpx import AsyncClient, ASGITransport
from src.serving.app import app
@pytest.fixture
def client():
"""Client HTTP per testare l'API."""
transport = ASGITransport(app=app)
return AsyncClient(transport=transport, base_url="http://test")
@pytest.mark.asyncio
async def test_health_endpoint(client):
"""Verifica che l'endpoint /health risponda 200."""
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
@pytest.mark.asyncio
async def test_predict_positive(client):
"""Verifica predizione per testo positivo."""
response = await client.post(
"/predict",
json={"text": "This product is amazing, I love it!"}
)
assert response.status_code == 200
data = response.json()
assert "prediction" in data
assert "confidence" in data
assert data["confidence"] > 0.0
@pytest.mark.asyncio
async def test_predict_empty_text(client):
"""Verifica errore con testo vuoto."""
response = await client.post(
"/predict",
json={"text": ""}
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_predict_batch(client):
"""Verifica predizione batch."""
response = await client.post(
"/predict/batch",
json={"texts": [
"Great product",
"Terrible experience"
]}
)
assert response.status_code == 200
data = response.json()
assert len(data["predictions"]) == 2
Aggiungere test al workflow GitHub Actions
# Aggiungere questo job prima del training nel workflow
tests:
name: Run Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest-asyncio pytest-cov
- name: Run unit tests
run: pytest tests/ -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v4
with:
file: coverage.xml
API di Serving con FastAPI
Il modello addestrato deve essere accessibile tramite un'API REST. FastAPI e la scelta
ideale per ML serving in Python: e veloce, ha validazione automatica degli input tramite
Pydantic e genera documentazione OpenAPI automaticamente.
src/serving/app.py - API FastAPI per serving
"""API di serving per il classificatore di sentiment."""
import os
import time
import joblib
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from src.serving.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse,
HealthResponse
)
from src.data.preprocessing import clean_text
# Variabili globali per il modello
model_pipeline = None
model_version = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Carica il modello all'avvio dell'applicazione."""
global model_pipeline, model_version
model_path = os.getenv("MODEL_PATH", "models/model.pkl")
if not Path(model_path).exists():
raise RuntimeError(f"Modello non trovato: {model_path}")
model_pipeline = joblib.load(model_path)
model_version = os.getenv("MODEL_VERSION", "unknown")
print(f"Modello caricato: v{model_version}")
yield
model_pipeline = None
app = FastAPI(
title="Sentiment Classifier API",
version="1.0.0",
lifespan=lifespan
)
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check dell'API."""
return HealthResponse(
status="healthy" if model_pipeline is not None else "unhealthy",
model_version=model_version or "not loaded"
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Predizione singola."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned = clean_text(request.text)
if not cleaned:
raise HTTPException(status_code=422, detail="Text is empty after cleaning")
prediction = model_pipeline.predict([cleaned])[0]
probabilities = model_pipeline.predict_proba([cleaned])[0]
confidence = float(max(probabilities))
latency_ms = (time.time() - start) * 1000
return PredictionResponse(
prediction=int(prediction),
confidence=confidence,
label="positive" if prediction == 1 else "negative",
latency_ms=round(latency_ms, 2)
)
@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
"""Predizione batch su più testi."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned_texts = [clean_text(t) for t in request.texts]
predictions = model_pipeline.predict(cleaned_texts)
probabilities = model_pipeline.predict_proba(cleaned_texts)
latency_ms = (time.time() - start) * 1000
results = []
for i, text in enumerate(request.texts):
results.append(PredictionResponse(
prediction=int(predictions[i]),
confidence=float(max(probabilities[i])),
label="positive" if predictions[i] == 1 else "negative",
latency_ms=0
))
return BatchPredictionResponse(
predictions=results,
total_latency_ms=round(latency_ms, 2)
)
src/serving/schemas.py - Pydantic schemas
"""Pydantic schemas per l'API di serving."""
from pydantic import BaseModel, Field
from typing import List
class PredictionRequest(BaseModel):
text: str = Field(..., min_length=1, max_length=5000,
description="Testo della recensione")
class PredictionResponse(BaseModel):
prediction: int = Field(..., description="0=negativo, 1=positivo")
confidence: float = Field(..., ge=0.0, le=1.0,
description="Confidenza della predizione")
label: str = Field(..., description="Label leggibile")
latency_ms: float = Field(..., description="Latenza in millisecondi")
class BatchPredictionRequest(BaseModel):
texts: List[str] = Field(..., min_length=1, max_length=100,
description="Lista di testi")
class BatchPredictionResponse(BaseModel):
predictions: List[PredictionResponse]
total_latency_ms: float
class HealthResponse(BaseModel):
status: str
model_version: str
Monitoring Post-Deployment
Il deployment non e la fine della pipeline, ma l'inizio della fase più critica:
il monitoring in produzione. Un modello ML può degradare silenziosamente quando i
dati reali divergono dai dati di training. La pipeline deve includere health check
continui, logging delle predizioni e trigger di retraining automatico.
src/monitoring/health.py - Monitoring e alerting
"""Monitoring post-deployment per il modello ML."""
import time
import logging
from datetime import datetime, timedelta
from collections import deque
from typing import Dict, Optional
from dataclasses import dataclass, field
@dataclass
class PredictionLog:
"""Log di una singola predizione."""
timestamp: datetime
input_text: str
prediction: int
confidence: float
latency_ms: float
class ModelMonitor:
"""Monitora le performance del modello in produzione."""
def __init__(
self,
window_size: int = 1000,
min_confidence_threshold: float = 0.6,
max_latency_ms: float = 500.0,
drift_check_interval: int = 100
):
self.window_size = window_size
self.min_confidence = min_confidence_threshold
self.max_latency = max_latency_ms
self.drift_check_interval = drift_check_interval
self.predictions: deque = deque(maxlen=window_size)
self.alert_callbacks = []
self.prediction_count = 0
self.logger = logging.getLogger("model_monitor")
def log_prediction(self, log: PredictionLog) -> None:
"""Registra una predizione e verifica le metriche."""
self.predictions.append(log)
self.prediction_count += 1
# Check latenza
if log.latency_ms > self.max_latency:
self._alert(
"HIGH_LATENCY",
f"Latenza {log.latency_ms:.0f}ms supera soglia {self.max_latency}ms"
)
# Check confidenza bassa
if log.confidence < self.min_confidence:
self._alert(
"LOW_CONFIDENCE",
f"Confidenza {log.confidence:.2f} sotto soglia {self.min_confidence}"
)
# Check periodico per drift
if self.prediction_count % self.drift_check_interval == 0:
self._check_distribution_drift()
def get_metrics(self) -> Dict:
"""Restituisce le metriche correnti della finestra."""
if not self.predictions:
return {"status": "no_data"}
recent = list(self.predictions)
confidences = [p.confidence for p in recent]
latencies = [p.latency_ms for p in recent]
predictions = [p.prediction for p in recent]
positive_rate = sum(1 for p in predictions if p == 1) / len(predictions)
return {
"total_predictions": self.prediction_count,
"window_size": len(recent),
"avg_confidence": sum(confidences) / len(confidences),
"min_confidence": min(confidences),
"avg_latency_ms": sum(latencies) / len(latencies),
"p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)],
"positive_rate": positive_rate,
"low_confidence_pct": sum(
1 for c in confidences if c < self.min_confidence
) / len(confidences),
}
def _check_distribution_drift(self) -> None:
"""Verifica se la distribuzione delle predizioni e cambiata."""
if len(self.predictions) < self.window_size:
return
recent = list(self.predictions)
half = len(recent) // 2
first_half = [p.prediction for p in recent[:half]]
second_half = [p.prediction for p in recent[half:]]
rate_first = sum(first_half) / len(first_half)
rate_second = sum(second_half) / len(second_half)
# Se la distribuzione cambia più del 15%, segnala drift
if abs(rate_first - rate_second) > 0.15:
self._alert(
"DISTRIBUTION_DRIFT",
f"Positive rate cambiato: {rate_first:.2f} -> {rate_second:.2f}"
)
def _alert(self, alert_type: str, message: str) -> None:
"""Invia un alert."""
self.logger.warning(f"[{alert_type}] {message}")
for callback in self.alert_callbacks:
callback(alert_type, message)
Metriche Chiave da Monitorare
Latenza (p50, p95, p99): tempo di risposta dell'API
Throughput: numero di predizioni al secondo
Distribuzione predizioni: cambiamenti nel rapporto positivo/negativo
Confidenza media: un calo indica che il modello e "incerto"
Error rate: percentuale di errori HTTP 5xx
Data drift: divergenza tra dati in produzione e dati di training
Costi e Ottimizzazione
GitHub Actions offre 2.000 minuti/mese gratuiti per repository pubblici e 500 minuti per
repository privati (piano Free). Il training ML può consumare rapidamente questi minuti.
Ecco come ottimizzare.
Per il training con GPU, i runner GitHub-hosted non bastano (non hanno GPU). La soluzione
e un self-hosted runner su una macchina con GPU. Questo elimina anche il costo per minuto
di GitHub Actions.
Setup self-hosted runner con GPU
# 1. Sulla macchina con GPU, scarica il runner
mkdir actions-runner && cd actions-runner
curl -o actions-runner.tar.gz -L \
https://github.com/actions/runner/releases/download/v2.311.0/actions-runner-linux-x64-2.311.0.tar.gz
tar xzf actions-runner.tar.gz
# 2. Configura il runner
./config.sh --url https://github.com/YOUR_ORG/YOUR_REPO \
--token YOUR_TOKEN \
--labels gpu,cuda12,ml-training
# 3. Installa come servizio
sudo ./svc.sh install
sudo ./svc.sh start
Usare il self-hosted runner nel workflow
training:
name: Train Model (GPU)
runs-on: [self-hosted, gpu, cuda12]
# Il job viene eseguito sulla macchina con GPU
steps:
- uses: actions/checkout@v4
- name: Train with GPU
run: |
nvidia-smi # Verifica GPU disponibile
python train.py --config config/training-gpu.yaml --device cuda
Stima Costi per Scenario
Scenario
Minuti/Mese
Costo GitHub Actions
Costo Totale
Prototipo (training manuale, scikit-learn)
~200
Gratuito (piano Free)
~0 EUR/mese
PMI (training settimanale, modello medio)
~800
~12 EUR/mese
~50 EUR/mese (con S3)
Scale-up (training giornaliero, deep learning)
~3.000
~48 EUR/mese
~200 EUR/mese (con GPU cloud)
Enterprise (multi-modello, continuous training)
~10.000+
Self-hosted runner
~500+ EUR/mese
Confronto Tool CI/CD per ML
GitHub Actions non e l'unica opzione. Ogni tool ha vantaggi specifici a seconda
del contesto. Ecco un confronto pratico per aiutarti a scegliere.
Caratteristica
GitHub Actions
GitLab CI
Jenkins
Argo Workflows
Setup
Zero (integrato)
Zero (integrato)
Server dedicato
Cluster Kubernetes
GPU Support
Self-hosted runner
Self-hosted runner
Plugin NVIDIA
Nativo (K8s GPU)
Costo (piccolo team)
Gratuito/basso
Gratuito/basso
Costo server
Costo cluster K8s
Parallelismo
Buono (matrix)
Buono
Ottimo
Eccellente (DAG)
Pipeline as Code
YAML
YAML
Groovy/YAML
YAML/Python SDK
Ecosistema ML
Marketplace vasto
Buono
Plugin
Cloud-native
Curva apprendimento
Bassa
Bassa
Media
Alta
Ideale per
Team piccoli/medi, repo GitHub
Team su GitLab, self-managed
Enterprise, on-premise
Team K8s, pipeline complesse
Quale Scegliere?
Inizio e prototipo: GitHub Actions - zero setup, integrazione nativa, gratis per repo pubblici
Team su GitLab: GitLab CI - integrazione nativa con GitLab, ottimo registry container
Team cloud-native con K8s: Argo Workflows - pipeline DAG, scaling nativo, ottimo per ML complesso
Setup Completo sotto 5.000 EUR/anno
Per una PMI che vuole implementare una pipeline ML CI/CD completa, e possibile
rimanere sotto i 5.000 EUR/anno utilizzando strumenti open-source e servizi cloud
con tier gratuiti o a basso costo. Ecco lo stack consigliato.
Componente
Tool
Costo/anno
Note
Repository + CI/CD
GitHub (Team)
~400 EUR
3.000 min/mese Actions inclusi
Data Storage
AWS S3
~120 EUR
~500 GB dataset, transfer incluso
Experiment Tracking
MLflow (self-hosted)
~0 EUR
Open-source, deployato su cloud VM
Model Registry
MLflow Model Registry
~0 EUR
Incluso in MLflow
Container Registry
GitHub Container Registry
~0 EUR
Incluso con GitHub
Hosting Modello
Cloud VM (e2-medium)
~500 EUR
Per serving FastAPI + MLflow server
Data Versioning
DVC
~0 EUR
Open-source, storage già contato sopra
Monitoring
Prometheus + Grafana
~0 EUR
Open-source, on stessa VM
Data Validation
Pandera / Great Expectations
~0 EUR
Open-source
GPU Training (occasionale)
Cloud GPU spot instance
~600 EUR
~50 ore/mese T4 spot
Totale stimato: ~1.620 EUR/anno - ben sotto i 5.000 EUR di budget,
con margine per scaling. Il costo maggiore e il GPU training: se si usano modelli
classici (scikit-learn, XGBoost) il costo GPU si azzera.
Attenzione ai Costi Nascosti
Il costo degli strumenti e solo una parte. Il costo più significativo e il tempo del team:
configurare la pipeline iniziale richiede circa 2-4 settimane per un ingegnere esperto.
La manutenzione continua richiede circa 2-4 ore/settimana. Calcola anche il costo del
data transfer tra servizi cloud (egress), che può crescere rapidamente con dataset grandi.
Conclusioni e Prossimi Passi
In questo articolo abbiamo costruito una pipeline CI/CD completa per machine learning,
dal Dockerfile multi-stage al workflow GitHub Actions con data validation, training,
evaluation e deployment automatico. I concetti chiave da ricordare sono:
CI/CD per ML gestisce tre artefatti (codice, dati, modello) e introduce il Continuous Training
Docker multi-stage separa build, training e serving per immagini ottimizzate
GitHub Actions orchestra l'intera pipeline con job dipendenti e condizionali
Data validation e il primo gate: dati corrotti producono modelli inutilizzabili
Il Model Registry gestisce le versioni e la promozione dei modelli
Il testing ML copre tre livelli: unit, integration, smoke test
Il monitoring dopo il deployment e critico per rilevare drift e degradazione
I costi sono gestibili: con strumenti open-source si resta sotto 2.000 EUR/anno
La pipeline che abbiamo costruito corrisponde al Level 2 del modello di maturita
MLOps di Google: automazione completa del training e del deployment, con
validazione e monitoring integrati. Partendo da questa base, nel prossimo articolo
approfondiremo MLflow per l'experiment tracking avanzato, il model registry
e la gestione degli artefatti.
Roadmap della Serie
Articolo 1: MLOps: Da Esperimento a Produzione (completato)
Articolo 2: Pipeline ML con CI/CD: GitHub Actions + Docker (questo articolo)
Articolo 3: MLflow Deep Dive - Experiment Tracking e Model Registry
Articolo 4: DVC - Versionamento dei Dati per ML
Articolo 5: Model Serving Scalabile con FastAPI e Docker
Articolo 6: Kubernetes per ML: Orchestrazione e Scaling
Articolo 7: Monitoring Avanzato: Data Drift ed Evidently AI
Articolo 8: A/B Testing per Modelli ML in Produzione
Articolo 9: Governance, Compliance e ML Responsabile
Articolo 10: Case Study: Pipeline MLOps End-to-End