Enterprise Pi-Hole DNS Management Guide 2025: Advanced Automation, Security & Multi-Site Orchestration
Managing DNS infrastructure with Pi-Hole at enterprise scale requires sophisticated automation, security hardening, and monitoring that extend far beyond basic conditional forwarding. This guide builds simple DNSMasq configuration up into an enterprise-grade DNS management system with automated deployment, security compliance, and intelligent orchestration.
Table of Contents
- Pi-Hole Architecture and Security Overview
- Enterprise Conditional Forwarding Framework
- Automated DNS Configuration Management
- Multi-Site DNS Orchestration
- Advanced Security Hardening
- Comprehensive Monitoring and Alerting
- Compliance and Audit Framework
- Performance Optimization
- Disaster Recovery and High Availability
- Integration with Enterprise Systems
- Advanced Troubleshooting
- Best Practices and Recommendations
Pi-Hole Architecture and Security Overview
Enterprise DNS Infrastructure Requirements
Modern enterprise DNS infrastructure must handle complex routing scenarios, security requirements, and compliance mandates while maintaining high availability and performance across distributed networks.
# enterprise-pihole-architecture.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: enterprise-pihole-config
namespace: network-infrastructure
data:
architecture.yaml: |
dns_infrastructure:
primary_sites:
- name: "datacenter-east"
location: "us-east-1"
capacity: 10000
redundancy: "active-active"
- name: "datacenter-west"
location: "us-west-2"
capacity: 10000
redundancy: "active-active"
conditional_forwarding:
corporate_domains:
- domain: "corp.company.com"
upstream: ["10.0.1.10", "10.0.1.11"]
ttl: 300
secure: true
- domain: "internal.company.com"
upstream: ["10.0.2.10", "10.0.2.11"]
ttl: 300
secure: true
vpn_domains:
- domain: "vpn.company.com"
upstream: ["10.1.1.10", "10.1.1.11"]
ttl: 60
secure: true
cloud_domains:
- domain: "aws.company.com"
upstream: ["10.2.1.10", "10.2.1.11"]
ttl: 300
secure: true
security_policies:
dns_over_https: true
dns_over_tls: true
dnssec_validation: true
query_logging: true
threat_intelligence: true
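Before any automation consumes a document like this, it is worth linting it. The sketch below, assuming the architecture.yaml payload has been extracted from the ConfigMap to a local file, checks each forwarding rule for upstream redundancy, parseable addresses, and sane TTLs:
#!/usr/bin/env python3
"""Lint the architecture.yaml payload before automation consumes it."""
import ipaddress
import sys

import yaml  # pip install pyyaml

def validate(path: str) -> int:
    with open(path) as f:
        doc = yaml.safe_load(f)["dns_infrastructure"]
    errors = 0
    for category, rules in doc["conditional_forwarding"].items():
        for rule in rules:
            upstreams = rule.get("upstream", [])
            # Each rule should have at least two upstreams for redundancy
            if len(upstreams) < 2:
                print(f"{category}/{rule['domain']}: fewer than 2 upstreams")
                errors += 1
            for ip in upstreams:
                try:
                    ipaddress.ip_address(ip)
                except ValueError:
                    print(f"{category}/{rule['domain']}: bad upstream {ip!r}")
                    errors += 1
            # TTLs outside (0, 86400] are almost certainly typos
            if not 0 < rule.get("ttl", 0) <= 86400:
                print(f"{category}/{rule['domain']}: TTL out of range")
                errors += 1
    return errors

if __name__ == "__main__":
    sys.exit(1 if validate(sys.argv[1]) else 0)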
Advanced DNSMasq Configuration Framework
#!/usr/bin/env python3
"""
Enterprise Pi-Hole DNS Configuration Management System
Automated configuration generation and deployment
"""
import yaml
import jinja2
import asyncio
import aiohttp
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
from pathlib import Path
import hashlib
import time
@dataclass
class DNSForwardingRule:
domain: str
upstream_servers: List[str]
ttl: int
secure: bool
priority: int
health_check: bool
@dataclass
class PiHoleCluster:
name: str
nodes: List[str]
location: str
capacity: int
redundancy_mode: str
class EnterprisePiHoleManager:
def __init__(self, config_path: str):
self.config_path = Path(config_path)
self.logger = self._setup_logging()
self.config = self._load_config()
self.template_env = self._setup_templates()
def _setup_logging(self) -> logging.Logger:
"""Configure comprehensive logging"""
logger = logging.getLogger('enterprise-pihole')
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# File handler
file_handler = logging.FileHandler('/var/log/pihole/enterprise-manager.log')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
def _load_config(self) -> Dict:
"""Load enterprise configuration"""
with open(self.config_path, 'r') as f:
return yaml.safe_load(f)
def _setup_templates(self) -> jinja2.Environment:
"""Setup Jinja2 template environment"""
template_dir = Path('/etc/pihole/templates')
return jinja2.Environment(
loader=jinja2.FileSystemLoader(template_dir),
trim_blocks=True,
lstrip_blocks=True
)
async def generate_dnsmasq_config(self, cluster: PiHoleCluster) -> str:
"""Generate advanced DNSMasq configuration"""
template = self.template_env.get_template('dnsmasq-enterprise.conf.j2')
forwarding_rules = []
for category, domains in self.config['conditional_forwarding'].items():
for domain_config in domains:
rule = DNSForwardingRule(
domain=domain_config['domain'],
upstream_servers=domain_config['upstream'],
ttl=domain_config['ttl'],
secure=domain_config.get('secure', True),
priority=domain_config.get('priority', 100),
health_check=domain_config.get('health_check', True)
)
forwarding_rules.append(rule)
config = template.render(
cluster=cluster,
forwarding_rules=forwarding_rules,
security_policies=self.config['security_policies'],
timestamp=time.time()
)
return config
async def deploy_configuration(self, cluster: PiHoleCluster, config: str) -> bool:
"""Deploy configuration to cluster nodes"""
deployment_tasks = []
for node in cluster.nodes:
task = self._deploy_to_node(node, config)
deployment_tasks.append(task)
results = await asyncio.gather(*deployment_tasks, return_exceptions=True)
success_count = sum(1 for result in results if result is True)
total_nodes = len(cluster.nodes)
self.logger.info(f"Deployed to {success_count}/{total_nodes} nodes in {cluster.name}")
return success_count == total_nodes
async def _deploy_to_node(self, node: str, config: str) -> bool:
"""Deploy configuration to individual node"""
try:
config_hash = hashlib.sha256(config.encode()).hexdigest()
            # Target path for the rendered config on the node
            config_path = f'/etc/dnsmasq.d/02-enterprise-{config_hash[:8]}.conf'
            async with aiohttp.ClientSession() as session:
                # Deploy via a management API on each node; this assumes a
                # custom deployment agent, as stock Pi-Hole exposes no such endpoint
                deployment_data = {
                    'config': config,
                    'path': config_path,
                    'hash': config_hash,
                    'node': node,
                    'timestamp': time.time()
                }
async with session.post(
f'http://{node}/admin/api/config/deploy',
json=deployment_data
) as response:
if response.status == 200:
self.logger.info(f"Successfully deployed to {node}")
return True
else:
self.logger.error(f"Failed to deploy to {node}: {response.status}")
return False
except Exception as e:
self.logger.error(f"Error deploying to {node}: {str(e)}")
return False
async def health_check_upstreams(self, forwarding_rules: List[DNSForwardingRule]) -> Dict:
"""Perform health checks on upstream DNS servers"""
health_results = {}
for rule in forwarding_rules:
if not rule.health_check:
continue
rule_health = {}
for upstream in rule.upstream_servers:
try:
# DNS health check
start_time = time.time()
async with aiohttp.ClientSession() as session:
async with session.get(
f'http://{upstream}:8080/health',
timeout=aiohttp.ClientTimeout(total=5)
) as response:
response_time = time.time() - start_time
rule_health[upstream] = {
'status': 'healthy' if response.status == 200 else 'unhealthy',
'response_time': response_time,
'timestamp': time.time()
}
except Exception as e:
rule_health[upstream] = {
'status': 'unhealthy',
'error': str(e),
'timestamp': time.time()
}
health_results[rule.domain] = rule_health
return health_results
async def optimize_configuration(self, cluster: PiHoleCluster) -> str:
"""Optimize configuration based on performance metrics"""
# Load performance metrics
metrics = await self._collect_performance_metrics(cluster)
# Analyze and optimize
optimized_config = await self._apply_optimizations(metrics)
return optimized_config
async def _collect_performance_metrics(self, cluster: PiHoleCluster) -> Dict:
"""Collect performance metrics from cluster"""
metrics = {}
for node in cluster.nodes:
try:
async with aiohttp.ClientSession() as session:
async with session.get(f'http://{node}/admin/api/metrics') as response:
if response.status == 200:
node_metrics = await response.json()
metrics[node] = node_metrics
except Exception as e:
self.logger.error(f"Failed to collect metrics from {node}: {str(e)}")
return metrics
async def _apply_optimizations(self, metrics: Dict) -> str:
"""Apply performance optimizations"""
# Implement optimization logic based on metrics
optimization_rules = []
# Add caching optimizations
if self._should_increase_cache_size(metrics):
optimization_rules.append("cache-size=10000")
# Add query optimization
if self._should_optimize_queries(metrics):
optimization_rules.append("dns-forward-max=1000")
return "\n".join(optimization_rules)
def _should_increase_cache_size(self, metrics: Dict) -> bool:
"""Determine if cache size should be increased"""
# Analysis logic
return True # Simplified for example
def _should_optimize_queries(self, metrics: Dict) -> bool:
"""Determine if query optimization is needed"""
# Analysis logic
return True # Simplified for example
# Usage example
async def main():
manager = EnterprisePiHoleManager('/etc/pihole/enterprise-config.yaml')
# Define cluster
cluster = PiHoleCluster(
name="datacenter-east",
nodes=["10.0.1.100", "10.0.1.101", "10.0.1.102"],
location="us-east-1",
capacity=10000,
redundancy_mode="active-active"
)
# Generate and deploy configuration
config = await manager.generate_dnsmasq_config(cluster)
success = await manager.deploy_configuration(cluster, config)
if success:
print("Configuration deployed successfully")
else:
print("Configuration deployment failed")
if __name__ == "__main__":
asyncio.run(main())
Enterprise Conditional Forwarding Framework
Advanced DNSMasq Template System
{# /etc/pihole/templates/dnsmasq-enterprise.conf.j2 #}
# Enterprise Pi-Hole DNSMasq Configuration
# Generated: {{ timestamp }}
# Cluster: {{ cluster.name }}
# Location: {{ cluster.location }}
# Basic DNS Configuration
port=53
domain-needed
bogus-priv
no-resolv
no-poll
server=1.1.1.1
server=1.0.0.1
server=8.8.8.8
server=8.8.4.4
# Cache Configuration
cache-size=10000
neg-ttl=3600
max-cache-ttl=86400
min-cache-ttl=60
# Security Configuration
{% if security_policies.dnssec_validation %}
dnssec
trust-anchor=.,20326,8,2,E06D44B80B8F1D39A95C0B0D7C65D08458E880409BBC683457104237C7F8EC8D
{% endif %}
{% if security_policies.dns_over_https %}
# DNS over HTTPS: dnsmasq cannot speak DoH natively, so point it at a
# local DoH proxy (e.g. cloudflared) listening on port 5053
server=127.0.0.1#5053
{% endif %}
# Conditional Forwarding Rules
{% for rule in forwarding_rules %}
# Domain: {{ rule.domain }}
# Priority: {{ rule.priority }}
# Secure: {{ rule.secure }}
{% for upstream in rule.upstream_servers %}
server=/{{ rule.domain }}/{{ upstream }}
{% endfor %}
{# Note: dnsmasq has no per-domain TTL option; local-ttl is global, so the
   per-rule TTLs above must be enforced on the authoritative upstreams #}
{% if rule.secure %}
# Internal zones are usually unsigned, so skip DNSSEC proofs of
# non-existence for them (the "=no" form needs dnsmasq 2.80+)
dnssec-check-unsigned=no
{% endif %}
{% endfor %}
# Performance Optimizations
dns-forward-max=1000
max-tcp-connections=100
query-port=0
# Logging Configuration
{% if security_policies.query_logging %}
log-queries
log-facility=/var/log/pihole/pihole.log
{% endif %}
# Interface Configuration
interface=eth0
bind-interfaces
except-interface=lo
# Advanced Features
expand-hosts
domain={{ cluster.name }}.local
local=/{{ cluster.name }}.local/
# Blocking lists: Pi-Hole v5+ serves these from its gravity database via FTL,
# so no additional conf-file directive is needed here
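Rendering the template locally is a cheap way to review the exact dnsmasq output before a deployment run. A preview sketch using SimpleNamespace stand-ins for the dataclasses defined earlier:
#!/usr/bin/env python3
"""Preview a rendered dnsmasq config without touching any node."""
import time
from types import SimpleNamespace

import jinja2

# Stand-ins for PiHoleCluster and DNSForwardingRule defined earlier
cluster = SimpleNamespace(name="datacenter-east", location="us-east-1")
rules = [SimpleNamespace(domain="corp.company.com",
                         upstream_servers=["10.0.1.10", "10.0.1.11"],
                         ttl=300, secure=True, priority=100)]

env = jinja2.Environment(
    loader=jinja2.FileSystemLoader("/etc/pihole/templates"),
    trim_blocks=True, lstrip_blocks=True)
template = env.get_template("dnsmasq-enterprise.conf.j2")

rendered = template.render(
    cluster=cluster,
    forwarding_rules=rules,
    security_policies={"dnssec_validation": True,
                       "dns_over_https": False,
                       "query_logging": True},
    timestamp=time.time())
print(rendered)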
Multi-Site DNS Orchestration
#!/usr/bin/env python3
"""
Multi-Site Pi-Hole DNS Orchestration System
Manage DNS infrastructure across multiple sites
"""
import asyncio
import aiohttp
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
import consul   # python-consul
import etcd3
# PiHoleCluster comes from the configuration-management module earlier in
# this guide; the module name below is a placeholder for your own layout
from enterprise_pihole_manager import PiHoleCluster
@dataclass
class DNSSite:
name: str
location: str
clusters: List[PiHoleCluster]
latency_target: int
availability_target: float
class MultiSiteDNSOrchestrator:
def __init__(self, consul_host: str, etcd_host: str):
self.consul = consul.Consul(host=consul_host)
self.etcd = etcd3.client(host=etcd_host)
self.sites = {}
async def register_site(self, site: DNSSite):
"""Register a new DNS site"""
self.sites[site.name] = site
        # Register in Consul (python-consul builds health checks via consul.Check)
        self.consul.agent.service.register(
            name=f'pihole-{site.name}',
            tags=['dns', 'pihole', site.location],
            port=53,
            check=consul.Check.tcp(site.clusters[0].nodes[0], 53, '10s')
        )
        # Store configuration in etcd (the etcd3 client is synchronous, so no await)
        config_key = f'/dns/sites/{site.name}/config'
        self.etcd.put(config_key, json.dumps({
            'name': site.name,
            'location': site.location,
            'clusters': [cluster.name for cluster in site.clusters],
            'latency_target': site.latency_target,
            'availability_target': site.availability_target
        }))
async def orchestrate_dns_queries(self, query_domain: str, client_location: str) -> str:
"""Orchestrate DNS queries across sites"""
# Find optimal site based on location and performance
optimal_site = await self._find_optimal_site(client_location)
if optimal_site:
# Route to optimal site
return await self._route_to_site(query_domain, optimal_site)
else:
# Fallback to any available site
return await self._fallback_routing(query_domain)
async def _find_optimal_site(self, client_location: str) -> Optional[DNSSite]:
"""Find optimal DNS site for client"""
best_site = None
best_score = float('inf')
for site in self.sites.values():
# Calculate score based on location, latency, and availability
score = await self._calculate_site_score(site, client_location)
if score < best_score:
best_score = score
best_site = site
return best_site
async def _calculate_site_score(self, site: DNSSite, client_location: str) -> float:
"""Calculate site score for routing decision"""
# Geographic distance factor
distance_factor = self._calculate_distance_factor(site.location, client_location)
# Performance metrics
latency_metrics = await self._get_latency_metrics(site)
availability_metrics = await self._get_availability_metrics(site)
# Weighted score calculation
score = (
distance_factor * 0.3 +
latency_metrics * 0.4 +
(1 - availability_metrics) * 0.3
)
return score
def _calculate_distance_factor(self, site_location: str, client_location: str) -> float:
"""Calculate geographic distance factor"""
# Simplified distance calculation
distance_map = {
('us-east-1', 'us-east-1'): 0.1,
('us-east-1', 'us-west-2'): 0.8,
('us-west-2', 'us-west-2'): 0.1,
('us-west-2', 'us-east-1'): 0.8,
}
return distance_map.get((site_location, client_location), 0.5)
async def _get_latency_metrics(self, site: DNSSite) -> float:
"""Get latency metrics for site"""
# Collect latency data from monitoring system
try:
async with aiohttp.ClientSession() as session:
async with session.get(f'http://monitoring.company.com/api/latency/{site.name}') as response:
if response.status == 200:
data = await response.json()
return data.get('average_latency', 0.5)
        except Exception:
            pass
        return 0.5  # Default when the monitoring endpoint is unreachable
async def _get_availability_metrics(self, site: DNSSite) -> float:
"""Get availability metrics for site"""
# Collect availability data from monitoring system
try:
async with aiohttp.ClientSession() as session:
async with session.get(f'http://monitoring.company.com/api/availability/{site.name}') as response:
if response.status == 200:
data = await response.json()
return data.get('availability', 0.99)
        except Exception:
            pass
        return 0.99  # Default when the monitoring endpoint is unreachable
async def _route_to_site(self, query_domain: str, site: DNSSite) -> str:
"""Route DNS query to specific site"""
# Select best cluster in site
best_cluster = await self._select_best_cluster(site)
if best_cluster:
# Route to cluster
return await self._execute_dns_query(query_domain, best_cluster)
else:
raise Exception(f"No available clusters in site {site.name}")
async def _select_best_cluster(self, site: DNSSite) -> Optional[PiHoleCluster]:
"""Select best cluster within site"""
for cluster in site.clusters:
# Check cluster health
if await self._check_cluster_health(cluster):
return cluster
return None
async def _check_cluster_health(self, cluster: PiHoleCluster) -> bool:
"""Check cluster health status"""
healthy_nodes = 0
for node in cluster.nodes:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        f'http://{node}/admin/api/health',
                        timeout=aiohttp.ClientTimeout(total=5)
                    ) as response:
                        if response.status == 200:
                            healthy_nodes += 1
            except Exception:
                pass
# Require at least 50% healthy nodes
return healthy_nodes >= len(cluster.nodes) / 2
async def _execute_dns_query(self, query_domain: str, cluster: PiHoleCluster) -> str:
"""Execute DNS query on cluster"""
# Use first available healthy node
for node in cluster.nodes:
try:
async with aiohttp.ClientSession() as session:
async with session.get(f'http://{node}/admin/api/query/{query_domain}') as response:
if response.status == 200:
result = await response.json()
return result.get('ip_address', '127.0.0.1')
            except Exception:
                continue
raise Exception(f"No healthy nodes available in cluster {cluster.name}")
async def _fallback_routing(self, query_domain: str) -> str:
"""Fallback routing when no optimal site available"""
# Try any available site
for site in self.sites.values():
try:
return await self._route_to_site(query_domain, site)
            except Exception:
                continue
        # Ultimate fallback: return a public resolver address the caller
        # can retry against directly
        return "8.8.8.8"  # Google DNS
Advanced Security Hardening
DNS Security Framework
#!/bin/bash
# Enterprise Pi-Hole Security Hardening Script
# Version: 2.0
# Description: Comprehensive security hardening for Pi-Hole infrastructure
set -euo pipefail
# Configuration
PIHOLE_CONFIG_DIR="/etc/pihole"
DNSMASQ_CONFIG_DIR="/etc/dnsmasq.d"
LOG_FILE="/var/log/pihole/security-hardening.log"
BACKUP_DIR="/var/backups/pihole"
# Logging function
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}
# Security hardening functions
harden_system() {
log "Starting system hardening..."
# Update system packages
apt-get update && apt-get upgrade -y
# Install security tools
apt-get install -y fail2ban ufw apparmor-profiles
# Configure firewall
ufw default deny incoming
ufw default allow outgoing
ufw allow 22/tcp # SSH
ufw allow 53/tcp # DNS
ufw allow 53/udp # DNS
ufw allow 80/tcp # Pi-Hole web interface
ufw allow 443/tcp # HTTPS
ufw --force enable
# Configure fail2ban
cat > /etc/fail2ban/jail.local << 'EOF'
[DEFAULT]
bantime = 3600
findtime = 600
maxretry = 5
[sshd]
enabled = true
port = 22
filter = sshd
logpath = /var/log/auth.log
maxretry = 3
[pihole-web]
enabled = true
port = 80,443
filter = pihole-web
logpath = /var/log/lighttpd/access.log
maxretry = 5
EOF
# Create a fail2ban filter for the Pi-Hole web UI. The dnsmasq query log
# carries no authentication events, so match repeated POSTs to the admin
# login page in the lighttpd access log instead (tune the pattern to your
# access-log format)
cat > /etc/fail2ban/filter.d/pihole-web.conf << 'EOF'
[Definition]
failregex = ^<HOST> .* "POST /admin/(index|login)\.php[^"]*" 200
ignoreregex =
EOF
systemctl restart fail2ban
log "System hardening completed"
}
harden_pihole() {
log "Starting Pi-Hole hardening..."
# Create backup
mkdir -p "$BACKUP_DIR"
cp -r "$PIHOLE_CONFIG_DIR" "$BACKUP_DIR/pihole-$(date +%Y%m%d-%H%M%S)"
    # Secure Pi-Hole configuration: FTL-specific settings live in
    # pihole-FTL.conf; installer settings (upstreams, conditional
    # forwarding, web options) belong in setupVars.conf
    cat > "$PIHOLE_CONFIG_DIR/pihole-FTL.conf" << 'EOF'
# FTL Security Configuration
PRIVACYLEVEL=3
BLOCKINGMODE=NULL
RATE_LIMIT=1000/60
LOCAL_IPV4=10.0.1.100
EOF
    cat > "$PIHOLE_CONFIG_DIR/setupVars.conf" << 'EOF'
# Installer / resolver configuration
BLOCKING_ENABLED=true
QUERY_LOGGING=true
INSTALL_WEB_INTERFACE=true
INSTALL_WEB_SERVER=true
LIGHTTPD_ENABLED=true
CACHE_SIZE=10000
DNS_FQDN_REQUIRED=true
DNS_BOGUS_PRIV=true
DNSSEC=true
CONDITIONAL_FORWARDING=true
REV_SERVER=true
REV_SERVER_CIDR=10.0.0.0/8
REV_SERVER_TARGET=10.0.0.1
REV_SERVER_DOMAIN=company.local
# Upstreams on port 5053 assume a local DoH proxy (e.g. cloudflared);
# use plain resolver IPs such as 1.1.1.1 if no proxy is deployed
PIHOLE_DNS_1=127.0.0.1#5053
PIHOLE_DNS_2=127.0.0.1#5054
IPV6_ADDRESS=
TEMPERATUREUNIT=C
WEBUIBOXEDLAYOUT=boxed
API_EXCLUDE_DOMAINS=
API_EXCLUDE_CLIENTS=
API_QUERY_LOG_SHOW=permittedonly
API_PRIVACY_MODE=true
EOF
# Secure DNSMasq configuration
cat > "$DNSMASQ_CONFIG_DIR/99-security.conf" << 'EOF'
# Security Configuration
stop-dns-rebind
rebind-localhost-ok
rebind-domain-ok=/company.local/
dns-forward-max=1000
cache-size=10000
neg-ttl=3600
max-cache-ttl=86400
min-cache-ttl=60
log-queries
log-facility=/var/log/pihole/dnsmasq.log
EOF
    # Set secure permissions
    chmod 640 "$PIHOLE_CONFIG_DIR/pihole-FTL.conf" "$PIHOLE_CONFIG_DIR/setupVars.conf"
    chmod 640 "$DNSMASQ_CONFIG_DIR/99-security.conf"
    chown root:pihole "$PIHOLE_CONFIG_DIR/pihole-FTL.conf" "$PIHOLE_CONFIG_DIR/setupVars.conf"
    chown root:pihole "$DNSMASQ_CONFIG_DIR/99-security.conf"
log "Pi-Hole hardening completed"
}
setup_monitoring() {
log "Setting up security monitoring..."
# Install monitoring tools
apt-get install -y auditd rsyslog-gnutls
# Configure audit rules
cat > /etc/audit/rules.d/pihole.rules << 'EOF'
# Pi-Hole audit rules
-w /etc/pihole/ -p wa -k pihole-config
-w /etc/dnsmasq.d/ -p wa -k dnsmasq-config
-w /var/log/pihole/ -p wa -k pihole-logs
-w /opt/pihole/ -p wa -k pihole-scripts
EOF
# Configure log rotation
cat > /etc/logrotate.d/pihole-security << 'EOF'
/var/log/pihole/*.log {
daily
rotate 30
compress
delaycompress
missingok
notifempty
create 640 pihole pihole
postrotate
systemctl reload pihole-FTL
endscript
}
EOF
# Set up log monitoring
cat > /etc/rsyslog.d/49-pihole.conf << 'EOF'
# Pi-Hole logging configuration
$ModLoad imfile
$InputFilePollInterval 10
$PrivDropToGroup adm
$WorkDirectory /var/spool/rsyslog
# Pi-Hole query log
$InputFileName /var/log/pihole/pihole.log
$InputFileTag pihole-query:
$InputFileStateFile pihole-query-state
$InputFileSeverity info
$InputFileFacility local0
$InputRunFileMonitor
# Pi-Hole FTL log
$InputFileName /var/log/pihole/FTL.log
$InputFileTag pihole-ftl:
$InputFileStateFile pihole-ftl-state
$InputFileSeverity info
$InputFileFacility local1
$InputRunFileMonitor
# Forward to SIEM
*.* @@siem.company.com:514
EOF
systemctl restart auditd
systemctl restart rsyslog
log "Security monitoring setup completed"
}
setup_threat_intelligence() {
log "Setting up threat intelligence..."
# Create threat intelligence update script
cat > /usr/local/bin/update-threat-intel.sh << 'EOF'
#!/bin/bash
# Pi-Hole Threat Intelligence Update Script
THREAT_INTEL_DIR="/etc/pihole/threat-intel"
BLOCKLIST_DIR="/etc/pihole/blocklists"
LOG_FILE="/var/log/pihole/threat-intel.log"
mkdir -p "$THREAT_INTEL_DIR" "$BLOCKLIST_DIR"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}
# Update threat intelligence feeds
update_feeds() {
    log "Updating threat intelligence feeds..."
    # Malware/ad domains (hosts-file format: "0.0.0.0 domain")
    curl -s "https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts" > "$BLOCKLIST_DIR/malware-hosts.txt"
    # Phishing domains (plain domain list)
    curl -s "https://raw.githubusercontent.com/mitchellkrogza/Phishing.Database/master/phishing-domains-ACTIVE.txt" > "$BLOCKLIST_DIR/phishing-domains.txt"
    # Note: the abuse.ch Ransomware Tracker was retired in 2019, and the
    # Feodo Tracker publishes C2 IPs rather than domains; substitute feeds
    # that are current and domain-based in your environment
    log "Threat intelligence feeds updated"
}
# Process and merge feeds
process_feeds() {
    log "Processing threat intelligence feeds..."
    # Extract the domain column from hosts-format feeds, merge with the
    # plain domain lists, and de-duplicate
    awk '/^0\.0\.0\.0[[:space:]]/ {print $2}' "$BLOCKLIST_DIR/malware-hosts.txt" \
        | cat - "$BLOCKLIST_DIR/phishing-domains.txt" \
        | grep -E '^[a-zA-Z0-9.-]+$' | sort -u > "$THREAT_INTEL_DIR/combined-blocklist.txt"
    # Never overwrite gravity.db directly; register the list as a local
    # adlist source and let gravity rebuild (gravity supports file:// URLs)
    cp "$THREAT_INTEL_DIR/combined-blocklist.txt" /etc/pihole/custom-threat-intel.list
    sqlite3 /etc/pihole/gravity.db \
        "INSERT OR IGNORE INTO adlist (address, enabled, comment) VALUES ('file:///etc/pihole/custom-threat-intel.list', 1, 'threat-intel');"
    pihole -g
    log "Threat intelligence processing completed"
}
# Main execution
update_feeds
process_feeds
EOF
chmod +x /usr/local/bin/update-threat-intel.sh
# Set up cron job
cat > /etc/cron.d/pihole-threat-intel << 'EOF'
# Pi-Hole threat intelligence updates
0 */6 * * * root /usr/local/bin/update-threat-intel.sh
EOF
# Run initial update
/usr/local/bin/update-threat-intel.sh
log "Threat intelligence setup completed"
}
# Main execution
main() {
log "Starting Pi-Hole security hardening..."
harden_system
harden_pihole
setup_monitoring
setup_threat_intelligence
log "Pi-Hole security hardening completed successfully"
}
main "$@"
Comprehensive Monitoring and Alerting
Prometheus Integration
# prometheus-pihole.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "pihole-alerts.yml"
scrape_configs:
- job_name: 'pihole'
static_configs:
- targets: ['pihole1.company.com:9617', 'pihole2.company.com:9617']
metrics_path: /metrics
scrape_interval: 15s
scrape_timeout: 10s
- job_name: 'pihole-exporter'
static_configs:
- targets: ['pihole1.company.com:9311', 'pihole2.company.com:9311']
metrics_path: /metrics
scrape_interval: 30s
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager.company.com:9093
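The rule_files entry above references pihole-alerts.yml without showing it. A sketch that generates a starter rules file; the metric names are assumptions based on common pihole-exporter conventions and should be checked against what your exporter actually publishes:
#!/usr/bin/env python3
"""Emit a starter pihole-alerts.yml for the rule_files entry above."""
import yaml

alerts = {
    "groups": [{
        "name": "pihole",
        "rules": [
            {
                "alert": "PiHoleDown",
                "expr": 'up{job="pihole"} == 0',
                "for": "2m",
                "labels": {"severity": "critical"},
                "annotations": {"summary": "Pi-Hole target down on {{ $labels.instance }}"},
            },
            {
                "alert": "PiHoleHighQueryLatency",
                # Assumed metric name; align with your exporter's output
                "expr": "pihole_dns_response_time_seconds > 0.5",
                "for": "5m",
                "labels": {"severity": "warning"},
                "annotations": {"summary": "Slow DNS responses on {{ $labels.instance }}"},
            },
        ],
    }]
}

with open("pihole-alerts.yml", "w") as f:
    yaml.safe_dump(alerts, f, sort_keys=False)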
Grafana Dashboard Configuration
{
"dashboard": {
"id": null,
"title": "Enterprise Pi-Hole DNS Monitoring",
"tags": ["pihole", "dns", "enterprise"],
"timezone": "UTC",
"refresh": "30s",
"panels": [
{
"id": 1,
"title": "DNS Query Rate",
"type": "graph",
"targets": [
{
"expr": "rate(pihole_dns_queries_total[5m])",
"legendFormat": "Query Rate - {{instance}}"
}
],
"yAxes": [
{
"label": "Queries/sec",
"min": 0
}
]
},
{
"id": 2,
"title": "Blocked Queries",
"type": "stat",
"targets": [
{
"expr": "pihole_ads_blocked_today",
"legendFormat": "Blocked Today - {{instance}}"
}
]
},
{
"id": 3,
"title": "DNS Response Time",
"type": "graph",
"targets": [
{
"expr": "pihole_dns_response_time_seconds",
"legendFormat": "Response Time - {{instance}}"
}
],
"yAxes": [
{
"label": "Seconds",
"min": 0
}
]
},
{
"id": 4,
"title": "Top Blocked Domains",
"type": "table",
"targets": [
{
"expr": "topk(10, pihole_top_blocked_domains)",
"legendFormat": "{{domain}}"
}
]
},
{
"id": 5,
"title": "DNS Cache Hit Rate",
"type": "gauge",
"targets": [
{
"expr": "pihole_dns_cache_hit_rate",
"legendFormat": "Cache Hit Rate - {{instance}}"
}
],
"fieldConfig": {
"defaults": {
"min": 0,
"max": 100,
"unit": "percent"
}
}
}
]
}
}
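Dashboards like this are easiest to keep in version control and push through Grafana's HTTP API (POST /api/dashboards/db). A minimal provisioning sketch, assuming the JSON above is saved as pihole-dashboard.json and a service-account token is available:
#!/usr/bin/env python3
"""Provision the dashboard above via Grafana's HTTP API."""
import json
import requests  # pip install requests

GRAFANA_URL = "https://grafana.company.com"   # assumed endpoint
API_TOKEN = "REPLACE_WITH_SERVICE_ACCOUNT_TOKEN"

with open("pihole-dashboard.json") as f:
    payload = json.load(f)          # the {"dashboard": {...}} document above

payload["overwrite"] = True         # update in place on re-runs

resp = requests.post(
    f"{GRAFANA_URL}/api/dashboards/db",
    headers={"Authorization": f"Bearer {API_TOKEN}",
             "Content-Type": "application/json"},
    json=payload,
    timeout=10)
resp.raise_for_status()
print("Dashboard provisioned:", resp.json().get("url"))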
Performance Optimization
Load Balancing and Caching
#!/usr/bin/env python3
"""
Pi-Hole Performance Optimization System
Automatic performance tuning and load balancing
"""
import asyncio
import aiohttp
import time
from typing import Dict, List
from dataclasses import dataclass
import redis
import json
@dataclass
class PerformanceMetrics:
query_rate: float
response_time: float
cache_hit_rate: float
cpu_usage: float
memory_usage: float
disk_io: float
class PiHoleOptimizer:
def __init__(self, redis_host: str = 'localhost'):
self.redis = redis.Redis(host=redis_host, decode_responses=True)
self.optimization_rules = self._load_optimization_rules()
def _load_optimization_rules(self) -> Dict:
"""Load performance optimization rules"""
return {
'cache_size': {
'min': 1000,
'max': 50000,
'step': 1000,
'threshold': 0.8 # Cache hit rate threshold
},
'dns_forward_max': {
'min': 100,
'max': 2000,
'step': 100,
'threshold': 100 # Query rate threshold
},
'query_timeout': {
'min': 1,
'max': 10,
'step': 1,
'threshold': 2.0 # Response time threshold
}
}
async def collect_metrics(self, pihole_nodes: List[str]) -> Dict[str, PerformanceMetrics]:
"""Collect performance metrics from Pi-Hole nodes"""
metrics = {}
for node in pihole_nodes:
try:
async with aiohttp.ClientSession() as session:
# Collect Pi-Hole metrics
async with session.get(f'http://{node}/admin/api/summary') as response:
if response.status == 200:
data = await response.json()
# Collect system metrics
async with session.get(f'http://{node}/admin/api/metrics/system') as sys_response:
if sys_response.status == 200:
sys_data = await sys_response.json()
metrics[node] = PerformanceMetrics(
                                    query_rate=data.get('dns_queries_today', 0) / 86400,  # mean queries/sec over the day
response_time=data.get('average_response_time', 0),
cache_hit_rate=data.get('cache_hit_rate', 0),
cpu_usage=sys_data.get('cpu_usage', 0),
memory_usage=sys_data.get('memory_usage', 0),
disk_io=sys_data.get('disk_io', 0)
)
except Exception as e:
print(f"Error collecting metrics from {node}: {str(e)}")
return metrics
async def analyze_performance(self, metrics: Dict[str, PerformanceMetrics]) -> Dict:
"""Analyze performance and generate optimization recommendations"""
recommendations = {}
for node, metric in metrics.items():
node_recommendations = []
# Cache optimization
if metric.cache_hit_rate < self.optimization_rules['cache_size']['threshold']:
node_recommendations.append({
'type': 'cache_increase',
'current_hit_rate': metric.cache_hit_rate,
'recommended_cache_size': self._calculate_optimal_cache_size(metric)
})
# Query forwarding optimization
if metric.query_rate > self.optimization_rules['dns_forward_max']['threshold']:
node_recommendations.append({
'type': 'forward_max_increase',
'current_query_rate': metric.query_rate,
'recommended_forward_max': self._calculate_optimal_forward_max(metric)
})
# Response time optimization
if metric.response_time > self.optimization_rules['query_timeout']['threshold']:
node_recommendations.append({
'type': 'timeout_adjustment',
'current_response_time': metric.response_time,
'recommended_timeout': self._calculate_optimal_timeout(metric)
})
# Resource usage optimization
if metric.cpu_usage > 80 or metric.memory_usage > 80:
node_recommendations.append({
'type': 'resource_scaling',
'cpu_usage': metric.cpu_usage,
'memory_usage': metric.memory_usage,
'recommendation': 'scale_up'
})
recommendations[node] = node_recommendations
return recommendations
def _calculate_optimal_cache_size(self, metric: PerformanceMetrics) -> int:
"""Calculate optimal cache size based on performance metrics"""
base_size = 10000
if metric.cache_hit_rate < 0.5:
return base_size * 3
elif metric.cache_hit_rate < 0.7:
return base_size * 2
else:
return base_size
def _calculate_optimal_forward_max(self, metric: PerformanceMetrics) -> int:
"""Calculate optimal DNS forward max based on query rate"""
base_max = 1000
if metric.query_rate > 500:
return base_max * 2
elif metric.query_rate > 200:
return int(base_max * 1.5)
else:
return base_max
def _calculate_optimal_timeout(self, metric: PerformanceMetrics) -> int:
"""Calculate optimal timeout based on response time"""
if metric.response_time > 5:
return 10
elif metric.response_time > 3:
return 8
elif metric.response_time > 2:
return 5
else:
return 3
async def apply_optimizations(self, node: str, recommendations: List[Dict]) -> bool:
"""Apply optimization recommendations to Pi-Hole node"""
try:
config_updates = {}
for rec in recommendations:
if rec['type'] == 'cache_increase':
config_updates['cache-size'] = rec['recommended_cache_size']
elif rec['type'] == 'forward_max_increase':
config_updates['dns-forward-max'] = rec['recommended_forward_max']
elif rec['type'] == 'timeout_adjustment':
config_updates['query-timeout'] = rec['recommended_timeout']
# Apply configuration updates
if config_updates:
await self._update_node_config(node, config_updates)
return True
except Exception as e:
print(f"Error applying optimizations to {node}: {str(e)}")
return False
async def _update_node_config(self, node: str, config_updates: Dict):
"""Update Pi-Hole node configuration"""
async with aiohttp.ClientSession() as session:
async with session.post(
f'http://{node}/admin/api/config/update',
json=config_updates
) as response:
if response.status != 200:
raise Exception(f"Failed to update config: {response.status}")
async def monitor_and_optimize(self, pihole_nodes: List[str], interval: int = 300):
"""Continuous monitoring and optimization"""
while True:
try:
# Collect metrics
metrics = await self.collect_metrics(pihole_nodes)
# Analyze performance
recommendations = await self.analyze_performance(metrics)
# Apply optimizations
for node, recs in recommendations.items():
if recs: # Only apply if there are recommendations
success = await self.apply_optimizations(node, recs)
if success:
print(f"Applied optimizations to {node}")
else:
print(f"Failed to apply optimizations to {node}")
# Store metrics for historical analysis
await self._store_metrics(metrics)
except Exception as e:
print(f"Error in monitoring loop: {str(e)}")
await asyncio.sleep(interval)
async def _store_metrics(self, metrics: Dict[str, PerformanceMetrics]):
"""Store metrics in Redis for historical analysis"""
timestamp = int(time.time())
for node, metric in metrics.items():
key = f"pihole:metrics:{node}:{timestamp}"
data = {
'query_rate': metric.query_rate,
'response_time': metric.response_time,
'cache_hit_rate': metric.cache_hit_rate,
'cpu_usage': metric.cpu_usage,
'memory_usage': metric.memory_usage,
'disk_io': metric.disk_io
}
self.redis.setex(key, 86400 * 7, json.dumps(data)) # Keep for 7 days
# Usage example
async def main():
optimizer = PiHoleOptimizer()
pihole_nodes = [
"pihole1.company.com",
"pihole2.company.com",
"pihole3.company.com"
]
# Start continuous monitoring and optimization
await optimizer.monitor_and_optimize(pihole_nodes)
if __name__ == "__main__":
asyncio.run(main())
Disaster Recovery and High Availability
Backup and Recovery System
#!/bin/bash
# Pi-Hole Enterprise Backup and Recovery System
# Version: 2.0
# Description: Comprehensive backup and disaster recovery for Pi-Hole
set -euo pipefail
# Configuration
BACKUP_DIR="/var/backups/pihole"
REMOTE_BACKUP_HOST="backup.company.com"
REMOTE_BACKUP_USER="pihole-backup"
REMOTE_BACKUP_DIR="/backups/pihole"
RETENTION_DAYS=30
LOG_FILE="/var/log/pihole/backup.log"
# Logging function (writes to stderr so that command substitution of the
# backup functions captures only the echoed backup path, not log lines)
log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE" >&2
}
# Create backup directory
mkdir -p "$BACKUP_DIR"
# Backup functions
backup_pihole_config() {
log "Starting Pi-Hole configuration backup..."
local timestamp=$(date +%Y%m%d-%H%M%S)
local backup_file="$BACKUP_DIR/pihole-config-$timestamp.tar.gz"
# Create configuration backup
tar -czf "$backup_file" \
/etc/pihole/ \
/etc/dnsmasq.d/ \
/opt/pihole/ \
/var/log/pihole/ \
/etc/systemd/system/pihole-FTL.service \
/etc/cron.d/pihole \
2>/dev/null || true
log "Configuration backup created: $backup_file"
# Verify backup
if tar -tzf "$backup_file" > /dev/null 2>&1; then
log "Backup verification successful"
echo "$backup_file"
else
log "ERROR: Backup verification failed"
return 1
fi
}
backup_pihole_data() {
log "Starting Pi-Hole data backup..."
local timestamp=$(date +%Y%m%d-%H%M%S)
local backup_file="$BACKUP_DIR/pihole-data-$timestamp.tar.gz"
# Stop Pi-Hole service
systemctl stop pihole-FTL
# Create data backup
tar -czf "$backup_file" \
/etc/pihole/pihole-FTL.db \
/etc/pihole/gravity.db \
/etc/pihole/dhcp.leases \
/var/log/pihole/pihole.log \
/var/log/pihole/FTL.log \
2>/dev/null || true
# Restart Pi-Hole service
systemctl start pihole-FTL
log "Data backup created: $backup_file"
# Verify backup
if tar -tzf "$backup_file" > /dev/null 2>&1; then
log "Data backup verification successful"
echo "$backup_file"
else
log "ERROR: Data backup verification failed"
return 1
fi
}
backup_system_state() {
log "Starting system state backup..."
local timestamp=$(date +%Y%m%d-%H%M%S)
local backup_file="$BACKUP_DIR/system-state-$timestamp.tar.gz"
# Create system state backup
tar -czf "$backup_file" \
/etc/systemd/system/ \
/etc/cron.d/ \
/etc/logrotate.d/ \
/etc/fail2ban/ \
/etc/ufw/ \
/etc/hosts \
/etc/resolv.conf \
/etc/network/interfaces \
2>/dev/null || true
log "System state backup created: $backup_file"
echo "$backup_file"
}
create_full_backup() {
log "Starting full backup..."
local timestamp=$(date +%Y%m%d-%H%M%S)
local full_backup_dir="$BACKUP_DIR/full-backup-$timestamp"
mkdir -p "$full_backup_dir"
# Backup all components
local config_backup=$(backup_pihole_config)
local data_backup=$(backup_pihole_data)
local system_backup=$(backup_system_state)
# Move backups to full backup directory
mv "$config_backup" "$full_backup_dir/"
mv "$data_backup" "$full_backup_dir/"
mv "$system_backup" "$full_backup_dir/"
    # Create metadata (keep every value single-line so the JSON stays valid)
    cat > "$full_backup_dir/metadata.json" << EOF
{
    "backup_type": "full",
    "timestamp": "$timestamp",
    "hostname": "$(hostname)",
    "pihole_version": "$(pihole -v 2>/dev/null | head -1)",
    "kernel": "$(uname -r)",
    "root_disk_usage": "$(df -h / | awk 'NR==2 {print $5}')"
}
EOF
# Create final archive
local final_backup="$BACKUP_DIR/pihole-full-backup-$timestamp.tar.gz"
tar -czf "$final_backup" -C "$BACKUP_DIR" "full-backup-$timestamp"
# Clean up temporary directory
rm -rf "$full_backup_dir"
log "Full backup completed: $final_backup"
echo "$final_backup"
}
sync_to_remote() {
log "Starting remote backup sync..."
local backup_file="$1"
# Create remote directory
ssh "$REMOTE_BACKUP_USER@$REMOTE_BACKUP_HOST" \
"mkdir -p $REMOTE_BACKUP_DIR/$(hostname)"
# Sync backup file
rsync -av --progress "$backup_file" \
"$REMOTE_BACKUP_USER@$REMOTE_BACKUP_HOST:$REMOTE_BACKUP_DIR/$(hostname)/"
# Verify remote backup
if ssh "$REMOTE_BACKUP_USER@$REMOTE_BACKUP_HOST" \
"test -f $REMOTE_BACKUP_DIR/$(hostname)/$(basename $backup_file)"; then
log "Remote backup sync successful"
else
log "ERROR: Remote backup sync failed"
return 1
fi
}
cleanup_old_backups() {
log "Starting backup cleanup..."
# Local cleanup
find "$BACKUP_DIR" -name "*.tar.gz" -mtime +$RETENTION_DAYS -delete
# Remote cleanup
ssh "$REMOTE_BACKUP_USER@$REMOTE_BACKUP_HOST" \
"find $REMOTE_BACKUP_DIR/$(hostname) -name '*.tar.gz' -mtime +$RETENTION_DAYS -delete"
log "Backup cleanup completed"
}
restore_from_backup() {
log "Starting restore from backup..."
local backup_file="$1"
local restore_type="${2:-full}"
if [[ ! -f "$backup_file" ]]; then
log "ERROR: Backup file not found: $backup_file"
return 1
fi
# Create restore directory
local restore_dir="/tmp/pihole-restore-$(date +%Y%m%d-%H%M%S)"
mkdir -p "$restore_dir"
# Extract backup
tar -xzf "$backup_file" -C "$restore_dir"
# Stop Pi-Hole service
systemctl stop pihole-FTL
case "$restore_type" in
"config")
# Restore configuration
cp -r "$restore_dir/etc/pihole/"* /etc/pihole/
cp -r "$restore_dir/etc/dnsmasq.d/"* /etc/dnsmasq.d/
cp -r "$restore_dir/opt/pihole/"* /opt/pihole/
;;
"data")
# Restore data
cp "$restore_dir/etc/pihole/pihole-FTL.db" /etc/pihole/
cp "$restore_dir/etc/pihole/gravity.db" /etc/pihole/
cp "$restore_dir/etc/pihole/dhcp.leases" /etc/pihole/
;;
"full")
# Full restore
cp -r "$restore_dir/etc/pihole/"* /etc/pihole/
cp -r "$restore_dir/etc/dnsmasq.d/"* /etc/dnsmasq.d/
cp -r "$restore_dir/opt/pihole/"* /opt/pihole/
;;
esac
# Set proper permissions
chown -R pihole:pihole /etc/pihole/
chown -R pihole:pihole /opt/pihole/
chmod 644 /etc/pihole/*.conf
chmod 755 /opt/pihole/*.sh
# Start Pi-Hole service
systemctl start pihole-FTL
# Clean up
rm -rf "$restore_dir"
# Verify restore
if systemctl is-active pihole-FTL > /dev/null; then
log "Restore completed successfully"
else
log "ERROR: Restore failed - Pi-Hole service not running"
return 1
fi
}
# Health check function
health_check() {
log "Performing health check..."
local health_status=0
# Check Pi-Hole service
if ! systemctl is-active pihole-FTL > /dev/null; then
log "ERROR: Pi-Hole service not running"
health_status=1
fi
# Check DNS resolution
if ! nslookup google.com localhost > /dev/null 2>&1; then
log "ERROR: DNS resolution failed"
health_status=1
fi
# Check web interface
if ! curl -s http://localhost/admin/api/summary > /dev/null; then
log "ERROR: Web interface not accessible"
health_status=1
fi
# Check disk space
local disk_usage=$(df / | awk 'NR==2 {print $5}' | sed 's/%//')
if [[ $disk_usage -gt 90 ]]; then
log "WARNING: Disk usage is $disk_usage%"
health_status=1
fi
if [[ $health_status -eq 0 ]]; then
log "Health check passed"
else
log "Health check failed"
fi
return $health_status
}
# Main backup function
main() {
case "${1:-full}" in
"config")
backup_pihole_config
;;
"data")
backup_pihole_data
;;
"system")
backup_system_state
;;
"full")
local backup_file=$(create_full_backup)
sync_to_remote "$backup_file"
cleanup_old_backups
;;
"restore")
restore_from_backup "$2" "${3:-full}"
;;
"health")
health_check
;;
*)
echo "Usage: $0 {config|data|system|full|restore|health}"
exit 1
;;
esac
}
main "$@"
Integration with Enterprise Systems
Active Directory Integration
#!/usr/bin/env python3
"""
Pi-Hole Active Directory Integration
Sync DNS records and user policies with AD
"""
import ldap3
import asyncio
import json
import subprocess
from typing import Dict, List
from dataclasses import dataclass
@dataclass
class ADUser:
username: str
email: str
department: str
groups: List[str]
dns_policies: List[str]
@dataclass
class DNSPolicy:
name: str
blocked_domains: List[str]
allowed_domains: List[str]
priority: int
class PiHoleADIntegration:
def __init__(self, ad_server: str, bind_user: str, bind_password: str):
self.ad_server = ad_server
self.bind_user = bind_user
self.bind_password = bind_password
self.connection = None
async def connect_to_ad(self) -> bool:
"""Connect to Active Directory"""
try:
server = ldap3.Server(self.ad_server, use_ssl=True)
self.connection = ldap3.Connection(
server,
user=self.bind_user,
password=self.bind_password,
auto_bind=True
)
return True
except Exception as e:
print(f"AD connection failed: {str(e)}")
return False
async def sync_dns_records(self) -> Dict:
"""Sync DNS records from Active Directory"""
if not self.connection:
await self.connect_to_ad()
dns_records = {}
try:
# Query DNS records from AD
search_filter = '(&(objectClass=dnsNode)(!(dC=*)))'
search_base = 'CN=MicrosoftDNS,DC=DomainDnsZones,DC=company,DC=com'
self.connection.search(
search_base,
search_filter,
attributes=['dnsRecord', 'name', 'distinguishedName']
)
for entry in self.connection.entries:
record_name = str(entry.name)
record_data = entry.dnsRecord
# Parse DNS record data
if record_data:
dns_records[record_name] = self._parse_dns_record(record_data)
return dns_records
except Exception as e:
print(f"DNS sync failed: {str(e)}")
return {}
def _parse_dns_record(self, record_data) -> Dict:
"""Parse AD DNS record data"""
# Simplified parsing - actual implementation would handle various record types
return {
'type': 'A',
'value': str(record_data),
'ttl': 300
}
async def sync_user_policies(self) -> Dict[str, ADUser]:
"""Sync user DNS policies from Active Directory"""
if not self.connection:
await self.connect_to_ad()
users = {}
try:
# Query users and their group memberships
search_filter = '(&(objectClass=user)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))'
search_base = 'DC=company,DC=com'
self.connection.search(
search_base,
search_filter,
attributes=['sAMAccountName', 'mail', 'department', 'memberOf']
)
for entry in self.connection.entries:
username = str(entry.sAMAccountName)
email = str(entry.mail) if entry.mail else ''
department = str(entry.department) if entry.department else ''
# Parse group memberships
groups = []
if entry.memberOf:
for group_dn in entry.memberOf:
group_name = group_dn.split(',')[0].split('=')[1]
groups.append(group_name)
# Determine DNS policies based on groups
dns_policies = self._get_dns_policies_for_groups(groups)
users[username] = ADUser(
username=username,
email=email,
department=department,
groups=groups,
dns_policies=dns_policies
)
return users
except Exception as e:
print(f"User policy sync failed: {str(e)}")
return {}
def _get_dns_policies_for_groups(self, groups: List[str]) -> List[str]:
"""Map AD groups to DNS policies"""
policy_mapping = {
'IT_Department': ['admin_policy', 'development_policy'],
'Finance_Department': ['finance_policy', 'restricted_policy'],
'HR_Department': ['hr_policy', 'restricted_policy'],
'Sales_Department': ['sales_policy', 'social_media_policy'],
'Executives': ['executive_policy', 'unrestricted_policy']
}
policies = []
for group in groups:
if group in policy_mapping:
policies.extend(policy_mapping[group])
return list(set(policies)) # Remove duplicates
async def update_pihole_config(self, users: Dict[str, ADUser], dns_policies: Dict[str, DNSPolicy]):
"""Update Pi-Hole configuration with AD policies"""
# Generate group-based configuration
config_sections = []
for policy_name, policy in dns_policies.items():
# Generate blocked domains configuration
if policy.blocked_domains:
config_sections.append(f"# Policy: {policy_name}")
for domain in policy.blocked_domains:
config_sections.append(f"address=/{domain}/0.0.0.0")
config_sections.append("")
# Write configuration to file
config_content = "\n".join(config_sections)
with open('/etc/dnsmasq.d/10-ad-policies.conf', 'w') as f:
f.write(config_content)
        # Restart Pi-Hole to apply changes (subprocess imported at module top)
        subprocess.run(['systemctl', 'restart', 'pihole-FTL'], check=True)
# SIEM Integration
class PiHoleSIEMIntegration:
def __init__(self, siem_endpoint: str, api_key: str):
self.siem_endpoint = siem_endpoint
self.api_key = api_key
async def send_dns_logs(self, log_entries: List[Dict]):
"""Send DNS logs to SIEM system"""
import aiohttp
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
async with aiohttp.ClientSession() as session:
async with session.post(
f'{self.siem_endpoint}/api/logs/dns',
json={'entries': log_entries},
headers=headers
) as response:
if response.status == 200:
print("DNS logs sent to SIEM successfully")
else:
print(f"Failed to send DNS logs to SIEM: {response.status}")
async def monitor_threat_indicators(self):
"""Monitor for threat indicators in DNS logs"""
import aiohttp
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
async with aiohttp.ClientSession() as session:
async with session.get(
f'{self.siem_endpoint}/api/threats/dns',
headers=headers
) as response:
if response.status == 200:
threats = await response.json()
return threats.get('indicators', [])
else:
print(f"Failed to get threat indicators: {response.status}")
return []
# Usage example
async def main():
# AD Integration
ad_integration = PiHoleADIntegration(
ad_server='ad.company.com',
bind_user='pihole-service@company.com',
        bind_password='CHANGE_ME'  # pull this from a secrets manager, never hardcode
)
# Sync users and policies
users = await ad_integration.sync_user_policies()
dns_records = await ad_integration.sync_dns_records()
# Define DNS policies
dns_policies = {
'admin_policy': DNSPolicy(
name='admin_policy',
blocked_domains=[],
allowed_domains=['*'],
priority=1
),
'restricted_policy': DNSPolicy(
name='restricted_policy',
blocked_domains=['facebook.com', 'twitter.com', 'instagram.com'],
allowed_domains=['*.company.com'],
priority=10
)
}
# Update Pi-Hole configuration
await ad_integration.update_pihole_config(users, dns_policies)
# SIEM Integration
siem = PiHoleSIEMIntegration(
siem_endpoint='https://siem.company.com',
api_key='your-api-key'
)
# Monitor threats
threats = await siem.monitor_threat_indicators()
print(f"Found {len(threats)} threat indicators")
if __name__ == "__main__":
asyncio.run(main())
Advanced Troubleshooting
Diagnostic and Troubleshooting Framework
#!/bin/bash
# Pi-Hole Advanced Troubleshooting Script
# Version: 2.0
# Description: Comprehensive diagnostics and troubleshooting
set -euo pipefail
# Configuration
LOG_FILE="/var/log/pihole/troubleshooting.log"
REPORT_FILE="/tmp/pihole-diagnostic-report-$(date +%Y%m%d-%H%M%S).txt"
NETWORK_TIMEOUT=5
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Logging function
log() {
echo -e "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}
# Report function
report() {
echo -e "$1" | tee -a "$REPORT_FILE"
}
# Check functions
check_system_health() {
report "\n${BLUE}=== System Health Check ===${NC}"
# Check system load
local load=$(uptime | awk -F'load average:' '{print $2}')
report "System Load: $load"
    # Check memory usage (free -h prints human units that awk cannot divide,
    # so compute the percentage from raw kilobytes)
    local memory=$(free | awk '/^Mem:/ {printf "%.1f%%", $3/$2*100}')
    report "Memory Usage: $memory"
# Check disk space
local disk=$(df -h / | awk 'NR==2 {print $3 "/" $2 " (" $5 ")"}')
report "Disk Usage: $disk"
# Check CPU usage
local cpu=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | sed 's/%us,//')
report "CPU Usage: ${cpu}%"
# Check system uptime
local uptime=$(uptime -p)
report "System Uptime: $uptime"
}
check_pihole_services() {
report "\n${BLUE}=== Pi-Hole Services Check ===${NC}"
    # pihole-FTL embeds dnsmasq, so a standalone dnsmasq unit is not expected
    local services=("pihole-FTL" "lighttpd")
for service in "${services[@]}"; do
if systemctl is-active "$service" > /dev/null 2>&1; then
report "${GREEN}✓${NC} $service is running"
else
report "${RED}✗${NC} $service is not running"
# Get service status
local status=$(systemctl status "$service" --no-pager -l | head -10)
report "Service Status:\n$status"
fi
done
}
check_dns_functionality() {
report "\n${BLUE}=== DNS Functionality Check ===${NC}"
# Test DNS resolution
local test_domains=("google.com" "cloudflare.com" "github.com")
for domain in "${test_domains[@]}"; do
if nslookup "$domain" localhost > /dev/null 2>&1; then
report "${GREEN}✓${NC} DNS resolution working for $domain"
else
report "${RED}✗${NC} DNS resolution failed for $domain"
fi
done
# Test DNS over HTTPS
if curl -s -H "accept: application/dns-json" \
"https://cloudflare-dns.com/dns-query?name=google.com&type=A" > /dev/null; then
report "${GREEN}✓${NC} DNS over HTTPS working"
else
report "${RED}✗${NC} DNS over HTTPS failed"
fi
# Test DNSSEC
if dig +dnssec google.com @localhost | grep -q "RRSIG"; then
report "${GREEN}✓${NC} DNSSEC working"
else
report "${YELLOW}!${NC} DNSSEC not working or not enabled"
fi
}
check_network_connectivity() {
report "\n${BLUE}=== Network Connectivity Check ===${NC}"
local upstream_servers=("1.1.1.1" "8.8.8.8" "1.0.0.1" "8.8.4.4")
for server in "${upstream_servers[@]}"; do
if ping -c 1 -W "$NETWORK_TIMEOUT" "$server" > /dev/null 2>&1; then
report "${GREEN}✓${NC} Can reach upstream DNS server $server"
else
report "${RED}✗${NC} Cannot reach upstream DNS server $server"
fi
done
# Test HTTP connectivity
if curl -s --max-time "$NETWORK_TIMEOUT" http://google.com > /dev/null; then
report "${GREEN}✓${NC} HTTP connectivity working"
else
report "${RED}✗${NC} HTTP connectivity failed"
fi
# Test HTTPS connectivity
if curl -s --max-time "$NETWORK_TIMEOUT" https://google.com > /dev/null; then
report "${GREEN}✓${NC} HTTPS connectivity working"
else
report "${RED}✗${NC} HTTPS connectivity failed"
fi
}
check_pihole_configuration() {
report "\n${BLUE}=== Pi-Hole Configuration Check ===${NC}"
# Check main configuration
if [[ -f /etc/pihole/setupVars.conf ]]; then
report "${GREEN}✓${NC} Main configuration file exists"
# Check key configuration values
local webpassword=$(grep "WEBPASSWORD" /etc/pihole/setupVars.conf | cut -d'=' -f2)
if [[ -n "$webpassword" ]]; then
report "${GREEN}✓${NC} Web password is set"
else
report "${YELLOW}!${NC} Web password is not set"
fi
local dns_servers=$(grep "PIHOLE_DNS" /etc/pihole/setupVars.conf | cut -d'=' -f2)
report "Configured DNS servers: $dns_servers"
else
report "${RED}✗${NC} Main configuration file missing"
fi
# Check DNSMasq configuration
if [[ -d /etc/dnsmasq.d ]]; then
local config_files=$(find /etc/dnsmasq.d -name "*.conf" | wc -l)
report "${GREEN}✓${NC} DNSMasq configuration directory exists ($config_files files)"
# List configuration files
find /etc/dnsmasq.d -name "*.conf" | while read -r file; do
report " - $(basename "$file")"
done
else
report "${RED}✗${NC} DNSMasq configuration directory missing"
fi
# Check gravity database
if [[ -f /etc/pihole/gravity.db ]]; then
local db_size=$(stat -c%s /etc/pihole/gravity.db)
report "${GREEN}✓${NC} Gravity database exists ($(numfmt --to=iec $db_size))"
# Check database integrity
if sqlite3 /etc/pihole/gravity.db "PRAGMA integrity_check;" | grep -q "ok"; then
report "${GREEN}✓${NC} Gravity database integrity check passed"
else
report "${RED}✗${NC} Gravity database integrity check failed"
fi
else
report "${RED}✗${NC} Gravity database missing"
fi
}
check_log_files() {
report "\n${BLUE}=== Log Files Check ===${NC}"
local log_files=(
"/var/log/pihole/pihole.log"
"/var/log/pihole/FTL.log"
"/var/log/pihole/pihole-FTL.log"
)
for log_file in "${log_files[@]}"; do
if [[ -f "$log_file" ]]; then
local size=$(stat -c%s "$log_file")
local modified=$(stat -c%y "$log_file")
report "${GREEN}✓${NC} $log_file exists ($(numfmt --to=iec $size), modified: $modified)"
# Check for recent errors
local errors=$(tail -100 "$log_file" | grep -i "error\|fail\|critical" | wc -l)
if [[ $errors -gt 0 ]]; then
report "${YELLOW}!${NC} Found $errors recent errors in $log_file"
fi
else
report "${RED}✗${NC} $log_file missing"
fi
done
}
check_web_interface() {
report "\n${BLUE}=== Web Interface Check ===${NC}"
# Check web server
if systemctl is-active lighttpd > /dev/null 2>&1; then
report "${GREEN}✓${NC} Lighttpd web server is running"
else
report "${RED}✗${NC} Lighttpd web server is not running"
fi
# Check web interface accessibility
if curl -s http://localhost/admin/api/summary > /dev/null; then
report "${GREEN}✓${NC} Web interface API accessible"
else
report "${RED}✗${NC} Web interface API not accessible"
fi
# Check PHP
if php -v > /dev/null 2>&1; then
local php_version=$(php -v | head -1 | cut -d' ' -f2)
report "${GREEN}✓${NC} PHP is available (version: $php_version)"
else
report "${RED}✗${NC} PHP is not available"
fi
}
check_performance_metrics() {
report "\n${BLUE}=== Performance Metrics ===${NC}"
# DNS query statistics
if [[ -f /etc/pihole/pihole-FTL.db ]]; then
local total_queries=$(sqlite3 /etc/pihole/pihole-FTL.db \
"SELECT COUNT(*) FROM queries WHERE timestamp > strftime('%s', 'now', '-1 day');")
report "DNS queries (last 24h): $total_queries"
local blocked_queries=$(sqlite3 /etc/pihole/pihole-FTL.db \
"SELECT COUNT(*) FROM queries WHERE timestamp > strftime('%s', 'now', '-1 day') AND status IN (1,4,5,6,7,8,9,10,11);")
report "Blocked queries (last 24h): $blocked_queries"
if [[ $total_queries -gt 0 ]]; then
local block_percentage=$(echo "scale=2; $blocked_queries * 100 / $total_queries" | bc)
report "Block percentage: ${block_percentage}%"
fi
fi
    # Cache statistics via the FTL telnet API (">cacheinfo" is the documented command)
    local cache_info=$(echo ">cacheinfo" | nc localhost 4711 2>/dev/null || echo "Cache stats unavailable")
    report "Cache statistics: $cache_info"
    # Memory usage by Pi-Hole (guard against FTL not running under set -e)
    local ftl_pid=$(pgrep -o pihole-FTL || true)
    if [[ -n "$ftl_pid" ]]; then
        local pihole_memory=$(ps -o rss= -p "$ftl_pid")
        report "Pi-Hole memory usage: $(numfmt --to=iec $((pihole_memory * 1024)))"
    fi
}
run_network_diagnostics() {
report "\n${BLUE}=== Network Diagnostics ===${NC}"
# Network interface information
report "Network interfaces:"
ip addr show | grep -E "^[0-9]+:|inet " | while read -r line; do
report " $line"
done
# Routing table
report "\nRouting table:"
ip route show | while read -r line; do
report " $line"
done
# DNS configuration
report "\nDNS configuration:"
if [[ -f /etc/resolv.conf ]]; then
cat /etc/resolv.conf | while read -r line; do
report " $line"
done
fi
# Port usage
report "\nPort usage:"
netstat -tuln | grep -E ":53|:80|:443|:4711" | while read -r line; do
report " $line"
done
}
generate_recommendations() {
report "\n${BLUE}=== Recommendations ===${NC}"
# Check system load
local load_avg=$(uptime | awk -F'load average:' '{print $2}' | awk '{print $1}' | tr -d ',')
if (( $(echo "$load_avg > 2.0" | bc -l) )); then
report "${YELLOW}!${NC} High system load detected. Consider optimizing or scaling."
fi
# Check memory usage
local memory_percent=$(free | awk '/^Mem:/ {print $3/$2*100}')
if (( $(echo "$memory_percent > 80" | bc -l) )); then
report "${YELLOW}!${NC} High memory usage detected. Consider increasing cache size limits."
fi
# Check disk usage
local disk_percent=$(df / | awk 'NR==2 {print $5}' | sed 's/%//')
if [[ $disk_percent -gt 85 ]]; then
report "${YELLOW}!${NC} High disk usage detected. Consider log rotation or cleanup."
fi
# Check for old logs
local old_logs=$(find /var/log/pihole -name "*.log*" -mtime +7 | wc -l)
if [[ $old_logs -gt 0 ]]; then
report "${YELLOW}!${NC} Old log files found. Consider implementing log rotation."
fi
# Check for database optimization
if [[ -f /etc/pihole/pihole-FTL.db ]]; then
local db_size=$(stat -c%s /etc/pihole/pihole-FTL.db)
if [[ $db_size -gt 100000000 ]]; then # 100MB
report "${YELLOW}!${NC} Large database detected. Consider optimizing or archiving old data."
fi
fi
}
# Main diagnostic function
main() {
log "Starting Pi-Hole diagnostic analysis..."
report "Pi-Hole Diagnostic Report"
report "Generated: $(date)"
report "Hostname: $(hostname)"
report "Pi-Hole Version: $(pihole version 2>/dev/null || echo 'Unknown')"
check_system_health
check_pihole_services
check_dns_functionality
check_network_connectivity
check_pihole_configuration
check_log_files
check_web_interface
check_performance_metrics
run_network_diagnostics
generate_recommendations
report "\n${GREEN}Diagnostic analysis completed.${NC}"
report "Report saved to: $REPORT_FILE"
log "Pi-Hole diagnostic analysis completed. Report: $REPORT_FILE"
# Display summary
echo -e "\n${GREEN}Pi-Hole Diagnostic Summary:${NC}"
echo "Full report saved to: $REPORT_FILE"
echo "Log file: $LOG_FILE"
    # Count issues (grep -c prints 0 itself when nothing matches, but exits
    # non-zero, which set -e would otherwise trip on)
    local errors=$(grep -c "✗" "$REPORT_FILE" || true)
    local warnings=$(grep -c "!" "$REPORT_FILE" || true)
echo "Issues found: $errors errors, $warnings warnings"
if [[ $errors -gt 0 ]]; then
echo -e "${RED}Critical issues detected. Please review the full report.${NC}"
return 1
elif [[ $warnings -gt 0 ]]; then
echo -e "${YELLOW}Warnings detected. Please review the recommendations.${NC}"
return 2
else
echo -e "${GREEN}All checks passed successfully.${NC}"
return 0
fi
}
# Run diagnostics
main "$@"
Best Practices and Recommendations
Enterprise Pi-Hole Implementation Guidelines
Architecture Design
- Deploy Pi-Hole in active-active configuration
- Use load balancers for DNS query distribution, or client-side failover where none exists (see the sketch after this list)
- Implement geographic distribution for global networks
- Plan for 20-30% capacity overhead
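Where a hardware load balancer is unavailable, resolver failover can also be handled client-side. A minimal sketch using dnspython (an assumption; install with pip install dnspython), with placeholder node addresses:
#!/usr/bin/env python3
"""Client-side failover across an active-active Pi-Hole pair."""
import dns.resolver  # pip install dnspython

PIHOLE_SERVERS = ["10.0.1.100", "10.0.2.100"]  # placeholder node IPs

def resolve_with_failover(name: str) -> str:
    last_error = None
    for server in PIHOLE_SERVERS:
        resolver = dns.resolver.Resolver(configure=False)
        resolver.nameservers = [server]
        resolver.lifetime = 2.0      # total seconds before trying the next node
        try:
            answer = resolver.resolve(name, "A")
            return answer[0].to_text()
        except Exception as exc:     # timeout, SERVFAIL, NXDOMAIN, ...
            last_error = exc
    raise RuntimeError(f"all Pi-Hole nodes failed: {last_error}")

if __name__ == "__main__":
    print(resolve_with_failover("intranet.corp.company.com"))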
Security Implementation
- Enable DNSSEC validation
- Implement DNS over HTTPS/TLS
- Use threat intelligence feeds
- Regular security hardening updates
Monitoring and Alerting
- Set up comprehensive metrics collection
- Configure alerting for service failures
- Monitor query patterns for anomalies
- Track performance degradation
Backup and Recovery
- Automated daily backups
- Test recovery procedures monthly
- Document recovery time objectives
- Maintain configuration version control
Performance Optimization
- Tune cache sizes based on query patterns (see the sketch after this list)
- Optimize upstream DNS selection
- Regular performance analysis
- Scale horizontally when needed
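As a starting point for cache tuning, the working set can be estimated straight from the FTL long-term database. A rough read-only heuristic sketch; the database path and queries-table schema follow Pi-Hole v5 conventions:
#!/usr/bin/env python3
"""Estimate a dnsmasq cache-size from observed query diversity."""
import sqlite3

FTL_DB = "/etc/pihole/pihole-FTL.db"

con = sqlite3.connect(f"file:{FTL_DB}?mode=ro", uri=True)
# Distinct names queried in the last 24 hours approximates the working set
(distinct_domains,) = con.execute(
    "SELECT COUNT(DISTINCT domain) FROM queries "
    "WHERE timestamp > strftime('%s', 'now', '-1 day')"
).fetchone()
con.close()

# Leave ~50% headroom and round up to the nearest thousand, never below
# the 10000 default used throughout this guide
recommended = max(10000, ((distinct_domains * 3 // 2) // 1000 + 1) * 1000)
print(f"distinct domains (24h): {distinct_domains}")
print(f"suggested cache-size:   {recommended}")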
This comprehensive enterprise Pi-Hole DNS management guide provides the foundation for implementing robust, scalable, and secure DNS infrastructure that can handle enterprise-scale requirements while maintaining high availability and performance standards.