M2A / app.py
Niansuh's picture
Update app.py
378b624 verified
import random
import time
import json
import re
import aiohttp
import requests
import asyncio
from flask import Flask, request, Response
from flask_cors import CORS
import uuid
# Flask application object; the route defined later in this file registers on it.
app = Flask(__name__)
# Apply flask_cors to the whole app using the extension's default settings.
CORS(app)
# Session values sent to app.magai.co by get_token().
# NOTE(review): this embeds what looks like a live session cookie directly in
# source control — consider loading it from configuration/environment instead.
MAGAI_TOKEN = {
# Sent verbatim as the "cookie" request header.
"cookie": "_fbp=fb.1.1731157547017.999175969945264246; _ga=GA1.1.1511565936.1731157547; soquick-mobile_live_u2main=bus|1731157558019x939242898939773400|1731157558072x230077743876915040; soquick-mobile_live_u2main.sig=O-AoJ9Vd_oxtGTWtsC31B_SsLlU; soquick-mobile_u1main=1731157558019x939242898939773400; _ga_N5J29RVHDJ=GS1.1.1731157547.1.1.1731157568.0.0.0",
# Copied into the request body under the same key.
"app_last_change": "21388518093",
# "<id>__LOOKUP__<item>"; get_token() uses the part before the first "__".
"current_page_item": "1348695171700984260__LOOKUP__1726124636560x692535552825360400",
# "<id>__LOOKUP__<user>"; get_token() sends the third "__"-separated part as user_id.
"current_user": "1348695171700984260__LOOKUP__1722349236461x685414888067651300",
}
# Model names accepted in the incoming request's "model" field (keys) and the
# identifiers sent upstream in their place (values). get_request_data() rejects
# any model that is not a key of this mapping.
MAGAI_MAPPING = {
"gpt-4o": "openai/gpt-4o",
"claude-3.5-sonnet": "anthropic/claude-3.5-sonnet:beta",
"claude-3-opus": "anthropic/claude-3-opus:beta",
"gemini-1.5-pro": "google/gemini-pro-1.5"
}
# Scale applied to random.random() to obtain the (up to) 18-digit random part
# of generated ids and seeds.
UUID_LENGTH = 1e18
# Modulus applied to the uid_generator seed. It must be an int: with a float
# modulus, ``int % float`` yields a float and the seed is JSON-encoded in
# exponent notation (e.g. 1.23e+17) instead of as plain digits.
MODULO = 10**18
def generate_uuid():
    """Return an id of the form ``<epoch-ms>x<random digits>``.

    The random part is ``random.random() * UUID_LENGTH`` rounded to an
    integer and left-padded with zeros to at least 18 characters.
    """
    millis = int(time.time() * 1000)
    random_part = str(round(random.random() * UUID_LENGTH))
    return f"{millis}x{random_part.zfill(18)}"
def create_luid(separator="x"):
    """Return ``<epoch-ms><separator>1``.

    Args:
        separator: Text placed between the millisecond timestamp and the
            trailing ``1``. Defaults to ``"x"``.
    """
    now_ms = int(time.time() * 1000)
    return "{}{}1".format(now_ms, separator)
def format_model_name(model_name):
    """Normalise a model identifier to an underscore-separated name.

    Every ``/``, ``:`` and ``-`` becomes ``_``; afterwards any run of
    consecutive underscores is collapsed into a single one.
    """
    with_underscores = re.sub(r"[/:-]", "_", model_name)
    collapsed = re.sub(r"_+", "_", with_underscores)
    return collapsed
def find_token_in_object(obj):
    """Search a decoded JSON structure depth-first for a string ``"token"``.

    Dicts are scanned in insertion order: a ``"token"`` key whose value is a
    string is returned immediately, otherwise the value is searched
    recursively. Lists and tuples are searched element by element, so a
    token nested inside an array is found as well.

    Args:
        obj: Any value; anything that is not a dict, list or tuple yields
            ``None``.

    Returns:
        The first matching token string, or ``None`` when there is none.
    """
    if isinstance(obj, dict):
        for key, value in obj.items():
            if key == "token" and isinstance(value, str):
                return value
            nested = find_token_in_object(value)
            if nested:
                return nested
    elif isinstance(obj, (list, tuple)):
        for item in obj:
            nested = find_token_in_object(item)
            if nested:
                return nested
    return None
