File size: 6,204 Bytes
39bdd55 8edba3e 39bdd55 8edba3e 8d5357f 8c18640 39bdd55 8edba3e 39bdd55 8edba3e 39bdd55 8edba3e 39bdd55 8edba3e 39bdd55 8edba3e 39bdd55 8edba3e 39bdd55 8edba3e 506b33a 8edba3e 506b33a 8edba3e 69c6882 4154489 8edba3e 39bdd55 4154489 39bdd55 4154489 39bdd55 4154489 39bdd55 4154489 39bdd55 506b33a 7ea5a34 4154489 8d5357f 4154489 506b33a 4154489 39bdd55 4154489 39bdd55 4154489 39bdd55 4154489 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
from io import BytesIO
import os
from flask import jsonify
import logging
import json
import re
logger = logging.getLogger(__name__)
ProxyPasswords = os.environ.get('ProxyPasswords').split(',')
def authenticate_request(request):
auth_header = request.headers.get('Authorization')
if not auth_header:
return False, jsonify({'error': 'Authorization header is missing'}), 401
try:
auth_type, api_key = auth_header.split(' ', 1)
except ValueError:
return False, jsonify({'error': 'Invalid Authorization header format'}), 401
if auth_type.lower() != 'bearer':
return False, jsonify({'error': 'Authorization type must be Bearer'}), 401
if api_key not in ProxyPasswords:
return False, jsonify({'error': 'Unauthorized'}), 401
return True, None, None
def sanitize_request_data(request_data):
"""
从请求数据中删除base64编码的数据。
Args:
request_data: 包含可能存在base64数据的字典。
Returns:
清理后的字典,其中base64数据被替换为"[Base64 Data Omitted]"。
"""
def replace_base64(match):
# 替换base64数据为提示信息
return '"[Base64 Data Omitted]"'
request_data_str = json.dumps(request_data)
# 使用正则表达式匹配base64数据,并替换为提示信息
sanitized_request_data_str = re.sub(
r'"(data:[^;]+;base64,)[^"]+"',
replace_base64,
request_data_str
)
return json.loads(sanitized_request_data_str)
def process_messages_for_gemini(messages):
"""
将通用的对话消息格式转换为Gemini API所需的格式
这个函数处理消息列表并将其转换为Gemini API兼容的格式。它支持文本、图片和文件内容的处理。
参数:
messages (list): 包含对话消息的列表。每条消息应该是一个字典,包含'role'和'content'字段。
- role: 可以是 'system', 'user' 或 'assistant'
- content: 可以是字符串或包含多个内容项的列表
返回:
tuple: 包含三个元素:
- gemini_history (list): 转换后的历史消息列表
- user_message (dict): 最新的用户消息
- error (tuple or None): 如果有错误,返回错误响应;否则返回None
错误处理:
- 检查角色是否有效
- 验证图片URL格式
- 验证文件URL格式
示例消息格式:
文本消息:
{
'role': 'user',
'content': '你好'
}
多模态消息:
{
'role': 'user',
'content': [
{'type': 'text', 'text': '这是什么图片?'},
{'type': 'image_url', 'image_url': {'url': 'data:image/jpeg;base64,...'}}
]
}
"""
gemini_history = []
errors = []
for message in messages:
role = message.get('role')
content = message.get('content')
if isinstance(content, str):
if role == 'system':
gemini_history.append({"role": "user", "parts": [content]})
elif role == 'user':
gemini_history.append({"role": "user", "parts": [content]})
elif role == 'assistant':
gemini_history.append({"role": "model", "parts": [content]})
else:
errors.append(f"Invalid role: {role}")
elif isinstance(content, list):
parts = []
for item in content:
if item.get('type') == 'text':
parts.append({"text": item.get('text')})
elif item.get('type') == 'image_url':
image_data = item.get('image_url', {}).get('url', '')
if image_data.startswith('data:image/'):
try:
mime_type, base64_data = image_data.split(';')[0].split(':')[1], image_data.split(',')[1]
parts.append({
"inline_data": {
"mime_type": mime_type,
"data": base64_data
}
})
except (IndexError, ValueError):
errors.append(f"Invalid data URI for image: {image_data}")
else:
errors.append(f"Invalid image URL format for item: {item}")
elif item.get('type') == 'file_url':
file_data = item.get('file_url', {}).get('url', '')
if file_data.startswith('data:'):
try:
mime_type, base64_data = file_data.split(';')[0].split(':')[1], file_data.split(',')[1]
parts.append({
"inline_data": {
"mime_type": mime_type,
"data": base64_data
}
})
except (IndexError, ValueError):
errors.append(f"Invalid data URI for file: {file_data}")
else:
errors.append(f"Invalid file URL format for item: {item}")
if parts:
if role in ['user', 'system']:
gemini_history.append({"role": "user", "parts": parts})
elif role == 'assistant':
gemini_history.append({"role": "model", "parts": parts})
else:
errors.append(f"Invalid role: {role}")
if gemini_history:
user_message = gemini_history[-1]
gemini_history = gemini_history[:-1]
else:
user_message = {"role": "user", "parts": [""]}
if errors:
return gemini_history, user_message, (jsonify({'error': errors}), 400)
else:
return gemini_history, user_message, None |