Most organizations collect threat intelligence. Few actually operationalize it. The gap between “we have IOC feeds” and “our SIEM automatically detects these threats” is where security teams lose weeks of manual effort — or worse, let indicators rot unused in a database. This guide builds an automated pipeline that ingests STIX/TAXII feeds, correlates and deduplicates IOCs in OpenCTI, scores confidence, and pushes actionable detection rules to Wazuh and Suricata with proper expiration and false-positive management.
Pipeline Architecture
┌────────────────────┐
│ STIX/TAXII Feeds │
│ (abuse.ch, OTX, │
│ MITRE ATT&CK) │
└────────┬───────────┘
│
▼
┌────────────────────┐ ┌──────────────────┐
│ Feed Ingestion │────▶│ OpenCTI │
│ (Python workers) │ │ (correlation + │
│ │ │ enrichment) │
└────────────────────┘ └────────┬─────────┘
│
┌────────▼─────────┐
│ Rule Generator │
│ (confidence + │
│ expiration) │
└───┬──────────┬────┘
│ │
┌──────────▼──┐ ┌────▼───────────┐
│ Wazuh │ │ Suricata │
│ (CDB lists, │ │ (emerging rules │
│ decoders) │ │ + threshold) │
└─────────────┘ └────────────────┘
Feed Ingestion Layer
abuse.ch Feeds
The abuse.ch suite provides several high-value feeds. Note that their API endpoints now require authentication for JSON responses, but CSV and text-based downloads remain open:
#!/usr/bin/env python3
"""feed_ingester.py — Fetch and normalize threat intel feeds."""
import csv
import json
import hashlib
import requests
from datetime import datetime, timedelta, timezone
from typing import Generator
# Feed registry: per-feed download URL, parser type, IOC type produced,
# base confidence assigned to each indicator, and time-to-live in days.
FEEDS = {
    # Recent malware-distribution URLs (CSV download, no auth required).
    "urlhaus": {
        "url": "https://urlhaus.abuse.ch/downloads/csv_recent/",
        "type": "csv",
        "ioc_type": "url",
        "confidence": 75,
        "ttl_days": 30,
    },
    # ThreatFox malicious domains in hosts-file format.
    "threatfox_hostfile": {
        "url": "https://threatfox.abuse.ch/downloads/hostfile/",
        "type": "hostfile",
        "ioc_type": "domain-name",
        "confidence": 70,
        "ttl_days": 30,
    },
    # Botnet C2 IPs — high confidence, but C2 infrastructure churns fast,
    # hence the shorter TTL.
    "feodo_botnet_c2": {
        "url": "https://feodotracker.abuse.ch/downloads/ipblocklist.json",
        "type": "json",
        "ioc_type": "ipv4-addr",
        "confidence": 90,
        "ttl_days": 14,
    },
    # Aggregated attacker IPs; noisy source, so low confidence and short TTL.
    "blocklist_de": {
        "url": "https://lists.blocklist.de/lists/all.txt",
        "type": "plaintext",
        "ioc_type": "ipv4-addr",
        "confidence": 50,
        "ttl_days": 7,
    },
}
def fetch_urlhaus_csv(url: str) -> Generator[dict, None, None]:
    """Yield normalized IOC dicts parsed from the URLhaus recent-CSV feed.

    Comment lines (starting with '#') are filtered out before CSV parsing;
    rows shorter than 8 columns are skipped as malformed.
    """
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    data_lines = (ln for ln in response.text.splitlines() if not ln.startswith("#"))
    for row in csv.reader(data_lines):
        if len(row) < 8:
            continue
        yield {
            "value": row[2],        # url column
            "ioc_type": "url",
            "threat_type": row[4],  # e.g. malware_download
            "malware": row[6],
            "source": "urlhaus",
            "first_seen": row[1],
            "tags": [tag.strip() for tag in row[7].split(",") if tag.strip()],
        }
