Spaces:
Running
Running
| # -*- coding: utf-8 -*- | |
| """app.ipynb | |
| Automatically generated by Colab. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1iPAjeI3M04kA13lYenlROS96tUeCYakB | |
| """ | |
| import os, re, json, math, random, pickle, joblib | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| from datetime import datetime | |
| from zoneinfo import ZoneInfo | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| from sentence_transformers import SentenceTransformer, util | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForSequenceClassification, | |
| AutoModelForTokenClassification, | |
| pipeline, | |
| ) | |
| from huggingface_hub import snapshot_download | |
| """Paths""" | |
| try: | |
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| except NameError: | |
| BASE_DIR = os.getcwd() | |
| # HuggingFace Model Repos | |
| INTENT_REPO = "Youmnaaaa/intent-arabert-ff" | |
| ENTITY_REPO = "Youmnaaaa/entity-hybrid-ff" | |
| SEMANTIC_REPO = "Youmnaaaa/semantic-search-ff" | |
| # ملف الأماكن جوا الـ Space | |
| PLACES_FILE = os.path.join(BASE_DIR, "beni_suef_100_places_v5ff.xlsx") | |
| intent_tokenizer = intent_model = label_encoder = id2intent = None | |
| ner_pipeline = label2id = id2label = None | |
| semantic_model = corpus_df = corpus_embeddings = places_df = None | |
| SESSIONS: dict = {} | |
def clean_text(text):
    """Normalize text for matching: lowercase, strip tatweel, unify common
    Arabic letter variants, replace punctuation with spaces, collapse runs
    of whitespace."""
    s = str(text).strip().lower()
    s = re.sub(r"ـ+", "", s)  # remove tatweel/kashida elongation
    # Unify Arabic letter variants to one canonical form each.
    variant_map = (("[إأآا]", "ا"), ("ى", "ي"), ("ة", "ه"), ("ؤ", "و"), ("ئ", "ي"))
    for pattern, replacement in variant_map:
        s = re.sub(pattern, replacement, s)
    s = re.sub(r"[^\w\s]", " ", s)  # punctuation -> space
    return re.sub(r"\s+", " ", s).strip()
def norm(text):
    """Normalize text for intent/time matching: unify Arabic letter
    variants, map am/pm words to ص/م, and turn range words and dashes
    into "-".

    Bug fix: the letter-unification pass rewrites "إلى"/"الى" to "الي" and
    "حتى" to "حتي" BEFORE the word-replacement pass runs, so the original
    entries ("إلى", "الى", "حتى") could never match and time ranges like
    "9 الى 5" were never normalized to "9 - 5".  A word-boundary regex for
    the post-normalization spellings ("الي", "حتي") restores the intended
    behavior without touching words that merely contain them (e.g. "غالي").
    """
    text = str(text).strip().lower()
    text = re.sub(r"ـ+", "", text)  # remove tatweel/kashida
    for old, new in [("[إأآا]","ا"),("ى","ي"),("ة","ه"),("ؤ","و"),("ئ","ي")]:
        text = re.sub(old, new, text)
    for old, new in [("صباحًا","ص"),("صباحا","ص"),("مساءً","م"),("مساءا","م"),
                     ("ليلًا","م"),("ليلا","م"),("إلى","-"),("الى","-"),("حتى","-"),
                     ("–","-"),("—","-")]:
        text = text.replace(old, new)
    # Fix: match the already-normalized spellings of the range separators.
    text = re.sub(r"\b(?:الي|حتي)\b", "-", text)
    return re.sub(r"\s+", " ", text).strip()
# INTENT MAPS
# Intents that trigger a place search vs. canned small-talk replies.
SEARCH_INTENTS = {"nearest_restaurant","nearest_pharmacy","nearest_cafe",
                  "nearest_supermarket","housing_search","recommend_place",
                  "open_now","place_details"}
STATIC_INTENTS = {"greeting","thanks","goodbye","confirm","deny"}
# Search intent -> default place category when none was extracted.
INTENT_TO_CATEGORY = {
    "nearest_restaurant":"restaurant","nearest_pharmacy":"pharmacy",
    "nearest_cafe":"cafe","nearest_supermarket":"supermarket",
    "housing_search":"housing",
}
# Intent -> key into RESPONSE_TEMPLATES (see get_template_key).
INTENT_TEMPLATE_MAP = {
    "nearest_restaurant":"find_restaurant","nearest_pharmacy":"find_pharmacy",
    "nearest_cafe":"find_cafe","nearest_supermarket":"find_supermarket",
    "housing_search":"find_housing","recommend_place":"find_restaurant",
    "open_now":"find_restaurant","place_details":"find_restaurant",
    "greeting":"greeting","thanks":"thanks","goodbye":"goodbye",
    "confirm":"clarification","deny":"clarification","fallback":"fallback",
}
# Raw NER entity-group label -> canonical slot name used by the session.
ENTITY_FIELD_MAP = {
    "location":"location","place_type":"category","cuisine_or_item":"sub_category",
    "food_type":"sub_category","price":"price","price_range":"price",
    "category":"category","sub_category":"sub_category","facility_type":"category",
    "housing_type":"category","status":"status","time":"time",
}
# Hard keyword triggers that bypass the intent model (apply_keyword_override).
KEYWORD_OVERRIDE = {
    "goodbye": ["مع السلامة","مع السلامه","باي","وداعا","bye","goodbye","تصبح على خير",
                "في امان الله","الله يسلمك","سلامتك"],
    "greeting":["السلام عليكم","وعليكم السلام","اهلا","أهلا","هلا","هلو","مرحبا","مرحباً",
                "صباح الخير","مساء الخير","هاي","hi","hello","صباح","مساء"],
    "thanks":  ["شكرا","شكراً","تسلم","يسلمو","ممنون","مشكور","thanks","thank","الف شكر"],
}
# Per-category keywords used by infer_category to guess a category from text.
CATEGORY_KEYWORDS = {
    "restaurant":["مطعم","اكل","وجبات","مشويات","كباب","شاورما","كريب","برجر","سمك","فرايد"],
    "pharmacy":  ["صيدليه","صيدلية","دوا","ادويه","دواء"],
    "cafe":      ["كافيه","كوفي","قهوه","قهوة","كافيتيريا"],
    "supermarket":["سوبرماركت","ماركت","بقاله","هايبر"],
    "housing":   ["شقه","شقة","ايجار","إيجار","فندق","هوستل","سكن"],
}
# Arabic/alias surface forms -> canonical English category keys.
_CAT_MAP = {
    "مطعم":"restaurant","مطاعم":"restaurant","طعام":"restaurant","اكل":"restaurant",
    "صيدليه":"pharmacy","صيدلية":"pharmacy","صيدله":"pharmacy","دواء":"pharmacy","دوا":"pharmacy",
    "كافيه":"cafe","كافية":"cafe","كوفي":"cafe","قهوه":"cafe","قهوة":"cafe","كافيتيريا":"cafe",
    "سوبرماركت":"supermarket","ماركت":"supermarket","بقاله":"supermarket","بقالة":"supermarket","هايبر":"supermarket",
    "شقه":"housing","شقة":"housing","ايجار":"housing","إيجار":"housing",
    "فندق":"housing","سكن":"housing","هوستل":"housing",
}

