Learning Analytics: Data Pipeline cu xAPI și Kafka
Fiecare clic, fiecare răspuns greșit, fiecare videoclip întrerupt și fiecare exercițiu abandonat spune ceva important despre procesul de învățare al unui elev. Platformele EdTech-urile moderne colectează miliarde din aceste evenimente în fiecare zi, dar majoritatea le aruncă sau le folosește doar pentru statistici agregate banale: „cursul X are 70% din finalizare”. Aceasta este o risipă de date și o oportunitate ratată.
Analiza învățării și disciplina care transformă aceste date comportamentale în perspectivă acționabilă: preziceți care studenți vor abandona cursul, identificați conținuturile care provoacă cele mai multe dificultăți, optimizează traseele de învățare în timp real. Dar pentru a face acest lucru aveți nevoie de două ingrediente tehnice fundamentale: unul standardele de date care permite interoperabilitatea (xAPI) șiinfrastructură capabil de a gestiona evenimente de streaming de mare volum (Apache Kafka).
În acest articol vom construi o conductă completă de analiză a învățării: de la generație a declarațiilor xAPI în clienți, la ingerarea în timp real cu Kafka, la procesare cu Flink pentru detectarea tiparelor comportamentale, până la tablouri de bord pentru profesori.
Ce veți învăța în acest articol
- xAPI (Experience API / Tin Can): structură de declarații și cele mai bune practici
- Learning Record Store (LRS): arhitectura și alegerea soluției
- Conducta Kafka pentru evenimente educaționale de mare volum
- Grupuri de consumatori pentru procesare paralelă și tolerantă la erori
- Detectarea precoce a elevilor cu risc de abandon școlar
- Agregări în timp real cu Kafka Streams
- Tabloul de bord de analiză a profesorilor cu valori cheie
- GDPR și confidențialitate în colectarea datelor de învățare
1. xAPI: Standardul pentru datele de învățare
xAPI (Experience API, numit și „Tin Can API”) este standardul internațional pentru reprezentarea experiențelor de învățare. Depășiți limitările SCORM (care a urmărit doar „finalizat/nefinalizat” într-un LMS închis) permițând pentru a urmări orice activitate de învățare, online sau offline, în interior sau în exterior dintr-un LMS, cu un vocabular standardizat și interoperabil.
Structura de bază a unei instrucțiuni xAPI este simplă: Actor (OMS), Verb (ce a facut), Obiect (despre ce). Opțional: Rezultat (cu ce rezultat), Context (în ce context) e Marca temporală. Această structură subiect-verb-obiect și suficient de intuitiv și flexibil pentru a acoperi orice scenariu educațional.
# xapi/statement_builder.py
from dataclasses import dataclass, field, asdict
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
import uuid
@dataclass
class XAPIActor:
"""Chi compie l'azione."""
mbox: str # mailto:user@example.com
name: str
objectType: str = "Agent"
@dataclass
class XAPIVerb:
"""Cosa ha fatto l'attore. Usa vocabolari standardizzati (ADL, TinCan)."""
id: str # URI del verbo, es: http://adlnet.gov/expapi/verbs/completed
display: Dict[str, str] # {"en-US": "completed", "it-IT": "completato"}
@dataclass
class XAPIActivity:
"""Su cosa e stata compiuta l'azione."""
id: str # URI univoca dell'oggetto
objectType: str = "Activity"
definition: Optional[Dict] = None
@dataclass
class XAPIResult:
"""Risultato dell'azione (opzionale)."""
score: Optional[Dict] = None # {"scaled": 0.85, "raw": 85, "min": 0, "max": 100}
success: Optional[bool] = None
completion: Optional[bool] = None
duration: Optional[str] = None # ISO 8601: "PT5M30S" = 5 minuti 30 secondi
response: Optional[str] = None # Risposta testuale dello studente
@dataclass
class XAPIContext:
"""Contesto aggiuntivo."""
platform: str = "EdTech Platform"
language: str = "it-IT"
contextActivities: Optional[Dict] = None # Gerarchia: course -> module -> lesson
extensions: Optional[Dict] = None # Dati custom (device, browser, etc.)
@dataclass
class XAPIStatement:
actor: XAPIActor
verb: XAPIVerb
object: XAPIActivity
id: str = field(default_factory=lambda: str(uuid.uuid4()))
result: Optional[XAPIResult] = None
context: Optional[XAPIContext] = None
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
stored: Optional[str] = None
version: str = "1.0.3"
def to_dict(self) -> Dict:
return asdict(self)
# Verbi xAPI standard (ADL vocabulary)
XAPI_VERBS = {
"completed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/completed",
display={"en-US": "completed", "it-IT": "completato"},
),
"attempted": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/attempted",
display={"en-US": "attempted", "it-IT": "tentato"},
),
"passed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/passed",
display={"en-US": "passed", "it-IT": "superato"},
),
"failed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/failed",
display={"en-US": "failed", "it-IT": "fallito"},
),
"experienced": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/experienced",
display={"en-US": "experienced", "it-IT": "esperienzato"},
),
"asked": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/asked",
display={"en-US": "asked", "it-IT": "chiesto"},
),
"interacted": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/interacted",
display={"en-US": "interacted", "it-IT": "interagito"},
),
}
class StatementBuilder:
"""Builder per statement xAPI con validazione."""
def __init__(self, platform_url: str, tenant_id: str):
self.platform_url = platform_url
self.tenant_id = tenant_id
def lesson_completed(
self,
student_id: str,
student_email: str,
lesson_id: str,
lesson_title: str,
course_id: str,
duration_seconds: int,
score_percent: Optional[float] = None,
) -> XAPIStatement:
return XAPIStatement(
actor=XAPIActor(
mbox=f"mailto:{student_email}",
name=student_id,
),
verb=XAPI_VERBS["completed"],
object=XAPIActivity(
id=f"{self.platform_url}/activities/{lesson_id}",
definition={
"name": {"it-IT": lesson_title},
"type": "http://adlnet.gov/expapi/activities/lesson",
},
),
result=XAPIResult(
completion=True,
duration=f"PT{duration_seconds}S",
score={"scaled": score_percent / 100, "raw": score_percent, "min": 0, "max": 100} if score_percent else None,
),
context=XAPIContext(
contextActivities={
"parent": [{"id": f"{self.platform_url}/activities/course/{course_id}"}],
"grouping": [{"id": f"{self.platform_url}/activities/tenant/{self.tenant_id}"}],
},
extensions={
"https://schema.example.com/extensions/student_id": student_id,
"https://schema.example.com/extensions/tenant_id": self.tenant_id,
},
),
)
def quiz_answered(
self,
student_id: str,
student_email: str,
question_id: str,
question_text: str,
student_response: str,
correct: bool,
time_spent_seconds: int,
) -> XAPIStatement:
verb = XAPI_VERBS["passed"] if correct else XAPI_VERBS["failed"]
return XAPIStatement(
actor=XAPIActor(mbox=f"mailto:{student_email}", name=student_id),
verb=verb,
object=XAPIActivity(
id=f"{self.platform_url}/activities/question/{question_id}",
definition={
"name": {"it-IT": question_text[:100]},
"type": "http://adlnet.gov/expapi/activities/cmi.interaction",
},
),
result=XAPIResult(
success=correct,
response=student_response[:500], # Tronca per privacy
duration=f"PT{time_spent_seconds}S",
),
)
2. Conducta Kafka pentru evenimente educaționale
Cu milioane de studenți care generează evenimente simultan, avem nevoie de a sistem de mesagerie care garantează: disponibilitate ridicată, sortare garantată per elev (toate evenimentele aceluiași studentul trebuie procesat în ordine), e reluare (poate reelaborăm evenimentele istorice când schimbăm logica analizei). Apache Kafka îndeplinește toate aceste cerințe și este alegerea standard în producție pentru conducte de evenimente de mare volum.
# kafka/producer.py
import json
import logging
from typing import Optional
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
logger = logging.getLogger(__name__)
class LearningEventProducer:
"""
Producer Kafka per eventi xAPI.
Usa la chiave di partizione student_id per garantire ordine per studente.
"""
TOPIC = "edtech.xapi.statements"
def __init__(self, bootstrap_servers: str):
self.bootstrap_servers = bootstrap_servers
self._producer: Optional[AIOKafkaProducer] = None
async def start(self):
self._producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None,
# Configurazione per affidabilità
acks="all", # Conferma da tutti i replica
enable_idempotence=True, # Exactly-once delivery
compression_type="gzip", # Compressione per ridurre banda
max_batch_size=64 * 1024, # 64KB batch
linger_ms=10, # Aspetta 10ms per batch più grandi
)
await self._producer.start()
logger.info(f"Kafka producer started, topic: {self.TOPIC}")
async def stop(self):
if self._producer:
await self._producer.stop()
async def send_statement(self, statement: "XAPIStatement") -> bool:
"""
Invia uno statement xAPI a Kafka.
La partition key e lo student_id per garantire ordine per studente.
"""
if not self._producer:
raise RuntimeError("Producer non avviato. Chiama start() prima.")
statement_dict = statement.to_dict()
student_id = statement_dict["actor"]["name"]
try:
await self._producer.send_and_wait(
topic=self.TOPIC,
key=student_id, # Partition key: stesso studente -> stessa partizione
value=statement_dict,
headers=[
("content-type", b"application/json"),
("schema-version", b"1.0.3"),
],
)
return True
except KafkaError as e:
logger.error(f"Errore invio statement Kafka: {e}", exc_info=True)
return False
async def send_batch(self, statements: list) -> int:
"""Invia un batch di statement. Ritorna il numero di statement inviati con successo."""
success_count = 0
async with self._producer.transaction():
for stmt in statements:
if await self.send_statement(stmt):
success_count += 1
return success_count
# kafka/consumer.py
import asyncio
import json
import logging
from typing import Callable, Awaitable
from aiokafka import AIOKafkaConsumer
logger = logging.getLogger(__name__)
class LearningEventConsumer:
"""
Consumer Kafka per elaborazione statement xAPI.
Consumer groups per elaborazione parallela e fault-tolerant.
"""
def __init__(
self,
bootstrap_servers: str,
group_id: str,
handler: Callable[[dict], Awaitable[None]],
):
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
self.handler = handler
self._consumer: Optional[AIOKafkaConsumer] = None
self._running = False
async def start(self, topics: list = None):
topics = topics or ["edtech.xapi.statements"]
self._consumer = AIOKafkaConsumer(
*topics,
bootstrap_servers=self.bootstrap_servers,
group_id=self.group_id,
auto_offset_reset="earliest",
enable_auto_commit=False, # Commit manuale per at-least-once
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
max_poll_records=100, # Batch di 100 messaggi per poll
)
await self._consumer.start()
self._running = True
logger.info(f"Consumer {self.group_id} avviato su {topics}")
async def run(self):
"""Loop principale di consumo messaggi."""
if not self._consumer:
raise RuntimeError("Consumer non avviato.")
try:
async for message in self._consumer:
try:
await self.handler(message.value)
# Commit dopo elaborazione con successo
await self._consumer.commit()
except Exception as e:
logger.error(
f"Errore elaborazione messaggio offset {message.offset}: {e}",
exc_info=True,
)
# Non committare: il messaggio verrà rielaborato
# In produzione: invia a Dead Letter Queue dopo N tentativi
finally:
await self._consumer.stop()
3. Detectare de avertizare timpurie: studenți cu risc de abandon
Cea mai importantă măsură pentru o platformă EdTech nu este numărul de înscrieri, dar cel rata de finalizare. Identificați elevii devreme cu risc de abandon (predicția abandonului) vă permite să interveniți cu notificări personalizate, sprijinul tutorelui sau modificarea traseului de învățare. Folosim Kafka Streams pentru a calcula semnalele de risc în timp real.
# analytics/dropout_detector.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class DropoutRiskSignal:
student_id: str
course_id: str
risk_level: RiskLevel
risk_score: float # 0.0 - 1.0
contributing_factors: List[str]
last_activity: Optional[datetime]
recommended_action: str
calculated_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class StudentEngagementMetrics:
student_id: str
course_id: str
days_since_last_activity: int
completion_rate: float # 0.0 - 1.0
avg_quiz_score: float # 0.0 - 100.0
quiz_failure_rate: float # 0.0 - 1.0
avg_session_duration_minutes: float
total_sessions_last_30d: int
video_rewatch_rate: float # Quante volte rivede video (difficolta?)
help_requests_last_7d: int
class DropoutRiskDetector:
"""
Calcola il rischio di abbandono basato su un modello a regole.
In produzione: sostituisci con un modello ML addestrato su dati storici.
"""
RISK_THRESHOLDS = {
RiskLevel.LOW: (0.0, 0.3),
RiskLevel.MEDIUM: (0.3, 0.6),
RiskLevel.HIGH: (0.6, 0.8),
RiskLevel.CRITICAL: (0.8, 1.0),
}
RECOMMENDED_ACTIONS = {
RiskLevel.LOW: "Monitoraggio routinario. Nessuna azione immediata.",
RiskLevel.MEDIUM: "Invia notifica push motivazionale. Suggerisci contenuto più facile.",
RiskLevel.HIGH: "Contatta il tutor del corso. Proponi sessione di supporto 1:1.",
RiskLevel.CRITICAL: "Escalation immediata: email al responsabile del corso e allo studente.",
}
def calculate_risk(self, metrics: StudentEngagementMetrics) -> DropoutRiskSignal:
score = 0.0
factors = []
# Fattore 1: Inattivita recente (peso: 35%)
inactivity_score = self._score_inactivity(metrics.days_since_last_activity)
score += inactivity_score * 0.35
if inactivity_score > 0.5:
factors.append(f"Inattivo da {metrics.days_since_last_activity} giorni")
# Fattore 2: Progresso nel corso (peso: 25%)
progress_score = 1.0 - metrics.completion_rate
score += progress_score * 0.25
if progress_score > 0.7:
factors.append(f"Progresso corso: solo {metrics.completion_rate:.0%}")
# Fattore 3: Performance ai quiz (peso: 20%)
quiz_score = self._score_quiz_performance(metrics.avg_quiz_score, metrics.quiz_failure_rate)
score += quiz_score * 0.20
if quiz_score > 0.6:
factors.append(f"Media quiz bassa: {metrics.avg_quiz_score:.1f}/100")
# Fattore 4: Riduzione sessioni (peso: 15%)
session_score = 1.0 - min(metrics.total_sessions_last_30d / 10, 1.0)
score += session_score * 0.15
if metrics.total_sessions_last_30d < 3:
factors.append(f"Solo {metrics.total_sessions_last_30d} sessioni in 30 giorni")
# Fattore 5: Segnali di frustrazione (peso: 5%)
frustration_score = min(metrics.video_rewatch_rate + metrics.help_requests_last_7d * 0.1, 1.0)
score += frustration_score * 0.05
if metrics.video_rewatch_rate > 2.0:
factors.append("Alto numero di revisioni video (possibile difficolta)")
score = min(max(score, 0.0), 1.0)
risk_level = self._score_to_level(score)
return DropoutRiskSignal(
student_id=metrics.student_id,
course_id=metrics.course_id,
risk_level=risk_level,
risk_score=score,
contributing_factors=factors,
last_activity=None, # Iniettare datetime reale
recommended_action=self.RECOMMENDED_ACTIONS[risk_level],
)
def _score_inactivity(self, days: int) -> float:
if days <= 1: return 0.0
if days <= 3: return 0.2
if days <= 7: return 0.5
if days <= 14: return 0.8
return 1.0
def _score_quiz_performance(self, avg_score: float, failure_rate: float) -> float:
score_component = max(0, (60 - avg_score) / 60) # Baseline 60%
failure_component = min(failure_rate * 1.5, 1.0)
return (score_component + failure_component) / 2
def _score_to_level(self, score: float) -> RiskLevel:
for level, (low, high) in self.RISK_THRESHOLDS.items():
if low <= score < high:
return level
return RiskLevel.CRITICAL
4. Agregări în timp real cu fluxuri Kafka
Kafka Streams vă permite să procesați evenimente în timp real direct în clusterul Kafka,
fără infrastructură suplimentară. Folosim biblioteca Python faust
pentru a calcula valorile agregate pe o fereastră de timp de 1 oră.
# analytics/streaming_aggregator.py
import faust
from datetime import datetime, timedelta
from typing import Optional
app = faust.App(
"edtech-analytics",
broker="kafka://localhost:9092",
value_serializer="json",
)
# Topic di input
xapi_topic = app.topic("edtech.xapi.statements")
# Topic di output per dashboard
course_metrics_topic = app.topic("edtech.analytics.course-metrics")
dropout_alerts_topic = app.topic("edtech.analytics.dropout-alerts")
class CourseMetrics(faust.Record):
course_id: str
window_start: str
window_end: str
total_events: int = 0
unique_students: int = 0
lessons_completed: int = 0
quizzes_passed: int = 0
quizzes_failed: int = 0
avg_quiz_score: float = 0.0
# Tabella aggregata per finestra di 1 ora
course_hourly_table = app.Table(
"course-hourly-metrics",
default=dict,
partitions=8,
)
student_activity_table = app.Table(
"student-activity",
default=dict,
partitions=8,
)
@app.agent(xapi_topic)
async def process_xapi_statement(statements):
"""Processa ogni statement xAPI e aggiorna le metriche aggregate."""
async for statement in statements:
actor = statement.get("actor", {})
verb = statement.get("verb", {}).get("id", "")
result = statement.get("result", {}) or {}
context = statement.get("context", {}) or {}
student_id = actor.get("name", "unknown")
course_id = _extract_course_id(context)
timestamp = statement.get("timestamp", "")
if not course_id:
continue
# Chiave finestra oraria
hour_key = f"{course_id}:{timestamp[:13]}" # ISO troncato all'ora
# Aggiorna metriche corso
metrics = course_hourly_table[hour_key]
metrics["total_events"] = metrics.get("total_events", 0) + 1
metrics.setdefault("students", set())
metrics["students"].add(student_id)
verb_local = verb.split("/")[-1] # Prendi solo il nome del verbo
if verb_local == "completed":
metrics["lessons_completed"] = metrics.get("lessons_completed", 0) + 1
elif verb_local == "passed":
metrics["quizzes_passed"] = metrics.get("quizzes_passed", 0) + 1
score = result.get("score", {}).get("raw", 0)
prev_avg = metrics.get("avg_quiz_score", 0.0)
prev_count = metrics.get("quiz_count", 0)
metrics["avg_quiz_score"] = (prev_avg * prev_count + score) / (prev_count + 1)
metrics["quiz_count"] = prev_count + 1
elif verb_local == "failed":
metrics["quizzes_failed"] = metrics.get("quizzes_failed", 0) + 1
course_hourly_table[hour_key] = metrics
# Aggiorna attivita studente per dropout detection
student_key = f"{student_id}:{course_id}"
student_data = student_activity_table[student_key]
student_data["last_activity"] = timestamp
student_data["event_count"] = student_data.get("event_count", 0) + 1
student_activity_table[student_key] = student_data
# Emetti metriche aggregate ogni 100 eventi
if metrics["total_events"] % 100 == 0:
await course_metrics_topic.send(
key=course_id,
value={
"course_id": course_id,
"hour_key": hour_key,
"total_events": metrics["total_events"],
"unique_students": len(metrics.get("students", set())),
"lessons_completed": metrics.get("lessons_completed", 0),
"quizzes_passed": metrics.get("quizzes_passed", 0),
"quizzes_failed": metrics.get("quizzes_failed", 0),
"avg_quiz_score": metrics.get("avg_quiz_score", 0.0),
},
)
def _extract_course_id(context: dict) -> Optional[str]:
"""Estrae il course_id dal contesto xAPI."""
parent_activities = context.get("contextActivities", {}).get("parent", [])
for activity in parent_activities:
activity_id = activity.get("id", "")
if "/activities/course/" in activity_id:
return activity_id.split("/activities/course/")[-1]
return None
5. Tabloul de bord pentru profesori: valori cheie
Datele colectate trebuie să devină informații utile pentru profesori. Un tablou de bord bun analiza învățării trebuie să arate nu numai ce se întâmplă (descriptiv), dar si de ce (diagnostic) și ce va face studentul (predictiv).
# api/analytics_dashboard.py
from fastapi import FastAPI, Depends, Query
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime, date, timedelta
class CourseOverview(BaseModel):
course_id: str
course_name: str
total_enrolled: int
active_last_7d: int
completion_rate: float
avg_quiz_score: float
at_risk_count: int
dropout_rate_30d: float
class StudentProgressDetail(BaseModel):
student_id: str
student_name: str
enrollment_date: date
completion_percent: float
avg_quiz_score: float
streak_days: int
days_since_last_activity: int
risk_level: str
risk_score: float
class ContentDifficultyReport(BaseModel):
lesson_id: str
lesson_title: str
avg_time_spent_minutes: float
rewatch_rate: float
quit_rate: float # % studenti che hanno abbandonato durante questo contenuto
avg_quiz_score_after: float
difficulty_index: float # Calcolato: alto = contenuto difficile
app = FastAPI(title="Learning Analytics Dashboard API")
@app.get("/api/analytics/courses/{course_id}/overview", response_model=CourseOverview)
async def get_course_overview(course_id: str, db=Depends(get_db)):
"""Panoramica corso con metriche chiave per il dashboard insegnante."""
row = await db.execute("""
SELECT
c.name,
COUNT(DISTINCT e.student_id) as enrolled,
COUNT(DISTINCT CASE WHEN a.last_activity > NOW() - INTERVAL '7 days' THEN a.student_id END) as active_7d,
AVG(CASE WHEN e.completed THEN 1.0 ELSE 0.0 END) as completion_rate,
AVG(q.avg_score) as avg_quiz,
COUNT(DISTINCT CASE WHEN r.risk_level IN ('high','critical') THEN r.student_id END) as at_risk,
COUNT(DISTINCT CASE WHEN e.dropped_out AND e.dropout_date > NOW() - INTERVAL '30 days' THEN e.student_id END)::float /
NULLIF(COUNT(DISTINCT CASE WHEN e.enrollment_date < NOW() - INTERVAL '30 days' THEN e.student_id END), 0) as dropout_rate
FROM courses c
LEFT JOIN enrollments e ON c.id = e.course_id
LEFT JOIN student_activity a ON e.student_id = a.student_id AND a.course_id = c.id
LEFT JOIN student_quiz_stats q ON e.student_id = q.student_id AND q.course_id = c.id
LEFT JOIN dropout_risk r ON e.student_id = r.student_id AND r.course_id = c.id
WHERE c.id = :cid
GROUP BY c.id, c.name
""", {"cid": course_id})
data = row.fetchone()
return CourseOverview(
course_id=course_id,
course_name=data[0],
total_enrolled=data[1] or 0,
active_last_7d=data[2] or 0,
completion_rate=float(data[3] or 0),
avg_quiz_score=float(data[4] or 0),
at_risk_count=data[5] or 0,
dropout_rate_30d=float(data[6] or 0),
)
@app.get("/api/analytics/courses/{course_id}/at-risk", response_model=List[StudentProgressDetail])
async def get_at_risk_students(
course_id: str,
risk_level: Optional[str] = Query(None, description="Filter by risk: low, medium, high, critical"),
db=Depends(get_db),
):
"""Lista studenti a rischio abbandono con dettagli progressione."""
query = """
SELECT
s.id, s.name,
e.enrollment_date,
COALESCE(prog.completion_percent, 0) as completion_percent,
COALESCE(qs.avg_score, 0) as avg_quiz_score,
COALESCE(str.current_streak, 0) as streak_days,
EXTRACT(DAY FROM NOW() - a.last_activity)::int as days_inactive,
r.risk_level,
r.risk_score
FROM students s
JOIN enrollments e ON s.id = e.student_id AND e.course_id = :cid
JOIN dropout_risk r ON s.id = r.student_id AND r.course_id = :cid
LEFT JOIN course_progress prog ON s.id = prog.student_id AND prog.course_id = :cid
LEFT JOIN student_quiz_stats qs ON s.id = qs.student_id AND qs.course_id = :cid
LEFT JOIN student_streaks str ON s.id = str.student_id
LEFT JOIN student_activity a ON s.id = a.student_id AND a.course_id = :cid
WHERE (:risk IS NULL OR r.risk_level = :risk)
ORDER BY r.risk_score DESC
LIMIT 100
"""
rows = (await db.execute(query, {"cid": course_id, "risk": risk_level})).fetchall()
return [
StudentProgressDetail(
student_id=r[0], student_name=r[1], enrollment_date=r[2],
completion_percent=float(r[3]), avg_quiz_score=float(r[4]),
streak_days=r[5], days_since_last_activity=r[6],
risk_level=r[7], risk_score=float(r[8]),
)
for r in rows
]
6. GDPR și confidențialitate în Learning Analytics
Datele de învățare sunt date personale sensibile. Colectați și analizați Interacțiunile studenților necesită conformitatea GDPR și, pentru minori, la legislația specifică (COPPA în SUA, directiva europeană privind protecția minorilor). Principii fundamentale: minimizarea datelor (numai colectați după cum este necesar), anonimizarea pentru rapoartele agregate, dreptul de a fi uitat (posibilitatea de a șterge toate datele studenților) e transparenţă (informații clare despre ce se colectează și de ce).
Lista de verificare GDPR pentru Learning Analytics
- Consimțământul explicit înainte de colectarea datelor detaliate despre comportament
- Anonimizați student_id în rapoartele agregate (nu includeți datele personale)
- Implementați DELETE /students/{id}/analytics-data endpoint pentru dreptul de a fi uitat
- Datele de la elevi minori necesită acordul părinților
- Politica de păstrare a datelor: definiți cât timp păstrați evenimentele brute (de exemplu, 2 ani)
- Jurnalul de audit al persoanelor care accesează analizele studenților
- Criptarea datelor în repaus și în tranzit
- Nu partajați date cu terți fără consimțământul explicit
Concluzii și pașii următori
Am construit o conductă completă de analiză a învățării: instrucțiuni xAPI pentru standardizarea datelor, Kafka pentru asimilare de volum mare cu garanții livrare, agregări în timp real cu Faust/Kafka Streams, detectarea abandonului bazat pe semnale comportamentale și API de bord pentru profesori.
Următorul pas este integrarea unui model ML antrenat pe date istorice înlocuiți detectorul de abandon bazat pe reguli: cu date suficiente, modelele ML atinge o acuratețe mai mare de 85% în identificarea timpurie a elevii aflați în risc, cu 2-3 săptămâni înainte de abandonul efectiv.
În următorul articol vom explora colaborare în timp real în platformele EdTech: CRDT cu Yjs pentru editare colaborativă și WebSocket pentru sincronizarea în timp real a documentelor partajate.
Seria EdTech Engineering
- Arhitectură LMS scalabilă: model multi-chiriași
- Algoritmi de învățare adaptivă: de la teorie la producție
- Streaming video pentru educație: WebRTC vs HLS vs DASH
- Sisteme de supraveghere AI: confidențialitate-în primul rând cu computer Vision
- Tutor personalizat cu LLM: RAG pentru fundamentarea cunoștințelor
- Motor de gamification: Arhitectură și mașină de stat
- Learning Analytics: Data Pipeline cu xAPI și Kafka (acest articol)
- Colaborare în timp real în EdTech: CRDT și WebSocket
- Mobile-First EdTech: Offline-First Architecture
- Managementul conținutului cu mai mulți chiriași: Versiune și SCORM







