Monitorování NLP ve výrobě: Detekce posunu a přeškolení
An NLP model that is great at deployment can quickly become obsolete. The language evolves, patterns change, real data deviates from the training set. Tento jev se nazývá datový drift a pokud není monitorován, způsobuje a tiché snížení výkonu, které je často odhaleno příliš pozdě — když si zákazníci stěžují nebo se KPI společnosti propadnou.
V tomto článku vytvoříme kompletní monitorovací systém pro modely NLP v produkce: od sledování predikcí v reálném čase po automatickou detekci driftu, od upozornění k automatickému přeškolení pomocí MLflow a Airflow. Tento článek uzavírá sérii Moderní NLP: od BERT po LLM s pokročilým zaměřením na operace.
Co se naučíte
- Typy driftu: datový drift, koncepční drift, štítkový drift, rysový drift
- Metriky ke sledování pro modely NLP ve výrobě
- Detekce posunu textu: posun vložení pomocí indexu stability populace (PSI)
- Sledování kvality předpovědí bez štítků (proxy metriky)
- Výstražný systém s prahovými hodnotami a upozorněními
- Strukturované protokolování NLP predikcí
- Automatické přeškolovací potrubí se spouštěči založenými na driftu
- A/B testování nových verzí šablon
- Monitorovací panel s Grafana a Prometheus
- Stínové nasazení pro ověření nových modelů bez dopadu
1. Typy driftů v modelech NLP
„Posun“ v modelu NLP se může projevovat různými způsoby, přičemž každý má své příčiny a různá řešení.
Taxonomie driftu
| Typ | Definice | Příklad NLP | Řešení |
|---|---|---|---|
| Posun data | Změny distribuce vstupů | Nový slang na Twitteru | Přeškolte se s novými údaji |
| Koncept Drift | Vztah vstup-výstup se mění | „Trump“ = politika versus člověk | Častá rekvalifikace |
| Label Drift | Změny distribuce výstupu | Více negativních předpovědí v krizi | Monitorování distribuce výstupu |
| Funkce Drift | Statistiky funkcí se mění | Průměrná délka textu se zvyšuje | Sledování funkcí + upozornění |
2. Predikční protokolovací systém
Základem každého monitorovacího systému je strukturované protokolování každé predikce. Potřebujeme zachytit dostatek informací, abychom mohli analyzovat chování modelu časem.
import json
import time
import hashlib
import logging
from dataclasses import dataclass, asdict, field
from typing import Optional, List, Dict, Any
from datetime import datetime
import uuid
@dataclass
class NLPPredictionLog:
"""Schema di logging per predizioni NLP."""
prediction_id: str
timestamp: str
model_version: str
input_text: str
input_hash: str # hash del testo (non il testo per privacy)
input_length_chars: int
input_length_tokens: int
predicted_label: str
predicted_label_id: int
confidence_score: float
all_class_scores: Dict[str, float]
inference_latency_ms: float
true_label: Optional[str] = None # None se non disponibile
feedback: Optional[str] = None # feedback utente se disponibile
metadata: Dict[str, Any] = field(default_factory=dict)
class NLPPredictionLogger:
"""Logger strutturato per predizioni NLP."""
def __init__(self, model_version: str, log_path: str = "./prediction_logs"):
self.model_version = model_version
self.log_path = log_path
self.logger = logging.getLogger("nlp_predictions")
# Handler per file JSON Lines (JSONL)
handler = logging.FileHandler(f"{log_path}/predictions.jsonl")
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_prediction(self,
text: str,
predicted_label: str,
predicted_label_id: int,
confidence: float,
all_scores: Dict[str, float],
latency_ms: float,
num_tokens: int,
true_label: Optional[str] = None,
metadata: Optional[dict] = None) -> str:
"""Logga una singola predizione. Restituisce prediction_id."""
# Hash dell'input (non salvare il testo originale per GDPR)
input_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
prediction_id = str(uuid.uuid4())
log_entry = NLPPredictionLog(
prediction_id=prediction_id,
timestamp=datetime.utcnow().isoformat(),
model_version=self.model_version,
input_text=text[:500], # troncato per storage
input_hash=input_hash,
input_length_chars=len(text),
input_length_tokens=num_tokens,
predicted_label=predicted_label,
predicted_label_id=predicted_label_id,
confidence_score=confidence,
all_class_scores=all_scores,
inference_latency_ms=latency_ms,
true_label=true_label,
metadata=metadata or {}
)
self.logger.info(json.dumps(asdict(log_entry)))
return prediction_id
# Uso nella pipeline di inferenza
class MonitoredSentimentClassifier:
def __init__(self, model_path: str, model_version: str):
from transformers import pipeline, AutoTokenizer
self.pipeline = pipeline("text-classification", model=model_path)
self.tokenizer = AutoTokenizer.from_pretrained(model_path)
self.logger = NLPPredictionLogger(model_version)
self.model_version = model_version
def predict(self, text: str, metadata: dict = None) -> dict:
start = time.time()
# Inferenza
result = self.pipeline(text)[0]
# Calcola numero di token
num_tokens = len(self.tokenizer.tokenize(text)[:128])
latency_ms = (time.time() - start) * 1000
# Log
pred_id = self.logger.log_prediction(
text=text,
predicted_label=result['label'],
predicted_label_id=0 if result['label'] == 'NEGATIVE' else 1,
confidence=result['score'],
all_scores={result['label']: result['score']},
latency_ms=latency_ms,
num_tokens=num_tokens,
metadata=metadata or {}
)
return {
"prediction_id": pred_id,
"label": result['label'],
"confidence": result['score'],
"latency_ms": latency_ms
}
3. Detekce posunu: Přístup k vkládání textu
Nejrobustnější metoda pro detekci posunu dat v textu a porovnání distribuce vkládání vět v tréninkové sestavě s těmi ve výrobě.
import numpy as np
from sentence_transformers import SentenceTransformer
from scipy.stats import ks_2samp
from scipy.spatial.distance import jensenshannon
import warnings
class EmbeddingDriftDetector:
"""
Rileva data drift confrontando la distribuzione degli embedding.
Usa il test di Kolmogorov-Smirnov (KS) per ogni dimensione dell'embedding.
"""
def __init__(self, embedding_model: str = 'all-MiniLM-L6-v2',
ks_threshold: float = 0.1,
psi_threshold: float = 0.2):
self.model = SentenceTransformer(embedding_model)
self.ks_threshold = ks_threshold # soglia test KS
self.psi_threshold = psi_threshold # soglia PSI
self.reference_embeddings = None
self.reference_stats = None
def fit(self, reference_texts: List[str], batch_size: int = 64):
"""Calcola statistiche di riferimento dal training set."""
print(f"Computing reference embeddings for {len(reference_texts)} texts...")
self.reference_embeddings = self.model.encode(
reference_texts, batch_size=batch_size, show_progress_bar=True
)
self.reference_stats = {
'mean': self.reference_embeddings.mean(axis=0),
'std': self.reference_embeddings.std(axis=0),
'n': len(reference_texts)
}
print(f"Reference embeddings computed: shape={self.reference_embeddings.shape}")
def detect_drift(self, production_texts: List[str],
batch_size: int = 64) -> Dict[str, Any]:
"""Rileva drift confrontando produzione con riferimento."""
if self.reference_embeddings is None:
raise ValueError("Call fit() first with reference data")
prod_embeddings = self.model.encode(
production_texts, batch_size=batch_size, show_progress_bar=False
)
# Metodo 1: KS test per ogni dimensione dell'embedding
ks_stats = []
ks_pvalues = []
for dim in range(self.reference_embeddings.shape[1]):
stat, pvalue = ks_2samp(
self.reference_embeddings[:, dim],
prod_embeddings[:, dim]
)
ks_stats.append(stat)
ks_pvalues.append(pvalue)
avg_ks = np.mean(ks_stats)
max_ks = np.max(ks_stats)
# Metodo 2: Cosine distance media tra centroidi
ref_centroid = self.reference_embeddings.mean(axis=0)
prod_centroid = prod_embeddings.mean(axis=0)
centroid_distance = 1 - np.dot(ref_centroid, prod_centroid) / (
np.linalg.norm(ref_centroid) * np.linalg.norm(prod_centroid)
)
# Metodo 3: PSI (Population Stability Index)
psi = self._compute_psi(
self.reference_embeddings[:, :10], # prime 10 dim per PSI
prod_embeddings[:, :10]
)
drift_detected = (avg_ks > self.ks_threshold or
centroid_distance > 0.05)
return {
"drift_detected": drift_detected,
"avg_ks_statistic": float(avg_ks),
"max_ks_statistic": float(max_ks),
"centroid_cosine_distance": float(centroid_distance),
"psi": float(psi),
"n_production": len(production_texts),
"alert_level": "HIGH" if avg_ks > self.ks_threshold * 2
else "MEDIUM" if drift_detected
else "LOW"
}
def _compute_psi(self, reference: np.ndarray, production: np.ndarray,
n_bins: int = 10) -> float:
"""Population Stability Index: misura lo shift della distribuzione."""
psi_values = []
for dim in range(reference.shape[1]):
ref = reference[:, dim]
prod = production[:, dim]
bins = np.percentile(ref, np.linspace(0, 100, n_bins + 1))
bins[0] -= 0.001
bins[-1] += 0.001
ref_counts, _ = np.histogram(ref, bins=bins)
prod_counts, _ = np.histogram(prod, bins=bins)
ref_pct = (ref_counts / ref_counts.sum()) + 1e-10
prod_pct = (prod_counts / prod_counts.sum()) + 1e-10
psi = np.sum((prod_pct - ref_pct) * np.log(prod_pct / ref_pct))
psi_values.append(psi)
return float(np.mean(psi_values))
4. Proxy metriky: Monitor bez štítků
Ve výrobě často nemáme skutečné štítky pro výpočet přesnosti. Pojďme použít proxy metriky které korelují s kvalitou modelu.
import numpy as np
from collections import defaultdict
from datetime import datetime, timedelta
import pandas as pd
class NLPProxyMetricsMonitor:
"""
Monitora metriche proxy per modelli NLP senza label.
"""
def __init__(self, window_hours: int = 24):
self.window_hours = window_hours
self.predictions = []
def add_prediction(self, prediction: dict):
"""Aggiunge una predizione al log."""
prediction['timestamp'] = datetime.utcnow()
self.predictions.append(prediction)
def compute_proxy_metrics(self) -> dict:
"""Calcola metriche proxy dalla finestra temporale corrente."""
cutoff = datetime.utcnow() - timedelta(hours=self.window_hours)
recent = [p for p in self.predictions if p['timestamp'] > cutoff]
if not recent:
return {"error": "Nessuna predizione nella finestra temporale"}
confidences = [p['confidence'] for p in recent]
latencies = [p['latency_ms'] for p in recent]
labels = [p['predicted_label'] for p in recent]
# 1. Confidence distribution (bassa confidenza = modello incerto)
low_conf_pct = sum(1 for c in confidences if c < 0.7) / len(confidences)
avg_confidence = np.mean(confidences)
confidence_entropy = -np.sum(
[(c * np.log(c) + (1-c) * np.log(1-c + 1e-10)) for c in confidences]
) / len(confidences)
# 2. Label distribution (drift nelle predizioni)
label_counts = defaultdict(int)
for l in labels:
label_counts[l] += 1
label_distribution = {k: v/len(labels) for k, v in label_counts.items()}
# 3. Latency percentiles
p50 = np.percentile(latencies, 50)
p95 = np.percentile(latencies, 95)
p99 = np.percentile(latencies, 99)
# 4. Text length statistics
lengths = [p.get('input_length_chars', 0) for p in recent]
# 5. Refusal rate (se il modello ritorna "UNCERTAIN")
uncertain_pct = sum(1 for l in labels if l == 'UNCERTAIN') / len(labels)
return {
"window_hours": self.window_hours,
"n_predictions": len(recent),
"avg_confidence": round(avg_confidence, 4),
"low_confidence_pct": round(low_conf_pct, 4),
"confidence_entropy": round(float(confidence_entropy), 4),
"label_distribution": label_distribution,
"latency_p50_ms": round(p50, 1),
"latency_p95_ms": round(p95, 1),
"latency_p99_ms": round(p99, 1),
"avg_input_length": round(np.mean(lengths), 1),
"uncertain_pct": round(uncertain_pct, 4)
}
def check_alerts(self, thresholds: dict) -> list:
"""Verifica se le metriche proxy superano le soglie di alert."""
metrics = self.compute_proxy_metrics()
alerts = []
checks = {
"avg_confidence": ("<", thresholds.get("min_confidence", 0.75)),
"low_confidence_pct": (">", thresholds.get("max_low_conf_pct", 0.20)),
"latency_p95_ms": (">", thresholds.get("max_p95_latency_ms", 500)),
"uncertain_pct": (">", thresholds.get("max_uncertain_pct", 0.10)),
}
for metric_name, (op, threshold) in checks.items():
value = metrics.get(metric_name)
if value is None:
continue
triggered = (value < threshold if op == "<" else value > threshold)
if triggered:
alerts.append({
"metric": metric_name,
"value": value,
"threshold": threshold,
"severity": "HIGH" if abs(value - threshold) / threshold > 0.5 else "MEDIUM"
})
return alerts
5. Potrubí pro automatické přeškolení
import subprocess
from pathlib import Path
import json
from datetime import datetime
class AutoRetrainingPipeline:
"""
Pipeline di retraining automatico triggered dal drift detection.
"""
def __init__(self,
drift_detector: EmbeddingDriftDetector,
proxy_monitor: NLPProxyMetricsMonitor,
base_model_path: str,
data_path: str,
output_path: str):
self.drift_detector = drift_detector
self.proxy_monitor = proxy_monitor
self.base_model_path = base_model_path
self.data_path = data_path
self.output_path = output_path
self.retraining_history = []
def should_retrain(self,
production_texts: list,
drift_threshold: float = 0.1,
confidence_threshold: float = 0.75) -> dict:
"""
Determina se e necessario il retraining.
Ritorna {should_retrain: bool, reason: str, severity: str}
"""
# Check 1: Embedding drift
drift_report = self.drift_detector.detect_drift(production_texts)
if drift_report['drift_detected']:
return {
"should_retrain": True,
"reason": f"Embedding drift rilevato: KS={drift_report['avg_ks_statistic']:.4f}",
"severity": drift_report['alert_level'],
"drift_report": drift_report
}
# Check 2: Proxy metrics
metrics = self.proxy_monitor.compute_proxy_metrics()
alerts = self.proxy_monitor.check_alerts({
"min_confidence": confidence_threshold,
"max_low_conf_pct": 0.25
})
if any(a['severity'] == 'HIGH' for a in alerts):
return {
"should_retrain": True,
"reason": f"Metriche proxy critiche: {alerts}",
"severity": "HIGH",
"alerts": alerts
}
return {
"should_retrain": False,
"reason": "Tutte le metriche nella norma",
"severity": "LOW"
}
def trigger_retraining(self, trigger_reason: str, new_data_path: str):
"""Avvia il retraining con i nuovi dati."""
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
new_model_path = f"{self.output_path}/model_v{timestamp}"
print(f"Avvio retraining: {trigger_reason}")
print(f"Nuovo modello: {new_model_path}")
# Log del retraining
self.retraining_history.append({
"timestamp": timestamp,
"trigger_reason": trigger_reason,
"base_model": self.base_model_path,
"new_data": new_data_path,
"output_model": new_model_path,
"status": "started"
})
# In produzione: trigghera una pipeline CI/CD (Airflow, GitHub Actions, Kubeflow)
# Esempio con subprocess:
# subprocess.Popen([
# "python", "train.py",
# "--base-model", self.base_model_path,
# "--train-data", new_data_path,
# "--output", new_model_path,
# ])
return {
"retraining_id": timestamp,
"new_model_path": new_model_path,
"status": "triggered"
}
6. A/B testování pro nové verze modelu
import random
from typing import Callable
class ABTestingRouter:
"""
Router per A/B testing tra versioni del modello.
Splitta il traffico tra il modello corrente (A) e il nuovo (B).
"""
def __init__(self,
model_a: Callable,
model_b: Callable,
traffic_split_b: float = 0.1,
experiment_id: str = "exp_001"):
self.model_a = model_a
self.model_b = model_b
self.traffic_split_b = traffic_split_b
self.experiment_id = experiment_id
self.results = {"a": [], "b": []}
def predict(self, text: str, user_id: str = None) -> dict:
"""Instrada la richiesta al modello A o B in base al traffic split."""
# Instradamento deterministico basato su user_id (per coerenza)
if user_id:
use_b = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100 < (self.traffic_split_b * 100)
else:
use_b = random.random() < self.traffic_split_b
model_variant = "b" if use_b else "a"
model_fn = self.model_b if use_b else self.model_a
result = model_fn(text)
result["model_variant"] = model_variant
result["experiment_id"] = self.experiment_id
self.results[model_variant].append({
"confidence": result.get("confidence", 0),
"latency_ms": result.get("latency_ms", 0),
})
return result
def get_experiment_stats(self) -> dict:
"""Calcola statistiche dell'esperimento A/B."""
stats = {}
for variant in ["a", "b"]:
if self.results[variant]:
confs = [r["confidence"] for r in self.results[variant]]
lats = [r["latency_ms"] for r in self.results[variant]]
stats[variant] = {
"n_requests": len(self.results[variant]),
"avg_confidence": round(np.mean(confs), 4),
"avg_latency_ms": round(np.mean(lats), 1),
}
return {"experiment_id": self.experiment_id, "variants": stats}
7. Dashboard s Prometheus a Grafana
# monitoring_api.py
from fastapi import FastAPI
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import time
app = FastAPI()
# Metriche Prometheus per modelli NLP
PREDICTIONS_TOTAL = Counter(
"nlp_predictions_total",
"Numero totale di predizioni NLP",
["model_version", "predicted_label"]
)
CONFIDENCE_HISTOGRAM = Histogram(
"nlp_prediction_confidence",
"Distribuzione del confidence score",
["model_version"],
buckets=[0.5, 0.6, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 0.99, 1.0]
)
LATENCY_HISTOGRAM = Histogram(
"nlp_inference_latency_seconds",
"Latenza dell'inferenza NLP",
["model_version"],
buckets=[0.01, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0]
)
DRIFT_SCORE = Gauge(
"nlp_embedding_drift_score",
"Score del drift degli embedding (0=no drift, 1=drift massimo)",
["model_version"]
)
INPUT_LENGTH_HISTOGRAM = Histogram(
"nlp_input_length_chars",
"Lunghezza dell'input in caratteri",
["model_version"],
buckets=[50, 100, 200, 500, 1000, 2000, 5000]
)
MODEL_VERSION = "v2.1.0"
@app.post("/predict")
def predict_with_monitoring(request: dict):
text = request["text"]
start = time.time()
# ... Inferenza ...
result = {"label": "POSITIVE", "score": 0.92}
latency = time.time() - start
# Aggiorna metriche Prometheus
PREDICTIONS_TOTAL.labels(
model_version=MODEL_VERSION,
predicted_label=result["label"]
).inc()
CONFIDENCE_HISTOGRAM.labels(model_version=MODEL_VERSION).observe(result["score"])
LATENCY_HISTOGRAM.labels(model_version=MODEL_VERSION).observe(latency)
INPUT_LENGTH_HISTOGRAM.labels(model_version=MODEL_VERSION).observe(len(text))
return {**result, "latency_ms": latency * 1000}
@app.get("/metrics")
def metrics():
"""Endpoint Prometheus per il scraping delle metriche."""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
# docker-compose.yml per Prometheus + Grafana:
# services:
# prometheus:
# image: prom/prometheus
# volumes:
# - ./prometheus.yml:/etc/prometheus/prometheus.yml
# grafana:
# image: grafana/grafana
# ports:
# - "3000:3000"
8. Stínové nasazení a postupné zavádění
Před vystavením nového modelu reálnému provozu, stínové nasazení umožňuje ověřit latenci a chování bez jakéhokoli rizika pro uživatele. Stínový model dostává stejné požadavky jako produkční model, ale vlastní předpovědi jsou vyřazeny – slouží pouze k monitorování.
import asyncio
import time
from typing import Callable, Dict, Any
class ShadowDeploymentManager:
"""
Gestisce il shadow deployment di un nuovo modello NLP.
Il modello shadow riceve tutto il traffico ma non risponde agli utenti.
"""
def __init__(self,
production_model: Callable,
shadow_model: Callable,
shadow_name: str = "shadow_v2"):
self.production_model = production_model
self.shadow_model = shadow_model
self.shadow_name = shadow_name
self.comparison_log: list = []
def predict(self, text: str, user_id: str = None) -> Dict[str, Any]:
"""
Esegue la predizione in produzione e in background quella shadow.
Restituisce solo il risultato del modello di produzione.
"""
# Predizione produzione (sincrona)
prod_start = time.time()
prod_result = self.production_model(text)
prod_latency = (time.time() - prod_start) * 1000
# Predizione shadow (asincrona, non blocca la risposta)
shadow_start = time.time()
try:
shadow_result = self.shadow_model(text)
shadow_latency = (time.time() - shadow_start) * 1000
shadow_error = None
except Exception as e:
shadow_result = None
shadow_latency = None
shadow_error = str(e)
# Log del confronto
self.comparison_log.append({
"text_hash": hash(text),
"prod_label": prod_result.get("label"),
"prod_confidence": prod_result.get("confidence"),
"prod_latency_ms": prod_latency,
"shadow_label": shadow_result.get("label") if shadow_result else None,
"shadow_confidence": shadow_result.get("confidence") if shadow_result else None,
"shadow_latency_ms": shadow_latency,
"shadow_error": shadow_error,
"agreement": prod_result.get("label") == (shadow_result or {}).get("label")
})
# Restituisce SOLO il risultato del modello di produzione
return prod_result
def get_shadow_stats(self) -> Dict[str, Any]:
"""Calcola statistiche di confronto tra produzione e shadow."""
if not self.comparison_log:
return {"error": "Nessun dato di confronto disponibile"}
agreement_rate = sum(1 for r in self.comparison_log if r["agreement"]) / len(self.comparison_log)
prod_latencies = [r["prod_latency_ms"] for r in self.comparison_log if r["prod_latency_ms"]]
shadow_latencies = [r["shadow_latency_ms"] for r in self.comparison_log if r["shadow_latency_ms"]]
error_rate = sum(1 for r in self.comparison_log if r["shadow_error"]) / len(self.comparison_log)
import numpy as np
return {
"n_requests": len(self.comparison_log),
"agreement_rate": round(agreement_rate, 4),
"prod_p95_latency_ms": round(np.percentile(prod_latencies, 95), 1) if prod_latencies else None,
"shadow_p95_latency_ms": round(np.percentile(shadow_latencies, 95), 1) if shadow_latencies else None,
"shadow_error_rate": round(error_rate, 4),
"ready_for_promotion": agreement_rate >= 0.95 and error_rate < 0.01
}
# Strategia di rollout graduale: 1% → 10% → 50% → 100%
ROLLOUT_STAGES = [
{"traffic_pct": 0.01, "min_requests": 500, "min_agreement": 0.95},
{"traffic_pct": 0.10, "min_requests": 2000, "min_agreement": 0.96},
{"traffic_pct": 0.50, "min_requests": 5000, "min_agreement": 0.97},
{"traffic_pct": 1.00, "min_requests": None, "min_agreement": None}, # full rollout
]
9. Společné anti-vzorce v monitorování NLP
Anti-vzory, kterým je třeba se vyhnout
- Sledujte pouze latenci: latence a metrika infrastruktury, ne kvalita modelu. Rychlý, ale špatný model a horší než pomalý správný.
- Žádná referenční distribuce: Detekce posunu nemá smysl bez pevné referenční distribuce vypočítané na základě tréninkových/validačních dat.
- Pozor únava: Příliš citlivé prahové hodnoty zaplavují tým na zavolání falešnými poplachy. Začněte s konzervativními prahovými hodnotami a proveďte kalibraci na základě pozorovaných vzorců.
- Rozhodnutí na jeden signál: Nikdy nespouštějte přeškolování na základě na jednom ukazateli. Vyžadujte alespoň dva nezávislé, souhlasné signály.
- Ignorujte kvalitu upstream dat: Sledovat model bez monitorovat datový kanál a neúplné. Platné schéma a aktuálnost vstupních dat.
- Přeškolení bez offline ověřování: Automaticky zasunutý model musí před nasazením projít offline testovací sadou, i když byl spouštěč automatický.
10. Kompletní kontrolní seznam pro monitorování NLP
Kontrolní seznam monitorování NLP ve výrobě
- Logování: zaznamenejte každou předpověď pomocí textu (nebo hash), spolehlivosti, latence, verze modelu a časové razítko ve formátu JSONL
- Detekce driftu: Kontrolujte posun vkládání týdně na 1000 vzorových oknech; okamžité upozornění, pokud KS > 0,15 nebo PSI > 0,2
- Proxy metriky: sledování distribuce spolehlivosti, distribuce štítků a latence v reálném čase přes Prometheus
- Sbírka Ground Truth: sbírat skutečné štítky prostřednictvím zpětné vazby od uživatelů, anotační tým nebo náhodný výběr (1–5 % návštěvnosti)
- Spouštěč rekvalifikace: Definujte jasné prahové hodnoty pro automatickou rekvalifikaci (např. skóre driftu > 0,2 nebo odhadovaná přesnost < 0,85); požádat o 2+ souhlasné signály
- Stínové nasazení: Před A/B testováním ověřte nový model v režimu stínu po dobu alespoň 24 hodin
- A/B testování: ověřuje každou novou verzi modelu s 10 % provozu po dobu alespoň 48 hodin před úplným zavedením
- Upozornění: konfigurace upozornění (Slack, PagerDuty) pro výstrahy s vysokou závažností s odkazy na runbooky odpovědí
- Uchovávání dat: Uchovávejte protokoly alespoň 90 dní pro historickou analýzu
- GDPR: anonymizovat nebo hashovat uživatelské texty v produkčních protokolech; Nikdy neukládejte PII bez výslovného souhlasu a šifrování
Závěry: Konec série
Tímto článkem seriál uzavíráme Moderní NLP: od BERT po LLM. Byli jsme na úplné cestě: od základů tokenizace a vkládání, po architekturu BERT, od analýzy sentimentu pro italštinu až po jemné doladění místních LLM, od sémantické podobnosti k monitorování ve výrobě.
Shrnutí seriálu
| # | Položka | Klíčové pojmy |
|---|---|---|
| 1 | Základy NLP | Tokenizace, Word2Vec, GloVe, potrubí |
| 2 | BERT a transformátory | Architektura, MLM, NSP, dolaďování |
| 3 | Analýza sentimentu | VADER, BERT, výroba, FastAPI |
| 4 | NLP italsky | Feel-it, AlBERTo, spaCy, dialekty |
| 5 | Rozpoznávání pojmenované entity | BIO formát, spaCy, BERT NER, seqeval |
| 6 | Klasifikace textu | Multi-label, zero-shot, SetFit |
| 7 | Transformátory HuggingFace | AutoClass, Trainer, PEFT, Accelerate |
| 8 | Lokální doladění | LoRA, QLoRA, DAPT, katastrofické zapomínání |
| 9 | Sémantická podobnost | SBERT, FAISS, bi-kodér, křížový kodér |
| 10 | Monitorování NLP | Detekce posunu, proxy metriky, rekvalifikace |
Související série k prozkoumání
- AI inženýrství / RAG: Sestavte kompletní systémy RAG s vložkami a techniky sémantického vyhledávání v této sérii
- Pokročilé hluboké učení: dozvědět se více o kvantování, ořezávání a optimalizační techniky pro velké modely
- MLOps: automatizujte monitorování a rekvalifikaci pomocí MLflow, Potrubí DVC a CI/CD pro modely ML
- Počítačové vidění: mnoho technik v této sérii (architektury podobné BERT, ViT, jemné ladění) platí i pro CV







