Distributed Computing with Dask and Ray for Data Science: Scalable Analytics and Machine Learning
Distributed Computing with Dask and Ray for Data Science: Scalable Analytics and Machine Learning
Modern data science workloads require distributed computing frameworks that can scale seamlessly from laptops to clusters while maintaining the familiar Python ecosystem. Dask and Ray represent two powerful approaches to distributed computing: Dask excels at scaling pandas and NumPy workflows, while Ray provides a general-purpose distributed computing framework with advanced machine learning capabilities.
This comprehensive guide explores advanced techniques for implementing scalable data science solutions using both Dask and Ray, covering distributed analytics, parallel machine learning, cluster orchestration, and production deployment strategies.
Understanding Dask and Ray Architectures
Dask: Flexible Parallel Computing
Dask provides familiar APIs for scaling pandas, NumPy, and scikit-learn workloads across multiple cores and machines while maintaining lazy evaluation and automatic optimization.
# Advanced Dask implementation for scalable data science
import dask
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
from dask.distributed import Client, as_completed, progress
from dask.delayed import delayed
from dask import compute
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Callable, Tuple
import logging
import time
from datetime import datetime, timedelta
import joblib
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
import asyncio
class DaskClusterManager:
    """Start, tune, inspect, and shut down a Dask cluster.

    Every setting is read from ``cluster_config`` (a plain dictionary) with a
    default, so an empty dictionary is a valid configuration.
    """

    def __init__(self, cluster_config: Dict[str, Any]):
        """Store the configuration; no cluster is started until ``start_cluster``."""
        self.cluster_config = cluster_config
        self.client = None
        self.cluster = None
        self.performance_monitor = DaskPerformanceMonitor()

    def start_cluster(self, cluster_type: str = "local") -> Client:
        """Start a cluster of the requested type and return a connected client.

        Args:
            cluster_type: ``"local"``, ``"kubernetes"`` or ``"yarn"``.

        Returns:
            The ``Client`` connected to the new cluster (also kept on
            ``self.client``).

        Raises:
            ValueError: If ``cluster_type`` is not one of the supported values.
        """
        config = self.cluster_config

        if cluster_type == "local":
            from dask.distributed import LocalCluster
            self.cluster = LocalCluster(
                n_workers=config.get("n_workers", 4),
                threads_per_worker=config.get("threads_per_worker", 2),
                memory_limit=config.get("memory_limit", "4GB"),
                dashboard_address=config.get("dashboard_address", ":8787"),
                silence_logs=logging.WARNING
            )
        elif cluster_type == "kubernetes":
            # Imported lazily so the optional dependency is only needed here.
            from dask_kubernetes import KubeCluster
            self.cluster = KubeCluster(
                pod_template=self._create_pod_template(),
                namespace=config.get("namespace", "default"),
                dashboard_address=config.get("dashboard_address", ":8787")
            )
        elif cluster_type == "yarn":
            # Imported lazily so the optional dependency is only needed here.
            from dask_yarn import YarnCluster
            self.cluster = YarnCluster(
                environment=config.get("environment", "python://my_env.tar.gz"),
                worker_memory=config.get("worker_memory", "4GB"),
                worker_vcores=config.get("worker_vcores", 2),
                n_workers=config.get("n_workers", 4)
            )
        else:
            raise ValueError(f"Unsupported cluster type: {cluster_type}")

        self.client = Client(self.cluster)
        self._configure_cluster_optimizations()

        logging.info(f"Started Dask {cluster_type} cluster with {self.cluster_config.get('n_workers', 4)} workers")
        return self.client

    def _create_pod_template(self) -> Dict[str, Any]:
        """Build the Kubernetes pod specification used for Dask workers.

        Note that the ``memory_limit`` key is shared with the local-cluster
        branch of ``start_cluster``; here it becomes the container memory limit.
        """
        config = self.cluster_config
        resources = {
            "requests": {
                "cpu": config.get("cpu_request", "1"),
                "memory": config.get("memory_request", "4Gi")
            },
            "limits": {
                "cpu": config.get("cpu_limit", "2"),
                "memory": config.get("memory_limit", "8Gi")
            }
        }
        container = {
            "name": "dask-worker",
            "image": config.get("image", "daskdev/dask:latest"),
            "resources": resources,
            "env": [
                {
                    "name": "EXTRA_PIP_PACKAGES",
                    "value": config.get("extra_packages", "")
                }
            ]
        }
        return {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "labels": {
                    "app": "dask-worker",
                    "component": "worker"
                }
            },
            "spec": {
                "containers": [container],
                "restartPolicy": "Never"
            }
        }

    def _configure_cluster_optimizations(self):
        """Push worker settings and, unless disabled, enable adaptive scaling.

        Adaptive scaling is on by default; set ``"adaptive": False`` in the
        configuration to keep worker counts under manual control via
        ``scale_cluster``.
        """
        self.client.run(self._configure_worker_optimizations)

        if self.cluster_config.get("adaptive", True) and hasattr(self.cluster, 'adapt'):
            self.cluster.adapt(
                minimum=self.cluster_config.get("min_workers", 1),
                maximum=self.cluster_config.get("max_workers", 10),
                target_duration=self.cluster_config.get("target_duration", "5s")
            )

    @staticmethod
    def _configure_worker_optimizations():
        """Apply per-process settings; shipped to every worker by ``Client.run``.

        Static on purpose: a bound method would carry ``self`` (including the
        client and cluster handles) into the serialized payload.
        """
        import os
        import dask

        # One native thread per library call; parallelism comes from Dask itself.
        # NOTE(review): libraries that already created their thread pools may
        # not re-read these variables - confirm they take effect this late.
        for variable in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "NUMBA_NUM_THREADS"):
            os.environ[variable] = "1"

        dask.config.set({
            "distributed.worker.memory.target": 0.8,
            "distributed.worker.memory.spill": 0.9,
            "distributed.worker.memory.pause": 0.95,
        })

    def scale_cluster(self, n_workers: int):
        """Request ``n_workers`` workers if the cluster object supports scaling."""
        if not hasattr(self.cluster, 'scale'):
            logging.warning("Cluster does not support dynamic scaling")
            return
        self.cluster.scale(n_workers)
        logging.info(f"Scaled cluster to {n_workers} workers")

    def get_cluster_status(self) -> Dict[str, Any]:
        """Summarise the cluster from a single scheduler snapshot.

        Returns:
            ``{"status": "not_connected"}`` when no client is attached,
            otherwise the scheduler info plus worker count, total threads,
            total memory limit and the dashboard link.
        """
        if not self.client:
            return {"status": "not_connected"}

        # One round trip keeps all derived numbers mutually consistent.
        scheduler_info = self.client.scheduler_info()
        workers = scheduler_info["workers"]
        return {
            "scheduler_info": scheduler_info,
            "worker_count": len(workers),
            "total_cores": sum(w["nthreads"] for w in workers.values()),
            "total_memory": sum(w["memory_limit"] for w in workers.values()),
            "dashboard_link": self.client.dashboard_link
        }

    def shutdown_cluster(self):
        """Close the client, then the cluster, and forget both handles."""
        if self.client:
            self.client.close()
        if self.cluster:
            self.cluster.close()
        # Drop the closed handles so later status calls report "not_connected".
        self.client = None
        self.cluster = None
        logging.info("Shutdown Dask cluster")
