|
import asyncio
import json
import os
import random
import re
import time
import uuid

import aiohttp
import requests
from flask import Flask, request, Response
from flask_cors import CORS
|
|
|
# WSGI application object; the route handler in this module registers on it.
app = Flask(__name__)

# Attach flask-cors with its default settings so the API can be called from
# pages served by other origins.
CORS(app)
|
|
|
# Session data used to talk to app.magai.co.
#
#   cookie            - sent verbatim as the ``cookie`` request header.
#   app_last_change   - echoed in the workflow request body.
#   current_page_item - "<id>__LOOKUP__<item>"; the part before the first
#                       "__" is used to build element-instance references.
#   current_user      - "<id>__LOOKUP__<user>"; the third "__"-separated
#                       field is sent as ``user_id``.
#
# NOTE(security): these are account credentials. Each value can be overridden
# through the environment variable named below so that real deployments do not
# have to keep secrets in source control; the literals are only fallbacks kept
# for backward compatibility.
MAGAI_TOKEN = {
    "cookie": os.environ.get(
        "MAGAI_COOKIE",
        "_fbp=fb.1.1731157547017.999175969945264246; _ga=GA1.1.1511565936.1731157547; soquick-mobile_live_u2main=bus|1731157558019x939242898939773400|1731157558072x230077743876915040; soquick-mobile_live_u2main.sig=O-AoJ9Vd_oxtGTWtsC31B_SsLlU; soquick-mobile_u1main=1731157558019x939242898939773400; _ga_N5J29RVHDJ=GS1.1.1731157547.1.1.1731157568.0.0.0",
    ),
    "app_last_change": os.environ.get("MAGAI_APP_LAST_CHANGE", "21388518093"),
    "current_page_item": os.environ.get(
        "MAGAI_CURRENT_PAGE_ITEM",
        "1348695171700984260__LOOKUP__1726124636560x692535552825360400",
    ),
    "current_user": os.environ.get(
        "MAGAI_CURRENT_USER",
        "1348695171700984260__LOOKUP__1722349236461x685414888067651300",
    ),
}
|
|
|
# Public model names accepted by this API -> provider-qualified model
# identifiers sent upstream. A model missing from this table is rejected.
MAGAI_MAPPING = {
    "gpt-4o": "openai/gpt-4o",
    "claude-3.5-sonnet": "anthropic/claude-3.5-sonnet:beta",
    "claude-3-opus": "anthropic/claude-3-opus:beta",
    "gemini-1.5-pro": "google/gemini-pro-1.5",
}
|
|
|
# Scale applied to a random fraction to obtain an (up to) 18-digit number.
# Kept as an exact int: with the float literal 1e18, ``value % MODULO``
# evaluates to a float, which JSON then serialises in exponent notation
# (e.g. 1.2e+17) instead of as an integer.
UUID_LENGTH = 10**18

# Modulus used to reduce the uid-generator seed; an int for the same reason.
MODULO = 10**18
|
|
|
def generate_uuid():
    """Return an id of the form ``<epoch-milliseconds>x<random digits>``.

    The random part is a random fraction scaled by ``UUID_LENGTH``, rounded
    to an integer and left-padded with zeros to at least 18 digits.
    """
    millis = int(time.time() * 1000)
    random_part = str(round(random.random() * UUID_LENGTH)).zfill(18)
    return f"{millis}x{random_part}"
|
|
|
def create_luid(separator="x"):
    """Return ``<epoch-milliseconds><separator>1``.

    Args:
        separator: Text placed between the millisecond timestamp and the
            constant trailing ``1``.
    """
    timestamp = int(time.time() * 1000)
    return f"{timestamp}{separator}1"
|
|
|
def format_model_name(model_name):
    """Normalise a model identifier to an underscore-separated name.

    Every run of ``/``, ``:``, ``-`` and ``_`` characters is replaced by a
    single ``_``; all other characters (including ``.``) are kept.

    Example: ``"anthropic/claude-3.5-sonnet:beta"`` becomes
    ``"anthropic_claude_3.5_sonnet_beta"``.
    """
    # One pass over the string: replacing each separator and then collapsing
    # repeated underscores is the same as collapsing any mixed run at once.
    return re.sub(r"[/:_-]+", "_", model_name)
