Building OSINT Automation Pipelines for Threat Research and Incident Response

Manual OSINT is slow. When you are investigating an incident and need to answer “what do we know about this IP address” or “where else does this domain appear,” you cannot afford to manually query twelve different services, copy-paste results into a spreadsheet, and try to correlate them by eye. You need a pipeline that accepts an indicator, enriches it from multiple sources in parallel, correlates the results, and delivers a structured report — in seconds, not hours.

This guide covers building a modular OSINT automation pipeline from scratch, including source integration, parallel enrichment, correlation scoring, and output to both human-readable reports and machine-readable STIX 2.1 bundles.

Pipeline Architecture

[Trigger] → [Classifier] → [Enrichment Workers] → [Correlator] → [Reporter]
    |              |                |                     |              |
 Webhook      Determine        Parallel API          Weighted       Markdown
 CLI input    indicator         lookups to           confidence     STIX 2.1
 SIEM alert   type (IP,        10+ sources          scoring        Matrix/Slack
              domain, hash)                                         OpenCTI

The trigger can be anything: a webhook from your SIEM, a Slack slash command, a CLI invocation, or an n8n workflow node. The classifier determines what type of indicator it is, which controls which enrichment sources are relevant. Enrichment workers run in parallel. The correlator merges results and calculates a threat score. The reporter formats output and delivers it.

Target Classification

Before querying sources, classify the input:

import re
import ipaddress

def classify_indicator(value: str) -> str:
    """Classify an indicator by type."""
    value = value.strip().lower()

    # IPv4/IPv6
    try:
        addr = ipaddress.ip_address(value)
        return "ipv4" if addr.version == 4 else "ipv6"
    except ValueError:
        pass

    # Email
    if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,}$', value):
        return "email"

    # Hash (MD5, SHA1, SHA256)
    if re.match(r'^[a-f0-9]{32}$', value):
        return "md5"
    if re.match(r'^[a-f0-9]{40}$', value):
        return "sha1"
    if re.match(r'^[a-f0-9]{64}$', value):
        return "sha256"

    # Domain (after ruling out hashes)
    if re.match(r'^[a-zA-Z0-9][a-zA-Z0-9.-]+.[a-zA-Z]{2,}$', value):
        return "domain"

    # URL
    if value.startswith(('http://', 'https://')):
        return "url"

    return "unknown"

Enrichment Sources

Each source is a standalone function that accepts an indicator and returns structured results. Here are the key sources with practical implementations:

Shodan InternetDB (Free, No API Key)

import httpx

async def enrich_shodan_internetdb(ip: str) -> dict:
    """Query Shodan InternetDB — free, no API key required."""
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(f"https://internetdb.shodan.io/{ip}")
        if resp.status_code == 200:
            data = resp.json()
            return {
                "source": "shodan_internetdb",
                "ports": data.get("ports", []),
                "hostnames": data.get("hostnames", []),
                "cpes": data.get("cpes", []),
                "vulns": data.get("vulns", []),
                "tags": data.get("tags", []),
                "confidence": 0.7,
                "malicious": len(data.get("vulns", [])) > 0
            }
        return {"source": "shodan_internetdb", "error": f"HTTP {resp.status_code}"}

VirusTotal

async def enrich_virustotal(indicator: str, indicator_type: str, api_key: str) -> dict:
    """Query VirusTotal API v3."""
    type_map = {
        "ipv4": f"ip_addresses/{indicator}",
        "domain": f"domains/{indicator}",
        "sha256": f"files/{indicator}",
        "url": f"urls/{base64.urlsafe_b64encode(indicator.encode()).decode().rstrip('=')}"
    }

    endpoint = type_map.get(indicator_type)
    if not endpoint:
        return {"source": "virustotal", "error": f"Unsupported type: {indicator_type}"}

    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.get(
            f"https://www.virustotal.com/api/v3/{endpoint}",
            headers={"x-apikey": api_key}
        )
        if resp.status_code == 200:
            data = resp.json().get("data", {}).get("attributes", {})
            stats = data.get("last_analysis_stats", {})
            malicious_count = stats.get("malicious", 0)
            total = sum(stats.values()) if stats else 1

            return {
                "source": "virustotal",
                "malicious_detections": malicious_count,
                "total_engines": total,
                "detection_ratio": malicious_count / max(total, 1),
                "reputation": data.get("reputation", 0),
                "confidence": 0.9,
                "malicious": malicious_count > 3
            }
        return {"source": "virustotal", "error": f"HTTP {resp.status_code}"}

