Algorytmy uczenia się adaptacyjnego: od teorii do produkcji
Marzenie o osobistym nauczycielu dla każdego ucznia, kimś, kto na bieżąco kalibruje poziom trudności, identyfikuje luki w wiedzy i oferuje właściwe treści we właściwym czasie już dziś osiągalne dzięki Algorytmy adaptacyjnego uczenia się. To nie science fiction: platformy jak Khan Academy, Duolingo i Coursera służą milionom uczniów za pomocą specjalnie generowanych ścieżek algorytmicznie w czasie rzeczywistym.
Wyzwanie nie jest teoretyczne, lecz inżynieryjne. Jak wdrożyć system Teoria odpowiedzi na przedmiot (IRT) skalowalny do miliona uczniów bez pogorszenia opóźnienia? Jak zintegrować model Śledzenie wiedzy w procesie produkcyjnego uczenia maszynowego z ciągłym monitorowaniem i testami A/B? Jak zrównoważyć eksplorację i eksploatację w systemie rekomendacyjnym, który musi spełniać jedno i drugie dokładne i uzasadnione pedagogicznie?
W tym artykule odpowiadamy na te pytania, przedstawiając konkretny kod, skalowalne architektury i lekcje wyciągnięte z systemów produkcyjnych. Zaczniemy od matematyki IRT, przejdziemy przez Wiedzę Bayesowską Tracing i Deep Knowledge Tracing, aby uzyskać kompletny system z potokiem funkcji i modelem wdrożone środowisko testów A/B.
Czego się nauczysz
- Matematyczne podstawy teorii odpowiedzi na elementy (IRT) i sposoby jej implementacji w Pythonie
- Śledzenie wiedzy Bayesa (BKT) a śledzenie głębokiej wiedzy (DKT): Kiedy którego używać
- Inżynieria cech sygnałów uczących się: czas, pewność, błędy sekwencyjne
- Architektura adaptacyjnego silnika rekomendacji z FastAPI i PostgreSQL
- Struktura testów A/B do sprawdzania algorytmów w środowisku produkcyjnym
- Monitorowanie i wykrywanie dryftu dla modeli edukacyjnych
1. Teoria reakcji na pozycje: model podstawowy
L'Teoria odpowiedzi na przedmiot (IRT) oraz matematyczne podstawy współczesnego pomiaru edukacyjnego. Wprowadzony w latach 60. XX wieku przez Georga Rascha i Lorda, IRT modeluje prawdopodobieństwo, że uczeń odpowie poprawnie do przedmiotu (pytania) w zależności od jego ukrytych zdolności i charakterystyki przedmiotu.
W produkcji najczęściej stosowany jest model 2PL (Two-Parameter Logistic):
P(X=1 | theta, a, b) = 1 / (1 + exp(-a * (theta - b)))
Gdzie theta i możliwości ucznia, a oraz parametr dyskryminacji
(jak dobrze przedmiot rozróżnia uczniów o różnych zdolnościach) e b i parametr
trudność (wartość theta, dla której prawdopodobieństwo poprawnej odpowiedzi wynosi 0,5).
import numpy as np
from scipy.optimize import minimize
from scipy.special import expit # sigmoid function
class IRTModel2PL:
"""
Two-Parameter Logistic IRT Model.
Calibra difficulty (b) e discrimination (a) per ogni item.
Stima l'ability (theta) per ogni studente.
"""
def __init__(self):
self.item_params = {} # {item_id: {'a': float, 'b': float}}
self.student_abilities = {} # {student_id: float}
def probability_correct(self, theta: float, a: float, b: float) -> float:
"""P(correct | theta, a, b) usando il modello 2PL."""
return expit(a * (theta - b))
def log_likelihood_student(self, theta: float, responses: list) -> float:
"""
Log-likelihood per uno studente data la sequenza di risposte.
responses: lista di tuple (item_id, is_correct)
"""
ll = 0.0
for item_id, is_correct in responses:
if item_id not in self.item_params:
continue
a = self.item_params[item_id]['a']
b = self.item_params[item_id]['b']
p = self.probability_correct(theta, a, b)
# Clip per stabilità numerica
p = np.clip(p, 1e-9, 1 - 1e-9)
ll += is_correct * np.log(p) + (1 - is_correct) * np.log(1 - p)
return ll
def estimate_student_ability(
self,
student_id: str,
responses: list,
prior_mean: float = 0.0,
prior_std: float = 1.0
) -> float:
"""
MAP estimation dell'ability di uno studente.
Usa prior Gaussiano N(0,1) per regolarizzazione.
"""
def negative_map(theta_array):
theta = theta_array[0]
ll = self.log_likelihood_student(theta, responses)
# Log prior Gaussiano
log_prior = -0.5 * ((theta - prior_mean) / prior_std) ** 2
return -(ll + log_prior)
result = minimize(
negative_map,
x0=[0.0],
method='L-BFGS-B',
bounds=[(-4.0, 4.0)]
)
ability = result.x[0]
self.student_abilities[student_id] = ability
return ability
def next_item_cat(
self,
student_id: str,
available_items: list,
strategy: str = 'max_information'
) -> str:
"""
Computerized Adaptive Testing: seleziona il prossimo item ottimale.
Strategie disponibili:
- 'max_information': massimizza l'informazione di Fisher al theta corrente
- 'target_difficulty': seleziona item con b vicino a theta
"""
theta = self.student_abilities.get(student_id, 0.0)
best_item = None
best_score = -np.inf
for item_id in available_items:
if item_id not in self.item_params:
continue
a = self.item_params[item_id]['a']
b = self.item_params[item_id]['b']
if strategy == 'max_information':
# Informazione di Fisher per il modello 2PL
p = self.probability_correct(theta, a, b)
q = 1 - p
fisher_info = (a ** 2) * p * q
score = fisher_info
elif strategy == 'target_difficulty':
# Item più vicino al livello dello studente
score = -abs(b - theta)
if score > best_score:
best_score = score
best_item = item_id
return best_item
def calibrate_items_mle(self, response_matrix: np.ndarray) -> None:
"""
Calibra i parametri degli item usando Maximum Likelihood.
response_matrix: (n_students, n_items) con valori 0/1
Implementazione semplificata - in produzione usa py-irt o mirt.
"""
# Stima iniziale delle abilita con proporzione di risposte corrette
n_students, n_items = response_matrix.shape
abilities = np.zeros(n_students)
for iteration in range(20): # EM iterations
# E-step: aggiorna abilita dato params items
for s in range(n_students):
responses = [
(f'item_{i}', int(response_matrix[s, i]))
for i in range(n_items)
if f'item_{i}' in self.item_params
]
if responses:
abilities[s] = self.estimate_student_ability(
f'student_{s}', responses
)
# M-step: aggiorna params items dato abilita
for i in range(n_items):
item_id = f'item_{i}'
# Stima semplificata: in produzione usa marginal MLE
p_mean = response_matrix[:, i].mean()
b_estimate = -np.log(p_mean / (1 - p_mean + 1e-9))
self.item_params[item_id] = {
'a': 1.0, # discrimination iniziale
'b': np.clip(b_estimate, -3.0, 3.0)
}
# Esempio di utilizzo
model = IRTModel2PL()
# Calibra con dati storici
import numpy as np
np.random.seed(42)
responses = np.random.binomial(1, 0.6, (500, 50))
model.calibrate_items_mle(responses)
# Stima ability di un nuovo studente
student_responses = [
('item_0', 1), ('item_5', 0), ('item_10', 1),
('item_15', 1), ('item_20', 0)
]
ability = model.estimate_student_ability('student_new', student_responses)
print(f"Estimated ability: {ability:.3f}") # es: 0.342
# Seleziona il prossimo item
next_item = model.next_item_cat('student_new', [f'item_{i}' for i in range(50)])
print(f"Next optimal item: {next_item}")
2. Bayesowskie śledzenie wiedzy: modelowanie postępu w czasie
Podczas gdy IRT fotografuje umiejętności ucznia w mgnieniu oka, Bayesowskie śledzenie wiedzy (BKT) modeluje zmiany wiedzy w czasie poprzez uczenie się. Wprowadzony przez Corbetta i Andersona w 1994 r. BKT pozostaje ostoją systemów adaptacyjnych ze względu na możliwość interpretacji.
BKT to ukryty model Markowa z czterema parametrami komponentu wiedzy (KC):
- P(L0): początkowe prawdopodobieństwo, że uczeń zna KC
- P(T): prawdopodobieństwo przejścia (nauka KC po okazji ćwiczeń)
- P(G): prawdopodobieństwo zgadnięcia (odpowiedź poprawna pomimo niewiedzy)
- P(S): prawdopodobieństwo poślizgu (odpowiedź błędna pomimo wiedzy)
from dataclasses import dataclass
from typing import Optional
import json
@dataclass
class BKTParams:
"""Parametri BKT per un knowledge component."""
p_learn: float # P(L0) - prior knowledge
p_transit: float # P(T) - learning rate
p_guess: float # P(G) - guess rate
p_slip: float # P(S) - slip rate
kc_id: str # Knowledge Component ID
class BKTTracker:
"""
Bayesian Knowledge Tracing tracker per singolo studente.
Aggiorna la probabilità di padronanza dopo ogni risposta.
"""
def __init__(self, params: BKTParams):
self.params = params
self.p_mastery = params.p_learn # Stima corrente di padronanza
self.history = []
def update(self, is_correct: bool) -> float:
"""
Aggiorna la stima di padronanza dopo una risposta.
Restituisce la nuova probabilità di padronanza.
Passi:
1. Calcola P(correct | mastery) e P(correct | !mastery)
2. Applica Bayes per aggiornare P(mastery | correct/incorrect)
3. Applica la transizione di apprendimento
"""
p = self.p_mastery
# Step 1: Calcola probabilità di osservare la risposta
if is_correct:
# P(correct | mastery) = 1 - P(slip)
p_obs_given_know = 1 - self.params.p_slip
# P(correct | !mastery) = P(guess)
p_obs_given_not = self.params.p_guess
else:
# P(incorrect | mastery) = P(slip)
p_obs_given_know = self.params.p_slip
# P(incorrect | !mastery) = 1 - P(guess)
p_obs_given_not = 1 - self.params.p_guess
# Step 2: Posterior di padronanza (formula di Bayes)
p_correct_total = (
p_obs_given_know * p +
p_obs_given_not * (1 - p)
)
# Evita divisione per zero
if p_correct_total < 1e-10:
p_posterior = p
else:
p_posterior = (p_obs_given_know * p) / p_correct_total
# Step 3: Applica transizione di apprendimento
p_new = p_posterior + (1 - p_posterior) * self.params.p_transit
self.p_mastery = p_new
self.history.append({
'response': is_correct,
'p_mastery_before': p,
'p_mastery_after': p_new
})
return p_new
def is_mastered(self, threshold: float = 0.95) -> bool:
"""Ritorna True se la padronanza supera la soglia."""
return self.p_mastery >= threshold
def recommended_action(self) -> str:
"""Raccomanda l'azione successiva basata sullo stato corrente."""
p = self.p_mastery
if p >= 0.95:
return 'advance' # Procedi al KC successivo
elif p >= 0.70:
return 'practice' # Pratica ulteriore
elif p >= 0.40:
return 'hint' # Offri un hint
else:
return 'remediation' # Torna ai prerequisiti
class BKTSystem:
"""Sistema BKT multi-studente multi-KC."""
def __init__(self, kc_params: dict):
"""
kc_params: {kc_id: BKTParams}
"""
self.kc_params = kc_params
self.trackers = {} # {(student_id, kc_id): BKTTracker}
def get_tracker(self, student_id: str, kc_id: str) -> BKTTracker:
"""Recupera o crea un tracker per student/KC."""
key = (student_id, kc_id)
if key not in self.trackers:
if kc_id not in self.kc_params:
raise ValueError(f"KC {kc_id} non trovato")
self.trackers[key] = BKTTracker(self.kc_params[kc_id])
return self.trackers[key]
def process_response(
self,
student_id: str,
kc_id: str,
is_correct: bool
) -> dict:
"""Processa una risposta e restituisce lo stato aggiornato."""
tracker = self.get_tracker(student_id, kc_id)
p_mastery = tracker.update(is_correct)
return {
'student_id': student_id,
'kc_id': kc_id,
'p_mastery': round(p_mastery, 4),
'is_mastered': tracker.is_mastered(),
'recommended_action': tracker.recommended_action()
}
def get_mastery_profile(self, student_id: str) -> dict:
"""Restituisce il profilo completo di padronanza di uno studente."""
profile = {}
for (sid, kc_id), tracker in self.trackers.items():
if sid == student_id:
profile[kc_id] = {
'p_mastery': round(tracker.p_mastery, 4),
'is_mastered': tracker.is_mastered(),
'attempts': len(tracker.history)
}
return profile
# Esempio configurazione sistema
kc_params = {
'algebra_linear': BKTParams(
p_learn=0.20, # 20% conosce gia
p_transit=0.15, # 15% impara per ogni pratica
p_guess=0.25, # 25% probabilità di indovinare
p_slip=0.10, # 10% probabilità di sbagliare pur sapendo
kc_id='algebra_linear'
),
'calcolo_derivate': BKTParams(
p_learn=0.10,
p_transit=0.12,
p_guess=0.15,
p_slip=0.08,
kc_id='calcolo_derivate'
)
}
bkt_system = BKTSystem(kc_params)
# Simula una sessione di apprendimento
responses_sequence = [False, False, True, True, True, True, False, True]
for i, correct in enumerate(responses_sequence):
result = bkt_system.process_response('student_42', 'algebra_linear', correct)
print(f"Risposta {i+1}: {'corretta' if correct else 'errata'} -> "
f"P(mastery)={result['p_mastery']:.3f}, "
f"Azione: {result['recommended_action']}")
3. Głębokie śledzenie wiedzy: podejście neuronowe
Il Głębokie śledzenie wiedzy (DKT), wprowadzony przez Piecha i in. w 2015 r. zastępuje upraszczające założenia BKT z rekurencyjną siecią neuronową zdolną do wychwytywania zależności złożony pomiędzy komponentami wiedzy. DKT osiąga o 20-30% większą dokładność niż BKT na zbiorach danych realne według badań z 2025 roku (w ostatnich badaniach trafność przewidywań na poziomie 87,5%).
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
class DKTModel(nn.Module):
"""
Deep Knowledge Tracing con LSTM.
Input: sequenza di (item_id, risposta) encodata
Output: probabilità di risposta corretta per ogni item
"""
def __init__(
self,
n_items: int,
hidden_size: int = 128,
n_layers: int = 2,
dropout: float = 0.2
):
super().__init__()
self.n_items = n_items
self.hidden_size = hidden_size
# Input encoding: item_id * 2 (correct/incorrect)
# Ogni item ha 2 rappresentazioni: risposta corretta e risposta errata
self.input_size = n_items * 2
self.lstm = nn.LSTM(
input_size=self.input_size,
hidden_size=hidden_size,
num_layers=n_layers,
dropout=dropout if n_layers > 1 else 0,
batch_first=True
)
self.output_layer = nn.Sequential(
nn.Linear(hidden_size, n_items),
nn.Sigmoid()
)
def encode_input(
self,
item_ids: torch.Tensor,
responses: torch.Tensor
) -> torch.Tensor:
"""
Encoding one-hot per (item, response).
item_ids: (batch, seq_len) - ID degli item
responses: (batch, seq_len) - 0 o 1
Restituisce: (batch, seq_len, n_items * 2)
"""
batch_size, seq_len = item_ids.shape
x = torch.zeros(batch_size, seq_len, self.input_size)
# Item risposto correttamente: posizione item_id
# Item risposto erroneamente: posizione item_id + n_items
for b in range(batch_size):
for t in range(seq_len):
iid = item_ids[b, t].item()
r = responses[b, t].item()
if r == 1:
x[b, t, iid] = 1.0
else:
x[b, t, iid + self.n_items] = 1.0
return x
def forward(
self,
item_ids: torch.Tensor,
responses: torch.Tensor
) -> torch.Tensor:
"""
Forward pass.
Restituisce predizioni per il prossimo item in sequenza.
Output shape: (batch, seq_len, n_items)
"""
x = self.encode_input(item_ids, responses)
lstm_out, _ = self.lstm(x)
predictions = self.output_layer(lstm_out)
return predictions
class DKTDataset(Dataset):
"""Dataset per DKT - sequenze di interazioni studente."""
def __init__(
self,
sequences: list,
max_seq_len: int = 200,
pad_value: int = -1
):
"""
sequences: lista di dizionari
{'item_ids': [...], 'responses': [...]}
"""
self.sequences = sequences
self.max_seq_len = max_seq_len
self.pad_value = pad_value
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
seq = self.sequences[idx]
item_ids = seq['item_ids'][:self.max_seq_len]
responses = seq['responses'][:self.max_seq_len]
seq_len = len(item_ids)
# Padding
pad_len = self.max_seq_len - seq_len
item_ids = item_ids + [0] * pad_len
responses = responses + [0] * pad_len
mask = [1] * seq_len + [0] * pad_len
return {
'item_ids': torch.tensor(item_ids, dtype=torch.long),
'responses': torch.tensor(responses, dtype=torch.float),
'mask': torch.tensor(mask, dtype=torch.bool),
'seq_len': seq_len
}
def train_dkt(
model: DKTModel,
train_loader: DataLoader,
val_loader: DataLoader,
n_epochs: int = 50,
lr: float = 1e-3
) -> dict:
"""Training loop per DKT con early stopping."""
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.BCELoss(reduction='none')
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=5, factor=0.5
)
best_val_auc = 0.0
history = {'train_loss': [], 'val_auc': []}
for epoch in range(n_epochs):
model.train()
total_loss = 0.0
for batch in train_loader:
item_ids = batch['item_ids']
responses = batch['responses']
mask = batch['mask']
# Predici su t-1, target su t
# input: sequenza[:-1], target: sequenza[1:]
pred = model(item_ids[:, :-1], responses[:, :-1])
# Raccoglie le predizioni per gli item effettivi
target_items = item_ids[:, 1:]
target_responses = responses[:, 1:]
target_mask = mask[:, 1:]
# Estrai predizione per l'item corretto
batch_size, seq_len, n_items = pred.shape
pred_flat = pred.reshape(-1, n_items)
target_items_flat = target_items.reshape(-1)
pred_selected = pred_flat.gather(
1, target_items_flat.unsqueeze(1)
).squeeze(1)
# Loss mascherata (ignora padding)
loss = criterion(
pred_selected,
target_responses.reshape(-1)
)
mask_flat = target_mask.reshape(-1).float()
loss = (loss * mask_flat).sum() / mask_flat.sum()
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
history['train_loss'].append(avg_loss)
# Validation
val_auc = evaluate_dkt(model, val_loader)
history['val_auc'].append(val_auc)
scheduler.step(1 - val_auc) # Minimizza 1-AUC
if val_auc > best_val_auc:
best_val_auc = val_auc
torch.save(model.state_dict(), 'best_dkt_model.pt')
print(f"Epoch {epoch+1}/{n_epochs} - "
f"Loss: {avg_loss:.4f}, Val AUC: {val_auc:.4f}")
return history
4. Inżynieria cech sygnałów uczących się
Jakość funkcji jest często ważniejsza niż architektura modelu. System adaptacyjny w produkcji musi wykraczać poza prostą parę (przedmiot, poprawny/nieprawidłowy) i uchwycić kontekst bogaty w interakcje edukacyjne.
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
import math
@dataclass
class LearningInteraction:
"""Una singola interazione di apprendimento."""
student_id: str
item_id: str
kc_ids: list # Knowledge components coinvolti
is_correct: bool
response_time_ms: int # Tempo di risposta in millisecondi
timestamp: datetime
hint_used: bool = False
attempt_number: int = 1
confidence_level: Optional[int] = None # 1-5 se richiesto
@dataclass
class StudentFeatures:
"""Feature aggregate per uno studente su un KC."""
student_id: str
kc_id: str
# Accuracy features
total_attempts: int = 0
correct_count: int = 0
recent_accuracy: float = 0.0 # Ultimi 5 tentativi
streak_correct: int = 0 # Sequenza corrente di corrette
streak_incorrect: int = 0
# Time features
avg_response_time: float = 0.0
response_time_trend: float = 0.0 # Positivo = rallentamento
time_since_last_practice: float = 0.0 # In ore
# Engagement features
hint_rate: float = 0.0 # Proporzione con hint
first_attempt_accuracy: float = 0.0 # Solo primi tentativi
# Memory features (Ebbinghaus forgetting curve)
estimated_retention: float = 1.0
class FeatureExtractor:
"""
Estrae feature per il sistema adattivo da sequenze di interazioni.
"""
def __init__(self, decay_factor: float = 0.3):
"""
decay_factor: controlla la decadenza della curva di dimenticanza
"""
self.decay_factor = decay_factor
def ebbinghaus_retention(
self,
days_since_last: float,
memory_strength: float = 1.0
) -> float:
"""
Modello di dimenticanza di Ebbinghaus.
R = exp(-t / (S * stability_factor))
"""
if days_since_last <= 0:
return 1.0
stability = max(memory_strength, 0.1)
return math.exp(-days_since_last / (stability * 30))
def compute_features(
self,
student_id: str,
kc_id: str,
interactions: list
) -> StudentFeatures:
"""
Calcola le feature per uno studente su un KC.
interactions: lista di LearningInteraction ordinate per timestamp
"""
kc_interactions = [
i for i in interactions
if kc_id in i.kc_ids and i.student_id == student_id
]
if not kc_interactions:
return StudentFeatures(student_id=student_id, kc_id=kc_id)
features = StudentFeatures(student_id=student_id, kc_id=kc_id)
# Accuracy
features.total_attempts = len(kc_interactions)
features.correct_count = sum(1 for i in kc_interactions if i.is_correct)
# Recent accuracy (finestra ultimi 5)
recent = kc_interactions[-5:]
if recent:
features.recent_accuracy = sum(1 for i in recent if i.is_correct) / len(recent)
# Streak
streak_c = 0
streak_i = 0
for interaction in reversed(kc_interactions):
if interaction.is_correct:
if streak_i == 0:
streak_c += 1
else:
break
else:
if streak_c == 0:
streak_i += 1
else:
break
features.streak_correct = streak_c
features.streak_incorrect = streak_i
# Response time
times = [i.response_time_ms for i in kc_interactions]
features.avg_response_time = sum(times) / len(times)
# Trend risposta (positivo = rallentamento)
if len(times) >= 3:
recent_avg = sum(times[-3:]) / 3
older_avg = sum(times[:-3]) / max(len(times) - 3, 1)
features.response_time_trend = (recent_avg - older_avg) / max(older_avg, 1)
# Tempo dall'ultima pratica
last_practice = kc_interactions[-1].timestamp
now = datetime.now()
hours_since = (now - last_practice).total_seconds() / 3600
features.time_since_last_practice = hours_since
# Hint rate
features.hint_rate = sum(1 for i in kc_interactions if i.hint_used) / len(kc_interactions)
# First attempt accuracy
first_attempts = [i for i in kc_interactions if i.attempt_number == 1]
if first_attempts:
features.first_attempt_accuracy = (
sum(1 for i in first_attempts if i.is_correct) / len(first_attempts)
)
# Retention stimata
days_since = hours_since / 24
memory_strength = features.first_attempt_accuracy * 2 # 0-2
features.estimated_retention = self.ebbinghaus_retention(days_since, memory_strength)
return features
def compute_next_review_time(self, features: StudentFeatures) -> datetime:
"""
Calcola il momento ottimale per la prossima revisione (spaced repetition).
Ispirato all'algoritmo SM-2 di SuperMemo.
"""
accuracy = features.first_attempt_accuracy
attempts = features.total_attempts
# Intervallo di base in giorni
if attempts <= 1:
interval_days = 1
elif attempts == 2:
interval_days = 3
else:
# Ease factor basato sull'accuracy
ease = 1.3 + (2.5 - 1.3) * accuracy
interval_days = max(1, round(
features.time_since_last_practice / 24 * ease
))
return datetime.now() + timedelta(days=interval_days)
5. Architektura systemu produkcyjnego
System adaptacyjny w środowisku produkcyjnym musi obsłużyć setki żądań na sekundę z opóźnieniami podanymi poniżej 100 ms. Następująca architektura oddziela ścieżka internetowa (serwowanie w czasie rzeczywistym) odścieżka offline (szkolenie wsadowe i kalibracja).
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import asyncio
import aioredis
import asyncpg
app = FastAPI(title="Adaptive Learning API")
# ---- Modelli Pydantic ----
class ResponseRequest(BaseModel):
student_id: str
item_id: str
is_correct: bool
response_time_ms: int
hint_used: bool = False
class NextItemResponse(BaseModel):
item_id: str
estimated_difficulty: float
reason: str
mastery_state: dict
# ---- Servizi ----
class AdaptiveEngine:
"""
Engine principale per raccomandazioni adattive.
Usa Redis per feature cache e PostgreSQL per persistenza.
"""
def __init__(self, redis_client, db_pool):
self.redis = redis_client
self.db = db_pool
self.bkt_system = None # Inizializzato al startup
self.irt_model = None
async def process_response(
self,
request: ResponseRequest,
background_tasks: BackgroundTasks
) -> dict:
"""
Pipeline di processing per una risposta.
1. Persiste la risposta nel DB (async)
2. Aggiorna lo stato BKT dalla cache Redis
3. Seleziona il prossimo item
4. Schedula aggiornamento features in background
"""
student_id = request.student_id
item_id = request.item_id
# 1. Get KCs per questo item
kc_ids = await self.get_item_kcs(item_id)
# 2. Aggiorna BKT per ogni KC (via Redis)
mastery_updates = {}
for kc_id in kc_ids:
cache_key = f"bkt:{student_id}:{kc_id}"
# Recupera stato dalla cache
cached = await self.redis.get(cache_key)
if cached:
p_mastery = float(cached)
else:
# Fallback al DB
p_mastery = await self.get_mastery_from_db(student_id, kc_id)
# Aggiorna con nuova risposta
tracker = BKTTracker(await self.get_bkt_params(kc_id))
tracker.p_mastery = p_mastery
new_mastery = tracker.update(request.is_correct)
# Salva in Redis con TTL 24h
await self.redis.setex(cache_key, 86400, str(new_mastery))
mastery_updates[kc_id] = new_mastery
# 3. Seleziona prossimo item
next_item = await self.select_next_item(student_id, mastery_updates)
# 4. Background: persisti nel DB
background_tasks.add_task(
self.persist_interaction,
request,
mastery_updates
)
return {
'mastery_updates': mastery_updates,
'next_item': next_item
}
async def select_next_item(
self,
student_id: str,
mastery_updates: dict
) -> dict:
"""
Seleziona il prossimo item usando una strategia multi-obiettivo:
- Bilancia apprendimento di nuovi KC vs rinforzo di quelli deboli
- Considera la curva di dimenticanza
- Rispetta il curriculum graph
"""
# Ottieni profilo completo del curriculum
curriculum = await self.get_curriculum_graph(student_id)
# Calcola priorità per ogni KC disponibile
priorities = {}
for kc_id, kc_data in curriculum.items():
mastery = mastery_updates.get(kc_id, kc_data.get('mastery', 0.0))
# Priorità alta per KC quasi-mastered (0.7-0.9) - spingili oltre la soglia
if 0.70 <= mastery < 0.95:
priority = 2.0 * (mastery - 0.70) / 0.25
# Priorità media per KC molto deboli - ma non sopraffatti
elif mastery < 0.30:
priority = 0.5
# KC mastered - bassa priorità, solo spaced repetition
elif mastery >= 0.95:
retention = kc_data.get('estimated_retention', 1.0)
priority = (1 - retention) * 0.3
else:
priority = 1.0
priorities[kc_id] = priority
# Seleziona KC con più alta priorità
if not priorities:
raise HTTPException(status_code=404, detail="No available KCs")
target_kc = max(priorities, key=priorities.get)
# Seleziona item per questo KC usando IRT
available_items = await self.get_items_for_kc(target_kc)
student_ability = await self.get_student_ability(student_id)
# CAT selection
optimal_item = self.irt_model.next_item_cat(
student_id, available_items, strategy='max_information'
)
item_params = self.irt_model.item_params.get(optimal_item, {'a': 1.0, 'b': 0.0})
return {
'item_id': optimal_item,
'kc_id': target_kc,
'estimated_difficulty': item_params.get('b', 0.0),
'student_ability': student_ability,
'mastery_before': mastery_updates.get(target_kc, 0.0),
'priority_reason': 'near_mastery' if priorities[target_kc] > 1.5 else 'standard'
}
async def get_item_kcs(self, item_id: str) -> list:
"""Recupera i KC associati a un item dal DB."""
async with self.db.acquire() as conn:
rows = await conn.fetch(
"SELECT kc_id FROM item_kc_mapping WHERE item_id = $1",
item_id
)
return [row['kc_id'] for row in rows]
# ---- Route FastAPI ----
@app.post("/api/v1/adaptive/response")
async def process_response(
request: ResponseRequest,
background_tasks: BackgroundTasks,
engine: AdaptiveEngine = None # Dipendenza iniettata
) -> NextItemResponse:
"""
Processa una risposta e restituisce il prossimo item adattivo.
Target latency: <100ms (p95)
"""
result = await engine.process_response(request, background_tasks)
next_item = result['next_item']
return NextItemResponse(
item_id=next_item['item_id'],
estimated_difficulty=next_item['estimated_difficulty'],
reason=next_item['priority_reason'],
mastery_state=result['mastery_updates']
)
6. Testowanie A/B algorytmów adaptacyjnych
Walidacja nowego algorytmu w środowisku produkcyjnym wymaga struktury testów A/B, która uwzględnia: specyfika edukacji: uczniowie nie są wymienni, efekty uczenia się mają one charakter kumulacyjny, a odpowiednie wskaźniki wykraczają poza współczynnik klikalności.
import hashlib
from enum import Enum
from typing import Callable
import scipy.stats as stats
class ExperimentVariant(Enum):
CONTROL = "control"
TREATMENT = "treatment"
class ABTestManager:
"""
Framework A/B testing per algoritmi adattivi.
Usa student_id hashing per assegnazione deterministica.
"""
def __init__(self, experiment_id: str, treatment_fraction: float = 0.5):
self.experiment_id = experiment_id
self.treatment_fraction = treatment_fraction
self.metrics = {
ExperimentVariant.CONTROL: [],
ExperimentVariant.TREATMENT: []
}
def assign_variant(self, student_id: str) -> ExperimentVariant:
"""
Assegna uno studente a control o treatment in modo deterministico.
Lo stesso studente riceve sempre la stessa variante (no leakage).
"""
hash_input = f"{self.experiment_id}:{student_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
bucket = (hash_value % 100) / 100.0
return (
ExperimentVariant.TREATMENT
if bucket < self.treatment_fraction
else ExperimentVariant.CONTROL
)
def record_metric(
self,
student_id: str,
metric_value: float,
metric_name: str = 'mastery_gain'
):
"""Registra una metrica per uno studente."""
variant = self.assign_variant(student_id)
self.metrics[variant].append(metric_value)
def analyze_results(self) -> dict:
"""
Analisi statistica con Welch's t-test.
Adatto a campioni di dimensioni diverse.
"""
control = self.metrics[ExperimentVariant.CONTROL]
treatment = self.metrics[ExperimentVariant.TREATMENT]
if len(control) < 30 or len(treatment) < 30:
return {'error': 'Campione insufficiente (min 30 per variante)'}
t_stat, p_value = stats.ttest_ind(
treatment, control, equal_var=False
)
control_mean = sum(control) / len(control)
treatment_mean = sum(treatment) / len(treatment)
relative_lift = (treatment_mean - control_mean) / max(abs(control_mean), 1e-9)
# Cohen's d per effect size
pooled_std = (
(sum((x - control_mean)**2 for x in control) +
sum((x - treatment_mean)**2 for x in treatment)) /
(len(control) + len(treatment) - 2)
) ** 0.5
cohens_d = (treatment_mean - control_mean) / max(pooled_std, 1e-9)
return {
'experiment_id': self.experiment_id,
'control_n': len(control),
'treatment_n': len(treatment),
'control_mean': round(control_mean, 4),
'treatment_mean': round(treatment_mean, 4),
'relative_lift': round(relative_lift * 100, 2), # Percentuale
'p_value': round(p_value, 4),
'statistically_significant': p_value < 0.05,
'cohens_d': round(cohens_d, 3),
'practical_significance': abs(cohens_d) > 0.2,
'recommendation': (
'Deploy treatment' if p_value < 0.05 and cohens_d > 0.2
else 'Keep control' if p_value < 0.05 and cohens_d < -0.2
else 'No significant difference'
)
}
# Metriche chiave per A/B testing in EdTech
EDTECH_METRICS = [
'mastery_gain_per_session', # Incremento P(mastery) per sessione
'time_to_mastery', # Minuti necessari per raggiungere P(mastery)>=0.95
'session_completion_rate', # % sessioni completate
'return_rate_7d', # % studenti che tornano entro 7 giorni
'assessment_accuracy_post', # Accuratezza nei test formali post-training
]
7. Monitorowanie i wykrywanie dryfu
Modele edukacyjne podlegają dryf koncepcji z różnych powodów: zmiana program nauczania, sezonowość akademicka, aktualizacja materiałów dydaktycznych. System monitorowania solidne i niezbędne do utrzymania jakości prognoz w czasie.
Sygnały dryfu w systemach adaptacyjnych
- Dryf dokładności: AUC modelu DKT spada w bieżącym okresie poniżej 0,70
- Dryft kalibracyjny: Przedziały prawdopodobieństwa nie odpowiadają zaobserwowanym dokładnościom
- Dryf funkcji: Rozkład czasów reakcji zmienia się znacząco
- Zmiana etykiety: odsetek poprawnych odpowiedzi zmienia się dla stabilnych kohort
from collections import deque
import numpy as np
class ModelMonitor:
"""
Monitor per rilevamento drift in modelli adattivi.
Implementa ADWIN (Adaptive Windowing) per concept drift detection.
"""
def __init__(
self,
window_size: int = 1000,
drift_threshold: float = 0.02,
alert_callback: Callable = None
):
self.window = deque(maxlen=window_size)
self.drift_threshold = drift_threshold
self.alert_callback = alert_callback
self.baseline_auc = None
def add_prediction(self, predicted_prob: float, actual_label: int):
"""Aggiunge una predizione al buffer di monitoraggio."""
self.window.append((predicted_prob, actual_label))
if len(self.window) >= 100:
self._check_drift()
def _check_drift(self):
"""Verifica il drift confrontando finestre recenti vs storiche."""
window_list = list(self.window)
n = len(window_list)
half = n // 2
# Calcola AUC su prima e seconda meta
first_half = window_list[:half]
second_half = window_list[half:]
auc_first = self._compute_auc(first_half)
auc_second = self._compute_auc(second_half)
drift_magnitude = abs(auc_first - auc_second)
if drift_magnitude > self.drift_threshold:
alert = {
'type': 'concept_drift',
'auc_old': round(auc_first, 4),
'auc_new': round(auc_second, 4),
'magnitude': round(drift_magnitude, 4),
'recommended_action': (
'retrain' if drift_magnitude > 0.10
else 'monitor_closely'
)
}
if self.alert_callback:
self.alert_callback(alert)
def _compute_auc(self, predictions: list) -> float:
"""Calcola AUC-ROC per una lista di (predicted_prob, actual_label)."""
if not predictions:
return 0.5
sorted_preds = sorted(predictions, key=lambda x: x[0], reverse=True)
n_pos = sum(1 for _, y in predictions if y == 1)
n_neg = len(predictions) - n_pos
if n_pos == 0 or n_neg == 0:
return 0.5
tpr_points = []
fpr_points = []
tp = fp = 0
for prob, label in sorted_preds:
if label == 1:
tp += 1
else:
fp += 1
tpr_points.append(tp / n_pos)
fpr_points.append(fp / n_neg)
# Trapezoidal integration
auc = 0.0
for i in range(1, len(fpr_points)):
auc += (fpr_points[i] - fpr_points[i-1]) * (tpr_points[i] + tpr_points[i-1]) / 2
return auc
Anty-wzorzec: optymalizacja niewłaściwej metryki
Najbardziej podstępnym zagrożeniem w systemach adaptacyjnych jest optymalizacja pod kątem wskaźników zaangażowania (czas na platformie, kliknięcia, sesje), a nie na rzeczywiste efekty uczenia się. System, który maksymalizuje czas spędzony na platformie, mógłby stworzyć łatwą pętlę satysfakcję, która utrzymuje uczniów w strefie komfortu, zamiast popychać ich w jej kierunku mistrzostwo. Przed rozpoczęciem budowy zdefiniuj wskaźniki sukcesu w kategoriach pedagogicznych.
Najlepsze praktyki dotyczące systemów adaptacyjnych w produkcji
- Zimny start: W przypadku nowych uczniów bez historii należy zastosować krótką ocenę diagnostyczną 5–10 elementów przed aktywacją algorytmu adaptacyjnego. Mieszkanie przed prowadzi do sekwencji pedagogicznie nieważne.
- Interpretowalność: Zawsze wdrażaj czytelne wyjaśnienie dla każdego z nich zalecenie. „Proponujemy to ćwiczenie, ponieważ masz trudności z wyprowadzaniem funkcji złożonych” i są niezbędne dla pewności siebie uczniów.
- Różnorodność: Unikaj polecania zawsze tego samego typu produktów. Zintegruj parametr różnorodności w funkcji selekcji, aby zapobiec efektowi „pęcherzyka filtrującego”.
- Ograniczenia programowe: Wykres programu nauczania musi ograniczać zalecenia. Nie proponuj całek uczniowi, który samodzielnie nie opanował jeszcze pochodnych z tego, co sugeruje algorytm.
- Nadzór człowieka: Nauczyciele muszą mieć możliwość ignorowania zaleceń i otrzymywać raporty dotyczące postępów w zajęciach.
Wnioski
Algorytmy uczenia się adaptacyjnego stanowią jedną z najbardziej fascynujących dziedzin na styku uczenia maszynowego i nauk o edukacji. Z teorii matematycznej IRT do skalowalnej architektury z Redis i FastAPI, podróż od badań do produkcji wymaga dbałości o trafność pedagogiczną i efektywność obliczeniową.
Wyniki są konkretne: dobrze zaprojektowane systemy wykazują poprawę o 20–40%. czas potrzebny do opanowania w porównaniu ze statycznymi ścieżkami liniowymi. Wraz z rozwojem modeli neuronowych podobnie jak DKT i integracje z LLM, dziedzina ta szybko ewoluuje w kierunku korepetycji coraz bardziej wyrafinowane, spersonalizowane.
Powiązane artykuły z serii EdTech
- Artykuł 00: Skalowalna architektura LMS: wzorzec wielu najemców
- Artykuł 04: Spersonalizowany nauczyciel z LLM i RAG
- Artykuł 06: Analityka uczenia się z xAPI i Kafką