def get_last_user_content(messages):
    """Return the content of the most recent message whose role is ``"user"``.

    The messages come straight from the client's request body, so entries
    that are not dicts or that lack a ``"role"`` key are skipped instead of
    raising.

    Args:
        messages: Sequence of chat message dicts (may be empty or ``None``).

    Returns:
        The ``"content"`` value of the last user message, or ``None`` when
        there is no user message or it has no ``"content"`` key.
    """
    for message in reversed(messages or []):
        if isinstance(message, dict) and message.get("role") == "user":
            return message.get("content")
    return None
# (element id, parent element id) for every element instance listed in the
# workflow request's client_state; a parent of None marks the page element.
# The order is the order in which the entries appear in the payload.
_ELEMENT_PARENTS = (
    ("bUNib7", "bUMiq3"),
    ("bTezP", "bTezJ"),
    ("bTezE", "bTeqc"),
    ("bTezJ", "bUKFL2"),
    ("bTezQ", "bUKFL2"),
    ("bUiru0", "bUjNK"),
    ("bUDVj0", "bUMiq3"),
    ("bUXzm2", "bUMhk3"),
    ("bUifI1", "bTeqg"),
    ("bUMiq3", "bTezE"),
    ("bTekm", None),
)


def _build_workflow_body(model, message, server_call_id):
    """Build the JSON body posted to the ``workflow/start`` endpoint."""
    created_id = MAGAI_TOKEN["current_page_item"].split("__")[0]
    user_id = MAGAI_TOKEN["current_user"].split("__")[2]
    model_id = "0060f9accd1dbade552f65ac646fb3da"

    def instance_ref(element):
        # Key/value format used for every element instance reference.
        return f"{created_id}__LOOKUP__ElementInstance::{element}"

    element_instances = {
        element: {
            "dehydrated": instance_ref(element),
            "parent_element_id": parent,
        }
        for element, parent in _ELEMENT_PARENTS
    }
    element_state = {
        instance_ref("bTezP"): {
            "is_visible": True,
            "value_that_is_valid": message,
            "value": message,
        },
        instance_ref("bTezE"): {
            "custom.images_": None,
            "custom.file_": None,
            "custom.file_content_": None,
            "custom.file_name_": None,
            "custom.file_type_": None,
        },
        instance_ref("bTezJ"): {
            "custom.isrecording_": None,
            "custom.prompt_": None,
        },
        instance_ref("bUiru0"): {"AAE": message},
        instance_ref("bUDVj0"): {"AAE": message},
        instance_ref("bUifI1"): {
            "custom.is_visible_": None,
            "group_data": None,
        },
        instance_ref("bUMiq3"): {"group_data": None},
    }
    client_state = {
        "element_instances": element_instances,
        "element_state": element_state,
        "other_data": {
            "Current Page Scroll Position": 0,
            "Current Page Width": 661,
        },
        "cache": {
            model_id: format_model_name(model),
            "true": True,
            "CurrentPageItem": MAGAI_TOKEN["current_page_item"],
            "CurrentUser": MAGAI_TOKEN["current_user"],
        },
        "exists": {
            model_id: True,
            "true": True,
            "CurrentPageItem": True,
            "CurrentUser": True,
        },
    }
    call = {
        "client_state": client_state,
        "run_id": generate_uuid(),
        "server_call_id": server_call_id,
        "item_id": "bUNih7",
        "element_id": "bUNib7",
        "page_id": "bTekm",
        "uid_generator": {
            "timestamp": int(time.time() * 1000),
            "seed": round(random.random() * UUID_LENGTH) % MODULO,
        },
        "random_seed": random.random(),
        "current_date_time": int(time.time() * 1000),
        "current_wf_params": {},
    }
    return {
        "app_last_change": MAGAI_TOKEN["app_last_change"],
        "calls": [call],
        "client_breaking_revision": 5,
        "timezone_offset": -480,
        "timezone_string": "Asia/Shanghai",
        "user_id": user_id,
        "wait_for": [],
    }


