# Page banner captured with the source (not part of the program): Spaces - Paused
import os | |
import time | |
import threading | |
import base64 | |
from io import BytesIO | |
import gradio as gr | |
import asyncio | |
from collections import OrderedDict | |
from datetime import datetime | |
import requests | |
from openai import OpenAI | |
from telethon import TelegramClient, events | |
from PIL import Image | |
from huggingface_hub import InferenceClient | |
import pymongo | |
from pymongo import MongoClient | |
def load_system_prompt():
    """Return the full contents of ``prompt.txt`` as a string.

    The path is relative to the current working directory. The file is
    decoded as UTF-8 explicitly so the prompt reads the same no matter what
    the host's locale default encoding is.

    Raises:
        OSError: if ``prompt.txt`` cannot be opened.
        UnicodeDecodeError: if the file is not valid UTF-8.
    """
    with open('prompt.txt', 'r', encoding='utf-8') as prompt_file:
        return prompt_file.read()
# Prompt text used as the system message in get_completion().
system_prompt = load_system_prompt()

# Environment variables
# -- Telegram credentials
api_id = os.getenv('api_id')
api_hash = os.getenv('api_hash')
bot_token = os.getenv('bot_token')
# -- Chat-completion endpoint: key for chat, key for keep-alive pings, base URL
openai_api_key = os.getenv('glhf')
ping_key = os.getenv('bolo')
api_url = os.getenv('yolo')
# -- Model ids: `model` for chat and pings, `model2` for image description
model = os.getenv('model')
model2 = os.getenv('model2')
# -- MongoDB connection string
mongoURI = os.getenv('MONGO_URI')

# OpenAI and MongoDB clients
openai_client = OpenAI(api_key=openai_api_key, base_url=api_url)
mongo_client = MongoClient(mongoURI)
db = mongo_client['Scarlett']
chat_collection = db['chats']

# In-process cache of per-user histories, ordered from least to most
# recently used; the helpers below evict the oldest entry once the cache
# holds more than MAX_LOCAL_USERS users.
local_chat_history = OrderedDict()
MAX_LOCAL_USERS = 5
# Functions for MongoDB-based chat history storage and retrieval
def get_history_from_mongo(user_id):
    """Fetch the stored message list for ``user_id`` from ``chat_collection``.

    Returns the document's ``"messages"`` value, or an empty list when no
    document matches or the matching document has no ``"messages"`` key.
    """
    record = chat_collection.find_one({"user_id": user_id})
    if not record:
        return []
    return record.get("messages", [])
def store_message_in_mongo(user_id, role, content):
    """Append one ``{"role", "content"}`` message to the user's document.

    The document is created if it does not exist (``upsert=True``), and the
    ``$slice: -20`` modifier keeps only the last 20 entries of ``messages``.
    """
    new_entry = {"role": role, "content": content}
    push_spec = {
        "messages": {
            "$each": [new_entry],
            "$slice": -20,
        }
    }
    chat_collection.update_one(
        {"user_id": user_id},
        {"$push": push_spec},
        upsert=True,
    )
def get_chat_history(user_id):
    """Return the message history for ``user_id``, going through the local cache.

    On a cache hit the entry is marked most recently used. On a miss the
    history is loaded from MongoDB and cached, and the least recently used
    entry is evicted if the cache now exceeds ``MAX_LOCAL_USERS``.
    """
    if user_id not in local_chat_history:
        fetched = get_history_from_mongo(user_id)
        local_chat_history[user_id] = fetched
        if len(local_chat_history) > MAX_LOCAL_USERS:
            # popitem(last=False) drops the oldest (front) entry.
            local_chat_history.popitem(last=False)
        return fetched

    local_chat_history.move_to_end(user_id)
    return local_chat_history[user_id]
def update_chat_history(user_id, role, content):
    """Record one message in both the local cache and MongoDB.

    The cached history is loaded from MongoDB first if the user is not
    cached, trimmed to its last 20 entries after the append, and marked most
    recently used; the oldest cache entry is evicted when the cache exceeds
    ``MAX_LOCAL_USERS``.
    """
    entry = {"role": role, "content": content}

    if user_id not in local_chat_history:
        local_chat_history[user_id] = get_history_from_mongo(user_id)

    cached = local_chat_history[user_id]
    cached.append(entry)
    local_chat_history[user_id] = cached[-20:]
    local_chat_history.move_to_end(user_id)

    if len(local_chat_history) > MAX_LOCAL_USERS:
        local_chat_history.popitem(last=False)

    store_message_in_mongo(user_id, role, content)