def normalize_category(cat):
    """Map a raw category mention (Arabic or English) onto one of the
    canonical category keys; unknown values come back unchanged, and a
    falsy input is returned as-is."""
    if not cat:
        return cat
    value = str(cat).strip()
    if value in ("restaurant", "pharmacy", "cafe", "supermarket", "housing"):
        return value
    direct = _CAT_MAP.get(value)
    if direct is not None:
        return direct
    # Fuzzy pass: accept either direction of substring containment.
    for surface, canonical in _CAT_MAP.items():
        if surface in value or value in surface:
            return canonical
    return value
# Follow-up question asked (per intent) when a search returns no results.
CLARIFICATION_Q = {
    "nearest_restaurant":"أي نوع أكل؟ مشويات، شاورما، كريب، برجر؟",
    "nearest_pharmacy":"في أي منطقة بتدور على صيدلية؟",
    "nearest_cafe":"في أي منطقة بتدور على كافيه؟",
    "nearest_supermarket":"في أي منطقة بتدور على ماركت؟",
    "housing_search":"بتدور على إيه — شقة إيجار، فندق؟ وفين؟",
}
# Topics the bot refuses to discuss (weather, sports, politics, banking, …).
OUT_OF_SCOPE_KW = ["الجو","طقس","درجه","كوره","كرة","أهلي","زمالك","مباريات",
                   "سياسه","سياسة","أخبار","رصيد","بنك","تحويل","امتحان","مدرسه",
                   "جامعه","وظيفه","برمجه","كود","python","java","رياضيات","ترجمه","translate"]
# Cue words meaning "show me another result" (paging through last results).
NEXT_WORDS = ["تاني","غيره","غيرها","بديل","حاجة تانية","مش عاجبني","فيه تاني","عايز غيره"]
# Cue words for a detail question about the last place (hours/phone/price/…).
DETAIL_WORDS = ["بيفتح","بتفتح","مواعيده","مواعيدها","امتى","امتي","عنوانه","عنوانها",
                "تليفونه","تليفونها","رقمه","رقمها","تقييمه","تقييمها","سعره","سعرها"]
# Pronouns that refer back to the last mentioned place.
REF_WORDS = ["هو","هي","ده","دي","المكان ده"]
# District/landmark names used to detect a bare location continuation turn.
_LOC_CUES = ["الحي","بني سويف","الاباصيري","الكورنيش","مقبل","الزراعيين",
             "صلاح سالم","شرق النيل","سيتي سنتر","عرابي","الروضه"]
| # HELPER FUNCTIONS | |
def apply_keyword_override(text):
    """Return a small-talk intent when the text contains one of the
    hard-coded trigger keywords (greeting/thanks/goodbye), else None."""
    normalized = norm(text)
    tokens = set(normalized.split())
    for intent, keywords in KEYWORD_OVERRIDE.items():
        # Longest keywords first so multi-word phrases win over fragments.
        for keyword in sorted(keywords, key=len, reverse=True):
            kw = norm(keyword)
            if " " in kw:
                if kw in normalized:
                    return intent
            elif kw in tokens:
                return intent
    return None
def get_template_key(intent, category=None):
    """Pick the response-template key, preferring the resolved category
    over the raw intent; unknown intents fall back to 'fallback'."""
    category_templates = {
        "restaurant": "find_restaurant",
        "pharmacy": "find_pharmacy",
        "cafe": "find_cafe",
        "supermarket": "find_supermarket",
        "housing": "find_housing",
    }
    if category and category in category_templates:
        return category_templates[category]
    return INTENT_TEMPLATE_MAP.get(intent, "fallback")
def infer_category(query):
    """Guess a place category from keyword hits in the query text; returns
    the first matching category key, or None when nothing matches."""
    q = norm(query)
    for cat, words in CATEGORY_KEYWORDS.items():
        hit = any(norm(word) in q for word in words)
        if hit:
            return cat
    return None
def is_out_of_scope(text):
    """True when the text mentions a topic the bot does not handle
    (weather, sports, politics, …)."""
    normalized = norm(text)
    for keyword in OUT_OF_SCOPE_KW:
        if norm(keyword) in normalized:
            return True
    return False
