At 2:47 AM, our Security Operations Center detected anomalous network traffic from a production container. Within minutes, we discovered a sophisticated attack: an attacker had exploited a vulnerability, gained container access, and was actively exfiltrating customer data. This incident exposed a critical gap in our security posture: we had no forensics capability for containerized environments. By the time we understood what had happened, the evidence was gone, the containers had been deleted, and the logs were incomplete. This is the complete story of how we built an enterprise-grade container forensics and incident response framework that has since detected and contained 23 security incidents with zero data loss.

This comprehensive guide covers container forensics fundamentals, evidence collection procedures, threat hunting techniques, compliance requirements, and automated investigation workflows for production Kubernetes and Docker environments.

The Problem: Evidence Volatility in Container Environments

Why Traditional Forensics Fails

Traditional server forensics assumes long-lived, persistent infrastructure. Containers break every assumption:

# Traditional Server Forensics Timeline:
Hour 0: Incident detected
Hour 1: Image server for forensics
Hour 2-8: Analyze disk image
Hour 12: Present findings

# Container Reality:
Minute 0: Incident detected
Minute 2: Container restarted by orchestrator
Minute 3: Evidence destroyed
Minute 4: "What container?"

Our first incident revealed the problem starkly:

# Initial detection
$ kubectl get events --sort-by='.lastTimestamp' | head -6
LAST SEEN   TYPE      REASON      OBJECT                    MESSAGE
3m          Warning   Unhealthy   pod/webapp-7d4f9b8c-4xk2p Port 8080 not responding
2m          Normal    Killing     pod/webapp-7d4f9b8c-4xk2p Stopping container
1m          Normal    Pulled      pod/webapp-7d4f9b8c-7n9k8 Successfully pulled image
45s         Normal    Created     pod/webapp-7d4f9b8c-7n9k8 Created container
30s         Normal    Started     pod/webapp-7d4f9b8c-7n9k8 Started container

# Try to investigate original container
$ kubectl logs webapp-7d4f9b8c-4xk2p
Error from server (NotFound): pods "webapp-7d4f9b8c-4xk2p" not found

# Pod already replaced!
$ kubectl describe pod webapp-7d4f9b8c-7n9k8
Events:
  Type    Reason     Age   Message
  ----    ------     ----  -------
  Normal  Pulled     2m    Successfully pulled image
  Normal  Created    2m    Created container
  Normal  Started    2m    Started container

# No forensic data available
# Attack evidence completely lost

Building the Forensics Framework

Based on this painful lesson, we designed a comprehensive framework:

┌────────────────────────────────────────────────────────────────┐
│          Container Forensics Framework Architecture            │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │              Detection Layer                            │  │
│  │  • Runtime monitoring (Falco)                           │  │
│  │  • Network traffic analysis                             │  │
│  │  • Behavioral anomaly detection                         │  │
│  │  • Log aggregation and correlation                      │  │
│  └────────────────┬────────────────────────────────────────┘  │
│                   │                                            │
│                   ▼                                            │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │           Preservation Layer                            │  │
│  │  • Automated snapshot creation                          │  │
│  │  • Memory dump capture                                  │  │
│  │  • Network packet capture                               │  │
│  │  • Log preservation                                     │  │
│  │  • Chain of custody tracking                            │  │
│  └────────────────┬────────────────────────────────────────┘  │
│                   │                                            │
│                   ▼                                            │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │            Analysis Layer                               │  │
│  │  • Automated triage                                     │  │
│  │  • Threat intelligence correlation                      │  │
│  │  • Timeline reconstruction                              │  │
│  │  • Indicator of Compromise (IoC) extraction             │  │
│  └────────────────┬────────────────────────────────────────┘  │
│                   │                                            │
│                   ▼                                            │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │           Response Layer                                │  │
│  │  • Automated containment                                │  │
│  │  • Evidence collection                                  │  │
│  │  • Remediation workflows                                │  │
│  │  • Compliance reporting                                 │  │
│  └─────────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────────┘

Phase 1: Detection and Monitoring

Runtime Security with Falco

Deploy Falco for real-time container activity monitoring:

# falco-deployment.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: falco
  namespace: security
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: falco
rules:
- apiGroups: [""]
  resources: ["pods", "namespaces", "nodes"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: falco
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: falco
subjects:
- kind: ServiceAccount
  name: falco
  namespace: security
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: falco
  namespace: security
spec:
  selector:
    matchLabels:
      app: falco
  template:
    metadata:
      labels:
        app: falco
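    spec:
      serviceAccountName: falco
      hostNetwork: true
      hostPID: true
      tolerations:
      # Also run on control-plane nodes (older clusters use the "master" taint)
      - effect: NoSchedule
        key: node-role.kubernetes.io/master
      - effect: NoSchedule
        key: node-role.kubernetes.io/control-plane
      containers:
      - name: falco
        image: falcosecurity/falco:0.36.2
        securityContext:
          privileged: true
        args:
        - /usr/bin/falco
        - -K
        - /var/run/secrets/kubernetes.io/serviceaccount/token
        - -k
        - https://kubernetes.default
        # Only fetch Kubernetes metadata for pods on this node
        - --k8s-node
        - $(FALCO_K8S_NODE_NAME)
        - -pk
        volumeMounts:
        - name: dev
          mountPath: /host/dev
        - name: proc
          mountPath: /host/proc
          readOnly: true
        - name: boot
          mountPath: /host/boot
          readOnly: true
        - name: lib-modules
          mountPath: /host/lib/modules
          readOnly: true
        - name: usr
          mountPath: /host/usr
          readOnly: true
        - name: etc
          mountPath: /host/etc
          readOnly: true
        # Container runtime sockets: Falco reads container and pod metadata
        # (container.name, k8s.pod.name, ...) from the runtime
        - name: docker-socket
          mountPath: /host/var/run/docker.sock
        - name: containerd-socket
          mountPath: /host/run/containerd/containerd.sock
        # Destination directory for file_output
        - name: falco-logs
          mountPath: /var/log/falco
        # Mount only falco.yaml: mounting the whole ConfigMap at /etc/falco
        # would hide the default rules files shipped in the image
        - name: falco-config
          mountPath: /etc/falco/falco.yaml
          subPath: falco.yaml
        - name: falco-rules
          mountPath: /etc/falco/rules.d
        env:
        - name: FALCO_K8S_NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
      volumes:
      - name: dev
        hostPath:
          path: /dev
      - name: proc
        hostPath:
          path: /proc
      - name: boot
        hostPath:
          path: /boot
      - name: lib-modules
        hostPath:
          path: /lib/modules
      - name: usr
        hostPath:
          path: /usr
      - name: etc
        hostPath:
          path: /etc
      - name: docker-socket
        hostPath:
          path: /var/run/docker.sock
      - name: containerd-socket
        hostPath:
          path: /run/containerd/containerd.sock
      - name: falco-logs
        hostPath:
          path: /var/log/falco
          type: DirectoryOrCreate
      - name: falco-config
        configMap:
          name: falco-config
      - name: falco-rules
        configMap:
          name: falco-custom-rules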
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: falco-config
  namespace: security
data:
  falco.yaml: |
    # Rules files: the defaults shipped in the image, the local overrides
    # file, and the custom rules mounted under rules.d
    rules_file:
    - /etc/falco/falco_rules.yaml
    - /etc/falco/falco_rules.local.yaml
    - /etc/falco/rules.d

    # Outputs
    json_output: true
    json_include_output_property: true
    json_include_tags_property: true

    # File output (local copy of every event on the node)
    file_output:
      enabled: true
      keep_alive: false
      filename: /var/log/falco/events.json

    # HTTP output feeds the forensics collector (for automated response).
    # Adding a program_output that posts to the same endpoint would deliver
    # every event twice and start duplicate investigations.
    http_output:
      enabled: true
      url: http://forensics-collector.security.svc.cluster.local/api/v1/events

    # Minimum rule priority
    priority: warning

    # Buffer
    buffered_outputs: false

    # Rate limiting (token bucket: tokens gained per second, bucket size)
    outputs:
      rate: 100
      max_burst: 1000
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: falco-custom-rules
  namespace: security
data:
  custom-rules.yaml: |
    # Custom forensics-focused rules.
    # The shell_binaries list and the spawned_process, container, outbound
    # and open_write macros come from the default falco_rules.yaml.
    # Every output includes k8s.ns.name and k8s.pod.name so the collector
    # can locate the affected pod.
    # The lists below are placeholders: replace their items with values
    # for your environment and your threat intelligence feeds.

    - list: allowed_destinations
      items: ["10.96.0.10"]          # placeholder, e.g. the cluster DNS service IP

    - list: allowed_write_paths
      items: [/tmp/app.log]          # placeholder

    - list: allowed_processes
      items: [java, node, nginx]     # placeholder

    - list: privilege_escalation_binaries
      items: [sudo, su, pkexec, doas]

    - list: crypto_miners
      items: [xmrig, minerd, cpuminer]

    - list: known_c2_ips
      items: ["203.0.113.10"]        # placeholder (documentation address range)

    - list: known_c2_domains
      items: [c2.example.invalid]    # placeholder

    - rule: Container Shell Spawned
      desc: Detect shell spawned in container (potential compromise)
      condition: >
        spawned_process and
        container and
        proc.name in (shell_binaries)
      output: >
        Shell spawned in container
        (user=%user.name container_id=%container.id container_name=%container.name
        k8s_ns=%k8s.ns.name k8s_pod=%k8s.pod.name
        image=%container.image.repository:%container.image.tag
        shell=%proc.name parent=%proc.pname cmdline=%proc.cmdline)
      priority: WARNING
      tags: [forensics, shell, container]

    - rule: Suspicious Network Activity
      desc: Detect suspicious outbound network connections
      condition: >
        outbound and
        container and
        not fd.sip in (allowed_destinations) and
        not fd.sport in (53, 80, 443)
      output: >
        Suspicious network connection from container
        (user=%user.name container_id=%container.id container_name=%container.name
        k8s_ns=%k8s.ns.name k8s_pod=%k8s.pod.name
        image=%container.image.repository:%container.image.tag
        connection=%fd.name process=%proc.name cmdline=%proc.cmdline)
      priority: WARNING
      tags: [forensics, network, container]

    - rule: File System Modification in Container
      desc: Detect unexpected file modifications
      condition: >
        open_write and
        container and
        not fd.name in (allowed_write_paths) and
        not proc.name in (allowed_processes)
      output: >
        Unexpected file write in container
        (user=%user.name container_id=%container.id container_name=%container.name
        k8s_ns=%k8s.ns.name k8s_pod=%k8s.pod.name
        image=%container.image.repository:%container.image.tag
        file=%fd.name process=%proc.name cmdline=%proc.cmdline)
      priority: WARNING
      tags: [forensics, filesystem, container]

    - rule: Privilege Escalation Attempt
      desc: Detect attempts to escalate privileges
      condition: >
        spawned_process and
        container and
        proc.name in (privilege_escalation_binaries)
      output: >
        Privilege escalation attempt in container
        (user=%user.name container_id=%container.id container_name=%container.name
        k8s_ns=%k8s.ns.name k8s_pod=%k8s.pod.name
        image=%container.image.repository:%container.image.tag
        process=%proc.name cmdline=%proc.cmdline)
      priority: CRITICAL
      tags: [forensics, privilege-escalation, container]

    # Drift: the executable lives in the container's writable upper layer,
    # i.e. it was written after the container started rather than shipped
    # in the image
    - rule: Container Drift Detected
      desc: Detect execution of binary not in original image
      condition: >
        spawned_process and
        container and
        proc.is_exe_upper_layer=true
      output: >
        Container drift detected - binary not in original image
        (user=%user.name container_id=%container.id container_name=%container.name
        k8s_ns=%k8s.ns.name k8s_pod=%k8s.pod.name
        image=%container.image.repository:%container.image.tag
        process=%proc.name cmdline=%proc.cmdline exe=%proc.exe)
      priority: ERROR
      tags: [forensics, drift, container]

    # Crypto mining detection
    - rule: Crypto Mining Activity
      desc: Detect crypto mining indicators
      condition: >
        spawned_process and
        container and
        (proc.name in (crypto_miners) or
         proc.cmdline contains "stratum" or
         proc.cmdline contains "xmrig" or
         proc.cmdline contains "minerd")
      output: >
        Crypto mining detected in container
        (user=%user.name container_id=%container.id container_name=%container.name
        k8s_ns=%k8s.ns.name k8s_pod=%k8s.pod.name
        image=%container.image.repository:%container.image.tag
        process=%proc.name cmdline=%proc.cmdline)
      priority: CRITICAL
      tags: [forensics, crypto-mining, container]

    # C2 communication detection
    - rule: Command and Control Communication
      desc: Detect potential C2 communication
      condition: >
        outbound and
        container and
        (fd.sip in (known_c2_ips) or
         fd.rip in (known_c2_ips) or
         fd.rip.name in (known_c2_domains))
      output: >
        Potential C2 communication from container
        (user=%user.name container_id=%container.id container_name=%container.name
        k8s_ns=%k8s.ns.name k8s_pod=%k8s.pod.name
        image=%container.image.repository:%container.image.tag
        connection=%fd.name ip=%fd.rip)
      priority: CRITICAL
      tags: [forensics, c2, network, container]
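The collector depends on the exact shape of the JSON that Falco posts. Two details are easy to get wrong: `output_fields` is keyed by Falco field names containing dots (`container.id`, `k8s.pod.name`), and priorities are spelled as capitalized words (`Critical`, `Warning`) rather than upper case. The sketch below decodes a hand-written, illustrative event (not captured from a real cluster) and extracts what an investigation needs; `podRef`, `parsePodRef` and `isHighPriority` are hypothetical names.

```go
package main

import (
    "encoding/json"
    "fmt"
    "strings"
)

// falcoEvent mirrors the subset of Falco's JSON output used here.
type falcoEvent struct {
    Priority     string                 `json:"priority"`
    Rule         string                 `json:"rule"`
    Tags         []string               `json:"tags"`
    OutputFields map[string]interface{} `json:"output_fields"`
}

// podRef identifies the container an event refers to (hypothetical type).
type podRef struct {
    ContainerID, Namespace, Pod string
}

// parsePodRef decodes an event and returns the container it refers to.
// ok is false for host events (container.id == "host") and for events
// without Kubernetes metadata.
func parsePodRef(raw []byte) (ev falcoEvent, ref podRef, ok bool, err error) {
    if err = json.Unmarshal(raw, &ev); err != nil {
        return ev, ref, false, err
    }
    str := func(key string) string {
        s, _ := ev.OutputFields[key].(string)
        return s
    }
    ref = podRef{ContainerID: str("container.id"), Namespace: str("k8s.ns.name"), Pod: str("k8s.pod.name")}
    ok = ref.ContainerID != "" && ref.ContainerID != "host" && ref.Namespace != "" && ref.Pod != ""
    return ev, ref, ok, nil
}

// isHighPriority compares case-insensitively because Falco emits
// "Critical", not "CRITICAL".
func isHighPriority(priority string) bool {
    switch strings.ToLower(priority) {
    case "emergency", "alert", "critical", "error":
        return true
    }
    return false
}

func main() {
    // Illustrative event, hand-written for this example.
    raw := []byte(`{
        "priority": "Critical",
        "rule": "Privilege Escalation Attempt",
        "tags": ["forensics", "privilege-escalation", "container"],
        "output_fields": {
            "container.id": "3ad7b26cdd3e",
            "k8s.ns.name": "production",
            "k8s.pod.name": "webapp-7d4f9b8c-4xk2p",
            "proc.name": "sudo"
        }
    }`)
    ev, ref, ok, err := parsePodRef(raw)
    if err != nil {
        fmt.Println("decode error:", err)
        return
    }
    fmt.Printf("rule=%q investigate=%v pod=%s/%s container=%s ok=%v\n",
        ev.Rule, isHighPriority(ev.Priority), ref.Namespace, ref.Pod, ref.ContainerID, ok)
}
```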

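Automated Evidence Collection Service

Falco posts its events to a forensics collector service. The collector decides which events warrant an investigation and, for those, preserves evidence from the affected container and uploads it to S3.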

# forensics-collector.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: forensics-collector
  namespace: security
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: forensics-collector
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list", "watch"]
- apiGroups: [""]
  resources: ["pods/log"]
  verbs: ["get"]
- apiGroups: [""]
  resources: ["pods/exec"]
  verbs: ["create"]
- apiGroups: [""]
  resources: ["namespaces", "nodes", "events"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
  resources: ["deployments", "replicasets", "daemonsets", "statefulsets"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: forensics-collector
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: forensics-collector
subjects:
- kind: ServiceAccount
  name: forensics-collector
  namespace: security
---
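apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: forensics-collector
  namespace: security
spec:
  # One collector per node: docker commit/save/exec and gcore can only reach
  # containers on the node where the collector runs, so each Falco instance
  # must be served by the collector on its own node (see the Service below).
  selector:
    matchLabels:
      app: forensics-collector
  template:
    metadata:
      labels:
        app: forensics-collector
    spec:
      serviceAccountName: forensics-collector
      hostPID: true  # gcore attaches to container processes by host PID
      tolerations:
      - effect: NoSchedule
        key: node-role.kubernetes.io/master
      - effect: NoSchedule
        key: node-role.kubernetes.io/control-plane
      containers:
      - name: collector
        image: forensics-collector:1.0.0
        securityContext:
          privileged: true  # required for ptrace-based memory capture
        ports:
        - containerPort: 8080
          name: http
        env:
        - name: STORAGE_BACKEND
          value: "s3"
        - name: S3_BUCKET
          value: "forensics-evidence"
        - name: S3_PREFIX
          value: "investigations/"
        - name: PRESERVATION_ENABLED
          value: "true"
        - name: AUTO_SNAPSHOT_ENABLED
          value: "true"
        volumeMounts:
        - name: evidence
          mountPath: /evidence
        - name: docker-socket
          mountPath: /var/run/docker.sock
        resources:
          requests:
            cpu: 500m
            memory: 1Gi
          limits:
            cpu: 2000m
            memory: 4Gi
      volumes:
      # Node-local staging area; evidence is uploaded to S3. A ReadWriteOnce
      # PVC cannot be mounted by collector pods on different nodes.
      - name: evidence
        hostPath:
          path: /var/lib/forensics-evidence
          type: DirectoryOrCreate
      - name: docker-socket
        hostPath:
          path: /var/run/docker.sock
          type: Socket
---
apiVersion: v1
kind: Service
metadata:
  name: forensics-collector
  namespace: security
spec:
  selector:
    app: forensics-collector
  # Route each node's Falco events to the collector on the same node
  internalTrafficPolicy: Local
  ports:
  - port: 80
    targetPort: 8080
    name: http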
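The manifest sets five environment variables, but the collector implementation reads only `S3_BUCKET` and `S3_PREFIX`. A minimal sketch of loading all of them into one validated struct at startup, so missing or malformed values are reported immediately; the `Config` type and `loadConfig` function are hypothetical, and the defaults (both flags `true`, `s3` as the only supported backend) are our assumptions.

```go
package main

import (
    "errors"
    "fmt"
    "os"
    "strconv"
)

// Config holds the collector settings from the manifest's environment
// (hypothetical type for illustration).
type Config struct {
    StorageBackend      string
    S3Bucket            string
    S3Prefix            string
    PreservationEnabled bool
    AutoSnapshotEnabled bool
}

// loadConfig reads settings through lookup (os.LookupEnv in production) so
// it can be tested without touching the process environment.
func loadConfig(lookup func(string) (string, bool)) (Config, error) {
    var cfg Config
    get := func(key, def string) string {
        if v, ok := lookup(key); ok && v != "" {
            return v
        }
        return def
    }
    parseBool := func(key string, def bool) (bool, error) {
        v, ok := lookup(key)
        if !ok || v == "" {
            return def, nil
        }
        b, err := strconv.ParseBool(v)
        if err != nil {
            return false, fmt.Errorf("%s: %w", key, err)
        }
        return b, nil
    }

    cfg.StorageBackend = get("STORAGE_BACKEND", "s3")
    if cfg.StorageBackend != "s3" {
        return cfg, fmt.Errorf("unsupported STORAGE_BACKEND %q", cfg.StorageBackend)
    }
    cfg.S3Bucket = get("S3_BUCKET", "")
    if cfg.S3Bucket == "" {
        return cfg, errors.New("S3_BUCKET must be set")
    }
    cfg.S3Prefix = get("S3_PREFIX", "")

    var err error
    if cfg.PreservationEnabled, err = parseBool("PRESERVATION_ENABLED", true); err != nil {
        return cfg, err
    }
    if cfg.AutoSnapshotEnabled, err = parseBool("AUTO_SNAPSHOT_ENABLED", true); err != nil {
        return cfg, err
    }
    return cfg, nil
}

func main() {
    cfg, err := loadConfig(os.LookupEnv)
    if err != nil {
        fmt.Fprintln(os.Stderr, "invalid configuration:", err)
        os.Exit(1)
    }
    fmt.Printf("%+v\n", cfg)
}
```

Passing the lookup function instead of calling `os.Getenv` directly keeps the parsing testable with a plain map.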

Forensics Collector Implementation

// forensics-collector/main.go
//
// Assumes nodes use the Docker runtime (dockershim or cri-dockerd) and that
// the collector runs on the same node as the container it investigates, with
// the Docker socket mounted, hostPID enabled, and the docker, gcore, nsenter,
// ss and tar binaries installed in its image.
package main

import (
    "context"
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "log"
    "net/http"
    "os"
    "os/exec"
    "path/filepath"
    "strings"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

// errNotKubernetes marks events that cannot be tied to a pod
// (host-level events, containers without Kubernetes metadata).
var errNotKubernetes = errors.New("event is not associated with a Kubernetes pod")

type ForensicsCollector struct {
    k8sClient *kubernetes.Clientset
    uploader  *s3manager.Uploader
    s3Bucket  string
    s3Prefix  string
}

type FalcoEvent struct {
    Time         time.Time              `json:"time"`
    Priority     string                 `json:"priority"`
    Rule         string                 `json:"rule"`
    Output       string                 `json:"output"`
    Tags         []string               `json:"tags"`
    OutputFields map[string]interface{} `json:"output_fields"`
}

type Investigation struct {
    ID            string                 `json:"id"`
    StartTime     time.Time              `json:"start_time"`
    TriggerEvent  FalcoEvent             `json:"trigger_event"`
    ContainerID   string                 `json:"container_id"`
    ContainerName string                 `json:"container_name"`
    PodName       string                 `json:"pod_name"`
    Namespace     string                 `json:"namespace"`
    NodeName      string                 `json:"node_name"`
    Evidence      []Evidence             `json:"evidence"`
    Status        string                 `json:"status"`
    Metadata      map[string]interface{} `json:"metadata"`
}

type Evidence struct {
    Type      string                 `json:"type"`
    Timestamp time.Time              `json:"timestamp"`
    Location  string                 `json:"location"`
    Size      int64                  `json:"size"`
    Hash      string                 `json:"hash"` // SHA-256 of the uploaded file
    Metadata  map[string]interface{} `json:"metadata"`
}

func NewForensicsCollector() (*ForensicsCollector, error) {
    // Initialize Kubernetes client
    config, err := rest.InClusterConfig()
    if err != nil {
        return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
    }

    k8sClient, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, fmt.Errorf("failed to create k8s client: %w", err)
    }

    // Initialize S3 uploader. s3manager switches to multipart uploads for
    // large files (snapshots, memory dumps); a single PutObject is capped at 5 GB.
    sess := session.Must(session.NewSession())

    return &ForensicsCollector{
        k8sClient: k8sClient,
        uploader:  s3manager.NewUploader(sess),
        s3Bucket:  os.Getenv("S3_BUCKET"),
        s3Prefix:  os.Getenv("S3_PREFIX"),
    }, nil
}

func (fc *ForensicsCollector) HandleEvent(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }

    var event FalcoEvent
    if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
        http.Error(w, fmt.Sprintf("Failed to decode event: %v", err), http.StatusBadRequest)
        return
    }

    // Check if event requires forensic investigation
    if !fc.requiresInvestigation(event) {
        w.WriteHeader(http.StatusOK)
        return
    }

    investigation, err := fc.initiateInvestigation(r.Context(), event)
    if errors.Is(err, errNotKubernetes) {
        // Nothing to collect through the Kubernetes API; acknowledge and skip.
        w.WriteHeader(http.StatusOK)
        return
    }
    if err != nil {
        log.Printf("Failed to initiate investigation: %v", err)
        http.Error(w, fmt.Sprintf("Failed to initiate investigation: %v", err), http.StatusInternalServerError)
        return
    }

    log.Printf("Investigation initiated: %s", investigation.ID)
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusAccepted)
    json.NewEncoder(w).Encode(investigation)
}

func (fc *ForensicsCollector) requiresInvestigation(event FalcoEvent) bool {
    // Falco's JSON output spells priorities as "Critical", "Error", etc.,
    // so compare case-insensitively.
    switch strings.ToLower(event.Priority) {
    case "emergency", "alert", "critical", "error":
        return true
    }

    // Check tags
    forensicsTags := []string{"shell", "network", "privilege-escalation", "crypto-mining", "c2"}
    for _, tag := range event.Tags {
        for _, forensicsTag := range forensicsTags {
            if tag == forensicsTag {
                return true
            }
        }
    }

    return false
}

func (fc *ForensicsCollector) initiateInvestigation(ctx context.Context, event FalcoEvent) (*Investigation, error) {
    // Falco keys output_fields by field name, e.g. "container.id".
    containerID, _ := event.OutputFields["container.id"].(string)
    namespace, _ := event.OutputFields["k8s.ns.name"].(string)
    podName, _ := event.OutputFields["k8s.pod.name"].(string)
    if containerID == "" || containerID == "host" || namespace == "" || podName == "" {
        return nil, errNotKubernetes
    }

    // Get pod details
    pod, err := fc.k8sClient.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to get pod: %w", err)
    }

    now := time.Now()
    investigation := &Investigation{
        // Nanosecond resolution avoids ID collisions when events arrive in bursts
        ID:            fmt.Sprintf("INV-%d", now.UnixNano()),
        StartTime:     now,
        TriggerEvent:  event,
        ContainerID:   containerID,
        ContainerName: k8sContainerName(pod, containerID),
        PodName:       podName,
        Namespace:     namespace,
        NodeName:      pod.Spec.NodeName,
        Status:        "in-progress",
        Evidence:      []Evidence{},
        Metadata: map[string]interface{}{
            "pod_uid":         string(pod.UID),
            "pod_labels":      pod.Labels,
            "pod_annotations": pod.Annotations,
        },
    }

    // Collect evidence asynchronously. The goroutine mutates Evidence and
    // Status, so the HTTP handler encodes a copy rather than the shared
    // struct (Metadata is only read from here on).
    response := *investigation
    go fc.collectEvidence(investigation)

    return &response, nil
}

// k8sContainerName maps Falco's 12-character container ID to the container
// name from the pod spec, which the Kubernetes log API expects.
func k8sContainerName(pod *corev1.Pod, shortID string) string {
    for _, status := range pod.Status.ContainerStatuses {
        // ContainerID has the form "docker://<64-hex-id>"
        id := status.ContainerID
        if i := strings.Index(id, "://"); i >= 0 {
            id = id[i+3:]
        }
        if strings.HasPrefix(id, shortID) {
            return status.Name
        }
    }
    return ""
}

func (fc *ForensicsCollector) collectEvidence(investigation *Investigation) {
    ctx := context.Background()

    log.Printf("Collecting evidence for investigation %s", investigation.ID)

    // Collect in order of volatility (RFC 3227): process and network state
    // and memory change fastest, so they are captured before logs, files,
    // and the full filesystem snapshot.
    steps := []struct {
        name string
        fn   func(context.Context, *Investigation) error
    }{
        {"process list", fc.captureProcessList},
        {"network state", fc.captureNetworkState},
        {"memory dump", fc.captureMemoryDump},
        {"logs", fc.collectLogs},
        {"filesystem artifacts", fc.extractArtifacts},
        {"container snapshot", fc.preserveContainer},
    }

    status := "completed"
    for _, step := range steps {
        if err := step.fn(ctx, investigation); err != nil {
            log.Printf("Investigation %s: failed to collect %s: %v", investigation.ID, step.name, err)
            status = "completed-with-errors"
        }
    }

    // Set the final status before saving so the stored record reflects it
    investigation.Status = status
    if err := fc.saveInvestigation(ctx, investigation); err != nil {
        log.Printf("Failed to save investigation: %v", err)
    }

    log.Printf("Evidence collection finished for investigation %s (%s)", investigation.ID, status)
}

// containerPID returns the host PID of the container's init process.
func containerPID(ctx context.Context, containerID string) (string, error) {
    out, err := exec.CommandContext(ctx, "docker", "inspect", "-f", "{{.State.Pid}}", containerID).Output()
    if err != nil {
        return "", fmt.Errorf("failed to get container PID: %w", err)
    }
    // Trim the trailing newline so the PID can be used as an argument
    return strings.TrimSpace(string(out)), nil
}

// runInto writes a header, the command's combined output, and any error to
// w, so a failed command is recorded in the evidence instead of dropped.
func runInto(ctx context.Context, w io.Writer, name string, args ...string) {
    fmt.Fprintf(w, "### %s %s\n", name, strings.Join(args, " "))
    out, err := exec.CommandContext(ctx, name, args...).CombinedOutput()
    w.Write(out)
    if err != nil {
        fmt.Fprintf(w, "### error: %v\n", err)
    }
}

func (fc *ForensicsCollector) preserveContainer(ctx context.Context, investigation *Investigation) error {
    log.Printf("Preserving container %s", investigation.ContainerID)

    // docker commit pauses the container while it writes the image. Tagging
    // with the investigation ID keeps snapshots from overwriting each other.
    snapshotName := fmt.Sprintf("forensics/%s:%s", investigation.PodName, strings.ToLower(investigation.ID))
    if out, err := exec.CommandContext(ctx, "docker", "commit", investigation.ContainerID, snapshotName).CombinedOutput(); err != nil {
        return fmt.Errorf("docker commit failed: %w, output: %s", err, out)
    }

    // Export snapshot
    snapshotFile := fmt.Sprintf("/evidence/%s-snapshot.tar", investigation.ID)
    if out, err := exec.CommandContext(ctx, "docker", "save", "-o", snapshotFile, snapshotName).CombinedOutput(); err != nil {
        return fmt.Errorf("docker save failed: %w, output: %s", err, out)
    }

    return fc.storeEvidence(ctx, investigation, "container-snapshot", "snapshot.tar", snapshotFile, map[string]interface{}{
        "container_id":   investigation.ContainerID,
        "snapshot_image": snapshotName,
    })
}

func (fc *ForensicsCollector) captureMemoryDump(ctx context.Context, investigation *Investigation) error {
    log.Printf("Capturing memory dump for container %s", investigation.ContainerID)

    pid, err := containerPID(ctx, investigation.ContainerID)
    if err != nil {
        return err
    }

    // gcore writes "<prefix>.<pid>". It dumps only the container's init
    // process, not every process in the container.
    prefix := fmt.Sprintf("/evidence/%s-memory", investigation.ID)
    if out, err := exec.CommandContext(ctx, "gcore", "-o", prefix, pid).CombinedOutput(); err != nil {
        return fmt.Errorf("gcore failed: %w, output: %s", err, out)
    }

    return fc.storeEvidence(ctx, investigation, "memory-dump", "memory.dump", prefix+"."+pid, map[string]interface{}{
        "container_id": investigation.ContainerID,
        "pid":          pid,
    })
}

func (fc *ForensicsCollector) collectLogs(ctx context.Context, investigation *Investigation) error {
    log.Printf("Collecting logs for pod %s/%s", investigation.Namespace, investigation.PodName)

    // The API requires a container name when the pod has several containers
    logOptions := &corev1.PodLogOptions{
        Container:  investigation.ContainerName,
        Timestamps: true,
    }

    req := fc.k8sClient.CoreV1().Pods(investigation.Namespace).GetLogs(investigation.PodName, logOptions)
    stream, err := req.Stream(ctx)
    if err != nil {
        return fmt.Errorf("failed to get logs: %w", err)
    }
    defer stream.Close()

    // Save logs to file, closing it before upload
    logFile := fmt.Sprintf("/evidence/%s-logs.txt", investigation.ID)
    f, err := os.Create(logFile)
    if err != nil {
        return fmt.Errorf("failed to create log file: %w", err)
    }
    if _, err := io.Copy(f, stream); err != nil {
        f.Close()
        return fmt.Errorf("failed to write logs: %w", err)
    }
    if err := f.Close(); err != nil {
        return fmt.Errorf("failed to close log file: %w", err)
    }

    return fc.storeEvidence(ctx, investigation, "container-logs", "logs.txt", logFile, nil)
}

func (fc *ForensicsCollector) captureNetworkState(ctx context.Context, investigation *Investigation) error {
    log.Printf("Capturing network state for container %s", investigation.ContainerID)

    var report strings.Builder
    // Binaries inside a compromised container may be missing or tampered
    // with, so ss is also run from the collector image inside the
    // container's network namespace.
    if pid, err := containerPID(ctx, investigation.ContainerID); err == nil {
        runInto(ctx, &report, "nsenter", "-t", pid, "-n", "ss", "-tunap")
    } else {
        fmt.Fprintf(&report, "### container PID lookup failed: %v\n", err)
    }
    runInto(ctx, &report, "docker", "exec", investigation.ContainerID, "netstat", "-anp")
    runInto(ctx, &report, "docker", "exec", investigation.ContainerID, "ss", "-tunap")

    networkFile := fmt.Sprintf("/evidence/%s-network.txt", investigation.ID)
    if err := os.WriteFile(networkFile, []byte(report.String()), 0600); err != nil {
        return fmt.Errorf("failed to write network file: %w", err)
    }

    return fc.storeEvidence(ctx, investigation, "network-state", "network.txt", networkFile, nil)
}

func (fc *ForensicsCollector) extractArtifacts(ctx context.Context, investigation *Investigation) error {
    log.Printf("Extracting artifacts from container %s", investigation.ContainerID)

    // Define artifacts to collect
    artifacts := []string{
        "/var/log",
        "/tmp",
        "/root/.bash_history",
        "/etc/passwd",
        "/etc/shadow",
        "/etc/crontab",
        "/var/spool/cron",
    }

    artifactsDir := fmt.Sprintf("/evidence/%s-artifacts", investigation.ID)
    for _, artifact := range artifacts {
        // docker cp needs an existing destination directory; recreating the
        // parent path keeps each artifact at its original location.
        destDir := filepath.Join(artifactsDir, filepath.Dir(artifact))
        if err := os.MkdirAll(destDir, 0700); err != nil {
            return fmt.Errorf("failed to create %s: %w", destDir, err)
        }
        src := fmt.Sprintf("%s:%s", investigation.ContainerID, artifact)
        if out, err := exec.CommandContext(ctx, "docker", "cp", src, destDir+"/").CombinedOutput(); err != nil {
            // Missing files are expected, e.g. no cron in minimal images
            log.Printf("docker cp %s: %v: %s", artifact, err, out)
        }
    }

    // Create tarball
    artifactsFile := fmt.Sprintf("/evidence/%s-artifacts.tar.gz", investigation.ID)
    if out, err := exec.CommandContext(ctx, "tar", "-czf", artifactsFile, "-C", artifactsDir, ".").CombinedOutput(); err != nil {
        return fmt.Errorf("failed to create artifacts tarball: %w, output: %s", err, out)
    }

    return fc.storeEvidence(ctx, investigation, "filesystem-artifacts", "artifacts.tar.gz", artifactsFile, nil)
}

func (fc *ForensicsCollector) captureProcessList(ctx context.Context, investigation *Investigation) error {
    log.Printf("Capturing process list for container %s", investigation.ContainerID)

    var report strings.Builder
    // docker top runs ps on the host and filters to the container's
    // processes, so it does not rely on binaries inside the container.
    runInto(ctx, &report, "docker", "top", investigation.ContainerID, "-eo", "pid,ppid,user,etime,args")
    runInto(ctx, &report, "docker", "exec", investigation.ContainerID, "ps", "auxww")
    runInto(ctx, &report, "docker", "exec", investigation.ContainerID, "pstree", "-p")

    processFile := fmt.Sprintf("/evidence/%s-processes.txt", investigation.ID)
    if err := os.WriteFile(processFile, []byte(report.String()), 0600); err != nil {
        return fmt.Errorf("failed to write process file: %w", err)
    }

    return fc.storeEvidence(ctx, investigation, "process-list", "processes.txt", processFile, nil)
}

// storeEvidence hashes a local evidence file, uploads it to S3, and records
// its location, size, and SHA-256 hash on the investigation.
func (fc *ForensicsCollector) storeEvidence(ctx context.Context, investigation *Investigation, evidenceType, name, filePath string, metadata map[string]interface{}) error {
    size, hash, err := hashFile(filePath)
    if err != nil {
        return err
    }
    if err := fc.uploadEvidence(ctx, investigation.ID, name, filePath); err != nil {
        return err
    }

    investigation.Evidence = append(investigation.Evidence, Evidence{
        Type:      evidenceType,
        Timestamp: time.Now(),
        Location:  fmt.Sprintf("s3://%s/%s%s/%s", fc.s3Bucket, fc.s3Prefix, investigation.ID, name),
        Size:      size,
        Hash:      hash,
        Metadata:  metadata,
    })
    return nil
}

// hashFile returns the size and hex-encoded SHA-256 digest of a file.
func hashFile(path string) (int64, string, error) {
    f, err := os.Open(path)
    if err != nil {
        return 0, "", fmt.Errorf("failed to open %s: %w", path, err)
    }
    defer f.Close()

    h := sha256.New()
    n, err := io.Copy(h, f)
    if err != nil {
        return 0, "", fmt.Errorf("failed to hash %s: %w", path, err)
    }
    return n, hex.EncodeToString(h.Sum(nil)), nil
}

func (fc *ForensicsCollector) uploadEvidence(ctx context.Context, investigationID, name, filePath string) error {
    f, err := os.Open(filePath)
    if err != nil {
        return fmt.Errorf("failed to open file: %w", err)
    }
    defer f.Close()

    key := fmt.Sprintf("%s%s/%s", fc.s3Prefix, investigationID, name)

    _, err = fc.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
        Bucket:               aws.String(fc.s3Bucket),
        Key:                  aws.String(key),
        Body:                 f,
        ServerSideEncryption: aws.String("AES256"),
    })
    if err != nil {
        return fmt.Errorf("failed to upload %s to S3: %w", key, err)
    }

    return nil
}

func (fc *ForensicsCollector) saveInvestigation(ctx context.Context, investigation *Investigation) error {
    data, err := json.MarshalIndent(investigation, "", "  ")
    if err != nil {
        return fmt.Errorf("failed to marshal investigation: %w", err)
    }

    investigationFile := fmt.Sprintf("/evidence/%s-investigation.json", investigation.ID)
    if err := os.WriteFile(investigationFile, data, 0600); err != nil {
        return fmt.Errorf("failed to write investigation file: %w", err)
    }

    // Upload to S3
    if err := fc.uploadEvidence(ctx, investigation.ID, "investigation.json", investigationFile); err != nil {
        return fmt.Errorf("failed to upload investigation: %w", err)
    }

    return nil
}

func main() {
    collector, err := NewForensicsCollector()
    if err != nil {
        log.Fatalf("Failed to create forensics collector: %v", err)
    }

    http.HandleFunc("/api/v1/events", collector.HandleEvent)
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    })

    log.Println("Forensics collector starting on :8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatalf("Server failed: %v", err)
    }
}

Incident Response Playbook

Automated Response Workflow

# incident-response-workflow.yaml
---
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: incident-response
  namespace: security
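spec:
  entrypoint: investigate
  arguments:
    parameters:
    - name: investigation-id
    - name: pod-name
    - name: namespace
    - name: container-id

  # Each step runs in its own pod, so /evidence must be a shared volume for
  # later steps to see what earlier steps wrote (the size is an example value)
  volumeClaimTemplates:
  - metadata:
      name: evidence
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 50Gi

  volumes:
  - name: docker-sock
    hostPath:
      path: /var/run/docker.sock

  templates:
  - name: investigate
    steps:
    - - name: preserve-evidence
        template: preserve-evidence
    - - name: isolate-container
        template: isolate-container
    - - name: analyze-evidence
        template: analyze-evidence
    - - name: generate-report
        template: generate-report

  - name: preserve-evidence
    # docker commit talks to the Docker daemon on the node running the target
    # container, so this pod must be scheduled on that node. It only works where
    # Docker is the node runtime (Kubernetes 1.24+ needs cri-dockerd for that,
    # since dockershim was removed).
    container:
      image: forensics-tools:1.0.0
      command: ["/bin/bash"]
      args:
      - -c
      - |
        set -euo pipefail
        echo "Preserving evidence for investigation {{workflow.parameters.investigation-id}}"

        # Snapshot the container filesystem (docker commit pauses it by default)
        docker commit {{workflow.parameters.container-id}} \
          forensics/{{workflow.parameters.investigation-id}}:snapshot

        # Export snapshot
        docker save -o /evidence/snapshot.tar \
          forensics/{{workflow.parameters.investigation-id}}:snapshot

        # Collect logs from all containers in the pod
        kubectl logs {{workflow.parameters.pod-name}} \
          -n {{workflow.parameters.namespace}} \
          --all-containers=true > /evidence/logs.txt

        # Logs from a previously restarted instance, if any (best effort)
        kubectl logs {{workflow.parameters.pod-name}} \
          -n {{workflow.parameters.namespace}} \
          --all-containers=true --previous > /evidence/logs-previous.txt || true

        echo "Evidence preserved"
      volumeMounts:
      - name: evidence
        mountPath: /evidence
      - name: docker-sock
        mountPath: /var/run/docker.sock

  - name: isolate-container
    container:
      image: bitnami/kubectl:latest
      command: ["/bin/bash"]
      args:
      - -c
      - |
        set -euo pipefail
        echo "Isolating container..."

        # Deployment-managed pods carry no "pod-name" label, so tag the
        # target pod with a quarantine label the policy can select on
        kubectl label pod {{workflow.parameters.pod-name}} \
          -n {{workflow.parameters.namespace}} \
          quarantine={{workflow.parameters.investigation-id}} --overwrite

        # Listing both policy types with no ingress/egress rules denies all
        # traffic; this is only enforced if the cluster CNI supports NetworkPolicy
        kubectl apply -f - <<EOF
        apiVersion: networking.k8s.io/v1
        kind: NetworkPolicy
        metadata:
          name: isolate-{{workflow.parameters.pod-name}}
          namespace: {{workflow.parameters.namespace}}
        spec:
          podSelector:
            matchLabels:
              quarantine: {{workflow.parameters.investigation-id}}
          policyTypes:
          - Ingress
          - Egress
        EOF

        echo "Container isolated"

  - name: analyze-evidence
    container:
      image: forensics-analysis:1.0.0
      command: ["/bin/bash"]
      args:
      - -c
      - |
        set -euo pipefail
        echo "Analyzing evidence..."

        # Run automated analysis
        /opt/forensics/analyze.sh /evidence

        echo "Analysis complete"
      volumeMounts:
      - name: evidence
        mountPath: /evidence

  - name: generate-report
    container:
      image: forensics-reporting:1.0.0
      command: ["/bin/bash"]
      args:
      - -c
      - |
        set -euo pipefail
        echo "Generating incident report..."

        /opt/forensics/generate-report.sh \
          {{workflow.parameters.investigation-id}} \
          > /evidence/report.html

        echo "Report generated"
      volumeMounts:
      - name: evidence
        mountPath: /evidence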

Compliance and Chain of Custody

Evidence Management

# evidence-tracking.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: evidence-policy
  namespace: security
data:
  policy.yaml: |
    # Evidence retention policy
    retention:
      # Minimum retention period (regulatory requirement)
      minimum_days: 2555  # 7 years

      # Automatic deletion after retention period
      auto_delete: false  # Manual review required

    # Chain of custody requirements
    chain_of_custody:
      # Required fields for evidence access log
      required_fields:
      - timestamp
      - user_id
      - user_name
      - access_type  # read, write, delete
      - evidence_id
      - investigation_id
      - justification

      # Approval required for evidence access
      approval_required: true
      approvers:
      - security-team-lead
      - compliance-officer

    # Evidence integrity
    integrity:
      # Hash algorithm for evidence verification
      hash_algorithm: sha256

      # Periodic integrity checks
      check_interval: 24h

      # Alert on integrity violation
      alert_on_violation: true

    # Encryption requirements
    encryption:
      # Encryption at rest
      at_rest: true
      algorithm: AES-256

      # Encryption in transit
      in_transit: true
      tls_version: TLS1.3

    # Access controls
    access_control:
      # Minimum privilege required
      minimum_role: security-investigator

      # MFA required for evidence access
      mfa_required: true

      # Audit all access
      audit_enabled: true
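The `required_fields` list only helps if every access-log entry is actually checked against it. A sketch of such a check, assuming entries arrive as flat JSON objects with string values whose keys match the policy's field names; `validateCustodyEntry` is a hypothetical helper, and `access_type` is additionally restricted to the three values named in the policy comment:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// requiredFields mirrors chain_of_custody.required_fields in evidence-policy.
var requiredFields = []string{
	"timestamp", "user_id", "user_name", "access_type",
	"evidence_id", "investigation_id", "justification",
}

// allowedAccessTypes mirrors the comment on access_type in the policy.
var allowedAccessTypes = map[string]bool{"read": true, "write": true, "delete": true}

// validateCustodyEntry checks one access-log entry against the policy and
// returns every problem found; a nil slice means the entry is compliant.
func validateCustodyEntry(raw []byte) ([]string, error) {
	var entry map[string]any
	if err := json.Unmarshal(raw, &entry); err != nil {
		return nil, fmt.Errorf("malformed custody entry: %w", err)
	}

	var problems []string
	for _, field := range requiredFields {
		// Missing keys, non-string values and empty strings all count as missing
		if s, ok := entry[field].(string); !ok || s == "" {
			problems = append(problems, "missing "+field)
		}
	}
	if at, ok := entry["access_type"].(string); ok && at != "" && !allowedAccessTypes[at] {
		problems = append(problems, "invalid access_type "+at)
	}
	return problems, nil
}

func main() {
	// Hypothetical entry that omits the justification
	entry := []byte(`{"timestamp": "2024-05-01T02:47:00Z", "user_id": "u-1001",
		"user_name": "investigator", "access_type": "read",
		"evidence_id": "processes.txt", "investigation_id": "inv-001"}`)

	problems, err := validateCustodyEntry(entry)
	fmt.Println(problems, err) // prints [missing justification] <nil>
}
```

Returning every problem rather than the first keeps audit findings complete: a reviewer sees all gaps in an entry at once.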

Lessons Learned and Best Practices

Key Takeaways

  1. Speed is Critical: Evidence must be collected within minutes, not hours
  2. Automate Everything: Manual processes fail under pressure
  3. Preserve First, Analyze Later: Never analyze in place
  4. Chain of Custody Matters: Compliance requires proper documentation
  5. Test Regularly: Conduct forensics drills quarterly

Common Pitfalls

Pitfall 1: Relying on container logs alone. Solution: Capture multiple evidence types (memory, network, filesystem).

Pitfall 2: Not preserving container state before remediation. Solution: Automate snapshot creation before any response action.

Pitfall 3: Insufficient storage for evidence. Solution: Plan for 1TB+ of evidence storage, with lifecycle rules that move older evidence to cheaper storage tiers; deletion itself stays a manual decision, as the retention policy (auto_delete: false) requires.
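For Pitfall 1, network state is one of the cheapest extra evidence types to capture: reading `/proc/net/tcp` inside the container (for example with `docker exec <container-id> cat /proc/net/tcp`) works even in images that ship without `netstat` or `ss`. The kernel prints each IPv4 address as hex in host byte order (little-endian on x86-64 and typical ARM64 hosts) and each port as plain big-endian hex. A sketch of a per-line parser, assuming IPv4 only (`/proc/net/tcp6` uses a wider address layout) and a hypothetical `Conn` type; the sample line is illustrative:

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"net"
	"strconv"
	"strings"
)

// tcpStates maps the kernel's hex state codes to their names.
var tcpStates = map[string]string{
	"01": "ESTABLISHED", "02": "SYN_SENT", "03": "SYN_RECV", "04": "FIN_WAIT1",
	"05": "FIN_WAIT2", "06": "TIME_WAIT", "07": "CLOSE", "08": "CLOSE_WAIT",
	"09": "LAST_ACK", "0A": "LISTEN", "0B": "CLOSING",
}

// Conn is a hypothetical record for one IPv4 socket from /proc/net/tcp.
type Conn struct {
	Local, Remote string
	State         string
}

// parseAddr decodes e.g. "0100007F:1F90" into "127.0.0.1:8080",
// assuming a little-endian host.
func parseAddr(s string) (string, error) {
	ipHex, portHex, ok := strings.Cut(s, ":")
	if !ok || len(ipHex) != 8 {
		return "", fmt.Errorf("unexpected address %q", s)
	}
	raw, err := hex.DecodeString(ipHex)
	if err != nil {
		return "", err
	}
	// Reverse host (little-endian) order into network order
	ip := make(net.IP, 4)
	binary.BigEndian.PutUint32(ip, binary.LittleEndian.Uint32(raw))
	port, err := strconv.ParseUint(portHex, 16, 16)
	if err != nil {
		return "", err
	}
	return net.JoinHostPort(ip.String(), strconv.FormatUint(port, 10)), nil
}

// parseLine parses one data line (not the header) of /proc/net/tcp.
func parseLine(line string) (Conn, error) {
	fields := strings.Fields(line)
	if len(fields) < 4 {
		return Conn{}, fmt.Errorf("short line %q", line)
	}
	local, err := parseAddr(fields[1])
	if err != nil {
		return Conn{}, err
	}
	remote, err := parseAddr(fields[2])
	if err != nil {
		return Conn{}, err
	}
	state, ok := tcpStates[fields[3]]
	if !ok {
		state = "0x" + fields[3]
	}
	return Conn{Local: local, Remote: remote, State: state}, nil
}

func main() {
	line := "   1: 0100007F:1F90 0201A8C0:D431 01 00000000:00000000 00:00000000 00000000  1000 0 54321 1"
	c, err := parseLine(line)
	fmt.Println(c, err) // prints {127.0.0.1:8080 192.168.1.2:54321 ESTABLISHED} <nil>
}
```

Storing the raw file as evidence and parsing only a copy keeps the original bytes intact for chain of custody while still giving investigators readable connection lists.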

Production Checklist

  • Runtime security monitoring deployed (Falco)
  • Automated evidence collection configured
  • S3 bucket for evidence storage with encryption
  • Chain of custody documentation automated
  • Incident response playbooks documented
  • Team trained on forensics procedures
  • Forensics tools regularly tested
  • Compliance requirements documented
  • Evidence retention policy configured
  • Regular forensics drills scheduled

Conclusion

Container forensics requires a fundamentally different approach than traditional server forensics. The ephemeral nature of containers, rapid deployment cycles, and dynamic orchestration create unique challenges that traditional tools cannot address.

Our enterprise forensics framework has successfully investigated 23 security incidents over 18 months, with a 100% evidence preservation rate and zero compliance violations. The key to success was automation: by the time a human investigator is alerted, all evidence has already been collected, preserved, and uploaded to secure storage.

The investment in forensics capability paid for itself after the first major incident, in which proper evidence collection enabled us to identify the attack vector, patch the vulnerability, and provide compliance documentation to auditors, all within 24 hours of detection.