import random import time import json import re import aiohttp import requests import asyncio from flask import Flask, request, Response from flask_cors import CORS import uuid app = Flask(__name__) CORS(app) MAGAI_TOKEN = { "cookie": "_fbp=fb.1.1722349051350.28463402121267809;soquick-mobile_u1main=1722349236461x685414888067651300;intercom-id-jnjoad6e=cbbd8fc9-a010-4e8c-8e7e-9cffccd3abea;soquick-mobile_live_u2main.sig=HuQePfrEHGidu4eRyfiZkcL1_2E;__stripe_mid=7767e1a3-e87f-4456-b073-6c8b7ae9e82119b00d;__stripe_sid=99c612a5-a12a-426f-baa5-e61471a013f140c482;_ga=GA1.1.242967908.1722349051;_ga_GFQ25YSHT2=GS1.1.1726123356.1.0.1726123393.0.0.0;_ga_N5J29RVHDJ=GS1.1.1726123395.4.1.1726124637.0.0.0;intercom-device-id-jnjoad6e=35ee824e-f7f6-415d-8698-bd822cb46d3a;intercom-session-jnjoad6e=SXlhUDdqa0E5YTlmUVA2QXk3K3hBSXFNSnRzL2x0cEtvVDF3U1k3UlRCQmhPRVdUcktDYkpwVXpiZFA1SzhUSi0tTFlCazFUamcxbExFRU1LTVlWSitVQT09--40f0dca176cfb28add2903465c54bcee4f33de2b;soquick-mobile_live_u2main=bus|1722349236461x685414888067651300|1726123417637x655253536227564700", "app_last_change": "21388518093", "current_page_item": "1348695171700984260__LOOKUP__1726124636560x692535552825360400", "current_user": "1348695171700984260__LOOKUP__1722349236461x685414888067651300", } MAGAI_MAPPING = { "gpt-4o": "openai/gpt-4o", "claude-3.5-sonnet": "anthropic/claude-3.5-sonnet:beta", "claude-3-opus": "anthropic/claude-3-opus:beta", "gemini-1.5-pro": "google/gemini-pro-1.5" } UUID_LENGTH = 1e18 MODULO = 1e18 def generate_uuid(): return f"{int(time.time() * 1000)}x{str(round(random.random() * UUID_LENGTH)).zfill(18)}" def create_luid(separator="x"): timestamp = int(time.time() * 1000) return f"{timestamp}{separator}1" def format_model_name(model_name): return re.sub(r"_+", "_", re.sub(r"[/:-]", "_", model_name)) def find_token_in_object(obj): if isinstance(obj, dict): for key, value in obj.items(): if key == "token" and isinstance(value, str): return value token = find_token_in_object(value) if token: return token return None def get_last_user_content(messages): for message in reversed(messages): if message["role"] == "user": return message["content"] return None async def get_token(model, message): server_call_id = generate_uuid() created_id = MAGAI_TOKEN["current_page_item"].split("__")[0] user_id = MAGAI_TOKEN["current_user"].split("__")[2] model_id = "0060f9accd1dbade552f65ac646fb3da" item_id = "bUNih7" element_id = "bUNib7" body = { "app_last_change": MAGAI_TOKEN["app_last_change"], "calls": [ { "client_state": { "element_instances": { "bUNib7": { "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUNib7", "parent_element_id": "bUMiq3", }, "bTezP": { "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezP", "parent_element_id": "bTezJ", }, "bTezE": { "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezE", "parent_element_id": "bTeqc", }, "bTezJ": { "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezJ", "parent_element_id": "bUKFL2", }, "bTezQ": { "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezQ", "parent_element_id": "bUKFL2", }, "bUiru0": { "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUiru0", "parent_element_id": "bUjNK", }, "bUDVj0": { "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUDVj0", "parent_element_id": "bUMiq3", }, "bUXzm2": { "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUXzm2", "parent_element_id": "bUMhk3", }, "bUifI1": { "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUifI1", "parent_element_id": "bTeqg", }, "bUMiq3": { "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUMiq3", "parent_element_id": "bTezE", }, "bTekm": { "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTekm", "parent_element_id": None, }, }, "element_state": { f"{created_id}__LOOKUP__ElementInstance::bTezP": { "is_visible": True, "value_that_is_valid": message, "value": message, }, f"{created_id}__LOOKUP__ElementInstance::bTezE": { "custom.images_": None, "custom.file_": None, "custom.file_content_": None, "custom.file_name_": None, "custom.file_type_": None, }, f"{created_id}__LOOKUP__ElementInstance::bTezJ": { "custom.isrecording_": None, "custom.prompt_": None, }, f"{created_id}__LOOKUP__ElementInstance::bUiru0": { "AAE": message }, f"{created_id}__LOOKUP__ElementInstance::bUDVj0": { "AAE": message }, f"{created_id}__LOOKUP__ElementInstance::bUifI1": { "custom.is_visible_": None, "group_data": None, }, f"{created_id}__LOOKUP__ElementInstance::bUMiq3": { "group_data": None }, }, "other_data": { "Current Page Scroll Position": 0, "Current Page Width": 661, }, "cache": { f"{model_id}": format_model_name(model), "true": True, "CurrentPageItem": MAGAI_TOKEN["current_page_item"], "CurrentUser": MAGAI_TOKEN["current_user"], }, "exists": { f"{model_id}": True, "true": True, "CurrentPageItem": True, "CurrentUser": True, }, }, "run_id": generate_uuid(), "server_call_id": server_call_id, "item_id": item_id, "element_id": element_id, "page_id": "bTekm", "uid_generator": { "timestamp": int(time.time() * 1000), "seed": round(random.random() * UUID_LENGTH) % MODULO, }, "random_seed": random.random(), "current_date_time": int(time.time() * 1000), "current_wf_params": {}, } ], "client_breaking_revision": 5, "timezone_offset": -480, "timezone_string": "Asia/Shanghai", "user_id": user_id, "wait_for": [], } url = "https://app.magai.co/workflow/start" async with aiohttp.ClientSession() as session: async with session.post( url, headers={ "x-bubble-fiber-id": generate_uuid(), "x-bubble-pl": create_luid(), "accept": "application/json, text/javascript, */*; q=0.01", "cookie": MAGAI_TOKEN["cookie"], }, json=body, ) as response: response_data = await response.json() if "error_class" in response_data: raise Exception(response_data) server_call_data = response_data.get(server_call_id) if not server_call_data or "step_results" not in server_call_data: return None for step_result in server_call_data["step_results"].values(): if isinstance(step_result.get("return_value"), dict): token = find_token_in_object(step_result["return_value"]) if token: return token async def get_request_data(model, messages): if model not in MAGAI_MAPPING: return Response( json.dumps( { "error": { "message": "This model is currently unavailable. Please try again later or choose another model.", "code": "model_not_exists", } } ), status=400, mimetype="application/json", ) last_user_message = get_last_user_content(messages) token = await get_token(MAGAI_MAPPING[model], last_user_message) headers = { "Content-Type": "application/json", "HTTP-Referer": "https://magai.co", "Origin": "https://app.magai.co", "Pragma": "no-cache", "Referer": "https://app.magai.co/", "Token": token, "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/500.00 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/500.00", } json_data = { "model": MAGAI_MAPPING[model], "messages": [{"role": "system", "content": "You are a helpful assistant."}] + messages, "tools": [ { "type": "function", "function": { "name": "get_actual_time_info", "description": "Returns actual information from web about prompt theme.", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "The query string based on users prompt to search information about.", } }, "required": ["query"], }, }, }, { "type": "function", "function": { "name": "generate_image", "description": "Returns generated image URL.", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "Prompt to image generation AI model, that describes what image to generate.", } }, "required": ["query"], }, }, }, ], "provider": {"data_collection": "deny"}, "tool_choice": "auto", "stream": True, } response = requests.post( "https://live.proxy.magai.co:4430/opr/api/v1/chat/completions", headers=headers, json=json_data, ) return response def format_response(response): content = "" for line in response.iter_lines(): if line: decoded_line = line.decode("utf-8") if decoded_line.startswith("data:"): try: data = json.loads(decoded_line[5:].strip()) if "choices" in data and len(data["choices"]) > 0: delta = data["choices"][0].get("delta", {}) if "content" in delta: content += delta["content"] except json.JSONDecodeError: pass return content @app.route("/hf/v1/chat/completions", methods=["POST"]) def chat_completions(): data = request.json messages = data.get("messages", []) model = data.get("model", "claude-3.5-sonnet") async def process_request(): response = await get_request_data(model, messages) return format_response(response) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) result = loop.run_until_complete(process_request()) event_stream_response = "" for part in result: part = part.replace("\n", "\\n") event_stream_response += f'data:{{"id":"{uuid.uuid4()}","object":"chat.completion.chunk","created":{int(time.time())},"model":"{model}","system_fingerprint":"fp_45ah8ld5a7","choices":[{{"index":0,"delta":{{"content":"{part}"}},"logprobs":null,"finish_reason":null}}]}}\n\n' event_stream_response += "data:[DONE]\n" return Response(event_stream_response, mimetype="text/event-stream") if __name__ == "__main__": app.run(host="0.0.0.0", port=7860)