Scarlett / app.py
rbn2008k's picture
Update app.py
1ea6945 verified
raw
history blame
8.71 kB
import os
import time
import threading
import base64
from io import BytesIO
import gradio as gr
import asyncio
from collections import OrderedDict
from datetime import datetime
import requests
from openai import OpenAI
from telethon import TelegramClient, events
from PIL import Image
from huggingface_hub import InferenceClient
import pymongo
from pymongo import MongoClient
def load_system_prompt():
with open('prompt.txt', 'r') as file:
return file.read()
system_prompt = load_system_prompt()
# Environment variables
api_id = os.getenv('api_id')
api_hash = os.getenv('api_hash')
bot_token = os.getenv('bot_token')
openai_api_key = os.getenv('glhf')
ping_key = os.getenv('bolo')
api_url = os.getenv('yolo')
model = os.getenv('model')
model2 = os.getenv('model2')
mongoURI = os.getenv('MONGO_URI')
# OpenAI and MongoDB clients
openai_client = OpenAI(api_key=openai_api_key, base_url=api_url)
mongo_client = MongoClient(mongoURI)
db = mongo_client['Scarlett']
chat_collection = db['chats']
local_chat_history = OrderedDict()
MAX_LOCAL_USERS = 5
# Functions for MongoDB-based chat history storage and retrieval
def get_history_from_mongo(user_id):
result = chat_collection.find_one({"user_id": user_id})
return result.get("messages", []) if result else []
def store_message_in_mongo(user_id, role, content):
chat_collection.update_one(
{"user_id": user_id},
{
"$push": {
"messages": {
"$each": [{"role": role, "content": content}],
"$slice": -20
}
}
},
upsert=True
)
def get_chat_history(user_id):
if user_id in local_chat_history:
local_chat_history.move_to_end(user_id)
return local_chat_history[user_id]
history = get_history_from_mongo(user_id)
local_chat_history[user_id] = history
if len(local_chat_history) > MAX_LOCAL_USERS:
local_chat_history.popitem(last=False)
return history
def update_chat_history(user_id, role, content):
if user_id not in local_chat_history:
local_chat_history[user_id] = get_history_from_mongo(user_id)
local_chat_history[user_id].append({"role": role, "content": content})
local_chat_history[user_id] = local_chat_history[user_id][-20:]
local_chat_history.move_to_end(user_id)
if len(local_chat_history) > MAX_LOCAL_USERS:
local_chat_history.popitem(last=False)
store_message_in_mongo(user_id, role, content)
# Fixing image encoding
def encode_local_image(image_file):
try:
im = Image.open(image_file)
buffered = BytesIO()
im.save(buffered, format="PNG")
image_bytes = buffered.getvalue()
image_base64 = base64.b64encode(image_bytes).decode('ascii')
return image_base64
except Exception as e:
print(f"Error encoding image: {e}")
return None
# Image description function, calling external inference model
def inference_calling_idefics(image_path, question=""):
system_prompt = os.getenv('USER_PROMPT')
model_id = model2
client = InferenceClient(model=model_id)
# Use the fixed `encode_local_image` to encode the image
image_base64 = encode_local_image(image_path)
if not image_base64:
return "Error: Invalid image or unable to encode image."
image_info = f"data:image/png;base64,{image_base64}"
prompt = question if question != "" else 'Describe this image without question mark'
try:
response = ""
for message in client.chat_completion(
model=image_model,
messages=[
{
"role": "system",
"content": [
{"type": "text", "text": system_prompt},
],
},
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_info}},
{"type": "text", "text": prompt},
],
}
],
max_tokens=2048,
stream=True,
):
response += message.choices[0].delta.content
return response
except Exception as e:
print(f"Error in inference call: {e}")
return "Error while processing the image."
def describe_image(client, image_path, question=""):
try:
answer = inference_calling_idefics(image_path, question)
return answer
except Exception as e:
print(e)
return "Error while seeing the image."
# Telegram bot setup
client = TelegramClient('bot', api_id, api_hash).start(bot_token=bot_token)
async def get_bot_id():
me = await client.get_me()
return me.id
# OpenAI completion handler
async def get_completion(client, event, user_id, prompt):
history = get_chat_history(user_id)
messages = [
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": prompt},
]
try:
completion = openai_client.chat.completions.create(
model=model,
messages=messages,
max_tokens=512,
temperature=0.7,
top_p=1.0,
frequency_penalty=1.0,
presence_penalty=1.0,
stream=True
)
message = ""
for chunk in completion:
if chunk.choices[0].delta.content is not None:
message += chunk.choices[0].delta.content
await event.respond(message)
except Exception as e:
message = "..........."
await event.respond(message)
print(e)
update_chat_history(user_id, "user", prompt)
update_chat_history(user_id, "assistant", message)
return message
# Telegram bot commands
@client.on(events.NewMessage(pattern='/start'))
async def start(event):
await event.respond("Hello!")
@client.on(events.NewMessage(pattern='/help'))
async def help(event):
await event.respond("Here is how I can help you:\n/start - To check if I am alive\n/help - Show this message\n/reset - Reset chat history")
@client.on(events.NewMessage(pattern='/reset'))
async def reset(event):
user_id = event.chat_id
chat_collection.delete_one({"user_id": user_id})
if user_id in local_chat_history:
del local_chat_history[user_id]
await event.respond("History has been reset.")
@client.on(events.NewMessage)
async def handle_message(event):
bot_id = await get_bot_id()
if event.sender_id == bot_id:
return
user_id = event.chat_id
user_message = event.raw_text
if event.photo:
photo = await event.download_media()
if photo:
image_description = describe_image(client, photo, user_message)
user_message += f"\n\nI sent you an image. Content of the image: {image_description}"
if user_message.startswith(('/start', '/help', '/reset')):
return
try:
async with client.action(event.chat_id, 'typing'):
await asyncio.sleep(2)
await get_completion(client, event, user_id, user_message)
except Exception as e:
print(f"An error occurred: {e}")
await event.respond("Whoopsie!")
# Gradio interface
def launch_gradio():
welcome_message = """
# Meet Scarlett!
- Scarlett is a girl residing in Telegram
- She can chat with you without any censorship.
- Head over to https://t.me/Scarlett_Rogue_isNotBot
- Thanks to https://glhf.chat for powering this!
"""
with gr.Blocks() as demo:
gr.Markdown(f"""
# Follow me: https://example.com
{welcome_message}
""")
demo.launch(show_api=False)
# Keep-alive functionality for the bot
def keep_alive():
ping_client = OpenAI(api_key=ping_key, base_url=api_url)
while True:
try:
messages = [
{"role": "system", "content": "Repeat what I say."},
{"role": "user", "content": "Repeat: 'Ping success'"}
]
request = ping_client.chat.completions.create(
model=model,
messages=messages,
max_tokens=10,
temperature=0.1,
top_p=0.1,
)
print(request.choices[0].message.content)
except Exception as e:
print(f"Keep-alive request failed: {e}")
time.sleep(1800)
# Main execution
if __name__ == "__main__":
threading.Thread(target=keep_alive).start()
threading.Thread(target=launch_gradio).start()
client.run_until_disconnected()