CQRS: Oddzielny odczyt i zapis dla niezależnego skalowania
Klasyczny interfejs API REST wykorzystuje ten sam model do odczytu i zapisu danych. Ale potrzeby czytanie i pisanie są często bardzo różne: zapisy muszą gwarantować spójność i weryfikować złożone reguły biznesowe; odczyty muszą być szybkie, skalowalne i zwracane dane w formatach zoptymalizowanych pod kątem interfejsu użytkownika. Spróbuj zaspokoić nim obydwie potrzeby ten sam model prowadzi do kompromisów: złożonych zapytań o widoki lub „rozdętych” modeli wspierać każdy rodzaj czytania.
CQRS (oddzielenie odpowiedzialności za zapytania dotyczące poleceń) wyraźnie oddziela model pisania (strona dowodzenia) z modelu odczytu (strona zapytania). Strona poleceń zarządza operacjami zmieniającymi stan (utwórz, zaktualizuj, usuń); strona zapytania obsługuje operacje odczytu, zazwyczaj ze zoptymalizowanymi modelami danych dla konkretnych widoków aplikacji. Synchronizacja między nimi następuje poprzez zdarzenia lub projekcje asynchroniczne.
Czego się nauczysz
- Architektura CQRS: strona poleceń, strona zapytań, synchronizacja asynchroniczna
- Zaimplementuj procedurę obsługi poleceń w TypeScript z walidacją
- Projekcje: buduj modele odczytu zoptymalizowane pod kątem widoków
- Asynchroniczna synchronizacja poprzez zdarzenia (Event Sourcing + CQRS)
- CQRS bez pozyskiwania zdarzeń: uproszczony model hybrydowy
- Niezależne skalowanie odczytu i zapisu
- Kompromis: możliwa spójność i złożoność operacyjna
Architektura CQRS
W CQRS każda operacja to lub a Rozkaz (zmiana statusu) lub jeden Zapytania (odczytuje stan bez modyfikowania go). Nigdy oba jednocześnie metoda (zasada CQS Bertranda Meyera):
// WRONG: un metodo che fa sia command che query
async function reserveInventory(productId: string, quantity: number): Promise<Stock> {
const stock = await db.getStock(productId);
stock.reserved += quantity; // MODIFICA
await db.saveStock(stock);
return stock; // LETTURA
}
// RIGHT CQRS: separa command e query
async function reserveInventory(productId: string, quantity: number): Promise<void> {
// Command: solo modifica, nessun ritorno di stato
const stock = await stockRepo.findById(productId);
stock.reserve(quantity);
await stockRepo.save(stock);
// Pubblica evento per aggiornare il read model
await eventBus.publish(new InventoryReservedEvent(productId, quantity));
}
async function getStockLevel(productId: string): Promise<StockLevelDto> {
// Query: legge dal read model ottimizzato, ZERO side effects
return await stockReadModel.findById(productId);
}
Strona dowodzenia: zarządzanie dowodzeniami
Strona poleceń otrzymuje polecenie (niezmienne obiekty opisujące intencję), li valid, wykonuje logikę biznesową na Agregacie i publikuje zdarzenia. Wzór Osoba obsługująca polecenia oraz komponent koordynujący ten przepływ:
// ---- COMMANDS ----
// Ogni command e un DTO immutabile
class PlaceOrderCommand {
constructor(
public readonly orderId: string,
public readonly customerId: string,
public readonly items: ReadonlyArray<{
productId: string;
quantity: number;
}>,
public readonly shippingAddress: Readonly<Address>
) {}
}
// ---- COMMAND HANDLER ----
class PlaceOrderCommandHandler {
constructor(
private readonly orderRepo: OrderRepository,
private readonly productRepo: ProductRepository,
private readonly eventBus: EventBus
) {}
async handle(command: PlaceOrderCommand): Promise<void> {
// 1. Validazione: prodotti esistono?
const products = await Promise.all(
command.items.map((item) => this.productRepo.findById(item.productId))
);
if (products.some((p) => p === null)) {
throw new Error('One or more products not found');
}
// 2. Crea l'Aggregate (logica business)
const order = new OrderAggregate();
order.create(command.orderId, command.customerId);
for (let i = 0; i < command.items.length; i++) {
const product = products[i]!;
order.addItem(
command.items[i].productId,
command.items[i].quantity,
product.price
);
}
order.confirm();
// 3. Persisti gli eventi generati dall'Aggregate
await this.orderRepo.save(order);
// 4. Pubblica gli eventi per aggiornare il read model e notificare altri servizi
const events = order.getUncommittedEvents();
await this.eventBus.publishAll(events);
}
}
// ---- COMMAND BUS ----
// Dispatcher che instrada i command all'handler corretto
class CommandBus {
private handlers = new Map<string, (cmd: unknown) => Promise<void>>();
register<T>(commandType: string, handler: (cmd: T) => Promise<void>): void {
this.handlers.set(commandType, handler as (cmd: unknown) => Promise<void>);
}
async dispatch<T>(commandType: string, command: T): Promise<void> {
const handler = this.handlers.get(commandType);
if (!handler) {
throw new Error(`No handler registered for command: ${commandType}`);
}
await handler(command);
}
}
// Setup
const commandBus = new CommandBus();
commandBus.register('PlaceOrder', (cmd: PlaceOrderCommand) =>
placeOrderHandler.handle(cmd)
);
Strona zapytania: przeczytaj model i rzuty
Strona zapytania utrzymuje a Przeczytaj model: reprezentacja danych zoptymalizowane pod kątem konkretnych zapytań aplikacji. Podczas gdy strona poleceń działa z Agregat, strona zapytania działa ze zdenormalizowanymi widokami skonstruowanymi za pomocą Projekcje.
// ---- READ MODEL ----
// Viste denormalizzate ottimizzate per le query dell'UI
// View per la lista ordini: include dati del cliente + totale + stato
interface OrderListItemReadModel {
orderId: string;
customerName: string; // denormalizzato (non serve join)
customerEmail: string;
totalAmount: number;
currency: string;
status: string;
itemCount: number;
placedAt: string;
}
// View per il dettaglio ordine
interface OrderDetailReadModel {
orderId: string;
customerId: string;
customerName: string;
items: Array<{
productId: string;
productName: string; // denormalizzato
quantity: number;
unitPrice: number;
subtotal: number;
}>;
totalAmount: number;
shippingAddress: Address;
status: string;
statusHistory: Array<{ status: string; changedAt: string }>;
estimatedDelivery?: string;
}
// ---- PROIEZIONE ----
// Aggiorna il read model in risposta agli eventi del command side
class OrderReadModelProjection {
constructor(
private readonly readModelDb: ReadModelDatabase,
private readonly customerRepo: CustomerReadRepository
) {}
// Quando un ordine viene confermato, crea/aggiorna le view nel read model
async onOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
// Carica i dati aggiuntivi necessari per la view denormalizzata
const customer = await this.customerRepo.findById(event.customerId);
const orderState = await this.orderRepo.findById(event.orderId);
// Inserisce/aggiorna la view della lista ordini
await this.readModelDb.upsert('order_list_view', {
order_id: event.orderId,
customer_name: customer.fullName,
customer_email: customer.email,
total_amount: orderState.totalAmount,
currency: orderState.currency,
status: 'Confirmed',
item_count: orderState.items.size,
placed_at: orderState.createdAt,
});
// Inserisce la view di dettaglio
const itemsWithNames = await this.enrichItemsWithProductNames(orderState.items);
await this.readModelDb.upsert('order_detail_view', {
order_id: event.orderId,
// ... tutti i campi della view dettaglio
items: JSON.stringify(itemsWithNames),
status_history: JSON.stringify([
{ status: 'Confirmed', changedAt: event.occurredAt }
]),
});
}
async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
// Aggiorna solo lo status nella view
await this.readModelDb.update('order_list_view',
{ order_id: event.orderId },
{ status: 'Cancelled' }
);
// Aggiungi alla status history nella view dettaglio
await this.readModelDb.appendToJsonArray('order_detail_view',
{ order_id: event.orderId },
'status_history',
{ status: 'Cancelled', changedAt: event.occurredAt }
);
}
}
// ---- QUERY HANDLER ----
class OrderQueryHandler {
constructor(private readonly readModelDb: ReadModelDatabase) {}
// Query velocissima sul read model denormalizzato
async getOrderList(customerId: string, page: number, pageSize: number):
Promise<OrderListItemReadModel[]>
{
return this.readModelDb.query(
`SELECT * FROM order_list_view
WHERE customer_id = $1
ORDER BY placed_at DESC
LIMIT $2 OFFSET $3`,
[customerId, pageSize, page * pageSize]
);
}
async getOrderDetail(orderId: string): Promise<OrderDetailReadModel | null> {
return this.readModelDb.queryOne(
'SELECT * FROM order_detail_view WHERE order_id = $1',
[orderId]
);
}
}
CQRS bez pozyskiwania zdarzeń
CQRS i Event Sourcing są ortogonalne: dobrze ze sobą współpracują, ale można ich używać osobno. Uproszczony model CQRS bez pozyskiwania zdarzeń korzysta z bazy danych główna baza danych do zapisów i osobna baza danych (lub zmaterializowane widoki) do odczytów:
// CQRS semplificato: stesso database, modelli separati
// Write side usa le entita ORM normali
// Read side usa query SQL ottimizzate o viste materializzate
// View materializzata PostgreSQL per la lista ordini
CREATE MATERIALIZED VIEW order_list_view AS
SELECT
o.id AS order_id,
c.full_name AS customer_name,
c.email AS customer_email,
o.total_amount,
o.currency,
o.status,
COUNT(oi.id) AS item_count,
o.created_at AS placed_at
FROM orders o
JOIN customers c ON c.id = o.customer_id
LEFT JOIN order_items oi ON oi.order_id = o.id
GROUP BY o.id, c.full_name, c.email;
-- Refresh automatico della vista materializzata
CREATE INDEX idx_order_list_customer ON order_list_view (customer_email);
-- Con PostgreSQL 17: INCREMENTAL REFRESH (solo le righe cambiate)
-- REFRESH MATERIALIZED VIEW CONCURRENTLY order_list_view;
-- Query sul read model: 100x piu veloce di una query con JOIN
SELECT * FROM order_list_view
WHERE customer_email = 'mario@example.com'
ORDER BY placed_at DESC
LIMIT 20;
Niezależne skalowanie Odczyt i zapis
Dzięki CQRS możesz niezależnie skalować stronę odczytu i stronę zapisu. Jeśli systemu ma 100x więcej odczytów niż zapisów (typowe dla e-commerce), możesz mieć:
- Napisz stronę: 2 instancje z podstawową bazą danych PostgreSQL
- Przeczytaj stronę: 10 instancji z pamięcią podręczną Redis + replikacją PostgreSQL tylko do odczytu
// Architettura di scaling con Kubernetes
# command-side-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-command-service
spec:
replicas: 2 # write side: pochi ma consistenti
template:
spec:
containers:
- name: api
image: company/order-service:v1
env:
- name: DB_URL
value: "postgres://primary-db:5432/orders" # database primario
- name: SERVICE_MODE
value: "command"
---
# query-side-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-query-service
spec:
replicas: 10 # read side: molte repliche, stateless
template:
spec:
containers:
- name: api
image: company/order-service:v1
env:
- name: DB_URL
value: "postgres://read-replica:5432/orders" # replica read-only
- name: REDIS_URL
value: "redis://cache:6379"
- name: SERVICE_MODE
value: "query"
Zarządzaj ostateczną spójnością w interfejsie użytkownika
W CQRS z synchronizacją asynchroniczną występuje krótki okres (zazwyczaj milisekundy) gdzie model odczytu nie jest jeszcze zaktualizowany po zapisie. Frontend musi sobie poradzić to poprawnie:
// Pattern 1: Optimistic Update
// Il frontend aggiorna l'UI immediatamente, senza aspettare la query
async function placeOrder(orderData: PlaceOrderDto) {
// 1. Ottimisticamente aggiorna l'UI locale
dispatch({ type: 'ADD_ORDER_OPTIMISTIC', order: { ...orderData, status: 'Pending' } });
try {
// 2. Invia il command al backend
const response = await api.placeOrder(orderData);
// 3. Dopo N ms, ricarica dal read model (che dovrebbe essere aggiornato)
setTimeout(async () => {
const updatedOrder = await api.getOrder(response.orderId);
dispatch({ type: 'UPDATE_ORDER', order: updatedOrder });
}, 500);
} catch (error) {
// 4. In caso di errore, reverta l'ottimistic update
dispatch({ type: 'REVERT_ORDER_OPTIMISTIC' });
throw error;
}
}
// Pattern 2: Poll finche il read model non e aggiornato
async function pollUntilUpdated(orderId: string, expectedStatus: string) {
const MAX_ATTEMPTS = 10;
const DELAY_MS = 200;
for (let i = 0; i < MAX_ATTEMPTS; i++) {
const order = await api.getOrder(orderId);
if (order.status === expectedStatus) return order;
await new Promise(resolve => setTimeout(resolve, DELAY_MS));
}
throw new Error('Read model not updated within expected time');
}
Kompromis CQRS
Korzyści
- Niezależne skalowanie: Odczytaj stronę i zapisz skalę boczną oddzielnie w oparciu o rzeczywiste obciążenie
- Zoptymalizowane szablony: Model odczytu można zdenormalizować dokładnie dla widoku, eliminując złożone zapytania
- Wysoka wydajność odczytu: Zapytania w modelu odczytu to proste SELECTy na wstępnie obliczonych tabelach
- Podział obowiązków: Zapis i odczyt kodu nie zanieczyszczają się nawzajem
Złożone w zarządzaniu
- Możliwa konsystencja: Model odczytu może nieznacznie pozostawać w tyle za zapisem. Frontend musi sobie z tym poradzić
- Powielanie logiki: Niektóre walidacje mogą wymagać przeprowadzenia zarówno procedury obsługi poleceń, jak i modelu odczytu
- Więcej komponentów do wdrożenia: Usługa dowodzenia, obsługa zapytań, projekcje, baza danych modeli odczytu
- Nie nadaje się do prostego CRUD: Złożoność CQRS nie jest tego warta w przypadku operacji bez złożonej logiki biznesowej
Wnioski i dalsze kroki
CQRS to jedna z najskuteczniejszych architektur dla systemów z obciążeniem odczytu i zapisu bardzo różne. Wyraźne oddzielenie strony poleceń (spójne, sprawdzone, sterowane zdarzeniami) od strony zapytania (szybki, zdenormalizowany, skalowalny) rozwiązuje kompromisy pojedynczego modelu.
Następny artykuł łączy w sobie Event Sourcing i CQRS: zobaczymy, jak wypadną prognozy czytają ze składnicy zdarzeń, aby zbudować model odczytu, jak zarządzać projekcjami przy ponawianiu prób w przypadku błędu oraz jak wykorzystać migawki do optymalizacji przebudowy odczytanego modelu po porażce.
Nadchodzące artykuły z serii Architektura sterowana zdarzeniami
Powiązane serie
- Pozyskiwanie zdarzeń — naturalne uzupełnienie CQRS
- Praktyczna architektura oprogramowania — gdzie umieścić CQRS w kontekście ogólnej architektury