def detect_ref_type(text):
    """Classify a follow-up turn: 'next' (another result), 'detail'
    (question about the last place), 'reference' (pronoun back-reference),
    or 'new' (fresh query)."""
    normalized = norm(text)
    tokens = set(normalized.split())
    if any(norm(w) in normalized for w in NEXT_WORDS):
        return "next"
    if any(norm(w) in normalized for w in DETAIL_WORDS):
        return "detail"
    for word in REF_WORDS:
        wn = norm(word)
        # Phrases match anywhere; single words must match a whole token.
        matched = (wn in normalized) if " " in wn else (wn in tokens)
        if matched:
            return "reference"
    return "new"
def _loc_continuation(text):
    """Heuristic: a short message naming a known area, or one starting
    with 'في' ('in'), is treated as a location refinement of the previous
    search rather than a new query."""
    normalized = norm(text)
    words = normalized.split()
    if len(words) <= 4:
        if any(norm(cue) in normalized for cue in _LOC_CUES):
            return True
    return bool(words) and words[0] == "في"
def normalize_rating(r):
    """Coerce a raw rating onto a 0-5 scale rounded to one decimal.

    Values above 5 are assumed to be on a 10-point scale and are halved.
    Non-positive, non-numeric or missing values yield 0.0.

    Fix: the original used a bare `except:` (which also swallows
    KeyboardInterrupt/SystemExit); narrowed to the two exceptions
    float() actually raises.
    """
    try:
        value = float(r)
    except (TypeError, ValueError):
        return 0.0
    if value > 5:
        return round(value / 2, 1)
    return round(value, 1) if value > 0 else 0.0
| # TIME UTILS | |
def get_cairo_now():
    """Current wall-clock time in Africa/Cairo (timezone-aware)."""
    cairo_tz = ZoneInfo("Africa/Cairo")
    return datetime.now(cairo_tz)
def parse_time(token):
    """Parse an Arabic clock token ('3', '3:30', '3م', '12ص', '1ظهر')
    into a 24-hour 'HH:MM' string, or None when it is not a valid time."""
    compact = norm(token).replace(" ", "")
    match = re.match(r"^(\d{1,2})(?::(\d{1,2}))?(ص|م|ظهر)?$", compact)
    if not match:
        return None
    hour = int(match.group(1))
    minute = int(match.group(2)) if match.group(2) else 0
    suffix = match.group(3)
    if not (0 <= minute <= 59):
        return None
    if suffix == "ص":            # AM: 12ص is midnight, otherwise 1-11 only
        if hour == 12:
            hour = 0
        elif not (1 <= hour <= 11):
            return None
    elif suffix in ("م", "ظهر"):  # PM / noon: shift 1-11 forward by 12
        if hour != 12 and 1 <= hour <= 11:
            hour += 12
    else:                         # no suffix: plain 24h clock, 24 wraps to 0
        if hour == 24:
            hour = 0
        elif not (0 <= hour <= 23):
            return None
    return f"{hour:02d}:{minute:02d}"
def check_open_now(opening_hours_str):
    """Return 1 if the place appears open right now, 0 if closed, or None
    when the opening-hours text is missing or unparseable.

    Handles always-open markers and a single "start - end" range,
    including overnight ranges (start > end crosses midnight).

    Fix: the original called get_cairo_now() twice (once for the hour,
    once for the minute) and could straddle a minute or midnight boundary;
    the current time is now read once.
    """
    if not opening_hours_str or str(opening_hours_str).strip() in ("", "nan", "none"):
        return None
    text = norm(str(opening_hours_str))
    # NOTE(review): plain substring "24" also matches clock times that merely
    # contain "24" (e.g. "0:24"); kept as-is to preserve behavior.
    if any(k in text for k in ["24", "always", "طول اليوم", "24/7"]):
        return 1
    sep = re.search(r"(.+?)\s*-\s*(.+)", text)
    if not sep:
        return None
    t1 = parse_time(sep.group(1).strip())
    t2 = parse_time(sep.group(2).strip())
    if not t1 or not t2:
        return None
    now = get_cairo_now()
    now_t = f"{now.hour:02d}:{now.minute:02d}"
    if t1 <= t2:  # same-day range; zero-padded strings compare correctly
        return 1 if t1 <= now_t <= t2 else 0
    return 1 if (now_t >= t1 or now_t <= t2) else 0  # overnight range
| # SEARCH + FILTER + RANK | |
def semantic_candidates(query, top_k=20):
    """Embed the query with the sentence-transformer and return the top_k
    most cosine-similar corpus rows with a 'semantic_score' column."""
    query_emb = semantic_model.encode(clean_text(query), convert_to_tensor=True)
    similarities = util.cos_sim(query_emb, corpus_embeddings)[0]
    k = min(top_k, len(corpus_df))
    best = torch.topk(similarities, k=k)
    hits = corpus_df.iloc[best.indices.cpu().numpy()].copy()
    hits["semantic_score"] = best.values.cpu().numpy()
    wanted = ["place_id", "doc_id", "name", "category", "sub_category", "location",
              "address", "price_range", "opening_hours", "description", "semantic_score"]
    keep = [col for col in wanted if col in hits.columns]
    return hits[keep].reset_index(drop=True)
def merge_places(df):
    """Left-join extra metadata columns (coords, rating, phone, cleaned
    text columns) from the global places_df onto df via place_id."""
    optional_cols = ["lat", "lon", "rating", "phone", "social_media", "status",
                     "category_clean", "sub_category_clean", "location_clean",
                     "address_clean", "price_range_clean", "search_text_clean"]
    present = [col for col in optional_cols if col in places_df.columns]
    slim = places_df[["place_id"] + present].copy()
    return df.merge(slim, on="place_id", how="left")
