Phase 10 · Chapter 10.02

Event-driven AI Systems

Instead of calling each other directly, services publish events; other services subscribe and react. This model fits ML pipelines well.

Concept

Why event-driven

  • The producer does not know who consumes its events: loose coupling.
  • Burst traffic is absorbed: the queue acts as a buffer.
  • Replay is possible: the event log doubles as an audit trail and as retraining data.
  • Multiple consumers can use the same event (fraud + recsys + logging).
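
The points above can be sketched with a toy in-memory bus. This is not Kafka; the `EventBus` class and its method names are hypothetical and only illustrate loose coupling, fan-out to several consumers, and replay from a log.

```python
from collections import defaultdict
from typing import Callable

class EventBus:
    """Toy in-memory bus: an append-only log plus per-topic subscribers."""

    def __init__(self) -> None:
        self.log: list[tuple[str, dict]] = []
        self.subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        self.subscribers[topic].append(handler)

    def publish(self, topic: str, event: dict) -> None:
        # The producer only appends and fans out; it never names a consumer.
        self.log.append((topic, event))
        for handler in self.subscribers[topic]:
            handler(event)

    def replay(self, topic: str, handler: Callable[[dict], None]) -> None:
        # A consumer added later can rebuild its state from the log.
        for logged_topic, event in self.log:
            if logged_topic == topic:
                handler(event)

bus = EventBus()
fraud_seen: list[str] = []
recsys_seen: list[str] = []
bus.subscribe("user-events", lambda e: fraud_seen.append(e["user_id"]))
bus.subscribe("user-events", lambda e: recsys_seen.append(e["item_id"]))

bus.publish("user-events", {"user_id": "u1", "item_id": "i9"})
bus.publish("user-events", {"user_id": "u2", "item_id": "i3"})

# A late subscriber (e.g. a retraining job) replays the full history.
training_rows: list[dict] = []
bus.replay("user-events", training_rows.append)
```

Both subscribers receive every event without the publisher knowing about them, and the retraining job recovers both events from the log even though it subscribed after they were published.
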
Kafka Producer

From event publish to inference trigger

python · production
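from confluent_kafka import Producer
import json, time, uuid

p = Producer({"bootstrap.servers": "kafka:9092"})

def publish_click(user_id: str, item_id: str):
    event = {
        "id": str(uuid.uuid4()),   # unique id, lets consumers de-duplicate
        "type": "user.click",
        "user_id": user_id,
        "item_id": item_id,
        "ts": time.time(),         # event time, epoch seconds
    }
    # Keying by user_id keeps each user's events in one partition, in order.
    p.produce("user-events", key=user_id, value=json.dumps(event))
    p.poll(0)  # serve delivery callbacks without blocking
    # Call p.flush() before shutdown so buffered events are actually sent.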
Kafka Consumer

Async inference worker

python · production
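from confluent_kafka import Consumer, Producer
import json, joblib

model = joblib.load("recsys.pkl")
p = Producer({"bootstrap.servers": "kafka:9092"})
c = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "recsys-worker",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,  # commit manually, only after processing
})
c.subscribe(["user-events"])

def publish(topic: str, payload: dict):
    # Minimal helper (assumption: the first argument is the destination topic).
    p.produce(topic, key=payload["user_id"], value=json.dumps(payload))
    p.poll(0)

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"consumer error: {msg.error()}")  # log instead of skipping silently
        continue
    evt = json.loads(msg.value())
    if evt["type"] == "user.click":
        recs = model.recommend(evt["user_id"])
        publish("recs.updated", {"user_id": evt["user_id"], "items": recs})
    # Committing after processing gives at-least-once delivery.
    c.commit(message=msg)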
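
Because the worker commits only after processing, a crash between the two steps means the same event is delivered again. A minimal sketch of an idempotent handler, assuming every event carries the `id` field set by the producer; `IdempotentHandler` is a hypothetical name, and the in-memory set stands in for a shared store such as Redis.

```python
class IdempotentHandler:
    """Wraps a handler so each event id is processed at most once."""

    def __init__(self, handler):
        self.handler = handler
        self.seen_ids = set()  # assumption: a real worker would use a shared store

    def __call__(self, event: dict) -> bool:
        event_id = event["id"]
        if event_id in self.seen_ids:
            return False                 # duplicate delivery: skip side effects
        self.handler(event)
        self.seen_ids.add(event_id)      # mark done only after the handler succeeds
        return True

predictions = []
handle = IdempotentHandler(lambda e: predictions.append(e["user_id"]))

click = {"id": "evt-1", "type": "user.click", "user_id": "u1"}
first = handle(click)
second = handle(click)  # same event redelivered after a crash before commit
```

The second call returns `False` and produces no second prediction, which is the behavior you want when delivery is at-least-once.
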
Stream Processing

Flink / Spark Structured Streaming

python · production
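# Spark stream: feature aggregation on the fly
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, from_json, window

spark = SparkSession.builder.getOrCreate()
events = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")  # required by the Kafka source
    .option("subscribe", "user-events").load())

# The Kafka source exposes the payload as bytes in `value`; parse the JSON to get user_id.
clicks = (events
    .select(from_json(col("value").cast("string"), "user_id STRING, type STRING").alias("e"), "timestamp")
    .select("e.user_id", "e.type", "timestamp")
    .where(col("type") == "user.click"))

agg = (clicks.groupBy(window("timestamp", "5 minutes"), "user_id")
       .agg(count("*").alias("clicks_5m")))

# Assumes a connector that registers a "redis" streaming sink; Spark does not ship one.
# Without one, write each micro-batch from foreachBatch instead.
agg.writeStream.format("redis").outputMode("update").start()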
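
To see what the 5-minute window aggregation computes, here is a plain-Python sketch of tumbling-window counting. It is not Spark; the helper names are hypothetical, and it assumes each event carries an epoch-seconds `ts` field as in the producer above.

```python
from collections import Counter

WINDOW_SECONDS = 5 * 60

def window_start(ts: float, size: int = WINDOW_SECONDS) -> int:
    """Start of the tumbling window containing ts (epoch seconds)."""
    return int(ts // size) * size

def clicks_per_window(events: list[dict]) -> Counter:
    """Count events per (window start, user_id) pair."""
    counts: Counter = Counter()
    for e in events:
        counts[(window_start(e["ts"]), e["user_id"])] += 1
    return counts

events = [
    {"user_id": "u1", "ts": 10.0},
    {"user_id": "u1", "ts": 299.0},
    {"user_id": "u1", "ts": 300.0},  # falls into the next 5-minute window
    {"user_id": "u2", "ts": 120.0},
]
counts = clicks_per_window(events)
```

Windows are half-open, so an event at exactly 300 seconds belongs to the window starting at 300, not the one starting at 0.
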
Patterns

Common event-driven AI flow

  • CQRS: read and write models are kept separate; recsys serves from a read-optimized store.
  • Event sourcing: state is rebuilt by replaying the event log, which is gold for retraining.
  • Saga: orchestrates a long-running ML pipeline (data → train → eval → deploy).
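
The saga pattern can be sketched as a list of steps, each paired with a compensating action that undoes it. This is a simplified in-process illustration; `run_saga`, the step names, and the accuracy threshold are hypothetical, and a real pipeline would drive each step through events.

```python
from typing import Callable

Step = tuple[str, Callable[[dict], None], Callable[[dict], None]]

def run_saga(steps: list[Step], ctx: dict) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse."""
    done: list[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action(ctx)
            done.append(step)
            ctx["log"].append(f"{name}:ok")
        except Exception:
            ctx["log"].append(f"{name}:failed")
            for prev_name, _, compensate in reversed(done):
                compensate(ctx)
                ctx["log"].append(f"{prev_name}:compensated")
            return False
    return True

def evaluate(ctx: dict) -> None:
    if ctx["accuracy"] < ctx["threshold"]:
        raise ValueError("model below quality bar")

def noop(ctx: dict) -> None:
    pass

steps: list[Step] = [
    ("data", lambda c: c.update(dataset="snapshot"), lambda c: c.pop("dataset")),
    ("train", lambda c: c.update(model="candidate"), lambda c: c.pop("model")),
    ("eval", evaluate, noop),
    ("deploy", lambda c: c.update(deployed=True), lambda c: c.update(deployed=False)),
]

ctx = {"accuracy": 0.71, "threshold": 0.80, "log": []}
ok = run_saga(steps, ctx)
```

Here the eval step fails, so deploy never runs and the train and data steps are compensated in reverse order.
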
Pitfalls

What goes wrong

  • Breaking schema change → consumer crash (use Schema Registry + Avro/Protobuf).
  • Running at-least-once delivery while assuming exactly-once → duplicate predictions.
  • Not monitoring consumer lag → stale recommendations.
  • No DLQ (Dead Letter Queue) → a single bad event blocks the whole consumer.
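
A minimal sketch of dead-letter handling, assuming JSON payloads and a hypothetical set of required fields. The `dead_letters` list stands in for a real DLQ topic; the point is that a bad message is parked with its error instead of stopping the loop.

```python
import json

REQUIRED_FIELDS = {"id", "type", "user_id"}  # assumption: illustrative schema only

def process_batch(raw_messages, handler):
    """Handle each message; park failures in a dead-letter list instead of stopping."""
    dead_letters = []
    processed = 0
    for raw in raw_messages:
        try:
            event = json.loads(raw)
            if not isinstance(event, dict):
                raise ValueError("payload is not a JSON object")
            missing = REQUIRED_FIELDS - event.keys()
            if missing:
                raise ValueError(f"missing fields: {sorted(missing)}")
            handler(event)
            processed += 1
        except Exception as exc:
            # Keep the original payload and the reason so it can be inspected or replayed.
            dead_letters.append({"payload": raw, "error": str(exc)})
    return processed, dead_letters

handled = []
messages = [
    '{"id": "1", "type": "user.click", "user_id": "u1"}',
    'not-json',
    '{"id": "2", "type": "user.click"}',
    '{"id": "3", "type": "user.click", "user_id": "u3"}',
]
processed, dlq = process_batch(messages, lambda e: handled.append(e["id"]))
```

The malformed message and the one missing `user_id` both land in `dlq`, while the valid events before and after them are still processed.
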
Mini Project

Click → Recommendation stream

  1. Create a Kafka topic named user-events.
  2. Run a Producer (FastAPI), a Consumer (recsys worker), and a Redis sink.
  3. Inject 1000 events/sec and monitor consumer lag.
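
For step 3, consumer lag per partition is the log end offset minus the group's committed offset. A small sketch of that arithmetic; the offsets below are made-up sample values, and treating a partition with no commit as offset 0 is a simplification. With confluent_kafka, the inputs could be read via `Consumer.get_watermark_offsets` and `Consumer.committed`.

```python
def consumer_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    """Per-partition lag = log end offset - committed offset (0 if nothing committed)."""
    return {
        partition: max(end - committed.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }

# Sample values only: partition id -> offset.
end_offsets = {0: 12_000, 1: 11_500, 2: 12_300}
committed = {0: 11_900, 1: 11_500}  # partition 2 has no commit yet

lag = consumer_lag(end_offsets, committed)
total_lag = sum(lag.values())
```

A total that keeps growing while you inject load means the worker cannot keep up and recommendations are going stale.
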
Takeaway

Key point

Event-driven design is the foundation of real-time AI. Where the use case can tolerate latency, drop synchronous REST and use async events to raise throughput, potentially by as much as 10x.