|
|
|
|
|
def find_token_in_object(obj):
    """Depth-first search of a decoded JSON value for a string ``"token"``.

    Dictionaries are scanned in insertion order: a ``"token"`` key holding a
    string is returned as soon as it is met, otherwise the value is searched
    recursively. Lists and tuples are searched element by element, so a token
    nested inside an array is found as well.

    Args:
        obj: Any value; anything other than a dict, list or tuple yields
            ``None``.

    Returns:
        The first token string found, or ``None`` when there is none.
    """
    if isinstance(obj, dict):
        for key, value in obj.items():
            if key == "token" and isinstance(value, str):
                return value
            nested = find_token_in_object(value)
            if nested:
                return nested
    elif isinstance(obj, (list, tuple)):
        for item in obj:
            nested = find_token_in_object(item)
            if nested:
                return nested
    return None
|
|
|
|
|
def get_last_user_content(messages):
    """Return the ``content`` of the most recent message with role ``"user"``.

    Entries that are not dictionaries or that carry no ``"role"`` key are
    skipped instead of raising, since the list comes straight from the
    request body.

    Args:
        messages: Sequence of chat message dicts (``role`` / ``content``);
            ``None`` is treated as an empty conversation.

    Returns:
        The content of the last user message, or ``None`` when there is no
        user message (or it has no ``content``).
    """
    for message in reversed(messages or []):
        if isinstance(message, dict) and message.get("role") == "user":
            return message.get("content")
    return None
|
|
|
|
|
async def get_token(model, message):
    """Start the Magai workflow for ``message`` and return the issued token.

    Posts a workflow-start request to ``https://app.magai.co/workflow/start``
    describing the page state (element instances, element state, cache) and
    then searches the step results of this call for a ``"token"`` string.

    Args:
        model: Upstream model identifier; it is normalised with
            ``format_model_name`` before being placed in the request cache.
        message: Prompt text placed in the element state of the request.

    Returns:
        The token string, or ``None`` when the response contains no step
        results for this call or none of them carries a token.

    Raises:
        RuntimeError: If the response body contains an ``"error_class"``
            key; the decoded body is the exception argument.
    """
    server_call_id = generate_uuid()
    created_id = MAGAI_TOKEN["current_page_item"].split("__")[0]
    user_id = MAGAI_TOKEN["current_user"].split("__")[2]
    model_id = "0060f9accd1dbade552f65ac646fb3da"
    item_id = "bUNih7"
    element_id = "bUNib7"

    def instance_ref(element):
        # Reference string identifying one element instance on the page.
        return f"{created_id}__LOOKUP__ElementInstance::{element}"

    # Element id -> parent element id, in the order they are sent.
    element_parents = {
        "bUNib7": "bUMiq3",
        "bTezP": "bTezJ",
        "bTezE": "bTeqc",
        "bTezJ": "bUKFL2",
        "bTezQ": "bUKFL2",
        "bUiru0": "bUjNK",
        "bUDVj0": "bUMiq3",
        "bUXzm2": "bUMhk3",
        "bUifI1": "bTeqg",
        "bUMiq3": "bTezE",
        "bTekm": None,
    }
    element_instances = {
        element: {
            "dehydrated": instance_ref(element),
            "parent_element_id": parent,
        }
        for element, parent in element_parents.items()
    }

    element_state = {
        instance_ref("bTezP"): {
            "is_visible": True,
            "value_that_is_valid": message,
            "value": message,
        },
        instance_ref("bTezE"): {
            "custom.images_": None,
            "custom.file_": None,
            "custom.file_content_": None,
            "custom.file_name_": None,
            "custom.file_type_": None,
        },
        instance_ref("bTezJ"): {
            "custom.isrecording_": None,
            "custom.prompt_": None,
        },
        instance_ref("bUiru0"): {"AAE": message},
        instance_ref("bUDVj0"): {"AAE": message},
        instance_ref("bUifI1"): {
            "custom.is_visible_": None,
            "group_data": None,
        },
        instance_ref("bUMiq3"): {"group_data": None},
    }

    client_state = {
        "element_instances": element_instances,
        "element_state": element_state,
        "other_data": {
            "Current Page Scroll Position": 0,
            "Current Page Width": 661,
        },
        "cache": {
            model_id: format_model_name(model),
            "true": True,
            "CurrentPageItem": MAGAI_TOKEN["current_page_item"],
            "CurrentUser": MAGAI_TOKEN["current_user"],
        },
        "exists": {
            model_id: True,
            "true": True,
            "CurrentPageItem": True,
            "CurrentUser": True,
        },
    }

    # int(...) keeps the seed an integer in the JSON body even when MODULO is
    # a float (int % float yields a float, serialised as e.g. 1.2e+17).
    seed = int(round(random.random() * UUID_LENGTH) % MODULO)

    body = {
        "app_last_change": MAGAI_TOKEN["app_last_change"],
        "calls": [
            {
                "client_state": client_state,
                "run_id": generate_uuid(),
                "server_call_id": server_call_id,
                "item_id": item_id,
                "element_id": element_id,
                "page_id": "bTekm",
                "uid_generator": {
                    "timestamp": int(time.time() * 1000),
                    "seed": seed,
                },
                "random_seed": random.random(),
                "current_date_time": int(time.time() * 1000),
                "current_wf_params": {},
            }
        ],
        "client_breaking_revision": 5,
        "timezone_offset": -480,
        "timezone_string": "Asia/Shanghai",
        "user_id": user_id,
        "wait_for": [],
    }

    url = "https://app.magai.co/workflow/start"
    request_headers = {
        "x-bubble-fiber-id": generate_uuid(),
        "x-bubble-pl": create_luid(),
        "accept": "application/json, text/javascript, */*; q=0.01",
        "cookie": MAGAI_TOKEN["cookie"],
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=request_headers, json=body) as response:
            response_data = await response.json()

    if not isinstance(response_data, dict):
        # Anything but a JSON object cannot contain results for our call.
        return None
    if "error_class" in response_data:
        raise RuntimeError(response_data)

    # Results are keyed by the server_call_id generated for this request.
    server_call_data = response_data.get(server_call_id)
    if not server_call_data or "step_results" not in server_call_data:
        return None

    for step_result in server_call_data["step_results"].values():
        if not isinstance(step_result, dict):
            continue
        return_value = step_result.get("return_value")
        if isinstance(return_value, dict):
            token = find_token_in_object(return_value)
            if token:
                return token
    return None