AbuseIPDB

async def enrich_abuseipdb(ip: str, api_key: str) -> dict:
    """Query AbuseIPDB for IP reputation."""
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(
            "https://api.abuseipdb.com/api/v2/check",
            params={"ipAddress": ip, "maxAgeInDays": 90, "verbose": True},
            headers={"Key": api_key, "Accept": "application/json"}
        )
        if resp.status_code == 200:
            data = resp.json().get("data", {})
            return {
                "source": "abuseipdb",
                "abuse_score": data.get("abuseConfidenceScore", 0),
                "total_reports": data.get("totalReports", 0),
                "country": data.get("countryCode", ""),
                "isp": data.get("isp", ""),
                "usage_type": data.get("usageType", ""),
                "confidence": 0.85,
                "malicious": data.get("abuseConfidenceScore", 0) > 50
            }
        return {"source": "abuseipdb", "error": f"HTTP {resp.status_code}"}

Certificate Transparency (crt.sh)

async def enrich_crtsh(domain: str) -> dict:
    """Query crt.sh for certificate transparency logs."""
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(
            f"https://crt.sh/?q=%.{domain}&output=json"
        )
        if resp.status_code == 200:
            certs = resp.json()
            unique_names = set()
            for cert in certs:
                name = cert.get("name_value", "")
                for n in name.split("n"):
                    unique_names.add(n.strip())
            return {
                "source": "crtsh",
                "total_certificates": len(certs),
                "unique_subdomains": sorted(unique_names),
                "subdomain_count": len(unique_names),
                "confidence": 0.8,
                "malicious": False
            }
        return {"source": "crtsh", "error": f"HTTP {resp.status_code}"}

GreyNoise Community

async def enrich_greynoise(ip: str, api_key: str) -> dict:
    """Query GreyNoise Community API."""
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(
            f"https://api.greynoise.io/v3/community/{ip}",
            headers={"key": api_key}
        )
        if resp.status_code == 200:
            data = resp.json()
            return {
                "source": "greynoise",
                "classification": data.get("classification", "unknown"),
                "noise": data.get("noise", False),
                "riot": data.get("riot", False),
                "name": data.get("name", ""),
                "confidence": 0.75,
                "malicious": data.get("classification") == "malicious"
            }
        return {"source": "greynoise", "error": f"HTTP {resp.status_code}"}

Parallel Enrichment with Rate Limiting

Run all enrichments in parallel with per-source rate limiting:

import asyncio
from collections import defaultdict
import time

class RateLimiter:
    """Per-source rate limiter."""
    def __init__(self):
        self.limits = defaultdict(lambda: {"calls": 0, "reset": time.time() + 60})

    async def acquire(self, source: str, max_per_minute: int):
        while True:
            now = time.time()
            state = self.limits[source]
            if now > state["reset"]:
                state["calls"] = 0
                state["reset"] = now + 60
            if state["calls"] < max_per_minute:
                state["calls"] += 1
                return
            await asyncio.sleep(state["reset"] - now)

rate_limiter = RateLimiter()

async def enrich_indicator(indicator: str, indicator_type: str, config: dict) -> list:
    """Run all applicable enrichments in parallel."""
    tasks = []

    if indicator_type in ("ipv4", "ipv6"):
        tasks.append(enrich_shodan_internetdb(indicator))
        tasks.append(enrich_virustotal(indicator, indicator_type, config["vt_key"]))
        tasks.append(enrich_abuseipdb(indicator, config["abuseipdb_key"]))
        tasks.append(enrich_greynoise(indicator, config["greynoise_key"]))

    elif indicator_type == "domain":
        tasks.append(enrich_virustotal(indicator, indicator_type, config["vt_key"]))
        tasks.append(enrich_crtsh(indicator))

    elif indicator_type in ("md5", "sha1", "sha256"):
        tasks.append(enrich_virustotal(indicator, indicator_type, config["vt_key"]))

    results = await asyncio.gather(tasks, return_exceptions=True)

    enrichments = []
    for r in results:
        if isinstance(r, Exception):
            enrichments.append({"source": "unknown", "error": str(r)})
        else:
            enrichments.append(r)

    return enrichments

