Automatisering van waarschuwingstriage: verminder MTTD met grafiekanalyse
Het probleem van waarschuwingstriage is een van de duurste en frustrerendste in het leven van een SOC-analist. Volgens IBM-gegevens uit 2025 is de gemiddelde tijd om een inbreuk te identificeren (MTTD - Mean Time to Detect) en opnieuw van 194 dagen voor organisaties die geen geavanceerde automatisering hebben geïmplementeerd. Aan de andere kant verminderen organisaties die AI-automatisering combineren met op grafieken gebaseerde correlatie dit waarde een uren of zelfs minuten over dezelfde categorieën bedreigingen.
De kern van het probleem is waarschuwingsmoeheid: een gemiddeld SOC verwerkt duizenden waarschuwingen per dag, in een bepaald tempo van valse positieven die in sommige omgevingen de 97% overschrijdt. Analisten besteden het grootste deel van hun tijd om goedaardige waarschuwingen te evalueren in plaats van echte bedreigingen te onderzoeken. Grafiekanalyse biedt één benadering radicaal anders: in plaats van elke waarschuwing afzonderlijk te evalueren, correleert het waarschuwingen in grafieken van activiteiten, identificeert aanvalspatronen in meerdere stappen en stelt automatisch prioriteiten op basis van ernst contextueel.
Wat je gaat leren
- waarom traditionele triage slecht schaalt en hoe grafiekanalyse deze transformeert
- Architectuur van een op grafieken gebaseerd waarschuwingscorrelatiesysteem
- Praktische implementatie met NetworkX en Neo4j
- Scoringsalgoritmen voor automatische prioritering
- Integratie met bestaande SOC-pijplijnen
- Successtatistieken: MTTD, fout-positief percentage, doorvoer van analisten
Het probleem van alerte vermoeidheid
Voordat oplossingen worden ontwikkeld, is het noodzakelijk om het probleem grondig te begrijpen. Alerte vermoeidheid is dat niet simpelweg ‘te veel waarschuwingen’: het is een systemisch probleem dat zijn wortels heeft in de architectuur van traditionele SIEM’s en binnen menselijke cognitieve grenzen.
Een traditionele SIEM evalueert elke loggebeurtenis afzonderlijk aan de hand van een reeks regels. Wanneer een regel overeenkomt, genereert een waarschuwing. Het resultaat is dat:
- Een legitieme netwerkscan (bijvoorbeeld een Nessus-kwetsbaarheidsscan) genereert honderden waarschuwingen
Port Scan Detected - Een automatisch patchproces genereert tientallen waarschuwingen
Suspicious Process Creation - Een gebruiker die vanuit huis inlogt via VPN genereert alerts
Impossible Travelals het niet correct is geconfigureerd
Grafiekanalyse lost dit probleem op elegante wijze op: groepeert de waarschuwingen die er deel van uitmaken van hetzelfde aanvalsscenario in één enkel gecontextualiseerd incident, waardoor de analisten de context die nodig is om weloverwogen beslissingen te nemen in seconden in plaats van minuten.
Sectorgegevens (2025)
- 73% van de organisaties beschikt over geautomatiseerde waarschuwingstriage (Gurucul 2025)
- AI-automatisering vermindert de onderzoekstijd met 25-50% voor 60% van de gebruikers
- ReliaQuest: met AI-automatisering, responstijd < 7 minuten versus 2,3 dagen zonder
- Dropzone AI: 90% waarschuwingsdekking met onderzoeken van 3-10 minuten
Grondbeginselen van op grafieken gebaseerde waarschuwingscorrelatie
Het fundamentele en eenvoudige concept: elke waarschuwing is een knooppunt in de grafiek, en de betrekkingen tussen waarschuwingen (dezelfde host, dezelfde gebruiker, hetzelfde tijdvenster, dezelfde ATT&CK-techniek) zijn de bogen. De resulterende grafiek onthult clusters van gerelateerde waarschuwingen die potentiële aanvalsscenario's vertegenwoordigen.
De meest bruikbare soorten correlaties zijn:
| Type correlatie | Criterium | Kracht | Voorbeeld |
|---|---|---|---|
| Storm | Alarm binnen venster T (bijv. 5 min) | Laag | Poortscan + brute kracht tegelijkertijd |
| Entiteit | Dezelfde host/IP/gebruiker | Gemiddeld | Verschillende waarschuwingen op hetzelfde eindpunt |
| ATT&CK-kill-keten | Technieken in logische volgorde | Hoog | Verkenning + initiële toegang + persistentie |
| IOC-overlap | Dezelfde kwaadaardige hash/domein/IP | Hoog | Zelfde C2 plus waarschuwing |
| Oorzakelijk | Ouder/kind-proces, ontstaan netwerk | Zeer hoog | cmd.exe gestart door word.exe waarmee de payload wordt gedownload |
Implementatie: Alert Graph met NetworkX
Laten we beginnen met een Python-implementatie met behulp van NetwerkX voor grafiekbeheer. Deze oplossing is geschikt voor prototypes en omgevingen met een gemiddeld volume (tot ~100.000 waarschuwingen/dag). Voor hogere volumes wordt Neo4j gebruikt (zie volgende sectie).
# Sistema di Alert Graph Correlation
# File: alert_graph.py
import networkx as nx
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
import uuid
import json
@dataclass
class Alert:
id: str
timestamp: datetime
rule_name: str
severity: str # 'low', 'medium', 'high', 'critical'
host: str
user: Optional[str]
src_ip: Optional[str]
dst_ip: Optional[str]
technique_id: Optional[str] # MITRE ATT&CK ID (es. T1059.001)
raw_data: dict = field(default_factory=dict)
def severity_score(self) -> int:
return {'low': 1, 'medium': 2, 'high': 3, 'critical': 4}.get(self.severity, 1)
@dataclass
class AlertCluster:
id: str
alerts: list[Alert]
score: float
attack_chain: list[str] # Sequenza di tecniche ATT&CK
primary_host: str
created_at: datetime
class AlertGraphCorrelator:
# Finestra temporale per correlazione (default: 30 minuti)
CORRELATION_WINDOW_MINUTES = 30
# Pesi per il calcolo del punteggio cluster
WEIGHT_SEVERITY = 3.0
WEIGHT_TECHNIQUE_CHAIN = 5.0
WEIGHT_SAME_HOST = 2.0
WEIGHT_SAME_USER = 2.5
WEIGHT_IOC_OVERLAP = 4.0
# Kill chain ATT&CK semplificata per correlazione
KILL_CHAIN_ORDER = [
'TA0043', # Reconnaissance
'TA0042', # Resource Development
'TA0001', # Initial Access
'TA0002', # Execution
'TA0003', # Persistence
'TA0004', # Privilege Escalation
'TA0005', # Defense Evasion
'TA0006', # Credential Access
'TA0007', # Discovery
'TA0008', # Lateral Movement
'TA0009', # Collection
'TA0011', # Command and Control
'TA0010', # Exfiltration
'TA0040', # Impact
]
def __init__(self):
self.graph = nx.DiGraph()
self.alerts: dict[str, Alert] = {}
def add_alert(self, alert: Alert) -> None:
"""Aggiunge un alert al grafo e crea correlazioni."""
self.alerts[alert.id] = alert
# Aggiungi nodo con attributi
self.graph.add_node(alert.id, **{
'timestamp': alert.timestamp.isoformat(),
'severity': alert.severity,
'host': alert.host,
'user': alert.user,
'technique': alert.technique_id,
'score': alert.severity_score()
})
# Cerca correlazioni con alert esistenti
for existing_id, existing in self.alerts.items():
if existing_id == alert.id:
continue
correlations = self._calculate_correlations(alert, existing)
if correlations:
total_weight = sum(c['weight'] for c in correlations)
edge_labels = [c['type'] for c in correlations]
self.graph.add_edge(
existing_id, alert.id,
weight=total_weight,
correlation_types=edge_labels
)
def _calculate_correlations(self, alert1: Alert,
alert2: Alert) -> list[dict]:
"""Calcola le correlazioni tra due alert."""
correlations = []
# 1. Correlazione temporale
time_diff = abs((alert1.timestamp - alert2.timestamp).total_seconds())
if time_diff <= self.CORRELATION_WINDOW_MINUTES * 60:
time_weight = 1.0 - (time_diff / (self.CORRELATION_WINDOW_MINUTES * 60))
correlations.append({'type': 'temporal', 'weight': time_weight})
# 2. Stesso host
if alert1.host == alert2.host:
correlations.append({'type': 'same_host', 'weight': self.WEIGHT_SAME_HOST})
# 3. Stesso utente
if (alert1.user and alert2.user and alert1.user == alert2.user):
correlations.append({'type': 'same_user', 'weight': self.WEIGHT_SAME_USER})
# 4. IOC overlap (IP)
if (alert1.src_ip and alert2.src_ip and alert1.src_ip == alert2.src_ip):
correlations.append({'type': 'ioc_overlap_ip', 'weight': self.WEIGHT_IOC_OVERLAP})
# 5. Kill chain sequenziale ATT&CK
if alert1.technique_id and alert2.technique_id:
chain_score = self._calculate_kill_chain_score(
alert1.technique_id, alert2.technique_id
)
if chain_score > 0:
correlations.append({'type': 'kill_chain', 'weight': chain_score})
return correlations
def _calculate_kill_chain_score(self, technique1: str,
technique2: str) -> float:
"""Calcola un punteggio basato sulla progressione kill chain."""
# Mapping semplificato tecnica -> tattica
# In produzione si usa la MITRE ATT&CK API
technique_to_tactic = {
'T1595': 'TA0043', # Recon - Active Scanning
'T1190': 'TA0001', # Initial Access - Exploit Public-Facing App
'T1059': 'TA0002', # Execution - Command and Scripting
'T1053': 'TA0003', # Persistence - Scheduled Task
'T1078': 'TA0004', # Privilege Escalation - Valid Accounts
'T1562': 'TA0005', # Defense Evasion - Impair Defenses
'T1003': 'TA0006', # Credential Access - OS Credential Dumping
'T1087': 'TA0007', # Discovery - Account Discovery
'T1021': 'TA0008', # Lateral Movement - Remote Services
'T1071': 'TA0011', # C2 - Application Layer Protocol
}
tactic1 = technique_to_tactic.get(technique1)
tactic2 = technique_to_tactic.get(technique2)
if not tactic1 or not tactic2:
return 0.0
try:
idx1 = self.KILL_CHAIN_ORDER.index(tactic1)
idx2 = self.KILL_CHAIN_ORDER.index(tactic2)
# Punteggio più alto se la progressione e logica (tecnica più avanzata dopo)
if idx2 > idx1:
progression = (idx2 - idx1) / len(self.KILL_CHAIN_ORDER)
return self.WEIGHT_TECHNIQUE_CHAIN * progression
except ValueError:
pass
return 0.0
def get_clusters(self, min_cluster_size: int = 2) -> list[AlertCluster]:
"""Identifica cluster di alert correlati."""
# Usa connected components sul grafo non diretto per trovare i cluster
undirected = self.graph.to_undirected()
components = list(nx.connected_components(undirected))
clusters = []
for component in components:
if len(component) < min_cluster_size:
continue
component_alerts = [self.alerts[aid] for aid in component
if aid in self.alerts]
score = self._calculate_cluster_score(component_alerts, component)
attack_chain = self._extract_attack_chain(component_alerts)
primary_host = self._find_primary_host(component_alerts)
clusters.append(AlertCluster(
id=str(uuid.uuid4()),
alerts=component_alerts,
score=score,
attack_chain=attack_chain,
primary_host=primary_host,
created_at=datetime.now()
))
# Ordina per score decrescente (massima priorità prima)
return sorted(clusters, key=lambda c: c.score, reverse=True)
def _calculate_cluster_score(self, alerts: list[Alert],
component: set) -> float:
"""Calcola il punteggio di priorità del cluster."""
score = 0.0
# 1. Contributo severita
severity_sum = sum(a.severity_score() for a in alerts)
max_severity = max(a.severity_score() for a in alerts)
score += severity_sum * self.WEIGHT_SEVERITY
score += max_severity * 2 # Bonus per alert critici
# 2. Numero di tecniche ATT&CK distinte
techniques = set(a.technique_id for a in alerts if a.technique_id)
score += len(techniques) * self.WEIGHT_TECHNIQUE_CHAIN
# 3. Peso archi nel sottografo
subgraph = self.graph.subgraph(component)
edge_weight_sum = sum(
data.get('weight', 0)
for _, _, data in subgraph.edges(data=True)
)
score += edge_weight_sum
# 4. Numero host distinti (lateral movement indicator)
hosts = set(a.host for a in alerts)
if len(hosts) > 1:
score += len(hosts) * 3.0 # Lateral movement e molto significativo
return score
def _extract_attack_chain(self, alerts: list[Alert]) -> list[str]:
"""Estrae la kill chain osservata dal cluster."""
techniques = [a.technique_id for a in alerts if a.technique_id]
# Ordina per timestamp
sorted_alerts = sorted(alerts, key=lambda a: a.timestamp)
return [a.technique_id for a in sorted_alerts if a.technique_id]
def _find_primary_host(self, alerts: list[Alert]) -> str:
"""Identifica l'host più coinvolto nel cluster."""
host_counts = {}
for alert in alerts:
host_counts[alert.host] = host_counts.get(alert.host, 0) + 1
return max(host_counts, key=host_counts.get) if host_counts else 'unknown'
Automatische prioriteitstelling met multi-factor scoring
De score van een cluster moet niet alleen de ernst van individuele waarschuwingen vastleggen, maar ook de contextuele context: de voortgang in de moordketen, de kriticiteit van de betrokken activa, de aanwezigheid van bekende kwaadaardige IOC's.
# Sistema di scoring avanzato con contesto asset
# File: alert_scorer.py
@dataclass
class AssetCriticality:
hostname: str
criticality: str # 'low', 'medium', 'high', 'critical'
asset_type: str # 'workstation', 'server', 'dc', 'database', 'ot'
business_owner: str
class ContextualScorer:
# Moltiplicatori per criticalita asset
ASSET_MULTIPLIERS = {
'workstation': 1.0,
'server': 1.5,
'database': 2.0,
'dc': 3.0, # Domain Controller
'ot': 4.0 # OT/ICS systems
}
CRITICALITY_MULTIPLIERS = {
'low': 1.0,
'medium': 1.5,
'high': 2.0,
'critical': 3.0
}
def __init__(self, asset_registry: dict[str, AssetCriticality],
threat_intel_ips: set[str]):
self.asset_registry = asset_registry
self.threat_intel_ips = threat_intel_ips
def score_cluster(self, cluster: AlertCluster) -> dict:
"""Calcola score completo con breakdown."""
base_score = cluster.score
context_multiplier = 1.0
breakdown = {}
# 1. Asset criticality multiplier
asset = self.asset_registry.get(cluster.primary_host)
if asset:
type_mult = self.ASSET_MULTIPLIERS.get(asset.asset_type, 1.0)
crit_mult = self.CRITICALITY_MULTIPLIERS.get(asset.criticality, 1.0)
asset_mult = type_mult * crit_mult
context_multiplier *= asset_mult
breakdown['asset_multiplier'] = asset_mult
# 2. Threat Intel overlap
ti_hits = sum(
1 for alert in cluster.alerts
if alert.src_ip in self.threat_intel_ips
)
if ti_hits > 0:
ti_boost = 1.0 + (ti_hits * 0.5)
context_multiplier *= ti_boost
breakdown['threat_intel_boost'] = ti_boost
# 3. Kill chain completeness
chain_length = len(cluster.attack_chain)
chain_multiplier = 1.0 + (chain_length * 0.2) # +20% per ogni step
context_multiplier *= chain_multiplier
breakdown['chain_multiplier'] = chain_multiplier
# 4. Time pressure (alert recenti hanno priorità maggiore)
most_recent = max(a.timestamp for a in cluster.alerts)
age_minutes = (datetime.now() - most_recent).total_seconds() / 60
recency_multiplier = max(0.5, 1.0 - (age_minutes / 1440)) # Decade in 24h
context_multiplier *= recency_multiplier
breakdown['recency_multiplier'] = recency_multiplier
final_score = base_score * context_multiplier
breakdown['base_score'] = base_score
breakdown['context_multiplier'] = context_multiplier
breakdown['final_score'] = final_score
return breakdown
def prioritize_queue(self, clusters: list[AlertCluster]) -> list[dict]:
"""Genera la coda di lavoro prioritizzata per gli analisti."""
scored = []
for cluster in clusters:
score_breakdown = self.score_cluster(cluster)
scored.append({
'cluster': cluster,
'score': score_breakdown['final_score'],
'breakdown': score_breakdown,
'priority': self._score_to_priority(score_breakdown['final_score'])
})
return sorted(scored, key=lambda x: x['score'], reverse=True)
def _score_to_priority(self, score: float) -> str:
if score >= 100:
return 'P1 - Critical'
elif score >= 50:
return 'P2 - High'
elif score >= 20:
return 'P3 - Medium'
else:
return 'P4 - Low'
Integratie met Neo4j voor Enterprise-volumes
Voor bedrijfsomgevingen met miljoenen waarschuwingen per dag is NetworkX in het geheugen niet schaalbaar. Neo4j, de populairste grafiekdatabase, biedt native queryprestaties van complexe correlaties en persistentie van historische gegevens.
# Alert Graph su Neo4j
# File: neo4j_correlator.py
from neo4j import GraphDatabase
from datetime import datetime, timedelta
class Neo4jAlertCorrelator:
def __init__(self, uri: str, username: str, password: str):
self.driver = GraphDatabase.driver(uri, auth=(username, password))
self._create_indexes()
def _create_indexes(self) -> None:
"""Crea indici per query performance."""
with self.driver.session() as session:
session.run("""
CREATE INDEX alert_timestamp IF NOT EXISTS
FOR (a:Alert) ON (a.timestamp)
""")
session.run("""
CREATE INDEX alert_host IF NOT EXISTS
FOR (a:Alert) ON (a.host)
""")
session.run("""
CREATE INDEX alert_user IF NOT EXISTS
FOR (a:Alert) ON (a.user)
""")
def ingest_alert(self, alert: dict) -> None:
"""Inserisce un alert e crea relazioni di correlazione."""
with self.driver.session() as session:
# Crea il nodo Alert
session.run("""
CREATE (a:Alert {
id: $id,
timestamp: datetime($timestamp),
rule_name: $rule_name,
severity: $severity,
host: $host,
user: $user,
src_ip: $src_ip,
technique_id: $technique_id
})
""", **alert)
# Crea relazione SAME_HOST con alert recenti
session.run("""
MATCH (a:Alert {id: $id})
MATCH (b:Alert)
WHERE b.id <> $id
AND b.host = a.host
AND b.timestamp >= datetime($cutoff)
AND NOT (a)-[:SAME_HOST]-(b)
MERGE (a)-[:SAME_HOST {weight: 2.0}]-(b)
""", id=alert['id'],
cutoff=(datetime.fromisoformat(alert['timestamp'])
- timedelta(minutes=30)).isoformat())
# Crea relazione KILL_CHAIN per progressione logica
session.run("""
MATCH (a:Alert {id: $id})
MATCH (b:Alert)
WHERE b.id <> $id
AND b.host = a.host
AND b.timestamp < a.timestamp
AND b.timestamp >= datetime($cutoff)
AND b.technique_id IS NOT NULL
AND a.technique_id IS NOT NULL
MERGE (b)-[:PRECEDES {weight: 3.0}]->(a)
""", id=alert['id'],
cutoff=(datetime.fromisoformat(alert['timestamp'])
- timedelta(hours=2)).isoformat())
def find_incidents(self, min_alerts: int = 3,
hours_back: int = 24) -> list[dict]:
"""Trova cluster di alert che rappresentano potenziali incidenti."""
cutoff = (datetime.now() - timedelta(hours=hours_back)).isoformat()
with self.driver.session() as session:
result = session.run("""
MATCH (a:Alert)
WHERE a.timestamp >= datetime($cutoff)
CALL apoc.path.subgraphNodes(a, {
relationshipFilter: 'SAME_HOST|SAME_USER|PRECEDES',
maxLevel: 5
}) YIELD node
WITH collect(DISTINCT node) AS cluster_nodes
WHERE size(cluster_nodes) >= $min_alerts
RETURN cluster_nodes,
reduce(s = 0, n IN cluster_nodes |
s + CASE n.severity
WHEN 'critical' THEN 4
WHEN 'high' THEN 3
WHEN 'medium' THEN 2
ELSE 1 END) AS total_score
ORDER BY total_score DESC
LIMIT 100
""", cutoff=cutoff, min_alerts=min_alerts)
return [dict(record) for record in result]
def get_attack_path(self, incident_id: str) -> list[dict]:
"""Recupera il percorso di attacco per un incidente."""
with self.driver.session() as session:
result = session.run("""
MATCH path = (start:Alert)-[:PRECEDES*]->(end:Alert)
WHERE start.id IN $incident_alerts
AND NOT ()-[:PRECEDES]->(start)
RETURN [node IN nodes(path) |
{id: node.id, technique: node.technique_id,
host: node.host, timestamp: node.timestamp}
] AS attack_path
ORDER BY length(path) DESC
LIMIT 1
""", incident_alerts=[incident_id])
return [dict(record) for record in result]
def close(self) -> None:
self.driver.close()
Triagedashboard en API
Het correlatie- en prioriteringssysteem moet een API beschikbaar stellen die SOC-platforms (TheHive, Cortex XSOAR, IBM QRadar) kunnen analisten een werkvoorraad presenteren intelligent in plaats van een platte lijst met waarschuwingen.
# FastAPI endpoint per il triage queue
# File: triage_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import datetime
app = FastAPI(title="Alert Triage API")
class AlertIngestionRequest(BaseModel):
id: str
timestamp: str
rule_name: str
severity: str
host: str
user: str | None = None
src_ip: str | None = None
technique_id: str | None = None
raw_data: dict = {}
class TriageResponse(BaseModel):
incident_id: str
priority: str
score: float
alert_count: int
primary_host: str
attack_chain: list[str]
recommended_actions: list[str]
@app.post("/api/v1/alerts")
async def ingest_alert(alert: AlertIngestionRequest) -> dict:
"""Ingesta un alert e ritorna la correlazione risultante."""
correlator = get_correlator() # Singleton o dependency injection
alert_obj = Alert(**alert.dict())
correlator.add_alert(alert_obj)
# Ottieni cluster aggiornato
clusters = correlator.get_clusters()
affected_cluster = next(
(c for c in clusters if any(a.id == alert.id for a in c.alerts)),
None
)
if affected_cluster:
return {
"status": "correlated",
"cluster_id": affected_cluster.id,
"cluster_size": len(affected_cluster.alerts),
"cluster_score": affected_cluster.score
}
return {"status": "isolated", "cluster_id": None}
@app.get("/api/v1/triage-queue")
async def get_triage_queue(limit: int = 50, min_score: float = 0) -> list[dict]:
"""Ritorna la coda di triage prioritizzata."""
correlator = get_correlator()
scorer = get_scorer()
clusters = correlator.get_clusters()
prioritized = scorer.prioritize_queue(clusters)
return [
{
"incident_id": item['cluster'].id,
"priority": item['priority'],
"score": round(item['score'], 2),
"alert_count": len(item['cluster'].alerts),
"primary_host": item['cluster'].primary_host,
"attack_chain": item['cluster'].attack_chain,
"created_at": item['cluster'].created_at.isoformat(),
"score_breakdown": item['breakdown']
}
for item in prioritized
if item['score'] >= min_score
][:limit]
@app.get("/api/v1/incident/{incident_id}/timeline")
async def get_incident_timeline(incident_id: str) -> dict:
"""Ritorna la timeline degli eventi per un incidente."""
correlator = get_correlator()
clusters = correlator.get_clusters()
cluster = next((c for c in clusters if c.id == incident_id), None)
if not cluster:
raise HTTPException(status_code=404, detail="Incident not found")
sorted_alerts = sorted(cluster.alerts, key=lambda a: a.timestamp)
return {
"incident_id": incident_id,
"timeline": [
{
"timestamp": a.timestamp.isoformat(),
"rule": a.rule_name,
"host": a.host,
"user": a.user,
"technique": a.technique_id,
"severity": a.severity
}
for a in sorted_alerts
],
"duration_minutes": (
(sorted_alerts[-1].timestamp - sorted_alerts[0].timestamp).total_seconds() / 60
) if len(sorted_alerts) > 1 else 0
}
Successtatistieken en monitoring
Elk triage-automatiseringssysteem moet worden gemonitord met objectieve meetgegevens om te verifiëren dat het feitelijk de efficiëntie van het SOC verbetert en geen nieuwe problemen introduceert.
# Monitoring delle metriche SOC
# File: soc_metrics.py
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class SOCMetrics:
# Metriche chiave
total_alerts: int = 0
correlated_alerts: int = 0
true_positives: int = 0
false_positives: int = 0
total_incidents: int = 0
# Tempi (in minuti)
mttd_values: list[float] = field(default_factory=list) # Mean Time to Detect
mtti_values: list[float] = field(default_factory=list) # Mean Time to Investigate
mttr_values: list[float] = field(default_factory=list) # Mean Time to Respond
def correlation_rate(self) -> float:
"""% di alert correlati in incidenti."""
if self.total_alerts == 0:
return 0.0
return (self.correlated_alerts / self.total_alerts) * 100
def false_positive_rate(self) -> float:
"""% di falsi positivi sul totale investigato."""
total = self.true_positives + self.false_positives
if total == 0:
return 0.0
return (self.false_positives / total) * 100
def avg_mttd(self) -> float:
if not self.mttd_values:
return 0.0
return sum(self.mttd_values) / len(self.mttd_values)
def alert_compression_ratio(self) -> float:
"""Quanti alert per incidente in media (riduzione noise)."""
if self.total_incidents == 0:
return 1.0
return self.correlated_alerts / self.total_incidents
def report(self) -> dict:
return {
"total_alerts": self.total_alerts,
"total_incidents": self.total_incidents,
"alert_compression_ratio": f"{self.alert_compression_ratio():.1f}:1",
"correlation_rate_pct": f"{self.correlation_rate():.1f}%",
"false_positive_rate_pct": f"{self.false_positive_rate():.1f}%",
"avg_mttd_minutes": f"{self.avg_mttd():.1f}",
"avg_mtti_minutes": (
f"{sum(self.mtti_values)/len(self.mtti_values):.1f}"
if self.mtti_values else "N/A"
)
}
Antipatronen in triage-automatisering
- Correlatiedrempel te laag: Correleer elke waarschuwing binnen 24 uur op dezelfde host creëert enorme, nutteloze clusters. Gebruik smalle tijdvensters (15-30 min) voor zwakke correlaties.
- Score zonder itemcontext: Een "hoge" waarschuwing voor een honeypot en nog veel minder dringend van dezelfde waarschuwing op een domeincontroller. Verrijk altijd met kritieke assets.
- Automatisering zonder feedbacklus: Het systeem moet leren van gebruikersfeedback analisten (TP/FP) om in de loop van de tijd te verbeteren. Een statisch systeem degradeert.
- Negeer eenmalige waarschuwingsincidenten: Niet elke aanval maakt je alerter. Kritieke geïsoleerde waarschuwingen (bijvoorbeeld DCSync) moeten de correlatie omzeilen en rechtstreeks naar de P1-wachtrij gaan.
Integratie met TheHive en SOAR
Het triagesysteem kan op natuurlijke wijze worden geïntegreerd met SOAR-platforms zoals TheHive of Cortex die de incidentlevenscyclus en responsautomatisering beheren.
# Integrazione TheHive
# File: thehive_integration.py
import httpx
from datetime import datetime
class TheHiveIntegration:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def create_case_from_cluster(self, cluster: AlertCluster,
score_info: dict) -> str:
"""Crea un caso TheHive da un cluster di alert."""
severity_map = {
'P1 - Critical': 3,
'P2 - High': 2,
'P3 - Medium': 1,
'P4 - Low': 1
}
case_payload = {
"title": f"[AUTO] Incident {cluster.id[:8]} - {cluster.primary_host}",
"description": self._build_description(cluster, score_info),
"severity": severity_map.get(score_info.get('priority', 'P4 - Low'), 1),
"startDate": int(cluster.created_at.timestamp() * 1000),
"tags": [
f"auto-generated",
f"host:{cluster.primary_host}",
f"alerts:{len(cluster.alerts)}",
*[f"att&ck:{t}" for t in cluster.attack_chain[:5]]
],
"tasks": self._generate_tasks(cluster, score_info)
}
with httpx.Client() as client:
response = client.post(
f"{self.base_url}/api/case",
json=case_payload,
headers=self.headers
)
response.raise_for_status()
return response.json()['id']
def _build_description(self, cluster: AlertCluster,
score_info: dict) -> str:
alerts_summary = "\n".join(
f"- [{a.severity.upper()}] {a.rule_name} @ {a.host} ({a.timestamp.strftime('%H:%M:%S')})"
for a in sorted(cluster.alerts, key=lambda x: x.timestamp)
)
return f"""## Alert Cluster Auto-Generated
**Score**: {score_info.get('score', 'N/A')}
**Priority**: {score_info.get('priority', 'N/A')}
**Primary Host**: {cluster.primary_host}
**Alert Count**: {len(cluster.alerts)}
### Attack Chain
{' -> '.join(cluster.attack_chain) if cluster.attack_chain else 'N/A'}
### Alert Timeline
{alerts_summary}
### Score Breakdown
{chr(10).join(f"- {k}: {v}" for k, v in score_info.get('breakdown', {}).items())}
"""
def _generate_tasks(self, cluster: AlertCluster,
score_info: dict) -> list[dict]:
"""Genera task di investigazione automatici."""
tasks = [
{"title": "Verify alert legitimacy", "order": 0},
{"title": f"Investigate host: {cluster.primary_host}", "order": 1},
]
if len(set(a.host for a in cluster.alerts)) > 1:
tasks.append({"title": "Assess lateral movement scope", "order": 2})
if cluster.attack_chain:
tasks.append({"title": "Map attack progression to ATT&CK", "order": 3})
tasks.append({"title": "Document findings and close/escalate", "order": 99})
return tasks
Conclusies en belangrijkste conclusies
Triage-automatisering door middel van grafiekanalyse is geen luxe: het is een operationele noodzaak elk SOC dat wil opschalen zonder het aantal analisten proportioneel te schalen. Alerte ruisonderdrukking, contextuele correlatie en intelligente prioritering ze stellen analisten in staat zich te concentreren op wat er echt toe doet: het onderzoeken van echte bedreigingen.
Belangrijkste afhaalrestaurants
- Grafiekanalyse transformeert geïsoleerde waarschuwingen in gecontextualiseerde aanvalsscenario's
- De score op meerdere factoren (ernst + kritieke activa + kill-keten + informatie over bedreigingen) is superieur aan de eenvoudige rangschikking op basis van ernst
- NetworkX voor prototypes, Neo4j voor bedrijfsproductie
- Integratie met SOAR (TheHive, XSOAR) sluit de lus van automatiseringsonderzoek
- Houd altijd de SOC-statistieken in de gaten: MTTD, fout-positief percentage, waarschuwingscompressieverhouding
- Feedback van analisten is van fundamenteel belang voor de voortdurende verbetering van het systeem
Gerelateerde artikelen
- SOAR Playbook in Python: Incident Response Automation
- AI-ondersteunde detectie: LLM voor het genereren van Sigma-regels
- Detectie van gedragsafwijkingen: ML op loggegevens
- Detectie-engineering als discipline: van script tot pijplijn







