Advanced ETL/ELT Pipeline Development with Airflow and Prefect
Modern data engineering requires sophisticated orchestration tools to manage complex ETL/ELT pipelines. Apache Airflow and Prefect represent the current generation of workflow orchestration platforms that enable building scalable, maintainable, and observable data pipelines with advanced features like dynamic task generation, robust error handling, and comprehensive monitoring.
Advanced ETL/ELT Pipeline Development with Airflow and Prefect
Apache Airflow Advanced Patterns
Dynamic DAG Generation
# dynamic_dag_factory.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.operators.s3 import S3FileTransformOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import yaml
import os
class DynamicDAGFactory:
    """Factory for generating DAGs dynamically from configuration.

    The YAML file at ``config_path`` is loaded once when the factory is
    constructed. ``create_dag`` then reads ``config['pipelines'][name]`` and
    turns its optional ``extract`` / ``transform`` / ``load`` / ``quality``
    sections into task groups.
    """

    # Maps an extract source ``type`` to the *name* of the builder method.
    # Names are resolved lazily with getattr so a builder is only looked up
    # when a source of that type is actually configured.
    _EXTRACT_TASK_BUILDERS = {
        'database': '_create_database_extract_task',
        'api': '_create_api_extract_task',
        'file': '_create_file_extract_task',
        's3': '_create_s3_extract_task',
    }

    # Lower bound substituted for ``{{last_run}}`` when there is no previous
    # run, so the first incremental run selects all rows instead of sending
    # the raw placeholder to the database.
    _FIRST_RUN_WATERMARK = '1970-01-01 00:00:00'
    _LAST_RUN_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, config_path: str):
        """Remember ``config_path`` and load the configuration from it."""
        self.config_path = config_path
        self.config = self._load_config()

    def _load_config(self) -> dict:
        """Load pipeline configuration from YAML file"""
        with open(self.config_path, 'r', encoding='utf-8') as file:
            return yaml.safe_load(file)

    def create_dag(self, pipeline_name: str) -> DAG:
        """Create a DAG based on configuration.

        Args:
            pipeline_name: Key under ``config['pipelines']``; also used as
                the prefix of the DAG id (``<pipeline_name>_pipeline``).

        Returns:
            The populated DAG.

        Raises:
            KeyError: If the pipeline is not present in the configuration.
        """
        pipeline_config = self.config['pipelines'][pipeline_name]
        default_args = {
            'owner': pipeline_config.get('owner', 'data-team'),
            'depends_on_past': False,
            'start_date': datetime.strptime(
                pipeline_config.get('start_date', '2025-12-01'),
                '%Y-%m-%d'
            ),
            'email_on_failure': True,
            'email_on_retry': False,
            'retries': pipeline_config.get('retries', 3),
            'retry_delay': timedelta(minutes=pipeline_config.get('retry_delay', 5)),
        }
        dag = DAG(
            dag_id=f"{pipeline_name}_pipeline",
            default_args=default_args,
            description=pipeline_config.get('description', ''),
            schedule_interval=pipeline_config.get('schedule', '@daily'),
            catchup=pipeline_config.get('catchup', False),
            # max_active_runs is a DAG-level setting: default_args are handed
            # to the operators, so it must be passed to the DAG itself.
            max_active_runs=pipeline_config.get('max_active_runs', 1),
            tags=pipeline_config.get('tags', []),
        )
        with dag:
            self._create_tasks(pipeline_config, dag)
        return dag

    def _create_tasks(self, config: dict, dag: DAG):
        """Create tasks based on configuration.

        One task group is built per configured section, then
        ``_set_dependencies`` wires them using ``config['dependencies']``.
        The transformation/loading/quality group builders and
        ``_set_dependencies`` are not visible in this part of the file and
        are expected to be provided elsewhere on the class.
        """
        tasks = {}
        # Create extraction tasks
        if 'extract' in config:
            tasks['extract'] = self._create_extraction_group(config['extract'])
        # Create transformation tasks
        if 'transform' in config:
            tasks['transform'] = self._create_transformation_group(config['transform'])
        # Create loading tasks
        if 'load' in config:
            tasks['load'] = self._create_loading_group(config['load'])
        # Create data quality tasks
        if 'quality' in config:
            tasks['quality'] = self._create_quality_group(config['quality'])
        # Set dependencies
        self._set_dependencies(tasks, config.get('dependencies', {}))

    def _create_extraction_group(self, extract_config: dict) -> TaskGroup:
        """Create extraction task group.

        Args:
            extract_config: Mapping of source name to source configuration;
                each configuration must carry a ``type`` key.

        Raises:
            ValueError: If a source declares a type with no builder. Raising
                here surfaces a misconfigured source at DAG-parse time
                instead of silently dropping it from the pipeline.
        """
        with TaskGroup(group_id='extract', tooltip='Data Extraction Tasks') as group:
            for source_name, source_config in extract_config.items():
                source_type = source_config['type']
                builder_name = self._EXTRACT_TASK_BUILDERS.get(source_type)
                if builder_name is None:
                    raise ValueError(
                        f"Unsupported source type for '{source_name}': {source_type}"
                    )
                getattr(self, builder_name)(source_name, source_config)
        return group

    @staticmethod
    def _render_incremental_sql(sql: str, last_run) -> str:
        """Replace the ``{{last_run}}`` placeholder with a timestamp string.

        ``last_run`` is a datetime-like object or ``None``; ``None`` (no
        previous run) is rendered as the epoch watermark.
        """
        if last_run is None:
            watermark = DynamicDAGFactory._FIRST_RUN_WATERMARK
        else:
            watermark = last_run.strftime(DynamicDAGFactory._LAST_RUN_FORMAT)
        return sql.replace('{{last_run}}', watermark)

    @staticmethod
    def _extract_data_path(payload, data_path: str):
        """Walk a dotted key path (e.g. ``"data.users"``) into ``payload``."""
        for key in data_path.split('.'):
            payload = payload[key]
        return payload

    def _create_database_extract_task(self, source_name: str, config: dict) -> PythonOperator:
        """Create database extraction task.

        The task runs ``config['query']`` through a ``PostgresHook`` and
        writes the result as Parquet to the staging path for the run date.
        """
        def extract_from_database(**context):
            from airflow.providers.postgres.hooks.postgres import PostgresHook

            hook = PostgresHook(postgres_conn_id=config['connection_id'])
            # Execute extraction query
            sql = config['query']
            if config.get('incremental', False):
                # Add incremental logic
                sql = DynamicDAGFactory._render_incremental_sql(
                    sql, context.get('prev_execution_date')
                )
            df = hook.get_pandas_df(sql)
            # Save to staging area
            output_path = f"s3://data-lake/staging/{source_name}/{context['ds']}/data.parquet"
            df.to_parquet(output_path, index=False)
            return output_path

        return PythonOperator(
            task_id=f'extract_{source_name}',
            python_callable=extract_from_database,
            pool=config.get('pool', 'default_pool'),
            pool_slots=config.get('pool_slots', 1),
        )

    def _create_api_extract_task(self, source_name: str, config: dict) -> PythonOperator:
        """Create API extraction task.

        The task performs one GET request (with HTTP-level retries), reads
        the JSON body, optionally descends ``config['data_path']``, and
        writes the normalized records as Parquet to the staging path.
        """
        def extract_from_api(**context):
            import requests
            import pandas as pd
            from requests.adapters import HTTPAdapter
            from urllib3.util.retry import Retry

            # Setup retry strategy
            retry_strategy = Retry(
                total=3,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
            )
            adapter = HTTPAdapter(max_retries=retry_strategy)
            # Make API call
            url = config['url']
            # Copy so the bearer token added below is never written back
            # into the shared pipeline configuration.
            headers = dict(config.get('headers', {}))
            params = dict(config.get('params', {}))
            # Add authentication if configured
            if 'auth' in config:
                auth_config = config['auth']
                if auth_config['type'] == 'bearer':
                    token = Variable.get(auth_config['token_variable'])
                    headers['Authorization'] = f"Bearer {token}"
            with requests.Session() as session:
                session.mount("http://", adapter)
                session.mount("https://", adapter)
                response = session.get(url, headers=headers, params=params, timeout=30)
                response.raise_for_status()
                # Process response
                data = response.json()
            if config.get('data_path'):
                # Extract specific path from JSON
                data = DynamicDAGFactory._extract_data_path(data, config['data_path'])
            df = pd.json_normalize(data)
            # Save to staging area
            output_path = f"s3://data-lake/staging/{source_name}/{context['ds']}/data.parquet"
            df.to_parquet(output_path, index=False)
            return output_path

        return PythonOperator(
            task_id=f'extract_{source_name}',
            python_callable=extract_from_api,
            retries=5,
            retry_delay=timedelta(minutes=2),
        )
