from typing import Annotated from valetax_rag import ValetaxRAG from typing import List, Optional from langchain_community.tools import DuckDuckGoSearchRun from config import llm,memory rag = ValetaxRAG() search = DuckDuckGoSearchRun() async def rag_tool(query: Annotated[str, "The user’s question about the documents"]) -> str: """Answer user questions using the async RAG pipeline.""" return await rag.query(query) # مكان مؤقت لتخزين بيانات التيكت ticket_memory = {} async def store_ticket_info( conversation_text: Annotated[str, "Full conversation or user message with issue details"] ) -> str: """Extract and store ticket information from conversation text.""" extraction_prompt = f""" استخرج البيانات التالية من هذا النص: النص: "{conversation_text}" أرجع الإجابة بالتنسيق التالي بالضبط: Issue: [وصف المشكلة بالتفصيل] Email: [البريد الإلكتروني أو "غير متوفر"] Documents: [true إذا ذكر مستندات، false إذا لم يذكر] """ try: # استخدام الـ LLM لاستخراج البيانات response = llm.invoke(extraction_prompt) extracted_text = response.content # تحليل الرد lines = extracted_text.strip().split('\n') issue = "" email = "" has_docs = False for line in lines: if line.startswith("Issue:"): issue = line.replace("Issue:", "").strip() elif line.startswith("Email:"): email = line.replace("Email:", "").strip() elif line.startswith("Documents:"): has_docs = "true" in line.lower() # تحقق من صحة البيانات if not issue or issue == "غير متوفر" or len(issue) < 5: return "⚠️ لم أتمكن من فهم المشكلة. يرجى وصف المشكلة بوضوح أكثر" if not email or email == "غير متوفر" or "@" not in email: return "⚠️ يرجى تقديم البريد الإلكتروني للتواصل معك" # حفظ البيانات ticket_memory["issue"] = issue ticket_memory["email"] = email ticket_memory["documents"] = has_docs memory.save_context( {"input": "User provided ticket info"}, {"output": f"Issue: {issue}, Email: {email}, Documents: {has_docs}"} ) return f"✅ تم استخراج وحفظ البيانات بنجاح:\n📋 المشكلة: {issue}\n📧 الإيميل: {email}\n📎 مستندات: {'نعم' if has_docs else 'لا'}" except Exception as e: return f"⚠️ حدث خطأ في معالجة البيانات: {str(e)}" # باقي الكود كما هو... async def support_ticket_tool(_: str = None) -> str: # نفس الكود السابق if "issue" not in ticket_memory or "email" not in ticket_memory: return "⚠️ بيانات التذكرة غير مكتملة. يرجى تقديم المشكلة والبريد الإلكتروني أولاً" issue = ticket_memory["issue"] email = ticket_memory["email"] documents = ticket_memory.get("documents", False) return f"🎫 تم إنشاء تذكرة دعم للمشكلة: '{issue[:50]}...' \n📧 البريد: {email}\n📎 مستندات: {'نعم' if documents else 'لا'}" # Tool 2: فتح التيكت async def support_ticket_tool(_: str = None) -> str: """ Open a support ticket in the system asynchronously. Parameters: - issue (str): Detailed description of the problem. - email (str, optional): User email for follow-up. - documents (list of str, optional): Paths or links to documents/screenshots to attach. Returns: - str: Confirmation message with ticket details. """ if "issue" not in ticket_memory or "email" not in ticket_memory: return "⚠️ Ticket info not complete. Please provide issue and email first." issue = ticket_memory["issue"] email = ticket_memory["email"] documents = ticket_memory.get("documents", []) # هنا تنادي الـ API بتاعتك (POST request) # requests.post("https://your-api.com/support", json=ticket_memory) return f"🎫 Support ticket created for: '{issue[:50]}...' with {'docs' if documents else 'no docs'} . Contact: {email}"