File size: 6,204 Bytes
39bdd55
8edba3e
39bdd55
 
 
 
 
 
8edba3e
8d5357f
8c18640
39bdd55
 
 
8edba3e
 
39bdd55
8edba3e
39bdd55
8edba3e
39bdd55
 
8edba3e
39bdd55
8edba3e
 
39bdd55
 
 
8edba3e
 
 
39bdd55
8edba3e
 
 
 
 
 
 
 
 
 
 
 
 
 
506b33a
8edba3e
 
 
 
 
506b33a
8edba3e
 
69c6882
4154489
8edba3e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39bdd55
4154489
39bdd55
 
 
 
4154489
39bdd55
 
 
 
 
 
4154489
 
 
39bdd55
 
 
4154489
39bdd55
 
 
506b33a
7ea5a34
4154489
 
 
 
 
 
 
 
 
8d5357f
4154489
 
 
 
506b33a
4154489
 
 
 
 
 
 
 
 
 
 
 
39bdd55
4154489
 
 
 
 
 
 
39bdd55
 
 
4154489
39bdd55
 
 
4154489
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from io import BytesIO
import os
from flask import jsonify
import logging
import json
import re
logger = logging.getLogger(__name__)

ProxyPasswords = os.environ.get('ProxyPasswords').split(',')

def authenticate_request(request):
    auth_header = request.headers.get('Authorization')

    if not auth_header:
        return False, jsonify({'error': 'Authorization header is missing'}), 401
    
    try:
        auth_type, api_key = auth_header.split(' ', 1)
    except ValueError:
        return False, jsonify({'error': 'Invalid Authorization header format'}), 401

    if auth_type.lower() != 'bearer':
        return False, jsonify({'error': 'Authorization type must be Bearer'}), 401

    if api_key not in ProxyPasswords:
        return False, jsonify({'error': 'Unauthorized'}), 401

    return True, None, None

def sanitize_request_data(request_data):
    """

    从请求数据中删除base64编码的数据。



    Args:

        request_data: 包含可能存在base64数据的字典。



    Returns:

        清理后的字典,其中base64数据被替换为"[Base64 Data Omitted]"。

    """
    
    
    def replace_base64(match):
        # 替换base64数据为提示信息
        return '"[Base64 Data Omitted]"'

    
    request_data_str = json.dumps(request_data)

    # 使用正则表达式匹配base64数据,并替换为提示信息
    sanitized_request_data_str = re.sub(
        r'"(data:[^;]+;base64,)[^"]+"',
        replace_base64,
        request_data_str
    )

    return json.loads(sanitized_request_data_str)

def process_messages_for_gemini(messages):
    """

    将通用的对话消息格式转换为Gemini API所需的格式

    这个函数处理消息列表并将其转换为Gemini API兼容的格式。它支持文本、图片和文件内容的处理。

    参数:

        messages (list): 包含对话消息的列表。每条消息应该是一个字典,包含'role'和'content'字段。

            - role: 可以是 'system', 'user' 或 'assistant'

            - content: 可以是字符串或包含多个内容项的列表

    返回:

        tuple: 包含三个元素:

            - gemini_history (list): 转换后的历史消息列表

            - user_message (dict): 最新的用户消息

            - error (tuple or None): 如果有错误,返回错误响应;否则返回None

    错误处理:

        - 检查角色是否有效

        - 验证图片URL格式

        - 验证文件URL格式

    示例消息格式:

        文本消息:

        {

            'role': 'user',

            'content': '你好'

        }

        多模态消息:

        {

            'role': 'user',

            'content': [

                {'type': 'text', 'text': '这是什么图片?'},

                {'type': 'image_url', 'image_url': {'url': 'data:image/jpeg;base64,...'}}

            ]

        }

    """
    gemini_history = []
    errors = []
    for message in messages:
        role = message.get('role')
        content = message.get('content')

        if isinstance(content, str):
            if role == 'system':
                gemini_history.append({"role": "user", "parts": [content]})
            elif role == 'user':
                gemini_history.append({"role": "user", "parts": [content]})
            elif role == 'assistant':
                gemini_history.append({"role": "model", "parts": [content]})
            else:
                errors.append(f"Invalid role: {role}")
        elif isinstance(content, list):
            parts = []
            for item in content:
                if item.get('type') == 'text':
                    parts.append({"text": item.get('text')})  
                elif item.get('type') == 'image_url':
                    image_data = item.get('image_url', {}).get('url', '')
                    if image_data.startswith('data:image/'):

                        try:
                            mime_type, base64_data = image_data.split(';')[0].split(':')[1], image_data.split(',')[1]
                            parts.append({
                                "inline_data": {
                                    "mime_type": mime_type,
                                    "data": base64_data
                                }
                            })
                        except (IndexError, ValueError):
                            errors.append(f"Invalid data URI for image: {image_data}")
                    else:
                        errors.append(f"Invalid image URL format for item: {item}")
                elif item.get('type') == 'file_url':
                    file_data = item.get('file_url', {}).get('url', '')
                    if file_data.startswith('data:'):

                        try:
                            mime_type, base64_data = file_data.split(';')[0].split(':')[1], file_data.split(',')[1]
                            parts.append({
                                "inline_data": {
                                    "mime_type": mime_type,
                                    "data": base64_data
                                }
                            })
                        except (IndexError, ValueError):
                            errors.append(f"Invalid data URI for file: {file_data}")
                    else:
                        errors.append(f"Invalid file URL format for item: {item}")

            if parts: 
                if role in ['user', 'system']:
                    gemini_history.append({"role": "user", "parts": parts})
                elif role == 'assistant':
                    gemini_history.append({"role": "model", "parts": parts})
                else:
                    errors.append(f"Invalid role: {role}")

    if gemini_history:
        user_message = gemini_history[-1]
        gemini_history = gemini_history[:-1]
    else:
        user_message = {"role": "user", "parts": [""]}

    if errors:
        return gemini_history, user_message, (jsonify({'error': errors}), 400)
    else:
        return gemini_history, user_message, None