Manual OSINT is slow. When you are investigating an incident and need to answer “what do we know about this IP address” or “where else does this domain appear,” you cannot afford to manually query twelve different services, copy-paste results into a spreadsheet, and try to correlate them by eye. You need a pipeline that accepts an indicator, enriches it from multiple sources in parallel, correlates the results, and delivers a structured report — in seconds, not hours.
This guide covers building a modular OSINT automation pipeline from scratch, including source integration, parallel enrichment, correlation scoring, and output to both human-readable reports and machine-readable STIX 2.1 bundles.
Pipeline Architecture
[Trigger] → [Classifier] → [Enrichment Workers] → [Correlator] → [Reporter]
    |             |                  |                  |             |
 Webhook      Determine        Parallel API        Weighted      Markdown
 CLI input    indicator        lookups to          confidence    STIX 2.1
 SIEM alert   type (IP,        10+ sources         scoring       Matrix/Slack
              domain, hash)                                      OpenCTI
The trigger can be anything: a webhook from your SIEM, a Slack slash command, a CLI invocation, or an n8n workflow node. The classifier determines what type of indicator it is, which controls which enrichment sources are relevant. Enrichment workers run in parallel. The correlator merges results and calculates a threat score. The reporter formats output and delivers it.
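The classifier-to-worker handoff can be table-driven; a minimal sketch (the source names mirror the ones implemented below, but the routing table itself is my illustration, not a fixed part of the pipeline):

```python
# Map indicator type → enrichment sources worth querying for it.
SOURCES_BY_TYPE = {
    "ipv4": ["shodan_internetdb", "virustotal", "abuseipdb", "greynoise"],
    "ipv6": ["shodan_internetdb", "virustotal", "abuseipdb", "greynoise"],
    "domain": ["virustotal", "crtsh"],
    "md5": ["virustotal"],
    "sha1": ["virustotal"],
    "sha256": ["virustotal"],
}

def sources_for(indicator_type: str) -> list:
    """Return the sources applicable to an indicator type ([] if unknown)."""
    return SOURCES_BY_TYPE.get(indicator_type, [])
```

Keeping the routing in data rather than `if`/`elif` chains means adding a source is a one-line change.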
Target Classification
Before querying sources, classify the input:
import re
import ipaddress

def classify_indicator(value: str) -> str:
    """Classify an indicator by type."""
    value = value.strip().lower()

    # IPv4/IPv6
    try:
        addr = ipaddress.ip_address(value)
        return "ipv4" if addr.version == 4 else "ipv6"
    except ValueError:
        pass

    # Email
    if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value):
        return "email"

    # Hash (MD5, SHA1, SHA256)
    if re.match(r'^[a-f0-9]{32}$', value):
        return "md5"
    if re.match(r'^[a-f0-9]{40}$', value):
        return "sha1"
    if re.match(r'^[a-f0-9]{64}$', value):
        return "sha256"

    # Domain (after ruling out hashes)
    if re.match(r'^[a-zA-Z0-9][a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value):
        return "domain"

    # URL
    if value.startswith(('http://', 'https://')):
        return "url"

    return "unknown"
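Indicators pasted from reports and feeds often arrive defanged (`hxxp://`, `example[.]com`), which would land them in `unknown`. A small refang pass before classification helps; this helper is my addition and covers only the most common defang patterns:

```python
import re

def refang(value: str) -> str:
    """Undo common defanging so classification sees the real indicator."""
    value = value.strip()
    # Bracketed separators: example[.]com, user[@]host, 1.2.3[.]4
    value = value.replace("[.]", ".").replace("(.)", ".")
    value = value.replace("[:]", ":").replace("[@]", "@")
    # hxxp:// and hxxps:// schemes
    value = re.sub(r'^hxxp', 'http', value, flags=re.IGNORECASE)
    return value

print(refang("hxxp://evil[.]example[.]com"))  # http://evil.example.com
```

Run `refang` on raw input, then feed the result to `classify_indicator`.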
Enrichment Sources
Each source is a standalone function that accepts an indicator and returns structured results. Here are the key sources with practical implementations:
Shodan InternetDB (Free, No API Key)
import httpx

async def enrich_shodan_internetdb(ip: str) -> dict:
    """Query Shodan InternetDB — free, no API key required."""
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(f"https://internetdb.shodan.io/{ip}")
        if resp.status_code == 200:
            data = resp.json()
            return {
                "source": "shodan_internetdb",
                "ports": data.get("ports", []),
                "hostnames": data.get("hostnames", []),
                "cpes": data.get("cpes", []),
                "vulns": data.get("vulns", []),
                "tags": data.get("tags", []),
                "confidence": 0.7,
                "malicious": len(data.get("vulns", [])) > 0
            }
        return {"source": "shodan_internetdb", "error": f"HTTP {resp.status_code}"}
VirusTotal
import base64

async def enrich_virustotal(indicator: str, indicator_type: str, api_key: str) -> dict:
    """Query VirusTotal API v3."""
    type_map = {
        "ipv4": f"ip_addresses/{indicator}",
        "domain": f"domains/{indicator}",
        "sha256": f"files/{indicator}",
        "url": f"urls/{base64.urlsafe_b64encode(indicator.encode()).decode().rstrip('=')}"
    }
    endpoint = type_map.get(indicator_type)
    if not endpoint:
        return {"source": "virustotal", "error": f"Unsupported type: {indicator_type}"}
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.get(
            f"https://www.virustotal.com/api/v3/{endpoint}",
            headers={"x-apikey": api_key}
        )
        if resp.status_code == 200:
            data = resp.json().get("data", {}).get("attributes", {})
            stats = data.get("last_analysis_stats", {})
            malicious_count = stats.get("malicious", 0)
            total = sum(stats.values()) if stats else 1
            return {
                "source": "virustotal",
                "malicious_detections": malicious_count,
                "total_engines": total,
                "detection_ratio": malicious_count / max(total, 1),
                "reputation": data.get("reputation", 0),
                "confidence": 0.9,
                "malicious": malicious_count > 3
            }
        return {"source": "virustotal", "error": f"HTTP {resp.status_code}"}
AbuseIPDB
async def enrich_abuseipdb(ip: str, api_key: str) -> dict:
    """Query AbuseIPDB for IP reputation."""
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(
            "https://api.abuseipdb.com/api/v2/check",
            params={"ipAddress": ip, "maxAgeInDays": 90, "verbose": True},
            headers={"Key": api_key, "Accept": "application/json"}
        )
        if resp.status_code == 200:
            data = resp.json().get("data", {})
            return {
                "source": "abuseipdb",
                "abuse_score": data.get("abuseConfidenceScore", 0),
                "total_reports": data.get("totalReports", 0),
                "country": data.get("countryCode", ""),
                "isp": data.get("isp", ""),
                "usage_type": data.get("usageType", ""),
                "confidence": 0.85,
                "malicious": data.get("abuseConfidenceScore", 0) > 50
            }
        return {"source": "abuseipdb", "error": f"HTTP {resp.status_code}"}
Certificate Transparency (crt.sh)
async def enrich_crtsh(domain: str) -> dict:
    """Query crt.sh for certificate transparency logs."""
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(
            f"https://crt.sh/?q=%.{domain}&output=json"
        )
        if resp.status_code == 200:
            certs = resp.json()
            unique_names = set()
            for cert in certs:
                # name_value can contain multiple newline-separated SANs
                name = cert.get("name_value", "")
                for n in name.split("\n"):
                    n = n.strip()
                    if n:
                        unique_names.add(n)
            return {
                "source": "crtsh",
                "total_certificates": len(certs),
                "unique_subdomains": sorted(unique_names),
                "subdomain_count": len(unique_names),
                "confidence": 0.8,
                "malicious": False
            }
        return {"source": "crtsh", "error": f"HTTP {resp.status_code}"}
GreyNoise Community
async def enrich_greynoise(ip: str, api_key: str) -> dict:
    """Query GreyNoise Community API."""
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(
            f"https://api.greynoise.io/v3/community/{ip}",
            headers={"key": api_key}
        )
        if resp.status_code == 200:
            data = resp.json()
            return {
                "source": "greynoise",
                "classification": data.get("classification", "unknown"),
                "noise": data.get("noise", False),
                "riot": data.get("riot", False),
                "name": data.get("name", ""),
                "confidence": 0.75,
                "malicious": data.get("classification") == "malicious"
            }
        return {"source": "greynoise", "error": f"HTTP {resp.status_code}"}
Parallel Enrichment with Rate Limiting
Run all enrichments in parallel with per-source rate limiting:
import asyncio
import time
from collections import defaultdict

class RateLimiter:
    """Per-source rate limiter."""
    def __init__(self):
        self.limits = defaultdict(lambda: {"calls": 0, "reset": time.time() + 60})

    async def acquire(self, source: str, max_per_minute: int):
        while True:
            now = time.time()
            state = self.limits[source]
            if now > state["reset"]:
                state["calls"] = 0
                state["reset"] = now + 60
            if state["calls"] < max_per_minute:
                state["calls"] += 1
                return
            await asyncio.sleep(state["reset"] - now)

rate_limiter = RateLimiter()

async def enrich_indicator(indicator: str, indicator_type: str, config: dict) -> list:
    """Run all applicable enrichments in parallel."""
    tasks = []
    if indicator_type in ("ipv4", "ipv6"):
        tasks.append(enrich_shodan_internetdb(indicator))
        tasks.append(enrich_virustotal(indicator, indicator_type, config["vt_key"]))
        tasks.append(enrich_abuseipdb(indicator, config["abuseipdb_key"]))
        tasks.append(enrich_greynoise(indicator, config["greynoise_key"]))
    elif indicator_type == "domain":
        tasks.append(enrich_virustotal(indicator, indicator_type, config["vt_key"]))
        tasks.append(enrich_crtsh(indicator))
    elif indicator_type in ("md5", "sha1", "sha256"):
        tasks.append(enrich_virustotal(indicator, indicator_type, config["vt_key"]))
    results = await asyncio.gather(*tasks, return_exceptions=True)
    enrichments = []
    for r in results:
        if isinstance(r, Exception):
            enrichments.append({"source": "unknown", "error": str(r)})
        else:
            enrichments.append(r)
    return enrichments
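One weakness of the gather above: when a coroutine raises, the result loses track of which source failed and gets tagged `"unknown"`. A sketch of keying tasks by source name (the helper and stub sources are my illustration):

```python
import asyncio

async def gather_named(tasks: dict) -> list:
    """Gather coroutines keyed by source name, so a raised exception
    is attributed to the source that failed instead of 'unknown'."""
    results = await asyncio.gather(*tasks.values(), return_exceptions=True)
    out = []
    for name, r in zip(tasks.keys(), results):
        out.append({"source": name, "error": str(r)} if isinstance(r, Exception) else r)
    return out

async def healthy_source():
    return {"source": "healthy", "malicious": False}

async def broken_source():
    raise RuntimeError("API down")

results = asyncio.run(gather_named({
    "healthy": healthy_source(),
    "broken": broken_source(),
}))
```

Since dicts preserve insertion order, `zip` pairs each result with the right name.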
Correlation and Scoring
The scoring model weights results from different sources based on their reliability:
def calculate_threat_score(enrichments: list) -> dict:
    """
    Calculate a weighted threat score from multiple enrichment results.
    Each source provides a 'confidence' (how much we trust this source)
    and 'malicious' (whether this source considers the indicator bad).
    Score = sum(confidence * source_score) / sum(confidence)
    Normalized to 0-100.
    """
    total_weight = 0
    weighted_score = 0
    verdicts = []
    for e in enrichments:
        if "error" in e:
            continue
        confidence = e.get("confidence", 0.5)
        is_malicious = e.get("malicious", False)
        # Source-specific scoring adjustments
        source = e.get("source", "")
        if source == "virustotal":
            ratio = e.get("detection_ratio", 0)
            source_score = min(ratio * 2, 1.0)  # Scale: 50%+ detection = max
        elif source == "abuseipdb":
            source_score = e.get("abuse_score", 0) / 100
        elif source == "greynoise":
            if e.get("riot"):
                source_score = 0  # Known benign service
                confidence = 0.9  # High confidence in benign verdict
            elif is_malicious:
                source_score = 0.9
            else:
                source_score = 0.2
        else:
            source_score = 1.0 if is_malicious else 0.0
        weighted_score += confidence * source_score
        total_weight += confidence
        verdicts.append({
            "source": source,
            "verdict": "malicious" if source_score > 0.5 else "benign",
            "score": round(source_score * 100),
            "confidence": round(confidence * 100)
        })
    final_score = (weighted_score / max(total_weight, 0.01)) * 100
    # Classification
    if final_score >= 75:
        classification = "HIGH_THREAT"
    elif final_score >= 40:
        classification = "SUSPICIOUS"
    elif final_score >= 15:
        classification = "LOW_RISK"
    else:
        classification = "BENIGN"
    return {
        "score": round(final_score, 1),
        "classification": classification,
        "verdicts": verdicts,
        "sources_queried": len([e for e in enrichments if "error" not in e]),
        "sources_failed": len([e for e in enrichments if "error" in e])
    }
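As a sanity check on the weighting, here is the arithmetic for a hypothetical three-source result (all numbers invented for illustration): VirusTotal at 40% detection ratio, AbuseIPDB abuse score 60, GreyNoise classified malicious.

```python
pairs = [          # (confidence, source_score) — hypothetical verdicts
    (0.90, 0.80),  # virustotal: detection_ratio 0.4 → min(0.4 * 2, 1.0)
    (0.85, 0.60),  # abuseipdb: abuse_score 60 → 60 / 100
    (0.75, 0.90),  # greynoise: classified malicious
]
weighted = sum(c * s for c, s in pairs)  # 0.72 + 0.51 + 0.675 = 1.905
total = sum(c for c, _ in pairs)         # 2.5
score = weighted / total * 100           # → 76.2, i.e. HIGH_THREAT (>= 75)
print(round(score, 1))
```

Note that a single high-confidence benign verdict (e.g. a GreyNoise RIOT hit scoring 0 at confidence 0.9) drags the average down sharply, which is exactly the intended behavior.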
STIX 2.1 Output for Threat Intel Platforms
Generate STIX 2.1 bundles for feeding back into OpenCTI or other threat intel platforms:
from datetime import datetime, timezone
import uuid
import json

def generate_stix_indicator(indicator: str, indicator_type: str,
                            score: dict, enrichments: list) -> dict:
    """Generate a STIX 2.1 bundle from enrichment results."""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    pattern_map = {
        "ipv4": f"[ipv4-addr:value = '{indicator}']",
        "ipv6": f"[ipv6-addr:value = '{indicator}']",
        "domain": f"[domain-name:value = '{indicator}']",
        "sha256": f"[file:hashes.'SHA-256' = '{indicator}']",
        "md5": f"[file:hashes.MD5 = '{indicator}']",
        "url": f"[url:value = '{indicator}']",
    }
    indicator_id = f"indicator--{uuid.uuid4()}"
    stix_indicator = {
        "type": "indicator",
        "spec_version": "2.1",
        "id": indicator_id,
        "created": now,
        "modified": now,
        "name": f"OSINT enrichment: {indicator}",
        "description": f"Automated OSINT enrichment. Score: {score['score']}/100 ({score['classification']}). Sources: {score['sources_queried']} queried, {score['sources_failed']} failed.",
        "pattern": pattern_map.get(indicator_type, f"[artifact:payload_bin = '{indicator}']"),
        "pattern_type": "stix",
        "valid_from": now,
        "confidence": min(int(score["score"]), 100),
        "labels": [score["classification"].lower().replace("_", "-")],
        "external_references": [
            {"source_name": v["source"], "description": f"Verdict: {v['verdict']} (score: {v['score']})"}
            for v in score.get("verdicts", [])
        ]
    }
    bundle = {
        "type": "bundle",
        "id": f"bundle--{uuid.uuid4()}",
        "objects": [stix_indicator]
    }
    return bundle
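Before shipping bundles downstream, a lightweight sanity check is cheap insurance. This helper is my sketch (the property list follows my reading of the STIX 2.1 Indicator SDO requirements; the `stix2` Python library offers real validation):

```python
# Required properties of a STIX 2.1 Indicator SDO.
REQUIRED_INDICATOR_PROPS = {
    "type", "spec_version", "id", "created", "modified",
    "pattern", "pattern_type", "valid_from",
}

def missing_props(bundle: dict) -> list:
    """List required properties missing from each indicator in a bundle."""
    problems = []
    for obj in bundle.get("objects", []):
        if obj.get("type") == "indicator":
            for prop in sorted(REQUIRED_INDICATOR_PROPS - obj.keys()):
                problems.append(f"{obj.get('id', '?')}: missing {prop}")
    return problems

incomplete = {"type": "bundle", "id": "bundle--x", "objects": [
    {"type": "indicator", "id": "indicator--x",
     "pattern": "[ipv4-addr:value = '203.0.113.50']"}
]}
problems = missing_props(incomplete)
```

An empty return list means every indicator carries the required fields; anything else should block delivery to OpenCTI.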
Delivery: Reports and Notifications
Markdown Report
def generate_markdown_report(indicator: str, indicator_type: str,
                             score: dict, enrichments: list) -> str:
    """Generate a human-readable markdown report."""
    lines = [
        f"# OSINT Report: {indicator}",
        f"Type: {indicator_type} | Score: {score['score']}/100 | Classification: {score['classification']}",
        f"Sources: {score['sources_queried']} queried, {score['sources_failed']} failed",
        "",
        "## Source Verdicts",
        "| Source | Verdict | Score | Confidence |",
        "|--------|---------|-------|------------|",
    ]
    for v in score.get("verdicts", []):
        lines.append(f"| {v['source']} | {v['verdict']} | {v['score']}% | {v['confidence']}% |")
    lines.append("")
    lines.append("## Raw Enrichment Data")
    for e in enrichments:
        source = e.get("source", "unknown")
        lines.append(f"### {source}")
        if "error" in e:
            lines.append(f"Error: {e['error']}")
        else:
            for k, v in e.items():
                if k not in ("source", "confidence", "malicious"):
                    lines.append(f"- {k}: {v}")
        lines.append("")
    return "\n".join(lines)
Matrix/Slack Notification
import html

async def send_to_matrix(report: str, room_id: str, homeserver: str, token: str):
    """Send report to a Matrix room."""
    async with httpx.AsyncClient() as client:
        await client.put(
            f"{homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{uuid.uuid4()}",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "msgtype": "m.text",
                "body": report,
                "format": "org.matrix.custom.html",
                # Escape the report so angle brackets in raw enrichment data
                # don't break the HTML body
                "formatted_body": f"<pre>{html.escape(report)}</pre>"
            }
        )
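For Slack, the heading's other target, a comparable sketch posts to an incoming webhook (the webhook URL is one you create in your Slack app; the helper names are mine):

```python
def build_slack_payload(report: str) -> dict:
    # Slack renders triple-backtick blocks as monospace, which suits the report.
    return {"text": f"```{report}```"}

async def send_to_slack(report: str, webhook_url: str):
    """POST the report to a Slack incoming webhook."""
    import httpx  # local import keeps the payload builder dependency-free
    async with httpx.AsyncClient(timeout=10) as client:
        await client.post(webhook_url, json=build_slack_payload(report))
```

Incoming webhooks accept a plain `{"text": ...}` body; for richer formatting, Slack's Block Kit layout can replace it without changing the pipeline.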
n8n Workflow Integration
For teams using n8n for workflow automation, the pipeline maps naturally to an n8n workflow:
- Webhook Trigger — receives {"indicator": "203.0.113.50"} via POST
- Function Node — classifies the indicator type
- Split In Batches — fans out to multiple HTTP Request nodes (one per source)
- Merge Node — collects all enrichment responses
- Function Node — runs correlation scoring logic
- IF Node — routes based on score (HIGH_THREAT goes to incident channel, LOW_RISK goes to log)
- HTTP Request — POST STIX bundle to OpenCTI’s GraphQL API
- Matrix/Slack Node — send formatted report to the SOC channel
The key advantage of n8n is visual debugging: when a source fails or returns unexpected data, you can inspect every node’s input and output without reading logs.
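Any tool that can issue an HTTP POST can start the workflow; a sketch (the webhook URL comes from your n8n instance, so it is a parameter here, and the helper names are mine):

```python
def build_trigger_payload(indicator: str) -> dict:
    # Shape expected by the Webhook Trigger node in step 1 above.
    return {"indicator": indicator}

def trigger_enrichment(indicator: str, webhook_url: str) -> dict:
    """POST an indicator to the n8n webhook and return its JSON response."""
    import httpx  # local import; any HTTP client works here
    resp = httpx.post(webhook_url, json=build_trigger_payload(indicator), timeout=30)
    resp.raise_for_status()
    return resp.json()
```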
Caching
Cache results to avoid redundant lookups and respect API rate limits:
import hashlib
from datetime import datetime, timedelta

class ResultCache:
    """Simple TTL cache for enrichment results."""
    def __init__(self, ttl_minutes=60):
        self.cache = {}
        self.ttl = timedelta(minutes=ttl_minutes)

    def _key(self, source: str, indicator: str) -> str:
        return hashlib.sha256(f"{source}:{indicator}".encode()).hexdigest()

    def get(self, source: str, indicator: str) -> dict | None:
        key = self._key(source, indicator)
        entry = self.cache.get(key)
        if entry and datetime.now() - entry["time"] < self.ttl:
            return entry["data"]
        return None

    def set(self, source: str, indicator: str, data: dict):
        key = self._key(source, indicator)
        self.cache[key] = {"data": data, "time": datetime.now()}
Set different TTLs per source: VirusTotal results are stable for hours, while AbuseIPDB scores change as new reports come in. A 60-minute default TTL is a reasonable starting point.
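One way to vary TTLs per source is a lookup table consulted when caching each result (the values below are my illustrative defaults, not vendor recommendations — tune them against each API's update cadence):

```python
from datetime import timedelta

# Illustrative per-source TTLs.
SOURCE_TTLS = {
    "virustotal": timedelta(hours=4),         # analysis stats change slowly
    "abuseipdb": timedelta(minutes=30),       # scores move with new reports
    "shodan_internetdb": timedelta(hours=12), # scan data refreshes infrequently
    "greynoise": timedelta(hours=1),
}
DEFAULT_TTL = timedelta(minutes=60)

def ttl_for(source: str) -> timedelta:
    """TTL to apply when caching a result from this source."""
    return SOURCE_TTLS.get(source, DEFAULT_TTL)
```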
Putting It Together
The complete pipeline, from trigger to delivery, should execute in seconds for a single indicator; the bottleneck is always the slowest API response. Use aggressive timeouts (10-15 seconds per source) and do not let one slow source block the entire pipeline.
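The per-source timeout can be enforced with `asyncio.wait_for`; a sketch with stub sources (names invented) showing that a hung source degrades into an error entry instead of stalling the gather:

```python
import asyncio

async def with_timeout(coro, source: str, seconds: float) -> dict:
    """Wrap one enrichment so a hung API cannot stall the whole pipeline."""
    try:
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        return {"source": source, "error": f"timeout after {seconds}s"}

async def fast_source():
    return {"source": "fast", "malicious": False}

async def slow_source():
    await asyncio.sleep(5)  # simulates a hung API
    return {"source": "slow", "malicious": True}

async def main():
    return await asyncio.gather(
        with_timeout(fast_source(), "fast", 1.0),
        with_timeout(slow_source(), "slow", 0.2),
    )

results = asyncio.run(main())
```

The gather completes in roughly the timeout of the slowest wrapped source, and the timeout result flows through the scorer like any other source error.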
Build this incrementally: start with three sources (Shodan InternetDB, VirusTotal, AbuseIPDB), get the scoring model working, and add sources over time. Each new source improves correlation confidence without changing the pipeline architecture. The modular design means you can swap sources, adjust weights, and add new output formats without rewriting the core logic.
