Phase 6 · Chapter 6.02
ETL for ML
A data engineer says ETL and means loading a data warehouse. An ML engineer says ETL and means producing feature-ready data. The goals differ; the principles are the same.
ETL vs ELT
Two terms, one swap in order
- ETL: Extract → Transform → Load (the transform happens outside the warehouse).
- ELT: Extract → Load → Transform (the transform happens inside the warehouse, in SQL; the Snowflake, BigQuery, dbt era).
The modern stack favors ELT: compute in the warehouse is cheap, and the raw data history is preserved.
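The ELT order can be sketched in a few lines. This is a minimal illustration, not a production setup: an in-memory SQLite database stands in for the warehouse, and the table names `raw_events` and `user_totals` and the sample rows are hypothetical.

```python
import sqlite3

# Assumption: in-memory SQLite stands in for a real warehouse.
conn = sqlite3.connect(":memory:")

# Extract: hypothetical source rows, still dirty (one missing amount).
source_rows = [
    (1, "purchase", 20.0),
    (1, "purchase", None),
    (2, "refund", 5.0),
]

# Load: land the rows untouched in a raw table first.
conn.execute(
    "CREATE TABLE raw_events (user_id INTEGER, event_type TEXT, amount REAL)"
)
conn.executemany("INSERT INTO raw_events VALUES (?, ?, ?)", source_rows)

# Transform: clean and aggregate inside the warehouse, in SQL.
conn.execute("""
    CREATE TABLE user_totals AS
    SELECT user_id, COUNT(*) AS event_count, SUM(amount) AS total_amount
    FROM raw_events
    WHERE amount IS NOT NULL
    GROUP BY user_id
""")

user_totals = conn.execute(
    "SELECT user_id, event_count, total_amount FROM user_totals ORDER BY user_id"
).fetchall()
raw_count = conn.execute("SELECT COUNT(*) FROM raw_events").fetchone()[0]
```

Because the raw table is kept as loaded, the transform can be rewritten and rerun later without going back to the source system.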
The 3 Steps
Through an ML lens
- Extract: pull from Postgres, Kafka, S3, and third-party APIs.
- Transform: clean (nulls, dedup), enrich (joins), aggregate (rolling windows), engineer features.
- Load: write to a feature store, training table, or model cache.
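The rolling-window aggregate listed under Transform might look like the following in pandas. It is a sketch under assumptions: the sample events and the 7-day window are made up, and the column names simply mirror the `raw.events` columns used elsewhere in this chapter.

```python
import pandas as pd

# Hypothetical events; column names mirror raw.events.
events = pd.DataFrame({
    "user_id": [1, 2, 1, 1],
    "ts": pd.to_datetime(
        ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-12"], utc=True
    ),
    "amount": [10.0, 7.0, 20.0, 5.0],
})

# Per-user sum of amount over the trailing 7 days, evaluated at each event.
rolled = (
    events
    .sort_values("ts")          # time-based windows need a sorted time index
    .set_index("ts")
    .groupby("user_id")["amount"]
    .rolling("7D")
    .sum()
    .rename("amount_7d")
    .reset_index()
)
```

Each output row only looks backward from its own timestamp, which is the property a training feature needs.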
Example
User-event ETL → training table
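Python · production
import os
import pandas as pd
from sqlalchemy import create_engine
# The connection string comes from the environment, so credentials stay out of the code
eng = create_engine(os.environ["WAREHOUSE_URL"])
# Extract: the last 30 days of raw events
events = pd.read_sql("""
    SELECT user_id, event_type, amount, ts
    FROM raw.events
    WHERE ts >= NOW() - INTERVAL '30 days'
""", eng)
# Transform: drop incomplete rows, then aggregate per user
events = events.dropna(subset=["user_id", "amount"])
features = (
    events
    .groupby("user_id")
    .agg(
        event_count=("event_type", "count"),
        total_amount=("amount", "sum"),
        avg_amount=("amount", "mean"),
        last_seen=("ts", "max"),
    )
    .reset_index()
)
# Normalize to UTC first: subtracting a naive timestamp from a tz-aware one raises TypeError
features["last_seen"] = pd.to_datetime(features["last_seen"], utc=True)
features["days_since_last"] = (pd.Timestamp.now(tz="UTC") - features["last_seen"]).dt.days
# Load: pass the schema separately; a dotted name would create a table literally named "ml.user_features_v1"
features.to_sql("user_features_v1", eng, schema="ml", if_exists="replace", index=False)
dbt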
SQL-first transform
SQL · production
-- models/marts/user_features.sql
-- dateadd/datediff are warehouse-specific (Snowflake-style here); Postgres has neither and needs interval arithmetic
{{ config(materialized='table') }}
with events as (
    select user_id, event_type, amount, ts
    from {{ ref('stg_events') }}
    where ts >= dateadd(day, -30, current_timestamp)
)
select
    user_id,
    count(*) as event_count,
    sum(amount) as total_amount,
    avg(amount) as avg_amount,
    max(ts) as last_seen,
    datediff('day', max(ts), current_timestamp) as days_since_last
from events
group by user_id
ML-Specific Concerns
What generic ETL doesn't cover
- Point-in-time correctness: avoid "future leaks" in training; every feature value must be as of the event time.
- Train/serve skew: pandas in training, Spark in serving; the same logic has to match in both places.
- Backfill: when you introduce a new feature, you have to apply it to historical data as well.
- Schema evolution: adding or dropping a column breaks downstream pipelines.
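Point-in-time correctness is the easiest of these to show in code. One common way to get an as-of join in pandas is `pd.merge_asof`; the sketch below assumes hypothetical `labels` and `snapshots` frames, where `feature_ts` records when each feature value became known.

```python
import pandas as pd

# Hypothetical label events: the moments we want to predict at.
labels = pd.DataFrame({
    "user_id": [1, 1, 2],
    "ts": pd.to_datetime(["2024-01-05", "2024-01-20", "2024-01-10"], utc=True),
    "label": [0, 1, 0],
})

# Hypothetical feature snapshots, stamped with when each value became known.
snapshots = pd.DataFrame({
    "user_id": [1, 1, 2],
    "feature_ts": pd.to_datetime(
        ["2024-01-01", "2024-01-15", "2024-01-12"], utc=True
    ),
    "total_amount": [100.0, 250.0, 40.0],
})

# For each label row, take the latest snapshot at or before the label time.
# direction="backward" is what keeps future values out of the training row.
training = pd.merge_asof(
    labels.sort_values("ts"),
    snapshots.sort_values("feature_ts"),
    left_on="ts",
    right_on="feature_ts",
    by="user_id",
    direction="backward",
)
```

User 2's only snapshot is dated after the label time, so that row gets a missing feature value instead of a leaked one. A plain join on `user_id` would have attached it silently.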
Pitfalls
What often wrecks a pipeline
- Timezone: store in UTC, convert for display.
- Type coercion: a string arrives where a float was expected, and the model gets NaN.
- Duplicate join: N×M row explosion.
- Source schema change: without a contract test, it breaks silently.
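Two of these pitfalls can be caught with cheap guards in pandas. This is a sketch with made-up frames (`events`, `users`), not a full validation layer: `pd.to_numeric(..., errors="coerce")` makes bad values countable, and the `validate` argument of `merge` refuses a join whose right side is not unique on the key.

```python
import pandas as pd

# Type coercion: amounts arrive as strings, one of them malformed.
events = pd.DataFrame({"user_id": [1, 2, 2], "amount": ["10.5", "oops", "3"]})
events["amount"] = pd.to_numeric(events["amount"], errors="coerce")
# Count the damage explicitly instead of letting NaN flow into training.
bad_rows = int(events["amount"].isna().sum())

# Duplicate join: the users table should hold one row per user_id,
# but user 2 appears twice (hypothetical data).
users = pd.DataFrame({"user_id": [1, 2, 2], "country": ["BD", "IN", "IN"]})
try:
    events.merge(users, on="user_id", how="left", validate="many_to_one")
    join_ok = True
except pd.errors.MergeError:
    # Raised because the right-hand keys are not unique.
    join_ok = False

# Deduplicate the dimension, then the same join passes validation.
deduped = users.drop_duplicates(subset="user_id")
joined = events.merge(deduped, on="user_id", how="left", validate="many_to_one")
```

Failing loudly at the join is the point: without `validate`, the duplicated key would quietly inflate the row count and every aggregate built on top of it.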
Mini Project
dbt + Postgres mini-warehouse
- Seed a raw events table in Postgres.
- Write a dbt model that builds user_features.
- Add not-null and unique checks with dbt test.
- Schedule a daily run with Airflow or Prefect.
Takeaway
Remember
Success in ETL/ELT = correctness + lineage + tests. Speed comes later.