class DaskDataProcessor:
    """Load, enrich, profile, and sample large tables with Dask DataFrames."""

    def __init__(self, client: Client):
        """Keep a reference to the client the computations run on."""
        self.client = client

    def load_large_dataset(self, data_sources: List[str],
                           file_format: str = "parquet") -> dd.DataFrame:
        """Read the sources into one Dask DataFrame, then compact dtypes and partitions.

        Args:
            data_sources: Paths (or globs) of the files to read.
            file_format: ``"parquet"``, ``"csv"`` or ``"json"`` (line-delimited).

        Raises:
            ValueError: If ``file_format`` is not supported.
        """
        if file_format == "parquet":
            df = dd.read_parquet(data_sources, engine="pyarrow")
        elif file_format == "csv":
            df = dd.read_csv(
                data_sources,
                blocksize="64MB",
                # Everything is read as text; _optimize_dtypes narrows it below.
                dtype="object",
                parse_dates=True
            )
        elif file_format == "json":
            df = dd.read_json(data_sources, lines=True)
        else:
            raise ValueError(f"Unsupported file format: {file_format}")

        df = self._optimize_dtypes(df)
        df = df.repartition(partition_size="100MB")
        logging.info(f"Loaded dataset with {df.npartitions} partitions")
        return df

    @staticmethod
    def _infer_optimized_dtypes(sample: pd.DataFrame) -> Dict[str, str]:
        """Choose a compact dtype per column from an in-memory sample.

        Returns a ``{column: dtype_name}`` mapping containing only the columns
        that should be cast.
        """
        if len(sample) == 0:
            return {}

        float32_limit = np.finfo(np.float32).max
        plan: Dict[str, str] = {}

        for column in sample.columns:
            series = sample[column]
            dtype = series.dtype

            if pd.api.types.is_object_dtype(dtype) or pd.api.types.is_string_dtype(dtype):
                try:
                    pd.to_numeric(series)
                except (ValueError, TypeError):
                    # Not numeric: repeated values make a categorical worthwhile.
                    if series.nunique() / len(series) < 0.5:
                        plan[column] = "category"
                else:
                    # float64 keeps large integer-like identifiers exact.
                    plan[column] = "float64"
            elif dtype == "int64":
                low, high = series.min(), series.max()
                candidates = ("uint8", "uint16", "uint32") if low >= 0 else ("int8", "int16", "int32")
                for name in candidates:
                    bounds = np.iinfo(name)
                    if bounds.min <= low and high <= bounds.max:
                        plan[column] = name
                        break
            elif dtype == "float64":
                finite = series[np.isfinite(series)]
                # Only downcast when no sampled value would overflow float32.
                if finite.empty or finite.abs().max() <= float32_limit:
                    plan[column] = "float32"

        return plan

    def _optimize_dtypes(self, df: dd.DataFrame) -> dd.DataFrame:
        """Cast columns to the compact dtypes inferred from the first rows.

        The decision is based on ``df.head(10000)`` only, so rows outside the
        sample are cast without having been inspected.
        """
        plan = self._infer_optimized_dtypes(df.head(10000))
        return df.astype(plan) if plan else df

    def distributed_feature_engineering(self, df: dd.DataFrame,
                                        feature_config: Dict[str, Any]) -> dd.DataFrame:
        """Add time, group-aggregate, and text features described by ``feature_config``.

        Recognised top-level keys: ``time_features`` (``column``),
        ``aggregation_features`` (``group_by``, ``aggregations``) and
        ``text_features`` (``columns``, ``character_counts``). Columns that are
        not present in ``df`` are skipped.
        """
        import re

        if "time_features" in feature_config:
            time_col = feature_config["time_features"]["column"]
            if time_col in df.columns:
                stamps = df[time_col].dt
                df[f"{time_col}_hour"] = stamps.hour
                df[f"{time_col}_day_of_week"] = stamps.dayofweek
                df[f"{time_col}_month"] = stamps.month
                df[f"{time_col}_quarter"] = stamps.quarter

        if "aggregation_features" in feature_config:
            agg_config = feature_config["aggregation_features"]
            group_by_cols = agg_config["group_by"]
            if isinstance(group_by_cols, str):
                # A bare column name would otherwise be joined letter by letter.
                group_by_cols = [group_by_cols]

            for agg_col, agg_funcs in agg_config["aggregations"].items():
                if agg_col not in df.columns:
                    continue
                for func in agg_funcs:
                    if func not in ("mean", "std", "count"):
                        continue
                    feature_name = f"{agg_col}_{func}_by_{'_'.join(group_by_cols)}"
                    grouped = df.groupby(group_by_cols)[agg_col]
                    agg_df = getattr(grouped, func)().rename(feature_name)
                    # Join on ordinary columns so multi-key group-bys need no
                    # hierarchical index on the right-hand side.
                    df = df.merge(agg_df.reset_index(), on=group_by_cols, how="left")

        if "text_features" in feature_config:
            text_config = feature_config["text_features"]
            for text_col in text_config.get("columns", []):
                if text_col not in df.columns:
                    continue
                text = df[text_col].str
                df[f"{text_col}_length"] = text.len()
                df[f"{text_col}_word_count"] = text.split().str.len()
                for char in text_config.get("character_counts", []):
                    # str.count takes a regex; escape so "." or "?" count literally.
                    df[f"{text_col}_{char}_count"] = df[text_col].str.count(re.escape(char))

        return df

    def distributed_data_quality_check(self, df: dd.DataFrame) -> Dict[str, Any]:
        """Profile every column: nulls, numeric summary with IQR outliers, text stats.

        The row count is computed once, and each column's statistics are
        evaluated together in one ``compute`` call (plus one for outliers).
        """
        total_rows = len(df)
        quality_report = {
            "timestamp": datetime.now().isoformat(),
            "total_rows": total_rows,
            "total_columns": len(df.columns),
            "column_quality": {}
        }

        def as_percentage(count):
            return count / total_rows * 100 if total_rows else 0.0

        for column in df.columns:
            col_series = df[column]
            is_numeric = pd.api.types.is_numeric_dtype(col_series.dtype)
            is_text = not is_numeric and pd.api.types.is_string_dtype(col_series.dtype)

            pending = {"null_count": col_series.isnull().sum()}
            if is_numeric:
                pending.update({
                    "min": col_series.min(),
                    "max": col_series.max(),
                    "mean": col_series.mean(),
                    "std": col_series.std(),
                    "zeros_count": (col_series == 0).sum(),
                    "negative_count": (col_series < 0).sum(),
                    "q1": col_series.quantile(0.25),
                    "q3": col_series.quantile(0.75)
                })
            elif is_text:
                lengths = col_series.str.len()
                pending.update({
                    "unique_count": col_series.nunique(),
                    "empty_strings": (col_series == "").sum(),
                    "whitespace_only": col_series.str.strip().eq("").sum(),
                    "avg_length": lengths.mean(),
                    "max_length": lengths.max()
                })

            results = dict(zip(pending, compute(*pending.values())))

            null_count = results.pop("null_count")
            column_quality = {
                "null_count": null_count,
                "null_percentage": as_percentage(null_count),
                "data_type": str(col_series.dtype)
            }

            if is_numeric:
                q1 = results.pop("q1")
                q3 = results.pop("q3")
                column_quality.update(results)

                # Values beyond 1.5 * IQR from the quartiles count as outliers.
                iqr = q3 - q1
                lower_bound = q1 - 1.5 * iqr
                upper_bound = q3 + 1.5 * iqr
                outliers = ((col_series < lower_bound) | (col_series > upper_bound)).sum().compute()
                column_quality["outliers_count"] = outliers
                column_quality["outliers_percentage"] = as_percentage(outliers)
            else:
                column_quality.update(results)

            quality_report["column_quality"][column] = column_quality

        return quality_report

    def distributed_sampling(self, df: dd.DataFrame,
                             sample_config: Dict[str, Any]) -> dd.DataFrame:
        """Sample ``df`` using the strategy named in ``sample_config``.

        Strategies:
            ``random``: ``fraction`` of all rows.
            ``stratified``: ``fraction`` of the rows of each ``strata_column`` group.
            ``systematic``: every ``k``-th row, counted within each partition.

        ``random_state`` (default 42) seeds the random strategies.

        Raises:
            ValueError: For an unknown strategy or ``k`` smaller than 1.
        """
        strategy = sample_config.get("strategy", "random")
        seed = sample_config.get("random_state", 42)

        if strategy == "random":
            fraction = sample_config.get("fraction", 0.1)
            return df.sample(frac=fraction, random_state=seed)

        if strategy == "stratified":
            strata_column = sample_config["strata_column"]
            fraction = sample_config.get("fraction", 0.1)
            return df.groupby(strata_column).apply(
                lambda group: group.sample(frac=fraction, random_state=seed),
                meta=df._meta
            )

        if strategy == "systematic":
            k = sample_config.get("k", 10)
            if k < 1:
                raise ValueError(f"Systematic sampling requires k >= 1, got {k}")
            # Dask's iloc cannot slice rows, so the stride is applied per partition.
            return df.map_partitions(lambda part: part.iloc[::k])

        raise ValueError(f"Unsupported sampling strategy: {strategy}")
