Rurociąg telemetrii gier: analityka graczy w Scali
Współczesna gra wieloosobowa generuje każdego dnia miliardy zdarzeń: gracz porusza się, wykonuje atak, kup przedmiot, porzuć grę. Fortnite przekracza 350 milionów zarejestrowanych graczy i generuje dziesiątki terabajtów danych dziennie. League of Legends gromadzi ponad 100 zmiennych na każdy ukończony mecz. Wezwanie Duty rejestruje każdą wystrzeloną kulę, każdą śmierć i każdą klatkę, w której opóźnienie przekracza próg krytyczny.
Ta ogromna masa danych nie jest szumem tła: jest to układ nerwowy gry. I telemetria do ujawnienia dlaczego gracze porzucają grę po trzecim poziomie, która broń jest za silna w meta, który region serwerów cierpi na opóźnienia w godzinach szczytu oraz ile czasu potrzeba, zanim część użytkowników przestanie płacić. Bez solidnego rurociągu telemetrycznego jedziesz z zawiązanymi oczami z prędkością 200 km/h.
W tym artykule budujemy taki gra telemetryczna rurociągów przemysłowych od końca do końca: z kolekcji zdarzeń po stronie klienta i serwera, pozyskiwanie za pomocą Apache Kafka, przetwarzanie w czasie rzeczywistym za pomocą Apache Flink, aż do hurtowni danych do analizy historycznej i pulpitu nawigacyjnego do podejmowania decyzji LiveOps. Zobaczymy również jak wdrożyć segmentacja graczy e la przewidywanie rezygnacji bezpośrednio na rurociągu.
Czego się nauczysz
- Taksonomia zdarzeń w grze: zdarzenia graczy, zdarzenia związane z rozgrywką, zdarzenia ekonomiczne, zdarzenia systemowe
- Architektura potokowa: SDK klienta, magistrala zdarzeń, przetwarzanie strumieniowe, hurtownia danych
- Wdrożenie z Kafką, Flink i ClickHouse do analityki w czasie rzeczywistym
- Schemat wiadomości z Avro i rejestrem schematu do sprawdzenia
- Segmentacja graczy RFM w czasie rzeczywistym (od niedawna, częstotliwość, monetarnie).
- Przewidywanie rezygnacji dzięki inżynierii funkcji w potoku Flink
- Analiza ścieżek i wskaźniki utrzymania decyzji LiveOps
- Jakość danych, deduplikacja i obsługa późnych zdarzeń
1. Taksonomia wydarzeń w grze
Zanim zbudujesz rurociąg, musisz wiedzieć, co zbierasz. Wydarzenia w grze są podzielone na cztery główne kategorie, każda z inną częstotliwością, priorytetem i metodami gromadzenia.
| Kategoria | Przykłady | Częstotliwość | Priorytet | Zatrzymanie |
|---|---|---|---|---|
| Wydarzenia graczy | logowanie, wylogowanie, rozpoczęcie_sesji, poziom_w górę, osiągnięcie | Niska (1-10/h) | Wysoki | Na zawsze |
| Wydarzenia związane z rozgrywką | zabij, śmierć, początek_rozpoczęcia, wykorzystana_zdolność, wprowadź_strefę | Wysoka (100-1000/min) | Przeciętny | 90 dni |
| Wydarzenia Gospodarcze | zakup, przedmiot_grant, waluta_zarobek, sklep_otwarty | Niska (0-5/h) | Krytyka | Na zawsze |
| Zdarzenia systemowe | fps_drop, utrata_pakietu, ponowne połączenie, raport_awarii | Średni (10-100/min) | Wysoki | 30 dni |
| Wydarzenia towarzyskie | dodawanie_znajomego, dołączenie do imprezy, wysłanie_czatu, gracz_raportu | Niska (0-20/h) | Przeciętny | 180 dni |
Każde zdarzenie musi mieć wspólną strukturę (obwiednię) ze standardowymi polami: event_id,
event_type, player_id, session_id, server_timestamp,
client_timestampi ładunek specyficzny dla typu. Sztywny i podstawowy schemat dla
jakość danych w dalszej części.
// Schema base evento telemetria (TypeScript/protobuf-like)
interface TelemetryEvent {
// Envelope comune a tutti gli eventi
event_id: string; // UUID v4 per deduplicazione
event_type: string; // "player.level_up", "gameplay.kill", etc.
schema_version: string; // "1.2.0" per compatibilità
// Identita
player_id: string; // ID univoco giocatore
session_id: string; // ID sessione corrente
game_build: string; // "2024.12.1" versione client
// Timestamp dual (client + server)
client_ts: number; // Unix ms dal client (non fidarsi ciecamente)
server_ts: number; // Unix ms dal server (source of truth)
client_tz: string; // "Europe/Rome" per analytics geo
// Contesto
platform: "pc" | "console" | "mobile";
region: string; // "eu-west-1"
match_id?: string; // Se in una partita
// Payload specifico del tipo
payload: Record<string, unknown>;
}
// Esempio evento kill
const killEvent: TelemetryEvent = {
event_id: "a3f7e2d1-8c4b-4e9a-b1f2-3d5e7c9a1b2c",
event_type: "gameplay.kill",
schema_version: "1.2.0",
player_id: "player_12345",
session_id: "sess_abcdef",
game_build: "2024.12.1",
client_ts: 1703123456789,
server_ts: 1703123456812,
client_tz: "Europe/Rome",
platform: "pc",
region: "eu-west-1",
match_id: "match_789xyz",
payload: {
victim_id: "player_67890",
weapon: "assault_rifle",
headshot: true,
distance_meters: 47.3,
position: { x: 145.2, y: 89.1, z: 12.0 },
ttk_ms: 320
}
};
2. Architektura rurociągów: od klienta do pulpitu nawigacyjnego
Architektura potoku telemetrii gier przemysłowych jest zgodna ze wzorcem Architektura Lambdy lub, w bardziej nowoczesnej wersji, Architektura Kappy gdzie pojedyncza warstwa przetwarzania strumienia służy zarówno do analizy w czasie rzeczywistym, jak i analizy historycznej.
Elementy rurociągu
| Warstwy | Część | Technologia | Rola |
|---|---|---|---|
| Kolekcja | SDK klienta | TypeScript/C++/Unity | Buforowanie, przetwarzanie wsadowe, ponowna próba |
| Kolekcja | Brama telemetryczna | Idź/Wysłanniku | Authn, ograniczanie szybkości, fan-out |
| Przyjmowanie pokarmu | Autobus imprezowy | Apacz Kafka | Trwałość, powtarzalność, porządkowanie |
| Przetwarzanie | Silnik strumieniowy | Apache Flink | Analityka w czasie rzeczywistym, wzbogacanie |
| Porcja | Baza danych OLAP | KliknijDom | Szybkie zapytania w miliardach wierszy |
| Porcja | Gorący sklep | Redisa | Dane w czasie rzeczywistym dla LiveOps |
| Wyobrażanie sobie | Pulpity nawigacyjne | Grafana/Metabaza | Panel i analizy LiveOps |
3. SDK klienta: buforowanie i grupowanie
Wysyłanie każdego zdarzenia indywidualnie na serwer to klasyczna awaria powodująca ogromne obciążenie sieci. Dobry klient SDK implementuje lokalne buforowanie e wysyłanie partii: wydarzenia są one gromadzone w buforze w pamięci i wysyłane okresowo lub gdy bufor osiągnie określony poziom rozmiar. W przypadku awarii sieci SDK używa a bufor dysku aby nie przegapić wydarzeń.
// Client SDK di telemetria in TypeScript
class TelemetrySDK {
private buffer: TelemetryEvent[] = [];
private diskBuffer: DiskQueue;
private readonly BATCH_SIZE = 100;
private readonly FLUSH_INTERVAL_MS = 5000;
private readonly MAX_RETRY_ATTEMPTS = 3;
constructor(private config: SDKConfig) {
this.diskBuffer = new DiskQueue('telemetry_offline');
this.startFlushLoop();
this.startOfflineRetryLoop();
}
// Registra un evento nel buffer in-memory
track(eventType: string, payload: Record<string, unknown>): void {
const event: TelemetryEvent = {
event_id: crypto.randomUUID(),
event_type: eventType,
schema_version: "1.2.0",
player_id: this.config.playerId,
session_id: this.config.sessionId,
game_build: this.config.gameBuild,
client_ts: Date.now(),
server_ts: 0, // Valorizzato dal server
client_tz: Intl.DateTimeFormat().resolvedOptions().timeZone,
platform: this.config.platform,
region: this.config.region,
payload
};
this.buffer.push(event);
if (this.buffer.length >= this.BATCH_SIZE) {
this.flush(); // Flush immediato se buffer pieno
}
}
private async flush(): Promise<void> {
if (this.buffer.length === 0) return;
const batch = this.buffer.splice(0, this.BATCH_SIZE);
try {
await this.sendBatch(batch);
} catch (error) {
// Fallback su disk buffer per invio offline
console.warn('Telemetry send failed, persisting to disk', error);
this.diskBuffer.enqueue(batch);
}
}
private async sendBatch(events: TelemetryEvent[]): Promise<void> {
const response = await fetch(this.config.gatewayUrl + '/v1/events', {
method: 'POST',
headers: {
'Content-Type': 'application/x-ndjson',
'X-Game-Build': this.config.gameBuild,
'Authorization': `Bearer ${this.config.apiKey}`
},
// NDJSON: una riga per evento, più efficiente di JSON array
body: events.map(e => JSON.stringify(e)).join('\n'),
signal: AbortSignal.timeout(5000) // 5s timeout
});
if (!response.ok) {
throw new Error(`Gateway error: ${response.status}`);
}
}
private startFlushLoop(): void {
setInterval(() => this.flush(), this.FLUSH_INTERVAL_MS);
}
private startOfflineRetryLoop(): void {
setInterval(async () => {
const pendingBatch = await this.diskBuffer.dequeue(this.BATCH_SIZE);
if (pendingBatch.length > 0) {
try {
await this.sendBatch(pendingBatch);
} catch {
this.diskBuffer.requeue(pendingBatch);
}
}
}, 30_000); // Retry ogni 30s
}
}
4. Rejestr tematów i schematów Kafki
Sercem rurociągu jest Apache Kafka. Wydarzenia publikowane są w odrębnych tematach według kategorii, umożliwienie konsumentom rejestrowania się wyłącznie na te rodzaje wydarzeń, które ich interesują. The Rejestr schematów Confluent zapewnia, że każda wiadomość jest zgodna ze zdefiniowanym schematem Avro, blokując zniekształcone wiadomości zanim zanieczyszczą hurtownię danych.
# Schema Avro per TelemetryEvent (Avro IDL)
{
"type": "record",
"name": "TelemetryEvent",
"namespace": "io.gamestudio.telemetry",
"fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_type", "type": "string" },
{ "name": "schema_version", "type": "string" },
{ "name": "player_id", "type": "string" },
{ "name": "session_id", "type": "string" },
{ "name": "game_build", "type": "string" },
{ "name": "client_ts", "type": "long", "logicalType": "timestamp-millis" },
{ "name": "server_ts", "type": "long", "logicalType": "timestamp-millis" },
{ "name": "platform", "type": { "type": "enum", "name": "Platform",
"symbols": ["pc", "console", "mobile"] } },
{ "name": "region", "type": "string" },
{ "name": "match_id", "type": ["null", "string"], "default": null },
{ "name": "payload", "type": { "type": "map", "values": "string" } }
]
}
# Configurazione Kafka topics con partitioning
# Topic per categoria, partitionato per player_id (ordering per player)
kafka-topics.sh --create \
--bootstrap-server kafka:9092 \
--topic game.player.events \
--partitions 32 \
--replication-factor 3 \
--config retention.ms=2592000000 # 30 giorni
--config compression.type=lz4 # 60% riduzione size
--config segment.ms=3600000 # Roll segment ogni ora
kafka-topics.sh --create \
--bootstrap-server kafka:9092 \
--topic game.gameplay.events \
--partitions 128 \ # Più partizioni: alta throughput
--replication-factor 3 \
--config retention.ms=7776000000 # 90 giorni
--config compression.type=snappy
kafka-topics.sh --create \
--bootstrap-server kafka:9092 \
--topic game.economy.events \
--partitions 32 \
--replication-factor 3 \
--config retention.ms=-1 # Forever (economia critica)
--config cleanup.policy=compact # Compact per mantenerla
5. Przetwarzanie strumienia Flink: analiza w czasie rzeczywistym
Apache Flink to silnik przetwarzania strumieni wybrany przez EA, Riot Games i inne firmy telemetria do gier. Jego umiejętność zarządzania przetwarzanie czasu zdarzenia ze znakami wodnymi i fundamentalne znaczenie dla zarządzania spóźnionymi zdarzeniami (zdarzeniami przychodzącymi z sieci z opóźnieniem) i ich statusem rozproszone umożliwia agregację w oknach czasowych bez utraty danych.
// Apache Flink job: Kill/Death ratio in real-time per match (Java/Flink API)
public class KDAStreamProcessor {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint ogni 10s per fault tolerance
env.enableCheckpointing(10_000);
env.getCheckpointConfig()
.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Source: Kafka consumer per eventi gameplay
KafkaSource<TelemetryEvent> source = KafkaSource
.<TelemetryEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("game.gameplay.events")
.setGroupId("flink-kda-processor")
.setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(new TelemetryEventDeserializer())
.build();
DataStream<TelemetryEvent> events = env
.fromSource(source, WatermarkStrategy
// Tollerata latenza massima di 30s per eventi ritardati
.<TelemetryEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner((e, ts) -> e.getServerTs()),
"Kafka-Telemetry-Source"
);
// Filtra solo eventi kill e death
DataStream<TelemetryEvent> combatEvents = events
.filter(e -> e.getEventType().equals("gameplay.kill") ||
e.getEventType().equals("gameplay.death"));
// Aggrega KDA per player ogni minuto, keyed by player_id
DataStream<PlayerKDA> kdaStream = combatEvents
.keyBy(TelemetryEvent::getPlayerId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new KDAAggregateFunctionK(), new KDAWindowFunctionK())
.name("KDA-Per-Minute");
// Sink 1: Redis per LiveOps dashboard (latenza < 1s)
kdaStream.addSink(new RedisSink<>(redisConfig, new KDARedisMapper()));
// Sink 2: ClickHouse per storico (latenza accettabile 5-10s)
kdaStream.addSink(ClickHouseSink.builder()
.setTableName("game_analytics.player_kda_minutes")
.build());
env.execute("Game KDA Real-Time Processor");
}
}
// Funzione di aggregazione KDA
public class KDAAggregateFunctionK
implements AggregateFunction<TelemetryEvent, KDAAcc, KDAAcc> {
@Override
public KDAAcc createAccumulator() {
return new KDAAcc(0, 0, 0);
}
@Override
public KDAAcc add(TelemetryEvent event, KDAAcc acc) {
if (event.getEventType().equals("gameplay.kill")) {
return new KDAAcc(acc.kills + 1, acc.deaths, acc.assists);
} else {
return new KDAAcc(acc.kills, acc.deaths + 1, acc.assists);
}
}
@Override
public KDAAcc getResult(KDAAcc acc) { return acc; }
@Override
public KDAAcc merge(KDAAcc a, KDAAcc b) {
return new KDAAcc(a.kills + b.kills, a.deaths + b.deaths,
a.assists + b.assists);
}
}
6. Segmentacja graczy: RFM w czasie rzeczywistym
Model RFM (aktualność, częstotliwość, wartość pieniężna), zapożyczony z handlu elektronicznego, dostosowuje się doskonale nadaje się do gier, aby podzielić graczy na grupy, w których można podjąć działania: tych, którzy ryzykują odejściem, i tych, którzy są gotowy na ofertę premium, kto wydaje najwięcej pieniędzy, należy go traktować ostrożnie.
- Niedawność: dni od ostatniej sesji (mała częstotliwość = ryzyko rezygnacji)
- Częstotliwość: sesje w ciągu ostatnich 30 dni (wysoka częstotliwość = aktywny gracz)
- Monetarny: Całkowite wydatki w prawdziwej walucie w ciągu ostatnich 90 dni
-- ClickHouse: query RFM per segmentazione giocatori
-- Eseguita ogni ora tramite scheduled materialized view
CREATE MATERIALIZED VIEW game_analytics.player_rfm_segments
ENGINE = ReplacingMergeTree()
ORDER BY (player_id, computed_at)
AS
WITH rfm_base AS (
SELECT
player_id,
-- Recency: giorni dall'ultima sessione
dateDiff('day', max(toDate(server_ts / 1000)), today()) AS recency_days,
-- Frequency: sessioni negli ultimi 30 giorni
countDistinct(
CASE WHEN server_ts > subtractDays(now(), 30) THEN session_id END
) AS frequency_30d,
-- Monetary: spesa totale ultimi 90 giorni (economy events)
sumIf(
toFloat64OrZero(payload['amount_usd']),
event_type = 'economy.purchase' AND
server_ts > subtractDays(now(), 90)
) AS monetary_90d,
now() AS computed_at
FROM game_analytics.events_all
WHERE event_type IN ('player.session_start', 'economy.purchase')
GROUP BY player_id
),
rfm_scored AS (
SELECT
player_id,
recency_days,
frequency_30d,
monetary_90d,
computed_at,
-- Score da 1-5 per ogni dimensione (quintili)
ntile(5) OVER (ORDER BY recency_days DESC) AS r_score, -- Desc: bassa recency = buono
ntile(5) OVER (ORDER BY frequency_30d ASC) AS f_score,
ntile(5) OVER (ORDER BY monetary_90d ASC) AS m_score
FROM rfm_base
)
SELECT
player_id,
recency_days,
frequency_30d,
monetary_90d,
r_score,
f_score,
m_score,
-- Segment label
multiIf(
r_score >= 4 AND f_score >= 4 AND m_score >= 4, 'champions',
r_score >= 4 AND f_score >= 4, 'loyal_players',
r_score >= 4 AND m_score >= 4, 'potential_champions',
r_score <= 2 AND f_score >= 3, 'at_risk',
r_score <= 2 AND f_score <= 2, 'churned_risk',
m_score >= 4, 'big_spenders',
'casual'
) AS segment,
computed_at
FROM rfm_scored;
Segmenty RFM i zalecane działania LiveOps
| Segment | % Typowe | Zalecane działanie | Kanał |
|---|---|---|---|
| Mistrzowie | 5% | Leczenie VIP, dostęp do wersji beta, ekskluzywna zawartość | W grze + e-mail |
| Lojalni gracze | 15% | Nagrody lojalnościowe, premie za przepustkę bojową | W grze |
| Ryzyko | 20% | Kampania ponownego zaangażowania, oferta rabatu 30%. | Naciśnij + e-mail |
| Przeklęte ryzyko | 25% | Win-back: 7 dni bezpłatnej subskrypcji premium | E-mail + SMS |
| Wielcy wydawcy | 3% | Ochrona wielorybów, spersonalizowane oferty | Bezpośredni zasięg |
| Zwykły | 32% | Ulepszony samouczek, usprawnione wdrażanie | W grze |
7. Przewidywanie rezygnacji z inżynierią funkcji w Flink
Przewidywanie rezygnacji wymaga inżynierii funkcji w czasie rzeczywistym: wyodrębnianie plików cechy, które model ML wykorzystuje do przewidywania prawdopodobieństwa porzucenia w ciągu najbliższych 7 dni. Funkcje te są obliczane w przesuwanych oknach i zapisywane w magazynie funkcji (Redis lub Feast).
// Flink: Feature engineering per churn prediction
// Calcola feature su finestra scorrevole di 7 giorni
DataStream<ChurnFeatures> churnFeatures = events
.keyBy(TelemetryEvent::getPlayerId)
.window(SlidingEventTimeWindows.of(Time.days(7), Time.hours(1)))
.process(new ChurnFeatureExtractor())
.name("Churn-Feature-Engineering");
public class ChurnFeatureExtractor
extends ProcessWindowFunction<TelemetryEvent, ChurnFeatures, String, TimeWindow> {
@Override
public void process(String playerId, Context ctx,
Iterable<TelemetryEvent> events, Collector<ChurnFeatures> out) {
List<TelemetryEvent> eventList = new ArrayList<>();
events.forEach(eventList::add);
// Feature calcolo
long sessionCount = eventList.stream()
.filter(e -> e.getEventType().equals("player.session_start"))
.count();
double avgSessionDurationMin = eventList.stream()
.filter(e -> e.getEventType().equals("player.session_end"))
.mapToDouble(e -> Double.parseDouble(e.getPayload().getOrDefault("duration_sec", "0")))
.average().orElse(0.0) / 60.0;
long matchesCompleted = eventList.stream()
.filter(e -> e.getEventType().equals("gameplay.match_end"))
.count();
long matchesAbandoned = eventList.stream()
.filter(e -> e.getEventType().equals("gameplay.match_abandon"))
.count();
double abandonRate = matchesCompleted > 0
? (double) matchesAbandoned / (matchesCompleted + matchesAbandoned)
: 0.0;
double totalSpend = eventList.stream()
.filter(e -> e.getEventType().equals("economy.purchase"))
.mapToDouble(e -> Double.parseDouble(e.getPayload().getOrDefault("amount_usd", "0")))
.sum();
long daysSinceLastSession = eventList.stream()
.filter(e -> e.getEventType().equals("player.session_start"))
.mapToLong(TelemetryEvent::getServerTs)
.max()
.map(lastTs -> (System.currentTimeMillis() - lastTs) / 86_400_000L)
.orElse(999L);
out.collect(new ChurnFeatures(
playerId,
sessionCount,
avgSessionDurationMin,
matchesCompleted,
abandonRate,
totalSpend,
daysSinceLastSession,
ctx.window().getEnd()
));
}
}
// Sink su Redis Feature Store con TTL
churnFeatures.addSink(new RedisSink<>(
redisConfig,
(ChurnFeatures f, FlinkJedisPoolConfig config) -> {
try (Jedis jedis = new Jedis(config.getHost(), config.getPort())) {
String key = "churn_features:" + f.getPlayerId();
Map<String, String> fields = new HashMap<>();
fields.put("session_count_7d", String.valueOf(f.getSessionCount()));
fields.put("avg_session_min", String.valueOf(f.getAvgSessionDurationMin()));
fields.put("abandon_rate", String.valueOf(f.getAbandonRate()));
fields.put("total_spend_usd", String.valueOf(f.getTotalSpend()));
fields.put("days_since_last", String.valueOf(f.getDaysSinceLastSession()));
jedis.hset(key, fields);
jedis.expire(key, 86400 * 7); // TTL 7 giorni
}
}
));
8. Jakość danych: deduplikacja i obsługa późnych zdarzeń
W systemie rozproszonym duplikaty są nieuniknione: klient może ponownie wysłać partię, jeśli jej nie otrzyma
potwierdź, a partycja sieciowa może powodować podwójne zapisy. Deduplikacja oparta na event_id
musi to nastąpić na jak najwcześniejszym etapie rurociągu.
// Flink: Deduplicazione stateful con event_id
// Usa RocksDB state backend per gestire miliardi di ID
DataStream<TelemetryEvent> deduplicated = rawEvents
.keyBy(TelemetryEvent::getEventId)
.process(new DeduplicationFunction(Duration.ofHours(24)))
.name("Event-Deduplication");
public class DeduplicationFunction
extends KeyedProcessFunction<String, TelemetryEvent, TelemetryEvent> {
private final Duration deduplicationWindow;
private ValueState<Boolean> seenState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Boolean> descriptor =
new ValueStateDescriptor<>("event-seen", Boolean.class);
// TTL: pulisce lo stato dopo 24h per evitare memory leak
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
descriptor.enableTimeToLive(ttlConfig);
seenState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(TelemetryEvent event, Context ctx,
Collector<TelemetryEvent> out) throws Exception {
if (seenState.value() == null) {
// Prima volta che vediamo questo event_id
seenState.update(true);
out.collect(event); // Passa all'avanti
}
// Altrimenti: duplicato, scartato silenziosamente
// (registra metrica per monitoring)
}
}
-- ClickHouse: tabella eventi con ReplacingMergeTree per dedup storage-level
CREATE TABLE game_analytics.events_all (
event_id String,
event_type LowCardinality(String),
player_id String,
session_id String,
server_ts DateTime64(3),
platform LowCardinality(String),
region LowCardinality(String),
match_id Nullable(String),
payload Map(String, String),
ingested_at DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(ingested_at)
PARTITION BY toYYYYMM(server_ts)
ORDER BY (player_id, event_type, server_ts)
SETTINGS index_granularity = 8192;
-- Indice bloom filter su player_id per query veloci per giocatore
ALTER TABLE game_analytics.events_all
ADD INDEX bf_player_id player_id TYPE bloom_filter(0.01) GRANULARITY 4;
Ostrzeżenie: klient znacznika czasu a serwer znacznika czasu
Nigdy nie ufaj client_ts jako źródło prawdy do analiz: zegary klienckie
mogą być godzinami przesunięte w fazie (szczególnie na urządzeniach mobilnych), manipulowane przez oszustów lub po prostu
źle. Zawsze używaj server_ts do zapytań analitycznych. The client_ts i przydatne
tylko do pomiaru opóźnienia sieci (server_ts - client_ts) i odbudować
sekwencje zdarzeń w ramach jednej sesji.
9. Analiza ścieżki i wskaźniki zatrzymania
Wskaźniki utrzymania są najważniejsze w przypadku gier opartych na usługach na żywo. Zatrzymanie w dniu 1, dzień 7 i dzień 30 (D1/D7/D30) to standardowe w branży wskaźniki KPI służące do oceny stanu gry.
-- ClickHouse: calcolo retention D1/D7/D30 per coorte
-- Una "coorte" e il gruppo di giocatori con lo stesso primo giorno di gioco
SELECT
install_date,
count(DISTINCT player_id) AS cohort_size,
-- D1 Retention: tornati il giorno dopo l'installazione
countDistinctIf(player_id,
dateDiff('day', install_date, last_seen_date) >= 1
) AS retained_d1,
-- D7 Retention
countDistinctIf(player_id,
dateDiff('day', install_date, last_seen_date) >= 7
) AS retained_d7,
-- D30 Retention
countDistinctIf(player_id,
dateDiff('day', install_date, last_seen_date) >= 30
) AS retained_d30,
-- Percentuali
round(100.0 * retained_d1 / cohort_size, 2) AS d1_pct,
round(100.0 * retained_d7 / cohort_size, 2) AS d7_pct,
round(100.0 * retained_d30 / cohort_size, 2) AS d30_pct
FROM (
-- Subquery: per ogni giocatore, data installazione e ultima sessione
SELECT
player_id,
min(toDate(server_ts / 1000)) AS install_date,
max(toDate(server_ts / 1000)) AS last_seen_date
FROM game_analytics.events_all
WHERE event_type = 'player.session_start'
GROUP BY player_id
)
WHERE install_date >= today() - 90 -- Ultimi 90 giorni di coorti
GROUP BY install_date
ORDER BY install_date DESC;
-- Benchmark settore (mobile games 2024):
-- D1: 25-40% (buono), D7: 10-20%, D30: 3-8%
-- Top performers: D1 40%+, D7 20%+, D30 8%+
10. Skalowanie: liczby rzeczywiste i względy kosztowe
Gra AAA w weekend premiery może wygenerować szczytową liczbę 10–50 milionów zdarzeń na minutę. Oto jak to zrobić jaka jest skala rurociągu i jakie są orientacyjne koszty.
Rozmiar dla 1 miliona jednoczesnych graczy
| Część | Rozmiarowanie | Koszt AWS/miesiąc (szac.) |
|---|---|---|
| Kafka (MSK) | 12 m5,4xdużych brokerów | ~8000 dolarów |
| Mrugnięcie (EMR) | 20 menedżer zadań r5.2xlarge | ~12 000 dolarów |
| ClickHouse (własny hosting) | 6 węzłów r6a.8xlarge + dysk SSD 50 TB | ~15 000 dolarów |
| Redis (ElastiCache) | 3 węzły klastra r7g.2xlarge | ~3000 dolarów |
| Brama telemetryczna | Automatyczne skalowanie ECS (10-50 zadań) | ~5000 dolarów |
| Całkowity | ~ 43 000 USD miesięcznie |
W przypadku 10 milionów aktywnych graczy miesięcznie koszt wynosi ~0,004 USD/użytkownika/miesiąc: duża inwestycja uzasadnione możliwością optymalizacji ARPU i retencji.
Optymalizacje kosztów
- Inteligentne próbkowanie: W przypadku zdarzeń związanych z rozgrywką o dużej częstotliwości (pozycja, animacja), wysyłaj tylko 1 zdarzenie na 5 zamiast wszystkich. Niewiele tracisz na jakości i oszczędzasz 80% wydajności.
- Warstwowe przechowywanie: ClickHouse z zimną warstwą na S3. Najnowsze dane (7 dni) na lokalnym dysku SSD, dane historyczne na S3 z wolniejszym dostępem, ale 10x taniej.
- Agregacja przed wysłaniem: W przypadku prostych wskaźników, takich jak liczba klatek na sekundę lub ping, należy zagregować dane po stronie klienta (średnia, min., maks. co 30 s) zamiast wysyłać każdą klatkę.
Wnioski
Potok telemetrii gier przemysłowych nie jest standardowym projektem inżynierii danych: wymaga głębokie zrozumienie wzorców gier, ograniczeń opóźnień i potrzeb LiveOps. Kombinacja Kafka + Flink + ClickHouse + Redis i stał się de facto standardem w branży ze względu na zdolność do obsługi ogromnych wolumenów przy jednoczesnym utrzymaniu opóźnienia poniżej sekundy krytyka.
Segmentacja odtwarzaczy RFM i przewidywanie rezygnacji w czasie rzeczywistym przekształcają surowe dane w działania konkretne: ukierunkowane kampanie ponownie angażujące, spersonalizowane oferty i decyzje LiveOps opiera się na danych, a nie na intuicji. Gry, które inwestują w wysokiej jakości telemetrię, zobacz średnio 15-25% poprawa retencji D30 w porównaniu do tych, którzy tego nie robią.
Kolejne kroki w serii Game Backend
- Poprzedni artykuł: LiveOps: System wydarzeń i flaga funkcji
- Następny artykuł: Gry w chmurze: przesyłanie strumieniowe za pomocą WebRTC i węzła brzegowego
- Powiązane spostrzeżenia: MLOps dla biznesu – modele AI w produkcji







