Real-Time Data Pipeline for the Real Estate Market
The real estate market produces enormous volumes of data every day: new listings, price reductions, registered transactions, cadastral data, building permits, commercial occupancy rates, and construction material prices. Turning this stream of raw data into actionable real-time market indicators is the competitive advantage that separates leading PropTech platforms from traditional solutions.
In this article we build a complete pipeline for ingesting, processing, and analyzing real estate market data, using Apache Kafka for streaming, Apache Flink for real-time processing, TimescaleDB for time series, and Elasticsearch for search and aggregations.
What you will learn
- Lambda and Kappa architectures for real-time real estate data
- Ethical scraping and normalization of data from heterogeneous sources (MLS, portals, land registry)
- Apache Kafka: topic design for real estate events
- Apache Flink: stream processing for market indicators
- TimescaleDB: time-series optimization for historical prices
- Indicator computation: price index, days on market, price reduction rate
- Real-time alerts: notifications when market opportunities arise
- Analytics dashboards with Apache Superset and Grafana
Data sources for the real estate market
A real estate market intelligence pipeline has to integrate highly heterogeneous sources, with completely different formats, quality levels, and refresh frequencies.
Main data sources
- MLS feed (RESO Web API): high-quality structured data, RESO/OData standard, updated every 15 minutes
- Real estate portals (Immobiliare.it, Idealista, Zillow): scraping that respects rate limits and terms of service
- Land registry (Agenzia delle Entrate, Land Registry): registered transactions, cadastral values, mortgage data
- Building permits: a leading signal of neighborhood growth or decline
- Demographic data (ISTAT, Census): population, incomes, migration dynamics
- OpenStreetMap points of interest: services, transport, schools for geospatial enrichment
- ECB/Fed rates: direct impact on the mortgage market and on property purchases
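Respecting rate limits when polling portals can be sketched as a small limiter that enforces a minimum interval between requests to the same source. The class name `MinIntervalLimiter` and its injectable clock are illustrative assumptions, not part of any portal's API.

```typescript
// Hypothetical helper (not from the article): enforces a minimum gap between
// requests to one source. The clock is injectable to keep the logic testable.
class MinIntervalLimiter {
  private nextAllowedAt = 0;
  private readonly minIntervalMs: number;
  private readonly now: () => number;

  constructor(minIntervalMs: number, now: () => number = () => Date.now()) {
    this.minIntervalMs = minIntervalMs;
    this.now = now;
  }

  // Reserves the next request slot and returns how many ms the caller must wait.
  reserve(): number {
    const current = this.now();
    const waitMs = Math.max(0, this.nextAllowedAt - current);
    // Book the slot immediately so concurrent callers queue behind each other.
    this.nextAllowedAt = Math.max(current, this.nextAllowedAt) + this.minIntervalMs;
    return waitMs;
  }

  // Waits until the reserved slot is reached.
  async acquire(): Promise<void> {
    const waitMs = this.reserve();
    if (waitMs > 0) {
      await new Promise<void>((resolve) => setTimeout(resolve, waitMs));
    }
  }
}
```

A scraper would keep one limiter per source and call `await limiter.acquire()` before each request. The right interval depends on each portal's published limits and terms of service.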
Data model for real estate events
We model every market event as a typed Kafka message. The event-driven pattern ensures that every state change is tracked, reprocessable, and analyzable over time.
// Event types for the real estate market
type PropertyEventType =
  | 'listing.created'
  | 'listing.updated'
  | 'listing.price_changed'
  | 'listing.status_changed'
  | 'listing.removed'
  | 'transaction.recorded'  // registered sale or rental
  | 'appraisal.completed'
  | 'permit.issued';        // building permit

interface PropertyEvent {
  id: string;               // event UUID
  type: PropertyEventType;
  sourceId: string;         // ID in the original source
  sourceName: string;       // 'mls_rome', 'immobiliare_it', 'catasto'
  timestamp: string;        // ISO 8601
  propertyId?: string;      // unique ID on our platform (assigned after deduplication)
  payload: PropertyListingPayload | TransactionPayload | PermitPayload;
  metadata: {
    ingestTimestamp: string;
    schemaVersion: string;
    quality: 'high' | 'medium' | 'low';
  };
}

interface PropertyListingPayload {
  externalId: string;
  title: string;
  price: number;
  previousPrice?: number;   // set on price_changed events
  currency: string;
  propertyType: string;
  squareMeters: number;
  rooms: number;
  location: {
    lat: number;
    lon: number;
    address: string;
    city: string;
    neighborhood?: string;
    h3Cell?: string;        // precomputed at ingestion
  };
  listingType: 'sale' | 'rent';
  status: 'active' | 'sold' | 'rented' | 'withdrawn';
  listedAt: string;
  updatedAt: string;
  daysOnMarket?: number;    // derived
  features: Record<string, unknown>;
}

interface TransactionPayload {
  transactionId: string;
  salePrice: number;
  assessedValue?: number;   // cadastral value
  transactionDate: string;
  buyerType: 'individual' | 'company' | 'investment_fund';
  financingType: 'cash' | 'mortgage' | 'unknown';
  location: { lat: number; lon: number; city: string };
}

// Assumption: the original listing references PermitPayload without defining it.
// This minimal placeholder only exists so that the union above type-checks.
interface PermitPayload {
  permitId: string;
  issuedAt: string;
  location: { lat: number; lon: number; city: string };
}
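Because `payload` is a union, consumers need to narrow it before reading listing-specific fields. A minimal sketch of a type guard follows; the trimmed-down types and the helper names `isListingEvent` and `pricePerSqm` are illustrative assumptions rather than part of the model above.

```typescript
// Trimmed stand-ins for the article's types: only the fields used here.
type EventType =
  | 'listing.created'
  | 'listing.price_changed'
  | 'transaction.recorded';

interface ListingPayload { price: number; squareMeters: number }
interface SalePayload { salePrice: number }

interface MarketEvent {
  type: EventType;
  payload: ListingPayload | SalePayload;
}

// Hypothetical guard: events whose type starts with "listing." are assumed
// to carry a listing payload.
function isListingEvent(
  event: MarketEvent,
): event is MarketEvent & { payload: ListingPayload } {
  return event.type.startsWith('listing.');
}

// Price per square metre, or undefined when the event is not a listing
// or the surface is zero.
function pricePerSqm(event: MarketEvent): number | undefined {
  if (!isListingEvent(event)) return undefined;
  const { price, squareMeters } = event.payload;
  return squareMeters > 0 ? price / squareMeters : undefined;
}
```

Narrowing on the event type keeps casts such as `event.payload as PropertyListingPayload` out of downstream code, at the cost of trusting producers to pair each type with the right payload.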
Kafka: topology and partitioning
Kafka topic design is critical for performance. Partitioning by geographic area lets consumers be parallelized by city while preserving the order of events for any given property, because all events with the same key land on the same partition.
import { Kafka, Partitioners } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'; // not used in this excerpt

const kafka = new Kafka({
  clientId: 'proptech-ingestion',
  brokers: process.env['KAFKA_BROKERS']!.split(','),
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: process.env['KAFKA_USERNAME']!,
    password: process.env['KAFKA_PASSWORD']!,
  },
});

// Topic configuration
// - property.events.raw: all raw events, 50 partitions, keyed by city
// - property.events.enriched: post-processing with geocoding and deduplication
// - market.indicators: averages/aggregations per geographic area
// - alerts: market opportunities detected by Flink
const TOPICS = [
  {
    topic: 'property.events.raw',
    numPartitions: 50,
    replicationFactor: 3,
    configEntries: [
      { name: 'retention.ms', value: String(7 * 24 * 60 * 60 * 1000) }, // 7 days
      { name: 'compression.type', value: 'lz4' },
      { name: 'cleanup.policy', value: 'delete' },
    ],
  },
  {
    topic: 'property.events.enriched',
    numPartitions: 50,
    replicationFactor: 3,
    configEntries: [
      { name: 'retention.ms', value: String(30 * 24 * 60 * 60 * 1000) }, // 30 days
      { name: 'compression.type', value: 'zstd' },
    ],
  },
  {
    topic: 'market.indicators',
    numPartitions: 10,
    replicationFactor: 3,
    configEntries: [
      { name: 'cleanup.policy', value: 'compact' }, // log compaction keeps the latest state per key
      { name: 'retention.ms', value: String(-1) },  // unlimited (compacted)
    ],
  },
];

// Producer that partitions by geographic area
export class PropertyEventProducer {
  private producer = kafka.producer({
    createPartitioner: Partitioners.DefaultPartitioner,
    transactionTimeout: 30000,
  });

  // KafkaJS requires an explicit connect() before the first send().
  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async disconnect(): Promise<void> {
    await this.producer.disconnect();
  }

  async sendEvent(event: PropertyEvent): Promise<void> {
    await this.producer.send({
      topic: 'property.events.raw',
      messages: [{
        // Key = city code: all events of a city go to the same partition, in order
        key: this.getCityCode(event),
        value: JSON.stringify(event),
        headers: {
          'event-type': event.type,
          'source': event.sourceName,
          'schema-version': '1.0',
        },
        timestamp: String(Date.now()),
      }],
    });
  }

  // Falls back to 'unknown' when the payload carries no city.
  private getCityCode(event: PropertyEvent): string {
    const payload = event.payload as PropertyListingPayload;
    return payload.location?.city?.toLowerCase().replace(/\s/g, '-') ?? 'unknown';
  }
}
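Why a city key preserves ordering can be shown with a toy key-to-partition mapping. This is a sketch only: it uses an FNV-1a hash for brevity, whereas Kafka clients use their own hashing (murmur2 in the default partitioners), so the partition numbers here will not match a real cluster. The function names are assumptions.

```typescript
// Illustrative 32-bit FNV-1a hash; NOT the hash used by Kafka clients.
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep the value unsigned
  }
  return hash;
}

// Same key always maps to the same partition, so per-key order is preserved.
function partitionFor(key: string, numPartitions: number): number {
  return fnv1a(key) % numPartitions;
}

// Counts how many distinct partitions a set of keys actually uses.
function partitionsUsed(keys: string[], numPartitions: number): number {
  return new Set(keys.map((k) => partitionFor(k, numPartitions))).size;
}
```

The trade-off of a city key is skew: a handful of cities can occupy at most that many of the 50 partitions, and a very active city concentrates its traffic on a single one. A finer key (for example city plus H3 cell) would spread load while still keeping each property's events together, provided the key for a property never changes.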
Apache Flink: real-time indicator computation
Apache Flink is one of the most capable stream-processing frameworks for consuming the flow of real estate events and computing market indicators over time windows. We use PyFlink to compute the price index per H3 cell in 15-minute tumbling windows.
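To make the windowing semantics concrete, here is a plain TypeScript sketch of what a 15-minute tumbling-window aggregation per H3 cell computes. It runs in memory on a finite array, whereas Flink does this incrementally on an unbounded stream with watermarks; the type and function names are illustrative assumptions.

```typescript
interface PricePoint {
  h3Cell: string;
  pricePerSqm: number;
  eventTimeMs: number; // event time as epoch milliseconds
}

interface WindowStats {
  h3Cell: string;
  windowStartMs: number;
  windowEndMs: number;
  avgPricePerSqm: number;
  medianPricePerSqm: number;
  listingCount: number;
}

const FIFTEEN_MINUTES_MS = 15 * 60 * 1000;

// Interpolated median: averages the two middle values for even-sized inputs.
function median(values: number[]): number {
  const sorted = [...values].sort((a, b) => a - b);
  const mid = Math.floor(sorted.length / 2);
  return sorted.length % 2 === 1
    ? sorted[mid]
    : (sorted[mid - 1] + sorted[mid]) / 2;
}

// Groups points by (H3 cell, window start) and aggregates each group.
// Windows are aligned to the epoch and never overlap (tumbling).
function aggregateTumbling(
  points: PricePoint[],
  windowMs: number = FIFTEEN_MINUTES_MS,
): WindowStats[] {
  const groups = new Map<string, { h3Cell: string; windowStartMs: number; prices: number[] }>();
  for (const point of points) {
    const windowStartMs = Math.floor(point.eventTimeMs / windowMs) * windowMs;
    const key = `${point.h3Cell}|${windowStartMs}`;
    let group = groups.get(key);
    if (!group) {
      group = { h3Cell: point.h3Cell, windowStartMs, prices: [] };
      groups.set(key, group);
    }
    group.prices.push(point.pricePerSqm);
  }
  return Array.from(groups.values()).map((g) => ({
    h3Cell: g.h3Cell,
    windowStartMs: g.windowStartMs,
    windowEndMs: g.windowStartMs + windowMs,
    avgPricePerSqm: g.prices.reduce((sum, p) => sum + p, 0) / g.prices.length,
    medianPricePerSqm: median(g.prices),
    listingCount: g.prices.length,
  }));
}
```

Each event belongs to exactly one window, which is what distinguishes tumbling windows from sliding ones. The PyFlink job that follows expresses the same grouping in SQL with `TUMBLE(event_time, INTERVAL '15' MINUTE)`.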
# PyFlink: real-time price index per geographic area
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(8)
env.enable_checkpointing(60_000)  # checkpoint every 60 s

settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
tenv = StreamTableEnvironment.create(env, environment_settings=settings)

# Kafka source. Assumes the enriched topic carries flat JSON records with these
# fields (including property_id, assigned during deduplication).
# {{KAFKA_BROKERS}} is a placeholder to substitute at deployment time.
tenv.execute_sql("""
    CREATE TABLE property_events (
        event_id STRING,
        event_type STRING,
        property_id STRING,
        city STRING,
        h3_cell STRING,
        price DOUBLE,
        square_meters DOUBLE,
        price_per_sqm AS (price / square_meters),
        listing_type STRING,
        property_type STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'property.events.enriched',
        'properties.bootstrap.servers' = '{{KAFKA_BROKERS}}',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Sink for the indicators. A table with a PRIMARY KEY needs the upsert-kafka
# connector, which also matches the compacted market.indicators topic.
tenv.execute_sql("""
    CREATE TABLE market_indicators (
        h3_cell STRING,
        city STRING,
        listing_type STRING,
        property_type STRING,
        avg_price_per_sqm DOUBLE,
        median_price DOUBLE,
        listing_count BIGINT,
        price_index DOUBLE,
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        PRIMARY KEY (h3_cell, listing_type, property_type, window_end) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'market.indicators',
        'properties.bootstrap.servers' = '{{KAFKA_BROKERS}}',
        'key.format' = 'json',
        'value.format' = 'json'
    )
""")

# Query: aggregation per 15-minute tumbling window
tenv.execute_sql("""
    INSERT INTO market_indicators
    SELECT
        h3_cell,
        city,
        listing_type,
        property_type,
        AVG(price_per_sqm) AS avg_price_per_sqm,
        -- Median via percentile. Verify that your Flink version supports this
        -- aggregate; if not, register a user-defined aggregate function.
        PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY price_per_sqm) AS median_price,
        COUNT(*) AS listing_count,
        -- Price index: ratio against a 30-day baseline. An OVER aggregate cannot
        -- be combined with a GROUP BY window here, so the value is left NULL and
        -- filled downstream (in production: join with a historical window).
        CAST(NULL AS DOUBLE) AS price_index,
        TUMBLE_START(event_time, INTERVAL '15' MINUTE) AS window_start,
        TUMBLE_END(event_time, INTERVAL '15' MINUTE) AS window_end
    FROM property_events
    WHERE
        event_type IN ('listing.created', 'listing.price_changed')
        AND price > 0
        AND square_meters > 20
    GROUP BY
        h3_cell, city, listing_type, property_type,
        TUMBLE(event_time, INTERVAL '15' MINUTE)
""")

# Alert query: price cuts of more than 10% relative to the previous price of the
# same property. The window is partitioned by property_id (event_id is unique per
# event) and the filter sits in an outer query because HAVING needs a GROUP BY.
tenv.execute_sql("""
    CREATE VIEW price_reduction_alerts AS
    SELECT event_id, property_id, city, h3_cell, price, reduction_pct, event_time
    FROM (
        SELECT
            event_id,
            property_id,
            city,
            h3_cell,
            price,
            (price - LAG(price) OVER (PARTITION BY property_id ORDER BY event_time))
                / LAG(price) OVER (PARTITION BY property_id ORDER BY event_time) AS reduction_pct,
            event_time
        FROM property_events
        WHERE event_type = 'listing.price_changed'
    )
    WHERE reduction_pct < -0.10
""")
TimescaleDB: time-series-optimized storage
TimescaleDB is a PostgreSQL extension optimized for time-series data. It automatically partitions tables into time-based chunks (hypertables) and offers compression of up to 95% for historical data, with excellent query performance over time ranges.
-- TimescaleDB schema for real estate market data
-- Main table: price observations (time series)
CREATE TABLE price_observations (
    time           TIMESTAMPTZ NOT NULL,
    h3_cell        TEXT NOT NULL,
    city           TEXT NOT NULL,
    listing_type   TEXT NOT NULL,   -- 'sale' | 'rent'
    property_type  TEXT NOT NULL,
    avg_price_sqm  NUMERIC(10, 2),
    median_price   NUMERIC(12, 2),
    listing_count  INT,
    price_index    NUMERIC(8, 4),   -- 1.0 = baseline
    dom_avg        NUMERIC(8, 2)    -- average days on market
);

-- Convert to a hypertable (automatic partitioning by time)
SELECT create_hypertable('price_observations', 'time', chunk_time_interval => INTERVAL '1 week');

-- Indexes for time-based and geographic queries
CREATE INDEX idx_price_obs_h3_time ON price_observations (h3_cell, time DESC);
CREATE INDEX idx_price_obs_city_time ON price_observations (city, listing_type, time DESC);

-- Compression policy: compress chunks older than 30 days (90%+ space savings)
ALTER TABLE price_observations SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'h3_cell, city, listing_type'
);
SELECT add_compression_policy('price_observations', INTERVAL '30 days');

-- Downsampling: maintain monthly rollups with a TimescaleDB continuous aggregate
-- (this defines the rollup only; no retention policy drops the raw data)
CREATE MATERIALIZED VIEW monthly_market_summary
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 month', time) AS month,
    city,
    listing_type,
    property_type,
    AVG(avg_price_sqm) AS avg_price_sqm,
    AVG(price_index) AS avg_price_index,
    SUM(listing_count) AS total_listings,
    AVG(dom_avg) AS avg_days_on_market
FROM price_observations
GROUP BY month, city, listing_type, property_type
WITH NO DATA;

-- Automatic refresh every hour
SELECT add_continuous_aggregate_policy('monthly_market_summary',
    start_offset => INTERVAL '3 months',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);

-- Query: price-per-sqm trend over the last 12 months for one H3 neighborhood
SELECT
    time_bucket('1 month', time) AS month,
    AVG(avg_price_sqm) AS avg_price_sqm,
    MAX(price_index) AS price_index
FROM price_observations
WHERE
    h3_cell = '{h3_cell_value}'
    AND listing_type = 'sale'
    AND time > NOW() - INTERVAL '12 months'
GROUP BY month
ORDER BY month;

-- Query: days-on-market trend for a city
SELECT
    time_bucket('1 week', time) AS week,
    city,
    AVG(dom_avg) AS avg_dom,
    SUM(listing_count) AS listings
FROM price_observations
WHERE city = 'Roma' AND listing_type = 'sale'
    AND time > NOW() - INTERVAL '6 months'
GROUP BY week, city
ORDER BY week;
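The `'{h3_cell_value}'` placeholder in the trend query is best filled through bind parameters rather than string interpolation. Below is a hedged sketch that builds the query in the `{ text, values }` shape accepted by node-postgres; the function name `buildPriceTrendQuery` and the `SqlQuery` type are assumptions introduced for illustration.

```typescript
interface SqlQuery {
  text: string;
  values: (string | number)[];
}

// Builds the monthly price trend query for one H3 cell with bind parameters.
// `months` is validated because it ends up in an interval computation.
function buildPriceTrendQuery(h3Cell: string, months: number): SqlQuery {
  if (!Number.isInteger(months) || months <= 0) {
    throw new RangeError('months must be a positive integer');
  }
  return {
    text: `
      SELECT
        time_bucket('1 month', time) AS month,
        AVG(avg_price_sqm) AS avg_price_sqm,
        MAX(price_index) AS price_index
      FROM price_observations
      WHERE h3_cell = $1
        AND listing_type = 'sale'
        AND time > NOW() - make_interval(months => $2::int)
      GROUP BY month
      ORDER BY month`,
    values: [h3Cell, months],
  };
}
```

With a `pg` pool this would be executed as `pool.query(buildPriceTrendQuery(cell, 12))`. The cell value never becomes part of the SQL text, which avoids injection and lets PostgreSQL reuse the statement shape.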
Market indicators: computation and interpretation
// Computing Days on Market (DOM) and the price reduction rate
import { Pool } from 'pg';

export interface MarketIndicators {
  citySlug: string;
  period: string;                 // '2026-Q1', '2026-03'
  listingType: 'sale' | 'rent';
  // Prices
  avgPricePerSqm: number;
  medianPricePerSqm: number;
  priceIndex: number;             // 1.0 = base year, 1.05 = +5% YoY
  // Market velocity
  avgDaysOnMarket: number;
  medianDaysOnMarket: number;
  listingCountNew: number;        // new listings in the period
  listingCountClosed: number;     // closed (sold/rented)
  absorbRate: number;             // closed in the period / active inventory (higher = hotter market)
  // Supply/demand tension
  priceReductionRate: number;     // % of price changes that are reductions
  avgPriceReductionPct: number;   // mean relative change of reductions (negative fraction, -0.05 = 5% cut)
  stockMonths: number;            // months needed to sell the current inventory
  // Segments
  byPropertyType: Record<string, { avgPrice: number; count: number }>;
}

export async function computeMarketIndicators(
  db: Pool,
  city: string,
  periodStart: Date,
  periodEnd: Date
): Promise<MarketIndicators> {
  const [priceResult, velocityResult, reductionResult] = await Promise.all([
    // 1. Average and median prices
    db.query(
      `SELECT
         AVG(price / square_meters) AS avg_price_sqm,
         PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price / square_meters) AS median_price_sqm
       FROM property_listings
       WHERE city = $1 AND listing_type = 'sale' AND status = 'active'
         AND listed_at BETWEEN $2 AND $3`,
      [city, periodStart, periodEnd]
    ),
    // 2. Days on Market
    db.query(
      `SELECT
         AVG(EXTRACT(DAY FROM (closed_at - listed_at))) AS avg_dom,
         PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY EXTRACT(DAY FROM (closed_at - listed_at))) AS median_dom,
         COUNT(*) AS closed_count
       FROM property_listings
       WHERE city = $1 AND listing_type = 'sale' AND status IN ('sold')
         AND closed_at BETWEEN $2 AND $3`,
      [city, periodStart, periodEnd]
    ),
    // 3. Price reductions (NULLIF avoids a division by zero when there are no rows)
    db.query(
      `SELECT
         COUNT(*) FILTER (WHERE price_change < 0) * 100.0 / NULLIF(COUNT(*), 0) AS reduction_rate_pct,
         AVG(price_change / original_price) FILTER (WHERE price_change < 0) AS avg_reduction_pct
       FROM listing_price_history
       WHERE city = $1 AND changed_at BETWEEN $2 AND $3`,
      [city, periodStart, periodEnd]
    ),
  ]);

  // Active inventory, restricted to sales so it is comparable with closed sales
  const activeCount = await db.query(
    'SELECT COUNT(*) AS cnt FROM property_listings WHERE city = $1 AND status = $2 AND listing_type = $3',
    [city, 'active', 'sale']
  );
  const activeListings = parseInt(activeCount.rows[0].cnt, 10);

  const closedCount = parseInt(velocityResult.rows[0]?.closed_count ?? '0', 10);
  const durationDays = (periodEnd.getTime() - periodStart.getTime()) / (1000 * 86400);
  const closedPerDay = durationDays > 0 ? closedCount / durationDays : 0;
  // 99 is a sentinel meaning "no sales in the period"
  const stockMonths = closedPerDay > 0
    ? activeListings / (closedPerDay * 30)
    : 99;

  return {
    citySlug: city.toLowerCase().replace(/\s/g, '-'),
    period: periodStart.toISOString().substring(0, 7),
    listingType: 'sale',
    avgPricePerSqm: parseFloat(priceResult.rows[0]?.avg_price_sqm ?? '0'),
    medianPricePerSqm: parseFloat(priceResult.rows[0]?.median_price_sqm ?? '0'),
    priceIndex: 1.0, // In production: compare with the previous period
    // Note: 30 and 25 are fallback values used when no sale closed in the period
    avgDaysOnMarket: parseFloat(velocityResult.rows[0]?.avg_dom ?? '30'),
    medianDaysOnMarket: parseFloat(velocityResult.rows[0]?.median_dom ?? '25'),
    listingCountNew: 0, // Separate query
    listingCountClosed: closedCount,
    absorbRate: closedCount / Math.max(1, activeListings),
    priceReductionRate: parseFloat(reductionResult.rows[0]?.reduction_rate_pct ?? '0'),
    avgPriceReductionPct: parseFloat(reductionResult.rows[0]?.avg_reduction_pct ?? '0'),
    stockMonths,
    byPropertyType: {}, // Populate with a separate query
  };
}
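The function above returns a fixed `priceIndex: 1.0`. One way to fill it in, sketched under the assumption that the index is simply the ratio between the current period's average price per square metre and that of a baseline period (the helper names are hypothetical):

```typescript
// Ratio of the current average price per sqm to a baseline period's average.
// Returns null when the baseline is missing or non-positive, rather than a
// misleading 1.0.
function computePriceIndex(
  currentAvgPerSqm: number,
  baselineAvgPerSqm: number,
): number | null {
  if (!Number.isFinite(currentAvgPerSqm) || !(baselineAvgPerSqm > 0)) {
    return null;
  }
  return currentAvgPerSqm / baselineAvgPerSqm;
}

// Converts an index into a percentage change versus the baseline.
function indexToPercentChange(index: number): number {
  return (index - 1) * 100;
}
```

For example, a current average of 3,150 per sqm against a baseline of 3,000 gives an index of about 1.05, that is, roughly +5% versus the baseline. Which baseline to use (same month of the previous year, a fixed base year) is a product decision the article leaves open.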
Real-time alerts: market opportunities
The real value of a real-time pipeline lies in proactive alerting: notifying investors and buyers when opportunities appear, before the market absorbs them.
// Alerting system for market opportunities
import { randomUUID } from 'node:crypto';

interface MarketAlert {
  alertId: string;
  type: 'price_drop' | 'new_listing_below_market' | 'high_demand_area' | 'distressed_sale';
  severity: 'low' | 'medium' | 'high';
  propertyId?: string;
  h3Cell?: string;
  city: string;
  description: string;
  metrics: Record<string, number>;
  timestamp: string;
}

export class MarketAlertEngine {
  async processEvent(
    event: PropertyEvent,
    marketContext: MarketIndicators
  ): Promise<MarketAlert[]> {
    const alerts: MarketAlert[] = [];
    const payload = event.payload as PropertyListingPayload;

    // Alert 1: price significantly below market
    // (guards against zero surface or a missing market average)
    if (
      event.type === 'listing.created' &&
      payload.price > 0 &&
      payload.squareMeters > 0 &&
      marketContext.avgPricePerSqm > 0
    ) {
      const priceSqm = payload.price / payload.squareMeters;
      const marketPriceSqm = marketContext.avgPricePerSqm;
      const discount = (marketPriceSqm - priceSqm) / marketPriceSqm;
      if (discount > 0.15) {
        alerts.push({
          alertId: `alert-${randomUUID()}`, // UUID avoids collisions within the same millisecond
          type: 'new_listing_below_market',
          severity: discount > 0.25 ? 'high' : 'medium',
          propertyId: payload.externalId,
          city: payload.location.city,
          description: `New listing ${(discount * 100).toFixed(0)}% below the market average`,
          metrics: {
            priceSqm,
            marketPriceSqm,
            discountPercent: discount * 100,
          },
          timestamp: new Date().toISOString(),
        });
      }
    }

    // Alert 2: significant price reduction
    if (event.type === 'listing.price_changed' && payload.previousPrice) {
      const reduction = (payload.previousPrice - payload.price) / payload.previousPrice;
      if (reduction > 0.08) {
        alerts.push({
          alertId: `alert-${randomUUID()}`,
          type: 'price_drop',
          severity: reduction > 0.15 ? 'high' : 'medium',
          propertyId: payload.externalId,
          city: payload.location.city,
          description: `Price cut of ${(reduction * 100).toFixed(0)}% (from €${payload.previousPrice} to €${payload.price})`,
          metrics: {
            previousPrice: payload.previousPrice,
            currentPrice: payload.price,
            reductionPercent: reduction * 100,
          },
          timestamp: new Date().toISOString(),
        });
      }
    }

    // Alert 3: high-demand area (very low DOM).
    // Note: this depends only on the market context, so it fires for every event in a hot area.
    if (marketContext.avgDaysOnMarket < 7 && marketContext.absorbRate > 1.5) {
      alerts.push({
        alertId: `alert-${randomUUID()}`,
        type: 'high_demand_area',
        severity: 'medium',
        h3Cell: payload.location.h3Cell,
        city: payload.location.city,
        description: `Very high demand area: average DOM ${marketContext.avgDaysOnMarket} days, absorption rate ${marketContext.absorbRate.toFixed(1)}x`,
        metrics: {
          avgDom: marketContext.avgDaysOnMarket,
          absorbRate: marketContext.absorbRate,
        },
        timestamp: new Date().toISOString(),
      });
    }

    return alerts;
  }
}
Key market indicators
| Indicator | Calculation | Interpretation |
|---|---|---|
| Days on market (DOM) | Average days from listing to closing | <30 = hot market, >90 = cold market |
| Absorption rate | Sales per month / Active inventory | >20% = seller's market, <15% = buyer's market |
| Price reduction rate | % of listings with a price reduction | >30% = downward pressure |
| Months of supply | Inventory / Monthly sales | <3 = shortage, 4-6 = balanced, >6 = oversupply |
| List-to-sale ratio | Sale price / List price | >100% = competitive market (offers above asking) |
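The supply-side rows of the table reduce to simple arithmetic. A minimal sketch follows; the function names are assumptions, and because the table leaves the 3-to-4-month range unclassified, this sketch treats it as balanced.

```typescript
type SupplyBalance = 'shortage' | 'balanced' | 'oversupply';

// Months of supply: how long the active inventory would last at the current sales pace.
function monthsOfSupply(activeInventory: number, monthlySales: number): number {
  return monthlySales > 0 ? activeInventory / monthlySales : Infinity;
}

// Absorption rate: share of the active inventory sold in a month.
function absorptionRate(monthlySales: number, activeInventory: number): number {
  return activeInventory > 0 ? monthlySales / activeInventory : 0;
}

// Thresholds from the table: <3 shortage, >6 oversupply.
// Assumption: everything from 3 to 6 months counts as balanced.
function classifySupply(months: number): SupplyBalance {
  if (months < 3) return 'shortage';
  if (months <= 6) return 'balanced';
  return 'oversupply';
}
```

With 1,200 active listings and 300 sales per month, months of supply is 4 (balanced) and the absorption rate is 0.25, or 25%. The two indicators are reciprocals of each other when measured over the same month, so they always tell the same story from opposite directions.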
Conclusions
A real-time real estate market intelligence pipeline turns raw data into actionable insights: the investor knows when and where to buy, the property manager knows how to price, and the agent knows which properties to present. The Kafka + Flink + TimescaleDB combination offers a robust, scalable, and maintainable architecture that can handle tens of millions of events per day with latencies on the order of seconds.







