Advanced Network Monitoring with Flow Analysis: Enterprise Observability Guide
Advanced network monitoring and flow analysis provide critical visibility into network behavior, performance bottlenecks, and security threats. This comprehensive guide explores sophisticated monitoring architectures, flow analysis techniques, and enterprise-grade observability solutions for production network environments.
Advanced Network Flow Analysis
Section 1: NetFlow and sFlow Implementation
NetFlow and sFlow protocols enable detailed network traffic analysis by providing flow-level visibility into network communications and performance metrics.
High-Performance NetFlow Collector
package netflow
import (
"net"
"sync"
"time"
"encoding/binary"
"context"
)
type NetFlowCollector struct {
ListenAddress string
Port int
Workers int
FlowProcessor *FlowProcessor
Storage FlowStorage
Analytics *FlowAnalytics
Aggregator *FlowAggregator
Exporter *FlowExporter
MetricsEngine *MetricsEngine
SecurityEngine *SecurityEngine
mutex sync.RWMutex
}
type FlowRecord struct {
Version uint16
Count uint16
SysUptime uint32
UnixSecs uint32
UnixNsecs uint32
FlowSequence uint32
EngineType uint8
EngineID uint8
SamplingInterval uint16
Flows []Flow
}
type Flow struct {
SrcAddr net.IP
DstAddr net.IP
NextHop net.IP
Input uint16
Output uint16
Packets uint32
Octets uint32
First uint32
Last uint32
SrcPort uint16
DstPort uint16
Pad1 uint8
TCPFlags uint8
Protocol uint8
TOS uint8
SrcAS uint16
DstAS uint16
SrcMask uint8
DstMask uint8
Pad2 uint16
}
func NewNetFlowCollector(config *CollectorConfig) *NetFlowCollector {
return &NetFlowCollector{
ListenAddress: config.ListenAddress,
Port: config.Port,
Workers: config.Workers,
FlowProcessor: NewFlowProcessor(config.ProcessorConfig),
Storage: NewFlowStorage(config.StorageConfig),
Analytics: NewFlowAnalytics(config.AnalyticsConfig),
Aggregator: NewFlowAggregator(config.AggregatorConfig),
Exporter: NewFlowExporter(config.ExporterConfig),
MetricsEngine: NewMetricsEngine(),
SecurityEngine: NewSecurityEngine(),
}
}
func (nfc *NetFlowCollector) Start(ctx context.Context) error {
// Start UDP listener
addr, err := net.ResolveUDPAddr("udp",
fmt.Sprintf("%s:%d", nfc.ListenAddress, nfc.Port))
if err != nil {
return err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return err
}
defer conn.Close()
// Start worker goroutines
packetChan := make(chan *RawPacket, 10000)
for i := 0; i < nfc.Workers; i++ {
go nfc.packetWorker(ctx, packetChan)
}
// Start analytics engine
go nfc.Analytics.Start(ctx)
// Start aggregation engine
go nfc.Aggregator.Start(ctx)
// Packet receiving loop
buffer := make([]byte, 65536)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
n, addr, err := conn.ReadFromUDP(buffer)
if err != nil {
continue
}
packet := &RawPacket{
Data: make([]byte, n),
Source: addr,
Timestamp: time.Now(),
}
copy(packet.Data, buffer[:n])
select {
case packetChan <- packet:
default:
// Channel full, drop packet
nfc.MetricsEngine.IncrementDroppedPackets()
}
}
}
}
func (nfc *NetFlowCollector) packetWorker(ctx context.Context,
packetChan <-chan *RawPacket) {
for {
select {
case <-ctx.Done():
return
case packet := <-packetChan:
nfc.processPacket(packet)
}
}
}
func (nfc *NetFlowCollector) processPacket(packet *RawPacket) {
// Parse NetFlow packet
flowRecord, err := nfc.parseNetFlowPacket(packet.Data)
if err != nil {
nfc.MetricsEngine.IncrementParseErrors()
return
}
// Enrich flows with additional metadata
for i := range flowRecord.Flows {
nfc.enrichFlow(&flowRecord.Flows[i], packet)
}
// Security analysis
threats := nfc.SecurityEngine.AnalyzeFlows(flowRecord.Flows)
if len(threats) > 0 {
nfc.handleSecurityThreats(threats)
}
// Store flows
if err := nfc.Storage.StoreBatch(flowRecord.Flows); err != nil {
nfc.MetricsEngine.IncrementStorageErrors()
}
// Real-time analytics
nfc.Analytics.ProcessFlows(flowRecord.Flows)
// Aggregation
nfc.Aggregator.AddFlows(flowRecord.Flows)
// Update metrics
nfc.MetricsEngine.IncrementProcessedFlows(len(flowRecord.Flows))
}
func (nfc *NetFlowCollector) parseNetFlowPacket(data []byte) (*FlowRecord, error) {
if len(data) < 24 {
return nil, fmt.Errorf("packet too short")
}
record := &FlowRecord{
Version: binary.BigEndian.Uint16(data[0:2]),
Count: binary.BigEndian.Uint16(data[2:4]),
SysUptime: binary.BigEndian.Uint32(data[4:8]),
UnixSecs: binary.BigEndian.Uint32(data[8:12]),
UnixNsecs: binary.BigEndian.Uint32(data[12:16]),
FlowSequence: binary.BigEndian.Uint32(data[16:20]),
EngineType: data[20],
EngineID: data[21],
SamplingInterval: binary.BigEndian.Uint16(data[22:24]),
}
// Parse flows based on version
switch record.Version {
case 5:
return nfc.parseNetFlowV5(record, data[24:])
case 9:
return nfc.parseNetFlowV9(record, data[24:])
case 10:
return nfc.parseIPFIX(record, data[24:])
default:
return nil, fmt.Errorf("unsupported NetFlow version: %d", record.Version)
}
}
func (nfc *NetFlowCollector) enrichFlow(flow *Flow, packet *RawPacket) {
// GeoIP enrichment
srcCountry, srcCity := nfc.Analytics.GeoIP.Lookup(flow.SrcAddr)
dstCountry, dstCity := nfc.Analytics.GeoIP.Lookup(flow.DstAddr)
flow.SrcCountry = srcCountry
flow.SrcCity = srcCity
flow.DstCountry = dstCountry
flow.DstCity = dstCity
// AS enrichment
flow.SrcASInfo = nfc.Analytics.ASInfo.Lookup(flow.SrcAS)
flow.DstASInfo = nfc.Analytics.ASInfo.Lookup(flow.DstAS)
// Application classification
flow.Application = nfc.Analytics.AppClassifier.Classify(flow)
// Network enrichment
flow.NetworkSegment = nfc.Analytics.NetworkClassifier.ClassifySegment(flow)
}
// sFlow Collector Implementation
type SFlowCollector struct {
NetFlowCollector
SamplingRate uint32
CounterInterval uint32
}
func (sfc *SFlowCollector) processSFlowPacket(packet *RawPacket) {
// Parse sFlow header
header, err := sfc.parseSFlowHeader(packet.Data)
if err != nil {
return
}
offset := 20 // sFlow header size
for i := 0; i < int(header.NumSamples); i++ {
sample, nextOffset, err := sfc.parseSFlowSample(packet.Data[offset:])
if err != nil {
break
}
switch sample.Type {
case SFlowSampleTypeFlow:
sfc.processFlowSample(sample.(*FlowSample))
case SFlowSampleTypeCounter:
sfc.processCounterSample(sample.(*CounterSample))
}
offset += nextOffset
}
}
type FlowAnalytics struct {
GeoIP *GeoIPDatabase
ASInfo *ASInfoDatabase
AppClassifier *ApplicationClassifier
NetworkClassifier *NetworkClassifier
AnomalyDetector *AnomalyDetector
ThreatDetector *ThreatDetector
PerformanceAnalyzer *PerformanceAnalyzer
TopTalkers *TopTalkersAnalyzer
}
func (fa *FlowAnalytics) ProcessFlows(flows []Flow) {
for _, flow := range flows {
// Real-time analytics
fa.analyzeFlowRealtime(&flow)
// Update top talkers
fa.TopTalkers.UpdateFlow(&flow)
// Performance analysis
fa.PerformanceAnalyzer.AnalyzeFlow(&flow)
// Anomaly detection
if anomaly := fa.AnomalyDetector.CheckFlow(&flow); anomaly != nil {
fa.handleAnomaly(anomaly)
}
// Threat detection
if threat := fa.ThreatDetector.CheckFlow(&flow); threat != nil {
fa.handleThreat(threat)
}
}
}
Section 2: Real-Time Flow Analytics and Anomaly Detection
Implementing sophisticated analytics engines that provide real-time insights and detect network anomalies.
Machine Learning-Based Anomaly Detection
class NetworkAnomalyDetector:
def __init__(self):
self.baseline_models = {}
self.ml_models = {
'isolation_forest': IsolationForestDetector(),
'autoencoder': AutoencoderDetector(),
'lstm': LSTMDetector(),
'statistical': StatisticalDetector()
}
self.feature_extractor = FlowFeatureExtractor()
self.ensemble_detector = EnsembleDetector(self.ml_models)
def detect_anomalies(self, flows):
"""Detect anomalies in network flows using multiple ML approaches"""
anomalies = []
# Extract features from flows
features = self.feature_extractor.extract_batch_features(flows)
# Run ensemble detection
ensemble_results = self.ensemble_detector.detect(features)
# Analyze results and create anomaly reports
for i, flow in enumerate(flows):
result = ensemble_results[i]
if result.is_anomaly:
anomaly = NetworkAnomaly(
flow=flow,
anomaly_score=result.score,
detection_methods=result.methods,
confidence=result.confidence,
anomaly_type=result.type,
explanation=result.explanation
)
anomalies.append(anomaly)
return anomalies
def train_baseline_models(self, historical_flows):
"""Train baseline models on historical normal traffic"""
# Group flows by network segment and time period
training_data = self.prepare_training_data(historical_flows)
for segment, data in training_data.items():
features = self.feature_extractor.extract_batch_features(data)
# Train multiple models for this segment
segment_models = {}
for model_name, model in self.ml_models.items():
trained_model = model.train(features)
segment_models[model_name] = trained_model
self.baseline_models[segment] = segment_models
class FlowFeatureExtractor:
def extract_features(self, flow):
"""Extract comprehensive features from network flow"""
features = {
# Basic flow characteristics
'duration': flow.last - flow.first,
'packets': flow.packets,
'bytes': flow.octets,
'bytes_per_packet': flow.octets / max(1, flow.packets),
'packets_per_second': flow.packets / max(1, (flow.last - flow.first)),
'bytes_per_second': flow.octets / max(1, (flow.last - flow.first)),
# Protocol characteristics
'protocol': flow.protocol,
'src_port': flow.src_port,
'dst_port': flow.dst_port,
'tcp_flags': flow.tcp_flags,
'tos': flow.tos,
# Network topology
'src_as': flow.src_as,
'dst_as': flow.dst_as,
'src_mask': flow.src_mask,
'dst_mask': flow.dst_mask,
# Derived features
'port_entropy': self.calculate_port_entropy(flow),
'flow_asymmetry': self.calculate_flow_asymmetry(flow),
'temporal_pattern': self.extract_temporal_pattern(flow),
# Contextual features
'hour_of_day': time.localtime(flow.unix_secs).tm_hour,
'day_of_week': time.localtime(flow.unix_secs).tm_wday,
'is_internal': self.is_internal_flow(flow),
'is_encrypted': self.is_encrypted_flow(flow),
# Application-specific features
'application_signature': self.extract_app_signature(flow),
'payload_entropy': self.calculate_payload_entropy(flow),
'flow_direction': self.determine_flow_direction(flow)
}
return features
def calculate_port_entropy(self, flow):
"""Calculate entropy of port usage patterns"""
# Simplified entropy calculation
ports = [flow.src_port, flow.dst_port]
port_counts = {}
for port in ports:
port_counts[port] = port_counts.get(port, 0) + 1
total = sum(port_counts.values())
entropy = 0
for count in port_counts.values():
probability = count / total
if probability > 0:
entropy -= probability * math.log2(probability)
return entropy
class AutoencoderDetector:
def __init__(self):
self.model = None
self.scaler = StandardScaler()
self.threshold = None
def train(self, features):
"""Train autoencoder for anomaly detection"""
# Normalize features
normalized_features = self.scaler.fit_transform(features)
# Build autoencoder model
input_dim = normalized_features.shape[1]
# Encoder
input_layer = Input(shape=(input_dim,))
encoded = Dense(128, activation='relu')(input_layer)
encoded = Dense(64, activation='relu')(encoded)
encoded = Dense(32, activation='relu')(encoded)
# Decoder
decoded = Dense(64, activation='relu')(encoded)
decoded = Dense(128, activation='relu')(decoded)
decoded = Dense(input_dim, activation='sigmoid')(decoded)
# Autoencoder model
autoencoder = Model(input_layer, decoded)
autoencoder.compile(optimizer='adam', loss='mse')
# Train model
autoencoder.fit(
normalized_features,
normalized_features,
epochs=100,
batch_size=32,
validation_split=0.1,
verbose=0
)
# Calculate reconstruction threshold
reconstructions = autoencoder.predict(normalized_features)
reconstruction_errors = np.mean(np.square(normalized_features - reconstructions), axis=1)
self.threshold = np.percentile(reconstruction_errors, 95)
self.model = autoencoder
return self
def detect(self, features):
"""Detect anomalies using trained autoencoder"""
if self.model is None:
raise ValueError("Model not trained")
# Normalize features
normalized_features = self.scaler.transform(features)
# Get reconstructions
reconstructions = self.model.predict(normalized_features)
# Calculate reconstruction errors
reconstruction_errors = np.mean(np.square(normalized_features - reconstructions), axis=1)
# Detect anomalies
anomalies = reconstruction_errors > self.threshold
scores = reconstruction_errors / self.threshold
return AnomalyResults(
anomalies=anomalies,
scores=scores,
method='autoencoder'
)
class NetworkThreatDetector:
def __init__(self):
self.threat_signatures = ThreatSignatureDatabase()
self.behavioral_analyzer = BehavioralAnalyzer()
self.ml_classifier = ThreatMLClassifier()
def detect_threats(self, flows):
"""Detect security threats in network flows"""
threats = []
for flow in flows:
# Signature-based detection
signature_threats = self.detect_signature_threats(flow)
threats.extend(signature_threats)
# Behavioral analysis
behavioral_threats = self.behavioral_analyzer.analyze(flow)
threats.extend(behavioral_threats)
# ML-based classification
ml_threats = self.ml_classifier.classify(flow)
threats.extend(ml_threats)
return threats
def detect_signature_threats(self, flow):
"""Detect threats using signature-based rules"""
threats = []
# Port scanning detection
if self.is_port_scan(flow):
threats.append(SecurityThreat(
type='port_scan',
severity='medium',
source_ip=flow.src_addr,
target_ip=flow.dst_addr,
evidence={'ports_scanned': flow.dst_port}
))
# DDoS detection
if self.is_ddos_attack(flow):
threats.append(SecurityThreat(
type='ddos',
severity='high',
source_ip=flow.src_addr,
target_ip=flow.dst_addr,
evidence={'packet_rate': flow.packets_per_second}
))
# Malware communication detection
if self.is_malware_communication(flow):
threats.append(SecurityThreat(
type='malware',
severity='high',
source_ip=flow.src_addr,
target_ip=flow.dst_addr,
evidence={'suspicious_patterns': True}
))
return threats
class FlowPerformanceAnalyzer:
def __init__(self):
self.latency_analyzer = LatencyAnalyzer()
self.throughput_analyzer = ThroughputAnalyzer()
self.jitter_analyzer = JitterAnalyzer()
self.packet_loss_analyzer = PacketLossAnalyzer()
def analyze_performance(self, flows):
"""Analyze network performance from flow data"""
performance_metrics = {}
# Group flows by application/service
grouped_flows = self.group_flows_by_application(flows)
for app, app_flows in grouped_flows.items():
metrics = {
'latency': self.latency_analyzer.analyze(app_flows),
'throughput': self.throughput_analyzer.analyze(app_flows),
'jitter': self.jitter_analyzer.analyze(app_flows),
'packet_loss': self.packet_loss_analyzer.analyze(app_flows)
}
performance_metrics[app] = metrics
return performance_metrics
def identify_performance_issues(self, performance_metrics):
"""Identify performance issues and bottlenecks"""
issues = []
for app, metrics in performance_metrics.items():
# High latency detection
if metrics['latency'].average > 100: # ms
issues.append(PerformanceIssue(
type='high_latency',
application=app,
severity=self.calculate_latency_severity(metrics['latency']),
details=metrics['latency']
))
# Low throughput detection
if metrics['throughput'].average < 1000000: # 1 Mbps
issues.append(PerformanceIssue(
type='low_throughput',
application=app,
severity=self.calculate_throughput_severity(metrics['throughput']),
details=metrics['throughput']
))
# High jitter detection
if metrics['jitter'].coefficient_of_variation > 0.5:
issues.append(PerformanceIssue(
type='high_jitter',
application=app,
severity='medium',
details=metrics['jitter']
))
# Packet loss detection
if metrics['packet_loss'].rate > 0.01: # 1%
issues.append(PerformanceIssue(
type='packet_loss',
application=app,
severity='high',
details=metrics['packet_loss']
))
return issues
Section 3: Flow Aggregation and Storage
Implementing efficient flow aggregation and storage systems that handle massive volumes of flow data.
High-Performance Flow Storage
package storage
import (
"time"
"sync"
"compress/gzip"
"encoding/json"
)
type FlowStorage interface {
StoreBatch(flows []Flow) error
Query(query *FlowQuery) ([]Flow, error)
Aggregate(aggregation *AggregationQuery) (*AggregationResult, error)
GetTopTalkers(timeRange TimeRange, limit int) ([]TopTalker, error)
GetApplicationStats(timeRange TimeRange) (*ApplicationStats, error)
}
type TimeSeriesFlowStorage struct {
TimeSeries *TimeSeriesDB
BatchProcessor *BatchProcessor
Compressor *FlowCompressor
Indexer *FlowIndexer
MetricsCache *MetricsCache
Configuration *StorageConfig
mutex sync.RWMutex
}
func NewTimeSeriesFlowStorage(config *StorageConfig) *TimeSeriesFlowStorage {
return &TimeSeriesFlowStorage{
TimeSeries: NewTimeSeriesDB(config.TimeSeriesConfig),
BatchProcessor: NewBatchProcessor(config.BatchConfig),
Compressor: NewFlowCompressor(config.CompressionConfig),
Indexer: NewFlowIndexer(config.IndexConfig),
MetricsCache: NewMetricsCache(config.CacheConfig),
Configuration: config,
}
}
func (tsfs *TimeSeriesFlowStorage) StoreBatch(flows []Flow) error {
// Preprocess flows
processedFlows := tsfs.preprocessFlows(flows)
// Create storage batches
batches := tsfs.createStorageBatches(processedFlows)
// Store batches in parallel
var wg sync.WaitGroup
errorChan := make(chan error, len(batches))
for _, batch := range batches {
wg.Add(1)
go func(b *FlowBatch) {
defer wg.Done()
if err := tsfs.storeBatch(b); err != nil {
errorChan <- err
}
}(batch)
}
wg.Wait()
close(errorChan)
// Check for errors
for err := range errorChan {
if err != nil {
return err
}
}
return nil
}
func (tsfs *TimeSeriesFlowStorage) preprocessFlows(flows []Flow) []ProcessedFlow {
processed := make([]ProcessedFlow, len(flows))
for i, flow := range flows {
// Normalize timestamps
normalizedTime := tsfs.normalizeTimestamp(flow.UnixSecs)
// Calculate derived metrics
duration := flow.Last - flow.First
bytesPerSecond := float64(flow.Octets) / max(1.0, float64(duration))
packetsPerSecond := float64(flow.Packets) / max(1.0, float64(duration))
// Create processed flow
processed[i] = ProcessedFlow{
Flow: flow,
NormalizedTime: normalizedTime,
Duration: duration,
BytesPerSecond: bytesPerSecond,
PacketsPerSecond: packetsPerSecond,
FlowHash: tsfs.calculateFlowHash(flow),
}
}
return processed
}
func (tsfs *TimeSeriesFlowStorage) createStorageBatches(flows []ProcessedFlow) []*FlowBatch {
// Group flows by time bucket and storage tier
buckets := make(map[string][]ProcessedFlow)
for _, flow := range flows {
bucketKey := tsfs.calculateBucketKey(flow)
buckets[bucketKey] = append(buckets[bucketKey], flow)
}
// Create batches
var batches []*FlowBatch
for bucketKey, bucketFlows := range buckets {
batch := &FlowBatch{
BucketKey: bucketKey,
Flows: bucketFlows,
Timestamp: time.Now(),
Compression: tsfs.selectCompressionMethod(bucketFlows),
}
batches = append(batches, batch)
}
return batches
}
func (tsfs *TimeSeriesFlowStorage) storeBatch(batch *FlowBatch) error {
// Compress batch if configured
var data []byte
var err error
if batch.Compression != CompressionNone {
data, err = tsfs.Compressor.Compress(batch.Flows, batch.Compression)
if err != nil {
return err
}
} else {
data, err = json.Marshal(batch.Flows)
if err != nil {
return err
}
}
// Store in time series database
record := &TimeSeriesRecord{
Timestamp: batch.Timestamp,
Data: data,
Metadata: map[string]interface{}{
"bucket_key": batch.BucketKey,
"flow_count": len(batch.Flows),
"compression": batch.Compression,
"data_size": len(data),
},
}
if err := tsfs.TimeSeries.Insert(record); err != nil {
return err
}
// Update indexes
for _, flow := range batch.Flows {
if err := tsfs.Indexer.IndexFlow(flow, record.ID); err != nil {
// Log error but don't fail the entire batch
continue
}
}
// Update metrics cache
tsfs.updateMetricsCache(batch.Flows)
return nil
}
type FlowAggregator struct {
AggregationEngine *AggregationEngine
TimeWindowManager *TimeWindowManager
MetricsCalculator *MetricsCalculator
OutputManager *OutputManager
Configuration *AggregatorConfig
}
func (fa *FlowAggregator) Start(ctx context.Context) {
ticker := time.NewTicker(fa.Configuration.AggregationInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
fa.performAggregation()
}
}
}
func (fa *FlowAggregator) performAggregation() {
// Get time windows for aggregation
windows := fa.TimeWindowManager.GetActiveWindows()
for _, window := range windows {
// Get flows for this window
flows, err := fa.getFlowsForWindow(window)
if err != nil {
continue
}
// Perform aggregations
aggregations := fa.calculateAggregations(flows, window)
// Store aggregated data
fa.storeAggregations(aggregations, window)
// Update real-time metrics
fa.updateRealtimeMetrics(aggregations)
}
}
func (fa *FlowAggregator) calculateAggregations(flows []Flow,
window *TimeWindow) map[string]*Aggregation {
aggregations := make(map[string]*Aggregation)
// Traffic volume aggregations
aggregations["traffic_volume"] = fa.aggregateTrafficVolume(flows)
// Top talkers aggregations
aggregations["top_talkers"] = fa.aggregateTopTalkers(flows)
// Application aggregations
aggregations["applications"] = fa.aggregateApplications(flows)
// Protocol aggregations
aggregations["protocols"] = fa.aggregateProtocols(flows)
// Geographic aggregations
aggregations["geography"] = fa.aggregateGeography(flows)
// Security aggregations
aggregations["security"] = fa.aggregateSecurity(flows)
return aggregations
}
func (fa *FlowAggregator) aggregateTrafficVolume(flows []Flow) *Aggregation {
var totalBytes, totalPackets uint64
var totalDuration uint32
flowCount := len(flows)
for _, flow := range flows {
totalBytes += uint64(flow.Octets)
totalPackets += uint64(flow.Packets)
totalDuration += flow.Last - flow.First
}
avgBytesPerFlow := float64(totalBytes) / float64(flowCount)
avgPacketsPerFlow := float64(totalPackets) / float64(flowCount)
avgDuration := float64(totalDuration) / float64(flowCount)
return &Aggregation{
Type: "traffic_volume",
Metrics: map[string]interface{}{
"total_bytes": totalBytes,
"total_packets": totalPackets,
"total_flows": flowCount,
"avg_bytes_per_flow": avgBytesPerFlow,
"avg_packets_per_flow": avgPacketsPerFlow,
"avg_duration": avgDuration,
"bytes_per_second": float64(totalBytes) / float64(totalDuration),
"packets_per_second": float64(totalPackets) / float64(totalDuration),
},
}
}
func (fa *FlowAggregator) aggregateTopTalkers(flows []Flow) *Aggregation {
srcTalkers := make(map[string]*TalkerStats)
dstTalkers := make(map[string]*TalkerStats)
for _, flow := range flows {
srcIP := flow.SrcAddr.String()
dstIP := flow.DstAddr.String()
// Source talkers
if stats, exists := srcTalkers[srcIP]; exists {
stats.Bytes += uint64(flow.Octets)
stats.Packets += uint64(flow.Packets)
stats.Flows++
} else {
srcTalkers[srcIP] = &TalkerStats{
IP: srcIP,
Bytes: uint64(flow.Octets),
Packets: uint64(flow.Packets),
Flows: 1,
}
}
// Destination talkers
if stats, exists := dstTalkers[dstIP]; exists {
stats.Bytes += uint64(flow.Octets)
stats.Packets += uint64(flow.Packets)
stats.Flows++
} else {
dstTalkers[dstIP] = &TalkerStats{
IP: dstIP,
Bytes: uint64(flow.Octets),
Packets: uint64(flow.Packets),
Flows: 1,
}
}
}
// Sort and get top talkers
topSrcTalkers := fa.getTopTalkers(srcTalkers, 20)
topDstTalkers := fa.getTopTalkers(dstTalkers, 20)
return &Aggregation{
Type: "top_talkers",
Metrics: map[string]interface{}{
"top_source_talkers": topSrcTalkers,
"top_destination_talkers": topDstTalkers,
},
}
}
Section 4: Network Visualization and Dashboards
Creating comprehensive visualization and dashboard systems for network monitoring and analysis.
Advanced Network Visualization
class NetworkVisualizationEngine:
def __init__(self):
self.topology_mapper = NetworkTopologyMapper()
self.flow_visualizer = FlowVisualizer()
self.heat_map_generator = HeatMapGenerator()
self.graph_analyzer = NetworkGraphAnalyzer()
self.dashboard_engine = DashboardEngine()
def create_network_topology_view(self, flows, time_range):
"""Create network topology visualization from flow data"""
# Extract network entities
nodes = self.extract_network_nodes(flows)
edges = self.extract_network_edges(flows)
# Calculate node metrics
node_metrics = self.calculate_node_metrics(nodes, flows)
# Calculate edge metrics
edge_metrics = self.calculate_edge_metrics(edges, flows)
# Create network graph
network_graph = self.create_network_graph(
nodes, edges, node_metrics, edge_metrics
)
# Apply layout algorithm
positioned_graph = self.apply_layout_algorithm(
network_graph, algorithm='force_directed'
)
return positioned_graph
def extract_network_nodes(self, flows):
"""Extract unique network nodes from flows"""
nodes = set()
for flow in flows:
# Add source and destination IPs
nodes.add(flow.src_addr)
nodes.add(flow.dst_addr)
# Enrich nodes with metadata
enriched_nodes = []
for node_ip in nodes:
node = NetworkNode(
ip=node_ip,
hostname=self.resolve_hostname(node_ip),
geo_location=self.get_geo_location(node_ip),
as_info=self.get_as_info(node_ip),
device_type=self.classify_device_type(node_ip),
network_segment=self.classify_network_segment(node_ip)
)
enriched_nodes.append(node)
return enriched_nodes
def create_traffic_flow_visualization(self, flows, aggregation_level='subnet'):
"""Create Sankey diagram for traffic flows"""
# Aggregate flows based on level
aggregated_flows = self.aggregate_flows_for_visualization(
flows, aggregation_level
)
# Create flow hierarchy
flow_hierarchy = self.create_flow_hierarchy(aggregated_flows)
# Generate Sankey diagram data
sankey_data = self.generate_sankey_data(flow_hierarchy)
return sankey_data
def generate_geographic_heat_map(self, flows):
"""Generate geographic heat map of network traffic"""
# Extract geographic data from flows
geo_data = []
for flow in flows:
src_geo = self.get_geo_location(flow.src_addr)
dst_geo = self.get_geo_location(flow.dst_addr)
if src_geo and dst_geo:
geo_data.append({
'src_lat': src_geo.latitude,
'src_lon': src_geo.longitude,
'dst_lat': dst_geo.latitude,
'dst_lon': dst_geo.longitude,
'bytes': flow.octets,
'packets': flow.packets
})
# Generate heat map data
heat_map_data = self.heat_map_generator.generate_heat_map(
geo_data, metric='bytes'
)
return heat_map_data
def create_time_series_chart(self, flows, metric='bytes', interval='5min'):
"""Create time series chart for network metrics"""
# Group flows by time intervals
time_buckets = self.group_flows_by_time(flows, interval)
# Calculate metrics for each time bucket
time_series_data = []
for timestamp, bucket_flows in time_buckets.items():
if metric == 'bytes':
value = sum(flow.octets for flow in bucket_flows)
elif metric == 'packets':
value = sum(flow.packets for flow in bucket_flows)
elif metric == 'flows':
value = len(bucket_flows)
elif metric == 'unique_sources':
value = len(set(flow.src_addr for flow in bucket_flows))
elif metric == 'unique_destinations':
value = len(set(flow.dst_addr for flow in bucket_flows))
time_series_data.append({
'timestamp': timestamp,
'value': value
})
return time_series_data
class RealTimeDashboard:
def __init__(self):
self.websocket_manager = WebSocketManager()
self.data_aggregator = RealTimeAggregator()
self.alert_manager = AlertManager()
self.widget_engine = WidgetEngine()
def start_real_time_updates(self):
"""Start real-time dashboard updates"""
# Start data collection
self.data_aggregator.start()
# Start WebSocket server for real-time updates
self.websocket_manager.start()
# Start update loop
self.start_update_loop()
def start_update_loop(self):
"""Main update loop for real-time data"""
async def update_loop():
while True:
try:
# Get latest data
latest_data = self.data_aggregator.get_latest_data()
# Process widgets
widget_updates = {}
for widget_id, widget_config in self.get_active_widgets().items():
update = self.widget_engine.process_widget(
widget_config, latest_data
)
widget_updates[widget_id] = update
# Check for alerts
alerts = self.alert_manager.check_alerts(latest_data)
# Send updates to connected clients
update_message = {
'type': 'dashboard_update',
'widgets': widget_updates,
'alerts': alerts,
'timestamp': time.time()
}
await self.websocket_manager.broadcast(update_message)
# Wait for next update cycle
await asyncio.sleep(5)
except Exception as e:
print(f"Update loop error: {e}")
await asyncio.sleep(10)
asyncio.create_task(update_loop())
def create_custom_widget(self, widget_config):
"""Create custom dashboard widget"""
widget = DashboardWidget(
id=widget_config.id,
type=widget_config.type,
title=widget_config.title,
data_source=widget_config.data_source,
visualization_type=widget_config.visualization_type,
filters=widget_config.filters,
refresh_interval=widget_config.refresh_interval
)
return widget
def generate_automated_insights(self, flow_data):
"""Generate automated insights from flow data"""
insights = []
# Traffic pattern insights
traffic_patterns = self.analyze_traffic_patterns(flow_data)
if traffic_patterns.has_anomalies():
insights.append(Insight(
type='traffic_anomaly',
message=f"Unusual traffic pattern detected: {traffic_patterns.description}",
severity='medium',
recommendation=traffic_patterns.recommendation
))
# Performance insights
performance_issues = self.identify_performance_issues(flow_data)
for issue in performance_issues:
insights.append(Insight(
type='performance',
message=issue.description,
severity=issue.severity,
recommendation=issue.recommendation
))
# Security insights
security_issues = self.identify_security_issues(flow_data)
for issue in security_issues:
insights.append(Insight(
type='security',
message=issue.description,
severity=issue.severity,
recommendation=issue.recommendation
))
return insights
class AlertingEngine:
def __init__(self):
self.alert_rules = {}
self.notification_manager = NotificationManager()
self.escalation_manager = EscalationManager()
def add_alert_rule(self, rule_config):
"""Add new alert rule"""
rule = AlertRule(
id=rule_config.id,
name=rule_config.name,
condition=rule_config.condition,
threshold=rule_config.threshold,
duration=rule_config.duration,
severity=rule_config.severity,
notification_channels=rule_config.notification_channels
)
self.alert_rules[rule.id] = rule
def evaluate_alerts(self, flow_data):
"""Evaluate all alert rules against current data"""
active_alerts = []
for rule_id, rule in self.alert_rules.items():
if self.evaluate_rule(rule, flow_data):
alert = Alert(
rule_id=rule_id,
rule_name=rule.name,
severity=rule.severity,
message=rule.generate_message(flow_data),
timestamp=time.time(),
data=flow_data
)
active_alerts.append(alert)
# Send notifications
self.notification_manager.send_alert(alert)
# Handle escalation
self.escalation_manager.handle_alert(alert)
return active_alerts
This comprehensive guide demonstrates enterprise-grade network monitoring and flow analysis implementation with real-time analytics, machine learning-based anomaly detection, high-performance storage systems, and advanced visualization capabilities. The examples provide production-ready patterns for building sophisticated network observability platforms that can handle massive traffic volumes while providing actionable insights for network operations and security teams.