def fetch_feodo_json(url: str) -> Generator[dict, None, None]:
    """Yield normalized IOC dicts from the Feodo Tracker JSON blocklist."""
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    entries = response.json()
    for entry in entries:
        normalized = {
            "value": entry["ip_address"],
            "ioc_type": "ipv4-addr",
            "threat_type": "botnet_cc",
            "malware": entry.get("malware", "unknown"),
            "source": "feodotracker",
            "first_seen": entry.get("first_seen", ""),
            "port": entry.get("dst_port"),
        }
        yield normalized
def fetch_plaintext_ips(url: str) -> Generator[dict, None, None]:
    """Yield one IOC dict per non-blank, non-comment line of a plain IP list."""
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    for raw_line in response.text.splitlines():
        candidate = raw_line.strip()
        if not candidate or candidate.startswith("#"):
            continue
        yield {
            "value": candidate,
            "ioc_type": "ipv4-addr",
            "source": "blocklist",
        }
AlienVault OTX via STIX/TAXII
from stix2 import TAXIICollectionSource, Filter
from taxii2client.v20 import Collection
# Base URL of the AlienVault OTX TAXII service.
OTX_TAXII_URL = "https://otx.alienvault.com/taxii/v2/"


def fetch_otx_indicators(api_key: str, collection_id: str) -> list:
    """Fetch STIX indicator objects from an OTX TAXII 2.0 collection.

    Args:
        api_key: OTX API key, sent as a Bearer token on every request.
        collection_id: TAXII collection id, appended to the base URL.

    Returns:
        List of STIX ``indicator`` objects created in the last 24 hours.

    NOTE(review): ``taxii2client.v20`` speaks TAXII 2.0 — confirm the OTX
    endpoint still serves 2.0 rather than 2.1 (which needs ``v21``).
    """
    collection = Collection(
        f"{OTX_TAXII_URL}collections/{collection_id}/",
        headers={"Authorization": f"Bearer {api_key}"}
    )
    tc_source = TAXIICollectionSource(collection)
    # Restrict to indicators created in the last 24 hours so scheduled
    # pulls stay incremental rather than re-fetching the whole collection.
    indicators = tc_source.query([
        Filter("type", "=", "indicator"),
        Filter("created", ">=", (
            datetime.now(timezone.utc) - timedelta(days=1)
        ).strftime("%Y-%m-%dT%H:%M:%SZ"))
    ])
    return indicators
Deduplication and Confidence Scoring
Raw feeds contain massive overlap. A single C2 IP might appear in URLhaus, Feodo Tracker, and Blocklist.de. Deduplication reduces noise; confidence scoring prioritizes response.
import hashlib
from collections import defaultdict
class IOCDeduplicator:
    """Deduplicate IOCs across feeds with confidence aggregation.

    Values are normalized (case, trailing slash/dot, zero-padded IP octets)
    so the same indicator seen in several feeds collapses into one record.
    Each corroborating source boosts the record's confidence, capped at 95.
    """

    def __init__(self) -> None:
        # short sha256 of the normalized value -> merged IOC record
        self.seen: dict[str, dict] = {}

    def _normalize(self, value: str, ioc_type: str) -> str:
        """Normalize IOC values for deduplication."""
        if ioc_type == "url":
            value = value.lower().rstrip("/")
        elif ioc_type == "domain-name":
            value = value.lower().strip(".")
        elif ioc_type == "ipv4-addr":
            # Normalize zero-padded octets: 010.0.0.1 -> 10.0.0.1
            parts = value.split(".")
            value = ".".join(str(int(p)) for p in parts)
        return value

    def add(self, ioc: dict, feed_confidence: int, ttl_days: int = 30) -> dict:
        """Add an IOC; return the merged record with updated confidence.

        Args:
            ioc: Normalized feed record carrying at least "value" and
                "ioc_type"; "source", "tags" and "malware" are optional.
            feed_confidence: Base confidence of the originating feed (0-100).
            ttl_days: Days until the indicator expires. Defaults to 30,
                preserving the previous hard-coded behavior; pass the feed's
                own "ttl_days" from FEEDS to honor per-feed lifetimes.

        Returns:
            The stored (possibly merged) IOC record.
        """
        key = self._normalize(ioc["value"], ioc["ioc_type"])
        ioc_hash = hashlib.sha256(key.encode()).hexdigest()[:16]
        now = datetime.now(timezone.utc)
        if ioc_hash in self.seen:
            existing = self.seen[ioc_hash]
            existing["sources"].add(ioc.get("source", "unknown"))
            existing["source_count"] = len(existing["sources"])
            # Multi-source corroboration boosts confidence by 30% of the
            # corroborating feed's score. Cap at 95 — never fully trust
            # automated feeds. (Fixes a missing "*" in the original.)
            existing["confidence"] = min(
                95, existing["confidence"] + feed_confidence * 0.3
            )
            existing["last_seen"] = now.isoformat()
            return existing
        record = {
            "hash": ioc_hash,
            "value": key,
            "ioc_type": ioc["ioc_type"],
            "confidence": feed_confidence,
            "sources": {ioc.get("source", "unknown")},
            "source_count": 1,
            "first_ingested": now.isoformat(),
            "last_seen": now.isoformat(),
            "tags": ioc.get("tags", []),
            "malware": ioc.get("malware"),
            "expires_at": (now + timedelta(days=ttl_days)).isoformat(),
        }
        self.seen[ioc_hash] = record
        return record