async def get_token(model, message):
    """Start a workflow on app.magai.co and extract a token from its result.

    Posts a single workflow call carrying ``message`` and the formatted
    ``model`` name, then looks up the entry keyed by this call's
    ``server_call_id`` in the JSON response and searches the
    ``return_value`` of each of its step results for a string ``"token"``.

    Args:
        model: Upstream model identifier (a value of ``MAGAI_MAPPING``).
        message: Text placed in the element state of the request.

    Returns:
        The first token found, or ``None`` when the response has no usable
        step results or none of them contains a token.

    Raises:
        Exception: If the response JSON contains an ``"error_class"`` key;
            the decoded response is the exception argument.
    """
    server_call_id = generate_uuid()
    body = _build_workflow_body(model, message, server_call_id)
    url = "https://app.magai.co/workflow/start"
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url,
            headers={
                "x-bubble-fiber-id": generate_uuid(),
                "x-bubble-pl": create_luid(),
                "accept": "application/json, text/javascript, */*; q=0.01",
                "cookie": MAGAI_TOKEN["cookie"],
            },
            json=body,
        ) as response:
            response_data = await response.json()

    # Anything other than a JSON object cannot contain the call's results.
    if not isinstance(response_data, dict):
        return None
    if "error_class" in response_data:
        raise Exception(response_data)

    server_call_data = response_data.get(server_call_id)
    if not isinstance(server_call_data, dict):
        return None
    step_results = server_call_data.get("step_results")
    if not isinstance(step_results, dict):
        return None

    for step_result in step_results.values():
        if not isinstance(step_result, dict):
            continue
        return_value = step_result.get("return_value")
        if isinstance(return_value, dict):
            token = find_token_in_object(return_value)
            if token:
                return token
    return None
async def get_request_data(model, messages, timeout=(10, 120)):
    """Send the chat request upstream and return the raw HTTP response.

    Args:
        model: Client-facing model name; must be a key of ``MAGAI_MAPPING``.
        messages: Chat messages from the client request. A fixed system
            message is prepended before they are sent upstream.
        timeout: ``(connect, read)`` timeout in seconds handed to
            ``requests.post``. Requests applies no timeout by default, so
            without it a stalled upstream would block the worker forever.

    Returns:
        The ``requests.Response`` of the upstream call, or a Flask
        ``Response`` carrying a JSON error (status 400 for an unknown model,
        502 when no access token could be obtained).
    """
    if model not in MAGAI_MAPPING:
        return Response(
            json.dumps(
                {
                    "error": {
                        "message": "This model is currently unavailable. Please try again later or choose another model.",
                        "code": "model_not_exists",
                    }
                }
            ),
            status=400,
            mimetype="application/json",
        )

    upstream_model = MAGAI_MAPPING[model]
    last_user_message = get_last_user_content(messages)
    token = await get_token(upstream_model, last_user_message)
    if not token:
        # A None header value makes requests raise InvalidHeader; report the
        # failure in the same JSON error shape used above instead.
        return Response(
            json.dumps(
                {
                    "error": {
                        "message": "Failed to obtain an upstream access token.",
                        "code": "token_unavailable",
                    }
                }
            ),
            status=502,
            mimetype="application/json",
        )

    headers = {
        "Content-Type": "application/json",
        "HTTP-Referer": "https://magai.co",
        "Origin": "https://app.magai.co",
        "Pragma": "no-cache",
        "Referer": "https://app.magai.co/",
        "Token": token,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/500.00 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/500.00",
    }
    tools = [
        {
            "type": "function",
            "function": {
                "name": "get_actual_time_info",
                "description": "Returns actual information from web about prompt theme.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The query string based on users prompt to search information about.",
                        }
                    },
                    "required": ["query"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "generate_image",
                "description": "Returns generated image URL.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Prompt to image generation AI model, that describes what image to generate.",
                        }
                    },
                    "required": ["query"],
                },
            },
        },
    ]
    json_data = {
        "model": upstream_model,
        "messages": [{"role": "system", "content": "You are a helpful assistant."}]
        + messages,
        "tools": tools,
        "provider": {"data_collection": "deny"},
        "tool_choice": "auto",
        "stream": True,
    }
    # NOTE: requests.post is a blocking call inside a coroutine; this is fine
    # only while nothing else is scheduled on the same event loop.
    return requests.post(
        "https://live.proxy.magai.co:4430/opr/api/v1/chat/completions",
        headers=headers,
        json=json_data,
        timeout=timeout,
    )