def apply_filters(df, query, category=None, sub_category=None, location=None,
                  price_range=None, open_now_only=False, min_rating=None):
    """Filter candidate places by the optional facets and attach the
    scoring helper columns (open_now, rating_num/norm/score, open_score).

    Returns a (possibly empty) copy; the input frame is not mutated.

    Fix: the original fallback for a missing "rating" column was a bare
    `pd.Series()` — empty and index-misaligned, so every row's rating
    became NaN, and it emitted a dtype FutureWarning.  The fallback is now
    a zero series built on f's own index.
    """
    f = df.copy()

    def _contains(column, needle):
        # Substring match on the pre-cleaned column; NaN rows never match.
        return f[column].astype(str).str.contains(re.escape(clean_text(needle)), na=False)

    if category:
        f = f[_contains("category_clean", category)]
    if sub_category:
        f = f[_contains("sub_category_clean", sub_category)]
    if location:
        f = f[_contains("location_clean", location)]
    if price_range:
        f = f[_contains("price_range_clean", price_range)]
    f["open_now"] = f["opening_hours"].apply(check_open_now)
    rating_col = f["rating"] if "rating" in f.columns else pd.Series(0.0, index=f.index)
    f["rating_num"] = pd.to_numeric(rating_col, errors="coerce").fillna(0)
    f["rating_norm"] = f["rating_num"].apply(normalize_rating)
    f["rating_score"] = f["rating_norm"] / 5.0
    # Unknown hours get a neutral 0.5 so they are not punished outright.
    f["open_score"] = f["open_now"].apply(lambda x: 1.0 if x == 1 else (0.5 if x is None else 0.0))
    if open_now_only:
        f = f[f["open_now"] == 1]
    if min_rating:
        f = f[f["rating_norm"] >= min_rating]
    return f
