Conductă de date în timp real pentru piața imobiliară
Piața imobiliară produce zilnic cantități astronomice de date: noi listări, reduceri pret, tranzactii inregistrate, date cadastrale, autorizatii de construire, gradul de ocupare comerciale, preturi materii prime pentru constructii. Transformați acest val de date brute în indicatori de piață acționabili în timp real și avantajul competitiv care separă platformele Conducerea PropTech din soluțiile tradiționale.
În acest articol construim o conductă completă de asimilare, procesare și analiză a datelor de utilizare a pieţei imobiliare Apache Kafka pentru streaming, Apache Flink pentru procesare în timp real, TimecaleDB pentru serii temporale e Elasticsearch pentru căutare și agregare.
Ce vei învăța
- Arhitectură Lambda și Kappa pentru date imobiliare în timp real
- Razuirea etică și normalizarea datelor din surse eterogene (MLS, portaluri, carte funciară)
- Apache Kafka: design de subiecte pentru evenimente imobiliare
- Apache Flink: procesare flux pentru indicatorii de piață
- TimescaleDB: Optimizarea serii temporale pentru prețurile istorice
- Calculul indicatorilor: Indicele prețurilor, Zilele pe piață, Rata de reducere a prețurilor
- Alerte în timp real: notificări când apar oportunități de piață
- Tabloul de bord Analytics cu Apache Superset și Grafana
Surse de date ale pieței imobiliare
O conductă de informații despre piața imobiliară trebuie să integreze surse foarte eterogene, cu formate, calitate și rate de reîmprospătare complet diferite.
Principalele surse de date
- Flux MLS (RESO Web API): date structurate de înaltă calitate, standard RESO/ODATA, actualizare la fiecare 15 min
- Portaluri imobiliare (Immobiliare.it, Idealista, Zillow): scraping cu privire la limitele ratelor și a Termenilor și condițiilor
- Cadastru (Agenția de Venituri, Cartea Funciară): tranzactii inregistrate, valori cadastrale, date ipotecare
- Autorizații de construire: semnal anticipator al creșterii/declinului cartierului
- Date demografice (ISTAT, Recensământ): populaţia, veniturile, dinamica migraţiei
- APOI OpenStreetMap: servicii, transport, școli de îmbogățire geospațială
- Ratele BCE/Fed: impact direct asupra pieței ipotecare și achiziționării proprietății
Model de date pentru evenimente imobiliare
Modelăm fiecare eveniment de piață ca un mesaj Kafka tastat. Modelul bazat pe evenimente garantează că fiecare schimbare de stare este urmărită, reprocesabilă și analizabilă în timp.
// Event types per il mercato immobiliare
type PropertyEventType =
| 'listing.created'
| 'listing.updated'
| 'listing.price_changed'
| 'listing.status_changed'
| 'listing.removed'
| 'transaction.recorded' // vendita/affitto registrata
| 'appraisal.completed'
| 'permit.issued'; // permesso di costruzione
interface PropertyEvent {
id: string; // UUID evento
type: PropertyEventType;
sourceId: string; // ID nella fonte originale
sourceName: string; // 'mls_rome', 'immobiliare_it', 'catasto'
timestamp: string; // ISO 8601
propertyId?: string; // ID univoco nostra piattaforma (post-dedup)
payload: PropertyListingPayload | TransactionPayload | PermitPayload;
metadata: {
ingestTimestamp: string;
schemaVersion: string;
quality: 'high' | 'medium' | 'low';
};
}
interface PropertyListingPayload {
externalId: string;
title: string;
price: number;
previousPrice?: number; // se price_changed
currency: string;
propertyType: string;
squareMeters: number;
rooms: number;
location: {
lat: number;
lon: number;
address: string;
city: string;
neighborhood?: string;
h3Cell?: string; // pre-calcolato all'ingestione
};
listingType: 'sale' | 'rent';
status: 'active' | 'sold' | 'rented' | 'withdrawn';
listedAt: string;
updatedAt: string;
daysOnMarket?: number; // calcolato
features: Record<string, unknown>;
}
interface TransactionPayload {
transactionId: string;
salePrice: number;
assessedValue?: number; // valore catastale
transactionDate: string;
buyerType: 'individual' | 'company' | 'investment_fund';
financingType: 'cash' | 'mortgage' | 'unknown';
location: { lat: number; lon: number; city: string };
}
Kafka: Topologii și partiționare
Designul subiectului Kafka este esențial pentru performanță. Sunt permise împărțiri pe zone geografice consumatori paralelizați pe oraș, garantând ordinea evenimentelor pentru proprietăți specifice.
import { Kafka, Partitioners } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
const kafka = new Kafka({
clientId: 'proptech-ingestion',
brokers: process.env['KAFKA_BROKERS']!.split(','),
ssl: true,
sasl: {
mechanism: 'scram-sha-512',
username: process.env['KAFKA_USERNAME']!,
password: process.env['KAFKA_PASSWORD']!,
},
});
// Topic configuration
// - property.events.raw: tutti gli eventi grezzi, 50 partizioni per citta
// - property.events.enriched: post-processing con geocoding e deduplication
// - market.indicators: medie/aggregazioni per area geografica
// - alerts: opportunità di mercato detected da Flink
const TOPICS = [
{
topic: 'property.events.raw',
numPartitions: 50,
replicationFactor: 3,
configEntries: [
{ name: 'retention.ms', value: String(7 * 24 * 60 * 60 * 1000) }, // 7 giorni
{ name: 'compression.type', value: 'lz4' },
{ name: 'cleanup.policy', value: 'delete' },
],
},
{
topic: 'property.events.enriched',
numPartitions: 50,
replicationFactor: 3,
configEntries: [
{ name: 'retention.ms', value: String(30 * 24 * 60 * 60 * 1000) }, // 30 giorni
{ name: 'compression.type', value: 'zstd' },
],
},
{
topic: 'market.indicators',
numPartitions: 10,
replicationFactor: 3,
configEntries: [
{ name: 'cleanup.policy', value: 'compact' }, // log compaction per latest state
{ name: 'retention.ms', value: String(-1) }, // indefinita (compacted)
],
},
];
// Producer con partitioning per area geografica
export class PropertyEventProducer {
private producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
transactionTimeout: 30000,
});
async sendEvent(event: PropertyEvent): Promise<void> {
await this.producer.send({
topic: 'property.events.raw',
messages: [{
// Key = city code: garantisce ordine eventi per citta sullo stesso partition
key: this.getCityCode(event),
value: JSON.stringify(event),
headers: {
'event-type': event.type,
'source': event.sourceName,
'schema-version': '1.0',
},
timestamp: String(Date.now()),
}],
});
}
private getCityCode(event: PropertyEvent): string {
const payload = event.payload as PropertyListingPayload;
return payload.location?.city?.toLowerCase().replace(/\s/g, '-') ?? 'unknown';
}
}
Apache Flink: Calcul indicator în timp real
Apache Flink și cel mai puternic cadru de procesare a fluxului de procesat fluxul de evenimente imobiliare și calcularea indicatorilor de piață în ferestre de timp glisante. Folosim Flink cu PyFlink pentru a calcula Indicele prețurilor pentru celula H3 la fiecare 15 minute.
# PyFlink: calcolo Price Index per area geografica in real-time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import lit, col
import json
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(8)
env.get_checkpoint_config().set_checkpointing_interval(60_000) # checkpoint ogni 60s
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
tenv = StreamTableEnvironment.create(env, settings)
# Definizione source Kafka
tenv.execute_sql("""
CREATE TABLE property_events (
event_id STRING,
event_type STRING,
city STRING,
h3_cell STRING,
price DOUBLE,
square_meters DOUBLE,
price_per_sqm DOUBLE AS (price / square_meters),
listing_type STRING,
property_type STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'property.events.enriched',
'properties.bootstrap.servers' = '{{KAFKA_BROKERS}}',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
""")
# Calcolo Price Index per H3 cell: media mobile su finestra 15 minuti
tenv.execute_sql("""
CREATE TABLE market_indicators (
h3_cell STRING,
city STRING,
listing_type STRING,
property_type STRING,
avg_price_per_sqm DOUBLE,
median_price DOUBLE,
listing_count BIGINT,
price_index DOUBLE,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
PRIMARY KEY (h3_cell, listing_type, property_type, window_end) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'market.indicators',
'format' = 'json'
)
""")
# Query: aggregazione per finestra temporale scorrevole
tenv.execute_sql("""
INSERT INTO market_indicators
SELECT
h3_cell,
city,
listing_type,
property_type,
AVG(price_per_sqm) AS avg_price_per_sqm,
-- Approssimazione mediana con percentile (Flink non ha MEDIAN nativo)
PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY price_per_sqm) AS median_price,
COUNT(*) AS listing_count,
-- Price Index: rapporto vs baseline di 30 giorni fa (in produzione: join con window storica)
AVG(price_per_sqm) / NULLIF(AVG(price_per_sqm) OVER (
PARTITION BY h3_cell, listing_type
ORDER BY event_time
RANGE BETWEEN INTERVAL '30' DAY PRECEDING AND INTERVAL '15' MINUTE PRECEDING
), 0) AS price_index,
TUMBLE_START(event_time, INTERVAL '15' MINUTE) AS window_start,
TUMBLE_END(event_time, INTERVAL '15' MINUTE) AS window_end
FROM property_events
WHERE
event_type IN ('listing.created', 'listing.price_changed')
AND price > 0
AND square_meters > 20
GROUP BY
h3_cell, city, listing_type, property_type,
TUMBLE(event_time, INTERVAL '15' MINUTE)
""")
# Query alert: proprietà con prezzo ridotto >10% ultimi 7 giorni
tenv.execute_sql("""
CREATE VIEW price_reduction_alerts AS
SELECT
event_id,
city,
h3_cell,
price,
(price - LAG(price) OVER (PARTITION BY event_id ORDER BY event_time)) / price AS reduction_pct,
event_time
FROM property_events
WHERE event_type = 'listing.price_changed'
HAVING reduction_pct < -0.10
""")
TimescaleDB: Stocare optimizată pentru serii de timp
TimecaleDB și o extensie PostgreSQL optimizată pentru date din seria temporală. Împarte automat tabelele în bucăți temporale (hipertable) și oferă o compresie fină 95% pentru datele istorice, cu performanțe excelente de interogare pe intervale de timp.
-- Schema TimescaleDB per dati mercato immobiliare
-- Tabella principali price observations (time-series)
CREATE TABLE price_observations (
time TIMESTAMPTZ NOT NULL,
h3_cell TEXT NOT NULL,
city TEXT NOT NULL,
listing_type TEXT NOT NULL, -- 'sale' | 'rent'
property_type TEXT NOT NULL,
avg_price_sqm NUMERIC(10, 2),
median_price NUMERIC(12, 2),
listing_count INT,
price_index NUMERIC(8, 4), -- 1.0 = baseline
dom_avg NUMERIC(8, 2) -- Days on Market media
);
-- Converte in hypertable (partizionamento automatico per tempo)
SELECT create_hypertable('price_observations', 'time', chunk_time_interval => INTERVAL '1 week');
-- Indici per query temporali e geografiche
CREATE INDEX idx_price_obs_h3_time ON price_observations (h3_cell, time DESC);
CREATE INDEX idx_price_obs_city_time ON price_observations (city, listing_type, time DESC);
-- Policy di compressione: comprimi chunk >30 giorni (risparmio 90%+ spazio)
ALTER TABLE price_observations SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'h3_cell, city, listing_type'
);
SELECT add_compression_policy('price_observations', INTERVAL '30 days');
-- Retention policy: aggrega automaticamente in bucket mensili dopo 1 anno
-- Usa Continuous Aggregates di TimescaleDB per le aggregazioni
CREATE MATERIALIZED VIEW monthly_market_summary
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 month', time) AS month,
city,
listing_type,
property_type,
AVG(avg_price_sqm) AS avg_price_sqm,
AVG(price_index) AS avg_price_index,
SUM(listing_count) AS total_listings,
AVG(dom_avg) AS avg_days_on_market
FROM price_observations
GROUP BY month, city, listing_type, property_type
WITH NO DATA;
-- Refresh automatico ogni ora
SELECT add_continuous_aggregate_policy('monthly_market_summary',
start_offset => INTERVAL '3 months',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
-- Query: trend prezzi mq negli ultimi 12 mesi per un quartiere H3
SELECT
time_bucket('1 month', time) AS month,
AVG(avg_price_sqm) AS avg_price_sqm,
MAX(price_index) AS price_index
FROM price_observations
WHERE
h3_cell = '{h3_cell_value}'
AND listing_type = 'sale'
AND time > NOW() - INTERVAL '12 months'
GROUP BY month
ORDER BY month;
-- Query: Days on Market trend per citta
SELECT
time_bucket('1 week', time) AS week,
city,
AVG(dom_avg) AS avg_dom,
SUM(listing_count) AS listings
FROM price_observations
WHERE city = 'Roma' AND listing_type = 'sale'
AND time > NOW() - INTERVAL '6 months'
GROUP BY week, city
ORDER BY week;
Indicatori de piață: calcul și interpretare
// Calcolo Days on Market (DOM) e Price Reduction Rate
export interface MarketIndicators {
citySlug: string;
period: string; // '2026-Q1', '2026-03'
listingType: 'sale' | 'rent';
// Prezzi
avgPricePerSqm: number;
medianPricePerSqm: number;
priceIndex: number; // 1.0 = anno base, 1.05 = +5% YoY
// Velocita mercato
avgDaysOnMarket: number;
medianDaysOnMarket: number;
listingCountNew: number; // nuovi annunci nel periodo
listingCountClosed: number; // conclusi (venduti/affittati)
absorbRate: number; // conclusi / nuovi (>1 = mercato caldo)
// Tensione offerta/domanda
priceReductionRate: number; // % annunci con riduzione prezzo
avgPriceReductionPct: number; // riduzione media quando avviene
stockMonths: number; // mesi per esaurire offerta attuale
// Segmenti
byPropertyType: Record<string, { avgPrice: number; count: number }>;
}
export async function computeMarketIndicators(
db: Pool,
city: string,
periodStart: Date,
periodEnd: Date
): Promise<MarketIndicators> {
const [priceResult, velocityResult, reductionResult] = await Promise.all([
// 1. Prezzi medi
db.query(
`SELECT
AVG(price / square_meters) AS avg_price_sqm,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price / square_meters) AS median_price_sqm
FROM property_listings
WHERE city = $1 AND listing_type = 'sale' AND status = 'active'
AND listed_at BETWEEN $2 AND $3`,
[city, periodStart, periodEnd]
),
// 2. Days on Market
db.query(
`SELECT
AVG(EXTRACT(DAY FROM (closed_at - listed_at))) AS avg_dom,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY EXTRACT(DAY FROM (closed_at - listed_at))) AS median_dom,
COUNT(*) AS closed_count
FROM property_listings
WHERE city = $1 AND listing_type = 'sale' AND status IN ('sold')
AND closed_at BETWEEN $2 AND $3`,
[city, periodStart, periodEnd]
),
// 3. Price reductions
db.query(
`SELECT
COUNT(*) FILTER (WHERE price_change < 0) * 100.0 / COUNT(*) AS reduction_rate_pct,
AVG(price_change / original_price) FILTER (WHERE price_change < 0) AS avg_reduction_pct
FROM listing_price_history
WHERE city = $1 AND changed_at BETWEEN $2 AND $3`,
[city, periodStart, periodEnd]
),
]);
const activeCount = await db.query(
'SELECT COUNT(*) AS cnt FROM property_listings WHERE city = $1 AND status = $2',
[city, 'active']
);
const closedCount = parseInt(velocityResult.rows[0]?.closed_count ?? '0');
const durationDays = (periodEnd.getTime() - periodStart.getTime()) / (1000 * 86400);
const closedPerDay = closedCount / durationDays;
const stockMonths = closedPerDay > 0
? (parseInt(activeCount.rows[0].cnt) / (closedPerDay * 30))
: 99;
return {
citySlug: city.toLowerCase().replace(/\s/g, '-'),
period: periodStart.toISOString().substring(0, 7),
listingType: 'sale',
avgPricePerSqm: parseFloat(priceResult.rows[0]?.avg_price_sqm ?? '0'),
medianPricePerSqm: parseFloat(priceResult.rows[0]?.median_price_sqm ?? '0'),
priceIndex: 1.0, // In produzione: confronto con periodo precedente
avgDaysOnMarket: parseFloat(velocityResult.rows[0]?.avg_dom ?? '30'),
medianDaysOnMarket: parseFloat(velocityResult.rows[0]?.median_dom ?? '25'),
listingCountNew: 0, // Query separata
listingCountClosed: closedCount,
absorbRate: closedCount / Math.max(1, parseInt(activeCount.rows[0].cnt)),
priceReductionRate: parseFloat(reductionResult.rows[0]?.reduction_rate_pct ?? '0'),
avgPriceReductionPct: parseFloat(reductionResult.rows[0]?.avg_reduction_pct ?? '0'),
stockMonths,
byPropertyType: {}, // Popolare con query separata
};
}
Alerte în timp real: oportunități de piață
Valoarea reală a conductei în timp real este în alerta proactivă: notificarea investitorilor și cumpărători atunci când oportunitățile apar înainte ca piața să le absoarbă.
// Sistema di alerting per opportunità di mercato
interface MarketAlert {
alertId: string;
type: 'price_drop' | 'new_listing_below_market' | 'high_demand_area' | 'distressed_sale';
severity: 'low' | 'medium' | 'high';
propertyId?: string;
h3Cell?: string;
city: string;
description: string;
metrics: Record<string, number>;
timestamp: string;
}
export class MarketAlertEngine {
async processEvent(
event: PropertyEvent,
marketContext: MarketIndicators
): Promise<MarketAlert[]> {
const alerts: MarketAlert[] = [];
const payload = event.payload as PropertyListingPayload;
// Alert 1: Prezzo significativamente sotto mercato
if (event.type === 'listing.created' && payload.price > 0) {
const priceSqm = payload.price / payload.squareMeters;
const marketPriceSqm = marketContext.avgPricePerSqm;
const discount = (marketPriceSqm - priceSqm) / marketPriceSqm;
if (discount > 0.15) {
alerts.push({
alertId: `alert-${Date.now()}`,
type: 'new_listing_below_market',
severity: discount > 0.25 ? 'high' : 'medium',
propertyId: payload.externalId,
city: payload.location.city,
description: `Nuovo annuncio ${(discount * 100).toFixed(0)}% sotto la media di mercato`,
metrics: {
priceSqm,
marketPriceSqm,
discountPercent: discount * 100,
},
timestamp: new Date().toISOString(),
});
}
}
// Alert 2: Riduzione prezzo significativa
if (event.type === 'listing.price_changed' && payload.previousPrice) {
const reduction = (payload.previousPrice - payload.price) / payload.previousPrice;
if (reduction > 0.08) {
alerts.push({
alertId: `alert-${Date.now()}-pr`,
type: 'price_drop',
severity: reduction > 0.15 ? 'high' : 'medium',
propertyId: payload.externalId,
city: payload.location.city,
description: `Riduzione prezzo ${(reduction * 100).toFixed(0)}% (da €${payload.previousPrice} a €${payload.price})`,
metrics: {
previousPrice: payload.previousPrice,
currentPrice: payload.price,
reductionPercent: reduction * 100,
},
timestamp: new Date().toISOString(),
});
}
}
// Alert 3: Area con alta domanda (DOM molto basso)
if (marketContext.avgDaysOnMarket < 7 && marketContext.absorbRate > 1.5) {
alerts.push({
alertId: `alert-${Date.now()}-hd`,
type: 'high_demand_area',
severity: 'medium',
h3Cell: payload.location.h3Cell,
city: payload.location.city,
description: `Area ad altissima domanda: DOM medio ${marketContext.avgDaysOnMarket} giorni, absorption rate ${marketContext.absorbRate.toFixed(1)}x`,
metrics: {
avgDom: marketContext.avgDaysOnMarket,
absorbRate: marketContext.absorbRate,
},
timestamp: new Date().toISOString(),
});
}
return alerts;
}
}
Indicatori cheie de piață
| Indicator | Calcul | Interpretare |
|---|---|---|
| Zile pe piață (DOM) | Zile medii de la listare până la închidere | <30 = piață fierbinte, >90 = piață rece |
| Rata de absorbție | Vânzări/lună / Stoc activ | >20% = piața vânzătorului, <15% = piața cumpărătorului |
| Rata de reducere a prețului | % listare cu reducere de preț | >30% = presiunea de urs |
| Luni de aprovizionare | Stoc/Vânzări lunare | <3 = deficit, 4-6 = echilibru, >6 = surplus |
| Raportul dintre listă și vânzare | Preț de vânzare / preț de listare | >100% = piata competitiva (oferte la cerere) |
Concluzii
O conductă de informații despre piața imobiliară în timp real transformă datele brute în informații acționabil: investitorul potrivit știe când și unde să cumpere, administratorul imobiliar știe cum preț, agentul știe ce proprietăți să prezinte. Combinația Kafka + Flink + TimescaleDB oferă o arhitectură robustă, scalabilă și care poate fi întreținută, care poate procesa zeci de milioane de evenimente pe zi cu latențe de ordinul secundelor.







