CQRS: Samostatné čtení a zápis pro nezávislé škálování
Klasické REST API používá stejný model pro čtení a zápis dat. Ale potřeby čtení a psaní se často velmi liší: zápisy musí zaručovat konzistenci a ověřovat složitá obchodní pravidla; čtení musí být rychlé, škálovatelné a musí se vracet data ve formátech optimalizovaných pro uživatelské rozhraní. Snažte se s ním uspokojit obě potřeby stejný model vede ke kompromisům: složité dotazy na zobrazení nebo „nabubřelé“ modely podporovat jakýkoli typ čtení.
CQRS (oddělení odpovědnosti za příkazový dotaz) explicitně odděluje psací model (velitelská strana) z modelu čtení (strana dotazu). Příkazová strana spravuje operace, které mění stav (vytváření, aktualizace, mazání); strana dotazu zpracovává operace čtení, obvykle s optimalizovanými datovými modely pro konkrétní pohledy aplikací. Synchronizace mezi těmito dvěma probíhá přes asynchronní události nebo projekce.
Co se naučíte
- Architektura CQRS: strana příkazů, strana dotazů, asynchronní synchronizace
- Implementujte obslužnou rutinu příkazů v TypeScript s ověřením
- Projekce: sestavte modely čtení optimalizované pro pohledy
- Asynchronní synchronizace prostřednictvím událostí (Sourcing událostí + CQRS)
- CQRS bez Event Sourcing: Zjednodušený hybridní model
- Nezávislé škálování čtení a zápisu
- Kompromis: možná konzistence a provozní složitost
Architektura CQRS
V CQRS je každá operace nebo a Příkaz (změnit stav) nebo jeden Dotazy (přečte stav bez jeho úpravy). Nikdy ne obojí zároveň metoda (princip CQS Bertranda Meyera):
// WRONG: un metodo che fa sia command che query
async function reserveInventory(productId: string, quantity: number): Promise<Stock> {
const stock = await db.getStock(productId);
stock.reserved += quantity; // MODIFICA
await db.saveStock(stock);
return stock; // LETTURA
}
// RIGHT CQRS: separa command e query
async function reserveInventory(productId: string, quantity: number): Promise<void> {
// Command: solo modifica, nessun ritorno di stato
const stock = await stockRepo.findById(productId);
stock.reserve(quantity);
await stockRepo.save(stock);
// Pubblica evento per aggiornare il read model
await eventBus.publish(new InventoryReservedEvent(productId, quantity));
}
async function getStockLevel(productId: string): Promise<StockLevelDto> {
// Query: legge dal read model ottimizzato, ZERO side effects
return await stockReadModel.findById(productId);
}
Příkazová strana: Správa příkazů
Příkazová strana přijímá Příkaz (neměnné objekty, které popisují záměr), li platný, provádí obchodní logiku na Agregátu a publikuje události. Vzor Obsluha příkazů a komponenta, která koordinuje tento tok:
// ---- COMMANDS ----
// Ogni command e un DTO immutabile
class PlaceOrderCommand {
constructor(
public readonly orderId: string,
public readonly customerId: string,
public readonly items: ReadonlyArray<{
productId: string;
quantity: number;
}>,
public readonly shippingAddress: Readonly<Address>
) {}
}
// ---- COMMAND HANDLER ----
class PlaceOrderCommandHandler {
constructor(
private readonly orderRepo: OrderRepository,
private readonly productRepo: ProductRepository,
private readonly eventBus: EventBus
) {}
async handle(command: PlaceOrderCommand): Promise<void> {
// 1. Validazione: prodotti esistono?
const products = await Promise.all(
command.items.map((item) => this.productRepo.findById(item.productId))
);
if (products.some((p) => p === null)) {
throw new Error('One or more products not found');
}
// 2. Crea l'Aggregate (logica business)
const order = new OrderAggregate();
order.create(command.orderId, command.customerId);
for (let i = 0; i < command.items.length; i++) {
const product = products[i]!;
order.addItem(
command.items[i].productId,
command.items[i].quantity,
product.price
);
}
order.confirm();
// 3. Persisti gli eventi generati dall'Aggregate
await this.orderRepo.save(order);
// 4. Pubblica gli eventi per aggiornare il read model e notificare altri servizi
const events = order.getUncommittedEvents();
await this.eventBus.publishAll(events);
}
}
// ---- COMMAND BUS ----
// Dispatcher che instrada i command all'handler corretto
class CommandBus {
private handlers = new Map<string, (cmd: unknown) => Promise<void>>();
register<T>(commandType: string, handler: (cmd: T) => Promise<void>): void {
this.handlers.set(commandType, handler as (cmd: unknown) => Promise<void>);
}
async dispatch<T>(commandType: string, command: T): Promise<void> {
const handler = this.handlers.get(commandType);
if (!handler) {
throw new Error(`No handler registered for command: ${commandType}`);
}
await handler(command);
}
}
// Setup
const commandBus = new CommandBus();
commandBus.register('PlaceOrder', (cmd: PlaceOrderCommand) =>
placeOrderHandler.handle(cmd)
);
Strana dotazu: Přečtěte si model a projekce
Strana dotazu udržuje a Přečtěte si Model: reprezentace dat optimalizované pro specifické aplikační dotazy. Zatímco příkazová strana pracuje s Aggregate, strana dotazu pracuje s denormalizovanými pohledy vytvořenými pomocí Projekce.
// ---- READ MODEL ----
// Viste denormalizzate ottimizzate per le query dell'UI
// View per la lista ordini: include dati del cliente + totale + stato
interface OrderListItemReadModel {
orderId: string;
customerName: string; // denormalizzato (non serve join)
customerEmail: string;
totalAmount: number;
currency: string;
status: string;
itemCount: number;
placedAt: string;
}
// View per il dettaglio ordine
interface OrderDetailReadModel {
orderId: string;
customerId: string;
customerName: string;
items: Array<{
productId: string;
productName: string; // denormalizzato
quantity: number;
unitPrice: number;
subtotal: number;
}>;
totalAmount: number;
shippingAddress: Address;
status: string;
statusHistory: Array<{ status: string; changedAt: string }>;
estimatedDelivery?: string;
}
// ---- PROIEZIONE ----
// Aggiorna il read model in risposta agli eventi del command side
class OrderReadModelProjection {
constructor(
private readonly readModelDb: ReadModelDatabase,
private readonly customerRepo: CustomerReadRepository
) {}
// Quando un ordine viene confermato, crea/aggiorna le view nel read model
async onOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
// Carica i dati aggiuntivi necessari per la view denormalizzata
const customer = await this.customerRepo.findById(event.customerId);
const orderState = await this.orderRepo.findById(event.orderId);
// Inserisce/aggiorna la view della lista ordini
await this.readModelDb.upsert('order_list_view', {
order_id: event.orderId,
customer_name: customer.fullName,
customer_email: customer.email,
total_amount: orderState.totalAmount,
currency: orderState.currency,
status: 'Confirmed',
item_count: orderState.items.size,
placed_at: orderState.createdAt,
});
// Inserisce la view di dettaglio
const itemsWithNames = await this.enrichItemsWithProductNames(orderState.items);
await this.readModelDb.upsert('order_detail_view', {
order_id: event.orderId,
// ... tutti i campi della view dettaglio
items: JSON.stringify(itemsWithNames),
status_history: JSON.stringify([
{ status: 'Confirmed', changedAt: event.occurredAt }
]),
});
}
async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
// Aggiorna solo lo status nella view
await this.readModelDb.update('order_list_view',
{ order_id: event.orderId },
{ status: 'Cancelled' }
);
// Aggiungi alla status history nella view dettaglio
await this.readModelDb.appendToJsonArray('order_detail_view',
{ order_id: event.orderId },
'status_history',
{ status: 'Cancelled', changedAt: event.occurredAt }
);
}
}
// ---- QUERY HANDLER ----
class OrderQueryHandler {
constructor(private readonly readModelDb: ReadModelDatabase) {}
// Query velocissima sul read model denormalizzato
async getOrderList(customerId: string, page: number, pageSize: number):
Promise<OrderListItemReadModel[]>
{
return this.readModelDb.query(
`SELECT * FROM order_list_view
WHERE customer_id = $1
ORDER BY placed_at DESC
LIMIT $2 OFFSET $3`,
[customerId, pageSize, page * pageSize]
);
}
async getOrderDetail(orderId: string): Promise<OrderDetailReadModel | null> {
return this.readModelDb.queryOne(
'SELECT * FROM order_detail_view WHERE order_id = $1',
[orderId]
);
}
}
CQRS bez Event Sourcingu
CQRS a Event Sourcing jsou ortogonální: dobře spolupracují, ale lze je použít samostatně. Zjednodušený model CQRS bez Event Sourcing využívá databázi hlavní databáze pro zápisy a samostatná databáze (nebo materializované pohledy) pro čtení:
// CQRS semplificato: stesso database, modelli separati
// Write side usa le entita ORM normali
// Read side usa query SQL ottimizzate o viste materializzate
// View materializzata PostgreSQL per la lista ordini
CREATE MATERIALIZED VIEW order_list_view AS
SELECT
o.id AS order_id,
c.full_name AS customer_name,
c.email AS customer_email,
o.total_amount,
o.currency,
o.status,
COUNT(oi.id) AS item_count,
o.created_at AS placed_at
FROM orders o
JOIN customers c ON c.id = o.customer_id
LEFT JOIN order_items oi ON oi.order_id = o.id
GROUP BY o.id, c.full_name, c.email;
-- Refresh automatico della vista materializzata
CREATE INDEX idx_order_list_customer ON order_list_view (customer_email);
-- Con PostgreSQL 17: INCREMENTAL REFRESH (solo le righe cambiate)
-- REFRESH MATERIALIZED VIEW CONCURRENTLY order_list_view;
-- Query sul read model: 100x piu veloce di una query con JOIN
SELECT * FROM order_list_view
WHERE customer_email = 'mario@example.com'
ORDER BY placed_at DESC
LIMIT 20;
Nezávislé škálování Čtení a zápis
S CQRS můžete nezávisle škálovat stranu pro čtení a zápis. Pokud systém má 100x více čtení než zápisů (typické pro elektronický obchod), můžete mít:
- Napište stranu: 2 instance s primární databází PostgreSQL
- Strana čtení: 10 instancí s mezipamětí Redis + replikace PostgreSQL pouze pro čtení
// Architettura di scaling con Kubernetes
# command-side-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-command-service
spec:
replicas: 2 # write side: pochi ma consistenti
template:
spec:
containers:
- name: api
image: company/order-service:v1
env:
- name: DB_URL
value: "postgres://primary-db:5432/orders" # database primario
- name: SERVICE_MODE
value: "command"
---
# query-side-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-query-service
spec:
replicas: 10 # read side: molte repliche, stateless
template:
spec:
containers:
- name: api
image: company/order-service:v1
env:
- name: DB_URL
value: "postgres://read-replica:5432/orders" # replica read-only
- name: REDIS_URL
value: "redis://cache:6379"
- name: SERVICE_MODE
value: "query"
Správa případné konzistence ve frontendu
V CQRS s asynchronní synchronizací je krátká perioda (obvykle milisekundy) kde čtený model ještě není aktualizován po zápisu. Frontend to musí zvládnout toto správně:
// Pattern 1: Optimistic Update
// Il frontend aggiorna l'UI immediatamente, senza aspettare la query
async function placeOrder(orderData: PlaceOrderDto) {
// 1. Ottimisticamente aggiorna l'UI locale
dispatch({ type: 'ADD_ORDER_OPTIMISTIC', order: { ...orderData, status: 'Pending' } });
try {
// 2. Invia il command al backend
const response = await api.placeOrder(orderData);
// 3. Dopo N ms, ricarica dal read model (che dovrebbe essere aggiornato)
setTimeout(async () => {
const updatedOrder = await api.getOrder(response.orderId);
dispatch({ type: 'UPDATE_ORDER', order: updatedOrder });
}, 500);
} catch (error) {
// 4. In caso di errore, reverta l'ottimistic update
dispatch({ type: 'REVERT_ORDER_OPTIMISTIC' });
throw error;
}
}
// Pattern 2: Poll finche il read model non e aggiornato
async function pollUntilUpdated(orderId: string, expectedStatus: string) {
const MAX_ATTEMPTS = 10;
const DELAY_MS = 200;
for (let i = 0; i < MAX_ATTEMPTS; i++) {
const order = await api.getOrder(orderId);
if (order.status === expectedStatus) return order;
await new Promise(resolve => setTimeout(resolve, DELAY_MS));
}
throw new Error('Read model not updated within expected time');
}
CQRS kompromis
Výhody
- Nezávislé škálování: Číst boční měřítko a zapisovat boční měřítko samostatně na základě skutečného zatížení
- Optimalizované šablony: Čtený model lze denormalizovat přesně pro pohled, čímž se eliminují složité dotazy
- Vysoký výkon při čtení: Dotazy na model čtení jsou jednoduché SELECTy na předem vypočítaných tabulkách
- Rozdělení odpovědnosti: Zápis a čtení kódu se navzájem nekontaminují
Komplexní pro správu
- Možná konzistence: Model čtení může mírně zaostávat za zápisem. Frontend to musí zvládnout
- Duplikace logiky: Některá ověření možná budou muset být v obslužném programu příkazů i v modelu čtení
- Další komponenty k nasazení: Příkazová služba, dotazovací služba, projekce, čtená modelová databáze
- Nevhodné pro jednoduché CRUD: Složitost CQRS se pro operace bez složité obchodní logiky nevyplatí
Závěry a další kroky
CQRS je jednou z nejúčinnějších architektur pro systémy se zátěží pro čtení a zápis velmi odlišné. Explicitní oddělení strany příkazu (konzistentní, ověřené, řízené událostmi) ze strany dotazu (rychlý, denormalizovaný, škálovatelný) řeší kompromisy jediného modelu.
Další článek kombinuje Event Sourcing a CQRS dohromady: uvidíme, jak projekce čtou z úložiště událostí, aby vytvořili model čtení, jak spravovat projekce s opakováním v případě chyby a jak používat snímky k optimalizaci přestavby čteného modelu po neúspěchu.
Připravované články ze série Event-Driven Architecture Series
Související série
- Sourcing událostí — přirozený doplněk CQRS
- Praktická softwarová architektura — kam umístit CQRS v kontextu celkové architektury