def create_etl_pipeline_dag(
    pipeline_name: str,
    config_dir: str = "/opt/airflow/config/pipelines",
) -> DAG:
    """Create ETL pipeline DAG dynamically.

    Args:
        pipeline_name: Name of the pipeline; the configuration is read from
            ``<config_dir>/<pipeline_name>.yaml``.
        config_dir: Directory holding the pipeline YAML files. Defaults to
            the location this function previously hard-coded.

    Returns:
        The DAG built by ``DynamicDAGFactory.create_dag``.
    """
    config_path = os.path.join(config_dir, f"{pipeline_name}.yaml")
    factory = DynamicDAGFactory(config_path)
    return factory.create_dag(pipeline_name)
# Generate DAGs for all configured pipelines
import glob
# sorted() makes the registration order deterministic across parses.
for config_file in sorted(glob.glob("/opt/airflow/config/pipelines/*.yaml")):
    # splitext removes only the trailing extension; str.replace('.yaml', '')
    # would also strip ".yaml" occurring in the middle of a file name.
    pipeline_name = os.path.splitext(os.path.basename(config_file))[0]
    dag_id = f"{pipeline_name}_pipeline"
    # Expose the DAG as a module-level name under its dag_id.
    globals()[dag_id] = create_etl_pipeline_dag(pipeline_name)
