Gayrimenkul Piyasası için Gerçek Zamanlı Veri Hattı
Emlak piyasası her gün astronomik miktarda veri üretiyor: yeni ilanlar, indirimler fiyat, kayıtlı işlemler, kadastro verileri, inşaat izinleri, doluluk oranları inşaat için ticari, hammadde fiyatları. Bu ham veri akışını dönüştürün eyleme dönüştürülebilir gerçek zamanlı pazar göstergelerine ve platformları ayıran rekabet avantajına Geleneksel çözümlerden PropTech'e öncülük ediyoruz.
Bu makalede eksiksiz bir veri alımı, işleme ve analiz hattı oluşturuyoruz emlak piyasasının kullanımı Apaçi Kafka akış için, Apache Flink'i gerçek zamanlı işleme için Zaman ÖlçeğiDB zaman serisi için e Elasticsearch Arama ve toplamalar için.
Ne Öğreneceksiniz
- Gerçek zamanlı emlak verileri için Lambda ve Kappa mimarisi
- Heterojen kaynaklardan (MLS, portallar, tapu sicilleri) verilerin etik olarak ayıklanması ve normalleştirilmesi
- Apache Kafka: emlak etkinlikleri için konu tasarımı
- Apache Flink: piyasa göstergeleri için akış işleme
- TimescaleDB: Geçmiş fiyatlar için zaman serisi optimizasyonu
- Göstergelerin hesaplanması: Fiyat Endeksi, Piyasadaki Günler, Fiyat İndirim Oranı
- Gerçek zamanlı uyarı: pazar fırsatları ortaya çıktığında bildirimler
- Apache Superset ve Grafana ile analitik kontrol paneli
Emlak Piyasası Veri Kaynakları
Bir emlak piyasası istihbarat hattı, çok heterojen kaynakları formatlarla entegre etmelidir. tamamen farklı kalite ve yenileme oranları.
Ana Veri Kaynakları
- MLS Akışı (RESO Web API'si): yüksek kaliteli yapılandırılmış veriler, RESO/ODATA standardı, her 15 dakikada bir güncellenir
- Emlak portalları (Immobiliare.it, Idealista, Zillow): oran sınırları ve Hizmet Şartları açısından kazıma
- Kadastro (Gelir Dairesi, Tapu Sicili): kayıtlı işlemler, kadastro değerleri, ipotek verileri
- İnşaat izinleri: mahalle büyümesi/düşüşüne dair öngörü sinyali
- Demografik veriler (ISTAT, Nüfus Sayımı): nüfus, gelirler, göç dinamikleri
- SONRA OpenStreetMap'i açın: hizmetler, ulaşım, mekânsal zenginleştirmeye yönelik okullar
- ECB/Fed oranları: Mortgage piyasası ve mülk alımı üzerinde doğrudan etki
Gayrimenkul Etkinlikleri için Veri Modeli
Her pazar olayını yazılı bir Kafka mesajı olarak modelliyoruz. Olay odaklı model zaman içinde her durum değişikliğinin takip edilebileceğini, yeniden işlenebileceğini ve analiz edilebileceğini garanti eder.
// Event types per il mercato immobiliare
type PropertyEventType =
| 'listing.created'
| 'listing.updated'
| 'listing.price_changed'
| 'listing.status_changed'
| 'listing.removed'
| 'transaction.recorded' // vendita/affitto registrata
| 'appraisal.completed'
| 'permit.issued'; // permesso di costruzione
interface PropertyEvent {
id: string; // UUID evento
type: PropertyEventType;
sourceId: string; // ID nella fonte originale
sourceName: string; // 'mls_rome', 'immobiliare_it', 'catasto'
timestamp: string; // ISO 8601
propertyId?: string; // ID univoco nostra piattaforma (post-dedup)
payload: PropertyListingPayload | TransactionPayload | PermitPayload;
metadata: {
ingestTimestamp: string;
schemaVersion: string;
quality: 'high' | 'medium' | 'low';
};
}
interface PropertyListingPayload {
externalId: string;
title: string;
price: number;
previousPrice?: number; // se price_changed
currency: string;
propertyType: string;
squareMeters: number;
rooms: number;
location: {
lat: number;
lon: number;
address: string;
city: string;
neighborhood?: string;
h3Cell?: string; // pre-calcolato all'ingestione
};
listingType: 'sale' | 'rent';
status: 'active' | 'sold' | 'rented' | 'withdrawn';
listedAt: string;
updatedAt: string;
daysOnMarket?: number; // calcolato
features: Record<string, unknown>;
}
interface TransactionPayload {
transactionId: string;
salePrice: number;
assessedValue?: number; // valore catastale
transactionDate: string;
buyerType: 'individual' | 'company' | 'investment_fund';
financingType: 'cash' | 'mortgage' | 'unknown';
location: { lat: number; lon: number; city: string };
}
Kafka: Topolojiler ve Bölümleme
Kafka konu tasarımı performans açısından kritik öneme sahiptir. Coğrafi bölgeye göre bölmelere izin verilir Tüketicilerin şehirlere göre paralelleştirilmesi, belirli mülkler için olayların sırasını garanti eder.
import { Kafka, Partitioners } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
const kafka = new Kafka({
clientId: 'proptech-ingestion',
brokers: process.env['KAFKA_BROKERS']!.split(','),
ssl: true,
sasl: {
mechanism: 'scram-sha-512',
username: process.env['KAFKA_USERNAME']!,
password: process.env['KAFKA_PASSWORD']!,
},
});
// Topic configuration
// - property.events.raw: tutti gli eventi grezzi, 50 partizioni per citta
// - property.events.enriched: post-processing con geocoding e deduplication
// - market.indicators: medie/aggregazioni per area geografica
// - alerts: opportunità di mercato detected da Flink
const TOPICS = [
{
topic: 'property.events.raw',
numPartitions: 50,
replicationFactor: 3,
configEntries: [
{ name: 'retention.ms', value: String(7 * 24 * 60 * 60 * 1000) }, // 7 giorni
{ name: 'compression.type', value: 'lz4' },
{ name: 'cleanup.policy', value: 'delete' },
],
},
{
topic: 'property.events.enriched',
numPartitions: 50,
replicationFactor: 3,
configEntries: [
{ name: 'retention.ms', value: String(30 * 24 * 60 * 60 * 1000) }, // 30 giorni
{ name: 'compression.type', value: 'zstd' },
],
},
{
topic: 'market.indicators',
numPartitions: 10,
replicationFactor: 3,
configEntries: [
{ name: 'cleanup.policy', value: 'compact' }, // log compaction per latest state
{ name: 'retention.ms', value: String(-1) }, // indefinita (compacted)
],
},
];
// Producer con partitioning per area geografica
export class PropertyEventProducer {
private producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
transactionTimeout: 30000,
});
async sendEvent(event: PropertyEvent): Promise<void> {
await this.producer.send({
topic: 'property.events.raw',
messages: [{
// Key = city code: garantisce ordine eventi per citta sullo stesso partition
key: this.getCityCode(event),
value: JSON.stringify(event),
headers: {
'event-type': event.type,
'source': event.sourceName,
'schema-version': '1.0',
},
timestamp: String(Date.now()),
}],
});
}
private getCityCode(event: PropertyEvent): string {
const payload = event.payload as PropertyListingPayload;
return payload.location?.city?.toLowerCase().replace(/\s/g, '-') ?? 'unknown';
}
}
Apache Flink: Gerçek Zamanlı Gösterge Hesaplaması
Apache Flink'i ve işlenecek en güçlü akış işleme çerçevesi Gayrimenkul olaylarının akışını ve kayan zaman pencerelerinde piyasa göstergelerini hesaplayın. Hesaplamak için Flink'i PyFlink ile kullanıyoruz H3 hücresi için Fiyat Endeksi her 15 dakikada bir.
# PyFlink: calcolo Price Index per area geografica in real-time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import lit, col
import json
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(8)
env.get_checkpoint_config().set_checkpointing_interval(60_000) # checkpoint ogni 60s
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
tenv = StreamTableEnvironment.create(env, settings)
# Definizione source Kafka
tenv.execute_sql("""
CREATE TABLE property_events (
event_id STRING,
event_type STRING,
city STRING,
h3_cell STRING,
price DOUBLE,
square_meters DOUBLE,
price_per_sqm DOUBLE AS (price / square_meters),
listing_type STRING,
property_type STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'property.events.enriched',
'properties.bootstrap.servers' = '{{KAFKA_BROKERS}}',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
""")
# Calcolo Price Index per H3 cell: media mobile su finestra 15 minuti
tenv.execute_sql("""
CREATE TABLE market_indicators (
h3_cell STRING,
city STRING,
listing_type STRING,
property_type STRING,
avg_price_per_sqm DOUBLE,
median_price DOUBLE,
listing_count BIGINT,
price_index DOUBLE,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
PRIMARY KEY (h3_cell, listing_type, property_type, window_end) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'market.indicators',
'format' = 'json'
)
""")
# Query: aggregazione per finestra temporale scorrevole
tenv.execute_sql("""
INSERT INTO market_indicators
SELECT
h3_cell,
city,
listing_type,
property_type,
AVG(price_per_sqm) AS avg_price_per_sqm,
-- Approssimazione mediana con percentile (Flink non ha MEDIAN nativo)
PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY price_per_sqm) AS median_price,
COUNT(*) AS listing_count,
-- Price Index: rapporto vs baseline di 30 giorni fa (in produzione: join con window storica)
AVG(price_per_sqm) / NULLIF(AVG(price_per_sqm) OVER (
PARTITION BY h3_cell, listing_type
ORDER BY event_time
RANGE BETWEEN INTERVAL '30' DAY PRECEDING AND INTERVAL '15' MINUTE PRECEDING
), 0) AS price_index,
TUMBLE_START(event_time, INTERVAL '15' MINUTE) AS window_start,
TUMBLE_END(event_time, INTERVAL '15' MINUTE) AS window_end
FROM property_events
WHERE
event_type IN ('listing.created', 'listing.price_changed')
AND price > 0
AND square_meters > 20
GROUP BY
h3_cell, city, listing_type, property_type,
TUMBLE(event_time, INTERVAL '15' MINUTE)
""")
# Query alert: proprietà con prezzo ridotto >10% ultimi 7 giorni
tenv.execute_sql("""
CREATE VIEW price_reduction_alerts AS
SELECT
event_id,
city,
h3_cell,
price,
(price - LAG(price) OVER (PARTITION BY event_id ORDER BY event_time)) / price AS reduction_pct,
event_time
FROM property_events
WHERE event_type = 'listing.price_changed'
HAVING reduction_pct < -0.10
""")
TimescaleDB: Zaman Serisi İçin Optimize Edilmiş Depolama
Zaman ÖlçeğiDB ve zaman serisi verileri için optimize edilmiş bir PostgreSQL uzantısı. Tabloları otomatik olarak zamansal parçalara (hipertablolar) böler ve hassas sıkıştırma sunar Zaman aralıklarında mükemmel sorgu performansıyla geçmiş veriler için %95.
-- Schema TimescaleDB per dati mercato immobiliare
-- Tabella principali price observations (time-series)
CREATE TABLE price_observations (
time TIMESTAMPTZ NOT NULL,
h3_cell TEXT NOT NULL,
city TEXT NOT NULL,
listing_type TEXT NOT NULL, -- 'sale' | 'rent'
property_type TEXT NOT NULL,
avg_price_sqm NUMERIC(10, 2),
median_price NUMERIC(12, 2),
listing_count INT,
price_index NUMERIC(8, 4), -- 1.0 = baseline
dom_avg NUMERIC(8, 2) -- Days on Market media
);
-- Converte in hypertable (partizionamento automatico per tempo)
SELECT create_hypertable('price_observations', 'time', chunk_time_interval => INTERVAL '1 week');
-- Indici per query temporali e geografiche
CREATE INDEX idx_price_obs_h3_time ON price_observations (h3_cell, time DESC);
CREATE INDEX idx_price_obs_city_time ON price_observations (city, listing_type, time DESC);
-- Policy di compressione: comprimi chunk >30 giorni (risparmio 90%+ spazio)
ALTER TABLE price_observations SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'h3_cell, city, listing_type'
);
SELECT add_compression_policy('price_observations', INTERVAL '30 days');
-- Retention policy: aggrega automaticamente in bucket mensili dopo 1 anno
-- Usa Continuous Aggregates di TimescaleDB per le aggregazioni
CREATE MATERIALIZED VIEW monthly_market_summary
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 month', time) AS month,
city,
listing_type,
property_type,
AVG(avg_price_sqm) AS avg_price_sqm,
AVG(price_index) AS avg_price_index,
SUM(listing_count) AS total_listings,
AVG(dom_avg) AS avg_days_on_market
FROM price_observations
GROUP BY month, city, listing_type, property_type
WITH NO DATA;
-- Refresh automatico ogni ora
SELECT add_continuous_aggregate_policy('monthly_market_summary',
start_offset => INTERVAL '3 months',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
-- Query: trend prezzi mq negli ultimi 12 mesi per un quartiere H3
SELECT
time_bucket('1 month', time) AS month,
AVG(avg_price_sqm) AS avg_price_sqm,
MAX(price_index) AS price_index
FROM price_observations
WHERE
h3_cell = '{h3_cell_value}'
AND listing_type = 'sale'
AND time > NOW() - INTERVAL '12 months'
GROUP BY month
ORDER BY month;
-- Query: Days on Market trend per citta
SELECT
time_bucket('1 week', time) AS week,
city,
AVG(dom_avg) AS avg_dom,
SUM(listing_count) AS listings
FROM price_observations
WHERE city = 'Roma' AND listing_type = 'sale'
AND time > NOW() - INTERVAL '6 months'
GROUP BY week, city
ORDER BY week;
Piyasa Göstergeleri: Hesaplama ve Yorumlama
// Calcolo Days on Market (DOM) e Price Reduction Rate
export interface MarketIndicators {
citySlug: string;
period: string; // '2026-Q1', '2026-03'
listingType: 'sale' | 'rent';
// Prezzi
avgPricePerSqm: number;
medianPricePerSqm: number;
priceIndex: number; // 1.0 = anno base, 1.05 = +5% YoY
// Velocita mercato
avgDaysOnMarket: number;
medianDaysOnMarket: number;
listingCountNew: number; // nuovi annunci nel periodo
listingCountClosed: number; // conclusi (venduti/affittati)
absorbRate: number; // conclusi / nuovi (>1 = mercato caldo)
// Tensione offerta/domanda
priceReductionRate: number; // % annunci con riduzione prezzo
avgPriceReductionPct: number; // riduzione media quando avviene
stockMonths: number; // mesi per esaurire offerta attuale
// Segmenti
byPropertyType: Record<string, { avgPrice: number; count: number }>;
}
export async function computeMarketIndicators(
db: Pool,
city: string,
periodStart: Date,
periodEnd: Date
): Promise<MarketIndicators> {
const [priceResult, velocityResult, reductionResult] = await Promise.all([
// 1. Prezzi medi
db.query(
`SELECT
AVG(price / square_meters) AS avg_price_sqm,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price / square_meters) AS median_price_sqm
FROM property_listings
WHERE city = $1 AND listing_type = 'sale' AND status = 'active'
AND listed_at BETWEEN $2 AND $3`,
[city, periodStart, periodEnd]
),
// 2. Days on Market
db.query(
`SELECT
AVG(EXTRACT(DAY FROM (closed_at - listed_at))) AS avg_dom,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY EXTRACT(DAY FROM (closed_at - listed_at))) AS median_dom,
COUNT(*) AS closed_count
FROM property_listings
WHERE city = $1 AND listing_type = 'sale' AND status IN ('sold')
AND closed_at BETWEEN $2 AND $3`,
[city, periodStart, periodEnd]
),
// 3. Price reductions
db.query(
`SELECT
COUNT(*) FILTER (WHERE price_change < 0) * 100.0 / COUNT(*) AS reduction_rate_pct,
AVG(price_change / original_price) FILTER (WHERE price_change < 0) AS avg_reduction_pct
FROM listing_price_history
WHERE city = $1 AND changed_at BETWEEN $2 AND $3`,
[city, periodStart, periodEnd]
),
]);
const activeCount = await db.query(
'SELECT COUNT(*) AS cnt FROM property_listings WHERE city = $1 AND status = $2',
[city, 'active']
);
const closedCount = parseInt(velocityResult.rows[0]?.closed_count ?? '0');
const durationDays = (periodEnd.getTime() - periodStart.getTime()) / (1000 * 86400);
const closedPerDay = closedCount / durationDays;
const stockMonths = closedPerDay > 0
? (parseInt(activeCount.rows[0].cnt) / (closedPerDay * 30))
: 99;
return {
citySlug: city.toLowerCase().replace(/\s/g, '-'),
period: periodStart.toISOString().substring(0, 7),
listingType: 'sale',
avgPricePerSqm: parseFloat(priceResult.rows[0]?.avg_price_sqm ?? '0'),
medianPricePerSqm: parseFloat(priceResult.rows[0]?.median_price_sqm ?? '0'),
priceIndex: 1.0, // In produzione: confronto con periodo precedente
avgDaysOnMarket: parseFloat(velocityResult.rows[0]?.avg_dom ?? '30'),
medianDaysOnMarket: parseFloat(velocityResult.rows[0]?.median_dom ?? '25'),
listingCountNew: 0, // Query separata
listingCountClosed: closedCount,
absorbRate: closedCount / Math.max(1, parseInt(activeCount.rows[0].cnt)),
priceReductionRate: parseFloat(reductionResult.rows[0]?.reduction_rate_pct ?? '0'),
avgPriceReductionPct: parseFloat(reductionResult.rows[0]?.avg_reduction_pct ?? '0'),
stockMonths,
byPropertyType: {}, // Popolare con query separata
};
}
Gerçek Zamanlı Uyarı: Pazar Fırsatları
Gerçek zamanlı boru hattının gerçek değeri proaktif uyarıda yatmaktadır: yatırımcıları bilgilendirmek ve alıcılar, pazar onları özümsemeden önce fırsatlar ortaya çıktığında.
// Sistema di alerting per opportunità di mercato
interface MarketAlert {
alertId: string;
type: 'price_drop' | 'new_listing_below_market' | 'high_demand_area' | 'distressed_sale';
severity: 'low' | 'medium' | 'high';
propertyId?: string;
h3Cell?: string;
city: string;
description: string;
metrics: Record<string, number>;
timestamp: string;
}
export class MarketAlertEngine {
async processEvent(
event: PropertyEvent,
marketContext: MarketIndicators
): Promise<MarketAlert[]> {
const alerts: MarketAlert[] = [];
const payload = event.payload as PropertyListingPayload;
// Alert 1: Prezzo significativamente sotto mercato
if (event.type === 'listing.created' && payload.price > 0) {
const priceSqm = payload.price / payload.squareMeters;
const marketPriceSqm = marketContext.avgPricePerSqm;
const discount = (marketPriceSqm - priceSqm) / marketPriceSqm;
if (discount > 0.15) {
alerts.push({
alertId: `alert-${Date.now()}`,
type: 'new_listing_below_market',
severity: discount > 0.25 ? 'high' : 'medium',
propertyId: payload.externalId,
city: payload.location.city,
description: `Nuovo annuncio ${(discount * 100).toFixed(0)}% sotto la media di mercato`,
metrics: {
priceSqm,
marketPriceSqm,
discountPercent: discount * 100,
},
timestamp: new Date().toISOString(),
});
}
}
// Alert 2: Riduzione prezzo significativa
if (event.type === 'listing.price_changed' && payload.previousPrice) {
const reduction = (payload.previousPrice - payload.price) / payload.previousPrice;
if (reduction > 0.08) {
alerts.push({
alertId: `alert-${Date.now()}-pr`,
type: 'price_drop',
severity: reduction > 0.15 ? 'high' : 'medium',
propertyId: payload.externalId,
city: payload.location.city,
description: `Riduzione prezzo ${(reduction * 100).toFixed(0)}% (da €${payload.previousPrice} a €${payload.price})`,
metrics: {
previousPrice: payload.previousPrice,
currentPrice: payload.price,
reductionPercent: reduction * 100,
},
timestamp: new Date().toISOString(),
});
}
}
// Alert 3: Area con alta domanda (DOM molto basso)
if (marketContext.avgDaysOnMarket < 7 && marketContext.absorbRate > 1.5) {
alerts.push({
alertId: `alert-${Date.now()}-hd`,
type: 'high_demand_area',
severity: 'medium',
h3Cell: payload.location.h3Cell,
city: payload.location.city,
description: `Area ad altissima domanda: DOM medio ${marketContext.avgDaysOnMarket} giorni, absorption rate ${marketContext.absorbRate.toFixed(1)}x`,
metrics: {
avgDom: marketContext.avgDaysOnMarket,
absorbRate: marketContext.absorbRate,
},
timestamp: new Date().toISOString(),
});
}
return alerts;
}
}
Temel Piyasa Göstergeleri
| Gösterge | Hesaplama | Tercüme |
|---|---|---|
| Piyasadaki Günler (DOM) | Listelemeden kapanışa kadar ortalama gün sayısı | <30 = sıcak pazar, >90 = soğuk pazar |
| Emilim Oranı | Satış/ay / Aktif stok | >%20 = satıcının pazarı, <%15 = alıcının pazarı |
| Fiyat İndirim Oranı | Fiyat indirimi ile listeleme yüzdesi | >%30 = düşüş baskısı |
| Aylık Tedarik | Stok / Aylık Satışlar | <3 = eksiklik, 4-6 = denge, >6 = fazlalık |
| Liste-Satış Oranı | Satış fiyatı / Liste fiyatı | >%100 = rekabetçi pazar (talep üzerine teklifler) |
Sonuçlar
Gerçek zamanlı bir emlak piyasası istihbarat hattı, ham verileri içgörülere dönüştürüyor Uygulanabilir: Doğru yatırımcı ne zaman ve nereden satın alacağını bilir, mülk yöneticisi ise nasıl satın alacağını bilir Fiyat, acente hangi özellikleri sunacağını bilir. Kafka + Flink + TimescaleDB kombinasyonu on milyonlarca veriyi işleyebilen sağlam, ölçeklenebilir ve bakımı kolay bir mimari sunar saniye mertebesinde gecikmelerle günlük olay sayısı.







