| import json |
| import uuid |
| import time |
| import tiktoken |
| from constants import CHAT_COMPLETION_CHUNK, CONTENT_TYPE_EVENT_STREAM |
| from flask import jsonify |
|
|
def generate_system_fingerprint():
    """Build a fresh, random system fingerprint string.

    Returns:
        str: ``"fp_"`` followed by the first 10 hex digits of a new UUID4.
    """
    random_hex = uuid.uuid4().hex
    return "fp_" + random_hex[:10]
|
|
def create_openai_chunk(content, model, finish_reason=None, usage=None):
    """Assemble one OpenAI-style streaming chunk as a plain dict.

    Args:
        content: Text placed in the choice's ``delta``; a falsy value
            (empty string, ``None``) produces an empty ``delta``.
        model: Model name echoed back under ``"model"``.
        finish_reason: Value stored under ``"finish_reason"``.
        usage: Optional usage payload; attached under ``"usage"`` only
            when it is not ``None``.

    Returns:
        dict: The chunk, carrying a newly generated id, the current Unix
        timestamp, and a newly generated system fingerprint.
    """
    if content:
        delta = {"content": content}
    else:
        delta = {}

    payload = {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": CHAT_COMPLETION_CHUNK,
        "created": int(time.time()),
        "model": model,
        "system_fingerprint": generate_system_fingerprint(),
    }
    payload["choices"] = [
        {
            "index": 0,
            "delta": delta,
            "logprobs": None,
            "finish_reason": finish_reason,
        }
    ]

    if usage is not None:
        payload["usage"] = usage
    return payload
|
|
def count_tokens(text, model="gpt-3.5-turbo-0301"):
    """Count the tokens in ``text`` using the tokenizer for ``model``.

    Args:
        text: String to tokenize. An empty (or ``None``) value counts as
            zero tokens without loading an encoder.
        model: Model name used to look up the tiktoken encoding. When
            tiktoken does not recognise the model (``KeyError``), the
            ``cl100k_base`` encoding is used instead.

    Returns:
        int: Number of tokens in ``text``.
    """
    if not text:
        return 0

    # Only the model lookup is expected to raise KeyError, so keep the
    # try body limited to it.
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("cl100k_base")

    # By default ``encode`` raises ValueError when the text contains a
    # special-token literal such as "<|endoftext|>". This text comes from
    # users and model output, so treat such sequences as ordinary text
    # instead of crashing the token count.
    return len(encoding.encode(text, disallowed_special=()))
|
|
def count_message_tokens(messages, model="gpt-3.5-turbo-0301"):
    """Return the total token count across a list of messages.

    Each message is converted with ``str()`` and counted with
    ``count_tokens``; the per-message counts are added together.

    Args:
        messages: Iterable of message objects.
        model: Model name forwarded to ``count_tokens``.

    Returns:
        int: Sum of the token counts of all messages (0 when empty).
    """
    total = 0
    for message in messages:
        total += count_tokens(str(message), model)
    return total
|
|
def stream_notdiamond_response(response, model):
    """Stream a notdiamond API response as OpenAI-style chunks.

    The response body is read in 1024-byte pieces and decoded as UTF-8
    incrementally, so a multi-byte character that straddles two pieces is
    emitted intact rather than raising ``UnicodeDecodeError``.

    Args:
        response: Object exposing ``iter_content(chunk_size)`` that yields
            byte strings (presumably a ``requests.Response`` opened with
            ``stream=True`` -- confirm against the caller).
        model: Model name forwarded to ``create_openai_chunk``.

    Yields:
        dict: One chunk per non-empty piece of decoded text, followed by a
        final chunk with an empty delta and ``finish_reason`` ``'stop'``.
    """
    # Function-scope import keeps this block self-contained.
    import codecs

    # errors="replace": invalid bytes become U+FFFD instead of aborting a
    # stream that has already been partially delivered to the client.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    for raw in response.iter_content(1024):
        if not raw:
            continue
        text = decoder.decode(raw)
        # ``text`` is empty when the piece held only the leading bytes of
        # a character; the decoder keeps them for the next piece.
        if text:
            yield create_openai_chunk(text, model)

    # Flush whatever the decoder is still holding (a truncated trailing
    # sequence is emitted as a replacement character).
    tail = decoder.decode(b"", final=True)
    if tail:
        yield create_openai_chunk(tail, model)

    yield create_openai_chunk('', model, 'stop')
|
|
def handle_non_stream_response(response, model, prompt_tokens):
    """Collect a streamed API response into a single JSON completion.

    All content deltas produced by ``stream_notdiamond_response`` are
    concatenated into one assistant message, and the completion token
    count is computed from that full text.

    Args:
        response: Upstream response passed to ``stream_notdiamond_response``.
        model: Model name reported in the payload and used for counting.
        prompt_tokens: Prompt token count reported in ``usage``.

    Returns:
        The result of ``jsonify`` applied to a ``chat.completion`` payload.
    """
    pieces = []
    for chunk in stream_notdiamond_response(response, model):
        text = chunk['choices'][0]['delta'].get('content')
        if text:
            pieces.append(text)
    full_content = "".join(pieces)

    completion_tokens = count_tokens(full_content, model)

    assistant_choice = {
        "index": 0,
        "message": {
            "role": "assistant",
            "content": full_content,
        },
        "finish_reason": "stop",
    }
    usage = {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
    }
    body = {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "system_fingerprint": generate_system_fingerprint(),
        "choices": [assistant_choice],
        "usage": usage,
    }
    return jsonify(body)
|
|
def generate_stream_response(response, model, prompt_tokens):
    """Yield the upstream response as server-sent-event lines.

    Each chunk from ``stream_notdiamond_response`` gets a ``usage`` entry
    holding the running completion token total, and is serialized as a
    ``data: <json>`` event. A ``data: [DONE]`` event ends the stream.

    Args:
        response: Upstream response passed to ``stream_notdiamond_response``.
        model: Model name used for chunk creation and token counting.
        prompt_tokens: Prompt token count reported in every ``usage`` entry.

    Yields:
        str: One SSE-formatted line per chunk, then the ``[DONE]`` marker.
    """
    completion_so_far = 0

    for event in stream_notdiamond_response(response, model):
        delta = event['choices'][0]['delta']
        # Token counts are accumulated per chunk, not recomputed over the
        # whole text.
        completion_so_far += count_tokens(delta.get('content', ''), model)

        usage = {}
        usage["prompt_tokens"] = prompt_tokens
        usage["completion_tokens"] = completion_so_far
        usage["total_tokens"] = prompt_tokens + completion_so_far
        event['usage'] = usage

        serialized = json.dumps(event)
        yield "data: " + serialized + "\n\n"

    yield "data: [DONE]\n\n"