class DaskMLPipeline:
    """Preprocess, split, train, tune, and evaluate models on Dask collections."""

    def __init__(self, client: Client):
        """Keep a reference to the client the computations run on."""
        self.client = client

    @staticmethod
    def _materialize(value: Any) -> Any:
        """Return ``value.compute()`` for lazy objects and ``value`` itself otherwise."""
        compute_method = getattr(value, "compute", None)
        return compute_method() if callable(compute_method) else value

    def distributed_preprocessing(self, df: dd.DataFrame,
                                  preprocessing_config: Dict[str, Any]) -> dd.DataFrame:
        """Impute, scale, and encode columns as described by ``preprocessing_config``.

        Recognised keys, each mapping column name to a method:
            ``missing_values``: ``"mean"``, ``"median"``, ``"mode"`` or a constant.
            ``scaling``: ``"standard"``, ``"minmax"`` or ``"robust"``.
            ``encoding``: ``"onehot"`` or ``"label"``.
        Columns that are not present in ``df`` are skipped.
        """
        if "missing_values" in preprocessing_config:
            for column, strategy in preprocessing_config["missing_values"].items():
                if column not in df.columns:
                    continue
                if strategy == "mean":
                    fill_value = df[column].mean()
                elif strategy == "median":
                    fill_value = df[column].quantile(0.5)
                elif strategy == "mode":
                    # A lazy Series has no positional indexer; materialise the
                    # (small) list of modes before taking the first one.
                    modes = self._materialize(df[column].mode())
                    if len(modes) == 0:
                        continue
                    fill_value = modes.iloc[0]
                elif isinstance(strategy, (int, float, str)):
                    fill_value = strategy
                else:
                    continue
                df[column] = df[column].fillna(fill_value)

        if "scaling" in preprocessing_config:
            for column, method in preprocessing_config["scaling"].items():
                if column not in df.columns or not pd.api.types.is_numeric_dtype(df[column].dtype):
                    continue
                values = df[column]
                if method == "standard":
                    df[column] = (values - values.mean()) / values.std()
                elif method == "minmax":
                    low = values.min()
                    df[column] = (values - low) / (values.max() - low)
                elif method == "robust":
                    iqr = values.quantile(0.75) - values.quantile(0.25)
                    df[column] = (values - values.quantile(0.5)) / iqr

        if "encoding" in preprocessing_config:
            for column, method in preprocessing_config["encoding"].items():
                if column not in df.columns:
                    continue
                if method == "onehot":
                    # get_dummies needs known categories, so categorize first.
                    df = df.categorize(columns=[column])
                    df = dd.get_dummies(df, columns=[column], prefix=column)
                elif method == "label":
                    # Materialise the distinct values to build a concrete mapping.
                    unique_values = self._materialize(df[column].unique())
                    mapping = {val: idx for idx, val in enumerate(unique_values)}
                    df[column] = df[column].map(mapping, meta=(column, 'int64'))

        return df

    def distributed_train_test_split(self, df: dd.DataFrame,
                                     target_column: str,
                                     test_size: float = 0.2,
                                     random_state: int = 42) -> Tuple[dd.DataFrame, dd.DataFrame, dd.Series, dd.Series]:
        """Randomly split ``df`` into train and test features and targets.

        The split is a seeded random split; no stratification is performed.

        Returns:
            ``(X_train, X_test, y_train, y_test)``.

        Raises:
            ValueError: If ``test_size`` is not strictly between 0 and 1.
        """
        if not 0 < test_size < 1:
            raise ValueError(f"test_size must be between 0 and 1, got {test_size}")

        train_df, test_df = df.random_split(
            [1 - test_size, test_size], random_state=random_state
        )

        X_train = train_df.drop(columns=[target_column])
        y_train = train_df[target_column]
        X_test = test_df.drop(columns=[target_column])
        y_test = test_df[target_column]
        return X_train, X_test, y_train, y_test

    def distributed_model_training(self, X_train: dd.DataFrame, y_train: dd.Series,
                                   model_config: Dict[str, Any]) -> Any:
        """Train the model selected by ``model_config["type"]``.

        ``"random_forest"`` (default) fits a scikit-learn forest through the
        joblib ``dask`` backend on materialised data. ``"dask_ml"`` fits a
        scaler followed by logistic regression and returns them as one
        pipeline, so predictions apply the same scaling as training.

        Raises:
            ValueError: If the model type is not supported.
        """
        model_type = model_config.get("type", "random_forest")

        if model_type == "random_forest":
            from sklearn.ensemble import RandomForestClassifier

            model = RandomForestClassifier(
                n_estimators=model_config.get("n_estimators", 100),
                max_depth=model_config.get("max_depth", None),
                random_state=model_config.get("random_state", 42),
                n_jobs=-1
            )
            # scikit-learn needs in-memory data; only tree fitting is distributed.
            X_train_pd = X_train.compute()
            y_train_pd = y_train.compute()
            with joblib.parallel_backend('dask'):
                model.fit(X_train_pd, y_train_pd)
        elif model_type == "dask_ml":
            from dask_ml.linear_model import LogisticRegression
            from dask_ml.preprocessing import StandardScaler
            from sklearn.pipeline import Pipeline

            model = Pipeline([
                ("scaler", StandardScaler()),
                ("classifier", LogisticRegression(
                    max_iter=model_config.get("max_iter", 1000),
                    random_state=model_config.get("random_state", 42)
                )),
            ])
            model.fit(X_train, y_train)
        else:
            raise ValueError(f"Unsupported model type: {model_type}")

        return model

    def distributed_hyperparameter_tuning(self, X_train: dd.DataFrame, y_train: dd.Series,
                                          model_class: Any, param_grid: Dict[str, List],
                                          cv_folds: int = 5) -> Tuple[Any, Dict[str, Any]]:
        """Grid-search ``param_grid`` for ``model_class`` with dask-ml.

        Returns:
            ``(best_estimator, best_params)`` ranked by accuracy.
        """
        from dask_ml.model_selection import GridSearchCV

        grid_search = GridSearchCV(
            model_class(),
            param_grid,
            cv=cv_folds,
            scoring="accuracy",
            n_jobs=-1
        )
        grid_search.fit(X_train, y_train)
        return grid_search.best_estimator_, grid_search.best_params_

    def distributed_model_evaluation(self, model: Any, X_test: dd.DataFrame, y_test: dd.Series) -> Dict[str, Any]:
        """Score ``model`` on the test set.

        Always reports accuracy and a classification report. When the model
        exposes ``predict_proba``, ROC AUC and log loss are added if they can
        be computed for the given labels.

        Raises:
            ValueError: If ``model`` has no ``predict`` method.
        """
        if not hasattr(model, "predict"):
            raise ValueError("Model does not support prediction")

        # Materialise labels and predictions once instead of once per metric.
        y_true = self._materialize(y_test)
        y_pred = self._materialize(model.predict(X_test))
        y_pred_proba = None
        if hasattr(model, "predict_proba"):
            y_pred_proba = self._materialize(model.predict_proba(X_test))

        evaluation_results = {
            "accuracy": accuracy_score(y_true, y_pred),
            "classification_report": classification_report(y_true, y_pred, output_dict=True)
        }

        if y_pred_proba is not None:
            from sklearn.metrics import roc_auc_score, log_loss

            try:
                evaluation_results["roc_auc"] = roc_auc_score(y_true, y_pred_proba[:, 1])
            except (ValueError, IndexError) as exc:
                # e.g. multiclass targets, for which this binary form does not apply
                logging.debug(f"Skipping roc_auc: {exc}")
            try:
                evaluation_results["log_loss"] = log_loss(y_true, y_pred_proba)
            except ValueError as exc:
                logging.debug(f"Skipping log_loss: {exc}")

        return evaluation_results
class DaskPerformanceMonitor:
    """Collect scheduler and task-stream metrics and turn them into advice."""

    def __init__(self):
        # One metrics dictionary is appended per monitor_task_performance call.
        self.performance_history = []

    @staticmethod
    def _compute_durations(task_stream: Any) -> List[List[float]]:
        """Return, per task record, the durations of its ``compute`` intervals.

        Task-stream records carry their timings under ``"startstops"``; flat
        records with ``action``/``start``/``stop`` at the top level and
        ``(action, start, stop)`` tuples are accepted as well. Records without
        a completed compute interval are omitted.
        """
        per_task = []
        for record in task_stream or []:
            intervals = record.get("startstops") or [record]
            durations = []
            for interval in intervals:
                if isinstance(interval, (tuple, list)):
                    if len(interval) != 3:
                        continue
                    action, start, stop = interval
                else:
                    action = interval.get("action")
                    start = interval.get("start")
                    stop = interval.get("stop")
                if action == "compute" and start is not None and stop is not None:
                    durations.append(stop - start)
            if durations:
                per_task.append(durations)
        return per_task

    def monitor_task_performance(self, client: Client) -> Dict[str, Any]:
        """Take one snapshot of worker memory and task timings.

        Returns:
            A dictionary with ``timestamp``, ``worker_count``, ``total_cores``,
            per-worker ``memory_usage``, aggregated ``task_metrics`` and the
            detected ``bottlenecks``. The snapshot is also appended to
            ``performance_history``.
        """
        scheduler_info = client.scheduler_info()
        workers = scheduler_info["workers"]
        task_stream = client.get_task_stream()

        performance_metrics = {
            "timestamp": datetime.now().isoformat(),
            "worker_count": len(workers),
            "total_cores": sum(w["nthreads"] for w in workers.values()),
            "memory_usage": {},
            "task_metrics": {},
            "bottlenecks": []
        }

        for worker_id, worker_info in workers.items():
            memory_limit = worker_info.get("memory_limit")
            memory_used = worker_info.get("metrics", {}).get("memory", 0)
            performance_metrics["memory_usage"][worker_id] = {
                "memory_limit": memory_limit,
                "memory_used": memory_used,
                # A missing or zero limit means utilisation is undefined; report 0.
                "utilization": memory_used / memory_limit * 100 if memory_limit else 0.0
            }

        per_task = self._compute_durations(task_stream)
        if per_task:
            durations = [d for task_durations in per_task for d in task_durations]
            performance_metrics["task_metrics"] = {
                "total_tasks": len(per_task),
                "avg_duration": sum(durations) / len(durations),
                "max_duration": max(durations),
                "min_duration": min(durations)
            }

        performance_metrics["bottlenecks"] = self._identify_bottlenecks(performance_metrics)
        self.performance_history.append(performance_metrics)
        return performance_metrics

    def _identify_bottlenecks(self, metrics: Dict[str, Any]) -> List[str]:
        """Describe memory pressure, long tasks, and uneven worker load."""
        bottlenecks = []
        memory_usage = metrics["memory_usage"]

        for worker_id, memory_info in memory_usage.items():
            if memory_info["utilization"] > 90:
                bottlenecks.append(f"High memory usage on worker {worker_id}: {memory_info['utilization']:.1f}%")

        task_metrics = metrics.get("task_metrics", {})
        if task_metrics.get("max_duration", 0) > 60:  # longer than one minute
            bottlenecks.append(f"Long-running tasks detected: max duration {task_metrics['max_duration']:.1f}s")

        memory_utilizations = [info["utilization"] for info in memory_usage.values()]
        if memory_utilizations:
            std_utilization = np.std(memory_utilizations)
            if std_utilization > 20:  # utilisation varies widely between workers
                bottlenecks.append(f"Worker load imbalance detected: std deviation {std_utilization:.1f}%")

        return bottlenecks

    def generate_optimization_recommendations(self, metrics: Dict[str, Any]) -> List[str]:
        """Translate a metrics snapshot into tuning suggestions.

        Memory and task-duration advice is only given when the corresponding
        measurements are present in ``metrics``.
        """
        recommendations = []

        utilizations = [info["utilization"] for info in metrics["memory_usage"].values()]
        if utilizations:
            avg_memory_utilization = np.mean(utilizations)
            if avg_memory_utilization > 80:
                recommendations.append("Consider increasing worker memory or reducing chunk sizes")
            elif avg_memory_utilization < 30:
                recommendations.append("Consider reducing worker memory to allow more workers")

        task_metrics = metrics.get("task_metrics", {})
        if "avg_duration" in task_metrics:
            avg_duration = task_metrics["avg_duration"]
            if avg_duration > 30:
                recommendations.append("Consider breaking down large tasks into smaller chunks")
            elif avg_duration < 0.1:
                recommendations.append("Consider increasing chunk sizes to reduce task overhead")

        if metrics["worker_count"] < metrics["total_cores"] / 2:
            recommendations.append("Consider increasing number of workers to utilize available cores")

        return recommendations
