Colaborare în timp real în EdTech: CRDT și WebSocket
Google Docs a demonstrat că este posibilă colaborarea sincronă pe documente. Notion, Figma, Miro au extins această paradigmă la infinite de note, desene și table albe. Acum platformele EdTech vor să aducă aceeași experiență învățării colaborative: studenții care lucrează la același document, cod sau exercițiu în timp real, vizualizarea cursorelor altora, schimbări instantanee, fără conflicte.
Problema fundamentală este competiție distribuită: când doi elevi editează același text în același timp de la mașini diferite, care editează câștigă? Pur și simplu „last-write-wins” produce rezultate inacceptabile (pierdere de modificări). Există două soluții standard: Transformare operațională (OT) (utilizat de Google Docs) e Tipuri de date replicate fără conflicte (CRDT). În 2024-2025, CRDT-urile au ajuns la maturitate de producție și devin standardul de facto pentru noile implementări, datorită simplității lor robustețe conceptuală și matematică.
În acest articol vom construi un sistem de colaborare în timp real pentru EdTech: editarea documentelor în colaborare cu Yjs (cel mai popular CRDT), WebSocket pentru sincronizare și caracteristici specifice pentru învățare, cum ar fi adnotări partajate, cursore denumite și comentarii în fir.
Ce veți învăța în acest articol
- CRDT vs OT: diferențe, avantaje și când să alegi ce
- Yjs: structură internă, tipuri de date și Y.Doc
- Server de sincronizare WebSocket cu y-websocket
- Prezență: cursoare cu nume, selecții și utilizatori online
- Persistență: salvarea și încărcarea stării CRDT
- Adnotări educaționale: comentarii, evidențieri și fire de discuții
- Offline-mai întâi cu CRDT: sincronizare automată la reconectare
- Scalare: de la un singur server la cluster cu Redis pub/sub
1. CRDT vs Transformare Operațională
Transformare operațională (OT) operațiunile de transformare înainte de a le aplica pentru a lua în considerare modificările concomitente. Funcționează bine, dar necesită un server central care coordonează transformările: Google Docs folosește această arhitectură cu un server transformare centralizată.
CRDT rezolvă problema într-un mod radical diferit: structuri de date sunt concepute matematic pentru a fi îmbinare-capabil fără conflicte. Operațiunile sunt comutativ (indiferent acelasi rezultat din ordin) e idempotent (aplicați aceeași operație de două ori nu modifică rezultatul). Acest lucru elimină necesitatea unui coordonator central și permite operarea offline cu sincronizare automată.
Comparație OT vs CRDT
| astept | OT (Google Docs) | CRDT (Yjs, Automerge) |
|---|---|---|
| Coordonator central | Necesar | Nu este necesar |
| Asistență offline | Limitat | Nativ |
| Complexitatea implementării | Ridicat | Media (cu biblioteci) |
| Garanții matematice | Depinde de implementare | Testat formal |
| Memorie deasupra capului | Bas | Mediu (piatră funerară pentru articolele șterse) |
| Biblioteci mature | ShareDB, OT.js | Yjs, Automerge, Cola |
| Scalabilitate | Vertical (server central) | Orizontal (P2P sau multi-server) |
| Adopție 2025 | Moștenire (sisteme existente) | Standarde pentru sisteme noi |
2. Yjs: CRDT pentru EdTech
Yjs și cea mai matură bibliotecă CRDT adoptată în 2025.
Oferă legături pentru toți editorii majori (ProseMirror, TipTap, CodeMirror, Monaco),
suport pentru WebSocket, WebRTC și IndexedDB și un API simplu bazat pe
Y.Doc (documentul de colaborare) și tipurile sale de date partajate.
Unitatea fundamentală a lui Yjs e Y.Doc: un document care conține
o ierarhie a structurilor de date partajate. Vin modificări ale acestor date
serializat automat în actualizări binare compact cât pot
să fie trimise tuturor colegilor și aplicate în orice ordine, fără conflicte.
// Frontend: src/collaboration/collaborative-editor.ts
import * as Y from 'yjs';
import { WebsocketProvider } from 'y-websocket';
import { QuillBinding } from 'y-quill';
import { IndexeddbPersistence } from 'y-indexeddb';
interface CollabUser {
name: string;
color: string;
avatar: string;
studentId: string;
}
export class CollaborativeEditorSession {
private doc: Y.Doc;
private wsProvider: WebsocketProvider;
private dbProvider: IndexeddbPersistence;
private text: Y.Text;
private annotations: Y.Array<any>;
private comments: Y.Map<any>;
constructor(
private readonly documentId: string,
private readonly user: CollabUser,
private readonly wsUrl: string,
) {
// Crea il documento collaborativo
this.doc = new Y.Doc();
// Strutture dati condivise
this.text = this.doc.getText('content');
this.annotations = this.doc.getArray('annotations');
this.comments = this.doc.getMap('comments');
}
async initialize(): Promise<void> {
// Persistenza locale con IndexedDB (supporto offline)
this.dbProvider = new IndexeddbPersistence(
`edtech-doc-${this.documentId}`,
this.doc,
);
await new Promise<void>((resolve) => {
this.dbProvider.on('synced', () => {
console.log('Documento caricato da IndexedDB');
resolve();
});
});
// Connessione WebSocket per sync real-time
this.wsProvider = new WebsocketProvider(
this.wsUrl,
`doc-${this.documentId}`,
this.doc,
{
connect: true,
awareness: this.createAwareness(),
},
);
this.wsProvider.on('status', (event: { status: string }) => {
console.log('WebSocket status:', event.status);
});
// Gestione presenza: cursori degli altri utenti
this.setupPresence();
}
private createAwareness() {
// Awareness e il meccanismo Yjs per stato temporaneo (presenza, cursori)
// NON fa parte del documento persistente, solo stato effimero
return {
getLocalState: () => ({
user: this.user,
cursor: null, // Aggiornato durante l'editing
selection: null,
lastSeen: Date.now(),
}),
};
}
private setupPresence(): void {
const awareness = this.wsProvider.awareness;
// Imposta stato locale dell'utente
awareness.setLocalStateField('user', this.user);
// Ascolta cambiamenti nella presenza degli altri utenti
awareness.on('change', () => {
const states = Array.from(awareness.getStates().entries());
const onlineUsers = states
.filter(([clientId]) => clientId !== this.doc.clientID)
.map(([, state]) => state.user)
.filter(Boolean);
this.onUsersChanged(onlineUsers);
});
}
updateCursor(position: number, selection: { from: number; to: number } | null): void {
this.wsProvider.awareness.setLocalStateField('cursor', position);
this.wsProvider.awareness.setLocalStateField('selection', selection);
}
addAnnotation(annotation: {
from: number;
to: number;
type: 'highlight' | 'underline' | 'comment';
comment?: string;
authorId: string;
}): void {
this.doc.transact(() => {
this.annotations.push([{
id: crypto.randomUUID(),
...annotation,
createdAt: Date.now(),
}]);
});
}
addComment(commentId: string, comment: {
text: string;
authorId: string;
authorName: string;
replyTo?: string;
}): void {
this.doc.transact(() => {
const thread = this.comments.get(commentId) || [];
this.comments.set(commentId, [...thread, {
id: crypto.randomUUID(),
...comment,
timestamp: Date.now(),
}]);
});
}
getAnnotations(): any[] {
return this.annotations.toArray();
}
getComments(): Map<string, any[]> {
return new Map(Array.from(this.comments.entries()));
}
protected onUsersChanged(users: CollabUser[]): void {
// Override in componente per aggiornare UI
console.log('Utenti online:', users.map(u => u.name));
}
destroy(): void {
this.wsProvider?.destroy();
this.dbProvider?.destroy();
this.doc?.destroy();
}
}
3. WebSocket Sync Server
Serverul WebSocket Yjs este responsabil pentru: colectarea actualizărilor de la clienți,
distribuie-le tuturor colegilor conectați la același document și persistă starea
a documentului pentru noi clienți care se conectează. Să folosim y-websocket
ca baza si o extindem cu autentificare, autorizare per chirias
și persistență pe Redis/PostgreSQL.
# server/collab_server.py
import asyncio
import json
import logging
from typing import Dict, Set, Optional
from dataclasses import dataclass, field
import websockets
from websockets.server import WebSocketServerProtocol
logger = logging.getLogger(__name__)
@dataclass
class DocumentRoom:
document_id: str
clients: Set[WebSocketServerProtocol] = field(default_factory=set)
awareness_states: Dict[int, dict] = field(default_factory=dict)
# Ultimo stato persistito del documento (aggiornamenti Yjs binari)
persisted_state: Optional[bytes] = None
class YjsWebSocketServer:
"""
Server WebSocket per sincronizzazione Yjs multi-room.
Ogni 'room' corrisponde a un documento collaborativo.
"""
MESSAGE_SYNC = 0
MESSAGE_AWARENESS = 1
MESSAGE_AUTH = 2
def __init__(
self,
redis_client,
db,
auth_service,
port: int = 8080,
):
self.redis = redis_client
self.db = db
self.auth = auth_service
self.port = port
# rooms: document_id -> DocumentRoom
self.rooms: Dict[str, DocumentRoom] = {}
async def handle_client(self, websocket: WebSocketServerProtocol, path: str):
"""
Handler principale per ogni connessione WebSocket.
path: /collab/{document_id}
"""
# Estrai document_id dal path
try:
document_id = path.split("/collab/")[1]
except (IndexError, AttributeError):
await websocket.close(4001, "Invalid path")
return
# Autenticazione: primo messaggio deve essere il token
try:
auth_msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
auth_data = json.loads(auth_msg)
user = await self.auth.verify_token(auth_data.get("token", ""))
if not user:
await websocket.close(4003, "Unauthorized")
return
# Verifica accesso al documento
if not await self.auth.can_access_document(user.id, document_id):
await websocket.close(4003, "Forbidden")
return
except asyncio.TimeoutError:
await websocket.close(4002, "Auth timeout")
return
# Unisci alla room del documento
room = self._get_or_create_room(document_id)
room.clients.add(websocket)
try:
# Invia stato corrente al nuovo client (sync iniziale)
await self._send_initial_state(websocket, document_id, room)
# Loop messaggi
async for message in websocket:
await self._handle_message(websocket, room, message, user)
except websockets.ConnectionClosed:
pass
finally:
room.clients.discard(websocket)
room.awareness_states.pop(id(websocket), None)
# Notifica agli altri client che questo utente e andato offline
await self._broadcast_awareness(room, exclude=websocket)
if not room.clients:
# Ultima persone nella room: persisti e rimuovi
await self._persist_document(document_id, room)
del self.rooms[document_id]
def _get_or_create_room(self, document_id: str) -> DocumentRoom:
if document_id not in self.rooms:
self.rooms[document_id] = DocumentRoom(document_id=document_id)
return self.rooms[document_id]
async def _send_initial_state(
self,
websocket: WebSocketServerProtocol,
document_id: str,
room: DocumentRoom,
) -> None:
"""Invia lo stato corrente del documento al nuovo client."""
# Carica da Redis (cache) o PostgreSQL
state = room.persisted_state or await self._load_document(document_id)
if state:
# Messaggio di sync: tipo 0 = SYNC_STEP1 in protocollo Yjs
await websocket.send(
bytes([self.MESSAGE_SYNC, 0]) + state
)
# Invia stati di awareness degli altri utenti
if room.awareness_states:
awareness_payload = json.dumps({
"clients": room.awareness_states
}).encode()
await websocket.send(bytes([self.MESSAGE_AWARENESS]) + awareness_payload)
async def _handle_message(
self,
websocket: WebSocketServerProtocol,
room: DocumentRoom,
message: bytes,
user,
) -> None:
"""Processa un messaggio dal client e lo propaga."""
if not isinstance(message, bytes) or not message:
return
msg_type = message[0]
payload = message[1:]
if msg_type == self.MESSAGE_SYNC:
# Aggiornamento documento: broadcast a tutti gli altri client
await self._broadcast(room, message, exclude=websocket)
# Aggiorna stato in memoria e schedula persistenza
await self._update_document_state(room, payload)
elif msg_type == self.MESSAGE_AWARENESS:
# Aggiornamento presenza: cursori, selezioni, utenti online
client_id = id(websocket)
room.awareness_states[client_id] = {
"user": user.to_dict(),
"payload": payload.decode("utf-8", errors="replace"),
}
await self._broadcast_awareness(room, exclude=None)
async def _broadcast(
self,
room: DocumentRoom,
message: bytes,
exclude: Optional[WebSocketServerProtocol] = None,
) -> None:
"""Invia un messaggio a tutti i client nella room eccetto il mittente."""
dead_clients = set()
for client in room.clients:
if client == exclude:
continue
try:
await client.send(message)
except websockets.ConnectionClosed:
dead_clients.add(client)
room.clients -= dead_clients
async def _broadcast_awareness(self, room: DocumentRoom, exclude=None) -> None:
awareness_payload = json.dumps({
"clients": {
str(cid): state
for cid, state in room.awareness_states.items()
}
}).encode()
message = bytes([self.MESSAGE_AWARENESS]) + awareness_payload
await self._broadcast(room, message, exclude=exclude)
async def _load_document(self, document_id: str) -> Optional[bytes]:
"""Carica documento da Redis o PostgreSQL."""
# Prima prova Redis
cached = await self.redis.get(f"yjsdoc:{document_id}")
if cached:
return cached
# Fallback su PostgreSQL
result = await self.db.execute(
"SELECT yjs_state FROM collaborative_documents WHERE id = :did",
{"did": document_id},
)
row = result.fetchone()
if row and row[0]:
state = bytes(row[0])
await self.redis.setex(f"yjsdoc:{document_id}", 3600, state)
return state
return None
async def _persist_document(self, document_id: str, room: DocumentRoom) -> None:
if not room.persisted_state:
return
await self.db.execute(
"""INSERT INTO collaborative_documents (id, yjs_state, updated_at)
VALUES (:did, :state, NOW())
ON CONFLICT (id) DO UPDATE
SET yjs_state = :state, updated_at = NOW()""",
{"did": document_id, "state": room.persisted_state},
)
await self.db.commit()
await self.redis.setex(f"yjsdoc:{document_id}", 3600, room.persisted_state)
async def _update_document_state(self, room: DocumentRoom, update: bytes) -> None:
"""Applica un update al documento in memoria."""
# In produzione usa una libreria Yjs lato server (y-py) per merge corretto
# Qui semplificato: conserva l'ultimo update ricevuto
room.persisted_state = update
async def start(self):
async with websockets.serve(self.handle_client, "0.0.0.0", self.port):
logger.info(f"Yjs WebSocket server in ascolto su porta {self.port}")
await asyncio.Future() # Loop infinito
4. Scalare: de la un singur server la cluster
Un singur server WebSocket nu poate gestiona mii de documente concurente. Pentru a scala orizontal, folosim Redis Pub/Sub ca magistrală de mesaje între servere: când un client de pe Server A trimite o actualizare, Server A o publică pe Redis, iar Server B (care are alți clienți conectați la același document) îl primește și îl transmite clienților săi.
# server/collab_cluster.py
import asyncio
import json
from typing import Dict, Optional
import redis.asyncio as aioredis
class ClusteredYjsServer:
"""
Estensione del server Yjs per deployment in cluster.
Usa Redis Pub/Sub per sincronizzare i nodi del cluster.
"""
def __init__(self, base_server: "YjsWebSocketServer", redis_url: str):
self.base = base_server
self.redis_url = redis_url
self.pub_redis: Optional[aioredis.Redis] = None
self.sub_redis: Optional[aioredis.Redis] = None
async def start_cluster_sync(self):
"""Avvia la sincronizzazione tra nodi del cluster via Redis."""
self.pub_redis = await aioredis.from_url(self.redis_url)
self.sub_redis = await aioredis.from_url(self.redis_url)
pubsub = self.sub_redis.pubsub()
await pubsub.psubscribe("yjs:*") # Subscribe a tutti i canali documento
asyncio.create_task(self._listen_cluster_messages(pubsub))
logger.info("Cluster sync avviato via Redis Pub/Sub")
async def _listen_cluster_messages(self, pubsub):
"""Ricevi e distribuisci messaggi dagli altri nodi del cluster."""
async for message in pubsub.listen():
if message["type"] not in ("message", "pmessage"):
continue
channel = message["channel"].decode()
document_id = channel.split("yjs:")[-1]
if document_id not in self.base.rooms:
continue # Questo nodo non ha client per questo documento
data = message["data"]
room = self.base.rooms[document_id]
# Propaga ai client locali (escludi il mittente tramite source_node)
source_node = json.loads(data[:36]) if len(data) > 36 else {}
payload = data[36:]
await self.base._broadcast(room, payload, exclude=None)
async def publish_update(self, document_id: str, update: bytes, node_id: str) -> None:
"""Pubblica un update a tutti i nodi del cluster."""
channel = f"yjs:{document_id}"
# Prefissa con il node_id per evitare loop
message = json.dumps({"node": node_id}).encode()[:36].ljust(36) + update
await self.pub_redis.publish(channel, message)
5. Caracteristici educaționale: adnotări și fire
Pe lângă editarea în colaborare, platformele EdTech au nevoie de funcționalitate specificatii pentru invatare: adnotări (relele și note pe părți ale textului), fir de discuție (intrebari si raspunsuri ancorat la trepte specifice), e modul de revizuire (profesor comentarii asupra lucrării elevului fără a o modifica direct).
// Frontend: src/collaboration/annotations.service.ts
import * as Y from 'yjs';
import { Injectable } from '@angular/core';
export interface Annotation {
id: string;
from: number;
to: number;
type: 'highlight' | 'underline' | 'comment' | 'correction';
color?: string;
authorId: string;
authorName: string;
text?: string;
timestamp: number;
resolved?: boolean;
}
export interface CommentThread {
id: string;
annotationId: string;
replies: CommentReply[];
resolved: boolean;
}
export interface CommentReply {
id: string;
text: string;
authorId: string;
authorName: string;
timestamp: number;
isTeacher: boolean;
}
@Injectable({ providedIn: 'root' })
export class AnnotationsService {
private doc: Y.Doc | null = null;
private annotations: Y.Array<Annotation> | null = null;
private threads: Y.Map<CommentThread> | null = null;
initialize(doc: Y.Doc): void {
this.doc = doc;
this.annotations = doc.getArray<Annotation>('annotations');
this.threads = doc.getMap<CommentThread>('threads');
}
addAnnotation(annotation: Omit<Annotation, 'id' | 'timestamp'>): string {
if (!this.annotations || !this.doc) throw new Error('Non inizializzato');
const id = crypto.randomUUID();
const fullAnnotation: Annotation = {
...annotation,
id,
timestamp: Date.now(),
};
this.doc.transact(() => {
this.annotations!.push([fullAnnotation]);
// Crea thread vuoto se e un commento
if (annotation.type === 'comment') {
this.threads!.set(id, {
id,
annotationId: id,
replies: [],
resolved: false,
});
}
});
return id;
}
addReply(threadId: string, reply: Omit<CommentReply, 'id' | 'timestamp'>): void {
if (!this.threads || !this.doc) return;
const thread = this.threads.get(threadId);
if (!thread) return;
const updatedThread: CommentThread = {
...thread,
replies: [...thread.replies, {
...reply,
id: crypto.randomUUID(),
timestamp: Date.now(),
}],
};
this.doc.transact(() => {
this.threads!.set(threadId, updatedThread);
});
}
resolveThread(threadId: string): void {
if (!this.threads || !this.doc) return;
const thread = this.threads.get(threadId);
if (!thread) return;
this.doc.transact(() => {
this.threads!.set(threadId, { ...thread, resolved: true });
// Segna l'annotazione come risolta
const annotations = this.annotations!.toArray();
const idx = annotations.findIndex(a => a.id === thread.annotationId);
if (idx !== -1) {
this.annotations!.delete(idx, 1);
this.annotations!.insert(idx, [{ ...annotations[idx], resolved: true }]);
}
});
}
getAnnotations(): Annotation[] {
return this.annotations?.toArray() ?? [];
}
getThread(threadId: string): CommentThread | undefined {
return this.threads?.get(threadId);
}
onAnnotationsChange(callback: (annotations: Annotation[]) => void): () => void {
if (!this.annotations) return () => {};
const handler = () => callback(this.getAnnotations());
this.annotations.observe(handler);
return () => this.annotations?.unobserve(handler);
}
}
Anti-modele de evitat
- Stare mutabilă partajată fără CRDT: Utilizați o variabilă partajată cu blocare și un anti-model cu risc ridicat de blocare. Utilizați întotdeauna CRDT sau OT.
- Difuzare fără filtru: Trimiterea fiecărei apăsări de tastă către fiecare client este costisitoare. Aplicați debounce la nivelul clientului (50-100 ms) înainte de a trimite actualizări.
- Conștientizare persistentă: Starea de prezenta (cursori, utilizatori online) este efemera, nu persistenta. Nu îl salvați în documentul principal.
- Fără colectare a gunoiului: Yjs acumulează „piatră funerară” pentru articolele șterse. Activați colectarea periodică a gunoiului pentru a reduce utilizarea memoriei.
- WebSocket fără bătăi de inimă: Conexiunile inactive sunt închise de proxy/echilibratoare de sarcină. Implementați ping/pong la fiecare 30 de secunde.
- Fără autentificare pe WebSocket: WebSockets ocolește autentificarea HTTP standard. Verificați întotdeauna jetonul când vă conectați.
Concluzii și pașii următori
Am construit un sistem complet de colaborare în timp real pentru EdTech: CRDT cu Yjs pentru gestionarea fără conflicte a documentelor partajate, WebSocket pentru sincronizare în timp real, prezență pentru cursoare și utilizatori online, adnotări și fire de discuții pentru caracteristici educaționale specifice, și arhitectură cluster cu Redis Pub/Sub pentru scalabilitate.
Combinația CRDT + offline-first face ca acest sistem să fie deosebit de potrivit pentru contexte educaționale în care conexiunea nu este întotdeauna garantată: elevi pot continua să lucreze offline, iar modificările lor vor fi sincronizate automat când conexiunea devine din nou disponibilă.
În articolul următor ne vom adresa offline-first arhitectură pentru aplicațiile mobile EdTech: IndexedDB, Service Workers, strategii de sincronizare și îmbunătățire progresivă pentru a garanta învățarea chiar și fără conexiune.
Seria EdTech Engineering
- Arhitectură LMS scalabilă: model multi-chiriași
- Algoritmi de învățare adaptivă: de la teorie la producție
- Streaming video pentru educație: WebRTC vs HLS vs DASH
- Sisteme de supraveghere AI: confidențialitate-în primul rând cu computer Vision
- Tutor personalizat cu LLM: RAG pentru fundamentarea cunoștințelor
- Motor de gamification: Arhitectură și mașină de stat
- Learning Analytics: Data Pipeline cu xAPI și Kafka
- Colaborare în timp real în EdTech: CRDT și WebSocket (acest articol)
- Mobile-First EdTech: Offline-First Architecture
- Managementul conținutului cu mai mulți chiriași: Versiune și SCORM







