# ETL/ELT Pipelines
ETL (Extract-Transform-Load) and ELT (Extract-Load-Transform) are the two fundamental patterns for moving data from source systems into analytical storage. The distinction lies in where transformation happens: outside the target system (ETL) or inside it (ELT).
## ETL vs ELT
| Aspect | ETL | ELT |
|---|---|---|
| Transform location | External engine (Spark, Python) | Inside target DWH (SQL) |
| Best for | Complex transformations, legacy systems | Cloud DWH with strong compute |
| Latency | Higher (extra hop) | Lower |
| Flexibility | More control over transformation | Relies on target SQL capabilities |
| Modern tools | Spark, Airflow + Python | dbt, Snowflake, BigQuery |
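The ELT pattern can be sketched in a few lines: land raw rows in the target first, then transform with SQL inside it. A minimal sketch using stdlib `sqlite3` as a stand-in for a cloud DWH (table and column names are illustrative):

```python
import sqlite3

con = sqlite3.connect(":memory:")  # stand-in for the target warehouse

# Load: land raw data first, untransformed
con.execute("CREATE TABLE raw_orders (vendor_id TEXT, amount REAL)")
con.executemany(
    "INSERT INTO raw_orders VALUES (?, ?)",
    [("v1", 10.0), ("v1", 5.0), ("v2", 7.5), (None, 3.0)],
)

# Transform: runs inside the target, expressed in SQL (the dbt-style approach)
con.execute(
    """
    CREATE TABLE mart_vendor_totals AS
    SELECT vendor_id, SUM(amount) AS total
    FROM raw_orders
    WHERE vendor_id IS NOT NULL
    GROUP BY vendor_id
    """
)

rows = dict(con.execute("SELECT vendor_id, total FROM mart_vendor_totals"))
print(rows)  # {'v1': 15.0, 'v2': 7.5}
```

The extract step is unchanged between the two patterns; only the compute that runs the `GROUP BY` moves from an external engine into the warehouse itself.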
## Three Steps
- Extract (E): Pull data from sources, validate against specs, stage in intermediate area
- Transform (T): Format conversion, encoding normalization, aggregation, cleansing, deduplication, business logic
- Load (L): Write transformed data + metadata to target system (DWH, data mart, lake)
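The three steps above can be sketched as plain functions; the source rows, validation rule, and cleansing rules here are all illustrative:

```python
def extract(source_rows):
    # Pull and stage raw records, validating that expected fields exist
    return [r for r in source_rows if "id" in r and "amount" in r]

def transform(rows):
    # Cleanse (drop negative amounts) and deduplicate on id
    seen, out = set(), []
    for r in rows:
        if r["amount"] >= 0 and r["id"] not in seen:
            seen.add(r["id"])
            out.append({"id": r["id"], "amount": round(r["amount"], 2)})
    return out

def load(rows, target):
    # Write transformed rows plus simple metadata to the target
    target.extend(rows)
    return {"rows_loaded": len(rows)}

source = [{"id": 1, "amount": 9.99}, {"id": 1, "amount": 9.99},
          {"id": 2, "amount": -5.0}, {"bad": "row"}]
target = []
meta = load(transform(extract(source)), target)
print(meta)  # {'rows_loaded': 1}
```

The duplicate, the negative amount, and the malformed row are each dropped at the step responsible for them, which is the point of keeping the three stages separate.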
## Data Layers Architecture
```
Sources -> [STG/Raw] -> [Core/DDS] -> [Marts] -> BI/Analytics
```
| Layer | Purpose |
|---|---|
| Staging / Raw / ODS | Raw data from sources, minimal transformation, preserves original format. Decouples extraction from transformation |
| Core / DDS (Integration) | Cleansed, integrated, historically tracked data. Data Vault, 3NF, or dimensional modeling applied here |
| Mart / Presentation | Business-ready aggregated views (star schema, flat tables). Each mart serves specific consumers |
## Pipeline Processing Modes
| Mode | Latency | Complexity | Best For |
|---|---|---|---|
| Batch | Minutes to hours | Low | BI, reporting, historical analysis |
| Micro-batch | Seconds to minutes | Medium | Near-real-time dashboards |
| Stream | Milliseconds | High | Fraud detection, monitoring, real-time features |
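Micro-batching can be sketched in plain Python: drain whatever has accumulated in a buffer, process it as one small batch, repeat. The queue and events here are illustrative (a real job would wake on a timer, e.g. every few seconds):

```python
from collections import deque

def drain(queue):
    # Take everything currently buffered as one micro-batch
    batch = list(queue)
    queue.clear()
    return batch

events = deque([{"amount": 3}, {"amount": 4}])
totals = []

# In production this loop would run on a fixed trigger interval
while events:
    batch = drain(events)
    totals.append(sum(e["amount"] for e in batch))

print(totals)  # [7]
```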
## Lambda Architecture
- Batch layer: complete, accurate data with higher latency
- Speed layer: real-time approximations
- Serving layer: merges both for queries
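The serving layer's merge can be sketched as batch results overlaid with speed-layer deltas (metric names are illustrative):

```python
def serve(batch_view, speed_view):
    # Batch layer is authoritative; speed layer adds fresher deltas on top
    merged = dict(batch_view)
    for key, delta in speed_view.items():
        merged[key] = merged.get(key, 0) + delta
    return merged

batch_view = {"clicks": 1000, "signups": 40}   # accurate, hours old
speed_view = {"clicks": 12, "purchases": 3}    # approximate, seconds old
print(serve(batch_view, speed_view))
# {'clicks': 1012, 'signups': 40, 'purchases': 3}
```

When the next batch run completes, its view absorbs the speed-layer data and the deltas reset, which is how the approximation self-corrects.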
## Patterns
### Typical Spark ETL

```python
from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.appName("etl").getOrCreate()

# Extract: schema is a predefined StructType (schema-first, no inference)
df = spark.read.csv("s3a://bucket/raw/*.csv", header=True, schema=schema)

# Transform: filter early, then aggregate per vendor per day
result = (
    df
    .filter(f.col("vendor_id").isNotNull())
    .groupBy("vendor_id", f.to_date("pickup_datetime").alias("dt"))
    .agg(
        f.sum("total_amount").alias("sum_amount"),
        f.avg("tip_amount").alias("avg_tips"),
    )
)

# Load: overwrite keeps the job idempotent on re-runs
result.write.mode("overwrite").parquet("s3a://bucket/mart/daily_summary/")
```
### Write Modes
- `overwrite` - replace existing data
- `append` - add to existing data
- `ignore` - skip if data exists
- `error` (default) - fail if data exists
### Python Data Migration Script

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:pass@host:port/db")

# Parameterized extract (paramstyle is driver-dependent; %s works with psycopg2)
df = pd.read_sql(
    "SELECT * FROM source WHERE dt BETWEEN %s AND %s",
    engine,
    params=[start_dt, end_dt],
)

# append keeps existing rows in the target
df.to_sql("target_table", engine, if_exists="append", index=False)
```
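For large tables the same migration is safer in chunks, so the whole result set never sits in memory at once. A sketch of the pattern with stdlib `sqlite3` standing in for both databases (table names and chunk size are illustrative):

```python
import sqlite3

src = sqlite3.connect(":memory:")
dst = sqlite3.connect(":memory:")

src.execute("CREATE TABLE source (id INTEGER, dt TEXT)")
src.executemany("INSERT INTO source VALUES (?, ?)",
                [(i, "2024-01-01") for i in range(10)])
dst.execute("CREATE TABLE target_table (id INTEGER, dt TEXT)")

cur = src.execute("SELECT id, dt FROM source WHERE dt BETWEEN ? AND ?",
                  ("2024-01-01", "2024-01-31"))
moved = 0
while True:
    chunk = cur.fetchmany(4)  # bounded memory per chunk
    if not chunk:
        break
    dst.executemany("INSERT INTO target_table VALUES (?, ?)", chunk)
    moved += len(chunk)

print(moved)  # 10
```

With pandas, the equivalent is passing `chunksize=` to `read_sql` and `to_sql` so both sides stream.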
## Key Facts
- Idempotency is critical: re-running a pipeline with the same inputs must produce the same outputs. Use `overwrite` mode or delete-before-insert
- Incremental loads process only new/changed data using watermarks, CDC, or last-modified timestamps
- Early filtering pushes filters close to the source to reduce data volume
- Schema-first: define explicit schemas rather than relying on inference
- Partitioned writes (`partitionBy("year", "month")`) enable efficient downstream reads
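Incremental loading with a watermark can be sketched as: keep the highest timestamp seen so far, and on each run pull only rows past it (field names are illustrative):

```python
def incremental_load(rows, watermark):
    # Only rows modified after the last watermark are processed
    new_rows = [r for r in rows if r["updated_at"] > watermark]
    new_watermark = max((r["updated_at"] for r in new_rows), default=watermark)
    return new_rows, new_watermark

rows = [{"id": 1, "updated_at": "2024-01-01"},
        {"id": 2, "updated_at": "2024-01-03"}]

first, wm = incremental_load(rows, "2024-01-02")
print(first, wm)  # [{'id': 2, 'updated_at': '2024-01-03'}] 2024-01-03

# Re-running with the advanced watermark picks up nothing new
again, wm = incremental_load(rows, wm)
print(again)  # []
```

Persisting the watermark between runs (in a state table or the orchestrator) is what makes the re-run safe.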
## Gotchas
- Airflow `schedule_interval` defines the interval between runs, not the run time: a DAG with `@daily` and `start_date=2024-01-01` first runs on 2024-01-02
- `catchup=True` (the default) backfills all missed intervals since `start_date` and can flood the system
- SCD2 joins across satellites with different change frequencies cause Cartesian explosion on date ranges
- Small files problem: too many partitions create tiny files, degrading HDFS/S3 performance
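The fix for the small-files problem is compaction: rewrite many tiny files into a few large ones (in Spark, typically `coalesce`/`repartition` before the write). A filesystem-level sketch of the idea, with illustrative file names:

```python
import tempfile
from pathlib import Path

tmp = Path(tempfile.mkdtemp())

# Simulate a partition full of tiny part files
for i in range(10):
    (tmp / f"part-{i:05d}.csv").write_text(f"{i},row\n")

# Compact: one reader-friendly file instead of ten
compacted = tmp / "compacted.csv"
compacted.write_text("".join(
    p.read_text() for p in sorted(tmp.glob("part-*.csv"))
))

print(len(compacted.read_text().splitlines()))  # 10
```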
## See Also