import os
import time
import threading
import base64
from io import BytesIO
from collections import OrderedDict
from datetime import datetime

import requests
from openai import OpenAI
from telethon import TelegramClient, events
from PIL import Image
from huggingface_hub import InferenceClient
from transformers import AutoProcessor, AutoTokenizer
import pymongo
from pymongo import MongoClient


# Load system prompt from a file
def load_system_prompt():
    with open('prompt.txt', 'r') as file:
        return file.read()


system_prompt = load_system_prompt()

# Environment variables
api_id = os.getenv('api_id')
api_hash = os.getenv('api_hash')
bot_token = os.getenv('bot_token')
openai_api_key = os.getenv('glhf')
ping_key = os.getenv('bolo')
api_url = os.getenv('yolo')
model = os.getenv('model')
model1 = os.getenv('model1')
model2 = os.getenv('model2')
mongoURI = os.getenv('MONGO_URI')

# Initialize OpenAI and MongoDB clients
openai_client = OpenAI(api_key=openai_api_key, base_url=api_url)
mongo_client = MongoClient(mongoURI)
db = mongo_client['Scarlett']
chat_collection = db['chats']

# Initialize Hugging Face models for image processing
idefics_processor = AutoProcessor.from_pretrained(model1)
idefics_client = InferenceClient(model2)
tokenizer = AutoTokenizer.from_pretrained(model1)

# Local cache for up to 5 users
local_chat_history = OrderedDict()
MAX_LOCAL_USERS = 5


# Retrieve chat history from MongoDB
def get_history_from_mongo(user_id):
    result = chat_collection.find_one({"user_id": user_id})
    return result.get("messages", []) if result else []


# Store a message in MongoDB (keep only the last 99 messages)
def store_message_in_mongo(user_id, role, content):
    chat_collection.update_one(
        {"user_id": user_id},
        {
            "$push": {
                "messages": {
                    "$each": [{"role": role, "content": content}],
                    "$slice": -99
                }
            }
        },
        upsert=True
    )


# Get chat history from the local cache or MongoDB
def get_chat_history(user_id):
    if user_id in local_chat_history:
        local_chat_history.move_to_end(user_id)  # Mark as most recently used
        return local_chat_history[user_id]

    # Load from MongoDB if not in the local cache
    history = get_history_from_mongo(user_id)
    local_chat_history[user_id] = history
    if len(local_chat_history) > MAX_LOCAL_USERS:
        local_chat_history.popitem(last=False)  # Evict the least recently used user
    return history


# Update chat history (both local cache and MongoDB)
def update_chat_history(user_id, role, content):
    if user_id not in local_chat_history:
        local_chat_history[user_id] = get_history_from_mongo(user_id)
    local_chat_history[user_id].append({"role": role, "content": content})
    local_chat_history.move_to_end(user_id)
    if len(local_chat_history) > MAX_LOCAL_USERS:
        local_chat_history.popitem(last=False)
    store_message_in_mongo(user_id, role, content)


# Encode an image as a base64 data URI
def encode_local_image(image):
    pil_image = Image.open(image).convert("RGB")  # Ensure a JPEG-compatible mode
    buffer = BytesIO()
    pil_image.save(buffer, format="JPEG")
    return f"data:image/jpeg;base64,{base64.b64encode(buffer.getvalue()).decode('utf-8')}"


