Advanced CoreDNS Forwarding and Split-Horizon DNS: Enterprise Network Architecture for Complex Multi-Domain Environments
Advanced DNS forwarding architectures enable organizations to implement sophisticated network topologies with precise control over domain resolution across different network segments. CoreDNS provides the flexibility needed to build complex split-horizon DNS systems that support enterprise requirements for network segmentation, security isolation, and performance optimization.
Executive Summary
Enterprise networks need DNS forwarding strategies that let each network segment resolve domains according to its own access patterns and security requirements. CoreDNS supports split-horizon architectures, conditional forwarding based on client location, and per-zone upstream selection policies. This guide presents reference configurations and architectural patterns for building such systems. Several plugins used in the examples (ratelimit, mysql, geodns, filter) are not part of the stock CoreDNS build and require a custom image.
Advanced DNS Forwarding Architecture
Split-Horizon DNS Foundation
Split-horizon DNS enables different DNS responses based on the source of the query, allowing organizations to present different views of their network infrastructure to internal and external users:
# Advanced CoreDNS configuration for split-horizon DNS
apiVersion: v1
kind: ConfigMap
metadata:
name: coredns-split-horizon-config
namespace: dns-system
labels:
app: coredns-enterprise
component: split-horizon
data:
Corefile: |
# Internal corporate network view
(internal-view) {
# Corporate certificate validation
tls /etc/ssl/certs/internal-ca.pem /etc/ssl/private/internal-key.pem /etc/ssl/certs/internal-ca.pem {
client_auth require_and_verify
}
# Comprehensive logging for audit compliance
log {
class all
format "{type} {name} {rcode} {>rflags} {>bufsize} {>do} {>id} {remote} {size} {duration} {>opcode}"
}
# High-performance caching for internal queries
cache 3600 {
success 50000 3600
denial 10000 300
prefetch 100 900s 80%
serve_stale 86400s
}
# Health monitoring
health {
lameduck 10s
}
# Metrics for internal monitoring (the prometheus plugin always serves /metrics)
prometheus :9153
}
# External network view configuration
(external-view) {
# Rate limiting for external queries
ratelimit {
per_second 50
per_client 5
whitelist 203.0.113.0/24 198.51.100.0/24
blacklist 192.0.2.0/24
}
# Basic logging for external queries
log {
class denial error
}
# Conservative caching for external queries
cache 300 {
success 10000 300
denial 5000 60
}
# External health endpoint
health {
lameduck 5s
}
# Public metrics endpoint (the prometheus plugin always serves /metrics)
prometheus :9154
}
# Internal corporate domains - comprehensive resolution
company.internal:53 {
import internal-view
# Multi-source resolution strategy
hosts /etc/coredns/internal-hosts {
ttl 300
reload 30s
fallthrough
}
# Database-backed dynamic records
mysql {
dsn "dns_user:dns_pass@tcp(mysql.dns-system:3306)/dns_db"
query "SELECT value FROM dns_records WHERE domain = ? AND type = 'A' AND zone = 'internal'"
ttl 300
}
# Template-based service discovery
template IN A {
match "^(.+)-([0-9]+)\.company\.internal\.$"
answer "{{ .Name }} 300 IN A {{ index (service (printf \"%s.%s\" (.Group 1) (.Group 2))) \"clusterIP\" }}"
fallthrough
}
# Kubernetes service integration
kubernetes cluster.local in-addr.arpa ip6.arpa {
pods insecure
fallthrough in-addr.arpa ip6.arpa
ttl 30
endpoint_pod_names
}
# Conditional forwarding to regional DNS
forward . dns-internal.company.com:53 {
max_concurrent 1000
policy sequential
health_check 15s
tls_servername dns-internal.company.com
}
}
# Development environment domains
dev.company.internal:53 {
import internal-view
# Development-specific host mappings
hosts /etc/coredns/dev-hosts {
ttl 60
reload 15s
fallthrough
}
# Forward to development DNS servers
forward . 192.168.100.10 192.168.100.11 {
max_concurrent 500
policy round_robin
health_check 10s
}
}
# Staging environment with restricted access
staging.company.internal:53 {
# Access control based on client IP
acl {
allow net 192.168.200.0/24
allow net 10.100.0.0/16
block
}
import internal-view
# Staging host mappings
hosts /etc/coredns/staging-hosts {
ttl 120
reload 20s
fallthrough
}
# Forward to staging infrastructure
forward . staging-dns.company.internal:53 {
max_concurrent 200
policy sequential
health_check 20s
}
}
# External public domains
company.com:53 {
import external-view
# Public records from authoritative sources
hosts /etc/coredns/public-hosts {
ttl 300
reload 60s
fallthrough
}
# GeoDNS for global load balancing
geodns {
config /etc/coredns/geodns.conf
fallthrough
}
# Forward to public authoritative servers
forward . ns1.company.com:53 ns2.company.com:53 {
max_concurrent 2000
policy random
health_check 30s
}
}
# Partner network domains
partner.company.com:53 {
# Partner-specific access control
acl {
allow net 203.0.113.0/24 # Partner Network A
allow net 198.51.100.0/24 # Partner Network B
block
}
import external-view
# Partner host mappings
hosts /etc/coredns/partner-hosts {
ttl 600
reload 45s
fallthrough
}
# Multi-region partner DNS forwarding
forward . partner-dns-east.company.com:53 partner-dns-west.company.com:53 {
max_concurrent 500
policy sequential
health_check 25s
}
}
# Cloud provider domains
aws.company.internal:53 {
import internal-view
# AWS Route53 integration
route53 aws.company.internal.:ZXXXXXXXXXXXXX {
# Region and credentials are taken from the standard AWS SDK configuration
fallthrough
}
# Forward to AWS internal resolvers
forward . 169.254.169.253:53 {
max_concurrent 1000
policy sequential
}
}
azure.company.internal:53 {
import internal-view
# Azure DNS integration
azuredns {
subscription_id "xxxx-xxxx-xxxx-xxxx"
resource_group "dns-resources"
fallthrough
}
# Forward to Azure internal resolvers
forward . 168.63.129.16:53 {
max_concurrent 1000
policy sequential
}
}
# Default forwarding for unknown domains
.:53 {
import external-view
# DNS over TLS for enhanced security
# tls_servername applies to every upstream, so both must present a cloudflare-dns.com certificate
forward . tls://1.1.1.1 tls://1.0.0.1 {
tls_servername cloudflare-dns.com
max_concurrent 2000
policy random
health_check 60s
}
# Fallback to traditional DNS
forward . 8.8.8.8:53 1.1.1.1:53 {
max_concurrent 1000
policy random
health_check 30s
}
}
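CoreDNS routes each query to the server block whose zone is the longest matching suffix of the query name, which is why `dev.company.internal` can coexist with `company.internal` in the Corefile above. The following is a minimal Python sketch of that selection rule. The `select_zone` helper is hypothetical and only illustrates the matching logic; it is not how CoreDNS implements it internally.

```python
from typing import Iterable, Optional

# Zones taken from the Corefile above; "." is the catch-all server block.
ZONES = [
    "company.internal.", "dev.company.internal.", "staging.company.internal.",
    "company.com.", "partner.company.com.",
    "aws.company.internal.", "azure.company.internal.", ".",
]


def normalize(name: str) -> str:
    """Lower-case a DNS name and make it fully qualified."""
    name = name.lower()
    return name if name.endswith(".") else name + "."


def select_zone(qname: str, zones: Iterable[str] = ZONES) -> Optional[str]:
    """Return the most specific zone that is a suffix of qname (hypothetical helper)."""
    qname = normalize(qname)
    best = None
    for zone in zones:
        z = normalize(zone)
        # Match only on label boundaries so "notcompany.com" does not hit "company.com".
        matches = z == "." or qname == z or qname.endswith("." + z)
        if matches and (best is None or len(z) > len(best)):
            best = z
    return best


if __name__ == "__main__":
    for name in ("api-dev.dev.company.internal", "mail.company.internal", "example.org"):
        print(name, "->", select_zone(name))
```

Because selection happens on the query name alone, two clients asking for the same name always land in the same server block; per-client differences come from the `acl` rules inside that block.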
Advanced Host Mapping Configuration
# Internal network host mappings
apiVersion: v1
kind: ConfigMap
metadata:
name: coredns-internal-hosts
namespace: dns-system
data:
internal-hosts: |
# Executive infrastructure
10.0.1.10 ceo-desktop.company.internal
10.0.1.11 cfo-laptop.company.internal
10.0.1.12 boardroom-system.company.internal
# IT Infrastructure
10.0.10.10 ad-controller-1.company.internal dc1.company.internal
10.0.10.11 ad-controller-2.company.internal dc2.company.internal
10.0.10.20 exchange-server.company.internal mail.company.internal
10.0.10.30 sharepoint.company.internal portal.company.internal
# Database infrastructure
10.0.20.10 db-primary.company.internal database.company.internal
10.0.20.11 db-secondary.company.internal db-backup.company.internal
10.0.20.20 redis-cluster.company.internal cache.company.internal
# Application servers
10.0.30.10 app-server-1.company.internal
10.0.30.11 app-server-2.company.internal
10.0.30.12 app-server-3.company.internal
# Load balancers
10.0.40.10 lb-internal.company.internal
10.0.40.11 lb-external.company.internal
10.0.40.20 api-gateway.company.internal api.company.internal
# Monitoring infrastructure
10.0.50.10 prometheus.company.internal monitoring.company.internal
10.0.50.11 grafana.company.internal dashboards.company.internal
10.0.50.12 alertmanager.company.internal alerts.company.internal
# Security infrastructure
10.0.60.10 security-scanner.company.internal
10.0.60.11 vulnerability-db.company.internal
10.0.60.12 siem-system.company.internal
---
# Development environment hosts
apiVersion: v1
kind: ConfigMap
metadata:
name: coredns-dev-hosts
namespace: dns-system
data:
dev-hosts: |
# Development API endpoints
192.168.100.10 api-dev.dev.company.internal
192.168.100.11 web-dev.dev.company.internal
192.168.100.12 db-dev.dev.company.internal
# Developer workstations
192.168.100.50 dev-workstation-1.dev.company.internal
192.168.100.51 dev-workstation-2.dev.company.internal
192.168.100.52 dev-workstation-3.dev.company.internal
# CI/CD infrastructure
192.168.100.100 jenkins-dev.dev.company.internal
192.168.100.101 gitlab-dev.dev.company.internal
192.168.100.102 docker-registry-dev.dev.company.internal
---
# Staging environment hosts
apiVersion: v1
kind: ConfigMap
metadata:
name: coredns-staging-hosts
namespace: dns-system
data:
staging-hosts: |
# Staging application stack
192.168.200.10 api-staging.staging.company.internal
192.168.200.11 web-staging.staging.company.internal
192.168.200.12 db-staging.staging.company.internal
# Staging load balancers
192.168.200.20 lb-staging.staging.company.internal
# Testing infrastructure
192.168.200.30 test-runner.staging.company.internal
192.168.200.31 performance-tester.staging.company.internal
---
# Public-facing hosts
apiVersion: v1
kind: ConfigMap
metadata:
name: coredns-public-hosts
namespace: dns-system
data:
public-hosts: |
# Public web properties
203.0.113.10 www.company.com
203.0.113.11 api.company.com
203.0.113.12 cdn.company.com
# Email infrastructure
203.0.113.20 mail.company.com mx1.company.com
203.0.113.21 mail2.company.com mx2.company.com
# Support infrastructure
203.0.113.30 support.company.com help.company.com
203.0.113.31 docs.company.com documentation.company.com
---
# Partner network hosts
apiVersion: v1
kind: ConfigMap
metadata:
name: coredns-partner-hosts
namespace: dns-system
data:
partner-hosts: |
# Partner A infrastructure
198.51.100.10 partner-a-api.partner.company.com
198.51.100.11 partner-a-web.partner.company.com
# Partner B infrastructure
198.51.100.20 partner-b-api.partner.company.com
198.51.100.21 partner-b-web.partner.company.com
# Shared partner services
198.51.100.100 shared-auth.partner.company.com
198.51.100.101 shared-data.partner.company.com
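The host files above follow the usual hosts format: one IP address per line followed by one or more names, with `#` starting a comment. A small validation step before publishing a ConfigMap can catch malformed addresses early. The sketch below is one way to do that in Python; `parse_hosts` is a hypothetical helper, not part of CoreDNS.

```python
import ipaddress
from collections import defaultdict
from typing import Dict, List


def parse_hosts(text: str) -> Dict[str, List[str]]:
    """Parse hosts-format text into {name: [ip, ...]} (hypothetical helper).

    Raises ValueError for a line with no name or an invalid IP address.
    """
    records: Dict[str, List[str]] = defaultdict(list)
    for lineno, raw in enumerate(text.splitlines(), start=1):
        line = raw.split("#", 1)[0].strip()  # drop comments and blank lines
        if not line:
            continue
        fields = line.split()
        if len(fields) < 2:
            raise ValueError(f"line {lineno}: expected '<ip> <name> [alias...]'")
        ip = str(ipaddress.ip_address(fields[0]))  # ValueError if malformed
        for name in fields[1:]:
            records[name.lower().rstrip(".")].append(ip)
    return dict(records)


if __name__ == "__main__":
    sample = """
    # IT Infrastructure
    10.0.10.10 ad-controller-1.company.internal dc1.company.internal
    10.0.10.11 ad-controller-2.company.internal dc2.company.internal
    """
    for name, ips in sorted(parse_hosts(sample).items()):
        print(name, ips)
```

A name that appears on several lines ends up with several addresses, which makes unintended duplicates easy to spot in review.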
Geographic DNS and Multi-Region Forwarding
Geographic DNS Configuration
# GeoDNS configuration for global load balancing
apiVersion: v1
kind: ConfigMap
metadata:
name: coredns-geodns-config
namespace: dns-system
data:
geodns.conf: |
# Global configuration
[global]
geoip_database = "/etc/geoip/GeoLite2-City.mmdb"
default_region = "us-east"
# Regional endpoint mappings
[regions]
us-east = ["us", "ca", "mx"]
us-west = ["us-west"]
eu-west = ["gb", "ie", "fr", "de", "es", "it", "nl", "be"]
eu-central = ["pl", "cz", "at", "ch", "hu"]
asia-pacific = ["jp", "kr", "au", "nz", "sg", "hk"]
# Service endpoint definitions
[services]
api.company.com = {
us-east = "api-use1.company.com",
us-west = "api-usw1.company.com",
eu-west = "api-euw1.company.com",
eu-central = "api-euc1.company.com",
asia-pacific = "api-ap1.company.com"
}
web.company.com = {
us-east = "web-use1.company.com",
us-west = "web-usw1.company.com",
eu-west = "web-euw1.company.com",
eu-central = "web-euc1.company.com",
asia-pacific = "web-ap1.company.com"
}
# Health check configuration
[healthchecks]
api-use1.company.com = "https://api-use1.company.com/health"
api-usw1.company.com = "https://api-usw1.company.com/health"
api-euw1.company.com = "https://api-euw1.company.com/health"
api-euc1.company.com = "https://api-euc1.company.com/health"
api-ap1.company.com = "https://api-ap1.company.com/health"
# Failover configuration
[failover]
api.company.com = ["api-use1.company.com", "api-usw1.company.com", "api-euw1.company.com"]
web.company.com = ["web-use1.company.com", "web-usw1.company.com", "web-euw1.company.com"]
---
# Geographic deployment for multi-region CoreDNS
apiVersion: apps/v1
kind: Deployment
metadata:
name: coredns-geo-us-east
namespace: dns-system
labels:
app: coredns-geo
region: us-east
spec:
replicas: 3
selector:
matchLabels:
app: coredns-geo
region: us-east
template:
metadata:
labels:
app: coredns-geo
region: us-east
spec:
serviceAccountName: coredns-geo
# Regional node placement
nodeSelector:
topology.kubernetes.io/region: us-east-1
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchLabels:
app: coredns-geo
region: us-east
topologyKey: kubernetes.io/hostname
containers:
- name: coredns
image: coredns/coredns:1.10.1
# Enhanced resource allocation for geo-distributed workloads
resources:
requests:
cpu: 500m
memory: 512Mi
limits:
cpu: 2000m
memory: 2Gi
env:
- name: REGION
value: "us-east"
- name: DATACENTER
value: "us-east-1"
ports:
- containerPort: 53
name: dns-udp
protocol: UDP
- containerPort: 53
name: dns-tcp
protocol: TCP
- containerPort: 9153
name: metrics
protocol: TCP
volumeMounts:
- name: config
mountPath: /etc/coredns
readOnly: true
- name: geodns-config
mountPath: /etc/coredns/geodns.conf
subPath: geodns.conf
readOnly: true
- name: geoip-data
mountPath: /etc/geoip
readOnly: true
# Regional health checks
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8181
initialDelaySeconds: 5
periodSeconds: 5
volumes:
- name: config
configMap:
name: coredns-split-horizon-config
- name: geodns-config
configMap:
name: coredns-geodns-config
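- name: geoip-data
# GeoLite2-City.mmdb is far larger than the 1 MiB ConfigMap limit, so mount it from a volume.
# The claim name below is a placeholder for a pre-populated volume.
persistentVolumeClaim:
claimName: geoip-database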
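The GeoDNS configuration above combines three ideas: a country-to-region table, a per-region endpoint for each service, and an ordered failover list used when the preferred endpoint is unhealthy. The Python sketch below shows how those pieces could fit together for `api.company.com`. The function names are hypothetical and the logic is an assumption about the intended behavior, not the geodns plugin's implementation.

```python
from typing import Iterable, Optional

DEFAULT_REGION = "us-east"

# Country codes per region, copied from the [regions] table above.
REGION_COUNTRIES = {
    "us-east": ["us", "ca", "mx"],
    "eu-west": ["gb", "ie", "fr", "de", "es", "it", "nl", "be"],
    "eu-central": ["pl", "cz", "at", "ch", "hu"],
    "asia-pacific": ["jp", "kr", "au", "nz", "sg", "hk"],
}

# Endpoints and failover order for api.company.com, copied from the config above.
API_ENDPOINTS = {
    "us-east": "api-use1.company.com",
    "us-west": "api-usw1.company.com",
    "eu-west": "api-euw1.company.com",
    "eu-central": "api-euc1.company.com",
    "asia-pacific": "api-ap1.company.com",
}
API_FAILOVER = ["api-use1.company.com", "api-usw1.company.com", "api-euw1.company.com"]

COUNTRY_TO_REGION = {
    cc: region for region, countries in REGION_COUNTRIES.items() for cc in countries
}


def region_for_country(country_code: Optional[str]) -> str:
    """Map an ISO country code to a region, falling back to the default region."""
    if not country_code:
        return DEFAULT_REGION
    return COUNTRY_TO_REGION.get(country_code.lower(), DEFAULT_REGION)


def pick_endpoint(country_code: Optional[str], healthy: Iterable[str]) -> Optional[str]:
    """Prefer the regional endpoint; otherwise take the first healthy failover entry."""
    healthy = set(healthy)
    preferred = API_ENDPOINTS[region_for_country(country_code)]
    if preferred in healthy:
        return preferred
    for candidate in API_FAILOVER:
        if candidate in healthy:
            return candidate
    return None  # nothing healthy; the caller decides how to answer


if __name__ == "__main__":
    all_healthy = set(API_ENDPOINTS.values())
    print(pick_endpoint("DE", all_healthy))
    print(pick_endpoint("JP", all_healthy - {"api-ap1.company.com"}))
```

Returning `None` when nothing is healthy keeps the policy decision (serve stale, serve all, or fail) out of the selection function.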
Conditional Forwarding Based on Client Location
#!/usr/bin/env python3
"""
Dynamic DNS forwarding controller for geographic and network-based routing
"""
import ipaddress
import json
import yaml
from kubernetes import client, config
from typing import Dict, List, Set, Tuple
import geoip2.database
import geoip2.errors
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ConditionalForwardingController:
def __init__(self, namespace: str = "dns-system"):
try:
config.load_incluster_config()
except config.ConfigException:
config.load_kube_config()
self.v1 = client.CoreV1Api()
self.namespace = namespace
# Load GeoIP database
self.geoip_reader = geoip2.database.Reader('/etc/geoip/GeoLite2-City.mmdb')
# Network segment definitions
self.network_segments = {
'internal': [
ipaddress.IPv4Network('10.0.0.0/8'),
ipaddress.IPv4Network('172.16.0.0/12'),
ipaddress.IPv4Network('192.168.0.0/16')
],
# Partner ranges are subsets of the DMZ ranges, so they must be checked first
'partners': [
ipaddress.IPv4Network('203.0.113.128/25'),
ipaddress.IPv4Network('198.51.100.128/25')
],
'dmz': [
ipaddress.IPv4Network('203.0.113.0/24'),
ipaddress.IPv4Network('198.51.100.0/24')
],
'external': [
ipaddress.IPv4Network('0.0.0.0/0') # Catch-all for external traffic
]
}
# Forwarding rules
self.forwarding_rules = self._load_forwarding_rules()
def _load_forwarding_rules(self) -> Dict:
"""Load DNS forwarding rules from configuration"""
return {
'internal': {
'company.internal': ['dns-internal-1.company.com:53', 'dns-internal-2.company.com:53'],
'dev.company.internal': ['dns-dev.company.com:53'],
'staging.company.internal': ['dns-staging.company.com:53']
},
'dmz': {
'company.com': ['ns1.company.com:53', 'ns2.company.com:53'],
'partner.company.com': ['partner-dns.company.com:53']
},
'partners': {
'partner.company.com': ['partner-dns-1.company.com:53', 'partner-dns-2.company.com:53'],
'shared.company.com': ['shared-dns.company.com:53']
},
'external': {
'.': ['8.8.8.8:53', '1.1.1.1:53']
}
}
def determine_client_segment(self, client_ip: str) -> str:
"""Determine network segment based on client IP"""
try:
client_addr = ipaddress.IPv4Address(client_ip)
for segment_name, networks in self.network_segments.items():
for network in networks:
if client_addr in network:
if segment_name == 'external':
# Further classify external clients by geography
try:
response = self.geoip_reader.city(client_ip)
country_code = response.country.iso_code
if country_code in ['US', 'CA', 'MX']:
return 'external-na'
elif country_code in ['GB', 'IE', 'FR', 'DE', 'ES', 'IT', 'NL', 'BE']:
return 'external-eu'
elif country_code in ['JP', 'KR', 'AU', 'NZ', 'SG', 'HK']:
return 'external-apac'
else:
return 'external'
except geoip2.errors.AddressNotFoundError:
return 'external'
return segment_name
return 'external'
except ipaddress.AddressValueError:
logger.warning(f"Invalid IP address: {client_ip}")
return 'external'
def generate_conditional_config(self) -> str:
"""Generate CoreDNS configuration with conditional forwarding"""
config_template = """
# Conditional forwarding configuration based on client location
(conditional-forwarding) {{
errors
log {{
class all
format "{{type}} {{name}} {{rcode}} {{remote}} {{duration}}"
}}
health {{
lameduck 10s
}}
ready
# Pick up ConfigMap changes without restarting the pod
reload
cache 300 {{
success 10000 300
denial 5000 60
prefetch 50 60s 30%
}}
prometheus :9153
}}
# Client-based forwarding rules
{forwarding_blocks}
# Default forwarding
.:53 {{
import conditional-forwarding
forward . 8.8.8.8:53 1.1.1.1:53 {{
max_concurrent 1000
policy random
health_check 30s
}}
}}
"""
forwarding_blocks = []
# Generate forwarding blocks for each network segment
for segment, rules in self.forwarding_rules.items():
segment_block = self._generate_segment_block(segment, rules)
forwarding_blocks.append(segment_block)
return config_template.format(
forwarding_blocks='\n\n'.join(forwarding_blocks)
)
def _generate_segment_block(self, segment: str, rules: Dict[str, List[str]]) -> str:
"""Generate CoreDNS configuration block for a network segment"""
# ACL rules based on segment
acl_rules = self._generate_acl_rules(segment)
segment_blocks = []
for domain, forwarders in rules.items():
forwarder_list = ' '.join(forwarders)
domain_block = f"""
# {segment.upper()} segment - {domain}
{domain}:53 {{
{acl_rules}
import conditional-forwarding
forward . {forwarder_list} {{
max_concurrent 1000
policy sequential
health_check 15s
}}
}}"""
segment_blocks.append(domain_block)
return '\n'.join(segment_blocks)
def _generate_acl_rules(self, segment: str) -> str:
"""Generate ACL rules for network segment"""
if segment == 'internal':
return """ acl {
allow net 10.0.0.0/8
allow net 172.16.0.0/12
allow net 192.168.0.0/16
block
}"""
elif segment == 'dmz':
return """ acl {
allow net 203.0.113.0/24
allow net 198.51.100.0/24
allow net 10.0.0.0/8
allow net 172.16.0.0/12
allow net 192.168.0.0/16
block
}"""
elif segment == 'partners':
return """ acl {
allow net 203.0.113.128/25
allow net 198.51.100.128/25
block
}"""
else:
return """ # External access - no restrictions"""
def update_coredns_configuration(self):
"""Update CoreDNS configuration with conditional forwarding rules"""
try:
# Generate new configuration
new_config = self.generate_conditional_config()
# Update ConfigMap
cm = self.v1.read_namespaced_config_map(
name="coredns-split-horizon-config",
namespace=self.namespace
)
cm.data["Corefile"] = new_config
self.v1.patch_namespaced_config_map(
name="coredns-split-horizon-config",
namespace=self.namespace,
body=cm
)
logger.info("CoreDNS configuration updated with conditional forwarding rules")
# Trigger configuration reload
self._reload_coredns_pods()
except Exception as e:
logger.error(f"Failed to update CoreDNS configuration: {e}")
def _reload_coredns_pods(self):
"""Trigger graceful reload of CoreDNS pods"""
try:
pods = self.v1.list_namespaced_pod(
namespace=self.namespace,
label_selector="app=coredns-enterprise"
)
for pod in pods.items:
logger.info(f"Triggering configuration reload for pod {pod.metadata.name}")
# Configuration reload happens automatically via the reload plugin
# No explicit action needed
except Exception as e:
logger.error(f"Failed to reload CoreDNS pods: {e}")
def monitor_network_changes(self):
"""Monitor network topology changes and update forwarding rules"""
import time
while True:
try:
# Check for network topology changes
self._detect_network_changes()
# Update configuration if changes detected
self.update_coredns_configuration()
# Wait for next check interval
time.sleep(300) # 5 minutes
except Exception as e:
logger.error(f"Error during network monitoring: {e}")
time.sleep(60) # Shorter retry interval on error
def _detect_network_changes(self):
"""Detect changes in network topology that require configuration updates"""
# Implementation would monitor:
# - Changes in service endpoints
# - New network segments
# - DNS server health status
# - Geographic routing changes
pass
if __name__ == "__main__":
controller = ConditionalForwardingController()
# Start monitoring network changes
controller.monitor_network_changes()
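The segment tables in the controller overlap: the partner /25 ranges sit inside the DMZ /24 ranges, and the external catch-all contains everything. When ranges nest like this, classification by longest prefix gives the same answer regardless of the order in which segments are listed. The sketch below shows that approach; `classify` is a hypothetical standalone function, not a method of the controller above.

```python
import ipaddress

# Same ranges as the controller's network_segments table.
SEGMENTS = {
    "internal": ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"],
    "dmz": ["203.0.113.0/24", "198.51.100.0/24"],
    "partners": ["203.0.113.128/25", "198.51.100.128/25"],
    "external": ["0.0.0.0/0"],
}

NETWORKS = [
    (name, ipaddress.ip_network(cidr))
    for name, cidrs in SEGMENTS.items()
    for cidr in cidrs
]


def classify(client_ip: str) -> str:
    """Return the segment whose matching network has the longest prefix."""
    try:
        addr = ipaddress.ip_address(client_ip)
    except ValueError:
        return "external"  # unparseable input is treated as untrusted
    best_name, best_len = "external", -1
    for name, net in NETWORKS:
        if addr.version == net.version and addr in net and net.prefixlen > best_len:
            best_name, best_len = name, net.prefixlen
    return best_name


if __name__ == "__main__":
    for ip in ("10.1.2.3", "203.0.113.5", "203.0.113.200", "8.8.8.8"):
        print(ip, "->", classify(ip))
```

With this rule, adding a new, narrower range never requires reordering the existing entries.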
Database-Backed Dynamic DNS
MySQL Integration for Dynamic Records
# MySQL database configuration for dynamic DNS records
apiVersion: apps/v1
kind: Deployment
metadata:
name: dns-mysql
namespace: dns-system
spec:
replicas: 1
selector:
matchLabels:
app: dns-mysql
template:
metadata:
labels:
app: dns-mysql
spec:
containers:
- name: mysql
image: mysql:8.0
env:
- name: MYSQL_ROOT_PASSWORD
valueFrom:
secretKeyRef:
name: dns-mysql-secret
key: root-password
- name: MYSQL_DATABASE
value: "dns_db"
- name: MYSQL_USER
value: "dns_user"
- name: MYSQL_PASSWORD
valueFrom:
secretKeyRef:
name: dns-mysql-secret
key: user-password
ports:
- containerPort: 3306
volumeMounts:
- name: mysql-data
mountPath: /var/lib/mysql
- name: mysql-config
mountPath: /etc/mysql/conf.d
readOnly: true
# Resource allocation
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 1000m
memory: 2Gi
# Health checks
livenessProbe:
exec:
command:
- mysqladmin
- ping
- -h
- localhost
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
exec:
command:
- mysqladmin
- ping
- -h
- localhost
initialDelaySeconds: 5
periodSeconds: 5
volumes:
- name: mysql-data
persistentVolumeClaim:
claimName: dns-mysql-pvc
- name: mysql-config
configMap:
name: dns-mysql-config
---
# MySQL service
apiVersion: v1
kind: Service
metadata:
name: dns-mysql
namespace: dns-system
spec:
selector:
app: dns-mysql
ports:
- port: 3306
targetPort: 3306
type: ClusterIP
---
# Database schema initialization
apiVersion: v1
kind: ConfigMap
metadata:
name: dns-mysql-init
namespace: dns-system
data:
init.sql: |
CREATE DATABASE IF NOT EXISTS dns_db;
USE dns_db;
-- DNS records table
CREATE TABLE IF NOT EXISTS dns_records (
id INT AUTO_INCREMENT PRIMARY KEY,
domain VARCHAR(255) NOT NULL,
type ENUM('A', 'AAAA', 'CNAME', 'MX', 'TXT', 'SRV', 'PTR') NOT NULL,
value VARCHAR(255) NOT NULL,
ttl INT DEFAULT 300,
zone VARCHAR(255) NOT NULL,
priority INT DEFAULT 0,
weight INT DEFAULT 0,
port INT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_domain_type (domain, type),
INDEX idx_zone (zone)
);
-- DNS zones table
CREATE TABLE IF NOT EXISTS dns_zones (
id INT AUTO_INCREMENT PRIMARY KEY,
zone_name VARCHAR(255) NOT NULL UNIQUE,
zone_type ENUM('internal', 'external', 'partner') NOT NULL,
authoritative BOOLEAN DEFAULT FALSE,
serial_number INT DEFAULT 1,
refresh_interval INT DEFAULT 3600,
retry_interval INT DEFAULT 1800,
expire_interval INT DEFAULT 604800,
minimum_ttl INT DEFAULT 300,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- Load balancer health status table
CREATE TABLE IF NOT EXISTS lb_health_status (
id INT AUTO_INCREMENT PRIMARY KEY,
endpoint VARCHAR(255) NOT NULL,
status ENUM('healthy', 'unhealthy', 'unknown') NOT NULL,
response_time_ms INT DEFAULT 0,
last_check TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
consecutive_failures INT DEFAULT 0,
UNIQUE KEY unique_endpoint (endpoint)
);
-- Insert sample data
INSERT INTO dns_zones (zone_name, zone_type, authoritative) VALUES
('company.internal', 'internal', TRUE),
('dev.company.internal', 'internal', TRUE),
('partner.company.com', 'partner', TRUE),
('company.com', 'external', FALSE);
INSERT INTO dns_records (domain, type, value, ttl, zone) VALUES
('api.company.internal', 'A', '10.0.30.10', 300, 'internal'),
('api.company.internal', 'A', '10.0.30.11', 300, 'internal'),
('api.company.internal', 'A', '10.0.30.12', 300, 'internal'),
('web.company.internal', 'A', '10.0.40.10', 300, 'internal'),
('web.company.internal', 'A', '10.0.40.11', 300, 'internal'),
('database.company.internal', 'A', '10.0.20.10', 300, 'internal'),
('mail.company.com', 'A', '203.0.113.20', 300, 'external'),
('www.company.com', 'A', '203.0.113.10', 300, 'external');
-- Create user and grant permissions
CREATE USER IF NOT EXISTS 'dns_user'@'%' IDENTIFIED BY 'secure_dns_password';
GRANT SELECT, INSERT, UPDATE, DELETE ON dns_db.* TO 'dns_user'@'%';
FLUSH PRIVILEGES;
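The lookup that the DNS server runs against `dns_records` is a filter on `domain`, `type`, and `zone` that returns the `value` column. The sketch below exercises that query shape using Python's built-in `sqlite3` module as a stand-in for MySQL, so it runs without a database server. The reduced table definition and the `lookup_a_records` helper are assumptions made for illustration; SQLite has no `ENUM` type, so a `CHECK` constraint is used instead.

```python
import sqlite3
from typing import List


def demo_connection() -> sqlite3.Connection:
    """Create an in-memory database with a reduced dns_records table and sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE dns_records (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            domain TEXT NOT NULL,
            type TEXT NOT NULL
                CHECK (type IN ('A', 'AAAA', 'CNAME', 'MX', 'TXT', 'SRV', 'PTR')),
            value TEXT NOT NULL,
            ttl INTEGER DEFAULT 300,
            zone TEXT NOT NULL
        )
        """
    )
    conn.execute("CREATE INDEX idx_domain_type ON dns_records (domain, type)")
    conn.executemany(
        "INSERT INTO dns_records (domain, type, value, ttl, zone) VALUES (?, ?, ?, ?, ?)",
        [
            ("api.company.internal", "A", "10.0.30.10", 300, "internal"),
            ("api.company.internal", "A", "10.0.30.11", 300, "internal"),
            ("api.company.internal", "A", "10.0.30.12", 300, "internal"),
            ("www.company.com", "A", "203.0.113.10", 300, "external"),
        ],
    )
    conn.commit()
    return conn


def lookup_a_records(conn: sqlite3.Connection, domain: str, zone: str) -> List[str]:
    """Return all A record values for a name within one view ('internal', 'external', ...)."""
    rows = conn.execute(
        "SELECT value FROM dns_records "
        "WHERE domain = ? AND type = 'A' AND zone = ? ORDER BY value",
        (domain, zone),
    ).fetchall()
    return [row[0] for row in rows]


if __name__ == "__main__":
    conn = demo_connection()
    print(lookup_a_records(conn, "api.company.internal", "internal"))
    print(lookup_a_records(conn, "www.company.com", "internal"))
```

Note that the `zone` column in the sample data holds the view label (`internal`, `external`), so the same name can carry different answers per view.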
Dynamic DNS Record Management
#!/usr/bin/env python3
"""
Dynamic DNS record management system for CoreDNS
"""
import mysql.connector
from kubernetes import client, config
import json
import time
import logging
from typing import List, Dict, Any, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DNSRecordManager:
def __init__(self, db_config: Dict[str, str]):
self.db_config = db_config
self.connection = None
# Initialize Kubernetes client
try:
config.load_incluster_config()
except config.ConfigException:
config.load_kube_config()
self.v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
def connect_database(self):
"""Connect to MySQL database"""
try:
self.connection = mysql.connector.connect(
host=self.db_config['host'],
port=self.db_config['port'],
user=self.db_config['user'],
password=self.db_config['password'],
database=self.db_config['database']
)
logger.info("Connected to DNS database")
except mysql.connector.Error as e:
logger.error(f"Failed to connect to database: {e}")
raise
def get_dns_records(self, zone: str = None) -> List[Dict[str, Any]]:
"""Retrieve DNS records from database"""
if not self.connection or not self.connection.is_connected():
self.connect_database()
cursor = self.connection.cursor(dictionary=True)
if zone:
query = """
SELECT domain, type, value, ttl, priority, weight, port
FROM dns_records
WHERE zone = %s AND ttl > 0
ORDER BY domain, type
"""
cursor.execute(query, (zone,))
else:
query = """
SELECT domain, type, value, ttl, priority, weight, port, zone
FROM dns_records
WHERE ttl > 0
ORDER BY zone, domain, type
"""
cursor.execute(query)
records = cursor.fetchall()
cursor.close()
return records
def add_dns_record(self, domain: str, record_type: str, value: str,
ttl: int = 300, zone: str = 'internal',
priority: int = 0, weight: int = 0, port: int = 0):
"""Add a new DNS record"""
if not self.connection or not self.connection.is_connected():
self.connect_database()
cursor = self.connection.cursor()
query = """
INSERT INTO dns_records (domain, type, value, ttl, zone, priority, weight, port)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(query, (domain, record_type.upper(), value, ttl, zone, priority, weight, port))
self.connection.commit()
cursor.close()
logger.info(f"Added DNS record: {domain} {record_type} {value}")
def update_dns_record(self, domain: str, record_type: str, old_value: str,
new_value: str, zone: str = 'internal'):
"""Update existing DNS record"""
if not self.connection or not self.connection.is_connected():
self.connect_database()
cursor = self.connection.cursor()
query = """
UPDATE dns_records
SET value = %s, updated_at = CURRENT_TIMESTAMP
WHERE domain = %s AND type = %s AND value = %s AND zone = %s
"""
cursor.execute(query, (new_value, domain, record_type.upper(), old_value, zone))
self.connection.commit()
cursor.close()
logger.info(f"Updated DNS record: {domain} {record_type} {old_value} -> {new_value}")
def delete_dns_record(self, domain: str, record_type: str, value: str, zone: str = 'internal'):
"""Delete DNS record"""
if not self.connection or not self.connection.is_connected():
self.connect_database()
cursor = self.connection.cursor()
query = """
DELETE FROM dns_records
WHERE domain = %s AND type = %s AND value = %s AND zone = %s
"""
cursor.execute(query, (domain, record_type.upper(), value, zone))
self.connection.commit()
cursor.close()
logger.info(f"Deleted DNS record: {domain} {record_type} {value}")
def sync_kubernetes_services(self):
"""Synchronize Kubernetes services to DNS records"""
try:
# Get all services across all namespaces
services = self.v1.list_service_for_all_namespaces()
for service in services.items:
namespace = service.metadata.namespace
service_name = service.metadata.name
# Skip system services
if namespace in ['kube-system', 'kube-public', 'kube-node-lease']:
continue
# Generate DNS name
dns_name = f"{service_name}.{namespace}.company.internal"
# Get service endpoints
try:
endpoints = self.v1.read_namespaced_endpoints(
name=service_name,
namespace=namespace
)
# Extract endpoint IPs
endpoint_ips = []
if endpoints.subsets:
for subset in endpoints.subsets:
if subset.addresses:
for address in subset.addresses:
endpoint_ips.append(address.ip)
# Update DNS records
if endpoint_ips:
self._update_service_dns_records(dns_name, endpoint_ips, 'internal')
except client.exceptions.ApiException as e:
if e.status != 404: # Ignore not found errors
logger.warning(f"Failed to get endpoints for {service_name}: {e}")
except Exception as e:
logger.error(f"Failed to sync Kubernetes services: {e}")
def _update_service_dns_records(self, domain: str, ips: List[str], zone: str):
"""Update DNS records for a service"""
# Get existing records
existing_records = self.get_dns_records_for_domain(domain, zone)
existing_ips = {record['value'] for record in existing_records}
new_ips = set(ips)
# Add new IPs
for ip in new_ips - existing_ips:
self.add_dns_record(domain, 'A', ip, 300, zone)
# Remove old IPs
for ip in existing_ips - new_ips:
self.delete_dns_record(domain, 'A', ip, zone)
def get_dns_records_for_domain(self, domain: str, zone: str) -> List[Dict[str, Any]]:
"""Get all DNS records for a specific domain"""
if not self.connection or not self.connection.is_connected():
self.connect_database()
cursor = self.connection.cursor(dictionary=True)
query = """
SELECT domain, type, value, ttl, priority, weight, port
FROM dns_records
WHERE domain = %s AND zone = %s
ORDER BY type, value
"""
cursor.execute(query, (domain, zone))
records = cursor.fetchall()
cursor.close()
return records
def monitor_service_health(self):
"""Monitor service health and update DNS records accordingly"""
while True:
try:
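# Get all registered endpoints (connect first; the connection starts as None)
if not self.connection or not self.connection.is_connected():
self.connect_database()
cursor = self.connection.cursor(dictionary=True)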
cursor.execute("""
SELECT DISTINCT value as ip, domain, zone
FROM dns_records
WHERE type = 'A' AND zone IN ('internal', 'external')
""")
endpoints = cursor.fetchall()
cursor.close()
# Check health of each endpoint
for endpoint in endpoints:
health_status = self._check_endpoint_health(
endpoint['ip'],
endpoint['domain']
)
self._update_health_status(
endpoint['ip'],
health_status
)
# Wait for next health check cycle
time.sleep(30)
except Exception as e:
logger.error(f"Error during health monitoring: {e}")
time.sleep(60) # Retry after error
def _check_endpoint_health(self, ip: str, domain: str) -> Dict[str, Any]:
"""Check health of a specific endpoint"""
import requests
try:
# Try health check endpoint
health_url = f"http://{ip}/health"
start_time = time.time()
response = requests.get(health_url, timeout=5)
response_time = (time.time() - start_time) * 1000 # Convert to ms
if response.status_code == 200:
return {
'status': 'healthy',
'response_time_ms': int(response_time)
}
else:
return {
'status': 'unhealthy',
'response_time_ms': int(response_time)
}
except requests.exceptions.RequestException:
return {
'status': 'unhealthy',
'response_time_ms': 0
}
def _update_health_status(self, endpoint: str, health_status: Dict[str, Any]):
"""Update health status in database"""
if not self.connection or not self.connection.is_connected():
self.connect_database()
cursor = self.connection.cursor()
# Update or insert health status
query = """
INSERT INTO lb_health_status (endpoint, status, response_time_ms, last_check, consecutive_failures)
VALUES (%s, %s, %s, CURRENT_TIMESTAMP, %s)
ON DUPLICATE KEY UPDATE
status = VALUES(status),
response_time_ms = VALUES(response_time_ms),
last_check = VALUES(last_check),
consecutive_failures = CASE
WHEN VALUES(status) = 'healthy' THEN 0
ELSE consecutive_failures + 1
END
"""
consecutive_failures = 0 if health_status['status'] == 'healthy' else 1
cursor.execute(query, (
endpoint,
health_status['status'],
health_status['response_time_ms'],
consecutive_failures
))
self.connection.commit()
cursor.close()
if __name__ == "__main__":
    import os
    import threading
    db_config = {
        'host': 'dns-mysql.dns-system.svc.cluster.local',
        'port': 3306,
        'user': 'dns_user',
        # Read the password from the environment instead of hardcoding it;
        # DNS_DB_PASSWORD is an assumed variable name
        'password': os.environ['DNS_DB_PASSWORD'],
        'database': 'dns_db'
    }
    dns_manager = DNSRecordManager(db_config)
    def delayed_service_sync():
        """Wait briefly after startup, then sync Kubernetes services once."""
        time.sleep(10)  # Initial delay
        dns_manager.sync_kubernetes_services()
    # Run service synchronization in a background daemon thread
    sync_thread = threading.Thread(target=delayed_service_sync, daemon=True)
    sync_thread.start()
    # Start health monitoring (blocks)
    dns_manager.monitor_service_health()
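The upsert above resets `consecutive_failures` on a healthy check and increments it otherwise. A minimal in-memory sketch of that counter, and of how it might gate whether an endpoint stays in DNS answers, could look like the following. `EndpointHealth`, `record_check`, `should_serve`, and the threshold of 3 are hypothetical names and values chosen for illustration; they are not part of the manager script.

```python
from dataclasses import dataclass


@dataclass
class EndpointHealth:
    """In-memory stand-in for one row of lb_health_status (hypothetical)."""
    status: str = "healthy"
    consecutive_failures: int = 0


def record_check(state: EndpointHealth, healthy: bool) -> EndpointHealth:
    """Apply one health-check result, mirroring the CASE logic of the SQL upsert."""
    if healthy:
        # A single healthy check resets the failure streak
        return EndpointHealth(status="healthy", consecutive_failures=0)
    return EndpointHealth(
        status="unhealthy",
        consecutive_failures=state.consecutive_failures + 1,
    )


def should_serve(state: EndpointHealth, failure_threshold: int = 3) -> bool:
    """Keep the endpoint in answers until it fails `failure_threshold` checks in a row."""
    return state.consecutive_failures < failure_threshold
```

Gating on a streak rather than on the latest status is one way to keep a single dropped probe from removing an otherwise healthy endpoint; the right threshold depends on the check interval and how quickly failover must happen.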
Security Hardening and Access Control
Advanced Access Control Lists
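Each server block in the configuration that follows opens with an access list: a few permitted networks, then a catch-all rule. The sketch below illustrates first-match evaluation of such a list. It assumes the behavior documented for the CoreDNS acl plugin, where rules are checked in order and a query is allowed when nothing matches. The `Rule` tuple, `EXECUTIVE_RULES`, and `evaluate_acl` are hypothetical names used only for this illustration.

```python
import ipaddress
from typing import List, Tuple

# (action, network) pairs in evaluation order. The networks come from the
# executive server block; the tuple representation is an assumption.
Rule = Tuple[str, str]

EXECUTIVE_RULES: List[Rule] = [
    ("allow", "10.0.1.0/24"),    # Executive network
    ("allow", "10.0.100.0/24"),  # IT admin network
    ("block", "0.0.0.0/0"),      # Catch-all rule for every other IPv4 source
]


def evaluate_acl(client_ip: str, rules: List[Rule], default: str = "allow") -> str:
    """Return the action of the first rule whose network contains client_ip."""
    addr = ipaddress.ip_address(client_ip)
    for action, cidr in rules:
        if addr in ipaddress.ip_network(cidr):
            return action
    # No rule matched: fall back to the default (assumed to be allow)
    return default
```

With the full list, `evaluate_acl("10.0.5.7", EXECUTIVE_RULES)` returns `"block"`; with the catch-all removed, the same client falls through to the default and is allowed, which is why every access list below ends with a catch-all rule.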
# Enhanced security configuration for CoreDNS
apiVersion: v1
kind: ConfigMap
metadata:
name: coredns-security-config
namespace: dns-system
data:
Corefile: |
# Security-hardened CoreDNS configuration
(security-baseline) {
errors {
consolidate 5m ".*" warning
}
# Comprehensive logging for security auditing
log {
class all
format "{type} {name} {rcode} {>rflags} {>bufsize} {>do} {>id} {remote} {port} {size} {duration}"
}
# Health and readiness
health {
lameduck 10s
}
ready
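        # Rate limiting with intelligent thresholds
        # NOTE: ratelimit is not part of the stock CoreDNS build. This block assumes an
        # external rate-limiting plugin is compiled in; option names depend on that plugin.
        ratelimit {
            per_second 100
            per_client 10
            whitelist 10.0.0.0/8 172.16.0.0/12 192.168.0.0/16
            blacklist 192.0.2.0/24 203.0.113.128/25
            window 60s
            ipv4_mask 24
            ipv6_mask 64
        }
        # DNS filtering for security
        # NOTE: filter is likewise not a stock CoreDNS plugin. Treat the options below as
        # illustrative syntax for an external or custom blocklist plugin.
        filter {
            block_type A AAAA
            action drop
            # Block known malicious domains
            file /etc/coredns/blocklist.txt
        }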
# Cache with security considerations
cache 300 {
success 10000 300
denial 5000 60
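            # Keep popular entries warm: names queried 25+ times with no gap of 60s
            # or more are refreshed once their remaining TTL drops below 20%
            prefetch 25 60s 20%
        }
        # Prometheus metrics, served at /metrics on this address
        prometheus :9153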
}
# Executive network - highest security
executive.company.internal:53 {
# Strict IP-based access control
        acl {
            allow net 10.0.1.0/24    # Executive network
            allow net 10.0.100.0/24  # IT admin network
            block                    # Everything else
        }
import security-baseline
# Enhanced security logging
log {
class all
format "[EXECUTIVE] {type} {name} {rcode} {remote} {port} {size} {duration}"
}
# Executive host mappings
hosts /etc/coredns/executive-hosts {
ttl 600
reload 60s
fallthrough
}
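        # Forwarding to executive DNS
        # NOTE: this upstream is plain DNS on port 53. tls_servername only takes effect
        # for tls:// upstreams (as in the finance block), so this hop is unencrypted as written.
        forward . 10.0.1.253:53 {
            max_concurrent 100
            policy sequential
            health_check 30s
            tls_servername executive-dns.company.internal
        }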
}
# Finance network - PCI DSS compliance
finance.company.internal:53 {
        acl {
            allow net 10.0.5.0/24    # Finance network
            allow net 10.0.100.0/24  # IT admin network
            block                    # Everything else
        }
import security-baseline
        # PCI DSS compliant logging
        log {
            class all
            # {>do} and {>bufsize} capture the EDNS0 DO bit and advertised buffer size
            format "[FINANCE] {type} {name} {rcode} {remote} {port} {size} {duration} {>do} {>bufsize}"
        }
# Finance-specific host mappings
hosts /etc/coredns/finance-hosts {
ttl 300
reload 30s
fallthrough
}
# Encrypted forwarding
forward . tls://10.0.5.253:853 {
tls_servername finance-dns.company.internal
max_concurrent 200
policy sequential
health_check 15s
}
}
# DMZ network - controlled external access
dmz.company.internal:53 {
        acl {
            allow net 203.0.113.0/24   # DMZ network
            allow net 198.51.100.0/24  # Partner access
            allow net 10.0.100.0/24    # IT admin network
            block                      # Everything else
        }
import security-baseline
# DMZ security filtering
filter {
block_type A AAAA
action refuse
file /etc/coredns/dmz-blocklist.txt
# Additional DMZ-specific blocks
regex ".*\.onion$"
regex ".*\.bit$"
}
# DMZ host mappings
hosts /etc/coredns/dmz-hosts {
ttl 120
reload 20s
fallthrough
}
# DMZ forwarding with monitoring
forward . 203.0.113.253:53 {
max_concurrent 500
policy round_robin
health_check 10s
}
}
# Development network - restricted access
dev.company.internal:53 {
        acl {
            allow net 192.168.100.0/24  # Dev network
            allow net 192.168.101.0/24  # Dev test network
            allow net 10.0.100.0/24     # IT admin network
            block                       # Everything else
        }
import security-baseline
# Development-specific rate limiting
ratelimit {
per_second 200
per_client 20
window 300s
}
# Development host mappings
hosts /etc/coredns/dev-hosts {
ttl 60
reload 10s
fallthrough
}
# Development DNS forwarding
forward . 192.168.100.253:53 {
max_concurrent 300
policy sequential
health_check 5s
}
}
# Guest network - heavily restricted
guest.company.internal:53 {
        acl {
            allow net 172.16.200.0/24  # Guest network
            block                      # Everything else
        }
# Enhanced rate limiting for guest access
ratelimit {
per_second 20
per_client 2
window 3600s
ipv4_mask 32
}
# Aggressive DNS filtering
filter {
block_type A AAAA
action drop
file /etc/coredns/guest-blocklist.txt
# Block internal domains
regex ".*\.company\.internal$"
regex ".*\.local$"
}
# Minimal logging for privacy
log {
class denial error
}
# Basic cache
cache 600 {
success 1000 600
denial 500 300
}
# Restricted forwarding
forward . 8.8.8.8:53 1.1.1.1:53 {
max_concurrent 100
policy random
health_check 60s
}
}
# Default - most restrictive
.:53 {
import security-baseline
# Default rate limiting
ratelimit {
per_second 50
per_client 5
window 60s
}
# Security filtering
filter {
block_type A AAAA
action refuse
file /etc/coredns/global-blocklist.txt
}
# Default forwarding
forward . 8.8.8.8:53 1.1.1.1:53 {
max_concurrent 1000
policy random
health_check 30s
}
}
---
# Security blocklists
apiVersion: v1
kind: ConfigMap
metadata:
name: coredns-blocklists
namespace: dns-system
data:
blocklist.txt: |
# Known malicious domains
malicious-site.com
phishing-site.org
malware-host.net
# Cryptocurrency mining
coinhive.com
jsecoin.com
cryptoloot.pro
# Ad networks (optional)
doubleclick.net
googleadservices.com
googlesyndication.com
dmz-blocklist.txt: |
# Additional DMZ-specific blocks
internal-dev.company.com
test-server.company.com
staging-api.company.com
guest-blocklist.txt: |
# Comprehensive blocks for guest network
company.internal
*.company.internal
localhost
*.local
intranet
*.intranet
# P2P and file sharing
thepiratebay.org
bittorrent.com
utorrent.com
# Social media (if policy requires)
facebook.com
twitter.com
instagram.com
global-blocklist.txt: |
# Global security blocks
suspicious-domain.com
known-bad-actor.org
malware-distribution.net
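The blocklists above mix exact names (`company.internal`) with wildcard entries (`*.company.internal`). How those lines are interpreted depends on the filtering plugin in use, so the following is only a sketch of one plausible reading: exact entries match the name itself, and `*.` entries match any name beneath that suffix. `load_blocklist` and `is_blocked` are hypothetical helpers, not functions of any CoreDNS plugin.

```python
from typing import Iterable, Set, Tuple


def load_blocklist(lines: Iterable[str]) -> Tuple[Set[str], Set[str]]:
    """Split blocklist lines into exact names and wildcard suffixes."""
    exact: Set[str] = set()
    suffixes: Set[str] = set()
    for raw in lines:
        # Drop comments, surrounding whitespace, case, and any trailing dot
        entry = raw.split("#", 1)[0].strip().lower().rstrip(".")
        if not entry:
            continue
        if entry.startswith("*."):
            suffixes.add(entry[2:])
        else:
            exact.add(entry)
    return exact, suffixes


def is_blocked(qname: str, exact: Set[str], suffixes: Set[str]) -> bool:
    """Return True if qname is listed exactly or sits under a wildcard suffix."""
    name = qname.lower().rstrip(".")
    if name in exact:
        return True
    labels = name.split(".")
    # Compare every proper parent domain against the wildcard suffixes
    return any(".".join(labels[i:]) in suffixes for i in range(1, len(labels)))
```

Under this reading, an exact entry such as `facebook.com` does not cover `www.facebook.com`; a list that intends to block subdomains needs the wildcard form as well, which is how the guest list treats `company.internal`.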
DNS Query Monitoring and Threat Detection
#!/usr/bin/env python3
"""
DNS query monitoring and threat detection system
"""
import re
import json
import time
from collections import defaultdict, deque
from typing import Any, Dict, List, Set, Tuple
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class DNSQuery:
timestamp: datetime
client_ip: str
query_type: str
domain: str
response_code: str
response_time: float
query_size: int
class DNSThreatDetector:
def __init__(self):
# Threat detection parameters
self.max_queries_per_minute = 100
self.max_unique_domains_per_hour = 500
self.suspicious_query_threshold = 50
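        # Tracking structures
        # Per-client history must hold more entries than the domain_diversity
        # threshold (1000 unique domains), or that check could never fire
        self.client_queries = defaultdict(lambda: deque(maxlen=10000))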
self.client_domains = defaultdict(lambda: defaultdict(int))
self.domain_reputation = defaultdict(int)
# Threat patterns
self.threat_patterns = self._load_threat_patterns()
# Alerting thresholds
self.alert_thresholds = {
'high_frequency': 200, # Queries per minute
'domain_diversity': 1000, # Unique domains per hour
'suspicious_patterns': 10, # Suspicious pattern matches
'dns_tunneling': 20, # Long domain queries
'dga_detection': 15 # Domain generation algorithm detection
}
def _load_threat_patterns(self) -> Dict[str, List[str]]:
"""Load DNS threat detection patterns"""
return {
'dga_patterns': [
                r'^[a-z]{8,}\.com$',  # 8+ letters directly under .com; broad, also matches names like facebook.com
r'^[0-9]{6,}\..*$', # Numeric domains
r'^[a-z]{3,}[0-9]{3,}\..*$' # Mixed alphanumeric
],
'dns_tunneling': [
r'^[a-zA-Z0-9]{30,}\..*$', # Very long subdomains
r'^[a-zA-Z0-9\-]{20,}\..*\..*\..*$' # Deep subdomains
],
'c2_patterns': [
r'.*\.tk$', # Common C2 TLD
r'.*\.ml$', # Common C2 TLD
r'.*\.ga$', # Common C2 TLD
r'^[0-9]{1,3}-[0-9]{1,3}-[0-9]{1,3}-[0-9]{1,3}\..*$' # IP-like domains
],
'malware_patterns': [
r'.*update.*\.exe$',
r'.*download.*\.zip$',
r'.*temp.*\.tmp$'
]
}
def analyze_query(self, query: DNSQuery) -> Dict[str, Any]:
"""Analyze a DNS query for threats"""
analysis = {
'timestamp': query.timestamp.isoformat(),
'client_ip': query.client_ip,
'domain': query.domain,
'threat_score': 0,
'threats_detected': [],
'recommendations': []
}
# Track query for client
self.client_queries[query.client_ip].append(query)
# High frequency detection
recent_queries = self._get_recent_queries(query.client_ip, minutes=1)
if len(recent_queries) > self.alert_thresholds['high_frequency']:
analysis['threats_detected'].append('high_frequency_queries')
analysis['threat_score'] += 30
# Domain diversity detection
hourly_domains = self._get_unique_domains_count(query.client_ip, hours=1)
if hourly_domains > self.alert_thresholds['domain_diversity']:
analysis['threats_detected'].append('high_domain_diversity')
analysis['threat_score'] += 25
# Pattern matching
pattern_threats = self._detect_pattern_threats(query.domain)
if pattern_threats:
analysis['threats_detected'].extend(pattern_threats)
analysis['threat_score'] += len(pattern_threats) * 15
# DNS tunneling detection
if self._detect_dns_tunneling(query.domain, query.query_size):
analysis['threats_detected'].append('dns_tunneling')
analysis['threat_score'] += 40
# DGA detection
if self._detect_dga(query.domain):
analysis['threats_detected'].append('domain_generation_algorithm')
analysis['threat_score'] += 35
# Failed query analysis
if query.response_code != 'NOERROR':
failed_queries = self._count_failed_queries(query.client_ip, minutes=5)
if failed_queries > 20:
analysis['threats_detected'].append('excessive_failed_queries')
analysis['threat_score'] += 20
# Generate recommendations
analysis['recommendations'] = self._generate_recommendations(analysis)
return analysis
def _get_recent_queries(self, client_ip: str, minutes: int = 1) -> List[DNSQuery]:
"""Get recent queries for a client"""
cutoff_time = datetime.now() - timedelta(minutes=minutes)
return [q for q in self.client_queries[client_ip]
if q.timestamp >= cutoff_time]
def _get_unique_domains_count(self, client_ip: str, hours: int = 1) -> int:
"""Count unique domains queried by client in time period"""
cutoff_time = datetime.now() - timedelta(hours=hours)
unique_domains = set()
for query in self.client_queries[client_ip]:
if query.timestamp >= cutoff_time:
unique_domains.add(query.domain)
return len(unique_domains)
def _detect_pattern_threats(self, domain: str) -> List[str]:
"""Detect threat patterns in domain"""
threats = []
for threat_type, patterns in self.threat_patterns.items():
for pattern in patterns:
if re.match(pattern, domain):
threats.append(threat_type)
break
return threats
def _detect_dns_tunneling(self, domain: str, query_size: int) -> bool:
"""Detect potential DNS tunneling"""
# Long subdomain names
if len(domain) > 50:
return True
# Many subdomains
if domain.count('.') > 4:
return True
# Large query size
if query_size > 512:
return True
# Base64-like patterns in subdomain
subdomain = domain.split('.')[0]
if len(subdomain) > 20 and re.match(r'^[A-Za-z0-9+/=]+$', subdomain):
return True
return False
def _detect_dga(self, domain: str) -> bool:
"""Detect Domain Generation Algorithm patterns"""
domain_part = domain.split('.')[0]
# Very long domain names
if len(domain_part) > 15:
return True
# High consonant to vowel ratio
consonants = sum(1 for c in domain_part.lower()
if c.isalpha() and c not in 'aeiou')
vowels = sum(1 for c in domain_part.lower()
if c in 'aeiou')
if vowels > 0 and consonants / vowels > 3:
return True
        # Character diversity (a rough stand-in for entropy analysis)
unique_chars = len(set(domain_part))
if len(domain_part) > 8 and unique_chars / len(domain_part) > 0.8:
return True
return False
def _count_failed_queries(self, client_ip: str, minutes: int = 5) -> int:
"""Count failed queries for client in time period"""
cutoff_time = datetime.now() - timedelta(minutes=minutes)
failed_count = 0
for query in self.client_queries[client_ip]:
if (query.timestamp >= cutoff_time and
query.response_code not in ['NOERROR', 'NXDOMAIN']):
failed_count += 1
return failed_count
def _generate_recommendations(self, analysis: Dict[str, Any]) -> List[str]:
"""Generate security recommendations based on analysis"""
recommendations = []
threats = analysis['threats_detected']
if 'high_frequency_queries' in threats:
recommendations.append("Rate limit client IP address")
recommendations.append("Investigate client for potential DDoS activity")
if 'high_domain_diversity' in threats:
recommendations.append("Monitor client for data exfiltration")
recommendations.append("Consider temporary access restriction")
if 'dns_tunneling' in threats:
recommendations.append("Block client immediately - DNS tunneling detected")
recommendations.append("Analyze network traffic for data exfiltration")
if 'domain_generation_algorithm' in threats:
recommendations.append("Block domain family")
recommendations.append("Update malware detection signatures")
        if 'c2_patterns' in threats:
recommendations.append("Block C2 communication channel")
recommendations.append("Initiate incident response procedure")
if analysis['threat_score'] > 50:
recommendations.append("IMMEDIATE ACTION REQUIRED - High threat score")
recommendations.append("Escalate to security team")
return recommendations
def generate_threat_report(self, time_period: timedelta = timedelta(hours=1)) -> Dict[str, Any]:
"""Generate comprehensive threat report"""
cutoff_time = datetime.now() - time_period
# Analyze all recent queries
all_analyses = []
threat_summary = defaultdict(int)
high_risk_clients = set()
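        # analyze_query() appends to the live per-client deque, and mutating a
        # deque while iterating over it raises RuntimeError, so work from a snapshot
        for client_ip in list(self.client_queries):
            history = self.client_queries[client_ip]
            snapshot = list(history)
            for query in snapshot:
                if query.timestamp >= cutoff_time:
                    analysis = self.analyze_query(query)
                    all_analyses.append(analysis)
                    # Update threat summary
                    for threat in analysis['threats_detected']:
                        threat_summary[threat] += 1
                    # Track high-risk clients
                    if analysis['threat_score'] > 30:
                        high_risk_clients.add(client_ip)
            # Discard the duplicates that analyze_query() re-appended
            history.clear()
            history.extend(snapshot)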
# Generate report
report = {
'report_period': {
'start': cutoff_time.isoformat(),
'end': datetime.now().isoformat(),
'duration_hours': time_period.total_seconds() / 3600
},
'summary': {
'total_queries_analyzed': len(all_analyses),
'threats_detected': dict(threat_summary),
'high_risk_clients': list(high_risk_clients),
'total_threat_score': sum(a['threat_score'] for a in all_analyses)
},
'top_threats': self._get_top_threats(all_analyses),
'recommendations': self._generate_global_recommendations(threat_summary),
'detailed_analyses': [a for a in all_analyses if a['threat_score'] > 20]
}
return report
def _get_top_threats(self, analyses: List[Dict[str, Any]], top_n: int = 10) -> List[Dict[str, Any]]:
"""Get top threats by score"""
sorted_analyses = sorted(analyses, key=lambda x: x['threat_score'], reverse=True)
return sorted_analyses[:top_n]
def _generate_global_recommendations(self, threat_summary: Dict[str, int]) -> List[str]:
"""Generate global security recommendations"""
recommendations = []
total_threats = sum(threat_summary.values())
if total_threats > 100:
recommendations.append("CRITICAL: High volume of DNS threats detected")
recommendations.append("Consider implementing DNS firewall")
if threat_summary.get('dns_tunneling', 0) > 5:
recommendations.append("Multiple DNS tunneling attempts - investigate network")
recommendations.append("Implement deep packet inspection")
if threat_summary.get('domain_generation_algorithm', 0) > 10:
recommendations.append("DGA activity detected - update anti-malware systems")
recommendations.append("Consider sinkholing suspicious domains")
if threat_summary.get('high_frequency_queries', 0) > 20:
recommendations.append("Multiple high-frequency query sources")
recommendations.append("Review rate limiting configuration")
return recommendations
if __name__ == "__main__":
detector = DNSThreatDetector()
# Example usage
sample_query = DNSQuery(
timestamp=datetime.now(),
client_ip="192.168.1.100",
query_type="A",
domain="aabcdefghijklmnopqrstuvwxyz123.com",
response_code="NOERROR",
response_time=0.15,
query_size=128
)
analysis = detector.analyze_query(sample_query)
print(json.dumps(analysis, indent=2))
# Generate threat report
report = detector.generate_threat_report()
print(json.dumps(report, indent=2))
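The detector's `_detect_dga` approximates randomness with a unique-character ratio. A closer approximation is the Shannon entropy of the label's character distribution, H = -Σ p(c) log2 p(c). The sketch below shows that calculation; `shannon_entropy`, `looks_generated`, and the `min_length` and `threshold` values are hypothetical and would need tuning against real query logs before use.

```python
import math
from collections import Counter


def shannon_entropy(label: str) -> float:
    """Shannon entropy of the label's characters, in bits per character."""
    if not label:
        return 0.0
    counts = Counter(label.lower())
    total = len(label)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


def looks_generated(domain: str, min_length: int = 10, threshold: float = 3.5) -> bool:
    """Flag the leftmost label if it is long and its characters are near-uniform.

    min_length and threshold are illustrative defaults, not calibrated values.
    """
    label = domain.split(".")[0]
    return len(label) >= min_length and shannon_entropy(label) >= threshold
```

Entropy alone does not separate algorithmic names from legitimate hashed hostnames such as CDN or cloud storage labels, so a check like this is better treated as one weighted signal in the threat score than as a verdict on its own.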
Conclusion
Advanced DNS forwarding with CoreDNS enables organizations to implement sophisticated network architectures that support complex business requirements while maintaining security and performance standards. The configurations and patterns presented in this guide demonstrate how split-horizon DNS, conditional forwarding, and intelligent threat detection can be integrated to create enterprise-grade DNS infrastructure.
Key success factors include careful network segmentation design, comprehensive access control, proactive threat monitoring, and robust operational procedures. Applied together, these patterns are intended to strengthen DNS security posture, improve resolution performance, and give operators clearer visibility across complex multi-domain environments.
The combination of CoreDNS’s advanced plugin ecosystem, database-backed dynamic record management, and intelligent threat detection provides a comprehensive foundation for modern enterprise DNS architecture capable of supporting evolving security requirements and business growth.