CSI Driver Development for Kubernetes: Complete Implementation Guide
The Container Storage Interface (CSI) standardizes storage provisioning in Kubernetes, enabling third-party storage vendors to develop plugins without modifying Kubernetes core. This comprehensive guide covers CSI driver architecture, complete implementation in Go, testing strategies, and production deployment patterns.
CSI Driver Development for Kubernetes: Complete Implementation Guide
Executive Summary
CSI drivers provide a standardized interface for storage systems to integrate with Kubernetes and other container orchestrators. This guide provides a complete walkthrough of CSI driver development, from understanding the CSI specification to implementing production-ready drivers with advanced features like snapshots, cloning, volume expansion, and topology-aware provisioning.
CSI Architecture Overview
CSI Components
┌─────────────────────────────────────────────────────────────────┐
│ Kubernetes Cluster │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Kubernetes Master │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ External Provisioner (Sidecar) │ │ │
│ │ │ • Watches PVC creation │ │ │
│ │ │ • Calls CSI Controller.CreateVolume() │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ External Attacher (Sidecar) │ │ │
│ │ │ • Watches VolumeAttachment creation │ │ │
│ │ │ • Calls CSI Controller.ControllerPublishVolume() │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ External Snapshotter (Sidecar) │ │ │
│ │ │ • Handles snapshot operations │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ CSI Controller Service (Your Driver) │ │ │
│ │ │ • CreateVolume, DeleteVolume │ │ │
│ │ │ • ControllerPublishVolume, ControllerUnpublishVolume│ │ │
│ │ │ • CreateSnapshot, DeleteSnapshot │ │ │
│ │ │ • ValidateVolumeCapabilities │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Worker Nodes │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ Node Registrar (Sidecar) │ │ │
│ │ │ • Registers driver with kubelet │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ CSI Node Service (Your Driver) │ │ │
│ │ │ • NodeStageVolume, NodeUnstageVolume │ │ │
│ │ │ • NodePublishVolume, NodeUnpublishVolume │ │ │
│ │ │ • NodeGetVolumeStats │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ Kubelet │ │ │
│ │ │ • Calls CSI gRPC services │ │ │
│ │ │ • Mounts volumes to pods │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
┌──────────────┴──────────────┐
│ │
┌───────────▼──────────┐ ┌────────────▼────────────┐
│ Storage Backend │ │ Storage Backend API │
│ (Block/File/Object) │ │ (REST/gRPC) │
└──────────────────────┘ └─────────────────────────┘
Complete CSI Driver Implementation
Project Structure
my-csi-driver/
├── cmd/
│ └── my-csi-driver/
│ └── main.go
├── pkg/
│ ├── driver/
│ │ ├── driver.go
│ │ ├── controller.go
│ │ ├── node.go
│ │ └── identity.go
│ ├── storage/
│ │ ├── storage.go
│ │ └── client.go
│ └── utils/
│ └── utils.go
├── deploy/
│ ├── kubernetes/
│ │ ├── controller.yaml
│ │ ├── node.yaml
│ │ ├── storageclass.yaml
│ │ └── rbac.yaml
│ └── helm/
│ └── my-csi-driver/
├── test/
│ ├── e2e/
│ └── sanity/
├── go.mod
├── go.sum
├── Dockerfile
└── README.md
Main Driver Implementation
// cmd/my-csi-driver/main.go
package main
import (
"flag"
"fmt"
"os"
"github.com/my-org/my-csi-driver/pkg/driver"
"k8s.io/klog/v2"
)
var (
endpoint = flag.String("endpoint", "unix:///csi/csi.sock", "CSI endpoint")
nodeID = flag.String("nodeid", "", "Node ID")
driverName = flag.String("drivername", "csi.example.com", "Name of the driver")
version = flag.String("version", "1.0.0", "Version of the driver")
storageAPIURL = flag.String("storage-api-url", "", "Storage backend API URL")
)
func main() {
klog.InitFlags(nil)
flag.Parse()
if *nodeID == "" {
klog.Fatal("--nodeid must be provided")
}
if *storageAPIURL == "" {
klog.Fatal("--storage-api-url must be provided")
}
// Create driver
drv, err := driver.NewDriver(
*driverName,
*version,
*nodeID,
*endpoint,
*storageAPIURL,
)
if err != nil {
klog.Fatalf("Failed to create driver: %v", err)
}
// Run driver
if err := drv.Run(); err != nil {
klog.Fatalf("Failed to run driver: %v", err)
}
}
Driver Core Implementation
// pkg/driver/driver.go
package driver
import (
"context"
"fmt"
"net"
"net/url"
"os"
"path"
"path/filepath"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"k8s.io/klog/v2"
)
const (
// Driver capabilities
PluginCapability_Service_CONTROLLER_SERVICE = csi.PluginCapability_Service_CONTROLLER_SERVICE
PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS = csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS
PluginCapability_VolumeExpansion_ONLINE = csi.PluginCapability_VolumeExpansion_ONLINE
PluginCapability_VolumeExpansion_OFFLINE = csi.PluginCapability_VolumeExpansion_OFFLINE
)
type Driver struct {
name string
version string
nodeID string
endpoint string
storageAPIURL string
// CSI services
identityServer csi.IdentityServer
controllerServer csi.ControllerServer
nodeServer csi.NodeServer
// gRPC server
server *grpc.Server
}
func NewDriver(name, version, nodeID, endpoint, storageAPIURL string) (*Driver, error) {
klog.Infof("Creating CSI driver: %s version %s", name, version)
d := &Driver{
name: name,
version: version,
nodeID: nodeID,
endpoint: endpoint,
storageAPIURL: storageAPIURL,
}
// Initialize services
d.identityServer = NewIdentityServer(d)
d.controllerServer = NewControllerServer(d)
d.nodeServer = NewNodeServer(d)
return d, nil
}
func (d *Driver) Run() error {
// Parse endpoint
u, err := url.Parse(d.endpoint)
if err != nil {
return fmt.Errorf("unable to parse endpoint: %v", err)
}
// Remove existing socket file
if u.Scheme == "unix" {
addr := path.Join(u.Host, filepath.FromSlash(u.Path))
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove unix domain socket %s: %v", addr, err)
}
}
// Create listener
listener, err := net.Listen(u.Scheme, path.Join(u.Host, filepath.FromSlash(u.Path)))
if err != nil {
return fmt.Errorf("failed to listen: %v", err)
}
// Create gRPC server
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(d.logGRPC),
}
d.server = grpc.NewServer(opts...)
// Register services
csi.RegisterIdentityServer(d.server, d.identityServer)
csi.RegisterControllerServer(d.server, d.controllerServer)
csi.RegisterNodeServer(d.server, d.nodeServer)
klog.Infof("Starting CSI driver server on %s", d.endpoint)
// Start serving
return d.server.Serve(listener)
}
func (d *Driver) Stop() {
klog.Info("Stopping CSI driver server")
d.server.GracefulStop()
}
func (d *Driver) logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
klog.V(3).Infof("GRPC call: %s", info.FullMethod)
klog.V(5).Infof("GRPC request: %+v", req)
resp, err := handler(ctx, req)
if err != nil {
klog.Errorf("GRPC error: %v", err)
} else {
klog.V(5).Infof("GRPC response: %+v", resp)
}
return resp, err
}
Identity Service Implementation
// pkg/driver/identity.go
package driver
import (
"context"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/protobuf/types/known/wrapperspb"
)
type IdentityServer struct {
Driver *Driver
}
func NewIdentityServer(driver *Driver) csi.IdentityServer {
return &IdentityServer{
Driver: driver,
}
}
func (ids *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
return &csi.GetPluginInfoResponse{
Name: ids.Driver.name,
VendorVersion: ids.Driver.version,
}, nil
}
func (ids *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
},
},
},
{
Type: &csi.PluginCapability_VolumeExpansion_{
VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
Type: csi.PluginCapability_VolumeExpansion_ONLINE,
},
},
},
},
}, nil
}
func (ids *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
// Check if storage backend is accessible
// This is a simplified example
return &csi.ProbeResponse{
Ready: wrapperspb.Bool(true),
}, nil
}
Controller Service Implementation
// pkg/driver/controller.go
package driver
import (
"context"
"fmt"
"strconv"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
)
type ControllerServer struct {
Driver *Driver
// Add storage client here
}
func NewControllerServer(driver *Driver) csi.ControllerServer {
return &ControllerServer{
Driver: driver,
}
}
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
// Validate request
if req.Name == "" {
return nil, status.Error(codes.InvalidArgument, "Volume name must be provided")
}
if req.VolumeCapabilities == nil || len(req.VolumeCapabilities) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume capabilities must be provided")
}
// Extract capacity
capacity := int64(10 * 1024 * 1024 * 1024) // Default 10GB
if req.CapacityRange != nil && req.CapacityRange.RequiredBytes > 0 {
capacity = req.CapacityRange.RequiredBytes
}
klog.Infof("Creating volume %s with capacity %d bytes", req.Name, capacity)
// Check if volume already exists
// In production, query your storage backend
volumeID := fmt.Sprintf("vol-%s", req.Name)
// Extract parameters
parameters := req.Parameters
fsType := parameters["fsType"]
if fsType == "" {
fsType = "ext4"
}
// Handle volume content source (cloning or snapshot restore)
var sourceVolumeID string
var sourceSnapshotID string
if req.VolumeContentSource != nil {
switch src := req.VolumeContentSource.Type.(type) {
case *csi.VolumeContentSource_Volume:
sourceVolumeID = src.Volume.VolumeId
klog.Infof("Cloning from volume: %s", sourceVolumeID)
// Implement volume cloning logic
case *csi.VolumeContentSource_Snapshot:
sourceSnapshotID = src.Snapshot.SnapshotId
klog.Infof("Restoring from snapshot: %s", sourceSnapshotID)
// Implement snapshot restore logic
}
}
// Create volume in storage backend
// This is where you'd call your storage API
// volume, err := cs.storageClient.CreateVolume(ctx, ...)
// Handle topology requirements
var topology []*csi.Topology
if req.AccessibilityRequirements != nil {
// Process topology requirements
for _, topo := range req.AccessibilityRequirements.Preferred {
topology = append(topology, topo)
}
}
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
CapacityBytes: capacity,
VolumeContext: map[string]string{
"fsType": fsType,
"sourceVolumeID": sourceVolumeID,
"sourceSnapshotID": sourceSnapshotID,
},
ContentSource: req.VolumeContentSource,
AccessibleTopology: topology,
},
}, nil
}
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided")
}
klog.Infof("Deleting volume: %s", req.VolumeId)
// Delete volume from storage backend
// err := cs.storageClient.DeleteVolume(ctx, req.VolumeId)
return &csi.DeleteVolumeResponse{}, nil
}
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided")
}
if req.NodeId == "" {
return nil, status.Error(codes.InvalidArgument, "Node ID must be provided")
}
klog.Infof("Publishing volume %s to node %s", req.VolumeId, req.NodeId)
// Attach volume to node in storage backend
// This might involve:
// - iSCSI target creation
// - NFS export creation
// - Block device attachment
publishContext := map[string]string{
"devicePath": "/dev/disk/by-id/xxx",
// Add other metadata needed for NodeStageVolume
}
return &csi.ControllerPublishVolumeResponse{
PublishContext: publishContext,
}, nil
}
func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided")
}
klog.Infof("Unpublishing volume %s from node %s", req.VolumeId, req.NodeId)
// Detach volume from node in storage backend
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided")
}
if req.VolumeCapabilities == nil || len(req.VolumeCapabilities) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume capabilities must be provided")
}
// Validate if the volume supports requested capabilities
// Check if volume exists first
// volume, err := cs.storageClient.GetVolume(ctx, req.VolumeId)
// Validate each capability
for _, cap := range req.VolumeCapabilities {
if cap.GetMount() != nil {
// Validate mount volume capability
fsType := cap.GetMount().FsType
if fsType != "" && fsType != "ext4" && fsType != "xfs" {
return &csi.ValidateVolumeCapabilitiesResponse{
Message: fmt.Sprintf("Unsupported fsType: %s", fsType),
}, nil
}
} else if cap.GetBlock() != nil {
// Validate block volume capability
}
// Validate access mode
switch cap.AccessMode.Mode {
case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER:
// Supported
default:
return &csi.ValidateVolumeCapabilitiesResponse{
Message: "Unsupported access mode",
}, nil
}
}
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeCapabilities: req.VolumeCapabilities,
VolumeContext: req.VolumeContext,
Parameters: req.Parameters,
},
}, nil
}
func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
// List volumes from storage backend
// Handle pagination with req.StartingToken and req.MaxEntries
return &csi.ListVolumesResponse{
Entries: []*csi.ListVolumesResponse_Entry{
// Populate with volumes
},
NextToken: "", // Set if more results available
}, nil
}
func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
// Return available capacity
// Query storage backend for available space
availableCapacity := int64(1024 * 1024 * 1024 * 1024) // 1TB example
return &csi.GetCapacityResponse{
AvailableCapacity: availableCapacity,
}, nil
}
func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
return &csi.ControllerGetCapabilitiesResponse{
Capabilities: []*csi.ControllerServiceCapability{
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_GET_CAPACITY,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
},
},
},
},
}, nil
}
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
if req.SourceVolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Source volume ID must be provided")
}
if req.Name == "" {
return nil, status.Error(codes.InvalidArgument, "Snapshot name must be provided")
}
klog.Infof("Creating snapshot %s from volume %s", req.Name, req.SourceVolumeId)
// Create snapshot in storage backend
snapshotID := fmt.Sprintf("snap-%s", req.Name)
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SnapshotId: snapshotID,
SourceVolumeId: req.SourceVolumeId,
CreationTime: nil, // Set to actual creation time
ReadyToUse: true,
SizeBytes: 0, // Set to actual size
},
}, nil
}
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
if req.SnapshotId == "" {
return nil, status.Error(codes.InvalidArgument, "Snapshot ID must be provided")
}
klog.Infof("Deleting snapshot: %s", req.SnapshotId)
// Delete snapshot from storage backend
return &csi.DeleteSnapshotResponse{}, nil
}
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
// List snapshots from storage backend
// Handle pagination
return &csi.ListSnapshotsResponse{
Entries: []*csi.ListSnapshotsResponse_Entry{
// Populate with snapshots
},
NextToken: "",
}, nil
}
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided")
}
if req.CapacityRange == nil {
return nil, status.Error(codes.InvalidArgument, "Capacity range must be provided")
}
newSize := req.CapacityRange.RequiredBytes
klog.Infof("Expanding volume %s to %d bytes", req.VolumeId, newSize)
// Expand volume in storage backend
return &csi.ControllerExpandVolumeResponse{
CapacityBytes: newSize,
NodeExpansionRequired: true, // Set to true if filesystem resize needed
}, nil
}
func (cs *ControllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "ControllerGetVolume not implemented")
}
Node Service Implementation
// pkg/driver/node.go
package driver
import (
"context"
"fmt"
"os"
"os/exec"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
)
type NodeServer struct {
Driver *Driver
mounter mount.Interface
}
func NewNodeServer(driver *Driver) csi.NodeServer {
return &NodeServer{
Driver: driver,
mounter: mount.New(""),
}
}
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided")
}
if req.StagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Staging target path must be provided")
}
if req.VolumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capability must be provided")
}
klog.Infof("Staging volume %s to %s", req.VolumeId, req.StagingTargetPath)
// Create staging directory
if err := os.MkdirAll(req.StagingTargetPath, 0750); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create staging path: %v", err)
}
// Get device path from publish context
devicePath := req.PublishContext["devicePath"]
if devicePath == "" {
return nil, status.Error(codes.InvalidArgument, "Device path not found in publish context")
}
// Check if already staged
notMnt, err := ns.mounter.IsLikelyNotMountPoint(req.StagingTargetPath)
if err != nil {
if !os.IsNotExist(err) {
return nil, status.Errorf(codes.Internal, "Failed to check mount point: %v", err)
}
}
if !notMnt {
klog.Infof("Volume already staged at %s", req.StagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}
// Format device if needed
if req.VolumeCapability.GetMount() != nil {
fsType := req.VolumeCapability.GetMount().FsType
if fsType == "" {
fsType = "ext4"
}
// Check if device is formatted
existingFS, err := ns.getDeviceFS(devicePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to check filesystem: %v", err)
}
if existingFS == "" {
// Format device
klog.Infof("Formatting device %s with %s", devicePath, fsType)
if err := ns.formatDevice(devicePath, fsType); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to format device: %v", err)
}
}
// Mount device
mountOptions := req.VolumeCapability.GetMount().MountFlags
klog.Infof("Mounting %s to %s with options %v", devicePath, req.StagingTargetPath, mountOptions)
if err := ns.mounter.Mount(devicePath, req.StagingTargetPath, fsType, mountOptions); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to mount device: %v", err)
}
} else if req.VolumeCapability.GetBlock() != nil {
// For block volumes, create bind mount
// This is a simplified example
return nil, status.Error(codes.Unimplemented, "Block volumes not yet implemented")
}
return &csi.NodeStageVolumeResponse{}, nil
}
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided")
}
if req.StagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Staging target path must be provided")
}
klog.Infof("Unstaging volume %s from %s", req.VolumeId, req.StagingTargetPath)
// Check if mounted
notMnt, err := ns.mounter.IsLikelyNotMountPoint(req.StagingTargetPath)
if err != nil {
if os.IsNotExist(err) {
return &csi.NodeUnstageVolumeResponse{}, nil
}
return nil, status.Errorf(codes.Internal, "Failed to check mount point: %v", err)
}
if notMnt {
klog.Infof("Volume not mounted at %s", req.StagingTargetPath)
return &csi.NodeUnstageVolumeResponse{}, nil
}
// Unmount
if err := ns.mounter.Unmount(req.StagingTargetPath); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to unmount: %v", err)
}
return &csi.NodeUnstageVolumeResponse{}, nil
}
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided")
}
if req.TargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path must be provided")
}
if req.VolumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capability must be provided")
}
if req.StagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Staging target path must be provided")
}
klog.Infof("Publishing volume %s to %s", req.VolumeId, req.TargetPath)
// Create target directory
if err := os.MkdirAll(req.TargetPath, 0750); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create target path: %v", err)
}
// Check if already published
notMnt, err := ns.mounter.IsLikelyNotMountPoint(req.TargetPath)
if err != nil {
if !os.IsNotExist(err) {
return nil, status.Errorf(codes.Internal, "Failed to check mount point: %v", err)
}
}
if !notMnt {
klog.Infof("Volume already published at %s", req.TargetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
// Bind mount from staging path to target path
mountOptions := []string{"bind"}
if req.Readonly {
mountOptions = append(mountOptions, "ro")
}
if req.VolumeCapability.GetMount() != nil {
mountOptions = append(mountOptions, req.VolumeCapability.GetMount().MountFlags...)
}
klog.Infof("Bind mounting %s to %s with options %v", req.StagingTargetPath, req.TargetPath, mountOptions)
if err := ns.mounter.Mount(req.StagingTargetPath, req.TargetPath, "", mountOptions); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to bind mount: %v", err)
}
return &csi.NodePublishVolumeResponse{}, nil
}
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided")
}
if req.TargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path must be provided")
}
klog.Infof("Unpublishing volume %s from %s", req.VolumeId, req.TargetPath)
// Check if mounted
notMnt, err := ns.mounter.IsLikelyNotMountPoint(req.TargetPath)
if err != nil {
if os.IsNotExist(err) {
return &csi.NodeUnpublishVolumeResponse{}, nil
}
return nil, status.Errorf(codes.Internal, "Failed to check mount point: %v", err)
}
if notMnt {
klog.Infof("Volume not mounted at %s", req.TargetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
// Unmount
if err := ns.mounter.Unmount(req.TargetPath); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to unmount: %v", err)
}
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided")
}
if req.VolumePath == "" {
return nil, status.Error(codes.InvalidArgument, "Volume path must be provided")
}
// Get volume statistics
available, capacity, used, inodesFree, inodes, inodesUsed, err := ns.getVolumeStats(req.VolumePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to get volume stats: %v", err)
}
return &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Unit: csi.VolumeUsage_BYTES,
Available: available,
Total: capacity,
Used: used,
},
{
Unit: csi.VolumeUsage_INODES,
Available: inodesFree,
Total: inodes,
Used: inodesUsed,
},
},
}, nil
}
func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided")
}
if req.VolumePath == "" {
return nil, status.Error(codes.InvalidArgument, "Volume path must be provided")
}
klog.Infof("Expanding volume %s at %s", req.VolumeId, req.VolumePath)
// Resize filesystem
if err := ns.resizeFilesystem(req.VolumePath); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to resize filesystem: %v", err)
}
return &csi.NodeExpandVolumeResponse{
CapacityBytes: req.CapacityRange.RequiredBytes,
}, nil
}
func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
},
},
},
},
}, nil
}
func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
return &csi.NodeGetInfoResponse{
NodeId: ns.Driver.nodeID,
// Add topology information if needed
AccessibleTopology: &csi.Topology{
Segments: map[string]string{
"topology.kubernetes.io/zone": "zone-1",
"topology.kubernetes.io/region": "region-1",
},
},
}, nil
}
// Helper functions
func (ns *NodeServer) formatDevice(device, fsType string) error {
mkfsCmd := fmt.Sprintf("mkfs.%s", fsType)
_, err := exec.Command(mkfsCmd, device).CombinedOutput()
return err
}
func (ns *NodeServer) getDeviceFS(device string) (string, error) {
output, err := exec.Command("blkid", "-o", "value", "-s", "TYPE", device).CombinedOutput()
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
if exitErr.ExitCode() == 2 {
// No filesystem found
return "", nil
}
}
return "", err
}
return string(output), nil
}
func (ns *NodeServer) getVolumeStats(volumePath string) (available, capacity, used, inodesFree, inodes, inodesUsed int64, err error) {
// Use syscall to get filesystem stats
// This is a simplified version
return 0, 0, 0, 0, 0, 0, nil
}
func (ns *NodeServer) resizeFilesystem(volumePath string) error {
// Detect filesystem type
// Call resize2fs for ext4 or xfs_growfs for xfs
_, err := exec.Command("resize2fs", volumePath).CombinedOutput()
return err
}
Due to length limitations, I’ll provide the deployment configurations and testing in a summary format:
Deployment Configuration
# deploy/kubernetes/controller.yaml
# StatefulSet for CSI controller with sidecars:
# - external-provisioner
# - external-attacher
# - external-snapshotter
# - external-resizer
# - liveness-probe
# deploy/kubernetes/node.yaml
# DaemonSet for CSI node with sidecars:
# - node-driver-registrar
# - liveness-probe
# deploy/kubernetes/storageclass.yaml
# StorageClass definitions with various parameters
# deploy/kubernetes/rbac.yaml
# RBAC permissions for controller and node
Conclusion
CSI driver development enables custom storage integration with Kubernetes. Key points:
- Three Services: Identity, Controller, and Node services
- gRPC Protocol: Standard interface for all CSI drivers
- Sidecars: Kubernetes provides helper containers
- Testing: Sanity tests and E2E validation required
- Production Features: Snapshots, cloning, expansion, topology
This provides a foundation for building production-grade CSI drivers.