File size: 2,000 Bytes
aeb70ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d621e5
aeb70ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
540c8b7
aeb70ab
540c8b7
 
 
 
 
 
 
 
 
aeb70ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import pathway as pw
import threading
from fastapi import FastAPI, HTTPException
import uvicorn
import os
from exporter import export_loop

class ProductSchema(pw.Schema):
    _id: str
    name: str
    price: float
    description: str
    category: str
    stock: int
    imageUrl: str
    isNew: bool
    isActive: bool
    createdBy: str
    ratings: dict
    createdAt: str
    updatedAt: str
    promoCopy: str

cache = {}
cache_lock = threading.Lock()
app = FastAPI()

def run_pipeline():
    products = pw.io.jsonlines.read(
        "mongo_exports/products.jsonl",
        schema=ProductSchema,
        mode="streaming"
    )
    transformed = products.select(
        _id=pw.this._id,
        name=pw.this.name,
        price=pw.this.price,
        description=pw.this.description,
        category=pw.this.category,
        stock=pw.this.stock,
        imageUrl=pw.this.imageUrl,
        isNew=pw.this.isNew,
        isActive=pw.this.isActive,
        createdBy=pw.this.createdBy,
        ratings=pw.this.ratings,
        createdAt=pw.this.createdAt,
        updatedAt=pw.this.updatedAt,
        promoCopy=pw.this.promoCopy
    ).with_id_from(pw.this._id)

    def update_cache(key, row, time, is_addition):
        with cache_lock:
            if is_addition:
                cache[str(key)] = row
            else:
                cache.pop(str(key), None)

    pw.io.subscribe(transformed, on_change=update_cache)
    pw.run()

@app.get("/products")
def get_all_products():
    with cache_lock:
        return list(cache.values())

@app.get("/products/{product_id}")
def get_product(product_id: str):
    with cache_lock:
        product = cache.get(product_id)
        if not product:
            raise HTTPException(status_code=404, detail="Product not found")
        return product

if __name__ == "__main__":
    threading.Thread(target=export_loop, daemon=True).start()
    threading.Thread(target=run_pipeline, daemon=True).start()
    uvicorn.run(app, host="0.0.0.0", port=7860)