Spaces:
Paused
Paused
| from fastapi import FastAPI, Request | |
| import json | |
| import os | |
| import re | |
| import firebase_admin | |
| from firebase_admin import credentials, firestore | |
| from datetime import datetime | |
# --- SETUP ---
app = FastAPI()

# Names of the Firestore collections used by the functions in this module.
COLLECTION_KNOWLEDGE = "knowledge_base"
COLLECTION_RULES = "availability_rules"
COLLECTION_INBOX = "inbox"

# In-memory copy of the knowledge-base documents; filled by reload_knowledge().
KNOWLEDGE_CACHE = []

# --- FIREBASE CONNECTION ---
# `db` stays None when FIREBASE_KEY is not set or when connecting fails;
# the functions below test `db` before they touch Firestore.
db = None
try:
    key = os.environ.get("FIREBASE_KEY")
    if not key:
        print("❌ FEHLER: FIREBASE_KEY fehlt!")
    else:
        # The environment variable is parsed as JSON and handed to
        # credentials.Certificate.
        cred = credentials.Certificate(json.loads(key))
        # Skip initialisation when a Firebase app has already been registered.
        if not firebase_admin._apps:
            firebase_admin.initialize_app(cred)
        db = firestore.client()
        print("✅ DB VERBUNDEN")
except Exception as e:
    # Any failure here is only reported; the module still imports with db=None.
    print(f"❌ DB CRASH: {e}")
# --- LOAD CACHE ---
def reload_knowledge():
    """Refresh KNOWLEDGE_CACHE from the knowledge-base collection.

    Does nothing when no database connection is available.  If reading the
    collection fails, the error is printed and the cache keeps its previous
    contents.
    """
    global KNOWLEDGE_CACHE
    if not db:
        return
    try:
        entries = []
        for snapshot in db.collection(COLLECTION_KNOWLEDGE).stream():
            entries.append(snapshot.to_dict())
        # Swap the cache only after every document has been read.
        KNOWLEDGE_CACHE = entries
        print(f"📚 {len(KNOWLEDGE_CACHE)} Einträge geladen.")
    except Exception as e:
        print(f"❌ Cache Fehler: {e}")
async def startup():
    """Fill the knowledge cache by calling reload_knowledge().

    NOTE(review): no decorator or registration call is visible in this view
    of the file; confirm how this coroutine is hooked into the FastAPI app.
    """
    reload_knowledge()
# --- HELPER: STEMMING & TOKENIZING ---
def get_stem(word):
    """Return a crude stem of *word* by cutting off at most one suffix.

    The word is lower-cased and stripped first.  Suffixes are tried in the
    order listed and only the first applicable one is removed; a suffix is
    applicable only when at least three characters remain after removal.
    Words without an applicable suffix are returned normalised but uncut.
    """
    normalized = word.lower().strip()
    candidates = (
        "ungen", "innen", "keit", "sch", "ern", "en", "er",
        "es", "st", "te", "e", "s", "t",
    )
    applicable = (
        suffix
        for suffix in candidates
        if normalized.endswith(suffix) and len(normalized) - len(suffix) > 2
    )
    chosen = next(applicable, None)
    if chosen is None:
        return normalized
    return normalized[:len(normalized) - len(chosen)]
def tokenize(text):
    """Split *text* into a list of word stems.

    The text is lower-cased, every character that is neither a word
    character nor whitespace is deleted, and each remaining
    whitespace-separated word is passed through get_stem().
    """
    without_punctuation = re.sub(r'[^\w\s]', '', text.lower())
    # str.split() without arguments never yields empty strings, so no
    # additional emptiness filter is needed.
    return list(map(get_stem, without_punctuation.split()))
