|
from dotenv import load_dotenv
|
|
from openai import OpenAI
|
|
import json
|
|
import os
|
|
import requests
|
|
from pypdf import PdfReader
|
|
import gradio as gr
|
|
import base64
|
|
import time
|
|
from collections import defaultdict
|
|
import fastapi
|
|
from gradio.context import Context
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
|
|
load_dotenv(override=True)
|
|
|
|
class RateLimiter:
|
|
def __init__(self, max_requests=5, time_window=5):
|
|
|
|
self.max_requests = max_requests
|
|
self.time_window = time_window
|
|
self.request_history = defaultdict(list)
|
|
|
|
def is_rate_limited(self, user_id):
|
|
current_time = time.time()
|
|
|
|
self.request_history[user_id] = [
|
|
timestamp for timestamp in self.request_history[user_id]
|
|
if current_time - timestamp < self.time_window
|
|
]
|
|
|
|
|
|
if len(self.request_history[user_id]) >= self.max_requests:
|
|
return True
|
|
|
|
|
|
self.request_history[user_id].append(current_time)
|
|
return False
|
|
|
|
def push(text):
|
|
requests.post(
|
|
"https://api.pushover.net/1/messages.json",
|
|
data={
|
|
"token": os.getenv("PUSHOVER_TOKEN"),
|
|
"user": os.getenv("PUSHOVER_USER"),
|
|
"message": text,
|
|
}
|
|
)
|
|
|
|
def send_email(from_email, name, notes):
|
|
auth = base64.b64encode(f'api:{os.getenv("MAILGUN_API_KEY")}'.encode()).decode()
|
|
|
|
response = requests.post(
|
|
f'https://api.mailgun.net/v3/{os.getenv("MAILGUN_DOMAIN")}/messages',
|
|
headers={
|
|
'Authorization': f'Basic {auth}'
|
|
},
|
|
data={
|
|
'from': f'Website Contact <mailgun@{os.getenv("MAILGUN_DOMAIN")}>',
|
|
'to': os.getenv("MAILGUN_RECIPIENT"),
|
|
'subject': f'New message from {from_email}',
|
|
'text': f'Name: {name}\nEmail: {from_email}\nNotes: {notes}',
|
|
'h:Reply-To': from_email
|
|
}
|
|
)
|
|
|
|
return response.status_code == 200
|
|
|
|
|
|
def record_user_details(email, name="Name not provided", notes="not provided"):
|
|
push(f"Recording {name} with email {email} and notes {notes}")
|
|
|
|
email_sent = send_email(email, name, notes)
|
|
return {"recorded": "ok", "email_sent": email_sent}
|
|
|
|
def record_unknown_question(question):
|
|
push(f"Recording {question}")
|
|
return {"recorded": "ok"}
|
|
|
|
record_user_details_json = {
|
|
"name": "record_user_details",
|
|
"description": "Use this tool to record that a user is interested in being in touch and provided an email address",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"email": {
|
|
"type": "string",
|
|
"description": "The email address of this user"
|
|
},
|
|
"name": {
|
|
"type": "string",
|
|
"description": "The user's name, if they provided it"
|
|
}
|
|
,
|
|
"notes": {
|
|
"type": "string",
|
|
"description": "Any additional information about the conversation that's worth recording to give context"
|
|
}
|
|
},
|
|
"required": ["email"],
|
|
"additionalProperties": False
|
|
}
|
|
}
|
|
|
|
record_unknown_question_json = {
|
|
"name": "record_unknown_question",
|
|
"description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"question": {
|
|
"type": "string",
|
|
"description": "The question that couldn't be answered"
|
|
},
|
|
},
|
|
"required": ["question"],
|
|
"additionalProperties": False
|
|
}
|
|
}
|
|
|
|
tools = [{"type": "function", "function": record_user_details_json},
|
|
{"type": "function", "function": record_unknown_question_json}]
|
|
|
|
|
|
class Me:
|
|
|
|
def __init__(self):
|
|
self.openai = OpenAI(api_key=os.getenv("GOOGLE_API_KEY"), base_url="https://generativelanguage.googleapis.com/v1beta/openai/")
|
|
self.name = "Sagarnil Das"
|
|
self.rate_limiter = RateLimiter(max_requests=5, time_window=60)
|
|
reader = PdfReader("me/linkedin.pdf")
|
|
self.linkedin = ""
|
|
for page in reader.pages:
|
|
text = page.extract_text()
|
|
if text:
|
|
self.linkedin += text
|
|
with open("me/summary.txt", "r", encoding="utf-8") as f:
|
|
self.summary = f.read()
|
|
|
|
|
|
def handle_tool_call(self, tool_calls):
|
|
results = []
|
|
for tool_call in tool_calls:
|
|
tool_name = tool_call.function.name
|
|
arguments = json.loads(tool_call.function.arguments)
|
|
print(f"Tool called: {tool_name}", flush=True)
|
|
tool = globals().get(tool_name)
|
|
result = tool(**arguments) if tool else {}
|
|
results.append({"role": "tool","content": json.dumps(result),"tool_call_id": tool_call.id})
|
|
return results
|
|
|
|
def system_prompt(self):
|
|
system_prompt = f"You are acting as {self.name}. You are answering questions on {self.name}'s website, \
|
|
particularly questions related to {self.name}'s career, background, skills and experience. \
|
|
Your responsibility is to represent {self.name} for interactions on the website as faithfully as possible. \
|
|
You are given a summary of {self.name}'s background and LinkedIn profile which you can use to answer questions. \
|
|
Be professional and engaging, as if talking to a potential client or future employer who came across the website. \
|
|
If you don't know the answer to any question, use your record_unknown_question tool to record the question that you couldn't answer, even if it's about something trivial or unrelated to career. \
|
|
If the user is engaging in discussion, try to steer them towards getting in touch via email; ask for their email and record it using your record_user_details tool. \
|
|
When a user provides their email, both a push notification and an email notification will be sent. If the user does not provide any note in the message \
|
|
in which they provide their email, then give a summary of the conversation so far as the notes."
|
|
|
|
system_prompt += f"\n\n## Summary:\n{self.summary}\n\n## LinkedIn Profile:\n{self.linkedin}\n\n"
|
|
system_prompt += f"With this context, please chat with the user, always staying in character as {self.name}."
|
|
return system_prompt
|
|
|
|
def chat(self, message, history):
|
|
|
|
try:
|
|
|
|
request = Context.get_context().request
|
|
|
|
forwarded_for = request.headers.get("X-Forwarded-For")
|
|
|
|
cloudflare_ip = request.headers.get("Cf-Connecting-IP")
|
|
|
|
if forwarded_for:
|
|
|
|
user_id = forwarded_for.split(",")[0].strip()
|
|
elif cloudflare_ip:
|
|
user_id = cloudflare_ip
|
|
else:
|
|
|
|
user_id = request.client.host
|
|
except (AttributeError, RuntimeError, fastapi.exceptions.FastAPIError):
|
|
|
|
user_id = "default_user"
|
|
logger.debug(f"User ID: {user_id}")
|
|
if self.rate_limiter.is_rate_limited(user_id):
|
|
return "You're sending messages too quickly. Please wait a moment before sending another message."
|
|
|
|
messages = [{"role": "system", "content": self.system_prompt()}]
|
|
|
|
|
|
if isinstance(history, list) and all(isinstance(h, dict) for h in history):
|
|
messages.extend(history)
|
|
else:
|
|
|
|
for user_msg, assistant_msg in history:
|
|
messages.append({"role": "user", "content": user_msg})
|
|
messages.append({"role": "assistant", "content": assistant_msg})
|
|
|
|
messages.append({"role": "user", "content": message})
|
|
|
|
done = False
|
|
while not done:
|
|
response = self.openai.chat.completions.create(
|
|
model="gemini-2.0-flash",
|
|
messages=messages,
|
|
tools=tools
|
|
)
|
|
if response.choices[0].finish_reason == "tool_calls":
|
|
tool_calls = response.choices[0].message.tool_calls
|
|
tool_result = self.handle_tool_call(tool_calls)
|
|
messages.append(response.choices[0].message)
|
|
messages.extend(tool_result)
|
|
else:
|
|
done = True
|
|
|
|
return response.choices[0].message.content
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
me = Me()
|
|
gr.ChatInterface(me.chat, type="messages").launch()
|
|
|