Learning Analytics: Potok danych z xAPI i Kafką
Każde kliknięcie, każda zła odpowiedź, każde wstrzymane wideo i każde porzucone ćwiczenie mówi coś ważnego o procesie uczenia się ucznia. Platformy Nowoczesne EdTechy zbierają codziennie miliardy takich wydarzeń, ale większość wyrzuca je lub wykorzystuje jedynie do trywialnych zbiorczych statystyk: „kurs X ma 70%. ukończenie”. To strata danych i stracona szansa.
Analityka uczenia się oraz dyscyplina, która przekształca te dane behawioralne w praktyczny sposób: przewidź, którzy studenci opuszczą kurs, zidentyfikuj treści sprawiające najwięcej trudności, optymalizujemy ścieżki nauki w czasie rzeczywistym. Ale do tego potrzebne są dwa podstawowe składniki techniczne: jeden standardy danych który umożliwia interoperacyjność (xAPI) iinfrastruktura potrafi zarządzać wydarzenia o dużej liczbie transmisji strumieniowych (Apache Kafka).
W tym artykule zbudujemy kompletny potok analityki uczenia się: od generacji instrukcji xAPI u klientów, po pozyskiwanie w czasie rzeczywistym za pomocą Kafki, po przetwarzanie z Flink do wykrywania wzorców zachowań, aż po dashboardy dla nauczycieli.
Czego dowiesz się w tym artykule
- xAPI (Experience API / Tin Can): struktura instrukcji i najlepsze praktyki
- Learning Record Store (LRS): architektura i wybór rozwiązania
- Rurociąg Kafka do obsługi masowych wydarzeń edukacyjnych
- Grupy konsumentów do przetwarzania równoległego i odpornego na błędy
- Wczesne wykrywanie uczniów zagrożonych porzuceniem nauki
- Agregacje w czasie rzeczywistym za pomocą strumieni Kafka
- Panel analityczny dla nauczycieli z kluczowymi wskaźnikami
- RODO i prywatność w gromadzeniu danych edukacyjnych
1. xAPI: Standard danych edukacyjnych
xAPI (Experience API, zwane także „Tin Can API”) to międzynarodowy standard reprezentacja doświadczeń edukacyjnych. Pokonaj ograniczenia SCORM (które śledziło tylko „ukończone/nieukończone” w zamkniętym systemie LMS). do śledzenia wszelkich działań edukacyjnych, online lub offline, wewnątrz lub na zewnątrz z LMS, ze znormalizowanym i interoperacyjnym słownictwem.
Podstawowa struktura instrukcji xAPI jest prosta: Aktor (Kto), Czasownik (co on zrobił), Obiekt (o czym). Fakultatywnie: Wynik (z jakim skutkiem), Kontekst (w jakim kontekście) e Znacznik czasu. Taka struktura podmiot-czasownik-dopełnienie oraz intuicyjny i wystarczająco elastyczny, aby obsłużyć każdy scenariusz edukacyjny.
# xapi/statement_builder.py
from dataclasses import dataclass, field, asdict
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
import uuid
@dataclass
class XAPIActor:
"""Chi compie l'azione."""
mbox: str # mailto:user@example.com
name: str
objectType: str = "Agent"
@dataclass
class XAPIVerb:
"""Cosa ha fatto l'attore. Usa vocabolari standardizzati (ADL, TinCan)."""
id: str # URI del verbo, es: http://adlnet.gov/expapi/verbs/completed
display: Dict[str, str] # {"en-US": "completed", "it-IT": "completato"}
@dataclass
class XAPIActivity:
"""Su cosa e stata compiuta l'azione."""
id: str # URI univoca dell'oggetto
objectType: str = "Activity"
definition: Optional[Dict] = None
@dataclass
class XAPIResult:
"""Risultato dell'azione (opzionale)."""
score: Optional[Dict] = None # {"scaled": 0.85, "raw": 85, "min": 0, "max": 100}
success: Optional[bool] = None
completion: Optional[bool] = None
duration: Optional[str] = None # ISO 8601: "PT5M30S" = 5 minuti 30 secondi
response: Optional[str] = None # Risposta testuale dello studente
@dataclass
class XAPIContext:
"""Contesto aggiuntivo."""
platform: str = "EdTech Platform"
language: str = "it-IT"
contextActivities: Optional[Dict] = None # Gerarchia: course -> module -> lesson
extensions: Optional[Dict] = None # Dati custom (device, browser, etc.)
@dataclass
class XAPIStatement:
actor: XAPIActor
verb: XAPIVerb
object: XAPIActivity
id: str = field(default_factory=lambda: str(uuid.uuid4()))
result: Optional[XAPIResult] = None
context: Optional[XAPIContext] = None
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
stored: Optional[str] = None
version: str = "1.0.3"
def to_dict(self) -> Dict:
return asdict(self)
# Verbi xAPI standard (ADL vocabulary)
XAPI_VERBS = {
"completed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/completed",
display={"en-US": "completed", "it-IT": "completato"},
),
"attempted": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/attempted",
display={"en-US": "attempted", "it-IT": "tentato"},
),
"passed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/passed",
display={"en-US": "passed", "it-IT": "superato"},
),
"failed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/failed",
display={"en-US": "failed", "it-IT": "fallito"},
),
"experienced": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/experienced",
display={"en-US": "experienced", "it-IT": "esperienzato"},
),
"asked": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/asked",
display={"en-US": "asked", "it-IT": "chiesto"},
),
"interacted": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/interacted",
display={"en-US": "interacted", "it-IT": "interagito"},
),
}
class StatementBuilder:
"""Builder per statement xAPI con validazione."""
def __init__(self, platform_url: str, tenant_id: str):
self.platform_url = platform_url
self.tenant_id = tenant_id
def lesson_completed(
self,
student_id: str,
student_email: str,
lesson_id: str,
lesson_title: str,
course_id: str,
duration_seconds: int,
score_percent: Optional[float] = None,
) -> XAPIStatement:
return XAPIStatement(
actor=XAPIActor(
mbox=f"mailto:{student_email}",
name=student_id,
),
verb=XAPI_VERBS["completed"],
object=XAPIActivity(
id=f"{self.platform_url}/activities/{lesson_id}",
definition={
"name": {"it-IT": lesson_title},
"type": "http://adlnet.gov/expapi/activities/lesson",
},
),
result=XAPIResult(
completion=True,
duration=f"PT{duration_seconds}S",
score={"scaled": score_percent / 100, "raw": score_percent, "min": 0, "max": 100} if score_percent else None,
),
context=XAPIContext(
contextActivities={
"parent": [{"id": f"{self.platform_url}/activities/course/{course_id}"}],
"grouping": [{"id": f"{self.platform_url}/activities/tenant/{self.tenant_id}"}],
},
extensions={
"https://schema.example.com/extensions/student_id": student_id,
"https://schema.example.com/extensions/tenant_id": self.tenant_id,
},
),
)
def quiz_answered(
self,
student_id: str,
student_email: str,
question_id: str,
question_text: str,
student_response: str,
correct: bool,
time_spent_seconds: int,
) -> XAPIStatement:
verb = XAPI_VERBS["passed"] if correct else XAPI_VERBS["failed"]
return XAPIStatement(
actor=XAPIActor(mbox=f"mailto:{student_email}", name=student_id),
verb=verb,
object=XAPIActivity(
id=f"{self.platform_url}/activities/question/{question_id}",
definition={
"name": {"it-IT": question_text[:100]},
"type": "http://adlnet.gov/expapi/activities/cmi.interaction",
},
),
result=XAPIResult(
success=correct,
response=student_response[:500], # Tronca per privacy
duration=f"PT{time_spent_seconds}S",
),
)
2. Rurociąg Kafki dla wydarzeń edukacyjnych
Ponieważ miliony studentów generują wydarzenia jednocześnie, potrzebujemy system powiadamiania gwarantujący: wysoka dostępność, gwarantowane sortowanie na ucznia (wszystkie wydarzenia tego samego uczeń musi być przetwarzany w kolejności), e powtórna rozgrywka (może przerobić wydarzenia historyczne, gdy zmienimy logikę analizy). Apache Kafka spełnia wszystkie te wymagania i jest standardowym wyborem w produkcji w przypadku potoków zdarzeń o dużej liczbie zdarzeń.
# kafka/producer.py
import json
import logging
from typing import Optional
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
logger = logging.getLogger(__name__)
class LearningEventProducer:
"""
Producer Kafka per eventi xAPI.
Usa la chiave di partizione student_id per garantire ordine per studente.
"""
TOPIC = "edtech.xapi.statements"
def __init__(self, bootstrap_servers: str):
self.bootstrap_servers = bootstrap_servers
self._producer: Optional[AIOKafkaProducer] = None
async def start(self):
self._producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None,
# Configurazione per affidabilità
acks="all", # Conferma da tutti i replica
enable_idempotence=True, # Exactly-once delivery
compression_type="gzip", # Compressione per ridurre banda
max_batch_size=64 * 1024, # 64KB batch
linger_ms=10, # Aspetta 10ms per batch più grandi
)
await self._producer.start()
logger.info(f"Kafka producer started, topic: {self.TOPIC}")
async def stop(self):
if self._producer:
await self._producer.stop()
async def send_statement(self, statement: "XAPIStatement") -> bool:
"""
Invia uno statement xAPI a Kafka.
La partition key e lo student_id per garantire ordine per studente.
"""
if not self._producer:
raise RuntimeError("Producer non avviato. Chiama start() prima.")
statement_dict = statement.to_dict()
student_id = statement_dict["actor"]["name"]
try:
await self._producer.send_and_wait(
topic=self.TOPIC,
key=student_id, # Partition key: stesso studente -> stessa partizione
value=statement_dict,
headers=[
("content-type", b"application/json"),
("schema-version", b"1.0.3"),
],
)
return True
except KafkaError as e:
logger.error(f"Errore invio statement Kafka: {e}", exc_info=True)
return False
async def send_batch(self, statements: list) -> int:
"""Invia un batch di statement. Ritorna il numero di statement inviati con successo."""
success_count = 0
async with self._producer.transaction():
for stmt in statements:
if await self.send_statement(stmt):
success_count += 1
return success_count
# kafka/consumer.py
import asyncio
import json
import logging
from typing import Callable, Awaitable
from aiokafka import AIOKafkaConsumer
logger = logging.getLogger(__name__)
class LearningEventConsumer:
"""
Consumer Kafka per elaborazione statement xAPI.
Consumer groups per elaborazione parallela e fault-tolerant.
"""
def __init__(
self,
bootstrap_servers: str,
group_id: str,
handler: Callable[[dict], Awaitable[None]],
):
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
self.handler = handler
self._consumer: Optional[AIOKafkaConsumer] = None
self._running = False
async def start(self, topics: list = None):
topics = topics or ["edtech.xapi.statements"]
self._consumer = AIOKafkaConsumer(
*topics,
bootstrap_servers=self.bootstrap_servers,
group_id=self.group_id,
auto_offset_reset="earliest",
enable_auto_commit=False, # Commit manuale per at-least-once
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
max_poll_records=100, # Batch di 100 messaggi per poll
)
await self._consumer.start()
self._running = True
logger.info(f"Consumer {self.group_id} avviato su {topics}")
async def run(self):
"""Loop principale di consumo messaggi."""
if not self._consumer:
raise RuntimeError("Consumer non avviato.")
try:
async for message in self._consumer:
try:
await self.handler(message.value)
# Commit dopo elaborazione con successo
await self._consumer.commit()
except Exception as e:
logger.error(
f"Errore elaborazione messaggio offset {message.offset}: {e}",
exc_info=True,
)
# Non committare: il messaggio verrà rielaborato
# In produzione: invia a Dead Letter Queue dopo N tentativi
finally:
await self._consumer.stop()
3. Wczesne wykrywanie: uczniowie zagrożeni porzuceniem nauki
Najważniejszym wskaźnikiem dla platformy EdTech nie jest liczba rejestracji, ale wskaźnik ukończenia. Wcześnie identyfikuj uczniów ryzyko porzucenia (przewidywanie porzucenia) umożliwia interweniowanie za pomocą spersonalizowanych powiadomień, wsparcie tutorskie lub modyfikacja ścieżki uczenia się. Korzystamy ze strumieni Kafka do obliczania sygnałów ryzyka w czasie rzeczywistym.
# analytics/dropout_detector.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class DropoutRiskSignal:
student_id: str
course_id: str
risk_level: RiskLevel
risk_score: float # 0.0 - 1.0
contributing_factors: List[str]
last_activity: Optional[datetime]
recommended_action: str
calculated_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class StudentEngagementMetrics:
student_id: str
course_id: str
days_since_last_activity: int
completion_rate: float # 0.0 - 1.0
avg_quiz_score: float # 0.0 - 100.0
quiz_failure_rate: float # 0.0 - 1.0
avg_session_duration_minutes: float
total_sessions_last_30d: int
video_rewatch_rate: float # Quante volte rivede video (difficolta?)
help_requests_last_7d: int
class DropoutRiskDetector:
"""
Calcola il rischio di abbandono basato su un modello a regole.
In produzione: sostituisci con un modello ML addestrato su dati storici.
"""
RISK_THRESHOLDS = {
RiskLevel.LOW: (0.0, 0.3),
RiskLevel.MEDIUM: (0.3, 0.6),
RiskLevel.HIGH: (0.6, 0.8),
RiskLevel.CRITICAL: (0.8, 1.0),
}
RECOMMENDED_ACTIONS = {
RiskLevel.LOW: "Monitoraggio routinario. Nessuna azione immediata.",
RiskLevel.MEDIUM: "Invia notifica push motivazionale. Suggerisci contenuto più facile.",
RiskLevel.HIGH: "Contatta il tutor del corso. Proponi sessione di supporto 1:1.",
RiskLevel.CRITICAL: "Escalation immediata: email al responsabile del corso e allo studente.",
}
def calculate_risk(self, metrics: StudentEngagementMetrics) -> DropoutRiskSignal:
score = 0.0
factors = []
# Fattore 1: Inattivita recente (peso: 35%)
inactivity_score = self._score_inactivity(metrics.days_since_last_activity)
score += inactivity_score * 0.35
if inactivity_score > 0.5:
factors.append(f"Inattivo da {metrics.days_since_last_activity} giorni")
# Fattore 2: Progresso nel corso (peso: 25%)
progress_score = 1.0 - metrics.completion_rate
score += progress_score * 0.25
if progress_score > 0.7:
factors.append(f"Progresso corso: solo {metrics.completion_rate:.0%}")
# Fattore 3: Performance ai quiz (peso: 20%)
quiz_score = self._score_quiz_performance(metrics.avg_quiz_score, metrics.quiz_failure_rate)
score += quiz_score * 0.20
if quiz_score > 0.6:
factors.append(f"Media quiz bassa: {metrics.avg_quiz_score:.1f}/100")
# Fattore 4: Riduzione sessioni (peso: 15%)
session_score = 1.0 - min(metrics.total_sessions_last_30d / 10, 1.0)
score += session_score * 0.15
if metrics.total_sessions_last_30d < 3:
factors.append(f"Solo {metrics.total_sessions_last_30d} sessioni in 30 giorni")
# Fattore 5: Segnali di frustrazione (peso: 5%)
frustration_score = min(metrics.video_rewatch_rate + metrics.help_requests_last_7d * 0.1, 1.0)
score += frustration_score * 0.05
if metrics.video_rewatch_rate > 2.0:
factors.append("Alto numero di revisioni video (possibile difficolta)")
score = min(max(score, 0.0), 1.0)
risk_level = self._score_to_level(score)
return DropoutRiskSignal(
student_id=metrics.student_id,
course_id=metrics.course_id,
risk_level=risk_level,
risk_score=score,
contributing_factors=factors,
last_activity=None, # Iniettare datetime reale
recommended_action=self.RECOMMENDED_ACTIONS[risk_level],
)
def _score_inactivity(self, days: int) -> float:
if days <= 1: return 0.0
if days <= 3: return 0.2
if days <= 7: return 0.5
if days <= 14: return 0.8
return 1.0
def _score_quiz_performance(self, avg_score: float, failure_rate: float) -> float:
score_component = max(0, (60 - avg_score) / 60) # Baseline 60%
failure_component = min(failure_rate * 1.5, 1.0)
return (score_component + failure_component) / 2
def _score_to_level(self, score: float) -> RiskLevel:
for level, (low, high) in self.RISK_THRESHOLDS.items():
if low <= score < high:
return level
return RiskLevel.CRITICAL
4. Agregacje w czasie rzeczywistym ze strumieniami Kafki
Kafka Streams umożliwia przetwarzanie zdarzeń w czasie rzeczywistym bezpośrednio w klastrze Kafka,
bez dodatkowej infrastruktury. Korzystamy z biblioteki Python faust
do obliczania zbiorczych wskaźników w 1-godzinnym oknie czasowym.
# analytics/streaming_aggregator.py
import faust
from datetime import datetime, timedelta
from typing import Optional
app = faust.App(
"edtech-analytics",
broker="kafka://localhost:9092",
value_serializer="json",
)
# Topic di input
xapi_topic = app.topic("edtech.xapi.statements")
# Topic di output per dashboard
course_metrics_topic = app.topic("edtech.analytics.course-metrics")
dropout_alerts_topic = app.topic("edtech.analytics.dropout-alerts")
class CourseMetrics(faust.Record):
course_id: str
window_start: str
window_end: str
total_events: int = 0
unique_students: int = 0
lessons_completed: int = 0
quizzes_passed: int = 0
quizzes_failed: int = 0
avg_quiz_score: float = 0.0
# Tabella aggregata per finestra di 1 ora
course_hourly_table = app.Table(
"course-hourly-metrics",
default=dict,
partitions=8,
)
student_activity_table = app.Table(
"student-activity",
default=dict,
partitions=8,
)
@app.agent(xapi_topic)
async def process_xapi_statement(statements):
"""Processa ogni statement xAPI e aggiorna le metriche aggregate."""
async for statement in statements:
actor = statement.get("actor", {})
verb = statement.get("verb", {}).get("id", "")
result = statement.get("result", {}) or {}
context = statement.get("context", {}) or {}
student_id = actor.get("name", "unknown")
course_id = _extract_course_id(context)
timestamp = statement.get("timestamp", "")
if not course_id:
continue
# Chiave finestra oraria
hour_key = f"{course_id}:{timestamp[:13]}" # ISO troncato all'ora
# Aggiorna metriche corso
metrics = course_hourly_table[hour_key]
metrics["total_events"] = metrics.get("total_events", 0) + 1
metrics.setdefault("students", set())
metrics["students"].add(student_id)
verb_local = verb.split("/")[-1] # Prendi solo il nome del verbo
if verb_local == "completed":
metrics["lessons_completed"] = metrics.get("lessons_completed", 0) + 1
elif verb_local == "passed":
metrics["quizzes_passed"] = metrics.get("quizzes_passed", 0) + 1
score = result.get("score", {}).get("raw", 0)
prev_avg = metrics.get("avg_quiz_score", 0.0)
prev_count = metrics.get("quiz_count", 0)
metrics["avg_quiz_score"] = (prev_avg * prev_count + score) / (prev_count + 1)
metrics["quiz_count"] = prev_count + 1
elif verb_local == "failed":
metrics["quizzes_failed"] = metrics.get("quizzes_failed", 0) + 1
course_hourly_table[hour_key] = metrics
# Aggiorna attivita studente per dropout detection
student_key = f"{student_id}:{course_id}"
student_data = student_activity_table[student_key]
student_data["last_activity"] = timestamp
student_data["event_count"] = student_data.get("event_count", 0) + 1
student_activity_table[student_key] = student_data
# Emetti metriche aggregate ogni 100 eventi
if metrics["total_events"] % 100 == 0:
await course_metrics_topic.send(
key=course_id,
value={
"course_id": course_id,
"hour_key": hour_key,
"total_events": metrics["total_events"],
"unique_students": len(metrics.get("students", set())),
"lessons_completed": metrics.get("lessons_completed", 0),
"quizzes_passed": metrics.get("quizzes_passed", 0),
"quizzes_failed": metrics.get("quizzes_failed", 0),
"avg_quiz_score": metrics.get("avg_quiz_score", 0.0),
},
)
def _extract_course_id(context: dict) -> Optional[str]:
"""Estrae il course_id dal contesto xAPI."""
parent_activities = context.get("contextActivities", {}).get("parent", [])
for activity in parent_activities:
activity_id = activity.get("id", "")
if "/activities/course/" in activity_id:
return activity_id.split("/activities/course/")[-1]
return None
5. Pulpit nauczyciela: kluczowe wskaźniki
Zebrane dane muszą stać się użytecznymi spostrzeżeniami dla nauczycieli. Dobra deska rozdzielcza analityki uczenia się musi pokazywać nie tylko to, co się dzieje (opisowy), ale także dlaczego (diagnostyczny) i co uczeń zrobi (proroczy).
# api/analytics_dashboard.py
from fastapi import FastAPI, Depends, Query
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime, date, timedelta
class CourseOverview(BaseModel):
course_id: str
course_name: str
total_enrolled: int
active_last_7d: int
completion_rate: float
avg_quiz_score: float
at_risk_count: int
dropout_rate_30d: float
class StudentProgressDetail(BaseModel):
student_id: str
student_name: str
enrollment_date: date
completion_percent: float
avg_quiz_score: float
streak_days: int
days_since_last_activity: int
risk_level: str
risk_score: float
class ContentDifficultyReport(BaseModel):
lesson_id: str
lesson_title: str
avg_time_spent_minutes: float
rewatch_rate: float
quit_rate: float # % studenti che hanno abbandonato durante questo contenuto
avg_quiz_score_after: float
difficulty_index: float # Calcolato: alto = contenuto difficile
app = FastAPI(title="Learning Analytics Dashboard API")
@app.get("/api/analytics/courses/{course_id}/overview", response_model=CourseOverview)
async def get_course_overview(course_id: str, db=Depends(get_db)):
"""Panoramica corso con metriche chiave per il dashboard insegnante."""
row = await db.execute("""
SELECT
c.name,
COUNT(DISTINCT e.student_id) as enrolled,
COUNT(DISTINCT CASE WHEN a.last_activity > NOW() - INTERVAL '7 days' THEN a.student_id END) as active_7d,
AVG(CASE WHEN e.completed THEN 1.0 ELSE 0.0 END) as completion_rate,
AVG(q.avg_score) as avg_quiz,
COUNT(DISTINCT CASE WHEN r.risk_level IN ('high','critical') THEN r.student_id END) as at_risk,
COUNT(DISTINCT CASE WHEN e.dropped_out AND e.dropout_date > NOW() - INTERVAL '30 days' THEN e.student_id END)::float /
NULLIF(COUNT(DISTINCT CASE WHEN e.enrollment_date < NOW() - INTERVAL '30 days' THEN e.student_id END), 0) as dropout_rate
FROM courses c
LEFT JOIN enrollments e ON c.id = e.course_id
LEFT JOIN student_activity a ON e.student_id = a.student_id AND a.course_id = c.id
LEFT JOIN student_quiz_stats q ON e.student_id = q.student_id AND q.course_id = c.id
LEFT JOIN dropout_risk r ON e.student_id = r.student_id AND r.course_id = c.id
WHERE c.id = :cid
GROUP BY c.id, c.name
""", {"cid": course_id})
data = row.fetchone()
return CourseOverview(
course_id=course_id,
course_name=data[0],
total_enrolled=data[1] or 0,
active_last_7d=data[2] or 0,
completion_rate=float(data[3] or 0),
avg_quiz_score=float(data[4] or 0),
at_risk_count=data[5] or 0,
dropout_rate_30d=float(data[6] or 0),
)
@app.get("/api/analytics/courses/{course_id}/at-risk", response_model=List[StudentProgressDetail])
async def get_at_risk_students(
course_id: str,
risk_level: Optional[str] = Query(None, description="Filter by risk: low, medium, high, critical"),
db=Depends(get_db),
):
"""Lista studenti a rischio abbandono con dettagli progressione."""
query = """
SELECT
s.id, s.name,
e.enrollment_date,
COALESCE(prog.completion_percent, 0) as completion_percent,
COALESCE(qs.avg_score, 0) as avg_quiz_score,
COALESCE(str.current_streak, 0) as streak_days,
EXTRACT(DAY FROM NOW() - a.last_activity)::int as days_inactive,
r.risk_level,
r.risk_score
FROM students s
JOIN enrollments e ON s.id = e.student_id AND e.course_id = :cid
JOIN dropout_risk r ON s.id = r.student_id AND r.course_id = :cid
LEFT JOIN course_progress prog ON s.id = prog.student_id AND prog.course_id = :cid
LEFT JOIN student_quiz_stats qs ON s.id = qs.student_id AND qs.course_id = :cid
LEFT JOIN student_streaks str ON s.id = str.student_id
LEFT JOIN student_activity a ON s.id = a.student_id AND a.course_id = :cid
WHERE (:risk IS NULL OR r.risk_level = :risk)
ORDER BY r.risk_score DESC
LIMIT 100
"""
rows = (await db.execute(query, {"cid": course_id, "risk": risk_level})).fetchall()
return [
StudentProgressDetail(
student_id=r[0], student_name=r[1], enrollment_date=r[2],
completion_percent=float(r[3]), avg_quiz_score=float(r[4]),
streak_days=r[5], days_since_last_activity=r[6],
risk_level=r[7], risk_score=float(r[8]),
)
for r in rows
]
6. RODO i prywatność w analityce uczenia się
Dane edukacyjne to wrażliwe dane osobowe. Zbieraj i analizuj Interakcje uczniów wymagają zgodności z RODO, a w przypadku osób niepełnoletnich do konkretnego ustawodawstwa (COPPA w USA, europejska dyrektywa o ochronie nieletnich). Podstawowe zasady: minimalizacja danych (tylko zbieraj w razie potrzeby), anonimizacja dla raportów zbiorczych, prawo do bycia zapomnianym (możliwość usunięcia wszystkich danych ucznia) tj przezroczystość (jasne informacje o tym, co jest gromadzone i dlaczego).
Lista kontrolna RODO dotycząca analizy uczenia się
- Wyraźna zgoda przed zebraniem szczegółowych danych behawioralnych
- Anonimizuj student_id w raportach zbiorczych (nie uwzględniaj danych osobowych)
- Zaimplementuj punkt końcowy DELETE /students/{id}/analytics-data, aby uzyskać prawo do bycia zapomnianym
- Dane pochodzące od nieletnich uczniów wymagają zgody rodziców
- Polityka przechowywania danych: określ, jak długo będziesz przechowywać surowe zdarzenia (np. 2 lata)
- Dziennik kontroli osób uzyskujących dostęp do analiz uczniów
- Szyfrowanie danych przechowywanych i przesyłanych
- Nie udostępniaj danych osobom trzecim bez wyraźnej zgody
Wnioski i dalsze kroki
Zbudowaliśmy kompletny potok analityki uczenia się: instrukcje xAPI dla standaryzacja danych, Kafka do przetwarzania dużych ilości danych z gwarancjami dostarczanie, agregacje w czasie rzeczywistym ze strumieniami Fausta/Kafki, wykrywanie porzuceń w oparciu o sygnały behawioralne oraz API dashboardu dla nauczycieli.
Następnym krokiem jest integracja modelu ML wyszkolonego na danych historycznych zastąp oparty na regułach detektor porzucenia: z wystarczającą ilością danych, modele ML osiąga dokładność większą niż 85% we wczesnej identyfikacji uczniów z grupy ryzyka, 2–3 tygodnie przed faktycznym porzuceniem nauki.
W następnym artykule omówimy współpraca w czasie rzeczywistym na platformach EdTech: CRDT z Yjs do wspólnego edytowania i WebSocket do synchronizacji w czasie rzeczywistym udostępnianych dokumentów.
Seria inżynieryjna EdTech
- Skalowalna architektura LMS: wzorzec wielu najemców
- Algorytmy uczenia się adaptacyjnego: od teorii do produkcji
- Strumieniowe przesyłanie wideo dla edukacji: WebRTC vs HLS vs DASH
- Systemy AI Proctoring: przede wszystkim prywatność dzięki wizji komputerowej
- Spersonalizowany nauczyciel z LLM: RAG dla uziemienia wiedzy
- Silnik grywalizacji: architektura i maszyna stanu
- Learning Analytics: Data Pipeline z xAPI i Kafką (ten artykuł)
- Współpraca w czasie rzeczywistym w EdTech: CRDT i WebSocket
- EdTech zorientowany na urządzenia mobilne: architektura zorientowana na tryb offline
- Zarządzanie treścią dla wielu dzierżawców: wersjonowanie i SCORM







