Incident Response Automation with TheHive and Cortex: From Alert to Containment in Minutes

The gap between detecting a security incident and containing it is where damage accumulates. Every minute an attacker has post-detection is a minute they are exfiltrating data, establishing persistence, or moving laterally. Manual incident response processes that depend on an analyst reading an email, logging into three consoles, and copy-pasting IOCs between tools cannot close this gap fast enough.

TheHive and Cortex provide the foundation for automated incident response — from alert ingestion through enrichment to containment — with full audit trails and human oversight at critical decision points. This guide covers building that pipeline end to end.

Architecture: The Automated IR Pipeline

The pipeline flows through four stages:

SIEM Alert → TheHive Case → Cortex Enrichment → Response Playbook → Containment
     ↑                                                    |
     └──────────── False Positive Feedback ───────────────┘

Each stage is automated but includes breakpoints where analyst review is required before destructive actions execute. Full automation of containment is possible but should only be enabled for high-confidence, well-tested playbooks.

Deploying TheHive and Cortex

A production deployment uses dedicated backend storage. TheHive 5 stores case data in Cassandra and uses Elasticsearch for indexing; in this single-host layout, Cortex uses the same Elasticsearch instance as its own datastore:

# docker-compose.yml
version: "3.8"

services:
  cassandra:
    image: cassandra:4.1
    volumes:
      - cassandra_data:/var/lib/cassandra
    environment:
      - CASSANDRA_CLUSTER_NAME=thehive
      - MAX_HEAP_SIZE=1024M
      - HEAP_NEWSIZE=256M
    healthcheck:
      test: ["CMD", "cqlsh", "-e", "describe cluster"]
      interval: 30s
      timeout: 10s
      retries: 5

  elasticsearch:
    image: elasticsearch:7.17.18
    volumes:
      - es_data:/usr/share/elasticsearch/data
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      nofile:
        soft: 65535
        hard: 65535

  thehive:
    image: strangebee/thehive:5.4
    depends_on:
      cassandra:
        condition: service_healthy
      elasticsearch:
        condition: service_started
    ports:
      - "9000:9000"
    volumes:
      - thehive_data:/opt/thp/thehive/data
      - ./thehive/application.conf:/etc/thehive/application.conf
    environment:
      - JVM_OPTS=-Xms512m -Xmx1024m

  cortex:
    image: thehiveproject/cortex:3.1
    depends_on:
      - elasticsearch
    ports:
      - "9001:9001"
    volumes:
      - ./cortex/application.conf:/etc/cortex/application.conf
      - /var/run/docker.sock:/var/run/docker.sock  # For dockerized analyzers

volumes:
  cassandra_data:
  es_data:
  thehive_data:

TheHive Configuration

# thehive/application.conf
db.janusgraph {
  storage {
    backend = cql
    hostname = ["cassandra"]
    cql {
      cluster-name = thehive
      keyspace = thehive
    }
  }
  index.search {
    backend = elasticsearch
    hostname = ["elasticsearch"]
    index-name = thehive
  }
}

storage {
  provider = localfs
  localfs.location = /opt/thp/thehive/data
}

play.http.secret.key = "your-secret-key-change-this"
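The compose file also mounts ./cortex/application.conf but never shows it. A minimal sketch for the Cortex side, assuming the shared Elasticsearch container and the official analyzer catalog (key names follow Cortex 3 conventions; verify against the Cortex documentation for your version):

```hocon
# cortex/application.conf
play.http.secret.key = "another-secret-key-change-this"

# Cortex uses Elasticsearch as its primary datastore
search {
  index = cortex
  uri = "http://elasticsearch:9200"
}

# Analyzer catalog. With dockerized analyzers, remember that the job
# directory must be reachable at the same path on the host and in the
# container, since Cortex launches analyzer containers via docker.sock.
analyzer {
  urls = ["https://download.thehive-project.org/analyzers.json"]
}

job {
  runner = [docker]
}
```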

SIEM Integration: Alert to Case

The first automation connects your SIEM to TheHive. When a SIEM rule fires, it creates a TheHive alert via the API:

import requests

class TheHiveAlertForwarder:
    """Forward SIEM alerts to TheHive as structured alerts."""

    def __init__(self, thehive_url: str, api_key: str):
        self.url = f"{thehive_url}/api/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def create_alert(self, siem_alert: dict) -> dict:
        alert = {
            "type": siem_alert.get("rule_group", "SIEM"),
            "source": "SIEM",
            "sourceRef": siem_alert["alert_id"],
            "title": siem_alert["title"],
            "description": self._build_description(siem_alert),
            "severity": self._map_severity(siem_alert["severity"]),
            "tags": self._extract_tags(siem_alert),
            "observables": self._extract_observables(siem_alert)
        }

        resp = requests.post(
            f"{self.url}/alert",
            json=alert,
            headers=self.headers
        )
        resp.raise_for_status()
        return resp.json()

    def _extract_observables(self, alert: dict) -> list:
        observables = []

        if alert.get("src_ip"):
            observables.append({
                "dataType": "ip",
                "data": alert["src_ip"],
                "message": "Source IP from SIEM alert",
                "tags": ["src", "siem"]
            })

        if alert.get("dest_ip"):
            observables.append({
                "dataType": "ip",
                "data": alert["dest_ip"],
                "message": "Destination IP from SIEM alert",
                "tags": ["dst", "siem"]
            })

        for domain in alert.get("domains", []):
            observables.append({
                "dataType": "domain",
                "data": domain,
                "message": "Domain extracted from SIEM alert",
                "tags": ["siem"]
            })

        for hash_val in alert.get("file_hashes", []):
            observables.append({
                "dataType": "hash",
                "data": hash_val,
                "message": "File hash from SIEM alert",
                "tags": ["siem"]
            })

        return observables

    def _map_severity(self, siem_severity: str) -> int:
        mapping = {"low": 1, "medium": 2, "high": 3, "critical": 4}
        return mapping.get(siem_severity.lower(), 2)

    def _build_description(self, alert: dict) -> str:
        # Markdown body for the TheHive alert; field names here are
        # placeholders, adjust to your SIEM's schema
        return (f"Alert `{alert['alert_id']}` from rule "
                f"`{alert.get('rule_id', 'unknown')}`\n\n"
                f"{alert.get('description', '')}")

    def _extract_tags(self, alert: dict) -> list:
        # Carry SIEM tags through, plus a provenance marker
        return ["siem"] + alert.get("tags", [])

Cortex Analyzers: Automated Enrichment

Once an alert arrives in TheHive, Cortex analyzers automatically enrich every observable. Configure analyzers for your threat intel sources:

import requests

class CortexEnrichmentPipeline:
    """Trigger Cortex analyzers on new TheHive observables."""

    def __init__(self, cortex_url: str, api_key: str):
        self.url = f"{cortex_url}/api"
        self.headers = {"Authorization": f"Bearer {api_key}"}

    # Map observable types to relevant analyzers
    ANALYZER_MAP = {
        "ip": [
            "AbuseIPDB_1_0",
            "OTXQuery_2_0",
            "Shodan_DNSResolve_2_0",
            "VirusTotal_GetReport_3_1"
        ],
        "domain": [
            "DomainTools_ReverseIP_1_0",
            "OTXQuery_2_0",
            "VirusTotal_GetReport_3_1",
            "URLhaus_2_0"
        ],
        "hash": [
            "VirusTotal_GetReport_3_1",
            "MalwareBazaar_1_0",
            "HybridAnalysis_GetReport_1_0"
        ]
    }

    def enrich_observable(self, observable: dict) -> list:
        data_type = observable["dataType"]
        analyzers = self.ANALYZER_MAP.get(data_type, [])

        jobs = []
        for analyzer in analyzers:
            job = self._run_analyzer(analyzer, observable)
            if job:
                jobs.append(job)

        return jobs

    def _run_analyzer(self, analyzer_name: str, observable: dict) -> dict | None:
        payload = {
            "data": observable["data"],
            "dataType": observable["dataType"],
            "tlp": 2,  # TLP:AMBER by default
            "message": f"Auto-enrichment with {analyzer_name}"
        }

        resp = requests.post(
            f"{self.url}/analyzer/{analyzer_name}/run",
            json=payload,
            headers=self.headers
        )

        if resp.status_code == 200:
            return resp.json()
        return None
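Cortex jobs run asynchronously, and each finished job report carries a summary whose taxonomies state a verdict level ("info", "safe", "suspicious", or "malicious"). Before a playbook can apply its thresholds, the reports need to be reduced to a count; a minimal sketch (the report dicts below mirror Cortex's taxonomy convention, but verify the shape against your analyzer versions):

```python
def count_malicious(jobs: list) -> int:
    """Count job reports whose summary taxonomies flag the observable
    as malicious; this is the input to the playbook thresholds."""
    flagged = 0
    for job in jobs:
        taxonomies = (job.get("report", {})
                         .get("summary", {})
                         .get("taxonomies", []))
        if any(t.get("level") == "malicious" for t in taxonomies):
            flagged += 1
    return flagged
```

Counting sources rather than trusting any single analyzer is what makes the threshold-based playbooks below meaningful.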

Response Playbooks: From Enrichment to Containment

Playbooks define automated response actions based on enrichment results. The key design pattern is a threshold-based decision tree with mandatory human approval for destructive actions:

class ResponsePlaybook:
    """Base class for automated response playbooks."""

    def __init__(self, thehive_client, containment_apis: dict):
        self.thehive = thehive_client
        self.firewall = containment_apis.get("firewall")
        self.edr = containment_apis.get("edr")
        self.iam = containment_apis.get("iam")

    def evaluate(self, case_id: str, enrichment_results: list) -> dict:
        """Evaluate enrichment and determine response actions."""
        raise NotImplementedError

    def _add_case_task(self, case_id: str, description: str, flag: bool = False):
        # Shared helper so every playbook can record actions as case tasks;
        # flagged tasks stay "Waiting" for analyst approval
        self.thehive.create_task(case_id, {
            "title": "Automated Response Action",
            "description": description,
            "flag": flag,
            "status": "Waiting" if flag else "Completed"
        })

class MaliciousIPPlaybook(ResponsePlaybook):
    """Respond to confirmed malicious IP communication."""

    MALICIOUS_THRESHOLD = 3  # Sources confirming malicious
    AUTO_BLOCK_THRESHOLD = 5  # Auto-block without human approval

    def evaluate(self, case_id: str, enrichment_results: list) -> dict:
        malicious_count = sum(
            1 for r in enrichment_results
            if r.get("report", {}).get("malicious", False)
        )

        actions = []

        if malicious_count >= self.AUTO_BLOCK_THRESHOLD:
            # High confidence — auto-block with notification
            ip = enrichment_results[0]["observable"]
            self._block_ip(ip)
            self._add_case_task(case_id,
                f"AUTO: Blocked {ip} at firewall ({malicious_count} sources confirm)")
            actions.append({"action": "firewall_block", "target": ip,
                          "automated": True})

        elif malicious_count >= self.MALICIOUS_THRESHOLD:
            # Medium confidence — create task for analyst approval
            ip = enrichment_results[0]["observable"]
            self._add_case_task(case_id,
                f"REVIEW: {ip} flagged by {malicious_count} sources — approve block?",
                flag=True)
            actions.append({"action": "firewall_block", "target": ip,
                          "automated": False, "pending_approval": True})

        return {"actions": actions, "malicious_score": malicious_count}

    def _block_ip(self, ip: str):
        """Add IP to firewall block list via API."""
        self.firewall.add_block_rule(
            ip=ip,
            direction="both",
            comment="TheHive auto-block — case enrichment",
            duration_hours=24  # Temporary block, analyst extends if needed
        )

    def _add_case_task(self, case_id: str, description: str, flag: bool = False):
        self.thehive.create_task(case_id, {
            "title": "Automated Response Action",
            "description": description,
            "flag": flag,
            "status": "Waiting" if flag else "Completed"
        })

Account Lockout Playbook

For compromised credential scenarios:

class CompromisedAccountPlaybook(ResponsePlaybook):
    """Respond to confirmed account compromise."""

    def evaluate(self, case_id: str, indicators: dict) -> dict:
        actions = []
        username = indicators.get("username")

        if not username:
            return {"actions": [], "error": "No username identified"}

        # Immediate: disable account
        self.iam.disable_account(username)
        actions.append({"action": "account_disabled", "target": username})

        # Immediate: revoke all active sessions
        self.iam.revoke_sessions(username)
        actions.append({"action": "sessions_revoked", "target": username})

        # Create analyst tasks for follow-up
        follow_up_tasks = [
            "Review account activity logs for past 72 hours",
            "Identify any lateral movement from compromised account",
            "Check for persistence mechanisms (scheduled tasks, SSH keys)",
            "Reset credentials and re-enable account after investigation",
            "Notify account owner through out-of-band channel"
        ]

        for task in follow_up_tasks:
            self._add_case_task(case_id, task, flag=True)

        return {"actions": actions}

False Positive Feedback Loops

Automation without feedback loops generates noise. When an analyst marks an alert as a false positive, the system must learn:

import sqlite3
from datetime import datetime, timezone

class FalsePositiveTracker:
    """Track false positives and auto-suppress repeat offenders."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS false_positives (
                    source TEXT NOT NULL,
                    rule_id TEXT NOT NULL,
                    observable TEXT NOT NULL,
                    reported_at TEXT NOT NULL
                )
            """)

    def record_false_positive(self, alert_source: str,
                              rule_id: str, observable: str):
        """Record a false positive determination."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO false_positives
                (source, rule_id, observable, reported_at)
                VALUES (?, ?, ?, ?)
            """, (alert_source, rule_id, observable,
                  # Store UTC in SQLite's datetime format so the
                  # datetime('now', '-30 days') comparison below is valid
                  datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")))

    def should_suppress(self, alert_source: str,
                         rule_id: str, observable: str) -> bool:
        """Check if this alert matches known false positive patterns."""
        with sqlite3.connect(self.db_path) as conn:
            # Suppress if same source+rule+observable reported 3+ times
            count = conn.execute("""
                SELECT COUNT(*) FROM false_positives
                WHERE source = ? AND rule_id = ? AND observable = ?
                AND reported_at > datetime('now', '-30 days')
            """, (alert_source, rule_id, observable)).fetchone()[0]

            return count >= 3

    def get_suppression_stats(self) -> list:
        """Report on suppression rates for tuning."""
        with sqlite3.connect(self.db_path) as conn:
            rows = conn.execute("""
                SELECT source, rule_id, COUNT(*) as fp_count
                FROM false_positives
                WHERE reported_at > datetime('now', '-30 days')
                GROUP BY source, rule_id
                ORDER BY fp_count DESC
                LIMIT 20
            """).fetchall()

            return [{"source": r[0], "rule_id": r[1], "fp_count": r[2]} for r in rows]
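To sanity-check the three-strikes logic without touching production data, the same queries can be exercised against an in-memory database. A compact standalone sketch (record_fp and should_suppress are illustrative names):

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE false_positives (
        source TEXT, rule_id TEXT, observable TEXT, reported_at TEXT
    )
""")

def record_fp(source: str, rule_id: str, observable: str):
    # Store UTC in SQLite's own format so datetime('now') comparisons work
    conn.execute(
        "INSERT INTO false_positives VALUES (?, ?, ?, ?)",
        (source, rule_id, observable,
         datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")))

def should_suppress(source: str, rule_id: str, observable: str) -> bool:
    # Same three-strikes-in-30-days rule as the tracker above
    count = conn.execute("""
        SELECT COUNT(*) FROM false_positives
        WHERE source = ? AND rule_id = ? AND observable = ?
        AND reported_at > datetime('now', '-30 days')
    """, (source, rule_id, observable)).fetchone()[0]
    return count >= 3
```

Three recorded false positives for the same source, rule, and observable flip should_suppress to True; a different rule ID stays unsuppressed.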

Audit Trails: Every Action Logged

Every automated action must be logged with enough detail for post-incident review and compliance:

import json
import logging
from datetime import datetime, timezone

class AuditLogger:
    def __init__(self, log_path: str):
        self.logger = logging.getLogger("ir_audit")
        handler = logging.FileHandler(log_path)
        handler.setFormatter(logging.Formatter(
            '%(asctime)s %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z'
        ))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_action(self, case_id: str, action: str,
                    target: str, automated: bool,
                    operator: str = "system", result: str = "success"):
        entry = {
            "case_id": case_id,
            "action": action,
            "target": target,
            "automated": automated,
            "operator": operator,
            "result": result,
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        }
        self.logger.info(json.dumps(entry))

Playbook Design Principles

After building and operating dozens of response playbooks, several patterns emerge:

Start with alert-only playbooks. Before automating any containment action, run the playbook in “observe” mode for two weeks. Log what it would have done. Review those decisions manually. Only enable automated response after confirming an acceptable false positive rate.

Temporary containment, permanent decisions by humans. Automated blocks should be time-limited (24 hours). If the analyst confirms the threat, they extend or make it permanent. This limits the blast radius of false positives.

Separate enrichment from response. Enrichment should always be fully automated — it is non-destructive. Response should be gated by confidence thresholds. Keep these in separate, composable playbooks.

Test playbooks against historical data. Replay last month’s alerts through a new playbook in dry-run mode. If it would have auto-blocked your CEO’s home IP, you know the thresholds need adjustment before going live.
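One way to get dry-run mode without branching inside every playbook is to swap the real containment APIs for recorders that capture calls instead of executing them; a sketch (DryRunRecorder is illustrative, not part of TheHive or Cortex):

```python
class DryRunRecorder:
    """Stands in for a containment API: records calls instead of executing."""

    def __init__(self, name: str):
        self.name = name
        self.calls = []

    def __getattr__(self, method: str):
        # Any method call (add_block_rule, disable_account, ...) is captured
        def record(*args, **kwargs):
            self.calls.append({"api": self.name, "method": method,
                               "kwargs": kwargs})
        return record

# Replaying a historical alert: the playbook "blocks" against the recorder,
# and the calls list becomes the review artifact
firewall = DryRunRecorder("firewall")
firewall.add_block_rule(ip="203.0.113.9", direction="both", duration_hours=24)
```

Pass recorders as the containment_apis dict when constructing a playbook, replay a month of alerts, and review the calls lists before enabling the real APIs.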

The goal is not to remove humans from incident response. It is to remove the 15 minutes of copy-paste, console-switching, and manual IOC lookup that delays every response, so the analyst can spend their time on judgment calls instead of data gathering.