# --- HELPER: VAPI REQUEST PARSER ---
def parse_vapi_request(data):
    """Extract the tool-call id and its arguments from a request payload.

    The first entry of ``message.toolCallList`` is used, or the first entry
    of ``message.toolCalls`` when the former is absent or empty.  Arguments
    delivered as a JSON string are decoded.

    Args:
        data: Decoded JSON body of the incoming request.

    Returns:
        A ``(tool_call_id, args)`` tuple.  ``tool_call_id`` is ``"unknown"``
        when no id can be found.  ``args`` is always a dict: it is empty when
        the arguments are missing, are not valid JSON, or do not decode to a
        JSON object, so callers can rely on ``args.get(...)``.
    """
    tool_call_id = "unknown"
    args = {}

    message = data.get("message") if isinstance(data, dict) else None
    if not isinstance(message, dict):
        return tool_call_id, args

    # Both payload shapes carry the same structure under different keys.
    call = None
    for key in ("toolCallList", "toolCalls"):
        calls = message.get(key)
        if isinstance(calls, list) and calls:
            call = calls[0]
            break
    if not isinstance(call, dict):
        return tool_call_id, args

    tool_call_id = call.get("id", tool_call_id)

    function = call.get("function")
    raw_args = function.get("arguments") if isinstance(function, dict) else None
    if isinstance(raw_args, str):
        try:
            raw_args = json.loads(raw_args)
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            print(f"⚠️ Parsing Info: {e}")
            raw_args = None
    # Anything that is not a JSON object is discarded to keep the dict contract.
    if isinstance(raw_args, dict):
        args = raw_args
    return tool_call_id, args
# ==========================================
# TOOL: SEARCH (OPTIMIZED)
# ==========================================

# Words that carry no search signal and must never score.  Query tokens are
# stemmed before filtering, so _relevant_stems() compares them against the
# stemmed form of this list as well as the raw form.
_STOP_WORDS = (
    # Conversation
    "hallo", "guten", "tag", "moin", "bitte", "danke", "frage", "sagen", "kannst", "du", "mir",
    "was", "ist", "wer", "wie", "wo", "wann", "erzähl", "über", "möchte", "will", "haben",
    # Articles and filler words
    "der", "die", "das", "dem", "den", "des", "ein", "eine", "einer", "eines",
    "im", "in", "von", "zu", "bei", "mit", "für", "auf", "aus", "um", "und", "oder",
    # Generic business words (they would match almost every document)
    "anbieten", "machen", "tun", "geben", "helfen", "unterstützen", "bieten",
    "firma", "unternehmen", "gmbh", "produkt", "system",
    # "plattform" is a stop word so that a question about the cost of the
    # platform does not land on the "platform feature" entry.
    "plattform",
)

# Minimum score for an answer: one title hit (50) or one keyword hit (100).
_MIN_SCORE = 50


def _relevant_stems(query):
    """Return the stemmed tokens of *query* minus stop words and tokens shorter than three characters."""
    stop_stems = set(_STOP_WORDS)
    # tokenize() yields stems ("bitte" -> "bit"), so the stop list has to be
    # stemmed too; otherwise most stop words would never be filtered.
    stop_stems.update(get_stem(word) for word in _STOP_WORDS)
    return [stem for stem in tokenize(query) if stem not in stop_stems and len(stem) > 2]


def _score_document(doc, query_stems):
    """Score one knowledge document: 100 per keyword hit, otherwise 50 per title hit."""
    keyword_stems = {
        get_stem(keyword)
        for keyword in (doc.get("keywords") or [])
        if isinstance(keyword, str)
    }
    title_stems = set(tokenize(doc.get("question") or ""))
    score = 0
    hits = []
    for stem in query_stems:
        if stem in keyword_stems:
            score += 100
            hits.append(f"KEYWORD '{stem}'")
        elif stem in title_stems:
            score += 50
            hits.append(f"TITLE '{stem}'")
    # The answer body is deliberately not scored, to avoid accidental matches.
    return score, hits


