Algoritmi de învățare adaptivă: de la teorie la producție
Visul unui profesor personal pentru fiecare elev, cineva care calibrează continuu nivelul de dificultăți, identifică lacunele de cunoștințe și oferă conținutul potrivit la momentul potrivit, astăzi realizabil datorită algoritmi de învățare adaptivă. Nu este science fiction: platforme precum Khan Academy, Duolingo și Coursera deservesc milioane de studenți cu căi personalizate algoritmic în timp real.
Provocarea nu este teoretică, ci este de inginerie. Cum se implementează un sistem Teoria răspunsului la item (IRT) care crește la un milion de studenți fără a se degrada în latență? Cum se integrează un model Urmărirea cunoştinţelor într-o conductă ML de producție cu monitorizare continuă și testare A/B? Cum echilibrați explorarea și exploatarea într-un sistem de recomandare care trebuie să fie ambele exacte și valabile din punct de vedere pedagogic?
În acest articol, abordăm aceste întrebări cu cod concret, arhitecturi scalabile și lecții învăţat din sistemele în producţie. Vom începe de la matematica IRT, vom trece prin Cunoașterea Bayesiană Urmărirea și urmărirea profundă a cunoștințelor, pentru a ajunge la un sistem complet cu pipeline de caracteristici, model implementat și cadru de testare A/B.
Ce vei învăța
- Bazele matematice ale Teoriei răspunsului la item (IRT) și cum să o implementați în Python
- Urmărirea cunoștințelor bayesiane (BKT) vs Urmărirea cunoștințelor profunde (DKT): Când să folosiți care
- Inginerie caracteristică pentru semnalele de învățare: timp, încredere, erori secvențiale
- Arhitectura unui motor de recomandare adaptiv cu FastAPI și PostgreSQL
- Cadru de testare A/B pentru validarea algoritmilor în producție
- Monitorizare și detecție a derivă pentru modele educaționale
1. Teoria răspunsului la item: modelul fundamental
L'Teoria răspunsului la item (IRT) și fundamentul matematic al măsurării educaționale moderne. Introdus în anii 1960 de Georg Rasch și Lord, IRT modelează probabilitatea ca un student să răspundă corect la un item (întrebare) în funcție de capacitatea sa latentă și de caracteristicile articolului.
Modelul 2PL (Logistică cu doi parametri) este cel mai utilizat în producție:
P(X=1 | theta, a, b) = 1 / (1 + exp(-a * (theta - b)))
Unde theta și capacitatea elevului, a și parametrul de discriminare
(cât de bine distinge itemul între elevi cu abilități diferite) e b iar parametrul de
dificultate (valoarea lui theta pentru care probabilitatea unui răspuns corect este 0,5).
import numpy as np
from scipy.optimize import minimize
from scipy.special import expit # sigmoid function
class IRTModel2PL:
"""
Two-Parameter Logistic IRT Model.
Calibra difficulty (b) e discrimination (a) per ogni item.
Stima l'ability (theta) per ogni studente.
"""
def __init__(self):
self.item_params = {} # {item_id: {'a': float, 'b': float}}
self.student_abilities = {} # {student_id: float}
def probability_correct(self, theta: float, a: float, b: float) -> float:
"""P(correct | theta, a, b) usando il modello 2PL."""
return expit(a * (theta - b))
def log_likelihood_student(self, theta: float, responses: list) -> float:
"""
Log-likelihood per uno studente data la sequenza di risposte.
responses: lista di tuple (item_id, is_correct)
"""
ll = 0.0
for item_id, is_correct in responses:
if item_id not in self.item_params:
continue
a = self.item_params[item_id]['a']
b = self.item_params[item_id]['b']
p = self.probability_correct(theta, a, b)
# Clip per stabilità numerica
p = np.clip(p, 1e-9, 1 - 1e-9)
ll += is_correct * np.log(p) + (1 - is_correct) * np.log(1 - p)
return ll
def estimate_student_ability(
self,
student_id: str,
responses: list,
prior_mean: float = 0.0,
prior_std: float = 1.0
) -> float:
"""
MAP estimation dell'ability di uno studente.
Usa prior Gaussiano N(0,1) per regolarizzazione.
"""
def negative_map(theta_array):
theta = theta_array[0]
ll = self.log_likelihood_student(theta, responses)
# Log prior Gaussiano
log_prior = -0.5 * ((theta - prior_mean) / prior_std) ** 2
return -(ll + log_prior)
result = minimize(
negative_map,
x0=[0.0],
method='L-BFGS-B',
bounds=[(-4.0, 4.0)]
)
ability = result.x[0]
self.student_abilities[student_id] = ability
return ability
def next_item_cat(
self,
student_id: str,
available_items: list,
strategy: str = 'max_information'
) -> str:
"""
Computerized Adaptive Testing: seleziona il prossimo item ottimale.
Strategie disponibili:
- 'max_information': massimizza l'informazione di Fisher al theta corrente
- 'target_difficulty': seleziona item con b vicino a theta
"""
theta = self.student_abilities.get(student_id, 0.0)
best_item = None
best_score = -np.inf
for item_id in available_items:
if item_id not in self.item_params:
continue
a = self.item_params[item_id]['a']
b = self.item_params[item_id]['b']
if strategy == 'max_information':
# Informazione di Fisher per il modello 2PL
p = self.probability_correct(theta, a, b)
q = 1 - p
fisher_info = (a ** 2) * p * q
score = fisher_info
elif strategy == 'target_difficulty':
# Item più vicino al livello dello studente
score = -abs(b - theta)
if score > best_score:
best_score = score
best_item = item_id
return best_item
def calibrate_items_mle(self, response_matrix: np.ndarray) -> None:
"""
Calibra i parametri degli item usando Maximum Likelihood.
response_matrix: (n_students, n_items) con valori 0/1
Implementazione semplificata - in produzione usa py-irt o mirt.
"""
# Stima iniziale delle abilita con proporzione di risposte corrette
n_students, n_items = response_matrix.shape
abilities = np.zeros(n_students)
for iteration in range(20): # EM iterations
# E-step: aggiorna abilita dato params items
for s in range(n_students):
responses = [
(f'item_{i}', int(response_matrix[s, i]))
for i in range(n_items)
if f'item_{i}' in self.item_params
]
if responses:
abilities[s] = self.estimate_student_ability(
f'student_{s}', responses
)
# M-step: aggiorna params items dato abilita
for i in range(n_items):
item_id = f'item_{i}'
# Stima semplificata: in produzione usa marginal MLE
p_mean = response_matrix[:, i].mean()
b_estimate = -np.log(p_mean / (1 - p_mean + 1e-9))
self.item_params[item_id] = {
'a': 1.0, # discrimination iniziale
'b': np.clip(b_estimate, -3.0, 3.0)
}
# Esempio di utilizzo
model = IRTModel2PL()
# Calibra con dati storici
import numpy as np
np.random.seed(42)
responses = np.random.binomial(1, 0.6, (500, 50))
model.calibrate_items_mle(responses)
# Stima ability di un nuovo studente
student_responses = [
('item_0', 1), ('item_5', 0), ('item_10', 1),
('item_15', 1), ('item_20', 0)
]
ability = model.estimate_student_ability('student_new', student_responses)
print(f"Estimated ability: {ability:.3f}") # es: 0.342
# Seleziona il prossimo item
next_item = model.next_item_cat('student_new', [f'item_{i}' for i in range(50)])
print(f"Next optimal item: {next_item}")
2. Urmărirea cunoștințelor bayesiene: modelarea progresiei în timp
În timp ce IRT fotografieză capacitatea unui student într-o clipă, Urmărirea cunoștințelor bayesiene (BKT) modelează modul în care cunoștințele se schimbă în timp prin învățare. Introdus de Corbett și Anderson în 1994, BKT rămâne un pilon de bază în sistemele adaptive datorită interpretabilității sale.
BKT este un model Markov ascuns cu patru parametri pentru componenta cunoașterii (KC):
- P(L0): probabilitatea inițială ca elevul să cunoască KC
- P(T): probabilitatea de tranziție (învățarea KC după o oportunitate de practică)
- P(G): probabilitatea de a ghici (răspunde corect deși nu știe)
- P(S): probabilitatea de alunecare (răspunde incorect în ciuda faptului că știi)
from dataclasses import dataclass
from typing import Optional
import json
@dataclass
class BKTParams:
"""Parametri BKT per un knowledge component."""
p_learn: float # P(L0) - prior knowledge
p_transit: float # P(T) - learning rate
p_guess: float # P(G) - guess rate
p_slip: float # P(S) - slip rate
kc_id: str # Knowledge Component ID
class BKTTracker:
"""
Bayesian Knowledge Tracing tracker per singolo studente.
Aggiorna la probabilità di padronanza dopo ogni risposta.
"""
def __init__(self, params: BKTParams):
self.params = params
self.p_mastery = params.p_learn # Stima corrente di padronanza
self.history = []
def update(self, is_correct: bool) -> float:
"""
Aggiorna la stima di padronanza dopo una risposta.
Restituisce la nuova probabilità di padronanza.
Passi:
1. Calcola P(correct | mastery) e P(correct | !mastery)
2. Applica Bayes per aggiornare P(mastery | correct/incorrect)
3. Applica la transizione di apprendimento
"""
p = self.p_mastery
# Step 1: Calcola probabilità di osservare la risposta
if is_correct:
# P(correct | mastery) = 1 - P(slip)
p_obs_given_know = 1 - self.params.p_slip
# P(correct | !mastery) = P(guess)
p_obs_given_not = self.params.p_guess
else:
# P(incorrect | mastery) = P(slip)
p_obs_given_know = self.params.p_slip
# P(incorrect | !mastery) = 1 - P(guess)
p_obs_given_not = 1 - self.params.p_guess
# Step 2: Posterior di padronanza (formula di Bayes)
p_correct_total = (
p_obs_given_know * p +
p_obs_given_not * (1 - p)
)
# Evita divisione per zero
if p_correct_total < 1e-10:
p_posterior = p
else:
p_posterior = (p_obs_given_know * p) / p_correct_total
# Step 3: Applica transizione di apprendimento
p_new = p_posterior + (1 - p_posterior) * self.params.p_transit
self.p_mastery = p_new
self.history.append({
'response': is_correct,
'p_mastery_before': p,
'p_mastery_after': p_new
})
return p_new
def is_mastered(self, threshold: float = 0.95) -> bool:
"""Ritorna True se la padronanza supera la soglia."""
return self.p_mastery >= threshold
def recommended_action(self) -> str:
"""Raccomanda l'azione successiva basata sullo stato corrente."""
p = self.p_mastery
if p >= 0.95:
return 'advance' # Procedi al KC successivo
elif p >= 0.70:
return 'practice' # Pratica ulteriore
elif p >= 0.40:
return 'hint' # Offri un hint
else:
return 'remediation' # Torna ai prerequisiti
class BKTSystem:
"""Sistema BKT multi-studente multi-KC."""
def __init__(self, kc_params: dict):
"""
kc_params: {kc_id: BKTParams}
"""
self.kc_params = kc_params
self.trackers = {} # {(student_id, kc_id): BKTTracker}
def get_tracker(self, student_id: str, kc_id: str) -> BKTTracker:
"""Recupera o crea un tracker per student/KC."""
key = (student_id, kc_id)
if key not in self.trackers:
if kc_id not in self.kc_params:
raise ValueError(f"KC {kc_id} non trovato")
self.trackers[key] = BKTTracker(self.kc_params[kc_id])
return self.trackers[key]
def process_response(
self,
student_id: str,
kc_id: str,
is_correct: bool
) -> dict:
"""Processa una risposta e restituisce lo stato aggiornato."""
tracker = self.get_tracker(student_id, kc_id)
p_mastery = tracker.update(is_correct)
return {
'student_id': student_id,
'kc_id': kc_id,
'p_mastery': round(p_mastery, 4),
'is_mastered': tracker.is_mastered(),
'recommended_action': tracker.recommended_action()
}
def get_mastery_profile(self, student_id: str) -> dict:
"""Restituisce il profilo completo di padronanza di uno studente."""
profile = {}
for (sid, kc_id), tracker in self.trackers.items():
if sid == student_id:
profile[kc_id] = {
'p_mastery': round(tracker.p_mastery, 4),
'is_mastered': tracker.is_mastered(),
'attempts': len(tracker.history)
}
return profile
# Esempio configurazione sistema
kc_params = {
'algebra_linear': BKTParams(
p_learn=0.20, # 20% conosce gia
p_transit=0.15, # 15% impara per ogni pratica
p_guess=0.25, # 25% probabilità di indovinare
p_slip=0.10, # 10% probabilità di sbagliare pur sapendo
kc_id='algebra_linear'
),
'calcolo_derivate': BKTParams(
p_learn=0.10,
p_transit=0.12,
p_guess=0.15,
p_slip=0.08,
kc_id='calcolo_derivate'
)
}
bkt_system = BKTSystem(kc_params)
# Simula una sessione di apprendimento
responses_sequence = [False, False, True, True, True, True, False, True]
for i, correct in enumerate(responses_sequence):
result = bkt_system.process_response('student_42', 'algebra_linear', correct)
print(f"Risposta {i+1}: {'corretta' if correct else 'errata'} -> "
f"P(mastery)={result['p_mastery']:.3f}, "
f"Azione: {result['recommended_action']}")
3. Urmărirea cunoștințelor profunde: abordarea neuronală
Il Urmărirea cunoștințelor profunde (DKT), introdus de Piech et al. în 2015, înlocuiește ipotezele simplificatoare ale BKT cu o rețea neuronală recurentă capabilă să capteze dependențe complex între componentele cunoaşterii. DKT realizează o acuratețe cu 20-30% mai mare decât BKT pe seturile de date real conform cercetărilor din 2025 (87,5% precizie predicție în studii recente).
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
class DKTModel(nn.Module):
"""
Deep Knowledge Tracing con LSTM.
Input: sequenza di (item_id, risposta) encodata
Output: probabilità di risposta corretta per ogni item
"""
def __init__(
self,
n_items: int,
hidden_size: int = 128,
n_layers: int = 2,
dropout: float = 0.2
):
super().__init__()
self.n_items = n_items
self.hidden_size = hidden_size
# Input encoding: item_id * 2 (correct/incorrect)
# Ogni item ha 2 rappresentazioni: risposta corretta e risposta errata
self.input_size = n_items * 2
self.lstm = nn.LSTM(
input_size=self.input_size,
hidden_size=hidden_size,
num_layers=n_layers,
dropout=dropout if n_layers > 1 else 0,
batch_first=True
)
self.output_layer = nn.Sequential(
nn.Linear(hidden_size, n_items),
nn.Sigmoid()
)
def encode_input(
self,
item_ids: torch.Tensor,
responses: torch.Tensor
) -> torch.Tensor:
"""
Encoding one-hot per (item, response).
item_ids: (batch, seq_len) - ID degli item
responses: (batch, seq_len) - 0 o 1
Restituisce: (batch, seq_len, n_items * 2)
"""
batch_size, seq_len = item_ids.shape
x = torch.zeros(batch_size, seq_len, self.input_size)
# Item risposto correttamente: posizione item_id
# Item risposto erroneamente: posizione item_id + n_items
for b in range(batch_size):
for t in range(seq_len):
iid = item_ids[b, t].item()
r = responses[b, t].item()
if r == 1:
x[b, t, iid] = 1.0
else:
x[b, t, iid + self.n_items] = 1.0
return x
def forward(
self,
item_ids: torch.Tensor,
responses: torch.Tensor
) -> torch.Tensor:
"""
Forward pass.
Restituisce predizioni per il prossimo item in sequenza.
Output shape: (batch, seq_len, n_items)
"""
x = self.encode_input(item_ids, responses)
lstm_out, _ = self.lstm(x)
predictions = self.output_layer(lstm_out)
return predictions
class DKTDataset(Dataset):
"""Dataset per DKT - sequenze di interazioni studente."""
def __init__(
self,
sequences: list,
max_seq_len: int = 200,
pad_value: int = -1
):
"""
sequences: lista di dizionari
{'item_ids': [...], 'responses': [...]}
"""
self.sequences = sequences
self.max_seq_len = max_seq_len
self.pad_value = pad_value
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
seq = self.sequences[idx]
item_ids = seq['item_ids'][:self.max_seq_len]
responses = seq['responses'][:self.max_seq_len]
seq_len = len(item_ids)
# Padding
pad_len = self.max_seq_len - seq_len
item_ids = item_ids + [0] * pad_len
responses = responses + [0] * pad_len
mask = [1] * seq_len + [0] * pad_len
return {
'item_ids': torch.tensor(item_ids, dtype=torch.long),
'responses': torch.tensor(responses, dtype=torch.float),
'mask': torch.tensor(mask, dtype=torch.bool),
'seq_len': seq_len
}
def train_dkt(
model: DKTModel,
train_loader: DataLoader,
val_loader: DataLoader,
n_epochs: int = 50,
lr: float = 1e-3
) -> dict:
"""Training loop per DKT con early stopping."""
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.BCELoss(reduction='none')
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=5, factor=0.5
)
best_val_auc = 0.0
history = {'train_loss': [], 'val_auc': []}
for epoch in range(n_epochs):
model.train()
total_loss = 0.0
for batch in train_loader:
item_ids = batch['item_ids']
responses = batch['responses']
mask = batch['mask']
# Predici su t-1, target su t
# input: sequenza[:-1], target: sequenza[1:]
pred = model(item_ids[:, :-1], responses[:, :-1])
# Raccoglie le predizioni per gli item effettivi
target_items = item_ids[:, 1:]
target_responses = responses[:, 1:]
target_mask = mask[:, 1:]
# Estrai predizione per l'item corretto
batch_size, seq_len, n_items = pred.shape
pred_flat = pred.reshape(-1, n_items)
target_items_flat = target_items.reshape(-1)
pred_selected = pred_flat.gather(
1, target_items_flat.unsqueeze(1)
).squeeze(1)
# Loss mascherata (ignora padding)
loss = criterion(
pred_selected,
target_responses.reshape(-1)
)
mask_flat = target_mask.reshape(-1).float()
loss = (loss * mask_flat).sum() / mask_flat.sum()
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
history['train_loss'].append(avg_loss)
# Validation
val_auc = evaluate_dkt(model, val_loader)
history['val_auc'].append(val_auc)
scheduler.step(1 - val_auc) # Minimizza 1-AUC
if val_auc > best_val_auc:
best_val_auc = val_auc
torch.save(model.state_dict(), 'best_dkt_model.pt')
print(f"Epoch {epoch+1}/{n_epochs} - "
f"Loss: {avg_loss:.4f}, Val AUC: {val_auc:.4f}")
return history
4. Inginerie caracteristică pentru semnalele de învățare
Calitatea caracteristicilor este adesea mai crucială decât arhitectura modelului. Un sistem adaptativ în producție trebuie să depășească perechea simplă (articol, corect/incorect) și să surprindă contextul bogat în interacțiune de învățare.
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
import math
@dataclass
class LearningInteraction:
"""Una singola interazione di apprendimento."""
student_id: str
item_id: str
kc_ids: list # Knowledge components coinvolti
is_correct: bool
response_time_ms: int # Tempo di risposta in millisecondi
timestamp: datetime
hint_used: bool = False
attempt_number: int = 1
confidence_level: Optional[int] = None # 1-5 se richiesto
@dataclass
class StudentFeatures:
"""Feature aggregate per uno studente su un KC."""
student_id: str
kc_id: str
# Accuracy features
total_attempts: int = 0
correct_count: int = 0
recent_accuracy: float = 0.0 # Ultimi 5 tentativi
streak_correct: int = 0 # Sequenza corrente di corrette
streak_incorrect: int = 0
# Time features
avg_response_time: float = 0.0
response_time_trend: float = 0.0 # Positivo = rallentamento
time_since_last_practice: float = 0.0 # In ore
# Engagement features
hint_rate: float = 0.0 # Proporzione con hint
first_attempt_accuracy: float = 0.0 # Solo primi tentativi
# Memory features (Ebbinghaus forgetting curve)
estimated_retention: float = 1.0
class FeatureExtractor:
"""
Estrae feature per il sistema adattivo da sequenze di interazioni.
"""
def __init__(self, decay_factor: float = 0.3):
"""
decay_factor: controlla la decadenza della curva di dimenticanza
"""
self.decay_factor = decay_factor
def ebbinghaus_retention(
self,
days_since_last: float,
memory_strength: float = 1.0
) -> float:
"""
Modello di dimenticanza di Ebbinghaus.
R = exp(-t / (S * stability_factor))
"""
if days_since_last <= 0:
return 1.0
stability = max(memory_strength, 0.1)
return math.exp(-days_since_last / (stability * 30))
def compute_features(
self,
student_id: str,
kc_id: str,
interactions: list
) -> StudentFeatures:
"""
Calcola le feature per uno studente su un KC.
interactions: lista di LearningInteraction ordinate per timestamp
"""
kc_interactions = [
i for i in interactions
if kc_id in i.kc_ids and i.student_id == student_id
]
if not kc_interactions:
return StudentFeatures(student_id=student_id, kc_id=kc_id)
features = StudentFeatures(student_id=student_id, kc_id=kc_id)
# Accuracy
features.total_attempts = len(kc_interactions)
features.correct_count = sum(1 for i in kc_interactions if i.is_correct)
# Recent accuracy (finestra ultimi 5)
recent = kc_interactions[-5:]
if recent:
features.recent_accuracy = sum(1 for i in recent if i.is_correct) / len(recent)
# Streak
streak_c = 0
streak_i = 0
for interaction in reversed(kc_interactions):
if interaction.is_correct:
if streak_i == 0:
streak_c += 1
else:
break
else:
if streak_c == 0:
streak_i += 1
else:
break
features.streak_correct = streak_c
features.streak_incorrect = streak_i
# Response time
times = [i.response_time_ms for i in kc_interactions]
features.avg_response_time = sum(times) / len(times)
# Trend risposta (positivo = rallentamento)
if len(times) >= 3:
recent_avg = sum(times[-3:]) / 3
older_avg = sum(times[:-3]) / max(len(times) - 3, 1)
features.response_time_trend = (recent_avg - older_avg) / max(older_avg, 1)
# Tempo dall'ultima pratica
last_practice = kc_interactions[-1].timestamp
now = datetime.now()
hours_since = (now - last_practice).total_seconds() / 3600
features.time_since_last_practice = hours_since
# Hint rate
features.hint_rate = sum(1 for i in kc_interactions if i.hint_used) / len(kc_interactions)
# First attempt accuracy
first_attempts = [i for i in kc_interactions if i.attempt_number == 1]
if first_attempts:
features.first_attempt_accuracy = (
sum(1 for i in first_attempts if i.is_correct) / len(first_attempts)
)
# Retention stimata
days_since = hours_since / 24
memory_strength = features.first_attempt_accuracy * 2 # 0-2
features.estimated_retention = self.ebbinghaus_retention(days_since, memory_strength)
return features
def compute_next_review_time(self, features: StudentFeatures) -> datetime:
"""
Calcola il momento ottimale per la prossima revisione (spaced repetition).
Ispirato all'algoritmo SM-2 di SuperMemo.
"""
accuracy = features.first_attempt_accuracy
attempts = features.total_attempts
# Intervallo di base in giorni
if attempts <= 1:
interval_days = 1
elif attempts == 2:
interval_days = 3
else:
# Ease factor basato sull'accuracy
ease = 1.3 + (2.5 - 1.3) * accuracy
interval_days = max(1, round(
features.time_since_last_practice / 24 * ease
))
return datetime.now() + timedelta(days=interval_days)
5. Arhitectura Sistemului de Productie
Un sistem adaptiv în producție trebuie să gestioneze sute de solicitări pe secundă cu latențe mai jos cei 100 ms. Următoarea arhitectură separă calea online (servire în timp real) dincale offline (antrenament pe lot și calibrare).
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import asyncio
import aioredis
import asyncpg
app = FastAPI(title="Adaptive Learning API")
# ---- Modelli Pydantic ----
class ResponseRequest(BaseModel):
student_id: str
item_id: str
is_correct: bool
response_time_ms: int
hint_used: bool = False
class NextItemResponse(BaseModel):
item_id: str
estimated_difficulty: float
reason: str
mastery_state: dict
# ---- Servizi ----
class AdaptiveEngine:
"""
Engine principale per raccomandazioni adattive.
Usa Redis per feature cache e PostgreSQL per persistenza.
"""
def __init__(self, redis_client, db_pool):
self.redis = redis_client
self.db = db_pool
self.bkt_system = None # Inizializzato al startup
self.irt_model = None
async def process_response(
self,
request: ResponseRequest,
background_tasks: BackgroundTasks
) -> dict:
"""
Pipeline di processing per una risposta.
1. Persiste la risposta nel DB (async)
2. Aggiorna lo stato BKT dalla cache Redis
3. Seleziona il prossimo item
4. Schedula aggiornamento features in background
"""
student_id = request.student_id
item_id = request.item_id
# 1. Get KCs per questo item
kc_ids = await self.get_item_kcs(item_id)
# 2. Aggiorna BKT per ogni KC (via Redis)
mastery_updates = {}
for kc_id in kc_ids:
cache_key = f"bkt:{student_id}:{kc_id}"
# Recupera stato dalla cache
cached = await self.redis.get(cache_key)
if cached:
p_mastery = float(cached)
else:
# Fallback al DB
p_mastery = await self.get_mastery_from_db(student_id, kc_id)
# Aggiorna con nuova risposta
tracker = BKTTracker(await self.get_bkt_params(kc_id))
tracker.p_mastery = p_mastery
new_mastery = tracker.update(request.is_correct)
# Salva in Redis con TTL 24h
await self.redis.setex(cache_key, 86400, str(new_mastery))
mastery_updates[kc_id] = new_mastery
# 3. Seleziona prossimo item
next_item = await self.select_next_item(student_id, mastery_updates)
# 4. Background: persisti nel DB
background_tasks.add_task(
self.persist_interaction,
request,
mastery_updates
)
return {
'mastery_updates': mastery_updates,
'next_item': next_item
}
async def select_next_item(
self,
student_id: str,
mastery_updates: dict
) -> dict:
"""
Seleziona il prossimo item usando una strategia multi-obiettivo:
- Bilancia apprendimento di nuovi KC vs rinforzo di quelli deboli
- Considera la curva di dimenticanza
- Rispetta il curriculum graph
"""
# Ottieni profilo completo del curriculum
curriculum = await self.get_curriculum_graph(student_id)
# Calcola priorità per ogni KC disponibile
priorities = {}
for kc_id, kc_data in curriculum.items():
mastery = mastery_updates.get(kc_id, kc_data.get('mastery', 0.0))
# Priorità alta per KC quasi-mastered (0.7-0.9) - spingili oltre la soglia
if 0.70 <= mastery < 0.95:
priority = 2.0 * (mastery - 0.70) / 0.25
# Priorità media per KC molto deboli - ma non sopraffatti
elif mastery < 0.30:
priority = 0.5
# KC mastered - bassa priorità, solo spaced repetition
elif mastery >= 0.95:
retention = kc_data.get('estimated_retention', 1.0)
priority = (1 - retention) * 0.3
else:
priority = 1.0
priorities[kc_id] = priority
# Seleziona KC con più alta priorità
if not priorities:
raise HTTPException(status_code=404, detail="No available KCs")
target_kc = max(priorities, key=priorities.get)
# Seleziona item per questo KC usando IRT
available_items = await self.get_items_for_kc(target_kc)
student_ability = await self.get_student_ability(student_id)
# CAT selection
optimal_item = self.irt_model.next_item_cat(
student_id, available_items, strategy='max_information'
)
item_params = self.irt_model.item_params.get(optimal_item, {'a': 1.0, 'b': 0.0})
return {
'item_id': optimal_item,
'kc_id': target_kc,
'estimated_difficulty': item_params.get('b', 0.0),
'student_ability': student_ability,
'mastery_before': mastery_updates.get(target_kc, 0.0),
'priority_reason': 'near_mastery' if priorities[target_kc] > 1.5 else 'standard'
}
async def get_item_kcs(self, item_id: str) -> list:
"""Recupera i KC associati a un item dal DB."""
async with self.db.acquire() as conn:
rows = await conn.fetch(
"SELECT kc_id FROM item_kc_mapping WHERE item_id = $1",
item_id
)
return [row['kc_id'] for row in rows]
# ---- Route FastAPI ----
@app.post("/api/v1/adaptive/response")
async def process_response(
request: ResponseRequest,
background_tasks: BackgroundTasks,
engine: AdaptiveEngine = None # Dipendenza iniettata
) -> NextItemResponse:
"""
Processa una risposta e restituisce il prossimo item adattivo.
Target latency: <100ms (p95)
"""
result = await engine.process_response(request, background_tasks)
next_item = result['next_item']
return NextItemResponse(
item_id=next_item['item_id'],
estimated_difficulty=next_item['estimated_difficulty'],
reason=next_item['priority_reason'],
mastery_state=result['mastery_updates']
)
6. Testarea A/B pentru algoritmi adaptivi
Validarea unui nou algoritm în producție necesită un cadru de testare A/B care respectă particularitățile educației: elevii nu sunt interschimbabili, efectele învățării sunt cumulative, iar valorile relevante depășesc rata de clic.
import hashlib
from enum import Enum
from typing import Callable
import scipy.stats as stats
class ExperimentVariant(Enum):
CONTROL = "control"
TREATMENT = "treatment"
class ABTestManager:
"""
Framework A/B testing per algoritmi adattivi.
Usa student_id hashing per assegnazione deterministica.
"""
def __init__(self, experiment_id: str, treatment_fraction: float = 0.5):
self.experiment_id = experiment_id
self.treatment_fraction = treatment_fraction
self.metrics = {
ExperimentVariant.CONTROL: [],
ExperimentVariant.TREATMENT: []
}
def assign_variant(self, student_id: str) -> ExperimentVariant:
"""
Assegna uno studente a control o treatment in modo deterministico.
Lo stesso studente riceve sempre la stessa variante (no leakage).
"""
hash_input = f"{self.experiment_id}:{student_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
bucket = (hash_value % 100) / 100.0
return (
ExperimentVariant.TREATMENT
if bucket < self.treatment_fraction
else ExperimentVariant.CONTROL
)
def record_metric(
self,
student_id: str,
metric_value: float,
metric_name: str = 'mastery_gain'
):
"""Registra una metrica per uno studente."""
variant = self.assign_variant(student_id)
self.metrics[variant].append(metric_value)
def analyze_results(self) -> dict:
"""
Analisi statistica con Welch's t-test.
Adatto a campioni di dimensioni diverse.
"""
control = self.metrics[ExperimentVariant.CONTROL]
treatment = self.metrics[ExperimentVariant.TREATMENT]
if len(control) < 30 or len(treatment) < 30:
return {'error': 'Campione insufficiente (min 30 per variante)'}
t_stat, p_value = stats.ttest_ind(
treatment, control, equal_var=False
)
control_mean = sum(control) / len(control)
treatment_mean = sum(treatment) / len(treatment)
relative_lift = (treatment_mean - control_mean) / max(abs(control_mean), 1e-9)
# Cohen's d per effect size
pooled_std = (
(sum((x - control_mean)**2 for x in control) +
sum((x - treatment_mean)**2 for x in treatment)) /
(len(control) + len(treatment) - 2)
) ** 0.5
cohens_d = (treatment_mean - control_mean) / max(pooled_std, 1e-9)
return {
'experiment_id': self.experiment_id,
'control_n': len(control),
'treatment_n': len(treatment),
'control_mean': round(control_mean, 4),
'treatment_mean': round(treatment_mean, 4),
'relative_lift': round(relative_lift * 100, 2), # Percentuale
'p_value': round(p_value, 4),
'statistically_significant': p_value < 0.05,
'cohens_d': round(cohens_d, 3),
'practical_significance': abs(cohens_d) > 0.2,
'recommendation': (
'Deploy treatment' if p_value < 0.05 and cohens_d > 0.2
else 'Keep control' if p_value < 0.05 and cohens_d < -0.2
else 'No significant difference'
)
}
# Metriche chiave per A/B testing in EdTech
EDTECH_METRICS = [
'mastery_gain_per_session', # Incremento P(mastery) per sessione
'time_to_mastery', # Minuti necessari per raggiungere P(mastery)>=0.95
'session_completion_rate', # % sessioni completate
'return_rate_7d', # % studenti che tornano entro 7 giorni
'assessment_accuracy_post', # Accuratezza nei test formali post-training
]
7. Monitorizare și detecție a derivă
Modelele educaționale sunt supuse deriva conceptului din diverse motive: schimbarea de curriculum, sezonalitate academică, actualizarea materialului didactic. Un sistem de monitorizare robust și esențial pentru menținerea calității predicțiilor în timp.
Semnale de deriva în sistemele adaptive
- Deriva de precizie: AUC-ul modelului DKT scade sub 0,70 în perioada curentă
- Deriva de calibrare: compartimentele de probabilitate nu se potrivesc cu preciziile observate
- Deriva caracteristică: Distribuția timpilor de răspuns se modifică semnificativ
- Schimbarea etichetei: proporția de răspunsuri corecte se modifică pentru cohortele stabile
from collections import deque
import numpy as np
class ModelMonitor:
"""
Monitor per rilevamento drift in modelli adattivi.
Implementa ADWIN (Adaptive Windowing) per concept drift detection.
"""
def __init__(
self,
window_size: int = 1000,
drift_threshold: float = 0.02,
alert_callback: Callable = None
):
self.window = deque(maxlen=window_size)
self.drift_threshold = drift_threshold
self.alert_callback = alert_callback
self.baseline_auc = None
def add_prediction(self, predicted_prob: float, actual_label: int):
"""Aggiunge una predizione al buffer di monitoraggio."""
self.window.append((predicted_prob, actual_label))
if len(self.window) >= 100:
self._check_drift()
def _check_drift(self):
"""Verifica il drift confrontando finestre recenti vs storiche."""
window_list = list(self.window)
n = len(window_list)
half = n // 2
# Calcola AUC su prima e seconda meta
first_half = window_list[:half]
second_half = window_list[half:]
auc_first = self._compute_auc(first_half)
auc_second = self._compute_auc(second_half)
drift_magnitude = abs(auc_first - auc_second)
if drift_magnitude > self.drift_threshold:
alert = {
'type': 'concept_drift',
'auc_old': round(auc_first, 4),
'auc_new': round(auc_second, 4),
'magnitude': round(drift_magnitude, 4),
'recommended_action': (
'retrain' if drift_magnitude > 0.10
else 'monitor_closely'
)
}
if self.alert_callback:
self.alert_callback(alert)
def _compute_auc(self, predictions: list) -> float:
"""Calcola AUC-ROC per una lista di (predicted_prob, actual_label)."""
if not predictions:
return 0.5
sorted_preds = sorted(predictions, key=lambda x: x[0], reverse=True)
n_pos = sum(1 for _, y in predictions if y == 1)
n_neg = len(predictions) - n_pos
if n_pos == 0 or n_neg == 0:
return 0.5
tpr_points = []
fpr_points = []
tp = fp = 0
for prob, label in sorted_preds:
if label == 1:
tp += 1
else:
fp += 1
tpr_points.append(tp / n_pos)
fpr_points.append(fp / n_neg)
# Trapezoidal integration
auc = 0.0
for i in range(1, len(fpr_points)):
auc += (fpr_points[i] - fpr_points[i-1]) * (tpr_points[i] + tpr_points[i-1]) / 2
return auc
Anti-pattern: optimizarea valorii greșite
Cel mai insidios pericol în sistemele adaptive este optimizarea pentru valorile de implicare (timp pe platformă, clicuri, sesiuni) mai degrabă decât pentru rezultatele reale ale învățării. Un sistem care maximizează timpul pe platformă ar putea crea o buclă ușoară satisfacţii care ţin elevii în zona de confort în loc să-i împingă spre măiestrie. Definiți valorile de succes în termeni pedagogici înainte de a construi.
Cele mai bune practici pentru sistemele adaptive în producție
- Pornire la rece: Pentru studenții noi fără antecedente, utilizați o scurtă evaluare de diagnosticare de 5-10 itemi înainte de a activa algoritmul adaptiv. Un prealabil plat duce la secvențe invalid din punct de vedere pedagogic.
- Interpretabilitate: Implementați întotdeauna o explicație lizibilă pentru fiecare recomandare. „Sugerăm acest exercițiu pentru că aveți dificultăți cu derivarea de funcţii compuse” şi esenţiale pentru încrederea elevilor.
- Diversitate: Evitați să recomandați întotdeauna același tip de articol. Integrați un parametru de diversitate în funcția de selecție pentru a preveni efectul de „bule de filtrare”.
- Constrângeri curriculare: Graficul curriculum-ului trebuie să constrângă recomandările. Nu propune integrale unui student care nu a stăpânit încă derivate, independent din ceea ce sugerează algoritmul.
- Supravegherea umană: Profesorii trebuie să poată trece peste recomandări și primiți rapoarte despre progresul clasei.
Concluzii
Algoritmii de învățare adaptivă reprezintă unul dintre cele mai fascinante domenii la intersecția învățării automate și a științelor educaționale. Din teoria matematică a IRT la arhitectură scalabilă cu Redis și FastAPI, călătoria de la cercetare la producție necesită atenție la validitatea pedagogică, precum și la eficiența computațională.
Rezultatele sunt concrete: sistemele bine proiectate prezintă îmbunătățiri de 20-40% în timpul până la stăpânire în comparație cu traseele liniare statice. Odată cu avansarea modelelor neuronale precum DKT și integrările cu LLM, domeniul evoluează rapid către tutorat din ce în ce mai sofisticate personalizate.
Articole similare din seria EdTech
- Articolul 00: Arhitectură LMS scalabilă: model multi-chiriași
- Articolul 04: Tutor Personalizat cu LLM și RAG
- Articolul 06: Learning Analytics cu xAPI și Kafka







