Integracja MITRE ATT&CK: Programowe mapowanie luki w pokryciu
Ramy AT&CK UKOŚNIENIA (Taktyka kontradyktoryjna, techniki i Wiedza Powszechna) stała się powszechnym językiem cyberbezpieczeństwa defensywnego. Dzięki ponad 200 udokumentowanym technikom przedsiębiorczości, zorganizowanym w 14 taktyk, ATT&CK reprezentuje najbardziej kompletną mapę zachowań prawdziwego atakującego. Ale posiadanie mapy nie wystarczy: trzeba wiedzieć gdzie znajduje się Twoja organizacja na tej mapie, czyli jakie techniki możesz wykryć, a które reprezentują krytyczne luki.
W tym artykule zobaczymy, jak to zrobić programowo integruj MITRE ATT&CK w procesie inżynierii wykrywania: jak pobierać i analizować dane STIX ramy, jak mapować istniejące zasady na techniki, jak identyfikować i ustalać priorytety luk w zasięgu oraz generować mapy cieplne i automatyczne raporty poinformować kierownictwo o sposobie wykrywania.
Czego dowiesz się w tym artykule
- Struktura struktury ATT&CK: taktyka, techniki, podtechniki, procedury
- Dane ATT&CK STIX: pobieranie, analizowanie i wysyłanie zapytań za pośrednictwem Pythona
- Automatyczne mapowanie reguł Sigmy -> Techniki ATT&CK
- Analiza zasięgu: ilościowe wskaźniki pokrycia
- Identyfikacja luk i ustalanie priorytetów w oparciu o analizę zagrożeń
- Generowanie map cieplnych ATT&CK Navigator w formacie JSON
- Automatyczne raportowanie dla CISO i kierownictwa
- Integracja z potokiem DaC w celu ciągłego śledzenia zasięgu
Ramy ATT&CK: struktura i kluczowe pojęcia
MITRE ATT&CK porządkuje wiedzę o technikach ataku w jedną całość hierarchia trójstopniowa:
- Taktyka (14): cel atakującego w danym momencie. Przykład: „Dostęp do danych uwierzytelniających” (kradzież danych uwierzytelniających), „Ruch boczny” (rozwiń w sieci). Taktyka jest „dlaczego” złośliwego działania.
- Techniki (200+): konkretna metoda zastosowana do osiągnięcia cel taktyki. Przykład: T1003 „Zrzut danych uwierzytelniających systemu operacyjnego”. Techniki Jestem „jak”.
- Podtechniki (400+): specyficzne odmiany techniki. Przykład: T1003.001 „Pamięć LSASS”, T1003.002 „Menedżer kont zabezpieczeń”. Podtechniki to „jak dokładnie”.
- Procedury: udokumentowane konkretne przypadki działania grupy APT zastosowało pewną technikę. Zapewniają prawdziwy kontekst danej dziedziny.
Pobierz i przeanalizuj dane ATT&CK STIX
MITER publikuje cały framework w formacie STIX 2.1 na GitHubie. Użyjmy
biblioteka mitreattack-python aby w jakiś sposób uzyskać dostęp do danych
zorganizowany:
pip install mitreattack-python requests pandas matplotlib
#!/usr/bin/env python3
"""
ATT&CK Framework Data Client
Scarica e analizza i dati STIX di MITRE ATT&CK.
"""
import json
import requests
from pathlib import Path
from dataclasses import dataclass, field
from mitreattack.stix20 import MitreAttackData
@dataclass
class AttackTechnique:
id: str # Es: T1003
name: str # Es: OS Credential Dumping
tactic: str # Es: credential-access
description: str
is_subtechnique: bool
parent_id: str = "" # Es: T1003 per T1003.001
platforms: list[str] = field(default_factory=list)
data_sources: list[str] = field(default_factory=list)
detection_notes: str = ""
class AttackFrameworkClient:
ENTERPRISE_URL = (
"https://raw.githubusercontent.com/mitre/cti/master/"
"enterprise-attack/enterprise-attack.json"
)
def __init__(self, cache_path: str = "attack_data.json"):
self.cache_path = Path(cache_path)
self._ensure_data()
self.attack_data = MitreAttackData(str(self.cache_path))
def _ensure_data(self) -> None:
"""Scarica i dati ATT&CK se non presenti in cache."""
if not self.cache_path.exists():
print("Downloading MITRE ATT&CK data...")
resp = requests.get(self.ENTERPRISE_URL, timeout=60)
resp.raise_for_status()
with open(self.cache_path, 'w') as f:
json.dump(resp.json(), f)
print(f"Data saved to {self.cache_path}")
def get_all_techniques(self) -> list[AttackTechnique]:
"""Recupera tutte le tecniche (incluse sub-tecniche) dal framework."""
techniques = []
stix_techniques = self.attack_data.get_techniques(remove_revoked_deprecated=True)
for t in stix_techniques:
# Estrae ID ATT&CK dalla reference esterna
attack_id = ""
for ref in t.get("external_references", []):
if ref.get("source_name") == "mitre-attack":
attack_id = ref.get("external_id", "")
break
if not attack_id:
continue
# Determina la tattica primaria
kill_chain_phases = t.get("kill_chain_phases", [])
tactic = (kill_chain_phases[0].get("phase_name", "unknown")
if kill_chain_phases else "unknown")
# Determina se e sub-tecnica
is_subtechnique = "." in attack_id
parent_id = attack_id.split(".")[0] if is_subtechnique else ""
techniques.append(AttackTechnique(
id=attack_id,
name=t.get("name", ""),
tactic=tactic,
description=t.get("description", "")[:500],
is_subtechnique=is_subtechnique,
parent_id=parent_id,
platforms=t.get("x_mitre_platforms", []),
data_sources=t.get("x_mitre_data_sources", []),
detection_notes=t.get("x_mitre_detection", "")[:300]
))
return techniques
def get_techniques_by_tactic(
self, tactic: str
) -> list[AttackTechnique]:
"""Filtra tecniche per tattica."""
all_techniques = self.get_all_techniques()
return [t for t in all_techniques if t.tactic == tactic]
def get_technique_by_id(
self, technique_id: str
) -> AttackTechnique | None:
"""Trova una tecnica specifica per ID."""
all_techniques = self.get_all_techniques()
for t in all_techniques:
if t.id == technique_id:
return t
return None
# Uso
client = AttackFrameworkClient()
techniques = client.get_all_techniques()
print(f"Tecniche caricate: {len(techniques)}")
print(f"Sub-tecniche: {sum(1 for t in techniques if t.is_subtechnique)}")
print(f"Tecniche padre: {sum(1 for t in techniques if not t.is_subtechnique)}")
Mapowanie reguł Sigmy na techniki ATT&CK
Dobrze napisane reguły Sigma zawierają w formacie znaczniki ATT&CK
attack.t1234.001. Dzięki temu możemy budować automatycznie
mapa zasięgu:
#!/usr/bin/env python3
"""
Sigma to ATT&CK Coverage Mapper
Mappa le regole Sigma alle tecniche ATT&CK e calcola la copertura.
"""
import yaml
from pathlib import Path
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class RuleInfo:
path: str
title: str
status: str # experimental, test, stable
level: str # low, medium, high, critical
tags: list[str] = field(default_factory=list)
@dataclass
class TechniqueCoverage:
technique_id: str
technique_name: str
tactic: str
rules: list[RuleInfo] = field(default_factory=list)
@property
def coverage_score(self) -> float:
"""Score da 0 a 1 basato su numero e qualità delle regole."""
if not self.rules:
return 0.0
score = 0.0
for rule in self.rules:
# Peso in base allo status
status_weight = {
"stable": 1.0,
"test": 0.5,
"experimental": 0.25
}.get(rule.status, 0.1)
# Peso in base al livello di severita
level_weight = {
"critical": 1.0,
"high": 0.8,
"medium": 0.5,
"low": 0.3,
"informational": 0.1
}.get(rule.level, 0.1)
score += status_weight * level_weight
# Normalizza con cap a 1.0
return min(1.0, score)
@property
def stable_rule_count(self) -> int:
return sum(1 for r in self.rules if r.status == "stable")
def extract_attack_tags(rule: dict) -> list[str]:
"""Estrae gli ID tecnica ATT&CK dai tag della regola Sigma."""
technique_ids = []
for tag in rule.get("tags", []):
tag_lower = tag.lower()
if tag_lower.startswith("attack.t"):
# Estrae l'ID: "attack.t1003.001" -> "T1003.001"
tech_id = tag.replace("attack.", "").upper()
if len(tech_id) >= 5: # Minimo "T1234"
technique_ids.append(tech_id)
return technique_ids
def build_coverage_map(
rules_dir: str,
attack_client: AttackFrameworkClient
) -> dict[str, TechniqueCoverage]:
"""
Analizza il repository Sigma e costruisce la mappa di copertura
per ogni tecnica ATT&CK.
"""
coverage_map: dict[str, TechniqueCoverage] = {}
rules_path = Path(rules_dir)
for rule_file in rules_path.rglob("*.yml"):
try:
with open(rule_file, encoding='utf-8') as f:
rule = yaml.safe_load(f)
except Exception as e:
print(f"Error reading {rule_file}: {e}")
continue
if not isinstance(rule, dict):
continue
technique_ids = extract_attack_tags(rule)
if not technique_ids:
continue
rule_info = RuleInfo(
path=str(rule_file.relative_to(rules_path)),
title=rule.get("title", "Unknown"),
status=rule.get("status", "experimental"),
level=rule.get("level", "medium"),
tags=rule.get("tags", [])
)
for tech_id in technique_ids:
if tech_id not in coverage_map:
# Recupera info dalla libreria ATT&CK
technique = attack_client.get_technique_by_id(tech_id)
coverage_map[tech_id] = TechniqueCoverage(
technique_id=tech_id,
technique_name=technique.name if technique else "Unknown",
tactic=technique.tactic if technique else "unknown"
)
coverage_map[tech_id].rules.append(rule_info)
return coverage_map
Analiza luk i ustalanie priorytetów
Po zbudowaniu mapy zasięgu najważniejszym krokiem jest zidentyfikować luki i nadać im priorytety. Nie wszystkie luki są takie same pilność: luka w T1190 (aplikacja typu Exploit Public Facing) i wiele dla większości bardziej krytyczna niż luka w T1586 (konta kompromisowe). część organizacji.
@dataclass
class CoverageGap:
technique_id: str
technique_name: str
tactic: str
priority_score: float
reasons: list[str] = field(default_factory=list)
def calculate_gap_priority(
technique: AttackTechnique,
threat_intel: dict # Frequenza osservata nei report di threat intel
) -> float:
"""
Calcola un priority score per un gap di copertura.
Score più alto = gap più urgente da colmare.
Fattori considerati:
- Frequenza di utilizzo dagli APT group (da threat intel)
- Disponibilità di data sources necessari
- Impatto potenziale della tecnica
"""
score = 0.0
reasons = []
# Fattore 1: Frequenza nelle campagne recenti (0-40 punti)
freq = threat_intel.get(technique.id, 0)
freq_score = min(40.0, freq * 10)
score += freq_score
if freq > 2:
reasons.append(f"Observed in {freq} recent threat campaigns")
# Fattore 2: Tattiche ad alto impatto (0-30 punti)
high_impact_tactics = [
"credential-access", "lateral-movement",
"impact", "exfiltration", "command-and-control"
]
if technique.tactic in high_impact_tactics:
score += 30.0
reasons.append(f"High-impact tactic: {technique.tactic}")
# Fattore 3: Data sources disponibili (0-20 punti)
# Tecniche con molti data sources sono più facili da rilevare
available_sources = len(technique.data_sources)
if available_sources >= 3:
score += 20.0
reasons.append("Multiple data sources available for detection")
elif available_sources >= 1:
score += 10.0
# Fattore 4: Supporto piattaforme comuni (0-10 punti)
if "Windows" in technique.platforms:
score += 5.0
if "Linux" in technique.platforms or "macOS" in technique.platforms:
score += 5.0
return score
def identify_coverage_gaps(
all_techniques: list[AttackTechnique],
coverage_map: dict[str, TechniqueCoverage],
threat_intel: dict,
min_coverage_score: float = 0.5
) -> list[CoverageGap]:
"""
Identifica le tecniche con copertura insufficiente
e le ordina per priorità.
"""
gaps = []
for technique in all_techniques:
coverage = coverage_map.get(technique.id)
current_score = coverage.coverage_score if coverage else 0.0
if current_score < min_coverage_score:
priority = calculate_gap_priority(technique, threat_intel)
reasons = []
if current_score == 0:
reasons.append("No detection rules exist")
else:
reasons.append(
f"Insufficient coverage score: {current_score:.2f}/1.0"
)
gaps.append(CoverageGap(
technique_id=technique.id,
technique_name=technique.name,
tactic=technique.tactic,
priority_score=priority,
reasons=reasons
))
# Ordina per priority score decrescente
return sorted(gaps, key=lambda g: g.priority_score, reverse=True)
# Esempio di threat intel basata su report pubblici
THREAT_INTEL_FREQUENCY = {
"T1059.001": 8, # PowerShell - molto comune
"T1003.001": 7, # LSASS Memory - molto comune
"T1566.001": 9, # Spearphishing Attachment
"T1190": 6, # Exploit Public-Facing Application
"T1021.001": 5, # Remote Desktop Protocol
"T1053.005": 4, # Scheduled Task
"T1078": 6, # Valid Accounts
}
Wygeneruj mapę cieplną dla nawigatora ATT&CK
Nawigator ATT&CK oraz oficjalne narzędzie MITER dla przeglądaj zasięg na interaktywnej matrycy. Zaakceptuj dane wejściowe Format JSON, który możemy wygenerować programowo:
def generate_navigator_layer(
coverage_map: dict[str, TechniqueCoverage],
gaps: list[CoverageGap],
layer_name: str = "Detection Coverage"
) -> dict:
"""
Genera un layer JSON per ATT&CK Navigator che mostra
la copertura con colori: verde = buona, rosso = gap critico.
"""
techniques_layer = []
# Tecniche con copertura
for tech_id, coverage in coverage_map.items():
score = coverage.coverage_score
if score >= 0.8:
color = "#4caf50" # Verde: copertura eccellente
comment = f"Covered by {coverage.stable_rule_count} stable rules"
elif score >= 0.5:
color = "#ff9800" # Arancione: copertura parziale
comment = "Partial coverage - needs improvement"
else:
color = "#f44336" # Rosso: copertura insufficiente
comment = "Insufficient coverage"
techniques_layer.append({
"techniqueID": tech_id,
"color": color,
"comment": comment,
"score": round(score * 100),
"metadata": [
{"name": "rule_count", "value": str(len(coverage.rules))},
{"name": "stable_rules",
"value": str(coverage.stable_rule_count)}
]
})
# Gap critici senza copertura
covered_ids = set(coverage_map.keys())
for gap in gaps[:20]: # Top 20 gap prioritari
if gap.technique_id not in covered_ids:
techniques_layer.append({
"techniqueID": gap.technique_id,
"color": "#b71c1c", # Rosso scuro: gap critico
"comment": f"PRIORITY GAP (score: {gap.priority_score:.0f}): " +
", ".join(gap.reasons),
"score": 0
})
return {
"name": layer_name,
"versions": {
"attack": "14",
"navigator": "4.9",
"layer": "4.5"
},
"domain": "enterprise-attack",
"description": f"Auto-generated coverage layer - {len(coverage_map)} techniques covered",
"filters": {"platforms": ["Windows", "Linux", "macOS"]},
"sorting": 3,
"layout": {"layout": "side", "aggregateFunction": "average"},
"hideDisabled": False,
"techniques": techniques_layer,
"gradient": {
"colors": ["#ff6666", "#ffe766", "#8ec843"],
"minValue": 0,
"maxValue": 100
},
"legendItems": [
{"label": "No coverage", "color": "#b71c1c"},
{"label": "Partial coverage", "color": "#ff9800"},
{"label": "Good coverage", "color": "#4caf50"}
]
}
Automatyczny raport dla kierownictwa
Raport czytelny dla kierownictwa musi informować o stanie zabezpieczeń w jasny i praktyczny sposób:
def generate_executive_report(
all_techniques: list[AttackTechnique],
coverage_map: dict[str, TechniqueCoverage],
gaps: list[CoverageGap]
) -> str:
"""Genera un report testuale executive-friendly."""
total = len(all_techniques)
covered = len(coverage_map)
well_covered = sum(
1 for c in coverage_map.values() if c.coverage_score >= 0.8
)
partial = sum(
1 for c in coverage_map.values()
if 0.5 <= c.coverage_score < 0.8
)
insufficient = covered - well_covered - partial
uncovered = total - covered
coverage_pct = covered / total * 100
good_pct = well_covered / total * 100
report = f"""
=== DETECTION ENGINEERING COVERAGE REPORT ===
Generated: 2026-03-09
EXECUTIVE SUMMARY
-----------------
ATT&CK Techniques Total: {total:4d}
Covered (any level): {covered:4d} ({coverage_pct:.1f}%)
Well-covered (score >= 0.8): {well_covered:4d} ({good_pct:.1f}%)
Partial coverage: {partial:4d}
Insufficient coverage: {insufficient:4d}
Not covered: {uncovered:4d}
TOP 10 PRIORITY GAPS (by risk score)
--------------------------------------
"""
for i, gap in enumerate(gaps[:10], 1):
report += (
f"{i:2d}. [{gap.technique_id}] {gap.technique_name}\n"
f" Tactic: {gap.tactic}\n"
f" Priority Score: {gap.priority_score:.0f}/100\n"
f" Reasons: {', '.join(gap.reasons[:2])}\n\n"
)
report += "COVERAGE BY TACTIC\n------------------\n"
tactic_stats: dict[str, dict] = {}
for coverage in coverage_map.values():
tac = coverage.tactic
if tac not in tactic_stats:
tactic_stats[tac] = {"covered": 0, "total": 0, "score_sum": 0}
tactic_stats[tac]["covered"] += 1
tactic_stats[tac]["score_sum"] += coverage.coverage_score
for tac, stats in sorted(tactic_stats.items()):
avg_score = stats["score_sum"] / stats["covered"] * 100
report += f"{tac:35s} {stats['covered']:3d} rules, avg score {avg_score:.0f}%\n"
return report
Integracja z rurociągiem DaC
Raport zasięgu musi być generowany automatycznie przy każdym połączeniu z głównym i opublikowany jako artefakt potoku CI/CD:
# .github/workflows/coverage-report.yml
name: ATT&CK Coverage Report
on:
push:
branches: [main]
schedule:
- cron: '0 8 * * 1' # Ogni lunedi mattina
jobs:
coverage-analysis:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: pip install mitreattack-python pyyaml requests
- name: Generate coverage report
run: |
python scripts/attck_coverage.py \
--rules-dir ./rules \
--output-json reports/coverage.json \
--output-layer reports/navigator_layer.json \
--output-report reports/executive_report.txt
- name: Comment on PR with coverage delta
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const report = fs.readFileSync('reports/executive_report.txt', 'utf8');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: '## ATT&CK Coverage Update\n```\n' + report + '\n```'
});
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: attck-coverage-${{ github.sha }}
path: reports/
retention-days: 90
Najlepsze praktyki w zakresie analizy zasięgu
| Praktyka | Opis | Uderzenie |
|---|---|---|
| Skoncentruj się na najczęściej stosowanych technikach | Aby ustalić priorytety, użyj raportów takich jak M-Trends, Verizon DBIR | Wysoki |
| Weź pod uwagę źródła danych | Nie możesz wykryć tego, czego nie widzisz. Najpierw przeprowadź audyt źródeł danych | Krytyk |
| Wyróżnij odpowiednie platformówki | Jeśli nie masz systemu macOS, nie traktuj priorytetowo technik opartych wyłącznie na systemie macOS | Średni |
| Śledzenie w czasie | Mierz deltę zasięgu tydzień po tygodniu | Wysoki |
| Okresowo sprawdzaj zasady | Reguła, która nie była testowana przez ponad 6 miesięcy, może już nie działać | Wysoki |
Typowe błędy w analizie zasięgu
- Licz się z zasadami, a nie z jakością: 10 eksperymentalnych zasad są warte mniej niż 2 przetestowane stabilne reguły. Użyj ważonego zasięgu_score
- Ignoruj źródła danych: jeśli nie masz wdrożonego Sysmon, wiele technik systemu Windows jest niewykrywalnych, niezależnie od reguł
- 100% pokrycia nie jest możliwe: niektóre techniki tak mają wykrycie jest bardzo trudne (życie z ziemi). Skoncentruj się na lukach wysokie ryzyko i wysoka wykrywalność
- Zgłoś bez działania: musi wygenerować każdy raport o lukach zaległości w wykrywaniu elementów wraz z właścicielem i terminem
Wnioski
Zintegruj programowo MITER ATT&CK z przepływem pracy w inżynierii wykrywania przekształca analizę zasięgu z czynności manualnej i subiektywnej w proces zautomatyzowane, obiektywne i ciągłe. Rezultatem jest program wykrywający mierzalne, uszeregowane pod kątem realnych zagrożeń i możliwe do przekazania kierownictwu z konkretnymi danymi.
Następny artykuł z serii
W kolejnym artykule zobaczymy jak zautomatyzować reakcję na alerty z Podręcznik SOAR w Pythonie, koordynując działania automatyczne powstrzymywanie, wzbogacanie i powiadamianie.







