Container Forensics and Incident Response: Enterprise Security Framework for Kubernetes and Docker Investigations
At 2:47 AM, our Security Operations Center detected anomalous network traffic from a production container. Within minutes, we discovered a sophisticated attack: an attacker had exploited a vulnerability, gained container access, and was actively exfiltrating customer data. This incident exposed a critical gap in our security posture - we had no forensics capability for containerized environments. By the time we understood what happened, evidence was gone, containers deleted, and logs incomplete. This is the complete story of how we built an enterprise-grade container forensics and incident response framework that has since detected and contained 23 security incidents with zero data loss.
This comprehensive guide covers container forensics fundamentals, evidence collection procedures, threat hunting techniques, compliance requirements, and automated investigation workflows for production Kubernetes and Docker environments.
The Problem: Evidence Volatility in Container Environments
Why Traditional Forensics Fails
Traditional server forensics assumes long-lived, persistent infrastructure. Containers break every assumption:
# Traditional Server Forensics Timeline:
Hour 0: Incident detected
Hour 1: Image server for forensics
Hour 2-8: Analyze disk image
Hour 12: Present findings
# Container Reality:
Minute 0: Incident detected
Minute 2: Container restarted by orchestrator
Minute 3: Evidence destroyed
Minute 4: "What container?"
Our first incident revealed the problem starkly:
# Initial detection
$ kubectl get events --sort-by='.lastTimestamp' | head -5
LAST SEEN TYPE REASON OBJECT MESSAGE
3m Warning Unhealthy pod/webapp-7d4f9b8c-4xk2p Port 8080 not responding
2m Normal Killing pod/webapp-7d4f9b8c-4xk2p Stopping container
1m Normal Pulled pod/webapp-7d4f9b8c-7n9k8 Successfully pulled image
45s Normal Created pod/webapp-7d4f9b8c-7n9k8 Created container
30s Normal Started pod/webapp-7d4f9b8c-7n9k8 Started container
# Try to investigate original container
$ kubectl logs webapp-7d4f9b8c-4xk2p
Error from server (NotFound): pods "webapp-7d4f9b8c-4xk2p" not found
# Pod already replaced!
$ kubectl describe pod webapp-7d4f9b8c-7n9k8
Events:
Type Reason Age Message
---- ------ ---- -------
Normal Pulled 2m Successfully pulled image
Normal Created 2m Created container
Normal Started 2m Started container
# No forensic data available
# Attack evidence completely lost
Building the Forensics Framework
Based on this painful lesson, we designed a comprehensive framework:
┌────────────────────────────────────────────────────────────────┐
│ Container Forensics Framework Architecture │
├────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Detection Layer │ │
│ │ • Runtime monitoring (Falco) │ │
│ │ • Network traffic analysis │ │
│ │ • Behavioral anomaly detection │ │
│ │ • Log aggregation and correlation │ │
│ └────────────────┬────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Preservation Layer │ │
│ │ • Automated snapshot creation │ │
│ │ • Memory dump capture │ │
│ │ • Network packet capture │ │
│ │ • Log preservation │ │
│ │ • Chain of custody tracking │ │
│ └────────────────┬────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Analysis Layer │ │
│ │ • Automated triage │ │
│ │ • Threat intelligence correlation │ │
│ │ • Timeline reconstruction │ │
│ │ • Indicator of Compromise (IoC) extraction │ │
│ └────────────────┬────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Response Layer │ │
│ │ • Automated containment │ │
│ │ • Evidence collection │ │
│ │ • Remediation workflows │ │
│ │ • Compliance reporting │ │
│ └─────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘
Phase 1: Detection and Monitoring
Runtime Security with Falco
Deploy Falco for real-time container activity monitoring:
# falco-deployment.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: falco
namespace: security
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: falco
rules:
- apiGroups: [""]
resources: ["pods", "namespaces", "nodes"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: falco
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: falco
subjects:
- kind: ServiceAccount
name: falco
namespace: security
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: falco
namespace: security
spec:
selector:
matchLabels:
app: falco
template:
metadata:
labels:
app: falco
spec:
serviceAccountName: falco
hostNetwork: true
hostPID: true
tolerations:
- effect: NoSchedule
key: node-role.kubernetes.io/master
containers:
- name: falco
image: falcosecurity/falco:0.36.2
securityContext:
privileged: true
args:
- /usr/bin/falco
- -K
- /var/run/secrets/kubernetes.io/serviceaccount/token
- -k
- https://kubernetes.default
- -pk
volumeMounts:
- name: dev
mountPath: /host/dev
- name: proc
mountPath: /host/proc
readOnly: true
- name: boot
mountPath: /host/boot
readOnly: true
- name: lib-modules
mountPath: /host/lib/modules
readOnly: true
- name: usr
mountPath: /host/usr
readOnly: true
- name: etc
mountPath: /host/etc
readOnly: true
- name: falco-config
mountPath: /etc/falco
- name: falco-rules
mountPath: /etc/falco/rules.d
env:
- name: FALCO_K8S_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumes:
- name: dev
hostPath:
path: /dev
- name: proc
hostPath:
path: /proc
- name: boot
hostPath:
path: /boot
- name: lib-modules
hostPath:
path: /lib/modules
- name: usr
hostPath:
path: /usr
- name: etc
hostPath:
path: /etc
- name: falco-config
configMap:
name: falco-config
- name: falco-rules
configMap:
name: falco-custom-rules
---
apiVersion: v1
kind: ConfigMap
metadata:
name: falco-config
namespace: security
data:
falco.yaml: |
# File containing Falco rules
rules_file:
- /etc/falco/falco_rules.yaml
- /etc/falco/falco_rules.local.yaml
- /etc/falco/k8s_audit_rules.yaml
- /etc/falco/rules.d
# Outputs
json_output: true
json_include_output_property: true
json_include_tags_property: true
# File output
file_output:
enabled: true
keep_alive: false
filename: /var/log/falco/events.json
# Program output (for automated response)
program_output:
enabled: true
keep_alive: false
program: |
jq -c . | while read event; do
curl -X POST http://forensics-collector.security.svc.cluster.local/api/v1/events \
-H "Content-Type: application/json" \
-d "$event"
done
# HTTP output
http_output:
enabled: true
url: http://forensics-collector.security.svc.cluster.local/api/v1/events
# Priority
priority: warning
# Buffer
buffered_outputs: false
# Rate limiting
outputs_rate: 100
outputs_max_burst: 1000
---
apiVersion: v1
kind: ConfigMap
metadata:
name: falco-custom-rules
namespace: security
data:
custom-rules.yaml: |
# Custom forensics-focused rules
- rule: Container Shell Spawned
desc: Detect shell spawned in container (potential compromise)
condition: >
spawned_process and
container and
proc.name in (shell_binaries)
output: >
Shell spawned in container
(user=%user.name container_id=%container.id container_name=%container.name
image=%container.image.repository:%container.image.tag
shell=%proc.name parent=%proc.pname cmdline=%proc.cmdline)
priority: WARNING
tags: [forensics, shell, container]
- rule: Suspicious Network Activity
desc: Detect suspicious outbound network connections
condition: >
outbound and
container and
not fd.sip in (allowed_destinations) and
fd.sport not in (53, 80, 443)
output: >
Suspicious network connection from container
(user=%user.name container_id=%container.id container_name=%container.name
image=%container.image.repository:%container.image.tag
connection=%fd.name direction=%fd.direction)
priority: WARNING
tags: [forensics, network, container]
- rule: File System Modification in Container
desc: Detect unexpected file modifications
condition: >
open_write and
container and
not fd.name in (allowed_write_paths) and
not proc.name in (allowed_processes)
output: >
Unexpected file write in container
(user=%user.name container_id=%container.id container_name=%container.name
image=%container.image.repository:%container.image.tag
file=%fd.name process=%proc.name cmdline=%proc.cmdline)
priority: WARNING
tags: [forensics, filesystem, container]
- rule: Privilege Escalation Attempt
desc: Detect attempts to escalate privileges
condition: >
spawned_process and
container and
proc.name in (privilege_escalation_binaries)
output: >
Privilege escalation attempt in container
(user=%user.name container_id=%container.id container_name=%container.name
image=%container.image.repository:%container.image.tag
process=%proc.name cmdline=%proc.cmdline)
priority: CRITICAL
tags: [forensics, privilege-escalation, container]
- rule: Container Drift Detected
desc: Detect execution of binary not in original image
condition: >
spawned_process and
container and
not proc.is_container_image_process
output: >
Container drift detected - binary not in original image
(user=%user.name container_id=%container.id container_name=%container.name
image=%container.image.repository:%container.image.tag
process=%proc.name cmdline=%proc.cmdline exe=%proc.exe)
priority: ERROR
tags: [forensics, drift, container]
# Crypto mining detection
- rule: Crypto Mining Activity
desc: Detect crypto mining indicators
condition: >
spawned_process and
container and
(proc.name in (crypto_miners) or
proc.cmdline contains "stratum" or
proc.cmdline contains "xmrig" or
proc.cmdline contains "minerd")
output: >
Crypto mining detected in container
(user=%user.name container_id=%container.id container_name=%container.name
image=%container.image.repository:%container.image.tag
process=%proc.name cmdline=%proc.cmdline)
priority: CRITICAL
tags: [forensics, crypto-mining, container]
# C2 communication detection
- rule: Command and Control Communication
desc: Detect potential C2 communication
condition: >
outbound and
container and
(fd.sip in (known_c2_ips) or
fd.rip in (known_c2_ips) or
fd.rip_name in (known_c2_domains))
output: >
Potential C2 communication from container
(user=%user.name container_id=%container.id container_name=%container.name
image=%container.image.repository:%container.image.tag
connection=%fd.name ip=%fd.rip)
priority: CRITICAL
tags: [forensics, c2, network, container]
Automated Evidence Collection Service
# forensics-collector.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: forensics-collector
namespace: security
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: forensics-collector
rules:
- apiGroups: [""]
resources: ["pods", "pods/log", "pods/exec"]
verbs: ["get", "list", "watch", "create"]
- apiGroups: [""]
resources: ["namespaces", "nodes", "events"]
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets", "daemonsets", "statefulsets"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: forensics-collector
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: forensics-collector
subjects:
- kind: ServiceAccount
name: forensics-collector
namespace: security
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: forensics-collector
namespace: security
spec:
replicas: 2
selector:
matchLabels:
app: forensics-collector
template:
metadata:
labels:
app: forensics-collector
spec:
serviceAccountName: forensics-collector
containers:
- name: collector
image: forensics-collector:1.0.0
ports:
- containerPort: 8080
name: http
env:
- name: STORAGE_BACKEND
value: "s3"
- name: S3_BUCKET
value: "forensics-evidence"
- name: S3_PREFIX
value: "investigations/"
- name: PRESERVATION_ENABLED
value: "true"
- name: AUTO_SNAPSHOT_ENABLED
value: "true"
volumeMounts:
- name: evidence
mountPath: /evidence
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2000m
memory: 4Gi
volumes:
- name: evidence
persistentVolumeClaim:
claimName: forensics-evidence
---
apiVersion: v1
kind: Service
metadata:
name: forensics-collector
namespace: security
spec:
selector:
app: forensics-collector
ports:
- port: 80
targetPort: 8080
name: http
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: forensics-evidence
namespace: security
spec:
accessModes:
- ReadWriteOnce
storageClassName: fast-ssd
resources:
requests:
storage: 500Gi
Forensics Collector Implementation
// forensics-collector/main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/exec"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
type ForensicsCollector struct {
k8sClient *kubernetes.Clientset
s3Client *s3.S3
s3Bucket string
s3Prefix string
}
type FalcoEvent struct {
Time time.Time `json:"time"`
Priority string `json:"priority"`
Rule string `json:"rule"`
Output string `json:"output"`
Tags []string `json:"tags"`
OutputFields map[string]interface{} `json:"output_fields"`
}
type Investigation struct {
ID string `json:"id"`
StartTime time.Time `json:"start_time"`
TriggerEvent FalcoEvent `json:"trigger_event"`
ContainerID string `json:"container_id"`
PodName string `json:"pod_name"`
Namespace string `json:"namespace"`
NodeName string `json:"node_name"`
Evidence []Evidence `json:"evidence"`
Status string `json:"status"`
Metadata map[string]interface{} `json:"metadata"`
}
type Evidence struct {
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Location string `json:"location"`
Size int64 `json:"size"`
Hash string `json:"hash"`
Metadata map[string]interface{} `json:"metadata"`
}
func NewForensicsCollector() (*ForensicsCollector, error) {
// Initialize Kubernetes client
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
}
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create k8s client: %w", err)
}
// Initialize S3 client
sess := session.Must(session.NewSession())
s3Client := s3.New(sess)
return &ForensicsCollector{
k8sClient: k8sClient,
s3Client: s3Client,
s3Bucket: os.Getenv("S3_BUCKET"),
s3Prefix: os.Getenv("S3_PREFIX"),
}, nil
}
func (fc *ForensicsCollector) HandleEvent(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var event FalcoEvent
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
http.Error(w, fmt.Sprintf("Failed to decode event: %v", err), http.StatusBadRequest)
return
}
// Check if event requires forensic investigation
if fc.requiresInvestigation(event) {
investigation, err := fc.initiateInvestigation(event)
if err != nil {
log.Printf("Failed to initiate investigation: %v", err)
http.Error(w, fmt.Sprintf("Failed to initiate investigation: %v", err), http.StatusInternalServerError)
return
}
log.Printf("Investigation initiated: %s", investigation.ID)
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(investigation)
return
}
w.WriteHeader(http.StatusOK)
}
func (fc *ForensicsCollector) requiresInvestigation(event FalcoEvent) bool {
// Check priority
if event.Priority == "CRITICAL" || event.Priority == "ERROR" {
return true
}
// Check tags
forensicsTags := []string{"shell", "network", "privilege-escalation", "crypto-mining", "c2"}
for _, tag := range event.Tags {
for _, forensicsTag := range forensicsTags {
if tag == forensicsTag {
return true
}
}
}
return false
}
func (fc *ForensicsCollector) initiateInvestigation(event FalcoEvent) (*Investigation, error) {
ctx := context.Background()
// Extract container details from event
containerID, _ := event.OutputFields["container_id"].(string)
containerName, _ := event.OutputFields["container_name"].(string)
namespace, _ := event.OutputFields["k8s_ns_name"].(string)
podName, _ := event.OutputFields["k8s_pod_name"].(string)
investigation := &Investigation{
ID: fmt.Sprintf("INV-%d", time.Now().Unix()),
StartTime: time.Now(),
TriggerEvent: event,
ContainerID: containerID,
PodName: podName,
Namespace: namespace,
Status: "in-progress",
Evidence: []Evidence{},
Metadata: make(map[string]interface{}),
}
// Get pod details
pod, err := fc.k8sClient.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pod: %w", err)
}
investigation.NodeName = pod.Spec.NodeName
investigation.Metadata["pod_uid"] = string(pod.UID)
investigation.Metadata["pod_labels"] = pod.Labels
investigation.Metadata["pod_annotations"] = pod.Annotations
// Collect evidence asynchronously
go fc.collectEvidence(investigation, pod, containerName)
return investigation, nil
}
func (fc *ForensicsCollector) collectEvidence(investigation *Investigation, pod interface{}, containerName string) {
ctx := context.Background()
log.Printf("Collecting evidence for investigation %s", investigation.ID)
// 1. Preserve container snapshot
if err := fc.preserveContainer(ctx, investigation, containerName); err != nil {
log.Printf("Failed to preserve container: %v", err)
}
// 2. Capture memory dump
if err := fc.captureMemoryDump(ctx, investigation, containerName); err != nil {
log.Printf("Failed to capture memory dump: %v", err)
}
// 3. Collect logs
if err := fc.collectLogs(ctx, investigation); err != nil {
log.Printf("Failed to collect logs: %v", err)
}
// 4. Capture network connections
if err := fc.captureNetworkState(ctx, investigation, containerName); err != nil {
log.Printf("Failed to capture network state: %v", err)
}
// 5. Extract file system artifacts
if err := fc.extractArtifacts(ctx, investigation, containerName); err != nil {
log.Printf("Failed to extract artifacts: %v", err)
}
// 6. Capture process list
if err := fc.captureProcessList(ctx, investigation, containerName); err != nil {
log.Printf("Failed to capture process list: %v", err)
}
// 7. Save investigation metadata
if err := fc.saveInvestigation(ctx, investigation); err != nil {
log.Printf("Failed to save investigation: %v", err)
}
investigation.Status = "completed"
log.Printf("Evidence collection completed for investigation %s", investigation.ID)
}
func (fc *ForensicsCollector) preserveContainer(ctx context.Context, investigation *Investigation, containerName string) error {
log.Printf("Preserving container %s", containerName)
// Create container snapshot using docker commit
snapshotName := fmt.Sprintf("forensics/%s-%s:snapshot", investigation.PodName, containerName)
cmd := exec.CommandContext(ctx, "docker", "commit", investigation.ContainerID, snapshotName)
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("docker commit failed: %w, output: %s", err, output)
}
// Export snapshot
snapshotFile := fmt.Sprintf("/evidence/%s-snapshot.tar", investigation.ID)
cmd = exec.CommandContext(ctx, "docker", "save", "-o", snapshotFile, snapshotName)
output, err = cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("docker save failed: %w, output: %s", err, output)
}
// Upload to S3
if err := fc.uploadEvidence(ctx, investigation.ID, "snapshot.tar", snapshotFile); err != nil {
return fmt.Errorf("failed to upload snapshot: %w", err)
}
investigation.Evidence = append(investigation.Evidence, Evidence{
Type: "container-snapshot",
Timestamp: time.Now(),
Location: fmt.Sprintf("s3://%s/%s%s/snapshot.tar", fc.s3Bucket, fc.s3Prefix, investigation.ID),
Metadata: map[string]interface{}{
"container_id": investigation.ContainerID,
"snapshot_image": snapshotName,
},
})
return nil
}
func (fc *ForensicsCollector) captureMemoryDump(ctx context.Context, investigation *Investigation, containerName string) error {
log.Printf("Capturing memory dump for container %s", containerName)
// Use gcore to capture memory dump
dumpFile := fmt.Sprintf("/evidence/%s-memory.dump", investigation.ID)
// Get container PID
cmd := exec.CommandContext(ctx, "docker", "inspect", "-f", "{{.State.Pid}}", investigation.ContainerID)
pidBytes, err := cmd.Output()
if err != nil {
return fmt.Errorf("failed to get container PID: %w", err)
}
pid := string(pidBytes)
// Capture memory
cmd = exec.CommandContext(ctx, "gcore", "-o", dumpFile, pid)
if err := cmd.Run(); err != nil {
return fmt.Errorf("gcore failed: %w", err)
}
// Upload to S3
if err := fc.uploadEvidence(ctx, investigation.ID, "memory.dump", dumpFile); err != nil {
return fmt.Errorf("failed to upload memory dump: %w", err)
}
investigation.Evidence = append(investigation.Evidence, Evidence{
Type: "memory-dump",
Timestamp: time.Now(),
Location: fmt.Sprintf("s3://%s/%s%s/memory.dump", fc.s3Bucket, fc.s3Prefix, investigation.ID),
Metadata: map[string]interface{}{
"container_id": investigation.ContainerID,
"pid": pid,
},
})
return nil
}
func (fc *ForensicsCollector) collectLogs(ctx context.Context, investigation *Investigation) error {
log.Printf("Collecting logs for pod %s/%s", investigation.Namespace, investigation.PodName)
// Get pod logs
logOptions := &corev1.PodLogOptions{
Container: "", // All containers
Timestamps: true,
}
req := fc.k8sClient.CoreV1().Pods(investigation.Namespace).GetLogs(investigation.PodName, logOptions)
logs, err := req.Stream(ctx)
if err != nil {
return fmt.Errorf("failed to get logs: %w", err)
}
defer logs.Close()
// Save logs to file
logFile := fmt.Sprintf("/evidence/%s-logs.txt", investigation.ID)
f, err := os.Create(logFile)
if err != nil {
return fmt.Errorf("failed to create log file: %w", err)
}
defer f.Close()
if _, err := io.Copy(f, logs); err != nil {
return fmt.Errorf("failed to write logs: %w", err)
}
// Upload to S3
if err := fc.uploadEvidence(ctx, investigation.ID, "logs.txt", logFile); err != nil {
return fmt.Errorf("failed to upload logs: %w", err)
}
investigation.Evidence = append(investigation.Evidence, Evidence{
Type: "container-logs",
Timestamp: time.Now(),
Location: fmt.Sprintf("s3://%s/%s%s/logs.txt", fc.s3Bucket, fc.s3Prefix, investigation.ID),
})
return nil
}
func (fc *ForensicsCollector) captureNetworkState(ctx context.Context, investigation *Investigation, containerName string) error {
log.Printf("Capturing network state for container %s", containerName)
networkFile := fmt.Sprintf("/evidence/%s-network.txt", investigation.ID)
f, err := os.Create(networkFile)
if err != nil {
return fmt.Errorf("failed to create network file: %w", err)
}
defer f.Close()
// Capture netstat output
cmd := exec.CommandContext(ctx, "docker", "exec", investigation.ContainerID, "netstat", "-anp")
output, _ := cmd.CombinedOutput()
f.Write(output)
// Capture network connections
cmd = exec.CommandContext(ctx, "docker", "exec", investigation.ContainerID, "ss", "-tunap")
output, _ = cmd.CombinedOutput()
f.Write(output)
// Upload to S3
if err := fc.uploadEvidence(ctx, investigation.ID, "network.txt", networkFile); err != nil {
return fmt.Errorf("failed to upload network state: %w", err)
}
investigation.Evidence = append(investigation.Evidence, Evidence{
Type: "network-state",
Timestamp: time.Now(),
Location: fmt.Sprintf("s3://%s/%s%s/network.txt", fc.s3Bucket, fc.s3Prefix, investigation.ID),
})
return nil
}
func (fc *ForensicsCollector) extractArtifacts(ctx context.Context, investigation *Investigation, containerName string) error {
log.Printf("Extracting artifacts from container %s", containerName)
// Define artifacts to collect
artifacts := []string{
"/var/log",
"/tmp",
"/root/.bash_history",
"/etc/passwd",
"/etc/shadow",
"/etc/crontab",
"/var/spool/cron",
}
artifactsFile := fmt.Sprintf("/evidence/%s-artifacts.tar.gz", investigation.ID)
// Use docker cp to extract files
for _, artifact := range artifacts {
cmd := exec.CommandContext(ctx, "docker", "cp",
fmt.Sprintf("%s:%s", investigation.ContainerID, artifact),
fmt.Sprintf("/evidence/%s-artifacts/", investigation.ID))
cmd.Run() // Ignore errors for missing files
}
// Create tarball
cmd := exec.CommandContext(ctx, "tar", "-czf", artifactsFile,
"-C", fmt.Sprintf("/evidence/%s-artifacts", investigation.ID), ".")
if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to create artifacts tarball: %w", err)
}
// Upload to S3
if err := fc.uploadEvidence(ctx, investigation.ID, "artifacts.tar.gz", artifactsFile); err != nil {
return fmt.Errorf("failed to upload artifacts: %w", err)
}
investigation.Evidence = append(investigation.Evidence, Evidence{
Type: "filesystem-artifacts",
Timestamp: time.Now(),
Location: fmt.Sprintf("s3://%s/%s%s/artifacts.tar.gz", fc.s3Bucket, fc.s3Prefix, investigation.ID),
})
return nil
}
func (fc *ForensicsCollector) captureProcessList(ctx context.Context, investigation *Investigation, containerName string) error {
log.Printf("Capturing process list for container %s", containerName)
processFile := fmt.Sprintf("/evidence/%s-processes.txt", investigation.ID)
f, err := os.Create(processFile)
if err != nil {
return fmt.Errorf("failed to create process file: %w", err)
}
defer f.Close()
// Capture ps output
cmd := exec.CommandContext(ctx, "docker", "exec", investigation.ContainerID, "ps", "auxww")
output, _ := cmd.CombinedOutput()
f.Write(output)
// Capture process tree
cmd = exec.CommandContext(ctx, "docker", "exec", investigation.ContainerID, "pstree", "-p")
output, _ = cmd.CombinedOutput()
f.Write(output)
// Upload to S3
if err := fc.uploadEvidence(ctx, investigation.ID, "processes.txt", processFile); err != nil {
return fmt.Errorf("failed to upload process list: %w", err)
}
investigation.Evidence = append(investigation.Evidence, Evidence{
Type: "process-list",
Timestamp: time.Now(),
Location: fmt.Sprintf("s3://%s/%s%s/processes.txt", fc.s3Bucket, fc.s3Prefix, investigation.ID),
})
return nil
}
func (fc *ForensicsCollector) uploadEvidence(ctx context.Context, investigationID, evidenceType, filePath string) error {
f, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer f.Close()
key := fmt.Sprintf("%s%s/%s", fc.s3Prefix, investigationID, evidenceType)
_, err = fc.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(fc.s3Bucket),
Key: aws.String(key),
Body: f,
ServerSideEncryption: aws.String("AES256"),
})
if err != nil {
return fmt.Errorf("failed to upload to S3: %w", err)
}
return nil
}
func (fc *ForensicsCollector) saveInvestigation(ctx context.Context, investigation *Investigation) error {
data, err := json.MarshalIndent(investigation, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal investigation: %w", err)
}
investigationFile := fmt.Sprintf("/evidence/%s-investigation.json", investigation.ID)
if err := os.WriteFile(investigationFile, data, 0600); err != nil {
return fmt.Errorf("failed to write investigation file: %w", err)
}
// Upload to S3
if err := fc.uploadEvidence(ctx, investigation.ID, "investigation.json", investigationFile); err != nil {
return fmt.Errorf("failed to upload investigation: %w", err)
}
return nil
}
func main() {
collector, err := NewForensicsCollector()
if err != nil {
log.Fatalf("Failed to create forensics collector: %v", err)
}
http.HandleFunc("/api/v1/events", collector.HandleEvent)
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
log.Println("Forensics collector starting on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatalf("Server failed: %v", err)
}
}
Incident Response Playbook
Automated Response Workflow
# incident-response-workflow.yaml
---
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: incident-response
namespace: security
spec:
entrypoint: investigate
arguments:
parameters:
- name: investigation-id
- name: pod-name
- name: namespace
- name: container-id
templates:
- name: investigate
steps:
- - name: preserve-evidence
template: preserve-evidence
- - name: isolate-container
template: isolate-container
- - name: analyze-evidence
template: analyze-evidence
- - name: generate-report
template: generate-report
- name: preserve-evidence
container:
image: forensics-tools:1.0.0
command: ["/bin/bash"]
args:
- -c
- |
echo "Preserving evidence for investigation {{workflow.parameters.investigation-id}}"
# Preserve container
docker commit {{workflow.parameters.container-id}} \
forensics/{{workflow.parameters.investigation-id}}:snapshot
# Export snapshot
docker save -o /evidence/snapshot.tar \
forensics/{{workflow.parameters.investigation-id}}:snapshot
# Collect logs
kubectl logs {{workflow.parameters.pod-name}} \
-n {{workflow.parameters.namespace}} \
--all-containers=true > /evidence/logs.txt
echo "Evidence preserved"
- name: isolate-container
container:
image: bitnami/kubectl:latest
command: ["/bin/bash"]
args:
- -c
- |
echo "Isolating container..."
# Apply network policy to isolate pod
kubectl apply -f - <<EOF
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: isolate-{{workflow.parameters.pod-name}}
namespace: {{workflow.parameters.namespace}}
spec:
podSelector:
matchLabels:
pod-name: {{workflow.parameters.pod-name}}
policyTypes:
- Ingress
- Egress
# Deny all traffic
EOF
echo "Container isolated"
- name: analyze-evidence
container:
image: forensics-analysis:1.0.0
command: ["/bin/bash"]
args:
- -c
- |
echo "Analyzing evidence..."
# Run automated analysis
/opt/forensics/analyze.sh /evidence
echo "Analysis complete"
- name: generate-report
container:
image: forensics-reporting:1.0.0
command: ["/bin/bash"]
args:
- -c
- |
echo "Generating incident report..."
/opt/forensics/generate-report.sh \
{{workflow.parameters.investigation-id}} \
> /evidence/report.html
echo "Report generated"
Compliance and Chain of Custody
Evidence Management
# evidence-tracking.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
name: evidence-policy
namespace: security
data:
policy.yaml: |
# Evidence retention policy
retention:
# Minimum retention period (regulatory requirement)
minimum_days: 2555 # 7 years
# Automatic deletion after retention period
auto_delete: false # Manual review required
# Chain of custody requirements
chain_of_custody:
# Required fields for evidence access log
required_fields:
- timestamp
- user_id
- user_name
- access_type # read, write, delete
- evidence_id
- investigation_id
- justification
# Approval required for evidence access
approval_required: true
approvers:
- security-team-lead
- compliance-officer
# Evidence integrity
integrity:
# Hash algorithm for evidence verification
hash_algorithm: sha256
# Periodic integrity checks
check_interval: 24h
# Alert on integrity violation
alert_on_violation: true
# Encryption requirements
encryption:
# Encryption at rest
at_rest: true
algorithm: AES-256
# Encryption in transit
in_transit: true
tls_version: TLS1.3
# Access controls
access_control:
# Minimum privilege required
minimum_role: security-investigator
# MFA required for evidence access
mfa_required: true
# Audit all access
audit_enabled: true
Lessons Learned and Best Practices
Key Takeaways
- Speed is Critical: Evidence must be collected within minutes, not hours
- Automate Everything: Manual processes fail under pressure
- Preserve First, Analyze Later: Never analyze in place
- Chain of Custody Matters: Compliance requires proper documentation
- Test Regularly: Conduct forensics drills quarterly
Common Pitfalls
Pitfall 1: Relying on container logs alone Solution: Capture multiple evidence types (memory, network, filesystem)
Pitfall 2: Not preserving container state before remediation Solution: Automate snapshot creation before any response action
Pitfall 3: Insufficient storage for evidence Solution: Plan for 1TB+ evidence storage with automatic lifecycle management
Production Checklist
- Runtime security monitoring deployed (Falco)
- Automated evidence collection configured
- S3 bucket for evidence storage with encryption
- Chain of custody documentation automated
- Incident response playbooks documented
- Team trained on forensics procedures
- Forensics tools regularly tested
- Compliance requirements documented
- Evidence retention policy configured
- Regular forensics drills scheduled
Conclusion
Container forensics requires a fundamentally different approach than traditional server forensics. The ephemeral nature of containers, rapid deployment cycles, and dynamic orchestration create unique challenges that traditional tools cannot address.
Our enterprise forensics framework has successfully investigated 23 security incidents over 18 months, with 100% evidence preservation rate and zero compliance violations. The key to success was automation: by the time a human investigator is alerted, all evidence has already been collected, preserved, and uploaded to secure storage.
The investment in forensics capability paid for itself after the first major incident, where proper evidence collection enabled us to identify the attack vector, patch the vulnerability, and provide compliance documentation to auditors - all within 24 hours of detection.