def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two points given as
    (latitude, longitude) in degrees."""
    earth_radius_km = 6371
    deg2rad = math.pi / 180
    half_dlat = (lat2 - lat1) * deg2rad / 2
    half_dlon = (lon2 - lon1) * deg2rad / 2
    h = (math.sin(half_dlat) ** 2
         + math.cos(lat1 * deg2rad) * math.cos(lat2 * deg2rad) * math.sin(half_dlon) ** 2)
    return 2 * earth_radius_km * math.asin(math.sqrt(h))
def rank(df, query, user_lat=None, user_lon=None):
    """Score and sort candidate places, best first.

    final_score blends semantic similarity (0.40), rating (0.25),
    open-now status (0.15), proximity (0.10, only with user coordinates)
    and exact-name match against the query (0.10).

    Fixes: `if user_lat and user_lon` silently ignored legitimate 0.0
    coordinates — now compared against None; and when every distance was
    the 999 sentinel, `max() or 1` kept NaN (NaN is truthy) and poisoned
    distance_score — now falls back to 1 explicitly.
    """
    df = df.copy()
    if user_lat is not None and user_lon is not None and "lat" in df.columns:
        def dist(row):
            try:
                return haversine(float(user_lat), float(user_lon),
                                 float(row["lat"]), float(row["lon"]))
            except (TypeError, ValueError):
                return 999  # sentinel: coordinates missing/unparseable
        df["distance_km"] = df.apply(dist, axis=1)
        mx = df["distance_km"].replace(999, np.nan).max()
        if not mx or (isinstance(mx, float) and math.isnan(mx)):
            mx = 1
        df["distance_score"] = 1 - (df["distance_km"] / (mx + 1))
    else:
        df["distance_km"] = 999
        df["distance_score"] = 0.0
    q_clean = clean_text(query)
    # 1.0 when the place name appears in the query (or vice versa).
    df["name_match_score"] = df["name"].apply(
        lambda n: 1.0 if clean_text(str(n)) in q_clean or q_clean in clean_text(str(n)) else 0.0)
    w = dict(semantic=0.40, rating=0.25, open=0.15, distance=0.10, name=0.10)
    zeros = pd.Series(0, index=df.index)  # safe default for missing score columns
    df["final_score"] = (
        w["semantic"] * df.get("semantic_score", zeros).fillna(0) +
        w["rating"]   * df.get("rating_score", zeros).fillna(0) +
        w["open"]     * df.get("open_score", zeros).fillna(0) +
        w["distance"] * df["distance_score"] + w["name"] * df["name_match_score"]
    )
    return df.sort_values("final_score", ascending=False).reset_index(drop=True)
def search_places(query, top_k_final=5, category=None, sub_category=None,
                  location=None, price_range=None, open_now_only=False,
                  min_rating=None, user_lat=None, user_lon=None):
    """End-to-end search: semantic retrieval -> metadata merge ->
    progressively-relaxed filtering -> ranking.

    The four `attempt` dicts drop constraints one step at a time
    (sub_category first, then price/open-now, then location/rating) so an
    over-constrained query still returns something instead of nothing.
    Returns at most top_k_final rows containing only display columns, or
    an empty DataFrame when even the loosest attempt matches nothing.
    """
    cands = semantic_candidates(query, top_k=20)
    merged = merge_places(cands)
    for attempt in [
        dict(category=category, sub_category=sub_category, location=location,
             price_range=price_range, open_now_only=open_now_only, min_rating=min_rating),
        dict(category=category, sub_category=None, location=location,
             price_range=price_range, open_now_only=open_now_only, min_rating=min_rating),
        dict(category=category, sub_category=None, location=location,
             price_range=None, open_now_only=False, min_rating=min_rating),
        dict(category=category, sub_category=None, location=None,
             price_range=None, open_now_only=False, min_rating=None),
    ]:
        filtered = apply_filters(merged, query, **attempt)
        if not filtered.empty: break
    if filtered.empty: return pd.DataFrame()
    ranked = rank(filtered, query, user_lat, user_lon)
    # Keep only the columns the response layer knows how to render.
    keep = [c for c in ["place_id","name","category","sub_category","location","address",
                        "price_range","rating","rating_norm","opening_hours","description",
                        "phone","lat","lon","semantic_score","final_score","open_now"]
            if c in ranked.columns]
    return ranked[keep].head(top_k_final).reset_index(drop=True)
| # RESPONSE TEMPLATES + FORMATTERS | |
# Reply templates keyed by get_template_key(); the {placeholders} are
# filled in by build_response(), and one variant is picked at random per
# reply so answers do not sound repetitive.
RESPONSE_TEMPLATES = {
    "find_restaurant":[
        "🍽️ لقيتلك {name} في {location}. {price_info}{rating_info}{hours_info}",
        "أنصحك بـ {name} — هتلاقيه في {location}. {price_info}{rating_info}{hours_info}",
        "في {location} فيه {name}. {description_short}{price_info}{hours_info}",
    ],
    "find_pharmacy":[
        "💊 {name} في {location}.{hours_info}{rating_info}",
        "أقرب صيدلية ليك: {name} — {address_info}{hours_info}",
    ],
    "find_cafe":[
        "☕ {name} في {location}. {price_info}{rating_info}{hours_info}",
        "جرب {name} — في {location}. {description_short}{hours_info}",
    ],
    "find_supermarket":[
        "🛒 {name} في {location}.{hours_info}{rating_info}",
        "أقرب ماركت: {name} — {address_info}{hours_info}",
    ],
    "find_housing":[
        "🏠 {name} في {location}. {price_info}{description_short}",
        "فيه {name} في {location}. {price_info}{rating_info}",
    ],
    # Small-talk and control replies (no placeholders).
    "greeting": ["أهلاً! 😊 أنا بساعدك تلاقي أي مكان في بني سويف. عايز إيه؟",
                 "وعليكم السلام! قولي محتاج إيه — مطعم، صيدلية، كافيه؟",
                 "هلا بيك! محتاج إيه في بني سويف؟ 😊"],
    "thanks":   ["العفو! 😊 في حاجة تانية أساعدك فيها؟","أي خدمة! 😊","بكل سرور! 😊"],
    "goodbye":  ["مع السلامة! 👋","سلامتك! أي وقت محتاج مساعدة أنا هنا.","باي! ربنا يوفقك 😊"],
    "clarification":["😊 قصدك إيه بالظبط؟","ممكن توضح أكتر؟","تمام! بتدور على إيه بالظبط؟"],
    "no_result": ["😔 مش لاقي حاجة مناسبة. جرب تغير المنطقة أو تسأل بطريقة تانية.",
                  "معلش، مفيش نتايج. ممكن تحدد المنطقة أو النوع أكتر؟"],
    "fallback": ["آسف، مش فاهم قصدك. 😊 قولي محتاج إيه — مطعم، صيدلية، كافيه؟",
                 "ممكن تسألني عن أي مكان في بني سويف وأنا هساعدك! 😊"],
}
def fmt_price(x):
    """Render a price value as a short Arabic sentence ending in '. ';
    returns '' when the value is missing/NaN."""
    raw = str(x).strip().lower()
    if not raw or raw in ("", "nan", "none"):
        return ""
    labels = {"cheap": "الأسعار رخيصة", "رخيص": "الأسعار رخيصة", "اقتصادي": "الأسعار اقتصادية",
              "medium": "الأسعار متوسطة", "متوسط": "الأسعار متوسطة",
              "expensive": "الأسعار غالية", "غالي": "الأسعار غالية"}
    for keyword, phrase in labels.items():
        if keyword in raw:
            return phrase + ". "
    return f"السعر: {x}. "  # unrecognized value: show it verbatim
def fmt_rating(x):
    """Render a rating as 'تقييمه N ⭐…' (with one star per rounded point,
    capped at 5), or '' when the value is missing/invalid/zero.

    Fix: narrowed the original bare `except:` to the exceptions float()
    actually raises.
    """
    try:
        r = normalize_rating(float(x))
    except (TypeError, ValueError):
        return ""
    stars = min(round(r), 5)
    return f"تقييمه {r} {'⭐'*stars}. " if r > 0 else ""
def fmt_hours(x):
    """Render opening hours; '' when missing, a fixed 24h phrase when an
    always-open marker is present, else the raw hours string."""
    hours = str(x).strip()
    if not hours or hours in ("", "nan", "none"):
        return ""
    lowered = hours.lower()
    if any(marker in lowered for marker in ["24", "always", "طول اليوم"]):
        return "مفتوح 24 ساعة. "
    return f"بيفتح: {hours}. "
def fmt_addr(address, location):
    """Prefer the street address, fall back to the district, else ''."""
    addr = str(address).strip()
    district = str(location).strip()
    if addr and addr not in ("", "nan", "none"):
        return f"عنوانه: {addr}. "
    if district and district not in ("", "nan", "none"):
        return f"في {district}. "
    return ""
def fmt_desc(x, max_words=12):
    """Truncate a description to max_words words (with a '...' suffix);
    short descriptions get a trailing space, missing ones become ''."""
    desc = str(x).strip()
    if not desc or desc in ("", "nan", "none"):
        return ""
    words = desc.split()
    if len(words) > max_words:
        return " ".join(words[:max_words]) + "..."
    return desc + " "
def build_response(place, intent, category=None):
    """Fill a randomly chosen response template with the place's fields
    and append an open/closed status line when it is known."""
    if not place:
        return random.choice(RESPONSE_TEMPLATES["no_result"])
    template_key = get_template_key(intent, category)
    template = random.choice(RESPONSE_TEMPLATES[template_key])
    fields = dict(
        name=str(place.get("name", "")).strip(),
        location=str(place.get("location", "")).strip() or "بني سويف",
        price_info=fmt_price(place.get("price_range", "")),
        rating_info=fmt_rating(place.get("rating_norm", place.get("rating", 0))),
        hours_info=fmt_hours(place.get("opening_hours", "")),
        address_info=fmt_addr(place.get("address", ""), place.get("location", "")),
        description_short=fmt_desc(place.get("description", "")),
    )
    reply = template.format(**fields)
    open_state = place.get("open_now")
    if open_state == 1:
        reply += "\n🟢 مفتوح دلوقتي."
    elif open_state == 0:
        reply += "\n🔴 مغلق دلوقتي."
    return reply
def handle_detail(text, place):
    """Answer a follow-up question (hours / address / price / rating /
    phone) about the previously returned place; defaults to a summary.

    Fix: the address branch contained an escaped "\\n" inside its f-string,
    so users saw a literal backslash-n instead of a line break.
    """
    if not place: return "مش فاكر إحنا اتكلمنا عن مكان. ممكن تسألني من الأول؟"
    t = norm(text); name = str(place.get("name","")).strip()
    if any(w in t for w in ["امتي","امتى","مواعيد","يفتح","تفتح","يقفل"]):
        st = "🟢 مفتوح" if place.get("open_now")==1 else "🔴 مغلق"
        return f"⏰ {name} — {fmt_hours(place.get('opening_hours',''))}\n{st} دلوقتي."
    if any(w in t for w in ["عنوان","فين","وصول","اوصل"]):
        # Fixed: real newline instead of the literal "\n" the original printed.
        return f"📍 {name} في {place.get('location','')}.\nالعنوان: {place.get('address','')}"
    if any(w in t for w in ["سعر","بكام","تكلف","غالي","رخيص"]):
        return f"💰 {name} — {fmt_price(place.get('price_range',''))}"
    if any(w in t for w in ["تقييم","نجوم"]):
        return f"⭐ {name} — {fmt_rating(place.get('rating_norm', place.get('rating',0)))}"
    if any(w in t for w in ["رقم","تليفون"]):
        phone = str(place.get("phone","")).strip()
        return f"📞 {name} — {phone}" if phone else f"معنديش رقم {name}."
    # No specific cue matched: return a short general summary.
    return f"📋 {name}:\n{fmt_desc(place.get('description',''), 20)}\n{fmt_hours(place.get('opening_hours',''))}{fmt_rating(place.get('rating_norm',0))}"
| # PREDICT FUNCTIONS | |
def predict_intent(text, threshold=0.5):
    """Classify the user's intent.  Hard keyword overrides win outright
    (confidence 1.0); otherwise the AraBERT classifier runs, and a
    prediction below `threshold` degrades to 'fallback'."""
    override = apply_keyword_override(text)
    if override is not None:
        return {"intent": override, "confidence": 1.0}
    encoded = intent_tokenizer(text, return_tensors="pt", truncation=True,
                               padding=True, max_length=128)
    with torch.no_grad():
        logits = intent_model(**encoded).logits
    probs = torch.softmax(logits, dim=1)
    predicted_id = torch.argmax(probs, dim=1).item()
    confidence = probs[0][predicted_id].item()
    intent = id2intent[predicted_id] if confidence >= threshold else "fallback"
    return {"intent": intent, "confidence": round(confidence, 4)}
def extract_entities(text, min_score=0.40):
    """Run the NER pipeline over the text and map raw entity groups to
    slot names via ENTITY_FIELD_MAP, keeping the longest cleaned value
    seen for each slot; low-score or tiny spans are dropped."""
    entities = {}
    for item in ner_pipeline([text])[0]:
        group = item["entity_group"].lower().strip()
        value = item["word"].strip().replace("##", "")  # drop wordpiece markers
        value = re.sub(r"\s+", " ", value).strip()
        if len(value) < 2 or float(item["score"]) < min_score:
            continue
        slot = ENTITY_FIELD_MAP.get(group, group)
        cleaned = clean_text(value)
        if slot not in entities or len(cleaned) > len(clean_text(entities[slot])):
            entities[slot] = cleaned
    return entities
| # SESSION | |
class Session:
    """Per-user dialogue state: turn history, last intent/entities/place,
    the cached result list with a paging pointer, and sticky context slots
    (location/category/sub_category/price) that persist across turns."""

    def __init__(self, sid="default"):
        self.sid = sid
        self.history = []
        self.last_intent = None
        self.last_entities = {}
        self.last_place = None
        self.last_results = []
        self.result_pointer = 0
        self.context_slots = {}
        self.turns = 0

    def add(self, user, bot, intent, entities, place, results):
        """Record one completed turn and roll the conversation state forward."""
        self.history.append({"turn": self.turns, "user": user, "bot": bot,
                             "intent": intent, "entities": entities})
        # Only meaningful intents become the "previous intent" for follow-ups.
        if intent and intent not in ("fallback", "no_result", "out_of_scope"):
            self.last_intent = intent
            if intent in SEARCH_INTENTS:
                self.last_entities = entities
        if place is not None:
            self.last_place = place
        if results:
            self.last_results = results
            self.result_pointer = 0  # restart paging on a fresh result set
        self._slots(entities)
        self.turns += 1

    def _slots(self, ents):
        """Copy non-empty known slot values into the sticky context."""
        for slot in ["location", "category", "sub_category", "price"]:
            value = ents.get(slot)
            if value and str(value).strip():
                self.context_slots[slot] = str(value).strip()

    def merge(self, new_ents):
        """Overlay new entities on top of the remembered slots; returns the
        merged dict and also persists the new values into the context."""
        merged = dict(self.context_slots)
        for key, value in new_ents.items():
            if value and str(value).strip():
                merged[key] = str(value).strip()
        self._slots(new_ents)
        return merged
| # MAIN CHAT | |
def chat(text: str, session: Session, user_lat=None, user_lon=None):
    """Run a single dialogue turn and return a result dict with keys:
    reply, intent, confidence, entities, best_place, all_results.

    Order of checks: empty input -> out-of-scope topic -> follow-up
    references (detail question / "next result") -> intent prediction ->
    static small-talk reply -> fallback (with a location-continuation
    rescue) -> entity extraction + place search.  Every exit path records
    the turn on the session before returning.
    """
    result = dict(reply="", intent="", confidence=0.0, entities={}, best_place=None, all_results=[])
    if not text or not text.strip():
        result.update(reply="الرجاء إدخال سؤال 😊", intent="fallback")
        session.add("", result["reply"], "fallback", {}, None, [])
        return result
    if is_out_of_scope(text):
        reply = "أنا متخصص في إيجاد الأماكن في بني سويف فقط. 😊\nممكن أساعدك تلاقي مطعم، صيدلية، كافيه، ماركت، أو سكن."
        result.update(reply=reply, intent="out_of_scope")
        session.add(text, reply, "out_of_scope", {}, None, [])
        return result
    ref = detect_ref_type(text)
    # Follow-up about the previously returned place ("when does it open?").
    if ref == "detail" and session.last_place:
        reply = handle_detail(text, session.last_place)
        result.update(reply=reply, intent=session.last_intent or "detail", best_place=session.last_place)
        session.add(text, reply, result["intent"], {}, session.last_place, [])
        return result
    # "Show me another one" — advance through the cached result list.
    if ref == "next" and session.last_results:
        ptr = session.result_pointer + 1
        if ptr < len(session.last_results):
            session.result_pointer = ptr; nxt = session.last_results[ptr]; session.last_place = nxt
            reply = build_response(nxt, session.last_intent, category=nxt.get("category"))
            result.update(reply=reply, intent=session.last_intent, best_place=nxt)
        else:
            result.update(reply="😔 مفيش نتايج تانية. عايز أدور من الأول؟", intent="no_result")
        session.add(text, result["reply"], result["intent"], {}, result["best_place"], [])
        return result
    ir = predict_intent(text); intent = ir["intent"]; conf = ir["confidence"]
    result["intent"] = intent; result["confidence"] = conf
    if intent in STATIC_INTENTS:
        result["reply"] = random.choice(RESPONSE_TEMPLATES[get_template_key(intent)])
        session.add(text, result["reply"], intent, {}, None, [])
        return result
    if intent == "fallback":
        # A short location-only message right after a search is treated as a
        # refinement of the previous search intent instead of a fallback.
        if session.last_intent in SEARCH_INTENTS and _loc_continuation(text):
            intent = session.last_intent; result["intent"] = intent
        else:
            result["reply"] = random.choice(RESPONSE_TEMPLATES["fallback"])
            session.add(text, result["reply"], "fallback", {}, None, [])
            return result
    if intent not in SEARCH_INTENTS:
        result["reply"] = random.choice(RESPONSE_TEMPLATES.get(get_template_key(intent), RESPONSE_TEMPLATES["fallback"]))
        session.add(text, result["reply"], intent, {}, None, [])
        return result
    # Search path: merge freshly extracted entities with remembered slots.
    ents = extract_entities(text); result["entities"] = ents
    merged = session.merge(ents)
    category = normalize_category(merged.get("category") or INTENT_TO_CATEGORY.get(intent) or infer_category(text))
    sub_cat = merged.get("sub_category")
    location = merged.get("location")
    price_range = merged.get("price")
    open_only = ("open_now" in intent or "place_details" in intent)
    df = search_places(text, top_k_final=5, category=category, sub_category=sub_cat,
                       location=location, price_range=price_range, open_now_only=open_only,
                       user_lat=user_lat, user_lon=user_lon)
    if df.empty:
        cl = CLARIFICATION_Q.get(intent, "")
        reply = random.choice(RESPONSE_TEMPLATES["no_result"]) + (f"\n\n💬 {cl}" if cl else "")
        result.update(reply=reply, intent="no_result")
        session.add(text, reply, "no_result", ents, None, [])
        return result
    all_res = df.to_dict(orient="records"); best = all_res[0]
    reply = build_response(best, intent, category=category)
    if len(all_res) > 1: reply += f"\n\n💬 فيه {len(all_res)} نتيجة — قولي 'تاني' لو عايز غيره."
    result.update(reply=reply, best_place=best, all_results=all_res)
    session.add(text, reply, intent, ents, best, all_res)
    return result
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: download and load all models and data before the
    app starts serving; everything before `yield` runs at startup.

    Fix: the @asynccontextmanager decorator was missing — it is imported
    at the top of the file but was never applied.  FastAPI's documented
    lifespan contract requires an async context manager; a bare async
    generator only works through a deprecated Starlette fallback.
    """
    global intent_tokenizer, intent_model, label_encoder, id2intent
    global ner_pipeline, label2id, id2label
    global semantic_model, corpus_df, corpus_embeddings, places_df
    print("⏳ Downloading models from HuggingFace …")
    # Pull the model snapshots from the HuggingFace Model Hub.
    intent_local = snapshot_download(INTENT_REPO)
    entity_local = snapshot_download(ENTITY_REPO)
    semantic_local = snapshot_download(SEMANTIC_REPO)
    print("⏳ Loading Intent model …")
    intent_tokenizer = AutoTokenizer.from_pretrained(intent_local)
    intent_model = AutoModelForSequenceClassification.from_pretrained(intent_local)
    label_encoder = joblib.load(os.path.join(intent_local, "label_encoder.pkl"))
    id2intent = {i: lbl for i, lbl in enumerate(label_encoder.classes_)}
    intent_model.eval()
    print("⏳ Loading Entity model …")
    with open(os.path.join(entity_local, "label2id.json"), encoding="utf-8") as f: label2id = json.load(f)
    with open(os.path.join(entity_local, "id2label.json"), encoding="utf-8") as f: id2label = json.load(f)
    etok = AutoTokenizer.from_pretrained(entity_local)
    emod = AutoModelForTokenClassification.from_pretrained(entity_local)
    ner_pipeline = pipeline("token-classification", model=emod, tokenizer=etok, aggregation_strategy="first")
    print("⏳ Loading Semantic model …")
    semantic_model = SentenceTransformer("Youmnaaaa/semantic-search-ff")
    from huggingface_hub import hf_hub_download
    # Precomputed corpus dataframe + embeddings, pickled alongside the model.
    pkl_path = hf_hub_download(
        repo_id="Youmnaaaa/semantic-search-ff",
        filename="semantic_data.pkl"
    )
    with open(pkl_path, "rb") as f:
        sd = pickle.load(f)
    corpus_df = sd["corpus_df"]
    corpus_embeddings = sd["corpus_embeddings"]
    # Places spreadsheet bundled with the Space; guarantee every expected column.
    places_df = pd.read_excel(PLACES_FILE)
    for col in ["place_id","name","category","sub_category","location","address",
                "price_range","rating","opening_hours","description","lat","lon"]:
        if col not in places_df.columns: places_df[col] = ""
    places_df = places_df.fillna("")
    # Pre-normalized columns used by apply_filters' substring matching.
    places_df["category_clean"] = places_df["category"].apply(clean_text)
    places_df["sub_category_clean"] = places_df["sub_category"].apply(clean_text)
    places_df["location_clean"] = places_df["location"].apply(clean_text)
    places_df["address_clean"] = places_df["address"].apply(clean_text)
    places_df["price_range_clean"] = places_df["price_range"].apply(clean_text)
    places_df["description_clean"] = places_df["description"].apply(clean_text)
    places_df["search_text_clean"] = (
        places_df["name"].astype(str)+" "+places_df["category"].astype(str)+" "+
        places_df["sub_category"].astype(str)+" "+places_df["location"].astype(str)+" "+
        places_df["description"].astype(str)
    ).apply(clean_text)
    print("✅ All models loaded!")
    yield
    print("Shutting down.")
