Phase 6 · Chapter 6.01
Data Pipelines
No matter how good the model is, garbage data in means garbage predictions out. A pipeline is a factory assembly line for data.
Why
The wall between notebook and production
In a notebook, pd.read_csv works because the data is fixed. In production, new data arrives every day, so you need scheduling, retries, dependency management, and monitoring. A pipeline is what provides all of these.
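To make the retry and monitoring requirements concrete, here is a minimal sketch in plain Python with no orchestrator involved. The names run_with_retries and load_daily_rows are hypothetical, and the flaky source is simulated; a real orchestrator provides the same behaviour through configuration.

```python
import logging
import time

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pipeline")


def run_with_retries(step, retries=2, delay_seconds=0.1):
    """Call step(); on failure, log it, wait, and try again up to `retries` more times."""
    for attempt in range(1, retries + 2):
        try:
            result = step()
            log.info("%s succeeded on attempt %d", step.__name__, attempt)
            return result
        except Exception as exc:
            log.warning("%s failed on attempt %d: %s", step.__name__, attempt, exc)
            if attempt > retries:
                raise  # out of retries: surface the failure instead of hiding it
            time.sleep(delay_seconds)


# Hypothetical flaky source: fails on the first call, succeeds afterwards.
calls = {"count": 0}


def load_daily_rows():
    calls["count"] += 1
    if calls["count"] == 1:
        raise ConnectionError("source not ready yet")
    return [{"user_id": 1}, {"user_id": 2}]


rows = run_with_retries(load_daily_rows)
```

The log lines are the simplest form of monitoring: every attempt leaves a trace, and the final failure is re-raised so it cannot pass silently.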
Modes
Batch vs Streaming
- Batch: Processes one large chunk per hour or per day. Airflow, Spark. Higher latency, higher throughput.
- Streaming: Processes each event as soon as it arrives. Kafka, Flink. Lower latency, more complexity.
- Micro-batch: A middle ground, for example Spark Structured Streaming.
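The three modes differ mainly in how many events are grouped before processing. The sketch below illustrates that with plain Python lists; it is a toy model, not how Spark, Kafka, or Flink work internally. The process function and the amount field are made up for illustration.

```python
from itertools import islice


def process(events):
    """Placeholder transformation: sum the 'amount' field of a group of events."""
    return sum(e["amount"] for e in events)


def batch(events):
    # Batch: wait for the whole chunk, then process it in one go.
    return [process(list(events))]


def streaming(events):
    # Streaming: handle each event on its own as soon as it arrives.
    return [process([e]) for e in events]


def micro_batch(events, size):
    # Micro-batch: cut the stream into small fixed-size chunks.
    it = iter(events)
    results = []
    while chunk := list(islice(it, size)):
        results.append(process(chunk))
    return results


events = [{"amount": a} for a in (5, 10, 15, 20, 25)]
batch_out = batch(events)           # one result for all five events
stream_out = streaming(events)      # one result per event
micro_out = micro_batch(events, 2)  # one result per chunk of up to two events
```

Batch produces a single result late, streaming produces many results early, and micro-batch sits between the two.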
DAG
Directed Acyclic Graph
A pipeline is a graph of tasks. Each edge is a dependency, and the graph has no cycles.
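As a rough illustration of why the "no cycles" rule matters, the standard-library graphlib module (Python 3.9+) can compute a valid run order for a task graph and rejects graphs that contain a cycle. The task names are borrowed from this chapter's diagram; the assumption that validate depends on join is ours.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on.
# Assumption: "validate" depends on "join".
dag = {
    "join": {"extract_users", "extract_events"},
    "features": {"join"},
    "validate": {"join"},
    "train": {"features"},
    "deploy": {"train"},
}

# A topological order lists every task after all of its dependencies.
order = list(TopologicalSorter(dag).static_order())
position = {task: i for i, task in enumerate(order)}

# An edge from deploy back to extract_users creates a cycle,
# so no valid run order exists and graphlib raises CycleError.
cyclic = dict(dag, extract_users={"deploy"})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
```

Orchestrators do the same check at load time: a graph with a cycle has no starting point, so it can never be scheduled.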
extract_users ──┐
                ├──> join ──> features ──> train ──> deploy
extract_events ─┘      │
                       └──> validate

Airflow
Most-used orchestrator
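from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


# Placeholder callables; replace the bodies with real logic.
def extract_data(): ...
def clean_data(): ...
def train_model(): ...
def deploy_model(): ...


with DAG(
    "iris_training",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",  # Airflow 2.4+; older releases use schedule_interval
    catchup=False,  # do not backfill runs between start_date and today
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:
    extract = PythonOperator(task_id="extract", python_callable=extract_data)
    clean = PythonOperator(task_id="clean", python_callable=clean_data)
    train = PythonOperator(task_id="train", python_callable=train_model)
    deploy = PythonOperator(task_id="deploy", python_callable=deploy_model)

    # Linear dependency chain: each task runs only after the previous one succeeds.
    extract >> clean >> train >> deploy

Prefect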
Pythonic alternative
from prefect import flow, task


@task(retries=3)  # retry this task up to 3 times on failure
def extract(): ...


@task
def train(df): ...


@flow(name="iris-daily")
def pipeline():
    df = extract()
    train(df)


if __name__ == "__main__":
    # Serve the flow on a cron schedule: every day at 02:00.
    pipeline.serve(name="iris", cron="0 2 * * *")

Choosing
Tool comparison
- Airflow: Mature, huge ecosystem, heavy infrastructure.
- Prefect: Pythonic, dynamic DAGs, modern UI.
- Dagster: Asset-centric, type-safe, data-aware.
- Kubeflow Pipelines: Kubernetes-native, ML-first.
Pitfalls
What breaks pipelines every day
- No idempotency: a retry creates duplicate data.
- Timezone mismatch: UTC vs. local time.
- Silent failures: no alerting.
- Passing large payloads between tasks through XCom: write the data to a file or S3 and pass the path instead.
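One common way to address the first two pitfalls is to write each run into a partition keyed by an explicit UTC date and to replace that partition wholesale, so a retry overwrites instead of appending. The sketch below uses an in-memory SQLite table as a stand-in for a warehouse; the events table, its columns, and load_partition are hypothetical.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (event_id TEXT PRIMARY KEY, run_date TEXT, amount REAL)"
)


def load_partition(conn, run_date, rows):
    """Replace the whole partition for run_date so a retry cannot duplicate rows."""
    with conn:  # one transaction: the delete and inserts commit together or not at all
        conn.execute("DELETE FROM events WHERE run_date = ?", (run_date,))
        conn.executemany(
            "INSERT INTO events (event_id, run_date, amount) VALUES (?, ?, ?)",
            [(r["event_id"], run_date, r["amount"]) for r in rows],
        )


# Key the partition by an explicit UTC date, not the machine's local date.
run_date = datetime(2025, 1, 1, 23, 30, tzinfo=timezone.utc).date().isoformat()
rows = [{"event_id": "a", "amount": 1.0}, {"event_id": "b", "amount": 2.5}]

load_partition(conn, run_date, rows)
load_partition(conn, run_date, rows)  # simulated retry of the same run

row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

Running the load twice leaves the table in the same state as running it once, which is the property that makes retries safe.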
Practice
Mini pipeline
- Build a 3-task DAG (extract → clean → save) with a local Prefect install.
- Add a retry policy and logging.
- Keep it running on a cron schedule for 1 hour.
Takeaway
Remember
Pipeline = schedule + retry + lineage + alert. The tool is secondary; the pattern is primary.