CQRS: Citire și scriere separate pentru scalare independentă
Un API REST clasic folosește același model pentru a citi și scrie date. Dar nevoile cititul şi scrisul sunt adesea foarte diferite: scrierile trebuie să garanteze consistenţa şi validarea regulilor de afaceri complexe; citirile trebuie să fie rapide, scalabile și să revină date în formate optimizate pentru UI. Încercați să satisfaceți ambele nevoi cu el același model duce la compromisuri: interogări complexe pentru vizualizări sau modele „umflate”. pentru a sprijini orice tip de lectură.
CQRS (Segregarea responsabilității interogărilor de comandă) separă în mod explicit model de scriere (partea de comandă) din modelul de citire (partea de interogare). Partea de comandă gestionează operațiunile care modifică starea (creare, actualizare, ștergere); partea de interogare gestionează operațiunile de citire, de obicei cu modele de date optimizate pentru vizualizări specifice aplicației. Sincronizarea între cele două are loc prin evenimente sau proiecții asincrone.
Ce vei învăța
- Arhitectura CQRS: partea de comandă, partea de interogare, sincronizare asincronă
- Implementați un handler de comandă în TypeScript cu validare
- Proiecții: construiți modele de citire optimizate pentru vizualizări
- Sincronizare asincronă prin evenimente (Event Sourcing + CQRS)
- CQRS fără Event Sourcing: model hibrid simplificat
- Scalare independentă de citire și scriere
- Compensație: posibilă consistență și complexitate operațională
Arhitectura CQRS
În CQRS, fiecare operație este sau a Comanda (schimba starea) sau unul Interogări (citește starea fără a o modifica). Niciodată ambele în același timp metoda (principiul CQS al lui Bertrand Meyer):
// WRONG: un metodo che fa sia command che query
async function reserveInventory(productId: string, quantity: number): Promise<Stock> {
const stock = await db.getStock(productId);
stock.reserved += quantity; // MODIFICA
await db.saveStock(stock);
return stock; // LETTURA
}
// RIGHT CQRS: separa command e query
async function reserveInventory(productId: string, quantity: number): Promise<void> {
// Command: solo modifica, nessun ritorno di stato
const stock = await stockRepo.findById(productId);
stock.reserve(quantity);
await stockRepo.save(stock);
// Pubblica evento per aggiornare il read model
await eventBus.publish(new InventoryReservedEvent(productId, quantity));
}
async function getStockLevel(productId: string): Promise<StockLevelDto> {
// Query: legge dal read model ottimizzato, ZERO side effects
return await stockReadModel.findById(productId);
}
Partea de comandă: managementul comenzilor
Partea de comandă primește Command (obiecte imuabile care descriu o intenție), li valid, execută logica de afaceri pe agregat și publică evenimente. Modelul Manager de comandă și componenta care coordonează acest flux:
// ---- COMMANDS ----
// Ogni command e un DTO immutabile
class PlaceOrderCommand {
constructor(
public readonly orderId: string,
public readonly customerId: string,
public readonly items: ReadonlyArray<{
productId: string;
quantity: number;
}>,
public readonly shippingAddress: Readonly<Address>
) {}
}
// ---- COMMAND HANDLER ----
class PlaceOrderCommandHandler {
constructor(
private readonly orderRepo: OrderRepository,
private readonly productRepo: ProductRepository,
private readonly eventBus: EventBus
) {}
async handle(command: PlaceOrderCommand): Promise<void> {
// 1. Validazione: prodotti esistono?
const products = await Promise.all(
command.items.map((item) => this.productRepo.findById(item.productId))
);
if (products.some((p) => p === null)) {
throw new Error('One or more products not found');
}
// 2. Crea l'Aggregate (logica business)
const order = new OrderAggregate();
order.create(command.orderId, command.customerId);
for (let i = 0; i < command.items.length; i++) {
const product = products[i]!;
order.addItem(
command.items[i].productId,
command.items[i].quantity,
product.price
);
}
order.confirm();
// 3. Persisti gli eventi generati dall'Aggregate
await this.orderRepo.save(order);
// 4. Pubblica gli eventi per aggiornare il read model e notificare altri servizi
const events = order.getUncommittedEvents();
await this.eventBus.publishAll(events);
}
}
// ---- COMMAND BUS ----
// Dispatcher che instrada i command all'handler corretto
class CommandBus {
private handlers = new Map<string, (cmd: unknown) => Promise<void>>();
register<T>(commandType: string, handler: (cmd: T) => Promise<void>): void {
this.handlers.set(commandType, handler as (cmd: unknown) => Promise<void>);
}
async dispatch<T>(commandType: string, command: T): Promise<void> {
const handler = this.handlers.get(commandType);
if (!handler) {
throw new Error(`No handler registered for command: ${commandType}`);
}
await handler(command);
}
}
// Setup
const commandBus = new CommandBus();
commandBus.register('PlaceOrder', (cmd: PlaceOrderCommand) =>
placeOrderHandler.handle(cmd)
);
Partea de interogare: citiți modelul și proiecțiile
Partea de interogare menține a Citiți modelul: o reprezentare a datelor optimizat pentru interogări specifice aplicației. În timp ce partea de comandă funcționează cu Agregat, partea de interogare funcționează cu vederi denormalizate construite prin Proiecții.
// ---- READ MODEL ----
// Viste denormalizzate ottimizzate per le query dell'UI
// View per la lista ordini: include dati del cliente + totale + stato
interface OrderListItemReadModel {
orderId: string;
customerName: string; // denormalizzato (non serve join)
customerEmail: string;
totalAmount: number;
currency: string;
status: string;
itemCount: number;
placedAt: string;
}
// View per il dettaglio ordine
interface OrderDetailReadModel {
orderId: string;
customerId: string;
customerName: string;
items: Array<{
productId: string;
productName: string; // denormalizzato
quantity: number;
unitPrice: number;
subtotal: number;
}>;
totalAmount: number;
shippingAddress: Address;
status: string;
statusHistory: Array<{ status: string; changedAt: string }>;
estimatedDelivery?: string;
}
// ---- PROIEZIONE ----
// Aggiorna il read model in risposta agli eventi del command side
class OrderReadModelProjection {
constructor(
private readonly readModelDb: ReadModelDatabase,
private readonly customerRepo: CustomerReadRepository
) {}
// Quando un ordine viene confermato, crea/aggiorna le view nel read model
async onOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
// Carica i dati aggiuntivi necessari per la view denormalizzata
const customer = await this.customerRepo.findById(event.customerId);
const orderState = await this.orderRepo.findById(event.orderId);
// Inserisce/aggiorna la view della lista ordini
await this.readModelDb.upsert('order_list_view', {
order_id: event.orderId,
customer_name: customer.fullName,
customer_email: customer.email,
total_amount: orderState.totalAmount,
currency: orderState.currency,
status: 'Confirmed',
item_count: orderState.items.size,
placed_at: orderState.createdAt,
});
// Inserisce la view di dettaglio
const itemsWithNames = await this.enrichItemsWithProductNames(orderState.items);
await this.readModelDb.upsert('order_detail_view', {
order_id: event.orderId,
// ... tutti i campi della view dettaglio
items: JSON.stringify(itemsWithNames),
status_history: JSON.stringify([
{ status: 'Confirmed', changedAt: event.occurredAt }
]),
});
}
async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
// Aggiorna solo lo status nella view
await this.readModelDb.update('order_list_view',
{ order_id: event.orderId },
{ status: 'Cancelled' }
);
// Aggiungi alla status history nella view dettaglio
await this.readModelDb.appendToJsonArray('order_detail_view',
{ order_id: event.orderId },
'status_history',
{ status: 'Cancelled', changedAt: event.occurredAt }
);
}
}
// ---- QUERY HANDLER ----
class OrderQueryHandler {
constructor(private readonly readModelDb: ReadModelDatabase) {}
// Query velocissima sul read model denormalizzato
async getOrderList(customerId: string, page: number, pageSize: number):
Promise<OrderListItemReadModel[]>
{
return this.readModelDb.query(
`SELECT * FROM order_list_view
WHERE customer_id = $1
ORDER BY placed_at DESC
LIMIT $2 OFFSET $3`,
[customerId, pageSize, page * pageSize]
);
}
async getOrderDetail(orderId: string): Promise<OrderDetailReadModel | null> {
return this.readModelDb.queryOne(
'SELECT * FROM order_detail_view WHERE order_id = $1',
[orderId]
);
}
}
CQRS fără aprovizionare cu evenimente
CQRS și Event Sourcing sunt ortogonale: funcționează bine împreună, dar pot fi folosite separat. Modelul CQRS simplificat fără Event Sourcing utilizează baza de date bază de date principală pentru scrieri și o bază de date separată (sau vizualizări materializate) pentru citiri:
// CQRS semplificato: stesso database, modelli separati
// Write side usa le entita ORM normali
// Read side usa query SQL ottimizzate o viste materializzate
// View materializzata PostgreSQL per la lista ordini
CREATE MATERIALIZED VIEW order_list_view AS
SELECT
o.id AS order_id,
c.full_name AS customer_name,
c.email AS customer_email,
o.total_amount,
o.currency,
o.status,
COUNT(oi.id) AS item_count,
o.created_at AS placed_at
FROM orders o
JOIN customers c ON c.id = o.customer_id
LEFT JOIN order_items oi ON oi.order_id = o.id
GROUP BY o.id, c.full_name, c.email;
-- Refresh automatico della vista materializzata
CREATE INDEX idx_order_list_customer ON order_list_view (customer_email);
-- Con PostgreSQL 17: INCREMENTAL REFRESH (solo le righe cambiate)
-- REFRESH MATERIALIZED VIEW CONCURRENTLY order_list_view;
-- Query sul read model: 100x piu veloce di una query con JOIN
SELECT * FROM order_list_view
WHERE customer_email = 'mario@example.com'
ORDER BY placed_at DESC
LIMIT 20;
Scalare independentă Citire și scriere
Cu CQRS, puteți scala partea de citire și partea de scriere independent. Dacă sistemul are de 100 de ori mai multe citiri decât scrieri (tipic pentru un comerț electronic), puteți avea:
- Partea de scris: 2 instanțe cu baza de date primară PostgreSQL
- Partea de citit: 10 instanțe cu cache Redis + replicare PostgreSQL numai pentru citire
// Architettura di scaling con Kubernetes
# command-side-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-command-service
spec:
replicas: 2 # write side: pochi ma consistenti
template:
spec:
containers:
- name: api
image: company/order-service:v1
env:
- name: DB_URL
value: "postgres://primary-db:5432/orders" # database primario
- name: SERVICE_MODE
value: "command"
---
# query-side-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-query-service
spec:
replicas: 10 # read side: molte repliche, stateless
template:
spec:
containers:
- name: api
image: company/order-service:v1
env:
- name: DB_URL
value: "postgres://read-replica:5432/orders" # replica read-only
- name: REDIS_URL
value: "redis://cache:6379"
- name: SERVICE_MODE
value: "query"
Gestionați coerența eventuală în front-end
În CQRS cu sincronizare asincronă, există o perioadă scurtă (de obicei milisecunde) unde modelul citit nu este încă actualizat după o scriere. Frontend-ul trebuie să se ocupe asta corect:
// Pattern 1: Optimistic Update
// Il frontend aggiorna l'UI immediatamente, senza aspettare la query
async function placeOrder(orderData: PlaceOrderDto) {
// 1. Ottimisticamente aggiorna l'UI locale
dispatch({ type: 'ADD_ORDER_OPTIMISTIC', order: { ...orderData, status: 'Pending' } });
try {
// 2. Invia il command al backend
const response = await api.placeOrder(orderData);
// 3. Dopo N ms, ricarica dal read model (che dovrebbe essere aggiornato)
setTimeout(async () => {
const updatedOrder = await api.getOrder(response.orderId);
dispatch({ type: 'UPDATE_ORDER', order: updatedOrder });
}, 500);
} catch (error) {
// 4. In caso di errore, reverta l'ottimistic update
dispatch({ type: 'REVERT_ORDER_OPTIMISTIC' });
throw error;
}
}
// Pattern 2: Poll finche il read model non e aggiornato
async function pollUntilUpdated(orderId: string, expectedStatus: string) {
const MAX_ATTEMPTS = 10;
const DELAY_MS = 200;
for (let i = 0; i < MAX_ATTEMPTS; i++) {
const order = await api.getOrder(orderId);
if (order.status === expectedStatus) return order;
await new Promise(resolve => setTimeout(resolve, DELAY_MS));
}
throw new Error('Read model not updated within expected time');
}
Schimbul CQRS
Beneficii
- Scalare independentă: Citiți lateral și scrieți scara laterală separat, în funcție de sarcina reală
- Șabloane optimizate: Modelul citit poate fi denormalizat exact pentru vizualizare, eliminând interogările complexe
- Performanță ridicată la citire: Interogările pe modelul citit sunt simple SELECT-uri pe tabele precalculate
- Separarea responsabilitatilor: Scrierea și citirea codului nu se contaminează reciproc
Complex de Gestionat
- Consistență posibilă: Modelul citit poate rămâne ușor în urma scrierilor. The frontend has to handle it
- Dublare logica: Este posibil ca unele validări să fie necesare atât în gestionarea comenzilor, cât și în modelul de citire
- Mai multe componente de implementat: Serviciu de comandă, serviciu de interogare, proiecții, citire baze de date model
- Nu este potrivit pentru CRUD simplu: Complexitatea CQRS nu merită pentru operațiuni fără o logică complexă de afaceri
Concluzii și pașii următori
CQRS este una dintre cele mai eficiente arhitecturi pentru sisteme cu încărcări de citire și scriere foarte diferit. Separarea explicită a părții de comandă (consecventă, validată, bazată pe evenimente) din partea interogării (rapid, denormalizat, scalabil) rezolvă compromisurile modelului unic.
Următorul articol combină Event Sourcing și CQRS împreună: vom vedea cum sunt proiecțiile au citit din magazinul de evenimente pentru a construi modelul de citire, cum să gestioneze proiecțiile cu reîncercare în caz de eroare și cum să utilizați instantaneele pentru a optimiza reconstrucția modelului citit dupa un esec.
Articole viitoare din seria Event-Driven Architecture
Serii înrudite
- Aprovizionare pentru evenimente — complementul natural al CQRS
- Arhitectură software practică — unde se plasează CQRS în contextul arhitecturii generale







