Advanced Data Governance and Lineage Tracking: Enterprise-Grade Data Management and Compliance

Modern organizations face increasing regulatory requirements and the need for comprehensive data governance frameworks that ensure data quality, security, privacy, and compliance. Advanced data governance systems must provide automated classification, lineage tracking, access controls, and audit capabilities while scaling to handle enterprise data volumes.

This comprehensive guide explores sophisticated approaches to implementing enterprise-grade data governance and lineage tracking systems, covering regulatory compliance, automated policy enforcement, and advanced analytics for data management decision-making.

Data Governance Framework Architecture

Core Governance Components and Principles

Enterprise data governance requires a comprehensive framework that addresses data quality, security, privacy, and regulatory compliance across the entire data lifecycle.

# Advanced data governance framework implementation
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Set, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from enum import Enum
import uuid
import hashlib
import json
import logging
import re

class DataClassification(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
    PERSONAL = "personal"
    SENSITIVE_PERSONAL = "sensitive_personal"

class AccessLevel(Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

class ComplianceFramework(Enum):
    GDPR = "gdpr"
    CCPA = "ccpa"
    HIPAA = "hipaa"
    SOX = "sox"
    PCI_DSS = "pci_dss"
    ISO_27001 = "iso_27001"

@dataclass
class DataAsset:
    """Core data asset representation for governance"""
    asset_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    asset_type: str = ""  # table, view, file, stream, etc.
    location: str = ""
    schema: Dict[str, Any] = field(default_factory=dict)
    classification: DataClassification = DataClassification.INTERNAL
    owner: str = ""
    steward: str = ""
    tags: Set[str] = field(default_factory=set)
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    retention_policy: Optional['RetentionPolicy'] = None
    access_policies: List['AccessPolicy'] = field(default_factory=list)
    compliance_requirements: Set[ComplianceFramework] = field(default_factory=set)
    
    def add_tag(self, tag: str):
        """Add tag to data asset"""
        self.tags.add(tag)
        self.updated_at = datetime.now(timezone.utc)
    
    def remove_tag(self, tag: str):
        """Remove tag from data asset"""
        self.tags.discard(tag)
        self.updated_at = datetime.now(timezone.utc)
    
    def update_classification(self, classification: DataClassification, reason: str):
        """Update data classification with audit trail"""
        old_classification = self.classification
        self.classification = classification
        self.updated_at = datetime.now(timezone.utc)
        
        # Log classification change
        logging.info(f"Asset {self.asset_id} classification changed from {old_classification.value} to {classification.value}: {reason}")

@dataclass
class RetentionPolicy:
    """Data retention policy specification"""
    policy_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    retention_period_days: int = 2555  # 7 years default
    auto_delete: bool = False
    archive_after_days: Optional[int] = None
    compliance_frameworks: Set[ComplianceFramework] = field(default_factory=set)
    exceptions: List[str] = field(default_factory=list)
    
    def is_expired(self, asset_date: datetime) -> bool:
        """Check if data asset has exceeded retention period"""
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=self.retention_period_days)
        return asset_date < cutoff_date
    
    def should_archive(self, asset_date: datetime) -> bool:
        """Check if data asset should be archived"""
        if not self.archive_after_days:
            return False
        
        archive_date = datetime.now(timezone.utc) - timedelta(days=self.archive_after_days)
        return asset_date < archive_date

@dataclass
class AccessPolicy:
    """Data access policy specification"""
    policy_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    principal_type: str = "user"  # user, group, role, service
    principal_id: str = ""
    access_level: AccessLevel = AccessLevel.READ
    conditions: Dict[str, Any] = field(default_factory=dict)
    time_restrictions: Optional[Dict[str, Any]] = None
    ip_restrictions: Optional[List[str]] = None
    purpose_limitations: Optional[List[str]] = None
    valid_from: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    valid_until: Optional[datetime] = None
    
    def is_valid(self, timestamp: Optional[datetime] = None) -> bool:
        """Check if access policy is currently valid"""
        now = timestamp or datetime.now(timezone.utc)
        
        if now < self.valid_from:
            return False
        
        if self.valid_until and now > self.valid_until:
            return False
        
        return True
    
    def check_time_restrictions(self, timestamp: datetime) -> bool:
        """Check if access is allowed at given time"""
        if not self.time_restrictions:
            return True
        
        # Check day of week restrictions
        if "allowed_days" in self.time_restrictions:
            allowed_days = self.time_restrictions["allowed_days"]
            current_day = timestamp.strftime("%A").lower()
            if current_day not in allowed_days:
                return False
        
        # Check time of day restrictions
        if "allowed_hours" in self.time_restrictions:
            allowed_hours = self.time_restrictions["allowed_hours"]
            current_hour = timestamp.hour
            if not (allowed_hours["start"] <= current_hour <= allowed_hours["end"]):
                return False
        
        return True

class DataGovernanceEngine:
    """Core data governance engine"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.assets: Dict[str, DataAsset] = {}
        self.policies: Dict[str, Union[RetentionPolicy, AccessPolicy]] = {}
        self.classifiers: List['DataClassifier'] = []
        self.compliance_manager = ComplianceManager()
        self.audit_logger = AuditLogger()
        
    def register_asset(self, asset: DataAsset) -> str:
        """Register a data asset with governance"""
        
        # Auto-classify asset
        classification = self._auto_classify_asset(asset)
        if classification:
            asset.classification = classification
        
        # Apply default policies
        self._apply_default_policies(asset)
        
        # Store asset
        self.assets[asset.asset_id] = asset
        
        # Log registration
        self.audit_logger.log_asset_registration(asset)
        
        logging.info(f"Registered data asset: {asset.name} ({asset.asset_id})")
        return asset.asset_id
    
    def update_asset(self, asset_id: str, updates: Dict[str, Any]) -> bool:
        """Update data asset with governance validation"""
        
        if asset_id not in self.assets:
            raise ValueError(f"Asset {asset_id} not found")
        
        asset = self.assets[asset_id]
        old_values = {}
        
        # Track changes for audit
        for field, new_value in updates.items():
            if hasattr(asset, field):
                old_values[field] = getattr(asset, field)
                setattr(asset, field, new_value)
        
        asset.updated_at = datetime.now(timezone.utc)
        
        # Re-classify if schema changed
        if "schema" in updates:
            new_classification = self._auto_classify_asset(asset)
            if new_classification and new_classification != asset.classification:
                asset.update_classification(new_classification, "Schema change triggered reclassification")
        
        # Log update
        self.audit_logger.log_asset_update(asset, old_values, updates)
        
        return True
    
    def _auto_classify_asset(self, asset: DataAsset) -> Optional[DataClassification]:
        """Automatically classify data asset"""
        
        for classifier in self.classifiers:
            classification = classifier.classify(asset)
            if classification:
                return classification
        
        return None
    
    def _apply_default_policies(self, asset: DataAsset):
        """Apply default policies based on classification"""
        
        default_policies = self.config.get("default_policies", {})
        classification_key = asset.classification.value
        
        if classification_key in default_policies:
            policy_config = default_policies[classification_key]
            
            # Apply retention policy
            if "retention" in policy_config:
                retention_config = policy_config["retention"]
                retention_policy = RetentionPolicy(
                    name=f"Default {classification_key} retention",
                    description=f"Default retention policy for {classification_key} data",
                    retention_period_days=retention_config.get("days", 2555),
                    auto_delete=retention_config.get("auto_delete", False),
                    archive_after_days=retention_config.get("archive_after_days")
                )
                asset.retention_policy = retention_policy
            
            # Apply access policies
            if "access" in policy_config:
                for access_config in policy_config["access"]:
                    access_policy = AccessPolicy(
                        name=f"Default {classification_key} access",
                        description=f"Default access policy for {classification_key} data",
                        principal_type=access_config.get("principal_type", "role"),
                        principal_id=access_config.get("principal_id", "data_analysts"),
                        access_level=AccessLevel(access_config.get("access_level", "read"))
                    )
                    asset.access_policies.append(access_policy)
    
    def check_access(self, asset_id: str, principal: str, 
                    access_level: AccessLevel, context: Dict[str, Any]) -> bool:
        """Check if access is allowed for given principal"""
        
        if asset_id not in self.assets:
            return False
        
        asset = self.assets[asset_id]
        
        # Check each access policy
        for policy in asset.access_policies:
            if (policy.principal_id == principal and 
                policy.access_level == access_level and
                policy.is_valid()):
                
                # Check additional conditions
                if policy.time_restrictions:
                    if not policy.check_time_restrictions(datetime.now(timezone.utc)):
                        continue
                
                if policy.ip_restrictions:
                    client_ip = context.get("client_ip")
                    if client_ip not in policy.ip_restrictions:
                        continue
                
                # Log access attempt
                self.audit_logger.log_access_attempt(asset, principal, access_level, True, context)
                return True
        
        # Log denied access
        self.audit_logger.log_access_attempt(asset, principal, access_level, False, context)
        return False
    
    def get_assets_by_classification(self, classification: DataClassification) -> List[DataAsset]:
        """Get all assets with specified classification"""
        return [asset for asset in self.assets.values() if asset.classification == classification]
    
    def get_expired_assets(self) -> List[DataAsset]:
        """Get assets that have exceeded retention period"""
        expired_assets = []
        
        for asset in self.assets.values():
            if asset.retention_policy and asset.retention_policy.is_expired(asset.created_at):
                expired_assets.append(asset)
        
        return expired_assets
    
    def search_assets(self, query: Dict[str, Any]) -> List[DataAsset]:
        """Search assets based on criteria"""
        
        matching_assets = []
        
        for asset in self.assets.values():
            if self._matches_search_criteria(asset, query):
                matching_assets.append(asset)
        
        return matching_assets
    
    def _matches_search_criteria(self, asset: DataAsset, query: Dict[str, Any]) -> bool:
        """Check if asset matches search criteria"""
        
        # Text search
        if "text" in query:
            search_text = query["text"].lower()
            if (search_text not in asset.name.lower() and 
                search_text not in asset.description.lower()):
                return False
        
        # Classification filter
        if "classification" in query:
            if asset.classification != DataClassification(query["classification"]):
                return False
        
        # Owner filter
        if "owner" in query:
            if asset.owner != query["owner"]:
                return False
        
        # Tag filter
        if "tags" in query:
            required_tags = set(query["tags"])
            if not required_tags.issubset(asset.tags):
                return False
        
        return True

class DataClassifier(ABC):
    """Abstract base class for data classifiers"""
    
    @abstractmethod
    def classify(self, asset: DataAsset) -> Optional[DataClassification]:
        """Classify data asset"""
        pass

class PIIClassifier(DataClassifier):
    """Classifier for Personally Identifiable Information"""
    
    def __init__(self):
        self.pii_patterns = {
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
            "phone": r'\b\d{3}-\d{3}-\d{4}\b',
            "credit_card": r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
            "passport": r'\b[A-Z]{2}\d{7}\b'
        }
        
        self.pii_field_names = {
            "personal": ["first_name", "last_name", "full_name", "name"],
            "contact": ["email", "phone", "address", "zip_code", "postal_code"],
            "identification": ["ssn", "social_security", "passport", "driver_license"],
            "financial": ["credit_card", "bank_account", "routing_number"]
        }
    
    def classify(self, asset: DataAsset) -> Optional[DataClassification]:
        """Classify asset based on PII content"""
        
        pii_score = 0
        sensitive_pii_score = 0
        
        # Check schema field names
        schema = asset.schema
        if "properties" in schema:
            for field_name, field_def in schema["properties"].items():
                field_name_lower = field_name.lower()
                
                # Check for personal information fields
                for category, field_patterns in self.pii_field_names.items():
                    for pattern in field_patterns:
                        if pattern in field_name_lower:
                            if category in ["identification", "financial"]:
                                sensitive_pii_score += 1
                            else:
                                pii_score += 1
        
        # Check for data patterns (if sample data available)
        if "sample_data" in asset.metadata:
            sample_data = asset.metadata["sample_data"]
            for record in sample_data[:100]:  # Check first 100 records
                for field_value in record.values():
                    if isinstance(field_value, str):
                        for pattern_name, pattern in self.pii_patterns.items():
                            if re.search(pattern, field_value):
                                if pattern_name in ["ssn", "credit_card", "passport"]:
                                    sensitive_pii_score += 1
                                else:
                                    pii_score += 1
        
        # Determine classification
        if sensitive_pii_score > 0:
            return DataClassification.SENSITIVE_PERSONAL
        elif pii_score > 2:
            return DataClassification.PERSONAL
        elif pii_score > 0:
            return DataClassification.CONFIDENTIAL
        
        return None

class FinancialDataClassifier(DataClassifier):
    """Classifier for financial data"""
    
    def __init__(self):
        self.financial_keywords = [
            "revenue", "profit", "loss", "earnings", "financial", "accounting",
            "budget", "cost", "expense", "income", "salary", "payment",
            "invoice", "transaction", "balance", "credit", "debit"
        ]
        
        self.financial_field_patterns = [
            "amount", "price", "cost", "fee", "rate", "balance", "total"
        ]
    
    def classify(self, asset: DataAsset) -> Optional[DataClassification]:
        """Classify asset based on financial content"""
        
        financial_score = 0
        
        # Check asset name and description
        text_content = f"{asset.name} {asset.description}".lower()
        for keyword in self.financial_keywords:
            if keyword in text_content:
                financial_score += 1
        
        # Check schema fields
        if "properties" in asset.schema:
            for field_name in asset.schema["properties"].keys():
                field_name_lower = field_name.lower()
                for pattern in self.financial_field_patterns:
                    if pattern in field_name_lower:
                        financial_score += 1
        
        # Classify based on score
        if financial_score >= 3:
            return DataClassification.CONFIDENTIAL
        elif financial_score >= 1:
            return DataClassification.INTERNAL
        
        return None

class ComplianceManager:
    """Manage compliance requirements and validation"""
    
    def __init__(self):
        self.compliance_rules = {
            ComplianceFramework.GDPR: GDPRComplianceRules(),
            ComplianceFramework.CCPA: CCPAComplianceRules(),
            ComplianceFramework.HIPAA: HIPAAComplianceRules(),
            ComplianceFramework.SOX: SOXComplianceRules()
        }
    
    def validate_compliance(self, asset: DataAsset) -> Dict[str, Any]:
        """Validate asset compliance against applicable frameworks"""
        
        compliance_report = {
            "asset_id": asset.asset_id,
            "asset_name": asset.name,
            "validation_timestamp": datetime.now(timezone.utc).isoformat(),
            "overall_compliant": True,
            "framework_results": {}
        }
        
        for framework in asset.compliance_requirements:
            if framework in self.compliance_rules:
                rules = self.compliance_rules[framework]
                result = rules.validate(asset)
                compliance_report["framework_results"][framework.value] = result
                
                if not result["compliant"]:
                    compliance_report["overall_compliant"] = False
        
        return compliance_report
    
    def get_compliance_requirements(self, classification: DataClassification) -> Set[ComplianceFramework]:
        """Get applicable compliance frameworks for data classification"""
        
        requirements = set()
        
        if classification in [DataClassification.PERSONAL, DataClassification.SENSITIVE_PERSONAL]:
            requirements.add(ComplianceFramework.GDPR)
            requirements.add(ComplianceFramework.CCPA)
        
        if classification == DataClassification.SENSITIVE_PERSONAL:
            requirements.add(ComplianceFramework.HIPAA)
        
        if classification == DataClassification.CONFIDENTIAL:
            requirements.add(ComplianceFramework.SOX)
        
        return requirements

class ComplianceRules(ABC):
    """Abstract base class for compliance rules"""
    
    @abstractmethod
    def validate(self, asset: DataAsset) -> Dict[str, Any]:
        """Validate asset against compliance rules"""
        pass

class GDPRComplianceRules(ComplianceRules):
    """GDPR compliance validation rules"""
    
    def validate(self, asset: DataAsset) -> Dict[str, Any]:
        """Validate GDPR compliance"""
        
        result = {
            "framework": "GDPR",
            "compliant": True,
            "violations": [],
            "recommendations": []
        }
        
        # Check data subject rights
        if asset.classification in [DataClassification.PERSONAL, DataClassification.SENSITIVE_PERSONAL]:
            
            # Right to erasure (retention policy required)
            if not asset.retention_policy:
                result["violations"].append("Missing retention policy for personal data")
                result["compliant"] = False
            
            # Data minimization (purpose limitation)
            purpose_limited = False
            for policy in asset.access_policies:
                if policy.purpose_limitations:
                    purpose_limited = True
                    break
            
            if not purpose_limited:
                result["violations"].append("No purpose limitations defined for personal data access")
                result["recommendations"].append("Define purpose limitations for data access policies")
            
            # Consent tracking (metadata should include consent information)
            if "consent_tracking" not in asset.metadata:
                result["violations"].append("Missing consent tracking for personal data")
                result["recommendations"].append("Implement consent tracking mechanism")
            
            # Data protection by design
            encryption_required = asset.classification == DataClassification.SENSITIVE_PERSONAL
            if encryption_required and not asset.metadata.get("encrypted", False):
                result["violations"].append("Sensitive personal data must be encrypted")
                result["compliant"] = False
        
        return result

class CCPAComplianceRules(ComplianceRules):
    """CCPA compliance validation rules"""
    
    def validate(self, asset: DataAsset) -> Dict[str, Any]:
        """Validate CCPA compliance"""
        
        result = {
            "framework": "CCPA",
            "compliant": True,
            "violations": [],
            "recommendations": []
        }
        
        # CCPA applies to personal information of California residents
        if asset.classification in [DataClassification.PERSONAL, DataClassification.SENSITIVE_PERSONAL]:
            
            # Right to know - data inventory required
            if not asset.description:
                result["violations"].append("Personal information assets must have detailed descriptions")
                result["recommendations"].append("Add comprehensive description of data processing purposes")
            
            # Right to delete
            if not asset.retention_policy or not asset.retention_policy.auto_delete:
                result["recommendations"].append("Consider implementing automated deletion for consumer requests")
            
            # Right to opt-out
            if "opt_out_mechanism" not in asset.metadata:
                result["recommendations"].append("Implement opt-out mechanism for personal information")
        
        return result

class HIPAAComplianceRules(ComplianceRules):
    """HIPAA compliance validation rules"""
    
    def validate(self, asset: DataAsset) -> Dict[str, Any]:
        """Validate HIPAA compliance"""
        
        result = {
            "framework": "HIPAA",
            "compliant": True,
            "violations": [],
            "recommendations": []
        }
        
        # HIPAA applies to protected health information (PHI)
        if asset.classification == DataClassification.SENSITIVE_PERSONAL:
            
            # Administrative safeguards
            if not asset.owner:
                result["violations"].append("PHI must have designated data owner")
                result["compliant"] = False
            
            # Physical safeguards
            if not asset.metadata.get("access_controls", False):
                result["violations"].append("PHI must have physical access controls")
                result["compliant"] = False
            
            # Technical safeguards
            if not asset.metadata.get("encrypted", False):
                result["violations"].append("PHI must be encrypted")
                result["compliant"] = False
            
            if not asset.metadata.get("audit_logging", False):
                result["violations"].append("PHI access must be logged and audited")
                result["compliant"] = False
        
        return result

class SOXComplianceRules(ComplianceRules):
    """SOX compliance validation rules"""
    
    def validate(self, asset: DataAsset) -> Dict[str, Any]:
        """Validate SOX compliance"""
        
        result = {
            "framework": "SOX",
            "compliant": True,
            "violations": [],
            "recommendations": []
        }
        
        # SOX applies to financial data
        if asset.classification == DataClassification.CONFIDENTIAL:
            
            # Internal controls
            if not asset.access_policies:
                result["violations"].append("Financial data must have defined access controls")
                result["compliant"] = False
            
            # Audit trail
            if not asset.metadata.get("audit_logging", False):
                result["violations"].append("Financial data access must be audited")
                result["compliant"] = False
            
            # Data integrity
            if not asset.metadata.get("integrity_checks", False):
                result["recommendations"].append("Implement data integrity validation")
            
            # Change management
            if not asset.metadata.get("change_approval", False):
                result["recommendations"].append("Implement change approval process")
        
        return result

class AuditLogger:
    """Comprehensive audit logging for governance activities"""
    
    def __init__(self):
        self.audit_events: List[Dict[str, Any]] = []
        
    def log_asset_registration(self, asset: DataAsset):
        """Log asset registration event"""
        
        event = {
            "event_id": str(uuid.uuid4()),
            "event_type": "asset_registration",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "asset_id": asset.asset_id,
            "asset_name": asset.name,
            "asset_type": asset.asset_type,
            "classification": asset.classification.value,
            "owner": asset.owner,
            "metadata": {
                "location": asset.location,
                "tags": list(asset.tags)
            }
        }
        
        self.audit_events.append(event)
        logging.info(f"Audit: Asset registered - {asset.name}")
    
    def log_asset_update(self, asset: DataAsset, old_values: Dict[str, Any], new_values: Dict[str, Any]):
        """Log asset update event"""
        
        event = {
            "event_id": str(uuid.uuid4()),
            "event_type": "asset_update",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "asset_id": asset.asset_id,
            "asset_name": asset.name,
            "changes": {
                "old_values": old_values,
                "new_values": new_values
            },
            "metadata": {
                "updated_by": "system"  # Would come from context
            }
        }
        
        self.audit_events.append(event)
        logging.info(f"Audit: Asset updated - {asset.name}")
    
    def log_access_attempt(self, asset: DataAsset, principal: str, 
                          access_level: AccessLevel, granted: bool, context: Dict[str, Any]):
        """Log access attempt event"""
        
        event = {
            "event_id": str(uuid.uuid4()),
            "event_type": "access_attempt",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "asset_id": asset.asset_id,
            "asset_name": asset.name,
            "principal": principal,
            "access_level": access_level.value,
            "granted": granted,
            "context": context,
            "metadata": {
                "asset_classification": asset.classification.value,
                "client_ip": context.get("client_ip"),
                "user_agent": context.get("user_agent")
            }
        }
        
        self.audit_events.append(event)
        
        status = "granted" if granted else "denied"
        logging.info(f"Audit: Access {status} - {principal} -> {asset.name} ({access_level.value})")
    
    def log_compliance_validation(self, asset: DataAsset, compliance_result: Dict[str, Any]):
        """Log compliance validation event"""
        
        event = {
            "event_id": str(uuid.uuid4()),
            "event_type": "compliance_validation",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "asset_id": asset.asset_id,
            "asset_name": asset.name,
            "compliance_result": compliance_result,
            "metadata": {
                "asset_classification": asset.classification.value,
                "compliance_frameworks": [f.value for f in asset.compliance_requirements]
            }
        }
        
        self.audit_events.append(event)
        
        status = "compliant" if compliance_result["overall_compliant"] else "non_compliant"
        logging.info(f"Audit: Compliance validation {status} - {asset.name}")
    
    def get_audit_trail(self, asset_id: Optional[str] = None, 
                       event_type: Optional[str] = None,
                       start_date: Optional[datetime] = None,
                       end_date: Optional[datetime] = None) -> List[Dict[str, Any]]:
        """Get filtered audit trail"""
        
        filtered_events = []
        
        for event in self.audit_events:
            # Filter by asset ID
            if asset_id and event.get("asset_id") != asset_id:
                continue
            
            # Filter by event type
            if event_type and event.get("event_type") != event_type:
                continue
            
            # Filter by date range
            event_timestamp = datetime.fromisoformat(event["timestamp"])
            if start_date and event_timestamp < start_date:
                continue
            if end_date and event_timestamp > end_date:
                continue
            
            filtered_events.append(event)
        
        return filtered_events
    
    def generate_audit_report(self, timeframe_days: int = 30) -> Dict[str, Any]:
        """Generate comprehensive audit report"""
        
        end_date = datetime.now(timezone.utc)
        start_date = end_date - timedelta(days=timeframe_days)
        
        events = self.get_audit_trail(start_date=start_date, end_date=end_date)
        
        report = {
            "report_id": str(uuid.uuid4()),
            "generated_at": end_date.isoformat(),
            "timeframe_days": timeframe_days,
            "total_events": len(events),
            "event_summary": {},
            "access_summary": {},
            "compliance_summary": {},
            "top_accessed_assets": {},
            "security_incidents": []
        }
        
        # Event type summary
        for event in events:
            event_type = event["event_type"]
            report["event_summary"][event_type] = report["event_summary"].get(event_type, 0) + 1
        
        # Access summary
        access_events = [e for e in events if e["event_type"] == "access_attempt"]
        granted_access = [e for e in access_events if e["granted"]]
        denied_access = [e for e in access_events if not e["granted"]]
        
        report["access_summary"] = {
            "total_attempts": len(access_events),
            "granted": len(granted_access),
            "denied": len(denied_access),
            "success_rate": len(granted_access) / len(access_events) * 100 if access_events else 0
        }
        
        # Compliance summary
        compliance_events = [e for e in events if e["event_type"] == "compliance_validation"]
        compliant_validations = [e for e in compliance_events if e["compliance_result"]["overall_compliant"]]
        
        report["compliance_summary"] = {
            "total_validations": len(compliance_events),
            "compliant": len(compliant_validations),
            "non_compliant": len(compliance_events) - len(compliant_validations),
            "compliance_rate": len(compliant_validations) / len(compliance_events) * 100 if compliance_events else 0
        }
        
        # Top accessed assets
        asset_access_count = {}
        for event in granted_access:
            asset_name = event["asset_name"]
            asset_access_count[asset_name] = asset_access_count.get(asset_name, 0) + 1
        
        top_assets = sorted(asset_access_count.items(), key=lambda x: x[1], reverse=True)[:10]
        report["top_accessed_assets"] = dict(top_assets)
        
        # Security incidents (multiple failed access attempts)
        failed_attempts_by_principal = {}
        for event in denied_access:
            principal = event["principal"]
            if principal not in failed_attempts_by_principal:
                failed_attempts_by_principal[principal] = []
            failed_attempts_by_principal[principal].append(event)
        
        for principal, failed_events in failed_attempts_by_principal.items():
            if len(failed_events) >= 5:  # Threshold for security incident
                report["security_incidents"].append({
                    "type": "multiple_failed_access",
                    "principal": principal,
                    "failed_attempts": len(failed_events),
                    "timeframe": f"{failed_events[0]['timestamp']} to {failed_events[-1]['timestamp']}"
                })
        
        return report

Advanced Data Lineage Tracking

Comprehensive Lineage Implementation

# Advanced data lineage tracking and analysis
from typing import Dict, List, Any, Optional, Set, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timezone
import networkx as nx
import json
import logging

@dataclass
class LineageNode:
    """Data lineage node representation"""
    node_id: str
    name: str
    node_type: str  # table, view, file, transformation, model, report
    system: str
    location: str
    properties: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

@dataclass
class LineageEdge:
    """Data lineage relationship representation"""
    edge_id: str
    source_node_id: str
    target_node_id: str
    relationship_type: str  # reads, writes, transforms, derives, depends_on
    transformation_logic: Optional[str] = None
    columns_mapping: Optional[Dict[str, str]] = None
    properties: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

class DataLineageTracker:
    """Advanced data lineage tracking system"""
    
    def __init__(self):
        self.lineage_graph = nx.DiGraph()
        self.nodes: Dict[str, LineageNode] = {}
        self.edges: Dict[str, LineageEdge] = {}
        self.column_lineage: Dict[str, Dict[str, List[str]]] = {}  # node_id -> {column -> [source_columns]}
        
    def add_node(self, node: LineageNode) -> str:
        """Add node to lineage graph"""
        
        self.nodes[node.node_id] = node
        self.lineage_graph.add_node(node.node_id, **node.properties)
        
        logging.info(f"Added lineage node: {node.name} ({node.node_id})")
        return node.node_id
    
    def add_edge(self, edge: LineageEdge) -> str:
        """Add edge to lineage graph"""
        
        if edge.source_node_id not in self.nodes:
            raise ValueError(f"Source node {edge.source_node_id} not found")
        
        if edge.target_node_id not in self.nodes:
            raise ValueError(f"Target node {edge.target_node_id} not found")
        
        self.edges[edge.edge_id] = edge
        self.lineage_graph.add_edge(
            edge.source_node_id, edge.target_node_id,
            edge_id=edge.edge_id,
            relationship_type=edge.relationship_type,
            **edge.properties
        )
        
        # Update column lineage if mapping provided
        if edge.columns_mapping:
            if edge.target_node_id not in self.column_lineage:
                self.column_lineage[edge.target_node_id] = {}
            
            for target_col, source_col in edge.columns_mapping.items():
                if target_col not in self.column_lineage[edge.target_node_id]:
                    self.column_lineage[edge.target_node_id][target_col] = []
                self.column_lineage[edge.target_node_id][target_col].append(f"{edge.source_node_id}.{source_col}")
        
        logging.info(f"Added lineage edge: {edge.source_node_id} -> {edge.target_node_id}")
        return edge.edge_id
    
    def get_upstream_lineage(self, node_id: str, depth: int = 10) -> Dict[str, Any]:
        """Get upstream data lineage for a node"""
        
        if node_id not in self.nodes:
            raise ValueError(f"Node {node_id} not found")
        
        upstream_nodes = set()
        upstream_edges = []
        
        # BFS traversal for upstream nodes
        queue = [(node_id, 0)]
        visited = set()
        
        while queue:
            current_node, current_depth = queue.pop(0)
            
            if current_node in visited or current_depth >= depth:
                continue
            
            visited.add(current_node)
            upstream_nodes.add(current_node)
            
            # Get predecessors
            for predecessor in self.lineage_graph.predecessors(current_node):
                if predecessor not in visited:
                    queue.append((predecessor, current_depth + 1))
                
                # Add edge information
                edge_data = self.lineage_graph.edges[predecessor, current_node]
                edge_id = edge_data.get("edge_id")
                if edge_id and edge_id in self.edges:
                    upstream_edges.append(self.edges[edge_id])
        
        return {
            "target_node": self.nodes[node_id],
            "upstream_nodes": [self.nodes[nid] for nid in upstream_nodes if nid != node_id],
            "edges": upstream_edges,
            "depth_reached": depth
        }
    
    def get_downstream_lineage(self, node_id: str, depth: int = 10) -> Dict[str, Any]:
        """Get downstream data lineage for a node"""
        
        if node_id not in self.nodes:
            raise ValueError(f"Node {node_id} not found")
        
        downstream_nodes = set()
        downstream_edges = []
        
        # BFS traversal for downstream nodes
        queue = [(node_id, 0)]
        visited = set()
        
        while queue:
            current_node, current_depth = queue.pop(0)
            
            if current_node in visited or current_depth >= depth:
                continue
            
            visited.add(current_node)
            downstream_nodes.add(current_node)
            
            # Get successors
            for successor in self.lineage_graph.successors(current_node):
                if successor not in visited:
                    queue.append((successor, current_depth + 1))
                
                # Add edge information
                edge_data = self.lineage_graph.edges[current_node, successor]
                edge_id = edge_data.get("edge_id")
                if edge_id and edge_id in self.edges:
                    downstream_edges.append(self.edges[edge_id])
        
        return {
            "source_node": self.nodes[node_id],
            "downstream_nodes": [self.nodes[nid] for nid in downstream_nodes if nid != node_id],
            "edges": downstream_edges,
            "depth_reached": depth
        }
    
    def get_column_lineage(self, node_id: str, column_name: str) -> List[str]:
        """Get column-level lineage"""
        
        if node_id not in self.column_lineage:
            return []
        
        return self.column_lineage[node_id].get(column_name, [])
    
    def find_impact_analysis(self, node_id: str) -> Dict[str, Any]:
        """Perform impact analysis for changes to a node"""
        
        downstream_lineage = self.get_downstream_lineage(node_id)
        
        impact_analysis = {
            "source_node": self.nodes[node_id],
            "potentially_affected_nodes": downstream_lineage["downstream_nodes"],
            "impact_severity": self._calculate_impact_severity(downstream_lineage),
            "affected_systems": self._get_affected_systems(downstream_lineage),
            "recommendations": self._generate_impact_recommendations(downstream_lineage)
        }
        
        return impact_analysis
    
    def find_root_cause_analysis(self, node_id: str) -> Dict[str, Any]:
        """Perform root cause analysis for issues with a node"""
        
        upstream_lineage = self.get_upstream_lineage(node_id)
        
        root_cause_analysis = {
            "target_node": self.nodes[node_id],
            "potential_root_causes": upstream_lineage["upstream_nodes"],
            "dependency_depth": len(upstream_lineage["upstream_nodes"]),
            "critical_dependencies": self._identify_critical_dependencies(upstream_lineage),
            "recommendations": self._generate_root_cause_recommendations(upstream_lineage)
        }
        
        return root_cause_analysis
    
    def _calculate_impact_severity(self, lineage: Dict[str, Any]) -> str:
        """Calculate impact severity based on downstream dependencies"""
        
        downstream_count = len(lineage["downstream_nodes"])
        
        # Count critical systems
        critical_systems = ["production_reports", "customer_facing", "regulatory"]
        critical_count = 0
        
        for node in lineage["downstream_nodes"]:
            for system in critical_systems:
                if system in node.system.lower() or system in node.name.lower():
                    critical_count += 1
                    break
        
        if critical_count > 0:
            return "high"
        elif downstream_count > 10:
            return "medium"
        elif downstream_count > 0:
            return "low"
        else:
            return "none"
    
    def _get_affected_systems(self, lineage: Dict[str, Any]) -> Set[str]:
        """Get list of affected systems"""
        
        systems = set()
        for node in lineage["downstream_nodes"]:
            systems.add(node.system)
        
        return systems
    
    def _generate_impact_recommendations(self, lineage: Dict[str, Any]) -> List[str]:
        """Generate recommendations for impact mitigation"""
        
        recommendations = []
        
        downstream_count = len(lineage["downstream_nodes"])
        
        if downstream_count > 10:
            recommendations.append("High impact change - consider phased rollout")
            recommendations.append("Notify downstream data owners before making changes")
        
        if downstream_count > 0:
            recommendations.append("Test changes in development environment first")
            recommendations.append("Monitor downstream systems after deployment")
        
        return recommendations
    
    def _identify_critical_dependencies(self, lineage: Dict[str, Any]) -> List[LineageNode]:
        """Identify critical dependencies in upstream lineage"""
        
        critical_nodes = []
        
        for node in lineage["upstream_nodes"]:
            # Check if node is a single point of failure
            downstream_of_node = self.get_downstream_lineage(node.node_id, depth=1)
            if len(downstream_of_node["downstream_nodes"]) > 5:
                critical_nodes.append(node)
        
        return critical_nodes
    
    def _generate_root_cause_recommendations(self, lineage: Dict[str, Any]) -> List[str]:
        """Generate recommendations for root cause investigation"""
        
        recommendations = []
        
        upstream_count = len(lineage["upstream_nodes"])
        
        if upstream_count > 10:
            recommendations.append("Complex dependency chain - investigate most recent changes first")
        
        if upstream_count > 0:
            recommendations.append("Check data quality and freshness of upstream sources")
            recommendations.append("Validate transformation logic in pipeline")
        
        return recommendations
    
    def detect_circular_dependencies(self) -> List[List[str]]:
        """Detect circular dependencies in lineage graph"""
        
        try:
            # NetworkX will raise exception if cycles exist
            cycles = list(nx.simple_cycles(self.lineage_graph))
            return cycles
        except nx.NetworkXNoCycle:
            return []
    
    def get_lineage_statistics(self) -> Dict[str, Any]:
        """Get comprehensive lineage statistics"""
        
        stats = {
            "total_nodes": len(self.nodes),
            "total_edges": len(self.edges),
            "node_types": {},
            "relationship_types": {},
            "systems": set(),
            "max_depth": 0,
            "avg_degree": 0,
            "circular_dependencies": len(self.detect_circular_dependencies())
        }
        
        # Node type distribution
        for node in self.nodes.values():
            stats["node_types"][node.node_type] = stats["node_types"].get(node.node_type, 0) + 1
            stats["systems"].add(node.system)
        
        # Relationship type distribution
        for edge in self.edges.values():
            rel_type = edge.relationship_type
            stats["relationship_types"][rel_type] = stats["relationship_types"].get(rel_type, 0) + 1
        
        # Graph metrics
        if self.lineage_graph.nodes:
            degrees = [self.lineage_graph.degree(node) for node in self.lineage_graph.nodes]
            stats["avg_degree"] = sum(degrees) / len(degrees)
            
            # Calculate maximum depth
            for node in self.lineage_graph.nodes:
                try:
                    depths = nx.single_source_shortest_path_length(self.lineage_graph, node)
                    max_node_depth = max(depths.values()) if depths else 0
                    stats["max_depth"] = max(stats["max_depth"], max_node_depth)
                except:
                    pass
        
        stats["systems"] = list(stats["systems"])
        
        return stats
    
    def export_lineage(self, format: str = "json") -> str:
        """Export lineage graph in specified format"""
        
        if format == "json":
            export_data = {
                "nodes": [
                    {
                        "id": node.node_id,
                        "name": node.name,
                        "type": node.node_type,
                        "system": node.system,
                        "location": node.location,
                        "properties": node.properties,
                        "metadata": node.metadata
                    }
                    for node in self.nodes.values()
                ],
                "edges": [
                    {
                        "id": edge.edge_id,
                        "source": edge.source_node_id,
                        "target": edge.target_node_id,
                        "type": edge.relationship_type,
                        "transformation_logic": edge.transformation_logic,
                        "columns_mapping": edge.columns_mapping,
                        "properties": edge.properties
                    }
                    for edge in self.edges.values()
                ],
                "metadata": {
                    "exported_at": datetime.now(timezone.utc).isoformat(),
                    "statistics": self.get_lineage_statistics()
                }
            }
            
            return json.dumps(export_data, indent=2, default=str)
        
        elif format == "graphml":
            # Export as GraphML for visualization tools
            nx.write_graphml(self.lineage_graph, "lineage_graph.graphml")
            return "lineage_graph.graphml"
        
        else:
            raise ValueError(f"Unsupported export format: {format}")

class AutomatedLineageCapture:
    """Automatically capture lineage from various sources"""
    
    def __init__(self, lineage_tracker: DataLineageTracker):
        self.lineage_tracker = lineage_tracker
        self.parsers = {
            "sql": SQLLineageParser(),
            "spark": SparkLineageParser(),
            "airflow": AirflowLineageParser(),
            "dbt": DBTLineageParser()
        }
    
    def capture_from_sql(self, sql_query: str, execution_context: Dict[str, Any]) -> List[str]:
        """Capture lineage from SQL query"""
        
        parser = self.parsers["sql"]
        lineage_info = parser.parse(sql_query, execution_context)
        
        # Create nodes and edges from parsed information
        edge_ids = []
        
        for source_table in lineage_info["source_tables"]:
            source_node_id = self._ensure_node_exists(source_table)
            
            for target_table in lineage_info["target_tables"]:
                target_node_id = self._ensure_node_exists(target_table)
                
                edge = LineageEdge(
                    edge_id=str(uuid.uuid4()),
                    source_node_id=source_node_id,
                    target_node_id=target_node_id,
                    relationship_type="transforms",
                    transformation_logic=sql_query,
                    columns_mapping=lineage_info.get("column_mapping", {}),
                    properties={"query_type": lineage_info.get("query_type", "unknown")}
                )
                
                edge_id = self.lineage_tracker.add_edge(edge)
                edge_ids.append(edge_id)
        
        return edge_ids
    
    def _ensure_node_exists(self, table_info: Dict[str, Any]) -> str:
        """Ensure node exists in lineage graph"""
        
        node_id = f"{table_info['database']}.{table_info['schema']}.{table_info['table']}"
        
        if node_id not in self.lineage_tracker.nodes:
            node = LineageNode(
                node_id=node_id,
                name=table_info['table'],
                node_type="table",
                system=table_info.get('system', 'unknown'),
                location=f"{table_info['database']}.{table_info['schema']}.{table_info['table']}",
                properties=table_info
            )
            
            self.lineage_tracker.add_node(node)
        
        return node_id

class LineageParser(ABC):
    """Abstract base class for lineage parsers"""
    
    @abstractmethod
    def parse(self, source: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Parse lineage information from source"""
        pass

class SQLLineageParser(LineageParser):
    """Parse lineage from SQL queries"""
    
    def parse(self, sql_query: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Parse SQL query for lineage information"""
        
        # Simplified SQL parsing - in practice would use sqlparse or similar
        sql_upper = sql_query.upper()
        
        lineage_info = {
            "source_tables": [],
            "target_tables": [],
            "column_mapping": {},
            "query_type": "unknown"
        }
        
        # Determine query type
        if sql_upper.strip().startswith("SELECT"):
            lineage_info["query_type"] = "select"
        elif sql_upper.strip().startswith("INSERT"):
            lineage_info["query_type"] = "insert"
        elif sql_upper.strip().startswith("UPDATE"):
            lineage_info["query_type"] = "update"
        elif sql_upper.strip().startswith("CREATE"):
            lineage_info["query_type"] = "create"
        
        # Extract table references (simplified)
        import re
        
        # Find FROM clause tables
        from_match = re.search(r'FROM\s+(\w+(?:\.\w+)*)', sql_upper)
        if from_match:
            table_name = from_match.group(1)
            parts = table_name.split('.')
            
            if len(parts) == 3:
                database, schema, table = parts
            elif len(parts) == 2:
                database = context.get("default_database", "default")
                schema, table = parts
            else:
                database = context.get("default_database", "default")
                schema = context.get("default_schema", "public")
                table = parts[0]
            
            lineage_info["source_tables"].append({
                "database": database,
                "schema": schema,
                "table": table,
                "system": context.get("system", "unknown")
            })
        
        # Find JOIN tables
        join_matches = re.findall(r'JOIN\s+(\w+(?:\.\w+)*)', sql_upper)
        for join_table in join_matches:
            parts = join_table.split('.')
            
            if len(parts) == 3:
                database, schema, table = parts
            elif len(parts) == 2:
                database = context.get("default_database", "default")
                schema, table = parts
            else:
                database = context.get("default_database", "default")
                schema = context.get("default_schema", "public")
                table = parts[0]
            
            lineage_info["source_tables"].append({
                "database": database,
                "schema": schema,
                "table": table,
                "system": context.get("system", "unknown")
            })
        
        # Find target tables (INSERT INTO, CREATE TABLE, etc.)
        target_matches = re.findall(r'(?:INSERT\s+INTO|CREATE\s+TABLE)\s+(\w+(?:\.\w+)*)', sql_upper)
        for target_table in target_matches:
            parts = target_table.split('.')
            
            if len(parts) == 3:
                database, schema, table = parts
            elif len(parts) == 2:
                database = context.get("default_database", "default")
                schema, table = parts
            else:
                database = context.get("default_database", "default")
                schema = context.get("default_schema", "public")
                table = parts[0]
            
            lineage_info["target_tables"].append({
                "database": database,
                "schema": schema,
                "table": table,
                "system": context.get("system", "unknown")
            })
        
        return lineage_info

class SparkLineageParser(LineageParser):
    """Parse lineage from Spark applications"""
    
    def parse(self, source: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Parse Spark application for lineage information"""
        
        # Would integrate with Spark's query execution plans
        # This is a simplified implementation
        
        lineage_info = {
            "source_tables": [],
            "target_tables": [],
            "transformations": [],
            "query_type": "spark_job"
        }
        
        # Parse from Spark execution plan or application logs
        # Implementation would depend on specific Spark integration
        
        return lineage_info

class AirflowLineageParser(LineageParser):
    """Parse lineage from Airflow DAGs"""
    
    def parse(self, source: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Parse Airflow DAG for lineage information"""
        
        # Would parse Airflow DAG definitions and task dependencies
        # This is a simplified implementation
        
        lineage_info = {
            "source_datasets": [],
            "target_datasets": [],
            "transformations": [],
            "query_type": "airflow_dag"
        }
        
        return lineage_info

class DBTLineageParser(LineageParser):
    """Parse lineage from dbt models"""
    
    def parse(self, source: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Parse dbt model for lineage information"""
        
        # Would parse dbt manifest.json and model files
        # This is a simplified implementation
        
        lineage_info = {
            "source_models": [],
            "target_models": [],
            "transformations": [],
            "query_type": "dbt_model"
        }
        
        return lineage_info

Conclusion

Implementing advanced data governance and lineage tracking systems requires sophisticated approaches to data classification, access control, compliance automation, and comprehensive audit capabilities. The frameworks and implementations shown in this guide provide a foundation for building enterprise-grade data governance systems that can scale with organizational needs while ensuring regulatory compliance and data security.

Key takeaways for successful data governance implementation include:

  1. Automated Classification: Implement intelligent data classification systems that can automatically identify sensitive data and apply appropriate policies
  2. Comprehensive Lineage: Build detailed lineage tracking that captures column-level dependencies and transformation logic
  3. Policy Automation: Create automated policy enforcement mechanisms that reduce manual governance overhead
  4. Compliance Integration: Integrate compliance frameworks directly into governance workflows for continuous validation
  5. Audit Excellence: Implement comprehensive audit logging and reporting for transparency and regulatory requirements

By following these advanced patterns and implementing robust governance frameworks, organizations can build data management systems that provide both operational efficiency and regulatory compliance while maintaining data quality and security at enterprise scale.