Practical Threat Intelligence Automation: From STIX/TAXII Feeds to Actionable SIEM Rules

Most organizations collect threat intelligence. Few actually operationalize it. The gap between “we have IOC feeds” and “our SIEM automatically detects these threats” is where security teams lose weeks of manual effort — or worse, let indicators rot unused in a database. This guide builds an automated pipeline that ingests STIX/TAXII feeds, correlates and deduplicates IOCs in OpenCTI, scores confidence, and pushes actionable detection rules to Wazuh and Suricata with proper expiration and false-positive management.

Pipeline Architecture

┌────────────────────┐
│   STIX/TAXII Feeds │
│  (abuse.ch, OTX,   │
│   MITRE ATT&CK)    │
└────────┬───────────┘
         │
         ▼
┌────────────────────┐     ┌──────────────────┐
│   Feed Ingestion   │────▶│    OpenCTI        │
│   (Python workers) │     │  (correlation +   │
│                    │     │   enrichment)     │
└────────────────────┘     └────────┬─────────┘
                                    │
                           ┌────────▼─────────┐
                           │  Rule Generator   │
                           │  (confidence +    │
                           │   expiration)     │
                           └───┬──────────┬────┘
                               │          │
                    ┌──────────▼──┐  ┌────▼───────────┐
                    │   Wazuh     │  │   Suricata      │
                    │ (CDB lists, │  │ (emerging rules │
                    │  decoders)  │  │  + threshold)   │
                    └─────────────┘  └────────────────┘

Feed Ingestion Layer

abuse.ch Feeds

The abuse.ch suite provides several high-value feeds. Note that their API endpoints now require authentication for JSON responses, but CSV and text-based downloads remain open:

#!/usr/bin/env python3
"""feed_ingester.py — Fetch and normalize threat intel feeds."""

import csv
import json
import hashlib
import requests
from datetime import datetime, timedelta, timezone
from typing import Generator

FEEDS = {
    "urlhaus": {
        "url": "https://urlhaus.abuse.ch/downloads/csv_recent/",
        "type": "csv",
        "ioc_type": "url",
        "confidence": 75,
        "ttl_days": 30,
    },
    "threatfox_hostfile": {
        "url": "https://threatfox.abuse.ch/downloads/hostfile/",
        "type": "hostfile",
        "ioc_type": "domain-name",
        "confidence": 70,
        "ttl_days": 30,
    },
    "feodo_botnet_c2": {
        "url": "https://feodotracker.abuse.ch/downloads/ipblocklist.json",
        "type": "json",
        "ioc_type": "ipv4-addr",
        "confidence": 90,
        "ttl_days": 14,
    },
    "blocklist_de": {
        "url": "https://lists.blocklist.de/lists/all.txt",
        "type": "plaintext",
        "ioc_type": "ipv4-addr",
        "confidence": 50,
        "ttl_days": 7,
    },
}

def fetch_urlhaus_csv(url: str) -> Generator[dict, None, None]:
    """Parse URLhaus CSV, skipping comment lines."""
    resp = requests.get(url, timeout=60)
    resp.raise_for_status()
    lines = [l for l in resp.text.splitlines() if not l.startswith("#")]
    reader = csv.reader(lines)
    for row in reader:
        if len(row) >= 8:
            yield {
                "value": row[2],            # url
                "ioc_type": "url",
                "threat_type": row[4],       # malware_download, etc.
                "malware": row[6],
                "source": "urlhaus",
                "first_seen": row[1],
                "tags": [t.strip() for t in row[7].split(",") if t.strip()],
            }

def fetch_feodo_json(url: str) -> Generator[dict, None, None]:
    """Parse Feodo Tracker JSON blocklist."""
    resp = requests.get(url, timeout=60)
    resp.raise_for_status()
    for entry in resp.json():
        yield {
            "value": entry["ip_address"],
            "ioc_type": "ipv4-addr",
            "threat_type": "botnet_cc",
            "malware": entry.get("malware", "unknown"),
            "source": "feodotracker",
            "first_seen": entry.get("first_seen", ""),
            "port": entry.get("dst_port"),
        }

