Phase 11 · Chapter 11.03

Advanced: RAG, AI SaaS Backend, Real-time Recommender

Senior-level production projects: the kind that make a resume stand out in interviews.

Project A

RAG: Document Q&A SaaS

  • User PDF upload → chunk → embed → store in vector DB.
  • Question → retrieve top-k chunks → LLM answer with citations.
  • Stack: FastAPI + LangChain + Qdrant/Chroma + OpenAI.
  • Multi-tenant: per-user namespace isolation.
RAG Ingest

Chunk + embed pipeline

python (production)
import uuid

import pypdf
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams

qdrant = QdrantClient(url="http://qdrant:6333")
emb = OpenAIEmbeddings(model="text-embedding-3-small")  # 1536-dim vectors
splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=100)

def ingest(user_id: str, pdf_path: str) -> None:
    """Chunk a PDF, embed the chunks, and store them in the user's collection."""
    # A page may yield no text (e.g. a scanned image), so guard with `or ""`;
    # join with newlines so words at page boundaries do not fuse together.
    pages = pypdf.PdfReader(pdf_path).pages
    text = "\n".join(p.extract_text() or "" for p in pages)
    chunks = splitter.split_text(text)
    if not chunks:
        return  # nothing extractable; skip the embedding call
    vectors = emb.embed_documents(chunks)

    # One collection per user keeps tenants isolated from each other.
    collection = f"docs_{user_id}"
    if not qdrant.collection_exists(collection):
        qdrant.create_collection(
            collection,
            vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
        )

    qdrant.upsert(collection, points=[
        PointStruct(id=str(uuid.uuid4()), vector=v,
                    payload={"text": c, "source": pdf_path})
        for c, v in zip(chunks, vectors)
    ])
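
To make the `chunk_size=800, chunk_overlap=100` setting concrete, here is a minimal sketch of overlapping chunking. It is a plain fixed-width sliding window, not the separator-aware logic of `RecursiveCharacterTextSplitter`; the function name `chunk_text` is hypothetical and exists only for illustration.

```python
def chunk_text(text: str, chunk_size: int = 800, chunk_overlap: int = 100) -> list[str]:
    """Split text into fixed-size character windows that overlap.

    Simplified stand-in for a real splitter: it cuts on character counts only
    and ignores paragraph or sentence boundaries.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far each window advances
    chunks: list[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks


if __name__ == "__main__":
    sample = "".join(str(i % 10) for i in range(2000))
    parts = chunk_text(sample)
    print([len(p) for p in parts])
```

The overlap means the last 100 characters of one chunk are repeated at the start of the next, so a sentence cut by a boundary still appears whole in at least one chunk.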
RAG Query

Retrieve + answer with citation

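python (production)
from fastapi import FastAPI
from openai import OpenAI
from pydantic import BaseModel

app = FastAPI()
client = OpenAI()  # reads OPENAI_API_KEY from the environment
# `emb` and `qdrant` are the objects created in the ingest code.

class AskReq(BaseModel):
    user_id: str  # in production, derive this from the auth token, not the body
    question: str

@app.post("/ask")
def ask(req: AskReq):
    qvec = emb.embed_query(req.question)
    # Newer qdrant-client releases deprecate `search` in favor of `query_points`.
    hits = qdrant.search(f"docs_{req.user_id}", query_vector=qvec, limit=4)
    context = "\n---\n".join(f"[{i+1}] {h.payload['text']}" for i, h in enumerate(hits))

    prompt = f"""Answer based ONLY on context. Cite [n].
Context:
{context}

Q: {req.question}
A:"""
    answer = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
    ).choices[0].message.content
    # sources[i] corresponds to citation marker [i+1] in the answer.
    return {"answer": answer, "sources": [h.payload["source"] for h in hits]}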
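
The endpoint returns every retrieved source, whether or not the model actually cited it. One possible refinement, sketched here under the assumption that the model follows the `[n]` convention from the prompt, is to parse the markers out of the answer and keep only the sources they point to. The helper name `cited_sources` is hypothetical.

```python
import re


def cited_sources(answer: str, sources: list[str]) -> list[str]:
    """Return the sources cited as [n] in the answer, in first-citation order.

    Markers are 1-based, matching the "[1] ...", "[2] ..." context layout.
    Out-of-range markers (a sign the model invented a citation) are dropped.
    """
    cited: list[str] = []
    for match in re.finditer(r"\[(\d+)\]", answer):
        idx = int(match.group(1)) - 1
        if 0 <= idx < len(sources) and sources[idx] not in cited:
            cited.append(sources[idx])
    return cited


if __name__ == "__main__":
    srcs = ["a.pdf", "b.pdf", "c.pdf"]
    print(cited_sources("Refunds take 5 days [2]. See also [1] and [7].", srcs))
```

An answer with no valid markers yields an empty list, which can be treated as a signal to flag the response as ungrounded.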
Project B

Multi-tenant AI SaaS Backend

  • Auth (Clerk/Supabase) + org/workspace model.
  • Per-tenant rate limit + usage tracking.
  • Stripe metered billing (per token / per call).
  • Admin dashboard: usage, top users, error rate.
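python (production)
# usage_tracker.py
import time
from datetime import datetime, timezone

import redis
import stripe

r = redis.Redis()

# Monthly token quota per plan. These numbers are placeholders; set your own.
PLAN_LIMITS = {"free": 100_000, "pro": 5_000_000}

def _usage_key(user_id: str) -> str:
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC datetime.
    month = datetime.now(timezone.utc).strftime("%Y-%m")
    return f"usage:{user_id}:{month}"

def track(user_id: str, tokens_in: int, tokens_out: int) -> None:
    key = _usage_key(user_id)
    pipe = r.pipeline()  # one round trip for all three counters
    pipe.hincrby(key, "tokens_in", tokens_in)
    pipe.hincrby(key, "tokens_out", tokens_out)
    pipe.hincrby(key, "calls", 1)
    pipe.execute()

def check_quota(user_id: str, plan: str) -> bool:
    # Count input and output tokens, matching the quantity billed below.
    t_in, t_out = r.hmget(_usage_key(user_id), "tokens_in", "tokens_out")
    return int(t_in or 0) + int(t_out or 0) < PLAN_LIMITS[plan]

def report_usage(sub_item_id: str, tokens_in: int, tokens_out: int) -> None:
    # Stripe usage record (legacy metered billing; check your API version).
    stripe.SubscriptionItem.create_usage_record(
        sub_item_id, quantity=tokens_in + tokens_out,
        timestamp=int(time.time()), action="increment",
    )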
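
The per-tenant rate limit from the bullet list has no code above, so here is a minimal fixed-window sketch. It keeps counters in process memory purely for illustration; the class name `FixedWindowLimiter` and its parameters are hypothetical, and a multi-instance deployment would more likely keep the counters in Redis alongside the usage hash.

```python
import time
from collections import defaultdict
from typing import Callable


class FixedWindowLimiter:
    """Allow at most `limit` calls per tenant in each `window_s`-second window."""

    def __init__(self, limit: int, window_s: int = 60,
                 clock: Callable[[], float] = time.time) -> None:
        self.limit = limit
        self.window_s = window_s
        self.clock = clock  # injectable so tests can control time
        self._counts: dict[tuple[str, int], int] = defaultdict(int)

    def allow(self, tenant_id: str) -> bool:
        # All calls in the same window share one counter keyed by window index.
        window = int(self.clock() // self.window_s)
        key = (tenant_id, window)
        if self._counts[key] >= self.limit:
            return False
        self._counts[key] += 1
        return True


if __name__ == "__main__":
    limiter = FixedWindowLimiter(limit=2, window_s=60)
    print([limiter.allow("org-1") for _ in range(3)])
```

This sketch never evicts counters for past windows, so a long-running process would need periodic cleanup. A fixed window also permits a burst of up to twice the limit across a window boundary; a sliding window or token bucket avoids that at the cost of more state.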
Project C

Real-time Recommender

  • Two-stage: ANN retrieval (FAISS) + GBDT ranker.
  • Kafka event stream → online feature update (Redis).
  • Target: p99 latency < 50 ms.
  • A/B framework with deterministic user hashing.
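
Deterministic user hashing means a user lands in the same experiment arm on every request without any stored assignment. A minimal sketch, assuming SHA-256 over the experiment name plus user ID; the function name `assign_variant` and the variant labels are hypothetical.

```python
import hashlib


def assign_variant(user_id: str, experiment: str,
                   variants: tuple[str, ...] = ("control", "treatment")) -> str:
    """Map a user to a variant, stable across requests and processes.

    Including the experiment name in the hash input decorrelates
    assignments between experiments.
    """
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % len(variants)
    return variants[bucket]


if __name__ == "__main__":
    print(assign_variant("user-42", "ranker-v2"))
```

Python's built-in `hash()` is deliberately avoided here because string hashing is randomized per process, which would break the determinism.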
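python (production)
import time

import numpy as np

# Assumed to be initialised elsewhere: app, redis (async client), faiss_index,
# feature_store, ranker, mmr_rerank, kafka, default_vec (float32 array).

@app.get("/recommend/{user_id}")
async def recommend(user_id: str):
    # 1. Online features from Redis (average of the last 5 click vectors)
    raw = await redis.get(f"uvec:{user_id}")
    # Redis returns bytes; this assumes the vector was stored as raw float32.
    uvec = np.frombuffer(raw, dtype=np.float32) if raw else default_vec

    # 2. ANN candidates (top 200); FAISS pads with -1 when fewer neighbours exist
    ids = faiss_index.search(uvec.reshape(1, -1), k=200)[1][0]
    candidates = ids[ids >= 0]

    # 3. Feature fetch + rank
    feats = feature_store.batch_get(user_id, candidates)
    scores = ranker.predict(feats)

    # 4. Diversify (MMR) + top 20
    top = mmr_rerank(candidates, scores, k=20, lambda_=0.7)
    top = [int(i) for i in top]  # plain ints so the response is JSON-serialisable

    # 5. Log impression for training
    kafka.send("impressions", {"user": user_id, "items": top, "ts": time.time()})
    return {"items": top}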
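
`mmr_rerank` is called in the endpoint but never defined. The following is one possible greedy Maximal Marginal Relevance implementation, offered as a sketch. Note the assumption: MMR needs item-to-item similarity, so this version takes an extra `item_vecs` argument (one embedding per candidate) that the call above does not pass, and it assumes ranker scores are on a scale comparable to cosine similarity.

```python
import numpy as np


def mmr_rerank(candidates, scores, item_vecs, k=20, lambda_=0.7):
    """Greedy MMR: pick items that score well but differ from those already picked.

    lambda_=1.0 is pure relevance ordering; lower values favour diversity.
    """
    vecs = np.asarray(item_vecs, dtype=np.float32)
    norms = np.linalg.norm(vecs, axis=1, keepdims=True)
    vecs = vecs / np.clip(norms, 1e-12, None)  # unit vectors, so dot = cosine
    sim = vecs @ vecs.T
    scores = np.asarray(scores, dtype=np.float32)

    remaining = list(range(len(candidates)))
    picked: list[int] = []
    while remaining and len(picked) < k:
        if picked:
            # Redundancy = highest similarity to any already-selected item.
            redundancy = sim[np.ix_(remaining, picked)].max(axis=1)
        else:
            redundancy = np.zeros(len(remaining), dtype=np.float32)
        mmr = lambda_ * scores[remaining] - (1 - lambda_) * redundancy
        best = remaining[int(np.argmax(mmr))]
        picked.append(best)
        remaining.remove(best)
    return [candidates[i] for i in picked]


if __name__ == "__main__":
    items = [10, 11, 12]
    vecs = [[1.0, 0.0], [1.0, 0.01], [0.0, 1.0]]  # 10 and 11 are near-duplicates
    print(mmr_rerank(items, [0.9, 0.85, 0.5], vecs, k=2, lambda_=0.5))
```

With 200 candidates the full similarity matrix is only 200 × 200, so computing it up front is cheap relative to the 50 ms budget.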
Infra

Production stack

  • K8s (EKS/GKE) + Helm charts.
  • Postgres (metadata) + Redis (online features) + Qdrant (vectors) + Kafka (events).
  • Prometheus + Grafana + Loki + Sentry.
  • GitHub Actions → ArgoCD GitOps deployment.
What makes it senior

Beyond just "works"

  • Multi-tenancy isolation (data, compute, billing).
  • Cost optimization (caching, model routing).
  • Observability (trace per request, SLO dashboard).
  • Graceful degradation (LLM down → fallback to cached/cheaper model).
  • Security: prompt injection defense, PII redaction, audit log.
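
The graceful-degradation bullet can be sketched as a small wrapper that tries the primary model, then a cached answer, then a cheaper model. The function name `answer_with_fallback`, the plain-dict cache, and the callables are all hypothetical stand-ins; a real service would catch provider-specific timeout and 5xx errors rather than every `Exception`.

```python
from typing import Callable


def answer_with_fallback(
    prompt: str,
    primary: Callable[[str], str],
    fallback: Callable[[str], str],
    cache: dict[str, str],
) -> tuple[str, str]:
    """Return (answer, tier) where tier is "primary", "cache", or "fallback"."""
    try:
        answer = primary(prompt)
        cache[prompt] = answer  # remember good answers for later outages
        return answer, "primary"
    except Exception:
        pass  # primary model unavailable; degrade instead of failing the request

    if prompt in cache:
        return cache[prompt], "cache"
    return fallback(prompt), "fallback"


if __name__ == "__main__":
    def flaky(_: str) -> str:
        raise TimeoutError("LLM down")

    print(answer_with_fallback("hi", flaky, lambda p: f"cheap:{p}", {}))
```

Returning the tier alongside the answer lets the caller log how often degradation happens, which feeds the SLO dashboard mentioned in the same list.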
Pitfalls

What burns you at the advanced level

  • Poor RAG retrieval quality → hallucinated answers. Use hybrid search (BM25 + vector).
  • Vector DB memory blows up: use quantization + a payload-only index.
  • Stripe usage record API rate limit: send usage in batches.
  • Recsys feedback loop: popular items keep getting more popular. Add exploration.
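
The text recommends hybrid BM25 + vector retrieval but does not say how to combine the two result lists. One common option, shown here as a sketch rather than the author's method, is Reciprocal Rank Fusion (RRF), which needs only the rank positions and no score calibration. The function name `rrf_fuse` is hypothetical; `k=60` is the constant commonly used with RRF.

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60, top_n: int = 4) -> list[str]:
    """Fuse several ranked lists of document IDs with Reciprocal Rank Fusion.

    Each document scores sum(1 / (k + rank)) over the lists it appears in,
    with rank starting at 1. Ties are broken by document ID for determinism.
    """
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=lambda d: (-scores[d], d))[:top_n]


if __name__ == "__main__":
    bm25_hits = ["a", "b", "c"]    # keyword ranking
    vector_hits = ["c", "a", "d"]  # embedding ranking
    print(rrf_fuse([bm25_hits, vector_hits], top_n=3))
```

Documents that both retrievers agree on rise to the top, while a document found by only one retriever still survives with a lower score.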
Deliverable

Senior portfolio

  1. One SaaS-quality live product (landing + auth + billing).
  2. System design doc (Excalidraw + decisions + trade-offs).
  3. Load test report (k6/Locust) that proves the scale claim.
  4. Blog post / talk on "how I built X": interview gold.
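
For the load test report, the headline number is usually a tail percentile such as the p99 < 50 ms target set for the recommender. As a minimal sketch, the nearest-rank percentile below can be applied to raw latency samples exported from k6 or Locust; the function name `percentile` is hypothetical, and the tools' own reports may use a different interpolation method.

```python
import math


def percentile(samples: list[float], p: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least p% at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < p <= 100:
        raise ValueError("p must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(p * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


if __name__ == "__main__":
    latencies_ms = [float(x) for x in range(1, 101)]  # synthetic samples
    p99 = percentile(latencies_ms, 99)
    print(f"p99 = {p99} ms, meets 50 ms target: {p99 < 50}")
```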
Phase 11 Complete

What you learned

Beginner → Intermediate → Advanced: real-world projects at three stages. With these three in your portfolio, the MLOps interview pipeline opens up from junior through senior roles. Next Phase: Research & Career (roadmap, interview, best practices).