FLOW2API / request.py
import os
import json
import re
import base64
import aiohttp  # async HTTP client (third-party; install with `pip install aiohttp`)
import asyncio
# --- Configuration ---
BASE_URL = os.getenv('GEMINI_FLOW2API_URL', 'http://127.0.0.1:8000')
BACKEND_URL = BASE_URL + "/v1/chat/completions"
API_KEY = os.getenv('GEMINI_FLOW2API_APIKEY', 'Bearer han1234')  # full header value, including the "Bearer " prefix
if not API_KEY:
    raise ValueError('[gemini flow2api] api key not set')
MODEL_LANDSCAPE = "gemini-3.0-pro-image-landscape"
MODEL_PORTRAIT = "gemini-3.0-pro-image-portrait"
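# Example environment setup (illustrative values only; the key format mirrors the
# default above and keeps the "Bearer " prefix):
#   export GEMINI_FLOW2API_URL="http://127.0.0.1:8000"
#   export GEMINI_FLOW2API_APIKEY="Bearer <your-key>"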
# Change: added a `model` parameter, defaulting to None.
async def request_backend_generation(
        prompt: str,
        images: list[bytes] | None = None,
        model: str | None = None) -> bytes | None:
    """
    Request image generation from the backend.

    :param prompt: prompt text
    :param images: list of image bytes (optional, for image-to-image input)
    :param model: model name to use (optional)
    :return: image bytes on success, None on failure
    """
    images = images or []
    # If no model is specified, default to the landscape model.
    use_model = model if model else MODEL_LANDSCAPE
    # 1. Build the request payload.
    if images:
        content_payload = [{"type": "text", "text": prompt}]
        print(f"[Backend] Processing {len(images)} input image(s)...")
for img_bytes in images:
b64_str = base64.b64encode(img_bytes).decode('utf-8')
content_payload.append({
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{b64_str}"}
})
else:
content_payload = prompt
payload = {
"model": use_model, # 使用选定的模型
"messages": [{"role": "user", "content": content_payload}],
"stream": True
}
headers = {
"Authorization": API_KEY,
"Content-Type": "application/json"
}
image_url = None
print(f"[Backend] Model: {use_model} | 发起请求: {prompt[:20]}...")
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(BACKEND_URL, json=payload, headers=headers,
                                     timeout=aiohttp.ClientTimeout(total=120)) as response:
                if response.status != 200:
                    err_text = await response.text()
                    print(f"[Backend Error] Status {response.status}: {err_text}")
                    raise Exception(f"API Error: {response.status}: {err_text}")
                # 2. Stream the SSE response, watching for errors and image links.
                async for line in response.content:
                    line_str = line.decode('utf-8').strip()
                    # Some backend errors arrive as a raw JSON object instead of an SSE chunk.
                    if line_str.startswith('{"error'):
                        chunk = json.loads(line_str)
                        delta = chunk.get("choices", [{}])[0].get("delta", {})
                        msg = delta.get('reasoning_content', line_str)
                        if '401' in msg:
                            msg += '\nAccess Token has expired; please reconfigure it.'
                        elif '400' in msg:
                            msg += '\nThe returned content was blocked.'
                        raise Exception(msg)
if not line_str or not line_str.startswith('data: '):
continue
data_str = line_str[6:]
if data_str == '[DONE]':
break
try:
chunk = json.loads(data_str)
delta = chunk.get("choices", [{}])[0].get("delta", {})
                        # Print the reasoning stream as it arrives.
if "reasoning_content" in delta:
print(delta['reasoning_content'], end="", flush=True)
                        # Extract a markdown image link from the content.
if "content" in delta:
content_text = delta["content"]
img_match = re.search(r'!\[.*?\]\((.*?)\)', content_text)
if img_match:
image_url = img_match.group(1)
print(f"\n[Backend] 捕获图片链接: {image_url}")
except json.JSONDecodeError:
continue
            # 3. Download the generated image.
            if image_url:
                async with session.get(image_url) as img_resp:
                    if img_resp.status == 200:
                        image_bytes = await img_resp.read()
                        return image_bytes
                    else:
                        print(f"[Backend Error] Image download failed: {img_resp.status}")
except Exception as e:
print(f"[Backend Exception] {e}")
        raise  # re-raise, preserving the original traceback
return None
if __name__ == '__main__':
async def main():
print("=== AI 绘图接口测试 ===")
user_prompt = input("请输入提示词 (例如 '一只猫'): ").strip()
if not user_prompt:
user_prompt = "A cute cat in the garden"
print(f"正在请求: {user_prompt}")
        # An empty images list is passed here to test text-to-image.
        # To test image-to-image, read a local file manually:
        # with open("output_test.jpg", "rb") as f: img_data = f.read()
        # result = await request_backend_generation(user_prompt, [img_data])
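        # A minimal image-to-image sketch that also selects the portrait model
        # (commented out; "input.jpg" is a hypothetical local file):
        # with open("input.jpg", "rb") as f:
        #     img_data = f.read()
        # result = await request_backend_generation(
        #     user_prompt, images=[img_data], model=MODEL_PORTRAIT)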
result = await request_backend_generation(user_prompt)
if result:
filename = "output_test.jpg"
with open(filename, "wb") as f:
f.write(result)
print(f"\n[Success] 图片已保存为 {filename},大小: {len(result)} bytes")
else:
print("\n[Failed] 生成失败")
    # Run the test.
    if os.name == 'nt':  # Windows compatibility
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())