def fetch_plaintext_ips(url: str) -> Generator[dict, None, None]:
    """Parse plain-text IP lists (one IP per line)."""
    resp = requests.get(url, timeout=60)
    resp.raise_for_status()
    for line in resp.text.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            yield {
                "value": line,
                "ioc_type": "ipv4-addr",
                "source": "blocklist",
            }

AlienVault OTX via STIX/TAXII

from stix2 import TAXIICollectionSource, Filter
from taxii2client.v20 import Collection

OTX_TAXII_URL = "https://otx.alienvault.com/taxii/v2/"

def fetch_otx_indicators(api_key: str, collection_id: str) -> list:
    """Fetch indicators from OTX TAXII 2.0 endpoint."""
    collection = Collection(
        f"{OTX_TAXII_URL}collections/{collection_id}/",
        headers={"Authorization": f"Bearer {api_key}"}
    )
    tc_source = TAXIICollectionSource(collection)

    # Get indicators from the last 24 hours
    indicators = tc_source.query([
        Filter("type", "=", "indicator"),
        Filter("created", ">=", (
            datetime.now(timezone.utc) - timedelta(days=1)
        ).strftime("%Y-%m-%dT%H:%M:%SZ"))
    ])
    return indicators

Deduplication and Confidence Scoring

Raw feeds contain massive overlap. A single C2 IP might appear in URLhaus, Feodo Tracker, and Blocklist.de. Deduplication reduces noise; confidence scoring prioritizes response.

import hashlib
from collections import defaultdict

class IOCDeduplicator:
    """Deduplicate IOCs across feeds with confidence aggregation."""

    def __init__(self):
        self.seen: dict[str, dict] = {}

    def _normalize(self, value: str, ioc_type: str) -> str:
        """Normalize IOC values for deduplication."""
        if ioc_type == "url":
            value = value.lower().rstrip("/")
        elif ioc_type == "domain-name":
            value = value.lower().strip(".")
        elif ioc_type == "ipv4-addr":
            # Normalize 010.0.0.1 -> 10.0.0.1
            parts = value.split(".")
            value = ".".join(str(int(p)) for p in parts)
        return value

    def add(self, ioc: dict, feed_confidence: int) -> dict:
        """Add IOC, return merged record with updated confidence."""
        key = self._normalize(ioc["value"], ioc["ioc_type"])
        ioc_hash = hashlib.sha256(key.encode()).hexdigest()[:16]

        if ioc_hash in self.seen:
            existing = self.seen[ioc_hash]
            existing["sources"].add(ioc.get("source", "unknown"))
            existing["source_count"] = len(existing["sources"])
            # Multi-source corroboration boosts confidence
            # Cap at 95 — never fully trust automated feeds
            existing["confidence"] = min(95,
                existing["confidence"] + (feed_confidence  0.3)
            )
            existing["last_seen"] = datetime.now(timezone.utc).isoformat()
            return existing

        record = {
            "hash": ioc_hash,
            "value": key,
            "ioc_type": ioc["ioc_type"],
            "confidence": feed_confidence,
            "sources": {ioc.get("source", "unknown")},
            "source_count": 1,
            "first_ingested": datetime.now(timezone.utc).isoformat(),
            "last_seen": datetime.now(timezone.utc).isoformat(),
            "tags": ioc.get("tags", []),
            "malware": ioc.get("malware"),
            "expires_at": (
                datetime.now(timezone.utc) + timedelta(days=30)
            ).isoformat(),
        }
        self.seen[ioc_hash] = record
        return record

Confidence Scoring Model

Use a tiered model rather than raw numbers:

| Confidence Range | Label | Action |
|—|—|—|
| 90-95 | Confirmed | Auto-block, create SIEM alert |
| 70-89 | High | SIEM alert, manual review queue |
| 50-69 | Medium | SIEM correlation only (no alert) |
| 30-49 | Low | Log enrichment only |
| <30 | Informational | Store but do not deploy |

