Automated Adversary Emulation: Building a Repeatable Penetration Testing Framework

Security teams that rely solely on annual penetration tests are operating with a dangerous blind spot. Between engagements, infrastructure changes, new services deploy, and fresh vulnerabilities emerge — all untested. Building an internal adversary emulation framework mapped to MITRE ATT&CK transforms penetration testing from a periodic checkbox into a continuous validation capability.

This guide walks through designing, building, and operating a repeatable adversary emulation framework that your red team can run weekly without burning the building down.

Why Adversary Emulation Over Traditional Pentesting

Traditional penetration testing answers one question: “Can someone break in?” Adversary emulation answers a far more useful one: “Can a specific threat actor achieve their objectives against our environment, and would we detect them doing it?”

The distinction matters. Adversary emulation uses documented threat intelligence — real TTPs from real adversary groups — to simulate realistic attack chains. Instead of throwing every tool at the wall, you replicate what APT29, FIN7, or a ransomware operator would actually do against your industry vertical.

The benefits compound over time. Each emulation run produces structured data that you can compare against previous runs, use to track detection coverage trends, and feed directly into your vulnerability management program.

Architecture Overview

A production adversary emulation framework has four layers:

  1. Attack plan library — MITRE ATT&CK-mapped procedure sets organized by threat actor profile
  2. Execution engine — Orchestration tooling that runs attack chains with safety controls
  3. Detection validation — Integration with your SIEM/EDR to verify alert generation
  4. Reporting pipeline — Structured output that feeds vulnerability management and risk dashboards

The execution engine is the core. We will build it around a modular Python framework with YAML-defined attack plans.

Defining Attack Plans in YAML

Each attack plan maps to specific ATT&CK techniques and chains them into realistic sequences:

# plans/apt-initial-access-phishing.yml
plan:
  name: "Initial Access via Spearphishing Link"
  threat_actor: "Generic APT"
  objective: "Establish foothold from phishing vector"

  safety:
    scope:
      networks: ["10.100.0.0/16"]
      exclude_hosts: ["10.100.0.1", "10.100.1.1"]  # gateways
      exclude_services: ["production-db-"]
    max_duration_minutes: 60
    deescalation_triggers:
      - condition: "host_unreachable_count > 3"
        action: "abort"
      - condition: "privilege_level == SYSTEM"
        action: "pause_and_notify"

  phases:
    - id: "initial_access"
      technique: "T1566.002"  # Phishing: Spearphishing Link
      description: "Simulate payload delivery via crafted URL"
      tools: ["gophish_api", "payload_server"]
      parameters:
        payload_type: "macro_document"
        callback_host: "${C2_SERVER}"
        callback_port: 8443
      success_criteria:
        - "callback_received"

    - id: "execution"
      technique: "T1059.001"  # PowerShell
      depends_on: "initial_access"
      description: "Execute discovery commands via PowerShell"
      commands:
        - "whoami /all"
        - "net group 'Domain Admins' /domain"
        - "nltest /dclist:"

    - id: "privilege_escalation"
      technique: "T1548.002"  # UAC Bypass
      depends_on: "execution"
      description: "Attempt UAC bypass via fodhelper"
      tools: ["uac_bypass_module"]
      parameters:
        method: "fodhelper"

The Execution Engine

The engine reads plans, enforces safety constraints, executes each phase, and records results:

import yaml
import ipaddress
import logging
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

class PhaseStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    ABORTED = "aborted"

@dataclass
class EmulationResult:
    phase_id: str
    technique_id: str
    status: PhaseStatus
    started_at: datetime
    completed_at: Optional[datetime] = None
    output: str = ""
    detection_observed: bool = False
    alert_ids: list = field(default_factory=list)

class SafetyController:
    """Enforces scope limits and de-escalation triggers."""

    def __init__(self, safety_config: dict):
        self.allowed_networks = [
            ipaddress.ip_network(n) for n in safety_config["scope"]["networks"]
        ]
        self.excluded_hosts = set(safety_config["scope"].get("exclude_hosts", []))
        self.max_duration = timedelta(
            minutes=safety_config.get("max_duration_minutes", 30)
        )
        self.triggers = safety_config.get("deescalation_triggers", [])
        self.start_time = None
        self.host_failures = 0

    def validate_target(self, target_ip: str) -> bool:
        addr = ipaddress.ip_address(target_ip)
        if target_ip in self.excluded_hosts:
            logging.warning(f"Target {target_ip} is explicitly excluded")
            return False
        return any(addr in net for net in self.allowed_networks)

    def check_triggers(self, context: dict) -> Optional[str]:
        # For brevity, the plan's YAML triggers are mirrored as hardcoded
        # checks here; a full implementation would evaluate self.triggers.
        if self.start_time and datetime.now() - self.start_time > self.max_duration:
            return "abort"
        if context.get("host_unreachable_count", 0) > 3:
            return "abort"
        if context.get("privilege_level") == "SYSTEM":
            return "pause_and_notify"
        return None

class EmulationEngine:
    def __init__(self, plan_path: str):
        with open(plan_path) as f:
            self.plan = yaml.safe_load(f)["plan"]
        self.safety = SafetyController(self.plan["safety"])
        self.results: list[EmulationResult] = []
        self.context = {"host_unreachable_count": 0}

    def execute(self):
        self.safety.start_time = datetime.now()
        logging.info(f"Starting emulation: {self.plan['name']}")

        for phase in self.plan["phases"]:
            # Check safety before each phase
            action = self.safety.check_triggers(self.context)
            if action == "abort":
                logging.critical("Safety trigger fired — aborting emulation")
                self._record_abort(phase)
                break
            elif action == "pause_and_notify":
                logging.warning("Elevated privilege detected — pausing")
                if not self._operator_confirms_continue():
                    break

            # Check dependencies
            if not self._dependencies_met(phase):
                self._record_skip(phase)
                continue

            result = self._execute_phase(phase)
            self.results.append(result)

        return self._generate_report()
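The `execute` loop leans on helpers the listing omits. A minimal sketch of the dependency check: the free-standing `dependencies_met` function and its `succeeded` set are illustrative stand-ins for the engine's `_dependencies_met` method, which would derive the set of successful phase IDs from `self.results`.

```python
def dependencies_met(phase: dict, succeeded: set) -> bool:
    """A phase may run only once the phase named in depends_on has succeeded.

    Phases with no depends_on key are always runnable.
    """
    dep = phase.get("depends_on")
    return dep is None or dep in succeeded
```

This keeps failed branches from cascading: if `initial_access` never calls back, the `execution` and `privilege_escalation` phases are skipped rather than run blind.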

Safety Controls Are Non-Negotiable

The safety layer deserves its own section because getting it wrong means turning your red team exercise into an actual incident. Every emulation framework must enforce:

Scope locking. The engine must refuse to touch any host outside the defined network ranges. This is not a suggestion — it is a hard block in the execution path. Validate every target IP before any packet leaves.

Time boxing. Set maximum durations per plan and per phase. If an attack chain stalls and an operator walks away, the framework should terminate gracefully after the timeout.

De-escalation triggers. Define conditions that automatically pause or abort execution. Achieving SYSTEM or root privileges unexpectedly, hitting too many unreachable hosts (indicating network issues), or detecting production workload disruption should all halt the run.

Kill switch. Implement an out-of-band kill mechanism — a file on a shared path, an API endpoint, or a message queue topic. If anything goes wrong, any team member should be able to stop all running emulations instantly:

import os

KILL_SWITCH_PATH = "/opt/emulation/EMERGENCY_STOP"

class EmulationAborted(Exception):
    """Raised when a safety mechanism halts a run."""

def check_kill_switch() -> bool:
    return os.path.exists(KILL_SWITCH_PATH)

# Check before every network operation
if check_kill_switch():
    raise EmulationAborted("Kill switch activated")
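Polling the flag only at checkpoints can be slow if a phase blocks for a while. One option is a background watcher thread; the `stop_event` wiring here is an assumption about how your engine exposes cancellation, not part of the framework above.

```python
import os
import threading

def watch_kill_switch(path: str, stop_event: threading.Event,
                      poll_seconds: float = 1.0) -> None:
    """Poll for the kill-switch file and signal running emulations to stop.

    The engine's execution loop would check stop_event between operations
    and abort as soon as it is set.
    """
    while not stop_event.is_set():
        if os.path.exists(path):
            stop_event.set()  # flag observed: tell every loop to halt
            break
        stop_event.wait(poll_seconds)  # sleep, but wake early if already set
```

Start the thread once at engine startup; any operator who touches the file stops every run within one poll interval.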

Detection Validation Integration

Running attacks without checking whether your defenses saw them is only half the exercise. Integrate with your SIEM to validate detection coverage:

import requests
from time import sleep

class DetectionValidator:
    """Query SIEM for alerts generated during emulation phases."""

    def __init__(self, siem_url: str, api_key: str):
        self.siem_url = siem_url
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def check_detection(self, technique_id: str,
                         time_start: datetime,
                         time_end: datetime,
                         target_host: str) -> dict:
        # Wait for SIEM ingestion lag
        sleep(30)

        query = {
            "query": f"mitre.technique_id:{technique_id} AND host:{target_host}",
            "time_range": {
                "from": time_start.isoformat(),
                "to": time_end.isoformat()
            }
        }

        resp = requests.post(
            f"{self.siem_url}/api/search",
            json=query,
            headers=self.headers,
            timeout=30,  # fail fast rather than hang the run on a slow SIEM
        )

        alerts = resp.json().get("hits", [])
        return {
            "detected": len(alerts) > 0,
            "alert_count": len(alerts),
            "alert_ids": [a["id"] for a in alerts],
            "mean_time_to_detect": self._calc_mttd(alerts, time_start)
        }
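`_calc_mttd` is left undefined above. A minimal stand-alone version might look like the following, assuming each alert dict carries an ISO-8601 `timestamp` field — the exact field name will vary by SIEM:

```python
from datetime import datetime

def calc_mttd(alerts: list, attack_start: datetime):
    """Mean seconds between attack start and alert creation; None if no alerts."""
    if not alerts:
        return None
    deltas = [
        (datetime.fromisoformat(a["timestamp"]) - attack_start).total_seconds()
        for a in alerts
    ]
    return sum(deltas) / len(deltas)
```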

Structured Reporting and Vulnerability Management Integration

Every emulation run produces a structured JSON report that feeds into your vulnerability management pipeline:

def _generate_report(self) -> dict:
    total = len(self.results)
    detected = sum(1 for r in self.results if r.detection_observed)

    return {
        "emulation_id": self.run_id,
        "plan": self.plan["name"],
        "threat_actor": self.plan["threat_actor"],
        "executed_at": self.start_time.isoformat(),
        "summary": {
            "phases_executed": total,
            "phases_successful": sum(
                1 for r in self.results if r.status == PhaseStatus.SUCCESS
            ),
            "detections_fired": detected,
            "detection_coverage": f"{(detected/total)100:.1f}%" if total else "N/A",
            "mean_time_to_detect_seconds": self._avg_mttd()
        },
        "technique_coverage": [
            {
                "technique": r.technique_id,
                "phase": r.phase_id,
                "attack_succeeded": r.status == PhaseStatus.SUCCESS,
                "detection_observed": r.detection_observed,
                "gap": r.status == PhaseStatus.SUCCESS and not r.detection_observed
            }
            for r in self.results
        ],
        "gaps": [
            r.technique_id for r in self.results
            if r.status == PhaseStatus.SUCCESS and not r.detection_observed
        ]
    }

The gaps array is the most actionable output — these are techniques that succeeded without generating any detection. Feed these directly into your detection engineering backlog.

Operationalizing: From Script to Program

Running the framework once is a project. Running it weekly is a program. To operationalize:

Schedule regular runs. Use cron or your CI/CD platform to execute baseline attack plans against staging environments weekly. Compare detection coverage week over week.
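As a concrete example, a weekly baseline run might be wired up with a plain crontab entry; the paths, script name, and log location here are placeholders for your own deployment:

```shell
# Run the baseline phishing chain against staging every Monday at 06:00,
# appending output for the reporting pipeline to pick up.
0 6 * * 1 /opt/emulation/venv/bin/python /opt/emulation/run.py \
    --plan plans/apt-initial-access-phishing.yml \
    >> /var/log/emulation/weekly.log 2>&1
```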

Maintain a plan library. Map your threat model to specific adversary groups. Build and maintain plans for each. When threat intelligence identifies a new campaign targeting your sector, build a plan for it within 48 hours.

Track coverage metrics. The detection coverage percentage over time is your north star metric. Plot it monthly. Share it with leadership. It is the single most honest measure of your defensive posture.
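Computing that trend from stored run reports is straightforward. This sketch assumes each run's JSON report is saved to a directory and uses the `summary` structure shown earlier:

```python
import json
from pathlib import Path

def coverage_trend(report_dir: str) -> list:
    """Return (executed_at, detection coverage %) pairs, oldest first."""
    points = []
    for path in Path(report_dir).glob("*.json"):
        report = json.loads(path.read_text())
        summary = report["summary"]
        if summary["phases_executed"]:  # skip aborted runs with no phases
            pct = 100.0 * summary["detections_fired"] / summary["phases_executed"]
            points.append((report["executed_at"], round(pct, 1)))
    return sorted(points)  # ISO timestamps sort chronologically
```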

Feed findings into remediation. Every gap identified should create a ticket in your vulnerability management system with the ATT&CK technique, the specific procedure that succeeded, and a recommended detection rule.
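The handoff can be automated. This sketch files one ticket per gap against a generic REST tracker — the `/api/issues` endpoint, payload fields, and auth header are hypothetical placeholders for whatever your ticketing system actually exposes:

```python
import requests

def gap_ticket_payload(report: dict, entry: dict) -> dict:
    """Build a tracker ticket body for one detection gap."""
    return {
        "title": f"Detection gap: {entry['technique']} ({entry['phase']})",
        "description": (
            f"Emulation {report['emulation_id']} executed {entry['technique']} "
            "successfully with no alert fired. Build or tune a detection rule."
        ),
        "labels": ["detection-gap", entry["technique"]],
    }

def file_gap_tickets(report: dict, tracker_url: str, api_key: str) -> list:
    """POST one ticket per successful-but-undetected technique; return ids."""
    created = []
    for entry in report["technique_coverage"]:
        if entry["gap"]:
            resp = requests.post(
                f"{tracker_url}/api/issues",  # hypothetical endpoint
                json=gap_ticket_payload(report, entry),
                headers={"Authorization": f"Bearer {api_key}"},
                timeout=30,
            )
            resp.raise_for_status()
            created.append(resp.json()["id"])
    return created
```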

Building adversary emulation as an internal capability requires significant upfront investment, but the return is a security team that knows — not hopes — that their defenses work against the threats that matter.