|
|
|
|
|
def _error_response(message, code, status):
    """Build a JSON error ``Response`` in the ``{"error": {...}}`` shape."""
    payload = {"error": {"message": message, "code": code}}
    return Response(json.dumps(payload), status=status, mimetype="application/json")


async def get_request_data(model, messages):
    """Send the chat request upstream and return the streaming response.

    Args:
        model: Public model name; it must be a key of ``MAGAI_MAPPING``.
        messages: List of chat message dicts supplied by the client.

    Returns:
        The ``requests`` response of the upstream completion call (opened
        with ``stream=True`` so it can be consumed line by line), or a Flask
        ``Response`` describing the error when the model is unknown (400) or
        no access token could be obtained (502).

    Raises:
        requests.RequestException: On connection problems or timeouts of the
            upstream call.
    """
    if model not in MAGAI_MAPPING:
        return _error_response(
            "This model is currently unavailable. Please try again later or choose another model.",
            "model_not_exists",
            400,
        )

    upstream_model = MAGAI_MAPPING[model]
    last_user_message = get_last_user_content(messages)
    token = await get_token(upstream_model, last_user_message)
    if not token:
        # requests silently drops headers whose value is None, so without
        # this check the completion call would go out unauthenticated.
        return _error_response(
            "Failed to obtain an upstream access token. Please try again later.",
            "token_unavailable",
            502,
        )

    headers = {
        "Content-Type": "application/json",
        "HTTP-Referer": "https://magai.co",
        "Origin": "https://app.magai.co",
        "Pragma": "no-cache",
        "Referer": "https://app.magai.co/",
        "Token": token,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/500.00 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/500.00",
    }

    tools = [
        {
            "type": "function",
            "function": {
                "name": "get_actual_time_info",
                "description": "Returns actual information from web about prompt theme.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The query string based on users prompt to search information about.",
                        }
                    },
                    "required": ["query"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "generate_image",
                "description": "Returns generated image URL.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Prompt to image generation AI model, that describes what image to generate.",
                        }
                    },
                    "required": ["query"],
                },
            },
        },
    ]

    json_data = {
        "model": upstream_model,
        # A fixed system prompt is always placed in front of the client's
        # conversation.
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            *(messages or []),
        ],
        "tools": tools,
        "provider": {"data_collection": "deny"},
        "tool_choice": "auto",
        "stream": True,
    }

    # NOTE: requests is a blocking client; this call blocks the event loop
    # that runs this coroutine until the response headers have arrived.
    response = requests.post(
        "https://live.proxy.magai.co:4430/opr/api/v1/chat/completions",
        headers=headers,
        json=json_data,
        # Let the caller read the event stream incrementally instead of
        # buffering the whole body before returning.
        stream=True,
        # (connect, read) seconds; without a timeout a stalled upstream
        # would hang the request forever.
        timeout=(10, 300),
    )
    return response
