02 — Potok ML z CI/CD: Akcje GitHub + Docker
W pierwszym artykule z tej serii zobaczyliśmy, dlaczego 85% projektów ML nigdy nie trafia do produkcji i jak MLOps rozwiązuje ten problem. Zamieniliśmy monolityczny notatnik w potok modułowe i konfigurowalne. Nadszedł czas na kolejny krok: zautomatyzować całość potok z CI/CD, tak aby wszelkie zmiany w kodzie, danych lub konfiguracji automatycznie wyzwalaj szkolenie, walidację i wdrażanie modelu.
W tym artykule zbudujemy kompletny potok CI/CD do wykorzystania w uczeniu maszynowym Akcje GitHuba jako orkiestrator i Doker jako czas wykonania wykonanie. Nie będziemy ograniczać się do teorii - stworzymy działający projekt z klasyfikatorem sentymentu, wraz z wieloetapowym plikiem Dockerfile, przepływem pracy YAML, walidacją danych, rejestrem modeli i automatyczne wdrażanie.
Czego się nauczysz
- Dlaczego CI/CD dla ML różni się od tradycyjnego oprogramowania CI/CD?
- Jak zaprojektować architekturę kompleksowego potoku ML
- Jak tworzyć wieloetapowe pliki Dockerfile zoptymalizowane pod kątem szkolenia i udostępniania
- Jak napisać kompletne przepływy pracy GitHub Actions dla ML
- Jak zintegrować walidację danych, rejestrację modeli i automatyczne wdrażanie
- Jak zarządzać wersjonowaniem danych za pomocą DVC w przygotowaniu
- Jak wdrożyć testy specyficzne dla ML (jednostkowe, integracyjne, dymne)
- Jak monitorować model po wdrożeniu
- Jak zoptymalizować koszty dzięki buforowaniu i samodzielnym modułom uruchamiającym
- Jak wybrać odpowiednie narzędzie CI/CD dla swojego zespołu
dlaczego CI/CD dla ML i innych
Jeśli pochodzisz ze świata tradycyjnego tworzenia oprogramowania, możesz pomyśleć, że po prostu aplikujesz te same praktyki CI/CD w projektach ML. W rzeczywistości uczenie maszynowe wprowadza złożoność wyjątkowe, wymagające specyficznego podejścia. Zasadnicza różnica polega na tym, że w oprogramowaniu Tradycyjnie CI/CD zarządza tylko jednym artefaktem (kodem), podczas gdy w ML musi zarządzać kilkoma trzy na raz: kod, dane i model.
Trzy artefakty ML
W tradycyjnym oprogramowaniu, jeśli kod się nie zmienia, dane wyjściowe się nie zmieniają. W ML, nawet z identyczny kod, zmiana danych skutkuje powstaniem innego modelu. To oznacza, że rurociąg CI/CD musi śledzić i weryfikować trzy niezależne wymiary.
| Rozmiar | Tradycyjny CI/CD | CI/CD dla ML |
|---|---|---|
| Kod | Git Push wyzwala kompilację + test | Git Push wyzwala szkolenie + ocenę |
| Dane | Nie dotyczy | Nowe dane powodują przekwalifikowanie |
| Model | Nie dotyczy | Nowy model wymaga walidacji + promocji |
| Konfiguracja | Flagi funkcji, env vars | Hiperparametry, zestawy cech, progi metryki |
| Środowisko | System operacyjny + biblioteki | System operacyjny + biblioteki + sterowniki GPU + wersja CUDA |
| Walidacja | Testy pozytywne/niezaliczone | Wskaźniki powyżej/poniżej progu + porównanie z modelem w produkcji |
| Zastosowanie | Wdróż lub wycofaj | Stopniowe wdrażanie + testy A/B + monitorowanie dryfu |
Szkolenie ciągłe: kluczowa koncepcja
CI/CD for ML wprowadza koncepcję, która nie istnieje w tradycyjnym oprogramowaniu: the Szkolenie ciągłe (CT). Jak również ciągła integracja i ciągła Po wdrożeniu CT zapewnia automatyczne przeszkolenie modelu, gdy:
- Nadchodzą nowe dane: zbiór danych jest aktualizowany o nowe obserwacje
- Zmień kod: przetwarzanie wstępne lub algorytm uległy zmianie
- Pogarszają wskaźniki: monitorowanie wykrywa dryf danych lub spadki wydajności
- Wygasa licznik czasu: zaplanowane przekwalifikowanie (np. cotygodniowe) jest aktywowane
Częsty błąd: CI/CD bez CT
Wiele zespołów wdraża CI/CD dla kodu ML, ale zapomina o ciągłym szkoleniu. Rezultatem jest model, który jest wdrażany raz i nigdy więcej nie aktualizowany, z biegiem czasu ulegają cichej degradacji w miarę rozbieżności danych w produkcji z danych treningowych. Rurociąg bez przekładnika prądowego jest jak samochód bez konserwacji: Działa dopóki się nie zepsuje.
Architektura rurociągów ML
Przed napisaniem kodu projektujemy kompletną architekturę potoku. Każda faza ma określone wejścia i wyjścia, a awaria jednej fazy blokuje kolejne. To Podejście „szybko awaria” zapewnia, że do produkcji trafiają wyłącznie sprawdzone modele.
+------------------+ +------------------+ +------------------+
| DATA INGESTION |---->| PREPROCESSING |---->| TRAINING |
| | | | | |
| - Pull dati DVC | | - Pulizia | | - Train modello |
| - Validazione | | - Feature eng. | | - Log metriche |
| - Schema check | | - Split train/ | | - Log parametri |
| | | test/val | | - Salva artefatti|
+------------------+ +------------------+ +------------------+
| |
| (trigger: dati |
| nuovi/schedule) v
| +------------------+
| | EVALUATION |
| | |
| | - Metriche |
| | - Confronto con |
| | produzione |
| | - Gate: soglie |
| +------------------+
| |
| (se metriche > threshold)
| v
+------------------+ +------------------+ +------------------+
| MONITORING |<----| SMOKE TEST |<----| DEPLOYMENT |
| | | | | |
| - Health check | | - Test endpoint | | - Push registry |
| - Drift detect. | | - Predizione | | - Stage/Prod |
| - Alert | | di prova | | - Rollback ready |
| - Trigger retrain| | - Latenza check | | |
+------------------+ +------------------+ +------------------+
Każdy blok odpowiada etapowi przepływu pracy akcji GitHub. Zobaczmy teraz szczegółowo opisz, jak wdrożyć każdą fazę, zaczynając od konteneryzacji za pomocą Dockera.
Docker do uczenia maszynowego
Docker rozwiązuje jeden z najbardziej frustrujących problemów w ML: „działa na mojej maszynie”. Konteneryzując środowisko szkoleniowe i obsługujące, zapewniamy, że kod generuje plik takie same wyniki niezależnie od tego, gdzie zostanie uruchomiony: na laptopie analityka danych, w module CI/CD i w produkcji. W przypadku ML Docker wymaga szczególnej uwagi: obrazy zwykle tego wymagają być bardzo duży (biblioteki naukowe + sterowniki GPU), a kompilacja może być powolna.
Obrazy podstawowe dla ML
Wybór podstawy obrazu ma kluczowe znaczenie dla rozmiaru i kompatybilności. Oto opcje najważniejsze i kiedy ich używać.
| Obraz bazowy | Rozmiar | Używać | Kiedy to wybrać |
|---|---|---|---|
| python: 3.11-szczupły | ~120MB | Szkolenie/obsługa procesora | Modele Scikit-learn, XGBoost, lekkie serwowanie |
| python:3.11-mól książkowy | ~900MB | Szkolenie z narzędzi do budowania | Zależności wymagające kompilacji C/C++ |
| nvidia/cuda: 12.1-runtime | ~3,5 GB | Wnioskowanie z GPU | Obsługa modeli głębokiego uczenia się |
| nvidia/cuda: wersja 12.1 | ~5,2 GB | Trening GPU | Szkolenie PyTorch/TensorFlow z CUDA |
| pytorch/pytorch:2.1.0-cuda12.1 | ~6 GB | Szkolenie/obsługa PyTorch | Projekty PyTorch, które chcą uniknąć ręcznej konfiguracji CUDA |
Wieloetapowy plik Dockerfile do szkolenia i obsługi
Schemat wieloetapowy ma fundamentalne znaczenie dla ML. Możemy mieć dwa oddzielne etapy kompletne środowisko kompilacji (z kompilatorami i narzędziami do kompilacji) oraz ostateczny obraz usprawniony, który zawiera tylko niezbędny czas działania. Zmniejsza to rozmiar obrazu finalnie do 60%.
# ============================================
# Stage 1: Builder - installa dipendenze
# ============================================
FROM python:3.11-slim AS builder
WORKDIR /build
# Installa build tools necessari per compilare dipendenze native
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Copia e installa dipendenze in un virtual environment
COPY requirements.txt .
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# ============================================
# Stage 2: Training - esegue il training
# ============================================
FROM python:3.11-slim AS trainer
WORKDIR /app
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia codice sorgente
COPY src/ ./src/
COPY config/ ./config/
COPY train.py .
COPY evaluate.py .
# Entrypoint per training
ENTRYPOINT ["python", "train.py"]
# ============================================
# Stage 3: Serving - API di produzione
# ============================================
FROM python:3.11-slim AS serving
WORKDIR /app
# Utente non-root per sicurezza
RUN useradd --create-home appuser
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia solo il codice necessario per serving
COPY src/serving/ ./src/serving/
COPY src/preprocessing/ ./src/preprocessing/
# Healthcheck endpoint
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
# Switch a utente non-root
USER appuser
# Porta di default
EXPOSE 8000
# Entrypoint per serving
ENTRYPOINT ["uvicorn", "src.serving.app:app", "--host", "0.0.0.0", "--port", "8000"]
Dlaczego wieloetapowy dla ML?
- Bezpieczeństwo: wyświetlany obraz nie zawiera kompilatorów ani narzędzi do kompilacji
- Rozmiar: etap obsługi jest znacznie lżejszy (~300 MB w porównaniu z ~1,2 GB)
- Kryjówka: zależności zmieniają się rzadziej niż kod, korzystając z pamięci podręcznej warstwy
- Elastyczność: możesz zbudować tylko etap szkolenia lub tylko etap serwowania
Optymalizacja pamięci podręcznej warstwy
Kolejność instrukcji COPY w pliku Dockerfile ma kluczowe znaczenie dla pamięci podręcznej. Zależności Pythona
rzadko się zmieniają, kod źródłowy zmienia się często. Najpierw kopiując plik
requirements.txt a następnie kod, unikamy ponownej instalacji zależności
przy każdej zmianie kodu.
# Dati e modelli (gestiti da DVC, non da Docker)
data/
models/
*.pkl
*.h5
*.pt
# Ambiente di sviluppo
.venv/
__pycache__/
*.pyc
.pytest_cache/
.mypy_cache/
# Git e CI
.git/
.github/
.dvc/cache/
# IDE e editor
.vscode/
.idea/
*.swp
# Documentazione
docs/
*.md
LICENSE
Docker z obsługą GPU
Aby trenować modele głębokiego uczenia się, potrzebujesz obsługi procesora GPU w kontenerze. Docker obsługuje Procesory graficzne NVIDIA za pośrednictwem zestawu narzędzi NVIDIA Container Toolkit. Instalacja wymaga sterownika NVIDIA na hoście i zainstalowanym zestawie narzędzi.
# Base image con CUDA runtime
FROM nvidia/cuda:12.1.1-runtime-ubuntu22.04 AS gpu-trainer
WORKDIR /app
# Installa Python e dipendenze di sistema
RUN apt-get update && apt-get install -y --no-install-recommends \
python3.11 \
python3.11-venv \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Virtual environment
RUN python3.11 -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Dipendenze PyTorch con CUDA
COPY requirements-gpu.txt .
RUN pip install --no-cache-dir -r requirements-gpu.txt
COPY src/ ./src/
COPY train.py .
# Variabili ambiente per CUDA
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility
ENTRYPOINT ["python", "train.py"]
# Build dell'immagine GPU
docker build -f Dockerfile.gpu -t ml-trainer:gpu .
# Esecuzione con accesso GPU
docker run --gpus all \
-v $(pwd)/data:/app/data \
-v $(pwd)/models:/app/models \
ml-trainer:gpu \
--config config/training.yaml
Działania GitHub dotyczące uczenia maszynowego
GitHub Actions to usługa CI/CD zintegrowana z GitHub, która uruchamia zautomatyzowane przepływy pracy w odpowiedzi na zdarzenia (push, pull request, harmonogram, ręczna wysyłka). W przypadku ML to zapewnia istotne zalety: natywna integracja z repozytorium Git, marketplace z akcjami predefiniowane, zarządzanie sekretami danych uwierzytelniających i do 2000 bezpłatnych minut/miesiąc dla repozytoriów publicznych.
Struktura przepływu pracy ML
Przepływ pracy GitHub Actions dla ML ma specyficzną strukturę: wiele pasujących zadań do etapów potoku, z wyraźnymi zależnościami między zadaniami i warunkami wykonania w oparciu o metryki modelu.
name: ML Pipeline - Train, Evaluate, Deploy
on:
# Trigger su push al branch main (codice o config)
push:
branches: [main]
paths:
- 'src/**'
- 'config/**'
- 'requirements.txt'
- 'train.py'
- 'evaluate.py'
# Trigger schedulato per retraining periodico
schedule:
- cron: '0 6 * * 1' # Ogni lunedi alle 6:00 UTC
# Trigger manuale con parametri
workflow_dispatch:
inputs:
force_deploy:
description: 'Forza il deployment anche se le metriche non migliorano'
required: false
default: 'false'
type: choice
options:
- 'true'
- 'false'
training_config:
description: 'File di configurazione per il training'
required: false
default: 'config/training.yaml'
env:
PYTHON_VERSION: '3.11'
DOCKER_REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/ml-model
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
jobs:
# ============================================
# Job 1: Data Validation
# ============================================
data-validation:
name: Validate Data Quality
runs-on: ubuntu-latest
outputs:
data_valid: ${{ steps.validate.outputs.valid }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Setup DVC
uses: iterative/setup-dvc@v2
- name: Pull data from DVC
run: dvc pull
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Validate data quality
id: validate
run: |
python -m src.data.validate_data \
--data-path data/raw/reviews.csv \
--schema-path config/data_schema.yaml
echo "valid=true" >> $GITHUB_OUTPUT
# ============================================
# Job 2: Model Training
# ============================================
training:
name: Train Model
needs: data-validation
if: needs.data-validation.outputs.data_valid == 'true'
runs-on: ubuntu-latest
outputs:
model_version: ${{ steps.train.outputs.model_version }}
run_id: ${{ steps.train.outputs.run_id }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Setup DVC and pull data
run: |
pip install dvc[s3]
dvc pull
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Train model
id: train
run: |
python train.py \
--config ${{ github.event.inputs.training_config || 'config/training.yaml' }} \
--output-dir models/
echo "model_version=$(cat models/version.txt)" >> $GITHUB_OUTPUT
echo "run_id=$(cat models/run_id.txt)" >> $GITHUB_OUTPUT
env:
MLFLOW_TRACKING_URI: ${{ env.MLFLOW_TRACKING_URI }}
MLFLOW_TRACKING_USERNAME: ${{ secrets.MLFLOW_USERNAME }}
MLFLOW_TRACKING_PASSWORD: ${{ secrets.MLFLOW_PASSWORD }}
- name: Upload model artifact
uses: actions/upload-artifact@v4
with:
name: trained-model
path: models/
retention-days: 30
# ============================================
# Job 3: Model Evaluation
# ============================================
evaluation:
name: Evaluate Model
needs: training
runs-on: ubuntu-latest
outputs:
metrics_pass: ${{ steps.evaluate.outputs.metrics_pass }}
accuracy: ${{ steps.evaluate.outputs.accuracy }}
f1_score: ${{ steps.evaluate.outputs.f1_score }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: trained-model
path: models/
- name: Setup DVC and pull test data
run: |
pip install dvc[s3]
dvc pull data/test/
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Evaluate model
id: evaluate
run: |
python evaluate.py \
--model-path models/model.pkl \
--test-data data/test/reviews_test.csv \
--thresholds config/thresholds.yaml \
--output metrics/report.json
echo "accuracy=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"accuracy\"])')" >> $GITHUB_OUTPUT
echo "f1_score=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"f1_score\"])')" >> $GITHUB_OUTPUT
echo "metrics_pass=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"pass\"])')" >> $GITHUB_OUTPUT
- name: Post metrics as PR comment
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const metrics = JSON.parse(fs.readFileSync('metrics/report.json'));
const body = `## Model Evaluation Results
| Metric | Value | Threshold | Status |
|--------|-------|-----------|--------|
| Accuracy | ${metrics.accuracy.toFixed(4)} | ${metrics.thresholds.accuracy} | ${metrics.accuracy >= metrics.thresholds.accuracy ? 'PASS' : 'FAIL'} |
| F1 Score | ${metrics.f1_score.toFixed(4)} | ${metrics.thresholds.f1_score} | ${metrics.f1_score >= metrics.thresholds.f1_score ? 'PASS' : 'FAIL'} |
| AUC-ROC | ${metrics.auc_roc.toFixed(4)} | ${metrics.thresholds.auc_roc} | ${metrics.auc_roc >= metrics.thresholds.auc_roc ? 'PASS' : 'FAIL'} |`;
github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.issue.number,
body: body
});
- name: Upload evaluation report
uses: actions/upload-artifact@v4
with:
name: evaluation-report
path: metrics/
# ============================================
# Job 4: Build and Push Docker Image
# ============================================
build-image:
name: Build Docker Image
needs: [evaluation]
if: |
needs.evaluation.outputs.metrics_pass == 'true' ||
github.event.inputs.force_deploy == 'true'
runs-on: ubuntu-latest
outputs:
image_tag: ${{ steps.meta.outputs.tags }}
steps:
- uses: actions/checkout@v4
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: trained-model
path: models/
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=sha,prefix=
type=raw,value=latest
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.DOCKER_REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push serving image
uses: docker/build-push-action@v5
with:
context: .
target: serving
push: true
tags: ${{ steps.meta.outputs.tags }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ============================================
# Job 5: Deploy to Staging + Smoke Test
# ============================================
deploy:
name: Deploy and Smoke Test
needs: [build-image, training]
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: |
echo "Deploying model version ${{ needs.training.outputs.model_version }}"
# Qui il comando di deploy reale (kubectl, docker-compose, etc.)
- name: Smoke test
run: |
# Attendi che il servizio sia pronto
for i in $(seq 1 30); do
if curl -sf http://staging:8000/health; then
echo "Service is ready"
break
fi
echo "Waiting for service... attempt $i"
sleep 5
done
# Test di predizione
RESPONSE=$(curl -sf -X POST http://staging:8000/predict \
-H "Content-Type: application/json" \
-d '{"text": "This product is amazing, I love it!"}')
echo "Prediction response: $RESPONSE"
# Verifica che la risposta sia valida
echo "$RESPONSE" | python -c "
import sys, json
data = json.load(sys.stdin)
assert 'prediction' in data, 'Missing prediction field'
assert 'confidence' in data, 'Missing confidence field'
assert data['confidence'] > 0.5, 'Low confidence'
print('Smoke test PASSED')
"
Tajemnice i bezpieczeństwo w rurociągu
Nigdy nie wprowadzaj poświadczeń bezpośrednio do przepływu pracy YAML. Zawsze używaj sekretów GitHub dla kluczy AWS, tokenów MLflow, poświadczeń rejestru Docker i wszelkich innych wrażliwe informacje. Skonfiguruj wpisy tajne w Ustawienia > Sekrety i zmienne > Akcje w repozytorium GitHub.
Przykładowy projekt: Klasyfikator nastrojów
Połączmy wszystko w jeden konkretny projekt. Zbudujemy klasyfikator nastrojów do recenzji produktów, z pełnym procesem CI/CD. Projekt wykorzystuje scikit-learn dla prostota, ale architektura dotyczy identycznie modeli PyTorch czy TensorFlow.
Struktura projektu
sentiment-classifier/
src/
data/
__init__.py
preprocessing.py # Pulizia e trasformazione testi
validate_data.py # Validazione schema e qualità
models/
__init__.py
trainer.py # Training del classificatore
serving/
__init__.py
app.py # FastAPI application
schemas.py # Pydantic schemas
monitoring/
__init__.py
health.py # Health checks
tests/
test_preprocessing.py # Unit test preprocessing
test_trainer.py # Unit test training
test_api.py # Integration test API
config/
training.yaml # Configurazione training
thresholds.yaml # Soglie metriche
data_schema.yaml # Schema validazione dati
train.py # Entrypoint training
evaluate.py # Entrypoint evaluation
Dockerfile # Multi-stage build
requirements.txt # Dipendenze Python
.github/
workflows/
ml-pipeline.yml # Pipeline CI/CD
.dvc/ # Configurazione DVC
dvc.yaml # Pipeline DVC
dvc.lock # Lock file DVC
Skrypty szkoleniowe
"""Script principale di training per il classificatore di sentiment."""
import argparse
import yaml
import mlflow
import mlflow.sklearn
from pathlib import Path
from datetime import datetime
from src.data.preprocessing import load_and_preprocess_data, split_dataset
from src.models.trainer import create_pipeline, train_model
def parse_args():
"""Parse degli argomenti da riga di comando."""
parser = argparse.ArgumentParser(
description="Train sentiment classifier"
)
parser.add_argument(
"--config",
type=str,
default="config/training.yaml",
help="Path al file di configurazione"
)
parser.add_argument(
"--output-dir",
type=str,
default="models/",
help="Directory per salvare il modello"
)
parser.add_argument(
"--experiment-name",
type=str,
default="sentiment-classifier",
help="Nome dell'esperimento MLflow"
)
return parser.parse_args()
def main():
"""Esegue la pipeline di training completa."""
args = parse_args()
# 1. Carica configurazione
with open(args.config) as f:
config = yaml.safe_load(f)
# 2. Setup MLflow
mlflow.set_experiment(args.experiment_name)
with mlflow.start_run(run_name=f"train-{datetime.now().strftime('%Y%m%d-%H%M%S')}") as run:
# 3. Log dei parametri
mlflow.log_params(config["model"])
mlflow.log_param("data_path", config["data"]["train_path"])
mlflow.log_param("test_size", config["data"]["test_size"])
# 4. Caricamento e preprocessing dati
print("[1/5] Caricamento e preprocessing dati...")
X, y = load_and_preprocess_data(config["data"]["train_path"])
# 5. Split dataset
print("[2/5] Split train/validation...")
X_train, X_val, y_train, y_val = split_dataset(
X, y,
test_size=config["data"]["test_size"],
random_state=config["data"]["random_state"]
)
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("val_size", len(X_val))
# 6. Crea pipeline di preprocessing + modello
print("[3/5] Creazione pipeline ML...")
pipeline = create_pipeline(config["model"])
# 7. Training
print("[4/5] Training in corso...")
trained_pipeline = train_model(pipeline, X_train, y_train)
# 8. Valutazione su validation set
print("[5/5] Valutazione...")
from src.models.trainer import evaluate_model
metrics = evaluate_model(trained_pipeline, X_val, y_val)
# 9. Log metriche in MLflow
mlflow.log_metrics(metrics)
# 10. Salva modello
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
import joblib
model_path = output_dir / "model.pkl"
joblib.dump(trained_pipeline, model_path)
# Log modello in MLflow con signature
from mlflow.models.signature import infer_signature
signature = infer_signature(X_train[:5], trained_pipeline.predict(X_train[:5]))
mlflow.sklearn.log_model(
trained_pipeline,
"model",
signature=signature,
registered_model_name="sentiment-classifier"
)
# Salva versione e run_id per il CI/CD
(output_dir / "version.txt").write_text(run.info.run_id[:8])
(output_dir / "run_id.txt").write_text(run.info.run_id)
print(f"\nTraining completato!")
print(f" Run ID: {run.info.run_id}")
for name, value in metrics.items():
print(f" {name}: {value:.4f}")
if __name__ == "__main__":
main()
Moduł wstępnego przetwarzania
"""Modulo per il preprocessing dei dati testuali."""
import re
import pandas as pd
from typing import Tuple
from sklearn.model_selection import train_test_split
def clean_text(text: str) -> str:
"""Pulisce un singolo testo rimuovendo HTML, caratteri speciali e spazi extra."""
if not isinstance(text, str):
return ""
# Rimuovi tag HTML
text = re.sub(r'<[^>]+>', '', text)
# Rimuovi URL
text = re.sub(r'http\S+|www\.\S+', '', text)
# Rimuovi caratteri speciali (mantieni lettere, numeri, spazi)
text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
# Normalizza spazi
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
def load_and_preprocess_data(data_path: str) -> Tuple[pd.Series, pd.Series]:
"""Carica e preprocess il dataset di recensioni."""
df = pd.read_csv(data_path)
# Validazione colonne richieste
required_cols = ["review_text", "sentiment"]
missing = [c for c in required_cols if c not in df.columns]
if missing:
raise ValueError(f"Colonne mancanti nel dataset: {missing}")
# Rimuovi righe con valori mancanti
df = df.dropna(subset=required_cols)
# Pulisci testi
df["clean_text"] = df["review_text"].apply(clean_text)
# Rimuovi testi vuoti dopo pulizia
df = df[df["clean_text"].str.len() > 0]
return df["clean_text"], df["sentiment"]
def split_dataset(
X: pd.Series,
y: pd.Series,
test_size: float = 0.2,
random_state: int = 42
) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series]:
"""Split stratificato del dataset."""
return train_test_split(
X, y,
test_size=test_size,
random_state=random_state,
stratify=y
)
Moduł szkoleniowy
"""Modulo per la creazione, il training e la valutazione del modello."""
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
accuracy_score, f1_score, precision_score,
recall_score, roc_auc_score
)
from typing import Dict, Any
import pandas as pd
MODELS = {
"logistic_regression": LogisticRegression,
"random_forest": RandomForestClassifier,
}
def create_pipeline(model_config: Dict[str, Any]) -> Pipeline:
"""Crea una pipeline sklearn con TF-IDF + classificatore."""
algorithm = model_config.get("algorithm", "logistic_regression")
model_class = MODELS.get(algorithm)
if model_class is None:
raise ValueError(
f"Algoritmo non supportato: {algorithm}. "
f"Supportati: {list(MODELS.keys())}"
)
# Parametri specifici del modello
model_params = {
k: v for k, v in model_config.items()
if k not in ("algorithm", "tfidf")
}
# Parametri TF-IDF
tfidf_params = model_config.get("tfidf", {})
return Pipeline([
("tfidf", TfidfVectorizer(
max_features=tfidf_params.get("max_features", 10000),
ngram_range=tuple(tfidf_params.get("ngram_range", [1, 2])),
min_df=tfidf_params.get("min_df", 2),
max_df=tfidf_params.get("max_df", 0.95),
)),
("classifier", model_class(**model_params)),
])
def train_model(
pipeline: Pipeline,
X_train: pd.Series,
y_train: pd.Series
) -> Pipeline:
"""Addestra la pipeline sul training set."""
pipeline.fit(X_train, y_train)
return pipeline
def evaluate_model(
pipeline: Pipeline,
X_test: pd.Series,
y_test: pd.Series
) -> Dict[str, float]:
"""Valuta il modello e restituisce tutte le metriche."""
y_pred = pipeline.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_score": f1_score(y_test, y_pred, average="weighted"),
"precision": precision_score(y_test, y_pred, average="weighted"),
"recall": recall_score(y_test, y_pred, average="weighted"),
}
# AUC-ROC solo per classificazione binaria
if len(set(y_test)) == 2:
y_proba = pipeline.predict_proba(X_test)[:, 1]
metrics["auc_roc"] = roc_auc_score(y_test, y_proba)
return metrics
Skrypt oceny
"""Script di valutazione del modello con confronto soglie."""
import argparse
import json
import yaml
import joblib
import pandas as pd
from pathlib import Path
from src.data.preprocessing import clean_text
from src.models.trainer import evaluate_model
def parse_args():
parser = argparse.ArgumentParser(description="Evaluate trained model")
parser.add_argument("--model-path", required=True, help="Path al modello .pkl")
parser.add_argument("--test-data", required=True, help="Path ai dati di test")
parser.add_argument("--thresholds", required=True, help="Path al file soglie YAML")
parser.add_argument("--output", required=True, help="Path per il report JSON")
return parser.parse_args()
def main():
args = parse_args()
# 1. Carica modello e dati
pipeline = joblib.load(args.model_path)
df = pd.read_csv(args.test_data)
df["clean_text"] = df["review_text"].apply(clean_text)
X_test = df["clean_text"]
y_test = df["sentiment"]
# 2. Valuta
metrics = evaluate_model(pipeline, X_test, y_test)
# 3. Confronta con soglie
with open(args.thresholds) as f:
thresholds = yaml.safe_load(f)["thresholds"]
all_pass = True
results = {}
for metric_name, threshold_value in thresholds.items():
actual = metrics.get(metric_name, 0.0)
passed = actual >= threshold_value
if not passed:
all_pass = False
results[metric_name] = {
"value": actual,
"threshold": threshold_value,
"pass": passed
}
# 4. Genera report
report = {
**metrics,
"thresholds": thresholds,
"details": results,
"pass": all_pass
}
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(report, f, indent=2)
# 5. Stampa risultati
print("\n=== Evaluation Report ===")
for name, detail in results.items():
status = "PASS" if detail["pass"] else "FAIL"
print(f" {name}: {detail['value']:.4f} (threshold: {detail['threshold']}) [{status}]")
print(f"\nOverall: {'PASS' if all_pass else 'FAIL'}")
# Exit code non-zero se le metriche non passano
if not all_pass:
print("\nWARNING: Le metriche non raggiungono le soglie minime!")
# Non usiamo exit(1) perchè il workflow legge l'output
if __name__ == "__main__":
main()
Konfiguracja treningu i progów
# Configurazione pipeline di training
data:
train_path: "data/raw/reviews.csv"
test_size: 0.2
random_state: 42
model:
algorithm: "logistic_regression"
max_iter: 1000
C: 1.0
random_state: 42
tfidf:
max_features: 15000
ngram_range: [1, 2]
min_df: 3
max_df: 0.9
mlflow:
experiment_name: "sentiment-classifier"
registered_model_name: "sentiment-classifier"
# Soglie minime per approvare il deployment
thresholds:
accuracy: 0.85
f1_score: 0.83
precision: 0.80
recall: 0.80
auc_roc: 0.90
# Confronto con modello in produzione
comparison:
# Il nuovo modello deve migliorare almeno dello 0.5%
min_improvement: 0.005
# Metriche su cui e richiesto il miglioramento
compare_metrics:
- f1_score
- auc_roc
Wymagania dotyczące plików
# Core ML
scikit-learn==1.4.0
pandas==2.2.0
numpy==1.26.3
# NLP preprocessing
nltk==3.8.1
# Experiment tracking
mlflow==2.10.0
# Model serving
fastapi==0.109.0
uvicorn==0.27.0
pydantic==2.5.3
# Data validation
pandera==0.18.0
great-expectations==0.18.8
# Data versioning
dvc[s3]==3.42.0
# Configuration
pyyaml==6.0.1
python-dotenv==1.0.1
# Serialization
joblib==1.3.2
# Testing
pytest==7.4.4
httpx==0.26.0
Walidacja danych w potoku
Przed szkoleniem modelu musimy upewnić się, że dane są prawidłowe. Model wyszkolony na uszkodzonych danych daje nieprzewidywalne wyniki. Data walidacji i pierwsza brama naszego rurociągu: jeśli dane nie przejdą kontroli, szkolenie to nie odchodzi.
"""Validazione della qualità dei dati con Pandera."""
import argparse
import sys
import yaml
import pandas as pd
import pandera as pa
from pandera import Column, Check, DataFrameSchema
def build_schema(schema_config: dict) -> DataFrameSchema:
"""Costruisce uno schema Pandera dalla configurazione YAML."""
columns = {}
for col_name, col_spec in schema_config["columns"].items():
checks = []
if "min_length" in col_spec:
checks.append(Check.str_length(min_value=col_spec["min_length"]))
if "max_length" in col_spec:
checks.append(Check.str_length(max_value=col_spec["max_length"]))
if "allowed_values" in col_spec:
checks.append(Check.isin(col_spec["allowed_values"]))
if "min_value" in col_spec:
checks.append(Check.greater_than_or_equal_to(col_spec["min_value"]))
if "max_value" in col_spec:
checks.append(Check.less_than_or_equal_to(col_spec["max_value"]))
columns[col_name] = Column(
dtype=col_spec.get("dtype", "object"),
nullable=col_spec.get("nullable", False),
checks=checks if checks else None
)
return DataFrameSchema(
columns=columns,
coerce=True,
strict=schema_config.get("strict", False)
)
def validate_data(data_path: str, schema_path: str) -> bool:
"""Valida il dataset contro lo schema definito."""
# Carica schema
with open(schema_path) as f:
schema_config = yaml.safe_load(f)
schema = build_schema(schema_config)
# Carica dati
df = pd.read_csv(data_path)
# Controlla dimensione minima
min_rows = schema_config.get("min_rows", 100)
if len(df) < min_rows:
print(f"FAIL: Dataset ha {len(df)} righe, minimo richiesto: {min_rows}")
return False
# Controlla duplicati
max_duplicates_pct = schema_config.get("max_duplicates_pct", 0.05)
duplicates_pct = df.duplicated().mean()
if duplicates_pct > max_duplicates_pct:
print(f"FAIL: {duplicates_pct:.1%} duplicati (max: {max_duplicates_pct:.1%})")
return False
# Valida schema
try:
schema.validate(df, lazy=True)
print(f"PASS: Dataset valido ({len(df)} righe, {len(df.columns)} colonne)")
return True
except pa.errors.SchemaErrors as e:
print(f"FAIL: Schema validation errors:")
print(e.failure_cases.head(20))
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-path", required=True)
parser.add_argument("--schema-path", required=True)
args = parser.parse_args()
valid = validate_data(args.data_path, args.schema_path)
sys.exit(0 if valid else 1)
# Schema per il dataset di recensioni
min_rows: 1000
max_duplicates_pct: 0.05
strict: false
columns:
review_text:
dtype: "object"
nullable: false
min_length: 10
max_length: 5000
sentiment:
dtype: "int64"
nullable: false
allowed_values: [0, 1]
rating:
dtype: "float64"
nullable: true
min_value: 1.0
max_value: 5.0
review_date:
dtype: "object"
nullable: true
Wersjonowanie danych za pomocą DVC w potoku
Dane są za duże dla Git, ale należy je wersjonować i zsynchronizować
w rurociągu CI/CD. DVC (kontrola wersji danych) rozwiązuje ten problem:
zapisuj dane w zdalnym magazynie (S3, GCS, Azure Blob) i śledź tylko w Git
metadane (hash, rozmiar). Można używać akcji GitHub dvc pull dla
pobierz dokładnie wersję danych powiązaną z bieżącym zatwierdzeniem.
stages:
prepare:
cmd: python -m src.data.preprocessing --config config/training.yaml
deps:
- src/data/preprocessing.py
- config/training.yaml
- data/raw/reviews.csv
outs:
- data/processed/train.csv
- data/processed/test.csv
train:
cmd: python train.py --config config/training.yaml
deps:
- train.py
- src/models/trainer.py
- data/processed/train.csv
- config/training.yaml
outs:
- models/model.pkl
metrics:
- metrics/train_metrics.json:
cache: false
evaluate:
cmd: >-
python evaluate.py
--model-path models/model.pkl
--test-data data/processed/test.csv
--thresholds config/thresholds.yaml
--output metrics/eval_metrics.json
deps:
- evaluate.py
- models/model.pkl
- data/processed/test.csv
- config/thresholds.yaml
metrics:
- metrics/eval_metrics.json:
cache: false
# Inizializza DVC
dvc init
# Configura storage remoto (S3 in questo esempio)
dvc remote add -d myremote s3://my-ml-data-bucket/sentiment-classifier
dvc remote modify myremote region eu-west-1
# Traccia il dataset
dvc add data/raw/reviews.csv
# Committa i file DVC in Git
git add data/raw/reviews.csv.dvc data/raw/.gitignore dvc.yaml dvc.lock
git commit -m "feat: add DVC tracking for training data"
# Push dei dati su S3
dvc push
Działania DVC + GitHub: jak to działa
- Git śledzi pliki
.dvc(metadane: skrót SHA256, rozmiar) - Prawdziwe dane znajdują się na S3 (lub GCS, Azure Blob, Google Drive)
- Wykonywany jest przepływ pracy akcji GitHub
dvc pullaby pobrać dane - Poświadczenia S3 są przekazywane przez sekrety GitHub
- Każde zatwierdzenie Git odpowiada dokładnej wersji danych
Rejestr modelu z MLflow
Rejestr modelu to komponent zarządzający cyklem życia modeli po szkolenie. Każdy przeszkolony model jest zarejestrowany z nazwą, wersją i rozszerzeniem status (W przygotowaniu, Produkcja, Zarchiwizowane). Potok CI/CD współdziała z rejestrem promowanie modeli przekraczających progi walidacji.
"""Script per la promozione del modello nel registry MLflow."""
import mlflow
from mlflow.tracking import MlflowClient
def promote_model(
model_name: str,
run_id: str,
target_stage: str = "Production"
) -> None:
"""Promuove un modello a Production nel registry."""
client = MlflowClient()
# Cerca la versione del modello associata al run
model_versions = client.search_model_versions(
f"name='{model_name}'"
)
target_version = None
for mv in model_versions:
if mv.run_id == run_id:
target_version = mv.version
break
if target_version is None:
raise ValueError(
f"Nessun modello trovato per run_id={run_id}"
)
# Archivia il modello attualmente in Production
for mv in model_versions:
if mv.current_stage == "Production":
client.transition_model_version_stage(
name=model_name,
version=mv.version,
stage="Archived",
archive_existing_versions=False
)
print(f"Archiviato modello v{mv.version} (precedente Production)")
# Promuovi il nuovo modello
client.transition_model_version_stage(
name=model_name,
version=target_version,
stage=target_stage
)
print(f"Promosso modello v{target_version} a {target_stage}")
def load_production_model(model_name: str):
"""Carica il modello attualmente in Production."""
model_uri = f"models:/{model_name}/Production"
return mlflow.sklearn.load_model(model_uri)
Podpis modelu i przykład wprowadzania
Sygnatura modelu dokumentuje wzór wejść i wyjść modelu. Służy to zarówno jako dokumentacja, jak i automatyczna walidacja: jeśli ktoś spróbuj przekazać dane wejściowe z innym schematem, MLflow zgłasza wyraźny błąd.
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec
# Definisci la signature esplicita
input_schema = Schema([
ColSpec("string", "text")
])
output_schema = Schema([
ColSpec("long", "prediction")
])
signature = ModelSignature(
inputs=input_schema,
outputs=output_schema
)
# Esempio di input per documentazione
input_example = {
"text": "This product is excellent, highly recommended!"
}
# Registra il modello con signature e esempio
mlflow.sklearn.log_model(
sk_model=trained_pipeline,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name="sentiment-classifier"
)
Testowanie w rurociągu ML
Testowanie uczenia maszynowego jest bardziej złożone niż tradycyjne testowanie oprogramowania. Nie wystarczy sprawdzić że kod „działa”: musisz przetestować jakość danych, poprawność przetwarzanie wstępne, stabilność uczenia i zachowanie interfejsu API. The Potok CI/CD wykonuje trzy poziomy testowania.
Test jednostkowy do przetwarzania wstępnego
"""Unit test per il modulo di preprocessing."""
import pytest
import pandas as pd
from src.data.preprocessing import clean_text, load_and_preprocess_data
class TestCleanText:
"""Test per la funzione clean_text."""
def test_removes_html_tags(self):
assert clean_text("<p>Hello</p>") == "hello"
def test_removes_urls(self):
assert clean_text("Visit http://example.com for info") == "visit for info"
def test_removes_special_characters(self):
assert clean_text("Hello!!! World???") == "hello world"
def test_normalizes_whitespace(self):
assert clean_text("Hello World") == "hello world"
def test_lowercases(self):
assert clean_text("HELLO WORLD") == "hello world"
def test_empty_string(self):
assert clean_text("") == ""
def test_none_input(self):
assert clean_text(None) == ""
def test_numeric_preserved(self):
assert clean_text("Rating 5 out of 10") == "rating 5 out of 10"
class TestLoadAndPreprocess:
"""Test per il caricamento e preprocessing dei dati."""
def test_missing_columns_raises(self, tmp_path):
"""Verifica che colonne mancanti generino un errore."""
df = pd.DataFrame({"wrong_col": ["text"]})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
with pytest.raises(ValueError, match="Colonne mancanti"):
load_and_preprocess_data(str(csv_path))
def test_drops_na_rows(self, tmp_path):
"""Verifica che le righe con NA vengano rimosse."""
df = pd.DataFrame({
"review_text": ["Good product", None, "Bad product"],
"sentiment": [1, 0, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert len(X) == 2
def test_output_types(self, tmp_path):
"""Verifica i tipi di output."""
df = pd.DataFrame({
"review_text": ["Great product", "Terrible service"],
"sentiment": [1, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert isinstance(X, pd.Series)
assert isinstance(y, pd.Series)
Test integracji dla potoku
"""Integration test per il training e la valutazione del modello."""
import pytest
import pandas as pd
from src.models.trainer import create_pipeline, train_model, evaluate_model
@pytest.fixture
def sample_data():
"""Crea un dataset di test sintetico."""
texts = [
"amazing product love it", "terrible waste of money",
"great quality recommended", "horrible experience never again",
"excellent value for price", "poor quality disappointed",
"best purchase ever made", "worst product i bought",
"fantastic results happy", "awful terrible regret buying",
] * 10 # Ripetuto per avere abbastanza dati
sentiments = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0] * 10
return pd.Series(texts), pd.Series(sentiments)
class TestPipeline:
"""Test per la pipeline di training."""
def test_create_pipeline_logistic(self):
"""Verifica creazione pipeline con LogisticRegression."""
config = {"algorithm": "logistic_regression", "max_iter": 100}
pipeline = create_pipeline(config)
assert len(pipeline.steps) == 2
def test_create_pipeline_invalid_algorithm(self):
"""Verifica errore con algoritmo non supportato."""
with pytest.raises(ValueError, match="non supportato"):
create_pipeline({"algorithm": "invalid_algo"})
def test_train_and_evaluate(self, sample_data):
"""Test end-to-end: training + evaluation."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
pipeline = create_pipeline(config)
trained = train_model(pipeline, X[:80], y[:80])
metrics = evaluate_model(trained, X[80:], y[80:])
assert "accuracy" in metrics
assert "f1_score" in metrics
assert 0.0 <= metrics["accuracy"] <= 1.0
assert 0.0 <= metrics["f1_score"] <= 1.0
def test_model_deterministic(self, sample_data):
"""Verifica che il training sia deterministico con seed fisso."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
p1 = train_model(create_pipeline(config), X[:80], y[:80])
p2 = train_model(create_pipeline(config), X[:80], y[:80])
m1 = evaluate_model(p1, X[80:], y[80:])
m2 = evaluate_model(p2, X[80:], y[80:])
assert m1["accuracy"] == m2["accuracy"]
Test dymu do serwowania
"""Smoke test per l'API di serving FastAPI."""
import pytest
from httpx import AsyncClient, ASGITransport
from src.serving.app import app
@pytest.fixture
def client():
"""Client HTTP per testare l'API."""
transport = ASGITransport(app=app)
return AsyncClient(transport=transport, base_url="http://test")
@pytest.mark.asyncio
async def test_health_endpoint(client):
"""Verifica che l'endpoint /health risponda 200."""
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
@pytest.mark.asyncio
async def test_predict_positive(client):
"""Verifica predizione per testo positivo."""
response = await client.post(
"/predict",
json={"text": "This product is amazing, I love it!"}
)
assert response.status_code == 200
data = response.json()
assert "prediction" in data
assert "confidence" in data
assert data["confidence"] > 0.0
@pytest.mark.asyncio
async def test_predict_empty_text(client):
"""Verifica errore con testo vuoto."""
response = await client.post(
"/predict",
json={"text": ""}
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_predict_batch(client):
"""Verifica predizione batch."""
response = await client.post(
"/predict/batch",
json={"texts": [
"Great product",
"Terrible experience"
]}
)
assert response.status_code == 200
data = response.json()
assert len(data["predictions"]) == 2
# Aggiungere questo job prima del training nel workflow
tests:
name: Run Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest-asyncio pytest-cov
- name: Run unit tests
run: pytest tests/ -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v4
with:
file: coverage.xml
Obsługa API za pomocą FastAPI
Wyszkolony model musi być dostępny za pośrednictwem interfejsu API REST. FastAPI i wybór idealny do obsługi ML w Pythonie: i szybki, ma automatyczną walidację danych wejściowych poprzez Pydantic i automatycznie generuje dokumentację OpenAPI.
"""API di serving per il classificatore di sentiment."""
import os
import time
import joblib
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from src.serving.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse,
HealthResponse
)
from src.data.preprocessing import clean_text
# Variabili globali per il modello
model_pipeline = None
model_version = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Carica il modello all'avvio dell'applicazione."""
global model_pipeline, model_version
model_path = os.getenv("MODEL_PATH", "models/model.pkl")
if not Path(model_path).exists():
raise RuntimeError(f"Modello non trovato: {model_path}")
model_pipeline = joblib.load(model_path)
model_version = os.getenv("MODEL_VERSION", "unknown")
print(f"Modello caricato: v{model_version}")
yield
model_pipeline = None
app = FastAPI(
title="Sentiment Classifier API",
version="1.0.0",
lifespan=lifespan
)
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check dell'API."""
return HealthResponse(
status="healthy" if model_pipeline is not None else "unhealthy",
model_version=model_version or "not loaded"
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Predizione singola."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned = clean_text(request.text)
if not cleaned:
raise HTTPException(status_code=422, detail="Text is empty after cleaning")
prediction = model_pipeline.predict([cleaned])[0]
probabilities = model_pipeline.predict_proba([cleaned])[0]
confidence = float(max(probabilities))
latency_ms = (time.time() - start) * 1000
return PredictionResponse(
prediction=int(prediction),
confidence=confidence,
label="positive" if prediction == 1 else "negative",
latency_ms=round(latency_ms, 2)
)
@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
"""Predizione batch su più testi."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned_texts = [clean_text(t) for t in request.texts]
predictions = model_pipeline.predict(cleaned_texts)
probabilities = model_pipeline.predict_proba(cleaned_texts)
latency_ms = (time.time() - start) * 1000
results = []
for i, text in enumerate(request.texts):
results.append(PredictionResponse(
prediction=int(predictions[i]),
confidence=float(max(probabilities[i])),
label="positive" if predictions[i] == 1 else "negative",
latency_ms=0
))
return BatchPredictionResponse(
predictions=results,
total_latency_ms=round(latency_ms, 2)
)
"""Pydantic schemas per l'API di serving."""
from pydantic import BaseModel, Field
from typing import List
class PredictionRequest(BaseModel):
text: str = Field(..., min_length=1, max_length=5000,
description="Testo della recensione")
class PredictionResponse(BaseModel):
prediction: int = Field(..., description="0=negativo, 1=positivo")
confidence: float = Field(..., ge=0.0, le=1.0,
description="Confidenza della predizione")
label: str = Field(..., description="Label leggibile")
latency_ms: float = Field(..., description="Latenza in millisecondi")
class BatchPredictionRequest(BaseModel):
texts: List[str] = Field(..., min_length=1, max_length=100,
description="Lista di testi")
class BatchPredictionResponse(BaseModel):
predictions: List[PredictionResponse]
total_latency_ms: float
class HealthResponse(BaseModel):
status: str
model_version: str
Monitorowanie po wdrożeniu
Wdrożenie to nie koniec rurociągu, ale początek najbardziej krytycznej fazy: monitorowanie produkcji. Model ML może po cichu ulec degradacji, gdy i dane rzeczywiste odbiegają od danych treningowych. Rurociąg musi obejmować kontrole stanu ciągłe, rejestrowanie prognoz i wyzwalacze automatycznego ponownego szkolenia.
"""Monitoring post-deployment per il modello ML."""
import time
import logging
from datetime import datetime, timedelta
from collections import deque
from typing import Dict, Optional
from dataclasses import dataclass, field
@dataclass
class PredictionLog:
"""Log di una singola predizione."""
timestamp: datetime
input_text: str
prediction: int
confidence: float
latency_ms: float
class ModelMonitor:
"""Monitora le performance del modello in produzione."""
def __init__(
self,
window_size: int = 1000,
min_confidence_threshold: float = 0.6,
max_latency_ms: float = 500.0,
drift_check_interval: int = 100
):
self.window_size = window_size
self.min_confidence = min_confidence_threshold
self.max_latency = max_latency_ms
self.drift_check_interval = drift_check_interval
self.predictions: deque = deque(maxlen=window_size)
self.alert_callbacks = []
self.prediction_count = 0
self.logger = logging.getLogger("model_monitor")
def log_prediction(self, log: PredictionLog) -> None:
"""Registra una predizione e verifica le metriche."""
self.predictions.append(log)
self.prediction_count += 1
# Check latenza
if log.latency_ms > self.max_latency:
self._alert(
"HIGH_LATENCY",
f"Latenza {log.latency_ms:.0f}ms supera soglia {self.max_latency}ms"
)
# Check confidenza bassa
if log.confidence < self.min_confidence:
self._alert(
"LOW_CONFIDENCE",
f"Confidenza {log.confidence:.2f} sotto soglia {self.min_confidence}"
)
# Check periodico per drift
if self.prediction_count % self.drift_check_interval == 0:
self._check_distribution_drift()
def get_metrics(self) -> Dict:
"""Restituisce le metriche correnti della finestra."""
if not self.predictions:
return {"status": "no_data"}
recent = list(self.predictions)
confidences = [p.confidence for p in recent]
latencies = [p.latency_ms for p in recent]
predictions = [p.prediction for p in recent]
positive_rate = sum(1 for p in predictions if p == 1) / len(predictions)
return {
"total_predictions": self.prediction_count,
"window_size": len(recent),
"avg_confidence": sum(confidences) / len(confidences),
"min_confidence": min(confidences),
"avg_latency_ms": sum(latencies) / len(latencies),
"p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)],
"positive_rate": positive_rate,
"low_confidence_pct": sum(
1 for c in confidences if c < self.min_confidence
) / len(confidences),
}
def _check_distribution_drift(self) -> None:
"""Verifica se la distribuzione delle predizioni e cambiata."""
if len(self.predictions) < self.window_size:
return
recent = list(self.predictions)
half = len(recent) // 2
first_half = [p.prediction for p in recent[:half]]
second_half = [p.prediction for p in recent[half:]]
rate_first = sum(first_half) / len(first_half)
rate_second = sum(second_half) / len(second_half)
# Se la distribuzione cambia più del 15%, segnala drift
if abs(rate_first - rate_second) > 0.15:
self._alert(
"DISTRIBUTION_DRIFT",
f"Positive rate cambiato: {rate_first:.2f} -> {rate_second:.2f}"
)
def _alert(self, alert_type: str, message: str) -> None:
"""Invia un alert."""
self.logger.warning(f"[{alert_type}] {message}")
for callback in self.alert_callbacks:
callback(alert_type, message)
Kluczowe wskaźniki do monitorowania
- Opóźnienie (p50, p95, p99): Czas odpowiedzi API
- Przepustowość: liczba prognoz na sekundę
- Rozkład prognoz: zmiany stosunku dodatniego do ujemnego
- Średnia pewność: spadek wskazuje, że model jest „niepewny”
- Poziom błędów: Poziom błędów HTTP 5xx
- Przesunięcie daty: rozbieżność między danymi produkcyjnymi i danymi szkoleniowymi
Koszty i optymalizacja
GitHub Actions oferuje 2000 bezpłatnych minut miesięcznie na repozytoria publiczne i 500 minut na prywatne repozytoria (bezpłatny plan). Trening ML może szybko pochłonąć te minuty. Oto jak zoptymalizować.
Strategie buforowania
# Cache delle dipendenze Python
- name: Cache pip packages
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
# Cache del dataset DVC (evita download ripetuti)
- name: Cache DVC data
uses: actions/cache@v4
with:
path: |
data/
.dvc/cache/
key: dvc-${{ hashFiles('data/*.dvc', 'dvc.lock') }}
restore-keys: |
dvc-
# Cache della Docker layer
- name: Build with cache
uses: docker/build-push-action@v5
with:
context: .
cache-from: type=gha
cache-to: type=gha,mode=max
Samodzielnie hostowane moduły uruchamiające z procesorami graficznymi
Do treningu na GPU nie wystarczą biegacze hostowani na GitHubie (nie mają GPU). Rozwiązanie oraz hostowany na własnym serwerze moduł uruchamiający na komputerze z procesorem graficznym. Eliminuje to również koszt za minutę przez GitHub Actions.
# 1. Sulla macchina con GPU, scarica il runner
mkdir actions-runner && cd actions-runner
curl -o actions-runner.tar.gz -L \
https://github.com/actions/runner/releases/download/v2.311.0/actions-runner-linux-x64-2.311.0.tar.gz
tar xzf actions-runner.tar.gz
# 2. Configura il runner
./config.sh --url https://github.com/YOUR_ORG/YOUR_REPO \
--token YOUR_TOKEN \
--labels gpu,cuda12,ml-training
# 3. Installa come servizio
sudo ./svc.sh install
sudo ./svc.sh start
training:
name: Train Model (GPU)
runs-on: [self-hosted, gpu, cuda12]
# Il job viene eseguito sulla macchina con GPU
steps:
- uses: actions/checkout@v4
- name: Train with GPU
run: |
nvidia-smi # Verifica GPU disponibile
python train.py --config config/training-gpu.yaml --device cuda
Szacunek kosztów dla scenariusza
| Scenariusz | Minuty/miesiąc | Koszt akcji GitHub | Całkowity koszt |
|---|---|---|---|
| Prototyp (szkolenie manualne, nauka scikit) | ~200 | Bezpłatny (bezpłatny plan) | ~0 EUR/miesiąc |
| PMI (trening cotygodniowy, model średni) | ~800 | ~12 EUR/miesiąc | ~50 EUR/miesiąc (z S3) |
| Scale-up (codzienne szkolenia, głębokie uczenie się) | ~3000 | ~48 EUR/miesiąc | ~200 EUR/miesiąc (z procesorem graficznym w chmurze) |
| Przedsiębiorstwo (wielomodelowe, szkolenia ciągłe) | ~10 000+ | Biegacz z własnym gospodarzem | ~500+ EUR/miesiąc |
Porównanie narzędzi CI/CD dla ML
GitHub Actions nie jest jedyną opcją. Każde narzędzie ma określone zalety w zależności od niego kontekstu. Oto praktyczne porównanie, które pomoże Ci wybrać.
| Charakterystyczny | Akcje GitHuba | GitLab CI | Jenkinsa | Przepływy pracy Argo |
|---|---|---|---|---|
| Organizować coś | Zero (zintegrowany) | Zero (zintegrowany) | Serwer dedykowany | Klastry Kubernetesa |
| Wsparcie GPU | Biegacz z własnym gospodarzem | Biegacz z własnym gospodarzem | Wtyczki NVIDIA | Natywny (procesor graficzny K8s) |
| Koszt (mały zespół) | Bezpłatny/niski | Bezpłatny/niski | Koszt serwera | Koszt klastra K8 |
| Równoległość | Dobry (matryca) | Dobry | Optymalny | Znakomity (DAG) |
| Potok jako kod | YAML | YAML | Groovy/YAML | SDK YAML/Pythona |
| Ekosystem ML | Ogromny rynek | Dobry | Wtyczki | Natywny dla chmury |
| Krzywa uczenia się | Niski | Niski | Przeciętny | Wysoki |
| Idealny dla | Małe/średnie zespoły, repozytorium GitHub | Zespół na GitLabie, samodzielnie zarządzany | Przedsiębiorstwo, lokalnie | Zespół K8, złożone rurociągi |
Który wybrać?
- Początek i prototyp: GitHub Actions - zerowa konfiguracja, natywna integracja, bezpłatne dla publicznych repozytoriów
- Zespół na GitLabie: GitLab CI - natywna integracja z GitLabem, doskonały kontener rejestru
- Lokalnie w przedsiębiorstwie: Jenkins - maksymalna elastyczność, dojrzały ekosystem wtyczek
- Zespoły natywne w chmurze z K8: Argo Workflows — potok DAG, skalowanie natywne, idealne do złożonego uczenia maszynowego
Kompletna konfiguracja poniżej 5000 EUR/rok
Jest to możliwe w przypadku małych i średnich firm, które chcą wdrożyć kompletny potok ML CI/CD pozostać poniżej 5000 EUR rocznie, korzystając z narzędzi open source i usług w chmurze z bezpłatnymi lub tanimi poziomami. Oto zalecany stos.
| Część | Narzędzia | Koszt/rok | Notatki |
|---|---|---|---|
| Repozytorium + CI/CD | GitHub (zespół) | ~400 EUR | 3000 min/miesiąc Zawiera akcje |
| Przechowywanie danych | AWS-a3 | ~120 EUR | Zbiór danych ~500 GB, transfer wliczony w cenę |
| Śledzenie eksperymentu | MLflow (własny hosting) | ~0 EUR | Oprogramowanie typu open source, wdrożone na maszynie wirtualnej w chmurze |
| Rejestr modeli | Rejestr modelu MLflow | ~0 EUR | Zawarte w MLflow |
| Rejestr kontenerów | Rejestr kontenerów GitHub | ~0 EUR | Dołączone do GitHuba |
| Model hostingu | Maszyna wirtualna w chmurze (średnia e2) | ~500 EUR | Do obsługi serwera FastAPI + MLflow |
| Wersjonowanie danych | DVC | ~0 EUR | Oprogramowanie typu open source, miejsce na dane zostało już policzone powyżej |
| Monitorowanie | Prometeusz + Grafana | ~0 EUR | Open-source, na tej samej maszynie wirtualnej |
| Walidacja danych | Pandera / Wielkie nadzieje | ~0 EUR | Otwarte źródło |
| Szkolenie dotyczące GPU (okazjonalne) | Instancja spotowa Cloud GPU | ~600 EUR | ~50 godzin/miesiąc spot T4 |
Szacunkowa suma: ~1620 EUR/rok - budżet znacznie poniżej 5000 EUR, z marginesem na skalowanie. Największym kosztem jest szkolenie GPU: jeśli używasz modeli klasyki (scikit-learn, XGBoost) koszt GPU spada do zera.
Uważaj na ukryte koszty
Koszt narzędzi to tylko część. Najbardziej znaczący koszt i czas pracy zespołu: Dla doświadczonego inżyniera wstępne ustawienie rurociągu zajmuje około 2–4 tygodni. Ciągła konserwacja zajmuje około 2-4 godzin tygodniowo. Oblicz także koszt transfer danych pomiędzy usługami w chmurze (wyjście), który może szybko rosnąć przy dużych zbiorach danych.
Wnioski i dalsze kroki
W tym artykule zbudowaliśmy kompletny potok CI/CD na potrzeby uczenia maszynowego, od wieloetapowego pliku Dockerfile po GitHub Actions z walidacją danych, szkoleniem, ocena i automatyczne wdrażanie. Kluczowe pojęcia, o których należy pamiętać to:
- CI/CD dla ML obsługuje trzy artefakty (kod, dane, model) i wprowadza szkolenie ciągłe
- Wieloetapowy Docker osobna kompilacja, szkolenie i udostępnianie zoptymalizowanych obrazów
- Akcje GitHuba koordynować cały potok za pomocą zadań zależnych i warunkowych
- Walidacja danych i pierwsza bramka: uszkodzone dane tworzą modele bezużyteczne
- Rejestr modeli zarządza wydawaniem szablonów i promocją
- Testowanie ml obejmuje trzy poziomy: jednostka, integracja, test dymu
- Monitorowanie po wdrożeniu i ma kluczowe znaczenie dla wykrywania dryfu i degradacji
- Koszty są do zarządzania: dzięki narzędziom open source pozostaniesz poniżej 2000 EUR/rok
Wybudowany przez nas gazociąg odpowiada ww Poziom 2 modelu dojrzałości Google MLOps: Pełna automatyzacja szkoleń i wdrażania, z zintegrowana walidacja i monitorowanie. Wychodząc od tego, w następnym artykule zagłębimy się głębiej MLflow do zaawansowanego śledzenia eksperymentów – rejestr modeli i zarządzanie artefaktami.
Plan działania serii
- Artykuł 1: MLOps: od eksperymentu do produkcji (ukończone)
- Artykuł 2: Potok ML z CI/CD: GitHub Actions + Docker (ten artykuł)
- Artykuł 3: MLflow Deep Dive — śledzenie eksperymentów i rejestr modeli
- Artykuł 4: DVC — wersjonowanie danych dla ML
- Artykuł 5: Skalowalne udostępnianie modelu za pomocą FastAPI i Dockera
- Artykuł 6: Kubernetes dla ML: orkiestracja i skalowanie
- Artykuł 7: Zaawansowane monitorowanie: dryf danych i ewidentna sztuczna inteligencja
- Artykuł 8: Testy A/B dla modeli ML w produkcji
- Artykuł 9: Zarządzanie, zgodność i odpowiedzialne ML
- Artykuł 10: Studium przypadku: Kompleksowy potok MLOps







