Enterprise kubectl & kubeconfig Operations Security Guide 2025
Enterprise kubectl & kubeconfig Operations Security Guide 2025
Introduction
Enterprise kubectl and kubeconfig management in 2025 requires sophisticated security practices, advanced RBAC integration, and comprehensive operational procedures. This guide covers enterprise-grade kubectl operations, secure kubeconfig management, advanced authentication patterns, and production-ready Kubernetes administration workflows.
Chapter 1: Advanced kubectl Operations Framework
Enterprise kubectl Management System
// Enterprise kubectl operations framework
package kubectl
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
"gopkg.in/yaml.v3"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
)
// EnterpriseKubectlManager provides comprehensive kubectl operations
type EnterpriseKubectlManager struct {
configs map[string]*KubeconfigProfile
currentContext string
kubectlPath string
// Security features
auditLogger *AuditLogger
rbacValidator *RBACValidator
securityPolicy *SecurityPolicy
// Operation tracking
operations []*Operation
metrics *OperationMetrics
// Configuration
config *KubectlConfig
mutex sync.RWMutex
}
type KubectlConfig struct {
DefaultNamespace string
TimeoutDuration time.Duration
MaxRetries int
EnableAuditLogging bool
EnableRBACCheck bool
SecureMode bool
// Security settings
AllowedCommands []string
RestrictedCommands []string
RequireApproval []string
// Output settings
OutputFormat string
EnableColors bool
VerboseLevel int
}
type KubeconfigProfile struct {
Name string
Path string
Context string
Cluster string
User string
Namespace string
// Security metadata
Permissions []Permission
Restrictions []Restriction
ExpiresAt *time.Time
// Client configuration
ClientConfig *rest.Config
Clientset kubernetes.Interface
}
type Permission struct {
APIGroups []string
Resources []string
Verbs []string
Namespaces []string
}
type Restriction struct {
Type string
Pattern string
Reason string
Severity string
}
// Create enterprise kubectl manager
func NewEnterpriseKubectlManager(config *KubectlConfig) *EnterpriseKubectlManager {
ekm := &EnterpriseKubectlManager{
configs: make(map[string]*KubeconfigProfile),
kubectlPath: findKubectlPath(),
config: config,
operations: make([]*Operation, 0),
metrics: NewOperationMetrics(),
}
// Initialize security components
if config.EnableAuditLogging {
ekm.auditLogger = NewAuditLogger()
}
if config.EnableRBACCheck {
ekm.rbacValidator = NewRBACValidator()
}
ekm.securityPolicy = NewSecurityPolicy(config)
return ekm
}
// Load kubeconfig with validation
func (ekm *EnterpriseKubectlManager) LoadKubeconfig(path string, profileName string) error {
// Validate file permissions and security
if err := ekm.validateKubeconfigSecurity(path); err != nil {
return fmt.Errorf("security validation failed: %w", err)
}
// Load kubeconfig
config, err := clientcmd.LoadFromFile(path)
if err != nil {
return fmt.Errorf("failed to load kubeconfig: %w", err)
}
// Validate configuration structure
if err := ekm.validateKubeconfigStructure(config); err != nil {
return fmt.Errorf("invalid kubeconfig structure: %w", err)
}
// Create client configuration
clientConfig := clientcmd.NewDefaultClientConfig(*config, &clientcmd.ConfigOverrides{})
restConfig, err := clientConfig.ClientConfig()
if err != nil {
return fmt.Errorf("failed to create client config: %w", err)
}
// Create Kubernetes clientset
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("failed to create clientset: %w", err)
}
// Extract profile information
currentContext := config.CurrentContext
context := config.Contexts[currentContext]
if context == nil {
return fmt.Errorf("current context %s not found", currentContext)
}
profile := &KubeconfigProfile{
Name: profileName,
Path: path,
Context: currentContext,
Cluster: context.Cluster,
User: context.AuthInfo,
Namespace: context.Namespace,
ClientConfig: restConfig,
Clientset: clientset,
}
// Load permissions and restrictions
if err := ekm.loadProfileSecurity(profile); err != nil {
return fmt.Errorf("failed to load security profile: %w", err)
}
ekm.mutex.Lock()
ekm.configs[profileName] = profile
if ekm.currentContext == "" {
ekm.currentContext = profileName
}
ekm.mutex.Unlock()
// Audit profile loading
if ekm.auditLogger != nil {
ekm.auditLogger.LogProfileLoaded(profileName, path, currentContext)
}
return nil
}
// Execute kubectl command with security validation
func (ekm *EnterpriseKubectlManager) Execute(command string, args ...string) (*CommandResult, error) {
// Create operation context
operation := &Operation{
ID: generateOperationID(),
Command: command,
Args: args,
Timestamp: time.Now(),
Profile: ekm.currentContext,
}
// Security validation
if err := ekm.validateCommand(operation); err != nil {
operation.Status = "denied"
operation.Error = err.Error()
ekm.recordOperation(operation)
return nil, fmt.Errorf("command denied: %w", err)
}
// RBAC validation
if ekm.rbacValidator != nil {
if err := ekm.rbacValidator.ValidateOperation(operation); err != nil {
operation.Status = "rbac_denied"
operation.Error = err.Error()
ekm.recordOperation(operation)
return nil, fmt.Errorf("RBAC validation failed: %w", err)
}
}
// Check if approval is required
if ekm.requiresApproval(operation) {
approval, err := ekm.requestApproval(operation)
if err != nil {
return nil, fmt.Errorf("approval request failed: %w", err)
}
if !approval.Approved {
operation.Status = "approval_denied"
operation.Error = approval.Reason
ekm.recordOperation(operation)
return nil, fmt.Errorf("operation not approved: %s", approval.Reason)
}
operation.ApprovalID = approval.ID
}
// Execute command
result, err := ekm.executeKubectl(operation)
// Record operation
operation.Duration = time.Since(operation.Timestamp)
if err != nil {
operation.Status = "failed"
operation.Error = err.Error()
} else {
operation.Status = "success"
operation.Output = result.Output
}
ekm.recordOperation(operation)
return result, err
}
// Execute kubectl command with timeout and retries
func (ekm *EnterpriseKubectlManager) executeKubectl(operation *Operation) (*CommandResult, error) {
profile := ekm.configs[ekm.currentContext]
if profile == nil {
return nil, fmt.Errorf("no active profile")
}
// Prepare command
cmdArgs := []string{operation.Command}
cmdArgs = append(cmdArgs, operation.Args...)
// Add kubeconfig flag
cmdArgs = append(cmdArgs, "--kubeconfig", profile.Path)
// Add context flag
cmdArgs = append(cmdArgs, "--context", profile.Context)
// Add namespace if specified
if profile.Namespace != "" {
cmdArgs = append(cmdArgs, "--namespace", profile.Namespace)
}
// Add output format
if ekm.config.OutputFormat != "" {
cmdArgs = append(cmdArgs, "--output", ekm.config.OutputFormat)
}
// Execute with retries
var lastErr error
for attempt := 0; attempt <= ekm.config.MaxRetries; attempt++ {
if attempt > 0 {
// Exponential backoff
backoff := time.Duration(attempt*attempt) * time.Second
time.Sleep(backoff)
}
result, err := ekm.executeWithTimeout(cmdArgs)
if err == nil {
return result, nil
}
lastErr = err
// Check if error is retryable
if !ekm.isRetryableError(err) {
break
}
}
return nil, lastErr
}
// Execute command with timeout
func (ekm *EnterpriseKubectlManager) executeWithTimeout(args []string) (*CommandResult, error) {
ctx, cancel := context.WithTimeout(context.Background(), ekm.config.TimeoutDuration)
defer cancel()
cmd := exec.CommandContext(ctx, ekm.kubectlPath, args...)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
start := time.Now()
err := cmd.Run()
duration := time.Since(start)
result := &CommandResult{
Output: stdout.String(),
Error: stderr.String(),
ExitCode: cmd.ProcessState.ExitCode(),
Duration: duration,
Timestamp: start,
}
if err != nil {
return result, fmt.Errorf("kubectl execution failed: %w", err)
}
return result, nil
}
// Advanced security validation
func (ekm *EnterpriseKubectlManager) validateCommand(operation *Operation) error {
// Check allowed commands
if len(ekm.config.AllowedCommands) > 0 {
allowed := false
for _, allowedCmd := range ekm.config.AllowedCommands {
if operation.Command == allowedCmd {
allowed = true
break
}
}
if !allowed {
return fmt.Errorf("command %s not in allowed list", operation.Command)
}
}
// Check restricted commands
for _, restrictedCmd := range ekm.config.RestrictedCommands {
if operation.Command == restrictedCmd {
return fmt.Errorf("command %s is restricted", operation.Command)
}
}
// Validate arguments for dangerous patterns
if err := ekm.validateArguments(operation); err != nil {
return err
}
// Profile-specific restrictions
profile := ekm.configs[ekm.currentContext]
if profile != nil {
for _, restriction := range profile.Restrictions {
if ekm.matchesRestriction(operation, restriction) {
return fmt.Errorf("operation violates restriction: %s", restriction.Reason)
}
}
}
return nil
}
// Validate command arguments for security
func (ekm *EnterpriseKubectlManager) validateArguments(operation *Operation) error {
// Check for dangerous patterns
dangerousPatterns := []string{
`\$\(.*\)`, // Command substitution
`\;`, // Command chaining
`\|`, // Pipes
`\&\&`, // AND operator
`\|\|`, // OR operator
`>`, // Redirection
`<`, // Input redirection
`rm\s+-rf`, // Dangerous delete
`--kubeconfig=.*\/etc`, // System config access
}
allArgs := strings.Join(operation.Args, " ")
for _, pattern := range dangerousPatterns {
matched, err := regexp.MatchString(pattern, allArgs)
if err != nil {
return fmt.Errorf("pattern validation error: %w", err)
}
if matched {
return fmt.Errorf("dangerous pattern detected: %s", pattern)
}
}
return nil
}
// Check if operation requires approval
func (ekm *EnterpriseKubectlManager) requiresApproval(operation *Operation) bool {
for _, cmd := range ekm.config.RequireApproval {
if operation.Command == cmd {
return true
}
}
// Check for sensitive operations
sensitiveOps := []string{"delete", "patch", "replace", "apply"}
for _, op := range sensitiveOps {
if operation.Command == op {
// Check if it affects critical resources
if ekm.isCriticalResource(operation) {
return true
}
}
}
return false
}
// RBAC validation system
type RBACValidator struct {
client kubernetes.Interface
cache map[string]*RBACPermissions
cacheTTL time.Duration
mutex sync.RWMutex
}
type RBACPermissions struct {
User string
Groups []string
Rules []RBACRule
CachedAt time.Time
}
type RBACRule struct {
APIGroups []string
Resources []string
Verbs []string
ResourceNames []string
Namespaces []string
}
// Create RBAC validator
func NewRBACValidator() *RBACValidator {
return &RBACValidator{
cache: make(map[string]*RBACPermissions),
cacheTTL: 5 * time.Minute,
}
}
// Validate operation against RBAC
func (rv *RBACValidator) ValidateOperation(operation *Operation) error {
// Get user permissions
permissions, err := rv.getUserPermissions(operation.Profile)
if err != nil {
return fmt.Errorf("failed to get user permissions: %w", err)
}
// Check if operation is allowed
allowed := rv.checkPermission(operation, permissions)
if !allowed {
return fmt.Errorf("operation not allowed by RBAC")
}
return nil
}
// Get user permissions from RBAC
func (rv *RBACValidator) getUserPermissions(profile string) (*RBACPermissions, error) {
rv.mutex.RLock()
cached, exists := rv.cache[profile]
rv.mutex.RUnlock()
if exists && time.Since(cached.CachedAt) < rv.cacheTTL {
return cached, nil
}
// Fetch permissions from Kubernetes API
// This would involve calling the authorization API
permissions := &RBACPermissions{
User: "user@example.com",
Groups: []string{"developers", "admins"},
CachedAt: time.Now(),
}
// Cache permissions
rv.mutex.Lock()
rv.cache[profile] = permissions
rv.mutex.Unlock()
return permissions, nil
}
// Advanced kubeconfig management
type KubeconfigManager struct {
profiles map[string]*KubeconfigProfile
merger *ConfigMerger
encryptor *ConfigEncryptor
validator *ConfigValidator
// Security features
accessControl *AccessControl
auditTrail *AuditTrail
mutex sync.RWMutex
}
type ConfigMerger struct {
strategy MergeStrategy
conflicts []ConflictResolution
}
type MergeStrategy int
const (
MergeStrategyOverwrite MergeStrategy = iota
MergeStrategyPreserve
MergeStrategyPrompt
)
type ConflictResolution struct {
Type string
Strategy MergeStrategy
Handler func(existing, new interface{}) interface{}
}
// Create kubeconfig manager
func NewKubeconfigManager() *KubeconfigManager {
return &KubeconfigManager{
profiles: make(map[string]*KubeconfigProfile),
merger: NewConfigMerger(),
encryptor: NewConfigEncryptor(),
validator: NewConfigValidator(),
accessControl: NewAccessControl(),
auditTrail: NewAuditTrail(),
}
}
// Merge multiple kubeconfigs securely
func (km *KubeconfigManager) MergeConfigs(configs []string, outputPath string) error {
var mergedConfig *api.Config
for i, configPath := range configs {
config, err := clientcmd.LoadFromFile(configPath)
if err != nil {
return fmt.Errorf("failed to load config %s: %w", configPath, err)
}
// Validate config before merging
if err := km.validator.ValidateConfig(config); err != nil {
return fmt.Errorf("config validation failed for %s: %w", configPath, err)
}
if i == 0 {
mergedConfig = config
} else {
mergedConfig = km.merger.MergeConfigs(mergedConfig, config)
}
}
// Encrypt sensitive data
if err := km.encryptor.EncryptSensitiveData(mergedConfig); err != nil {
return fmt.Errorf("encryption failed: %w", err)
}
// Write merged config
if err := clientcmd.WriteToFile(*mergedConfig, outputPath); err != nil {
return fmt.Errorf("failed to write merged config: %w", err)
}
// Audit the merge operation
km.auditTrail.RecordMerge(configs, outputPath)
return nil
}
// Secure kubeconfig rotation
func (km *KubeconfigManager) RotateCredentials(profileName string) error {
profile, exists := km.profiles[profileName]
if !exists {
return fmt.Errorf("profile %s not found", profileName)
}
// Generate new credentials
newCreds, err := km.generateNewCredentials(profile)
if err != nil {
return fmt.Errorf("failed to generate new credentials: %w", err)
}
// Backup current config
backupPath := fmt.Sprintf("%s.backup.%d", profile.Path, time.Now().Unix())
if err := km.backupConfig(profile.Path, backupPath); err != nil {
return fmt.Errorf("failed to backup config: %w", err)
}
// Update config with new credentials
config, err := clientcmd.LoadFromFile(profile.Path)
if err != nil {
return fmt.Errorf("failed to load current config: %w", err)
}
// Update auth info
authInfo := config.AuthInfos[profile.User]
if authInfo == nil {
return fmt.Errorf("auth info %s not found", profile.User)
}
authInfo.Token = newCreds.Token
authInfo.TokenFile = ""
authInfo.ClientCertificateData = newCreds.ClientCert
authInfo.ClientKeyData = newCreds.ClientKey
// Write updated config
if err := clientcmd.WriteToFile(*config, profile.Path); err != nil {
return fmt.Errorf("failed to write updated config: %w", err)
}
// Update expiration time
profile.ExpiresAt = &newCreds.ExpiresAt
// Audit rotation
km.auditTrail.RecordRotation(profileName, backupPath)
return nil
}
// Advanced audit logging
type AuditLogger struct {
writer io.Writer
formatter AuditFormatter
filters []AuditFilter
// Async logging
logChannel chan AuditEvent
stopChannel chan struct{}
wg sync.WaitGroup
}
type AuditEvent struct {
Timestamp time.Time
EventType string
User string
Profile string
Operation string
Resource string
Outcome string
Details map[string]interface{}
SessionID string
RequestID string
}
type AuditFormatter interface {
Format(event AuditEvent) ([]byte, error)
}
type AuditFilter interface {
ShouldLog(event AuditEvent) bool
}
// Create audit logger
func NewAuditLogger() *AuditLogger {
al := &AuditLogger{
writer: os.Stdout, // Configure as needed
formatter: &JSONAuditFormatter{},
logChannel: make(chan AuditEvent, 1000),
stopChannel: make(chan struct{}),
}
// Start async logging
al.wg.Add(1)
go al.logProcessor()
return al
}
// Log profile loaded event
func (al *AuditLogger) LogProfileLoaded(profileName, path, context string) {
event := AuditEvent{
Timestamp: time.Now(),
EventType: "profile_loaded",
Profile: profileName,
Operation: "load_kubeconfig",
Details: map[string]interface{}{
"path": path,
"context": context,
},
Outcome: "success",
}
al.logEvent(event)
}
// Log operation execution
func (al *AuditLogger) LogOperation(operation *Operation) {
event := AuditEvent{
Timestamp: operation.Timestamp,
EventType: "kubectl_operation",
Profile: operation.Profile,
Operation: operation.Command,
Outcome: operation.Status,
Details: map[string]interface{}{
"args": operation.Args,
"duration": operation.Duration,
"approval_id": operation.ApprovalID,
},
RequestID: operation.ID,
}
if operation.Error != "" {
event.Details["error"] = operation.Error
}
al.logEvent(event)
}
// Async log processor
func (al *AuditLogger) logProcessor() {
defer al.wg.Done()
for {
select {
case event := <-al.logChannel:
// Apply filters
shouldLog := true
for _, filter := range al.filters {
if !filter.ShouldLog(event) {
shouldLog = false
break
}
}
if shouldLog {
// Format and write event
formatted, err := al.formatter.Format(event)
if err != nil {
// Handle formatting error
continue
}
al.writer.Write(formatted)
al.writer.Write([]byte("\n"))
}
case <-al.stopChannel:
return
}
}
}
// JSON audit formatter
type JSONAuditFormatter struct{}
func (jaf *JSONAuditFormatter) Format(event AuditEvent) ([]byte, error) {
return json.Marshal(event)
}
// Command operation tracking
type Operation struct {
ID string
Command string
Args []string
Profile string
Timestamp time.Time
Duration time.Duration
Status string
Output string
Error string
ApprovalID string
SessionID string
}
type CommandResult struct {
Output string
Error string
ExitCode int
Duration time.Duration
Timestamp time.Time
}
// Operation metrics tracking
type OperationMetrics struct {
TotalOperations int64
SuccessfulOps int64
FailedOps int64
DeniedOps int64
AverageLatency time.Duration
// Per-command metrics
CommandStats map[string]*CommandStats
mutex sync.RWMutex
}
type CommandStats struct {
Count int64
Successes int64
Failures int64
TotalLatency time.Duration
LastExecuted time.Time
}
// Record operation metrics
func (om *OperationMetrics) RecordOperation(operation *Operation) {
om.mutex.Lock()
defer om.mutex.Unlock()
om.TotalOperations++
switch operation.Status {
case "success":
om.SuccessfulOps++
case "failed":
om.FailedOps++
case "denied", "rbac_denied", "approval_denied":
om.DeniedOps++
}
// Update command-specific stats
if om.CommandStats == nil {
om.CommandStats = make(map[string]*CommandStats)
}
stats, exists := om.CommandStats[operation.Command]
if !exists {
stats = &CommandStats{}
om.CommandStats[operation.Command] = stats
}
stats.Count++
stats.TotalLatency += operation.Duration
stats.LastExecuted = operation.Timestamp
if operation.Status == "success" {
stats.Successes++
} else {
stats.Failures++
}
// Update average latency
om.AverageLatency = time.Duration(
(int64(om.AverageLatency)*om.TotalOperations + int64(operation.Duration)) / (om.TotalOperations + 1),
)
}
// Helper functions
func findKubectlPath() string {
// Try common locations
locations := []string{
"/usr/local/bin/kubectl",
"/usr/bin/kubectl",
"kubectl", // PATH lookup
}
for _, location := range locations {
if _, err := exec.LookPath(location); err == nil {
return location
}
}
return "kubectl" // Default to PATH lookup
}
func generateOperationID() string {
return fmt.Sprintf("op-%d", time.Now().UnixNano())
}
func (ekm *EnterpriseKubectlManager) validateKubeconfigSecurity(path string) error {
// Check file permissions
info, err := os.Stat(path)
if err != nil {
return err
}
mode := info.Mode()
if mode&0077 != 0 {
return fmt.Errorf("kubeconfig file has overly permissive permissions: %o", mode)
}
return nil
}
func (ekm *EnterpriseKubectlManager) validateKubeconfigStructure(config *api.Config) error {
if config.CurrentContext == "" {
return fmt.Errorf("no current context set")
}
if len(config.Contexts) == 0 {
return fmt.Errorf("no contexts defined")
}
if len(config.Clusters) == 0 {
return fmt.Errorf("no clusters defined")
}
if len(config.AuthInfos) == 0 {
return fmt.Errorf("no auth info defined")
}
return nil
}
func (ekm *EnterpriseKubectlManager) loadProfileSecurity(profile *KubeconfigProfile) error {
// This would load RBAC permissions and restrictions from various sources
// Implementation depends on specific security requirements
return nil
}
func (ekm *EnterpriseKubectlManager) recordOperation(operation *Operation) {
ekm.mutex.Lock()
ekm.operations = append(ekm.operations, operation)
// Keep only recent operations
if len(ekm.operations) > 1000 {
ekm.operations = ekm.operations[len(ekm.operations)-1000:]
}
ekm.mutex.Unlock()
// Update metrics
ekm.metrics.RecordOperation(operation)
// Audit log
if ekm.auditLogger != nil {
ekm.auditLogger.LogOperation(operation)
}
}
func (ekm *EnterpriseKubectlManager) isRetryableError(err error) bool {
// Define retryable error patterns
retryablePatterns := []string{
"connection refused",
"timeout",
"temporary failure",
"server unavailable",
}
errStr := strings.ToLower(err.Error())
for _, pattern := range retryablePatterns {
if strings.Contains(errStr, pattern) {
return true
}
}
return false
}
func (ekm *EnterpriseKubectlManager) matchesRestriction(operation *Operation, restriction Restriction) bool {
// Simple pattern matching - can be enhanced
return strings.Contains(operation.Command, restriction.Pattern)
}
func (ekm *EnterpriseKubectlManager) isCriticalResource(operation *Operation) bool {
criticalResources := []string{
"namespace",
"clusterrole",
"clusterrolebinding",
"persistentvolume",
"storageclass",
}
for _, resource := range criticalResources {
for _, arg := range operation.Args {
if strings.Contains(arg, resource) {
return true
}
}
}
return false
}
// Additional type definitions and implementations would continue...
type SecurityPolicy struct {
config *KubectlConfig
}
func NewSecurityPolicy(config *KubectlConfig) *SecurityPolicy {
return &SecurityPolicy{config: config}
}
type AccessControl struct{}
func NewAccessControl() *AccessControl {
return &AccessControl{}
}
type AuditTrail struct{}
func NewAuditTrail() *AuditTrail {
return &AuditTrail{}
}
func (at *AuditTrail) RecordMerge(configs []string, outputPath string) {
// Implementation for recording merge operations
}
func (at *AuditTrail) RecordRotation(profileName, backupPath string) {
// Implementation for recording credential rotations
}
type ConfigEncryptor struct{}
func NewConfigEncryptor() *ConfigEncryptor {
return &ConfigEncryptor{}
}
func (ce *ConfigEncryptor) EncryptSensitiveData(config *api.Config) error {
// Implementation for encrypting sensitive configuration data
return nil
}
type ConfigValidator struct{}
func NewConfigValidator() *ConfigValidator {
return &ConfigValidator{}
}
func (cv *ConfigValidator) ValidateConfig(config *api.Config) error {
// Implementation for validating configuration structure and security
return nil
}
func NewConfigMerger() *ConfigMerger {
return &ConfigMerger{
strategy: MergeStrategyOverwrite,
}
}
func (cm *ConfigMerger) MergeConfigs(base, new *api.Config) *api.Config {
// Implementation for merging configurations
return base
}
func NewOperationMetrics() *OperationMetrics {
return &OperationMetrics{
CommandStats: make(map[string]*CommandStats),
}
}
func (km *KubeconfigManager) generateNewCredentials(profile *KubeconfigProfile) (*Credentials, error) {
// Implementation for generating new credentials
return &Credentials{
Token: "new-token",
ExpiresAt: time.Now().Add(24 * time.Hour),
}, nil
}
func (km *KubeconfigManager) backupConfig(currentPath, backupPath string) error {
// Implementation for backing up configuration
return nil
}
type Credentials struct {
Token string
ClientCert []byte
ClientKey []byte
ExpiresAt time.Time
}
type Approval struct {
ID string
Approved bool
Reason string
}
func (ekm *EnterpriseKubectlManager) requestApproval(operation *Operation) (*Approval, error) {
// Implementation for requesting operation approval
return &Approval{
ID: "approval-123",
Approved: true,
Reason: "auto-approved",
}, nil
}
func (al *AuditLogger) logEvent(event AuditEvent) {
select {
case al.logChannel <- event:
default:
// Channel full, log synchronously or drop
}
}
func (rv *RBACValidator) checkPermission(operation *Operation, permissions *RBACPermissions) bool {
// Implementation for checking RBAC permissions
return true
}
Chapter 2: Advanced kubeconfig Security Management
Secure kubeconfig Storage and Rotation
# Enterprise kubeconfig management configuration
apiVersion: v1
kind: ConfigMap
metadata:
name: kubectl-security-config
namespace: kube-system
data:
security-policy.yaml: |
apiVersion: security.enterprise.com/v1
kind: KubectlSecurityPolicy
metadata:
name: enterprise-policy
spec:
# Command restrictions
allowedCommands:
- get
- describe
- logs
- exec
- port-forward
- apply
- create
- delete
- patch
restrictedCommands:
- proxy
- cluster-info
- top
requireApproval:
- delete
- patch
- replace
# Argument validation
argumentValidation:
enabled: true
dangerousPatterns:
- '\$\(.*\)'
- '\;'
- '\|'
- '\&\&'
- '\|\|'
- 'rm\s+-rf'
# Resource restrictions
criticalResources:
- namespaces
- clusterroles
- clusterrolebindings
- persistentvolumes
- storageclasses
- customresourcedefinitions
# Security settings
auditLogging:
enabled: true
destination: "/var/log/kubectl-audit.log"
format: "json"
rbacValidation:
enabled: true
cacheTTL: "5m"
encryptionAtRest:
enabled: true
algorithm: "AES-256-GCM"
keyRotationInterval: "24h"
---
apiVersion: v1
kind: Secret
metadata:
name: kubeconfig-encryption-key
namespace: kube-system
type: Opaque
data:
encryption.key: <base64-encoded-key>
---
# RBAC for kubectl operations
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: enterprise-kubectl-operator
rules:
# Authentication and authorization checks
- apiGroups: ["authentication.k8s.io"]
resources: ["tokenreviews"]
verbs: ["create"]
- apiGroups: ["authorization.k8s.io"]
resources: ["subjectaccessreviews", "selfsubjectaccessreviews"]
verbs: ["create"]
# Resource access for validation
- apiGroups: [""]
resources: ["namespaces", "serviceaccounts"]
verbs: ["get", "list"]
- apiGroups: ["rbac.authorization.k8s.io"]
resources: ["roles", "rolebindings", "clusterroles", "clusterrolebindings"]
verbs: ["get", "list"]
# Audit and monitoring
- apiGroups: ["audit.k8s.io"]
resources: ["events"]
verbs: ["create"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: enterprise-kubectl-operator
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: enterprise-kubectl-operator
subjects:
- kind: ServiceAccount
name: kubectl-operator
namespace: kube-system
---
# Advanced kubeconfig template
apiVersion: v1
kind: Config
clusters:
- cluster:
certificate-authority-data: <CA_CERT_DATA>
server: https://kubernetes.api.enterprise.com:6443
# Extensions for security
extensions:
- name: security.enterprise.com/cluster-id
extension:
cluster-id: "prod-us-west-2"
security-tier: "high"
compliance: "sox-pci"
name: enterprise-production
contexts:
- context:
cluster: enterprise-production
namespace: default
user: enterprise-user
# Security extensions
extensions:
- name: security.enterprise.com/session
extension:
max-session-duration: "8h"
require-approval-for:
- "delete"
- "patch"
- "replace"
allowed-namespaces:
- "production"
- "staging"
name: enterprise-prod-context
current-context: enterprise-prod-context
users:
- name: enterprise-user
user:
# Multiple auth methods for redundancy
exec:
apiVersion: client.authentication.k8s.io/v1beta1
command: kubectl
args:
- oidc-login
- get-token
- --oidc-issuer-url=https://oidc.enterprise.com
- --oidc-client-id=kubectl-client
- --oidc-extra-scope=groups,email
# Fallback to token-based auth
token: <FALLBACK_TOKEN>
# Client certificate auth
client-certificate-data: <CLIENT_CERT_DATA>
client-key-data: <CLIENT_KEY_DATA>
# Security extensions
extensions:
- name: security.enterprise.com/user
extension:
employee-id: "12345"
department: "engineering"
security-clearance: "high"
token-expiry: "2025-12-31T23:59:59Z"
Enterprise kubectl Plugin System
#!/bin/bash
# Enterprise kubectl plugin for secure operations
set -euo pipefail
# Plugin metadata
PLUGIN_NAME="kubectl-enterprise"
PLUGIN_VERSION="2.0.0"
PLUGIN_AUTHOR="Enterprise Security Team"
# Configuration
SECURITY_CONFIG_PATH="${HOME}/.kube/enterprise-security.yaml"
AUDIT_LOG_PATH="${HOME}/.kube/audit.log"
APPROVAL_ENDPOINT="https://approval.enterprise.com/api/v1/requests"
# Security functions
validate_command() {
local cmd="$1"
shift
local args=("$@")
# Load security policy
if [[ ! -f "$SECURITY_CONFIG_PATH" ]]; then
echo "ERROR: Security configuration not found" >&2
exit 1
fi
# Check allowed commands
if ! yq eval ".allowedCommands[] | select(. == \"$cmd\")" "$SECURITY_CONFIG_PATH" > /dev/null; then
echo "ERROR: Command '$cmd' not allowed by security policy" >&2
exit 1
fi
# Validate arguments
validate_arguments "$cmd" "${args[@]}"
}
validate_arguments() {
local cmd="$1"
shift
local args=("$@")
# Check for dangerous patterns
local dangerous_patterns=(
'\$\(.*\)'
';'
'\|'
'&&'
'\|\|'
'rm\s+-rf'
)
local all_args="${args[*]}"
for pattern in "${dangerous_patterns[@]}"; do
if [[ "$all_args" =~ $pattern ]]; then
echo "ERROR: Dangerous pattern detected: $pattern" >&2
exit 1
fi
done
}
check_rbac_permissions() {
local resource="$1"
local verb="$2"
local namespace="${3:-}"
local rbac_check_cmd="kubectl auth can-i $verb $resource"
if [[ -n "$namespace" ]]; then
rbac_check_cmd="$rbac_check_cmd -n $namespace"
fi
if ! $rbac_check_cmd >/dev/null 2>&1; then
echo "ERROR: RBAC permission denied for '$verb $resource'" >&2
exit 1
fi
}
request_approval() {
local operation="$1"
local resource="$2"
local details="$3"
echo "INFO: Operation requires approval..." >&2
# Create approval request
local request_payload=$(cat <<EOF
{
"operation": "$operation",
"resource": "$resource",
"details": "$details",
"user": "$(whoami)",
"timestamp": "$(date -Iseconds)",
"context": "$(kubectl config current-context)"
}
EOF
)
# Submit approval request
local approval_id
approval_id=$(curl -s -X POST \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $(get_approval_token)" \
-d "$request_payload" \
"$APPROVAL_ENDPOINT" | jq -r '.id')
if [[ "$approval_id" == "null" ]] || [[ -z "$approval_id" ]]; then
echo "ERROR: Failed to submit approval request" >&2
exit 1
fi
echo "INFO: Approval request submitted with ID: $approval_id" >&2
echo "INFO: Waiting for approval..." >&2
# Wait for approval
local approved=false
local timeout=300 # 5 minutes
local elapsed=0
while [[ $elapsed -lt $timeout ]]; do
local status
status=$(curl -s -H "Authorization: Bearer $(get_approval_token)" \
"$APPROVAL_ENDPOINT/$approval_id" | jq -r '.status')
case "$status" in
"approved")
approved=true
break
;;
"denied")
echo "ERROR: Approval request denied" >&2
exit 1
;;
"pending")
sleep 5
elapsed=$((elapsed + 5))
;;
*)
echo "ERROR: Unknown approval status: $status" >&2
exit 1
;;
esac
done
if [[ "$approved" != true ]]; then
echo "ERROR: Approval timeout" >&2
exit 1
fi
echo "INFO: Operation approved" >&2
echo "$approval_id"
}
audit_log() {
local operation="$1"
local outcome="$2"
local details="$3"
local log_entry=$(cat <<EOF
{
"timestamp": "$(date -Iseconds)",
"operation": "$operation",
"outcome": "$outcome",
"details": "$details",
"user": "$(whoami)",
"context": "$(kubectl config current-context)",
"pid": "$$"
}
EOF
)
echo "$log_entry" >> "$AUDIT_LOG_PATH"
}
encrypt_kubeconfig() {
local input_file="$1"
local output_file="$2"
local passphrase="$3"
# Encrypt using AES-256-GCM
openssl enc -aes-256-gcm -salt -in "$input_file" -out "$output_file" -pass pass:"$passphrase"
# Set restrictive permissions
chmod 600 "$output_file"
echo "INFO: Kubeconfig encrypted successfully" >&2
}
decrypt_kubeconfig() {
local input_file="$1"
local output_file="$2"
local passphrase="$3"
# Decrypt using AES-256-GCM
openssl enc -d -aes-256-gcm -in "$input_file" -out "$output_file" -pass pass:"$passphrase"
# Set restrictive permissions
chmod 600 "$output_file"
echo "INFO: Kubeconfig decrypted successfully" >&2
}
rotate_credentials() {
local profile="$1"
local kubeconfig_path="${2:-$KUBECONFIG}"
echo "INFO: Starting credential rotation for profile: $profile" >&2
# Backup current kubeconfig
local backup_path="${kubeconfig_path}.backup.$(date +%s)"
cp "$kubeconfig_path" "$backup_path"
# Generate new service account token
local sa_name="kubectl-user-$(whoami)"
local namespace="default"
# Create service account if it doesn't exist
kubectl get sa "$sa_name" -n "$namespace" >/dev/null 2>&1 || \
kubectl create sa "$sa_name" -n "$namespace"
# Create new token
local new_token
new_token=$(kubectl create token "$sa_name" -n "$namespace" --duration=24h)
# Update kubeconfig with new token
kubectl config set-credentials "$profile" --token="$new_token"
# Verify new credentials work
if kubectl cluster-info >/dev/null 2>&1; then
echo "INFO: Credential rotation successful" >&2
audit_log "credential_rotation" "success" "Profile: $profile, Backup: $backup_path"
else
echo "ERROR: New credentials failed verification, restoring backup" >&2
cp "$backup_path" "$kubeconfig_path"
audit_log "credential_rotation" "failed" "Profile: $profile, Restored from: $backup_path"
exit 1
fi
}
merge_kubeconfigs() {
local output_path="$1"
shift
local config_files=("$@")
echo "INFO: Merging ${#config_files[@]} kubeconfig files" >&2
# Start with first config
local merged_config="${config_files[0]}"
# Merge additional configs
for ((i=1; i<${#config_files[@]}; i++)); do
local temp_merged="/tmp/merged-kubeconfig-$$"
KUBECONFIG="$merged_config:${config_files[i]}" kubectl config view --flatten > "$temp_merged"
merged_config="$temp_merged"
done
# Copy to final destination
cp "$merged_config" "$output_path"
chmod 600 "$output_path"
# Cleanup temporary files
rm -f /tmp/merged-kubeconfig-$$
echo "INFO: Kubeconfig merge completed: $output_path" >&2
audit_log "kubeconfig_merge" "success" "Output: $output_path, Inputs: ${config_files[*]}"
}
validate_kubeconfig_security() {
local kubeconfig_path="${1:-$KUBECONFIG}"
echo "INFO: Validating kubeconfig security: $kubeconfig_path" >&2
# Check file permissions
local perms
perms=$(stat -c %a "$kubeconfig_path")
if [[ "$perms" != "600" ]]; then
echo "WARNING: Kubeconfig has insecure permissions: $perms (should be 600)" >&2
fi
# Check for embedded credentials
if grep -q "client-certificate-data\|client-key-data" "$kubeconfig_path"; then
echo "WARNING: Kubeconfig contains embedded certificate data" >&2
fi
# Check for token expiration
local tokens
tokens=$(yq eval '.users[].user.token // empty' "$kubeconfig_path")
if [[ -n "$tokens" ]]; then
echo "WARNING: Kubeconfig contains bearer tokens (consider using exec auth)" >&2
fi
# Validate cluster certificates
local clusters
clusters=$(yq eval '.clusters[].name' "$kubeconfig_path")
while IFS= read -r cluster; do
echo "INFO: Validating cluster: $cluster" >&2
kubectl cluster-info --context="$cluster" >/dev/null 2>&1 || \
echo "WARNING: Cannot connect to cluster: $cluster" >&2
done <<< "$clusters"
echo "INFO: Security validation completed" >&2
}
get_approval_token() {
# Implementation would retrieve approval system token
echo "approval-token-placeholder"
}
# Main plugin logic
main() {
local command="${1:-help}"
shift || true
case "$command" in
"secure-exec")
local kubectl_cmd="$1"
shift
validate_command "$kubectl_cmd" "$@"
# Check if approval is required
local requires_approval=false
case "$kubectl_cmd" in
delete|patch|replace)
requires_approval=true
;;
esac
local approval_id=""
if [[ "$requires_approval" == true ]]; then
approval_id=$(request_approval "$kubectl_cmd" "${1:-unknown}" "${*}")
fi
# Execute kubectl command
echo "INFO: Executing: kubectl $kubectl_cmd $*" >&2
local start_time
start_time=$(date +%s)
if kubectl "$kubectl_cmd" "$@"; then
local end_time
end_time=$(date +%s)
local duration=$((end_time - start_time))
audit_log "kubectl_exec" "success" "Command: $kubectl_cmd, Duration: ${duration}s, Approval: $approval_id"
echo "INFO: Command completed successfully" >&2
else
local exit_code=$?
audit_log "kubectl_exec" "failed" "Command: $kubectl_cmd, Exit code: $exit_code, Approval: $approval_id"
echo "ERROR: Command failed with exit code: $exit_code" >&2
exit $exit_code
fi
;;
"encrypt")
local input_file="$1"
local output_file="$2"
local passphrase="$3"
encrypt_kubeconfig "$input_file" "$output_file" "$passphrase"
;;
"decrypt")
local input_file="$1"
local output_file="$2"
local passphrase="$3"
decrypt_kubeconfig "$input_file" "$output_file" "$passphrase"
;;
"rotate")
local profile="${1:-$(kubectl config current-context)}"
rotate_credentials "$profile"
;;
"merge")
local output_path="$1"
shift
merge_kubeconfigs "$output_path" "$@"
;;
"validate")
local kubeconfig_path="${1:-$KUBECONFIG}"
validate_kubeconfig_security "$kubeconfig_path"
;;
"audit")
if [[ -f "$AUDIT_LOG_PATH" ]]; then
jq '.' "$AUDIT_LOG_PATH" | tail -n 50
else
echo "INFO: No audit log found" >&2
fi
;;
"help"|*)
cat <<EOF
Enterprise kubectl Security Plugin v$PLUGIN_VERSION
Usage: kubectl enterprise <command> [options]
Commands:
secure-exec <kubectl-command> [args...] Execute kubectl command with security validation
encrypt <input> <output> <passphrase> Encrypt kubeconfig file
decrypt <input> <output> <passphrase> Decrypt kubeconfig file
rotate [profile] Rotate credentials for profile
merge <output> <config1> [config2...] Merge multiple kubeconfig files
validate [kubeconfig] Validate kubeconfig security
audit Show recent audit logs
help Show this help message
Examples:
kubectl enterprise secure-exec get pods
kubectl enterprise encrypt ~/.kube/config ~/.kube/config.enc mypassword
kubectl enterprise rotate production-user
kubectl enterprise merge ~/.kube/merged-config ~/.kube/config1 ~/.kube/config2
kubectl enterprise validate ~/.kube/config
EOF
;;
esac
}
# Execute main function
main "$@"
This comprehensive guide covers enterprise kubectl and kubeconfig management with advanced security features, RBAC integration, audit logging, and operational best practices. Would you like me to continue with the remaining sections covering Docker optimization and multi-stage builds?