Phase 10 · Chapter 10.02
Event-driven AI Systems
Instead of calling each other directly, services publish events, and other services subscribe and react to them. Ideal for ML pipelines.
Concept
Why event-driven
- The producer does not know who consumes its events: loose coupling.
- Burst traffic is absorbed: the queue acts as a buffer.
- Replay is possible: the event log doubles as an audit trail and as retraining data.
- Multiple consumers can use the same event (fraud + recsys + logging).
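The points above can be illustrated with a minimal in-memory sketch. It is not a broker: `InMemoryBus` is a hypothetical class invented for this example, delivery is synchronous, and there is no buffering or persistence. It only shows that the producer names a topic rather than a consumer, that several consumers can react to the same event, and that an append-only log is kept for later replay.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, Dict, List

Handler = Callable[[Dict], None]


class InMemoryBus:
    """Toy stand-in for a broker: each topic maps to subscriber callbacks."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.log: List[Dict] = []  # append-only record, kept for replay

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, event: Dict) -> None:
        # The producer only names a topic; it never references a consumer.
        self.log.append({"topic": topic, **event})
        for handler in self._subscribers[topic]:
            handler(event)


bus = InMemoryBus()
fraud_seen: List[str] = []
recsys_seen: List[str] = []
audit_seen: List[Dict] = []

# Three independent consumers react to the same event stream.
bus.subscribe("user-events", lambda e: fraud_seen.append(e["user_id"]))
bus.subscribe("user-events", lambda e: recsys_seen.append(e["item_id"]))
bus.subscribe("user-events", lambda e: audit_seen.append(e))

bus.publish("user-events", {"user_id": "u1", "item_id": "i42"})
```

Adding a fourth consumer here means one more `subscribe` call and no change to the publishing code, which is the loose coupling the list describes.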
Kafka Producer
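Triggering inference from a published event
python
from confluent_kafka import Producer
import json, time, uuid

p = Producer({"bootstrap.servers": "kafka:9092"})

def publish_click(user_id: str, item_id: str):
    # Build a click event with a unique id that consumers can deduplicate on.
    event = {
        "id": str(uuid.uuid4()),
        "type": "user.click",
        "user_id": user_id,
        "item_id": item_id,
        "ts": time.time(),
    }
    # Keying by user_id keeps each user's events ordered within one partition.
    p.produce("user-events", key=user_id, value=json.dumps(event))
    p.poll(0)  # serve delivery callbacks without blocking

# Call p.flush() before shutdown so buffered events are actually sent.

Kafka Consumer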
Async inference worker
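python
from confluent_kafka import Consumer, Producer
import json, joblib

model = joblib.load("recsys.pkl")  # assumed to expose recommend(user_id)
c = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "recsys-worker",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,  # commit manually, only after processing
})
c.subscribe(["user-events"])
out = Producer({"bootstrap.servers": "kafka:9092"})

def publish(topic: str, payload: dict):
    # Emit a JSON event on the given topic, keyed by user.
    out.produce(topic, key=payload["user_id"], value=json.dumps(payload))
    out.poll(0)

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"consumer error: {msg.error()}")
        continue
    evt = json.loads(msg.value())
    if evt["type"] != "user.click":
        continue
    recs = model.recommend(evt["user_id"])
    publish("recs.updated", {"user_id": evt["user_id"], "items": recs})
    c.commit(message=msg)  # at-least-once: a crash before this line means redelivery

Stream Processing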
Flink / Spark Structured Streaming
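python
# Spark stream: feature aggregation on the fly
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, from_json, window

spark = SparkSession.builder.getOrCreate()
events = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "kafka:9092")
          .option("subscribe", "user-events").load())
# Kafka delivers value as bytes; parse the JSON payload to get user_id.
clicks = events.select(
    "timestamp",
    from_json(col("value").cast("string"), "user_id STRING").alias("e"),
).select("timestamp", "e.user_id")
agg = (clicks.groupBy(window("timestamp", "5 minutes"), "user_id")
       .agg(count("*").alias("clicks_5m")))
# Spark ships no Redis sink; this line assumes a third-party connector that
# registers a "redis" streaming format. foreachBatch is the usual fallback.
agg.writeStream.format("redis").outputMode("update").start()

Patterns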
Common event-driven AI flow
- CQRS: separate read and write models; the recsys serves from a read-optimized store.
- Event sourcing: state is rebuilt by replaying the event log, which is invaluable for retraining.
- Saga: orchestrates a long-running ML pipeline (data → train → eval → deploy).
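The event sourcing idea in the list above can be sketched in a few lines: state is never stored directly, it is derived by folding over the log. This is a minimal illustration using an in-memory list in place of a Kafka topic. The function name `replay_click_counts` and the `user.view` event type are hypothetical, introduced only for this example.

```python
from collections import Counter
from typing import Dict, Iterable, List


def replay_click_counts(events: Iterable[Dict]) -> Counter:
    """Fold the event log into per-user click counts."""
    counts: Counter = Counter()
    for evt in events:
        if evt["type"] == "user.click":
            counts[evt["user_id"]] += 1
    return counts


# The log is the source of truth; "user.view" is a made-up second event type.
event_log: List[Dict] = [
    {"type": "user.click", "user_id": "u1", "item_id": "a"},
    {"type": "user.view", "user_id": "u1", "item_id": "b"},
    {"type": "user.click", "user_id": "u2", "item_id": "a"},
    {"type": "user.click", "user_id": "u1", "item_id": "c"},
]

state = replay_click_counts(event_log)
# Replaying a prefix of the log reconstructs the state at an earlier point.
state_at_2 = replay_click_counts(event_log[:2])
```

Because any prefix of the log can be replayed, the same mechanism can rebuild the features a model would have seen at training time, which is why the pattern helps with retraining.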
Pitfalls
What goes wrong
- Breaking schema change → consumer crash (use Schema Registry with Avro/Protobuf).
- Running at-least-once delivery without understanding exactly-once semantics → duplicate predictions.
- Without consumer lag monitoring, recommendations go stale.
- Without a DLQ (Dead Letter Queue), one bad event blocks the entire consumer.
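Two of the pitfalls above, duplicate predictions and poison messages, can be handled in the consumer loop itself. The sketch below is broker-free and makes several assumptions: `process_batch` is a hypothetical helper, the set of seen ids lives in memory (a real worker would likely keep it in a shared store such as Redis), and the dead letter queue is a plain list standing in for a separate topic.

```python
import json
from typing import Callable, Dict, List, Set


def process_batch(
    raw_messages: List[bytes],
    handle: Callable[[Dict], None],
    seen_ids: Set[str],
    dead_letters: List[Dict],
) -> int:
    """Handle each message once; park failures instead of blocking. Returns handled count."""
    handled = 0
    for raw in raw_messages:
        try:
            evt = json.loads(raw)
            event_id = evt["id"]
        except (ValueError, KeyError, TypeError) as exc:
            # Unparseable or malformed event: send to the DLQ and move on.
            dead_letters.append({"raw": raw, "error": repr(exc)})
            continue
        if event_id in seen_ids:
            continue  # redelivered under at-least-once; skip the duplicate
        try:
            handle(evt)
        except Exception as exc:
            # Handler failure: park the event rather than retrying forever.
            dead_letters.append({"raw": raw, "error": repr(exc)})
            continue
        seen_ids.add(event_id)
        handled += 1
    return handled


predictions: List[str] = []
seen: Set[str] = set()
dlq: List[Dict] = []
messages = [
    b'{"id": "e1", "type": "user.click", "user_id": "u1"}',
    b'{"id": "e1", "type": "user.click", "user_id": "u1"}',  # redelivered duplicate
    b"not json at all",                                        # poison message
    b'{"id": "e2", "type": "user.click", "user_id": "u2"}',
]
handled = process_batch(messages, lambda e: predictions.append(e["user_id"]), seen, dlq)
```

The event id is marked as seen only after the handler succeeds, so a crash mid-handler leads to a retry rather than a silently dropped event.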
Mini Project
Click → Recommendation stream
- Create the Kafka topic user-events.
- Run a Producer (FastAPI), a Consumer (recsys worker), and a Redis sink.
- Inject 1000 events/sec and monitor consumer lag.
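For the lag-monitoring step, consumer lag per partition is the log-end offset minus the group's committed offset. The sketch below computes it from plain dictionaries so it runs without a broker; the function name `consumer_lag` and the sample offsets are hypothetical. Treating a partition with no commit as offset 0 is a simplifying assumption. In a real setup the offsets would come from the broker, for example via the `kafka-consumer-groups` CLI with `--describe`.

```python
from typing import Dict


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: log-end offset minus committed offset, floored at 0."""
    lag: Dict[int, int] = {}
    for partition, end in end_offsets.items():
        # Assumption: a partition with no committed offset counts from 0.
        done = committed.get(partition, 0)
        lag[partition] = max(end - done, 0)
    return lag


# Made-up sample offsets for a three-partition topic.
end_offsets = {0: 1500, 1: 1480, 2: 1510}
committed = {0: 1500, 1: 1200}

lag = consumer_lag(end_offsets, committed)
total_lag = sum(lag.values())
```

A total that keeps growing while events are injected at a steady rate suggests the worker cannot keep up, which is the stale-recommendation risk listed under Pitfalls.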
Takeaway
Key point
Event-driven design is the foundation of real-time AI. Wherever there is latency tolerance, drop sync REST and use async events, which can raise throughput by as much as 10x.