Advanced Analytics Database Performance Tuning: Optimization Strategies for High-Scale Data Workloads
Analytics databases face unique performance challenges due to complex queries, large data volumes, and diverse access patterns. Modern analytical workloads require sophisticated optimization strategies that go beyond traditional OLTP tuning, involving advanced indexing, intelligent partitioning, query plan optimization, and hardware-aware configurations.
This guide explores advanced performance tuning techniques designed for analytics workloads, with practical implementations for PostgreSQL and MySQL covering query analysis, indexing, partitioning, materialized views, and ongoing monitoring.
Database Performance Fundamentals for Analytics
Understanding Analytics Query Patterns
Analytics queries differ significantly from OLTP queries: they scan and aggregate large fractions of a table, join many tables, and run for seconds or minutes rather than milliseconds. These characteristics call for specialized optimization approaches.
# Advanced database performance analysis and optimization framework
import psycopg2
import pymysql
import sqlalchemy
from sqlalchemy import create_engine, text
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import logging
import json
import time
import re
from collections import defaultdict
import matplotlib.pyplot as plt
import seaborn as sns
@dataclass
class QueryPerformanceMetrics:
"""Query performance metrics structure"""
query_id: str
query_text: str
execution_time_ms: float
rows_examined: int
rows_returned: int
cpu_time_ms: float
io_time_ms: float
memory_used_mb: float
index_scans: int
sequential_scans: int
temporary_tables: int
sort_operations: int
join_operations: int
timestamp: datetime = field(default_factory=datetime.now)
@property
def efficiency_ratio(self) -> float:
"""Calculate query efficiency ratio"""
if self.rows_examined == 0:
return float('inf')
return self.rows_returned / self.rows_examined
@property
def selectivity(self) -> float:
"""Calculate query selectivity"""
if self.rows_examined == 0:
return 0.0
return (self.rows_returned / self.rows_examined) * 100
@dataclass
class IndexRecommendation:
"""Index recommendation structure"""
table_name: str
column_names: List[str]
index_type: str # btree, hash, gin, gist, etc.
estimated_benefit: float
space_overhead_mb: float
maintenance_cost: float
reasoning: str
priority: str # high, medium, low
class DatabasePerformanceAnalyzer:
"""Advanced database performance analysis and optimization"""
def __init__(self, connection_config: Dict[str, Any]):
self.connection_config = connection_config
self.db_type = connection_config.get("db_type", "postgresql")
self.engine = self._create_engine()
self.query_cache: Dict[str, QueryPerformanceMetrics] = {}
self.performance_history: List[QueryPerformanceMetrics] = []
def _create_engine(self) -> sqlalchemy.Engine:
"""Create database engine based on configuration"""
if self.db_type == "postgresql":
connection_string = (
f"postgresql://{self.connection_config['user']}:"
f"{self.connection_config['password']}@"
f"{self.connection_config['host']}:"
f"{self.connection_config.get('port', 5432)}/"
f"{self.connection_config['database']}"
)
elif self.db_type == "mysql":
connection_string = (
f"mysql+pymysql://{self.connection_config['user']}:"
f"{self.connection_config['password']}@"
f"{self.connection_config['host']}:"
f"{self.connection_config.get('port', 3306)}/"
f"{self.connection_config['database']}"
)
else:
raise ValueError(f"Unsupported database type: {self.db_type}")
return create_engine(connection_string, echo=False)
def analyze_query_performance(self, query: str,
explain_analyze: bool = True) -> QueryPerformanceMetrics:
"""Comprehensive query performance analysis"""
query_hash = self._hash_query(query)
# Check cache first
if query_hash in self.query_cache:
cached_metrics = self.query_cache[query_hash]
# Return cached metrics if recent (within 1 hour)
if datetime.now() - cached_metrics.timestamp < timedelta(hours=1):
return cached_metrics
with self.engine.connect() as conn:
start_time = time.time()
if explain_analyze:
# Get execution plan with timing
explain_query = self._build_explain_query(query, analyze=True)
plan_result = conn.execute(text(explain_query))
execution_plan = plan_result.fetchall()
# Parse execution plan for metrics
metrics = self._parse_execution_plan(execution_plan, query)
else:
# Simple execution timing
result = conn.execute(text(query))
rows = result.fetchall()
execution_time = (time.time() - start_time) * 1000
metrics = QueryPerformanceMetrics(
query_id=query_hash,
query_text=query,
execution_time_ms=execution_time,
rows_examined=len(rows),
rows_returned=len(rows),
cpu_time_ms=execution_time, # Approximation
io_time_ms=0,
memory_used_mb=0,
index_scans=0,
sequential_scans=0,
temporary_tables=0,
sort_operations=0,
join_operations=0
)
# Cache metrics
self.query_cache[query_hash] = metrics
self.performance_history.append(metrics)
return metrics
def _build_explain_query(self, query: str, analyze: bool = True) -> str:
"""Build EXPLAIN query based on database type"""
if self.db_type == "postgresql":
if analyze:
return f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
else:
return f"EXPLAIN (FORMAT JSON) {query}"
elif self.db_type == "mysql":
if analyze:
return f"EXPLAIN ANALYZE {query}"
else:
return f"EXPLAIN FORMAT=JSON {query}"
return f"EXPLAIN {query}"
def _parse_execution_plan(self, plan_result: List, query: str) -> QueryPerformanceMetrics:
"""Parse database execution plan for performance metrics"""
if self.db_type == "postgresql":
return self._parse_postgresql_plan(plan_result, query)
elif self.db_type == "mysql":
return self._parse_mysql_plan(plan_result, query)
# Fallback for other databases
return QueryPerformanceMetrics(
query_id=self._hash_query(query),
query_text=query,
execution_time_ms=0,
rows_examined=0,
rows_returned=0,
cpu_time_ms=0,
io_time_ms=0,
memory_used_mb=0,
index_scans=0,
sequential_scans=0,
temporary_tables=0,
sort_operations=0,
join_operations=0
)
def _parse_postgresql_plan(self, plan_result: List, query: str) -> QueryPerformanceMetrics:
"""Parse PostgreSQL execution plan"""
if not plan_result or not plan_result[0]:
return self._create_empty_metrics(query)
        plan_json = plan_result[0][0]  # single row, single column containing the JSON plan
if isinstance(plan_json, str):
plan_data = json.loads(plan_json)
else:
plan_data = plan_json
execution_stats = plan_data[0]
planning_time = execution_stats.get("Planning Time", 0)
execution_time = execution_stats.get("Execution Time", 0)
# Extract metrics from plan nodes
total_cost = execution_stats["Plan"]["Total Cost"]
actual_rows = execution_stats["Plan"].get("Actual Rows", 0)
# Analyze plan nodes for detailed metrics
node_stats = self._analyze_plan_nodes(execution_stats["Plan"])
return QueryPerformanceMetrics(
query_id=self._hash_query(query),
query_text=query,
execution_time_ms=execution_time,
rows_examined=node_stats["rows_examined"],
rows_returned=actual_rows,
            cpu_time_ms=max(0.0, execution_time - node_stats["io_time"]),
            io_time_ms=node_stats["io_time"],
            memory_used_mb=node_stats["memory_used"] / 1024,  # "Sort Space Used" is reported in kB
index_scans=node_stats["index_scans"],
sequential_scans=node_stats["seq_scans"],
temporary_tables=node_stats["temp_tables"],
sort_operations=node_stats["sorts"],
join_operations=node_stats["joins"]
)
def _analyze_plan_nodes(self, plan_node: Dict[str, Any]) -> Dict[str, Any]:
"""Recursively analyze plan nodes for statistics"""
stats = {
"rows_examined": 0,
"io_time": 0,
"memory_used": 0,
"index_scans": 0,
"seq_scans": 0,
"temp_tables": 0,
"sorts": 0,
"joins": 0
}
node_type = plan_node.get("Node Type", "")
# Update stats based on node type
if "Scan" in node_type:
stats["rows_examined"] += plan_node.get("Actual Rows", 0)
if "Index" in node_type:
stats["index_scans"] += 1
elif "Seq" in node_type:
stats["seq_scans"] += 1
elif "Join" in node_type:
stats["joins"] += 1
stats["rows_examined"] += plan_node.get("Actual Rows", 0)
elif "Sort" in node_type:
stats["sorts"] += 1
stats["memory_used"] += plan_node.get("Sort Space Used", 0)
elif "Temp" in node_type or "Material" in node_type:
stats["temp_tables"] += 1
# Add I/O statistics if available
if "I/O Read Time" in plan_node:
stats["io_time"] += plan_node["I/O Read Time"]
if "I/O Write Time" in plan_node:
stats["io_time"] += plan_node["I/O Write Time"]
# Recursively process child nodes
for child in plan_node.get("Plans", []):
child_stats = self._analyze_plan_nodes(child)
for key in stats:
stats[key] += child_stats[key]
return stats
def _parse_mysql_plan(self, plan_result: List, query: str) -> QueryPerformanceMetrics:
"""Parse MySQL execution plan"""
# MySQL EXPLAIN ANALYZE parsing would be implemented here
# This is a simplified version
return QueryPerformanceMetrics(
query_id=self._hash_query(query),
query_text=query,
execution_time_ms=0,
rows_examined=0,
rows_returned=0,
cpu_time_ms=0,
io_time_ms=0,
memory_used_mb=0,
index_scans=0,
sequential_scans=0,
temporary_tables=0,
sort_operations=0,
join_operations=0
)
def _create_empty_metrics(self, query: str) -> QueryPerformanceMetrics:
"""Create empty metrics for fallback cases"""
return QueryPerformanceMetrics(
query_id=self._hash_query(query),
query_text=query,
execution_time_ms=0,
rows_examined=0,
rows_returned=0,
cpu_time_ms=0,
io_time_ms=0,
memory_used_mb=0,
index_scans=0,
sequential_scans=0,
temporary_tables=0,
sort_operations=0,
join_operations=0
)
def _hash_query(self, query: str) -> str:
"""Generate hash for query caching"""
import hashlib
# Normalize query for hashing
normalized = re.sub(r'\s+', ' ', query.strip().lower())
return hashlib.md5(normalized.encode()).hexdigest()
def identify_slow_queries(self, threshold_ms: float = 1000) -> List[QueryPerformanceMetrics]:
"""Identify queries exceeding performance threshold"""
slow_queries = [
metrics for metrics in self.performance_history
if metrics.execution_time_ms > threshold_ms
]
# Sort by execution time descending
slow_queries.sort(key=lambda x: x.execution_time_ms, reverse=True)
return slow_queries
def analyze_query_patterns(self) -> Dict[str, Any]:
"""Analyze query patterns for optimization opportunities"""
if not self.performance_history:
return {"message": "No query history available"}
analysis = {
"total_queries": len(self.performance_history),
"avg_execution_time": np.mean([q.execution_time_ms for q in self.performance_history]),
"median_execution_time": np.median([q.execution_time_ms for q in self.performance_history]),
"p95_execution_time": np.percentile([q.execution_time_ms for q in self.performance_history], 95),
"efficiency_stats": {
"avg_efficiency_ratio": np.mean([q.efficiency_ratio for q in self.performance_history if q.efficiency_ratio != float('inf')]),
"low_efficiency_queries": len([q for q in self.performance_history if q.efficiency_ratio < 0.1])
},
"scan_patterns": {
"sequential_scans": sum(q.sequential_scans for q in self.performance_history),
"index_scans": sum(q.index_scans for q in self.performance_history),
"seq_scan_ratio": 0
},
"resource_usage": {
"total_memory_mb": sum(q.memory_used_mb for q in self.performance_history),
"avg_memory_per_query": np.mean([q.memory_used_mb for q in self.performance_history]),
"high_memory_queries": len([q for q in self.performance_history if q.memory_used_mb > 100])
}
}
# Calculate sequential scan ratio
total_scans = analysis["scan_patterns"]["sequential_scans"] + analysis["scan_patterns"]["index_scans"]
if total_scans > 0:
analysis["scan_patterns"]["seq_scan_ratio"] = analysis["scan_patterns"]["sequential_scans"] / total_scans
return analysis
def generate_optimization_recommendations(self) -> List[str]:
"""Generate optimization recommendations based on analysis"""
recommendations = []
        patterns = self.analyze_query_patterns()
        if "scan_patterns" not in patterns:
            return recommendations  # no query history recorded yet
        # Check for high sequential scan ratio
        if patterns["scan_patterns"]["seq_scan_ratio"] > 0.3:
recommendations.append("High sequential scan ratio detected - consider adding indexes")
# Check for low efficiency queries
if patterns["efficiency_stats"]["low_efficiency_queries"] > 0:
recommendations.append(f"{patterns['efficiency_stats']['low_efficiency_queries']} queries with low efficiency - review WHERE clauses and joins")
# Check for high memory usage
if patterns["resource_usage"]["high_memory_queries"] > 0:
recommendations.append(f"{patterns['resource_usage']['high_memory_queries']} queries using >100MB memory - consider query optimization")
# Check for slow queries
slow_queries = self.identify_slow_queries()
if slow_queries:
recommendations.append(f"{len(slow_queries)} queries slower than 1 second - prioritize optimization")
return recommendations
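# Usage sketch (illustrative): the connection values, table, and query below are
# placeholders, not part of the framework above. Defining the function does not
# touch the database; call it explicitly to run the analysis.
def _example_workload_analysis() -> None:
    """Analyze one query and print the analyzer's tuning recommendations."""
    analyzer = DatabasePerformanceAnalyzer({
        "db_type": "postgresql",
        "user": "analytics",
        "password": "secret",      # placeholder credentials
        "host": "localhost",
        "database": "warehouse",
    })
    metrics = analyzer.analyze_query_performance(
        "SELECT region, SUM(amount) FROM sales GROUP BY region"  # hypothetical table
    )
    print(f"execution: {metrics.execution_time_ms:.1f} ms, "
          f"selectivity: {metrics.selectivity:.1f}%")
    for recommendation in analyzer.generate_optimization_recommendations():
        print(f"- {recommendation}")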
class IndexOptimizer:
"""Advanced index optimization and recommendation system"""
def __init__(self, performance_analyzer: DatabasePerformanceAnalyzer):
self.analyzer = performance_analyzer
self.db_type = performance_analyzer.db_type
def analyze_existing_indexes(self) -> Dict[str, Any]:
"""Analyze existing indexes for optimization opportunities"""
with self.analyzer.engine.connect() as conn:
if self.db_type == "postgresql":
index_query = """
                SELECT
                    s.schemaname,
                    s.relname AS tablename,
                    s.indexrelname AS indexname,
                    i.indexdef,
                    pg_size_pretty(pg_relation_size(s.indexrelid)) AS size,
                    s.idx_scan,
                    s.idx_tup_read,
                    s.idx_tup_fetch
                FROM pg_stat_user_indexes s
                JOIN pg_indexes i
                  ON i.schemaname = s.schemaname
                 AND i.tablename = s.relname
                 AND i.indexname = s.indexrelname
                WHERE s.schemaname = 'public'
                ORDER BY pg_relation_size(s.indexrelid) DESC
"""
elif self.db_type == "mysql":
index_query = """
SELECT
TABLE_SCHEMA,
TABLE_NAME,
INDEX_NAME,
COLUMN_NAME,
CARDINALITY,
INDEX_TYPE
FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = DATABASE()
ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX
"""
result = conn.execute(text(index_query))
indexes = result.fetchall()
return self._analyze_index_usage(indexes)
def _analyze_index_usage(self, indexes: List) -> Dict[str, Any]:
"""Analyze index usage patterns"""
analysis = {
"total_indexes": len(indexes),
"unused_indexes": [],
"duplicate_indexes": [],
"oversized_indexes": [],
"recommendations": []
}
if self.db_type == "postgresql":
for index in indexes:
# Check for unused indexes
if index.idx_scan == 0:
analysis["unused_indexes"].append({
"schema": index.schemaname,
"table": index.tablename,
"index": index.indexname,
"size": index.size
})
# Check for oversized indexes with low usage
if index.idx_scan < 100 and "GB" in index.size:
analysis["oversized_indexes"].append({
"schema": index.schemaname,
"table": index.tablename,
"index": index.indexname,
"size": index.size,
"scans": index.idx_scan
})
# Generate recommendations
if analysis["unused_indexes"]:
analysis["recommendations"].append(f"Consider dropping {len(analysis['unused_indexes'])} unused indexes")
if analysis["oversized_indexes"]:
analysis["recommendations"].append(f"Review {len(analysis['oversized_indexes'])} large, low-usage indexes")
return analysis
def recommend_indexes(self, query_history: List[QueryPerformanceMetrics]) -> List[IndexRecommendation]:
"""Recommend new indexes based on query patterns"""
recommendations = []
# Analyze slow queries for index opportunities
slow_queries = [q for q in query_history if q.execution_time_ms > 1000]
for query_metrics in slow_queries:
query = query_metrics.query_text
# Extract table and column references
table_columns = self._extract_query_patterns(query)
for table, columns in table_columns.items():
# Recommend indexes for WHERE clause columns
if columns["where_columns"]:
recommendation = IndexRecommendation(
table_name=table,
column_names=columns["where_columns"][:3], # Limit to 3 columns
index_type="btree",
estimated_benefit=self._estimate_index_benefit(query_metrics, columns["where_columns"]),
space_overhead_mb=self._estimate_index_size(table, columns["where_columns"]),
maintenance_cost=self._estimate_maintenance_cost(table),
reasoning=f"Index on WHERE clause columns for query with {query_metrics.execution_time_ms:.0f}ms execution time",
priority="high" if query_metrics.execution_time_ms > 5000 else "medium"
)
recommendations.append(recommendation)
# Recommend indexes for JOIN columns
if columns["join_columns"]:
recommendation = IndexRecommendation(
table_name=table,
column_names=columns["join_columns"][:2],
index_type="btree",
estimated_benefit=self._estimate_index_benefit(query_metrics, columns["join_columns"]),
space_overhead_mb=self._estimate_index_size(table, columns["join_columns"]),
maintenance_cost=self._estimate_maintenance_cost(table),
reasoning=f"Index on JOIN columns for query with {query_metrics.join_operations} joins",
priority="medium"
)
recommendations.append(recommendation)
# Recommend indexes for ORDER BY columns
if columns["order_columns"]:
recommendation = IndexRecommendation(
table_name=table,
column_names=columns["order_columns"],
index_type="btree",
estimated_benefit=self._estimate_index_benefit(query_metrics, columns["order_columns"]),
space_overhead_mb=self._estimate_index_size(table, columns["order_columns"]),
maintenance_cost=self._estimate_maintenance_cost(table),
reasoning=f"Index on ORDER BY columns for query with {query_metrics.sort_operations} sorts",
priority="low"
)
recommendations.append(recommendation)
# Deduplicate and prioritize recommendations
unique_recommendations = self._deduplicate_recommendations(recommendations)
return sorted(unique_recommendations, key=lambda x: x.estimated_benefit, reverse=True)
def _extract_query_patterns(self, query: str) -> Dict[str, Dict[str, List[str]]]:
"""Extract table and column patterns from SQL query"""
patterns = defaultdict(lambda: {
"where_columns": [],
"join_columns": [],
"order_columns": []
})
try:
            # Simplified regex-based pattern extraction; in practice you would
            # use a dedicated SQL parser (for example, sqlparse or sqlglot)
query_upper = query.upper()
# Extract WHERE clause columns (simplified)
where_match = re.search(r'WHERE\s+(.+?)(?:\s+ORDER\s+BY|\s+GROUP\s+BY|\s+HAVING|\s*$)', query_upper, re.DOTALL)
if where_match:
where_clause = where_match.group(1)
# Extract column references (simplified)
columns = re.findall(r'(\w+\.\w+|\w+)\s*[=<>!]', where_clause)
if columns:
# Assume main table for simplicity
table_name = "main_table"
patterns[table_name]["where_columns"] = [col.split('.')[-1] for col in columns[:3]]
# Extract JOIN columns (simplified)
join_matches = re.findall(r'JOIN\s+(\w+)\s+.*?ON\s+(\w+\.\w+|\w+)\s*=\s*(\w+\.\w+|\w+)', query_upper)
for match in join_matches:
table, col1, col2 = match
patterns[table]["join_columns"].append(col1.split('.')[-1])
# Extract ORDER BY columns (simplified)
order_match = re.search(r'ORDER\s+BY\s+(.+?)(?:\s+LIMIT|\s*$)', query_upper)
if order_match:
order_clause = order_match.group(1)
columns = re.findall(r'(\w+\.\w+|\w+)', order_clause)
if columns:
table_name = "main_table"
patterns[table_name]["order_columns"] = [col.split('.')[-1] for col in columns[:2]]
except Exception as e:
logging.warning(f"Error parsing query: {e}")
return dict(patterns)
def _estimate_index_benefit(self, query_metrics: QueryPerformanceMetrics, columns: List[str]) -> float:
"""Estimate potential benefit of proposed index"""
# Simplified benefit estimation based on query characteristics
benefit = 0.0
# Higher benefit for queries with poor efficiency
if query_metrics.efficiency_ratio < 0.1:
benefit += 0.8
# Higher benefit for queries with many sequential scans
if query_metrics.sequential_scans > 0:
benefit += 0.6
# Higher benefit for slow queries
if query_metrics.execution_time_ms > 5000:
benefit += 0.7
# Adjust based on number of columns (fewer columns = higher benefit)
column_penalty = len(columns) * 0.1
benefit = max(0.1, benefit - column_penalty)
return min(1.0, benefit)
def _estimate_index_size(self, table: str, columns: List[str]) -> float:
"""Estimate index size in MB"""
# Simplified size estimation
# In practice, you'd query table statistics
base_size = 10 # MB
column_factor = len(columns) * 5 # MB per column
return base_size + column_factor
def _estimate_maintenance_cost(self, table: str) -> float:
"""Estimate index maintenance cost"""
# Simplified maintenance cost estimation
# Based on table size and update frequency
return 0.1 # Low maintenance cost assumption
def _deduplicate_recommendations(self, recommendations: List[IndexRecommendation]) -> List[IndexRecommendation]:
"""Remove duplicate index recommendations"""
seen = set()
unique_recommendations = []
for rec in recommendations:
key = (rec.table_name, tuple(sorted(rec.column_names)))
if key not in seen:
seen.add(key)
unique_recommendations.append(rec)
return unique_recommendations
def generate_index_creation_sql(self, recommendation: IndexRecommendation) -> str:
"""Generate SQL for creating recommended index"""
index_name = f"idx_{recommendation.table_name}_{'_'.join(recommendation.column_names)}"
columns_str = ', '.join(recommendation.column_names)
if self.db_type == "postgresql":
if recommendation.index_type == "btree":
return f"CREATE INDEX CONCURRENTLY {index_name} ON {recommendation.table_name} ({columns_str});"
elif recommendation.index_type == "gin":
return f"CREATE INDEX CONCURRENTLY {index_name} ON {recommendation.table_name} USING GIN ({columns_str});"
elif recommendation.index_type == "gist":
return f"CREATE INDEX CONCURRENTLY {index_name} ON {recommendation.table_name} USING GIST ({columns_str});"
elif self.db_type == "mysql":
return f"CREATE INDEX {index_name} ON {recommendation.table_name} ({columns_str});"
return f"CREATE INDEX {index_name} ON {recommendation.table_name} ({columns_str});"
class PartitioningOptimizer:
"""Advanced table partitioning optimization"""
def __init__(self, performance_analyzer: DatabasePerformanceAnalyzer):
self.analyzer = performance_analyzer
self.db_type = performance_analyzer.db_type
def analyze_partitioning_opportunities(self, table_name: str) -> Dict[str, Any]:
"""Analyze table for partitioning opportunities"""
with self.analyzer.engine.connect() as conn:
# Get table statistics
if self.db_type == "postgresql":
stats_query = f"""
SELECT
n_tup_ins + n_tup_upd + n_tup_del as total_activity,
n_tup_ins,
n_tup_upd,
n_tup_del,
seq_scan,
seq_tup_read,
idx_scan,
idx_tup_fetch,
pg_size_pretty(pg_total_relation_size('{table_name}')) as table_size,
pg_total_relation_size('{table_name}') as table_size_bytes
FROM pg_stat_user_tables
WHERE relname = '{table_name}'
"""
column_stats_query = f"""
SELECT
column_name,
data_type,
is_nullable
FROM information_schema.columns
WHERE table_name = '{table_name}'
AND table_schema = 'public'
ORDER BY ordinal_position
"""
elif self.db_type == "mysql":
stats_query = f"""
SELECT
table_rows,
data_length,
index_length,
(data_length + index_length) as total_size
FROM information_schema.tables
WHERE table_name = '{table_name}'
AND table_schema = DATABASE()
"""
column_stats_query = f"""
SELECT
column_name,
data_type,
is_nullable
FROM information_schema.columns
WHERE table_name = '{table_name}'
AND table_schema = DATABASE()
ORDER BY ordinal_position
"""
table_stats = conn.execute(text(stats_query)).fetchone()
columns = conn.execute(text(column_stats_query)).fetchall()
return self._evaluate_partitioning_strategy(table_name, table_stats, columns)
def _evaluate_partitioning_strategy(self, table_name: str,
table_stats: Any,
columns: List) -> Dict[str, Any]:
"""Evaluate optimal partitioning strategy"""
analysis = {
"table_name": table_name,
"should_partition": False,
"recommended_strategy": None,
"partition_column": None,
"estimated_benefit": 0.0,
"reasoning": []
}
if self.db_type == "postgresql":
table_size_bytes = table_stats.table_size_bytes if table_stats else 0
# Check if table is large enough to benefit from partitioning
if table_size_bytes > 100 * 1024 * 1024 * 1024: # > 100GB
analysis["should_partition"] = True
analysis["reasoning"].append("Table size exceeds 100GB threshold")
# Look for time-based columns for range partitioning
time_columns = [
col.column_name for col in columns
if any(time_type in col.data_type.lower()
for time_type in ['timestamp', 'date', 'time'])
]
if time_columns:
analysis["recommended_strategy"] = "range_partitioning"
analysis["partition_column"] = time_columns[0]
analysis["estimated_benefit"] = 0.7
analysis["reasoning"].append(f"Time-based column '{time_columns[0]}' suitable for range partitioning")
# Check for high-cardinality categorical columns
else:
categorical_columns = [
col.column_name for col in columns
if col.data_type.lower() in ['varchar', 'text', 'char']
]
if categorical_columns:
analysis["recommended_strategy"] = "hash_partitioning"
analysis["partition_column"] = categorical_columns[0]
analysis["estimated_benefit"] = 0.5
analysis["reasoning"].append(f"Categorical column '{categorical_columns[0]}' suitable for hash partitioning")
return analysis
def generate_partitioning_sql(self, table_name: str,
strategy: str,
partition_column: str,
partition_count: int = 12) -> List[str]:
"""Generate SQL for table partitioning"""
sql_statements = []
if self.db_type == "postgresql":
if strategy == "range_partitioning":
# Create parent table
sql_statements.append(f"""
CREATE TABLE {table_name}_partitioned (LIKE {table_name} INCLUDING ALL)
PARTITION BY RANGE ({partition_column});
""")
# Create monthly partitions for the last year
for i in range(12):
partition_name = f"{table_name}_y2024_m{i+1:02d}"
start_date = f"2024-{i+1:02d}-01"
end_date = f"2024-{i+2:02d}-01" if i < 11 else "2025-01-01"
sql_statements.append(f"""
CREATE TABLE {partition_name} PARTITION OF {table_name}_partitioned
FOR VALUES FROM ('{start_date}') TO ('{end_date}');
""")
elif strategy == "hash_partitioning":
# Create parent table
sql_statements.append(f"""
CREATE TABLE {table_name}_partitioned (LIKE {table_name} INCLUDING ALL)
PARTITION BY HASH ({partition_column});
""")
# Create hash partitions
for i in range(partition_count):
partition_name = f"{table_name}_hash_{i}"
sql_statements.append(f"""
CREATE TABLE {partition_name} PARTITION OF {table_name}_partitioned
FOR VALUES WITH (modulus {partition_count}, remainder {i});
""")
elif self.db_type == "mysql":
if strategy == "range_partitioning":
# MySQL range partitioning by date
partitions = []
for i in range(12):
month = i + 1
partition_value = f"TO_DAYS('2024-{month:02d}-01')"
partitions.append(f"PARTITION p{month:02d} VALUES LESS THAN ({partition_value})")
sql_statements.append(f"""
ALTER TABLE {table_name}
PARTITION BY RANGE (TO_DAYS({partition_column})) (
{', '.join(partitions)}
);
""")
elif strategy == "hash_partitioning":
sql_statements.append(f"""
ALTER TABLE {table_name}
PARTITION BY HASH({partition_column})
PARTITIONS {partition_count};
""")
return sql_statements
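# Usage sketch (illustrative): evaluate one large table for partitioning and
# print the generated DDL. "events" is a hypothetical table name.
def _example_partitioning_review(analyzer: DatabasePerformanceAnalyzer) -> None:
    """Print a partitioning plan for a single table, if one is recommended."""
    optimizer = PartitioningOptimizer(analyzer)
    plan = optimizer.analyze_partitioning_opportunities("events")
    if plan["should_partition"] and plan["recommended_strategy"]:
        for statement in optimizer.generate_partitioning_sql(
            "events", plan["recommended_strategy"], plan["partition_column"]
        ):
            print(statement)
    else:
        print("No partitioning recommended: " + "; ".join(plan["reasoning"]))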
class MaterializedViewOptimizer:
"""Optimize analytical queries with materialized views"""
def __init__(self, performance_analyzer: DatabasePerformanceAnalyzer):
self.analyzer = performance_analyzer
self.db_type = performance_analyzer.db_type
def identify_materialized_view_opportunities(self,
query_history: List[QueryPerformanceMetrics]) -> List[Dict[str, Any]]:
"""Identify opportunities for materialized views"""
opportunities = []
# Group similar queries
query_groups = self._group_similar_queries(query_history)
for group_key, queries in query_groups.items():
if len(queries) >= 3: # At least 3 similar queries
avg_execution_time = np.mean([q.execution_time_ms for q in queries])
if avg_execution_time > 5000: # Slow queries
opportunity = {
"query_pattern": group_key,
"frequency": len(queries),
"avg_execution_time": avg_execution_time,
"total_time_saved": avg_execution_time * len(queries) * 0.8, # 80% improvement
"estimated_benefit": "high" if avg_execution_time > 10000 else "medium",
"sample_query": queries[0].query_text
}
opportunities.append(opportunity)
return sorted(opportunities, key=lambda x: x["total_time_saved"], reverse=True)
def _group_similar_queries(self, query_history: List[QueryPerformanceMetrics]) -> Dict[str, List[QueryPerformanceMetrics]]:
"""Group similar queries for materialized view analysis"""
groups = defaultdict(list)
for query_metrics in query_history:
# Normalize query for grouping
pattern = self._extract_query_pattern(query_metrics.query_text)
groups[pattern].append(query_metrics)
return dict(groups)
def _extract_query_pattern(self, query: str) -> str:
"""Extract query pattern for grouping"""
# Normalize query by removing literals and parameters
pattern = re.sub(r"'[^']*'", "'?'", query) # Replace string literals
pattern = re.sub(r'\b\d+\b', '?', pattern) # Replace numbers
pattern = re.sub(r'\s+', ' ', pattern) # Normalize whitespace
return pattern.strip().lower()
def generate_materialized_view_sql(self, opportunity: Dict[str, Any],
view_name: str) -> str:
"""Generate SQL for creating materialized view"""
base_query = opportunity["sample_query"]
if self.db_type == "postgresql":
return f"""
CREATE MATERIALIZED VIEW {view_name} AS
{base_query}
WITH DATA;
CREATE UNIQUE INDEX ON {view_name} (/* add appropriate columns */);
"""
elif self.db_type == "mysql":
# MySQL doesn't have materialized views, but we can simulate with tables
return f"""
CREATE TABLE {view_name} AS
{base_query};
/* Add appropriate indexes */
ALTER TABLE {view_name} ADD INDEX (/* add appropriate columns */);
"""
return f"-- Materialized view not supported for {self.db_type}"
def generate_refresh_strategy(self, view_name: str,
refresh_frequency: str = "hourly") -> List[str]:
"""Generate refresh strategy for materialized view"""
strategies = []
if self.db_type == "postgresql":
if refresh_frequency == "realtime":
# Use triggers for real-time refresh
strategies.append(f"""
CREATE OR REPLACE FUNCTION refresh_{view_name}()
RETURNS TRIGGER AS $$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY {view_name};
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
""")
else:
# Scheduled refresh
strategies.append(f"""
-- Create cron job or use pg_cron extension
SELECT cron.schedule('refresh-{view_name}', '0 * * * *',
'REFRESH MATERIALIZED VIEW CONCURRENTLY {view_name};');
""")
return strategies
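# Usage sketch (illustrative): derive materialized-view candidates from recorded
# query history and print DDL plus a refresh strategy. The view name is a
# placeholder.
def _example_materialized_view_review(analyzer: DatabasePerformanceAnalyzer) -> None:
    """Print DDL and a refresh strategy for the highest-value candidate view."""
    optimizer = MaterializedViewOptimizer(analyzer)
    candidates = optimizer.identify_materialized_view_opportunities(
        analyzer.performance_history
    )
    if candidates:
        view_name = "mv_top_candidate"  # hypothetical view name
        print(optimizer.generate_materialized_view_sql(candidates[0], view_name))
        for statement in optimizer.generate_refresh_strategy(view_name, "hourly"):
            print(statement)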
class QueryOptimizer:
"""Advanced query optimization and rewriting"""
def __init__(self, performance_analyzer: DatabasePerformanceAnalyzer):
self.analyzer = performance_analyzer
self.db_type = performance_analyzer.db_type
def optimize_query(self, query: str) -> Dict[str, Any]:
"""Analyze and optimize a SQL query"""
# Analyze current performance
current_metrics = self.analyzer.analyze_query_performance(query)
# Generate optimization suggestions
optimizations = self._generate_query_optimizations(query, current_metrics)
# Apply automatic optimizations where safe
optimized_query = self._apply_safe_optimizations(query, optimizations)
return {
"original_query": query,
"optimized_query": optimized_query,
"original_metrics": current_metrics,
"optimizations_applied": optimizations,
"estimated_improvement": self._estimate_improvement(optimizations)
}
def _generate_query_optimizations(self, query: str,
metrics: QueryPerformanceMetrics) -> List[Dict[str, Any]]:
"""Generate specific optimization recommendations"""
optimizations = []
query_upper = query.upper()
# Check for SELECT * usage
if "SELECT *" in query_upper:
optimizations.append({
"type": "select_optimization",
"description": "Replace SELECT * with specific columns",
"impact": "medium",
"auto_applicable": False
})
# Check for missing WHERE clause in large table scans
if "WHERE" not in query_upper and metrics.rows_examined > 10000:
optimizations.append({
"type": "filtering_optimization",
"description": "Add WHERE clause to reduce rows examined",
"impact": "high",
"auto_applicable": False
})
# Check for inefficient JOINs
if "JOIN" in query_upper and metrics.efficiency_ratio < 0.1:
optimizations.append({
"type": "join_optimization",
"description": "Review JOIN conditions and consider index usage",
"impact": "high",
"auto_applicable": False
})
# Check for unnecessary ORDER BY
if "ORDER BY" in query_upper and metrics.sort_operations > 0:
optimizations.append({
"type": "sorting_optimization",
"description": "Consider if ORDER BY is necessary or can use index",
"impact": "medium",
"auto_applicable": False
})
# Check for subqueries that can be converted to JOINs
if "EXISTS" in query_upper or "IN (" in query_upper:
optimizations.append({
"type": "subquery_optimization",
"description": "Consider converting EXISTS/IN subqueries to JOINs",
"impact": "medium",
"auto_applicable": True
})
# Check for DISTINCT usage
if "DISTINCT" in query_upper:
optimizations.append({
"type": "distinct_optimization",
"description": "Review if DISTINCT is necessary or can be avoided with proper JOINs",
"impact": "low",
"auto_applicable": False
})
return optimizations
def _apply_safe_optimizations(self, query: str,
optimizations: List[Dict[str, Any]]) -> str:
"""Apply safe automatic optimizations"""
optimized_query = query
for opt in optimizations:
if opt.get("auto_applicable", False):
if opt["type"] == "subquery_optimization":
# This is a complex transformation that would require
# sophisticated SQL parsing and rewriting
pass
return optimized_query
def _estimate_improvement(self, optimizations: List[Dict[str, Any]]) -> float:
"""Estimate performance improvement percentage"""
total_improvement = 0.0
for opt in optimizations:
if opt["impact"] == "high":
total_improvement += 0.4
elif opt["impact"] == "medium":
total_improvement += 0.2
elif opt["impact"] == "low":
total_improvement += 0.1
return min(0.8, total_improvement) # Cap at 80% improvement
def generate_query_variants(self, query: str) -> List[str]:
"""Generate alternative query variants for testing"""
variants = []
# Add LIMIT for testing
if "LIMIT" not in query.upper():
variants.append(f"{query.rstrip(';')} LIMIT 1000;")
# Add query hints for different databases
if self.db_type == "postgresql":
            # Hint comments require the pg_hint_plan extension; core PostgreSQL ignores them
variants.append(f"/*+ SeqScan(table_name) */ {query}")
variants.append(f"/*+ IndexScan(table_name) */ {query}")
elif self.db_type == "mysql":
            # Illustrative optimizer-hint comment; traditional MySQL index hints
            # are written after the table reference, e.g. "FROM t USE INDEX (idx)"
variants.append(f"SELECT /*+ USE_INDEX(table_name, index_name) */ {query[6:]}")
return variants
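The pieces above are designed to be composed. The following sketch shows one way to run a complete tuning pass over a recorded workload; it assumes the classes defined in this section are in scope, and the connection details and query list are supplied by the caller rather than being part of the framework.
from typing import Any, Dict, List

def run_tuning_pass(connection_config: Dict[str, Any], workload: List[str]) -> Dict[str, Any]:
    """Analyze a list of queries and collect recommendations from each optimizer."""
    analyzer = DatabasePerformanceAnalyzer(connection_config)
    for sql in workload:
        analyzer.analyze_query_performance(sql)

    index_optimizer = IndexOptimizer(analyzer)
    mv_optimizer = MaterializedViewOptimizer(analyzer)
    query_optimizer = QueryOptimizer(analyzer)

    return {
        "workload_summary": analyzer.analyze_query_patterns(),
        "general_recommendations": analyzer.generate_optimization_recommendations(),
        "index_recommendations": index_optimizer.recommend_indexes(
            analyzer.performance_history
        ),
        "materialized_view_candidates": mv_optimizer.identify_materialized_view_opportunities(
            analyzer.performance_history
        ),
        "query_rewrites": [
            query_optimizer.optimize_query(slow.query_text)
            for slow in analyzer.identify_slow_queries()
        ],
    }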
Conclusion
Advanced analytics database performance tuning requires a comprehensive approach that combines query optimization, intelligent indexing, strategic partitioning, and continuous monitoring. The frameworks and techniques presented in this guide provide a foundation for building high-performance analytics systems that can handle enterprise-scale workloads efficiently.
Key takeaways for successful database performance optimization include:
- Comprehensive Analysis: Use detailed execution plan analysis to understand query performance characteristics and identify optimization opportunities
- Strategic Indexing: Implement intelligent indexing strategies based on actual query patterns rather than assumptions
- Partitioning Strategy: Apply table partitioning for large datasets to improve query performance and maintenance operations
- Materialized Views: Use materialized views strategically for frequently accessed aggregated data
- Continuous Monitoring: Implement ongoing performance monitoring to detect and address performance degradation proactively (a minimal scheduled-check sketch follows this list)
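As a concrete starting point for the monitoring item above, the sketch below re-runs a fixed workload on a schedule and logs queries whose latency regresses against the previous pass. It assumes the DatabasePerformanceAnalyzer class from this guide is importable; the interval and regression threshold are arbitrary choices, and because the analyzer caches results for an hour, intervals shorter than an hour would mostly return cached metrics.
import logging
import time
from typing import Dict, List

def monitor_workload(analyzer: "DatabasePerformanceAnalyzer",
                     workload: List[str],
                     interval_seconds: int = 3600,
                     regression_factor: float = 1.5) -> None:
    """Re-run a fixed workload periodically and log latency regressions."""
    baseline: Dict[str, float] = {}
    while True:
        for sql in workload:
            metrics = analyzer.analyze_query_performance(sql)
            previous = baseline.get(metrics.query_id)
            if previous is not None and metrics.execution_time_ms > previous * regression_factor:
                logging.warning("Query %s regressed: %.0f ms -> %.0f ms",
                                metrics.query_id, previous, metrics.execution_time_ms)
            baseline[metrics.query_id] = metrics.execution_time_ms
        time.sleep(interval_seconds)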
By following these advanced optimization strategies and implementing the monitoring frameworks shown in this guide, organizations can build analytics database systems that deliver consistent high performance while scaling efficiently with growing data volumes and query complexity.