Advanced Airflow Operators
# custom_operators.py
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from typing import Dict, List, Optional, Any
import pandas as pd
import boto3
import logging
class DataQualityOperator(BaseOperator):
    """Custom operator for data quality checks.

    Each entry of ``quality_checks`` is a dict with a ``name``, a ``sql``
    string containing a ``{table_name}`` placeholder, and optionally an
    ``expected_result`` (default ``0``) and a ``description``. The first
    column of the first row returned by each query is compared with the
    expected result.
    """
    # Only attributes that exist on the instance may be listed here; the
    # operator has no ``sql`` attribute, so only ``table_name`` is templated.
    template_fields = ['table_name']

    @apply_defaults
    def __init__(
        self,
        postgres_conn_id: str,
        table_name: str,
        quality_checks: List[Dict],
        fail_on_quality_issues: bool = True,
        *args,
        **kwargs
    ):
        """Store the connection id, target table and check definitions.

        Args:
            postgres_conn_id: Connection id handed to ``PostgresHook``.
            table_name: Table name formatted into every check's SQL.
            quality_checks: Check definitions (see class docstring).
            fail_on_quality_issues: When true, the first failing check
                raises and stops the task.
        """
        super().__init__(*args, **kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.table_name = table_name
        self.quality_checks = quality_checks
        self.fail_on_quality_issues = fail_on_quality_issues

    def execute(self, context):
        """Run all checks and return their results.

        Returns:
            A list with one result dict per executed check; the same list is
            pushed to XCom under the key ``quality_results``.

        Raises:
            ValueError: On the first failed check when
                ``fail_on_quality_issues`` is true.
        """
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        quality_results = []
        for check in self.quality_checks:
            check_name = check['name']
            check_sql = check['sql'].format(table_name=self.table_name)
            expected_result = check.get('expected_result', 0)
            logging.info(f"Running quality check: {check_name}")
            logging.info(f"SQL: {check_sql}")
            # A query that returns no row yields None; treat that as a None
            # result (which fails the comparison) instead of crashing on
            # the subscript.
            row = hook.get_first(check_sql)
            result = row[0] if row else None
            check_result = {
                'check_name': check_name,
                'result': result,
                'expected': expected_result,
                'passed': result == expected_result,
                'description': check.get('description', '')
            }
            quality_results.append(check_result)
            if not check_result['passed']:
                error_msg = f"Quality check '{check_name}' failed. Expected: {expected_result}, Got: {result}"
                logging.error(error_msg)
                if self.fail_on_quality_issues:
                    raise ValueError(error_msg)
        # Log quality results summary
        passed_checks = sum(1 for r in quality_results if r['passed'])
        total_checks = len(quality_results)
        logging.info(f"Data quality summary: {passed_checks}/{total_checks} checks passed")
        # Store results for downstream tasks
        context['task_instance'].xcom_push(key='quality_results', value=quality_results)
        return quality_results
class SmartDataTransferOperator(BaseOperator):
    """Smart data transfer operator with optimization and monitoring.

    When ``sql`` is given, the query result is copied from the source
    Postgres connection into the table named by ``destination_path`` on the
    destination connection, in chunks of ``chunk_size`` rows.
    """
    template_fields = ['source_path', 'destination_path', 'sql']

    @apply_defaults
    def __init__(
        self,
        source_conn_id: str,
        destination_conn_id: str,
        source_path: Optional[str] = None,
        destination_path: Optional[str] = None,
        sql: Optional[str] = None,
        chunk_size: int = 10000,
        compression: str = 'gzip',
        data_type_optimization: bool = True,
        *args,
        **kwargs
    ):
        """Store transfer settings.

        Args:
            source_conn_id: Connection id of the source database.
            destination_conn_id: Connection id of the destination database.
            source_path: Source location for file-based transfers.
            destination_path: Destination table name for database transfers.
            sql: Query whose result is transferred.
            chunk_size: Number of rows fetched and written per iteration.
            compression: Stored on the instance; not used by the database
                transfer path shown here.
            data_type_optimization: Whether chunks are passed through
                ``_optimize_dtypes`` before being written.
        """
        super().__init__(*args, **kwargs)
        self.source_conn_id = source_conn_id
        self.destination_conn_id = destination_conn_id
        self.source_path = source_path
        self.destination_path = destination_path
        self.sql = sql
        self.chunk_size = chunk_size
        self.compression = compression
        self.data_type_optimization = data_type_optimization

    def execute(self, context):
        """Dispatch to the database or file transfer path.

        Raises:
            ValueError: If neither ``sql`` nor ``source_path`` is set.
        """
        if self.sql:
            return self._transfer_from_database(context)
        if self.source_path:
            # NOTE(review): _transfer_from_file is not defined in the part
            # of the class visible here; confirm it exists elsewhere.
            return self._transfer_from_file(context)
        raise ValueError("Either sql or source_path must be provided")

    def _transfer_from_database(self, context):
        """Transfer data from database with chunking and optimization.

        Returns:
            The number of rows written to the destination table.
        """
        source_hook = PostgresHook(postgres_conn_id=self.source_conn_id)
        dest_hook = PostgresHook(postgres_conn_id=self.destination_conn_id)
        # Build the destination engine once instead of once per chunk.
        dest_engine = dest_hook.get_sqlalchemy_engine()
        # A trailing semicolon would break both the COUNT subquery and the
        # LIMIT/OFFSET suffix appended below.
        base_sql = self.sql.strip().rstrip(';').rstrip()
        # Get total row count for progress tracking
        count_sql = f"SELECT COUNT(*) FROM ({base_sql}) as subquery"
        total_rows = source_hook.get_first(count_sql)[0]
        logging.info(f"Transferring {total_rows} rows in chunks of {self.chunk_size}")
        transferred_rows = 0
        offset = 0
        # NOTE: LIMIT/OFFSET paging only yields each row exactly once when
        # the query has a deterministic ORDER BY; callers should supply one.
        while offset < total_rows:
            # Fetch chunk
            chunk_sql = f"{base_sql} LIMIT {self.chunk_size} OFFSET {offset}"
            df = source_hook.get_pandas_df(chunk_sql)
            if df.empty:
                break
            is_first_chunk = offset == 0
            # Optimize data types. The first chunk creates the destination
            # table (if_exists='replace'), and to_sql derives column types
            # from the frame's dtypes. Downcasting that chunk would give the
            # table narrow column types based on the first rows only, which
            # later chunks may not fit, so it is written as fetched.
            if self.data_type_optimization and not is_first_chunk:
                df = self._optimize_dtypes(df)
            # Insert chunk
            df.to_sql(
                name=self.destination_path,
                con=dest_engine,
                if_exists='replace' if is_first_chunk else 'append',
                index=False,
                method='multi',
                chunksize=1000
            )
            transferred_rows += len(df)
            offset += self.chunk_size
            # Log progress
            progress = (transferred_rows / total_rows) * 100
            logging.info(f"Transfer progress: {progress:.1f}% ({transferred_rows}/{total_rows} rows)")
        logging.info(f"Transfer completed. Total rows transferred: {transferred_rows}")
        return transferred_rows

    def _optimize_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
        """Optimize pandas DataFrame data types for memory efficiency.

        Object columns with fewer than 50% distinct values become
        ``category``; ``int64`` columns are narrowed to the smallest integer
        type whose range contains the column's min and max; ``float64``
        columns are downcast with ``pd.to_numeric``. The frame is modified
        in place and also returned.
        """
        row_count = len(df)
        if row_count == 0:
            # Nothing to optimize, and the distinct-ratio below would
            # divide by zero.
            return df
        for col in df.columns:
            col_type = df[col].dtype
            if col_type == 'object':
                # Try to convert to category if beneficial
                try:
                    distinct_ratio = df[col].nunique() / row_count
                except TypeError:
                    # Unhashable cell values (e.g. dicts or lists) cannot be
                    # counted; leave the column as it is.
                    continue
                if distinct_ratio < 0.5:
                    df[col] = df[col].astype('category')
            elif col_type == 'int64':
                # Downcast integers; bounds are inclusive so a column whose
                # maximum equals a type's limit still gets that type.
                col_min = df[col].min()
                col_max = df[col].max()
                if col_min >= 0:
                    if col_max <= 255:
                        df[col] = df[col].astype('uint8')
                    elif col_max <= 65535:
                        df[col] = df[col].astype('uint16')
                    elif col_max <= 4294967295:
                        df[col] = df[col].astype('uint32')
                else:
                    if col_min >= -128 and col_max <= 127:
                        df[col] = df[col].astype('int8')
                    elif col_min >= -32768 and col_max <= 32767:
                        df[col] = df[col].astype('int16')
                    elif col_min >= -2147483648 and col_max <= 2147483647:
                        df[col] = df[col].astype('int32')
            elif col_type == 'float64':
                # Downcast floats
                df[col] = pd.to_numeric(df[col], downcast='float')
        return df
class ParallelProcessingOperator(BaseOperator):
    """Operator for parallel processing of large datasets.

    Reads a Parquet file, splits it into row chunks, applies a function to
    each chunk in a process pool, and writes the concatenated results back
    to Parquet.
    """

    @apply_defaults
    def __init__(
        self,
        processing_function: str,
        input_path: str,
        output_path: str,
        num_workers: int = 4,
        chunk_size: int = 10000,
        *args,
        **kwargs
    ):
        """Store processing settings.

        Args:
            processing_function: Dotted path ``package.module.function`` of
                the callable applied to each chunk.
            input_path: Parquet file to read.
            output_path: Parquet file to write.
            num_workers: Maximum number of worker processes.
            chunk_size: Number of rows per chunk.
        """
        super().__init__(*args, **kwargs)
        self.processing_function = processing_function
        self.input_path = input_path
        self.output_path = output_path
        self.num_workers = num_workers
        self.chunk_size = chunk_size

    def execute(self, context):
        """Process the input in parallel and return the output path.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
            Exception: Whatever the processing function raised for a chunk.
        """
        from concurrent.futures import ProcessPoolExecutor, as_completed
        import importlib

        if self.chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {self.chunk_size}")
        # Import processing function
        module_name, function_name = self.processing_function.rsplit('.', 1)
        module = importlib.import_module(module_name)
        process_func = getattr(module, function_name)
        # Read input data
        df = pd.read_parquet(self.input_path)
        total_rows = len(df)
        logging.info(f"Processing {total_rows} rows with {self.num_workers} workers")
        # Split data into chunks
        chunks = [df[i:i + self.chunk_size] for i in range(0, total_rows, self.chunk_size)]
        if not chunks:
            # An empty input produces no chunks and pd.concat([]) raises.
            # Process the empty frame as a single chunk so the processing
            # function still determines the shape of the output.
            chunks = [df]
        processed_chunks = []
        # Process chunks in parallel
        with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
            future_to_chunk = {
                executor.submit(process_func, chunk): i
                for i, chunk in enumerate(chunks)
            }
            for future in as_completed(future_to_chunk):
                chunk_index = future_to_chunk[future]
                try:
                    result = future.result()
                except Exception as exc:
                    logging.error(f"Chunk {chunk_index} generated an exception: {exc}")
                    raise
                processed_chunks.append((chunk_index, result))
                logging.info(f"Completed processing chunk {chunk_index + 1}/{len(chunks)}")
        # Combine results; futures complete in arbitrary order, so restore
        # the original chunk order first.
        processed_chunks.sort(key=lambda x: x[0])  # Sort by chunk index
        final_df = pd.concat([chunk[1] for chunk in processed_chunks], ignore_index=True)
        # Save results
        final_df.to_parquet(self.output_path, index=False, compression='snappy')
        logging.info(f"Processing completed. Output saved to {self.output_path}")
        return self.output_path
Prefect Advanced Patterns
Flow Design Patterns
# prefect_flows.py
from prefect import Flow, Task, task, Parameter
from prefect.tasks.database import PostgresExecute, PostgresFetch
from prefect.tasks.aws import S3Download, S3Upload
from prefect.tasks.notifications import SlackTask
from prefect.engine.results import LocalResult, S3Result
from prefect.engine.serializers import JSONSerializer
from prefect.schedules import IntervalSchedule
from prefect.run_configs import KubernetesRun
from prefect.storage import S3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any
import logging
# Configure Prefect
# The configuration object is exposed as an attribute of the top-level
# ``prefect`` package, so it is imported from there rather than from a
# ``prefect.config`` submodule.
from prefect import config
config.logging.level = "INFO"
@task(max_retries=3, retry_delay=timedelta(minutes=2))
def extract_data_from_source(
    source_config: Dict[str, Any],
    execution_date: str
) -> pd.DataFrame:
    """Route an extraction request to the extractor for its source type.

    ``source_config['type']`` selects the database, API or S3 extractor,
    which is called with the same two arguments.

    Raises:
        ValueError: For any other source type.
    """
    source_type = source_config['type']
    # Guard-clause dispatch: each extractor is looked up only when its type
    # is actually requested.
    if source_type == 'database':
        return extract_from_database(source_config, execution_date)
    if source_type == 'api':
        return extract_from_api(source_config, execution_date)
    if source_type == 's3':
        return extract_from_s3(source_config, execution_date)
    raise ValueError(f"Unsupported source type: {source_type}")
def extract_from_database(config: Dict, execution_date: str) -> pd.DataFrame:
    """Extract data from database.

    Args:
        config: Must contain ``connection_string`` and ``query``. Every
            ``{{execution_date}}`` placeholder in the query is replaced by
            ``execution_date``.
        execution_date: ISO-8601 date or datetime string.

    Returns:
        The query result as a DataFrame.

    Raises:
        ValueError: If the query uses the placeholder and ``execution_date``
            is not a valid ISO-8601 date/datetime.
    """
    connection_string = config['connection_string']
    query = config['query']
    placeholder = '{{execution_date}}'
    if placeholder in query:
        # The value is substituted textually into the SQL, so only accept
        # strings that parse as a date/datetime; anything else (for example
        # text containing quotes) is rejected before it reaches the
        # database.
        try:
            datetime.fromisoformat(execution_date)
        except (TypeError, ValueError) as exc:
            raise ValueError(
                f"execution_date must be an ISO-8601 date or datetime, got {execution_date!r}"
            ) from exc
        # Replace template variables
        query = query.replace(placeholder, execution_date)

    from sqlalchemy import create_engine

    engine = create_engine(connection_string)
    try:
        df = pd.read_sql(query, engine)
    finally:
        # Release the connection pool created for this call.
        engine.dispose()
    logging.info(f"Extracted {len(df)} rows from database")
    return df
def extract_from_api(config: Dict, execution_date: str) -> pd.DataFrame:
    """Extract data from API with pagination.

    Pages are requested with an increasing ``page`` query parameter,
    starting at 1, until a page yields no data or ``max_pages`` (default
    100) is reached.

    Args:
        config: Must contain ``url``; may contain ``headers``, ``params``,
            ``max_pages`` and a dotted ``data_path`` into the JSON body.
        execution_date: Accepted for signature parity with the other
            extractors; not used by this function.

    Returns:
        All collected records, normalized into a DataFrame.
    """
    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    base_url = config['url']
    # Copy both mappings: ``page`` is written into params on every
    # iteration and must not leak into the caller's configuration.
    headers = dict(config.get('headers', {}))
    params = dict(config.get('params', {}))
    # Setup retry strategy
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    all_data = []
    max_pages = config.get('max_pages', 100)
    # The context manager closes the session's connections even when a
    # request raises.
    with requests.Session() as session:
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        for page in range(1, max_pages + 1):
            params['page'] = page
            response = session.get(base_url, headers=headers, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
            # Extract data from response
            if config.get('data_path'):
                for key in config['data_path'].split('.'):
                    data = data[key]
            if not data:
                break
            all_data.extend(data)
            logging.info(f"Extracted page {page}, total records: {len(all_data)}")
    df = pd.json_normalize(all_data)
    logging.info(f"Extracted {len(df)} rows from API")
    return df
@task(max_retries=2, retry_delay=timedelta(minutes=1))
def validate_data_quality(
    df: pd.DataFrame,
    quality_rules: List[Dict[str, Any]],
    fail_on_quality_issues: bool = True
) -> pd.DataFrame:
    """Validate data quality with configurable rules.

    Supported rule types:

    * ``not_null`` - per column in ``columns``, the percentage of nulls must
      not exceed ``max_null_percentage`` (default 0).
    * ``unique`` - per column in ``columns``, the number of repeated
      non-null values must not exceed ``max_duplicates`` (default 0).
    * ``range`` - values of ``column`` must lie within the optional
      ``min_value`` / ``max_value`` bounds.

    Args:
        df: Data to validate; returned unchanged.
        quality_rules: Rule definitions, each with ``name`` and ``type``.
        fail_on_quality_issues: Raise when any rule fails (default); when
            false, failures are only logged.

    Returns:
        The input DataFrame.

    Raises:
        ValueError: If any rule failed and ``fail_on_quality_issues`` is
            true.
    """
    quality_results = []
    total_count = len(df)
    for rule in quality_rules:
        rule_name = rule['name']
        rule_type = rule['type']
        if rule_type == 'not_null':
            max_null_percentage = rule.get('max_null_percentage', 0)
            for col in rule['columns']:
                null_count = int(df[col].isnull().sum())
                # An empty frame has no nulls; avoid the 0/0 division that
                # would yield NaN and fail the rule spuriously.
                if total_count:
                    null_percentage = (null_count / total_count) * 100
                else:
                    null_percentage = 0.0
                quality_results.append({
                    'rule_name': f"{rule_name}_{col}",
                    'rule_type': rule_type,
                    'column': col,
                    'null_count': null_count,
                    'null_percentage': null_percentage,
                    'max_allowed': max_null_percentage,
                    'passed': null_percentage <= max_null_percentage
                })
        elif rule_type == 'unique':
            max_duplicates = rule.get('max_duplicates', 0)
            for col in rule['columns']:
                # Count repeated non-null values only. Subtracting
                # nunique() from the row count would count every null as a
                # duplicate, because nunique() skips nulls.
                duplicate_count = int(df[col].dropna().duplicated().sum())
                quality_results.append({
                    'rule_name': f"{rule_name}_{col}",
                    'rule_type': rule_type,
                    'column': col,
                    'duplicate_count': duplicate_count,
                    'max_allowed': max_duplicates,
                    'passed': duplicate_count <= max_duplicates
                })
        elif rule_type == 'range':
            column = rule['column']
            min_val = rule.get('min_value')
            max_val = rule.get('max_value')
            below_min = int((df[column] < min_val).sum()) if min_val is not None else 0
            above_max = int((df[column] > max_val).sum()) if max_val is not None else 0
            out_of_range = below_min + above_max
            quality_results.append({
                'rule_name': rule_name,
                'rule_type': rule_type,
                'column': column,
                'out_of_range_count': out_of_range,
                'below_min': below_min,
                'above_max': above_max,
                'passed': out_of_range == 0
            })
        else:
            # Make a misspelled or unsupported rule visible instead of
            # silently reporting success for it.
            logging.warning(f"Skipping rule '{rule_name}': unsupported rule type '{rule_type}'")
    # Check if any rules failed
    failed_rules = [r for r in quality_results if not r['passed']]
    if failed_rules:
        error_msg = f"Data quality validation failed. {len(failed_rules)} rules failed."
        logging.error(error_msg)
        for failed_rule in failed_rules:
            logging.error(f"Failed rule: {failed_rule}")
        if fail_on_quality_issues:
            raise ValueError(error_msg)
    logging.info(f"Data quality validation completed. {len(quality_results)} rules checked.")
    return df
@task(max_retries=2, retry_delay=timedelta(minutes=1))
def transform_data(
    df: pd.DataFrame,
    transformations: List[Dict[str, Any]]
) -> pd.DataFrame:
    """Apply configured transformations to the data.

    Transformations are applied in list order. Supported ``type`` values:
    ``add_column``, ``rename_column``, ``drop_column``, ``type_conversion``,
    ``filter_rows`` and ``aggregate``. ``join`` and ``concat(...)``
    expressions are recognized but not implemented; they are skipped with a
    warning.

    Args:
        df: Input data; not modified.
        transformations: Transformation definitions.

    Returns:
        A new DataFrame with all transformations applied.
    """
    # Work on a copy: column assignments below would otherwise modify the
    # caller's DataFrame in place.
    df = df.copy()
    for transformation in transformations:
        transform_type = transformation['type']
        if transform_type == 'add_column':
            column_name = transformation['column_name']
            expression = transformation['expression']
            # Simple expression evaluation (can be extended)
            if expression == 'current_timestamp':
                df[column_name] = pd.Timestamp.now()
            elif expression.startswith('concat'):
                # Example: concat(col1, col2, separator='-')
                logging.warning(
                    f"Skipping add_column '{column_name}': concat expressions are not implemented"
                )
            else:
                # SECURITY: DataFrame.eval executes the expression; the
                # transformation config must come from a trusted source.
                df[column_name] = df.eval(expression)
        elif transform_type == 'rename_column':
            old_name = transformation['old_name']
            new_name = transformation['new_name']
            df = df.rename(columns={old_name: new_name})
        elif transform_type == 'drop_column':
            columns_to_drop = transformation['columns']
            df = df.drop(columns=columns_to_drop, errors='ignore')
        elif transform_type == 'type_conversion':
            column = transformation['column']
            target_type = transformation['target_type']
            if target_type == 'datetime':
                df[column] = pd.to_datetime(df[column], errors='coerce')
            elif target_type == 'numeric':
                df[column] = pd.to_numeric(df[column], errors='coerce')
            elif target_type == 'string':
                df[column] = df[column].astype(str)
        elif transform_type == 'filter_rows':
            condition = transformation['condition']
            # Apply row filtering based on condition
            # SECURITY: DataFrame.query evaluates the condition; same trust
            # requirement as for eval above.
            df = df.query(condition)
        elif transform_type == 'aggregate':
            group_by = transformation['group_by']
            aggregations = transformation['aggregations']
            df = df.groupby(group_by).agg(aggregations).reset_index()
        elif transform_type == 'join':
            # This would require another dataframe - implement as needed
            logging.warning("Skipping 'join' transformation: not implemented")
        else:
            logging.warning(f"Skipping unsupported transformation type '{transform_type}'")
    logging.info(f"Applied {len(transformations)} transformations. Result shape: {df.shape}")
    return df
@task(max_retries=3, retry_delay=timedelta(minutes=2))
def load_data_to_destination(
    df: pd.DataFrame,
    destination_config: Dict[str, Any]
) -> str:
    """Route a load request to the loader for its destination type.

    ``destination_config['type']`` selects the database, S3 or data
    warehouse loader; that loader's return value is passed through.

    Raises:
        ValueError: For any other destination type.
    """
    destination_type = destination_config['type']
    # Guard-clause dispatch: each loader is looked up only when its type is
    # actually requested.
    if destination_type == 'database':
        return load_to_database(df, destination_config)
    if destination_type == 's3':
        return load_to_s3(df, destination_config)
    if destination_type == 'data_warehouse':
        return load_to_data_warehouse(df, destination_config)
    raise ValueError(f"Unsupported destination type: {destination_type}")
def load_to_database(df: pd.DataFrame, config: Dict) -> str:
    """Load data to database.

    Args:
        df: Data to write.
        config: Must contain ``connection_string`` and ``table_name``;
            ``if_exists`` (default ``'replace'``) is passed to
            ``DataFrame.to_sql``.

    Returns:
        A description of the target in the form ``"Database table: <name>"``.
    """
    from sqlalchemy import create_engine

    connection_string = config['connection_string']
    table_name = config['table_name']
    if_exists = config.get('if_exists', 'replace')
    engine = create_engine(connection_string)
    try:
        df.to_sql(
            name=table_name,
            con=engine,
            if_exists=if_exists,
            index=False,
            method='multi',
            chunksize=1000
        )
    finally:
        # Release the connection pool created for this call, also when the
        # write fails.
        engine.dispose()
    logging.info(f"Loaded {len(df)} rows to {table_name}")
    return f"Database table: {table_name}"
def load_to_s3(df: pd.DataFrame, config: Dict) -> str:
    """Serialize ``df`` and upload it as a single S3 object.

    ``config`` supplies ``bucket`` and ``key``; ``format`` chooses between
    ``'parquet'`` (the default, snappy-compressed) and ``'csv'`` (UTF-8).

    Returns:
        The ``s3://bucket/key`` URI of the uploaded object.

    Raises:
        ValueError: If ``format`` is neither ``'parquet'`` nor ``'csv'``.
    """
    import boto3
    from io import BytesIO

    bucket, key = config['bucket'], config['key']
    file_format = config.get('format', 'parquet')

    # Reject unknown formats up front, then serialize.
    if file_format not in ('parquet', 'csv'):
        raise ValueError(f"Unsupported file format: {file_format}")
    if file_format == 'csv':
        data = df.to_csv(index=False).encode('utf-8')
    else:
        buffer = BytesIO()
        df.to_parquet(buffer, index=False, compression='snappy')
        data = buffer.getvalue()

    # Upload the serialized payload.
    boto3.client('s3').put_object(Bucket=bucket, Key=key, Body=data)
    s3_path = f"s3://{bucket}/{key}"
    logging.info(f"Loaded {len(df)} rows to {s3_path}")
    return s3_path
@task
def send_completion_notification(
    pipeline_name: str,
    execution_date: str,
    records_processed: int,
    execution_time: float
) -> None:
    """Send pipeline completion notification.

    Formats a summary message and posts it to Slack through ``SlackTask``,
    using the secret named ``SLACK_WEBHOOK_URL`` for the webhook.

    Args:
        pipeline_name: Name shown in the message.
        execution_date: Execution date shown in the message.
        records_processed: Row count shown in the message.
        execution_time: Duration in seconds shown in the message.
    """
    message = f"""
Pipeline Execution Completed Successfully
Pipeline: {pipeline_name}
Execution Date: {execution_date}
Records Processed: {records_processed:,}
Execution Time: {execution_time:.2f} seconds
"""
    # Send to Slack (configure webhook URL)
    slack_task = SlackTask(
        message=message,
        webhook_secret="SLACK_WEBHOOK_URL"
    )
    # Constructing the task does not send anything; run() posts the message.
    slack_task.run()
    # You can also send email notifications here
    logging.info("Completion notification sent")
# Flow Definition
def create_etl_flow(pipeline_config: Dict[str, Any]) -> Flow:
    """Create ETL flow from configuration.

    The flow extracts every configured source, validates and transforms the
    data, loads it to every configured destination, and finally sends a
    completion notification.

    Args:
        pipeline_config: Must contain ``name``, ``sources`` and
            ``destinations``; may contain ``interval_hours``,
            ``quality_rules`` and ``transformations``.

    Returns:
        The assembled flow.

    Raises:
        ValueError: If ``sources`` is empty.
    """
    pipeline_name = pipeline_config['name']
    sources = pipeline_config['sources']
    if not sources:
        raise ValueError(f"Pipeline '{pipeline_name}' must define at least one source")

    # Wrapped as a task so the row count is computed at run time from the
    # transformed DataFrame rather than at flow-build time.
    count_records = task(lambda frame: len(frame), name="count_records")

    with Flow(
        name=f"{pipeline_name}_etl_flow",
        schedule=IntervalSchedule(interval=timedelta(hours=pipeline_config.get('interval_hours', 24))),
        result=S3Result(bucket="prefect-results"),
        run_config=KubernetesRun(
            image="my-etl-image:latest",
            cpu_request="1",
            memory_request="2Gi",
            cpu_limit="2",
            memory_limit="4Gi"
        ),
        storage=S3(bucket="prefect-flows", key=f"flows/{pipeline_name}.flow")
    ) as flow:
        # Parameters
        # NOTE: the default is evaluated once, when the flow is built, not
        # at each scheduled run.
        execution_date = Parameter("execution_date", default=datetime.now().strftime("%Y-%m-%d"))
        # Extract data from multiple sources
        extracted_datasets = [
            extract_data_from_source(source_config, execution_date)
            for source_config in sources.values()
        ]
        # Combine datasets if multiple sources
        if len(extracted_datasets) > 1:
            # Dataset combination is not implemented: every source is
            # extracted, but only the first one feeds the rest of the flow.
            logging.warning(
                f"Pipeline '{pipeline_name}' has {len(extracted_datasets)} sources; "
                "only the first is used downstream"
            )
        combined_data = extracted_datasets[0]
        # Validate data quality
        validated_data = validate_data_quality(
            combined_data,
            pipeline_config.get('quality_rules', [])
        )
        # Transform data
        transformed_data = transform_data(
            validated_data,
            pipeline_config.get('transformations', [])
        )
        # Load to destinations
        load_results = [
            load_data_to_destination(transformed_data, dest_config)
            for dest_config in pipeline_config['destinations'].values()
        ]
        # Send completion notification
        send_completion_notification(
            pipeline_name,
            execution_date,
            count_records(transformed_data),
            # Falls back to 0 when the run config carries no duration.
            getattr(flow.run_config, 'estimated_duration', 0),
            # Without this the notification has no dependency on the loads
            # and could be sent before they finish.
            upstream_tasks=load_results,
        )
    return flow
# Example usage: build a sample pipeline configuration, create its flow and
# register it when this file is run as a script.
if __name__ == "__main__":
    sample_config = dict(
        name="user_analytics",
        interval_hours=6,
        # Where the data comes from.
        sources=dict(
            user_events=dict(
                type="database",
                connection_string="postgresql://user:pass@host:5432/db",
                query="SELECT * FROM user_events WHERE created_at >= '{{execution_date}}'",
            ),
            user_profiles=dict(
                type="api",
                url="https://api.example.com/users",
                headers=dict(Authorization="Bearer token"),
                data_path="data.users",
            ),
        ),
        # Checks applied before transformation.
        quality_rules=[
            dict(
                name="user_id_not_null",
                type="not_null",
                columns=["user_id"],
                max_null_percentage=0,
            ),
            dict(
                name="email_unique",
                type="unique",
                columns=["email"],
                max_duplicates=0,
            ),
        ],
        # Applied in order.
        transformations=[
            dict(
                type="add_column",
                column_name="processed_at",
                expression="current_timestamp",
            ),
            dict(
                type="type_conversion",
                column="created_at",
                target_type="datetime",
            ),
        ],
        # Where the transformed data is written.
        destinations=dict(
            data_warehouse=dict(
                type="database",
                connection_string="postgresql://user:pass@dw-host:5432/dwh",
                table_name="user_analytics",
                if_exists="append",
            ),
            data_lake=dict(
                type="s3",
                bucket="data-lake",
                key="analytics/user_analytics/{{execution_date}}/data.parquet",
                format="parquet",
            ),
        ),
    )
    flow = create_etl_flow(sample_config)
    # Register flow with Prefect Cloud/Server
    flow.register(project_name="etl-pipelines")
Performance Optimization and Monitoring
Airflow Performance Tuning
# airflow_optimization.py
from airflow.configuration import conf
from airflow.models import DAG, Variable
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
import os
import psutil
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any
class OptimizedDAGConfig:
    """Configuration class for optimized DAG settings"""

    @staticmethod
    def get_optimized_default_args() -> Dict[str, Any]:
        """Get optimized default arguments for DAGs.

        Returns:
            A new dict on every call, intended for ``DAG(default_args=...)``.
        """
        return {
            'owner': 'data-team',
            'depends_on_past': False,
            'start_date': days_ago(1),
            'email_on_failure': True,
            'email_on_retry': False,
            'retries': 3,
            'retry_delay': timedelta(minutes=5),
            # NOTE(review): max_active_runs, catchup and max_active_tasks
            # are DAG constructor arguments; as default_args entries they
            # presumably do not configure the DAG itself - pass them to
            # DAG(...) explicitly as well. Kept here so the returned keys
            # stay unchanged for existing callers.
            'max_active_runs': 1,
            'catchup': False,
            # Performance optimizations
            'pool': 'default_pool',
            'priority_weight': 1,
            'weight_rule': 'absolute',
            'queue': 'default',
            # Resource limits
            'task_concurrency': 4,
            'max_active_tasks': 16,
        }

    @staticmethod
    def configure_pools():
        """Configure Airflow pools for resource management.

        Creates each pool listed below if no pool with that name exists yet;
        existing pools are left untouched. All additions are committed
        together, and the session is rolled back and closed if anything
        fails.
        """
        from airflow.models import Pool
        from airflow import settings

        # Define pools
        pools = [
            {'pool': 'cpu_intensive_pool', 'slots': 4, 'description': 'For CPU intensive tasks'},
            {'pool': 'memory_intensive_pool', 'slots': 2, 'description': 'For memory intensive tasks'},
            {'pool': 'database_pool', 'slots': 8, 'description': 'For database operations'},
            {'pool': 'api_pool', 'slots': 10, 'description': 'For API calls'},
            {'pool': 'file_io_pool', 'slots': 6, 'description': 'For file I/O operations'},
        ]
        session = settings.Session()
        try:
            for pool_config in pools:
                existing = session.query(Pool).filter(Pool.pool == pool_config['pool']).first()
                if not existing:
                    session.add(Pool(
                        pool=pool_config['pool'],
                        slots=pool_config['slots'],
                        description=pool_config['description']
                    ))
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            # Always return the connection, also when a query fails.
            session.close()
class PerformanceMonitoringOperator(BaseOperator):
    """Operator to monitor DAG and task performance.

    Collects duration metrics for the ten most recently finished runs of
    ``monitored_dag_id`` and logs warnings/errors when averages or maxima
    exceed the configured thresholds.
    """
    # Template the operator's own parameter; ``dag_id`` and ``task_id``
    # identify this task itself and are not inputs to render.
    template_fields = ['monitored_dag_id']

    @apply_defaults
    def __init__(
        self,
        monitored_dag_id: str,
        performance_thresholds: Dict[str, Any],
        *args,
        **kwargs
    ):
        """Store the monitored DAG id and thresholds.

        Args:
            monitored_dag_id: Id of the DAG whose runs are analyzed.
            performance_thresholds: May contain ``max_duration_seconds``
                (default 3600) and ``max_task_duration_seconds``
                (default 1800).
        """
        super().__init__(*args, **kwargs)
        self.monitored_dag_id = monitored_dag_id
        self.performance_thresholds = performance_thresholds

    def execute(self, context):
        """Collect run and task metrics, analyze them and return them.

        Returns:
            A list with one metrics dict per analyzed DAG run.
        """
        from airflow.models import DagRun, TaskInstance
        from airflow import settings

        performance_metrics = []
        session = settings.Session()
        try:
            # Get recent DAG runs
            recent_runs = session.query(DagRun).filter(
                DagRun.dag_id == self.monitored_dag_id,
                DagRun.end_date.isnot(None)
            ).order_by(DagRun.end_date.desc()).limit(10).all()
            for dag_run in recent_runs:
                if dag_run.start_date is None:
                    # Only end_date is filtered in the query; a run without
                    # a start date has no measurable duration.
                    continue
                # Calculate DAG run duration
                duration = (dag_run.end_date - dag_run.start_date).total_seconds()
                # Get task instances for this DAG run
                task_instances = session.query(TaskInstance).filter(
                    TaskInstance.dag_id == self.monitored_dag_id,
                    TaskInstance.execution_date == dag_run.execution_date
                ).all()
                # Calculate task-level metrics
                task_metrics = {}
                for task_instance in task_instances:
                    if task_instance.end_date and task_instance.start_date:
                        task_duration = (task_instance.end_date - task_instance.start_date).total_seconds()
                        task_metrics[task_instance.task_id] = {
                            'duration': task_duration,
                            'state': task_instance.state,
                            'try_number': task_instance.try_number,
                            'queue': task_instance.queue,
                            'pool': task_instance.pool,
                        }
                performance_metrics.append({
                    'execution_date': dag_run.execution_date,
                    'duration': duration,
                    'state': dag_run.state,
                    'task_count': len(task_instances),
                    'task_metrics': task_metrics
                })
        finally:
            # Close the session even when a query raises.
            session.close()
        # Analyze performance and generate alerts
        self._analyze_performance(performance_metrics)
        return performance_metrics

    def _analyze_performance(self, metrics: List[Dict]):
        """Analyze performance metrics and generate alerts.

        Logs a warning when the average run duration exceeds
        ``max_duration_seconds``, an error when the longest run exceeds 1.5
        times that value, and a warning for every task whose average
        duration exceeds ``max_task_duration_seconds``.
        """
        if not metrics:
            return
        # Calculate average duration
        run_durations = [m['duration'] for m in metrics]
        avg_duration = sum(run_durations) / len(run_durations)
        max_duration = max(run_durations)
        # Check against thresholds
        duration_threshold = self.performance_thresholds.get('max_duration_seconds', 3600)
        if avg_duration > duration_threshold:
            self.log.warning(
                f"DAG {self.monitored_dag_id} average duration ({avg_duration:.2f}s) "
                f"exceeds threshold ({duration_threshold}s)"
            )
        if max_duration > duration_threshold * 1.5:
            self.log.error(
                f"DAG {self.monitored_dag_id} max duration ({max_duration:.2f}s) "
                f"significantly exceeds threshold"
            )
        # Analyze task-level performance
        task_durations = {}
        for run_metric in metrics:
            for task_id, task_metric in run_metric['task_metrics'].items():
                task_durations.setdefault(task_id, []).append(task_metric['duration'])
        # Identify slow tasks (the threshold does not vary per task)
        task_threshold = self.performance_thresholds.get('max_task_duration_seconds', 1800)
        for task_id, durations in task_durations.items():
            avg_task_duration = sum(durations) / len(durations)
            if avg_task_duration > task_threshold:
                self.log.warning(
                    f"Task {task_id} average duration ({avg_task_duration:.2f}s) "
                    f"exceeds threshold ({task_threshold}s)"
                )
def create_resource_monitoring_task():
    """Create task to monitor system resources.

    Returns:
        A ``PythonOperator`` with task id ``monitor_resources`` whose
        callable samples CPU, memory and disk usage plus the number of
        running and queued task instances, logs them, stores them in the
        ``last_resource_metrics`` Variable and returns them.
    """
    def monitor_resources(**context):
        # Get system resource usage
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        # Get Airflow-specific metrics
        from airflow.models import TaskInstance
        from airflow import settings

        session = settings.Session()
        try:
            # Count running tasks
            running_tasks = session.query(TaskInstance).filter(
                TaskInstance.state == 'running'
            ).count()
            # Count queued tasks
            queued_tasks = session.query(TaskInstance).filter(
                TaskInstance.state == 'queued'
            ).count()
        finally:
            # Close the session even when a count query raises.
            session.close()
        metrics = {
            'timestamp': datetime.now().isoformat(),
            'cpu_percent': cpu_percent,
            'memory_percent': memory.percent,
            'memory_available_gb': memory.available / (1024**3),
            'disk_percent': disk.percent,
            'disk_free_gb': disk.free / (1024**3),
            'running_tasks': running_tasks,
            'queued_tasks': queued_tasks,
        }
        # Log metrics
        logging.info(f"Resource metrics: {metrics}")
        # Check thresholds and alert if necessary
        if cpu_percent > 80:
            logging.warning(f"High CPU usage: {cpu_percent}%")
        if memory.percent > 85:
            logging.warning(f"High memory usage: {memory.percent}%")
        if running_tasks > 20:
            logging.warning(f"High number of running tasks: {running_tasks}")
        # Store metrics (you could send to monitoring system)
        Variable.set("last_resource_metrics", metrics, serialize_json=True)
        return metrics

    return PythonOperator(
        task_id='monitor_resources',
        python_callable=monitor_resources,
        pool='default_pool',
        pool_slots=1,
    )
# Optimized DAG example
def create_optimized_etl_dag():
    """Create an optimized ETL DAG with performance considerations

    Assembles the hourly ``optimized_etl_pipeline`` DAG. Task flow:
    ``start`` fans out to ``monitor_resources`` and to the ``extract_data``
    group; extraction is followed by ``data_quality_check``, the
    ``transform_data`` group and ``load_to_warehouse``. Both
    ``load_to_warehouse`` and ``monitor_resources`` feed
    ``monitor_performance``, which precedes ``end``.

    Returns:
        The assembled DAG object.
    """
    def _placeholder():
        # Stand-in callable: replace with the real extraction /
        # transformation function for each task.
        return None

    # SQL checks handed to the data quality operator.
    staging_checks = [
        {
            'name': 'row_count',
            'sql': 'SELECT COUNT(*) FROM {table_name}',
            'expected_result': 0,
            'operator': '>',
        },
        {
            'name': 'null_check',
            'sql': 'SELECT COUNT(*) FROM {table_name} WHERE id IS NULL',
            'expected_result': 0,
        },
    ]

    # Duration limits (seconds) handed to the performance monitor.
    duration_limits = {
        'max_duration_seconds': 3600,
        'max_task_duration_seconds': 1800,
    }

    with DAG(
        'optimized_etl_pipeline',
        default_args=OptimizedDAGConfig.get_optimized_default_args(),
        description='High-performance ETL pipeline',
        schedule_interval='@hourly',
        max_active_runs=1,
        max_active_tasks=16,
        tags=['etl', 'optimized', 'production'],
    ) as dag:
        begin = DummyOperator(task_id='start')

        # Resource monitoring
        resource_monitor = create_resource_monitoring_task()

        # Extraction tasks (parallel); each source uses its own pool
        with TaskGroup(group_id='extract_data') as extraction:
            PythonOperator(
                task_id='extract_from_database',
                python_callable=_placeholder,
                pool='database_pool',
                pool_slots=2,
            )
            PythonOperator(
                task_id='extract_from_api',
                python_callable=_placeholder,
                pool='api_pool',
                pool_slots=1,
            )
            PythonOperator(
                task_id='extract_from_files',
                python_callable=_placeholder,
                pool='file_io_pool',
                pool_slots=1,
            )

        # Data quality checks
        staging_quality = DataQualityOperator(
            task_id='data_quality_check',
            postgres_conn_id='warehouse_db',
            table_name='staging_table',
            quality_checks=staging_checks,
            pool='database_pool',
            pool_slots=1,
        )

        # Transformation tasks
        with TaskGroup(group_id='transform_data') as transformation:
            PythonOperator(
                task_id='transform_user_data',
                python_callable=_placeholder,
                pool='cpu_intensive_pool',
                pool_slots=1,
            )
            PythonOperator(
                task_id='transform_event_data',
                python_callable=_placeholder,
                pool='cpu_intensive_pool',
                pool_slots=1,
            )

        # Loading task
        warehouse_load = SmartDataTransferOperator(
            task_id='load_to_warehouse',
            source_conn_id='staging_db',
            destination_conn_id='warehouse_db',
            sql='SELECT * FROM staging_table',
            destination_path='fact_table',
            chunk_size=10000,
            pool='database_pool',
            pool_slots=2,
        )

        # Performance monitoring (watches this same DAG id)
        run_monitor = PerformanceMonitoringOperator(
            task_id='monitor_performance',
            monitored_dag_id='optimized_etl_pipeline',
            performance_thresholds=duration_limits,
        )

        # NOTE(review): newer Airflow releases name this rule
        # NONE_FAILED_MIN_ONE_SUCCESS - confirm against the deployed version.
        finish = DummyOperator(
            task_id='end',
            trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED
        )

        # Set dependencies
        begin >> resource_monitor
        begin >> extraction >> staging_quality >> transformation >> warehouse_load
        [warehouse_load, resource_monitor] >> run_monitor >> finish

    return dag
# Create the DAG
# Built when this module is imported and bound to a module-level name;
# Airflow collects DAG objects it finds in a DAG file's module globals.
optimized_dag = create_optimized_etl_dag()
Production Deployment
Kubernetes Deployment Configuration
# airflow-deployment.yaml
# ConfigMap carrying the two Airflow configuration files used by the
# deployments in this manifest: airflow.cfg and webserver_config.py.
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
  namespace: data-platform
data:
  airflow.cfg: |
    [core]
    dags_folder = /opt/airflow/dags
    base_log_folder = /opt/airflow/logs
    remote_logging = True
    remote_log_conn_id = aws_s3_logs
    encrypt_s3_logs = True
    executor = KubernetesExecutor
    # NOTE(review): placeholder credentials. The Deployments in this manifest
    # inject AIRFLOW__CORE__SQL_ALCHEMY_CONN from the airflow-secrets Secret;
    # never commit a real password to this ConfigMap.
    sql_alchemy_conn = postgresql://airflow:password@postgres:5432/airflow
    parallelism = 32
    dag_concurrency = 16
    max_active_runs_per_dag = 1
    load_examples = False

    [kubernetes]
    namespace = data-platform
    airflow_configmap = airflow-config
    worker_container_repository = apache/airflow
    worker_container_tag = 2.7.0-python3.9
    delete_worker_pods = True
    delete_worker_pods_on_failure = False

    [webserver]
    base_url = https://airflow.data-platform.com
    web_server_port = 8080
    workers = 4

    [scheduler]
    catchup_by_default = False
    dag_dir_list_interval = 300
    child_process_timeout = 600
    max_threads = 2

    [celery]
    worker_concurrency = 4

    [email]
    email_backend = airflow.providers.sendgrid.utils.emailer.send_email

    [logging]
    logging_level = INFO
    fab_logging_level = WARN
  webserver_config.py: |
    from airflow import configuration as conf
    from flask_appbuilder.security.manager import AUTH_OAUTH

    AUTH_TYPE = AUTH_OAUTH
    AUTH_USER_REGISTRATION = True
    # Users who self-register through OAuth start with the Viewer role.
    AUTH_USER_REGISTRATION_ROLE = "Viewer"

    OAUTH_PROVIDERS = [
        {
            'name': 'google',
            'token_key': 'access_token',
            'icon': 'fa-google',
            'remote_app': {
                # Placeholders: replace with the real OAuth client
                # credentials (ideally supplied from a Secret).
                'client_id': 'GOOGLE_CLIENT_ID',
                'client_secret': 'GOOGLE_CLIENT_SECRET',
                'api_base_url': 'https://www.googleapis.com/oauth2/v2/',
                'client_kwargs': {
                    'scope': 'email profile'
                },
                # OIDC discovery document; the well-known path is spelled
                # with a hyphen ("openid-configuration").
                'server_metadata_url': 'https://accounts.google.com/.well-known/openid-configuration'
            }
        }
    ]
---
# Airflow webserver: two replicas serving the UI on container port 8080.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-webserver
  namespace: data-platform
spec:
  replicas: 2
  selector:
    matchLabels:
      app: airflow-webserver
  template:
    metadata:
      labels:
        app: airflow-webserver
    spec:
      containers:
        - name: webserver
          image: apache/airflow:2.7.0-python3.9
          command: ["airflow", "webserver"]
          ports:
            - containerPort: 8080
          env:
            # Database URL and Fernet key come from the airflow-secrets Secret.
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: fernet_key
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          volumeMounts:
            - name: config
              mountPath: /opt/airflow/airflow.cfg
              subPath: airflow.cfg
            # The airflow-config ConfigMap also ships webserver_config.py
            # (OAuth settings). Mount it next to airflow.cfg; without this
            # mount the file never reaches the container.
            # NOTE(review): assumes AIRFLOW_HOME is /opt/airflow, as the
            # airflow.cfg mount path suggests.
            - name: config
              mountPath: /opt/airflow/webserver_config.py
              subPath: webserver_config.py
            - name: dags
              mountPath: /opt/airflow/dags
            - name: logs
              mountPath: /opt/airflow/logs
      volumes:
        - name: config
          configMap:
            name: airflow-config
        - name: dags
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: logs
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
---
# Airflow scheduler: single replica; shares the config, DAG and log
# volumes with the webserver deployment.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-scheduler
  namespace: data-platform
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow-scheduler
  template:
    metadata:
      labels:
        app: airflow-scheduler
    spec:
      containers:
        - name: scheduler
          image: apache/airflow:2.7.0-python3.9
          command:
            - airflow
            - scheduler
          env:
            # Database URL and Fernet key come from the airflow-secrets Secret.
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: fernet_key
          resources:
            requests:
              memory: 2Gi
              cpu: 1000m
            limits:
              memory: 4Gi
              cpu: 2000m
          volumeMounts:
            # subPath mounts only the airflow.cfg key of the ConfigMap.
            - name: config
              mountPath: /opt/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: dags
              mountPath: /opt/airflow/dags
            - name: logs
              mountPath: /opt/airflow/logs
      volumes:
        - name: config
          configMap:
            name: airflow-config
        - name: dags
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: logs
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
---
# Prefect Deployment
# Single-replica Prefect API server listening on port 4200.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prefect-server
  namespace: data-platform
spec:
  replicas: 1
  selector:
    matchLabels:
      app: prefect-server
  template:
    metadata:
      labels:
        app: prefect-server
    spec:
      containers:
        - name: prefect-server
          # The `prefect server start` command and the PREFECT_SERVER_* /
          # PREFECT_API_DATABASE_CONNECTION_URL settings used below are not
          # available in the 2.0.0 image (that release exposed the server
          # as `prefect orion ...`), so a later 2.x image is required.
          # NOTE(review): pin whichever 2.x release you have validated.
          image: prefecthq/prefect:2.14.21-python3.9
          command: ["prefect", "server", "start"]
          ports:
            - containerPort: 4200
          env:
            # Database URL comes from the prefect-secrets Secret.
            - name: PREFECT_API_DATABASE_CONNECTION_URL
              valueFrom:
                secretKeyRef:
                  name: prefect-secrets
                  key: database_url
            # Bind on all interfaces so the Service can reach the pod.
            - name: PREFECT_SERVER_API_HOST
              value: "0.0.0.0"
            - name: PREFECT_SERVER_API_PORT
              value: "4200"
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
---
# Exposes the airflow-webserver pods on port 8080 through a LoadBalancer.
apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver
  namespace: data-platform
spec:
  type: LoadBalancer
  selector:
    app: airflow-webserver
  ports:
    - port: 8080
      targetPort: 8080
---
# Exposes the prefect-server pods on port 4200 through a LoadBalancer.
apiVersion: v1
kind: Service
metadata:
  name: prefect-server
  namespace: data-platform
spec:
  type: LoadBalancer
  selector:
    app: prefect-server
  ports:
    - port: 4200
      targetPort: 4200
Conclusion
Advanced ETL/ELT pipeline development with Airflow and Prefect enables building sophisticated, scalable data orchestration solutions. Key advantages include:
Apache Airflow Strengths:
- Mature ecosystem with extensive operator library
- Strong community support and enterprise adoption
- Rich UI and monitoring capabilities
- Flexible scheduling and dependency management
Prefect Advantages:
- Modern Python-native design
- Better error handling and retry mechanisms
- Simplified deployment and scaling
- Advanced flow versioning and parameterization
Best Practices for Production:
- Implement comprehensive monitoring and alerting
- Use resource pools for efficient resource management
- Design for fault tolerance with proper retry strategies
- Optimize for performance with parallel processing
- Implement proper data quality validation
- Use infrastructure as code for consistent deployments
- Follow security best practices for credentials and access control
Both platforms provide powerful capabilities for building enterprise-grade data pipelines that can handle complex data transformation requirements while maintaining reliability, observability, and scalability.