vertex2api / app.js
smgc's picture
Update app.js
47afc7f verified
raw
history blame
6.49 kB
const express = require('express');
const fetch = require('node-fetch');
const zlib = require('zlib');
const { promisify } = require('util');
const stream = require('stream');
const gunzip = promisify(zlib.gunzip);
const pipeline = promisify(stream.pipeline);
const PROJECT_ID = process.env.PROJECT_ID;
const CLIENT_ID = process.env.CLIENT_ID;
const CLIENT_SECRET = process.env.CLIENT_SECRET;
const REFRESH_TOKEN = process.env.REFRESH_TOKEN;
const API_KEY = process.env.API_KEY;
const TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token';
let tokenCache = {
accessToken: '',
expiry: 0,
refreshPromise: null
};
function logRequest(req, status, message) {
const timestamp = new Date().toISOString();
const method = req.method;
const url = req.originalUrl;
const ip = req.ip;
console.log(`[${timestamp}] ${method} ${url} - Status: ${status}, IP: ${ip}, Message: ${message}`);
}
async function getAccessToken() {
const now = Date.now() / 1000;
if (tokenCache.accessToken && now < tokenCache.expiry - 120) {
return tokenCache.accessToken;
}
if (tokenCache.refreshPromise) {
await tokenCache.refreshPromise;
return tokenCache.accessToken;
}
tokenCache.refreshPromise = (async () => {
try {
const response = await fetch(TOKEN_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
client_id: CLIENT_ID,
client_secret: CLIENT_SECRET,
refresh_token: REFRESH_TOKEN,
grant_type: 'refresh_token'
})
});
const data = await response.json();
tokenCache.accessToken = data.access_token;
tokenCache.expiry = now + data.expires_in;
} finally {
tokenCache.refreshPromise = null;
}
})();
await tokenCache.refreshPromise;
return tokenCache.accessToken;
}
function getLocation() {
const currentSeconds = new Date().getSeconds();
return currentSeconds < 30 ? 'europe-west1' : 'us-east5';
}
function constructApiUrl(location, model) {
return `https://${location}-aiplatform.googleapis.com/v1/projects/${PROJECT_ID}/locations/${location}/publishers/anthropic/models/${model}:streamRawPredict`;
}
function formatModelName(model) {
if (model === 'claude-3-5-sonnet-20240620') {
return 'claude-3-5-sonnet@20240620';
}
return model;
}
async function handleRequest(req, res) {
if (req.method === 'OPTIONS') {
handleOptions(res);
logRequest(req, 204, 'CORS preflight request');
return;
}
const apiKey = req.headers['x-api-key'];
if (apiKey !== API_KEY) {
res.status(403).json({
type: "error",
error: {
type: "permission_error",
message: "Your API key does not have permission to use the specified resource."
}
});
logRequest(req, 403, 'Invalid API key');
return;
}
const accessToken = await getAccessToken();
const location = getLocation();
let requestBody = req.body;
let model = requestBody.model || 'claude-3-5-sonnet@20240620';
model = formatModelName(model);
const apiUrl = constructApiUrl(location, model);
if (requestBody.anthropic_version) {
delete requestBody.anthropic_version;
}
if (requestBody.model) {
delete requestBody.model;
}
requestBody.anthropic_version = "vertex-2023-10-16";
try {
const response = await fetch(apiUrl, {
method: 'POST',
headers: {
'Authorization': `Bearer ${accessToken}`,
'Content-Type': 'application/json; charset=utf-8',
'Accept-Encoding': 'gzip, deflate'
},
body: JSON.stringify(requestBody),
compress: false
});
res.status(response.status);
for (const [key, value] of response.headers.entries()) {
if (key.toLowerCase() !== 'content-encoding' && key.toLowerCase() !== 'content-length') {
res.setHeader(key, value);
}
}
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Access-Control-Allow-Methods', 'POST, GET, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, x-api-key, anthropic-version, model');
const contentType = response.headers.get('content-type');
const contentEncoding = response.headers.get('content-encoding');
if (contentType && contentType.includes('text/event-stream')) {
// 处理 SSE
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const gunzipStream = zlib.createGunzip();
const transformStream = new stream.Transform({
transform(chunk, encoding, callback) {
this.push(chunk);
callback();
}
});
pipeline(response.body, gunzipStream, transformStream)
.then(() => {
console.log('Stream processing completed');
})
.catch((err) => {
console.error('Stream processing error:', err);
});
transformStream.pipe(res);
} else {
// 非流式响应的处理
const buffer = await response.buffer();
let data;
if (contentEncoding === 'gzip') {
try {
data = await gunzip(buffer);
} catch (error) {
console.error('Gunzip error:', error);
throw new Error('Failed to decompress the response');
}
} else {
data = buffer;
}
res.send(data);
}
logRequest(req, response.status, `Request forwarded successfully for model: ${model}`);
} catch (error) {
console.error('Request error:', error);
res.status(500).json({
type: "error",
error: {
type: "internal_server_error",
message: "An unexpected error occurred while processing your request."
}
});
logRequest(req, 500, `Error: ${error.message}`);
}
}
function handleOptions(res) {
res.status(204);
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Access-Control-Allow-Methods', 'POST, GET, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, x-api-key, anthropic-version, model');
res.end();
}
const app = express();
app.use(express.json());
// 根路由处理
app.get('/', (req, res) => {
res.status(200).send('GCP VertexAI For Claude Proxy');
});
app.all('/ai/v1/messages', handleRequest);
const PORT = 8080;
app.listen(PORT, () => {
console.log(`Server is running on port ${PORT}`);
});