# Describe an image using the vision model, with error handling
def describe_image(image_path, query=''):
    image_string = encode_local_image(image_path)
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image"},
                {"type": "text", "text": f"{os.getenv('USER_PROMPT')}\n{query}"}
            ]
        }
    ]
    prompt_with_template = idefics_processor.apply_chat_template(
        messages,
        add_generation_prompt=True,
        chat_template=os.getenv('CHAT_TEMPLATE')
    )
    # Swap the chat template's image placeholder for the inline base64 image
    # (assumes the standard "<image>" token used by the idefics chat templates)
    prompt_with_images = prompt_with_template.replace("<image>", f"![]({image_string})")
    payload = {
        "inputs": prompt_with_images,
        "parameters": {"return_full_text": False, "max_new_tokens": 2048},
    }
    try:
        # InferenceClient.post returns the raw response body (bytes), not a
        # requests.Response, so decode it instead of checking status_code/text
        response = idefics_client.post(json=payload)
        generated_text = response.decode("utf-8") if isinstance(response, bytes) else str(response)
        if not generated_text:
            raise ValueError("Empty response from the image model")
        return generated_text
    except Exception as e:
        print(f"Error during image description: {e}")
        return "Unable to describe the image due to an error."


# Telegram bot client
client = TelegramClient('bot', api_id, api_hash).start(bot_token=bot_token)


# Get the bot's own user ID for message filtering
async def get_bot_id():
    me = await client.get_me()
    return me.id


# Get a chat completion from the OpenAI-compatible API
async def get_completion(event, user_id, prompt):
    async with client.action(event.chat_id, 'typing'):
        history = get_chat_history(user_id)
        messages = [
            {"role": "system", "content": system_prompt},
            *history,
            {"role": "user", "content": prompt},
        ]
        try:
            response = openai_client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=512,
                temperature=0.6,
                top_p=1.0,
                frequency_penalty=0.9,
                presence_penalty=0.9,
            )
            message = response.choices[0].message.content
        except Exception as e:
            message = "Whoops!"
            print(e)
        update_chat_history(user_id, "user", prompt)        # Record the user's message
        update_chat_history(user_id, "assistant", message)  # Record the assistant's reply
    return message


# Telegram bot events
@client.on(events.NewMessage(pattern='/start'))
async def start(event):
    await event.respond("Hello!")


@client.on(events.NewMessage(pattern='/help'))
async def help(event):
    await event.respond(
        "Here is how I can help you:\n"
        "/start - To check if I am alive\n"
        "/help - Show this message\n"
        "/reset - Reset chat history"
    )


@client.on(events.NewMessage(pattern='/reset'))
async def reset(event):
    user_id = event.chat_id
    chat_collection.delete_one({"user_id": user_id})  # Reset MongoDB chat history for the user
    if user_id in local_chat_history:
        del local_chat_history[user_id]  # Remove from the local cache if present
    await event.respond("History has been reset.")


@client.on(events.NewMessage)
async def handle_message(event):
    bot_id = await get_bot_id()  # Get the bot's ID to avoid responding to itself
    try:
        user_id = event.chat_id  # Use chat_id to distinguish between users

        # Ignore messages from the bot itself
        if event.sender_id == bot_id:
            return

        user_message = event.raw_text

        # If an image is sent, describe it and append the description
        if event.photo:
            photo = await event.download_media()
            image_description = describe_image(photo, user_message)
            user_message += f"\n\nContent of the image: {image_description}"

        # Ignore command messages to prevent double processing
        if user_message.startswith(('/start', '/help', '/reset')):
            return

        response = await get_completion(event, user_id, user_message)
        await event.respond(response)
    except Exception as e:
        print(f"An error occurred: {e}")
        await event.respond("Whoopsie!")


# Keep-alive loop: ping the API every 30 minutes to keep the bot's backend active
def keep_alive():
    ping_client = OpenAI(api_key=ping_key, base_url=api_url)
    while True:
        try:
            messages = [
                {"role": "system", "content": "Be a helpful assistant."},
                {"role": "user", "content": "Hello"}
            ]
            request = ping_client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=10,
                temperature=0.1,
                top_p=0.1,
            )
            print(request)
        except Exception as e:
            print(f"Keep-alive request failed: {e}")
        time.sleep(1800)  # Ping every 30 minutes


if __name__ == "__main__":
    # Run the keep-alive pinger as a daemon thread so it doesn't block shutdown
    threading.Thread(target=keep_alive, daemon=True).start()
    client.run_until_disconnected()