Uyarı Triyaj Otomasyonu: Grafik Analizi ile MTTD'yi Azaltın
Uyarı önceliklendirme sorunu, bir SOC analistinin hayatındaki en pahalı ve sinir bozucu sorunlardan biridir. IBM 2025 verilerine göre bir ihlalin tespit edilmesi için geçen ortalama süre (MTTD - Ortalama Tespit Süresi) ve tekrar arasında 194 gün Gelişmiş otomasyon uygulamayan kuruluşlar için. Öte yandan yapay zeka otomasyonunu grafik tabanlı korelasyonla birleştiren kuruluşlar bunu azaltıyor değer a saatler, hatta dakikalar aynı tehdit kategorilerinde.
Sorunun özü uyarı yorgunluğudur: Ortalama bir SOC, günde binlerce uyarıyı aynı hızda yönetir. Bazı ortamlarda %97'yi aşan yanlış pozitiflerin oranı. Analistler zamanlarının çoğunu Gerçek tehditleri araştırmak yerine zararsız uyarıları değerlendirmek. Grafik analizi bir yaklaşım sunar tamamen farklıdır: her bir uyarıyı ayrı ayrı değerlendirmek yerine, uyarıları aşağıdaki grafiklerde ilişkilendirir: faaliyetleri, çok adımlı saldırı modellerini tanımlar ve önem derecesine göre otomatik olarak önceliklendirir bağlamsal.
Ne Öğreneceksiniz
- geleneksel triyaj neden zayıf ölçekleniyor ve grafik analizi bunu nasıl dönüştürüyor?
- Grafik tabanlı uyarı korelasyon sisteminin mimarisi
- NetworkX ve Neo4j ile pratik uygulama
- Otomatik önceliklendirme için puanlama algoritmaları
- Mevcut SOC işlem hatlarıyla entegrasyon
- Başarı ölçümleri: MTTD, yanlış pozitif oranı, analist verimi
Uyarı Yorgunluğu Sorunu
Çözüm üretmeden önce sorunu derinlemesine anlamak gerekir. Uyarı yorgunluğu değil basitçe "çok fazla uyarı": kökleri geleneksel SIEM'lerin mimarisinden gelen sistemik bir sorundur ve insanın bilişsel sınırları dahilinde.
Geleneksel bir SIEM, her günlük olayını bir dizi kurala göre ayrı ayrı değerlendirir. Ne zaman bir kural eşleşir, bir uyarı oluşturur. Sonuç şudur:
- Yasal bir ağ taraması (örn. Nessus güvenlik açığı taraması) yüzlerce uyarı üretir
Port Scan Detected - Otomatik yama işlemi düzinelerce uyarı oluşturur
Suspicious Process Creation - VPN aracılığıyla evden giriş yapan bir kullanıcı uyarı oluşturur
Impossible Traveldoğru yapılandırılmamışsa
Grafik analizi bu sorunu zarif bir şekilde çözer: bunun bir parçası olan uyarıları gruplandırır Aynı saldırı senaryosunun bağlamsallaştırılmış tek bir olayda, Analistler, dakikalar yerine saniyeler içinde bilinçli kararlar almak için gereken bağlamı sağlıyor.
Sektör Verileri (2025)
- Kuruluşların %73'ünde otomatik uyarı önceliklendirmesi bulunmaktadır (Gurucul 2025)
- Yapay zeka otomasyonu, benimseyenlerin %60'ı için araştırma süresini %25-50 oranında azaltır
- ReliaQuest: AI otomasyonu ile yanıt süresi < 7 dakikaya karşılık 2,3 gün
- Dropzone AI: 3-10 dakikalık incelemelerle %90 uyarı kapsamı
Grafik Tabanlı Uyarı Korelasyonunun Temelleri
Temel ve basit kavram: Her uyarı bir düğüm grafikte ve ilişkiler uyarılar arasında (aynı ana bilgisayar, aynı kullanıcı, aynı zaman penceresi, aynı ATT&CK tekniği) kemerler. Ortaya çıkan grafik ortaya çıkıyor Potansiyel saldırı senaryolarını temsil eden ilgili uyarı kümeleri.
En kullanışlı korelasyon türleri şunlardır:
| Korelasyon Türü | Kriter | Güç | Örnek |
|---|---|---|---|
| Fırtına | T penceresi içindeki uyarı (örn. 5 dakika) | Düşük | Bağlantı noktası taraması + kaba kuvvet aynı anda |
| Varlık | Aynı ana bilgisayar/IP/kullanıcı | Ortalama | Aynı uç noktada farklı uyarılar |
| ATT&CK Öldürme Zinciri | Mantıksal sırayla teknikler | Yüksek | Keşif + İlk Erişim + Kalıcılık |
| IOC Örtüşmesi | Aynı kötü amaçlı karma/etki alanı/IP | Yüksek | Aynı C2 artı uyarısı |
| Nedensel | Ebeveyn/çocuk süreci, kaynaklı ağ | Çok yüksek | cmd.exe, yükü indiren word.exe tarafından başlatıldı |
Uygulama: NetworkX ile Uyarı Grafiği
Aşağıdakileri kullanarak bir Python uygulamasıyla başlayalım: AğX Grafik yönetimi için. Bu çözüm, prototipler ve orta hacimli ortamlar için uygundur (günde ~100 bin uyarıya kadar). Daha yüksek hacimler için Neo4j kullanılır (sonraki bölüme bakın).
# Sistema di Alert Graph Correlation
# File: alert_graph.py
import networkx as nx
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
import uuid
import json
@dataclass
class Alert:
id: str
timestamp: datetime
rule_name: str
severity: str # 'low', 'medium', 'high', 'critical'
host: str
user: Optional[str]
src_ip: Optional[str]
dst_ip: Optional[str]
technique_id: Optional[str] # MITRE ATT&CK ID (es. T1059.001)
raw_data: dict = field(default_factory=dict)
def severity_score(self) -> int:
return {'low': 1, 'medium': 2, 'high': 3, 'critical': 4}.get(self.severity, 1)
@dataclass
class AlertCluster:
id: str
alerts: list[Alert]
score: float
attack_chain: list[str] # Sequenza di tecniche ATT&CK
primary_host: str
created_at: datetime
class AlertGraphCorrelator:
# Finestra temporale per correlazione (default: 30 minuti)
CORRELATION_WINDOW_MINUTES = 30
# Pesi per il calcolo del punteggio cluster
WEIGHT_SEVERITY = 3.0
WEIGHT_TECHNIQUE_CHAIN = 5.0
WEIGHT_SAME_HOST = 2.0
WEIGHT_SAME_USER = 2.5
WEIGHT_IOC_OVERLAP = 4.0
# Kill chain ATT&CK semplificata per correlazione
KILL_CHAIN_ORDER = [
'TA0043', # Reconnaissance
'TA0042', # Resource Development
'TA0001', # Initial Access
'TA0002', # Execution
'TA0003', # Persistence
'TA0004', # Privilege Escalation
'TA0005', # Defense Evasion
'TA0006', # Credential Access
'TA0007', # Discovery
'TA0008', # Lateral Movement
'TA0009', # Collection
'TA0011', # Command and Control
'TA0010', # Exfiltration
'TA0040', # Impact
]
def __init__(self):
self.graph = nx.DiGraph()
self.alerts: dict[str, Alert] = {}
def add_alert(self, alert: Alert) -> None:
"""Aggiunge un alert al grafo e crea correlazioni."""
self.alerts[alert.id] = alert
# Aggiungi nodo con attributi
self.graph.add_node(alert.id, **{
'timestamp': alert.timestamp.isoformat(),
'severity': alert.severity,
'host': alert.host,
'user': alert.user,
'technique': alert.technique_id,
'score': alert.severity_score()
})
# Cerca correlazioni con alert esistenti
for existing_id, existing in self.alerts.items():
if existing_id == alert.id:
continue
correlations = self._calculate_correlations(alert, existing)
if correlations:
total_weight = sum(c['weight'] for c in correlations)
edge_labels = [c['type'] for c in correlations]
self.graph.add_edge(
existing_id, alert.id,
weight=total_weight,
correlation_types=edge_labels
)
def _calculate_correlations(self, alert1: Alert,
alert2: Alert) -> list[dict]:
"""Calcola le correlazioni tra due alert."""
correlations = []
# 1. Correlazione temporale
time_diff = abs((alert1.timestamp - alert2.timestamp).total_seconds())
if time_diff <= self.CORRELATION_WINDOW_MINUTES * 60:
time_weight = 1.0 - (time_diff / (self.CORRELATION_WINDOW_MINUTES * 60))
correlations.append({'type': 'temporal', 'weight': time_weight})
# 2. Stesso host
if alert1.host == alert2.host:
correlations.append({'type': 'same_host', 'weight': self.WEIGHT_SAME_HOST})
# 3. Stesso utente
if (alert1.user and alert2.user and alert1.user == alert2.user):
correlations.append({'type': 'same_user', 'weight': self.WEIGHT_SAME_USER})
# 4. IOC overlap (IP)
if (alert1.src_ip and alert2.src_ip and alert1.src_ip == alert2.src_ip):
correlations.append({'type': 'ioc_overlap_ip', 'weight': self.WEIGHT_IOC_OVERLAP})
# 5. Kill chain sequenziale ATT&CK
if alert1.technique_id and alert2.technique_id:
chain_score = self._calculate_kill_chain_score(
alert1.technique_id, alert2.technique_id
)
if chain_score > 0:
correlations.append({'type': 'kill_chain', 'weight': chain_score})
return correlations
def _calculate_kill_chain_score(self, technique1: str,
technique2: str) -> float:
"""Calcola un punteggio basato sulla progressione kill chain."""
# Mapping semplificato tecnica -> tattica
# In produzione si usa la MITRE ATT&CK API
technique_to_tactic = {
'T1595': 'TA0043', # Recon - Active Scanning
'T1190': 'TA0001', # Initial Access - Exploit Public-Facing App
'T1059': 'TA0002', # Execution - Command and Scripting
'T1053': 'TA0003', # Persistence - Scheduled Task
'T1078': 'TA0004', # Privilege Escalation - Valid Accounts
'T1562': 'TA0005', # Defense Evasion - Impair Defenses
'T1003': 'TA0006', # Credential Access - OS Credential Dumping
'T1087': 'TA0007', # Discovery - Account Discovery
'T1021': 'TA0008', # Lateral Movement - Remote Services
'T1071': 'TA0011', # C2 - Application Layer Protocol
}
tactic1 = technique_to_tactic.get(technique1)
tactic2 = technique_to_tactic.get(technique2)
if not tactic1 or not tactic2:
return 0.0
try:
idx1 = self.KILL_CHAIN_ORDER.index(tactic1)
idx2 = self.KILL_CHAIN_ORDER.index(tactic2)
# Punteggio più alto se la progressione e logica (tecnica più avanzata dopo)
if idx2 > idx1:
progression = (idx2 - idx1) / len(self.KILL_CHAIN_ORDER)
return self.WEIGHT_TECHNIQUE_CHAIN * progression
except ValueError:
pass
return 0.0
def get_clusters(self, min_cluster_size: int = 2) -> list[AlertCluster]:
"""Identifica cluster di alert correlati."""
# Usa connected components sul grafo non diretto per trovare i cluster
undirected = self.graph.to_undirected()
components = list(nx.connected_components(undirected))
clusters = []
for component in components:
if len(component) < min_cluster_size:
continue
component_alerts = [self.alerts[aid] for aid in component
if aid in self.alerts]
score = self._calculate_cluster_score(component_alerts, component)
attack_chain = self._extract_attack_chain(component_alerts)
primary_host = self._find_primary_host(component_alerts)
clusters.append(AlertCluster(
id=str(uuid.uuid4()),
alerts=component_alerts,
score=score,
attack_chain=attack_chain,
primary_host=primary_host,
created_at=datetime.now()
))
# Ordina per score decrescente (massima priorità prima)
return sorted(clusters, key=lambda c: c.score, reverse=True)
def _calculate_cluster_score(self, alerts: list[Alert],
component: set) -> float:
"""Calcola il punteggio di priorità del cluster."""
score = 0.0
# 1. Contributo severita
severity_sum = sum(a.severity_score() for a in alerts)
max_severity = max(a.severity_score() for a in alerts)
score += severity_sum * self.WEIGHT_SEVERITY
score += max_severity * 2 # Bonus per alert critici
# 2. Numero di tecniche ATT&CK distinte
techniques = set(a.technique_id for a in alerts if a.technique_id)
score += len(techniques) * self.WEIGHT_TECHNIQUE_CHAIN
# 3. Peso archi nel sottografo
subgraph = self.graph.subgraph(component)
edge_weight_sum = sum(
data.get('weight', 0)
for _, _, data in subgraph.edges(data=True)
)
score += edge_weight_sum
# 4. Numero host distinti (lateral movement indicator)
hosts = set(a.host for a in alerts)
if len(hosts) > 1:
score += len(hosts) * 3.0 # Lateral movement e molto significativo
return score
def _extract_attack_chain(self, alerts: list[Alert]) -> list[str]:
"""Estrae la kill chain osservata dal cluster."""
techniques = [a.technique_id for a in alerts if a.technique_id]
# Ordina per timestamp
sorted_alerts = sorted(alerts, key=lambda a: a.timestamp)
return [a.technique_id for a in sorted_alerts if a.technique_id]
def _find_primary_host(self, alerts: list[Alert]) -> str:
"""Identifica l'host più coinvolto nel cluster."""
host_counts = {}
for alert in alerts:
host_counts[alert.host] = host_counts.get(alert.host, 0) + 1
return max(host_counts, key=host_counts.get) if host_counts else 'unknown'
Çok Faktörlü Puanlama ile Otomatik Önceliklendirme
Bir kümenin puanlanması yalnızca bireysel uyarıların ciddiyetini değil aynı zamanda the bağlamsal bağlam: Öldürme zincirindeki ilerleme, olayın kritikliği ilgili varlıklar, bilinen kötü niyetli IOC'lerin varlığı.
# Sistema di scoring avanzato con contesto asset
# File: alert_scorer.py
@dataclass
class AssetCriticality:
hostname: str
criticality: str # 'low', 'medium', 'high', 'critical'
asset_type: str # 'workstation', 'server', 'dc', 'database', 'ot'
business_owner: str
class ContextualScorer:
# Moltiplicatori per criticalita asset
ASSET_MULTIPLIERS = {
'workstation': 1.0,
'server': 1.5,
'database': 2.0,
'dc': 3.0, # Domain Controller
'ot': 4.0 # OT/ICS systems
}
CRITICALITY_MULTIPLIERS = {
'low': 1.0,
'medium': 1.5,
'high': 2.0,
'critical': 3.0
}
def __init__(self, asset_registry: dict[str, AssetCriticality],
threat_intel_ips: set[str]):
self.asset_registry = asset_registry
self.threat_intel_ips = threat_intel_ips
def score_cluster(self, cluster: AlertCluster) -> dict:
"""Calcola score completo con breakdown."""
base_score = cluster.score
context_multiplier = 1.0
breakdown = {}
# 1. Asset criticality multiplier
asset = self.asset_registry.get(cluster.primary_host)
if asset:
type_mult = self.ASSET_MULTIPLIERS.get(asset.asset_type, 1.0)
crit_mult = self.CRITICALITY_MULTIPLIERS.get(asset.criticality, 1.0)
asset_mult = type_mult * crit_mult
context_multiplier *= asset_mult
breakdown['asset_multiplier'] = asset_mult
# 2. Threat Intel overlap
ti_hits = sum(
1 for alert in cluster.alerts
if alert.src_ip in self.threat_intel_ips
)
if ti_hits > 0:
ti_boost = 1.0 + (ti_hits * 0.5)
context_multiplier *= ti_boost
breakdown['threat_intel_boost'] = ti_boost
# 3. Kill chain completeness
chain_length = len(cluster.attack_chain)
chain_multiplier = 1.0 + (chain_length * 0.2) # +20% per ogni step
context_multiplier *= chain_multiplier
breakdown['chain_multiplier'] = chain_multiplier
# 4. Time pressure (alert recenti hanno priorità maggiore)
most_recent = max(a.timestamp for a in cluster.alerts)
age_minutes = (datetime.now() - most_recent).total_seconds() / 60
recency_multiplier = max(0.5, 1.0 - (age_minutes / 1440)) # Decade in 24h
context_multiplier *= recency_multiplier
breakdown['recency_multiplier'] = recency_multiplier
final_score = base_score * context_multiplier
breakdown['base_score'] = base_score
breakdown['context_multiplier'] = context_multiplier
breakdown['final_score'] = final_score
return breakdown
def prioritize_queue(self, clusters: list[AlertCluster]) -> list[dict]:
"""Genera la coda di lavoro prioritizzata per gli analisti."""
scored = []
for cluster in clusters:
score_breakdown = self.score_cluster(cluster)
scored.append({
'cluster': cluster,
'score': score_breakdown['final_score'],
'breakdown': score_breakdown,
'priority': self._score_to_priority(score_breakdown['final_score'])
})
return sorted(scored, key=lambda x: x['score'], reverse=True)
def _score_to_priority(self, score: float) -> str:
if score >= 100:
return 'P1 - Critical'
elif score >= 50:
return 'P2 - High'
elif score >= 20:
return 'P3 - Medium'
else:
return 'P4 - Low'
Kurumsal Birimler için Neo4j ile entegrasyon
Günde milyonlarca uyarının bulunduğu kurumsal ortamlar için bellek içi NetworkX ölçeklenebilir değildir. Neo4jEn popüler grafik veritabanı, yerel sorgu performansı sunar karmaşık korelasyonlar ve tarihsel verilerin kalıcılığı.
# Alert Graph su Neo4j
# File: neo4j_correlator.py
from neo4j import GraphDatabase
from datetime import datetime, timedelta
class Neo4jAlertCorrelator:
def __init__(self, uri: str, username: str, password: str):
self.driver = GraphDatabase.driver(uri, auth=(username, password))
self._create_indexes()
def _create_indexes(self) -> None:
"""Crea indici per query performance."""
with self.driver.session() as session:
session.run("""
CREATE INDEX alert_timestamp IF NOT EXISTS
FOR (a:Alert) ON (a.timestamp)
""")
session.run("""
CREATE INDEX alert_host IF NOT EXISTS
FOR (a:Alert) ON (a.host)
""")
session.run("""
CREATE INDEX alert_user IF NOT EXISTS
FOR (a:Alert) ON (a.user)
""")
def ingest_alert(self, alert: dict) -> None:
"""Inserisce un alert e crea relazioni di correlazione."""
with self.driver.session() as session:
# Crea il nodo Alert
session.run("""
CREATE (a:Alert {
id: $id,
timestamp: datetime($timestamp),
rule_name: $rule_name,
severity: $severity,
host: $host,
user: $user,
src_ip: $src_ip,
technique_id: $technique_id
})
""", **alert)
# Crea relazione SAME_HOST con alert recenti
session.run("""
MATCH (a:Alert {id: $id})
MATCH (b:Alert)
WHERE b.id <> $id
AND b.host = a.host
AND b.timestamp >= datetime($cutoff)
AND NOT (a)-[:SAME_HOST]-(b)
MERGE (a)-[:SAME_HOST {weight: 2.0}]-(b)
""", id=alert['id'],
cutoff=(datetime.fromisoformat(alert['timestamp'])
- timedelta(minutes=30)).isoformat())
# Crea relazione KILL_CHAIN per progressione logica
session.run("""
MATCH (a:Alert {id: $id})
MATCH (b:Alert)
WHERE b.id <> $id
AND b.host = a.host
AND b.timestamp < a.timestamp
AND b.timestamp >= datetime($cutoff)
AND b.technique_id IS NOT NULL
AND a.technique_id IS NOT NULL
MERGE (b)-[:PRECEDES {weight: 3.0}]->(a)
""", id=alert['id'],
cutoff=(datetime.fromisoformat(alert['timestamp'])
- timedelta(hours=2)).isoformat())
def find_incidents(self, min_alerts: int = 3,
hours_back: int = 24) -> list[dict]:
"""Trova cluster di alert che rappresentano potenziali incidenti."""
cutoff = (datetime.now() - timedelta(hours=hours_back)).isoformat()
with self.driver.session() as session:
result = session.run("""
MATCH (a:Alert)
WHERE a.timestamp >= datetime($cutoff)
CALL apoc.path.subgraphNodes(a, {
relationshipFilter: 'SAME_HOST|SAME_USER|PRECEDES',
maxLevel: 5
}) YIELD node
WITH collect(DISTINCT node) AS cluster_nodes
WHERE size(cluster_nodes) >= $min_alerts
RETURN cluster_nodes,
reduce(s = 0, n IN cluster_nodes |
s + CASE n.severity
WHEN 'critical' THEN 4
WHEN 'high' THEN 3
WHEN 'medium' THEN 2
ELSE 1 END) AS total_score
ORDER BY total_score DESC
LIMIT 100
""", cutoff=cutoff, min_alerts=min_alerts)
return [dict(record) for record in result]
def get_attack_path(self, incident_id: str) -> list[dict]:
"""Recupera il percorso di attacco per un incidente."""
with self.driver.session() as session:
result = session.run("""
MATCH path = (start:Alert)-[:PRECEDES*]->(end:Alert)
WHERE start.id IN $incident_alerts
AND NOT ()-[:PRECEDES]->(start)
RETURN [node IN nodes(path) |
{id: node.id, technique: node.technique_id,
host: node.host, timestamp: node.timestamp}
] AS attack_path
ORDER BY length(path) DESC
LIMIT 1
""", incident_alerts=[incident_id])
return [dict(record) for record in result]
def close(self) -> None:
self.driver.close()
Triyaj Kontrol Paneli ve API
Korelasyon ve önceliklendirme sistemi, SOC platformlarının (TheHive, Cortex XSOAR, IBM QRadar) analistlere bir iş kuyruğu sunmak için tüketebilir düz bir uyarı listesi yerine akıllı.
# FastAPI endpoint per il triage queue
# File: triage_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import datetime
app = FastAPI(title="Alert Triage API")
class AlertIngestionRequest(BaseModel):
id: str
timestamp: str
rule_name: str
severity: str
host: str
user: str | None = None
src_ip: str | None = None
technique_id: str | None = None
raw_data: dict = {}
class TriageResponse(BaseModel):
incident_id: str
priority: str
score: float
alert_count: int
primary_host: str
attack_chain: list[str]
recommended_actions: list[str]
@app.post("/api/v1/alerts")
async def ingest_alert(alert: AlertIngestionRequest) -> dict:
"""Ingesta un alert e ritorna la correlazione risultante."""
correlator = get_correlator() # Singleton o dependency injection
alert_obj = Alert(**alert.dict())
correlator.add_alert(alert_obj)
# Ottieni cluster aggiornato
clusters = correlator.get_clusters()
affected_cluster = next(
(c for c in clusters if any(a.id == alert.id for a in c.alerts)),
None
)
if affected_cluster:
return {
"status": "correlated",
"cluster_id": affected_cluster.id,
"cluster_size": len(affected_cluster.alerts),
"cluster_score": affected_cluster.score
}
return {"status": "isolated", "cluster_id": None}
@app.get("/api/v1/triage-queue")
async def get_triage_queue(limit: int = 50, min_score: float = 0) -> list[dict]:
"""Ritorna la coda di triage prioritizzata."""
correlator = get_correlator()
scorer = get_scorer()
clusters = correlator.get_clusters()
prioritized = scorer.prioritize_queue(clusters)
return [
{
"incident_id": item['cluster'].id,
"priority": item['priority'],
"score": round(item['score'], 2),
"alert_count": len(item['cluster'].alerts),
"primary_host": item['cluster'].primary_host,
"attack_chain": item['cluster'].attack_chain,
"created_at": item['cluster'].created_at.isoformat(),
"score_breakdown": item['breakdown']
}
for item in prioritized
if item['score'] >= min_score
][:limit]
@app.get("/api/v1/incident/{incident_id}/timeline")
async def get_incident_timeline(incident_id: str) -> dict:
"""Ritorna la timeline degli eventi per un incidente."""
correlator = get_correlator()
clusters = correlator.get_clusters()
cluster = next((c for c in clusters if c.id == incident_id), None)
if not cluster:
raise HTTPException(status_code=404, detail="Incident not found")
sorted_alerts = sorted(cluster.alerts, key=lambda a: a.timestamp)
return {
"incident_id": incident_id,
"timeline": [
{
"timestamp": a.timestamp.isoformat(),
"rule": a.rule_name,
"host": a.host,
"user": a.user,
"technique": a.technique_id,
"severity": a.severity
}
for a in sorted_alerts
],
"duration_minutes": (
(sorted_alerts[-1].timestamp - sorted_alerts[0].timestamp).total_seconds() / 60
) if len(sorted_alerts) > 1 else 0
}
Başarı Metrikleri ve İzleme
Herhangi bir triyaj otomasyon sistemi, doğrulamak için objektif ölçümlerle izlenmelidir. aslında SOC'nin verimliliğini artırıyor ve yeni sorunlar getirmiyor.
# Monitoring delle metriche SOC
# File: soc_metrics.py
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class SOCMetrics:
# Metriche chiave
total_alerts: int = 0
correlated_alerts: int = 0
true_positives: int = 0
false_positives: int = 0
total_incidents: int = 0
# Tempi (in minuti)
mttd_values: list[float] = field(default_factory=list) # Mean Time to Detect
mtti_values: list[float] = field(default_factory=list) # Mean Time to Investigate
mttr_values: list[float] = field(default_factory=list) # Mean Time to Respond
def correlation_rate(self) -> float:
"""% di alert correlati in incidenti."""
if self.total_alerts == 0:
return 0.0
return (self.correlated_alerts / self.total_alerts) * 100
def false_positive_rate(self) -> float:
"""% di falsi positivi sul totale investigato."""
total = self.true_positives + self.false_positives
if total == 0:
return 0.0
return (self.false_positives / total) * 100
def avg_mttd(self) -> float:
if not self.mttd_values:
return 0.0
return sum(self.mttd_values) / len(self.mttd_values)
def alert_compression_ratio(self) -> float:
"""Quanti alert per incidente in media (riduzione noise)."""
if self.total_incidents == 0:
return 1.0
return self.correlated_alerts / self.total_incidents
def report(self) -> dict:
return {
"total_alerts": self.total_alerts,
"total_incidents": self.total_incidents,
"alert_compression_ratio": f"{self.alert_compression_ratio():.1f}:1",
"correlation_rate_pct": f"{self.correlation_rate():.1f}%",
"false_positive_rate_pct": f"{self.false_positive_rate():.1f}%",
"avg_mttd_minutes": f"{self.avg_mttd():.1f}",
"avg_mtti_minutes": (
f"{sum(self.mtti_values)/len(self.mtti_values):.1f}"
if self.mtti_values else "N/A"
)
}
Triyaj Otomasyonunda Anti-Paternler
- Korelasyon eşiği çok düşük: Herhangi bir uyarıyı 24 saat içinde ilişkilendirin aynı ana bilgisayarda çok büyük, işe yaramaz kümeler oluşturur. Dar zaman aralıkları kullanın (15-30 dakika) zayıf korelasyonlar için.
- Varlık bağlamı olmadan puan: Bal küpünde "yüksek" uyarısı ve çok daha azı Etki Alanı Denetleyicisinde aynı uyarının aciliyeti. Her zaman kritik varlıklarla zenginleştirin.
- Geri bildirim döngüsü olmayan otomasyon: Sistem kullanıcı geri bildirimlerinden öğrenmelidir analistlerin (TP/FP) zaman içinde iyileşmesi. Statik bir sistem bozulur.
- Tek uyarı olaylarını yoksay: Her saldırı sizi daha fazla tetikte bırakmaz. Kritik izole uyarılar (örneğin DCSync) korelasyonu atlamalı ve doğrudan P1 kuyruğuna gitmelidir.
TheHive ve SOAR ile entegrasyon
Triyaj sistemi, TheHive veya Cortex gibi SOAR platformlarıyla doğal olarak bütünleşir Olay yaşam döngüsünü ve yanıt otomasyonunu yöneten.
# Integrazione TheHive
# File: thehive_integration.py
import httpx
from datetime import datetime
class TheHiveIntegration:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def create_case_from_cluster(self, cluster: AlertCluster,
score_info: dict) -> str:
"""Crea un caso TheHive da un cluster di alert."""
severity_map = {
'P1 - Critical': 3,
'P2 - High': 2,
'P3 - Medium': 1,
'P4 - Low': 1
}
case_payload = {
"title": f"[AUTO] Incident {cluster.id[:8]} - {cluster.primary_host}",
"description": self._build_description(cluster, score_info),
"severity": severity_map.get(score_info.get('priority', 'P4 - Low'), 1),
"startDate": int(cluster.created_at.timestamp() * 1000),
"tags": [
f"auto-generated",
f"host:{cluster.primary_host}",
f"alerts:{len(cluster.alerts)}",
*[f"att&ck:{t}" for t in cluster.attack_chain[:5]]
],
"tasks": self._generate_tasks(cluster, score_info)
}
with httpx.Client() as client:
response = client.post(
f"{self.base_url}/api/case",
json=case_payload,
headers=self.headers
)
response.raise_for_status()
return response.json()['id']
def _build_description(self, cluster: AlertCluster,
score_info: dict) -> str:
alerts_summary = "\n".join(
f"- [{a.severity.upper()}] {a.rule_name} @ {a.host} ({a.timestamp.strftime('%H:%M:%S')})"
for a in sorted(cluster.alerts, key=lambda x: x.timestamp)
)
return f"""## Alert Cluster Auto-Generated
**Score**: {score_info.get('score', 'N/A')}
**Priority**: {score_info.get('priority', 'N/A')}
**Primary Host**: {cluster.primary_host}
**Alert Count**: {len(cluster.alerts)}
### Attack Chain
{' -> '.join(cluster.attack_chain) if cluster.attack_chain else 'N/A'}
### Alert Timeline
{alerts_summary}
### Score Breakdown
{chr(10).join(f"- {k}: {v}" for k, v in score_info.get('breakdown', {}).items())}
"""
def _generate_tasks(self, cluster: AlertCluster,
score_info: dict) -> list[dict]:
"""Genera task di investigazione automatici."""
tasks = [
{"title": "Verify alert legitimacy", "order": 0},
{"title": f"Investigate host: {cluster.primary_host}", "order": 1},
]
if len(set(a.host for a in cluster.alerts)) > 1:
tasks.append({"title": "Assess lateral movement scope", "order": 2})
if cluster.attack_chain:
tasks.append({"title": "Map attack progression to ATT&CK", "order": 3})
tasks.append({"title": "Document findings and close/escalate", "order": 99})
return tasks
Sonuçlar ve Temel Çıkarımlar
Grafik analizi yoluyla triyaj otomasyonu bir lüks değildir: operasyonel bir gerekliliktir. Analistlerin sayısını orantılı olarak ölçeklendirmeden ölçeklendirmek isteyen herhangi bir SOC. Uyarı gürültüsünü azaltma, bağlamsal korelasyon ve akıllı önceliklendirme analistlerin asıl önemli olana, yani gerçek tehditleri araştırmaya odaklanmasına olanak tanır.
Temel Çıkarımlar
- Grafik analizi, yalıtılmış uyarıları bağlama dayalı saldırı senaryolarına dönüştürür
- Çok faktörlü puanlama (önem + varlık kritikliği + öldürme zinciri + tehdit istihbaratı), önem derecesine göre basit sıralamadan daha üstündür
- Prototipler için NetworkX, kurumsal üretim için Neo4j
- SOAR (TheHive, XSOAR) ile entegrasyon, otomasyon soruşturma döngüsünü kapatır
- Her zaman SOC ölçümlerini izleyin: MTTD, yanlış pozitif oranı, uyarı sıkıştırma oranı
- Analist geri bildirimi sistemin sürekli iyileştirilmesinin temelidir
İlgili Makaleler
- Python'da SOAR Başucu Kitabı: Olay Müdahale Otomasyonu
- Yapay Zeka Destekli Tespit: Sigma Kuralları Oluşturma için Yüksek Lisans
- Davranışsal Anormallik Tespiti: Günlük Verilerinde ML
- Bir Disiplin Olarak Tespit Mühendisliği: Komut Dosyasından İşlem Hattına







