Automatyzacja selekcji alertów: zmniejsz MTTD dzięki analizie wykresów
Problem segregacji alarmowej jest jednym z najkosztowniejszych i najbardziej frustrujących w życiu analityka SOC. Według danych IBM 2025 średni czas identyfikacji naruszenia (MTTD – Mean Time to Detect) i ponownie z 194 dni dla organizacji, które nie wdrożyły zaawansowanej automatyzacji. Z drugiej strony organizacje, które łączą automatyzację sztucznej inteligencji z korelacją opartą na wykresach, zmniejszają to wartość a godziny lub nawet minuty dotyczące tych samych kategorii zagrożeń.
Sednem problemu jest zmęczenie alertami: przeciętny SOC obsługuje tysiące alertów dziennie w określonym tempie fałszywych alarmów, która w niektórych środowiskach przekracza 97%. Analitycy spędzają większość swojego czasu do oceny łagodnych alertów zamiast badania rzeczywistych zagrożeń. Analiza wykresów oferuje jedno podejście radykalnie odmienne: zamiast oceniać każdy alert z osobna, koreluje je na wykresach działań, identyfikuje wieloetapowe wzorce ataków i automatycznie ustala priorytety na podstawie ważności kontekstowe.
Czego się nauczysz
- dlaczego tradycyjna selekcja jest słabo skalowana i jak zmienia ją analiza wykresów
- Architektura grafowego systemu korelacji alertów
- Praktyczne wdrożenie z NetworkX i Neo4j
- Algorytmy scoringowe do automatycznego ustalania priorytetów
- Integracja z istniejącymi rurociągami SOC
- Metryki sukcesu: MTTD, odsetek wyników fałszywie dodatnich, przepustowość analityków
Problem zmęczenia alarmowego
Przed zbudowaniem rozwiązań konieczne jest głębokie zrozumienie problemu. Alertowe zmęczenie nie jest po prostu „zbyt wiele alertów”: jest to problem systemowy mający korzenie w architekturze tradycyjnych SIEM i w granicach poznawczych człowieka.
Tradycyjny SIEM ocenia każde zdarzenie w dzienniku oddzielnie na podstawie zestawu reguł. Kiedy zasada pasuje, generuje alert. Rezultatem jest to, że:
- Prawidłowe skanowanie sieci (np. skanowanie podatności Nessus) generuje setki alertów
Port Scan Detected - Automatyczny proces łatania generuje dziesiątki alertów
Suspicious Process Creation - Użytkownik logujący się z domu przez VPN generuje alerty
Impossible Traveljeśli nie jest poprawnie skonfigurowany
Analiza wykresów elegancko rozwiązuje ten problem: grupuje alerty, które są jego częścią tego samego scenariusza ataku w pojedynczym zdarzeniu kontekstowym, zapewniając analitykom kontekst niezbędny do podejmowania świadomych decyzji w ciągu kilku sekund, a nie minut.
Dane sektorowe (2025)
- 73% organizacji korzysta z automatycznej selekcji alertów (Gurucul 2025)
- Automatyzacja sztucznej inteligencji skraca czas dochodzenia o 25–50% w przypadku 60% użytkowników
- ReliaQuest: z automatyzacją AI, czas reakcji < 7 minut w porównaniu do 2,3 dnia bez
- Sztuczna inteligencja Dropzone: 90% zasięgu alertów przy dochodzeniu trwającym od 3 do 10 minut
Podstawy korelacji alertów w oparciu o wykresy
Podstawowa i prosta koncepcja: każdy alert jest węzeł na wykresie i kontakty pomiędzy alertami (ten sam host, ten sam użytkownik, to samo okno czasowe, ta sama technika ATT&CK) to łuki. Powstały wykres ujawnia klastry powiązanych alertów reprezentujących potencjalne scenariusze ataku.
Najbardziej przydatne typy korelacji to:
| Rodzaj korelacji | Kryterium | Siła | Przykład |
|---|---|---|---|
| Burza | Alarm w oknie T (np. 5 min) | Niski | Skanowanie portów + brutalna siła w tym samym czasie |
| Podmiot | Ten sam host/adres IP/użytkownik | Przeciętny | Różne alerty w tym samym punkcie końcowym |
| Łańcuch zabijania ATT&CK | Techniki w logicznej kolejności | Wysoki | Rozpoznanie + Wstępny dostęp + Trwałość |
| Nakładanie się MKOl | Ten sam złośliwy skrót/domena/adres IP | Wysoki | To samo C2 plus alert |
| Przyczynowy | Proces nadrzędny/podrzędny, sieć pochodząca | Bardzo wysoki | cmd.exe uruchamiany przez word.exe, który pobiera ładunek |
Implementacja: Wykres alertów z NetworkX
Zacznijmy od implementacji Pythona za pomocą SiećX do zarządzania wykresami. To rozwiązanie jest odpowiednie dla prototypów i środowisk o średniej objętości (do ~100 tys. alertów dziennie). W przypadku większych wolumenów używany jest Neo4j (patrz następna sekcja).
# Sistema di Alert Graph Correlation
# File: alert_graph.py
import networkx as nx
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
import uuid
import json
@dataclass
class Alert:
id: str
timestamp: datetime
rule_name: str
severity: str # 'low', 'medium', 'high', 'critical'
host: str
user: Optional[str]
src_ip: Optional[str]
dst_ip: Optional[str]
technique_id: Optional[str] # MITRE ATT&CK ID (es. T1059.001)
raw_data: dict = field(default_factory=dict)
def severity_score(self) -> int:
return {'low': 1, 'medium': 2, 'high': 3, 'critical': 4}.get(self.severity, 1)
@dataclass
class AlertCluster:
id: str
alerts: list[Alert]
score: float
attack_chain: list[str] # Sequenza di tecniche ATT&CK
primary_host: str
created_at: datetime
class AlertGraphCorrelator:
# Finestra temporale per correlazione (default: 30 minuti)
CORRELATION_WINDOW_MINUTES = 30
# Pesi per il calcolo del punteggio cluster
WEIGHT_SEVERITY = 3.0
WEIGHT_TECHNIQUE_CHAIN = 5.0
WEIGHT_SAME_HOST = 2.0
WEIGHT_SAME_USER = 2.5
WEIGHT_IOC_OVERLAP = 4.0
# Kill chain ATT&CK semplificata per correlazione
KILL_CHAIN_ORDER = [
'TA0043', # Reconnaissance
'TA0042', # Resource Development
'TA0001', # Initial Access
'TA0002', # Execution
'TA0003', # Persistence
'TA0004', # Privilege Escalation
'TA0005', # Defense Evasion
'TA0006', # Credential Access
'TA0007', # Discovery
'TA0008', # Lateral Movement
'TA0009', # Collection
'TA0011', # Command and Control
'TA0010', # Exfiltration
'TA0040', # Impact
]
def __init__(self):
self.graph = nx.DiGraph()
self.alerts: dict[str, Alert] = {}
def add_alert(self, alert: Alert) -> None:
"""Aggiunge un alert al grafo e crea correlazioni."""
self.alerts[alert.id] = alert
# Aggiungi nodo con attributi
self.graph.add_node(alert.id, **{
'timestamp': alert.timestamp.isoformat(),
'severity': alert.severity,
'host': alert.host,
'user': alert.user,
'technique': alert.technique_id,
'score': alert.severity_score()
})
# Cerca correlazioni con alert esistenti
for existing_id, existing in self.alerts.items():
if existing_id == alert.id:
continue
correlations = self._calculate_correlations(alert, existing)
if correlations:
total_weight = sum(c['weight'] for c in correlations)
edge_labels = [c['type'] for c in correlations]
self.graph.add_edge(
existing_id, alert.id,
weight=total_weight,
correlation_types=edge_labels
)
def _calculate_correlations(self, alert1: Alert,
alert2: Alert) -> list[dict]:
"""Calcola le correlazioni tra due alert."""
correlations = []
# 1. Correlazione temporale
time_diff = abs((alert1.timestamp - alert2.timestamp).total_seconds())
if time_diff <= self.CORRELATION_WINDOW_MINUTES * 60:
time_weight = 1.0 - (time_diff / (self.CORRELATION_WINDOW_MINUTES * 60))
correlations.append({'type': 'temporal', 'weight': time_weight})
# 2. Stesso host
if alert1.host == alert2.host:
correlations.append({'type': 'same_host', 'weight': self.WEIGHT_SAME_HOST})
# 3. Stesso utente
if (alert1.user and alert2.user and alert1.user == alert2.user):
correlations.append({'type': 'same_user', 'weight': self.WEIGHT_SAME_USER})
# 4. IOC overlap (IP)
if (alert1.src_ip and alert2.src_ip and alert1.src_ip == alert2.src_ip):
correlations.append({'type': 'ioc_overlap_ip', 'weight': self.WEIGHT_IOC_OVERLAP})
# 5. Kill chain sequenziale ATT&CK
if alert1.technique_id and alert2.technique_id:
chain_score = self._calculate_kill_chain_score(
alert1.technique_id, alert2.technique_id
)
if chain_score > 0:
correlations.append({'type': 'kill_chain', 'weight': chain_score})
return correlations
def _calculate_kill_chain_score(self, technique1: str,
technique2: str) -> float:
"""Calcola un punteggio basato sulla progressione kill chain."""
# Mapping semplificato tecnica -> tattica
# In produzione si usa la MITRE ATT&CK API
technique_to_tactic = {
'T1595': 'TA0043', # Recon - Active Scanning
'T1190': 'TA0001', # Initial Access - Exploit Public-Facing App
'T1059': 'TA0002', # Execution - Command and Scripting
'T1053': 'TA0003', # Persistence - Scheduled Task
'T1078': 'TA0004', # Privilege Escalation - Valid Accounts
'T1562': 'TA0005', # Defense Evasion - Impair Defenses
'T1003': 'TA0006', # Credential Access - OS Credential Dumping
'T1087': 'TA0007', # Discovery - Account Discovery
'T1021': 'TA0008', # Lateral Movement - Remote Services
'T1071': 'TA0011', # C2 - Application Layer Protocol
}
tactic1 = technique_to_tactic.get(technique1)
tactic2 = technique_to_tactic.get(technique2)
if not tactic1 or not tactic2:
return 0.0
try:
idx1 = self.KILL_CHAIN_ORDER.index(tactic1)
idx2 = self.KILL_CHAIN_ORDER.index(tactic2)
# Punteggio più alto se la progressione e logica (tecnica più avanzata dopo)
if idx2 > idx1:
progression = (idx2 - idx1) / len(self.KILL_CHAIN_ORDER)
return self.WEIGHT_TECHNIQUE_CHAIN * progression
except ValueError:
pass
return 0.0
def get_clusters(self, min_cluster_size: int = 2) -> list[AlertCluster]:
"""Identifica cluster di alert correlati."""
# Usa connected components sul grafo non diretto per trovare i cluster
undirected = self.graph.to_undirected()
components = list(nx.connected_components(undirected))
clusters = []
for component in components:
if len(component) < min_cluster_size:
continue
component_alerts = [self.alerts[aid] for aid in component
if aid in self.alerts]
score = self._calculate_cluster_score(component_alerts, component)
attack_chain = self._extract_attack_chain(component_alerts)
primary_host = self._find_primary_host(component_alerts)
clusters.append(AlertCluster(
id=str(uuid.uuid4()),
alerts=component_alerts,
score=score,
attack_chain=attack_chain,
primary_host=primary_host,
created_at=datetime.now()
))
# Ordina per score decrescente (massima priorità prima)
return sorted(clusters, key=lambda c: c.score, reverse=True)
def _calculate_cluster_score(self, alerts: list[Alert],
component: set) -> float:
"""Calcola il punteggio di priorità del cluster."""
score = 0.0
# 1. Contributo severita
severity_sum = sum(a.severity_score() for a in alerts)
max_severity = max(a.severity_score() for a in alerts)
score += severity_sum * self.WEIGHT_SEVERITY
score += max_severity * 2 # Bonus per alert critici
# 2. Numero di tecniche ATT&CK distinte
techniques = set(a.technique_id for a in alerts if a.technique_id)
score += len(techniques) * self.WEIGHT_TECHNIQUE_CHAIN
# 3. Peso archi nel sottografo
subgraph = self.graph.subgraph(component)
edge_weight_sum = sum(
data.get('weight', 0)
for _, _, data in subgraph.edges(data=True)
)
score += edge_weight_sum
# 4. Numero host distinti (lateral movement indicator)
hosts = set(a.host for a in alerts)
if len(hosts) > 1:
score += len(hosts) * 3.0 # Lateral movement e molto significativo
return score
def _extract_attack_chain(self, alerts: list[Alert]) -> list[str]:
"""Estrae la kill chain osservata dal cluster."""
techniques = [a.technique_id for a in alerts if a.technique_id]
# Ordina per timestamp
sorted_alerts = sorted(alerts, key=lambda a: a.timestamp)
return [a.technique_id for a in sorted_alerts if a.technique_id]
def _find_primary_host(self, alerts: list[Alert]) -> str:
"""Identifica l'host più coinvolto nel cluster."""
host_counts = {}
for alert in alerts:
host_counts[alert.host] = host_counts.get(alert.host, 0) + 1
return max(host_counts, key=host_counts.get) if host_counts else 'unknown'
Automatyczne ustalanie priorytetów z punktacją wieloczynnikową
Ocena klastra musi uwzględniać nie tylko ważność poszczególnych alertów, ale także the kontekst kontekstowy: postęp w łańcuchu zabijania, krytyczność zaangażowane zasoby, obecność znanych złośliwych IOC.
# Sistema di scoring avanzato con contesto asset
# File: alert_scorer.py
@dataclass
class AssetCriticality:
hostname: str
criticality: str # 'low', 'medium', 'high', 'critical'
asset_type: str # 'workstation', 'server', 'dc', 'database', 'ot'
business_owner: str
class ContextualScorer:
# Moltiplicatori per criticalita asset
ASSET_MULTIPLIERS = {
'workstation': 1.0,
'server': 1.5,
'database': 2.0,
'dc': 3.0, # Domain Controller
'ot': 4.0 # OT/ICS systems
}
CRITICALITY_MULTIPLIERS = {
'low': 1.0,
'medium': 1.5,
'high': 2.0,
'critical': 3.0
}
def __init__(self, asset_registry: dict[str, AssetCriticality],
threat_intel_ips: set[str]):
self.asset_registry = asset_registry
self.threat_intel_ips = threat_intel_ips
def score_cluster(self, cluster: AlertCluster) -> dict:
"""Calcola score completo con breakdown."""
base_score = cluster.score
context_multiplier = 1.0
breakdown = {}
# 1. Asset criticality multiplier
asset = self.asset_registry.get(cluster.primary_host)
if asset:
type_mult = self.ASSET_MULTIPLIERS.get(asset.asset_type, 1.0)
crit_mult = self.CRITICALITY_MULTIPLIERS.get(asset.criticality, 1.0)
asset_mult = type_mult * crit_mult
context_multiplier *= asset_mult
breakdown['asset_multiplier'] = asset_mult
# 2. Threat Intel overlap
ti_hits = sum(
1 for alert in cluster.alerts
if alert.src_ip in self.threat_intel_ips
)
if ti_hits > 0:
ti_boost = 1.0 + (ti_hits * 0.5)
context_multiplier *= ti_boost
breakdown['threat_intel_boost'] = ti_boost
# 3. Kill chain completeness
chain_length = len(cluster.attack_chain)
chain_multiplier = 1.0 + (chain_length * 0.2) # +20% per ogni step
context_multiplier *= chain_multiplier
breakdown['chain_multiplier'] = chain_multiplier
# 4. Time pressure (alert recenti hanno priorità maggiore)
most_recent = max(a.timestamp for a in cluster.alerts)
age_minutes = (datetime.now() - most_recent).total_seconds() / 60
recency_multiplier = max(0.5, 1.0 - (age_minutes / 1440)) # Decade in 24h
context_multiplier *= recency_multiplier
breakdown['recency_multiplier'] = recency_multiplier
final_score = base_score * context_multiplier
breakdown['base_score'] = base_score
breakdown['context_multiplier'] = context_multiplier
breakdown['final_score'] = final_score
return breakdown
def prioritize_queue(self, clusters: list[AlertCluster]) -> list[dict]:
"""Genera la coda di lavoro prioritizzata per gli analisti."""
scored = []
for cluster in clusters:
score_breakdown = self.score_cluster(cluster)
scored.append({
'cluster': cluster,
'score': score_breakdown['final_score'],
'breakdown': score_breakdown,
'priority': self._score_to_priority(score_breakdown['final_score'])
})
return sorted(scored, key=lambda x: x['score'], reverse=True)
def _score_to_priority(self, score: float) -> str:
if score >= 100:
return 'P1 - Critical'
elif score >= 50:
return 'P2 - High'
elif score >= 20:
return 'P3 - Medium'
else:
return 'P4 - Low'
Integracja z Neo4j dla woluminów korporacyjnych
W środowiskach korporacyjnych z milionami alertów dziennie NetworkX w pamięci nie jest skalowalny. Neo4j, najpopularniejsza baza danych grafów, oferuje natywną wydajność zapytań złożonych korelacji i trwałości danych historycznych.
# Alert Graph su Neo4j
# File: neo4j_correlator.py
from neo4j import GraphDatabase
from datetime import datetime, timedelta
class Neo4jAlertCorrelator:
def __init__(self, uri: str, username: str, password: str):
self.driver = GraphDatabase.driver(uri, auth=(username, password))
self._create_indexes()
def _create_indexes(self) -> None:
"""Crea indici per query performance."""
with self.driver.session() as session:
session.run("""
CREATE INDEX alert_timestamp IF NOT EXISTS
FOR (a:Alert) ON (a.timestamp)
""")
session.run("""
CREATE INDEX alert_host IF NOT EXISTS
FOR (a:Alert) ON (a.host)
""")
session.run("""
CREATE INDEX alert_user IF NOT EXISTS
FOR (a:Alert) ON (a.user)
""")
def ingest_alert(self, alert: dict) -> None:
"""Inserisce un alert e crea relazioni di correlazione."""
with self.driver.session() as session:
# Crea il nodo Alert
session.run("""
CREATE (a:Alert {
id: $id,
timestamp: datetime($timestamp),
rule_name: $rule_name,
severity: $severity,
host: $host,
user: $user,
src_ip: $src_ip,
technique_id: $technique_id
})
""", **alert)
# Crea relazione SAME_HOST con alert recenti
session.run("""
MATCH (a:Alert {id: $id})
MATCH (b:Alert)
WHERE b.id <> $id
AND b.host = a.host
AND b.timestamp >= datetime($cutoff)
AND NOT (a)-[:SAME_HOST]-(b)
MERGE (a)-[:SAME_HOST {weight: 2.0}]-(b)
""", id=alert['id'],
cutoff=(datetime.fromisoformat(alert['timestamp'])
- timedelta(minutes=30)).isoformat())
# Crea relazione KILL_CHAIN per progressione logica
session.run("""
MATCH (a:Alert {id: $id})
MATCH (b:Alert)
WHERE b.id <> $id
AND b.host = a.host
AND b.timestamp < a.timestamp
AND b.timestamp >= datetime($cutoff)
AND b.technique_id IS NOT NULL
AND a.technique_id IS NOT NULL
MERGE (b)-[:PRECEDES {weight: 3.0}]->(a)
""", id=alert['id'],
cutoff=(datetime.fromisoformat(alert['timestamp'])
- timedelta(hours=2)).isoformat())
def find_incidents(self, min_alerts: int = 3,
hours_back: int = 24) -> list[dict]:
"""Trova cluster di alert che rappresentano potenziali incidenti."""
cutoff = (datetime.now() - timedelta(hours=hours_back)).isoformat()
with self.driver.session() as session:
result = session.run("""
MATCH (a:Alert)
WHERE a.timestamp >= datetime($cutoff)
CALL apoc.path.subgraphNodes(a, {
relationshipFilter: 'SAME_HOST|SAME_USER|PRECEDES',
maxLevel: 5
}) YIELD node
WITH collect(DISTINCT node) AS cluster_nodes
WHERE size(cluster_nodes) >= $min_alerts
RETURN cluster_nodes,
reduce(s = 0, n IN cluster_nodes |
s + CASE n.severity
WHEN 'critical' THEN 4
WHEN 'high' THEN 3
WHEN 'medium' THEN 2
ELSE 1 END) AS total_score
ORDER BY total_score DESC
LIMIT 100
""", cutoff=cutoff, min_alerts=min_alerts)
return [dict(record) for record in result]
def get_attack_path(self, incident_id: str) -> list[dict]:
"""Recupera il percorso di attacco per un incidente."""
with self.driver.session() as session:
result = session.run("""
MATCH path = (start:Alert)-[:PRECEDES*]->(end:Alert)
WHERE start.id IN $incident_alerts
AND NOT ()-[:PRECEDES]->(start)
RETURN [node IN nodes(path) |
{id: node.id, technique: node.technique_id,
host: node.host, timestamp: node.timestamp}
] AS attack_path
ORDER BY length(path) DESC
LIMIT 1
""", incident_alerts=[incident_id])
return [dict(record) for record in result]
def close(self) -> None:
self.driver.close()
Panel selekcji i API
System korelacji i ustalania priorytetów musi udostępniać interfejs API, z którego korzystają platformy SOC (TheHive, Cortex XSOAR, IBM QRadar) mogą zostać wykorzystane, aby zapewnić analitykom kolejkę roboczą inteligentny zamiast płaskiej listy alertów.
# FastAPI endpoint per il triage queue
# File: triage_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import datetime
app = FastAPI(title="Alert Triage API")
class AlertIngestionRequest(BaseModel):
id: str
timestamp: str
rule_name: str
severity: str
host: str
user: str | None = None
src_ip: str | None = None
technique_id: str | None = None
raw_data: dict = {}
class TriageResponse(BaseModel):
incident_id: str
priority: str
score: float
alert_count: int
primary_host: str
attack_chain: list[str]
recommended_actions: list[str]
@app.post("/api/v1/alerts")
async def ingest_alert(alert: AlertIngestionRequest) -> dict:
"""Ingesta un alert e ritorna la correlazione risultante."""
correlator = get_correlator() # Singleton o dependency injection
alert_obj = Alert(**alert.dict())
correlator.add_alert(alert_obj)
# Ottieni cluster aggiornato
clusters = correlator.get_clusters()
affected_cluster = next(
(c for c in clusters if any(a.id == alert.id for a in c.alerts)),
None
)
if affected_cluster:
return {
"status": "correlated",
"cluster_id": affected_cluster.id,
"cluster_size": len(affected_cluster.alerts),
"cluster_score": affected_cluster.score
}
return {"status": "isolated", "cluster_id": None}
@app.get("/api/v1/triage-queue")
async def get_triage_queue(limit: int = 50, min_score: float = 0) -> list[dict]:
"""Ritorna la coda di triage prioritizzata."""
correlator = get_correlator()
scorer = get_scorer()
clusters = correlator.get_clusters()
prioritized = scorer.prioritize_queue(clusters)
return [
{
"incident_id": item['cluster'].id,
"priority": item['priority'],
"score": round(item['score'], 2),
"alert_count": len(item['cluster'].alerts),
"primary_host": item['cluster'].primary_host,
"attack_chain": item['cluster'].attack_chain,
"created_at": item['cluster'].created_at.isoformat(),
"score_breakdown": item['breakdown']
}
for item in prioritized
if item['score'] >= min_score
][:limit]
@app.get("/api/v1/incident/{incident_id}/timeline")
async def get_incident_timeline(incident_id: str) -> dict:
"""Ritorna la timeline degli eventi per un incidente."""
correlator = get_correlator()
clusters = correlator.get_clusters()
cluster = next((c for c in clusters if c.id == incident_id), None)
if not cluster:
raise HTTPException(status_code=404, detail="Incident not found")
sorted_alerts = sorted(cluster.alerts, key=lambda a: a.timestamp)
return {
"incident_id": incident_id,
"timeline": [
{
"timestamp": a.timestamp.isoformat(),
"rule": a.rule_name,
"host": a.host,
"user": a.user,
"technique": a.technique_id,
"severity": a.severity
}
for a in sorted_alerts
],
"duration_minutes": (
(sorted_alerts[-1].timestamp - sorted_alerts[0].timestamp).total_seconds() / 60
) if len(sorted_alerts) > 1 else 0
}
Miary i monitorowanie sukcesu
Każdy system automatyzacji segregacji musi być monitorowany za pomocą obiektywnych wskaźników w celu weryfikacji że faktycznie poprawia efektywność SOC i nie wprowadza nowych problemów.
# Monitoring delle metriche SOC
# File: soc_metrics.py
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class SOCMetrics:
# Metriche chiave
total_alerts: int = 0
correlated_alerts: int = 0
true_positives: int = 0
false_positives: int = 0
total_incidents: int = 0
# Tempi (in minuti)
mttd_values: list[float] = field(default_factory=list) # Mean Time to Detect
mtti_values: list[float] = field(default_factory=list) # Mean Time to Investigate
mttr_values: list[float] = field(default_factory=list) # Mean Time to Respond
def correlation_rate(self) -> float:
"""% di alert correlati in incidenti."""
if self.total_alerts == 0:
return 0.0
return (self.correlated_alerts / self.total_alerts) * 100
def false_positive_rate(self) -> float:
"""% di falsi positivi sul totale investigato."""
total = self.true_positives + self.false_positives
if total == 0:
return 0.0
return (self.false_positives / total) * 100
def avg_mttd(self) -> float:
if not self.mttd_values:
return 0.0
return sum(self.mttd_values) / len(self.mttd_values)
def alert_compression_ratio(self) -> float:
"""Quanti alert per incidente in media (riduzione noise)."""
if self.total_incidents == 0:
return 1.0
return self.correlated_alerts / self.total_incidents
def report(self) -> dict:
return {
"total_alerts": self.total_alerts,
"total_incidents": self.total_incidents,
"alert_compression_ratio": f"{self.alert_compression_ratio():.1f}:1",
"correlation_rate_pct": f"{self.correlation_rate():.1f}%",
"false_positive_rate_pct": f"{self.false_positive_rate():.1f}%",
"avg_mttd_minutes": f"{self.avg_mttd():.1f}",
"avg_mtti_minutes": (
f"{sum(self.mtti_values)/len(self.mtti_values):.1f}"
if self.mtti_values else "N/A"
)
}
Anty-wzorce w automatyzacji segregacji
- Próg korelacji zbyt niski: Powiąż dowolny alert w ciągu 24 godzin na tym samym hoście tworzy ogromne, bezużyteczne klastry. Używaj wąskich okien czasowych (15-30 min) dla słabych korelacji.
- Ocena bez kontekstu zasobów: „Wysoki” alert dotyczący Honeypota i znacznie mniej pilne tego samego alertu na kontrolerze domeny. Zawsze wzbogacaj o krytyczne zasoby.
- Automatyka bez pętli sprzężenia zwrotnego: System musi uczyć się na podstawie opinii użytkowników analityków (TP/FP) w celu poprawy w miarę upływu czasu. System statyczny ulega degradacji.
- Ignoruj zdarzenia związane z pojedynczym alertem: Nie każdy atak powoduje, że jesteś bardziej czujny. Krytyczne izolowane alerty (np. DCSync) muszą ominąć korelację i przejść bezpośrednio do kolejki P1.
Integracja z TheHive i SOAR
System triage integruje się w sposób naturalny z platformami SOAR takimi jak TheHive czy Cortex które zarządzają cyklem życia incydentów i automatyzacją reakcji.
# Integrazione TheHive
# File: thehive_integration.py
import httpx
from datetime import datetime
class TheHiveIntegration:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def create_case_from_cluster(self, cluster: AlertCluster,
score_info: dict) -> str:
"""Crea un caso TheHive da un cluster di alert."""
severity_map = {
'P1 - Critical': 3,
'P2 - High': 2,
'P3 - Medium': 1,
'P4 - Low': 1
}
case_payload = {
"title": f"[AUTO] Incident {cluster.id[:8]} - {cluster.primary_host}",
"description": self._build_description(cluster, score_info),
"severity": severity_map.get(score_info.get('priority', 'P4 - Low'), 1),
"startDate": int(cluster.created_at.timestamp() * 1000),
"tags": [
f"auto-generated",
f"host:{cluster.primary_host}",
f"alerts:{len(cluster.alerts)}",
*[f"att&ck:{t}" for t in cluster.attack_chain[:5]]
],
"tasks": self._generate_tasks(cluster, score_info)
}
with httpx.Client() as client:
response = client.post(
f"{self.base_url}/api/case",
json=case_payload,
headers=self.headers
)
response.raise_for_status()
return response.json()['id']
def _build_description(self, cluster: AlertCluster,
score_info: dict) -> str:
alerts_summary = "\n".join(
f"- [{a.severity.upper()}] {a.rule_name} @ {a.host} ({a.timestamp.strftime('%H:%M:%S')})"
for a in sorted(cluster.alerts, key=lambda x: x.timestamp)
)
return f"""## Alert Cluster Auto-Generated
**Score**: {score_info.get('score', 'N/A')}
**Priority**: {score_info.get('priority', 'N/A')}
**Primary Host**: {cluster.primary_host}
**Alert Count**: {len(cluster.alerts)}
### Attack Chain
{' -> '.join(cluster.attack_chain) if cluster.attack_chain else 'N/A'}
### Alert Timeline
{alerts_summary}
### Score Breakdown
{chr(10).join(f"- {k}: {v}" for k, v in score_info.get('breakdown', {}).items())}
"""
def _generate_tasks(self, cluster: AlertCluster,
score_info: dict) -> list[dict]:
"""Genera task di investigazione automatici."""
tasks = [
{"title": "Verify alert legitimacy", "order": 0},
{"title": f"Investigate host: {cluster.primary_host}", "order": 1},
]
if len(set(a.host for a in cluster.alerts)) > 1:
tasks.append({"title": "Assess lateral movement scope", "order": 2})
if cluster.attack_chain:
tasks.append({"title": "Map attack progression to ATT&CK", "order": 3})
tasks.append({"title": "Document findings and close/escalate", "order": 99})
return tasks
Wnioski i najważniejsze wnioski
Automatyzacja segregacji poprzez analizę wykresów nie jest luksusem: jest koniecznością operacyjną dowolny SOC, który chce skalować bez proporcjonalnego skalowania liczby analityków. Redukcja szumów alertów, korelacja kontekstowa i inteligentne ustalanie priorytetów pozwalają analitykom skupić się na tym, co naprawdę ważne: badaniu realnych zagrożeń.
Kluczowe dania na wynos
- Analiza wykresów przekształca izolowane alerty w kontekstowe scenariusze ataków
- Ocena wieloczynnikowa (ważność + krytyczność zasobów + łańcuch eliminacji + informacje o zagrożeniach) jest lepsza niż prosty ranking według ważności
- NetworkX dla prototypów, Neo4j dla produkcji korporacyjnej
- Integracja z SOAR (TheHive, XSOAR) zamyka pętlę automatyzacji-badań
- Zawsze monitoruj wskaźniki SOC: MTTD, odsetek wyników fałszywie dodatnich, współczynnik kompresji alertów
- Informacje zwrotne od analityków mają fundamentalne znaczenie dla ciągłego doskonalenia systemu
Powiązane artykuły
- Podręcznik SOAR w języku Python: automatyzacja reagowania na incydenty
- Wykrywanie wspomagane sztuczną inteligencją: LLM do generowania reguł Sigma
- Wykrywanie anomalii behawioralnych: ML w danych dziennika
- Inżynieria wykrywania jako dyscyplina: od skryptu do potoku