Confidence Scoring Model
Use a tiered model rather than raw numbers:
| Confidence Range | Label | Action |
|---|---|---|
| 90-95 | Confirmed | Auto-block, create SIEM alert |
| 70-89 | High | SIEM alert, manual review queue |
| 50-69 | Medium | SIEM correlation only (no alert) |
| 30-49 | Low | Log enrichment only |
| <30 | Informational | Store but do not deploy |
Pushing to OpenCTI
OpenCTI provides the correlation layer. Use the GraphQL API to create indicators with proper STIX relationships:
import requests
OPENCTI_URL = "https://opencti.internal.example-corp.com"
OPENCTI_TOKEN = "your-api-token-here" # Store in secrets manager
def create_opencti_indicator(ioc: dict) -> str:
    """Create or update an indicator in OpenCTI via the GraphQL API.

    Args:
        ioc: Deduplicated/scored record (as produced by IOCDeduplicator),
            carrying value, ioc_type, confidence, first_ingested, expires_at.

    Returns:
        The OpenCTI internal id of the created indicator.

    Raises:
        KeyError: If ioc["ioc_type"] has no STIX pattern mapping.
        requests.HTTPError: On non-2xx HTTP responses.
        RuntimeError: If the GraphQL layer reports errors (OpenCTI returns
            HTTP 200 even for failed mutations).
    """
    query = """
    mutation CreateIndicator($input: IndicatorAddInput!) {
        indicatorAdd(input: $input) {
            id
            name
            pattern
        }
    }
    """
    # Build STIX pattern from IOC type.
    pattern_map = {
        "ipv4-addr": f"[ipv4-addr:value = '{ioc['value']}']",
        "domain-name": f"[domain-name:value = '{ioc['value']}']",
        "url": f"[url:value = '{ioc['value']}']",
        "file-sha256": f"[file:hashes.'SHA-256' = '{ioc['value']}']",
    }
    variables = {
        "input": {
            "name": f"{ioc['ioc_type']}: {ioc['value']}",
            "pattern": pattern_map[ioc["ioc_type"]],
            "pattern_type": "stix",
            "x_opencti_main_observable_type": ioc["ioc_type"],
            "confidence": int(ioc["confidence"]),
            "valid_from": ioc["first_ingested"],
            "valid_until": ioc["expires_at"],
            "x_opencti_score": int(ioc["confidence"]),
            "objectLabel": ioc.get("tags", []),
        }
    }
    resp = requests.post(
        f"{OPENCTI_URL}/graphql",
        headers={"Authorization": f"Bearer {OPENCTI_TOKEN}"},
        json={"query": query, "variables": variables},
        timeout=30,
    )
    # Fail loudly on transport errors instead of a confusing KeyError below.
    resp.raise_for_status()
    payload = resp.json()
    # GraphQL signals mutation failures in an "errors" array with HTTP 200;
    # surface them explicitly rather than dying on payload["data"].
    if payload.get("errors"):
        raise RuntimeError(f"OpenCTI indicatorAdd failed: {payload['errors']}")
    return payload["data"]["indicatorAdd"]["id"]