Correlation and Scoring

The scoring model weights results from different sources based on their reliability:

def calculate_threat_score(enrichments: list) -> dict:
    """
    Calculate a weighted threat score from multiple enrichment results.

    Each source provides a 'confidence' (how much we trust this source)
    and 'malicious' (whether this source considers the indicator bad).

    Score = sum(confidence  malicious_weight) / sum(confidence)
    Normalized to 0-100.
    """
    total_weight = 0
    weighted_score = 0
    verdicts = []

    for e in enrichments:
        if "error" in e:
            continue

        confidence = e.get("confidence", 0.5)
        is_malicious = e.get("malicious", False)

        # Source-specific scoring adjustments
        source = e.get("source", "")

        if source == "virustotal":
            ratio = e.get("detection_ratio", 0)
            source_score = min(ratio  2, 1.0)  # Scale: 50%+ detection = max
        elif source == "abuseipdb":
            source_score = e.get("abuse_score", 0) / 100
        elif source == "greynoise":
            if e.get("riot"):
                source_score = 0  # Known benign service
                confidence = 0.9  # High confidence in benign verdict
            elif is_malicious:
                source_score = 0.9
            else:
                source_score = 0.2
        else:
            source_score = 1.0 if is_malicious else 0.0

        weighted_score += confidence  source_score
        total_weight += confidence
        verdicts.append({
            "source": source,
            "verdict": "malicious" if source_score > 0.5 else "benign",
            "score": round(source_score  100),
            "confidence": round(confidence  100)
        })

    final_score = (weighted_score / max(total_weight, 0.01)) * 100

    # Classification
    if final_score >= 75:
        classification = "HIGH_THREAT"
    elif final_score >= 40:
        classification = "SUSPICIOUS"
    elif final_score >= 15:
        classification = "LOW_RISK"
    else:
        classification = "BENIGN"

    return {
        "score": round(final_score, 1),
        "classification": classification,
        "verdicts": verdicts,
        "sources_queried": len([e for e in enrichments if "error" not in e]),
        "sources_failed": len([e for e in enrichments if "error" in e])
    }

STIX 2.1 Output for Threat Intel Platforms

Generate STIX 2.1 bundles for feeding back into OpenCTI or other threat intel platforms:

from datetime import datetime, timezone
import uuid
import json

def generate_stix_indicator(indicator: str, indicator_type: str,
                            score: dict, enrichments: list) -> dict:
    """Generate a STIX 2.1 bundle from enrichment results."""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")

    pattern_map = {
        "ipv4": f"[ipv4-addr:value = '{indicator}']",
        "ipv6": f"[ipv6-addr:value = '{indicator}']",
        "domain": f"[domain-name:value = '{indicator}']",
        "sha256": f"[file:hashes.'SHA-256' = '{indicator}']",
        "md5": f"[file:hashes.MD5 = '{indicator}']",
        "url": f"[url:value = '{indicator}']",
    }

    indicator_id = f"indicator--{uuid.uuid4()}"

    stix_indicator = {
        "type": "indicator",
        "spec_version": "2.1",
        "id": indicator_id,
        "created": now,
        "modified": now,
        "name": f"OSINT enrichment: {indicator}",
        "description": f"Automated OSINT enrichment. Score: {score['score']}/100 ({score['classification']}). Sources: {score['sources_queried']} queried, {score['sources_failed']} failed.",
        "pattern": pattern_map.get(indicator_type, f"[artifact:payload_bin = '{indicator}']"),
        "pattern_type": "stix",
        "valid_from": now,
        "confidence": min(int(score["score"]), 100),
        "labels": [score["classification"].lower().replace("_", "-")],
        "external_references": [
            {"source_name": v["source"], "description": f"Verdict: {v['verdict']} (score: {v['score']})"}
            for v in score.get("verdicts", [])
        ]
    }

    bundle = {
        "type": "bundle",
        "id": f"bundle--{uuid.uuid4()}",
        "objects": [stix_indicator]
    }

    return bundle