Ray: Distributed AI and ML Framework
Ray provides a more general-purpose distributed computing framework with native support for machine learning, reinforcement learning, and hyperparameter tuning.
# Advanced Ray implementation for scalable AI/ML
import ray
from ray import tune
from ray.train import Trainer
from ray.data import Dataset
from ray.tune.search.optuna import OptunaSearch
from ray.tune.schedulers import ASHAScheduler
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Callable, Tuple
import logging
import time
from datetime import datetime
import asyncio
@ray.remote
class RayDataProcessor:
    """Ray actor that cleans, enriches, and profiles pandas partitions."""

    def __init__(self, config: Dict[str, Any]):
        """Keep the configuration the actor was created with."""
        self.config = config

    def process_partition(self, data_partition: pd.DataFrame,
                          processing_config: Dict[str, Any]) -> pd.DataFrame:
        """Apply the configured cleaning and feature steps to one partition.

        ``processing_config`` may contain:
            ``cleaning``: ``remove_duplicates`` (bool) and ``outlier_columns``
                (columns whose values are clipped to 1.5 * IQR around the
                quartiles).
            ``features``: ``polynomial`` (``columns``, ``degree``),
                ``interactions`` (pairs of columns) and ``binning``
                (column -> bins for ``pandas.cut``).

        Returns:
            A new DataFrame; the input partition is left untouched.
        """
        # Work on a private copy: the caller's frame must not be modified, and
        # data handed over by Ray may be backed by read-only buffers.
        data_partition = data_partition.copy()

        cleaning_config = processing_config.get("cleaning")
        if cleaning_config is not None:
            if cleaning_config.get("remove_duplicates", False):
                data_partition = data_partition.drop_duplicates()

            for column in cleaning_config.get("outlier_columns", []):
                if column not in data_partition.columns:
                    continue
                q1 = data_partition[column].quantile(0.25)
                q3 = data_partition[column].quantile(0.75)
                iqr = q3 - q1
                # Cap rather than drop, so the row count stays unchanged.
                data_partition[column] = data_partition[column].clip(
                    q1 - 1.5 * iqr, q3 + 1.5 * iqr
                )

        features_config = processing_config.get("features")
        if features_config is not None:
            if "polynomial" in features_config:
                poly_config = features_config["polynomial"]
                degree = poly_config.get("degree", 2)
                for column in poly_config.get("columns", []):
                    if column not in data_partition.columns:
                        continue
                    for power in range(2, degree + 1):
                        data_partition[f"{column}_poly_{power}"] = data_partition[column] ** power

            if "interactions" in features_config:
                for col1, col2 in features_config["interactions"]:
                    if col1 in data_partition.columns and col2 in data_partition.columns:
                        data_partition[f"{col1}_x_{col2}"] = data_partition[col1] * data_partition[col2]

            if "binning" in features_config:
                for column, bins in features_config["binning"].items():
                    if column in data_partition.columns:
                        data_partition[f"{column}_binned"] = pd.cut(
                            data_partition[column],
                            bins=bins,
                            labels=False
                        )

        return data_partition

    def compute_statistics(self, data_partition: pd.DataFrame) -> Dict[str, Any]:
        """Summarise one partition: size, memory use, and per-column statistics.

        Numeric columns get min/max/mean/std and quartiles; string columns get
        the distinct count and the most frequent value.
        """
        row_count = len(data_partition)
        stats = {
            "row_count": row_count,
            "column_count": len(data_partition.columns),
            "memory_usage": int(data_partition.memory_usage(deep=True).sum()),
            "column_stats": {}
        }

        for column in data_partition.columns:
            series = data_partition[column]
            null_count = int(series.isnull().sum())
            col_stats = {
                "dtype": str(series.dtype),
                "null_count": null_count,
                # An empty partition has no meaningful percentage; report 0.
                "null_percentage": null_count / row_count * 100 if row_count else 0.0
            }

            if pd.api.types.is_numeric_dtype(series):
                col_stats.update({
                    "min": series.min(),
                    "max": series.max(),
                    "mean": series.mean(),
                    "std": series.std(),
                    "q25": series.quantile(0.25),
                    "q50": series.quantile(0.50),
                    "q75": series.quantile(0.75)
                })
            elif pd.api.types.is_string_dtype(series):
                modes = series.mode()
                col_stats.update({
                    "unique_count": series.nunique(),
                    "most_frequent": modes.iloc[0] if not modes.empty else None
                })

            stats["column_stats"][column] = col_stats

        return stats
class RayMLPipeline:
"""Advanced ML pipeline with Ray"""
def __init__(self, cluster_config: Dict[str, Any]):
self.cluster_config = cluster_config
self.initialize_ray()
def initialize_ray(self):
    """Start or attach to a Ray cluster unless Ray is already initialised.

    Reads ``address`` (default ``"auto"``), ``num_cpus``, ``num_gpus``,
    ``memory``, ``object_store_memory``, ``dashboard_host`` and
    ``dashboard_port`` from ``self.cluster_config``. Resource options are
    only forwarded when a new local cluster is started (``address`` of
    ``None`` or ``"local"``).
    """
    if ray.is_initialized():
        return

    config = self.cluster_config
    address = config.get("address", "auto")

    init_kwargs = {
        "address": address,
        # NOTE(review): "0.0.0.0" listens on every interface; confirm this is
        # intended outside trusted networks.
        "dashboard_host": config.get("dashboard_host", "0.0.0.0"),
        "dashboard_port": config.get("dashboard_port", 8265),
    }

    # Only forward options that were actually configured. The reservable
    # memory option is spelled ``_memory`` in ``ray.init``.
    requested_resources = {
        option: value
        for option, value in (
            ("num_cpus", config.get("num_cpus")),
            ("num_gpus", config.get("num_gpus")),
            ("_memory", config.get("memory")),
            ("object_store_memory", config.get("object_store_memory")),
        )
        if value is not None
    }

    if address is None or address == "local":
        init_kwargs.update(requested_resources)
    elif requested_resources:
        # Resources of an existing cluster are fixed when its nodes start.
        logging.warning(
            f"Ignoring resource options {sorted(requested_resources)} when connecting to existing Ray cluster at {address}"
        )

    ray.init(**init_kwargs)
    logging.info(f"Ray cluster initialized with {ray.cluster_resources()}")
def distributed_data_loading(self, data_sources: List[str],
                             file_format: str = "parquet") -> Dataset:
    """Read ``data_sources`` into a Ray Dataset.

    Args:
        data_sources: Paths of the files to read.
        file_format: ``"parquet"``, ``"csv"`` or ``"json"``.

    Raises:
        ValueError: If ``file_format`` is not one of the supported formats.
    """
    # Reader names are resolved only after the format has been validated.
    reader_names = {
        "parquet": "read_parquet",
        "csv": "read_csv",
        "json": "read_json",
    }
    if file_format not in reader_names:
        raise ValueError(f"Unsupported file format: {file_format}")

    reader = getattr(ray.data, reader_names[file_format])
    return reader(data_sources)
def distributed_preprocessing(self, dataset: Dataset,
                              preprocessing_config: Dict[str, Any]) -> Dataset:
    """Run every batch of ``dataset`` through a ``RayDataProcessor`` actor.

    One actor is created per call; each pandas batch is sent to its
    ``process_partition`` method and the processed batch takes its place.

    Returns:
        The dataset with the preprocessing applied via ``map_batches``.
    """
    worker = RayDataProcessor.remote(preprocessing_config)

    def _run_on_actor(frame: pd.DataFrame) -> pd.DataFrame:
        # Block until the actor has finished this batch.
        pending = worker.process_partition.remote(frame, preprocessing_config)
        return ray.get(pending)

    return dataset.map_batches(_run_on_actor, batch_format="pandas")
def distributed_feature_selection(self, dataset: Dataset,
                                  target_column: str,
                                  selection_config: Dict[str, Any]) -> Tuple[Dataset, List[str]]:
    """Keep only the features that pass a variance or correlation filter.

    Methods (``selection_config["method"]``, default ``"variance"``):
        ``variance``: keep numeric features whose sample variance over the
            whole dataset exceeds ``variance_threshold`` (default 0.01).
        ``correlation``: keep numeric features whose absolute correlation
            with the target, measured on up to 10000 rows, exceeds
            ``correlation_threshold`` (default 0.05).

    Returns:
        ``(filtered_dataset, selected_features)``; the filtered dataset holds
        the selected features plus the target column.

    Raises:
        ValueError: For an unknown method, or when the target column is not
            numeric in correlation mode.
    """
    selection_method = selection_config.get("method", "variance")

    if selection_method == "variance":
        threshold = selection_config.get("variance_threshold", 0.01)
        moment_columns = ["feature", "n", "mean", "m2"]

        def batch_moments(batch: pd.DataFrame) -> pd.DataFrame:
            # One row per numeric feature: count, mean, and sum of squared
            # deviations, which can be merged exactly across batches.
            rows = []
            for col in batch.columns:
                if col == target_column or not pd.api.types.is_numeric_dtype(batch[col]):
                    continue
                values = batch[col].dropna().astype("float64")
                count = int(values.count())
                if count == 0:
                    continue
                mean = float(values.mean())
                rows.append({
                    "feature": col,
                    "n": count,
                    "mean": mean,
                    "m2": float(((values - mean) ** 2).sum()),
                })
            return pd.DataFrame(rows, columns=moment_columns)

        # Merging moments gives the variance of the full column; an average of
        # per-batch variances would ignore differences between batch means.
        pooled = {}
        moments = dataset.map_batches(batch_moments, batch_format="pandas")
        for row in moments.iter_rows():
            count, mean, m2 = int(row["n"]), float(row["mean"]), float(row["m2"])
            if row["feature"] not in pooled:
                pooled[row["feature"]] = (count, mean, m2)
                continue
            seen, seen_mean, seen_m2 = pooled[row["feature"]]
            total = seen + count
            delta = mean - seen_mean
            pooled[row["feature"]] = (
                total,
                seen_mean + delta * count / total,
                seen_m2 + m2 + delta * delta * seen * count / total,
            )

        selected_features = [
            col for col, (count, _, m2) in pooled.items()
            if count > 1 and m2 / (count - 1) > threshold
        ]

    elif selection_method == "correlation":
        threshold = selection_config.get("correlation_threshold", 0.05)

        sample_df = pd.DataFrame(dataset.take(10000))
        # Correlation is only defined for numeric columns.
        numeric_df = sample_df.select_dtypes(include=["number", "bool"]).astype("float64")
        if target_column not in numeric_df.columns:
            raise ValueError(
                f"Correlation-based selection requires a numeric target column: {target_column}"
            )

        correlations = numeric_df.corrwith(numeric_df[target_column]).abs()
        # Undefined (NaN) correlations compare as False and are dropped.
        selected_features = [
            col for col in correlations.index
            if col != target_column and correlations[col] > threshold
        ]

    else:
        raise ValueError(f"Unsupported feature selection method: {selection_method}")

    filtered_dataset = dataset.select_columns(selected_features + [target_column])
    return filtered_dataset, selected_features
