Advanced Data Governance and Lineage Tracking: Enterprise-Grade Data Management and Compliance
Advanced Data Governance and Lineage Tracking: Enterprise-Grade Data Management and Compliance
Modern organizations face increasing regulatory requirements and the need for comprehensive data governance frameworks that ensure data quality, security, privacy, and compliance. Advanced data governance systems must provide automated classification, lineage tracking, access controls, and audit capabilities while scaling to handle enterprise data volumes.
This comprehensive guide explores sophisticated approaches to implementing enterprise-grade data governance and lineage tracking systems, covering regulatory compliance, automated policy enforcement, and advanced analytics for data management decision-making.
Data Governance Framework Architecture
Core Governance Components and Principles
Enterprise data governance requires a comprehensive framework that addresses data quality, security, privacy, and regulatory compliance across the entire data lifecycle.
# Advanced data governance framework implementation
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Set, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from enum import Enum
import uuid
import hashlib
import json
import logging
import re
class DataClassification(Enum):
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
PERSONAL = "personal"
SENSITIVE_PERSONAL = "sensitive_personal"
class AccessLevel(Enum):
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
class ComplianceFramework(Enum):
GDPR = "gdpr"
CCPA = "ccpa"
HIPAA = "hipaa"
SOX = "sox"
PCI_DSS = "pci_dss"
ISO_27001 = "iso_27001"
@dataclass
class DataAsset:
"""Core data asset representation for governance"""
asset_id: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
description: str = ""
asset_type: str = "" # table, view, file, stream, etc.
location: str = ""
schema: Dict[str, Any] = field(default_factory=dict)
classification: DataClassification = DataClassification.INTERNAL
owner: str = ""
steward: str = ""
tags: Set[str] = field(default_factory=set)
metadata: Dict[str, Any] = field(default_factory=dict)
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
retention_policy: Optional['RetentionPolicy'] = None
access_policies: List['AccessPolicy'] = field(default_factory=list)
compliance_requirements: Set[ComplianceFramework] = field(default_factory=set)
def add_tag(self, tag: str):
"""Add tag to data asset"""
self.tags.add(tag)
self.updated_at = datetime.now(timezone.utc)
def remove_tag(self, tag: str):
"""Remove tag from data asset"""
self.tags.discard(tag)
self.updated_at = datetime.now(timezone.utc)
def update_classification(self, classification: DataClassification, reason: str):
"""Update data classification with audit trail"""
old_classification = self.classification
self.classification = classification
self.updated_at = datetime.now(timezone.utc)
# Log classification change
logging.info(f"Asset {self.asset_id} classification changed from {old_classification.value} to {classification.value}: {reason}")
@dataclass
class RetentionPolicy:
"""Data retention policy specification"""
policy_id: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
description: str = ""
retention_period_days: int = 2555 # 7 years default
auto_delete: bool = False
archive_after_days: Optional[int] = None
compliance_frameworks: Set[ComplianceFramework] = field(default_factory=set)
exceptions: List[str] = field(default_factory=list)
def is_expired(self, asset_date: datetime) -> bool:
"""Check if data asset has exceeded retention period"""
cutoff_date = datetime.now(timezone.utc) - timedelta(days=self.retention_period_days)
return asset_date < cutoff_date
def should_archive(self, asset_date: datetime) -> bool:
"""Check if data asset should be archived"""
if not self.archive_after_days:
return False
archive_date = datetime.now(timezone.utc) - timedelta(days=self.archive_after_days)
return asset_date < archive_date
@dataclass
class AccessPolicy:
"""Data access policy specification"""
policy_id: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
description: str = ""
principal_type: str = "user" # user, group, role, service
principal_id: str = ""
access_level: AccessLevel = AccessLevel.READ
conditions: Dict[str, Any] = field(default_factory=dict)
time_restrictions: Optional[Dict[str, Any]] = None
ip_restrictions: Optional[List[str]] = None
purpose_limitations: Optional[List[str]] = None
valid_from: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
valid_until: Optional[datetime] = None
def is_valid(self, timestamp: Optional[datetime] = None) -> bool:
"""Check if access policy is currently valid"""
now = timestamp or datetime.now(timezone.utc)
if now < self.valid_from:
return False
if self.valid_until and now > self.valid_until:
return False
return True
def check_time_restrictions(self, timestamp: datetime) -> bool:
"""Check if access is allowed at given time"""
if not self.time_restrictions:
return True
# Check day of week restrictions
if "allowed_days" in self.time_restrictions:
allowed_days = self.time_restrictions["allowed_days"]
current_day = timestamp.strftime("%A").lower()
if current_day not in allowed_days:
return False
# Check time of day restrictions
if "allowed_hours" in self.time_restrictions:
allowed_hours = self.time_restrictions["allowed_hours"]
current_hour = timestamp.hour
if not (allowed_hours["start"] <= current_hour <= allowed_hours["end"]):
return False
return True
class DataGovernanceEngine:
"""Core data governance engine"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.assets: Dict[str, DataAsset] = {}
self.policies: Dict[str, Union[RetentionPolicy, AccessPolicy]] = {}
self.classifiers: List['DataClassifier'] = []
self.compliance_manager = ComplianceManager()
self.audit_logger = AuditLogger()
def register_asset(self, asset: DataAsset) -> str:
"""Register a data asset with governance"""
# Auto-classify asset
classification = self._auto_classify_asset(asset)
if classification:
asset.classification = classification
# Apply default policies
self._apply_default_policies(asset)
# Store asset
self.assets[asset.asset_id] = asset
# Log registration
self.audit_logger.log_asset_registration(asset)
logging.info(f"Registered data asset: {asset.name} ({asset.asset_id})")
return asset.asset_id
def update_asset(self, asset_id: str, updates: Dict[str, Any]) -> bool:
"""Update data asset with governance validation"""
if asset_id not in self.assets:
raise ValueError(f"Asset {asset_id} not found")
asset = self.assets[asset_id]
old_values = {}
# Track changes for audit
for field, new_value in updates.items():
if hasattr(asset, field):
old_values[field] = getattr(asset, field)
setattr(asset, field, new_value)
asset.updated_at = datetime.now(timezone.utc)
# Re-classify if schema changed
if "schema" in updates:
new_classification = self._auto_classify_asset(asset)
if new_classification and new_classification != asset.classification:
asset.update_classification(new_classification, "Schema change triggered reclassification")
# Log update
self.audit_logger.log_asset_update(asset, old_values, updates)
return True
def _auto_classify_asset(self, asset: DataAsset) -> Optional[DataClassification]:
"""Automatically classify data asset"""
for classifier in self.classifiers:
classification = classifier.classify(asset)
if classification:
return classification
return None
def _apply_default_policies(self, asset: DataAsset):
"""Apply default policies based on classification"""
default_policies = self.config.get("default_policies", {})
classification_key = asset.classification.value
if classification_key in default_policies:
policy_config = default_policies[classification_key]
# Apply retention policy
if "retention" in policy_config:
retention_config = policy_config["retention"]
retention_policy = RetentionPolicy(
name=f"Default {classification_key} retention",
description=f"Default retention policy for {classification_key} data",
retention_period_days=retention_config.get("days", 2555),
auto_delete=retention_config.get("auto_delete", False),
archive_after_days=retention_config.get("archive_after_days")
)
asset.retention_policy = retention_policy
# Apply access policies
if "access" in policy_config:
for access_config in policy_config["access"]:
access_policy = AccessPolicy(
name=f"Default {classification_key} access",
description=f"Default access policy for {classification_key} data",
principal_type=access_config.get("principal_type", "role"),
principal_id=access_config.get("principal_id", "data_analysts"),
access_level=AccessLevel(access_config.get("access_level", "read"))
)
asset.access_policies.append(access_policy)
def check_access(self, asset_id: str, principal: str,
access_level: AccessLevel, context: Dict[str, Any]) -> bool:
"""Check if access is allowed for given principal"""
if asset_id not in self.assets:
return False
asset = self.assets[asset_id]
# Check each access policy
for policy in asset.access_policies:
if (policy.principal_id == principal and
policy.access_level == access_level and
policy.is_valid()):
# Check additional conditions
if policy.time_restrictions:
if not policy.check_time_restrictions(datetime.now(timezone.utc)):
continue
if policy.ip_restrictions:
client_ip = context.get("client_ip")
if client_ip not in policy.ip_restrictions:
continue
# Log access attempt
self.audit_logger.log_access_attempt(asset, principal, access_level, True, context)
return True
# Log denied access
self.audit_logger.log_access_attempt(asset, principal, access_level, False, context)
return False
def get_assets_by_classification(self, classification: DataClassification) -> List[DataAsset]:
"""Get all assets with specified classification"""
return [asset for asset in self.assets.values() if asset.classification == classification]
def get_expired_assets(self) -> List[DataAsset]:
"""Get assets that have exceeded retention period"""
expired_assets = []
for asset in self.assets.values():
if asset.retention_policy and asset.retention_policy.is_expired(asset.created_at):
expired_assets.append(asset)
return expired_assets
def search_assets(self, query: Dict[str, Any]) -> List[DataAsset]:
"""Search assets based on criteria"""
matching_assets = []
for asset in self.assets.values():
if self._matches_search_criteria(asset, query):
matching_assets.append(asset)
return matching_assets
def _matches_search_criteria(self, asset: DataAsset, query: Dict[str, Any]) -> bool:
"""Check if asset matches search criteria"""
# Text search
if "text" in query:
search_text = query["text"].lower()
if (search_text not in asset.name.lower() and
search_text not in asset.description.lower()):
return False
# Classification filter
if "classification" in query:
if asset.classification != DataClassification(query["classification"]):
return False
# Owner filter
if "owner" in query:
if asset.owner != query["owner"]:
return False
# Tag filter
if "tags" in query:
required_tags = set(query["tags"])
if not required_tags.issubset(asset.tags):
return False
return True
class DataClassifier(ABC):
"""Abstract base class for data classifiers"""
@abstractmethod
def classify(self, asset: DataAsset) -> Optional[DataClassification]:
"""Classify data asset"""
pass
class PIIClassifier(DataClassifier):
"""Classifier for Personally Identifiable Information"""
def __init__(self):
self.pii_patterns = {
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"ssn": r'\b\d{3}-\d{2}-\d{4}\b',
"phone": r'\b\d{3}-\d{3}-\d{4}\b',
"credit_card": r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
"passport": r'\b[A-Z]{2}\d{7}\b'
}
self.pii_field_names = {
"personal": ["first_name", "last_name", "full_name", "name"],
"contact": ["email", "phone", "address", "zip_code", "postal_code"],
"identification": ["ssn", "social_security", "passport", "driver_license"],
"financial": ["credit_card", "bank_account", "routing_number"]
}
def classify(self, asset: DataAsset) -> Optional[DataClassification]:
"""Classify asset based on PII content"""
pii_score = 0
sensitive_pii_score = 0
# Check schema field names
schema = asset.schema
if "properties" in schema:
for field_name, field_def in schema["properties"].items():
field_name_lower = field_name.lower()
# Check for personal information fields
for category, field_patterns in self.pii_field_names.items():
for pattern in field_patterns:
if pattern in field_name_lower:
if category in ["identification", "financial"]:
sensitive_pii_score += 1
else:
pii_score += 1
# Check for data patterns (if sample data available)
if "sample_data" in asset.metadata:
sample_data = asset.metadata["sample_data"]
for record in sample_data[:100]: # Check first 100 records
for field_value in record.values():
if isinstance(field_value, str):
for pattern_name, pattern in self.pii_patterns.items():
if re.search(pattern, field_value):
if pattern_name in ["ssn", "credit_card", "passport"]:
sensitive_pii_score += 1
else:
pii_score += 1
# Determine classification
if sensitive_pii_score > 0:
return DataClassification.SENSITIVE_PERSONAL
elif pii_score > 2:
return DataClassification.PERSONAL
elif pii_score > 0:
return DataClassification.CONFIDENTIAL
return None
class FinancialDataClassifier(DataClassifier):
"""Classifier for financial data"""
def __init__(self):
self.financial_keywords = [
"revenue", "profit", "loss", "earnings", "financial", "accounting",
"budget", "cost", "expense", "income", "salary", "payment",
"invoice", "transaction", "balance", "credit", "debit"
]
self.financial_field_patterns = [
"amount", "price", "cost", "fee", "rate", "balance", "total"
]
def classify(self, asset: DataAsset) -> Optional[DataClassification]:
"""Classify asset based on financial content"""
financial_score = 0
# Check asset name and description
text_content = f"{asset.name} {asset.description}".lower()
for keyword in self.financial_keywords:
if keyword in text_content:
financial_score += 1
# Check schema fields
if "properties" in asset.schema:
for field_name in asset.schema["properties"].keys():
field_name_lower = field_name.lower()
for pattern in self.financial_field_patterns:
if pattern in field_name_lower:
financial_score += 1
# Classify based on score
if financial_score >= 3:
return DataClassification.CONFIDENTIAL
elif financial_score >= 1:
return DataClassification.INTERNAL
return None
class ComplianceManager:
"""Manage compliance requirements and validation"""
def __init__(self):
self.compliance_rules = {
ComplianceFramework.GDPR: GDPRComplianceRules(),
ComplianceFramework.CCPA: CCPAComplianceRules(),
ComplianceFramework.HIPAA: HIPAAComplianceRules(),
ComplianceFramework.SOX: SOXComplianceRules()
}
def validate_compliance(self, asset: DataAsset) -> Dict[str, Any]:
"""Validate asset compliance against applicable frameworks"""
compliance_report = {
"asset_id": asset.asset_id,
"asset_name": asset.name,
"validation_timestamp": datetime.now(timezone.utc).isoformat(),
"overall_compliant": True,
"framework_results": {}
}
for framework in asset.compliance_requirements:
if framework in self.compliance_rules:
rules = self.compliance_rules[framework]
result = rules.validate(asset)
compliance_report["framework_results"][framework.value] = result
if not result["compliant"]:
compliance_report["overall_compliant"] = False
return compliance_report
def get_compliance_requirements(self, classification: DataClassification) -> Set[ComplianceFramework]:
"""Get applicable compliance frameworks for data classification"""
requirements = set()
if classification in [DataClassification.PERSONAL, DataClassification.SENSITIVE_PERSONAL]:
requirements.add(ComplianceFramework.GDPR)
requirements.add(ComplianceFramework.CCPA)
if classification == DataClassification.SENSITIVE_PERSONAL:
requirements.add(ComplianceFramework.HIPAA)
if classification == DataClassification.CONFIDENTIAL:
requirements.add(ComplianceFramework.SOX)
return requirements
class ComplianceRules(ABC):
"""Abstract base class for compliance rules"""
@abstractmethod
def validate(self, asset: DataAsset) -> Dict[str, Any]:
"""Validate asset against compliance rules"""
pass
class GDPRComplianceRules(ComplianceRules):
"""GDPR compliance validation rules"""
def validate(self, asset: DataAsset) -> Dict[str, Any]:
"""Validate GDPR compliance"""
result = {
"framework": "GDPR",
"compliant": True,
"violations": [],
"recommendations": []
}
# Check data subject rights
if asset.classification in [DataClassification.PERSONAL, DataClassification.SENSITIVE_PERSONAL]:
# Right to erasure (retention policy required)
if not asset.retention_policy:
result["violations"].append("Missing retention policy for personal data")
result["compliant"] = False
# Data minimization (purpose limitation)
purpose_limited = False
for policy in asset.access_policies:
if policy.purpose_limitations:
purpose_limited = True
break
if not purpose_limited:
result["violations"].append("No purpose limitations defined for personal data access")
result["recommendations"].append("Define purpose limitations for data access policies")
# Consent tracking (metadata should include consent information)
if "consent_tracking" not in asset.metadata:
result["violations"].append("Missing consent tracking for personal data")
result["recommendations"].append("Implement consent tracking mechanism")
# Data protection by design
encryption_required = asset.classification == DataClassification.SENSITIVE_PERSONAL
if encryption_required and not asset.metadata.get("encrypted", False):
result["violations"].append("Sensitive personal data must be encrypted")
result["compliant"] = False
return result
class CCPAComplianceRules(ComplianceRules):
"""CCPA compliance validation rules"""
def validate(self, asset: DataAsset) -> Dict[str, Any]:
"""Validate CCPA compliance"""
result = {
"framework": "CCPA",
"compliant": True,
"violations": [],
"recommendations": []
}
# CCPA applies to personal information of California residents
if asset.classification in [DataClassification.PERSONAL, DataClassification.SENSITIVE_PERSONAL]:
# Right to know - data inventory required
if not asset.description:
result["violations"].append("Personal information assets must have detailed descriptions")
result["recommendations"].append("Add comprehensive description of data processing purposes")
# Right to delete
if not asset.retention_policy or not asset.retention_policy.auto_delete:
result["recommendations"].append("Consider implementing automated deletion for consumer requests")
# Right to opt-out
if "opt_out_mechanism" not in asset.metadata:
result["recommendations"].append("Implement opt-out mechanism for personal information")
return result
class HIPAAComplianceRules(ComplianceRules):
"""HIPAA compliance validation rules"""
def validate(self, asset: DataAsset) -> Dict[str, Any]:
"""Validate HIPAA compliance"""
result = {
"framework": "HIPAA",
"compliant": True,
"violations": [],
"recommendations": []
}
# HIPAA applies to protected health information (PHI)
if asset.classification == DataClassification.SENSITIVE_PERSONAL:
# Administrative safeguards
if not asset.owner:
result["violations"].append("PHI must have designated data owner")
result["compliant"] = False
# Physical safeguards
if not asset.metadata.get("access_controls", False):
result["violations"].append("PHI must have physical access controls")
result["compliant"] = False
# Technical safeguards
if not asset.metadata.get("encrypted", False):
result["violations"].append("PHI must be encrypted")
result["compliant"] = False
if not asset.metadata.get("audit_logging", False):
result["violations"].append("PHI access must be logged and audited")
result["compliant"] = False
return result
class SOXComplianceRules(ComplianceRules):
"""SOX compliance validation rules"""
def validate(self, asset: DataAsset) -> Dict[str, Any]:
"""Validate SOX compliance"""
result = {
"framework": "SOX",
"compliant": True,
"violations": [],
"recommendations": []
}
# SOX applies to financial data
if asset.classification == DataClassification.CONFIDENTIAL:
# Internal controls
if not asset.access_policies:
result["violations"].append("Financial data must have defined access controls")
result["compliant"] = False
# Audit trail
if not asset.metadata.get("audit_logging", False):
result["violations"].append("Financial data access must be audited")
result["compliant"] = False
# Data integrity
if not asset.metadata.get("integrity_checks", False):
result["recommendations"].append("Implement data integrity validation")
# Change management
if not asset.metadata.get("change_approval", False):
result["recommendations"].append("Implement change approval process")
return result
class AuditLogger:
"""Comprehensive audit logging for governance activities"""
def __init__(self):
self.audit_events: List[Dict[str, Any]] = []
def log_asset_registration(self, asset: DataAsset):
"""Log asset registration event"""
event = {
"event_id": str(uuid.uuid4()),
"event_type": "asset_registration",
"timestamp": datetime.now(timezone.utc).isoformat(),
"asset_id": asset.asset_id,
"asset_name": asset.name,
"asset_type": asset.asset_type,
"classification": asset.classification.value,
"owner": asset.owner,
"metadata": {
"location": asset.location,
"tags": list(asset.tags)
}
}
self.audit_events.append(event)
logging.info(f"Audit: Asset registered - {asset.name}")
def log_asset_update(self, asset: DataAsset, old_values: Dict[str, Any], new_values: Dict[str, Any]):
"""Log asset update event"""
event = {
"event_id": str(uuid.uuid4()),
"event_type": "asset_update",
"timestamp": datetime.now(timezone.utc).isoformat(),
"asset_id": asset.asset_id,
"asset_name": asset.name,
"changes": {
"old_values": old_values,
"new_values": new_values
},
"metadata": {
"updated_by": "system" # Would come from context
}
}
self.audit_events.append(event)
logging.info(f"Audit: Asset updated - {asset.name}")
def log_access_attempt(self, asset: DataAsset, principal: str,
access_level: AccessLevel, granted: bool, context: Dict[str, Any]):
"""Log access attempt event"""
event = {
"event_id": str(uuid.uuid4()),
"event_type": "access_attempt",
"timestamp": datetime.now(timezone.utc).isoformat(),
"asset_id": asset.asset_id,
"asset_name": asset.name,
"principal": principal,
"access_level": access_level.value,
"granted": granted,
"context": context,
"metadata": {
"asset_classification": asset.classification.value,
"client_ip": context.get("client_ip"),
"user_agent": context.get("user_agent")
}
}
self.audit_events.append(event)
status = "granted" if granted else "denied"
logging.info(f"Audit: Access {status} - {principal} -> {asset.name} ({access_level.value})")
def log_compliance_validation(self, asset: DataAsset, compliance_result: Dict[str, Any]):
"""Log compliance validation event"""
event = {
"event_id": str(uuid.uuid4()),
"event_type": "compliance_validation",
"timestamp": datetime.now(timezone.utc).isoformat(),
"asset_id": asset.asset_id,
"asset_name": asset.name,
"compliance_result": compliance_result,
"metadata": {
"asset_classification": asset.classification.value,
"compliance_frameworks": [f.value for f in asset.compliance_requirements]
}
}
self.audit_events.append(event)
status = "compliant" if compliance_result["overall_compliant"] else "non_compliant"
logging.info(f"Audit: Compliance validation {status} - {asset.name}")
def get_audit_trail(self, asset_id: Optional[str] = None,
event_type: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> List[Dict[str, Any]]:
"""Get filtered audit trail"""
filtered_events = []
for event in self.audit_events:
# Filter by asset ID
if asset_id and event.get("asset_id") != asset_id:
continue
# Filter by event type
if event_type and event.get("event_type") != event_type:
continue
# Filter by date range
event_timestamp = datetime.fromisoformat(event["timestamp"])
if start_date and event_timestamp < start_date:
continue
if end_date and event_timestamp > end_date:
continue
filtered_events.append(event)
return filtered_events
def generate_audit_report(self, timeframe_days: int = 30) -> Dict[str, Any]:
"""Generate comprehensive audit report"""
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=timeframe_days)
events = self.get_audit_trail(start_date=start_date, end_date=end_date)
report = {
"report_id": str(uuid.uuid4()),
"generated_at": end_date.isoformat(),
"timeframe_days": timeframe_days,
"total_events": len(events),
"event_summary": {},
"access_summary": {},
"compliance_summary": {},
"top_accessed_assets": {},
"security_incidents": []
}
# Event type summary
for event in events:
event_type = event["event_type"]
report["event_summary"][event_type] = report["event_summary"].get(event_type, 0) + 1
# Access summary
access_events = [e for e in events if e["event_type"] == "access_attempt"]
granted_access = [e for e in access_events if e["granted"]]
denied_access = [e for e in access_events if not e["granted"]]
report["access_summary"] = {
"total_attempts": len(access_events),
"granted": len(granted_access),
"denied": len(denied_access),
"success_rate": len(granted_access) / len(access_events) * 100 if access_events else 0
}
# Compliance summary
compliance_events = [e for e in events if e["event_type"] == "compliance_validation"]
compliant_validations = [e for e in compliance_events if e["compliance_result"]["overall_compliant"]]
report["compliance_summary"] = {
"total_validations": len(compliance_events),
"compliant": len(compliant_validations),
"non_compliant": len(compliance_events) - len(compliant_validations),
"compliance_rate": len(compliant_validations) / len(compliance_events) * 100 if compliance_events else 0
}
# Top accessed assets
asset_access_count = {}
for event in granted_access:
asset_name = event["asset_name"]
asset_access_count[asset_name] = asset_access_count.get(asset_name, 0) + 1
top_assets = sorted(asset_access_count.items(), key=lambda x: x[1], reverse=True)[:10]
report["top_accessed_assets"] = dict(top_assets)
# Security incidents (multiple failed access attempts)
failed_attempts_by_principal = {}
for event in denied_access:
principal = event["principal"]
if principal not in failed_attempts_by_principal:
failed_attempts_by_principal[principal] = []
failed_attempts_by_principal[principal].append(event)
for principal, failed_events in failed_attempts_by_principal.items():
if len(failed_events) >= 5: # Threshold for security incident
report["security_incidents"].append({
"type": "multiple_failed_access",
"principal": principal,
"failed_attempts": len(failed_events),
"timeframe": f"{failed_events[0]['timestamp']} to {failed_events[-1]['timestamp']}"
})
return report
Advanced Data Lineage Tracking
Comprehensive Lineage Implementation
# Advanced data lineage tracking and analysis
from typing import Dict, List, Any, Optional, Set, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timezone
import networkx as nx
import json
import logging
@dataclass
class LineageNode:
"""Data lineage node representation"""
node_id: str
name: str
node_type: str # table, view, file, transformation, model, report
system: str
location: str
properties: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
@dataclass
class LineageEdge:
"""Data lineage relationship representation"""
edge_id: str
source_node_id: str
target_node_id: str
relationship_type: str # reads, writes, transforms, derives, depends_on
transformation_logic: Optional[str] = None
columns_mapping: Optional[Dict[str, str]] = None
properties: Dict[str, Any] = field(default_factory=dict)
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
class DataLineageTracker:
"""Advanced data lineage tracking system"""
def __init__(self):
self.lineage_graph = nx.DiGraph()
self.nodes: Dict[str, LineageNode] = {}
self.edges: Dict[str, LineageEdge] = {}
self.column_lineage: Dict[str, Dict[str, List[str]]] = {} # node_id -> {column -> [source_columns]}
def add_node(self, node: LineageNode) -> str:
"""Add node to lineage graph"""
self.nodes[node.node_id] = node
self.lineage_graph.add_node(node.node_id, **node.properties)
logging.info(f"Added lineage node: {node.name} ({node.node_id})")
return node.node_id
def add_edge(self, edge: LineageEdge) -> str:
"""Add edge to lineage graph"""
if edge.source_node_id not in self.nodes:
raise ValueError(f"Source node {edge.source_node_id} not found")
if edge.target_node_id not in self.nodes:
raise ValueError(f"Target node {edge.target_node_id} not found")
self.edges[edge.edge_id] = edge
self.lineage_graph.add_edge(
edge.source_node_id, edge.target_node_id,
edge_id=edge.edge_id,
relationship_type=edge.relationship_type,
**edge.properties
)
# Update column lineage if mapping provided
if edge.columns_mapping:
if edge.target_node_id not in self.column_lineage:
self.column_lineage[edge.target_node_id] = {}
for target_col, source_col in edge.columns_mapping.items():
if target_col not in self.column_lineage[edge.target_node_id]:
self.column_lineage[edge.target_node_id][target_col] = []
self.column_lineage[edge.target_node_id][target_col].append(f"{edge.source_node_id}.{source_col}")
logging.info(f"Added lineage edge: {edge.source_node_id} -> {edge.target_node_id}")
return edge.edge_id
def get_upstream_lineage(self, node_id: str, depth: int = 10) -> Dict[str, Any]:
"""Get upstream data lineage for a node"""
if node_id not in self.nodes:
raise ValueError(f"Node {node_id} not found")
upstream_nodes = set()
upstream_edges = []
# BFS traversal for upstream nodes
queue = [(node_id, 0)]
visited = set()
while queue:
current_node, current_depth = queue.pop(0)
if current_node in visited or current_depth >= depth:
continue
visited.add(current_node)
upstream_nodes.add(current_node)
# Get predecessors
for predecessor in self.lineage_graph.predecessors(current_node):
if predecessor not in visited:
queue.append((predecessor, current_depth + 1))
# Add edge information
edge_data = self.lineage_graph.edges[predecessor, current_node]
edge_id = edge_data.get("edge_id")
if edge_id and edge_id in self.edges:
upstream_edges.append(self.edges[edge_id])
return {
"target_node": self.nodes[node_id],
"upstream_nodes": [self.nodes[nid] for nid in upstream_nodes if nid != node_id],
"edges": upstream_edges,
"depth_reached": depth
}
def get_downstream_lineage(self, node_id: str, depth: int = 10) -> Dict[str, Any]:
"""Get downstream data lineage for a node"""
if node_id not in self.nodes:
raise ValueError(f"Node {node_id} not found")
downstream_nodes = set()
downstream_edges = []
# BFS traversal for downstream nodes
queue = [(node_id, 0)]
visited = set()
while queue:
current_node, current_depth = queue.pop(0)
if current_node in visited or current_depth >= depth:
continue
visited.add(current_node)
downstream_nodes.add(current_node)
# Get successors
for successor in self.lineage_graph.successors(current_node):
if successor not in visited:
queue.append((successor, current_depth + 1))
# Add edge information
edge_data = self.lineage_graph.edges[current_node, successor]
edge_id = edge_data.get("edge_id")
if edge_id and edge_id in self.edges:
downstream_edges.append(self.edges[edge_id])
return {
"source_node": self.nodes[node_id],
"downstream_nodes": [self.nodes[nid] for nid in downstream_nodes if nid != node_id],
"edges": downstream_edges,
"depth_reached": depth
}
def get_column_lineage(self, node_id: str, column_name: str) -> List[str]:
"""Get column-level lineage"""
if node_id not in self.column_lineage:
return []
return self.column_lineage[node_id].get(column_name, [])
def find_impact_analysis(self, node_id: str) -> Dict[str, Any]:
"""Perform impact analysis for changes to a node"""
downstream_lineage = self.get_downstream_lineage(node_id)
impact_analysis = {
"source_node": self.nodes[node_id],
"potentially_affected_nodes": downstream_lineage["downstream_nodes"],
"impact_severity": self._calculate_impact_severity(downstream_lineage),
"affected_systems": self._get_affected_systems(downstream_lineage),
"recommendations": self._generate_impact_recommendations(downstream_lineage)
}
return impact_analysis
def find_root_cause_analysis(self, node_id: str) -> Dict[str, Any]:
"""Perform root cause analysis for issues with a node"""
upstream_lineage = self.get_upstream_lineage(node_id)
root_cause_analysis = {
"target_node": self.nodes[node_id],
"potential_root_causes": upstream_lineage["upstream_nodes"],
"dependency_depth": len(upstream_lineage["upstream_nodes"]),
"critical_dependencies": self._identify_critical_dependencies(upstream_lineage),
"recommendations": self._generate_root_cause_recommendations(upstream_lineage)
}
return root_cause_analysis
def _calculate_impact_severity(self, lineage: Dict[str, Any]) -> str:
"""Calculate impact severity based on downstream dependencies"""
downstream_count = len(lineage["downstream_nodes"])
# Count critical systems
critical_systems = ["production_reports", "customer_facing", "regulatory"]
critical_count = 0
for node in lineage["downstream_nodes"]:
for system in critical_systems:
if system in node.system.lower() or system in node.name.lower():
critical_count += 1
break
if critical_count > 0:
return "high"
elif downstream_count > 10:
return "medium"
elif downstream_count > 0:
return "low"
else:
return "none"
def _get_affected_systems(self, lineage: Dict[str, Any]) -> Set[str]:
"""Get list of affected systems"""
systems = set()
for node in lineage["downstream_nodes"]:
systems.add(node.system)
return systems
def _generate_impact_recommendations(self, lineage: Dict[str, Any]) -> List[str]:
"""Generate recommendations for impact mitigation"""
recommendations = []
downstream_count = len(lineage["downstream_nodes"])
if downstream_count > 10:
recommendations.append("High impact change - consider phased rollout")
recommendations.append("Notify downstream data owners before making changes")
if downstream_count > 0:
recommendations.append("Test changes in development environment first")
recommendations.append("Monitor downstream systems after deployment")
return recommendations
def _identify_critical_dependencies(self, lineage: Dict[str, Any]) -> List[LineageNode]:
"""Identify critical dependencies in upstream lineage"""
critical_nodes = []
for node in lineage["upstream_nodes"]:
# Check if node is a single point of failure
downstream_of_node = self.get_downstream_lineage(node.node_id, depth=1)
if len(downstream_of_node["downstream_nodes"]) > 5:
critical_nodes.append(node)
return critical_nodes
def _generate_root_cause_recommendations(self, lineage: Dict[str, Any]) -> List[str]:
"""Generate recommendations for root cause investigation"""
recommendations = []
upstream_count = len(lineage["upstream_nodes"])
if upstream_count > 10:
recommendations.append("Complex dependency chain - investigate most recent changes first")
if upstream_count > 0:
recommendations.append("Check data quality and freshness of upstream sources")
recommendations.append("Validate transformation logic in pipeline")
return recommendations
def detect_circular_dependencies(self) -> List[List[str]]:
"""Detect circular dependencies in lineage graph"""
try:
# NetworkX will raise exception if cycles exist
cycles = list(nx.simple_cycles(self.lineage_graph))
return cycles
except nx.NetworkXNoCycle:
return []
def get_lineage_statistics(self) -> Dict[str, Any]:
"""Get comprehensive lineage statistics"""
stats = {
"total_nodes": len(self.nodes),
"total_edges": len(self.edges),
"node_types": {},
"relationship_types": {},
"systems": set(),
"max_depth": 0,
"avg_degree": 0,
"circular_dependencies": len(self.detect_circular_dependencies())
}
# Node type distribution
for node in self.nodes.values():
stats["node_types"][node.node_type] = stats["node_types"].get(node.node_type, 0) + 1
stats["systems"].add(node.system)
# Relationship type distribution
for edge in self.edges.values():
rel_type = edge.relationship_type
stats["relationship_types"][rel_type] = stats["relationship_types"].get(rel_type, 0) + 1
# Graph metrics
if self.lineage_graph.nodes:
degrees = [self.lineage_graph.degree(node) for node in self.lineage_graph.nodes]
stats["avg_degree"] = sum(degrees) / len(degrees)
# Calculate maximum depth
for node in self.lineage_graph.nodes:
try:
depths = nx.single_source_shortest_path_length(self.lineage_graph, node)
max_node_depth = max(depths.values()) if depths else 0
stats["max_depth"] = max(stats["max_depth"], max_node_depth)
except:
pass
stats["systems"] = list(stats["systems"])
return stats
def export_lineage(self, format: str = "json") -> str:
"""Export lineage graph in specified format"""
if format == "json":
export_data = {
"nodes": [
{
"id": node.node_id,
"name": node.name,
"type": node.node_type,
"system": node.system,
"location": node.location,
"properties": node.properties,
"metadata": node.metadata
}
for node in self.nodes.values()
],
"edges": [
{
"id": edge.edge_id,
"source": edge.source_node_id,
"target": edge.target_node_id,
"type": edge.relationship_type,
"transformation_logic": edge.transformation_logic,
"columns_mapping": edge.columns_mapping,
"properties": edge.properties
}
for edge in self.edges.values()
],
"metadata": {
"exported_at": datetime.now(timezone.utc).isoformat(),
"statistics": self.get_lineage_statistics()
}
}
return json.dumps(export_data, indent=2, default=str)
elif format == "graphml":
# Export as GraphML for visualization tools
nx.write_graphml(self.lineage_graph, "lineage_graph.graphml")
return "lineage_graph.graphml"
else:
raise ValueError(f"Unsupported export format: {format}")
class AutomatedLineageCapture:
"""Automatically capture lineage from various sources"""
def __init__(self, lineage_tracker: DataLineageTracker):
self.lineage_tracker = lineage_tracker
self.parsers = {
"sql": SQLLineageParser(),
"spark": SparkLineageParser(),
"airflow": AirflowLineageParser(),
"dbt": DBTLineageParser()
}
def capture_from_sql(self, sql_query: str, execution_context: Dict[str, Any]) -> List[str]:
"""Capture lineage from SQL query"""
parser = self.parsers["sql"]
lineage_info = parser.parse(sql_query, execution_context)
# Create nodes and edges from parsed information
edge_ids = []
for source_table in lineage_info["source_tables"]:
source_node_id = self._ensure_node_exists(source_table)
for target_table in lineage_info["target_tables"]:
target_node_id = self._ensure_node_exists(target_table)
edge = LineageEdge(
edge_id=str(uuid.uuid4()),
source_node_id=source_node_id,
target_node_id=target_node_id,
relationship_type="transforms",
transformation_logic=sql_query,
columns_mapping=lineage_info.get("column_mapping", {}),
properties={"query_type": lineage_info.get("query_type", "unknown")}
)
edge_id = self.lineage_tracker.add_edge(edge)
edge_ids.append(edge_id)
return edge_ids
def _ensure_node_exists(self, table_info: Dict[str, Any]) -> str:
"""Ensure node exists in lineage graph"""
node_id = f"{table_info['database']}.{table_info['schema']}.{table_info['table']}"
if node_id not in self.lineage_tracker.nodes:
node = LineageNode(
node_id=node_id,
name=table_info['table'],
node_type="table",
system=table_info.get('system', 'unknown'),
location=f"{table_info['database']}.{table_info['schema']}.{table_info['table']}",
properties=table_info
)
self.lineage_tracker.add_node(node)
return node_id
class LineageParser(ABC):
"""Abstract base class for lineage parsers"""
@abstractmethod
def parse(self, source: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Parse lineage information from source"""
pass
class SQLLineageParser(LineageParser):
"""Parse lineage from SQL queries"""
def parse(self, sql_query: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Parse SQL query for lineage information"""
# Simplified SQL parsing - in practice would use sqlparse or similar
sql_upper = sql_query.upper()
lineage_info = {
"source_tables": [],
"target_tables": [],
"column_mapping": {},
"query_type": "unknown"
}
# Determine query type
if sql_upper.strip().startswith("SELECT"):
lineage_info["query_type"] = "select"
elif sql_upper.strip().startswith("INSERT"):
lineage_info["query_type"] = "insert"
elif sql_upper.strip().startswith("UPDATE"):
lineage_info["query_type"] = "update"
elif sql_upper.strip().startswith("CREATE"):
lineage_info["query_type"] = "create"
# Extract table references (simplified)
import re
# Find FROM clause tables
from_match = re.search(r'FROM\s+(\w+(?:\.\w+)*)', sql_upper)
if from_match:
table_name = from_match.group(1)
parts = table_name.split('.')
if len(parts) == 3:
database, schema, table = parts
elif len(parts) == 2:
database = context.get("default_database", "default")
schema, table = parts
else:
database = context.get("default_database", "default")
schema = context.get("default_schema", "public")
table = parts[0]
lineage_info["source_tables"].append({
"database": database,
"schema": schema,
"table": table,
"system": context.get("system", "unknown")
})
# Find JOIN tables
join_matches = re.findall(r'JOIN\s+(\w+(?:\.\w+)*)', sql_upper)
for join_table in join_matches:
parts = join_table.split('.')
if len(parts) == 3:
database, schema, table = parts
elif len(parts) == 2:
database = context.get("default_database", "default")
schema, table = parts
else:
database = context.get("default_database", "default")
schema = context.get("default_schema", "public")
table = parts[0]
lineage_info["source_tables"].append({
"database": database,
"schema": schema,
"table": table,
"system": context.get("system", "unknown")
})
# Find target tables (INSERT INTO, CREATE TABLE, etc.)
target_matches = re.findall(r'(?:INSERT\s+INTO|CREATE\s+TABLE)\s+(\w+(?:\.\w+)*)', sql_upper)
for target_table in target_matches:
parts = target_table.split('.')
if len(parts) == 3:
database, schema, table = parts
elif len(parts) == 2:
database = context.get("default_database", "default")
schema, table = parts
else:
database = context.get("default_database", "default")
schema = context.get("default_schema", "public")
table = parts[0]
lineage_info["target_tables"].append({
"database": database,
"schema": schema,
"table": table,
"system": context.get("system", "unknown")
})
return lineage_info
class SparkLineageParser(LineageParser):
"""Parse lineage from Spark applications"""
def parse(self, source: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Parse Spark application for lineage information"""
# Would integrate with Spark's query execution plans
# This is a simplified implementation
lineage_info = {
"source_tables": [],
"target_tables": [],
"transformations": [],
"query_type": "spark_job"
}
# Parse from Spark execution plan or application logs
# Implementation would depend on specific Spark integration
return lineage_info
class AirflowLineageParser(LineageParser):
"""Parse lineage from Airflow DAGs"""
def parse(self, source: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Parse Airflow DAG for lineage information"""
# Would parse Airflow DAG definitions and task dependencies
# This is a simplified implementation
lineage_info = {
"source_datasets": [],
"target_datasets": [],
"transformations": [],
"query_type": "airflow_dag"
}
return lineage_info
class DBTLineageParser(LineageParser):
"""Parse lineage from dbt models"""
def parse(self, source: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Parse dbt model for lineage information"""
# Would parse dbt manifest.json and model files
# This is a simplified implementation
lineage_info = {
"source_models": [],
"target_models": [],
"transformations": [],
"query_type": "dbt_model"
}
return lineage_info
Conclusion
Implementing advanced data governance and lineage tracking systems requires sophisticated approaches to data classification, access control, compliance automation, and comprehensive audit capabilities. The frameworks and implementations shown in this guide provide a foundation for building enterprise-grade data governance systems that can scale with organizational needs while ensuring regulatory compliance and data security.
Key takeaways for successful data governance implementation include:
- Automated Classification: Implement intelligent data classification systems that can automatically identify sensitive data and apply appropriate policies
- Comprehensive Lineage: Build detailed lineage tracking that captures column-level dependencies and transformation logic
- Policy Automation: Create automated policy enforcement mechanisms that reduce manual governance overhead
- Compliance Integration: Integrate compliance frameworks directly into governance workflows for continuous validation
- Audit Excellence: Implement comprehensive audit logging and reporting for transparency and regulatory requirements
By following these advanced patterns and implementing robust governance frameworks, organizations can build data management systems that provide both operational efficiency and regulatory compliance while maintaining data quality and security at enterprise scale.