Python'da SOAR Başucu Kitabı: Olay Müdahale Otomasyonu
Bir Güvenlik Düzenleme, Otomasyon ve Yanıt (SOAR) başucu kitabı ve bir komut dosyasından çok daha fazlası: ve bir güvenlik olayına müdahale eylemlerinin resmi, tekrarlanabilir bir şekilde düzenlenmesi. Bir uyarı bir taktik kitabını tetiklediğinde, koordineli bir işlem dizisi (veri zenginleştirme, Kötü amaçlı yazılım analizi, otomatik kontrol altına alma, bildirimler ve belgeler - bunun yerine saniyeler içinde gerçekleşir Saatlerden daha fazla, Ortalama Yanıt Süresini (MTTR) ölçülebilir şekilde azaltır.
Son sektör verileri etkiyi doğruluyor: Python tabanlı uygulamalar başarıya ulaştı %92 doğrulukla 2,5 dakikalık MTTR, saatte 500 olayı yönetiyoruz. Cortex XSOAR, TheHive with Cortex ve Shuffle (açık kaynak) gibi platformlar şunları yapmanızı sağlar: Güvenlik ekosistemine tam erişimle Python'da oyun kitapları uygulayın.
Bu makale Python'da üç yaygın senaryo için eksiksiz SOAR taktik kitapları oluşturur: kimlik avı, uç nokta kötü amaçlı yazılımları ve kaba kuvvet. Her biri için akışın tamamı uygulanır: önceliklendirme, zenginleştirme, kontrol altına alma, yükseltme ve otomatik belgeleme.
Ne Öğreneceksiniz
- Python'da modüler bir SOAR oyun kitabının mimarisi
- Otomatik zenginleştirme: VirusTotal, Shodan, Active Directory
- Otomatik sınırlama: uç nokta izolasyonu, IP engelleme, kullanıcıyı devre dışı bırakma
- Düzenleme için TheHive ve Cortex ile entegrasyon
- Başucu Kitabı testi ve sürüm oluşturma
- Önceden doldurulmuş bağlamla insani yükseltme modeli
Modüler SOAR Başucu Kitabının Mimarisi
Bağımsız işlevsel bloklardan oluşan, iyi tasarlanmış bir oyun kitabı farklı oyun kitapları arasında yeniden kullanılır. Modüler yapı test ve bakımı kolaylaştırır ve zaman içindeki evrim.
Herhangi bir SOAR taktik kitabının temel yapı taşları şunlardır:
- Tetikleyiciler: başucu kitabını hangi olay tetikler (SIEM'den uyarı, şüpheli e-posta, kullanıcı raporu)
- Zenginleştirme: Ek bağlamın toplanması (istihbarat tehditleri, varlık bilgileri, kullanıcı geçmişi)
- Karar: Otomatik önceliklendirme mantığı (yanlış pozitifler? Yükseltme? Acil kontrol altına alma?)
- Aksiyon: yanıt eylemleri (izole etme, engelleme, bildirme)
- Dokümantasyon: bilet güncellemesi, başucu kitabı günlüğü, kanıt toplama
- Kapatma/İlerletme: Otomatik sonlandırma veya insan analistine iletme
# Framework base per Playbook SOAR
# File: soar_framework.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
from enum import Enum
import logging
class PlaybookStatus(Enum):
RUNNING = "running"
COMPLETED = "completed"
ESCALATED = "escalated"
FAILED = "failed"
class ActionResult(Enum):
SUCCESS = "success"
FAILURE = "failure"
SKIPPED = "skipped"
ESCALATE = "escalate"
@dataclass
class PlaybookContext:
"""Contesto condiviso tra tutti gli step del playbook."""
alert_id: str
alert_type: str
alert_data: dict
start_time: datetime = field(default_factory=datetime.now)
enrichment_data: dict = field(default_factory=dict)
actions_taken: list[dict] = field(default_factory=list)
evidence: list[dict] = field(default_factory=list)
escalation_reason: Optional[str] = None
status: PlaybookStatus = PlaybookStatus.RUNNING
def add_action(self, action: str, result: ActionResult,
details: dict = None) -> None:
self.actions_taken.append({
'timestamp': datetime.now().isoformat(),
'action': action,
'result': result.value,
'details': details or {}
})
def add_evidence(self, evidence_type: str, data: Any,
source: str) -> None:
self.evidence.append({
'timestamp': datetime.now().isoformat(),
'type': evidence_type,
'data': data,
'source': source
})
def set_escalation(self, reason: str) -> None:
self.escalation_reason = reason
self.status = PlaybookStatus.ESCALATED
class PlaybookStep(ABC):
"""Classe base per ogni step del playbook."""
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(f"soar.{name}")
@abstractmethod
def execute(self, context: PlaybookContext) -> ActionResult:
"""Esegue lo step. Deve essere implementato da ogni sottoclasse."""
pass
def __str__(self) -> str:
return self.name
class Playbook:
"""Orchestratore del playbook: esegue gli step in sequenza."""
def __init__(self, name: str, steps: list[PlaybookStep]):
self.name = name
self.steps = steps
self.logger = logging.getLogger(f"soar.playbook.{name}")
def run(self, alert_id: str, alert_type: str,
alert_data: dict) -> PlaybookContext:
context = PlaybookContext(
alert_id=alert_id,
alert_type=alert_type,
alert_data=alert_data
)
self.logger.info(f"Playbook '{self.name}' avviato per alert {alert_id}")
for step in self.steps:
if context.status in [PlaybookStatus.ESCALATED, PlaybookStatus.FAILED]:
self.logger.info(f"Skip step '{step}': playbook in stato {context.status}")
break
self.logger.info(f"Esecuzione step: {step}")
try:
result = step.execute(context)
context.add_action(step.name, result)
if result == ActionResult.ESCALATE:
context.status = PlaybookStatus.ESCALATED
self.logger.warning(f"Step '{step}' richiede escalation")
break
except Exception as e:
self.logger.error(f"Step '{step}' fallito: {e}", exc_info=True)
context.add_action(step.name, ActionResult.FAILURE,
{'error': str(e)})
context.status = PlaybookStatus.FAILED
break
if context.status == PlaybookStatus.RUNNING:
context.status = PlaybookStatus.COMPLETED
self.logger.info(f"Playbook completato con status: {context.status.value}")
return context
Kimlik Avı Başucu Kitabı: Triyajdan Sınırlamaya
Kimlik avı, SOC'lerde en yaygın senaryodur. Olgun bir taktik kitabı tepki süresini kısaltır 45 dakikadan (manuel) 2 dakikadan kısa bir süreye (otomatik) kadar, URL'leri, ekleri analiz etme, e-posta başlıklarını doğrulamak ve tıklayan kullanıcıları izole etmek.
# Playbook Phishing Completo
# File: playbooks/phishing_playbook.py
import httpx
import re
import base64
from email.parser import Parser
from email.policy import default as default_policy
class ExtractEmailArtifactsStep(PlaybookStep):
"""Estrae URL, allegati e header dall'email sospetta."""
def __init__(self):
super().__init__("extract_email_artifacts")
def execute(self, context: PlaybookContext) -> ActionResult:
email_raw = context.alert_data.get('email_raw', '')
if not email_raw:
return ActionResult.SKIPPED
# Parse email
msg = Parser(policy=default_policy).parsestr(email_raw)
# Estrai header
headers = {
'from': msg.get('From', ''),
'reply_to': msg.get('Reply-To', ''),
'x_originating_ip': msg.get('X-Originating-IP', ''),
'dkim': 'DKIM-Signature' in msg,
'spf': 'Received-SPF' in email_raw,
'dmarc': 'DMARC' in email_raw.upper()
}
# Estrai URL dal body
body = msg.get_body(preferencelist=('plain', 'html'))
body_text = body.get_content() if body else ''
urls = re.findall(r'https?://[^\s<>"]+', body_text)
# Estrai allegati
attachments = []
for part in msg.iter_attachments():
attachments.append({
'filename': part.get_filename(),
'content_type': part.get_content_type(),
'size': len(part.get_payload(decode=True) or b''),
'content_b64': base64.b64encode(
part.get_payload(decode=True) or b''
).decode()
})
context.enrichment_data['email_headers'] = headers
context.enrichment_data['urls'] = list(set(urls)) # dedup
context.enrichment_data['attachments'] = attachments
context.add_evidence('email_headers', headers, 'email_parser')
context.add_evidence('extracted_urls', urls, 'email_parser')
self.logger.info(f"Estratti {len(urls)} URL e {len(attachments)} allegati")
return ActionResult.SUCCESS
class VirusTotalEnrichmentStep(PlaybookStep):
"""Analizza URL e hash allegati con VirusTotal."""
def __init__(self, vt_api_key: str):
super().__init__("virustotal_enrichment")
self.vt_api_key = vt_api_key
self.base_url = "https://www.virustotal.com/api/v3"
self.headers = {"x-apikey": vt_api_key}
def execute(self, context: PlaybookContext) -> ActionResult:
vt_results = {}
# Analizza URL
for url in context.enrichment_data.get('urls', [])[:10]: # Max 10 URL
try:
result = self._check_url(url)
vt_results[url] = result
if result.get('malicious', 0) > 0:
context.add_evidence('malicious_url', {'url': url, 'vt': result}, 'virustotal')
except Exception as e:
self.logger.warning(f"VT check URL fallito per {url}: {e}")
context.enrichment_data['virustotal'] = vt_results
# Determina se e necessario containment immediato
malicious_count = sum(
1 for r in vt_results.values() if r.get('malicious', 0) > 2
)
if malicious_count > 0:
context.enrichment_data['vt_verdict'] = 'malicious'
self.logger.warning(f"Trovati {malicious_count} URL malevoli su VT")
else:
context.enrichment_data['vt_verdict'] = 'clean'
return ActionResult.SUCCESS
def _check_url(self, url: str) -> dict:
"""Controlla un URL su VirusTotal."""
import hashlib
url_id = base64.urlsafe_b64encode(url.encode()).decode().rstrip('=')
with httpx.Client() as client:
response = client.get(
f"{self.base_url}/urls/{url_id}",
headers=self.headers,
timeout=10
)
if response.status_code == 404:
# Submetti per analisi
post_response = client.post(
f"{self.base_url}/urls",
headers=self.headers,
data={"url": url},
timeout=10
)
return {"status": "submitted", "malicious": 0}
if response.status_code != 200:
return {"status": "error", "malicious": 0}
data = response.json()
stats = data.get('data', {}).get(
'attributes', {}
).get('last_analysis_stats', {})
return {
"malicious": stats.get('malicious', 0),
"suspicious": stats.get('suspicious', 0),
"harmless": stats.get('harmless', 0),
"undetected": stats.get('undetected', 0)
}
class TriageDecisionStep(PlaybookStep):
"""Decide l'azione basandosi sull'enrichment."""
def __init__(self):
super().__init__("triage_decision")
def execute(self, context: PlaybookContext) -> ActionResult:
vt_verdict = context.enrichment_data.get('vt_verdict', 'unknown')
headers = context.enrichment_data.get('email_headers', {})
# Score-based triage
risk_score = 0
# Fattori di rischio
if vt_verdict == 'malicious':
risk_score += 40
if not headers.get('dkim', True):
risk_score += 15
if not headers.get('spf', True):
risk_score += 15
if not headers.get('dmarc', True):
risk_score += 10
if context.enrichment_data.get('attachments'):
for att in context.enrichment_data['attachments']:
if any(ext in str(att.get('filename', '')).lower()
for ext in ['.exe', '.js', '.vbs', '.ps1', '.macro']):
risk_score += 20
context.enrichment_data['risk_score'] = risk_score
self.logger.info(f"Risk score calcolato: {risk_score}")
if risk_score >= 50:
context.enrichment_data['triage_result'] = 'high_risk'
return ActionResult.SUCCESS # Procedi con containment
elif risk_score >= 25:
context.enrichment_data['triage_result'] = 'medium_risk'
context.set_escalation(
f"Risk score {risk_score}: richiede review umana"
)
return ActionResult.ESCALATE
else:
context.enrichment_data['triage_result'] = 'low_risk'
return ActionResult.SUCCESS
class ContainPhishingStep(PlaybookStep):
"""Azioni di containment per phishing confermato."""
def __init__(self, exchange_client, ad_client):
super().__init__("contain_phishing")
self.exchange_client = exchange_client
self.ad_client = ad_client
def execute(self, context: PlaybookContext) -> ActionResult:
if context.enrichment_data.get('triage_result') != 'high_risk':
return ActionResult.SKIPPED
affected_user = context.alert_data.get('recipient_email', '')
actions = []
# 1. Rimuovi email simili da tutte le mailbox
if context.enrichment_data.get('urls'):
for url in context.enrichment_data['urls'][:3]:
result = self.exchange_client.search_and_delete(
sender=context.alert_data.get('sender_email', ''),
url_contains=url
)
actions.append({'type': 'email_deletion', 'url': url, 'result': result})
# 2. Blocca mittente in Exchange
sender = context.alert_data.get('sender_email', '')
if sender:
self.exchange_client.add_to_blocklist(sender)
actions.append({'type': 'sender_blocked', 'sender': sender})
# 3. Forza reset password se utente ha cliccato link
if context.alert_data.get('user_clicked_link', False):
self.ad_client.force_password_reset(affected_user)
self.ad_client.revoke_sessions(affected_user)
actions.append({
'type': 'password_reset',
'user': affected_user,
'reason': 'User clicked malicious link'
})
context.add_evidence('containment_actions', actions, 'soar_playbook')
self.logger.info(f"Containment completato: {len(actions)} azioni")
return ActionResult.SUCCESS
def build_phishing_playbook(vt_api_key: str,
exchange_client,
ad_client) -> Playbook:
"""Factory per il playbook phishing."""
return Playbook(
name="phishing_response",
steps=[
ExtractEmailArtifactsStep(),
VirusTotalEnrichmentStep(vt_api_key),
TriageDecisionStep(),
ContainPhishingStep(exchange_client, ad_client),
DocumentIncidentStep(), # Definito sotto
NotifyStakeholdersStep()
]
)
Uç Nokta Başucu Kitabında Kötü Amaçlı Yazılım: Zenginleştirme ve Yalıtım
Uç nokta kötü amaçlı yazılım taktik kitabının daha agresif bir şekilde yanıt vermesi gerekiyor: izolasyon uç noktanın ve yanal hareketi önlemek için kritik öneme sahiptir. Ancak izolasyondan önce gerekli Mümkün olduğu kadar çok adli veri toplayın.
# Playbook Malware Endpoint
# File: playbooks/malware_endpoint_playbook.py
class CollectForensicDataStep(PlaybookStep):
"""Raccoglie dati forensi dall'endpoint prima dell'isolamento."""
def __init__(self, edr_client):
super().__init__("collect_forensic_data")
self.edr = edr_client
def execute(self, context: PlaybookContext) -> ActionResult:
endpoint = context.alert_data.get('endpoint_hostname', '')
malicious_process = context.alert_data.get('process_name', '')
forensics = {}
# Processo malevolo
forensics['process_tree'] = self.edr.get_process_tree(
endpoint, malicious_process
)
# Network connections del processo
forensics['network_connections'] = self.edr.get_process_network(
endpoint, context.alert_data.get('pid')
)
# File system changes nelle ultime 2 ore
forensics['file_changes'] = self.edr.get_recent_file_changes(
endpoint, hours_back=2
)
# Autorun entries (persistence)
forensics['autoruns'] = self.edr.get_autoruns(endpoint)
# Memory dump del processo (se disponibile)
try:
forensics['memory_dump_path'] = self.edr.dump_process_memory(
endpoint, context.alert_data.get('pid')
)
except Exception as e:
self.logger.warning(f"Memory dump non disponibile: {e}")
context.enrichment_data['forensics'] = forensics
context.add_evidence('forensic_collection', forensics, 'edr')
return ActionResult.SUCCESS
class MalwareHashAnalysisStep(PlaybookStep):
"""Analizza hash del malware su multipli servizi TI."""
def __init__(self, vt_api_key: str, malware_bazaar_key: str):
super().__init__("malware_hash_analysis")
self.vt_key = vt_api_key
self.bazaar_key = malware_bazaar_key
def execute(self, context: PlaybookContext) -> ActionResult:
file_hash = context.alert_data.get('file_hash', '')
if not file_hash:
return ActionResult.SKIPPED
results = {}
# VirusTotal
with httpx.Client() as client:
vt_resp = client.get(
f"https://www.virustotal.com/api/v3/files/{file_hash}",
headers={"x-apikey": self.vt_key},
timeout=15
)
if vt_resp.status_code == 200:
vt_data = vt_resp.json()['data']['attributes']
stats = vt_data.get('last_analysis_stats', {})
results['virustotal'] = {
'malicious': stats.get('malicious', 0),
'total': sum(stats.values()),
'family': vt_data.get('popular_threat_classification', {}).get(
'suggested_threat_label', 'unknown'
)
}
# MalwareBazaar
with httpx.Client() as client:
bazaar_resp = client.post(
"https://mb-api.abuse.ch/api/v1/",
data={"query": "get_info", "hash": file_hash},
timeout=15
)
if bazaar_resp.status_code == 200:
bazaar_data = bazaar_resp.json()
if bazaar_data.get('query_status') == 'ok':
results['malware_bazaar'] = bazaar_data.get('data', [{}])[0]
context.enrichment_data['malware_analysis'] = results
# Aggiorna verdict
vt_malicious = results.get('virustotal', {}).get('malicious', 0)
if vt_malicious > 5:
context.enrichment_data['malware_verdict'] = 'confirmed_malware'
elif vt_malicious > 0:
context.enrichment_data['malware_verdict'] = 'suspicious'
else:
context.enrichment_data['malware_verdict'] = 'unknown'
return ActionResult.SUCCESS
class IsolateEndpointStep(PlaybookStep):
"""Isola l'endpoint dalla rete per prevenire lateral movement."""
def __init__(self, edr_client, cmdb_client):
super().__init__("isolate_endpoint")
self.edr = edr_client
self.cmdb = cmdb_client
def execute(self, context: PlaybookContext) -> ActionResult:
verdict = context.enrichment_data.get('malware_verdict', 'unknown')
if verdict not in ['confirmed_malware', 'suspicious']:
return ActionResult.SKIPPED
endpoint = context.alert_data.get('endpoint_hostname', '')
# Verifica criticalita dell'asset prima dell'isolamento
asset_info = self.cmdb.get_asset_info(endpoint)
if asset_info.get('criticality') == 'critical':
# Non isolare automaticamente asset critici
context.set_escalation(
f"Endpoint '{endpoint}' e critico (tipo: {asset_info.get('type')}). "
f"Isolamento richiede approvazione manuale."
)
return ActionResult.ESCALATE
# Isola l'endpoint
isolation_result = self.edr.isolate_endpoint(
endpoint,
reason=f"Malware detection - alert {context.alert_id}",
allow_edr_communication=True # Mantieni canale per remediation
)
context.add_evidence('isolation', {
'endpoint': endpoint,
'result': isolation_result,
'timestamp': datetime.now().isoformat()
}, 'edr')
self.logger.info(f"Endpoint {endpoint} isolato: {isolation_result}")
return ActionResult.SUCCESS
Otomatik Dokümantasyon ve Bilet Oluşturma
Olgun bir taktik kitabı otomatik olarak olay belgelerini oluşturarak bildirimi doldurur tüm zenginleştirme verileri, gerçekleştirilen eylemler ve iyileştirme önerileriyle birlikte. Bu, genellikle analistlerin zamanının %40'ını alan "raporlama yükünü" ortadan kaldırır.
# Documentazione Automatica
# File: steps/documentation_step.py
import jinja2
INCIDENT_REPORT_TEMPLATE = """
## Incident Report - {{ context.alert_id }}
**Status**: {{ context.status.value }}
**Start Time**: {{ context.start_time.strftime('%Y-%m-%d %H:%M:%S') }}
**Alert Type**: {{ context.alert_type }}
### Summary
{{ summary }}
### Risk Assessment
- **Risk Score**: {{ context.enrichment_data.get('risk_score', 'N/A') }}
- **Verdict**: {{ context.enrichment_data.get('vt_verdict', context.enrichment_data.get('malware_verdict', 'N/A')) }}
- **Triage Result**: {{ context.enrichment_data.get('triage_result', 'N/A') }}
### Threat Intelligence
{% if context.enrichment_data.get('virustotal') %}
**VirusTotal**: {{ context.enrichment_data.virustotal | tojson(indent=2) }}
{% endif %}
### Actions Taken
{% for action in context.actions_taken %}
- **{{ action.timestamp }}** - {{ action.action }}: {{ action.result }}
{% if action.details %} Details: {{ action.details }}{% endif %}
{% endfor %}
### Evidence Collected
{% for ev in context.evidence %}
- **{{ ev.type }}** (from {{ ev.source }}): {{ ev.timestamp }}
{% endfor %}
{% if context.escalation_reason %}
### Escalation Required
**Reason**: {{ context.escalation_reason }}
**Recommended Actions**:
1. Validate analyst judgment on enrichment data
2. Confirm containment or determine alternative
3. Initiate full forensic investigation if warranted
{% endif %}
### Indicators of Compromise
{% if context.enrichment_data.get('urls') %}
**Malicious URLs**:
{% for url in context.enrichment_data.urls %}
- {{ url }}
{% endfor %}
{% endif %}
"""
class DocumentIncidentStep(PlaybookStep):
"""Crea documentazione strutturata dell'incidente."""
def __init__(self, thehive_client=None):
super().__init__("document_incident")
self.thehive = thehive_client
self.template = jinja2.Template(INCIDENT_REPORT_TEMPLATE)
def execute(self, context: PlaybookContext) -> ActionResult:
# Genera summary basato sul tipo di alert
summary = self._generate_summary(context)
# Renderizza il report
report = self.template.render(
context=context,
summary=summary
)
context.enrichment_data['incident_report'] = report
# Crea/aggiorna caso in TheHive se disponibile
if self.thehive:
try:
case_id = context.alert_data.get('thehive_case_id')
if case_id:
self.thehive.update_case(
case_id,
description=report,
tags=self._extract_tags(context)
)
else:
new_case_id = self.thehive.create_case({
'title': f"[SOAR] {context.alert_type} - {context.alert_id}",
'description': report,
'severity': self._determine_severity(context),
'tags': self._extract_tags(context)
})
context.enrichment_data['thehive_case_id'] = new_case_id
except Exception as e:
self.logger.error(f"Errore creazione caso TheHive: {e}")
return ActionResult.SUCCESS
def _generate_summary(self, context: PlaybookContext) -> str:
alert_type = context.alert_type
if alert_type == 'phishing':
return (
f"Email di phishing rilevata con {len(context.enrichment_data.get('urls', []))} "
f"URL sospetti. Verdict VirusTotal: {context.enrichment_data.get('vt_verdict', 'N/A')}."
)
elif alert_type == 'malware_endpoint':
endpoint = context.alert_data.get('endpoint_hostname', 'N/A')
return (
f"Malware rilevato su {endpoint}. "
f"Verdict: {context.enrichment_data.get('malware_verdict', 'N/A')}."
)
return "Incidente di sicurezza processato automaticamente."
def _extract_tags(self, context: PlaybookContext) -> list[str]:
tags = [f"soar-auto", f"type:{context.alert_type}"]
if context.status == PlaybookStatus.ESCALATED:
tags.append("needs-human-review")
verdict = context.enrichment_data.get('vt_verdict') or \
context.enrichment_data.get('malware_verdict')
if verdict:
tags.append(f"verdict:{verdict}")
return tags
def _determine_severity(self, context: PlaybookContext) -> int:
score = context.enrichment_data.get('risk_score', 0)
if score >= 50:
return 3 # High
elif score >= 25:
return 2 # Medium
return 1 # Low
Playbook'ların Test Edilmesi ve Sürümlendirilmesi
SOAR taktik kitapları uygulama koduyla aynı titizlikle test edilmelidir. Üretimdeki hatalı bir taktik kitabı aşırı sınırlamaya (ana bilgisayar izolasyonu) neden olabilir meşru) veya yetersiz kontrol altına alma (gerçek tehditlere karşı harekete geçmeme).
# Testing Framework per SOAR Playbook
# File: tests/test_phishing_playbook.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime
class MockExchangeClient:
def __init__(self):
self.blocked_senders = []
self.deleted_emails = []
def add_to_blocklist(self, sender: str) -> None:
self.blocked_senders.append(sender)
def search_and_delete(self, sender: str, url_contains: str) -> dict:
self.deleted_emails.append({"sender": sender, "url": url_contains})
return {"deleted_count": 3, "status": "success"}
class MockADClient:
def __init__(self):
self.password_resets = []
self.revoked_sessions = []
def force_password_reset(self, user: str) -> None:
self.password_resets.append(user)
def revoke_sessions(self, user: str) -> None:
self.revoked_sessions.append(user)
class MockVirusTotalClient:
def __init__(self, malicious_urls: list[str] = None):
self.malicious_urls = malicious_urls or []
def check_url(self, url: str) -> dict:
if url in self.malicious_urls:
return {"malicious": 15, "suspicious": 3, "harmless": 0}
return {"malicious": 0, "suspicious": 0, "harmless": 50}
class TestPhishingPlaybook:
def setup_method(self):
self.exchange_client = MockExchangeClient()
self.ad_client = MockADClient()
@pytest.fixture
def high_risk_phishing_alert(self) -> dict:
return {
"alert_id": "test-001",
"alert_type": "phishing",
"sender_email": "attacker@evil.com",
"recipient_email": "victim@company.com",
"user_clicked_link": True,
"email_raw": """From: attacker@evil.com
To: victim@company.com
Subject: Urgente: Aggiorna le tue credenziali
Clicca qui: http://malicious-phish.com/steal-creds
""",
"thehive_case_id": None
}
def test_phishing_high_risk_containment(self, high_risk_phishing_alert):
"""Test: phishing ad alto rischio deve triggherare containment."""
with patch('httpx.Client') as mock_http:
# Mocka VirusTotal response con URL malevolo
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"data": {
"attributes": {
"last_analysis_stats": {
"malicious": 15, "suspicious": 2,
"harmless": 0, "undetected": 5
}
}
}
}
mock_http.return_value.__enter__.return_value.get.return_value = mock_response
playbook = build_phishing_playbook(
vt_api_key="test-key",
exchange_client=self.exchange_client,
ad_client=self.ad_client
)
context = playbook.run(
alert_id=high_risk_phishing_alert['alert_id'],
alert_type=high_risk_phishing_alert['alert_type'],
alert_data=high_risk_phishing_alert
)
# Verifica stato finale
assert context.status in [PlaybookStatus.COMPLETED, PlaybookStatus.ESCALATED]
# Se completato, verifica azioni intraprese
if context.status == PlaybookStatus.COMPLETED:
action_names = [a['action'] for a in context.actions_taken]
assert 'virustotal_enrichment' in action_names
assert 'triage_decision' in action_names
def test_low_risk_no_containment(self):
"""Test: phishing a basso rischio non deve triggherare containment."""
alert = {
"alert_id": "test-002",
"alert_type": "phishing",
"sender_email": "newsletter@legit.com",
"recipient_email": "user@company.com",
"user_clicked_link": False,
"email_raw": "From: newsletter@legit.com\nSubject: News\n\nHello!"
}
playbook = build_phishing_playbook(
vt_api_key="test-key",
exchange_client=self.exchange_client,
ad_client=self.ad_client
)
with patch('httpx.Client'):
context = playbook.run("test-002", "phishing", alert)
# Nessun isolamento su basso rischio
assert len(self.exchange_client.blocked_senders) == 0
assert len(self.ad_client.password_resets) == 0
En İyi Uygulama: Kuru Çalıştırma Başucu Kitabı
Üretime dağıtmadan önce her oyun kitabını 2 hafta boyunca deneme modunda çalıştırın. Prova tüm zenginleştirme ve karar adımlarını gerçekleştirir ancak sınırlama eylemlerini atlar gerçek (izolasyon, bloklamalar, şifre sıfırlama), yalnızca kendisinin yapacağını günlüğe kaydediyor. Bu, triyaj mantığını risk olmadan kalibre etmenize olanak tanır.
Sonuçlar ve Temel Çıkarımlar
Python'daki SOAR taktik kitapları, modern olay müdahalesinin sınırlarını temsil ediyor: güvenlik entegrasyonları ekosistemi ile genel amaçlı bir dilin esnekliği mevcut en zengin. Modüler çerçeve yeniden kullanılabilirlik, test edilebilirlik ve zaman içinde sürdürülebilirlik.
Temel Çıkarımlar
- Modüler mimari (Step + Playbook + Context) herhangi bir SOAR girişiminin temelidir
- Çok kaynaklı zenginleştirme (VT, EDR, CMDB, AD) doğru triyaj kararları için kritik öneme sahiptir
- Kritik varlıklar için otomatik yükseltme, aşırı sınırlamadan kaynaklanan ikincil hasarları önler
- Otomatik dokümantasyon, raporlama yükünü ortadan kaldırır (analist süresinin %40'ı)
- Sahte istemcilerle test yapmak, üretim sistemlerini etkilemeden kaliteyi garanti eder
- Deneme modu, canlı yayına geçmeden önce triyaj mantığını kalibre etmek için gereklidir
İlgili Makaleler
- Uyarı Triyaj Otomasyonu: Grafik Analizi ile MTTD'yi Azaltın
- Tehdit İstihbaratının Alınması: STIX/TAXII Besleme İşleyicisi
- Bir Disiplin Olarak Tespit Mühendisliği: Komut Dosyasından İşlem Hattına







