The gap between detecting a security incident and containing it is where damage accumulates. Every minute an attacker has post-detection is a minute they are exfiltrating data, establishing persistence, or moving laterally. Manual incident response processes that depend on an analyst reading an email, logging into three consoles, and copy-pasting IOCs between tools cannot close this gap fast enough.
TheHive and Cortex provide the foundation for automated incident response — from alert ingestion through enrichment to containment — with full audit trails and human oversight at critical decision points. This guide covers building that pipeline end to end.
Architecture: The Automated IR Pipeline
The pipeline flows through four stages:
SIEM Alert → TheHive Case → Cortex Enrichment → Response Playbook → Containment
     ↑                                                                    │
     └───────────────────── False Positive Feedback ──────────────────────┘
Each stage is automated but includes breakpoints where analyst review is required before destructive actions execute. Full automation of containment is possible but should only be enabled for high-confidence, well-tested playbooks.
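The breakpoint pattern can be sketched as a small dispatcher: actions above a high-confidence threshold execute immediately, mid-confidence actions are parked for analyst approval, and everything else is logged only. The thresholds and names below are illustrative, not part of TheHive or Cortex themselves:

```python
from enum import Enum

class Disposition(Enum):
    AUTO_EXECUTE = "auto_execute"
    PENDING_APPROVAL = "pending_approval"
    LOG_ONLY = "log_only"

def gate_action(confidence: int,
                auto_threshold: int = 5,
                review_threshold: int = 3) -> Disposition:
    """Decide how a proposed containment action proceeds.

    Thresholds are illustrative — tune them per playbook during an
    observe-only phase before enabling automated containment.
    """
    if confidence >= auto_threshold:
        return Disposition.AUTO_EXECUTE
    if confidence >= review_threshold:
        return Disposition.PENDING_APPROVAL
    return Disposition.LOG_ONLY
```

The playbooks later in this guide follow exactly this shape: enrichment produces a confidence score, and the score picks one of three dispositions.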
Deploying TheHive and Cortex
A production deployment uses dedicated backend storage. TheHive uses Cassandra for case data and Elasticsearch for indexing. Cortex shares the Elasticsearch instance:
# docker-compose.yml
version: "3.8"
services:
  cassandra:
    image: cassandra:4.1
    volumes:
      - cassandra_data:/var/lib/cassandra
    environment:
      - CASSANDRA_CLUSTER_NAME=thehive
      - MAX_HEAP_SIZE=1024M
      - HEAP_NEWSIZE=256M
    healthcheck:
      test: ["CMD", "cqlsh", "-e", "describe cluster"]
      interval: 30s
      timeout: 10s
      retries: 5

  elasticsearch:
    image: elasticsearch:7.17.18
    volumes:
      - es_data:/usr/share/elasticsearch/data
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      nofile:
        soft: 65535
        hard: 65535

  thehive:
    image: strangebee/thehive:5.4
    depends_on:
      cassandra:
        condition: service_healthy
      elasticsearch:
        condition: service_started
    ports:
      - "9000:9000"
    volumes:
      - thehive_data:/opt/thp/thehive/data
      - ./thehive/application.conf:/etc/thehive/application.conf
    environment:
      - JVM_OPTS=-Xms512m -Xmx1024m

  cortex:
    image: thehiveproject/cortex:3.1
    depends_on:
      - elasticsearch
    ports:
      - "9001:9001"
    volumes:
      - ./cortex/application.conf:/etc/cortex/application.conf
      - /var/run/docker.sock:/var/run/docker.sock  # For dockerized analyzers

volumes:
  cassandra_data:
  es_data:
  thehive_data:
TheHive Configuration
# thehive/application.conf
db.janusgraph {
  storage {
    backend = cql
    hostname = ["cassandra"]
    cql {
      cluster-name = thehive
      keyspace = thehive
    }
  }
  index.search {
    backend = elasticsearch
    hostname = ["elasticsearch"]
    index-name = thehive
  }
}

storage {
  provider = localfs
  localfs.location = /opt/thp/thehive/data
}

play.http.secret.key = "your-secret-key-change-this"
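The placeholder secret key must be replaced before anything touches production; one way to generate a suitably random value (assuming `openssl` is available on the host):

```shell
# Generate a 64-character hex secret for play.http.secret.key
openssl rand -hex 32
```

Keep the generated value out of version control — inject it via an environment variable or a secrets manager rather than committing it in application.conf.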
SIEM Integration: Alert to Case
The first automation connects your SIEM to TheHive. When a SIEM rule fires, it creates a TheHive alert via the API:
import requests

class TheHiveAlertForwarder:
    """Forward SIEM alerts to TheHive as structured alerts."""

    def __init__(self, thehive_url: str, api_key: str):
        self.url = f"{thehive_url}/api/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def create_alert(self, siem_alert: dict) -> dict:
        alert = {
            "type": siem_alert.get("rule_group", "SIEM"),
            "source": "SIEM",
            "sourceRef": siem_alert["alert_id"],
            "title": siem_alert["title"],
            "description": self._build_description(siem_alert),
            "severity": self._map_severity(siem_alert["severity"]),
            "tags": self._extract_tags(siem_alert),
            "observables": self._extract_observables(siem_alert)
        }
        resp = requests.post(
            f"{self.url}/alert",
            json=alert,
            headers=self.headers
        )
        resp.raise_for_status()
        return resp.json()

    def _build_description(self, alert: dict) -> str:
        # Minimal implementation — adapt to your SIEM's alert schema
        return alert.get("description", "Forwarded from SIEM")

    def _extract_tags(self, alert: dict) -> list:
        # Minimal implementation — carry the triggering rule ID as a tag
        return [f"rule:{alert['rule_id']}"] if alert.get("rule_id") else []

    def _extract_observables(self, alert: dict) -> list:
        observables = []
        if alert.get("src_ip"):
            observables.append({
                "dataType": "ip",
                "data": alert["src_ip"],
                "message": "Source IP from SIEM alert",
                "tags": ["src", "siem"]
            })
        if alert.get("dest_ip"):
            observables.append({
                "dataType": "ip",
                "data": alert["dest_ip"],
                "message": "Destination IP from SIEM alert",
                "tags": ["dst", "siem"]
            })
        for domain in alert.get("domains", []):
            observables.append({
                "dataType": "domain",
                "data": domain,
                "message": "Domain extracted from SIEM alert",
                "tags": ["siem"]
            })
        for hash_val in alert.get("file_hashes", []):
            observables.append({
                "dataType": "hash",
                "data": hash_val,
                "message": "File hash from SIEM alert",
                "tags": ["siem"]
            })
        return observables

    def _map_severity(self, siem_severity: str) -> int:
        mapping = {"low": 1, "medium": 2, "high": 3, "critical": 4}
        return mapping.get(siem_severity.lower(), 2)
Cortex Analyzers: Automated Enrichment
Once an alert arrives in TheHive, Cortex analyzers automatically enrich every observable. Configure analyzers for your threat intel sources:
import requests

class CortexEnrichmentPipeline:
    """Trigger Cortex analyzers on new TheHive observables."""

    # Map observable types to relevant analyzers
    ANALYZER_MAP = {
        "ip": [
            "AbuseIPDB_1_0",
            "OTXQuery_2_0",
            "Shodan_DNSResolve_2_0",
            "VirusTotal_GetReport_3_1"
        ],
        "domain": [
            "DomainTools_ReverseIP_1_0",
            "OTXQuery_2_0",
            "VirusTotal_GetReport_3_1",
            "URLhaus_2_0"
        ],
        "hash": [
            "VirusTotal_GetReport_3_1",
            "MalwareBazaar_1_0",
            "HybridAnalysis_GetReport_1_0"
        ]
    }

    def __init__(self, cortex_url: str, api_key: str):
        self.url = f"{cortex_url}/api"
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def enrich_observable(self, observable: dict) -> list:
        data_type = observable["dataType"]
        analyzers = self.ANALYZER_MAP.get(data_type, [])
        jobs = []
        for analyzer in analyzers:
            job = self._run_analyzer(analyzer, observable)
            if job:
                jobs.append(job)
        return jobs

    def _run_analyzer(self, analyzer_name: str, observable: dict) -> dict | None:
        payload = {
            "data": observable["data"],
            "dataType": observable["dataType"],
            "tlp": 2,  # TLP:AMBER by default
            "message": "Auto-enrichment for case observable"
        }
        resp = requests.post(
            f"{self.url}/analyzer/{analyzer_name}/run",
            json=payload,
            headers=self.headers
        )
        if resp.status_code == 200:
            return resp.json()
        return None
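Cortex analyzer runs are asynchronous: the run call returns a job, and the report becomes available only after the job reaches a terminal state. A polling helper can bridge that gap; here the HTTP fetch is injected as a callable so the logic is testable offline (the `/api/job/<id>` path and `Success`/`Failure` status values reflect the Cortex 3 API, but verify against your deployed version):

```python
import time
from typing import Callable

def wait_for_report(job_id: str,
                    fetch: Callable[[str], dict],
                    timeout: float = 60.0,
                    interval: float = 2.0) -> dict:
    """Poll a Cortex job until it reaches a terminal state.

    `fetch` takes a path like "/api/job/<id>" and returns the parsed
    JSON body — in production, wrap requests.get with auth headers.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        job = fetch(f"/api/job/{job_id}")
        if job.get("status") in ("Success", "Failure"):
            return job
        time.sleep(interval)
    raise TimeoutError(f"Cortex job {job_id} did not finish in {timeout}s")
```

Keeping the transport injectable also makes it trivial to replay recorded job responses when testing playbook logic.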
Response Playbooks: From Enrichment to Containment
Playbooks define automated response actions based on enrichment results. The key design pattern is a threshold-based decision tree with mandatory human approval for destructive actions:
class ResponsePlaybook:
    """Base class for automated response playbooks."""

    def __init__(self, thehive_client, containment_apis: dict):
        self.thehive = thehive_client
        self.firewall = containment_apis.get("firewall")
        self.edr = containment_apis.get("edr")
        self.iam = containment_apis.get("iam")

    def evaluate(self, case_id: str, enrichment_results: list) -> dict:
        """Evaluate enrichment and determine response actions."""
        raise NotImplementedError

class MaliciousIPPlaybook(ResponsePlaybook):
    """Respond to confirmed malicious IP communication."""

    MALICIOUS_THRESHOLD = 3   # Sources confirming malicious
    AUTO_BLOCK_THRESHOLD = 5  # Auto-block without human approval

    def evaluate(self, case_id: str, enrichment_results: list) -> dict:
        malicious_count = sum(
            1 for r in enrichment_results
            if r.get("report", {}).get("malicious", False)
        )
        actions = []
        if malicious_count >= self.AUTO_BLOCK_THRESHOLD:
            # High confidence — auto-block with notification
            ip = enrichment_results[0]["observable"]
            self._block_ip(ip)
            self._add_case_task(
                case_id,
                f"AUTO: Blocked {ip} at firewall ({malicious_count} sources confirm)")
            actions.append({"action": "firewall_block", "target": ip,
                            "automated": True})
        elif malicious_count >= self.MALICIOUS_THRESHOLD:
            # Medium confidence — create task for analyst approval
            ip = enrichment_results[0]["observable"]
            self._add_case_task(
                case_id,
                f"REVIEW: {ip} flagged by {malicious_count} sources — approve block?",
                flag=True)
            actions.append({"action": "firewall_block", "target": ip,
                            "automated": False, "pending_approval": True})
        return {"actions": actions, "malicious_score": malicious_count}

    def _block_ip(self, ip: str):
        """Add IP to firewall block list via API."""
        self.firewall.add_block_rule(
            ip=ip,
            direction="both",
            comment="TheHive auto-block — case enrichment",
            duration_hours=24  # Temporary block; analyst extends if needed
        )

    def _add_case_task(self, case_id: str, description: str, flag: bool = False):
        self.thehive.create_task(case_id, {
            "title": "Automated Response Action",
            "description": description,
            "flag": flag,
            "status": "Waiting" if flag else "Completed"
        })
Account Lockout Playbook
For compromised credential scenarios:
class CompromisedAccountPlaybook(ResponsePlaybook):
    """Respond to confirmed account compromise."""

    def evaluate(self, case_id: str, indicators: dict) -> dict:
        actions = []
        username = indicators.get("username")
        if not username:
            return {"actions": [], "error": "No username identified"}

        # Immediate: disable account
        self.iam.disable_account(username)
        actions.append({"action": "account_disabled", "target": username})

        # Immediate: revoke all active sessions
        self.iam.revoke_sessions(username)
        actions.append({"action": "sessions_revoked", "target": username})

        # Create analyst tasks for follow-up
        follow_up_tasks = [
            "Review account activity logs for past 72 hours",
            "Identify any lateral movement from compromised account",
            "Check for persistence mechanisms (scheduled tasks, SSH keys)",
            "Reset credentials and re-enable account after investigation",
            "Notify account owner through out-of-band channel"
        ]
        for task in follow_up_tasks:
            self._add_case_task(case_id, task, flag=True)
        return {"actions": actions}
False Positive Feedback Loops
Automation without feedback loops generates noise. When an analyst marks an alert as a false positive, the system must learn:
import sqlite3

class FalsePositiveTracker:
    """Track false positives and auto-suppress repeat offenders."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS false_positives (
                    source TEXT,
                    rule_id TEXT,
                    observable TEXT,
                    reported_at TEXT
                )
            """)

    def record_false_positive(self, alert_source: str,
                              rule_id: str, observable: str):
        """Record a false positive determination."""
        with sqlite3.connect(self.db_path) as conn:
            # Timestamp in SQLite's own UTC format so the 30-day
            # window comparison below stays consistent
            conn.execute("""
                INSERT INTO false_positives
                    (source, rule_id, observable, reported_at)
                VALUES (?, ?, ?, datetime('now'))
            """, (alert_source, rule_id, observable))

    def should_suppress(self, alert_source: str,
                        rule_id: str, observable: str) -> bool:
        """Check if this alert matches known false positive patterns."""
        with sqlite3.connect(self.db_path) as conn:
            # Suppress if same source+rule+observable reported 3+ times
            count = conn.execute("""
                SELECT COUNT(*) FROM false_positives
                WHERE source = ? AND rule_id = ? AND observable = ?
                  AND reported_at > datetime('now', '-30 days')
            """, (alert_source, rule_id, observable)).fetchone()[0]
        return count >= 3

    def get_suppression_stats(self) -> list:
        """Report on suppression rates for tuning."""
        with sqlite3.connect(self.db_path) as conn:
            rows = conn.execute("""
                SELECT source, rule_id, COUNT(*) AS fp_count
                FROM false_positives
                WHERE reported_at > datetime('now', '-30 days')
                GROUP BY source, rule_id
                ORDER BY fp_count DESC
                LIMIT 20
            """).fetchall()
        return [{"source": r[0], "rule_id": r[1], "fp_count": r[2]} for r in rows]
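The 30-day window in the suppression query matters: a determination from months ago should not keep suppressing a rule that may have been re-tuned since. A self-contained demonstration against an in-memory SQLite database (schema assumed to match the tracker's table):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE false_positives
        (source TEXT, rule_id TEXT, observable TEXT, reported_at TEXT)
""")
# Three recent determinations plus one stale entry from 40 days ago
for offset in ("-0 days", "-1 days", "-2 days", "-40 days"):
    conn.execute(
        "INSERT INTO false_positives VALUES "
        "('siem', 'R100', '10.0.0.5', datetime('now', ?))", (offset,))

recent = conn.execute("""
    SELECT COUNT(*) FROM false_positives
    WHERE source = 'siem' AND rule_id = 'R100' AND observable = '10.0.0.5'
      AND reported_at > datetime('now', '-30 days')
""").fetchone()[0]
# recent is 3: the 40-day-old determination has aged out of the window
```

With three recent hits the suppression threshold is met today, but if the rule stops misfiring the entries expire and alerts flow again without manual cleanup.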
Audit Trails: Every Action Logged
Every automated action must be logged with enough detail for post-incident review and compliance:
import json
import logging
from datetime import datetime

class AuditLogger:
    def __init__(self, log_path: str):
        self.logger = logging.getLogger("ir_audit")
        handler = logging.FileHandler(log_path)
        handler.setFormatter(logging.Formatter(
            '%(asctime)s %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z'
        ))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_action(self, case_id: str, action: str,
                   target: str, automated: bool,
                   operator: str = "system", result: str = "success"):
        entry = {
            "case_id": case_id,
            "action": action,
            "target": target,
            "automated": automated,
            "operator": operator,
            "result": result,
            "timestamp": datetime.utcnow().isoformat() + "Z"
        }
        self.logger.info(json.dumps(entry))
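The resulting log is one JSON object per line, which turns post-incident reconstruction into a grep-and-parse exercise. A quick round-trip showing the record shape (the case ID and target values are made up for illustration):

```python
import json
import tempfile
from datetime import datetime, timezone

entry = {
    "case_id": "~41216",        # hypothetical TheHive case id
    "action": "firewall_block",
    "target": "203.0.113.7",
    "automated": True,
    "operator": "system",
    "result": "success",
    "timestamp": datetime.now(timezone.utc).isoformat(),
}
with tempfile.NamedTemporaryFile("w+", suffix=".log") as f:
    f.write(json.dumps(entry) + "\n")
    f.seek(0)
    parsed = [json.loads(line) for line in f]
# Each line parses independently — no multi-line records to reassemble
```

Because every field is structured, questions like "show all automated actions that failed last week" become a one-line jq filter instead of a log-spelunking session.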
Playbook Design Principles
Building and operating dozens of response playbooks surfaces a few recurring patterns:
Start with alert-only playbooks. Before automating any containment action, run the playbook in “observe” mode for two weeks. Log what it would have done. Review those decisions manually. Only enable automated response after confirming an acceptable false positive rate.
Temporary containment, permanent decisions by humans. Automated blocks should be time-limited (24 hours). If the analyst confirms the threat, they extend or make it permanent. This limits the blast radius of false positives.
Separate enrichment from response. Enrichment should always be fully automated — it is non-destructive. Response should be gated by confidence thresholds. Keep these in separate, composable playbooks.
Test playbooks against historical data. Replay last month’s alerts through a new playbook in dry-run mode. If it would have auto-blocked your CEO’s home IP, you know the thresholds need adjustment before going live.
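The replay idea can be sketched as a thin harness: run historical alerts through the playbook's evaluation path with execution stubbed out, and tally what would have happened. The `evaluate` callable below is a stand-in for your real playbook's decision logic:

```python
def replay_dry_run(alerts, evaluate):
    """Feed historical alerts through a playbook's evaluate function
    and summarize the actions it *would* have taken.

    `evaluate` maps an alert dict to a list of action dicts, e.g.
    [{"action": "firewall_block", "target": "1.2.3.4", "automated": True}].
    """
    summary = {"auto": [], "pending": []}
    for alert in alerts:
        for action in evaluate(alert):
            bucket = "auto" if action.get("automated") else "pending"
            summary[bucket].append(action["target"])
    return summary
```

Scan the "auto" bucket for anything you recognize — your own VPN range, a partner's mail relay — before flipping the playbook live.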
The goal is not to remove humans from incident response. It is to remove the 15 minutes of copy-paste, console-switching, and manual IOC lookup that delays every response, so the analyst can spend their time on judgment calls instead of data gathering.