# Image encoding helper
def encode_local_image(image_file):
    """Re-encode an image as PNG and return it as a base64 ASCII string.

    Args:
        image_file: anything ``PIL.Image.open`` accepts (the caller in this
            file passes a path to a downloaded photo).

    Returns:
        The base64 text of the PNG bytes, or ``None`` if the image could not
        be opened or saved (the error is printed).
    """
    try:
        # The context manager closes the image's file handle even when
        # save() raises, so failed encodes do not leak open files.
        with Image.open(image_file) as im:
            buffered = BytesIO()
            im.save(buffered, format="PNG")
    except Exception as e:
        print(f"Error encoding image: {e}")
        return None
    return base64.b64encode(buffered.getvalue()).decode('ascii')
# Image description function, calling an external inference model
def inference_calling_idefics(image_path, question=""):
    """Describe an image by streaming a chat completion from the ``model2`` model.

    Args:
        image_path: image to describe; it is PNG/base64-encoded with
            ``encode_local_image`` and sent as a ``data:`` URL.
        question: text sent alongside the image. When empty, a default
            "describe this image" instruction is used instead.

    Returns:
        The concatenated streamed text, or an error string when the image
        cannot be encoded or the inference call fails.
    """
    # Encode first: there is no point building a client for an unreadable image.
    image_base64 = encode_local_image(image_path)
    if not image_base64:
        return "Error: Invalid image or unable to encode image."

    model_id = model2
    # Kept in a local name so the module-level `system_prompt` is not shadowed.
    vision_prompt = os.getenv('USER_PROMPT')
    image_info = f"data:image/png;base64,{image_base64}"
    prompt = question if question != "" else 'Describe this image without question mark'

    messages = []
    if vision_prompt:
        # Only send a system message when USER_PROMPT is set; otherwise the
        # request would carry `None` as its text.
        messages.append(
            {
                "role": "system",
                "content": [
                    {"type": "text", "text": vision_prompt},
                ],
            }
        )
    messages.append(
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": image_info}},
                {"type": "text", "text": prompt},
            ],
        }
    )

    try:
        inference_client = InferenceClient(model=model_id)
        pieces = []
        # The original passed the undefined name `image_model` here, which
        # raised NameError on every call; `model_id` is the intended model.
        for chunk in inference_client.chat_completion(
            model=model_id,
            messages=messages,
            max_tokens=2048,
            stream=True,
        ):
            if not chunk.choices:
                continue
            piece = chunk.choices[0].delta.content
            # Stream chunks may carry no text; skip those
            # instead of failing on `str += None`.
            if piece:
                pieces.append(piece)
        return "".join(pieces)
    except Exception as e:
        print(f"Error in inference call: {e}")
        return "Error while processing the image."
def describe_image(client, image_path, question=""):
    """Return a description of the image, or a fallback string on any error.

    ``client`` is accepted for the callers' sake but is not used here; the
    work is delegated to ``inference_calling_idefics``.
    """
    try:
        return inference_calling_idefics(image_path, question)
    except Exception as err:
        print(err)
        return "Error while seeing the image."
# Telegram bot setup
# Create the client on the 'bot' session, then log in with the bot token.
# start() returns the client itself, so calling it as a separate statement
# leaves `client` bound to the same object as the chained form.
client = TelegramClient('bot', api_id, api_hash)
client.start(bot_token=bot_token)
async def get_bot_id():
    """Return the ``id`` of the account that ``client`` is logged in as."""
    return (await client.get_me()).id
# OpenAI completion handler
def _collect_completion(messages):
    """Run the streaming chat completion synchronously and return its full text."""
    stream = openai_client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=512,
        temperature=0.7,
        top_p=1.0,
        frequency_penalty=1.0,
        presence_penalty=1.0,
        stream=True,
    )
    pieces = []
    for chunk in stream:
        piece = chunk.choices[0].delta.content
        if piece is not None:
            pieces.append(piece)
    return "".join(pieces)


async def get_completion(client, event, user_id, prompt):
    """Generate a reply to ``prompt``, send it, and record the exchange.

    Args:
        client: unused; kept so existing callers keep working.
        event: object whose ``respond`` coroutine sends the reply.
        user_id: key under which the chat history is cached and stored.
        prompt: the user's message text.

    Returns:
        The text that was sent. If the completion or the send fails, the
        placeholder ``"..........."`` is sent and returned instead.

    Both the prompt and the reply (or the placeholder) are appended to the
    user's chat history.
    """
    history = get_chat_history(user_id)
    messages = [
        {"role": "system", "content": system_prompt},
        *history,
        {"role": "user", "content": prompt},
    ]
    try:
        # The OpenAI client used here is synchronous. Consuming its stream
        # directly in this coroutine would block the event loop (and with it
        # every other chat) until the model finishes, so run it in the
        # default executor instead.
        loop = asyncio.get_running_loop()
        message = await loop.run_in_executor(None, _collect_completion, messages)
        await event.respond(message)
    except Exception as e:
        message = "..........."
        await event.respond(message)
        print(e)
    update_chat_history(user_id, "user", prompt)
    update_chat_history(user_id, "assistant", message)
    return message