# FASTAPI
# NOTE(review): CORS is wide open (all origins/methods/headers) — acceptable
# for a demo Space, but should be tightened for production.
app = FastAPI(title="Beni Suef Chatbot API", version="1.0.0", lifespan=lifespan)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
class ChatRequest(BaseModel):
    """Request body for the chat endpoint."""
    message: str
    # Groups turns into one conversation (keys into SESSIONS).
    session_id: str = "default"
    # Optional GPS coordinates used for distance-based ranking.
    user_lat: Optional[float] = None
    user_lon: Optional[float] = None
class ChatResponse(BaseModel):
    """Response body returned to the chat client."""
    reply: str
    intent: str
    confidence: float
    entities: dict
    session_id: str
    # Best-matching place (JSON-safe subset of its fields), when any.
    best_place: Optional[dict] = None
@app.get("/")
def root():
    """Liveness/landing endpoint.

    Fix: the route decorator was missing (decorator lines appear to have
    been stripped from this export — @asynccontextmanager is gone too), so
    the handler was never registered with the app.
    """
    return {"status": "ok", "message": "Beni Suef Chatbot is running 🚀"}
@app.get("/health")
def health():
    """Readiness probe: reports whether all models/data finished loading.

    Fixes: restored the missing route decorator (the handler was never
    registered), and made every check an explicit `is not None` — the
    original relied on object truthiness for the model objects while
    already special-casing places_df (a DataFrame raises on bool()).
    """
    loaded = all(x is not None for x in
                 (intent_model, ner_pipeline, semantic_model, places_df))
    return {"status": "healthy", "models_loaded": loaded}
