Algoritmy adaptivního učení: Od teorie k produkci
Sen o osobním učiteli pro každého studenta, o někom, kdo neustále kalibruje úroveň problémy, identifikuje mezery ve znalostech a nabízí správný obsah ve správný čas, dnes dosažitelné díky adaptivní učební algoritmy. Není to sci-fi: platformy jako Khan Academy, Duolingo a Coursera slouží milionům studentů s vlastními cestami algoritmicky v reálném čase.
Výzva není teoretická, ale inženýrská. Jak zavést systém Teorie odezvy na položku (IRT) který se škáluje na milion studentů, aniž by se snížila latence? Jak integrovat model Sledování znalostí v produkčním ML potrubí s nepřetržitým monitorováním a A/B testováním? Jak vyvažujete průzkum a využívání v systému doporučení, který musí být obojí přesné a pedagogicky platné?
V tomto článku se těmito otázkami zabýváme konkrétním kódem, škálovatelnými architekturami a lekcemi naučili se ze systémů ve výrobě. Začneme od matematiky IRT, projdeme Bayesovskými znalostmi Tracing a Deep Knowledge Tracing, aby se dospělo ke kompletnímu systému s kanálem funkcí, modelem nasazený a A/B testovací rámec.
Co se naučíte
- Matematické základy Item Response Theory (IRT) a jak ji implementovat v Pythonu
- Bayesian Knowledge Tracing (BKT) vs Deep Knowledge Tracing (DKT): Kdy použít které
- Funkce pro učení signálů: čas, spolehlivost, sekvenční chyby
- Architektura adaptivního doporučovacího enginu s FastAPI a PostgreSQL
- A/B testovací rámec pro ověřování algoritmů v produkci
- Monitorování a detekce posunu pro vzdělávací modely
1. Teorie odezvy na položku: Základní model
L'Teorie odezvy na položku (IRT) a matematický základ moderního měření vzdělávání. IRT, kterou v 60. letech zavedli Georg Rasch a Lord, modeluje pravděpodobnost, že student zareaguje. správně na položku (otázku) v závislosti na její latentní schopnosti a charakteristikách položky.
Ve výrobě je nejpoužívanější model 2PL (Two-Parameter Logistic):
P(X=1 | theta, a, b) = 1 / (1 + exp(-a * (theta - b)))
Kde theta a schopnosti studenta, a a diskriminační parametr
(jak dobře položka rozlišuje mezi studenty různých schopností) e b a parametr
obtížnost (hodnota theta, pro kterou je pravděpodobnost správné odpovědi 0,5).
import numpy as np
from scipy.optimize import minimize
from scipy.special import expit # sigmoid function
class IRTModel2PL:
"""
Two-Parameter Logistic IRT Model.
Calibra difficulty (b) e discrimination (a) per ogni item.
Stima l'ability (theta) per ogni studente.
"""
def __init__(self):
self.item_params = {} # {item_id: {'a': float, 'b': float}}
self.student_abilities = {} # {student_id: float}
def probability_correct(self, theta: float, a: float, b: float) -> float:
"""P(correct | theta, a, b) usando il modello 2PL."""
return expit(a * (theta - b))
def log_likelihood_student(self, theta: float, responses: list) -> float:
"""
Log-likelihood per uno studente data la sequenza di risposte.
responses: lista di tuple (item_id, is_correct)
"""
ll = 0.0
for item_id, is_correct in responses:
if item_id not in self.item_params:
continue
a = self.item_params[item_id]['a']
b = self.item_params[item_id]['b']
p = self.probability_correct(theta, a, b)
# Clip per stabilità numerica
p = np.clip(p, 1e-9, 1 - 1e-9)
ll += is_correct * np.log(p) + (1 - is_correct) * np.log(1 - p)
return ll
def estimate_student_ability(
self,
student_id: str,
responses: list,
prior_mean: float = 0.0,
prior_std: float = 1.0
) -> float:
"""
MAP estimation dell'ability di uno studente.
Usa prior Gaussiano N(0,1) per regolarizzazione.
"""
def negative_map(theta_array):
theta = theta_array[0]
ll = self.log_likelihood_student(theta, responses)
# Log prior Gaussiano
log_prior = -0.5 * ((theta - prior_mean) / prior_std) ** 2
return -(ll + log_prior)
result = minimize(
negative_map,
x0=[0.0],
method='L-BFGS-B',
bounds=[(-4.0, 4.0)]
)
ability = result.x[0]
self.student_abilities[student_id] = ability
return ability
def next_item_cat(
self,
student_id: str,
available_items: list,
strategy: str = 'max_information'
) -> str:
"""
Computerized Adaptive Testing: seleziona il prossimo item ottimale.
Strategie disponibili:
- 'max_information': massimizza l'informazione di Fisher al theta corrente
- 'target_difficulty': seleziona item con b vicino a theta
"""
theta = self.student_abilities.get(student_id, 0.0)
best_item = None
best_score = -np.inf
for item_id in available_items:
if item_id not in self.item_params:
continue
a = self.item_params[item_id]['a']
b = self.item_params[item_id]['b']
if strategy == 'max_information':
# Informazione di Fisher per il modello 2PL
p = self.probability_correct(theta, a, b)
q = 1 - p
fisher_info = (a ** 2) * p * q
score = fisher_info
elif strategy == 'target_difficulty':
# Item più vicino al livello dello studente
score = -abs(b - theta)
if score > best_score:
best_score = score
best_item = item_id
return best_item
def calibrate_items_mle(self, response_matrix: np.ndarray) -> None:
"""
Calibra i parametri degli item usando Maximum Likelihood.
response_matrix: (n_students, n_items) con valori 0/1
Implementazione semplificata - in produzione usa py-irt o mirt.
"""
# Stima iniziale delle abilita con proporzione di risposte corrette
n_students, n_items = response_matrix.shape
abilities = np.zeros(n_students)
for iteration in range(20): # EM iterations
# E-step: aggiorna abilita dato params items
for s in range(n_students):
responses = [
(f'item_{i}', int(response_matrix[s, i]))
for i in range(n_items)
if f'item_{i}' in self.item_params
]
if responses:
abilities[s] = self.estimate_student_ability(
f'student_{s}', responses
)
# M-step: aggiorna params items dato abilita
for i in range(n_items):
item_id = f'item_{i}'
# Stima semplificata: in produzione usa marginal MLE
p_mean = response_matrix[:, i].mean()
b_estimate = -np.log(p_mean / (1 - p_mean + 1e-9))
self.item_params[item_id] = {
'a': 1.0, # discrimination iniziale
'b': np.clip(b_estimate, -3.0, 3.0)
}
# Esempio di utilizzo
model = IRTModel2PL()
# Calibra con dati storici
import numpy as np
np.random.seed(42)
responses = np.random.binomial(1, 0.6, (500, 50))
model.calibrate_items_mle(responses)
# Stima ability di un nuovo studente
student_responses = [
('item_0', 1), ('item_5', 0), ('item_10', 1),
('item_15', 1), ('item_20', 0)
]
ability = model.estimate_student_ability('student_new', student_responses)
print(f"Estimated ability: {ability:.3f}") # es: 0.342
# Seleziona il prossimo item
next_item = model.next_item_cat('student_new', [f'item_{i}' for i in range(50)])
print(f"Next optimal item: {next_item}")
2. Bayesian Knowledge Tracing: Modelování progrese v čase
Zatímco IRT vyfotografuje schopnosti studenta během okamžiku, Bayesian Knowledge Tracing (BKT) modeluje, jak se znalosti v průběhu času mění prostřednictvím učení. Zaveden Corbettem a Andersonem v roce 1994, BKT zůstává základem v adaptivních systémech díky své interpretovatelnosti.
BKT je skrytý Markovův model se čtyřmi parametry pro znalostní složku (KC):
- P(L0): počáteční pravděpodobnost, že student zná KC
- P(T): pravděpodobnost přechodu (naučení KC po praktické příležitosti)
- P(G): pravděpodobnost uhodnutí (odpovíte správně, přestože neví)
- P(S): pravděpodobnost uklouznutí (odpoví nesprávně, přestože ví)
from dataclasses import dataclass
from typing import Optional
import json
@dataclass
class BKTParams:
"""Parametri BKT per un knowledge component."""
p_learn: float # P(L0) - prior knowledge
p_transit: float # P(T) - learning rate
p_guess: float # P(G) - guess rate
p_slip: float # P(S) - slip rate
kc_id: str # Knowledge Component ID
class BKTTracker:
"""
Bayesian Knowledge Tracing tracker per singolo studente.
Aggiorna la probabilità di padronanza dopo ogni risposta.
"""
def __init__(self, params: BKTParams):
self.params = params
self.p_mastery = params.p_learn # Stima corrente di padronanza
self.history = []
def update(self, is_correct: bool) -> float:
"""
Aggiorna la stima di padronanza dopo una risposta.
Restituisce la nuova probabilità di padronanza.
Passi:
1. Calcola P(correct | mastery) e P(correct | !mastery)
2. Applica Bayes per aggiornare P(mastery | correct/incorrect)
3. Applica la transizione di apprendimento
"""
p = self.p_mastery
# Step 1: Calcola probabilità di osservare la risposta
if is_correct:
# P(correct | mastery) = 1 - P(slip)
p_obs_given_know = 1 - self.params.p_slip
# P(correct | !mastery) = P(guess)
p_obs_given_not = self.params.p_guess
else:
# P(incorrect | mastery) = P(slip)
p_obs_given_know = self.params.p_slip
# P(incorrect | !mastery) = 1 - P(guess)
p_obs_given_not = 1 - self.params.p_guess
# Step 2: Posterior di padronanza (formula di Bayes)
p_correct_total = (
p_obs_given_know * p +
p_obs_given_not * (1 - p)
)
# Evita divisione per zero
if p_correct_total < 1e-10:
p_posterior = p
else:
p_posterior = (p_obs_given_know * p) / p_correct_total
# Step 3: Applica transizione di apprendimento
p_new = p_posterior + (1 - p_posterior) * self.params.p_transit
self.p_mastery = p_new
self.history.append({
'response': is_correct,
'p_mastery_before': p,
'p_mastery_after': p_new
})
return p_new
def is_mastered(self, threshold: float = 0.95) -> bool:
"""Ritorna True se la padronanza supera la soglia."""
return self.p_mastery >= threshold
def recommended_action(self) -> str:
"""Raccomanda l'azione successiva basata sullo stato corrente."""
p = self.p_mastery
if p >= 0.95:
return 'advance' # Procedi al KC successivo
elif p >= 0.70:
return 'practice' # Pratica ulteriore
elif p >= 0.40:
return 'hint' # Offri un hint
else:
return 'remediation' # Torna ai prerequisiti
class BKTSystem:
"""Sistema BKT multi-studente multi-KC."""
def __init__(self, kc_params: dict):
"""
kc_params: {kc_id: BKTParams}
"""
self.kc_params = kc_params
self.trackers = {} # {(student_id, kc_id): BKTTracker}
def get_tracker(self, student_id: str, kc_id: str) -> BKTTracker:
"""Recupera o crea un tracker per student/KC."""
key = (student_id, kc_id)
if key not in self.trackers:
if kc_id not in self.kc_params:
raise ValueError(f"KC {kc_id} non trovato")
self.trackers[key] = BKTTracker(self.kc_params[kc_id])
return self.trackers[key]
def process_response(
self,
student_id: str,
kc_id: str,
is_correct: bool
) -> dict:
"""Processa una risposta e restituisce lo stato aggiornato."""
tracker = self.get_tracker(student_id, kc_id)
p_mastery = tracker.update(is_correct)
return {
'student_id': student_id,
'kc_id': kc_id,
'p_mastery': round(p_mastery, 4),
'is_mastered': tracker.is_mastered(),
'recommended_action': tracker.recommended_action()
}
def get_mastery_profile(self, student_id: str) -> dict:
"""Restituisce il profilo completo di padronanza di uno studente."""
profile = {}
for (sid, kc_id), tracker in self.trackers.items():
if sid == student_id:
profile[kc_id] = {
'p_mastery': round(tracker.p_mastery, 4),
'is_mastered': tracker.is_mastered(),
'attempts': len(tracker.history)
}
return profile
# Esempio configurazione sistema
kc_params = {
'algebra_linear': BKTParams(
p_learn=0.20, # 20% conosce gia
p_transit=0.15, # 15% impara per ogni pratica
p_guess=0.25, # 25% probabilità di indovinare
p_slip=0.10, # 10% probabilità di sbagliare pur sapendo
kc_id='algebra_linear'
),
'calcolo_derivate': BKTParams(
p_learn=0.10,
p_transit=0.12,
p_guess=0.15,
p_slip=0.08,
kc_id='calcolo_derivate'
)
}
bkt_system = BKTSystem(kc_params)
# Simula una sessione di apprendimento
responses_sequence = [False, False, True, True, True, True, False, True]
for i, correct in enumerate(responses_sequence):
result = bkt_system.process_response('student_42', 'algebra_linear', correct)
print(f"Risposta {i+1}: {'corretta' if correct else 'errata'} -> "
f"P(mastery)={result['p_mastery']:.3f}, "
f"Azione: {result['recommended_action']}")
3. Deep Knowledge Tracing: Neural Approach
Il Deep Knowledge Tracing (DKT), představený Piechem et al. v roce 2015 nahrazuje zjednodušující předpoklady BKT s rekurentní neuronovou sítí schopnou zachytit závislosti komplex mezi znalostními složkami. DKT dosahuje u datových sad o 20–30 % vyšší přesnosti než BKT reálné podle výzkumu z roku 2025 (87,5% přesnost předpovědi v nedávných studiích).
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
class DKTModel(nn.Module):
"""
Deep Knowledge Tracing con LSTM.
Input: sequenza di (item_id, risposta) encodata
Output: probabilità di risposta corretta per ogni item
"""
def __init__(
self,
n_items: int,
hidden_size: int = 128,
n_layers: int = 2,
dropout: float = 0.2
):
super().__init__()
self.n_items = n_items
self.hidden_size = hidden_size
# Input encoding: item_id * 2 (correct/incorrect)
# Ogni item ha 2 rappresentazioni: risposta corretta e risposta errata
self.input_size = n_items * 2
self.lstm = nn.LSTM(
input_size=self.input_size,
hidden_size=hidden_size,
num_layers=n_layers,
dropout=dropout if n_layers > 1 else 0,
batch_first=True
)
self.output_layer = nn.Sequential(
nn.Linear(hidden_size, n_items),
nn.Sigmoid()
)
def encode_input(
self,
item_ids: torch.Tensor,
responses: torch.Tensor
) -> torch.Tensor:
"""
Encoding one-hot per (item, response).
item_ids: (batch, seq_len) - ID degli item
responses: (batch, seq_len) - 0 o 1
Restituisce: (batch, seq_len, n_items * 2)
"""
batch_size, seq_len = item_ids.shape
x = torch.zeros(batch_size, seq_len, self.input_size)
# Item risposto correttamente: posizione item_id
# Item risposto erroneamente: posizione item_id + n_items
for b in range(batch_size):
for t in range(seq_len):
iid = item_ids[b, t].item()
r = responses[b, t].item()
if r == 1:
x[b, t, iid] = 1.0
else:
x[b, t, iid + self.n_items] = 1.0
return x
def forward(
self,
item_ids: torch.Tensor,
responses: torch.Tensor
) -> torch.Tensor:
"""
Forward pass.
Restituisce predizioni per il prossimo item in sequenza.
Output shape: (batch, seq_len, n_items)
"""
x = self.encode_input(item_ids, responses)
lstm_out, _ = self.lstm(x)
predictions = self.output_layer(lstm_out)
return predictions
class DKTDataset(Dataset):
"""Dataset per DKT - sequenze di interazioni studente."""
def __init__(
self,
sequences: list,
max_seq_len: int = 200,
pad_value: int = -1
):
"""
sequences: lista di dizionari
{'item_ids': [...], 'responses': [...]}
"""
self.sequences = sequences
self.max_seq_len = max_seq_len
self.pad_value = pad_value
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
seq = self.sequences[idx]
item_ids = seq['item_ids'][:self.max_seq_len]
responses = seq['responses'][:self.max_seq_len]
seq_len = len(item_ids)
# Padding
pad_len = self.max_seq_len - seq_len
item_ids = item_ids + [0] * pad_len
responses = responses + [0] * pad_len
mask = [1] * seq_len + [0] * pad_len
return {
'item_ids': torch.tensor(item_ids, dtype=torch.long),
'responses': torch.tensor(responses, dtype=torch.float),
'mask': torch.tensor(mask, dtype=torch.bool),
'seq_len': seq_len
}
def train_dkt(
model: DKTModel,
train_loader: DataLoader,
val_loader: DataLoader,
n_epochs: int = 50,
lr: float = 1e-3
) -> dict:
"""Training loop per DKT con early stopping."""
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.BCELoss(reduction='none')
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=5, factor=0.5
)
best_val_auc = 0.0
history = {'train_loss': [], 'val_auc': []}
for epoch in range(n_epochs):
model.train()
total_loss = 0.0
for batch in train_loader:
item_ids = batch['item_ids']
responses = batch['responses']
mask = batch['mask']
# Predici su t-1, target su t
# input: sequenza[:-1], target: sequenza[1:]
pred = model(item_ids[:, :-1], responses[:, :-1])
# Raccoglie le predizioni per gli item effettivi
target_items = item_ids[:, 1:]
target_responses = responses[:, 1:]
target_mask = mask[:, 1:]
# Estrai predizione per l'item corretto
batch_size, seq_len, n_items = pred.shape
pred_flat = pred.reshape(-1, n_items)
target_items_flat = target_items.reshape(-1)
pred_selected = pred_flat.gather(
1, target_items_flat.unsqueeze(1)
).squeeze(1)
# Loss mascherata (ignora padding)
loss = criterion(
pred_selected,
target_responses.reshape(-1)
)
mask_flat = target_mask.reshape(-1).float()
loss = (loss * mask_flat).sum() / mask_flat.sum()
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
history['train_loss'].append(avg_loss)
# Validation
val_auc = evaluate_dkt(model, val_loader)
history['val_auc'].append(val_auc)
scheduler.step(1 - val_auc) # Minimizza 1-AUC
if val_auc > best_val_auc:
best_val_auc = val_auc
torch.save(model.state_dict(), 'best_dkt_model.pt')
print(f"Epoch {epoch+1}/{n_epochs} - "
f"Loss: {avg_loss:.4f}, Val AUC: {val_auc:.4f}")
return history
4. Inženýrství funkcí pro výukové signály
Kvalita funkcí je často důležitější než architektura modelu. Adaptivní systém ve výrobě musí jít nad rámec jednoduché dvojice (položka, správně/nesprávně) a vystihnout kontext bohaté na interakci při učení.
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
import math
@dataclass
class LearningInteraction:
"""Una singola interazione di apprendimento."""
student_id: str
item_id: str
kc_ids: list # Knowledge components coinvolti
is_correct: bool
response_time_ms: int # Tempo di risposta in millisecondi
timestamp: datetime
hint_used: bool = False
attempt_number: int = 1
confidence_level: Optional[int] = None # 1-5 se richiesto
@dataclass
class StudentFeatures:
"""Feature aggregate per uno studente su un KC."""
student_id: str
kc_id: str
# Accuracy features
total_attempts: int = 0
correct_count: int = 0
recent_accuracy: float = 0.0 # Ultimi 5 tentativi
streak_correct: int = 0 # Sequenza corrente di corrette
streak_incorrect: int = 0
# Time features
avg_response_time: float = 0.0
response_time_trend: float = 0.0 # Positivo = rallentamento
time_since_last_practice: float = 0.0 # In ore
# Engagement features
hint_rate: float = 0.0 # Proporzione con hint
first_attempt_accuracy: float = 0.0 # Solo primi tentativi
# Memory features (Ebbinghaus forgetting curve)
estimated_retention: float = 1.0
class FeatureExtractor:
"""
Estrae feature per il sistema adattivo da sequenze di interazioni.
"""
def __init__(self, decay_factor: float = 0.3):
"""
decay_factor: controlla la decadenza della curva di dimenticanza
"""
self.decay_factor = decay_factor
def ebbinghaus_retention(
self,
days_since_last: float,
memory_strength: float = 1.0
) -> float:
"""
Modello di dimenticanza di Ebbinghaus.
R = exp(-t / (S * stability_factor))
"""
if days_since_last <= 0:
return 1.0
stability = max(memory_strength, 0.1)
return math.exp(-days_since_last / (stability * 30))
def compute_features(
self,
student_id: str,
kc_id: str,
interactions: list
) -> StudentFeatures:
"""
Calcola le feature per uno studente su un KC.
interactions: lista di LearningInteraction ordinate per timestamp
"""
kc_interactions = [
i for i in interactions
if kc_id in i.kc_ids and i.student_id == student_id
]
if not kc_interactions:
return StudentFeatures(student_id=student_id, kc_id=kc_id)
features = StudentFeatures(student_id=student_id, kc_id=kc_id)
# Accuracy
features.total_attempts = len(kc_interactions)
features.correct_count = sum(1 for i in kc_interactions if i.is_correct)
# Recent accuracy (finestra ultimi 5)
recent = kc_interactions[-5:]
if recent:
features.recent_accuracy = sum(1 for i in recent if i.is_correct) / len(recent)
# Streak
streak_c = 0
streak_i = 0
for interaction in reversed(kc_interactions):
if interaction.is_correct:
if streak_i == 0:
streak_c += 1
else:
break
else:
if streak_c == 0:
streak_i += 1
else:
break
features.streak_correct = streak_c
features.streak_incorrect = streak_i
# Response time
times = [i.response_time_ms for i in kc_interactions]
features.avg_response_time = sum(times) / len(times)
# Trend risposta (positivo = rallentamento)
if len(times) >= 3:
recent_avg = sum(times[-3:]) / 3
older_avg = sum(times[:-3]) / max(len(times) - 3, 1)
features.response_time_trend = (recent_avg - older_avg) / max(older_avg, 1)
# Tempo dall'ultima pratica
last_practice = kc_interactions[-1].timestamp
now = datetime.now()
hours_since = (now - last_practice).total_seconds() / 3600
features.time_since_last_practice = hours_since
# Hint rate
features.hint_rate = sum(1 for i in kc_interactions if i.hint_used) / len(kc_interactions)
# First attempt accuracy
first_attempts = [i for i in kc_interactions if i.attempt_number == 1]
if first_attempts:
features.first_attempt_accuracy = (
sum(1 for i in first_attempts if i.is_correct) / len(first_attempts)
)
# Retention stimata
days_since = hours_since / 24
memory_strength = features.first_attempt_accuracy * 2 # 0-2
features.estimated_retention = self.ebbinghaus_retention(days_since, memory_strength)
return features
def compute_next_review_time(self, features: StudentFeatures) -> datetime:
"""
Calcola il momento ottimale per la prossima revisione (spaced repetition).
Ispirato all'algoritmo SM-2 di SuperMemo.
"""
accuracy = features.first_attempt_accuracy
attempts = features.total_attempts
# Intervallo di base in giorni
if attempts <= 1:
interval_days = 1
elif attempts == 2:
interval_days = 3
else:
# Ease factor basato sull'accuracy
ease = 1.3 + (2.5 - 1.3) * accuracy
interval_days = max(1, round(
features.time_since_last_practice / 24 * ease
))
return datetime.now() + timedelta(days=interval_days)
5. Architektura produkčního systému
Adaptivní systém v produkci musí zpracovat stovky požadavků za sekundu s nižší latencí těch 100 ms. Následující architektura odděluje online cesta (podává se v reálném čase) odoffline cesta (dávkové školení a kalibrace).
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import asyncio
import aioredis
import asyncpg
app = FastAPI(title="Adaptive Learning API")
# ---- Modelli Pydantic ----
class ResponseRequest(BaseModel):
student_id: str
item_id: str
is_correct: bool
response_time_ms: int
hint_used: bool = False
class NextItemResponse(BaseModel):
item_id: str
estimated_difficulty: float
reason: str
mastery_state: dict
# ---- Servizi ----
class AdaptiveEngine:
"""
Engine principale per raccomandazioni adattive.
Usa Redis per feature cache e PostgreSQL per persistenza.
"""
def __init__(self, redis_client, db_pool):
self.redis = redis_client
self.db = db_pool
self.bkt_system = None # Inizializzato al startup
self.irt_model = None
async def process_response(
self,
request: ResponseRequest,
background_tasks: BackgroundTasks
) -> dict:
"""
Pipeline di processing per una risposta.
1. Persiste la risposta nel DB (async)
2. Aggiorna lo stato BKT dalla cache Redis
3. Seleziona il prossimo item
4. Schedula aggiornamento features in background
"""
student_id = request.student_id
item_id = request.item_id
# 1. Get KCs per questo item
kc_ids = await self.get_item_kcs(item_id)
# 2. Aggiorna BKT per ogni KC (via Redis)
mastery_updates = {}
for kc_id in kc_ids:
cache_key = f"bkt:{student_id}:{kc_id}"
# Recupera stato dalla cache
cached = await self.redis.get(cache_key)
if cached:
p_mastery = float(cached)
else:
# Fallback al DB
p_mastery = await self.get_mastery_from_db(student_id, kc_id)
# Aggiorna con nuova risposta
tracker = BKTTracker(await self.get_bkt_params(kc_id))
tracker.p_mastery = p_mastery
new_mastery = tracker.update(request.is_correct)
# Salva in Redis con TTL 24h
await self.redis.setex(cache_key, 86400, str(new_mastery))
mastery_updates[kc_id] = new_mastery
# 3. Seleziona prossimo item
next_item = await self.select_next_item(student_id, mastery_updates)
# 4. Background: persisti nel DB
background_tasks.add_task(
self.persist_interaction,
request,
mastery_updates
)
return {
'mastery_updates': mastery_updates,
'next_item': next_item
}
async def select_next_item(
self,
student_id: str,
mastery_updates: dict
) -> dict:
"""
Seleziona il prossimo item usando una strategia multi-obiettivo:
- Bilancia apprendimento di nuovi KC vs rinforzo di quelli deboli
- Considera la curva di dimenticanza
- Rispetta il curriculum graph
"""
# Ottieni profilo completo del curriculum
curriculum = await self.get_curriculum_graph(student_id)
# Calcola priorità per ogni KC disponibile
priorities = {}
for kc_id, kc_data in curriculum.items():
mastery = mastery_updates.get(kc_id, kc_data.get('mastery', 0.0))
# Priorità alta per KC quasi-mastered (0.7-0.9) - spingili oltre la soglia
if 0.70 <= mastery < 0.95:
priority = 2.0 * (mastery - 0.70) / 0.25
# Priorità media per KC molto deboli - ma non sopraffatti
elif mastery < 0.30:
priority = 0.5
# KC mastered - bassa priorità, solo spaced repetition
elif mastery >= 0.95:
retention = kc_data.get('estimated_retention', 1.0)
priority = (1 - retention) * 0.3
else:
priority = 1.0
priorities[kc_id] = priority
# Seleziona KC con più alta priorità
if not priorities:
raise HTTPException(status_code=404, detail="No available KCs")
target_kc = max(priorities, key=priorities.get)
# Seleziona item per questo KC usando IRT
available_items = await self.get_items_for_kc(target_kc)
student_ability = await self.get_student_ability(student_id)
# CAT selection
optimal_item = self.irt_model.next_item_cat(
student_id, available_items, strategy='max_information'
)
item_params = self.irt_model.item_params.get(optimal_item, {'a': 1.0, 'b': 0.0})
return {
'item_id': optimal_item,
'kc_id': target_kc,
'estimated_difficulty': item_params.get('b', 0.0),
'student_ability': student_ability,
'mastery_before': mastery_updates.get(target_kc, 0.0),
'priority_reason': 'near_mastery' if priorities[target_kc] > 1.5 else 'standard'
}
async def get_item_kcs(self, item_id: str) -> list:
"""Recupera i KC associati a un item dal DB."""
async with self.db.acquire() as conn:
rows = await conn.fetch(
"SELECT kc_id FROM item_kc_mapping WHERE item_id = $1",
item_id
)
return [row['kc_id'] for row in rows]
# ---- Route FastAPI ----
@app.post("/api/v1/adaptive/response")
async def process_response(
request: ResponseRequest,
background_tasks: BackgroundTasks,
engine: AdaptiveEngine = None # Dipendenza iniettata
) -> NextItemResponse:
"""
Processa una risposta e restituisce il prossimo item adattivo.
Target latency: <100ms (p95)
"""
result = await engine.process_response(request, background_tasks)
next_item = result['next_item']
return NextItemResponse(
item_id=next_item['item_id'],
estimated_difficulty=next_item['estimated_difficulty'],
reason=next_item['priority_reason'],
mastery_state=result['mastery_updates']
)
6. A/B testování pro adaptivní algoritmy
Ověření nového algoritmu v produkci vyžaduje A/B testovací rámec, který respektuje zvláštnosti vzdělávání: žáci nejsou zaměnitelní, účinky učení jsou kumulativní a relevantní metriky přesahují míru prokliku.
import hashlib
from enum import Enum
from typing import Callable
import scipy.stats as stats
class ExperimentVariant(Enum):
CONTROL = "control"
TREATMENT = "treatment"
class ABTestManager:
"""
Framework A/B testing per algoritmi adattivi.
Usa student_id hashing per assegnazione deterministica.
"""
def __init__(self, experiment_id: str, treatment_fraction: float = 0.5):
self.experiment_id = experiment_id
self.treatment_fraction = treatment_fraction
self.metrics = {
ExperimentVariant.CONTROL: [],
ExperimentVariant.TREATMENT: []
}
def assign_variant(self, student_id: str) -> ExperimentVariant:
"""
Assegna uno studente a control o treatment in modo deterministico.
Lo stesso studente riceve sempre la stessa variante (no leakage).
"""
hash_input = f"{self.experiment_id}:{student_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
bucket = (hash_value % 100) / 100.0
return (
ExperimentVariant.TREATMENT
if bucket < self.treatment_fraction
else ExperimentVariant.CONTROL
)
def record_metric(
self,
student_id: str,
metric_value: float,
metric_name: str = 'mastery_gain'
):
"""Registra una metrica per uno studente."""
variant = self.assign_variant(student_id)
self.metrics[variant].append(metric_value)
def analyze_results(self) -> dict:
"""
Analisi statistica con Welch's t-test.
Adatto a campioni di dimensioni diverse.
"""
control = self.metrics[ExperimentVariant.CONTROL]
treatment = self.metrics[ExperimentVariant.TREATMENT]
if len(control) < 30 or len(treatment) < 30:
return {'error': 'Campione insufficiente (min 30 per variante)'}
t_stat, p_value = stats.ttest_ind(
treatment, control, equal_var=False
)
control_mean = sum(control) / len(control)
treatment_mean = sum(treatment) / len(treatment)
relative_lift = (treatment_mean - control_mean) / max(abs(control_mean), 1e-9)
# Cohen's d per effect size
pooled_std = (
(sum((x - control_mean)**2 for x in control) +
sum((x - treatment_mean)**2 for x in treatment)) /
(len(control) + len(treatment) - 2)
) ** 0.5
cohens_d = (treatment_mean - control_mean) / max(pooled_std, 1e-9)
return {
'experiment_id': self.experiment_id,
'control_n': len(control),
'treatment_n': len(treatment),
'control_mean': round(control_mean, 4),
'treatment_mean': round(treatment_mean, 4),
'relative_lift': round(relative_lift * 100, 2), # Percentuale
'p_value': round(p_value, 4),
'statistically_significant': p_value < 0.05,
'cohens_d': round(cohens_d, 3),
'practical_significance': abs(cohens_d) > 0.2,
'recommendation': (
'Deploy treatment' if p_value < 0.05 and cohens_d > 0.2
else 'Keep control' if p_value < 0.05 and cohens_d < -0.2
else 'No significant difference'
)
}
# Metriche chiave per A/B testing in EdTech
EDTECH_METRICS = [
'mastery_gain_per_session', # Incremento P(mastery) per sessione
'time_to_mastery', # Minuti necessari per raggiungere P(mastery)>=0.95
'session_completion_rate', # % sessioni completate
'return_rate_7d', # % studenti che tornano entro 7 giorni
'assessment_accuracy_post', # Accuratezza nei test formali post-training
]
7. Monitorování a detekce driftu
Vzdělávací modely podléhají koncept drift z různých důvodů: změna kurikulum, akademická sezónnost, aktualizace učebního materiálu. Monitorovací systém robustní a zásadní pro udržení kvality předpovědí v průběhu času.
Driftové signály v adaptivních systémech
- Posun přesnosti: AUC modelu DKT klesá v aktuálním období pod 0,70
- Kalibrační drift: Pravděpodobnostní segmenty neodpovídají pozorovaným přesnostem
- Posun funkce: Rozložení doby odezvy se výrazně mění
- Posun štítku: podíl správných odpovědí se u stabilních kohort mění
from collections import deque
import numpy as np
class ModelMonitor:
"""
Monitor per rilevamento drift in modelli adattivi.
Implementa ADWIN (Adaptive Windowing) per concept drift detection.
"""
def __init__(
self,
window_size: int = 1000,
drift_threshold: float = 0.02,
alert_callback: Callable = None
):
self.window = deque(maxlen=window_size)
self.drift_threshold = drift_threshold
self.alert_callback = alert_callback
self.baseline_auc = None
def add_prediction(self, predicted_prob: float, actual_label: int):
"""Aggiunge una predizione al buffer di monitoraggio."""
self.window.append((predicted_prob, actual_label))
if len(self.window) >= 100:
self._check_drift()
def _check_drift(self):
"""Verifica il drift confrontando finestre recenti vs storiche."""
window_list = list(self.window)
n = len(window_list)
half = n // 2
# Calcola AUC su prima e seconda meta
first_half = window_list[:half]
second_half = window_list[half:]
auc_first = self._compute_auc(first_half)
auc_second = self._compute_auc(second_half)
drift_magnitude = abs(auc_first - auc_second)
if drift_magnitude > self.drift_threshold:
alert = {
'type': 'concept_drift',
'auc_old': round(auc_first, 4),
'auc_new': round(auc_second, 4),
'magnitude': round(drift_magnitude, 4),
'recommended_action': (
'retrain' if drift_magnitude > 0.10
else 'monitor_closely'
)
}
if self.alert_callback:
self.alert_callback(alert)
def _compute_auc(self, predictions: list) -> float:
"""Calcola AUC-ROC per una lista di (predicted_prob, actual_label)."""
if not predictions:
return 0.5
sorted_preds = sorted(predictions, key=lambda x: x[0], reverse=True)
n_pos = sum(1 for _, y in predictions if y == 1)
n_neg = len(predictions) - n_pos
if n_pos == 0 or n_neg == 0:
return 0.5
tpr_points = []
fpr_points = []
tp = fp = 0
for prob, label in sorted_preds:
if label == 1:
tp += 1
else:
fp += 1
tpr_points.append(tp / n_pos)
fpr_points.append(fp / n_neg)
# Trapezoidal integration
auc = 0.0
for i in range(1, len(fpr_points)):
auc += (fpr_points[i] - fpr_points[i-1]) * (tpr_points[i] + tpr_points[i-1]) / 2
return auc
Anti-Pattern: Optimalizace nesprávné metriky
Nejzáludnějším nebezpečím v adaptivních systémech je optimalizace pro metriky zapojení (čas na platformě, kliknutí, relace) spíše než pro skutečné výsledky učení. Systém, který maximalizuje čas na platformě, by mohl vytvořit snadnou smyčku uspokojení, které studenty udržuje v komfortní zóně, místo aby je k ní tlačilo mistrovství. Před budováním definujte metriky úspěchu z pedagogického hlediska.
Nejlepší postupy pro adaptivní systémy ve výrobě
- Studený start: Pro nové studenty bez historie použijte krátké diagnostické hodnocení 5-10 položek před aktivací adaptivního algoritmu. Plochý prior vede k sekvencím pedagogicky neplatné.
- interpretovatelnost: Ke každému vždy implementujte srozumitelné vysvětlení doporučení. „Doporučujeme toto cvičení, protože máte potíže s odvozováním složených funkcí“ a zásadní pro důvěru studentů.
- Rozmanitost: Vyvarujte se vždy doporučování stejného typu položky. Integrovat parametr diverzity ve funkci výběru, aby se zabránilo efektu "filtrační bubliny".
- Omezení kurikula: Graf kurikula musí doporučení omezit. Nenavrhujte integrály studentovi, který ještě nezvládl derivace, samostatně z toho, co algoritmus navrhuje.
- Lidský dohled: Učitelé musí mít možnost přepsat doporučení a přijímat zprávy o postupu třídy.
Závěry
Algoritmy adaptivního učení představují jednu z nejvíce fascinujících oblastí na průsečíku strojového učení a pedagogických věd. Z matematické teorie IRT ke Scalable Architecture s Redis a FastAPI, cesta od výzkumu k produkci vyžaduje pozornost pedagogické validitě i výpočetní efektivitě.
Výsledky jsou konkrétní: dobře navržené systémy vykazují zlepšení o 20–40 %. čas do zvládnutí ve srovnání se statickými lineárními cestami. S rozvojem neurálních modelů stejně jako DKT a integrace s LLM se obor rychle vyvíjí směrem k doučování stále sofistikovanější personalizované.
Související články v sérii EdTech
- Článek 00: Škálovatelná architektura LMS: Vzor pro více nájemců
- Článek 04: Personalizovaný tutor s LLM a RAG
- Článek 06: Výuka analýzy s xAPI a Kafka