Pushing to OpenCTI

OpenCTI provides the correlation layer. Use the GraphQL API to create indicators with proper STIX relationships:

import requests

OPENCTI_URL = "https://opencti.internal.example-corp.com"
OPENCTI_TOKEN = "your-api-token-here"  # Store in secrets manager

def create_opencti_indicator(ioc: dict) -> str:
    """Create or update indicator in OpenCTI."""
    query = """
    mutation CreateIndicator($input: IndicatorAddInput!) {
        indicatorAdd(input: $input) {
            id
            name
            pattern
        }
    }
    """

    # Build STIX pattern from IOC type
    pattern_map = {
        "ipv4-addr": f"[ipv4-addr:value = '{ioc['value']}']",
        "domain-name": f"[domain-name:value = '{ioc['value']}']",
        "url": f"[url:value = '{ioc['value']}']",
        "file-sha256": f"[file:hashes.'SHA-256' = '{ioc['value']}']",
    }

    variables = {
        "input": {
            "name": f"{ioc['ioc_type']}: {ioc['value']}",
            "pattern": pattern_map[ioc["ioc_type"]],
            "pattern_type": "stix",
            "x_opencti_main_observable_type": ioc["ioc_type"],
            "confidence": int(ioc["confidence"]),
            "valid_from": ioc["first_ingested"],
            "valid_until": ioc["expires_at"],
            "x_opencti_score": int(ioc["confidence"]),
            "objectLabel": ioc.get("tags", []),
        }
    }

    resp = requests.post(
        f"{OPENCTI_URL}/graphql",
        headers={"Authorization": f"Bearer {OPENCTI_TOKEN}"},
        json={"query": query, "variables": variables},
        timeout=30,
    )
    return resp.json()["data"]["indicatorAdd"]["id"]

Generating Wazuh CDB Lists

Wazuh uses CDB (Constant Database) lists for fast IOC lookups. Generate these from the deduplicated, scored IOC set:

def generate_wazuh_cdb(iocs: list[dict], min_confidence: int = 50) -> str:
    """Generate Wazuh CDB list from scored IOCs."""
    lines = []
    for ioc in sorted(iocs, key=lambda x: x["confidence"], reverse=True):
        if ioc["confidence"] < min_confidence:
            continue
        if ioc["ioc_type"] == "ipv4-addr":
            # CDB format: key:value
            lines.append(f"{ioc['value']}:threat_{ioc.get('malware', 'generic')}")
    return "n".join(lines)

def generate_wazuh_rules(rule_id_start: int = 100900) -> str:
    """Generate Wazuh rules that reference CDB lists."""
    return f"""
<group name="threat_intel,">
  <rule id="{rule_id_start}" level="12">
    <if_sid>5710</if_sid>
    <list field="srcip" lookup="address_match_key">etc/lists/threat_intel_ips</list>
    <description>Connection from known threat intel IP: $(srcip)</description>
    <group>threat_intel,network,</group>
  </rule>

  <rule id="{rule_id_start + 1}" level="14">
    <if_sid>31100,31101</if_sid>
    <list field="url" lookup="match_key">etc/lists/threat_intel_urls</list>
    <description>Access to known malicious URL detected</description>
    <group>threat_intel,web,</group>
  </rule>

  <rule id="{rule_id_start + 2}" level="10">
    <if_sid>5710</if_sid>
    <list field="dstip" lookup="address_match_key">etc/lists/threat_intel_c2</list>
    <description>Outbound connection to known C2 server: $(dstip)</description>
    <group>threat_intel,c2,</group>
  </rule>
</group>
"""

Deploy CDB lists to the Wazuh manager:

# Deploy updated CDB list
scp threat_intel_ips.cdb siem01.internal:/var/ossec/etc/lists/threat_intel_ips
ssh siem01.internal "chown wazuh:wazuh /var/ossec/etc/lists/threat_intel_ips"
ssh siem01.internal "/var/ossec/bin/wazuh-control restart"

