SOAR Playbook în Python: Automatizarea răspunsului la incident
Un manual de orchestrare, automatizare și răspuns de securitate (SOAR) și mult mai mult decât un script: și o orchestrare formală, repetabilă a acțiunilor de răspuns la un incident de securitate. Când o alertă declanșează un manual, o secvență coordonată de operațiuni - îmbogățirea datelor, Analiza programelor malware, izolarea automată, notificările și documentația - se întâmplă în câteva secunde decât în ore, reducând în mod măsurabil timpul mediu de răspuns (MTTR).
Datele recente din industrie confirmă impactul: implementările bazate pe Python au avut rezultate 2,5 minute de MTTR cu o precizie de 92%., gestionând 500 de incidente/oră. Cortex XSOAR, TheHive cu Cortex și platforme precum Shuffle (open-source) vă permit să implementați manuale în Python cu acces deplin la ecosistemul de securitate.
Acest articol creează manuale SOAR complete în Python pentru trei scenarii comune: phishing, malware la punctul final și forța brută. Pentru fiecare, întregul flux este implementat: triaj, îmbogățire, izolare, escaladare și documentare automată.
Ce vei învăța
- Arhitectura unui manual SOAR modular în Python
- Îmbogățire automată: VirusTotal, Shodan, Active Directory
- Reținere automată: izolarea punctului final, blocarea IP, dezactivarea utilizatorului
- Integrare cu TheHive și Cortex pentru orchestrare
- Testarea și versiunea manualului
- Model de escaladare umană cu context pre-populat
Arhitectura unui manual SOAR modular
Un playbook bine conceput compus din blocuri funcționale independente care pot fi reutilizate între diferite manuale. Structura modulară facilitează testarea și întreținerea și evoluția în timp.
Elementele fundamentale ale oricărui manual SOAR sunt:
- Declanșatoare: ce eveniment declanșează playbook-ul (alerta de la SIEM, e-mail suspect, raport utilizator)
- Îmbogăţire: Colectare de context suplimentar (amenințări Intel, informații despre active, istoricul utilizatorilor)
- Decizie: logica de triaj automat (false pozitive? escaladare? izolare imediată?)
- Acţiune: acțiuni de răspuns (izolare, blocare, notificare)
- Documentare: actualizare bilet, jurnal de joc, colectare de dovezi
- Închidere/Escaladare: Terminare automată sau escaladare la analist uman
# Framework base per Playbook SOAR
# File: soar_framework.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
from enum import Enum
import logging
class PlaybookStatus(Enum):
RUNNING = "running"
COMPLETED = "completed"
ESCALATED = "escalated"
FAILED = "failed"
class ActionResult(Enum):
SUCCESS = "success"
FAILURE = "failure"
SKIPPED = "skipped"
ESCALATE = "escalate"
@dataclass
class PlaybookContext:
"""Contesto condiviso tra tutti gli step del playbook."""
alert_id: str
alert_type: str
alert_data: dict
start_time: datetime = field(default_factory=datetime.now)
enrichment_data: dict = field(default_factory=dict)
actions_taken: list[dict] = field(default_factory=list)
evidence: list[dict] = field(default_factory=list)
escalation_reason: Optional[str] = None
status: PlaybookStatus = PlaybookStatus.RUNNING
def add_action(self, action: str, result: ActionResult,
details: dict = None) -> None:
self.actions_taken.append({
'timestamp': datetime.now().isoformat(),
'action': action,
'result': result.value,
'details': details or {}
})
def add_evidence(self, evidence_type: str, data: Any,
source: str) -> None:
self.evidence.append({
'timestamp': datetime.now().isoformat(),
'type': evidence_type,
'data': data,
'source': source
})
def set_escalation(self, reason: str) -> None:
self.escalation_reason = reason
self.status = PlaybookStatus.ESCALATED
class PlaybookStep(ABC):
"""Classe base per ogni step del playbook."""
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(f"soar.{name}")
@abstractmethod
def execute(self, context: PlaybookContext) -> ActionResult:
"""Esegue lo step. Deve essere implementato da ogni sottoclasse."""
pass
def __str__(self) -> str:
return self.name
class Playbook:
"""Orchestratore del playbook: esegue gli step in sequenza."""
def __init__(self, name: str, steps: list[PlaybookStep]):
self.name = name
self.steps = steps
self.logger = logging.getLogger(f"soar.playbook.{name}")
def run(self, alert_id: str, alert_type: str,
alert_data: dict) -> PlaybookContext:
context = PlaybookContext(
alert_id=alert_id,
alert_type=alert_type,
alert_data=alert_data
)
self.logger.info(f"Playbook '{self.name}' avviato per alert {alert_id}")
for step in self.steps:
if context.status in [PlaybookStatus.ESCALATED, PlaybookStatus.FAILED]:
self.logger.info(f"Skip step '{step}': playbook in stato {context.status}")
break
self.logger.info(f"Esecuzione step: {step}")
try:
result = step.execute(context)
context.add_action(step.name, result)
if result == ActionResult.ESCALATE:
context.status = PlaybookStatus.ESCALATED
self.logger.warning(f"Step '{step}' richiede escalation")
break
except Exception as e:
self.logger.error(f"Step '{step}' fallito: {e}", exc_info=True)
context.add_action(step.name, ActionResult.FAILURE,
{'error': str(e)})
context.status = PlaybookStatus.FAILED
break
if context.status == PlaybookStatus.RUNNING:
context.status = PlaybookStatus.COMPLETED
self.logger.info(f"Playbook completato con status: {context.status.value}")
return context
Caietul de joc pentru phishing: de la triaj la izolare
Phishing-ul este cel mai frecvent scenariu în SOC. Un manual matur reduce timpul de răspuns de la 45 de minute (manual) la mai puțin de 2 minute (automatizat), analizând adrese URL, atașamente, verificarea antetelor de e-mail și izolarea utilizatorilor care au făcut clic.
# Playbook Phishing Completo
# File: playbooks/phishing_playbook.py
import httpx
import re
import base64
from email.parser import Parser
from email.policy import default as default_policy
class ExtractEmailArtifactsStep(PlaybookStep):
"""Estrae URL, allegati e header dall'email sospetta."""
def __init__(self):
super().__init__("extract_email_artifacts")
def execute(self, context: PlaybookContext) -> ActionResult:
email_raw = context.alert_data.get('email_raw', '')
if not email_raw:
return ActionResult.SKIPPED
# Parse email
msg = Parser(policy=default_policy).parsestr(email_raw)
# Estrai header
headers = {
'from': msg.get('From', ''),
'reply_to': msg.get('Reply-To', ''),
'x_originating_ip': msg.get('X-Originating-IP', ''),
'dkim': 'DKIM-Signature' in msg,
'spf': 'Received-SPF' in email_raw,
'dmarc': 'DMARC' in email_raw.upper()
}
# Estrai URL dal body
body = msg.get_body(preferencelist=('plain', 'html'))
body_text = body.get_content() if body else ''
urls = re.findall(r'https?://[^\s<>"]+', body_text)
# Estrai allegati
attachments = []
for part in msg.iter_attachments():
attachments.append({
'filename': part.get_filename(),
'content_type': part.get_content_type(),
'size': len(part.get_payload(decode=True) or b''),
'content_b64': base64.b64encode(
part.get_payload(decode=True) or b''
).decode()
})
context.enrichment_data['email_headers'] = headers
context.enrichment_data['urls'] = list(set(urls)) # dedup
context.enrichment_data['attachments'] = attachments
context.add_evidence('email_headers', headers, 'email_parser')
context.add_evidence('extracted_urls', urls, 'email_parser')
self.logger.info(f"Estratti {len(urls)} URL e {len(attachments)} allegati")
return ActionResult.SUCCESS
class VirusTotalEnrichmentStep(PlaybookStep):
"""Analizza URL e hash allegati con VirusTotal."""
def __init__(self, vt_api_key: str):
super().__init__("virustotal_enrichment")
self.vt_api_key = vt_api_key
self.base_url = "https://www.virustotal.com/api/v3"
self.headers = {"x-apikey": vt_api_key}
def execute(self, context: PlaybookContext) -> ActionResult:
vt_results = {}
# Analizza URL
for url in context.enrichment_data.get('urls', [])[:10]: # Max 10 URL
try:
result = self._check_url(url)
vt_results[url] = result
if result.get('malicious', 0) > 0:
context.add_evidence('malicious_url', {'url': url, 'vt': result}, 'virustotal')
except Exception as e:
self.logger.warning(f"VT check URL fallito per {url}: {e}")
context.enrichment_data['virustotal'] = vt_results
# Determina se e necessario containment immediato
malicious_count = sum(
1 for r in vt_results.values() if r.get('malicious', 0) > 2
)
if malicious_count > 0:
context.enrichment_data['vt_verdict'] = 'malicious'
self.logger.warning(f"Trovati {malicious_count} URL malevoli su VT")
else:
context.enrichment_data['vt_verdict'] = 'clean'
return ActionResult.SUCCESS
def _check_url(self, url: str) -> dict:
"""Controlla un URL su VirusTotal."""
import hashlib
url_id = base64.urlsafe_b64encode(url.encode()).decode().rstrip('=')
with httpx.Client() as client:
response = client.get(
f"{self.base_url}/urls/{url_id}",
headers=self.headers,
timeout=10
)
if response.status_code == 404:
# Submetti per analisi
post_response = client.post(
f"{self.base_url}/urls",
headers=self.headers,
data={"url": url},
timeout=10
)
return {"status": "submitted", "malicious": 0}
if response.status_code != 200:
return {"status": "error", "malicious": 0}
data = response.json()
stats = data.get('data', {}).get(
'attributes', {}
).get('last_analysis_stats', {})
return {
"malicious": stats.get('malicious', 0),
"suspicious": stats.get('suspicious', 0),
"harmless": stats.get('harmless', 0),
"undetected": stats.get('undetected', 0)
}
class TriageDecisionStep(PlaybookStep):
"""Decide l'azione basandosi sull'enrichment."""
def __init__(self):
super().__init__("triage_decision")
def execute(self, context: PlaybookContext) -> ActionResult:
vt_verdict = context.enrichment_data.get('vt_verdict', 'unknown')
headers = context.enrichment_data.get('email_headers', {})
# Score-based triage
risk_score = 0
# Fattori di rischio
if vt_verdict == 'malicious':
risk_score += 40
if not headers.get('dkim', True):
risk_score += 15
if not headers.get('spf', True):
risk_score += 15
if not headers.get('dmarc', True):
risk_score += 10
if context.enrichment_data.get('attachments'):
for att in context.enrichment_data['attachments']:
if any(ext in str(att.get('filename', '')).lower()
for ext in ['.exe', '.js', '.vbs', '.ps1', '.macro']):
risk_score += 20
context.enrichment_data['risk_score'] = risk_score
self.logger.info(f"Risk score calcolato: {risk_score}")
if risk_score >= 50:
context.enrichment_data['triage_result'] = 'high_risk'
return ActionResult.SUCCESS # Procedi con containment
elif risk_score >= 25:
context.enrichment_data['triage_result'] = 'medium_risk'
context.set_escalation(
f"Risk score {risk_score}: richiede review umana"
)
return ActionResult.ESCALATE
else:
context.enrichment_data['triage_result'] = 'low_risk'
return ActionResult.SUCCESS
class ContainPhishingStep(PlaybookStep):
"""Azioni di containment per phishing confermato."""
def __init__(self, exchange_client, ad_client):
super().__init__("contain_phishing")
self.exchange_client = exchange_client
self.ad_client = ad_client
def execute(self, context: PlaybookContext) -> ActionResult:
if context.enrichment_data.get('triage_result') != 'high_risk':
return ActionResult.SKIPPED
affected_user = context.alert_data.get('recipient_email', '')
actions = []
# 1. Rimuovi email simili da tutte le mailbox
if context.enrichment_data.get('urls'):
for url in context.enrichment_data['urls'][:3]:
result = self.exchange_client.search_and_delete(
sender=context.alert_data.get('sender_email', ''),
url_contains=url
)
actions.append({'type': 'email_deletion', 'url': url, 'result': result})
# 2. Blocca mittente in Exchange
sender = context.alert_data.get('sender_email', '')
if sender:
self.exchange_client.add_to_blocklist(sender)
actions.append({'type': 'sender_blocked', 'sender': sender})
# 3. Forza reset password se utente ha cliccato link
if context.alert_data.get('user_clicked_link', False):
self.ad_client.force_password_reset(affected_user)
self.ad_client.revoke_sessions(affected_user)
actions.append({
'type': 'password_reset',
'user': affected_user,
'reason': 'User clicked malicious link'
})
context.add_evidence('containment_actions', actions, 'soar_playbook')
self.logger.info(f"Containment completato: {len(actions)} azioni")
return ActionResult.SUCCESS
def build_phishing_playbook(vt_api_key: str,
exchange_client,
ad_client) -> Playbook:
"""Factory per il playbook phishing."""
return Playbook(
name="phishing_response",
steps=[
ExtractEmailArtifactsStep(),
VirusTotalEnrichmentStep(vt_api_key),
TriageDecisionStep(),
ContainPhishingStep(exchange_client, ad_client),
DocumentIncidentStep(), # Definito sotto
NotifyStakeholdersStep()
]
)
Malware pe Endpoint Playbook: îmbogățire și izolare
Registrul de programe malware pentru punctele finale trebuie să răspundă mai agresiv: izolarea a punctului final și critic pentru a preveni mișcarea laterală. Dar înainte de izolare, este necesar colectează cât mai multe date criminalistice.
# Playbook Malware Endpoint
# File: playbooks/malware_endpoint_playbook.py
class CollectForensicDataStep(PlaybookStep):
"""Raccoglie dati forensi dall'endpoint prima dell'isolamento."""
def __init__(self, edr_client):
super().__init__("collect_forensic_data")
self.edr = edr_client
def execute(self, context: PlaybookContext) -> ActionResult:
endpoint = context.alert_data.get('endpoint_hostname', '')
malicious_process = context.alert_data.get('process_name', '')
forensics = {}
# Processo malevolo
forensics['process_tree'] = self.edr.get_process_tree(
endpoint, malicious_process
)
# Network connections del processo
forensics['network_connections'] = self.edr.get_process_network(
endpoint, context.alert_data.get('pid')
)
# File system changes nelle ultime 2 ore
forensics['file_changes'] = self.edr.get_recent_file_changes(
endpoint, hours_back=2
)
# Autorun entries (persistence)
forensics['autoruns'] = self.edr.get_autoruns(endpoint)
# Memory dump del processo (se disponibile)
try:
forensics['memory_dump_path'] = self.edr.dump_process_memory(
endpoint, context.alert_data.get('pid')
)
except Exception as e:
self.logger.warning(f"Memory dump non disponibile: {e}")
context.enrichment_data['forensics'] = forensics
context.add_evidence('forensic_collection', forensics, 'edr')
return ActionResult.SUCCESS
class MalwareHashAnalysisStep(PlaybookStep):
"""Analizza hash del malware su multipli servizi TI."""
def __init__(self, vt_api_key: str, malware_bazaar_key: str):
super().__init__("malware_hash_analysis")
self.vt_key = vt_api_key
self.bazaar_key = malware_bazaar_key
def execute(self, context: PlaybookContext) -> ActionResult:
file_hash = context.alert_data.get('file_hash', '')
if not file_hash:
return ActionResult.SKIPPED
results = {}
# VirusTotal
with httpx.Client() as client:
vt_resp = client.get(
f"https://www.virustotal.com/api/v3/files/{file_hash}",
headers={"x-apikey": self.vt_key},
timeout=15
)
if vt_resp.status_code == 200:
vt_data = vt_resp.json()['data']['attributes']
stats = vt_data.get('last_analysis_stats', {})
results['virustotal'] = {
'malicious': stats.get('malicious', 0),
'total': sum(stats.values()),
'family': vt_data.get('popular_threat_classification', {}).get(
'suggested_threat_label', 'unknown'
)
}
# MalwareBazaar
with httpx.Client() as client:
bazaar_resp = client.post(
"https://mb-api.abuse.ch/api/v1/",
data={"query": "get_info", "hash": file_hash},
timeout=15
)
if bazaar_resp.status_code == 200:
bazaar_data = bazaar_resp.json()
if bazaar_data.get('query_status') == 'ok':
results['malware_bazaar'] = bazaar_data.get('data', [{}])[0]
context.enrichment_data['malware_analysis'] = results
# Aggiorna verdict
vt_malicious = results.get('virustotal', {}).get('malicious', 0)
if vt_malicious > 5:
context.enrichment_data['malware_verdict'] = 'confirmed_malware'
elif vt_malicious > 0:
context.enrichment_data['malware_verdict'] = 'suspicious'
else:
context.enrichment_data['malware_verdict'] = 'unknown'
return ActionResult.SUCCESS
class IsolateEndpointStep(PlaybookStep):
"""Isola l'endpoint dalla rete per prevenire lateral movement."""
def __init__(self, edr_client, cmdb_client):
super().__init__("isolate_endpoint")
self.edr = edr_client
self.cmdb = cmdb_client
def execute(self, context: PlaybookContext) -> ActionResult:
verdict = context.enrichment_data.get('malware_verdict', 'unknown')
if verdict not in ['confirmed_malware', 'suspicious']:
return ActionResult.SKIPPED
endpoint = context.alert_data.get('endpoint_hostname', '')
# Verifica criticalita dell'asset prima dell'isolamento
asset_info = self.cmdb.get_asset_info(endpoint)
if asset_info.get('criticality') == 'critical':
# Non isolare automaticamente asset critici
context.set_escalation(
f"Endpoint '{endpoint}' e critico (tipo: {asset_info.get('type')}). "
f"Isolamento richiede approvazione manuale."
)
return ActionResult.ESCALATE
# Isola l'endpoint
isolation_result = self.edr.isolate_endpoint(
endpoint,
reason=f"Malware detection - alert {context.alert_id}",
allow_edr_communication=True # Mantieni canale per remediation
)
context.add_evidence('isolation', {
'endpoint': endpoint,
'result': isolation_result,
'timestamp': datetime.now().isoformat()
}, 'edr')
self.logger.info(f"Endpoint {endpoint} isolato: {isolation_result}")
return ActionResult.SUCCESS
Creare automată a documentației și a biletelor
Un manual matur generează automat documentația incidentului, populând biletul cu toate datele de îmbogățire, acțiunile întreprinse și recomandările de remediere. Acest lucru elimină „taxele de raportare” care ocupă adesea 40% din timpul analiștilor.
# Documentazione Automatica
# File: steps/documentation_step.py
import jinja2
INCIDENT_REPORT_TEMPLATE = """
## Incident Report - {{ context.alert_id }}
**Status**: {{ context.status.value }}
**Start Time**: {{ context.start_time.strftime('%Y-%m-%d %H:%M:%S') }}
**Alert Type**: {{ context.alert_type }}
### Summary
{{ summary }}
### Risk Assessment
- **Risk Score**: {{ context.enrichment_data.get('risk_score', 'N/A') }}
- **Verdict**: {{ context.enrichment_data.get('vt_verdict', context.enrichment_data.get('malware_verdict', 'N/A')) }}
- **Triage Result**: {{ context.enrichment_data.get('triage_result', 'N/A') }}
### Threat Intelligence
{% if context.enrichment_data.get('virustotal') %}
**VirusTotal**: {{ context.enrichment_data.virustotal | tojson(indent=2) }}
{% endif %}
### Actions Taken
{% for action in context.actions_taken %}
- **{{ action.timestamp }}** - {{ action.action }}: {{ action.result }}
{% if action.details %} Details: {{ action.details }}{% endif %}
{% endfor %}
### Evidence Collected
{% for ev in context.evidence %}
- **{{ ev.type }}** (from {{ ev.source }}): {{ ev.timestamp }}
{% endfor %}
{% if context.escalation_reason %}
### Escalation Required
**Reason**: {{ context.escalation_reason }}
**Recommended Actions**:
1. Validate analyst judgment on enrichment data
2. Confirm containment or determine alternative
3. Initiate full forensic investigation if warranted
{% endif %}
### Indicators of Compromise
{% if context.enrichment_data.get('urls') %}
**Malicious URLs**:
{% for url in context.enrichment_data.urls %}
- {{ url }}
{% endfor %}
{% endif %}
"""
class DocumentIncidentStep(PlaybookStep):
"""Crea documentazione strutturata dell'incidente."""
def __init__(self, thehive_client=None):
super().__init__("document_incident")
self.thehive = thehive_client
self.template = jinja2.Template(INCIDENT_REPORT_TEMPLATE)
def execute(self, context: PlaybookContext) -> ActionResult:
# Genera summary basato sul tipo di alert
summary = self._generate_summary(context)
# Renderizza il report
report = self.template.render(
context=context,
summary=summary
)
context.enrichment_data['incident_report'] = report
# Crea/aggiorna caso in TheHive se disponibile
if self.thehive:
try:
case_id = context.alert_data.get('thehive_case_id')
if case_id:
self.thehive.update_case(
case_id,
description=report,
tags=self._extract_tags(context)
)
else:
new_case_id = self.thehive.create_case({
'title': f"[SOAR] {context.alert_type} - {context.alert_id}",
'description': report,
'severity': self._determine_severity(context),
'tags': self._extract_tags(context)
})
context.enrichment_data['thehive_case_id'] = new_case_id
except Exception as e:
self.logger.error(f"Errore creazione caso TheHive: {e}")
return ActionResult.SUCCESS
def _generate_summary(self, context: PlaybookContext) -> str:
alert_type = context.alert_type
if alert_type == 'phishing':
return (
f"Email di phishing rilevata con {len(context.enrichment_data.get('urls', []))} "
f"URL sospetti. Verdict VirusTotal: {context.enrichment_data.get('vt_verdict', 'N/A')}."
)
elif alert_type == 'malware_endpoint':
endpoint = context.alert_data.get('endpoint_hostname', 'N/A')
return (
f"Malware rilevato su {endpoint}. "
f"Verdict: {context.enrichment_data.get('malware_verdict', 'N/A')}."
)
return "Incidente di sicurezza processato automaticamente."
def _extract_tags(self, context: PlaybookContext) -> list[str]:
tags = [f"soar-auto", f"type:{context.alert_type}"]
if context.status == PlaybookStatus.ESCALATED:
tags.append("needs-human-review")
verdict = context.enrichment_data.get('vt_verdict') or \
context.enrichment_data.get('malware_verdict')
if verdict:
tags.append(f"verdict:{verdict}")
return tags
def _determine_severity(self, context: PlaybookContext) -> int:
score = context.enrichment_data.get('risk_score', 0)
if score >= 50:
return 3 # High
elif score >= 25:
return 2 # Medium
return 1 # Low
Testarea și versiunea de Playbooks
Registrele SOAR ar trebui testate cu aceeași rigoare ca și codul aplicației. Un manual de joc defect în producție poate provoca izolare excesivă (izolarea gazdei legitime) sau izolarea insuficientă (nerespectarea amenințărilor reale).
# Testing Framework per SOAR Playbook
# File: tests/test_phishing_playbook.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime
class MockExchangeClient:
def __init__(self):
self.blocked_senders = []
self.deleted_emails = []
def add_to_blocklist(self, sender: str) -> None:
self.blocked_senders.append(sender)
def search_and_delete(self, sender: str, url_contains: str) -> dict:
self.deleted_emails.append({"sender": sender, "url": url_contains})
return {"deleted_count": 3, "status": "success"}
class MockADClient:
def __init__(self):
self.password_resets = []
self.revoked_sessions = []
def force_password_reset(self, user: str) -> None:
self.password_resets.append(user)
def revoke_sessions(self, user: str) -> None:
self.revoked_sessions.append(user)
class MockVirusTotalClient:
def __init__(self, malicious_urls: list[str] = None):
self.malicious_urls = malicious_urls or []
def check_url(self, url: str) -> dict:
if url in self.malicious_urls:
return {"malicious": 15, "suspicious": 3, "harmless": 0}
return {"malicious": 0, "suspicious": 0, "harmless": 50}
class TestPhishingPlaybook:
def setup_method(self):
self.exchange_client = MockExchangeClient()
self.ad_client = MockADClient()
@pytest.fixture
def high_risk_phishing_alert(self) -> dict:
return {
"alert_id": "test-001",
"alert_type": "phishing",
"sender_email": "attacker@evil.com",
"recipient_email": "victim@company.com",
"user_clicked_link": True,
"email_raw": """From: attacker@evil.com
To: victim@company.com
Subject: Urgente: Aggiorna le tue credenziali
Clicca qui: http://malicious-phish.com/steal-creds
""",
"thehive_case_id": None
}
def test_phishing_high_risk_containment(self, high_risk_phishing_alert):
"""Test: phishing ad alto rischio deve triggherare containment."""
with patch('httpx.Client') as mock_http:
# Mocka VirusTotal response con URL malevolo
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"data": {
"attributes": {
"last_analysis_stats": {
"malicious": 15, "suspicious": 2,
"harmless": 0, "undetected": 5
}
}
}
}
mock_http.return_value.__enter__.return_value.get.return_value = mock_response
playbook = build_phishing_playbook(
vt_api_key="test-key",
exchange_client=self.exchange_client,
ad_client=self.ad_client
)
context = playbook.run(
alert_id=high_risk_phishing_alert['alert_id'],
alert_type=high_risk_phishing_alert['alert_type'],
alert_data=high_risk_phishing_alert
)
# Verifica stato finale
assert context.status in [PlaybookStatus.COMPLETED, PlaybookStatus.ESCALATED]
# Se completato, verifica azioni intraprese
if context.status == PlaybookStatus.COMPLETED:
action_names = [a['action'] for a in context.actions_taken]
assert 'virustotal_enrichment' in action_names
assert 'triage_decision' in action_names
def test_low_risk_no_containment(self):
"""Test: phishing a basso rischio non deve triggherare containment."""
alert = {
"alert_id": "test-002",
"alert_type": "phishing",
"sender_email": "newsletter@legit.com",
"recipient_email": "user@company.com",
"user_clicked_link": False,
"email_raw": "From: newsletter@legit.com\nSubject: News\n\nHello!"
}
playbook = build_phishing_playbook(
vt_api_key="test-key",
exchange_client=self.exchange_client,
ad_client=self.ad_client
)
with patch('httpx.Client'):
context = playbook.run("test-002", "phishing", alert)
# Nessun isolamento su basso rischio
assert len(self.exchange_client.blocked_senders) == 0
assert len(self.ad_client.password_resets) == 0
Cele mai bune practici: Dry Run Playbook
Înainte de implementarea în producție, rulați fiecare playbook în modul de funcționare uscată timp de 2 săptămâni. Funcția uscată realizează toți pașii de îmbogățire și de decizie, dar omite acțiunile de izolare real (izolare, blocări, resetare parolă), logând doar ceea ce ar fi făcut. Acest lucru vă permite să calibrați logica de triaj fără riscuri.
Concluzii și concluzii cheie
Registrele SOAR în Python reprezintă frontiera răspunsului modern la incident: se combină flexibilitatea unui limbaj de uz general cu ecosistemul integrărilor de securitate cel mai bogat disponibil. Cadrul modular asigură reutilizarea, testabilitatea și mentenabilitatea în timp.
Recomandări cheie
- Arhitectura modulară (Step + Playbook + Context) este baza oricărei întreprinderi SOAR
- Îmbogățirea cu mai multe surse (VT, EDR, CMDB, AD) este esențială pentru deciziile de triaj precise
- Escalarea automată a activelor critice previne daunele colaterale cauzate de izolarea excesivă
- Documentarea automată elimină cheltuielile generale de raportare (40% din timpul analistului)
- Testarea cu clienți simulați garantează calitatea fără a afecta sistemele de producție
- Modul de funcționare uscată este esențial pentru a calibra logica de triaj înainte de lansare
Articole înrudite
- Automatizare triaj alerte: reduceți MTTD cu analiza grafică
- Ingestia de informații despre amenințări: STIX/TAXII Feed Processor
- Ingineria detectării ca disciplină: de la script la pipeline