Delivery: Reports and Notifications

Markdown Report

def generate_markdown_report(indicator: str, indicator_type: str,
                              score: dict, enrichments: list) -> str:
    """Generate a human-readable markdown report."""
    lines = [
        f"# OSINT Report: {indicator}",
        f"Type: {indicator_type} | Score: {score['score']}/100 | Classification: {score['classification']}",
        f"Sources: {score['sources_queried']} queried, {score['sources_failed']} failed",
        "",
        "## Source Verdicts",
        "| Source | Verdict | Score | Confidence |",
        "|--------|---------|-------|------------|",
    ]
    for v in score.get("verdicts", []):
        lines.append(f"| {v['source']} | {v['verdict']} | {v['score']}% | {v['confidence']}% |")

    lines.append("")
    lines.append("## Raw Enrichment Data")
    for e in enrichments:
        source = e.get("source", "unknown")
        lines.append(f"### {source}")
        if "error" in e:
            lines.append(f"Error: {e['error']}")
        else:
            for k, v in e.items():
                if k not in ("source", "confidence", "malicious"):
                    lines.append(f"- {k}: {v}")
        lines.append("")

    return "n".join(lines)

Matrix/Slack Notification

async def send_to_matrix(report: str, room_id: str, homeserver: str, token: str):
    """Send report to a Matrix room."""
    async with httpx.AsyncClient() as client:
        await client.put(
            f"{homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{uuid.uuid4()}",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "msgtype": "m.text",
                "body": report,
                "format": "org.matrix.custom.html",
                "formatted_body": f"<pre>{report}</pre>"
            }
        )

n8n Workflow Integration

For teams using n8n for workflow automation, the pipeline maps naturally to an n8n workflow:

  1. Webhook Trigger — receives {"indicator": "203.0.113.50"} via POST
  2. Function Node — classifies the indicator type
  3. Split In Batches — fans out to multiple HTTP Request nodes (one per source)
  4. Merge Node — collects all enrichment responses
  5. Function Node — runs correlation scoring logic
  6. IF Node — routes based on score (HIGH_THREAT goes to incident channel, LOW_RISK goes to log)
  7. HTTP Request — POST STIX bundle to OpenCTI’s GraphQL API
  8. Matrix/Slack Node — send formatted report to the SOC channel

The key advantage of n8n is visual debugging: when a source fails or returns unexpected data, you can inspect every node’s input and output without reading logs.

Caching

Cache results to avoid redundant lookups and respect API rate limits:

import hashlib
import json
from datetime import datetime, timedelta

class ResultCache:
    """Simple TTL cache for enrichment results."""
    def __init__(self, ttl_minutes=60):
        self.cache = {}
        self.ttl = timedelta(minutes=ttl_minutes)

    def _key(self, source: str, indicator: str) -> str:
        return hashlib.sha256(f"{source}:{indicator}".encode()).hexdigest()

    def get(self, source: str, indicator: str) -> dict | None:
        key = self._key(source, indicator)
        entry = self.cache.get(key)
        if entry and datetime.now() - entry["time"] < self.ttl:
            return entry["data"]
        return None

    def set(self, source: str, indicator: str, data: dict):
        key = self._key(source, indicator)
        self.cache[key] = {"data": data, "time": datetime.now()}

Set different TTLs per source: VirusTotal results are stable for hours, while AbuseIPDB scores change as new reports come in. A 60-minute default TTL is a reasonable starting point.

Putting It Together

The complete pipeline, from trigger to delivery, should execute in under 10 seconds for a single indicator. The bottleneck is always the slowest API response. Use aggressive timeouts (10-15 seconds per source) and do not let one slow source block the entire pipeline.

Build this incrementally: start with three sources (Shodan InternetDB, VirusTotal, AbuseIPDB), get the scoring model working, and add sources over time. Each new source improves correlation confidence without changing the pipeline architecture. The modular design means you can swap sources, adjust weights, and add new output formats without rewriting the core logic.

Scroll to Top