AI Model Serving: TensorFlow Serving vs Seldon vs KServe - Complete Production Comparison
Choosing the right model serving platform is crucial for deploying AI models at scale in production environments. This comprehensive guide compares TensorFlow Serving, Seldon Core, and KServe across multiple dimensions including performance, features, scalability, and operational complexity. We’ll explore real-world scenarios with practical implementations, benchmarks, and deployment strategies for each platform.
AI Model Serving: TensorFlow Serving vs Seldon vs KServe
Section 1: Model Serving Architecture Comparison
TensorFlow Serving Architecture
TensorFlow Serving is designed specifically for TensorFlow models with high-performance inference capabilities and production-ready features.
# tensorflow-serving-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: tensorflow-serving
namespace: model-serving
labels:
app: tensorflow-serving
version: stable
spec:
replicas: 3
selector:
matchLabels:
app: tensorflow-serving
template:
metadata:
labels:
app: tensorflow-serving
version: stable
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8501"
prometheus.io/path: "/monitoring/prometheus/metrics"
spec:
containers:
- name: tensorflow-serving
image: tensorflow/serving:2.14.0
ports:
- containerPort: 8500
name: grpc
- containerPort: 8501
name: rest
args:
- "--model_config_file=/models/models.config"
- "--model_config_file_poll_wait_seconds=60"
- "--allow_version_labels_for_unavailable_models=true"
- "--enable_batching=true"
- "--batching_parameters_file=/models/batching_config.txt"
- "--monitoring_config_file=/models/monitoring_config.txt"
- "--rest_api_port=8501"
- "--rest_api_num_threads=32"
- "--rest_api_timeout_in_ms=30000"
- "--enable_model_warmup=true"
env:
- name: MODEL_NAME
value: "production_model"
- name: TF_CPP_MIN_LOG_LEVEL
value: "1"
- name: TF_ENABLE_ONEDNN_OPTS
value: "1"
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "8"
memory: "16Gi"
volumeMounts:
- name: models
mountPath: /models
readOnly: true
- name: config
mountPath: /models/models.config
subPath: models.config
- name: batching-config
mountPath: /models/batching_config.txt
subPath: batching_config.txt
- name: monitoring-config
mountPath: /models/monitoring_config.txt
subPath: monitoring_config.txt
livenessProbe:
httpGet:
path: /v1/models/production_model
port: 8501
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /v1/models/production_model
port: 8501
initialDelaySeconds: 15
periodSeconds: 5
volumes:
- name: models
persistentVolumeClaim:
claimName: model-storage-pvc
- name: config
configMap:
name: tensorflow-serving-config
- name: batching-config
configMap:
name: batching-config
- name: monitoring-config
configMap:
name: monitoring-config
---
apiVersion: v1
kind: ConfigMap
metadata:
name: tensorflow-serving-config
namespace: model-serving
data:
models.config: |
model_config_list {
config {
name: 'production_model'
base_path: '/models/production_model'
model_platform: 'tensorflow'
model_version_policy {
specific {
versions: 1
versions: 2
}
}
version_labels {
key: 'stable'
value: 1
}
version_labels {
key: 'canary'
value: 2
}
logging_config {
log_requests: true
log_responses: true
sampling_config {
sampling_rate: 0.1
}
}
}
config {
name: 'preprocessing_model'
base_path: '/models/preprocessing_model'
model_platform: 'tensorflow'
model_version_policy {
latest {
num_versions: 1
}
}
}
config {
name: 'ensemble_model'
base_path: '/models/ensemble_model'
model_platform: 'tensorflow'
model_version_policy {
latest {
num_versions: 2
}
}
}
}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: batching-config
namespace: model-serving
data:
batching_config.txt: |
max_batch_size { value: 32 }
batch_timeout_micros { value: 5000 }
max_enqueued_batches { value: 1000 }
num_batch_threads { value: 8 }
allowed_batch_sizes: 1
allowed_batch_sizes: 2
allowed_batch_sizes: 4
allowed_batch_sizes: 8
allowed_batch_sizes: 16
allowed_batch_sizes: 32
---
apiVersion: v1
kind: ConfigMap
metadata:
name: monitoring-config
namespace: model-serving
data:
monitoring_config.txt: |
prometheus_config {
enable: true
path: "/monitoring/prometheus/metrics"
}
---
apiVersion: v1
kind: Service
metadata:
name: tensorflow-serving-service
namespace: model-serving
labels:
app: tensorflow-serving
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8501"
spec:
selector:
app: tensorflow-serving
ports:
- name: grpc
port: 8500
targetPort: 8500
- name: rest
port: 8501
targetPort: 8501
type: ClusterIP
Seldon Core Architecture
Seldon Core provides a more flexible, language-agnostic approach with advanced features like multi-armed bandits and complex inference graphs.
# seldon-deployment.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
name: production-model-seldon
namespace: model-serving
spec:
name: production-model
protocol: tensorflow
transport: rest
replicas: 3
annotations:
seldon.io/ambassador-config: |
---
apiVersion: ambassador/v1
kind: Mapping
name: production-model-mapping
prefix: /seldon/model-serving/production-model/
service: production-model-seldon-production-model:8000
timeout_ms: 30000
predictors:
- name: default
replicas: 3
traffic: 90
annotations:
predictor_version: v1
graph:
name: classifier
implementation: TENSORFLOW_SERVER
modelUri: gs://my-bucket/models/production-model/1
envSecretRefName: gcs-credentials
parameters:
- name: signature_name
value: serving_default
- name: model_name
value: production_model
children: []
componentSpecs:
- spec:
containers:
- name: classifier
image: tensorflow/serving:2.14.0
resources:
requests:
cpu: "2"
memory: "4Gi"
nvidia.com/gpu: "1"
limits:
cpu: "4"
memory: "8Gi"
nvidia.com/gpu: "1"
env:
- name: SELDON_LOG_LEVEL
value: INFO
- name: TF_CPP_MIN_LOG_LEVEL
value: "1"
volumeMounts:
- name: model-storage
mountPath: /mnt/models
readOnly: true
volumes:
- name: model-storage
persistentVolumeClaim:
claimName: model-storage-pvc
terminationGracePeriodSeconds: 60
- name: canary
replicas: 1
traffic: 10
annotations:
predictor_version: v2
graph:
name: classifier-v2
implementation: TENSORFLOW_SERVER
modelUri: gs://my-bucket/models/production-model/2
envSecretRefName: gcs-credentials
parameters:
- name: signature_name
value: serving_default
- name: model_name
value: production_model_v2
children: []
componentSpecs:
- spec:
containers:
- name: classifier-v2
image: tensorflow/serving:2.14.0
resources:
requests:
cpu: "2"
memory: "4Gi"
nvidia.com/gpu: "1"
limits:
cpu: "4"
memory: "8Gi"
nvidia.com/gpu: "1"
env:
- name: SELDON_LOG_LEVEL
value: INFO
- name: TF_CPP_MIN_LOG_LEVEL
value: "1"
---
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
name: ensemble-model-seldon
namespace: model-serving
spec:
name: ensemble-model
protocol: tensorflow
transport: rest
annotations:
seldon.io/engine-separate-pod: "true"
predictors:
- name: ensemble
annotations:
predictor_version: ensemble-v1
graph:
name: ensemble-combiner
implementation: RANDOM_ABTEST
parameters:
- name: ratioA
value: "0.7"
- name: ratioB
value: "0.3"
children:
- name: model-a
implementation: TENSORFLOW_SERVER
modelUri: gs://my-bucket/models/model-a/1
envSecretRefName: gcs-credentials
children: []
- name: model-b
implementation: TENSORFLOW_SERVER
modelUri: gs://my-bucket/models/model-b/1
envSecretRefName: gcs-credentials
children: []
componentSpecs:
- spec:
containers:
- name: model-a
image: tensorflow/serving:2.14.0
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
- name: model-b
image: tensorflow/serving:2.14.0
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
- name: ensemble-combiner
image: seldonio/seldon-core-s2i-python3:1.18.0
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "1"
memory: "2Gi"
---
# Advanced Seldon Analytics Configuration
apiVersion: v1
kind: ConfigMap
metadata:
name: seldon-analytics-config
namespace: model-serving
data:
values.yaml: |
analytics:
enabled: true
persistence:
enabled: true
storageClass: fast-ssd
size: 100Gi
prometheus:
enabled: true
seldon_http_requests_total: true
seldon_api_executor_client_requests_seconds: true
seldon_api_executor_server_requests_seconds: true
grafana:
enabled: true
adminPassword: admin123
elasticsearch:
enabled: true
image:
tag: 7.17.0
persistence:
enabled: true
size: 50Gi
KServe Architecture
KServe (formerly KFServing) provides Kubernetes-native model serving with advanced features like auto-scaling, traffic splitting, and model explainability.
# kserve-deployment.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
name: production-model-kserve
namespace: model-serving
annotations:
serving.kserve.io/autoscaling.knative.dev: "kpa.autoscaling.knative.dev"
serving.kserve.io/metric: "concurrency"
serving.kserve.io/target: "10"
serving.kserve.io/scaleToZero: "true"
serving.kserve.io/scaleToZeroPodRetentionPeriod: "300s"
spec:
predictor:
serviceAccountName: kserve-sa
minReplicas: 1
maxReplicas: 20
scaleTarget: 80
scaleMetric: concurrency
tensorflow:
storageUri: "gs://my-bucket/models/production-model"
runtimeVersion: "2.14.0"
resources:
requests:
cpu: "2"
memory: "4Gi"
nvidia.com/gpu: "1"
limits:
cpu: "4"
memory: "8Gi"
nvidia.com/gpu: "1"
env:
- name: TF_CPP_MIN_LOG_LEVEL
value: "1"
- name: TF_ENABLE_ONEDNN_OPTS
value: "1"
ports:
- containerPort: 8080
name: h2c
protocol: TCP
args:
- --model_name=production_model
- --port=8080
- --rest_api_port=8080
- --model_config_file=/mnt/models/models.config
- --enable_batching=true
- --batching_parameters_file=/mnt/models/batching_config.txt
- --enable_model_warmup=true
volumeMounts:
- name: model-config
mountPath: /mnt/models
readOnly: true
volumes:
- name: model-config
configMap:
name: tensorflow-serving-config
transformer:
containers:
- name: kserve-container
image: kserve/transformer:v0.11.0
env:
- name: STORAGE_URI
value: "gs://my-bucket/transformers/preprocessing"
- name: TRANSFORMER_TYPE
value: "sklearn"
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "1"
memory: "2Gi"
explainer:
alibi:
type: AnchorTabular
storageUri: "gs://my-bucket/explainers/anchor-tabular"
resources:
requests:
cpu: "500m"
memory: "2Gi"
limits:
cpu: "1"
memory: "4Gi"
---
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
name: multi-model-kserve
namespace: model-serving
annotations:
serving.kserve.io/autoscaling.knative.dev: "kpa.autoscaling.knative.dev"
serving.kserve.io/targetUtilizationPercentage: "70"
spec:
predictor:
model:
modelFormat:
name: sklearn
storageUri: "gs://my-bucket/models/multi-model"
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "4"
memory: "8Gi"
env:
- name: STORAGE_URI
value: "gs://my-bucket/models"
runtime: kserve-sklearnserver
---
# Traffic splitting configuration
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
name: canary-deployment-kserve
namespace: model-serving
spec:
predictor:
canaryTrafficPercent: 20
tensorflow:
storageUri: "gs://my-bucket/models/production-model/v2"
runtimeVersion: "2.14.0"
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "4"
memory: "8Gi"
canary:
tensorflow:
storageUri: "gs://my-bucket/models/production-model/v1"
runtimeVersion: "2.14.0"
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "4"
memory: "8Gi"
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: kserve-sa
namespace: model-serving
annotations:
iam.gke.io/gcp-service-account: kserve-workload-identity@my-project.iam.gserviceaccount.com
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: kserve-role
rules:
- apiGroups: [""]
resources: ["pods", "services", "endpoints", "events"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["serving.kserve.io"]
resources: ["inferenceservices"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["networking.istio.io"]
resources: ["virtualservices", "destinationrules"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: kserve-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: kserve-role
subjects:
- kind: ServiceAccount
name: kserve-sa
namespace: model-serving
Section 2: Performance Benchmarks and Analysis
Comprehensive Benchmarking Framework
# model_serving_benchmark.py
import asyncio
import aiohttp
import time
import statistics
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor, as_completed
import argparse
import logging
@dataclass
class BenchmarkConfig:
"""Configuration for benchmarking tests."""
endpoint_url: str
model_name: str
platform: str # tensorflow, seldon, kserve
concurrent_requests: int = 10
total_requests: int = 1000
warmup_requests: int = 50
timeout_seconds: int = 30
payload_size: str = "small" # small, medium, large
test_duration_minutes: Optional[int] = None
@dataclass
class BenchmarkResult:
"""Results from a single benchmark run."""
platform: str
endpoint_url: str
concurrent_requests: int
total_requests: int
successful_requests: int
failed_requests: int
total_time_seconds: float
avg_latency_ms: float
p50_latency_ms: float
p95_latency_ms: float
p99_latency_ms: float
min_latency_ms: float
max_latency_ms: float
throughput_rps: float
error_rate: float
cpu_usage_avg: float
memory_usage_avg: float
gpu_usage_avg: float
class ModelServingBenchmark:
"""Comprehensive benchmarking suite for model serving platforms."""
def __init__(self):
self.logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Test payloads of different sizes
self.payloads = {
"small": self._generate_payload(10),
"medium": self._generate_payload(100),
"large": self._generate_payload(1000)
}
def _generate_payload(self, size: int) -> Dict:
"""Generate test payload of specified size."""
return {
"instances": [
{
"input_data": np.random.randn(size).tolist(),
"features": {
f"feature_{i}": np.random.randn() for i in range(min(size // 10, 50))
}
}
]
}
async def _make_request(self,
session: aiohttp.ClientSession,
url: str,
payload: Dict,
platform: str) -> Tuple[float, bool, Optional[str]]:
"""Make a single inference request."""
start_time = time.time()
try:
# Adjust URL based on platform
if platform == "tensorflow":
endpoint = f"{url}/v1/models/{payload.get('model_name', 'production_model')}:predict"
elif platform == "seldon":
endpoint = f"{url}/api/v1.0/predictions"
elif platform == "kserve":
endpoint = f"{url}/v1/models/{payload.get('model_name', 'production-model-kserve')}:predict"
else:
endpoint = url
async with session.post(endpoint, json=payload, timeout=30) as response:
await response.json()
latency = (time.time() - start_time) * 1000 # Convert to ms
return latency, response.status == 200, None
except asyncio.TimeoutError:
latency = (time.time() - start_time) * 1000
return latency, False, "timeout"
except Exception as e:
latency = (time.time() - start_time) * 1000
return latency, False, str(e)
async def _run_load_test(self, config: BenchmarkConfig) -> List[Tuple[float, bool, Optional[str]]]:
"""Run load test with specified configuration."""
payload = self.payloads[config.payload_size].copy()
payload["model_name"] = config.model_name
connector = aiohttp.TCPConnector(limit=config.concurrent_requests * 2)
timeout = aiohttp.ClientTimeout(total=config.timeout_seconds)
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
# Warmup requests
self.logger.info(f"Running {config.warmup_requests} warmup requests...")
warmup_tasks = [
self._make_request(session, config.endpoint_url, payload, config.platform)
for _ in range(config.warmup_requests)
]
await asyncio.gather(*warmup_tasks, return_exceptions=True)
# Main test
self.logger.info(f"Running {config.total_requests} test requests with {config.concurrent_requests} concurrent connections...")
# Create semaphore to limit concurrent requests
semaphore = asyncio.Semaphore(config.concurrent_requests)
async def bounded_request():
async with semaphore:
return await self._make_request(session, config.endpoint_url, payload, config.platform)
start_time = time.time()
if config.test_duration_minutes:
# Time-based test
results = []
end_time = start_time + (config.test_duration_minutes * 60)
while time.time() < end_time:
batch_size = min(config.concurrent_requests,
int((end_time - time.time()) * config.concurrent_requests / 10))
if batch_size <= 0:
break
batch_tasks = [bounded_request() for _ in range(batch_size)]
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
results.extend([r for r in batch_results if not isinstance(r, Exception)])
await asyncio.sleep(0.1) # Small delay between batches
else:
# Request count-based test
tasks = [bounded_request() for _ in range(config.total_requests)]
results = await asyncio.gather(*tasks, return_exceptions=True)
results = [r for r in results if not isinstance(r, Exception)]
return results
def _calculate_metrics(self,
results: List[Tuple[float, bool, Optional[str]]],
total_time: float,
config: BenchmarkConfig) -> BenchmarkResult:
"""Calculate performance metrics from test results."""
latencies = [r[0] for r in results if r[1]] # Only successful requests
successful = sum(1 for r in results if r[1])
failed = len(results) - successful
if not latencies:
return BenchmarkResult(
platform=config.platform,
endpoint_url=config.endpoint_url,
concurrent_requests=config.concurrent_requests,
total_requests=len(results),
successful_requests=0,
failed_requests=len(results),
total_time_seconds=total_time,
avg_latency_ms=0,
p50_latency_ms=0,
p95_latency_ms=0,
p99_latency_ms=0,
min_latency_ms=0,
max_latency_ms=0,
throughput_rps=0,
error_rate=100.0,
cpu_usage_avg=0,
memory_usage_avg=0,
gpu_usage_avg=0
)
return BenchmarkResult(
platform=config.platform,
endpoint_url=config.endpoint_url,
concurrent_requests=config.concurrent_requests,
total_requests=len(results),
successful_requests=successful,
failed_requests=failed,
total_time_seconds=total_time,
avg_latency_ms=statistics.mean(latencies),
p50_latency_ms=np.percentile(latencies, 50),
p95_latency_ms=np.percentile(latencies, 95),
p99_latency_ms=np.percentile(latencies, 99),
min_latency_ms=min(latencies),
max_latency_ms=max(latencies),
throughput_rps=successful / total_time,
error_rate=(failed / len(results)) * 100,
cpu_usage_avg=0, # TODO: Integrate with monitoring
memory_usage_avg=0,
gpu_usage_avg=0
)
async def run_benchmark(self, config: BenchmarkConfig) -> BenchmarkResult:
"""Run complete benchmark with specified configuration."""
self.logger.info(f"Starting benchmark for {config.platform} at {config.endpoint_url}")
start_time = time.time()
results = await self._run_load_test(config)
total_time = time.time() - start_time
benchmark_result = self._calculate_metrics(results, total_time, config)
self.logger.info(f"Benchmark completed:")
self.logger.info(f" Throughput: {benchmark_result.throughput_rps:.2f} RPS")
self.logger.info(f" Average latency: {benchmark_result.avg_latency_ms:.2f} ms")
self.logger.info(f" P95 latency: {benchmark_result.p95_latency_ms:.2f} ms")
self.logger.info(f" Error rate: {benchmark_result.error_rate:.2f}%")
return benchmark_result
def run_comparative_benchmark(self, configs: List[BenchmarkConfig]) -> pd.DataFrame:
"""Run benchmarks across multiple platforms and compare results."""
results = []
async def run_all_benchmarks():
tasks = [self.run_benchmark(config) for config in configs]
return await asyncio.gather(*tasks)
benchmark_results = asyncio.run(run_all_benchmarks())
# Convert to DataFrame for analysis
df_data = []
for result in benchmark_results:
df_data.append(asdict(result))
df = pd.DataFrame(df_data)
return df
def visualize_results(self, df: pd.DataFrame, output_path: str = "benchmark_results.png"):
"""Create visualization of benchmark results."""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle('Model Serving Platform Comparison', fontsize=16)
# Throughput comparison
axes[0, 0].bar(df['platform'], df['throughput_rps'])
axes[0, 0].set_title('Throughput (RPS)')
axes[0, 0].set_ylabel('Requests per Second')
# Latency comparison
latency_metrics = ['avg_latency_ms', 'p50_latency_ms', 'p95_latency_ms', 'p99_latency_ms']
df_latency = df[['platform'] + latency_metrics].set_index('platform')
df_latency.plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Latency Comparison')
axes[0, 1].set_ylabel('Latency (ms)')
axes[0, 1].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
# Error rate comparison
axes[0, 2].bar(df['platform'], df['error_rate'])
axes[0, 2].set_title('Error Rate')
axes[0, 2].set_ylabel('Error Rate (%)')
# Latency vs Throughput scatter
for platform in df['platform'].unique():
platform_data = df[df['platform'] == platform]
axes[1, 0].scatter(platform_data['throughput_rps'],
platform_data['avg_latency_ms'],
label=platform, s=100)
axes[1, 0].set_xlabel('Throughput (RPS)')
axes[1, 0].set_ylabel('Average Latency (ms)')
axes[1, 0].set_title('Latency vs Throughput')
axes[1, 0].legend()
# Concurrency impact
if len(df['concurrent_requests'].unique()) > 1:
for platform in df['platform'].unique():
platform_data = df[df['platform'] == platform]
axes[1, 1].plot(platform_data['concurrent_requests'],
platform_data['throughput_rps'],
marker='o', label=platform)
axes[1, 1].set_xlabel('Concurrent Requests')
axes[1, 1].set_ylabel('Throughput (RPS)')
axes[1, 1].set_title('Concurrency Impact')
axes[1, 1].legend()
# Latency distribution
latency_data = []
platforms = []
for _, row in df.iterrows():
latency_data.extend([row['min_latency_ms'], row['p50_latency_ms'],
row['p95_latency_ms'], row['max_latency_ms']])
platforms.extend([row['platform']] * 4)
latency_df = pd.DataFrame({
'latency': latency_data,
'platform': platforms
})
sns.boxplot(data=latency_df, x='platform', y='latency', ax=axes[1, 2])
axes[1, 2].set_title('Latency Distribution')
axes[1, 2].set_ylabel('Latency (ms)')
plt.tight_layout()
plt.savefig(output_path, dpi=300, bbox_inches='tight')
plt.show()
def generate_report(self, df: pd.DataFrame, output_path: str = "benchmark_report.html"):
"""Generate detailed HTML report of benchmark results."""
html_template = """
<!DOCTYPE html>
<html>
<head>
<title>Model Serving Platform Benchmark Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.header { background-color: #f0f0f0; padding: 20px; border-radius: 5px; }
.metric { display: inline-block; margin: 10px; padding: 15px;
background-color: #e8f4fd; border-radius: 5px; min-width: 150px; }
.platform-section { margin: 30px 0; padding: 20px;
border: 1px solid #ddd; border-radius: 5px; }
table { border-collapse: collapse; width: 100%; margin: 20px 0; }
th, td { border: 1px solid #ddd; padding: 12px; text-align: left; }
th { background-color: #f2f2f2; }
.best { background-color: #d4edda; }
.worst { background-color: #f8d7da; }
</style>
</head>
<body>
<div class="header">
<h1>Model Serving Platform Benchmark Report</h1>
<p>Generated on: {timestamp}</p>
<p>Platforms tested: {platforms}</p>
</div>
<h2>Executive Summary</h2>
<div class="metric">
<h3>Best Throughput</h3>
<p>{best_throughput_platform}: {best_throughput:.2f} RPS</p>
</div>
<div class="metric">
<h3>Best Latency</h3>
<p>{best_latency_platform}: {best_latency:.2f} ms</p>
</div>
<div class="metric">
<h3>Most Reliable</h3>
<p>{most_reliable_platform}: {best_error_rate:.2f}% error rate</p>
</div>
<h2>Detailed Results</h2>
{detailed_table}
<h2>Platform Analysis</h2>
{platform_analysis}
<h2>Recommendations</h2>
{recommendations}
</body>
</html>
"""
# Calculate summary metrics
best_throughput_idx = df['throughput_rps'].idxmax()
best_latency_idx = df['avg_latency_ms'].idxmin()
most_reliable_idx = df['error_rate'].idxmin()
# Generate detailed table
df_display = df.copy()
numeric_columns = ['throughput_rps', 'avg_latency_ms', 'p95_latency_ms', 'error_rate']
for col in numeric_columns:
df_display[col] = df_display[col].round(2)
detailed_table = df_display.to_html(classes='table', escape=False, index=False)
# Generate platform analysis
platform_analysis = ""
for platform in df['platform'].unique():
platform_data = df[df['platform'] == platform].iloc[0]
platform_analysis += f"""
<div class="platform-section">
<h3>{platform}</h3>
<p><strong>Throughput:</strong> {platform_data['throughput_rps']:.2f} RPS</p>
<p><strong>Average Latency:</strong> {platform_data['avg_latency_ms']:.2f} ms</p>
<p><strong>P95 Latency:</strong> {platform_data['p95_latency_ms']:.2f} ms</p>
<p><strong>Error Rate:</strong> {platform_data['error_rate']:.2f}%</p>
</div>
"""
# Generate recommendations
recommendations = """
<ul>
<li><strong>For highest throughput:</strong> Use {best_throughput_platform}</li>
<li><strong>For lowest latency:</strong> Use {best_latency_platform}</li>
<li><strong>For highest reliability:</strong> Use {most_reliable_platform}</li>
<li><strong>For production workloads:</strong> Consider balancing throughput, latency, and operational complexity</li>
</ul>
""".format(
best_throughput_platform=df.iloc[best_throughput_idx]['platform'],
best_latency_platform=df.iloc[best_latency_idx]['platform'],
most_reliable_platform=df.iloc[most_reliable_idx]['platform']
)
# Fill template
html_content = html_template.format(
timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
platforms=", ".join(df['platform'].unique()),
best_throughput_platform=df.iloc[best_throughput_idx]['platform'],
best_throughput=df.iloc[best_throughput_idx]['throughput_rps'],
best_latency_platform=df.iloc[best_latency_idx]['platform'],
best_latency=df.iloc[best_latency_idx]['avg_latency_ms'],
most_reliable_platform=df.iloc[most_reliable_idx]['platform'],
best_error_rate=df.iloc[most_reliable_idx]['error_rate'],
detailed_table=detailed_table,
platform_analysis=platform_analysis,
recommendations=recommendations
)
with open(output_path, 'w') as f:
f.write(html_content)
print(f"Report generated: {output_path}")
# Example usage and CLI
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Model Serving Platform Benchmark')
parser.add_argument('--tensorflow-url', help='TensorFlow Serving endpoint URL')
parser.add_argument('--seldon-url', help='Seldon Core endpoint URL')
parser.add_argument('--kserve-url', help='KServe endpoint URL')
parser.add_argument('--model-name', default='production_model', help='Model name')
parser.add_argument('--concurrent-requests', type=int, default=10, help='Concurrent requests')
parser.add_argument('--total-requests', type=int, default=1000, help='Total requests')
parser.add_argument('--payload-size', choices=['small', 'medium', 'large'],
default='medium', help='Payload size')
parser.add_argument('--output-dir', default='.', help='Output directory for results')
args = parser.parse_args()
benchmark = ModelServingBenchmark()
configs = []
if args.tensorflow_url:
configs.append(BenchmarkConfig(
endpoint_url=args.tensorflow_url,
model_name=args.model_name,
platform="tensorflow",
concurrent_requests=args.concurrent_requests,
total_requests=args.total_requests,
payload_size=args.payload_size
))
if args.seldon_url:
configs.append(BenchmarkConfig(
endpoint_url=args.seldon_url,
model_name=args.model_name,
platform="seldon",
concurrent_requests=args.concurrent_requests,
total_requests=args.total_requests,
payload_size=args.payload_size
))
if args.kserve_url:
configs.append(BenchmarkConfig(
endpoint_url=args.kserve_url,
model_name=args.model_name,
platform="kserve",
concurrent_requests=args.concurrent_requests,
total_requests=args.total_requests,
payload_size=args.payload_size
))
if not configs:
print("Please provide at least one endpoint URL")
exit(1)
# Run benchmarks
results_df = benchmark.run_comparative_benchmark(configs)
# Generate outputs
results_df.to_csv(f"{args.output_dir}/benchmark_results.csv", index=False)
benchmark.visualize_results(results_df, f"{args.output_dir}/benchmark_visualization.png")
benchmark.generate_report(results_df, f"{args.output_dir}/benchmark_report.html")
print("Benchmark completed! Check output files for detailed results.")
Section 3: A/B Testing and Canary Deployments
Advanced A/B Testing Framework
# ab_testing_framework.py
import asyncio
import aiohttp
import json
import time
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from kubernetes import client, config
import logging
from scipy import stats
@dataclass
class ABTestConfig:
"""Configuration for A/B testing experiments."""
experiment_name: str
control_endpoint: str
treatment_endpoint: str
traffic_split: float # Percentage to treatment (0.0 to 1.0)
success_metric: str # 'accuracy', 'latency', 'custom'
minimum_sample_size: int = 1000
significance_level: float = 0.05
test_duration_hours: int = 24
ramping_strategy: str = "linear" # linear, exponential, step
@dataclass
class ABTestResult:
"""Results from A/B test."""
experiment_name: str
control_samples: int
treatment_samples: int
control_mean: float
treatment_mean: float
control_std: float
treatment_std: float
p_value: float
confidence_interval: Tuple[float, float]
is_significant: bool
effect_size: float
statistical_power: float
recommendation: str
class ABTestingFramework:
"""Advanced A/B testing framework for model serving platforms."""
def __init__(self):
try:
config.load_incluster_config()
except:
config.load_kube_config()
self.v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
self.custom_api = client.CustomObjectsApi()
self.logger = logging.getLogger(__name__)
async def setup_traffic_split(self,
test_config: ABTestConfig,
platform: str) -> bool:
"""Setup traffic splitting for A/B test."""
if platform == "seldon":
return await self._setup_seldon_traffic_split(test_config)
elif platform == "kserve":
return await self._setup_kserve_traffic_split(test_config)
elif platform == "istio":
return await self._setup_istio_traffic_split(test_config)
else:
self.logger.error(f"Unsupported platform for traffic splitting: {platform}")
return False
async def _setup_seldon_traffic_split(self, test_config: ABTestConfig) -> bool:
"""Setup Seldon traffic split configuration."""
try:
# Update Seldon deployment with traffic split
seldon_deployment = {
"apiVersion": "machinelearning.seldon.io/v1",
"kind": "SeldonDeployment",
"metadata": {
"name": f"ab-test-{test_config.experiment_name}",
"namespace": "model-serving"
},
"spec": {
"name": test_config.experiment_name,
"predictors": [
{
"name": "control",
"traffic": int((1 - test_config.traffic_split) * 100),
"graph": {
"name": "control-model",
"endpoint": {"type": "REST"},
"type": "MODEL",
"children": []
}
},
{
"name": "treatment",
"traffic": int(test_config.traffic_split * 100),
"graph": {
"name": "treatment-model",
"endpoint": {"type": "REST"},
"type": "MODEL",
"children": []
}
}
]
}
}
# Apply configuration
self.custom_api.create_namespaced_custom_object(
group="machinelearning.seldon.io",
version="v1",
namespace="model-serving",
plural="seldondeployments",
body=seldon_deployment
)
self.logger.info(f"Seldon traffic split configured: {(1-test_config.traffic_split)*100:.1f}% control, {test_config.traffic_split*100:.1f}% treatment")
return True
except Exception as e:
self.logger.error(f"Failed to setup Seldon traffic split: {e}")
return False
async def _setup_kserve_traffic_split(self, test_config: ABTestConfig) -> bool:
"""Setup KServe traffic split configuration."""
try:
# Update InferenceService with canary traffic
inference_service = {
"apiVersion": "serving.kserve.io/v1beta1",
"kind": "InferenceService",
"metadata": {
"name": f"ab-test-{test_config.experiment_name}",
"namespace": "model-serving"
},
"spec": {
"predictor": {
"canaryTrafficPercent": int(test_config.traffic_split * 100),
"tensorflow": {
"storageUri": test_config.treatment_endpoint
}
},
"canary": {
"tensorflow": {
"storageUri": test_config.control_endpoint
}
}
}
}
# Apply configuration
self.custom_api.create_namespaced_custom_object(
group="serving.kserve.io",
version="v1beta1",
namespace="model-serving",
plural="inferenceservices",
body=inference_service
)
self.logger.info(f"KServe traffic split configured: {(1-test_config.traffic_split)*100:.1f}% control, {test_config.traffic_split*100:.1f}% treatment")
return True
except Exception as e:
self.logger.error(f"Failed to setup KServe traffic split: {e}")
return False
async def _setup_istio_traffic_split(self, test_config: ABTestConfig) -> bool:
"""Setup Istio-based traffic split configuration."""
try:
# Create VirtualService for traffic splitting
virtual_service = {
"apiVersion": "networking.istio.io/v1beta1",
"kind": "VirtualService",
"metadata": {
"name": f"ab-test-{test_config.experiment_name}",
"namespace": "model-serving"
},
"spec": {
"hosts": [f"ab-test-{test_config.experiment_name}"],
"http": [
{
"match": [{"headers": {"x-experiment": {"exact": test_config.experiment_name}}}],
"route": [
{
"destination": {"host": "control-service"},
"weight": int((1 - test_config.traffic_split) * 100)
},
{
"destination": {"host": "treatment-service"},
"weight": int(test_config.traffic_split * 100)
}
]
}
]
}
}
# Apply configuration
self.custom_api.create_namespaced_custom_object(
group="networking.istio.io",
version="v1beta1",
namespace="model-serving",
plural="virtualservices",
body=virtual_service
)
self.logger.info(f"Istio traffic split configured: {(1-test_config.traffic_split)*100:.1f}% control, {test_config.traffic_split*100:.1f}% treatment")
return True
except Exception as e:
self.logger.error(f"Failed to setup Istio traffic split: {e}")
return False
async def collect_metrics(self,
test_config: ABTestConfig,
duration_hours: float) -> Tuple[List[float], List[float]]:
"""Collect metrics from both control and treatment groups."""
control_metrics = []
treatment_metrics = []
end_time = time.time() + (duration_hours * 3600)
async with aiohttp.ClientSession() as session:
while time.time() < end_time:
# Collect control metrics
try:
async with session.get(f"{test_config.control_endpoint}/metrics") as response:
if response.status == 200:
data = await response.json()
metric_value = self._extract_metric(data, test_config.success_metric)
if metric_value is not None:
control_metrics.append(metric_value)
except Exception as e:
self.logger.warning(f"Failed to collect control metrics: {e}")
# Collect treatment metrics
try:
async with session.get(f"{test_config.treatment_endpoint}/metrics") as response:
if response.status == 200:
data = await response.json()
metric_value = self._extract_metric(data, test_config.success_metric)
if metric_value is not None:
treatment_metrics.append(metric_value)
except Exception as e:
self.logger.warning(f"Failed to collect treatment metrics: {e}")
# Wait before next collection
await asyncio.sleep(60) # Collect every minute
# Early stopping if minimum sample size reached
if (len(control_metrics) >= test_config.minimum_sample_size and
len(treatment_metrics) >= test_config.minimum_sample_size):
# Perform interim analysis
interim_result = self._analyze_results(
control_metrics, treatment_metrics, test_config
)
if interim_result.is_significant:
self.logger.info("Early stopping: Significant result detected")
break
return control_metrics, treatment_metrics
def _extract_metric(self, data: Dict, metric_name: str) -> Optional[float]:
"""Extract specific metric from response data."""
if metric_name == "accuracy":
return data.get("accuracy", data.get("precision", None))
elif metric_name == "latency":
return data.get("response_time_ms", data.get("latency_ms", None))
elif metric_name == "throughput":
return data.get("requests_per_second", data.get("rps", None))
else:
return data.get(metric_name, None)
def _analyze_results(self,
control_data: List[float],
treatment_data: List[float],
test_config: ABTestConfig) -> ABTestResult:
"""Perform statistical analysis of A/B test results."""
if len(control_data) == 0 or len(treatment_data) == 0:
return ABTestResult(
experiment_name=test_config.experiment_name,
control_samples=len(control_data),
treatment_samples=len(treatment_data),
control_mean=0,
treatment_mean=0,
control_std=0,
treatment_std=0,
p_value=1.0,
confidence_interval=(0, 0),
is_significant=False,
effect_size=0,
statistical_power=0,
recommendation="Insufficient data"
)
# Basic statistics
control_mean = np.mean(control_data)
treatment_mean = np.mean(treatment_data)
control_std = np.std(control_data, ddof=1)
treatment_std = np.std(treatment_data, ddof=1)
# Perform t-test
t_statistic, p_value = stats.ttest_ind(treatment_data, control_data)
# Calculate effect size (Cohen's d)
pooled_std = np.sqrt(((len(control_data) - 1) * control_std**2 +
(len(treatment_data) - 1) * treatment_std**2) /
(len(control_data) + len(treatment_data) - 2))
effect_size = (treatment_mean - control_mean) / pooled_std if pooled_std > 0 else 0
# Calculate confidence interval
se_diff = pooled_std * np.sqrt(1/len(control_data) + 1/len(treatment_data))
df = len(control_data) + len(treatment_data) - 2
t_critical = stats.t.ppf(1 - test_config.significance_level/2, df)
diff = treatment_mean - control_mean
margin_error = t_critical * se_diff
confidence_interval = (diff - margin_error, diff + margin_error)
# Calculate statistical power
statistical_power = self._calculate_power(
effect_size, len(control_data), len(treatment_data), test_config.significance_level
)
# Determine significance
is_significant = p_value < test_config.significance_level
# Generate recommendation
recommendation = self._generate_recommendation(
is_significant, effect_size, statistical_power, p_value
)
return ABTestResult(
experiment_name=test_config.experiment_name,
control_samples=len(control_data),
treatment_samples=len(treatment_data),
control_mean=control_mean,
treatment_mean=treatment_mean,
control_std=control_std,
treatment_std=treatment_std,
p_value=p_value,
confidence_interval=confidence_interval,
is_significant=is_significant,
effect_size=effect_size,
statistical_power=statistical_power,
recommendation=recommendation
)
def _calculate_power(self, effect_size: float, n1: int, n2: int, alpha: float) -> float:
"""Calculate statistical power of the test."""
# Simplified power calculation
se = np.sqrt(1/n1 + 1/n2)
critical_value = stats.norm.ppf(1 - alpha/2)
beta = stats.norm.cdf(critical_value - effect_size/se)
power = 1 - beta
return max(0, min(1, power))
def _generate_recommendation(self,
is_significant: bool,
effect_size: float,
power: float,
p_value: float) -> str:
"""Generate recommendation based on test results."""
if not is_significant:
if power < 0.8:
return "Inconclusive: Increase sample size for adequate statistical power"
else:
return "No significant difference detected with adequate power"
if abs(effect_size) < 0.2:
return "Statistically significant but small practical effect"
elif abs(effect_size) < 0.5:
return "Statistically significant with medium practical effect"
else:
return "Statistically significant with large practical effect"
async def run_ab_test(self, test_config: ABTestConfig, platform: str) -> ABTestResult:
"""Run complete A/B test experiment."""
self.logger.info(f"Starting A/B test: {test_config.experiment_name}")
# Setup traffic splitting
if not await self.setup_traffic_split(test_config, platform):
raise Exception("Failed to setup traffic splitting")
# Wait for traffic split to take effect
await asyncio.sleep(30)
# Collect metrics
control_data, treatment_data = await self.collect_metrics(
test_config, test_config.test_duration_hours
)
# Analyze results
result = self._analyze_results(control_data, treatment_data, test_config)
self.logger.info(f"A/B test completed: {result.recommendation}")
return result
def generate_ab_test_report(self, result: ABTestResult, output_path: str):
"""Generate detailed A/B test report."""
html_template = """
<!DOCTYPE html>
<html>
<head>
<title>A/B Test Report: {experiment_name}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; }}
.header {{ background-color: #f0f0f0; padding: 20px; border-radius: 5px; }}
.metric {{ display: inline-block; margin: 10px; padding: 15px;
background-color: #e8f4fd; border-radius: 5px; min-width: 150px; }}
.significant {{ background-color: #d4edda; }}
.not-significant {{ background-color: #f8d7da; }}
table {{ border-collapse: collapse; width: 100%; margin: 20px 0; }}
th, td {{ border: 1px solid #ddd; padding: 12px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
</style>
</head>
<body>
<div class="header">
<h1>A/B Test Report: {experiment_name}</h1>
<p>Test Status: <span class="{significance_class}">{significance_text}</span></p>
<p>Recommendation: {recommendation}</p>
</div>
<h2>Test Summary</h2>
<table>
<tr><th>Metric</th><th>Control</th><th>Treatment</th><th>Difference</th></tr>
<tr>
<td>Sample Size</td>
<td>{control_samples}</td>
<td>{treatment_samples}</td>
<td>-</td>
</tr>
<tr>
<td>Mean</td>
<td>{control_mean:.4f}</td>
<td>{treatment_mean:.4f}</td>
<td>{mean_diff:.4f}</td>
</tr>
<tr>
<td>Standard Deviation</td>
<td>{control_std:.4f}</td>
<td>{treatment_std:.4f}</td>
<td>-</td>
</tr>
</table>
<h2>Statistical Analysis</h2>
<div class="metric">
<h3>P-Value</h3>
<p>{p_value:.6f}</p>
</div>
<div class="metric">
<h3>Effect Size</h3>
<p>{effect_size:.4f}</p>
</div>
<div class="metric">
<h3>Statistical Power</h3>
<p>{statistical_power:.4f}</p>
</div>
<div class="metric">
<h3>Confidence Interval</h3>
<p>[{ci_lower:.4f}, {ci_upper:.4f}]</p>
</div>
<h2>Interpretation</h2>
<p>{interpretation}</p>
</body>
</html>
"""
significance_class = "significant" if result.is_significant else "not-significant"
significance_text = "Statistically Significant" if result.is_significant else "Not Statistically Significant"
interpretation = f"""
The A/B test comparing the control and treatment groups shows:
- {'Statistically significant' if result.is_significant else 'No statistically significant'} difference (p-value: {result.p_value:.6f})
- Effect size: {abs(result.effect_size):.4f} ({'Small' if abs(result.effect_size) < 0.2 else 'Medium' if abs(result.effect_size) < 0.5 else 'Large'})
- Statistical power: {result.statistical_power:.4f} ({'Adequate' if result.statistical_power >= 0.8 else 'Insufficient'})
Based on these results: {result.recommendation}
"""
html_content = html_template.format(
experiment_name=result.experiment_name,
significance_class=significance_class,
significance_text=significance_text,
recommendation=result.recommendation,
control_samples=result.control_samples,
treatment_samples=result.treatment_samples,
control_mean=result.control_mean,
treatment_mean=result.treatment_mean,
mean_diff=result.treatment_mean - result.control_mean,
control_std=result.control_std,
treatment_std=result.treatment_std,
p_value=result.p_value,
effect_size=result.effect_size,
statistical_power=result.statistical_power,
ci_lower=result.confidence_interval[0],
ci_upper=result.confidence_interval[1],
interpretation=interpretation
)
with open(output_path, 'w') as f:
f.write(html_content)
print(f"A/B test report generated: {output_path}")
# Example usage
if __name__ == "__main__":
# Configure A/B test
test_config = ABTestConfig(
experiment_name="model-v2-latency-test",
control_endpoint="http://tensorflow-serving:8501",
treatment_endpoint="http://kserve-predictor:8080",
traffic_split=0.1, # 10% to treatment
success_metric="latency",
minimum_sample_size=1000,
test_duration_hours=24
)
# Run A/B test
framework = ABTestingFramework()
async def main():
result = await framework.run_ab_test(test_config, "kserve")
framework.generate_ab_test_report(result, "ab_test_report.html")
return result
result = asyncio.run(main())
print(f"A/B test completed: {result.recommendation}")
Section 4: Auto-scaling Strategies and Implementation
Advanced Auto-scaling Configuration
# advanced-autoscaling.yaml
# TensorFlow Serving HPA with custom metrics
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: tensorflow-serving-hpa
namespace: model-serving
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: tensorflow-serving
minReplicas: 2
maxReplicas: 50
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
- type: Pods
pods:
metric:
name: requests_per_second
target:
type: AverageValue
averageValue: "100"
- type: Object
object:
metric:
name: average_response_time_ms
target:
type: Value
value: "50"
describedObject:
apiVersion: v1
kind: Service
name: tensorflow-serving-service
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
periodSeconds: 60
- type: Pods
value: 2
periodSeconds: 60
selectPolicy: Min
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 50
periodSeconds: 30
- type: Pods
value: 5
periodSeconds: 30
selectPolicy: Max
---
# Seldon Core HPA with predictive scaling
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: seldon-model-hpa
namespace: model-serving
annotations:
autoscaling.alpha.kubernetes.io/behavior: |
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 15
- type: Pods
value: 4
periodSeconds: 15
selectPolicy: Max
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 20
periodSeconds: 60
spec:
scaleTargetRef:
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
name: production-model-seldon
minReplicas: 1
maxReplicas: 30
metrics:
- type: External
external:
metric:
name: seldon_api_executor_client_requests_seconds
selector:
matchLabels:
deployment_name: production-model-seldon
target:
type: AverageValue
averageValue: "0.1" # Target 100ms average response time
- type: External
external:
metric:
name: predicted_load
selector:
matchLabels:
model: production-model
target:
type: Value
value: "1000" # Predicted requests per minute
---
# KServe with Knative autoscaling
apiVersion: v1
kind: ConfigMap
metadata:
name: config-autoscaler
namespace: knative-serving
data:
# Knative autoscaling configuration
container-concurrency-target-default: "10"
container-concurrency-target-percentage: "0.7"
enable-scale-to-zero: "true"
scale-to-zero-grace-period: "30s"
scale-to-zero-pod-retention-period: "0s"
stable-window: "60s"
panic-window-percentage: "10.0"
panic-threshold-percentage: "200.0"
max-scale-up-rate: "1000.0"
max-scale-down-rate: "2.0"
target-burst-capacity: "211"
requests-per-second-target-default: "200"
---
# KEDA ScaledObject for advanced metrics
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: tensorflow-serving-keda
namespace: model-serving
spec:
scaleTargetRef:
name: tensorflow-serving
pollingInterval: 15
cooldownPeriod: 300
idleReplicaCount: 1
minReplicaCount: 2
maxReplicaCount: 100
triggers:
- type: prometheus
metadata:
serverAddress: http://prometheus.monitoring.svc.cluster.local:9090
metricName: tensorflow_serving_request_latency_p99
threshold: '100'
query: histogram_quantile(0.99, rate(tensorflow_serving_request_duration_seconds_bucket[5m])) * 1000
- type: prometheus
metadata:
serverAddress: http://prometheus.monitoring.svc.cluster.local:9090
metricName: tensorflow_serving_queue_depth
threshold: '10'
query: tensorflow_serving_request_queue_length
- type: kafka
metadata:
bootstrapServers: kafka.kafka.svc.cluster.local:9092
consumerGroup: tensorflow-serving-consumer
topic: model-requests
lagThreshold: '100'
- type: cron
metadata:
timezone: America/New_York
start: "0 8 * * 1-5" # Scale up at 8 AM on weekdays
end: "0 20 * * 1-5" # Scale down at 8 PM on weekdays
desiredReplicas: "10"
---
# Custom metrics API configuration
apiVersion: v1
kind: ConfigMap
metadata:
name: adapter-config
namespace: custom-metrics
data:
config.yaml: |
rules:
- seriesQuery: 'tensorflow_serving_request_duration_seconds_count{namespace!="",service!=""}'
seriesFilters: []
resources:
overrides:
namespace:
resource: namespace
service:
resource: service
name:
matches: "^tensorflow_serving_request_duration_seconds_count"
as: "requests_per_second"
metricsQuery: 'rate(<<.Series>>{<<.LabelMatchers>>}[2m])'
- seriesQuery: 'tensorflow_serving_request_duration_seconds{namespace!="",service!=""}'
seriesFilters: []
resources:
overrides:
namespace:
resource: namespace
service:
resource: service
name:
matches: "^tensorflow_serving_request_duration_seconds"
as: "average_response_time_ms"
metricsQuery: 'rate(tensorflow_serving_request_duration_seconds_sum{<<.LabelMatchers>>}[2m]) / rate(tensorflow_serving_request_duration_seconds_count{<<.LabelMatchers>>}[2m]) * 1000'
Predictive Auto-scaling Implementation
# predictive_autoscaling.py
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import StandardScaler
import joblib
import asyncio
import aiohttp
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
from kubernetes import client, config
import logging
import json
class PredictiveAutoscaler:
"""Predictive autoscaling for model serving workloads."""
def __init__(self, prometheus_url: str = "http://prometheus:9090"):
self.prometheus_url = prometheus_url
self.model = RandomForestRegressor(n_estimators=100, random_state=42)
self.scaler = StandardScaler()
self.is_trained = False
try:
config.load_incluster_config()
except:
config.load_kube_config()
self.v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
self.autoscaling_v2 = client.AutoscalingV2Api()
self.logger = logging.getLogger(__name__)
async def collect_historical_data(self,
service_name: str,
namespace: str,
days_back: int = 30) -> pd.DataFrame:
"""Collect historical metrics for training."""
end_time = datetime.now()
start_time = end_time - timedelta(days=days_back)
queries = {
'requests_per_second': f'rate(http_requests_total{{service="{service_name}",namespace="{namespace}"}}[5m])',
'response_time_p95': f'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{{service="{service_name}",namespace="{namespace}"}}[5m]))',
'cpu_usage': f'rate(container_cpu_usage_seconds_total{{pod=~"{service_name}.*",namespace="{namespace}"}}[5m])',
'memory_usage': f'container_memory_usage_bytes{{pod=~"{service_name}.*",namespace="{namespace}"}}',
'active_connections': f'http_connections_active{{service="{service_name}",namespace="{namespace}"}}',
'queue_depth': f'http_request_queue_length{{service="{service_name}",namespace="{namespace}"}}',
'replica_count': f'kube_deployment_status_replicas{{deployment="{service_name}",namespace="{namespace}"}}'
}
data = []
current_time = start_time
async with aiohttp.ClientSession() as session:
while current_time < end_time:
timestamp = int(current_time.timestamp())
row = {'timestamp': timestamp}
for metric_name, query in queries.items():
try:
url = f"{self.prometheus_url}/api/v1/query"
params = {
'query': query,
'time': timestamp
}
async with session.get(url, params=params) as response:
if response.status == 200:
result = await response.json()
if result['data']['result']:
value = float(result['data']['result'][0]['value'][1])
row[metric_name] = value
else:
row[metric_name] = 0
else:
row[metric_name] = 0
except Exception as e:
self.logger.warning(f"Failed to collect {metric_name}: {e}")
row[metric_name] = 0
# Add time-based features
dt = current_time
row['hour'] = dt.hour
row['day_of_week'] = dt.weekday()
row['day_of_month'] = dt.day
row['month'] = dt.month
row['is_weekend'] = 1 if dt.weekday() >= 5 else 0
row['is_business_hours'] = 1 if 9 <= dt.hour <= 17 and dt.weekday() < 5 else 0
data.append(row)
current_time += timedelta(minutes=5) # 5-minute intervals
df = pd.DataFrame(data)
# Add lag features
lag_features = ['requests_per_second', 'response_time_p95', 'cpu_usage']
for feature in lag_features:
if feature in df.columns:
for lag in [1, 2, 6, 12]: # 5min, 10min, 30min, 1hour lags
df[f'{feature}_lag_{lag}'] = df[feature].shift(lag)
# Add rolling statistics
window_sizes = [6, 12, 24] # 30min, 1hour, 2hour windows
for feature in lag_features:
if feature in df.columns:
for window in window_sizes:
df[f'{feature}_rolling_mean_{window}'] = df[feature].rolling(window).mean()
df[f'{feature}_rolling_std_{window}'] = df[feature].rolling(window).std()
# Remove rows with NaN values
df = df.dropna()
self.logger.info(f"Collected {len(df)} historical data points")
return df
def train_model(self, df: pd.DataFrame, target_column: str = 'replica_count'):
"""Train predictive model."""
# Prepare features
feature_columns = [col for col in df.columns if col not in ['timestamp', target_column]]
X = df[feature_columns]
y = df[target_column]
# Scale features
X_scaled = self.scaler.fit_transform(X)
# Train model
self.model.fit(X_scaled, y)
self.is_trained = True
# Calculate training metrics
y_pred = self.model.predict(X_scaled)
mae = mean_absolute_error(y, y_pred)
mse = mean_squared_error(y, y_pred)
rmse = np.sqrt(mse)
self.logger.info(f"Model trained - MAE: {mae:.2f}, RMSE: {rmse:.2f}")
# Feature importance
importance = pd.DataFrame({
'feature': feature_columns,
'importance': self.model.feature_importances_
}).sort_values('importance', ascending=False)
self.logger.info("Top 10 most important features:")
for _, row in importance.head(10).iterrows():
self.logger.info(f" {row['feature']}: {row['importance']:.4f}")
return {
'mae': mae,
'rmse': rmse,
'feature_importance': importance.to_dict('records')
}
async def predict_future_load(self,
service_name: str,
namespace: str,
hours_ahead: int = 2) -> List[Tuple[datetime, int]]:
"""Predict future resource requirements."""
if not self.is_trained:
raise ValueError("Model must be trained before making predictions")
# Get current metrics
current_data = await self._get_current_metrics(service_name, namespace)
predictions = []
current_time = datetime.now()
for i in range(hours_ahead * 12): # 5-minute intervals
future_time = current_time + timedelta(minutes=5 * (i + 1))
# Prepare features for prediction
features = current_data.copy()
features['hour'] = future_time.hour
features['day_of_week'] = future_time.weekday()
features['day_of_month'] = future_time.day
features['month'] = future_time.month
features['is_weekend'] = 1 if future_time.weekday() >= 5 else 0
features['is_business_hours'] = 1 if 9 <= future_time.hour <= 17 and future_time.weekday() < 5 else 0
# Convert to array and scale
feature_array = np.array(list(features.values())).reshape(1, -1)
feature_array_scaled = self.scaler.transform(feature_array)
# Make prediction
predicted_replicas = self.model.predict(feature_array_scaled)[0]
predicted_replicas = max(1, int(round(predicted_replicas)))
predictions.append((future_time, predicted_replicas))
return predictions
async def _get_current_metrics(self, service_name: str, namespace: str) -> Dict:
"""Get current metrics for prediction."""
queries = {
'requests_per_second': f'rate(http_requests_total{{service="{service_name}",namespace="{namespace}"}}[5m])',
'response_time_p95': f'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{{service="{service_name}",namespace="{namespace}"}}[5m]))',
'cpu_usage': f'rate(container_cpu_usage_seconds_total{{pod=~"{service_name}.*",namespace="{namespace}"}}[5m])',
'memory_usage': f'container_memory_usage_bytes{{pod=~"{service_name}.*",namespace="{namespace}"}}',
'active_connections': f'http_connections_active{{service="{service_name}",namespace="{namespace}"}}',
'queue_depth': f'http_request_queue_length{{service="{service_name}",namespace="{namespace}"}}'
}
metrics = {}
async with aiohttp.ClientSession() as session:
for metric_name, query in queries.items():
try:
url = f"{self.prometheus_url}/api/v1/query"
params = {'query': query}
async with session.get(url, params=params) as response:
if response.status == 200:
result = await response.json()
if result['data']['result']:
value = float(result['data']['result'][0]['value'][1])
metrics[metric_name] = value
else:
metrics[metric_name] = 0
else:
metrics[metric_name] = 0
except Exception as e:
self.logger.warning(f"Failed to get current {metric_name}: {e}")
metrics[metric_name] = 0
return metrics
async def apply_scaling_decision(self,
service_name: str,
namespace: str,
target_replicas: int,
scaling_reason: str):
"""Apply scaling decision to deployment."""
try:
# Get current deployment
deployment = self.apps_v1.read_namespaced_deployment(
name=service_name,
namespace=namespace
)
current_replicas = deployment.spec.replicas
if current_replicas != target_replicas:
# Update deployment
deployment.spec.replicas = target_replicas
self.apps_v1.patch_namespaced_deployment(
name=service_name,
namespace=namespace,
body=deployment
)
self.logger.info(f"Scaled {service_name} from {current_replicas} to {target_replicas} replicas. Reason: {scaling_reason}")
# Record scaling event
await self._record_scaling_event(
service_name, namespace, current_replicas, target_replicas, scaling_reason
)
except Exception as e:
self.logger.error(f"Failed to apply scaling decision: {e}")
async def _record_scaling_event(self,
service_name: str,
namespace: str,
old_replicas: int,
new_replicas: int,
reason: str):
"""Record scaling event for audit and analysis."""
event = {
'timestamp': datetime.now().isoformat(),
'service': service_name,
'namespace': namespace,
'old_replicas': old_replicas,
'new_replicas': new_replicas,
'reason': reason,
'scaling_direction': 'up' if new_replicas > old_replicas else 'down',
'magnitude': abs(new_replicas - old_replicas)
}
# Store in ConfigMap for audit trail
try:
# Try to get existing audit log
try:
cm = self.v1.read_namespaced_config_map(
name="autoscaling-audit-log",
namespace=namespace
)
existing_events = json.loads(cm.data.get('events', '[]'))
except:
existing_events = []
existing_events.append(event)
# Keep only last 1000 events
if len(existing_events) > 1000:
existing_events = existing_events[-1000:]
# Update ConfigMap
cm_body = client.V1ConfigMap(
metadata=client.V1ObjectMeta(name="autoscaling-audit-log"),
data={'events': json.dumps(existing_events, indent=2)}
)
try:
self.v1.replace_namespaced_config_map(
name="autoscaling-audit-log",
namespace=namespace,
body=cm_body
)
except:
self.v1.create_namespaced_config_map(
namespace=namespace,
body=cm_body
)
except Exception as e:
self.logger.warning(f"Failed to record scaling event: {e}")
async def run_predictive_autoscaling(self,
service_name: str,
namespace: str,
check_interval_minutes: int = 5):
"""Run continuous predictive autoscaling."""
self.logger.info(f"Starting predictive autoscaling for {service_name}")
while True:
try:
# Get predictions
predictions = await self.predict_future_load(service_name, namespace)
# Find maximum predicted load in next hour
next_hour_predictions = [p[1] for p in predictions[:12]] # Next 12 intervals (1 hour)
max_predicted_replicas = max(next_hour_predictions)
# Get current replicas
deployment = self.apps_v1.read_namespaced_deployment(
name=service_name,
namespace=namespace
)
current_replicas = deployment.spec.replicas
# Determine if scaling is needed
if max_predicted_replicas > current_replicas * 1.2: # Scale up if 20% more needed
await self.apply_scaling_decision(
service_name, namespace, max_predicted_replicas,
f"Predictive scale-up: predicted load {max_predicted_replicas} replicas"
)
elif max_predicted_replicas < current_replicas * 0.7: # Scale down if 30% less needed
target_replicas = max(1, max_predicted_replicas)
await self.apply_scaling_decision(
service_name, namespace, target_replicas,
f"Predictive scale-down: predicted load {max_predicted_replicas} replicas"
)
await asyncio.sleep(check_interval_minutes * 60)
except Exception as e:
self.logger.error(f"Error in predictive autoscaling loop: {e}")
await asyncio.sleep(60) # Wait 1 minute before retrying
def save_model(self, filepath: str):
"""Save trained model and scaler."""
model_data = {
'model': self.model,
'scaler': self.scaler,
'is_trained': self.is_trained
}
joblib.dump(model_data, filepath)
self.logger.info(f"Model saved to {filepath}")
def load_model(self, filepath: str):
"""Load trained model and scaler."""
model_data = joblib.load(filepath)
self.model = model_data['model']
self.scaler = model_data['scaler']
self.is_trained = model_data['is_trained']
self.logger.info(f"Model loaded from {filepath}")
# Example usage
async def main():
autoscaler = PredictiveAutoscaler()
# Collect training data
df = await autoscaler.collect_historical_data("tensorflow-serving", "model-serving")
# Train model
metrics = autoscaler.train_model(df)
print(f"Training metrics: {metrics}")
# Save model
autoscaler.save_model("predictive_autoscaling_model.pkl")
# Run predictive autoscaling
await autoscaler.run_predictive_autoscaling("tensorflow-serving", "model-serving")
if __name__ == "__main__":
asyncio.run(main())
This comprehensive comparison provides practical implementations and benchmarks for TensorFlow Serving, Seldon Core, and KServe. The guide includes advanced features like A/B testing frameworks, predictive autoscaling, and performance benchmarking tools that enable organizations to make informed decisions about their model serving infrastructure based on specific requirements for throughput, latency, operational complexity, and feature requirements.