Spaces:
Paused
Paused
| import os | |
| import time | |
| import threading | |
| import base64 | |
| from io import BytesIO | |
| import gradio as gr | |
| import asyncio | |
| from collections import OrderedDict | |
| from datetime import datetime | |
| import requests | |
| from openai import OpenAI | |
| from telethon import TelegramClient, events | |
| from PIL import Image | |
| from huggingface_hub import InferenceClient | |
| import pymongo | |
| from pymongo import MongoClient | |
def load_system_prompt(path='prompt.txt'):
    """Read the chatbot's system prompt from a text file.

    Args:
        path: Location of the prompt file (defaults to 'prompt.txt',
            preserving the original hard-coded behavior).

    Returns:
        The entire file contents as a string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # Explicit encoding: the platform default can mangle non-ASCII
    # characters in the prompt on some systems.
    with open(path, 'r', encoding='utf-8') as file:
        return file.read()
# Load the persona prompt once at import time; reused for every completion.
system_prompt = load_system_prompt()
# Environment variables
api_id = os.getenv('api_id')        # Telegram API id
api_hash = os.getenv('api_hash')    # Telegram API hash
bot_token = os.getenv('bot_token')  # Telegram bot token
openai_api_key = os.getenv('glhf')  # key for the OpenAI-compatible endpoint
ping_key = os.getenv('bolo')        # separate key used by keep_alive()
api_url = os.getenv('yolo')         # base URL of the OpenAI-compatible API
model = os.getenv('model')          # chat model name
model2 = os.getenv('model2')        # vision model name (image description)
mongoURI = os.getenv('MONGO_URI')   # MongoDB connection string
# OpenAI and MongoDB clients
# NOTE(review): both clients are constructed at import time — confirm that is
# acceptable wherever this module is imported.
openai_client = OpenAI(api_key=openai_api_key, base_url=api_url)
mongo_client = MongoClient(mongoURI)
db = mongo_client['Scarlett']
chat_collection = db['chats']
# In-process LRU cache of recent chat histories (most recently used last).
local_chat_history = OrderedDict()
# Max number of users kept in the local cache before LRU eviction.
MAX_LOCAL_USERS = 5
# Functions for MongoDB-based chat history storage and retrieval
def get_history_from_mongo(user_id):
    """Fetch the stored message list for a user; empty list when unknown."""
    doc = chat_collection.find_one({"user_id": user_id})
    if not doc:
        return []
    return doc.get("messages", [])
def store_message_in_mongo(user_id, role, content):
    """Append one message to the user's history, keeping only the last 20."""
    new_message = {"role": role, "content": content}
    # $each + $slice caps the stored array at the 20 most recent entries.
    push_spec = {"messages": {"$each": [new_message], "$slice": -20}}
    chat_collection.update_one(
        {"user_id": user_id},
        {"$push": push_spec},
        upsert=True,
    )
def get_chat_history(user_id):
    """Return the user's recent history, preferring the in-memory LRU cache."""
    cached = local_chat_history.get(user_id)
    if cached is not None:
        # Cache hit: mark as most recently used.
        local_chat_history.move_to_end(user_id)
        return cached
    # Cache miss: hydrate from MongoDB and insert, evicting the LRU entry.
    history = get_history_from_mongo(user_id)
    local_chat_history[user_id] = history
    while len(local_chat_history) > MAX_LOCAL_USERS:
        local_chat_history.popitem(last=False)
    return history
def update_chat_history(user_id, role, content):
    """Record one message in both the local LRU cache and MongoDB."""
    if user_id not in local_chat_history:
        # Cache miss: hydrate from the persistent store first.
        local_chat_history[user_id] = get_history_from_mongo(user_id)
    entries = local_chat_history[user_id]
    entries.append({"role": role, "content": content})
    # Keep only the 20 most recent messages, mirroring the Mongo $slice cap.
    local_chat_history[user_id] = entries[-20:]
    local_chat_history.move_to_end(user_id)
    if len(local_chat_history) > MAX_LOCAL_USERS:
        local_chat_history.popitem(last=False)
    store_message_in_mongo(user_id, role, content)
