| from datetime import datetime |
| from pathlib import Path |
| import csv |
| import json |
| import re |
| from typing import Dict, List |
|
|
| BASE = Path(__file__).parent |
| LOGS = BASE / "logs" |
| DATA = BASE / "data" |
| LOGS.mkdir(exist_ok=True) |
|
|
| CATALOG = json.loads((DATA / "catalog.json").read_text(encoding="utf-8")) |
| PRICE_RULES = json.loads((DATA / "price_rules.json").read_text(encoding="utf-8")) |
| IKEA_CSV = DATA / "IKEA_SA_Furniture_Web_Scrapings_sss.csv" |
| SAR_TO_USD = 0.2667 |
|
|
|
|
| def _to_float(value: str): |
| if value is None: |
| return None |
| value = value.strip() |
| if not value or value.lower().startswith("no "): |
| return None |
| try: |
| return float(value) |
| except ValueError: |
| return None |
|
|
|
|
| def _to_bool(value: str): |
| if value is None: |
| return None |
| value = value.strip().lower() |
| if value in {"true", "yes", "y", "1"}: |
| return True |
| if value in {"false", "no", "n", "0"}: |
| return False |
| return None |
|
|
|
|
| def _load_ikea_catalog() -> List[Dict]: |
| if not IKEA_CSV.exists(): |
| fallback = BASE / "IKEA_SA_Furniture_Web_Scrapings_sss.csv" |
| if not fallback.exists(): |
| return [] |
| target = fallback |
| else: |
| target = IKEA_CSV |
|
|
| items: List[Dict] = [] |
| with target.open("r", encoding="utf-8", newline="") as fh: |
| reader = csv.DictReader(fh) |
| for row in reader: |
| item_id = (row.get("item_id") or "").strip() |
| name = (row.get("name") or "").strip() |
| if not item_id or not name: |
| continue |
| category = (row.get("category") or "").strip() |
| price_sar = _to_float(row.get("price")) |
| price_usd = round(price_sar * SAR_TO_USD, 2) if price_sar is not None else None |
| width = _to_float(row.get("width")) |
| height = _to_float(row.get("height")) |
| depth = _to_float(row.get("depth")) |
| short_description = (row.get("short_description") or "").strip() |
| if short_description: |
| short_description = re.sub(r"\s+", " ", short_description) |
| other_colors = (row.get("other_colors") or "").strip() |
| if other_colors.lower() in {"no", "n/a"}: |
| other_colors = "" |
| sellable = _to_bool(row.get("sellable_online")) |
| link = (row.get("link") or "").strip() |
| designer = (row.get("designer") or "").strip() |
|
|
| searchable = " ".join( |
| filter( |
| None, |
| [ |
| item_id.lower(), |
| name.lower(), |
| category.lower(), |
| short_description.lower(), |
| other_colors.lower(), |
| designer.lower(), |
| ], |
| ) |
| ) |
|
|
| items.append( |
| { |
| "item_id": item_id, |
| "name": name, |
| "category": category, |
| "price_usd": price_usd, |
| "price_currency": "USD" if price_usd is not None else None, |
| "price_note": ( |
| f"Converted from SAR at 1 SAR = {SAR_TO_USD:.4f} USD" |
| if price_usd is not None |
| else None |
| ), |
| "sellable_online": sellable, |
| "link": link, |
| "other_colors": other_colors, |
| "short_description": short_description, |
| "designer": designer, |
| "dimensions_cm": { |
| k: v |
| for k, v in {"width": width, "height": height, "depth": depth}.items() |
| if v is not None |
| }, |
| "_search": searchable, |
| } |
| ) |
| return items |
|
|
|
|
| IKEA_ITEMS = _load_ikea_catalog() |
|
|
|
|
| def _copy_public_item(item: Dict) -> Dict: |
| return {k: v for k, v in item.items() if not k.startswith("_")} |
|
|
|
|
| def _search_ikea_items(query: str, limit: int = 5) -> List[Dict]: |
| if not IKEA_ITEMS: |
| return [] |
| q = query.strip().lower() |
| if not q: |
| return [] |
| words = [w for w in re.split(r"\W+", q) if w] |
| scored: Dict[str, List] = {} |
| for item in IKEA_ITEMS: |
| score = 0 |
| if q == item["item_id"].lower(): |
| score += 10 |
| if q in item["_search"]: |
| score += 3 |
| if words: |
| score += sum(1 for w in words if w and w in item["_search"]) |
| if score > 0: |
| existing = scored.get(item["item_id"]) |
| if existing is None or score > existing[0]: |
| scored[item["item_id"]] = [score, item] |
| if not scored: |
| return [] |
| top = sorted(scored.values(), key=lambda pair: (-pair[0], pair[1]["name"])) |
| return [_copy_public_item(item) for _, item in top[:limit]] |
|
|
|
|
| def record_customer_interest(email: str, name: str, message: str): |
| entry = {"ts": datetime.utcnow().isoformat(), "email": email, "name": name, "message": message} |
| out = LOGS / "leads.jsonl" |
| with out.open("a", encoding="utf-8") as fh: |
| fh.write(json.dumps(entry) + "\n") |
| print(f"[LEAD] {entry}") |
| return {"ok": True, "msg": "Thanks! We'll follow up soon."} |
|
|
|
|
| def record_feedback(question: str): |
| entry = {"ts": datetime.utcnow().isoformat(), "question": question} |
| out = LOGS / "feedback.jsonl" |
| with out.open("a", encoding="utf-8") as fh: |
| fh.write(json.dumps(entry) + "\n") |
| print(f"[FEEDBACK] {entry}") |
| return {"ok": True, "msg": "Noted. We'll improve our answers."} |
|
|
|
|
| def record_service_feedback( |
| email: str, |
| name: str, |
| service_type: str, |
| satisfaction: str, |
| comments: str = "", |
| ): |
| entry = { |
| "ts": datetime.utcnow().isoformat(), |
| "email": email, |
| "name": name, |
| "service_type": service_type, |
| "satisfaction": satisfaction, |
| "comments": comments or "", |
| } |
| out = LOGS / "service_feedback.jsonl" |
| with out.open("a", encoding="utf-8") as fh: |
| fh.write(json.dumps(entry) + "\n") |
| print(f"[SERVICE_FEEDBACK] {entry}") |
| return {"ok": True, "msg": "Thanks for the feedback! We'll share it with the team."} |
|
|
|
|
| def lookup_product(query: str): |
| q = (query or "").strip() |
| if not q: |
| return {"ok": False, "msg": "Please provide a product keyword, SKU, or IKEA item ID."} |
| q_lower = q.lower() |
| result = {"ok": True, "query": q} |
|
|
| sku_match = next((item for item in CATALOG if item.get("sku", "").lower() == q_lower), None) |
| if sku_match: |
| result["catalog_match"] = sku_match |
|
|
| name_hits = [ |
| item |
| for item in CATALOG |
| if q_lower in item.get("name", "").lower() |
| or any(q_lower in (opt or "").lower() for opt in item.get("color_options", [])) |
| ] |
| if name_hits and not sku_match: |
| result["catalog_results"] = name_hits |
|
|
| category_hits = [item for item in CATALOG if item.get("category", "").lower() == q_lower] |
| if category_hits: |
| result["catalog_category"] = category_hits |
|
|
| ikea_hits = _search_ikea_items(q_lower) |
| if ikea_hits: |
| result["ikea_results"] = ikea_hits |
|
|
| if len(result) == 2: |
| return {"ok": False, "msg": f"No products found for '{q}'."} |
| return result |
|
|
|
|
| def estimate_repair(issue: str, material: str = "any", size_category: str = "medium"): |
| issue = issue.strip().lower() |
| material = (material or "any").strip().lower() |
| size = (size_category or "medium").strip().lower() |
| rules = PRICE_RULES.get(issue) |
| if not rules: |
| return {"ok": False, "msg": f"No pricing rule for issue '{issue}'."} |
| if material in rules: |
| bucket = rules[material] |
| elif "any" in rules: |
| bucket = rules["any"] |
| else: |
| bucket = next(iter(rules.values())) |
| if size not in bucket: |
| return {"ok": False, "msg": f"Unsupported size_category '{size}'. Use small/medium/large."} |
| min_p, max_p, min_d, max_d = bucket[size] |
| tiers = { |
| "budget": {"price": round(min_p * 0.9), "days": [min_d, max(min_d, min_d + 1)]}, |
| "standard": {"price": round((min_p + max_p) / 2), "days": [min_d, max_d]}, |
| "rush": {"price": round(max_p * 1.25), "days": [max(1, min_d - 1), max(1, max_d - 1)]}, |
| } |
| return {"ok": True, "issue": issue, "material": material, "size": size, "estimate": tiers} |
|
|