Generating Wazuh CDB Lists
Wazuh uses CDB (Constant Database) lists for fast IOC lookups. Generate these from the deduplicated, scored IOC set:
def generate_wazuh_cdb(iocs: list[dict], min_confidence: int = 50) -> str:
    """Generate a Wazuh CDB list (one ``key:value`` line per IOC).

    Only ipv4-addr IOCs with confidence >= min_confidence are emitted,
    ordered highest confidence first.

    Args:
        iocs: Scored IOC records (need "value", "ioc_type", "confidence").
        min_confidence: Minimum confidence to include an entry.

    Returns:
        Newline-joined CDB list content, ready to write to etc/lists/.
    """
    lines = []
    for ioc in sorted(iocs, key=lambda x: x["confidence"], reverse=True):
        if ioc["confidence"] < min_confidence:
            continue
        if ioc["ioc_type"] == "ipv4-addr":
            # CDB format: key:value. Dedup records may carry malware=None,
            # so fall back to "generic" for missing AND null values.
            label = ioc.get("malware") or "generic"
            lines.append(f"{ioc['value']}:threat_{label}")
    # Fix: the original joined with the literal letter "n", producing one
    # garbled line instead of a newline-separated CDB list.
    return "\n".join(lines)
def generate_wazuh_rules(rule_id_start: int = 100900) -> str:
    """Render the static Wazuh rule group that references the CDB lists.

    Args:
        rule_id_start: Base custom rule id; three consecutive ids are used
            (srcip match, URL match, dstip/C2 match).

    Returns:
        XML snippet for a Wazuh rules file. The rules reference CDB lists
        at etc/lists/threat_intel_{ips,urls,c2}, which must be deployed and
        declared in ossec.conf separately.

    NOTE(review): parent sids 5710 (sshd) and 31100/31101 (web access logs)
    are assumed from standard Wazuh rulesets — confirm against the deployed
    ruleset version.
    """
    return f"""
<group name="threat_intel,">
<rule id="{rule_id_start}" level="12">
<if_sid>5710</if_sid>
<list field="srcip" lookup="address_match_key">etc/lists/threat_intel_ips</list>
<description>Connection from known threat intel IP: $(srcip)</description>
<group>threat_intel,network,</group>
</rule>
<rule id="{rule_id_start + 1}" level="14">
<if_sid>31100,31101</if_sid>
<list field="url" lookup="match_key">etc/lists/threat_intel_urls</list>
<description>Access to known malicious URL detected</description>
<group>threat_intel,web,</group>
</rule>
<rule id="{rule_id_start + 2}" level="10">
<if_sid>5710</if_sid>
<list field="dstip" lookup="address_match_key">etc/lists/threat_intel_c2</list>
<description>Outbound connection to known C2 server: $(dstip)</description>
<group>threat_intel,c2,</group>
</rule>
</group>
"""
Deploy CDB lists to the Wazuh manager:
# Deploy updated CDB list
scp threat_intel_ips.cdb siem01.internal:/var/ossec/etc/lists/threat_intel_ips
ssh siem01.internal "chown wazuh:wazuh /var/ossec/etc/lists/threat_intel_ips"
ssh siem01.internal "/var/ossec/bin/wazuh-control restart"
Suricata Rule Generation
For network-level detection, generate Suricata rules from high-confidence IOCs:
def generate_suricata_rules(iocs: list[dict], sid_start: int = 9000001) -> str:
    """Generate Suricata rules from high-confidence (>= 70) IOCs.

    Emits an ``alert ip`` rule per ipv4-addr IOC and an ``alert dns`` rule
    per domain-name IOC; other IOC types are ignored.

    Args:
        iocs: Scored IOC records; ipv4 entries also need "sources" (a set)
            and "expires_at" (ISO timestamp) for the metadata keyword.
        sid_start: First rule sid; incremented per emitted rule.

    Returns:
        Newline-joined Suricata rules text.
    """
    rules = []
    sid = sid_start
    for ioc in iocs:
        if ioc["confidence"] < 70:
            continue
        if ioc["ioc_type"] == "ipv4-addr":
            # "sources" is a set — sort it so the generated file is
            # deterministic across runs (stable diffs, stable rule text).
            sources = ",".join(sorted(ioc["sources"]))
            rules.append(
                f'alert ip any any -> {ioc["value"]} any '
                f'(msg:"THREAT-INTEL Known malicious IP {ioc["value"]} '
                f'({ioc.get("malware", "unknown")})"; '
                f'classtype:trojan-activity; sid:{sid}; rev:1; '
                f'metadata:confidence {int(ioc["confidence"])}, '
                f'source {sources}, '
                f'expires {ioc["expires_at"][:10]};)'
            )
            sid += 1
        elif ioc["ioc_type"] == "domain-name":
            rules.append(
                f'alert dns any any -> any any '
                f'(msg:"THREAT-INTEL DNS query for malicious domain '
                f'{ioc["value"]}"; dns.query; '
                f'content:"{ioc["value"]}"; nocase; '
                f'classtype:trojan-activity; sid:{sid}; rev:1;)'
            )
            sid += 1
    # Fix: the original joined with the literal letter "n", which mashes
    # every rule onto one line — Suricata requires one rule per line.
    return "\n".join(rules)
