Spaces:
Running
Running
import pathway as pw | |
import threading | |
from fastapi import FastAPI, HTTPException | |
import uvicorn | |
import os | |
from exporter import export_loop | |
class ProductSchema(pw.Schema): | |
_id: str | |
name: str | |
price: float | |
description: str | |
category: str | |
stock: int | |
imageUrl: str | |
isNew: bool | |
isActive: bool | |
createdBy: str | |
ratings: dict | |
createdAt: str | |
updatedAt: str | |
promoCopy: str | |
cache = {} | |
cache_lock = threading.Lock() | |
app = FastAPI() | |
def run_pipeline(): | |
products = pw.io.jsonlines.read( | |
"mongo_exports/products.jsonl", | |
schema=ProductSchema, | |
mode="streaming" | |
) | |
transformed = products.select( | |
_id=pw.this._id, | |
name=pw.this.name, | |
price=pw.this.price, | |
description=pw.this.description, | |
category=pw.this.category, | |
stock=pw.this.stock, | |
imageUrl=pw.this.imageUrl, | |
isNew=pw.this.isNew, | |
isActive=pw.this.isActive, | |
createdBy=pw.this.createdBy, | |
ratings=pw.this.ratings, | |
createdAt=pw.this.createdAt, | |
updatedAt=pw.this.updatedAt, | |
promoCopy=pw.this.promoCopy | |
).with_id_from(pw.this._id) | |
def update_cache(key, row, time, is_addition): | |
with cache_lock: | |
if is_addition: | |
cache[str(key)] = row | |
else: | |
cache.pop(str(key), None) | |
pw.io.subscribe(transformed, on_change=update_cache) | |
pw.run() | |
def get_all_products(): | |
with cache_lock: | |
return list(cache.values()) | |
def get_product(product_id: str): | |
with cache_lock: | |
product = cache.get(product_id) | |
if not product: | |
raise HTTPException(status_code=404, detail="Product not found") | |
return product | |
if __name__ == "__main__": | |
threading.Thread(target=export_loop, daemon=True).start() | |
threading.Thread(target=run_pipeline, daemon=True).start() | |
uvicorn.run(app, host="0.0.0.0", port=7860) | |