Öğrenme Analitiği: xAPI ve Kafka ile Veri Hattı
Her tıklama, her yanlış cevap, her video duraklatılır ve her egzersiz yarım bırakılır öğrencinin öğrenme süreci hakkında önemli bir şey söyler. Platformlar Modern Eğitim Teknolojileri her gün bu etkinliklerin milyarlarcasını topluyor, ancak çoğu bunları bir kenara atar veya yalnızca önemsiz toplu istatistikler için kullanır: "X kursunun %70'i var tamamlanması". Bu, veri israfıdır ve kaçırılmış bir fırsattır.
Öğrenme Analitiği ve bu davranışsal verileri dönüştüren disiplin eyleme geçirilebilir içgörüyle: hangi öğrencilerin kursu bırakacağını tahmin edin, belirleyin En çok zorluğa neden olan içerikler, öğrenme yollarını gerçek zamanlı olarak optimize eder. Ancak bunu yapmak için iki temel teknik bileşene ihtiyacınız var: veri standartları birlikte çalışabilirliğe (xAPI) izin veren vealtyapı yönetebilen yüksek hacimli akış etkinlikleri (Apache Kafka).
Bu makalede nesilden itibaren eksiksiz bir öğrenme analitiği hattı oluşturacağız. istemcilerdeki xAPI ifadelerinin Kafka ile gerçek zamanlı alımına, işlenmesine Öğretmenlere yönelik kontrol panellerine kadar davranış kalıplarının tespiti için Flink ile.
Bu Makalede Neler Öğreneceksiniz?
- xAPI (Experience API / Tin Can): ifade yapısı ve en iyi uygulamalar
- Öğrenme Kayıt Deposu (LRS): mimari ve çözüm seçimi
- Yüksek hacimli eğitim etkinlikleri için Kafka hattı
- Paralel ve hataya dayanıklı işleme için tüketici grupları
- Okulu bırakma riski taşıyan öğrencilerin erken tespiti
- Kafka Streams ile gerçek zamanlı toplamalar
- Temel ölçümleri içeren öğretmen analitiği kontrol paneli
- Öğrenim verileri toplamada GDPR ve gizlilik
1. xAPI: Veri Öğrenme Standardı
xAPI ("Tin Can API" olarak da adlandırılan Deneyim API'si), öğrenme deneyimlerinin temsili. SCORM'un sınırlamalarının üstesinden gelin (kapalı bir ÖYS/LMS'de yalnızca "tamamlandı/tamamlanmadı" durumunu izleyen) çevrimiçi veya çevrimdışı, içeride veya dışarıda herhangi bir öğrenme etkinliğini izlemek için Standartlaştırılmış ve birlikte çalışılabilir bir kelime dağarcığına sahip bir ÖYS/ÖYS'den.
Bir xAPI ifadesinin temel yapısı basittir: Aktör (DSÖ), Fiil (ne yaptı), Nesne (ne hakkında). İsteğe bağlı olarak: Sonuç (hangi sonuçla), Bağlam (hangi bağlamda) e Zaman damgası. Bu özne-fiil-nesne yapısı ve herhangi bir eğitim senaryosunu kapsayacak kadar sezgisel ve esnektir.
# xapi/statement_builder.py
from dataclasses import dataclass, field, asdict
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
import uuid
@dataclass
class XAPIActor:
"""Chi compie l'azione."""
mbox: str # mailto:user@example.com
name: str
objectType: str = "Agent"
@dataclass
class XAPIVerb:
"""Cosa ha fatto l'attore. Usa vocabolari standardizzati (ADL, TinCan)."""
id: str # URI del verbo, es: http://adlnet.gov/expapi/verbs/completed
display: Dict[str, str] # {"en-US": "completed", "it-IT": "completato"}
@dataclass
class XAPIActivity:
"""Su cosa e stata compiuta l'azione."""
id: str # URI univoca dell'oggetto
objectType: str = "Activity"
definition: Optional[Dict] = None
@dataclass
class XAPIResult:
"""Risultato dell'azione (opzionale)."""
score: Optional[Dict] = None # {"scaled": 0.85, "raw": 85, "min": 0, "max": 100}
success: Optional[bool] = None
completion: Optional[bool] = None
duration: Optional[str] = None # ISO 8601: "PT5M30S" = 5 minuti 30 secondi
response: Optional[str] = None # Risposta testuale dello studente
@dataclass
class XAPIContext:
"""Contesto aggiuntivo."""
platform: str = "EdTech Platform"
language: str = "it-IT"
contextActivities: Optional[Dict] = None # Gerarchia: course -> module -> lesson
extensions: Optional[Dict] = None # Dati custom (device, browser, etc.)
@dataclass
class XAPIStatement:
actor: XAPIActor
verb: XAPIVerb
object: XAPIActivity
id: str = field(default_factory=lambda: str(uuid.uuid4()))
result: Optional[XAPIResult] = None
context: Optional[XAPIContext] = None
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
stored: Optional[str] = None
version: str = "1.0.3"
def to_dict(self) -> Dict:
return asdict(self)
# Verbi xAPI standard (ADL vocabulary)
XAPI_VERBS = {
"completed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/completed",
display={"en-US": "completed", "it-IT": "completato"},
),
"attempted": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/attempted",
display={"en-US": "attempted", "it-IT": "tentato"},
),
"passed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/passed",
display={"en-US": "passed", "it-IT": "superato"},
),
"failed": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/failed",
display={"en-US": "failed", "it-IT": "fallito"},
),
"experienced": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/experienced",
display={"en-US": "experienced", "it-IT": "esperienzato"},
),
"asked": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/asked",
display={"en-US": "asked", "it-IT": "chiesto"},
),
"interacted": XAPIVerb(
id="http://adlnet.gov/expapi/verbs/interacted",
display={"en-US": "interacted", "it-IT": "interagito"},
),
}
class StatementBuilder:
"""Builder per statement xAPI con validazione."""
def __init__(self, platform_url: str, tenant_id: str):
self.platform_url = platform_url
self.tenant_id = tenant_id
def lesson_completed(
self,
student_id: str,
student_email: str,
lesson_id: str,
lesson_title: str,
course_id: str,
duration_seconds: int,
score_percent: Optional[float] = None,
) -> XAPIStatement:
return XAPIStatement(
actor=XAPIActor(
mbox=f"mailto:{student_email}",
name=student_id,
),
verb=XAPI_VERBS["completed"],
object=XAPIActivity(
id=f"{self.platform_url}/activities/{lesson_id}",
definition={
"name": {"it-IT": lesson_title},
"type": "http://adlnet.gov/expapi/activities/lesson",
},
),
result=XAPIResult(
completion=True,
duration=f"PT{duration_seconds}S",
score={"scaled": score_percent / 100, "raw": score_percent, "min": 0, "max": 100} if score_percent else None,
),
context=XAPIContext(
contextActivities={
"parent": [{"id": f"{self.platform_url}/activities/course/{course_id}"}],
"grouping": [{"id": f"{self.platform_url}/activities/tenant/{self.tenant_id}"}],
},
extensions={
"https://schema.example.com/extensions/student_id": student_id,
"https://schema.example.com/extensions/tenant_id": self.tenant_id,
},
),
)
def quiz_answered(
self,
student_id: str,
student_email: str,
question_id: str,
question_text: str,
student_response: str,
correct: bool,
time_spent_seconds: int,
) -> XAPIStatement:
verb = XAPI_VERBS["passed"] if correct else XAPI_VERBS["failed"]
return XAPIStatement(
actor=XAPIActor(mbox=f"mailto:{student_email}", name=student_id),
verb=verb,
object=XAPIActivity(
id=f"{self.platform_url}/activities/question/{question_id}",
definition={
"name": {"it-IT": question_text[:100]},
"type": "http://adlnet.gov/expapi/activities/cmi.interaction",
},
),
result=XAPIResult(
success=correct,
response=student_response[:500], # Tronca per privacy
duration=f"PT{time_spent_seconds}S",
),
)
2. Eğitim Etkinlikleri için Kafka Boru Hattı
Milyonlarca öğrencinin aynı anda etkinlik oluşturması nedeniyle, Aşağıdakileri garanti eden mesajlaşma sistemi: yüksek kullanılabilirlik, öğrenci başına garantili sıralama (aynı olayın tüm olayları öğrenci sırasıyla işlenmelidir), e tekrar oynatma (yapabilir analiz mantığını değiştirdiğimizde tarihsel olayları yeniden çalışın). Apache Kafka tüm bu gereksinimleri karşılar ve üretimde standart seçimdir yüksek hacimli etkinlik hatları için.
# kafka/producer.py
import json
import logging
from typing import Optional
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
logger = logging.getLogger(__name__)
class LearningEventProducer:
"""
Producer Kafka per eventi xAPI.
Usa la chiave di partizione student_id per garantire ordine per studente.
"""
TOPIC = "edtech.xapi.statements"
def __init__(self, bootstrap_servers: str):
self.bootstrap_servers = bootstrap_servers
self._producer: Optional[AIOKafkaProducer] = None
async def start(self):
self._producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None,
# Configurazione per affidabilità
acks="all", # Conferma da tutti i replica
enable_idempotence=True, # Exactly-once delivery
compression_type="gzip", # Compressione per ridurre banda
max_batch_size=64 * 1024, # 64KB batch
linger_ms=10, # Aspetta 10ms per batch più grandi
)
await self._producer.start()
logger.info(f"Kafka producer started, topic: {self.TOPIC}")
async def stop(self):
if self._producer:
await self._producer.stop()
async def send_statement(self, statement: "XAPIStatement") -> bool:
"""
Invia uno statement xAPI a Kafka.
La partition key e lo student_id per garantire ordine per studente.
"""
if not self._producer:
raise RuntimeError("Producer non avviato. Chiama start() prima.")
statement_dict = statement.to_dict()
student_id = statement_dict["actor"]["name"]
try:
await self._producer.send_and_wait(
topic=self.TOPIC,
key=student_id, # Partition key: stesso studente -> stessa partizione
value=statement_dict,
headers=[
("content-type", b"application/json"),
("schema-version", b"1.0.3"),
],
)
return True
except KafkaError as e:
logger.error(f"Errore invio statement Kafka: {e}", exc_info=True)
return False
async def send_batch(self, statements: list) -> int:
"""Invia un batch di statement. Ritorna il numero di statement inviati con successo."""
success_count = 0
async with self._producer.transaction():
for stmt in statements:
if await self.send_statement(stmt):
success_count += 1
return success_count
# kafka/consumer.py
import asyncio
import json
import logging
from typing import Callable, Awaitable
from aiokafka import AIOKafkaConsumer
logger = logging.getLogger(__name__)
class LearningEventConsumer:
"""
Consumer Kafka per elaborazione statement xAPI.
Consumer groups per elaborazione parallela e fault-tolerant.
"""
def __init__(
self,
bootstrap_servers: str,
group_id: str,
handler: Callable[[dict], Awaitable[None]],
):
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
self.handler = handler
self._consumer: Optional[AIOKafkaConsumer] = None
self._running = False
async def start(self, topics: list = None):
topics = topics or ["edtech.xapi.statements"]
self._consumer = AIOKafkaConsumer(
*topics,
bootstrap_servers=self.bootstrap_servers,
group_id=self.group_id,
auto_offset_reset="earliest",
enable_auto_commit=False, # Commit manuale per at-least-once
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
max_poll_records=100, # Batch di 100 messaggi per poll
)
await self._consumer.start()
self._running = True
logger.info(f"Consumer {self.group_id} avviato su {topics}")
async def run(self):
"""Loop principale di consumo messaggi."""
if not self._consumer:
raise RuntimeError("Consumer non avviato.")
try:
async for message in self._consumer:
try:
await self.handler(message.value)
# Commit dopo elaborazione con successo
await self._consumer.commit()
except Exception as e:
logger.error(
f"Errore elaborazione messaggio offset {message.offset}: {e}",
exc_info=True,
)
# Non committare: il messaggio verrà rielaborato
# In produzione: invia a Dead Letter Queue dopo N tentativi
finally:
await self._consumer.stop()
3. Erken Uyarı Tespiti: Okulu Bırakma Riski Altındaki Öğrenciler
Bir Eğitim Teknolojisi platformu için en önemli ölçüm kayıt sayısı değil, ama tamamlanma oranı. Öğrencileri erken tespit edin Terk edilme riski altında (bırakma tahmini) kişiselleştirilmiş bildirimlerle müdahale etmenize olanak tanır, öğretmen desteği veya öğrenme yolunun değiştirilmesi. Kafka Akışlarını kullanıyoruz Risk sinyallerini gerçek zamanlı olarak hesaplamak için.
# analytics/dropout_detector.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class DropoutRiskSignal:
student_id: str
course_id: str
risk_level: RiskLevel
risk_score: float # 0.0 - 1.0
contributing_factors: List[str]
last_activity: Optional[datetime]
recommended_action: str
calculated_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class StudentEngagementMetrics:
student_id: str
course_id: str
days_since_last_activity: int
completion_rate: float # 0.0 - 1.0
avg_quiz_score: float # 0.0 - 100.0
quiz_failure_rate: float # 0.0 - 1.0
avg_session_duration_minutes: float
total_sessions_last_30d: int
video_rewatch_rate: float # Quante volte rivede video (difficolta?)
help_requests_last_7d: int
class DropoutRiskDetector:
"""
Calcola il rischio di abbandono basato su un modello a regole.
In produzione: sostituisci con un modello ML addestrato su dati storici.
"""
RISK_THRESHOLDS = {
RiskLevel.LOW: (0.0, 0.3),
RiskLevel.MEDIUM: (0.3, 0.6),
RiskLevel.HIGH: (0.6, 0.8),
RiskLevel.CRITICAL: (0.8, 1.0),
}
RECOMMENDED_ACTIONS = {
RiskLevel.LOW: "Monitoraggio routinario. Nessuna azione immediata.",
RiskLevel.MEDIUM: "Invia notifica push motivazionale. Suggerisci contenuto più facile.",
RiskLevel.HIGH: "Contatta il tutor del corso. Proponi sessione di supporto 1:1.",
RiskLevel.CRITICAL: "Escalation immediata: email al responsabile del corso e allo studente.",
}
def calculate_risk(self, metrics: StudentEngagementMetrics) -> DropoutRiskSignal:
score = 0.0
factors = []
# Fattore 1: Inattivita recente (peso: 35%)
inactivity_score = self._score_inactivity(metrics.days_since_last_activity)
score += inactivity_score * 0.35
if inactivity_score > 0.5:
factors.append(f"Inattivo da {metrics.days_since_last_activity} giorni")
# Fattore 2: Progresso nel corso (peso: 25%)
progress_score = 1.0 - metrics.completion_rate
score += progress_score * 0.25
if progress_score > 0.7:
factors.append(f"Progresso corso: solo {metrics.completion_rate:.0%}")
# Fattore 3: Performance ai quiz (peso: 20%)
quiz_score = self._score_quiz_performance(metrics.avg_quiz_score, metrics.quiz_failure_rate)
score += quiz_score * 0.20
if quiz_score > 0.6:
factors.append(f"Media quiz bassa: {metrics.avg_quiz_score:.1f}/100")
# Fattore 4: Riduzione sessioni (peso: 15%)
session_score = 1.0 - min(metrics.total_sessions_last_30d / 10, 1.0)
score += session_score * 0.15
if metrics.total_sessions_last_30d < 3:
factors.append(f"Solo {metrics.total_sessions_last_30d} sessioni in 30 giorni")
# Fattore 5: Segnali di frustrazione (peso: 5%)
frustration_score = min(metrics.video_rewatch_rate + metrics.help_requests_last_7d * 0.1, 1.0)
score += frustration_score * 0.05
if metrics.video_rewatch_rate > 2.0:
factors.append("Alto numero di revisioni video (possibile difficolta)")
score = min(max(score, 0.0), 1.0)
risk_level = self._score_to_level(score)
return DropoutRiskSignal(
student_id=metrics.student_id,
course_id=metrics.course_id,
risk_level=risk_level,
risk_score=score,
contributing_factors=factors,
last_activity=None, # Iniettare datetime reale
recommended_action=self.RECOMMENDED_ACTIONS[risk_level],
)
def _score_inactivity(self, days: int) -> float:
if days <= 1: return 0.0
if days <= 3: return 0.2
if days <= 7: return 0.5
if days <= 14: return 0.8
return 1.0
def _score_quiz_performance(self, avg_score: float, failure_rate: float) -> float:
score_component = max(0, (60 - avg_score) / 60) # Baseline 60%
failure_component = min(failure_rate * 1.5, 1.0)
return (score_component + failure_component) / 2
def _score_to_level(self, score: float) -> RiskLevel:
for level, (low, high) in self.RISK_THRESHOLDS.items():
if low <= score < high:
return level
return RiskLevel.CRITICAL
4. Kafka Akışlarıyla Gerçek Zamanlı Toplamalar
Kafka Akışları, gerçek zamanlı olayları doğrudan Kafka kümesinde işlemenize olanak tanır,
ek altyapı olmadan. Python kütüphanesini kullanıyoruz faust
1 saatlik bir zaman aralığı boyunca toplu metrikleri hesaplamak için.
# analytics/streaming_aggregator.py
import faust
from datetime import datetime, timedelta
from typing import Optional
app = faust.App(
"edtech-analytics",
broker="kafka://localhost:9092",
value_serializer="json",
)
# Topic di input
xapi_topic = app.topic("edtech.xapi.statements")
# Topic di output per dashboard
course_metrics_topic = app.topic("edtech.analytics.course-metrics")
dropout_alerts_topic = app.topic("edtech.analytics.dropout-alerts")
class CourseMetrics(faust.Record):
course_id: str
window_start: str
window_end: str
total_events: int = 0
unique_students: int = 0
lessons_completed: int = 0
quizzes_passed: int = 0
quizzes_failed: int = 0
avg_quiz_score: float = 0.0
# Tabella aggregata per finestra di 1 ora
course_hourly_table = app.Table(
"course-hourly-metrics",
default=dict,
partitions=8,
)
student_activity_table = app.Table(
"student-activity",
default=dict,
partitions=8,
)
@app.agent(xapi_topic)
async def process_xapi_statement(statements):
"""Processa ogni statement xAPI e aggiorna le metriche aggregate."""
async for statement in statements:
actor = statement.get("actor", {})
verb = statement.get("verb", {}).get("id", "")
result = statement.get("result", {}) or {}
context = statement.get("context", {}) or {}
student_id = actor.get("name", "unknown")
course_id = _extract_course_id(context)
timestamp = statement.get("timestamp", "")
if not course_id:
continue
# Chiave finestra oraria
hour_key = f"{course_id}:{timestamp[:13]}" # ISO troncato all'ora
# Aggiorna metriche corso
metrics = course_hourly_table[hour_key]
metrics["total_events"] = metrics.get("total_events", 0) + 1
metrics.setdefault("students", set())
metrics["students"].add(student_id)
verb_local = verb.split("/")[-1] # Prendi solo il nome del verbo
if verb_local == "completed":
metrics["lessons_completed"] = metrics.get("lessons_completed", 0) + 1
elif verb_local == "passed":
metrics["quizzes_passed"] = metrics.get("quizzes_passed", 0) + 1
score = result.get("score", {}).get("raw", 0)
prev_avg = metrics.get("avg_quiz_score", 0.0)
prev_count = metrics.get("quiz_count", 0)
metrics["avg_quiz_score"] = (prev_avg * prev_count + score) / (prev_count + 1)
metrics["quiz_count"] = prev_count + 1
elif verb_local == "failed":
metrics["quizzes_failed"] = metrics.get("quizzes_failed", 0) + 1
course_hourly_table[hour_key] = metrics
# Aggiorna attivita studente per dropout detection
student_key = f"{student_id}:{course_id}"
student_data = student_activity_table[student_key]
student_data["last_activity"] = timestamp
student_data["event_count"] = student_data.get("event_count", 0) + 1
student_activity_table[student_key] = student_data
# Emetti metriche aggregate ogni 100 eventi
if metrics["total_events"] % 100 == 0:
await course_metrics_topic.send(
key=course_id,
value={
"course_id": course_id,
"hour_key": hour_key,
"total_events": metrics["total_events"],
"unique_students": len(metrics.get("students", set())),
"lessons_completed": metrics.get("lessons_completed", 0),
"quizzes_passed": metrics.get("quizzes_passed", 0),
"quizzes_failed": metrics.get("quizzes_failed", 0),
"avg_quiz_score": metrics.get("avg_quiz_score", 0.0),
},
)
def _extract_course_id(context: dict) -> Optional[str]:
"""Estrae il course_id dal contesto xAPI."""
parent_activities = context.get("contextActivities", {}).get("parent", [])
for activity in parent_activities:
activity_id = activity.get("id", "")
if "/activities/course/" in activity_id:
return activity_id.split("/activities/course/")[-1]
return None
5. Öğretmen Kontrol Paneli: Temel Metrikler
Toplanan veriler öğretmenler için yararlı bilgiler haline gelmelidir. İyi bir kontrol paneli Öğrenme analitiğinin yalnızca ne olduğunu göstermemesi gerekir (tanımlayıcı), ama aynı zamanda neden (teşhis) ve öğrencinin ne yapacağı (öngörücü).
# api/analytics_dashboard.py
from fastapi import FastAPI, Depends, Query
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime, date, timedelta
class CourseOverview(BaseModel):
course_id: str
course_name: str
total_enrolled: int
active_last_7d: int
completion_rate: float
avg_quiz_score: float
at_risk_count: int
dropout_rate_30d: float
class StudentProgressDetail(BaseModel):
student_id: str
student_name: str
enrollment_date: date
completion_percent: float
avg_quiz_score: float
streak_days: int
days_since_last_activity: int
risk_level: str
risk_score: float
class ContentDifficultyReport(BaseModel):
lesson_id: str
lesson_title: str
avg_time_spent_minutes: float
rewatch_rate: float
quit_rate: float # % studenti che hanno abbandonato durante questo contenuto
avg_quiz_score_after: float
difficulty_index: float # Calcolato: alto = contenuto difficile
app = FastAPI(title="Learning Analytics Dashboard API")
@app.get("/api/analytics/courses/{course_id}/overview", response_model=CourseOverview)
async def get_course_overview(course_id: str, db=Depends(get_db)):
"""Panoramica corso con metriche chiave per il dashboard insegnante."""
row = await db.execute("""
SELECT
c.name,
COUNT(DISTINCT e.student_id) as enrolled,
COUNT(DISTINCT CASE WHEN a.last_activity > NOW() - INTERVAL '7 days' THEN a.student_id END) as active_7d,
AVG(CASE WHEN e.completed THEN 1.0 ELSE 0.0 END) as completion_rate,
AVG(q.avg_score) as avg_quiz,
COUNT(DISTINCT CASE WHEN r.risk_level IN ('high','critical') THEN r.student_id END) as at_risk,
COUNT(DISTINCT CASE WHEN e.dropped_out AND e.dropout_date > NOW() - INTERVAL '30 days' THEN e.student_id END)::float /
NULLIF(COUNT(DISTINCT CASE WHEN e.enrollment_date < NOW() - INTERVAL '30 days' THEN e.student_id END), 0) as dropout_rate
FROM courses c
LEFT JOIN enrollments e ON c.id = e.course_id
LEFT JOIN student_activity a ON e.student_id = a.student_id AND a.course_id = c.id
LEFT JOIN student_quiz_stats q ON e.student_id = q.student_id AND q.course_id = c.id
LEFT JOIN dropout_risk r ON e.student_id = r.student_id AND r.course_id = c.id
WHERE c.id = :cid
GROUP BY c.id, c.name
""", {"cid": course_id})
data = row.fetchone()
return CourseOverview(
course_id=course_id,
course_name=data[0],
total_enrolled=data[1] or 0,
active_last_7d=data[2] or 0,
completion_rate=float(data[3] or 0),
avg_quiz_score=float(data[4] or 0),
at_risk_count=data[5] or 0,
dropout_rate_30d=float(data[6] or 0),
)
@app.get("/api/analytics/courses/{course_id}/at-risk", response_model=List[StudentProgressDetail])
async def get_at_risk_students(
course_id: str,
risk_level: Optional[str] = Query(None, description="Filter by risk: low, medium, high, critical"),
db=Depends(get_db),
):
"""Lista studenti a rischio abbandono con dettagli progressione."""
query = """
SELECT
s.id, s.name,
e.enrollment_date,
COALESCE(prog.completion_percent, 0) as completion_percent,
COALESCE(qs.avg_score, 0) as avg_quiz_score,
COALESCE(str.current_streak, 0) as streak_days,
EXTRACT(DAY FROM NOW() - a.last_activity)::int as days_inactive,
r.risk_level,
r.risk_score
FROM students s
JOIN enrollments e ON s.id = e.student_id AND e.course_id = :cid
JOIN dropout_risk r ON s.id = r.student_id AND r.course_id = :cid
LEFT JOIN course_progress prog ON s.id = prog.student_id AND prog.course_id = :cid
LEFT JOIN student_quiz_stats qs ON s.id = qs.student_id AND qs.course_id = :cid
LEFT JOIN student_streaks str ON s.id = str.student_id
LEFT JOIN student_activity a ON s.id = a.student_id AND a.course_id = :cid
WHERE (:risk IS NULL OR r.risk_level = :risk)
ORDER BY r.risk_score DESC
LIMIT 100
"""
rows = (await db.execute(query, {"cid": course_id, "risk": risk_level})).fetchall()
return [
StudentProgressDetail(
student_id=r[0], student_name=r[1], enrollment_date=r[2],
completion_percent=float(r[3]), avg_quiz_score=float(r[4]),
streak_days=r[5], days_since_last_activity=r[6],
risk_level=r[7], risk_score=float(r[8]),
)
for r in rows
]
6. Öğrenme Analitiğinde GDPR ve Gizlilik
Öğrenme verileri hassas kişisel verilerdir. Toplayın ve analiz edin Öğrenci etkileşimleri GDPR uyumluluğunu gerektirir ve reşit olmayanlar için özel mevzuata (ABD'de COPPA, küçüklerin korunmasına ilişkin Avrupa direktifi). Temel ilkeler: veri minimizasyonu (yalnızca topla gerektiği gibi), anonimleştirme toplu raporlar için, unutulma hakkı (tüm öğrenci verilerini silme imkanı) e şeffaflık (neyin toplandığı ve neden toplandığı hakkında net bilgi).
Öğrenme Analitiği için GDPR Kontrol Listesi
- Ayrıntılı davranışsal verilerin toplanmasından önce açık onay
- Toplu raporlarda student_id'yi anonimleştirin (kişisel verileri dahil etmeyin)
- Unutulma hakkı için DELETE /students/{id}/analytics-data uç noktasını uygulayın
- Reşit olmayan öğrencilerden elde edilen veriler ebeveyn izni gerektirir
- Veri saklama politikası: ham olayları ne kadar süre saklayacağınızı tanımlayın (ör. 2 yıl)
- Öğrenci analitiğine kimlerin eriştiğine ilişkin denetim günlüğü
- Beklemedeki ve aktarım halindeki verilerin şifrelenmesi
- Açık rıza olmadan verileri üçüncü taraflarla paylaşmayın
Sonuçlar ve Sonraki Adımlar
Eksiksiz bir öğrenme analitiği hattı oluşturduk: xAPI ifadeleri veri standardizasyonu, garantili yüksek hacimli alım için Kafka teslimat, Faust/Kafka Streams ile gerçek zamanlı toplamalar, ayrılma tespiti davranışsal sinyallere ve öğretmenler için kontrol paneli API'sine dayalıdır.
Bir sonraki adım, geçmiş verilerle eğitilmiş bir makine öğrenimi modelini entegre etmektir. kural tabanlı bırakma dedektörünü değiştirin: yeterli veriyle modeller ML, erken teşhiste %85'in üzerinde doğruluk elde ediyor risk altındaki öğrenciler, fiili okulu bırakmadan 2-3 hafta önce.
Bir sonraki yazımızda bunları inceleyeceğiz gerçek zamanlı işbirliği EdTech platformlarında: İşbirliğine dayalı düzenleme ve WebSocket için Yjs ile CRDT Paylaşılan belgelerin gerçek zamanlı senkronizasyonu için.
EdTech Mühendislik Serisi
- Ölçeklenebilir LMS Mimarisi: Çok Kiracılı Model
- Uyarlanabilir Öğrenme Algoritmaları: Teoriden Üretime
- Eğitim için Video Yayını: WebRTC vs HLS vs DASH
- Yapay Zeka Gözetleme Sistemleri: Bilgisayarlı Görme ile Öncelik Gizlilik
- LLM'de Kişiselleştirilmiş Öğretmen: Bilgi Temellendirme için RAG
- Oyunlaştırma Motoru: Mimari ve Durum Makinesi
- Öğrenme Analitiği: xAPI ve Kafka ile Veri Boru Hattı (bu makale)
- Eğitim Teknolojisinde Gerçek Zamanlı İşbirliği: CRDT ve WebSocket
- Mobil Öncelikli Eğitim Teknolojisi: Çevrimdışı Öncelikli Mimari
- Çok Kiracılı İçerik Yönetimi: Sürüm Oluşturma ve SCORM