IOC Expiration and False Positive Tuning
IOCs have a shelf life. A C2 IP from last month is likely reassigned or sinkholed. Build expiration into the pipeline:
def expire_stale_iocs(iocs: dict, grace_days: int = 7) -> tuple[list, list]:
    """Partition IOC records into (active, expired) lists.

    A record counts as expired only once its "expires_at" timestamp is more
    than *grace_days* in the past, giving recently-lapsed indicators a short
    grace window before removal.
    """
    # Anything that expired before this cutoff is stale:
    # now > expires_at + grace  <=>  expires_at < now - grace.
    cutoff = datetime.now(timezone.utc) - timedelta(days=grace_days)
    active: list = []
    expired: list = []
    for record in iocs.values():
        deadline = datetime.fromisoformat(record["expires_at"])
        bucket = expired if deadline < cutoff else active
        bucket.append(record)
    return active, expired
For false positive management, maintain a suppression list that persists across pipeline runs:
{
"suppressions": [
{
"value": "8.8.8.8",
"reason": "Google Public DNS — false positive from blocklist.de",
"suppressed_by": "[email protected]",
"suppressed_at": "2026-03-15T14:30:00Z"
},
{
"value": "cdn.example-corp.com",
"reason": "Internal CDN flagged by ThreatFox hostfile",
"suppressed_by": "[email protected]",
"suppressed_at": "2026-03-20T09:00:00Z"
}
]
}
Automation Schedule
Run the full pipeline on a cron schedule, with staggered feed fetches to avoid hammering upstream APIs:
# /etc/cron.d/threat-intel-pipeline
# Fetch feeds every 4 hours, staggered
0 */4 * * * svc-threatintel /opt/threat-intel/venv/bin/python /opt/threat-intel/fetch_feeds.py --feeds urlhaus,feodo
15 */4 * * * svc-threatintel /opt/threat-intel/venv/bin/python /opt/threat-intel/fetch_feeds.py --feeds otx,blocklist
# Deduplicate and score
30 */4 * * * svc-threatintel /opt/threat-intel/venv/bin/python /opt/threat-intel/deduplicate_and_score.py
# Push to OpenCTI
35 */4 * * * svc-threatintel /opt/threat-intel/venv/bin/python /opt/threat-intel/push_opencti.py
# Generate and deploy SIEM rules
45 */4 * * * svc-threatintel /opt/threat-intel/venv/bin/python /opt/threat-intel/deploy_rules.py
# Expire stale IOCs daily
0 2 * * * svc-threatintel /opt/threat-intel/venv/bin/python /opt/threat-intel/expire_iocs.py
This pipeline turns raw feeds into deployed detections in under an hour, with confidence scoring that prevents alert fatigue and expiration that prevents stale data. The key is treating threat intelligence as a data engineering problem — deduplication, normalization, scoring, and lifecycle management are more important than the volume of feeds you ingest.