def hyperparameter_tuning(self, train_dataset: Dataset,
                          val_dataset: Dataset,
                          model_config: Dict[str, Any]) -> Dict[str, Any]:
    """Tune a random forest with Ray Tune (Optuna search, ASHA scheduler).

    ``model_config`` keys: ``target_column`` (required), ``search_space``,
    ``max_training_iterations``, ``grace_period`` and ``num_samples``.
    Hyperparameters missing from the search space fall back to the
    scikit-learn defaults.

    Returns:
        A dictionary with ``best_config``, ``best_accuracy`` and the full
        ``analysis`` object.

    Raises:
        KeyError: If ``model_config`` has no ``target_column``.
    """
    search_space = model_config.get("search_space", {})
    # Resolve the target once, before any trial is scheduled, so a missing
    # key fails here instead of inside every trial.
    target_col = model_config["target_column"]

    def train_model(config: Dict[str, Any]):
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.metrics import accuracy_score

        train_df = train_dataset.to_pandas()
        val_df = val_dataset.to_pandas()

        X_train = train_df.drop(columns=[target_col])
        y_train = train_df[target_col]
        X_val = val_df.drop(columns=[target_col])
        y_val = val_df[target_col]

        model = RandomForestClassifier(
            n_estimators=config.get("n_estimators", 100),
            max_depth=config.get("max_depth"),
            min_samples_split=config.get("min_samples_split", 2),
            min_samples_leaf=config.get("min_samples_leaf", 1),
            random_state=42
        )
        model.fit(X_train, y_train)

        accuracy = accuracy_score(y_val, model.predict(X_val))
        tune.report(accuracy=accuracy)

    # The searcher is told which metric to optimise; the scheduler below
    # carries its own copy of the same objective.
    search_algo = OptunaSearch(metric="accuracy", mode="max")

    scheduler = ASHAScheduler(
        metric="accuracy",
        mode="max",
        max_t=model_config.get("max_training_iterations", 100),
        grace_period=model_config.get("grace_period", 10)
    )

    analysis = tune.run(
        train_model,
        config=search_space,
        search_alg=search_algo,
        scheduler=scheduler,
        num_samples=model_config.get("num_samples", 20),
        resources_per_trial={"cpu": 2, "gpu": 0},
        verbose=1
    )

    best_config = analysis.get_best_config(metric="accuracy", mode="max")
    best_result = analysis.get_best_trial(metric="accuracy", mode="max")

    return {
        "best_config": best_config,
        "best_accuracy": best_result.last_result["accuracy"],
        "analysis": analysis
    }
def distributed_ensemble_training(self, dataset: Dataset,
                                  ensemble_config: Dict[str, Any]) -> List[Any]:
    """Train every configured model as its own Ray task and return them.

    Args:
        dataset: Ray Dataset holding the features and the target column.
        ensemble_config: Requires ``target_column`` and ``models`` (a list
            of dicts with ``type`` in ``random_forest`` /
            ``logistic_regression`` / ``svm`` and optional ``params``).
            Optional: ``use_bootstrap`` (default True) and
            ``random_state`` (default 42), the base seed for resampling.

    Returns:
        Fitted models in the same order as ``ensemble_config["models"]``.

    Raises:
        ValueError: If a model entry names an unsupported ``type``.
    """
    supported_types = ("random_forest", "logistic_regression", "svm")

    target_column = ensemble_config["target_column"]
    model_configs = ensemble_config["models"]
    use_bootstrap = ensemble_config.get("use_bootstrap", True)
    base_seed = ensemble_config.get("random_state", 42)

    # Reject bad model types on the driver, before any task is submitted.
    for model_config in model_configs:
        if model_config["type"] not in supported_types:
            raise ValueError(f"Unsupported model type: {model_config['type']}")

    train_df = dataset.to_pandas()

    @ray.remote
    def train_single_model(model_config: Dict[str, Any], data: pd.DataFrame) -> Any:
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.linear_model import LogisticRegression
        from sklearn.svm import SVC

        estimators = {
            "random_forest": RandomForestClassifier,
            "logistic_regression": LogisticRegression,
            "svm": SVC,
        }
        model_type = model_config["type"]
        if model_type not in estimators:
            raise ValueError(f"Unsupported model type: {model_type}")

        # Separate features and target
        X = data.drop(columns=[target_column])
        y = data[target_column]

        model = estimators[model_type](**model_config.get("params", {}))
        model.fit(X, y)
        return model

    # Without bootstrapping every task reads the same frame, so store it in
    # the object store once instead of serializing it per task.
    shared_data = None if use_bootstrap else ray.put(train_df)

    model_futures = []
    for index, model_config in enumerate(model_configs):
        if use_bootstrap:
            # A distinct seed per model: reusing one seed would hand every
            # model the identical resample and defeat the bootstrap.
            task_data = train_df.sample(
                n=len(train_df), replace=True, random_state=base_seed + index
            )
        else:
            task_data = shared_data
        model_futures.append(train_single_model.remote(model_config, task_data))

    # Collect trained models (blocks until all tasks finish)
    return ray.get(model_futures)
def distributed_model_evaluation(self, models: List[Any],
                                 test_dataset: Dataset,
                                 target_column: str) -> Dict[str, Any]:
    """Score each model in its own Ray task and add ensemble-level metrics.

    Args:
        models: Fitted estimators exposing ``predict``.
        test_dataset: Ray Dataset with the features and ``target_column``.
        target_column: Name of the label column in ``test_dataset``.

    Returns:
        Dict with ``individual_results`` (one metrics dict per model, in
        input order) and ``ensemble_metrics`` as produced by
        ``self._compute_ensemble_metrics``.
    """
    test_df = test_dataset.to_pandas()
    X_test = test_df.drop(columns=[target_column])
    y_test = test_df[target_column]

    @ray.remote
    def evaluate_model(model: Any, X: pd.DataFrame, y: pd.Series) -> Dict[str, Any]:
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

        y_pred = model.predict(X)
        # zero_division=0 yields the same score the default produces for an
        # undefined metric, but without a warning from every task.
        return {
            "accuracy": accuracy_score(y, y_pred),
            "precision": precision_score(y, y_pred, average="weighted", zero_division=0),
            "recall": recall_score(y, y_pred, average="weighted", zero_division=0),
            "f1_score": f1_score(y, y_pred, average="weighted", zero_division=0),
        }

    # Put the test split in the object store once; each task receives a
    # reference instead of its own serialized copy.
    X_ref = ray.put(X_test)
    y_ref = ray.put(y_test)

    evaluation_futures = [
        evaluate_model.remote(model, X_ref, y_ref) for model in models
    ]
    evaluation_results = ray.get(evaluation_futures)

    return {
        "individual_results": evaluation_results,
        "ensemble_metrics": self._compute_ensemble_metrics(models, X_test, y_test),
    }
def _compute_ensemble_metrics(self, models: List[Any],
                              X_test: pd.DataFrame,
                              y_test: pd.Series) -> Dict[str, Any]:
    """Compute majority-vote ensemble accuracy plus per-model accuracy.

    Voting works for any label type ``numpy.unique`` can sort (integers,
    negative integers, strings, ...). Ties go to the smallest label, which
    matches the previous ``bincount``-based behaviour for non-negative
    integer labels.

    Args:
        models: Fitted estimators exposing ``predict``.
        X_test: Feature frame passed to every model.
        y_test: True labels aligned with ``X_test``.

    Returns:
        Dict with ``ensemble_accuracy`` and ``individual_accuracies`` (one
        entry per model, in input order). With no models,
        ``ensemble_accuracy`` is ``None`` and the list is empty.
    """
    from sklearn.metrics import accuracy_score
    import numpy as np

    if not models:
        # Nothing to vote on; report that explicitly instead of failing.
        return {"ensemble_accuracy": None, "individual_accuracies": []}

    # Get predictions from all models
    predictions = [np.asarray(model.predict(X_test)) for model in models]

    # One row per sample, one column per model
    # (assumes each predict() returns a 1-D label array - TODO confirm).
    votes_by_sample = np.stack(predictions, axis=1)

    ensemble_final = []
    for sample_votes in votes_by_sample:
        labels, counts = np.unique(sample_votes, return_counts=True)
        # argmax returns the first maximum; labels are sorted ascending.
        ensemble_final.append(labels[counts.argmax()])

    return {
        "ensemble_accuracy": accuracy_score(y_test, ensemble_final),
        "individual_accuracies": [
            accuracy_score(y_test, pred) for pred in predictions
        ],
    }
def shutdown(self):
    """Call ``ray.shutdown()`` and record the shutdown in the log."""
    ray.shutdown()
    # Logged at INFO level through the root logger.
    logging.info("Ray cluster shut down")
