Tehdit İstihbaratının Alınması: STIX/TAXII Besleme İşleyicisi
Tehdit istihbaratı modern güvenlik operasyonlarının yakıtıdır: taze veriler olmadan aktörler, kampanyalar, teknikler ve uzlaşma göstergeleri (IOC) arasında bir SOC reaktif olarak çalışır proaktif olmak yerine. Ancak zorluk istihbaratın mevcudiyeti değil, onun sorunu işleme sistematik.
STIX/TAXII beslemeleri (Yapılandırılmış Tehdit Bilgileri eXpression / Güvenilir Otomatik e-Alışveriş İstihbarat Bilgileri) otomatik paylaşıma yönelik uluslararası standarttır. Makine tarafından okunabilir formatta tehdit istihbaratı. Yapı için STIX 2.1 kombinasyonu veri ve taşıma için TAXII 2.1, alım boru hatlarını tamamen oluşturmanıza olanak tanır SIEM, EDR ve güvenlik duvarlarını yeni IOC'lerle neredeyse gerçek zamanlı olarak güncelleyen otomasyon.
Bu makale, bağlantıdan sunuculara kadar eksiksiz bir tehdit istihbaratı işlemcisi oluşturur TAXII, STIX 2.1 nesne ayrıştırma, IOC zenginleştirme, uyarılarla korelasyon SIEM'in ve uygun güvenlik kontrollerine otomatik yönlendirme.
Ne Öğreneceksiniz
- STIX 2.1 nesnelerinin yapısı ve TI modelleme kalıpları
- TAXII 2.1 sunucularını Python Taxii2-client ile bağlama ve yoklama
- STIX paketlerinden IOC'leri ayrıştırma ve normalleştirme
- IOC puanlama ve veri tekilleştirme
- SIEM, EDR ve güvenlik duvarına otomatik yönlendirme
- Gerçek zamanlı IOC uyarı korelasyonu ve otomatik uyarı zenginleştirme oluşturma
STIX 2.1: Veri Modeli
STIX 2.1, neredeyse her yönü temsil edecek bir nesne dağarcığı tanımlar tehdit istihbaratı. Ana ve temel nesnelerin yapısını anlamak İşlemciyi oluşturmadan önce.
Bir IOC işlemcisi için en uygun STIX Etki Alanı Nesneleri (SDO) şunlardır:
| STIX nesnesi | Kapsam | Anahtar Alanlar |
|---|---|---|
indicator |
Tespit desenli IOC | kalıp, geçerli_başlangıç, geçerli_kadar, etiketler |
malware |
Kötü amaçlı yazılım ailesi ayrıntıları | ad, kötü amaçlı yazılım_türleri, takma adlar, yetenekler |
threat-actor |
Tehdit aktörü profili | ad, takma adlar, tehdit_aktör_tipleri, hedefler |
attack-pattern |
Saldırı tekniği (ATT&CK) | ad, harici_referanslar (ATT&CK Kimliği) |
campaign |
Saldırı kampanyası | ad, takma adlar, ilk_görülen, son_görülen |
vulnerability |
CVE'ler ve güvenlik açıkları | ad, harici_referanslar (CVE), açıklama |
relationship |
Nesneler arasındaki bağlantılar | ilişki_türü, kaynak_ref, hedef_ref |
Nesnelerdeki desenlerin formatı indicator özel bir dil kullanır
STIX Desen Dili denir. Örnekler:
# Esempi di STIX Pattern Language
# File hash MD5
[file:hashes.'MD5' = 'd41d8cd98f00b204e9800998ecf8427e']
# URL malevolo
[url:value = 'http://malicious-c2.com/beacon']
# Indirizzo IP con porta
[network-traffic:dst_ref.type = 'ipv4-addr'
AND network-traffic:dst_ref.value = '192.168.1.1'
AND network-traffic:dst_port = 443]
# Dominio
[domain-name:value = 'evil-c2.com']
# Combinazione: file hash AND rete
[file:hashes.'SHA-256' = 'abc123...'
AND network-traffic:dst_ref.value = '10.0.0.1']
TAXII Sunucularına Bağlantı 2.1
TAXII 2.1, tehdit istihbaratı koleksiyonlarını keşfetmek ve yoklamak için bir REST API tanımlar.
Python kütüphanesi taxii2-client kimlik doğrulamayı yöneten eksiksiz bir istemci sağlar,
sayfalama ve hata yönetimi.
# Client TAXII 2.1
# File: taxii_client.py
from taxii2client.v21 import Server, Collection
from taxii2client.common import TokenAuth, BasicAuth
import requests
from datetime import datetime, timedelta
from typing import Iterator
import logging
class TaxiiClientConfig:
def __init__(self, url: str, auth_type: str, **auth_params):
self.url = url
self.auth_type = auth_type
self.auth_params = auth_params
class TaxiiIngestor:
def __init__(self, configs: list[TaxiiClientConfig]):
self.configs = configs
self.logger = logging.getLogger(__name__)
def _create_auth(self, config: TaxiiClientConfig):
"""Crea l'oggetto di autenticazione appropriato."""
if config.auth_type == 'token':
return TokenAuth(key=config.auth_params.get('token'))
elif config.auth_type == 'basic':
return BasicAuth(
username=config.auth_params.get('username'),
password=config.auth_params.get('password')
)
return None
def discover_collections(self, config: TaxiiClientConfig) -> list[dict]:
"""Scopre le collection disponibili sul server TAXII."""
try:
auth = self._create_auth(config)
server = Server(config.url, auth=auth)
collections_info = []
for api_root in server.api_roots:
for collection in api_root.collections:
collections_info.append({
'url': config.url,
'api_root': api_root.url,
'collection_id': collection.id,
'title': collection.title,
'description': collection.description,
'can_read': collection.can_read,
'can_write': collection.can_write
})
return collections_info
except Exception as e:
self.logger.error(f"Errore discovery server {config.url}: {e}")
return []
def poll_collection(
self,
config: TaxiiClientConfig,
collection_id: str,
added_after: datetime = None,
limit: int = 1000
) -> Iterator[dict]:
"""
Fa polling di una collection TAXII e restituisce bundle STIX.
Gestisce la paginazione automaticamente.
"""
if added_after is None:
added_after = datetime.utcnow() - timedelta(hours=24)
auth = self._create_auth(config)
server = Server(config.url, auth=auth)
for api_root in server.api_roots:
for collection in api_root.collections:
if collection.id != collection_id:
continue
self.logger.info(
f"Polling collection '{collection.title}' dal {added_after}"
)
next_page = None
total_fetched = 0
while True:
try:
kwargs = {
'added_after': added_after.isoformat() + 'Z',
'limit': min(limit, 500) # Max 500 per request
}
if next_page:
kwargs['next'] = next_page
response = collection.get_objects(**kwargs)
objects = response.get('objects', [])
total_fetched += len(objects)
if objects:
yield response # Yield intero bundle STIX
# Gestione paginazione TAXII
next_page = response.get('next')
if not next_page or total_fetched >= limit:
break
except Exception as e:
self.logger.error(f"Errore polling: {e}")
break
def poll_all_sources(
self,
collection_mapping: dict[str, str],
added_after: datetime = None
) -> list[dict]:
"""Fa polling di tutte le sorgenti configurate."""
all_objects = []
for config in self.configs:
collection_id = collection_mapping.get(config.url)
if not collection_id:
continue
for bundle in self.poll_collection(config, collection_id, added_after):
all_objects.extend(bundle.get('objects', []))
self.logger.info(f"Recuperati {len(all_objects)} oggetti STIX totali")
return all_objects
STIX IOC'lerin Ayrıştırılması ve Normalleştirilmesi
STIX nesneleri, güvenlik sistemlerinin kullanabileceği normalleştirilmiş yapılara dönüştürülmelidir. (SIEM, EDR, güvenlik duvarı) doğrudan tüketebilir. Gösterge ayrıştırma gerektirir STIX Desen Dilinin yorumlanması.
# Parser STIX IOC
# File: stix_parser.py
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class NormalizedIOC:
"""IOC normalizzato e pronto per il deployment."""
raw_id: str # ID STIX originale
ioc_type: str # 'ip', 'domain', 'url', 'hash_md5', 'hash_sha256', 'email'
value: str # Il valore dell'IOC
source: str # Feed sorgente
confidence: int # 0-100
severity: str # 'low', 'medium', 'high', 'critical'
valid_from: datetime
valid_until: Optional[datetime]
tags: list[str]
related_malware: list[str]
related_threat_actors: list[str]
mitre_techniques: list[str]
description: str = ''
class STIXParser:
# Pattern regex per estrarre valori dal STIX Pattern Language
PATTERN_EXTRACTORS = {
'ip': re.compile(
r"network-traffic:(?:dst|src)_ref\.value\s*=\s*'([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})'"
),
'domain': re.compile(
r"domain-name:value\s*=\s*'([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})'"
),
'url': re.compile(
r"url:value\s*=\s*'(https?://[^']+)'"
),
'hash_md5': re.compile(
r"file:hashes\.'MD5'\s*=\s*'([a-fA-F0-9]{32})'"
),
'hash_sha256': re.compile(
r"file:hashes\.'SHA-256'\s*=\s*'([a-fA-F0-9]{64})'"
),
'email': re.compile(
r"email-message:from_ref\.value\s*=\s*'([^@]+@[^']+)'"
),
}
def __init__(self, source_name: str):
self.source = source_name
self.logger = logging.getLogger(__name__)
def parse_bundle(self, stix_bundle: dict) -> list[NormalizedIOC]:
"""Parsa un intero bundle STIX e restituisce IOC normalizzati."""
objects = stix_bundle.get('objects', [])
# Indicizza oggetti per ID (per risolvere relazioni)
obj_index = {obj['id']: obj for obj in objects}
# Costruisci mappa delle relazioni
relationships = self._index_relationships(objects)
iocs = []
for obj in objects:
if obj.get('type') == 'indicator':
try:
ioc = self._parse_indicator(obj, obj_index, relationships)
if ioc:
iocs.append(ioc)
except Exception as e:
self.logger.warning(f"Errore parsing indicatore {obj.get('id')}: {e}")
return iocs
def _index_relationships(self, objects: list[dict]) -> dict[str, list[dict]]:
"""Indicizza le relazioni per source_ref."""
relationships = {}
for obj in objects:
if obj.get('type') == 'relationship':
source = obj.get('source_ref', '')
if source not in relationships:
relationships[source] = []
relationships[source].append(obj)
return relationships
def _parse_indicator(
self,
indicator: dict,
obj_index: dict,
relationships: dict
) -> Optional[NormalizedIOC]:
"""Parsa un singolo oggetto indicator STIX."""
pattern = indicator.get('pattern', '')
if not pattern:
return None
# Estrai IOC dal pattern
ioc_type, ioc_value = self._extract_ioc_from_pattern(pattern)
if not ioc_type or not ioc_value:
self.logger.debug(f"Pattern non riconosciuto: {pattern[:100]}")
return None
# Calcola validita
valid_from = self._parse_datetime(indicator.get('valid_from', ''))
valid_until = self._parse_datetime(indicator.get('valid_until'))
if valid_until and datetime.utcnow() > valid_until:
self.logger.debug(f"IOC scaduto: {ioc_value}")
return None
# Confidence dal livello STIX
confidence = indicator.get('confidence', 50)
if isinstance(confidence, str):
confidence = {'high': 85, 'medium': 50, 'low': 20}.get(confidence, 50)
# Risolvi relazioni per trovare malware e threat actor correlati
related_malware = []
related_actors = []
mitre_techniques = []
for rel in relationships.get(indicator.get('id', ''), []):
target_obj = obj_index.get(rel.get('target_ref', ''), {})
obj_type = target_obj.get('type', '')
if obj_type == 'malware':
related_malware.append(target_obj.get('name', ''))
elif obj_type == 'threat-actor':
related_actors.append(target_obj.get('name', ''))
elif obj_type == 'attack-pattern':
# Estrai tecnica MITRE da external references
for ref in target_obj.get('external_references', []):
if ref.get('source_name') == 'mitre-attack':
mitre_techniques.append(ref.get('external_id', ''))
# Labels diventano tags
tags = indicator.get('labels', [])
# Determina severity dal confidence
severity = 'low'
if confidence >= 80:
severity = 'critical'
elif confidence >= 60:
severity = 'high'
elif confidence >= 40:
severity = 'medium'
return NormalizedIOC(
raw_id=indicator.get('id', ''),
ioc_type=ioc_type,
value=ioc_value,
source=self.source,
confidence=confidence,
severity=severity,
valid_from=valid_from or datetime.utcnow(),
valid_until=valid_until,
tags=tags,
related_malware=related_malware,
related_threat_actors=related_actors,
mitre_techniques=mitre_techniques,
description=indicator.get('description', '')
)
def _extract_ioc_from_pattern(self,
pattern: str) -> tuple[str, str]:
"""Estrae tipo e valore IOC dal STIX pattern."""
# Pulisci il pattern
clean_pattern = pattern.strip('[]').strip()
for ioc_type, regex in self.PATTERN_EXTRACTORS.items():
match = regex.search(clean_pattern)
if match:
return ioc_type, match.group(1)
return None, None
def _parse_datetime(self, dt_str: str) -> Optional[datetime]:
if not dt_str:
return None
try:
return datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
except ValueError:
return None
IOC Tekilleştirme ve Puanlama
Birden fazla yayın, farklı güven aralıklarıyla aynı IOC'yi içerebilir. Tekilleştirme sistemi birden fazla olduğunda güveni artırarak bilgileri akıllıca toplamalıdır bağımsız kaynaklar da aynı göstergeyi bildiriyor.
# IOC Deduplication e Scoring
# File: ioc_deduplicator.py
from collections import defaultdict
class IOCDeduplicator:
def __init__(self, min_confidence: int = 30):
self.min_confidence = min_confidence
def deduplicate_and_score(
self,
iocs: list[NormalizedIOC]
) -> list[NormalizedIOC]:
"""
Deduplica IOC e aumenta confidence quando
molteplici sorgenti riportano lo stesso valore.
"""
# Raggruppa per valore IOC
grouped = defaultdict(list)
for ioc in iocs:
key = (ioc.ioc_type, ioc.value.lower())
grouped[key].append(ioc)
result = []
for (ioc_type, value), group in grouped.items():
if len(group) == 1:
ioc = group[0]
else:
# Merge degli IOC dallo stesso valore
ioc = self._merge_iocs(group)
# Filtra per confidence minima
if ioc.confidence >= self.min_confidence:
result.append(ioc)
return sorted(result, key=lambda x: x.confidence, reverse=True)
def _merge_iocs(self, iocs: list[NormalizedIOC]) -> NormalizedIOC:
"""Merge di IOC duplicati da sorgenti diverse."""
# Prendi il più recente come base
base = max(iocs, key=lambda x: x.valid_from)
# Confidence aumenta con il numero di sorgenti (diminishing returns)
base_confidence = max(i.confidence for i in iocs)
source_boost = min(len(iocs) - 1, 3) * 5 # Max +15%
merged_confidence = min(base_confidence + source_boost, 100)
# Unisci metadati
all_tags = list(set(tag for i in iocs for tag in i.tags))
all_malware = list(set(m for i in iocs for m in i.related_malware))
all_actors = list(set(a for i in iocs for a in i.related_threat_actors))
all_techniques = list(set(t for i in iocs for t in i.mitre_techniques))
all_sources = list(set(i.source for i in iocs))
return NormalizedIOC(
raw_id=base.raw_id,
ioc_type=base.ioc_type,
value=base.value,
source=",".join(all_sources),
confidence=merged_confidence,
severity=base.severity,
valid_from=base.valid_from,
valid_until=base.valid_until,
tags=all_tags,
related_malware=all_malware,
related_threat_actors=all_actors,
mitre_techniques=all_techniques,
description=base.description
)
Güvenlik Kontrollerine Yönlendirme
Tam bir TI işlemci ayrıştırmayla durmaz: normalleştirilmiş IOC'leri sistemlere dağıtır savunmayı etkili bir şekilde uygulayabilir. Yönlendirme IOC tipine ve güvenine dayanmaktadır.
# IOC Router verso SIEM, EDR, Firewall
# File: ioc_router.py
import httpx
import json
class IOCRouter:
def __init__(self, config: dict):
self.siem_endpoint = config.get('siem_endpoint')
self.edr_api_key = config.get('edr_api_key')
self.firewall_api = config.get('firewall_api')
self.confidence_thresholds = {
'firewall_block': 80, # Blocca nel firewall solo alta confidence
'edr_detection': 60, # Alert EDR da confidence media+
'siem_enrichment': 30, # Enrichment SIEM per tutti gli IOC
}
def route(self, ioc: NormalizedIOC) -> list[dict]:
"""Distribuisce un IOC ai sistemi appropriati."""
results = []
# Tutti gli IOC vanno in SIEM per enrichment
if ioc.confidence >= self.confidence_thresholds['siem_enrichment']:
result = self._push_to_siem(ioc)
results.append({'destination': 'siem', **result})
# IOC di rete ad alta confidence vanno nel firewall
if (ioc.ioc_type in ('ip', 'domain', 'url') and
ioc.confidence >= self.confidence_thresholds['firewall_block']):
result = self._push_to_firewall(ioc)
results.append({'destination': 'firewall', **result})
# Hash e IP vanno in EDR
if (ioc.ioc_type in ('hash_md5', 'hash_sha256', 'ip') and
ioc.confidence >= self.confidence_thresholds['edr_detection']):
result = self._push_to_edr(ioc)
results.append({'destination': 'edr', **result})
return results
def _push_to_siem(self, ioc: NormalizedIOC) -> dict:
"""Invia IOC al SIEM per enrichment e detection."""
# Esempio: push a Elasticsearch/OpenSearch
document = {
'@timestamp': ioc.valid_from.isoformat(),
'ioc_type': ioc.ioc_type,
'ioc_value': ioc.value,
'confidence': ioc.confidence,
'severity': ioc.severity,
'source': ioc.source,
'tags': ioc.tags,
'related_malware': ioc.related_malware,
'threat_actors': ioc.related_threat_actors,
'mitre_techniques': ioc.mitre_techniques
}
try:
with httpx.Client() as client:
response = client.post(
f"{self.siem_endpoint}/threat-intel/_doc",
json=document,
timeout=10
)
return {'status': 'success', 'http_code': response.status_code}
except Exception as e:
return {'status': 'error', 'error': str(e)}
def _push_to_firewall(self, ioc: NormalizedIOC) -> dict:
"""Aggiunge un IP/dominio alla blocklist del firewall."""
blocklist_entry = {
'type': ioc.ioc_type,
'value': ioc.value,
'action': 'block',
'comment': f"TI-{ioc.source}-conf{ioc.confidence}",
'expiry': ioc.valid_until.isoformat() if ioc.valid_until else None
}
try:
with httpx.Client() as client:
response = client.post(
f"{self.firewall_api}/blocklist",
json=blocklist_entry,
headers={"Authorization": f"Bearer {self.edr_api_key}"},
timeout=10
)
return {'status': 'success', 'http_code': response.status_code}
except Exception as e:
return {'status': 'error', 'error': str(e)}
def _push_to_edr(self, ioc: NormalizedIOC) -> dict:
"""Configura l'EDR per rilevare l'IOC."""
ioc_config = {
'indicator_type': (
'hash' if 'hash' in ioc.ioc_type
else ioc.ioc_type
),
'indicator_value': ioc.value,
'action': 'alert' if ioc.confidence < 90 else 'block',
'severity': ioc.severity,
'description': ioc.description or f"IOC from {ioc.source}"
}
try:
with httpx.Client() as client:
response = client.post(
f"{self.firewall_api}/ioc-management",
json=ioc_config,
headers={"X-API-Key": self.edr_api_key},
timeout=10
)
return {'status': 'success', 'http_code': response.status_code}
except Exception as e:
return {'status': 'error', 'error': str(e)}
Uyarı Korelasyonuyla Komple İşlem Hattı
Boru hattının tamamı TAXII yoklamayı, STIX ayrıştırmayı, veri tekilleştirmeyi, yönlendirmeyi ve korelasyonu entegre eder SIEM uyarılarıyla gerçek zamanlı olarak. Yeni bir IOC alındığında sistem şunları kontrol eder: eşleşen yeni uyarılar varsa otomatik zenginleştirme oluşturulur.
# Pipeline TI Completa
# File: ti_pipeline.py
import asyncio
from datetime import datetime, timedelta
class ThreatIntelPipeline:
def __init__(self, taxii_ingestor: TaxiiIngestor,
stix_parser: STIXParser,
deduplicator: IOCDeduplicator,
router: IOCRouter,
siem_client):
self.ingestor = taxii_ingestor
self.parser = stix_parser
self.deduplicator = deduplicator
self.router = router
self.siem = siem_client
self.logger = logging.getLogger(__name__)
async def run_ingestion_cycle(
self,
collection_mapping: dict,
lookback_hours: int = 24
) -> dict:
"""Esegue un ciclo completo di ingestion."""
start_time = datetime.utcnow()
added_after = start_time - timedelta(hours=lookback_hours)
# 1. Poll tutti i feed
self.logger.info(f"Avvio polling TAXII da {added_after}")
raw_objects = self.ingestor.poll_all_sources(
collection_mapping, added_after
)
if not raw_objects:
return {'processed': 0, 'routed': 0}
# 2. Parse STIX bundle
bundle = {"objects": raw_objects}
iocs = self.parser.parse_bundle(bundle)
self.logger.info(f"Parsati {len(iocs)} IOC da {len(raw_objects)} oggetti STIX")
# 3. Deduplica e scoring
unique_iocs = self.deduplicator.deduplicate_and_score(iocs)
self.logger.info(f"IOC unici dopo dedup: {len(unique_iocs)}")
# 4. Routing verso controlli
routing_results = []
for ioc in unique_iocs:
results = self.router.route(ioc)
routing_results.extend(results)
# 5. Correlazione con alert recenti
correlations = await self._correlate_with_recent_alerts(unique_iocs)
return {
'processed': len(iocs),
'unique': len(unique_iocs),
'routed': len(routing_results),
'correlations_found': len(correlations),
'duration_seconds': (datetime.utcnow() - start_time).total_seconds()
}
async def _correlate_with_recent_alerts(
self,
iocs: list[NormalizedIOC],
lookback_hours: int = 48
) -> list[dict]:
"""Cerca alert recenti che matchano i nuovi IOC."""
correlations = []
# Query SIEM per alert recenti
recent_alerts = await self.siem.search_recent_alerts(
hours_back=lookback_hours,
limit=10000
)
# Crea indice degli IOC per lookup efficiente
ioc_index = {}
for ioc in iocs:
key = (ioc.ioc_type, ioc.value.lower())
ioc_index[key] = ioc
# Cerca match
for alert in recent_alerts:
# Controlla IP, domini, hash nell'alert
fields_to_check = [
('ip', alert.get('src_ip', '')),
('ip', alert.get('dst_ip', '')),
('domain', alert.get('dns_query', '')),
('hash_md5', alert.get('file_hash_md5', '')),
('hash_sha256', alert.get('file_hash_sha256', '')),
]
for ioc_type, value in fields_to_check:
if not value:
continue
ioc = ioc_index.get((ioc_type, value.lower()))
if ioc:
correlation = {
'alert_id': alert.get('id'),
'ioc_type': ioc_type,
'ioc_value': value,
'ioc_confidence': ioc.confidence,
'related_malware': ioc.related_malware,
'related_actors': ioc.related_threat_actors
}
correlations.append(correlation)
# Enrichisci l'alert nel SIEM
await self.siem.enrich_alert(
alert.get('id'), correlation
)
if correlations:
self.logger.warning(
f"CORRELAZIONI TROVATE: {len(correlations)} alert matchano IOC nuovi!"
)
return correlations
async def run_scheduled(self, interval_minutes: int = 60,
collection_mapping: dict = None) -> None:
"""Esegue il pipeline su intervallo schedulato."""
self.logger.info(
f"Pipeline TI avviata, polling ogni {interval_minutes} minuti"
)
while True:
try:
result = await self.run_ingestion_cycle(
collection_mapping or {}
)
self.logger.info(f"Ciclo completato: {result}")
except Exception as e:
self.logger.error(f"Errore ciclo ingestion: {e}", exc_info=True)
await asyncio.sleep(interval_minutes * 60)
Ücretsiz TAXII Yayınları Tavsiye Edilir
- GÖNYE ATT&CK TAKSİ:
https://cti-taxii.mitre.org/taxii/- Gruplar ve yazılım - LIMO anormallikleri: Topluluğun seçtiği IOC'lerle ücretsiz katman
- AlienVault OTX: Topluluk odaklı tehdit istihbaratı beslemesi
- Kötü Amaçlı Yazılım Bilgi Paylaşım Platformu (MISP): topluluk yayınlarıyla kendi kendine barındırılan
- Kötüye KullanımIPDB: Kötü amaçlı IP yayınları (REST API, TAXII değil)
Feed'lerin kalitesine dikkat edin
TAXII yayınlarının tümü aynı kalitede değildir. Düşük güven ve yüksek yüzdeyle besleme Yanlış pozitiflerin sayısı (ör. genel IP'ler, CDN'ler, Tor çıkış düğümleri) trafik blokajlarına neden olabilir doğrulama olmadan güvenlik duvarına eklenirse meşrudur. Her zaman bir sistem uygulayın engellemeden önce beyaz listeye alma (örneğin, tanınmış bulut sağlayıcılarının CIDR'leri, kuruluş IP'leri).
Sonuçlar ve Temel Çıkarımlar
Olgun bir STIX/TAXII işlemci, tehdit istihbaratını bir insan varlığından dönüştürüyor (raporları okuyan analistler) otomatikleştirilmiş operasyonel kaynağa (IOC'ler yayılıyor) yayınlandıktan birkaç dakika sonra otomatik olarak güvenlik kontrolleri yoluyla).
Temel Çıkarımlar
- STIX 2.1, TI modelleme için standart dildir; TAXII 2.1 ve taşıma protokolü
- STIX ayrıştırıcısının, IOC'leri kötü amaçlı yazılım/aktör bağlamıyla zenginleştirmek için nesneler arasındaki ilişkileri yönetmesi gerekir
- Çoklu kaynak tekilleştirme, uyumlu kaynaklarla orantılı olarak güveni artırır
- Yönlendirme, IOC türüne ve güvenine dayalı olmalıdır (güvenlik duvarındaki her şeyi engellemeyin)
- Gerçek zamanlı IOC uyarı korelasyonu gerçek katma değerdir: pasif IOC'leri aktif uyarılara dönüştürür
- Kendiliğinden kaynaklanan kesintileri önlemek için her zaman otomatik engellemeden önce beyaz listeye eklemeyi uygulayın
İlgili Makaleler
- Yapay Zeka Destekli Tespit: Sigma Kuralları Oluşturma için Yüksek Lisans
- Python'da SOAR Başucu Kitabı: Olay Müdahale Otomasyonu
- MITRE ATT&CK Entegrasyonu: Kapsama Boşluğunu Haritalama