async def search(request: Request):
    """Answer a knowledge-base search tool call.

    The query is read from the tool-call arguments (``search_query`` or
    ``query``) or from a top-level ``search_query`` field.  Each cached
    knowledge document is scored against the relevant query stems and the
    best one is returned if it reaches the minimum score.  A non-empty query
    that produces no answer is stored in the inbox collection when a
    database connection exists; a failing inbox write is printed and does
    not affect the response.

    NOTE(review): no route decorator is visible in this view of the file;
    confirm how this handler is registered on the app.

    Returns:
        ``{"results": [{"toolCallId": ..., "result": ...}]}`` where
        ``result`` is the stored answer of the best document or a fixed
        fallback sentence.
    """
    data = await request.json()
    tool_call_id, args = parse_vapi_request(data)
    if not isinstance(args, dict):
        args = {}
    top_level_query = data.get("search_query") if isinstance(data, dict) else None
    query = args.get("search_query") or args.get("query") or top_level_query
    print(f"🔎 QUERY: '{query}'")

    answer_text = "Dazu habe ich leider keine Informationen in meiner Datenbank."
    if query:
        query_stems = _relevant_stems(str(query))
        print(f"🧐 Relevante Tokens: {query_stems}")

        found = False
        if query_stems:
            best_doc = None
            best_score = 0
            for doc in KNOWLEDGE_CACHE:
                score, hits = _score_document(doc, query_stems)
                if score > best_score:
                    best_score = score
                    best_doc = doc
                if score > 0:
                    title = str(doc.get("question") or "")
                    print(f" Kandidat: {score} Pkt ({hits}) -> {title[:30]}...")

            if best_doc is not None and best_score >= _MIN_SCORE:
                print(f"🏆 GEWINNER ({best_score} Pkt): {best_doc.get('question')}")
                answer_text = best_doc.get("answer")
                found = True
            else:
                print(f"⚠️ Kein Treffer (Max Score: {best_score})")

        # --- INBOX ---
        if not found and db:
            print("📥 Ab in die Inbox.")
            try:
                db.collection(COLLECTION_INBOX).add({
                    "query": query,
                    "timestamp": datetime.now(),
                    "status": "open",
                })
            except Exception as e:
                # Logging the miss is best effort; the caller still gets the
                # fallback answer.
                print(f"❌ Inbox Fehler: {e}")

    return {"results": [{"toolCallId": tool_call_id, "result": answer_text}]}
# --- OTHER ENDPOINTS ---
async def check_availability(request: Request):
    """Report today's availability based on the active availability rules.

    Without a database connection, or when no active rule covers today, the
    result is ``status="available"`` with the default instruction.  The
    first active rule whose date range contains today decides the result:
    ``"limited"`` when the rule name contains "ferien" (case-insensitive),
    otherwise ``"unavailable"``, together with the rule's
    ``instruction_text``.

    Rules whose ``start_date`` or ``end_date`` is missing or not a string
    are skipped instead of raising a TypeError in the comparison.

    NOTE(review): the comparison is lexicographic against today formatted as
    ``%Y-%m-%d``; this assumes the rule dates are stored in that same format
    — confirm against the data.  No route decorator is visible in this view
    of the file; confirm how this handler is registered on the app.
    """
    data = await request.json()
    tool_call_id, _ = parse_vapi_request(data)
    today = datetime.now().strftime("%Y-%m-%d")
    status, instruction = "available", "Normal arbeiten"
    if db:
        rules = db.collection(COLLECTION_RULES).where("active", "==", True).stream()
        for rule in rules:
            rule_data = rule.to_dict() or {}
            start = rule_data.get('start_date')
            end = rule_data.get('end_date')
            if not (isinstance(start, str) and isinstance(end, str)):
                continue
            if start <= today <= end:
                rule_name = str(rule_data.get('name') or '')
                status = "limited" if "ferien" in rule_name.lower() else "unavailable"
                instruction = rule_data.get('instruction_text')
                break
    return {"results": [{"toolCallId": tool_call_id, "result": {"status": status, "instruction": instruction}}]}
async def dummy_incoming(request: Request):
    """Acknowledge any incoming request with a fixed ``{"status": "ok"}`` body.

    The request itself is not inspected.
    """
    acknowledgement = {"status": "ok"}
    return acknowledgement
def home():
    """Return a fixed status payload indicating the service is online."""
    payload = {"status": "Online"}
    return payload