class RayClusterMonitor:
    """Monitor Ray cluster performance and resource usage.

    Periodic collection runs in a daemon thread of the calling process. A
    Ray remote task would poll a pickled copy of this object (so the stop
    flag could never reach it), would hold a CPU slot that distorts the
    utilization being measured, and would log on a worker instead of here.
    """

    def __init__(self):
        import threading  # local import keeps the class self-contained

        self.monitoring_active = False
        self._stop_event = threading.Event()
        self._monitor_thread = None

    def start_monitoring(self, interval_seconds: int = 30):
        """Start logging cluster metrics every ``interval_seconds`` seconds.

        Calling this while monitoring is already running is a no-op.
        """
        import threading

        if self._monitor_thread is not None and self._monitor_thread.is_alive():
            logging.warning("Cluster monitoring is already running")
            return

        self._stop_event.clear()
        self.monitoring_active = True

        def monitor_worker():
            while not self._stop_event.is_set():
                try:
                    metrics = self.collect_cluster_metrics()
                    logging.info(f"Cluster metrics: {metrics}")
                except Exception as e:
                    logging.error(f"Error collecting metrics: {e}")
                # wait() sleeps for the interval but returns immediately
                # once stop_monitoring() sets the event.
                self._stop_event.wait(interval_seconds)

        self._monitor_thread = threading.Thread(
            target=monitor_worker, name="ray-cluster-monitor", daemon=True
        )
        self._monitor_thread.start()

    def collect_cluster_metrics(self) -> Dict[str, Any]:
        """Collect a snapshot of cluster resources and their utilization.

        Returns:
            Dict with ``timestamp``, ``cluster_resources``,
            ``available_resources``, ``resource_utilization`` (per resource:
            total / used / available / utilization_percent), ``node_count``
            and ``object_store_stats``. On failure, ``{"error": message}``.
        """
        try:
            cluster_resources = ray.cluster_resources()
            available_resources = ray.available_resources()

            resource_utilization = {}
            for resource, total in cluster_resources.items():
                available = available_resources.get(resource, 0)
                used = total - available
                resource_utilization[resource] = {
                    "total": total,
                    "used": used,
                    "available": available,
                    "utilization_percent": (used / total) * 100 if total > 0 else 0,
                }

            # ray.nodes() also lists nodes that have died; count live ones.
            alive_nodes = sum(1 for node in ray.nodes() if node.get("Alive", True))

            return {
                "timestamp": datetime.now().isoformat(),
                "cluster_resources": cluster_resources,
                "available_resources": available_resources,
                "resource_utilization": resource_utilization,
                "node_count": alive_nodes,
                "object_store_stats": self._get_object_store_stats(),
            }
        except Exception as e:
            # Best effort by design: callers get an error marker, not a crash.
            logging.error(f"Error collecting cluster metrics: {e}")
            return {"error": str(e)}

    def _get_object_store_stats(self) -> Dict[str, Any]:
        """Return object store statistics (currently a zeroed placeholder)."""
        # Real numbers would require Ray's internal APIs.
        return {
            "total_size_bytes": 0,
            "used_size_bytes": 0,
            "object_count": 0,
        }

    def stop_monitoring(self):
        """Stop the background collection thread, if one is running."""
        self.monitoring_active = False
        self._stop_event.set()
        monitor_thread = self._monitor_thread
        if monitor_thread is not None and monitor_thread.is_alive():
            # Bounded wait: the worker may be in the middle of a collection.
            monitor_thread.join(timeout=5)
        self._monitor_thread = None
        logging.info("Stopped cluster monitoring")

    def get_performance_report(self) -> Dict[str, Any]:
        """Build a report from a fresh metrics snapshot.

        Returns:
            Dict with ``report_id``, ``generated_at``, ``cluster_overview``,
            ``current_utilization`` and ``recommendations``.
        """
        current_metrics = self.collect_cluster_metrics()
        cluster_resources = current_metrics.get("cluster_resources", {})
        return {
            "report_id": f"ray_performance_{int(time.time())}",
            "generated_at": datetime.now().isoformat(),
            "cluster_overview": {
                "node_count": current_metrics.get("node_count", 0),
                "total_cpus": cluster_resources.get("CPU", 0),
                "total_gpus": cluster_resources.get("GPU", 0),
                "total_memory": cluster_resources.get("memory", 0),
            },
            "current_utilization": current_metrics.get("resource_utilization", {}),
            "recommendations": self._generate_recommendations(current_metrics),
        }

    def _generate_recommendations(self, metrics: Dict[str, Any]) -> List[str]:
        """Turn utilization figures into human-readable tuning hints.

        A resource that is absent from ``metrics`` produces no hint. In
        particular, a failed collection no longer yields a misleading
        "Low CPU utilization" recommendation.
        """
        recommendations = []
        utilization = metrics.get("resource_utilization", {})

        # CPU utilization recommendations
        if "CPU" in utilization:
            cpu_util = utilization["CPU"].get("utilization_percent", 0)
            if cpu_util > 90:
                recommendations.append("High CPU utilization detected - consider adding more nodes")
            elif cpu_util < 20:
                recommendations.append("Low CPU utilization - consider reducing cluster size")

        # Memory utilization recommendations
        memory_util = utilization.get("memory", {}).get("utilization_percent", 0)
        if memory_util > 85:
            recommendations.append("High memory utilization - consider increasing memory per node")

        # GPU utilization recommendations
        if "GPU" in utilization:
            gpu_util = utilization["GPU"].get("utilization_percent", 0)
            if gpu_util < 30:
                recommendations.append("Low GPU utilization - ensure GPU-accelerated tasks are properly configured")

        return recommendations
Production Deployment and Optimization
Advanced Deployment Strategies
# Production deployment and optimization for distributed computing
import kubernetes
from kubernetes import client, config
import yaml
import json
import logging
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
@dataclass
class ClusterDeploymentConfig:
    """Settings describing one cluster to deploy.

    The first six fields are required; the rest carry defaults.
    """

    # --- required ---
    cluster_name: str       # prefix for the names of the created resources
    cluster_type: str       # "dask" or "ray"
    node_count: int         # initial number of worker replicas
    instance_type: str      # descriptive instance class label
    memory_per_node: str    # memory quantity string, e.g. "8Gi"
    cpu_per_node: int       # CPUs per worker
    # --- optional ---
    gpu_per_node: int = 0           # 0 means no GPU resources are requested
    storage_size: str = "100Gi"
    namespace: str = "default"      # Kubernetes namespace
    auto_scaling: bool = True       # whether an autoscaler is created
    min_nodes: int = 1              # autoscaler lower bound
    max_nodes: int = 10             # autoscaler upper bound
