Spaces:
				
			
			
	
			
			
		Sleeping
		
	
	
	
			
			
	
	
	
	
		
		
		Sleeping
		
	| import os | |
| import json | |
| import re | |
| import requests | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from openai import OpenAI | |
| from pypdf import PdfReader | |
| import gradio as gr | |
| import html | |
| load_dotenv(override=True) | |
# Number of chat turns allowed before the UI asks for contact details
# (overridable through the TURN_LIMIT environment variable).
TURN_LIMIT = int(os.environ.get("TURN_LIMIT", 5))

# The profile PDF is resolved relative to this script, not the working directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PDF_PATH = os.path.join(SCRIPT_DIR, "knowledge_base", "me.pdf")
| # === Push Notification === | |
def push(message: str):
    """Send a timestamped notification through the Pushover API (best effort).

    The message is prefixed with the local time. Credentials are read from
    the PUSHOVER_TOKEN and PUSHOVER_USER environment variables; when either
    is missing the notification is skipped instead of sending a request that
    cannot succeed.

    Failures (network errors, timeouts, non-2xx responses) are logged and
    swallowed on purpose: a failed notification must never break the chat UI.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    message = f"[{timestamp}] {message}"

    token = os.getenv("PUSHOVER_TOKEN")
    user = os.getenv("PUSHOVER_USER")
    if not token or not user:
        print("Push skipped: PUSHOVER_TOKEN / PUSHOVER_USER is not configured")
        return

    try:
        response = requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": token,
                "user": user,
                "message": message,
            },
            timeout=6,
        )
        # Surface rejected requests (bad token, quota, ...) instead of
        # treating every HTTP response as a success.
        response.raise_for_status()
    except Exception as err:
        # Deliberate catch-all boundary: notifications are optional.
        print("Push notification failed:", err)
| # === Tool Functions === | |
def record_user_details(email, phone, notes=""):
    """Push a notification with a visitor's contact details and acknowledge it.

    Returns the fixed acknowledgement payload ``{"recorded": "ok"}``.
    """
    summary = "Recording user: Email: {} | Phone: {} | Notes: {}".format(email, phone, notes)
    push(summary)
    return dict(recorded="ok")
def record_unknown_question(question):
    """Push a notification for a question that could not be answered.

    Returns the fixed acknowledgement payload ``{"recorded": "ok"}``.
    """
    notice = "Unanswered question recorded: {}".format(question)
    push(notice)
    return dict(recorded="ok")
| # === Validators === | |
def is_valid_email(email):
    """Return True when *email* looks like ``local@domain.tld``.

    This is a light format check, not full RFC validation. The whole string
    must match (``re.fullmatch``), so a trailing newline is rejected; a
    ``$`` anchor with ``re.match`` would let it through. Non-string input
    such as ``None`` is reported as invalid instead of raising.
    """
    if not isinstance(email, str):
        return False
    return re.fullmatch(r"[\w\.-]+@[\w\.-]+\.\w+", email) is not None
def is_valid_phone(phone):
    """Return True when *phone* looks like a phone number.

    Accepted shape: an optional leading ``+``, one digit, then 7 to 15
    further digits, dashes or spaces. The whole string must match
    (``re.fullmatch``), so a trailing newline is rejected. Non-string input
    such as ``None`` is reported as invalid instead of raising.
    """
    if not isinstance(phone, str):
        return False
    return re.fullmatch(r"\+?\d[\d\- ]{7,15}", phone) is not None
| # === Output sanitizers === | |
# Exact sentences that must never be shown to the visitor.
BLOCKED_PHRASES = {
    "If you’re open to connecting further, feel free to share your email or phone number, and I’ll reach out to you.🙂",
}


def clean_reply(text: str) -> str:
    """Sanitize a model reply before it is shown to the visitor.

    Removes every phrase in ``BLOCKED_PHRASES`` and any mention of the
    internal tool names (case-insensitive), then tidies the whitespace left
    behind. Falsy input (``None`` or ``""``) is returned unchanged.

    Only runs of spaces/tabs are collapsed and blank lines are capped at
    one; paragraph breaks are kept so markdown structure in the reply is
    not flattened into a single line.
    """
    if not text:
        return text

    for phrase in BLOCKED_PHRASES:
        text = text.replace(phrase, "")

    # Hide internal tool names from user-visible text.
    text = re.sub(r"record_user_details|record_unknown_question", "", text, flags=re.IGNORECASE)

    text = re.sub(r"[ \t]{2,}", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
| # === Tool Schemas === | |
def _string_property(description):
    """Build the JSON-schema fragment for one string-typed tool argument."""
    return {"type": "string", "description": description}


# Function schema advertised to the model for saving a visitor's contact info.
record_user_details_json = {
    "name": "record_user_details",
    "description": "Record an interested user's contact information.",
    "parameters": {
        "type": "object",
        "properties": {
            "email": _string_property("User's email address"),
            "phone": _string_property("User's phone number"),
            "notes": _string_property("Additional context"),
        },
        "required": ["email", "phone"],
        "additionalProperties": False,
    },
}

# Function schema advertised to the model for logging unanswered questions.
record_unknown_question_json = {
    "name": "record_unknown_question",
    "description": "Record any question that couldn't be answered.",
    "parameters": {
        "type": "object",
        "properties": {
            "question": _string_property("Unanswered question"),
        },
        "required": ["question"],
        "additionalProperties": False,
    },
}

# Tool list in the shape passed as ``tools=`` to the chat-completions call.
TOOLS = [
    {"type": "function", "function": schema}
    for schema in (record_user_details_json, record_unknown_question_json)
]
class AboutMe:
    """Chat persona that answers questions about one person's professional profile.

    On construction it creates an OpenAI-compatible client from the BASE_URL
    and API_KEY environment variables, reads the persona name (ME) and model
    (MODEL_NAME), and loads the profile text from the PDF at ``PDF_PATH``.

    Attributes:
        details_recorded: set to True by the UI once contact details are saved.
        error_contact: set by ``handle_exception`` to ask the UI to show the
            contact form after a failure.
    """

    # Only these module-level functions may be invoked on behalf of the model.
    _ALLOWED_TOOLS = frozenset({"record_user_details", "record_unknown_question"})

    # Word boundaries keep ordinary words such as "evaluate", "executive"
    # or "curly" from being flagged.
    _MALICIOUS_RE = re.compile(
        r"\beval\b|\bexec\b|subprocess|os\.system|rm -rf|\bcurl\b|\bwget\b",
        re.IGNORECASE,
    )

    def __init__(self):
        self.client = OpenAI(
            base_url=os.getenv("BASE_URL"),
            api_key=os.getenv("API_KEY"),
        )
        self.name = os.getenv("ME", "Someone Mysterious")
        self.model_name = os.getenv("MODEL_NAME", "gpt-4")
        self.about_me_text = self.load_profile()
        self.details_recorded = False
        self.error_contact = False

    def load_profile(self):
        """Return the text of every page of the profile PDF, or "" if it is missing."""
        if not os.path.exists(PDF_PATH):
            print(f"PDF not found at {PDF_PATH}")
            return ""
        reader = PdfReader(PDF_PATH)
        # extract_text() may yield None for a page without text.
        return "\n".join(page.extract_text() or "" for page in reader.pages)

    def system_prompt(self):
        """Build the system prompt: persona rules followed by the profile text."""
        return (
            f"You are {self.name}. Speak ONLY in first person (I, me, my) about your own work and experience. Never refer to yourself as '{self.name}' or 'they'. "
            f"Always answer using 'I', 'me', and 'my' (never 'they' or '{self.name}' to refer to yourself). "
            f"Keep a professional, warm tone as if speaking with a hiring manager.\n\n"
            f"You are answering questions on {self.name}'s profile, "
            f"specifically those related to their career, background, skills, and experience.\n\n"
            f"Your role is to represent {self.name} authentically and professionally to help them get hired. "
            f"Assume you are speaking directly to a hiring manager or prospective client who is evaluating {self.name} "
            f"for a job or opportunity. Your tone should be confident, warm, and professional.\n\n"
            f"Below is {self.name}'s profile information, which you should use to answer relevant questions:\n\n"
            f"## Profile:\n{self.about_me_text}\n\n"
            "Do not invent personal details; if a question is unrelated to professional profile, call record_unknown_question. "
            "Be honest. If you are unsure on how to answer any question, use your record_unknown_question tool to record the question that you couldn't answer — even if it seems minor or unrelated to career topics.\n\n"
            "If a user begins a conversation, guide them toward sharing their email so they can be contacted directly. "
            "Ask for their email address politely and record it using your record_user_details tool for future use.\n\n"
            "Do not reference internal tools or systems when requesting contact information — just ask in a natural and professional manner.\n\n"
            "Decline any requests involving unethical actions, malicious code, or unauthorized access. Make it clear that you cannot assist with such queries.\n\n"
            # This literal needs the f-prefix; without it the model receives "{self.name}" verbatim.
            f"Always stay in character. Represent {self.name} with professionalism, and politely reject any unethical or inappropriate questions."
        )

    def system_prompt444(self):
        """Build a shorter alternative system prompt (not used by ``chat_fn``)."""
        return (
            f"You are {self.name}. Speak ONLY in first person (I, me, my). "
            f"Never refer to yourself as '{self.name}' or 'they'. Keep a professional, warm tone as if speaking with a hiring manager.\n\n"
            f"You are answering questions about my career, background, skills, and experience.\n\n"
            f"Use ONLY the profile below to answer:\n\n## Profile:\n{self.about_me_text}\n\n"
            "If you are unsure how to answer a question, silently log it with the unknown-question tool; "
            "do not mention tools in your reply. Ask for contact details naturally when appropriate, "
            "without referencing internal tools or systems. Politely decline unethical or malicious requests."
        )

    def _run_tool(self, tool_name, raw_arguments):
        """Run one whitelisted tool and return its JSON-serializable result.

        Unknown tools and malformed arguments produce an ``{"error": ...}``
        payload instead of being dropped. Exceptions raised by the tool
        itself propagate to the caller.
        """
        if tool_name not in self._ALLOWED_TOOLS:
            return {"error": f"unknown tool: {tool_name}"}
        tool_func = globals().get(tool_name)
        if not callable(tool_func):
            return {"error": f"tool not available: {tool_name}"}
        try:
            args = json.loads(raw_arguments or "{}")
        except (TypeError, ValueError):
            return {"error": "tool arguments are not valid JSON"}
        if not isinstance(args, dict):
            return {"error": "tool arguments must be a JSON object"}
        return tool_func(**args)

    def handle_tool_call(self, tool_calls):
        """Execute the model's tool calls and return one ``tool`` message per call.

        Only names in ``_ALLOWED_TOOLS`` are executed; the model cannot reach
        other module-level functions. Every call gets a response carrying its
        ``tool_call_id``. ``None`` is treated as "no calls".
        """
        responses = []
        for call in tool_calls or []:
            tool_name = call.function.name
            print(f"Tool call received: {tool_name}")
            result = self._run_tool(tool_name, call.function.arguments)
            responses.append({
                "role": "tool",
                "content": json.dumps(result),
                "tool_call_id": call.id,
            })
        return responses

    def handle_exception(self, e):
        """Log *e* and translate it into a user-facing message.

        Side effect: when no contact details are recorded yet,
        ``error_contact`` is set so the UI can open the contact form.
        """
        print("Error in chat function:", e)
        # If we don't have contact recorded yet, ask the UI to open the form
        if not self.details_recorded:
            self.error_contact = True

        # The OpenAI client raises its own exception types, so check those as
        # well as the requests ones. Timeout is tested first because
        # openai.APITimeoutError is a subclass of openai.APIConnectionError.
        import openai

        if isinstance(e, (requests.exceptions.Timeout, openai.APITimeoutError)):
            return "⏱️ Server timeout. Please try again later."
        if isinstance(e, (requests.exceptions.ConnectionError, openai.APIConnectionError)):
            return "🔌 I’m having trouble connecting. Please try again later."

        # Some providers embed the rate-limit reset time in the error text.
        # str(e) is used because e.args may be empty or hold a non-string.
        match = re.search(r"'X-RateLimit-Reset': '(\d+)'", str(e))
        if match:
            try:
                # NOTE(review): the value is treated as epoch milliseconds — confirm with the provider.
                reset_ts = int(match.group(1)) / 1000
                reset_time = datetime.fromtimestamp(reset_ts).strftime("%Y-%m-%d %H:%M:%S")
                return (
                    f"🚫 I’m on a break. Please be back after {reset_time}."
                )
            except (ValueError, OverflowError, OSError):
                pass  # unusable timestamp: fall through to the generic message
        return "⚠️ Something went wrong. Please try again later."

    def is_malicious(self, text):
        """Return True when *text* contains a blocked command-like keyword."""
        if not text:
            return False
        return self._MALICIOUS_RE.search(text) is not None

    @staticmethod
    def _history_to_messages(history):
        """Convert ``(user, assistant)`` history pairs into chat messages.

        Each pair yields a user message followed by an assistant message;
        empty or ``None`` sides are skipped.
        """
        messages = []
        for user_text, assistant_text in history or []:
            if user_text:
                messages.append({"role": "user", "content": user_text})
            if assistant_text:
                messages.append({"role": "assistant", "content": assistant_text})
        return messages

    def chat_fn(self, message, history, turn_count):
        """Answer one user message.

        Args:
            message: the user's new message.
            history: list of ``(user, assistant)`` pairs; the new exchange is
                appended to it in place.
            turn_count: turns so far; incremented once the message passes
                the keyword filter.

        Returns:
            ``(reply, history, turn_count)``. Errors never propagate: they
            are converted to a reply by ``handle_exception``.
        """
        if self.is_malicious(message):
            history.append((message, "🚫 Restricted input detected."))
            return "🚫 Restricted input detected.", history, turn_count
        try:
            turn_count += 1
            messages = (
                [{"role": "system", "content": self.system_prompt()}]
                + self._history_to_messages(history)
                + [{"role": "user", "content": message}]
            )
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=messages,
                tools=TOOLS,
            )
            choice = response.choices[0]
            if choice.finish_reason == "tool_calls":
                # Tools are run for their side effects only; the visitor gets
                # a fixed acknowledgement rather than a second model call.
                self.handle_tool_call(choice.message.tool_calls)
                reply = "Thanks — I've logged that as a follow‑up. If you’d like, share your email, and I can follow up offline."
            else:
                reply = clean_reply(choice.message.content)
            history.append((message, reply))
            return reply, history, turn_count
        except Exception as e:
            error_reply = self.handle_exception(e)
            history.append((message, error_reply))
            return error_reply, history, turn_count
if __name__ == "__main__":
    # Script entry point: build the Gradio chat UI around one shared persona.
    # NOTE(review): `ai.details_recorded` and `ai.error_contact` live on this
    # single shared object, so they are shared by every browser session, unlike
    # the per-session gr.State values below — confirm this is intended.
    ai = AboutMe()

    custom_css = """
    /* Global look & feel */
    .gradio-container {
        font-family: Inter, ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial, \"Apple Color Emoji\", \"Segoe UI Emoji\" !important;
        background: linear-gradient(180deg, #fafaff 0%, #f5f7fb 100%);
    }
    .gradio-container .prose, .gradio-container .prose * { font-family: inherit !important; }
    /* Chatbot card */
    .gradio-container .bubble-wrap {
        background: #ffffff !important;
        border: 2px solid var(--border-color-accent-subdued) !important;
        border-radius: 18px !important;
        box-shadow: 0 6px 20px rgba(43, 55, 91, 0.08) !important;
    }
    /* Assistant bubble */
    .gradio-container .assistant .message,
    .gradio-container .bot .message,
    .gradio-container .bubble.assistant {
        background: #ffffff !important;
    }
    /* User bubble */
    .gradio-container .user .message,
    .gradio-container .self .message,
    .gradio-container .bubble.user {
        background: #eef2ff !important;
        border-color: #c7d2fe !important;
    }
    /* Input area */
    .gradio-container textarea, .gradio-container input[type=text], .gradio-container input[type=email] {
        border-radius: 12px !important;
        border: 2px solid var(--border-color-accent-subdued) !important;
        box-shadow: 0 1px 0 rgba(20,20,20,0.03) inset;
    }
    .gradio-container button {
        border-radius: 12px !important;
        box-shadow: 0 4px 14px rgba(109, 99, 255, 0.15) !important;
        border: 2px solid var(--border-color-accent-subdued) !important;
        color: #362ebb !important;
        background-color: var(--border-color-accent-subdued) !important;
    }
    /* Hide footer */
    footer {display:none !important;}
    /* Hide message action icons (delete/undo/retry/copy) – several selectors to cover variants */
    button[aria-label="Clear"],
    button[aria-label*="Clear"],
    button[aria-label="Undo"],
    button[aria-label*="Undo"],
    button[aria-label="Retry"],
    button[aria-label*="Retry"],
    button[aria-label="Copy"],
    button[aria-label*="Copy"],
    .copy-btn, .retry-btn, .delete-btn,
    .message-actions, .actions, .message-buttons,
    .chatbot .actions, .chat .actions { display: none !important; }
    /* Slightly tighten spacing */
    .gradio-container .chatbot .wrap { padding: 10px 14px !important; }
    /* Ellipsis (typing) message: show only big dots, no bubble */
    .bot div:has(.pending-ellipsis) {
        background: transparent !important;
        border: none !important;
        box-shadow: none !important;
        padding: 0 !important;
        margin: 0 !important;
    }
    /* Animated dots */
    .pending-ellipsis {
        display: inline-flex;
        gap: 4px;
        font-size: 28px;
        line-height: 1.2;
        font-weight: 800;
        color: #6b6f76;
        background: transparent !important;
        border: none !important;
        box-shadow: none !important;
    }
    .pending-ellipsis span {
        animation: blink 1.4s infinite both;
    }
    .pending-ellipsis span:nth-child(2) {
        animation-delay: 0.2s;
    }
    .pending-ellipsis span:nth-child(3) {
        animation-delay: 0.4s;
    }
    @keyframes blink {
        0% { opacity: 0.2; transform: translateY(0); }
        20% { opacity: 1; transform: translateY(-3px); }
        100% { opacity: 0.2; transform: translateY(0); }
    }
    """

    with gr.Blocks(css=custom_css, theme=gr.themes.Soft()) as demo:
        # Per-session state.
        turn_count = gr.State(0)
        details_recorded_state = gr.State(False)
        processing_state = gr.State(False)

        chatbot = gr.Chatbot(label=f"Chat with {ai.name}",
                             # The original markup used an invalid "</br>" closing tag.
                             placeholder="<br>Please feel free to ask.",
                             show_copy_button=False)
        msg = gr.Textbox(placeholder="Ask something...", label="Your Message")
        send = gr.Button("SEND", interactive=False)

        # Contact form: hidden until the turn limit is reached or an error occurs.
        email = gr.Textbox(label="Email", type="email", visible=False)
        phone = gr.Textbox(label="Phone", type="text", visible=False)
        notes = gr.Textbox(label="Notes (optional)", visible=False)
        submit_contact = gr.Button("Submit Contact Info", visible=False)

        def form_visibility(visible):
            """Return updates for the four contact widgets (email, phone, notes, submit)."""
            return tuple(gr.update(visible=visible) for _ in range(4))

        def update_button_state(text, is_processing):
            """Enable SEND only when there is non-blank text and no request in flight."""
            return gr.update(interactive=bool((text or "").strip()) and not is_processing)

        def on_message_submit(user_msg, history, turn_count, details_recorded, processing):
            """Handle one submitted message as a generator of UI updates.

            Each yield is a 10-tuple matching the ``outputs`` wiring below:
            four contact-form updates, message box, send button, chat
            history, turn count, details-recorded flag, processing flag.
            """
            # 1) Guard: ignore blank input and submissions made while busy
            if processing or not user_msg or not user_msg.strip():
                yield (*form_visibility(False),
                       gr.update(value="", interactive=True), gr.update(interactive=False),
                       history, turn_count, details_recorded, processing)
                return

            # 2) Immediately show user's message with an animated placeholder
            processing = True
            history = list(history or [])
            history.append((html.escape(user_msg), "<div class='pending-ellipsis'><span>.</span><span>.</span><span>.</span></div>"))
            yield (*form_visibility(False),
                   gr.update(value="", interactive=False), gr.update(interactive=False),
                   history, turn_count, details_recorded, processing)

            # 3) Contact gating: past the turn limit, require contact details first
            if turn_count >= TURN_LIMIT and not (details_recorded or ai.details_recorded):
                history[-1] = (user_msg, "✉️ Please share your contact details to continue.\n\n📧 Email\n📞 Phone\n📝 Notes (optional)")
                processing = False
                yield (*form_visibility(True),
                       gr.update(value="", interactive=True), gr.update(interactive=False),
                       history, turn_count, details_recorded, processing)
                return

            # 4) Get model reply and replace placeholder (the placeholder row
            #    is excluded from the history sent to the model)
            reply, updated_history, updated_turn = ai.chat_fn(user_msg, history[:-1], turn_count)
            updated_history[-1] = (user_msg, reply)

            # If the error handler asked to show the contact form, open it now;
            # otherwise keep the form hidden.
            show_form = False
            if ai.error_contact and not (details_recorded or ai.details_recorded):
                ai.error_contact = False  # reset so it doesn't keep popping
                show_form = True

            processing = False
            yield (*form_visibility(show_form),
                   gr.update(value="", interactive=True),  # message box re-enabled
                   gr.update(interactive=False),           # send stays disabled until text is typed
                   updated_history, updated_turn, details_recorded, processing)

        def on_contact_submit(email_val, phone_val, notes_val, history, turn_count, details_recorded):
            """Validate and record the contact form.

            Returns a 7-tuple matching the ``outputs`` wiring below: four
            contact-form updates, chat history, turn count, recorded flag.
            """
            history = list(history or [])
            # Tolerate None and surrounding whitespace from the form fields.
            email_val = (email_val or "").strip()
            phone_val = (phone_val or "").strip()
            notes_val = (notes_val or "").strip()
            unchanged = tuple(gr.update() for _ in range(4))

            if not is_valid_email(email_val):
                history.append(("Contact submission", "❌ Invalid email format."))
                return (*unchanged, history, turn_count, details_recorded)
            if not is_valid_phone(phone_val):
                history.append(("Contact submission", "❌ Invalid phone number format."))
                return (*unchanged, history, turn_count, details_recorded)

            record_user_details(email=email_val, phone=phone_val, notes=notes_val)
            ai.details_recorded = True
            details_recorded = True
            history.append(("Contact details submitted", "✅ Your contact details have been saved, will reach out to you in case needed."))
            return (*form_visibility(False), history, turn_count, details_recorded)

        msg.change(fn=update_button_state, inputs=[msg, processing_state], outputs=send)

        # Enter in the textbox and the SEND button share the same generator
        # handler, so the user's message is echoed instantly either way.
        submit_wiring = dict(
            fn=on_message_submit,
            queue=True,
            inputs=[msg, chatbot, turn_count, details_recorded_state, processing_state],
            outputs=[email, phone, notes, submit_contact, msg, send, chatbot,
                     turn_count, details_recorded_state, processing_state],
        )
        msg.submit(**submit_wiring)
        send.click(**submit_wiring)

        submit_contact.click(
            fn=on_contact_submit,
            inputs=[email, phone, notes, chatbot, turn_count, details_recorded_state],
            outputs=[email, phone, notes, submit_contact, chatbot, turn_count, details_recorded_state]
        )

    demo.launch()
