Integrare MITRE ATT&CK: cartografierea decalajului de acoperire în mod programatic
Cadrul MITRE ATT&CK (Tactici, tehnici adverse și Common Knowledge) a devenit limbajul comun al securității cibernetice defensive. Cu peste 200 de tehnici de întreprindere documentate, organizate în 14 tactici, ATT&CK reprezintă cea mai completă hartă a comportamentului real al atacatorului. Dar să ai harta nu este suficient: trebuie să știi unde se află organizația dvs pe harta respectivă, adică ce tehnici poți detecta și care reprezintă lacune critice.
În acest articol vom vedea cum integrați programatic MITRE ATT&CK în fluxul de lucru Detection Engineering: cum să descărcați și să analizați datele STIX cadru, cum să mapați regulile existente la tehnici, cum să identificați și prioritizați lacunele de acoperire și cum să generați hărți termice și rapoarte automate pentru a comunica conducerii postura de detectare.
Ce veți învăța în acest articol
- Structura cadrului ATT&CK: tactici, tehnici, sub-tehnici, proceduri
- Date ATT&CK STIX: descărcare, analizare și interogare prin Python
- Maparea automată a regulilor Sigma -> tehnici ATT&CK
- Analiza acoperirii: metrici cantitative de acoperire
- Identificarea decalajelor și prioritizarea pe baza informațiilor despre amenințări
- Generarea de hărți termice ATT&CK Navigator în format JSON
- Raportare automată pentru CISO și management
- Integrare cu pipeline DaC pentru urmărirea continuă a acoperirii
Cadrul ATT&CK: Structură și concepte cheie
MITRE ATT&CK organizează într-o singură cunoștințele despre tehnicile de atac ierarhie pe trei niveluri:
- Tactici (14): obiectivul atacatorului în orice moment dat. Exemplu: „Acces la acreditări” (fura acreditări), „Mișcare laterală” (extinde în rețea). Tactica este „de ce” unei acțiuni rău intenționate.
- Tehnici (200+): metoda specifică utilizată pentru a realiza scopul tacticii. Exemplu: T1003 „Dumping Credential OS”. Tehnicile Eu sunt „cum”.
- Subtehnici (400+): variante specifice ale unei tehnici. Exemplu: T1003.001 „Memorie LSASS”, T1003.002 „Manager cont de securitate”. Sub-tehnicile sunt „cum anume”.
- Proceduri: a documentat cazuri specifice ale modului în care un grup APT a folosit o tehnică. Ele oferă contextul real al domeniului.
Descărcați și analizați datele ATT&CK STIX
MITRE publică întregul cadru în format STIX 2.1 pe GitHub. Să folosim
biblioteca mitreattack-python pentru a accesa datele într-un fel
structurat:
pip install mitreattack-python requests pandas matplotlib
#!/usr/bin/env python3
"""
ATT&CK Framework Data Client
Scarica e analizza i dati STIX di MITRE ATT&CK.
"""
import json
import requests
from pathlib import Path
from dataclasses import dataclass, field
from mitreattack.stix20 import MitreAttackData
@dataclass
class AttackTechnique:
id: str # Es: T1003
name: str # Es: OS Credential Dumping
tactic: str # Es: credential-access
description: str
is_subtechnique: bool
parent_id: str = "" # Es: T1003 per T1003.001
platforms: list[str] = field(default_factory=list)
data_sources: list[str] = field(default_factory=list)
detection_notes: str = ""
class AttackFrameworkClient:
ENTERPRISE_URL = (
"https://raw.githubusercontent.com/mitre/cti/master/"
"enterprise-attack/enterprise-attack.json"
)
def __init__(self, cache_path: str = "attack_data.json"):
self.cache_path = Path(cache_path)
self._ensure_data()
self.attack_data = MitreAttackData(str(self.cache_path))
def _ensure_data(self) -> None:
"""Scarica i dati ATT&CK se non presenti in cache."""
if not self.cache_path.exists():
print("Downloading MITRE ATT&CK data...")
resp = requests.get(self.ENTERPRISE_URL, timeout=60)
resp.raise_for_status()
with open(self.cache_path, 'w') as f:
json.dump(resp.json(), f)
print(f"Data saved to {self.cache_path}")
def get_all_techniques(self) -> list[AttackTechnique]:
"""Recupera tutte le tecniche (incluse sub-tecniche) dal framework."""
techniques = []
stix_techniques = self.attack_data.get_techniques(remove_revoked_deprecated=True)
for t in stix_techniques:
# Estrae ID ATT&CK dalla reference esterna
attack_id = ""
for ref in t.get("external_references", []):
if ref.get("source_name") == "mitre-attack":
attack_id = ref.get("external_id", "")
break
if not attack_id:
continue
# Determina la tattica primaria
kill_chain_phases = t.get("kill_chain_phases", [])
tactic = (kill_chain_phases[0].get("phase_name", "unknown")
if kill_chain_phases else "unknown")
# Determina se e sub-tecnica
is_subtechnique = "." in attack_id
parent_id = attack_id.split(".")[0] if is_subtechnique else ""
techniques.append(AttackTechnique(
id=attack_id,
name=t.get("name", ""),
tactic=tactic,
description=t.get("description", "")[:500],
is_subtechnique=is_subtechnique,
parent_id=parent_id,
platforms=t.get("x_mitre_platforms", []),
data_sources=t.get("x_mitre_data_sources", []),
detection_notes=t.get("x_mitre_detection", "")[:300]
))
return techniques
def get_techniques_by_tactic(
self, tactic: str
) -> list[AttackTechnique]:
"""Filtra tecniche per tattica."""
all_techniques = self.get_all_techniques()
return [t for t in all_techniques if t.tactic == tactic]
def get_technique_by_id(
self, technique_id: str
) -> AttackTechnique | None:
"""Trova una tecnica specifica per ID."""
all_techniques = self.get_all_techniques()
for t in all_techniques:
if t.id == technique_id:
return t
return None
# Uso
client = AttackFrameworkClient()
techniques = client.get_all_techniques()
print(f"Tecniche caricate: {len(techniques)}")
print(f"Sub-tecniche: {sum(1 for t in techniques if t.is_subtechnique)}")
print(f"Tecniche padre: {sum(1 for t in techniques if not t.is_subtechnique)}")
Maparea regulilor Sigma la tehnicile ATT&CK
Regulile Sigma bine scrise conțin etichete ATT&CK în format
attack.t1234.001. Acest lucru ne permite să construim automat
harta de acoperire:
#!/usr/bin/env python3
"""
Sigma to ATT&CK Coverage Mapper
Mappa le regole Sigma alle tecniche ATT&CK e calcola la copertura.
"""
import yaml
from pathlib import Path
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class RuleInfo:
path: str
title: str
status: str # experimental, test, stable
level: str # low, medium, high, critical
tags: list[str] = field(default_factory=list)
@dataclass
class TechniqueCoverage:
technique_id: str
technique_name: str
tactic: str
rules: list[RuleInfo] = field(default_factory=list)
@property
def coverage_score(self) -> float:
"""Score da 0 a 1 basato su numero e qualità delle regole."""
if not self.rules:
return 0.0
score = 0.0
for rule in self.rules:
# Peso in base allo status
status_weight = {
"stable": 1.0,
"test": 0.5,
"experimental": 0.25
}.get(rule.status, 0.1)
# Peso in base al livello di severita
level_weight = {
"critical": 1.0,
"high": 0.8,
"medium": 0.5,
"low": 0.3,
"informational": 0.1
}.get(rule.level, 0.1)
score += status_weight * level_weight
# Normalizza con cap a 1.0
return min(1.0, score)
@property
def stable_rule_count(self) -> int:
return sum(1 for r in self.rules if r.status == "stable")
def extract_attack_tags(rule: dict) -> list[str]:
"""Estrae gli ID tecnica ATT&CK dai tag della regola Sigma."""
technique_ids = []
for tag in rule.get("tags", []):
tag_lower = tag.lower()
if tag_lower.startswith("attack.t"):
# Estrae l'ID: "attack.t1003.001" -> "T1003.001"
tech_id = tag.replace("attack.", "").upper()
if len(tech_id) >= 5: # Minimo "T1234"
technique_ids.append(tech_id)
return technique_ids
def build_coverage_map(
rules_dir: str,
attack_client: AttackFrameworkClient
) -> dict[str, TechniqueCoverage]:
"""
Analizza il repository Sigma e costruisce la mappa di copertura
per ogni tecnica ATT&CK.
"""
coverage_map: dict[str, TechniqueCoverage] = {}
rules_path = Path(rules_dir)
for rule_file in rules_path.rglob("*.yml"):
try:
with open(rule_file, encoding='utf-8') as f:
rule = yaml.safe_load(f)
except Exception as e:
print(f"Error reading {rule_file}: {e}")
continue
if not isinstance(rule, dict):
continue
technique_ids = extract_attack_tags(rule)
if not technique_ids:
continue
rule_info = RuleInfo(
path=str(rule_file.relative_to(rules_path)),
title=rule.get("title", "Unknown"),
status=rule.get("status", "experimental"),
level=rule.get("level", "medium"),
tags=rule.get("tags", [])
)
for tech_id in technique_ids:
if tech_id not in coverage_map:
# Recupera info dalla libreria ATT&CK
technique = attack_client.get_technique_by_id(tech_id)
coverage_map[tech_id] = TechniqueCoverage(
technique_id=tech_id,
technique_name=technique.name if technique else "Unknown",
tactic=technique.tactic if technique else "unknown"
)
coverage_map[tech_id].rules.append(rule_info)
return coverage_map
Analiza decalajelor și prioritizarea
Odată ce harta de acoperire a fost construită, cel mai important pas este identifica lacunele și prioritizează-le. Nu toate golurile sunt la fel urgență: un decalaj pe T1190 (Exploit Public-Facing Application) și multe mai critic decât un decalaj pe T1586 (Conturi de compromis) pentru majoritatea parte a organizaţiilor.
@dataclass
class CoverageGap:
technique_id: str
technique_name: str
tactic: str
priority_score: float
reasons: list[str] = field(default_factory=list)
def calculate_gap_priority(
technique: AttackTechnique,
threat_intel: dict # Frequenza osservata nei report di threat intel
) -> float:
"""
Calcola un priority score per un gap di copertura.
Score più alto = gap più urgente da colmare.
Fattori considerati:
- Frequenza di utilizzo dagli APT group (da threat intel)
- Disponibilità di data sources necessari
- Impatto potenziale della tecnica
"""
score = 0.0
reasons = []
# Fattore 1: Frequenza nelle campagne recenti (0-40 punti)
freq = threat_intel.get(technique.id, 0)
freq_score = min(40.0, freq * 10)
score += freq_score
if freq > 2:
reasons.append(f"Observed in {freq} recent threat campaigns")
# Fattore 2: Tattiche ad alto impatto (0-30 punti)
high_impact_tactics = [
"credential-access", "lateral-movement",
"impact", "exfiltration", "command-and-control"
]
if technique.tactic in high_impact_tactics:
score += 30.0
reasons.append(f"High-impact tactic: {technique.tactic}")
# Fattore 3: Data sources disponibili (0-20 punti)
# Tecniche con molti data sources sono più facili da rilevare
available_sources = len(technique.data_sources)
if available_sources >= 3:
score += 20.0
reasons.append("Multiple data sources available for detection")
elif available_sources >= 1:
score += 10.0
# Fattore 4: Supporto piattaforme comuni (0-10 punti)
if "Windows" in technique.platforms:
score += 5.0
if "Linux" in technique.platforms or "macOS" in technique.platforms:
score += 5.0
return score
def identify_coverage_gaps(
all_techniques: list[AttackTechnique],
coverage_map: dict[str, TechniqueCoverage],
threat_intel: dict,
min_coverage_score: float = 0.5
) -> list[CoverageGap]:
"""
Identifica le tecniche con copertura insufficiente
e le ordina per priorità.
"""
gaps = []
for technique in all_techniques:
coverage = coverage_map.get(technique.id)
current_score = coverage.coverage_score if coverage else 0.0
if current_score < min_coverage_score:
priority = calculate_gap_priority(technique, threat_intel)
reasons = []
if current_score == 0:
reasons.append("No detection rules exist")
else:
reasons.append(
f"Insufficient coverage score: {current_score:.2f}/1.0"
)
gaps.append(CoverageGap(
technique_id=technique.id,
technique_name=technique.name,
tactic=technique.tactic,
priority_score=priority,
reasons=reasons
))
# Ordina per priority score decrescente
return sorted(gaps, key=lambda g: g.priority_score, reverse=True)
# Esempio di threat intel basata su report pubblici
THREAT_INTEL_FREQUENCY = {
"T1059.001": 8, # PowerShell - molto comune
"T1003.001": 7, # LSASS Memory - molto comune
"T1566.001": 9, # Spearphishing Attachment
"T1190": 6, # Exploit Public-Facing Application
"T1021.001": 5, # Remote Desktop Protocol
"T1053.005": 4, # Scheduled Task
"T1078": 6, # Valid Accounts
}
Generați Heatmap pentru ATT&CK Navigator
Navigator ATT&CK și instrumentul oficial MITRE pentru vizualizați acoperirea pe o matrice interactivă. Acceptați intrarea în Format JSON pe care îl putem genera programatic:
def generate_navigator_layer(
coverage_map: dict[str, TechniqueCoverage],
gaps: list[CoverageGap],
layer_name: str = "Detection Coverage"
) -> dict:
"""
Genera un layer JSON per ATT&CK Navigator che mostra
la copertura con colori: verde = buona, rosso = gap critico.
"""
techniques_layer = []
# Tecniche con copertura
for tech_id, coverage in coverage_map.items():
score = coverage.coverage_score
if score >= 0.8:
color = "#4caf50" # Verde: copertura eccellente
comment = f"Covered by {coverage.stable_rule_count} stable rules"
elif score >= 0.5:
color = "#ff9800" # Arancione: copertura parziale
comment = "Partial coverage - needs improvement"
else:
color = "#f44336" # Rosso: copertura insufficiente
comment = "Insufficient coverage"
techniques_layer.append({
"techniqueID": tech_id,
"color": color,
"comment": comment,
"score": round(score * 100),
"metadata": [
{"name": "rule_count", "value": str(len(coverage.rules))},
{"name": "stable_rules",
"value": str(coverage.stable_rule_count)}
]
})
# Gap critici senza copertura
covered_ids = set(coverage_map.keys())
for gap in gaps[:20]: # Top 20 gap prioritari
if gap.technique_id not in covered_ids:
techniques_layer.append({
"techniqueID": gap.technique_id,
"color": "#b71c1c", # Rosso scuro: gap critico
"comment": f"PRIORITY GAP (score: {gap.priority_score:.0f}): " +
", ".join(gap.reasons),
"score": 0
})
return {
"name": layer_name,
"versions": {
"attack": "14",
"navigator": "4.9",
"layer": "4.5"
},
"domain": "enterprise-attack",
"description": f"Auto-generated coverage layer - {len(coverage_map)} techniques covered",
"filters": {"platforms": ["Windows", "Linux", "macOS"]},
"sorting": 3,
"layout": {"layout": "side", "aggregateFunction": "average"},
"hideDisabled": False,
"techniques": techniques_layer,
"gradient": {
"colors": ["#ff6666", "#ffe766", "#8ec843"],
"minValue": 0,
"maxValue": 100
},
"legendItems": [
{"label": "No coverage", "color": "#b71c1c"},
{"label": "Partial coverage", "color": "#ff9800"},
{"label": "Good coverage", "color": "#4caf50"}
]
}
Raport automat pentru management
Un raport care poate fi citit de conducere trebuie să comunice postura de securitate într-un mod clar și practic:
def generate_executive_report(
all_techniques: list[AttackTechnique],
coverage_map: dict[str, TechniqueCoverage],
gaps: list[CoverageGap]
) -> str:
"""Genera un report testuale executive-friendly."""
total = len(all_techniques)
covered = len(coverage_map)
well_covered = sum(
1 for c in coverage_map.values() if c.coverage_score >= 0.8
)
partial = sum(
1 for c in coverage_map.values()
if 0.5 <= c.coverage_score < 0.8
)
insufficient = covered - well_covered - partial
uncovered = total - covered
coverage_pct = covered / total * 100
good_pct = well_covered / total * 100
report = f"""
=== DETECTION ENGINEERING COVERAGE REPORT ===
Generated: 2026-03-09
EXECUTIVE SUMMARY
-----------------
ATT&CK Techniques Total: {total:4d}
Covered (any level): {covered:4d} ({coverage_pct:.1f}%)
Well-covered (score >= 0.8): {well_covered:4d} ({good_pct:.1f}%)
Partial coverage: {partial:4d}
Insufficient coverage: {insufficient:4d}
Not covered: {uncovered:4d}
TOP 10 PRIORITY GAPS (by risk score)
--------------------------------------
"""
for i, gap in enumerate(gaps[:10], 1):
report += (
f"{i:2d}. [{gap.technique_id}] {gap.technique_name}\n"
f" Tactic: {gap.tactic}\n"
f" Priority Score: {gap.priority_score:.0f}/100\n"
f" Reasons: {', '.join(gap.reasons[:2])}\n\n"
)
report += "COVERAGE BY TACTIC\n------------------\n"
tactic_stats: dict[str, dict] = {}
for coverage in coverage_map.values():
tac = coverage.tactic
if tac not in tactic_stats:
tactic_stats[tac] = {"covered": 0, "total": 0, "score_sum": 0}
tactic_stats[tac]["covered"] += 1
tactic_stats[tac]["score_sum"] += coverage.coverage_score
for tac, stats in sorted(tactic_stats.items()):
avg_score = stats["score_sum"] / stats["covered"] * 100
report += f"{tac:35s} {stats['covered']:3d} rules, avg score {avg_score:.0f}%\n"
return report
Integrarea în conducta DaC
Raportul de acoperire trebuie să fie generat automat la fiecare îmbinare în principal și publicat ca artefact CI/CD pipeline:
# .github/workflows/coverage-report.yml
name: ATT&CK Coverage Report
on:
push:
branches: [main]
schedule:
- cron: '0 8 * * 1' # Ogni lunedi mattina
jobs:
coverage-analysis:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: pip install mitreattack-python pyyaml requests
- name: Generate coverage report
run: |
python scripts/attck_coverage.py \
--rules-dir ./rules \
--output-json reports/coverage.json \
--output-layer reports/navigator_layer.json \
--output-report reports/executive_report.txt
- name: Comment on PR with coverage delta
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const report = fs.readFileSync('reports/executive_report.txt', 'utf8');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: '## ATT&CK Coverage Update\n```\n' + report + '\n```'
});
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: attck-coverage-${{ github.sha }}
path: reports/
retention-days: 90
Cele mai bune practici pentru analiza acoperirii
| Practica | Descriere | Impact |
|---|---|---|
| Concentrați-vă pe cele mai utilizate tehnici | Utilizați rapoarte precum M-Trends, Verizon DBIR pentru a stabili prioritățile | Ridicat |
| Luați în considerare sursele de date | Nu poți detecta ceea ce nu vezi. Mai întâi auditați sursele de date | Critic |
| Distinge platformele relevante | Dacă nu aveți macOS, nu acordați prioritate tehnicilor doar pentru macOS | Mediu |
| Urmărirea în timp | Măsurați delta de acoperire săptămână de săptămână | Ridicat |
| Validați regulile periodic | Este posibil ca o regulă care nu a fost testată de peste 6 luni să nu mai funcționeze | Ridicat |
Greșeli frecvente în analiza acoperirii
- Numără regulile, nu calitatea: 10 reguli experimentale valorează mai puțin de 2 reguli stabile testate. Folosiți coverage_score ponderat
- Ignorați sursele de date: dacă nu aveți Sysmon instalat, multe tehnici Windows sunt nedetectabile, indiferent de reguli
- Acoperire 100% este imposibilă: unele tehnici au detectarea foarte dificilă (trăind din pământ). Concentrați-vă pe goluri risc ridicat și detectabilitate ridicată
- Raportați fără acțiune: fiecare raport de decalaj trebuie să genereze un stoc de elemente de detectare cu proprietar și termen limită
Concluzii
Integrați MITRE ATT&CK în mod programatic în fluxul dvs. de lucru Detection Engineering transformă analiza acoperirii dintr-o activitate manuală și subiectivă într-un proces automatizat, obiectiv și continuu. Rezultatul este un program de detectare măsurabile, prioritizate în funcție de amenințările reale și comunicabile conducerii cu date concrete.
Următorul articol din serie
În următorul articol vom vedea cum să automatizăm răspunsul la alerte cu SOAR Playbook în Python, orchestrarea acțiunilor de izolarea automată, îmbogățirea și notificarea.