Suricata Rule Generation

For network-level detection, generate Suricata rules from high-confidence IOCs:

def generate_suricata_rules(iocs: list[dict], sid_start: int = 9000001) -> str:
    """Generate Suricata rules from high-confidence IOCs."""
    rules = []
    sid = sid_start

    for ioc in iocs:
        if ioc["confidence"] < 70:
            continue

        if ioc["ioc_type"] == "ipv4-addr":
            rules.append(
                f'alert ip any any -> {ioc["value"]} any '
                f'(msg:"THREAT-INTEL Known malicious IP {ioc["value"]} '
                f'({ioc.get("malware", "unknown")})"; '
                f'classtype:trojan-activity; sid:{sid}; rev:1; '
                f'metadata:confidence {int(ioc["confidence"])}, '
                f'source {",".join(ioc["sources"])}, '
                f'expires {ioc["expires_at"][:10]};)'
            )
            sid += 1

        elif ioc["ioc_type"] == "domain-name":
            rules.append(
                f'alert dns any any -> any any '
                f'(msg:"THREAT-INTEL DNS query for malicious domain '
                f'{ioc["value"]}"; dns.query; '
                f'content:"{ioc["value"]}"; nocase; '
                f'classtype:trojan-activity; sid:{sid}; rev:1;)'
            )
            sid += 1

    return "n".join(rules)

IOC Expiration and False Positive Tuning

IOCs have a shelf life. A C2 IP from last month is likely reassigned or sinkholed. Build expiration into the pipeline:

def expire_stale_iocs(iocs: dict, grace_days: int = 7) -> tuple[list, list]:
    """Separate active from expired IOCs."""
    now = datetime.now(timezone.utc)
    active, expired = [], []

    for ioc in iocs.values():
        expires_at = datetime.fromisoformat(ioc["expires_at"])
        if now > expires_at + timedelta(days=grace_days):
            expired.append(ioc)
        else:
            active.append(ioc)

    return active, expired

For false positive management, maintain a suppression list that persists across pipeline runs:

{
  "suppressions": [
    {
      "value": "8.8.8.8",
      "reason": "Google Public DNS — false positive from blocklist.de",
      "suppressed_by": "[email protected]",
      "suppressed_at": "2026-03-15T14:30:00Z"
    },
    {
      "value": "cdn.example-corp.com",
      "reason": "Internal CDN flagged by ThreatFox hostfile",
      "suppressed_by": "[email protected]",
      "suppressed_at": "2026-03-20T09:00:00Z"
    }
  ]
}

Automation Schedule

Run the full pipeline on a cron schedule, with staggered feed fetches to avoid hammering upstream APIs:

# /etc/cron.d/threat-intel-pipeline
# Fetch feeds every 4 hours, staggered
0 /4     svc-threatintel  /opt/threat-intel/venv/bin/python /opt/threat-intel/fetch_feeds.py --feeds urlhaus,feodo
15 /4     svc-threatintel  /opt/threat-intel/venv/bin/python /opt/threat-intel/fetch_feeds.py --feeds otx,blocklist
# Deduplicate and score
30 /4     svc-threatintel  /opt/threat-intel/venv/bin/python /opt/threat-intel/deduplicate_and_score.py
# Push to OpenCTI
35 /4     svc-threatintel  /opt/threat-intel/venv/bin/python /opt/threat-intel/push_opencti.py
# Generate and deploy SIEM rules
45 /4     svc-threatintel  /opt/threat-intel/venv/bin/python /opt/threat-intel/deploy_rules.py
# Expire stale IOCs daily
0 2     svc-threatintel  /opt/threat-intel/venv/bin/python /opt/threat-intel/expire_iocs.py

This pipeline turns raw feeds into deployed detections in under an hour, with confidence scoring that prevents alert fatigue and expiration that prevents stale data. The key is treating threat intelligence as a data engineering problem — deduplication, normalization, scoring, and lifecycle management are more important than the volume of feeds you ingest.

Scroll to Top