def format_response(response):
    """Concatenate the text deltas of a streamed chat-completions response.

    Reads ``response.iter_lines()`` and, for each non-empty line that starts
    with ``data:``, parses the remainder as JSON and appends
    ``choices[0].delta.content`` to the result. Payloads that are not valid
    JSON (for example a ``[DONE]`` terminator), that are not objects, or
    whose content is missing or not a string (such as ``null``) are skipped
    rather than raising.

    Args:
        response: Object exposing ``iter_lines()`` that yields ``bytes``.

    Returns:
        The accumulated content as a single string (``""`` if none).
    """
    pieces = []
    for raw_line in response.iter_lines():
        if not raw_line:
            continue
        line = raw_line.decode("utf-8")
        if not line.startswith("data:"):
            continue
        try:
            event = json.loads(line[5:].strip())
        except json.JSONDecodeError:
            continue
        if not isinstance(event, dict):
            continue
        choices = event.get("choices")
        if not isinstance(choices, list) or not choices:
            continue
        first_choice = choices[0]
        delta = first_choice.get("delta") if isinstance(first_choice, dict) else None
        if not isinstance(delta, dict):
            continue
        piece = delta.get("content")
        # Content can be null (e.g. on tool-call deltas); only text is kept.
        if isinstance(piece, str):
            pieces.append(piece)
    return "".join(pieces)
@app.route("/hf/v1/chat/completions", methods=["POST"])
def chat_completions():
    """Handle a chat-completions request and reply with an event stream.

    Reads ``messages`` and ``model`` (default ``"claude-3.5-sonnet"``) from
    the JSON body, obtains the complete upstream answer, and returns it as
    ``text/event-stream`` made of ``chat.completion.chunk`` events followed
    by ``data:[DONE]``.

    Returns:
        A Flask ``Response``: the event stream on success, the JSON error
        produced by ``get_request_data`` when it reports one, or a 400 JSON
        error when the body is not a JSON object.
    """
    data = request.json
    if not isinstance(data, dict):
        return Response(
            json.dumps(
                {
                    "error": {
                        "message": "Request body must be a JSON object.",
                        "code": "invalid_request",
                    }
                }
            ),
            status=400,
            mimetype="application/json",
        )
    messages = data.get("messages", [])
    model = data.get("model", "claude-3.5-sonnet")

    async def process_request():
        response = await get_request_data(model, messages)
        # get_request_data signals errors by returning a Flask Response,
        # which has no iter_lines(); hand it back untouched.
        if isinstance(response, Response):
            return response
        return format_response(response)

    # asyncio.run creates a fresh loop and closes it afterwards, so no event
    # loop is leaked per request.
    result = asyncio.run(process_request())
    if isinstance(result, Response):
        return result

    events = []
    # result is a string, so this emits one chunk per character (unchanged).
    for part in result:
        chunk = {
            "id": str(uuid.uuid4()),
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model,
            "system_fingerprint": "fp_45ah8ld5a7",
            "choices": [
                {
                    "index": 0,
                    "delta": {"content": part},
                    "logprobs": None,
                    "finish_reason": None,
                }
            ],
        }
        # json.dumps escapes quotes, backslashes and control characters that
        # plain string interpolation would leave as invalid JSON.
        payload = json.dumps(chunk, separators=(",", ":"), ensure_ascii=False)
        events.append(f"data:{payload}\n\n")
    # An event is only dispatched once a blank line follows it.
    events.append("data:[DONE]\n\n")
    return Response("".join(events), mimetype="text/event-stream")
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)