Rivalcoder committed on
Commit
aeb70ab
·
1 Parent(s): 18cf3dc
Files changed (4) hide show
  1. Dockerfile +12 -0
  2. app.py +68 -0
  3. exporter.py +31 -0
  4. requirements.txt +5 -0
Dockerfile ADDED
@@ -0,0 +1,12 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# Container image for the product API: installs the Python dependencies
# and launches app.py.
FROM python:3.10

# Every relative path below resolves inside /app.
WORKDIR /app

# Dependencies are installed before the application code is copied, so this
# layer can be reused when only the code changes.
COPY requirements.txt ./requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Bring in the rest of the build context.
COPY . .

# Same port that app.py passes to uvicorn.run.
EXPOSE 7860

CMD ["python", "app.py"]
app.py ADDED
@@ -0,0 +1,68 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pathway as pw
2
+ import threading
3
+ from fastapi import FastAPI, HTTPException
4
+ import uvicorn
5
+ import os
6
+ from exporter import export_loop
7
+
8
class ProductSchema(pw.Schema):
    """Typed column layout for one product record.

    Used as the ``schema`` argument when the product export is read; the
    field names are the keys expected in each JSON line.
    """

    # Identity and catalogue details.
    _id: str
    name: str
    price: float
    description: str
    category: str
    stock: int
    imageUrl: str

    # Status flags.
    isNew: bool
    isActive: bool

    # Ownership, ratings and timestamps (timestamps are declared as strings).
    createdBy: str
    ratings: dict
    createdAt: str
    updatedAt: str

    # Promotional text.
    promoCopy: str
23
+
24
# Process-wide state shared by the pipeline thread and the HTTP handlers:
# the web application, the lock guarding the cache, and the cache itself
# (product rows held in a plain dict).
app = FastAPI()
cache_lock = threading.Lock()
cache = {}
27
+
28
def run_pipeline():
    """Stream the product export into the in-memory cache.

    Reads ``mongo_exports/products.jsonl`` in streaming mode, keeps only the
    columns exposed by the API, and mirrors every row change into the
    module-level ``cache`` while holding ``cache_lock``. The call ends in
    ``pw.run()``, so it is meant to be run on its own thread.
    """
    products = pw.io.jsonlines.read(
        "mongo_exports/products.jsonl",
        schema=ProductSchema,
        mode="streaming",
    )
    transformed = products.select(
        _id=pw.this._id,
        name=pw.this.name,
        price=pw.this.price,
        category=pw.this.category,
        promo=pw.this.promoCopy,
    ).with_id_from(pw.this._id)

    def update_cache(key, row, time, is_addition):
        """Apply one row change to ``cache``.

        Args:
            key: Pathway's pointer for the row (unused for cache keys).
            row: Mapping of column name to value for the changed row.
            time: Processing time of the change (unused).
            is_addition: True for an inserted row, False for a retraction.
        """
        # The HTTP endpoint looks products up by their "_id" value. The
        # string form of the Pathway pointer is a different identifier, so
        # using it as the cache key would make every lookup miss.
        product_id = row["_id"]
        with cache_lock:
            if is_addition:
                cache[product_id] = row
            elif cache.get(product_id) == row:
                # An update arrives as a retraction plus an addition; only
                # drop the entry if it still holds the retracted version so
                # a newer row for the same product is never discarded.
                del cache[product_id]

    pw.io.subscribe(transformed, on_change=update_cache)
    pw.run()
51
+
52
@app.get("/products")
def get_all_products():
    """Return every cached product row as a list."""
    with cache_lock:
        # Copy the values while the lock is held so the response is built
        # from a stable snapshot.
        snapshot = [*cache.values()]
    return snapshot
56
+
57
@app.get("/products/{product_id}")
def get_product(product_id: str):
    """Return the cached row stored under ``product_id``.

    Raises:
        HTTPException: 404 when the cache has no (non-empty) entry for the id.
    """
    with cache_lock:
        found = cache.get(product_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Product not found")
64
+
65
if __name__ == "__main__":
    # Start the Mongo exporter first, then the Pathway pipeline, each on a
    # daemon thread so neither keeps the process alive after the server exits.
    for background_job in (export_loop, run_pipeline):
        worker = threading.Thread(target=background_job, daemon=True)
        worker.start()

    # Serve the API in the main thread.
    uvicorn.run(app, host="0.0.0.0", port=7860)
exporter.py ADDED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pymongo import MongoClient
2
+ from bson import ObjectId
3
+ from datetime import datetime
4
+ import json
5
+ import time
6
+ import os
7
+
8
# The connection string contains credentials, so it is taken from the
# environment rather than stored in the repository.
# NOTE(review): the password that was previously committed here should be
# rotated, since it remains visible in version-control history.
MONGODB_URI = os.environ.get("MONGODB_URI")
if not MONGODB_URI:
    raise RuntimeError(
        "Set the MONGODB_URI environment variable to the MongoDB connection string"
    )

client = MongoClient(MONGODB_URI)
collection = client["test"]["products"]

# Directory that receives the exported JSON-lines file; created on import.
EXPORT_DIR = "mongo_exports"
os.makedirs(EXPORT_DIR, exist_ok=True)
13
+
14
def _to_jsonable(value):
    """Return ``value`` with datetimes and ObjectIds replaced by strings."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, dict):
        return {key: _to_jsonable(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_to_jsonable(item) for item in value]
    if isinstance(value, ObjectId):
        return str(value)
    return value


def clean(doc):
    """Make a MongoDB document serialisable with ``json.dumps``.

    ``ObjectId`` values become their string form and ``datetime`` values
    become ISO-8601 strings. The conversion also descends into nested dicts
    and lists, so values embedded in sub-documents or arrays are handled too.

    Args:
        doc: The document to convert. Its top-level values are replaced in
            place.

    Returns:
        The same ``doc`` object, for convenient chaining.
    """
    # dict.update with freshly built values keeps the caller's dict identity
    # while never resizing it during iteration.
    doc.update({key: _to_jsonable(value) for key, value in doc.items()})
    return doc
21
+
22
def export_loop():
    """Re-export the products collection to ``products.jsonl`` every 5 seconds.

    Each cycle writes all documents, one JSON object per line, to a temporary
    file and then swaps it into place, so a reader of ``products.jsonl``
    never observes a truncated or half-written file. A failure in one cycle
    is reported and the loop carries on, so a transient database or
    filesystem error does not end the exporter. Never returns.
    """
    fname = f"{EXPORT_DIR}/products.jsonl"
    tmp_name = f"{fname}.tmp"
    while True:
        try:
            with open(tmp_name, "w", encoding="utf-8") as f:
                f.writelines(
                    json.dumps(clean(doc)) + "\n" for doc in collection.find()
                )
            # os.replace swaps the complete file in as a single rename.
            os.replace(tmp_name, fname)
            print("[Mongo Exporter] Wrote updated products.jsonl")
        except Exception as exc:
            # Top-level boundary of the worker thread: report and retry on
            # the next cycle instead of letting the thread die.
            print(f"[Mongo Exporter] Export failed: {exc}")
        time.sleep(5)
requirements.txt ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
fastapi
uvicorn
# pymongo ships its own "bson" module. The separate "bson" package on PyPI is
# a different project that overwrites it and breaks pymongo, so it must not
# be listed here.
pymongo
pathway