|
|
|
|
|
def format_response(response):
    """Collect the text of a streamed chat-completion response.

    Reads the response line by line, decodes every ``data:`` line as JSON and
    concatenates ``choices[0].delta.content`` of each event.

    Args:
        response: Object exposing ``iter_lines()`` that yields ``bytes``
            lines (e.g. a ``requests`` response).

    Returns:
        The concatenated content; an empty string when no event carried any.
    """
    pieces = []
    for raw_line in response.iter_lines():
        if not raw_line:
            continue
        line = raw_line.decode("utf-8")
        if not line.startswith("data:"):
            continue
        try:
            event = json.loads(line[5:].strip())
        except json.JSONDecodeError:
            # Non-JSON payloads such as the "[DONE]" sentinel are expected
            # and carry no content.
            continue
        if not isinstance(event, dict):
            continue
        choices = event.get("choices")
        if not isinstance(choices, list) or not choices:
            continue
        first_choice = choices[0]
        if not isinstance(first_choice, dict):
            continue
        # "delta" and "content" may be present but null (e.g. role-only or
        # tool-call chunks); treat those as carrying no text.
        delta = first_choice.get("delta") or {}
        text = delta.get("content") if isinstance(delta, dict) else None
        if isinstance(text, str):
            pieces.append(text)
    return "".join(pieces)
|
|
|
|
|
@app.route("/hf/v1/chat/completions", methods=["POST"])
def chat_completions():
    """Handle a chat-completion request and reply with an event stream.

    Reads ``messages`` and ``model`` (default ``"claude-3.5-sonnet"``) from
    the JSON body, obtains the complete upstream answer and re-emits it as
    ``chat.completion.chunk`` server-sent events followed by a ``[DONE]``
    event.

    Returns:
        A ``text/event-stream`` response on success, a 400 JSON error when
        the body is not a JSON object, or the error response produced by
        ``get_request_data``.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        error = {
            "error": {
                "message": "Request body must be a JSON object.",
                "code": "invalid_request",
            }
        }
        return Response(json.dumps(error), status=400, mimetype="application/json")

    messages = data.get("messages", [])
    model = data.get("model", "claude-3.5-sonnet")

    # asyncio.run creates a fresh event loop and closes it again, so no loop
    # is leaked per request.
    upstream = asyncio.run(get_request_data(model, messages))

    # get_request_data signals errors by returning a ready-made Flask
    # response; hand it to the client instead of parsing it as a stream.
    if isinstance(upstream, Response):
        return upstream

    result = format_response(upstream)

    events = []
    # Iterating a string yields single characters: every character of the
    # answer is sent as its own chunk.
    for part in result:
        chunk = {
            "id": str(uuid.uuid4()),
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model,
            "system_fingerprint": "fp_45ah8ld5a7",
            "choices": [
                {
                    "index": 0,
                    "delta": {"content": part},
                    "logprobs": None,
                    "finish_reason": None,
                }
            ],
        }
        # json.dumps escapes quotes, backslashes, control and non-ASCII
        # characters, so each event stays one valid JSON line whatever the
        # model produced or the client sent as model name.
        events.append("data:" + json.dumps(chunk, separators=(",", ":")) + "\n\n")
    # An event is only dispatched once it is terminated by a blank line.
    events.append("data:[DONE]\n\n")

    return Response("".join(events), mimetype="text/event-stream")
|
|
|
|
|
# Script entry point: serve the app on all network interfaces, port 7860,
# using Flask's built-in server.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)