# Fixing image encoding
def encode_local_image(image_file):
    """Re-encode an image file as a base64 PNG string.

    Args:
        image_file: Path or file object accepted by PIL.Image.open.

    Returns:
        Base64-encoded PNG data as an ASCII string, or None on any failure.
    """
    try:
        # Context manager closes the underlying file handle promptly;
        # the original bare Image.open left it open until GC.
        with Image.open(image_file) as im:
            buffered = BytesIO()
            im.save(buffered, format="PNG")
        return base64.b64encode(buffered.getvalue()).decode('ascii')
    except Exception as e:
        print(f"Error encoding image: {e}")
        return None
# Image description function, calling external inference model
def inference_calling_idefics(image_path, question=""):
    """Ask the vision model (model2) to describe or answer about an image.

    Args:
        image_path: Path to the image file.
        question: Optional question about the image; when empty, a generic
            "describe this image" instruction is sent instead.

    Returns:
        The model's full streamed answer, or an error message string.
    """
    system_prompt = os.getenv('USER_PROMPT')
    model_id = model2
    client = InferenceClient(model=model_id)
    # Use the fixed `encode_local_image` to encode the image
    image_base64 = encode_local_image(image_path)
    if not image_base64:
        return "Error: Invalid image or unable to encode image."
    image_info = f"data:image/png;base64,{image_base64}"
    prompt = question if question != "" else 'Describe this image without question mark'
    try:
        response = ""
        for message in client.chat_completion(
            # BUG FIX: the original passed the undefined name `image_model`
            # here, raising NameError on every call (silently swallowed by
            # the broad except below).
            model=model_id,
            messages=[
                {
                    "role": "system",
                    "content": [
                        {"type": "text", "text": system_prompt},
                    ],
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": image_info}},
                        {"type": "text", "text": prompt},
                    ],
                },
            ],
            max_tokens=2048,
            stream=True,
        ):
            # Streamed chunks can carry None content (e.g. role-only deltas);
            # the original crashed into the except on those.
            delta = message.choices[0].delta.content
            if delta:
                response += delta
        return response
    except Exception as e:
        print(f"Error in inference call: {e}")
        return "Error while processing the image."
def describe_image(client, image_path, question=""):
    """Thin wrapper around inference_calling_idefics with an error fallback.

    The `client` argument is unused but retained for caller compatibility.
    """
    try:
        return inference_calling_idefics(image_path, question)
    except Exception as e:
        print(e)
        return "Error while seeing the image."
# Telegram bot setup
# NOTE(review): .start() performs network I/O at import time — confirm this
# is intended before importing the module elsewhere.
client = TelegramClient('bot', api_id, api_hash).start(bot_token=bot_token)
async def get_bot_id():
    """Return the bot's own Telegram user id (used to skip self-messages)."""
    me = await client.get_me()
    return me.id
# OpenAI completion handler
async def get_completion(client, event, user_id, prompt):
    """Stream a chat completion for `prompt` and reply on Telegram.

    Loads the user's history, accumulates the streamed answer, sends it
    as a single message, then persists both sides of the exchange.
    On failure, replies with a placeholder and logs the error.
    """
    history = get_chat_history(user_id)
    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(history)
    messages.append({"role": "user", "content": prompt})
    try:
        stream = openai_client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=512,
            temperature=0.7,
            top_p=1.0,
            frequency_penalty=1.0,
            presence_penalty=1.0,
            stream=True,
        )
        pieces = []
        for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta is not None:
                pieces.append(delta)
        message = "".join(pieces)
        await event.respond(message)
    except Exception as e:
        message = "..........."
        await event.respond(message)
        print(e)
    # Persist the exchange (including the placeholder on failure, as before).
    update_chat_history(user_id, "user", prompt)
    update_chat_history(user_id, "assistant", message)
    return message
# Telegram bot commands
# BUG FIX: handlers were defined but never registered anywhere in the file,
# so the bot never responded; register via the Telethon decorator.
@client.on(events.NewMessage(pattern='/start'))
async def start(event):
    """Reply to /start as a liveness check."""
    await event.respond("Hello!")
