SOAR Playbook v Pythonu: Automatizace odezvy na incidenty
Příručka Security Orchestration, Automation and Response (SOAR) a mnohem více než jen skript: a formální, opakovatelné uspořádání akcí reakce na bezpečnostní incident. Když výstraha spustí playbook, koordinovaný sled operací – obohacení dat, analýza malwaru, automatické zadržování, upozornění a dokumentace – místo toho proběhne během několika sekund než v hodinách, což měřitelně snižuje střední dobu odezvy (MTTR).
Nedávná průmyslová data potvrzují dopad: implementace založené na Pythonu dosáhly 2,5 minuty MTTR s přesností 92 %., řízení 500 incidentů/hod. Cortex XSOAR, TheHive with Cortex a platformy jako Shuffle (open-source) vám umožňují implementovat příručky v Pythonu s plným přístupem k bezpečnostnímu ekosystému.
Tento článek vytváří kompletní příručky SOAR v Pythonu pro tři běžné scénáře: phishing, malware koncového bodu a hrubá síla. Pro každý je implementován celý tok: třídění, obohacování, zadržování, eskalace a automatická dokumentace.
Co se naučíte
- Architektura modulární příručky SOAR v Pythonu
- Automatické obohacení: VirusTotal, Shodan, Active Directory
- Automatické omezení: izolace koncových bodů, blokování IP, deaktivace uživatele
- Integrace s TheHive a Cortex pro orchestraci
- Testování a verzování příruček
- Vzor lidské eskalace s předem vyplněným kontextem
Architektura modulární příručky SOAR
Dobře navržený playbook složený z nezávislých funkčních bloků, které mohou být znovu použit mezi různými playbooky. Modulární struktura usnadňuje testování a údržbu a vývoj v čase.
Základními stavebními kameny každé příručky SOAR jsou:
- Spouštěče: která událost spustí playbook (upozornění ze SIEM, podezřelý e-mail, hlášení uživatele)
- Obohacení: Sbírka dalšího kontextu (intelové hrozby, informace o majetku, uživatelská historie)
- Rozhodnutí: logika automatického třídění (falešné poplachy? eskalace? okamžité omezení?)
- Akce: akce reakce (izolovat, zablokovat, upozornit)
- Dokumentace: aktualizace tiketu, deník playbook, shromažďování důkazů
- Uzavření/Eskalace: Automatické ukončení nebo eskalace na lidského analytika
# Framework base per Playbook SOAR
# File: soar_framework.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
from enum import Enum
import logging
class PlaybookStatus(Enum):
RUNNING = "running"
COMPLETED = "completed"
ESCALATED = "escalated"
FAILED = "failed"
class ActionResult(Enum):
SUCCESS = "success"
FAILURE = "failure"
SKIPPED = "skipped"
ESCALATE = "escalate"
@dataclass
class PlaybookContext:
"""Contesto condiviso tra tutti gli step del playbook."""
alert_id: str
alert_type: str
alert_data: dict
start_time: datetime = field(default_factory=datetime.now)
enrichment_data: dict = field(default_factory=dict)
actions_taken: list[dict] = field(default_factory=list)
evidence: list[dict] = field(default_factory=list)
escalation_reason: Optional[str] = None
status: PlaybookStatus = PlaybookStatus.RUNNING
def add_action(self, action: str, result: ActionResult,
details: dict = None) -> None:
self.actions_taken.append({
'timestamp': datetime.now().isoformat(),
'action': action,
'result': result.value,
'details': details or {}
})
def add_evidence(self, evidence_type: str, data: Any,
source: str) -> None:
self.evidence.append({
'timestamp': datetime.now().isoformat(),
'type': evidence_type,
'data': data,
'source': source
})
def set_escalation(self, reason: str) -> None:
self.escalation_reason = reason
self.status = PlaybookStatus.ESCALATED
class PlaybookStep(ABC):
"""Classe base per ogni step del playbook."""
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(f"soar.{name}")
@abstractmethod
def execute(self, context: PlaybookContext) -> ActionResult:
"""Esegue lo step. Deve essere implementato da ogni sottoclasse."""
pass
def __str__(self) -> str:
return self.name
class Playbook:
"""Orchestratore del playbook: esegue gli step in sequenza."""
def __init__(self, name: str, steps: list[PlaybookStep]):
self.name = name
self.steps = steps
self.logger = logging.getLogger(f"soar.playbook.{name}")
def run(self, alert_id: str, alert_type: str,
alert_data: dict) -> PlaybookContext:
context = PlaybookContext(
alert_id=alert_id,
alert_type=alert_type,
alert_data=alert_data
)
self.logger.info(f"Playbook '{self.name}' avviato per alert {alert_id}")
for step in self.steps:
if context.status in [PlaybookStatus.ESCALATED, PlaybookStatus.FAILED]:
self.logger.info(f"Skip step '{step}': playbook in stato {context.status}")
break
self.logger.info(f"Esecuzione step: {step}")
try:
result = step.execute(context)
context.add_action(step.name, result)
if result == ActionResult.ESCALATE:
context.status = PlaybookStatus.ESCALATED
self.logger.warning(f"Step '{step}' richiede escalation")
break
except Exception as e:
self.logger.error(f"Step '{step}' fallito: {e}", exc_info=True)
context.add_action(step.name, ActionResult.FAILURE,
{'error': str(e)})
context.status = PlaybookStatus.FAILED
break
if context.status == PlaybookStatus.RUNNING:
context.status = PlaybookStatus.COMPLETED
self.logger.info(f"Playbook completato con status: {context.status.value}")
return context
Phishing Playbook: Od třídění k zadržování
Phishing je nejběžnějším scénářem v SOC. Vyspělá příručka snižuje dobu odezvy od 45 minut (manuálně) do méně než 2 minut (automatizované), analyzování adres URL, příloh, ověřování hlaviček e-mailů a izolování uživatelů, kteří klikli.
# Playbook Phishing Completo
# File: playbooks/phishing_playbook.py
import httpx
import re
import base64
from email.parser import Parser
from email.policy import default as default_policy
class ExtractEmailArtifactsStep(PlaybookStep):
"""Estrae URL, allegati e header dall'email sospetta."""
def __init__(self):
super().__init__("extract_email_artifacts")
def execute(self, context: PlaybookContext) -> ActionResult:
email_raw = context.alert_data.get('email_raw', '')
if not email_raw:
return ActionResult.SKIPPED
# Parse email
msg = Parser(policy=default_policy).parsestr(email_raw)
# Estrai header
headers = {
'from': msg.get('From', ''),
'reply_to': msg.get('Reply-To', ''),
'x_originating_ip': msg.get('X-Originating-IP', ''),
'dkim': 'DKIM-Signature' in msg,
'spf': 'Received-SPF' in email_raw,
'dmarc': 'DMARC' in email_raw.upper()
}
# Estrai URL dal body
body = msg.get_body(preferencelist=('plain', 'html'))
body_text = body.get_content() if body else ''
urls = re.findall(r'https?://[^\s<>"]+', body_text)
# Estrai allegati
attachments = []
for part in msg.iter_attachments():
attachments.append({
'filename': part.get_filename(),
'content_type': part.get_content_type(),
'size': len(part.get_payload(decode=True) or b''),
'content_b64': base64.b64encode(
part.get_payload(decode=True) or b''
).decode()
})
context.enrichment_data['email_headers'] = headers
context.enrichment_data['urls'] = list(set(urls)) # dedup
context.enrichment_data['attachments'] = attachments
context.add_evidence('email_headers', headers, 'email_parser')
context.add_evidence('extracted_urls', urls, 'email_parser')
self.logger.info(f"Estratti {len(urls)} URL e {len(attachments)} allegati")
return ActionResult.SUCCESS
class VirusTotalEnrichmentStep(PlaybookStep):
"""Analizza URL e hash allegati con VirusTotal."""
def __init__(self, vt_api_key: str):
super().__init__("virustotal_enrichment")
self.vt_api_key = vt_api_key
self.base_url = "https://www.virustotal.com/api/v3"
self.headers = {"x-apikey": vt_api_key}
def execute(self, context: PlaybookContext) -> ActionResult:
vt_results = {}
# Analizza URL
for url in context.enrichment_data.get('urls', [])[:10]: # Max 10 URL
try:
result = self._check_url(url)
vt_results[url] = result
if result.get('malicious', 0) > 0:
context.add_evidence('malicious_url', {'url': url, 'vt': result}, 'virustotal')
except Exception as e:
self.logger.warning(f"VT check URL fallito per {url}: {e}")
context.enrichment_data['virustotal'] = vt_results
# Determina se e necessario containment immediato
malicious_count = sum(
1 for r in vt_results.values() if r.get('malicious', 0) > 2
)
if malicious_count > 0:
context.enrichment_data['vt_verdict'] = 'malicious'
self.logger.warning(f"Trovati {malicious_count} URL malevoli su VT")
else:
context.enrichment_data['vt_verdict'] = 'clean'
return ActionResult.SUCCESS
def _check_url(self, url: str) -> dict:
"""Controlla un URL su VirusTotal."""
import hashlib
url_id = base64.urlsafe_b64encode(url.encode()).decode().rstrip('=')
with httpx.Client() as client:
response = client.get(
f"{self.base_url}/urls/{url_id}",
headers=self.headers,
timeout=10
)
if response.status_code == 404:
# Submetti per analisi
post_response = client.post(
f"{self.base_url}/urls",
headers=self.headers,
data={"url": url},
timeout=10
)
return {"status": "submitted", "malicious": 0}
if response.status_code != 200:
return {"status": "error", "malicious": 0}
data = response.json()
stats = data.get('data', {}).get(
'attributes', {}
).get('last_analysis_stats', {})
return {
"malicious": stats.get('malicious', 0),
"suspicious": stats.get('suspicious', 0),
"harmless": stats.get('harmless', 0),
"undetected": stats.get('undetected', 0)
}
class TriageDecisionStep(PlaybookStep):
"""Decide l'azione basandosi sull'enrichment."""
def __init__(self):
super().__init__("triage_decision")
def execute(self, context: PlaybookContext) -> ActionResult:
vt_verdict = context.enrichment_data.get('vt_verdict', 'unknown')
headers = context.enrichment_data.get('email_headers', {})
# Score-based triage
risk_score = 0
# Fattori di rischio
if vt_verdict == 'malicious':
risk_score += 40
if not headers.get('dkim', True):
risk_score += 15
if not headers.get('spf', True):
risk_score += 15
if not headers.get('dmarc', True):
risk_score += 10
if context.enrichment_data.get('attachments'):
for att in context.enrichment_data['attachments']:
if any(ext in str(att.get('filename', '')).lower()
for ext in ['.exe', '.js', '.vbs', '.ps1', '.macro']):
risk_score += 20
context.enrichment_data['risk_score'] = risk_score
self.logger.info(f"Risk score calcolato: {risk_score}")
if risk_score >= 50:
context.enrichment_data['triage_result'] = 'high_risk'
return ActionResult.SUCCESS # Procedi con containment
elif risk_score >= 25:
context.enrichment_data['triage_result'] = 'medium_risk'
context.set_escalation(
f"Risk score {risk_score}: richiede review umana"
)
return ActionResult.ESCALATE
else:
context.enrichment_data['triage_result'] = 'low_risk'
return ActionResult.SUCCESS
class ContainPhishingStep(PlaybookStep):
"""Azioni di containment per phishing confermato."""
def __init__(self, exchange_client, ad_client):
super().__init__("contain_phishing")
self.exchange_client = exchange_client
self.ad_client = ad_client
def execute(self, context: PlaybookContext) -> ActionResult:
if context.enrichment_data.get('triage_result') != 'high_risk':
return ActionResult.SKIPPED
affected_user = context.alert_data.get('recipient_email', '')
actions = []
# 1. Rimuovi email simili da tutte le mailbox
if context.enrichment_data.get('urls'):
for url in context.enrichment_data['urls'][:3]:
result = self.exchange_client.search_and_delete(
sender=context.alert_data.get('sender_email', ''),
url_contains=url
)
actions.append({'type': 'email_deletion', 'url': url, 'result': result})
# 2. Blocca mittente in Exchange
sender = context.alert_data.get('sender_email', '')
if sender:
self.exchange_client.add_to_blocklist(sender)
actions.append({'type': 'sender_blocked', 'sender': sender})
# 3. Forza reset password se utente ha cliccato link
if context.alert_data.get('user_clicked_link', False):
self.ad_client.force_password_reset(affected_user)
self.ad_client.revoke_sessions(affected_user)
actions.append({
'type': 'password_reset',
'user': affected_user,
'reason': 'User clicked malicious link'
})
context.add_evidence('containment_actions', actions, 'soar_playbook')
self.logger.info(f"Containment completato: {len(actions)} azioni")
return ActionResult.SUCCESS
def build_phishing_playbook(vt_api_key: str,
exchange_client,
ad_client) -> Playbook:
"""Factory per il playbook phishing."""
return Playbook(
name="phishing_response",
steps=[
ExtractEmailArtifactsStep(),
VirusTotalEnrichmentStep(vt_api_key),
TriageDecisionStep(),
ContainPhishingStep(exchange_client, ad_client),
DocumentIncidentStep(), # Definito sotto
NotifyStakeholdersStep()
]
)
Malware on Endpoint Playbook: Enrichment and Isolation
Příručka malwaru koncového bodu musí reagovat agresivněji: izolace koncového bodu a kritické, aby se zabránilo bočnímu pohybu. Ale před izolací je to nutné shromáždit co nejvíce forenzních dat.
# Playbook Malware Endpoint
# File: playbooks/malware_endpoint_playbook.py
class CollectForensicDataStep(PlaybookStep):
"""Raccoglie dati forensi dall'endpoint prima dell'isolamento."""
def __init__(self, edr_client):
super().__init__("collect_forensic_data")
self.edr = edr_client
def execute(self, context: PlaybookContext) -> ActionResult:
endpoint = context.alert_data.get('endpoint_hostname', '')
malicious_process = context.alert_data.get('process_name', '')
forensics = {}
# Processo malevolo
forensics['process_tree'] = self.edr.get_process_tree(
endpoint, malicious_process
)
# Network connections del processo
forensics['network_connections'] = self.edr.get_process_network(
endpoint, context.alert_data.get('pid')
)
# File system changes nelle ultime 2 ore
forensics['file_changes'] = self.edr.get_recent_file_changes(
endpoint, hours_back=2
)
# Autorun entries (persistence)
forensics['autoruns'] = self.edr.get_autoruns(endpoint)
# Memory dump del processo (se disponibile)
try:
forensics['memory_dump_path'] = self.edr.dump_process_memory(
endpoint, context.alert_data.get('pid')
)
except Exception as e:
self.logger.warning(f"Memory dump non disponibile: {e}")
context.enrichment_data['forensics'] = forensics
context.add_evidence('forensic_collection', forensics, 'edr')
return ActionResult.SUCCESS
class MalwareHashAnalysisStep(PlaybookStep):
"""Analizza hash del malware su multipli servizi TI."""
def __init__(self, vt_api_key: str, malware_bazaar_key: str):
super().__init__("malware_hash_analysis")
self.vt_key = vt_api_key
self.bazaar_key = malware_bazaar_key
def execute(self, context: PlaybookContext) -> ActionResult:
file_hash = context.alert_data.get('file_hash', '')
if not file_hash:
return ActionResult.SKIPPED
results = {}
# VirusTotal
with httpx.Client() as client:
vt_resp = client.get(
f"https://www.virustotal.com/api/v3/files/{file_hash}",
headers={"x-apikey": self.vt_key},
timeout=15
)
if vt_resp.status_code == 200:
vt_data = vt_resp.json()['data']['attributes']
stats = vt_data.get('last_analysis_stats', {})
results['virustotal'] = {
'malicious': stats.get('malicious', 0),
'total': sum(stats.values()),
'family': vt_data.get('popular_threat_classification', {}).get(
'suggested_threat_label', 'unknown'
)
}
# MalwareBazaar
with httpx.Client() as client:
bazaar_resp = client.post(
"https://mb-api.abuse.ch/api/v1/",
data={"query": "get_info", "hash": file_hash},
timeout=15
)
if bazaar_resp.status_code == 200:
bazaar_data = bazaar_resp.json()
if bazaar_data.get('query_status') == 'ok':
results['malware_bazaar'] = bazaar_data.get('data', [{}])[0]
context.enrichment_data['malware_analysis'] = results
# Aggiorna verdict
vt_malicious = results.get('virustotal', {}).get('malicious', 0)
if vt_malicious > 5:
context.enrichment_data['malware_verdict'] = 'confirmed_malware'
elif vt_malicious > 0:
context.enrichment_data['malware_verdict'] = 'suspicious'
else:
context.enrichment_data['malware_verdict'] = 'unknown'
return ActionResult.SUCCESS
class IsolateEndpointStep(PlaybookStep):
"""Isola l'endpoint dalla rete per prevenire lateral movement."""
def __init__(self, edr_client, cmdb_client):
super().__init__("isolate_endpoint")
self.edr = edr_client
self.cmdb = cmdb_client
def execute(self, context: PlaybookContext) -> ActionResult:
verdict = context.enrichment_data.get('malware_verdict', 'unknown')
if verdict not in ['confirmed_malware', 'suspicious']:
return ActionResult.SKIPPED
endpoint = context.alert_data.get('endpoint_hostname', '')
# Verifica criticalita dell'asset prima dell'isolamento
asset_info = self.cmdb.get_asset_info(endpoint)
if asset_info.get('criticality') == 'critical':
# Non isolare automaticamente asset critici
context.set_escalation(
f"Endpoint '{endpoint}' e critico (tipo: {asset_info.get('type')}). "
f"Isolamento richiede approvazione manuale."
)
return ActionResult.ESCALATE
# Isola l'endpoint
isolation_result = self.edr.isolate_endpoint(
endpoint,
reason=f"Malware detection - alert {context.alert_id}",
allow_edr_communication=True # Mantieni canale per remediation
)
context.add_evidence('isolation', {
'endpoint': endpoint,
'result': isolation_result,
'timestamp': datetime.now().isoformat()
}, 'edr')
self.logger.info(f"Endpoint {endpoint} isolato: {isolation_result}")
return ActionResult.SUCCESS
Automatická dokumentace a vytváření lístků
Vyspělý playbook automaticky generuje dokumentaci incidentu a vyplní tiket se všemi údaji o obohacení, přijatými opatřeními a doporučeními k nápravě. To eliminuje „režii vykazování“, která často zabírá 40 % času analytiků.
# Documentazione Automatica
# File: steps/documentation_step.py
import jinja2
INCIDENT_REPORT_TEMPLATE = """
## Incident Report - {{ context.alert_id }}
**Status**: {{ context.status.value }}
**Start Time**: {{ context.start_time.strftime('%Y-%m-%d %H:%M:%S') }}
**Alert Type**: {{ context.alert_type }}
### Summary
{{ summary }}
### Risk Assessment
- **Risk Score**: {{ context.enrichment_data.get('risk_score', 'N/A') }}
- **Verdict**: {{ context.enrichment_data.get('vt_verdict', context.enrichment_data.get('malware_verdict', 'N/A')) }}
- **Triage Result**: {{ context.enrichment_data.get('triage_result', 'N/A') }}
### Threat Intelligence
{% if context.enrichment_data.get('virustotal') %}
**VirusTotal**: {{ context.enrichment_data.virustotal | tojson(indent=2) }}
{% endif %}
### Actions Taken
{% for action in context.actions_taken %}
- **{{ action.timestamp }}** - {{ action.action }}: {{ action.result }}
{% if action.details %} Details: {{ action.details }}{% endif %}
{% endfor %}
### Evidence Collected
{% for ev in context.evidence %}
- **{{ ev.type }}** (from {{ ev.source }}): {{ ev.timestamp }}
{% endfor %}
{% if context.escalation_reason %}
### Escalation Required
**Reason**: {{ context.escalation_reason }}
**Recommended Actions**:
1. Validate analyst judgment on enrichment data
2. Confirm containment or determine alternative
3. Initiate full forensic investigation if warranted
{% endif %}
### Indicators of Compromise
{% if context.enrichment_data.get('urls') %}
**Malicious URLs**:
{% for url in context.enrichment_data.urls %}
- {{ url }}
{% endfor %}
{% endif %}
"""
class DocumentIncidentStep(PlaybookStep):
"""Crea documentazione strutturata dell'incidente."""
def __init__(self, thehive_client=None):
super().__init__("document_incident")
self.thehive = thehive_client
self.template = jinja2.Template(INCIDENT_REPORT_TEMPLATE)
def execute(self, context: PlaybookContext) -> ActionResult:
# Genera summary basato sul tipo di alert
summary = self._generate_summary(context)
# Renderizza il report
report = self.template.render(
context=context,
summary=summary
)
context.enrichment_data['incident_report'] = report
# Crea/aggiorna caso in TheHive se disponibile
if self.thehive:
try:
case_id = context.alert_data.get('thehive_case_id')
if case_id:
self.thehive.update_case(
case_id,
description=report,
tags=self._extract_tags(context)
)
else:
new_case_id = self.thehive.create_case({
'title': f"[SOAR] {context.alert_type} - {context.alert_id}",
'description': report,
'severity': self._determine_severity(context),
'tags': self._extract_tags(context)
})
context.enrichment_data['thehive_case_id'] = new_case_id
except Exception as e:
self.logger.error(f"Errore creazione caso TheHive: {e}")
return ActionResult.SUCCESS
def _generate_summary(self, context: PlaybookContext) -> str:
alert_type = context.alert_type
if alert_type == 'phishing':
return (
f"Email di phishing rilevata con {len(context.enrichment_data.get('urls', []))} "
f"URL sospetti. Verdict VirusTotal: {context.enrichment_data.get('vt_verdict', 'N/A')}."
)
elif alert_type == 'malware_endpoint':
endpoint = context.alert_data.get('endpoint_hostname', 'N/A')
return (
f"Malware rilevato su {endpoint}. "
f"Verdict: {context.enrichment_data.get('malware_verdict', 'N/A')}."
)
return "Incidente di sicurezza processato automaticamente."
def _extract_tags(self, context: PlaybookContext) -> list[str]:
tags = [f"soar-auto", f"type:{context.alert_type}"]
if context.status == PlaybookStatus.ESCALATED:
tags.append("needs-human-review")
verdict = context.enrichment_data.get('vt_verdict') or \
context.enrichment_data.get('malware_verdict')
if verdict:
tags.append(f"verdict:{verdict}")
return tags
def _determine_severity(self, context: PlaybookContext) -> int:
score = context.enrichment_data.get('risk_score', 0)
if score >= 50:
return 3 # High
elif score >= 25:
return 2 # Medium
return 1 # Low
Testování a vytváření verzí příruček
Playbooky SOAR by měly být testovány se stejnou přísností jako kód aplikace. Vadný playbook ve výrobě může způsobit nadměrné zadržování (izolace hostitele legitimní) nebo nedostatečná kontejnmentace (nečinnost proti skutečným hrozbám).
# Testing Framework per SOAR Playbook
# File: tests/test_phishing_playbook.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime
class MockExchangeClient:
def __init__(self):
self.blocked_senders = []
self.deleted_emails = []
def add_to_blocklist(self, sender: str) -> None:
self.blocked_senders.append(sender)
def search_and_delete(self, sender: str, url_contains: str) -> dict:
self.deleted_emails.append({"sender": sender, "url": url_contains})
return {"deleted_count": 3, "status": "success"}
class MockADClient:
def __init__(self):
self.password_resets = []
self.revoked_sessions = []
def force_password_reset(self, user: str) -> None:
self.password_resets.append(user)
def revoke_sessions(self, user: str) -> None:
self.revoked_sessions.append(user)
class MockVirusTotalClient:
def __init__(self, malicious_urls: list[str] = None):
self.malicious_urls = malicious_urls or []
def check_url(self, url: str) -> dict:
if url in self.malicious_urls:
return {"malicious": 15, "suspicious": 3, "harmless": 0}
return {"malicious": 0, "suspicious": 0, "harmless": 50}
class TestPhishingPlaybook:
def setup_method(self):
self.exchange_client = MockExchangeClient()
self.ad_client = MockADClient()
@pytest.fixture
def high_risk_phishing_alert(self) -> dict:
return {
"alert_id": "test-001",
"alert_type": "phishing",
"sender_email": "attacker@evil.com",
"recipient_email": "victim@company.com",
"user_clicked_link": True,
"email_raw": """From: attacker@evil.com
To: victim@company.com
Subject: Urgente: Aggiorna le tue credenziali
Clicca qui: http://malicious-phish.com/steal-creds
""",
"thehive_case_id": None
}
def test_phishing_high_risk_containment(self, high_risk_phishing_alert):
"""Test: phishing ad alto rischio deve triggherare containment."""
with patch('httpx.Client') as mock_http:
# Mocka VirusTotal response con URL malevolo
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"data": {
"attributes": {
"last_analysis_stats": {
"malicious": 15, "suspicious": 2,
"harmless": 0, "undetected": 5
}
}
}
}
mock_http.return_value.__enter__.return_value.get.return_value = mock_response
playbook = build_phishing_playbook(
vt_api_key="test-key",
exchange_client=self.exchange_client,
ad_client=self.ad_client
)
context = playbook.run(
alert_id=high_risk_phishing_alert['alert_id'],
alert_type=high_risk_phishing_alert['alert_type'],
alert_data=high_risk_phishing_alert
)
# Verifica stato finale
assert context.status in [PlaybookStatus.COMPLETED, PlaybookStatus.ESCALATED]
# Se completato, verifica azioni intraprese
if context.status == PlaybookStatus.COMPLETED:
action_names = [a['action'] for a in context.actions_taken]
assert 'virustotal_enrichment' in action_names
assert 'triage_decision' in action_names
def test_low_risk_no_containment(self):
"""Test: phishing a basso rischio non deve triggherare containment."""
alert = {
"alert_id": "test-002",
"alert_type": "phishing",
"sender_email": "newsletter@legit.com",
"recipient_email": "user@company.com",
"user_clicked_link": False,
"email_raw": "From: newsletter@legit.com\nSubject: News\n\nHello!"
}
playbook = build_phishing_playbook(
vt_api_key="test-key",
exchange_client=self.exchange_client,
ad_client=self.ad_client
)
with patch('httpx.Client'):
context = playbook.run("test-002", "phishing", alert)
# Nessun isolamento su basso rischio
assert len(self.exchange_client.blocked_senders) == 0
assert len(self.ad_client.password_resets) == 0
Osvědčený postup: Příručka pro běh nasucho
Před nasazením do produkce spusťte každou příručku v režimu suchého provozu po dobu 2 týdnů. Suchá fáze provede všechny kroky obohacení a rozhodnutí, ale vynechá akce uzavření skutečné (izolace, bloky, reset hesla), protokoluje pouze to, co by udělal. To vám umožňuje kalibrovat logiku třídění bez rizika.
Závěry a klíčové poznatky
Příručky SOAR v Pythonu představují hranici moderní reakce na incidenty: kombinují se flexibilita univerzálního jazyka s ekosystémem bezpečnostních integrací nejbohatší dostupné. Modulární rámec zajišťuje opětovnou použitelnost, testovatelnost a udržovatelnost v průběhu času.
Klíčové věci
- Modulární architektura (Step + Playbook + Context) je základem každého SOAR podniku
- Obohacení z více zdrojů (VT, EDR, CMDB, AD) je rozhodující pro přesné rozhodování o třídění
- Automatická eskalace pro kritická aktiva zabraňuje vedlejším škodám z nadměrného zadržování
- Automatická dokumentace eliminuje režii vykazování (40 % času analytika)
- Testování s falešnými klienty zaručuje kvalitu bez dopadu na výrobní systémy
- Režim chodu nasucho je nezbytný pro kalibraci logiky třídění před uvedením do provozu
Související články
- Alert Triage Automation: Snižte MTTD pomocí grafové analýzy
- Inteligence hrozeb Požití: Procesor podávání STIX/TAXII
- Detekční inženýrství jako disciplína: Od skriptu k potrubí







