pathway / app.py
Rivalcoder
[Edit]
540c8b7
import pathway as pw
import threading
from fastapi import FastAPI, HTTPException
import uvicorn
import os
from exporter import export_loop
class ProductSchema(pw.Schema):
_id: str
name: str
price: float
description: str
category: str
stock: int
imageUrl: str
isNew: bool
isActive: bool
createdBy: str
ratings: dict
createdAt: str
updatedAt: str
promoCopy: str
cache = {}
cache_lock = threading.Lock()
app = FastAPI()
def run_pipeline():
products = pw.io.jsonlines.read(
"mongo_exports/products.jsonl",
schema=ProductSchema,
mode="streaming"
)
transformed = products.select(
_id=pw.this._id,
name=pw.this.name,
price=pw.this.price,
description=pw.this.description,
category=pw.this.category,
stock=pw.this.stock,
imageUrl=pw.this.imageUrl,
isNew=pw.this.isNew,
isActive=pw.this.isActive,
createdBy=pw.this.createdBy,
ratings=pw.this.ratings,
createdAt=pw.this.createdAt,
updatedAt=pw.this.updatedAt,
promoCopy=pw.this.promoCopy
).with_id_from(pw.this._id)
def update_cache(key, row, time, is_addition):
with cache_lock:
if is_addition:
cache[str(key)] = row
else:
cache.pop(str(key), None)
pw.io.subscribe(transformed, on_change=update_cache)
pw.run()
@app.get("/products")
def get_all_products():
with cache_lock:
return list(cache.values())
@app.get("/products/{product_id}")
def get_product(product_id: str):
with cache_lock:
product = cache.get(product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
if __name__ == "__main__":
threading.Thread(target=export_loop, daemon=True).start()
threading.Thread(target=run_pipeline, daemon=True).start()
uvicorn.run(app, host="0.0.0.0", port=7860)