Spolupráce v reálném čase v EdTech: CRDT a WebSocket
Dokumenty Google prokázaly, že synchronní spolupráce na dokumentech je možná. Notion, Figma, Miro rozšířili toto paradigma na nekonečné poznámky, návrhy a tabule. Nyní chtějí platformy EdTech přinést stejný zážitek do kolaborativního učení: studenti pracující na stejném dokumentu, kódu nebo cvičení v reálném čase, vidět kurzory jiných lidí, okamžité změny, bez konfliktů.
Zásadním problémem je distribuovaná konkurence: když dva studenti upravují stejný text ve stejnou dobu z různých strojů, která úprava vyhraje? Jednoduše „poslední zápis-vítězí“ přináší nepřijatelné výsledky (ztráta změn). Existují dvě standardní řešení: Provozní transformace (OT) (používané Dokumenty Google) e Bezkonfliktní replikované datové typy (CRDT). V letech 2024–2025 dosáhly CRDT produkční vyspělosti a stávají se de facto standard pro nové implementace díky jejich jednoduchosti koncepční a matematická robustnost.
V tomto článku vytvoříme systém spolupráce v reálném čase pro EdTech: kolaborativní úpravy dokumentů pomocí Yjs (nejoblíbenější CRDT), WebSocket pro synchronizaci a specifické funkce pro učení jako např sdílené anotace, pojmenované kurzory a vláknové komentáře.
Co se dozvíte v tomto článku
- CRDT vs OT: rozdíly, výhody a kdy si vybrat co
- Yjs: vnitřní struktura, datové typy a Y.Doc
- Synchronizační server WebSocket s y-websocket
- Přítomnost: pojmenované kurzory, výběry a online uživatelé
- Persistence: ukládání a načítání stavu CRDT
- Vzdělávací anotace: komentáře, zvýraznění a diskusní vlákna
- Offline-first s CRDT: automatická synchronizace při opětovném připojení
- Škálování: z jednoho serveru na cluster s Redis pub/sub
1. CRDT vs provozní transformace
Provozní transformace (OT) transformovat operace před jejich aplikací zohlednit souběžné změny. Funguje dobře, ale vyžaduje centrální server který koordinuje transformace: Dokumenty Google používají tuto architekturu se serverem centralizovaná transformace.
CRDT řeší problém radikálně odlišným způsobem: datové struktury jsou matematicky navrženy tak, aby byly spojit-schopný bez konfliktů. Operace jsou komutativní (bez ohledu na to stejný výsledek z objednávky) e idempotentní (aplikujte stejnou operaci dvakrát nemění výsledek). Tím odpadá potřeba centrálního koordinátora a umožňuje offline provoz s automatickou synchronizací.
Srovnání OT vs CRDT
| čekám | OT (Dokumenty Google) | CRDT (Yjs, Automerge) |
|---|---|---|
| Centrální koordinátor | Požadovaný | Není nutné |
| Podpora offline | Omezený | Rodák |
| Složitost implementace | Vysoký | Média (s knihovnami) |
| Matematické záruky | Záleží na provedení | Formálně testováno |
| Paměť nad hlavou | Bas | Střední (náhrobní kámen pro smazané položky) |
| Vyspělé knihovny | ShareDB, OT.js | Yjs, Automerge, Cola |
| Škálovatelnost | Vertikální (centrální server) | Horizontální (P2P nebo více serverů) |
| Adopce 2025 | Starší (stávající systémy) | Normy pro nové systémy |
2. Yjs: CRDT pro EdTech
Yjs a nejvyspělejší, přijatá knihovna CRDT v roce 2025.
Nabízí vazby pro všechny hlavní editory (ProseMirror, TipTap, CodeMirror, Monaco),
podpora pro WebSocket, WebRTC a IndexedDB a jednoduché API založené na
Y.Doc (dokument pro spolupráci) a jeho sdílené datové typy.
Základní jednotka Yjs e Y.Doc: dokument, který obsahuje
hierarchie sdílených datových struktur. Přijdou změny těchto údajů
automaticky serializovat v binární aktualizace kompaktní, jak mohou
být zaslány všem kolegům a aplikovány v libovolném pořadí bez konfliktů.
// Frontend: src/collaboration/collaborative-editor.ts
import * as Y from 'yjs';
import { WebsocketProvider } from 'y-websocket';
import { QuillBinding } from 'y-quill';
import { IndexeddbPersistence } from 'y-indexeddb';
interface CollabUser {
name: string;
color: string;
avatar: string;
studentId: string;
}
export class CollaborativeEditorSession {
private doc: Y.Doc;
private wsProvider: WebsocketProvider;
private dbProvider: IndexeddbPersistence;
private text: Y.Text;
private annotations: Y.Array<any>;
private comments: Y.Map<any>;
constructor(
private readonly documentId: string,
private readonly user: CollabUser,
private readonly wsUrl: string,
) {
// Crea il documento collaborativo
this.doc = new Y.Doc();
// Strutture dati condivise
this.text = this.doc.getText('content');
this.annotations = this.doc.getArray('annotations');
this.comments = this.doc.getMap('comments');
}
async initialize(): Promise<void> {
// Persistenza locale con IndexedDB (supporto offline)
this.dbProvider = new IndexeddbPersistence(
`edtech-doc-${this.documentId}`,
this.doc,
);
await new Promise<void>((resolve) => {
this.dbProvider.on('synced', () => {
console.log('Documento caricato da IndexedDB');
resolve();
});
});
// Connessione WebSocket per sync real-time
this.wsProvider = new WebsocketProvider(
this.wsUrl,
`doc-${this.documentId}`,
this.doc,
{
connect: true,
awareness: this.createAwareness(),
},
);
this.wsProvider.on('status', (event: { status: string }) => {
console.log('WebSocket status:', event.status);
});
// Gestione presenza: cursori degli altri utenti
this.setupPresence();
}
private createAwareness() {
// Awareness e il meccanismo Yjs per stato temporaneo (presenza, cursori)
// NON fa parte del documento persistente, solo stato effimero
return {
getLocalState: () => ({
user: this.user,
cursor: null, // Aggiornato durante l'editing
selection: null,
lastSeen: Date.now(),
}),
};
}
private setupPresence(): void {
const awareness = this.wsProvider.awareness;
// Imposta stato locale dell'utente
awareness.setLocalStateField('user', this.user);
// Ascolta cambiamenti nella presenza degli altri utenti
awareness.on('change', () => {
const states = Array.from(awareness.getStates().entries());
const onlineUsers = states
.filter(([clientId]) => clientId !== this.doc.clientID)
.map(([, state]) => state.user)
.filter(Boolean);
this.onUsersChanged(onlineUsers);
});
}
updateCursor(position: number, selection: { from: number; to: number } | null): void {
this.wsProvider.awareness.setLocalStateField('cursor', position);
this.wsProvider.awareness.setLocalStateField('selection', selection);
}
addAnnotation(annotation: {
from: number;
to: number;
type: 'highlight' | 'underline' | 'comment';
comment?: string;
authorId: string;
}): void {
this.doc.transact(() => {
this.annotations.push([{
id: crypto.randomUUID(),
...annotation,
createdAt: Date.now(),
}]);
});
}
addComment(commentId: string, comment: {
text: string;
authorId: string;
authorName: string;
replyTo?: string;
}): void {
this.doc.transact(() => {
const thread = this.comments.get(commentId) || [];
this.comments.set(commentId, [...thread, {
id: crypto.randomUUID(),
...comment,
timestamp: Date.now(),
}]);
});
}
getAnnotations(): any[] {
return this.annotations.toArray();
}
getComments(): Map<string, any[]> {
return new Map(Array.from(this.comments.entries()));
}
protected onUsersChanged(users: CollabUser[]): void {
// Override in componente per aggiornare UI
console.log('Utenti online:', users.map(u => u.name));
}
destroy(): void {
this.wsProvider?.destroy();
this.dbProvider?.destroy();
this.doc?.destroy();
}
}
3. WebSocket Sync Server
Server WebSocket Yjs je zodpovědný za: shromažďování aktualizací od klientů,
distribuovat je všem kolegům připojeným ke stejnému dokumentu a zachovat stav
dokumentu pro připojení nových klientů. Pojďme použít y-websocket
jako základ a rozšiřujeme jej o autentizaci, autorizaci na nájemce
a vytrvalost na Redis/PostgreSQL.
# server/collab_server.py
import asyncio
import json
import logging
from typing import Dict, Set, Optional
from dataclasses import dataclass, field
import websockets
from websockets.server import WebSocketServerProtocol
logger = logging.getLogger(__name__)
@dataclass
class DocumentRoom:
document_id: str
clients: Set[WebSocketServerProtocol] = field(default_factory=set)
awareness_states: Dict[int, dict] = field(default_factory=dict)
# Ultimo stato persistito del documento (aggiornamenti Yjs binari)
persisted_state: Optional[bytes] = None
class YjsWebSocketServer:
"""
Server WebSocket per sincronizzazione Yjs multi-room.
Ogni 'room' corrisponde a un documento collaborativo.
"""
MESSAGE_SYNC = 0
MESSAGE_AWARENESS = 1
MESSAGE_AUTH = 2
def __init__(
self,
redis_client,
db,
auth_service,
port: int = 8080,
):
self.redis = redis_client
self.db = db
self.auth = auth_service
self.port = port
# rooms: document_id -> DocumentRoom
self.rooms: Dict[str, DocumentRoom] = {}
async def handle_client(self, websocket: WebSocketServerProtocol, path: str):
"""
Handler principale per ogni connessione WebSocket.
path: /collab/{document_id}
"""
# Estrai document_id dal path
try:
document_id = path.split("/collab/")[1]
except (IndexError, AttributeError):
await websocket.close(4001, "Invalid path")
return
# Autenticazione: primo messaggio deve essere il token
try:
auth_msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
auth_data = json.loads(auth_msg)
user = await self.auth.verify_token(auth_data.get("token", ""))
if not user:
await websocket.close(4003, "Unauthorized")
return
# Verifica accesso al documento
if not await self.auth.can_access_document(user.id, document_id):
await websocket.close(4003, "Forbidden")
return
except asyncio.TimeoutError:
await websocket.close(4002, "Auth timeout")
return
# Unisci alla room del documento
room = self._get_or_create_room(document_id)
room.clients.add(websocket)
try:
# Invia stato corrente al nuovo client (sync iniziale)
await self._send_initial_state(websocket, document_id, room)
# Loop messaggi
async for message in websocket:
await self._handle_message(websocket, room, message, user)
except websockets.ConnectionClosed:
pass
finally:
room.clients.discard(websocket)
room.awareness_states.pop(id(websocket), None)
# Notifica agli altri client che questo utente e andato offline
await self._broadcast_awareness(room, exclude=websocket)
if not room.clients:
# Ultima persone nella room: persisti e rimuovi
await self._persist_document(document_id, room)
del self.rooms[document_id]
def _get_or_create_room(self, document_id: str) -> DocumentRoom:
if document_id not in self.rooms:
self.rooms[document_id] = DocumentRoom(document_id=document_id)
return self.rooms[document_id]
async def _send_initial_state(
self,
websocket: WebSocketServerProtocol,
document_id: str,
room: DocumentRoom,
) -> None:
"""Invia lo stato corrente del documento al nuovo client."""
# Carica da Redis (cache) o PostgreSQL
state = room.persisted_state or await self._load_document(document_id)
if state:
# Messaggio di sync: tipo 0 = SYNC_STEP1 in protocollo Yjs
await websocket.send(
bytes([self.MESSAGE_SYNC, 0]) + state
)
# Invia stati di awareness degli altri utenti
if room.awareness_states:
awareness_payload = json.dumps({
"clients": room.awareness_states
}).encode()
await websocket.send(bytes([self.MESSAGE_AWARENESS]) + awareness_payload)
async def _handle_message(
self,
websocket: WebSocketServerProtocol,
room: DocumentRoom,
message: bytes,
user,
) -> None:
"""Processa un messaggio dal client e lo propaga."""
if not isinstance(message, bytes) or not message:
return
msg_type = message[0]
payload = message[1:]
if msg_type == self.MESSAGE_SYNC:
# Aggiornamento documento: broadcast a tutti gli altri client
await self._broadcast(room, message, exclude=websocket)
# Aggiorna stato in memoria e schedula persistenza
await self._update_document_state(room, payload)
elif msg_type == self.MESSAGE_AWARENESS:
# Aggiornamento presenza: cursori, selezioni, utenti online
client_id = id(websocket)
room.awareness_states[client_id] = {
"user": user.to_dict(),
"payload": payload.decode("utf-8", errors="replace"),
}
await self._broadcast_awareness(room, exclude=None)
async def _broadcast(
self,
room: DocumentRoom,
message: bytes,
exclude: Optional[WebSocketServerProtocol] = None,
) -> None:
"""Invia un messaggio a tutti i client nella room eccetto il mittente."""
dead_clients = set()
for client in room.clients:
if client == exclude:
continue
try:
await client.send(message)
except websockets.ConnectionClosed:
dead_clients.add(client)
room.clients -= dead_clients
async def _broadcast_awareness(self, room: DocumentRoom, exclude=None) -> None:
awareness_payload = json.dumps({
"clients": {
str(cid): state
for cid, state in room.awareness_states.items()
}
}).encode()
message = bytes([self.MESSAGE_AWARENESS]) + awareness_payload
await self._broadcast(room, message, exclude=exclude)
async def _load_document(self, document_id: str) -> Optional[bytes]:
"""Carica documento da Redis o PostgreSQL."""
# Prima prova Redis
cached = await self.redis.get(f"yjsdoc:{document_id}")
if cached:
return cached
# Fallback su PostgreSQL
result = await self.db.execute(
"SELECT yjs_state FROM collaborative_documents WHERE id = :did",
{"did": document_id},
)
row = result.fetchone()
if row and row[0]:
state = bytes(row[0])
await self.redis.setex(f"yjsdoc:{document_id}", 3600, state)
return state
return None
async def _persist_document(self, document_id: str, room: DocumentRoom) -> None:
if not room.persisted_state:
return
await self.db.execute(
"""INSERT INTO collaborative_documents (id, yjs_state, updated_at)
VALUES (:did, :state, NOW())
ON CONFLICT (id) DO UPDATE
SET yjs_state = :state, updated_at = NOW()""",
{"did": document_id, "state": room.persisted_state},
)
await self.db.commit()
await self.redis.setex(f"yjsdoc:{document_id}", 3600, room.persisted_state)
async def _update_document_state(self, room: DocumentRoom, update: bytes) -> None:
"""Applica un update al documento in memoria."""
# In produzione usa una libreria Yjs lato server (y-py) per merge corretto
# Qui semplificato: conserva l'ultimo update ricevuto
room.persisted_state = update
async def start(self):
async with websockets.serve(self.handle_client, "0.0.0.0", self.port):
logger.info(f"Yjs WebSocket server in ascolto su porta {self.port}")
await asyncio.Future() # Loop infinito
4. Škálování: Od jednoho serveru ke klastru
Jediný server WebSocket nedokáže zpracovat tisíce souběžných dokumentů. Pro horizontální měřítko používáme Redis Pub/Sub jako sběrnice zpráv mezi servery: když klient na serveru A odešle aktualizaci, server A ji publikuje na Redis a server B (který má další klienty připojené ke stejnému dokumentu) jej přijme a předává je svým klientům.
# server/collab_cluster.py
import asyncio
import json
from typing import Dict, Optional
import redis.asyncio as aioredis
class ClusteredYjsServer:
"""
Estensione del server Yjs per deployment in cluster.
Usa Redis Pub/Sub per sincronizzare i nodi del cluster.
"""
def __init__(self, base_server: "YjsWebSocketServer", redis_url: str):
self.base = base_server
self.redis_url = redis_url
self.pub_redis: Optional[aioredis.Redis] = None
self.sub_redis: Optional[aioredis.Redis] = None
async def start_cluster_sync(self):
"""Avvia la sincronizzazione tra nodi del cluster via Redis."""
self.pub_redis = await aioredis.from_url(self.redis_url)
self.sub_redis = await aioredis.from_url(self.redis_url)
pubsub = self.sub_redis.pubsub()
await pubsub.psubscribe("yjs:*") # Subscribe a tutti i canali documento
asyncio.create_task(self._listen_cluster_messages(pubsub))
logger.info("Cluster sync avviato via Redis Pub/Sub")
async def _listen_cluster_messages(self, pubsub):
"""Ricevi e distribuisci messaggi dagli altri nodi del cluster."""
async for message in pubsub.listen():
if message["type"] not in ("message", "pmessage"):
continue
channel = message["channel"].decode()
document_id = channel.split("yjs:")[-1]
if document_id not in self.base.rooms:
continue # Questo nodo non ha client per questo documento
data = message["data"]
room = self.base.rooms[document_id]
# Propaga ai client locali (escludi il mittente tramite source_node)
source_node = json.loads(data[:36]) if len(data) > 36 else {}
payload = data[36:]
await self.base._broadcast(room, payload, exclude=None)
async def publish_update(self, document_id: str, update: bytes, node_id: str) -> None:
"""Pubblica un update a tutti i nodi del cluster."""
channel = f"yjs:{document_id}"
# Prefissa con il node_id per evitare loop
message = json.dumps({"node": node_id}).encode()[:36].ljust(36) + update
await self.pub_redis.publish(channel, message)
5. Vzdělávací prvky: Anotace a vlákna
Kromě společných úprav potřebují platformy EdTech funkčnost specifika pro učení: anotace (zvýraznění a poznámky na části textu), diskusní vlákno (otázky a odpovědi ukotvené ke konkrétním krokům), e režim kontroly (učitelka komentuje práci studenta, aniž by ji přímo upravoval).
// Frontend: src/collaboration/annotations.service.ts
import * as Y from 'yjs';
import { Injectable } from '@angular/core';
export interface Annotation {
id: string;
from: number;
to: number;
type: 'highlight' | 'underline' | 'comment' | 'correction';
color?: string;
authorId: string;
authorName: string;
text?: string;
timestamp: number;
resolved?: boolean;
}
export interface CommentThread {
id: string;
annotationId: string;
replies: CommentReply[];
resolved: boolean;
}
export interface CommentReply {
id: string;
text: string;
authorId: string;
authorName: string;
timestamp: number;
isTeacher: boolean;
}
@Injectable({ providedIn: 'root' })
export class AnnotationsService {
private doc: Y.Doc | null = null;
private annotations: Y.Array<Annotation> | null = null;
private threads: Y.Map<CommentThread> | null = null;
initialize(doc: Y.Doc): void {
this.doc = doc;
this.annotations = doc.getArray<Annotation>('annotations');
this.threads = doc.getMap<CommentThread>('threads');
}
addAnnotation(annotation: Omit<Annotation, 'id' | 'timestamp'>): string {
if (!this.annotations || !this.doc) throw new Error('Non inizializzato');
const id = crypto.randomUUID();
const fullAnnotation: Annotation = {
...annotation,
id,
timestamp: Date.now(),
};
this.doc.transact(() => {
this.annotations!.push([fullAnnotation]);
// Crea thread vuoto se e un commento
if (annotation.type === 'comment') {
this.threads!.set(id, {
id,
annotationId: id,
replies: [],
resolved: false,
});
}
});
return id;
}
addReply(threadId: string, reply: Omit<CommentReply, 'id' | 'timestamp'>): void {
if (!this.threads || !this.doc) return;
const thread = this.threads.get(threadId);
if (!thread) return;
const updatedThread: CommentThread = {
...thread,
replies: [...thread.replies, {
...reply,
id: crypto.randomUUID(),
timestamp: Date.now(),
}],
};
this.doc.transact(() => {
this.threads!.set(threadId, updatedThread);
});
}
resolveThread(threadId: string): void {
if (!this.threads || !this.doc) return;
const thread = this.threads.get(threadId);
if (!thread) return;
this.doc.transact(() => {
this.threads!.set(threadId, { ...thread, resolved: true });
// Segna l'annotazione come risolta
const annotations = this.annotations!.toArray();
const idx = annotations.findIndex(a => a.id === thread.annotationId);
if (idx !== -1) {
this.annotations!.delete(idx, 1);
this.annotations!.insert(idx, [{ ...annotations[idx], resolved: true }]);
}
});
}
getAnnotations(): Annotation[] {
return this.annotations?.toArray() ?? [];
}
getThread(threadId: string): CommentThread | undefined {
return this.threads?.get(threadId);
}
onAnnotationsChange(callback: (annotations: Annotation[]) => void): () => void {
if (!this.annotations) return () => {};
const handler = () => callback(this.getAnnotations());
this.annotations.observe(handler);
return () => this.annotations?.unobserve(handler);
}
}
Anti-vzory, kterým je třeba se vyhnout
- Sdílený proměnlivý stav bez CRDT: Použijte sdílenou proměnnou se zámkem a anti-vzorem s vysokým rizikem uváznutí. Vždy používejte CRDT nebo OT.
- Vysílání bez filtru: Odeslání každého stisku klávesy každému klientovi je drahé. Před odesláním aktualizací použijte debounce na straně klienta (50-100 ms).
- Trvalé povědomí: Stav přítomnosti (kurzory, online uživatelé) je pomíjivý, nikoli trvalý. Neukládejte jej do hlavního dokumentu.
- Žádný svoz odpadu: Yjs hromadí „náhrobní kámen“ pro smazané položky. Chcete-li snížit využití paměti, povolte pravidelné shromažďování odpadu.
- WebSocket bez srdečního tepu: Nečinná spojení jsou uzavřena proxy/load balancery. Provádějte ping/pong každých 30 sekund.
- Žádné ověření na WebSocket: WebSockets obchází standardní ověřování HTTP. Při připojování vždy ověřte svůj token.
Závěry a další kroky
Pro EdTech jsme vybudovali kompletní systém spolupráce v reálném čase: CRDT s Yjs pro bezkonfliktní správu sdílených dokumentů, WebSocket pro synchronizaci v reálném čase, přítomnost pro kurzory a online uživatele, anotace a diskusní vlákna pro konkrétní vzdělávací funkce, a clusterová architektura s Redis Pub/Sub pro škálovatelnost.
Díky kombinaci CRDT + offline-first je tento systém obzvláště vhodný pro vzdělávací kontexty, kde spojení není vždy zaručeno: studenti mohou pokračovat v práci offline a jejich změny budou synchronizovány automaticky, jakmile bude připojení opět dostupné.
V příštím článku se budeme věnovat offline-first architektura pro mobilní aplikace EdTech: IndexedDB, Service Workers, strategie synchronizace a progresivní vylepšení zaručující učení i bez připojení.
EdTech Engineering Series
- Škálovatelná architektura LMS: Vzor pro více nájemců
- Algoritmy adaptivního učení: Od teorie k produkci
- Streamování videa pro vzdělávání: WebRTC vs HLS vs DASH
- AI Proctoring Systems: Privacy-first with Computer Vision
- Personalizovaný lektor s LLM: RAG pro ukotvení znalostí
- Gamification Engine: Architektura a státní stroj
- Learning Analytics: Data Pipeline s xAPI a Kafka
- Spolupráce v reálném čase v EdTech: CRDT a WebSocket (tento článek)
- Mobile-First EdTech: Offline-First Architecture
- Správa obsahu pro více nájemců: Správa verzí a SCORM







