ffreemt
Update local files instead of git clone
813eca2
raw
history blame
No virus
16.2 kB
import { PassThrough } from "stream";
import _ from "lodash";
import AsyncLock from "async-lock";
import axios, { AxiosResponse } from "axios";
import APIException from "@/lib/exceptions/APIException.ts";
import EX from "@/api/consts/exceptions.ts";
import { createParser } from "eventsource-parser";
import logger from "@/lib/logger.ts";
import util from "@/lib/util.ts";
// 模型名称
const MODEL_NAME = "deepseek-chat";
// access_token有效期
const ACCESS_TOKEN_EXPIRES = 3600;
// 最大重试次数
const MAX_RETRY_COUNT = 3;
// 重试延迟
const RETRY_DELAY = 5000;
// 伪装headers
const FAKE_HEADERS = {
Accept: "*/*",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh;q=0.9",
Origin: "https://chat.deepseek.com",
Pragma: "no-cache",
Referer: "https://chat.deepseek.com/",
"Sec-Ch-Ua":
'"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"User-Agent":
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"X-App-Version": "20240126.0",
};
// access_token映射
const accessTokenMap = new Map();
// access_token请求队列映射
const accessTokenRequestQueueMap: Record<string, Function[]> = {};
// 聊天异步锁
const chatLock = new AsyncLock();
/**
* 请求access_token
*
* 使用refresh_token去刷新获得access_token
*
* @param refreshToken 用于刷新access_token的refresh_token
*/
async function requestToken(refreshToken: string) {
if (accessTokenRequestQueueMap[refreshToken])
return new Promise((resolve) =>
accessTokenRequestQueueMap[refreshToken].push(resolve)
);
accessTokenRequestQueueMap[refreshToken] = [];
logger.info(`Refresh token: ${refreshToken}`);
const result = await (async () => {
const result = await axios.get(
"https://chat.deepseek.com/api/v0/users/current",
{
headers: {
Authorization: `Bearer ${refreshToken}`,
...FAKE_HEADERS,
},
timeout: 15000,
validateStatus: () => true,
}
);
const { token } = checkResult(result, refreshToken);
return {
accessToken: token,
refreshToken: token,
refreshTime: util.unixTimestamp() + ACCESS_TOKEN_EXPIRES,
};
})()
.then((result) => {
if (accessTokenRequestQueueMap[refreshToken]) {
accessTokenRequestQueueMap[refreshToken].forEach((resolve) =>
resolve(result)
);
delete accessTokenRequestQueueMap[refreshToken];
}
logger.success(`Refresh successful`);
return result;
})
.catch((err) => {
if (accessTokenRequestQueueMap[refreshToken]) {
accessTokenRequestQueueMap[refreshToken].forEach((resolve) =>
resolve(err)
);
delete accessTokenRequestQueueMap[refreshToken];
}
return err;
});
if (_.isError(result)) throw result;
return result;
}
/**
* 获取缓存中的access_token
*
* 避免短时间大量刷新token,未加锁,如果有并发要求还需加锁
*
* @param refreshToken 用于刷新access_token的refresh_token
*/
async function acquireToken(refreshToken: string): Promise<string> {
let result = accessTokenMap.get(refreshToken);
if (!result) {
result = await requestToken(refreshToken);
accessTokenMap.set(refreshToken, result);
}
if (util.unixTimestamp() > result.refreshTime) {
result = await requestToken(refreshToken);
accessTokenMap.set(refreshToken, result);
}
return result.accessToken;
}
/**
* 清除上下文
*
* @param model 模型名称
* @param refreshToken 用于刷新access_token的refresh_token
*/
async function clearContext(model: string, refreshToken: string) {
const token = await acquireToken(refreshToken);
const result = await axios.post(
"https://chat.deepseek.com/api/v0/chat/clear_context",
{
model_class: model,
append_welcome_message: false
},
{
headers: {
Authorization: `Bearer ${token}`,
...FAKE_HEADERS,
},
timeout: 15000,
validateStatus: () => true,
}
);
checkResult(result, refreshToken);
}
/**
* 同步对话补全
*
* @param model 模型名称
* @param messages 参考gpt系列消息格式,多轮对话请完整提供上下文
* @param refreshToken 用于刷新access_token的refresh_token
* @param retryCount 重试次数
*/
async function createCompletion(
model = MODEL_NAME,
messages: any[],
refreshToken: string,
retryCount = 0
) {
return (async () => {
logger.info(messages);
// 确保当前请求有干净上下文
const result = await chatLock.acquire(refreshToken, async () => {
// 清除上下文
await clearContext(model, refreshToken);
// 请求流
const token = await acquireToken(refreshToken);
return await axios.post(
"https://chat.deepseek.com/api/v0/chat/completions",
{
message: messagesPrepare(messages),
stream: true,
model_preference: null,
model_class: model,
temperature: 0
},
{
headers: {
Authorization: `Bearer ${token}`,
...FAKE_HEADERS
},
// 120秒超时
timeout: 120000,
validateStatus: () => true,
responseType: "stream",
}
);
});
if (result.headers["content-type"].indexOf("text/event-stream") == -1) {
result.data.on("data", buffer => logger.error(buffer.toString()));
throw new APIException(
EX.API_REQUEST_FAILED,
`Stream response Content-Type invalid: ${result.headers["content-type"]}`
);
}
const streamStartTime = util.timestamp();
// 接收流为输出文本
const answer = await receiveStream(model, result.data);
logger.success(
`Stream has completed transfer ${util.timestamp() - streamStartTime}ms`
);
return answer;
})().catch((err) => {
if (retryCount < MAX_RETRY_COUNT) {
logger.error(`Stream response error: ${err.stack}`);
logger.warn(`Try again after ${RETRY_DELAY / 1000}s...`);
return (async () => {
await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY));
return createCompletion(
model,
messages,
refreshToken,
retryCount + 1
);
})();
}
throw err;
});
}
/**
* 流式对话补全
*
* @param model 模型名称
* @param messages 参考gpt系列消息格式,多轮对话请完整提供上下文
* @param refreshToken 用于刷新access_token的refresh_token
* @param retryCount 重试次数
*/
async function createCompletionStream(
model = MODEL_NAME,
messages: any[],
refreshToken: string,
retryCount = 0
) {
return (async () => {
logger.info(messages);
const result = await chatLock.acquire(refreshToken, async () => {
// 清除上下文
await clearContext(model, refreshToken);
// 请求流
const token = await acquireToken(refreshToken);
return await axios.post(
"https://chat.deepseek.com/api/v0/chat/completions",
{
message: messagesPrepare(messages),
stream: true,
model_preference: null,
model_class: model,
temperature: 0
},
{
headers: {
Authorization: `Bearer ${token}`,
...FAKE_HEADERS
},
// 120秒超时
timeout: 120000,
validateStatus: () => true,
responseType: "stream",
}
);
});
if (result.headers["content-type"].indexOf("text/event-stream") == -1) {
logger.error(
`Invalid response Content-Type:`,
result.headers["content-type"]
);
result.data.on("data", buffer => logger.error(buffer.toString()));
const transStream = new PassThrough();
transStream.end(
`data: ${JSON.stringify({
id: "",
model: MODEL_NAME,
object: "chat.completion.chunk",
choices: [
{
index: 0,
delta: {
role: "assistant",
content: "服务暂时不可用,第三方响应错误",
},
finish_reason: "stop",
},
],
usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2 },
created: util.unixTimestamp(),
})}\n\n`
);
return transStream;
}
const streamStartTime = util.timestamp();
// 创建转换流将消息格式转换为gpt兼容格式
return createTransStream(model, result.data, () => {
logger.success(
`Stream has completed transfer ${util.timestamp() - streamStartTime}ms`
);
});
})().catch((err) => {
if (retryCount < MAX_RETRY_COUNT) {
logger.error(`Stream response error: ${err.stack}`);
logger.warn(`Try again after ${RETRY_DELAY / 1000}s...`);
return (async () => {
await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY));
return createCompletionStream(
model,
messages,
refreshToken,
retryCount + 1
);
})();
}
throw err;
});
}
/**
* 消息预处理
*
* 由于接口只取第一条消息,此处会将多条消息合并为一条,实现多轮对话效果
*
* @param messages 参考gpt系列消息格式,多轮对话请完整提供上下文
*/
function messagesPrepare(messages: any[]) {
let content;
if (messages.length < 2) {
content = messages.reduce((content, message) => {
if (_.isArray(message.content)) {
return (
message.content.reduce((_content, v) => {
if (!_.isObject(v) || v["type"] != "text") return _content;
return _content + (v["text"] || "") + "\n";
}, content)
);
}
return content + `${message.content}\n`;
}, "");
logger.info("\n透传内容:\n" + content);
}
else {
content = (
messages.reduce((content, message) => {
if (_.isArray(message.content)) {
return (
message.content.reduce((_content, v) => {
if (!_.isObject(v) || v["type"] != "text") return _content;
return _content + (`${message.role}:` + v["text"] || "") + "\n";
}, content)
);
}
return (content += `${message.role}:${message.content}\n`);
}, "") + "assistant:"
)
// 移除MD图像URL避免幻觉
.replace(/\!\[.+\]\(.+\)/g, "");
logger.info("\n对话合并:\n" + content);
}
return content;
}
/**
* 检查请求结果
*
* @param result 结果
* @param refreshToken 用于刷新access_token的refresh_token
*/
function checkResult(result: AxiosResponse, refreshToken: string) {
if (!result.data) return null;
const { code, data, msg } = result.data;
if (!_.isFinite(code)) return result.data;
if (code === 0) return data;
if (code == 40003) accessTokenMap.delete(refreshToken);
throw new APIException(EX.API_REQUEST_FAILED, `[请求deepseek失败]: ${msg}`);
}
/**
* 从流接收完整的消息内容
*
* @param model 模型名称
* @param stream 消息流
*/
async function receiveStream(model: string, stream: any): Promise<any> {
return new Promise((resolve, reject) => {
// 消息初始化
const data = {
id: "",
model,
object: "chat.completion",
choices: [
{
index: 0,
message: { role: "assistant", content: "" },
finish_reason: "stop",
},
],
usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2 },
created: util.unixTimestamp(),
};
const parser = createParser((event) => {
try {
if (event.type !== "event") return;
// 解析JSON
const result = _.attempt(() => JSON.parse(event.data));
if (_.isError(result))
throw new Error(`Stream response invalid: ${event.data}`);
if (!result.choices || !result.choices[0] || !result.choices[0].delta || !result.choices[0].delta.content || result.choices[0].delta.content == ' ')
return;
data.choices[0].message.content += result.choices[0].delta.content;
if (result.choices && result.choices[0] && result.choices[0].finish_reason === "stop")
resolve(data);
} catch (err) {
logger.error(err);
reject(err);
}
});
// 将流数据喂给SSE转换器
stream.on("data", (buffer) => parser.feed(buffer.toString()));
stream.once("error", (err) => reject(err));
stream.once("close", () => resolve(data));
});
}
/**
* 创建转换流
*
* 将流格式转换为gpt兼容流格式
*
* @param model 模型名称
* @param stream 消息流
* @param endCallback 传输结束回调
*/
function createTransStream(model: string, stream: any, endCallback?: Function) {
// 消息创建时间
const created = util.unixTimestamp();
// 创建转换流
const transStream = new PassThrough();
!transStream.closed &&
transStream.write(
`data: ${JSON.stringify({
id: "",
model,
object: "chat.completion.chunk",
choices: [
{
index: 0,
delta: { role: "assistant", content: "" },
finish_reason: null,
},
],
created,
})}\n\n`
);
const parser = createParser((event) => {
try {
if (event.type !== "event") return;
// 解析JSON
const result = _.attempt(() => JSON.parse(event.data));
if (_.isError(result))
throw new Error(`Stream response invalid: ${event.data}`);
if (!result.choices || !result.choices[0] || !result.choices[0].delta || !result.choices[0].delta.content || result.choices[0].delta.content == ' ')
return;
result.model = model;
transStream.write(`data: ${JSON.stringify({
id: result.id,
model: result.model,
object: "chat.completion.chunk",
choices: [
{
index: 0,
delta: { role: "assistant", content: result.choices[0].delta.content },
finish_reason: null,
},
],
created,
})}\n\n`);
if (result.choices && result.choices[0] && result.choices[0].finish_reason === "stop") {
transStream.write(`data: ${JSON.stringify({
id: result.id,
model: result.model,
object: "chat.completion.chunk",
choices: [
{
index: 0,
delta: { role: "assistant", content: "" },
finish_reason: "stop"
},
],
created,
})}\n\n`);
!transStream.closed && transStream.end("data: [DONE]\n\n");
}
} catch (err) {
logger.error(err);
!transStream.closed && transStream.end("data: [DONE]\n\n");
}
});
// 将流数据喂给SSE转换器
stream.on("data", (buffer) => parser.feed(buffer.toString()));
stream.once(
"error",
() => !transStream.closed && transStream.end("data: [DONE]\n\n")
);
stream.once(
"close",
() => !transStream.closed && transStream.end("data: [DONE]\n\n")
);
return transStream;
}
/**
* Token切分
*
* @param authorization 认证字符串
*/
function tokenSplit(authorization: string) {
return authorization.replace("Bearer ", "").split(",");
}
/**
* 获取Token存活状态
*/
async function getTokenLiveStatus(refreshToken: string) {
const token = await acquireToken(refreshToken);
const result = await axios.get(
"https://chat.deepseek.com/api/v0/users/current",
{
headers: {
Authorization: `Bearer ${token}`,
...FAKE_HEADERS,
},
timeout: 15000,
validateStatus: () => true,
}
);
try {
const { token } = checkResult(result, refreshToken);
return !!token;
}
catch (err) {
return false;
}
}
export default {
createCompletion,
createCompletionStream,
getTokenLiveStatus,
tokenSplit,
};