@app.post("/chat", response_model=ChatResponse)
def chat_endpoint(req: ChatRequest):
    """Main chat endpoint: routes the message through the dialogue engine
    and returns a JSON-safe response.

    Fixes: restored the missing route decorator (the endpoint was never
    registered), and chained the 500 HTTPException to the original error
    (`from e`) so server logs keep the real traceback.
    """
    if req.session_id not in SESSIONS:
        SESSIONS[req.session_id] = Session(req.session_id)
    session = SESSIONS[req.session_id]
    try:
        result = chat(req.message, session, req.user_lat, req.user_lon)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    best = result.get("best_place")
    if best:
        allowed = {"place_id","name","category","sub_category","location","address",
                   "price_range","rating","opening_hours","description","phone",
                   "lat","lon","open_now","final_score"}

        def _jsonable(v):
            # numpy scalars -> plain float; NaN -> None so the payload is JSON-safe.
            if isinstance(v, (np.floating, np.integer)):
                return float(v)
            if isinstance(v, float) and np.isnan(v):
                return None
            return v

        best = {k: _jsonable(v) for k, v in best.items() if k in allowed}
    return ChatResponse(reply=result["reply"], intent=result["intent"],
                        confidence=result["confidence"], entities=result["entities"],
                        session_id=req.session_id, best_place=best)
@app.post("/reset/{session_id}")
def reset_session(session_id: str):
    """Drop a stored session's state; safe to call for unknown ids.

    Fix: restored a route decorator — the handler was never registered.
    NOTE(review): the original path/method is unknown (decorators were
    stripped from this export); a POST with a path parameter matches the
    function signature — confirm against the client.
    """
    SESSIONS.pop(session_id, None)
    return {"status": "reset", "session_id": session_id}