# BUG FIX: registered via decorator — the handler was previously never wired
# to the client, so /help did nothing.
@client.on(events.NewMessage(pattern='/help'))
async def help(event):
    """Reply to /help with the list of supported commands."""
    await event.respond("Here is how I can help you:\n/start - To check if I am alive\n/help - Show this message\n/reset - Reset chat history")
# BUG FIX: registered via decorator — the handler was previously never wired
# to the client, so /reset did nothing.
@client.on(events.NewMessage(pattern='/reset'))
async def reset(event):
    """Delete the user's history from MongoDB and the local cache."""
    user_id = event.chat_id
    chat_collection.delete_one({"user_id": user_id})
    if user_id in local_chat_history:
        del local_chat_history[user_id]
    await event.respond("History has been reset.")
# BUG FIX: catch-all handler was never registered with the client; without
# this decorator the bot ignored every message.
@client.on(events.NewMessage)
async def handle_message(event):
    """Catch-all handler: describe attached photos, then answer via the LLM.

    Skips the bot's own messages and the slash commands (those have
    dedicated handlers). On failure, replies with a short apology.
    """
    bot_id = await get_bot_id()
    if event.sender_id == bot_id:
        return
    user_id = event.chat_id
    user_message = event.raw_text
    if event.photo:
        photo = await event.download_media()
        if photo:
            # NOTE(review): describe_image is a blocking network call run on
            # the event loop; consider asyncio.to_thread if throughput matters.
            image_description = describe_image(client, photo, user_message)
            user_message += f"\n\nI sent you an image. Content of the image: {image_description}"
    # Commands are handled by their dedicated pattern handlers.
    if user_message.startswith(('/start', '/help', '/reset')):
        return
    try:
        async with client.action(event.chat_id, 'typing'):
            await asyncio.sleep(2)
            await get_completion(client, event, user_id, user_message)
    except Exception as e:
        print(f"An error occurred: {e}")
        await event.respond("Whoopsie!")
# Gradio interface
def launch_gradio():
    """Serve a small Gradio landing page that describes the bot.

    Runs blocking in its own thread (started from the __main__ guard).
    """
    welcome_message = """
    # Meet Scarlett!
    - Scarlett is a girl residing in Telegram
    - She can chat with you without any censorship.
    - Head over to https://t.me/Scarlett_Rogue_isNotBot
    - Thanks to https://glhf.chat for powering this!
    """
    with gr.Blocks() as demo:
        gr.Markdown(f"""
        # Follow me: https://example.com
        {welcome_message}
        """)
    # show_api=False hides Gradio's auto-generated API docs page.
    demo.launch(show_api=False)
# Keep-alive functionality for the bot
def keep_alive():
    """Ping the OpenAI-compatible endpoint every 30 minutes, forever.

    Uses a dedicated client with the ping key; failures are logged and
    the loop continues.
    """
    ping_client = OpenAI(api_key=ping_key, base_url=api_url)
    # Invariant payload — hoisted out of the loop.
    ping_messages = [
        {"role": "system", "content": "Repeat what I say."},
        {"role": "user", "content": "Repeat: 'Ping success'"},
    ]
    while True:
        try:
            reply = ping_client.chat.completions.create(
                model=model,
                messages=ping_messages,
                max_tokens=10,
                temperature=0.1,
                top_p=0.1,
            )
            print(reply.choices[0].message.content)
        except Exception as e:
            print(f"Keep-alive request failed: {e}")
        time.sleep(1800)
# Main execution
if __name__ == "__main__":
    # daemon=True so the process can actually exit once the Telegram client
    # disconnects: keep_alive() loops forever and launch_gradio() blocks, and
    # non-daemon threads would hang interpreter shutdown.
    threading.Thread(target=keep_alive, daemon=True).start()
    threading.Thread(target=launch_gradio, daemon=True).start()
    # Block the main thread until the bot disconnects.
    client.run_until_disconnected()