class KubernetesClusterDeployer:
    """Deploy and manage distributed computing clusters on Kubernetes.

    Resources are named after the cluster: ``<name>-scheduler`` (Dask) or
    ``<name>-head`` (Ray) for the coordinator Deployment and Service,
    ``<name>-worker`` for the worker Deployment and ``<name>-worker-hpa``
    for the optional HorizontalPodAutoscaler.
    """

    # NOTE(review): unpinned ``latest`` tags; consider pinning versions that
    # match the client environment.
    DASK_IMAGE = "daskdev/dask:latest"
    RAY_IMAGE = "rayproject/ray:latest"

    DASK_SCHEDULER_PORT = 8786
    DASK_DASHBOARD_PORT = 8787
    RAY_GCS_PORT = 6379
    RAY_DASHBOARD_PORT = 8265
    RAY_CLIENT_PORT = 10001

    def __init__(self, k8s_config_path: Optional[str] = None):
        """Load Kubernetes credentials and create the API clients.

        Args:
            k8s_config_path: Path to a kubeconfig file. When omitted, the
                in-cluster service account is tried first and the default
                kubeconfig is used as a fallback.
        """
        if k8s_config_path:
            config.load_kube_config(config_file=k8s_config_path)
        else:
            try:
                config.load_incluster_config()
            except config.ConfigException:
                # Not running inside a pod: use the default kubeconfig.
                config.load_kube_config()
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.autoscaling_v1 = client.AutoscalingV1Api()
        # The HPA manifests below are autoscaling/v2 objects (multiple
        # resource metrics), so they must go through the v2 API group.
        self.autoscaling_v2 = client.AutoscalingV2Api()

    # ------------------------------------------------------------------
    # Public deployment entry points
    # ------------------------------------------------------------------
    def deploy_dask_cluster(self, deployment_config: ClusterDeploymentConfig) -> Dict[str, Any]:
        """Deploy a Dask scheduler, its Service, workers and optional HPA.

        Returns:
            Dict naming the created resources. ``dashboard_service`` is the
            scheduler Service, which also exposes the dashboard port (8787).
        """
        self._ensure_namespace(deployment_config.namespace)

        self._create_dask_scheduler_deployment(deployment_config)
        self._create_dask_scheduler_service(deployment_config)
        self._create_dask_worker_deployment(deployment_config)
        if deployment_config.auto_scaling:
            self._create_dask_hpa(deployment_config)

        scheduler_service = f"{deployment_config.cluster_name}-scheduler"
        return {
            "cluster_name": deployment_config.cluster_name,
            "cluster_type": "dask",
            "namespace": deployment_config.namespace,
            "scheduler_service": scheduler_service,
            # No separate dashboard Service exists; report the one that does.
            "dashboard_service": scheduler_service,
            "worker_deployment": f"{deployment_config.cluster_name}-worker",
        }

    def deploy_ray_cluster(self, deployment_config: ClusterDeploymentConfig) -> Dict[str, Any]:
        """Deploy a Ray head node, its Service, workers and optional HPA.

        Returns:
            Dict naming the created resources. ``dashboard_service`` is the
            head Service, which also exposes the dashboard port (8265).
        """
        self._ensure_namespace(deployment_config.namespace)

        self._create_ray_head_deployment(deployment_config)
        self._create_ray_head_service(deployment_config)
        self._create_ray_worker_deployment(deployment_config)
        if deployment_config.auto_scaling:
            self._create_ray_hpa(deployment_config)

        head_service = f"{deployment_config.cluster_name}-head"
        return {
            "cluster_name": deployment_config.cluster_name,
            "cluster_type": "ray",
            "namespace": deployment_config.namespace,
            "head_service": head_service,
            "dashboard_service": head_service,
            "worker_deployment": f"{deployment_config.cluster_name}-worker",
        }

    def _ensure_namespace(self, namespace: str):
        """Create ``namespace`` if it does not exist.

        Raises:
            client.ApiException: For any lookup failure other than 404
                (for example missing permissions).
        """
        try:
            self.v1.read_namespace(namespace)
        except client.ApiException as e:
            if e.status != 404:
                # Anything but "not found" is a real error; do not hide it.
                raise
            namespace_manifest = client.V1Namespace(
                metadata=client.V1ObjectMeta(name=namespace)
            )
            self.v1.create_namespace(namespace_manifest)
            logging.info(f"Created namespace: {namespace}")

    # ------------------------------------------------------------------
    # Manifest builders shared by the Dask and Ray resources
    # ------------------------------------------------------------------
    @staticmethod
    def _deployment_manifest(name: str, namespace: str, labels: Dict[str, str],
                             replicas: int, container: Dict[str, Any]) -> Dict[str, Any]:
        """Build a single-container apps/v1 Deployment manifest."""
        return {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {"name": name, "namespace": namespace, "labels": dict(labels)},
            "spec": {
                "replicas": replicas,
                "selector": {"matchLabels": dict(labels)},
                "template": {
                    "metadata": {"labels": dict(labels)},
                    "spec": {"containers": [container]},
                },
            },
        }

    @staticmethod
    def _service_manifest(name: str, namespace: str, labels: Dict[str, str],
                          ports: List[Any]) -> Dict[str, Any]:
        """Build a ClusterIP Service manifest from ``(port_name, port)`` pairs."""
        return {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {"name": name, "namespace": namespace, "labels": dict(labels)},
            "spec": {
                "selector": dict(labels),
                "ports": [
                    {"name": port_name, "port": port, "targetPort": port}
                    for port_name, port in ports
                ],
                "type": "ClusterIP",
            },
        }

    @staticmethod
    def _worker_resources(config: ClusterDeploymentConfig) -> Dict[str, Any]:
        """Build identical requests/limits for a worker, with GPUs if any."""
        resources = {
            "cpu": str(config.cpu_per_node),
            "memory": config.memory_per_node,
        }
        if config.gpu_per_node > 0:
            resources["nvidia.com/gpu"] = str(config.gpu_per_node)
        return {"requests": dict(resources), "limits": dict(resources)}

    def _create_worker_hpa(self, config: ClusterDeploymentConfig) -> Any:
        """Create the CPU/memory autoscaler targeting ``<name>-worker``."""
        hpa_manifest = {
            "apiVersion": "autoscaling/v2",
            "kind": "HorizontalPodAutoscaler",
            "metadata": {
                "name": f"{config.cluster_name}-worker-hpa",
                "namespace": config.namespace,
            },
            "spec": {
                "scaleTargetRef": {
                    "apiVersion": "apps/v1",
                    "kind": "Deployment",
                    "name": f"{config.cluster_name}-worker",
                },
                "minReplicas": config.min_nodes,
                "maxReplicas": config.max_nodes,
                "metrics": [
                    {
                        "type": "Resource",
                        "resource": {
                            "name": "cpu",
                            "target": {"type": "Utilization", "averageUtilization": 70},
                        },
                    },
                    {
                        "type": "Resource",
                        "resource": {
                            "name": "memory",
                            "target": {"type": "Utilization", "averageUtilization": 80},
                        },
                    },
                ],
            },
        }
        return self.autoscaling_v2.create_namespaced_horizontal_pod_autoscaler(
            namespace=config.namespace,
            body=hpa_manifest,
        )

    # ------------------------------------------------------------------
    # Dask resources
    # ------------------------------------------------------------------
    def _create_dask_scheduler_deployment(self, config: ClusterDeploymentConfig) -> Any:
        """Create the single-replica Dask scheduler Deployment."""
        labels = {"app": "dask-scheduler", "cluster": config.cluster_name}
        container = {
            "name": "dask-scheduler",
            "image": self.DASK_IMAGE,
            "command": ["dask-scheduler"],
            "ports": [
                {"containerPort": self.DASK_SCHEDULER_PORT, "name": "scheduler"},
                {"containerPort": self.DASK_DASHBOARD_PORT, "name": "dashboard"},
            ],
            "resources": {
                "requests": {"cpu": "500m", "memory": "1Gi"},
                "limits": {"cpu": "1000m", "memory": "2Gi"},
            },
            "env": [
                {
                    "name": "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING",
                    "value": "True",
                }
            ],
        }
        manifest = self._deployment_manifest(
            f"{config.cluster_name}-scheduler", config.namespace, labels, 1, container
        )
        return self.apps_v1.create_namespaced_deployment(
            namespace=config.namespace,
            body=manifest,
        )

    def _create_dask_scheduler_service(self, config: ClusterDeploymentConfig) -> Any:
        """Create the ClusterIP Service for the scheduler and dashboard ports."""
        labels = {"app": "dask-scheduler", "cluster": config.cluster_name}
        manifest = self._service_manifest(
            f"{config.cluster_name}-scheduler",
            config.namespace,
            labels,
            [
                ("scheduler", self.DASK_SCHEDULER_PORT),
                ("dashboard", self.DASK_DASHBOARD_PORT),
            ],
        )
        return self.v1.create_namespaced_service(
            namespace=config.namespace,
            body=manifest,
        )

    def _create_dask_worker_deployment(self, config: ClusterDeploymentConfig) -> Any:
        """Create the Dask worker Deployment with ``node_count`` replicas."""
        labels = {"app": "dask-worker", "cluster": config.cluster_name}
        container = {
            "name": "dask-worker",
            "image": self.DASK_IMAGE,
            "command": [
                "dask-worker",
                # Workers reach the scheduler through its Service DNS name.
                f"{config.cluster_name}-scheduler:{self.DASK_SCHEDULER_PORT}",
                "--nthreads", str(config.cpu_per_node),
                "--memory-limit", config.memory_per_node,
            ],
            "resources": self._worker_resources(config),
            "env": [
                {
                    "name": "DASK_DISTRIBUTED__WORKER__DAEMON",
                    "value": "False",
                }
            ],
        }
        manifest = self._deployment_manifest(
            f"{config.cluster_name}-worker",
            config.namespace,
            labels,
            config.node_count,
            container,
        )
        return self.apps_v1.create_namespaced_deployment(
            namespace=config.namespace,
            body=manifest,
        )

    def _create_dask_hpa(self, config: ClusterDeploymentConfig) -> Any:
        """Create the horizontal pod autoscaler for the Dask workers."""
        return self._create_worker_hpa(config)

    # ------------------------------------------------------------------
    # Ray resources (mirror the Dask layout)
    # ------------------------------------------------------------------
    def _create_ray_head_deployment(self, config: ClusterDeploymentConfig) -> Any:
        """Create the single-replica Ray head Deployment."""
        labels = {"app": "ray-head", "cluster": config.cluster_name}
        container = {
            "name": "ray-head",
            "image": self.RAY_IMAGE,
            "command": [
                "ray", "start", "--head",
                f"--port={self.RAY_GCS_PORT}",
                # Bind the dashboard on all interfaces so the Service can reach it.
                "--dashboard-host=0.0.0.0",
                # Keep the process in the foreground so the container stays up.
                "--block",
            ],
            "ports": [
                {"containerPort": self.RAY_GCS_PORT, "name": "gcs"},
                {"containerPort": self.RAY_DASHBOARD_PORT, "name": "dashboard"},
                {"containerPort": self.RAY_CLIENT_PORT, "name": "client"},
            ],
            # NOTE(review): head sizing is a starting point; tune per workload.
            "resources": {
                "requests": {"cpu": "1", "memory": "2Gi"},
                "limits": {"cpu": "2", "memory": "4Gi"},
            },
        }
        manifest = self._deployment_manifest(
            f"{config.cluster_name}-head", config.namespace, labels, 1, container
        )
        return self.apps_v1.create_namespaced_deployment(
            namespace=config.namespace,
            body=manifest,
        )

    def _create_ray_head_service(self, config: ClusterDeploymentConfig) -> Any:
        """Create the ClusterIP Service for the Ray head node."""
        labels = {"app": "ray-head", "cluster": config.cluster_name}
        manifest = self._service_manifest(
            f"{config.cluster_name}-head",
            config.namespace,
            labels,
            [
                ("gcs", self.RAY_GCS_PORT),
                ("dashboard", self.RAY_DASHBOARD_PORT),
                ("client", self.RAY_CLIENT_PORT),
            ],
        )
        return self.v1.create_namespaced_service(
            namespace=config.namespace,
            body=manifest,
        )

    def _create_ray_worker_deployment(self, config: ClusterDeploymentConfig) -> Any:
        """Create the Ray worker Deployment with ``node_count`` replicas."""
        labels = {"app": "ray-worker", "cluster": config.cluster_name}
        container = {
            "name": "ray-worker",
            "image": self.RAY_IMAGE,
            "command": [
                "ray", "start",
                # Workers join the head through its Service DNS name.
                f"--address={config.cluster_name}-head:{self.RAY_GCS_PORT}",
                f"--num-cpus={config.cpu_per_node}",
                "--block",
            ],
            "resources": self._worker_resources(config),
        }
        manifest = self._deployment_manifest(
            f"{config.cluster_name}-worker",
            config.namespace,
            labels,
            config.node_count,
            container,
        )
        return self.apps_v1.create_namespaced_deployment(
            namespace=config.namespace,
            body=manifest,
        )

    def _create_ray_hpa(self, config: ClusterDeploymentConfig) -> Any:
        """Create the horizontal pod autoscaler for the Ray workers."""
        return self._create_worker_hpa(config)

    # ------------------------------------------------------------------
    # Lifecycle operations
    # ------------------------------------------------------------------
    def scale_cluster(self, cluster_name: str, namespace: str, new_replica_count: int):
        """Set the worker Deployment's replica count.

        Only the ``spec.replicas`` field is patched, so no prior read of
        the Deployment is needed.
        """
        # NOTE(review): if an HPA targets this Deployment it will presumably
        # override a manual replica count - confirm the intended interplay.
        self.apps_v1.patch_namespaced_deployment(
            name=f"{cluster_name}-worker",
            namespace=namespace,
            body={"spec": {"replicas": new_replica_count}},
        )
        logging.info(f"Scaled cluster {cluster_name} to {new_replica_count} workers")

    def delete_cluster(self, cluster_name: str, namespace: str):
        """Delete every resource belonging to a Dask or Ray cluster.

        Each resource is deleted independently and on a best-effort basis:
        a missing resource (404) is expected, since a cluster has either
        scheduler or head resources, and any other API error is logged
        without stopping the remaining deletions.
        """
        deletions = [
            (self.apps_v1.delete_namespaced_deployment, f"{cluster_name}-scheduler"),
            (self.apps_v1.delete_namespaced_deployment, f"{cluster_name}-head"),
            (self.apps_v1.delete_namespaced_deployment, f"{cluster_name}-worker"),
            (self.v1.delete_namespaced_service, f"{cluster_name}-scheduler"),
            (self.v1.delete_namespaced_service, f"{cluster_name}-head"),
            (
                self.autoscaling_v2.delete_namespaced_horizontal_pod_autoscaler,
                f"{cluster_name}-worker-hpa",
            ),
        ]
        for delete_resource, resource_name in deletions:
            try:
                delete_resource(name=resource_name, namespace=namespace)
            except client.ApiException as e:
                if e.status != 404:
                    logging.warning(
                        f"Failed to delete {resource_name} in {namespace}: {e}"
                    )
        logging.info(f"Deleted cluster {cluster_name}")