# Telegram bot commands
async def start(event):
    """Reply to the event with a fixed greeting."""
    greeting = "Hello!"
    await event.respond(greeting)
async def help(event):
    """Reply to the event with the list of supported commands."""
    # Adjacent literals are joined at compile time into one message string.
    usage = (
        "Here is how I can help you:\n"
        "/start - To check if I am alive\n"
        "/help - Show this message\n"
        "/reset - Reset chat history"
    )
    await event.respond(usage)
async def reset(event):
    """Delete the chat's stored and cached history, then confirm to the user.

    The history key is ``event.chat_id``, matching what ``handle_message``
    uses as ``user_id``.
    """
    chat_key = event.chat_id
    chat_collection.delete_one({"user_id": chat_key})
    # pop() with a default is a no-op when the chat is not cached.
    local_chat_history.pop(chat_key, None)
    await event.respond("History has been reset.")
async def handle_message(event):
    """Answer a chat message, describing an attached photo first if present.

    Messages sent by the bot itself and messages starting with ``/start``,
    ``/help`` or ``/reset`` are ignored here (this file defines separate
    ``start``/``help``/``reset`` coroutines for those). For everything else
    the text, plus a description of any attached photo, is passed to
    ``get_completion``; on failure a short apology is sent.
    """
    bot_id = await get_bot_id()
    if event.sender_id == bot_id:
        return

    user_id = event.chat_id
    user_message = event.raw_text

    # Filter commands before touching media, so a command with a photo
    # attached does not trigger a download and an inference call whose
    # result would be thrown away.
    if user_message.startswith(('/start', '/help', '/reset')):
        return

    if event.photo:
        photo = await event.download_media()
        if photo:
            try:
                # describe_image is synchronous and makes a network call;
                # run it in the default executor so the event loop stays
                # responsive.
                loop = asyncio.get_running_loop()
                image_description = await loop.run_in_executor(
                    None, describe_image, client, photo, user_message
                )
            finally:
                # The downloaded file is only needed for the description;
                # delete it so photos do not pile up on disk.
                try:
                    os.remove(photo)
                except OSError as cleanup_error:
                    print(f"Could not remove downloaded media: {cleanup_error}")
            user_message += f"\n\nI sent you an image. Content of the image: {image_description}"

    try:
        async with client.action(event.chat_id, 'typing'):
            await asyncio.sleep(2)
            await get_completion(client, event, user_id, user_message)
    except Exception as e:
        print(f"An error occurred: {e}")
        await event.respond("Whoopsie!")
# Gradio interface
def launch_gradio():
    """Build a one-panel Gradio page with the welcome text and launch it.

    The page is launched with ``show_api=False``.
    """
    # The string bodies stay at column 0 so the Markdown text is exactly what
    # the source shows (no leading whitespace on any line).
    intro_md = """
# Meet Scarlett!
- Scarlett is a girl residing in Telegram
- She can chat with you without any censorship.
- Head over to https://t.me/Scarlett_Rogue_isNotBot
- Thanks to https://glhf.chat for powering this!
"""
    page_md = f"""
# Follow me: https://example.com
{intro_md}
"""
    with gr.Blocks() as demo:
        gr.Markdown(page_md)
    demo.launch(show_api=False)
# Keep-alive functionality for the bot
def keep_alive():
    """Send a tiny chat completion every 30 minutes, forever.

    Uses its own OpenAI client built from ``ping_key``. The reply is printed;
    a failed request is printed and does not stop the loop. This function
    never returns.
    """
    ping_client = OpenAI(api_key=ping_key, base_url=api_url)
    pause_seconds = 1800
    ping_messages = [
        {"role": "system", "content": "Repeat what I say."},
        {"role": "user", "content": "Repeat: 'Ping success'"},
    ]

    while True:
        try:
            reply = ping_client.chat.completions.create(
                model=model,
                messages=ping_messages,
                max_tokens=10,
                temperature=0.1,
                top_p=0.1,
            )
            print(reply.choices[0].message.content)
        except Exception as err:
            print(f"Keep-alive request failed: {err}")
        time.sleep(pause_seconds)
# Main execution
if __name__ == "__main__":
    # Nothing else in this file attaches the handler coroutines to the
    # client, so without these registrations the bot would connect and then
    # never react to any message. The command patterns match the prefixes
    # that handle_message() explicitly skips.
    client.add_event_handler(start, events.NewMessage(pattern='/start'))
    client.add_event_handler(help, events.NewMessage(pattern='/help'))
    client.add_event_handler(reset, events.NewMessage(pattern='/reset'))
    client.add_event_handler(handle_message, events.NewMessage())

    # The keep-alive pinger and the Gradio page each run on their own thread;
    # the main thread then blocks on the Telegram client.
    threading.Thread(target=keep_alive).start()
    threading.Thread(target=launch_gradio).start()
    client.run_until_disconnected()