class DistributedComputingOrchestrator:
    """Orchestrate distributed computing workflows.

    Chooses a framework and cluster size for a workload, deploys it through
    ``KubernetesClusterDeployer`` and tracks what it created in
    ``active_clusters``.
    """

    # data_size label -> (node_count, instance_type, memory_per_node, cpu_per_node)
    _SIZE_TIERS = {
        "small": (2, "standard", "4Gi", 2),             # < 1GB
        "medium": (4, "standard", "8Gi", 4),            # 1GB - 100GB
        "large": (8, "memory-optimized", "16Gi", 8),    # 100GB - 1TB
    }
    # > 1TB; also used for any unrecognised data_size label
    _LARGEST_TIER = (16, "memory-optimized", "32Gi", 16)

    _RAY_WORKLOADS = ("ml_training", "hyperparameter_tuning", "deep_learning")

    def __init__(self, config: Dict[str, Any]):
        """Store ``config`` and create the Kubernetes deployer.

        An optional ``k8s_config_path`` entry in ``config`` is forwarded to
        the deployer as the kubeconfig path.
        """
        self.config = config
        self.k8s_deployer = KubernetesClusterDeployer(
            (config or {}).get("k8s_config_path")
        )
        self.active_clusters: Dict[str, Dict[str, Any]] = {}

    def create_optimized_cluster(self, workload_type: str,
                                 data_size: str,
                                 compute_requirements: Dict[str, Any]) -> str:
        """Deploy a cluster sized for the workload and return its name.

        Raises:
            ValueError: If the derived cluster type is neither dask nor ray.
        """
        cluster_config = self._optimize_cluster_configuration(
            workload_type, data_size, compute_requirements
        )

        if cluster_config.cluster_type == "dask":
            deployment_info = self.k8s_deployer.deploy_dask_cluster(cluster_config)
        elif cluster_config.cluster_type == "ray":
            deployment_info = self.k8s_deployer.deploy_ray_cluster(cluster_config)
        else:
            raise ValueError(f"Unsupported cluster type: {cluster_config.cluster_type}")

        # Remember the cluster so it can be monitored and cleaned up later.
        self.active_clusters[cluster_config.cluster_name] = {
            "config": cluster_config,
            "deployment_info": deployment_info,
            "created_at": datetime.now(),
        }
        logging.info(f"Created optimized cluster: {cluster_config.cluster_name}")
        return cluster_config.cluster_name

    def _optimize_cluster_configuration(self, workload_type: str,
                                        data_size: str,
                                        compute_requirements: Dict[str, Any]) -> ClusterDeploymentConfig:
        """Derive a ``ClusterDeploymentConfig`` from workload characteristics.

        Args:
            workload_type: Workload label; the ML-oriented labels select
                Ray, everything else selects Dask.
            data_size: ``small`` / ``medium`` / ``large``; any other value
                selects the largest tier.
            compute_requirements: Optional ``gpu_required``, ``gpu_count``
                (default 1) and ``auto_scaling`` (default True).
        """
        import re  # local import: only needed to sanitise the name

        # Kubernetes resource names allow only lowercase alphanumerics and
        # '-'. Workload labels such as "data_processing" contain '_', so
        # they are normalised before being embedded in the name. The slug
        # is truncated to leave room for suffixes like "-scheduler" within
        # the 63-character name limit.
        slug = re.sub(r"[^a-z0-9]+", "-", workload_type.lower()).strip("-")[:30].strip("-")
        cluster_name = f"compute-{slug or 'workload'}-{int(time.time())}"

        # Choose framework based on workload (Dask is the default)
        cluster_type = "ray" if workload_type in self._RAY_WORKLOADS else "dask"

        # Size the cluster from the data volume
        node_count, instance_type, memory_per_node, cpu_per_node = self._SIZE_TIERS.get(
            data_size, self._LARGEST_TIER
        )

        # Adjust for GPU requirements
        gpu_per_node = 0
        if compute_requirements.get("gpu_required", False):
            gpu_per_node = compute_requirements.get("gpu_count", 1)
            instance_type = "gpu-optimized"

        return ClusterDeploymentConfig(
            cluster_name=cluster_name,
            cluster_type=cluster_type,
            node_count=node_count,
            instance_type=instance_type,
            memory_per_node=memory_per_node,
            cpu_per_node=cpu_per_node,
            gpu_per_node=gpu_per_node,
            auto_scaling=compute_requirements.get("auto_scaling", True),
            # Autoscaling window: a quarter of the initial size up to double.
            min_nodes=max(1, node_count // 4),
            max_nodes=node_count * 2,
        )

    def monitor_and_optimize(self, cluster_name: str):
        """Run the framework-specific monitoring hook for a tracked cluster.

        Raises:
            ValueError: If ``cluster_name`` is not in ``active_clusters``.
        """
        if cluster_name not in self.active_clusters:
            raise ValueError(f"Cluster {cluster_name} not found")

        cluster_info = self.active_clusters[cluster_name]
        cluster_type = cluster_info["config"].cluster_type
        if cluster_type == "dask":
            self._monitor_dask_cluster(cluster_name, cluster_info)
        elif cluster_type == "ray":
            self._monitor_ray_cluster(cluster_name, cluster_info)

    def _monitor_dask_cluster(self, cluster_name: str, cluster_info: Dict[str, Any]):
        """Monitor a Dask cluster (placeholder: only logs for now)."""
        # Real monitoring would integrate with the Dask monitoring APIs and
        # Kubernetes metrics.
        logging.info(f"Monitoring Dask cluster: {cluster_name}")

    def _monitor_ray_cluster(self, cluster_name: str, cluster_info: Dict[str, Any]):
        """Monitor a Ray cluster (placeholder: only logs for now)."""
        # Real monitoring would integrate with the Ray monitoring APIs.
        logging.info(f"Monitoring Ray cluster: {cluster_name}")

    def cleanup_idle_clusters(self, idle_threshold_hours: int = 2):
        """Delete tracked clusters older than ``idle_threshold_hours``.

        Idleness is approximated by age since creation; no utilization data
        is consulted.
        """
        current_time = datetime.now()
        threshold_seconds = idle_threshold_hours * 3600

        # Iterate over a snapshot because entries are removed in the loop.
        for cluster_name, cluster_info in list(self.active_clusters.items()):
            age = current_time - cluster_info["created_at"]
            if age.total_seconds() <= threshold_seconds:
                continue
            self.k8s_deployer.delete_cluster(
                cluster_name,
                cluster_info["config"].namespace,
            )
            del self.active_clusters[cluster_name]
            logging.info(f"Cleaned up idle cluster: {cluster_name}")
Conclusion
Implementing distributed computing solutions with Dask and Ray enables organizations to scale data science workloads from development environments to production clusters seamlessly. The advanced techniques and implementations shown in this guide provide a comprehensive foundation for building scalable analytics and machine learning systems.
Key takeaways for successful distributed computing implementation include:
- Framework Selection: Choose Dask for scaling pandas/NumPy workflows and Ray for general-purpose distributed computing and advanced ML
- Cluster Optimization: Configure clusters based on workload characteristics, data size, and compute requirements
- Performance Monitoring: Implement comprehensive monitoring to identify bottlenecks and optimize resource utilization
- Auto-scaling: Use dynamic scaling to handle variable workloads while controlling costs
- Production Deployment: Leverage Kubernetes for robust, scalable cluster deployment and management
By following these patterns and implementing the frameworks shown in this guide, organizations can build distributed computing systems that provide the scale and performance needed for modern data science and machine learning workloads while maintaining operational efficiency and cost control.