// IMGVLM / server.js
// feat(added langfuse) — commit bd23640 by khushalcodiste
import express from "express";
import swaggerUi from "swagger-ui-express";
import {
AutoProcessor,
Qwen3_5ForConditionalGeneration,
} from "@huggingface/transformers";
import crypto from "crypto";
import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseSpanProcessor } from "@langfuse/otel";
import { startActiveObservation } from "@langfuse/tracing";
const app = express();
const PORT = 7860;
// Text-only ONNX build of Qwen3.5-0.8B served through transformers.js.
const MODEL_ID = "huggingworld/Qwen3.5-0.8B-ONNX";
// Optional shared secret; when unset, requireApiKey lets every request through.
const API_KEY = process.env.API_KEY;
const LANGFUSE_PUBLIC_KEY = process.env.LANGFUSE_PUBLIC_KEY;
const LANGFUSE_SECRET_KEY = process.env.LANGFUSE_SECRET_KEY;
const LANGFUSE_BASE_URL = process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com";
const LANGFUSE_ENV = process.env.LANGFUSE_ENV || process.env.NODE_ENV || "development";
// Tracing is active only when both Langfuse keys are configured.
const LANGFUSE_ENABLED = Boolean(LANGFUSE_PUBLIC_KEY && LANGFUSE_SECRET_KEY);
// Populated by loadModel(); routes treat null as "not ready yet".
let model = null;
let processor = null;
// Tail of a promise chain used to serialize inference (see queueTextInference).
let inferenceQueue = Promise.resolve();
// OpenTelemetry NodeSDK instance set by setupTracing(); flushed on shutdown.
let telemetrySdk = null;
// Emit one structured JSON log line per event.
// `level` "error" goes to stderr; everything else goes to stdout.
// Extra metadata fields are merged flat into the record.
function log(level, event, meta = {}) {
  const record = JSON.stringify({
    ts: new Date().toISOString(),
    level,
    event,
    ...meta,
  });
  const sink = level === "error" ? console.error : console.log;
  sink(record);
}
// Download and initialize the processor and the quantized model weights,
// storing both in module state. /prompt refuses requests until this resolves.
async function loadModel() {
  const startedAt = Date.now();
  log("info", "model_load_started", { model_id: MODEL_ID });
  processor = await AutoProcessor.from_pretrained(MODEL_ID);
  // Mixed-precision layout: 4-bit embeddings/decoder, fp16 vision encoder.
  const dtype = {
    embed_tokens: "q4",
    vision_encoder: "fp16",
    decoder_model_merged: "q4",
  };
  model = await Qwen3_5ForConditionalGeneration.from_pretrained(MODEL_ID, { dtype });
  log("info", "model_load_completed", {
    model_id: MODEL_ID,
    duration_ms: Date.now() - startedAt,
  });
}
// Wire the Langfuse span processor into an OpenTelemetry NodeSDK.
// When the Langfuse credentials are absent this is a no-op apart from a
// log line explaining why tracing is off.
async function setupTracing() {
  if (!LANGFUSE_ENABLED) {
    log("info", "langfuse_disabled", { reason: "missing_langfuse_keys" });
    return;
  }
  telemetrySdk = new NodeSDK({
    spanProcessors: [
      new LangfuseSpanProcessor({
        publicKey: LANGFUSE_PUBLIC_KEY,
        secretKey: LANGFUSE_SECRET_KEY,
        baseUrl: LANGFUSE_BASE_URL,
        environment: LANGFUSE_ENV,
      }),
    ],
  });
  await telemetrySdk.start();
  log("info", "langfuse_enabled", {
    base_url: LANGFUSE_BASE_URL,
    environment: LANGFUSE_ENV,
  });
}
// Run `handler` inside a Langfuse "http.prompt" observation when tracing is
// enabled; otherwise invoke it directly with no span. The handler receives
// the active span (or undefined) so it can attach child observations.
async function withPromptTrace(req, prompt, maxTokens, handler) {
  if (!LANGFUSE_ENABLED) return handler();

  const traced = async (span) => {
    span.update({
      input: { prompt, max_tokens: maxTokens },
      metadata: {
        request_id: req.requestId,
        method: req.method,
        path: req.originalUrl,
      },
    });
    return handler(span);
  };
  // endOnExit closes the span when `traced` settles.
  return startActiveObservation("http.prompt", traced, { endOnExit: true });
}
// Run a single text-only chat turn through the model and return the decoded
// completion with the echoed prompt tokens stripped.
async function runTextInference(prompt, maxTokens) {
  const messages = [
    { role: "user", content: [{ type: "text", text: prompt }] },
  ];
  // Render the chat template; thinking mode is explicitly disabled.
  const rendered = processor.apply_chat_template(messages, {
    add_generation_prompt: true,
    chat_template_kwargs: { enable_thinking: false },
  });
  const inputs = await processor(rendered);
  const generated = await model.generate({
    ...inputs,
    max_new_tokens: maxTokens,
    do_sample: false,
  });
  // Slice off the prompt tokens before decoding so only new text remains.
  const promptTokens = inputs.input_ids.dims.at(-1);
  const [completion] = processor.batch_decode(
    generated.slice(null, [promptTokens, null]),
    { skip_special_tokens: true },
  );
  return completion;
}
// Serialize inference: each request runs only after the previous one settles
// (the single-threaded model cannot safely interleave generations).
function queueTextInference(prompt, maxTokens) {
  const pending = inferenceQueue.then(() => runTextInference(prompt, maxTokens));
  // Keep the chain alive even when this task rejects; the caller still
  // observes the rejection through `pending`.
  inferenceQueue = pending.catch(() => {});
  return pending;
}
// OpenAPI 3.0 document served at /docs via swagger-ui-express.
const swaggerDoc = {
  openapi: "3.0.0",
  info: {
    title: "Qwen3.5-0.8B Text API (ONNX)",
    version: "1.0.0",
    description: "Text inference API using Qwen3.5-0.8B ONNX with transformers.js",
  },
  components: {
    securitySchemes: {
      // Matches requireApiKey: X-API-Key header or Authorization: Bearer.
      ApiKeyAuth: {
        type: "apiKey",
        in: "header",
        name: "X-API-Key",
        description: "Set API_KEY env var; send as X-API-Key or Authorization: Bearer <key>",
      },
    },
  },
  paths: {
    "/": {
      get: {
        summary: "Root",
        responses: { 200: { description: "API status" } },
      },
    },
    "/health": {
      get: {
        summary: "Health check",
        responses: { 200: { description: "Model load status" } },
      },
    },
    // Only /prompt is protected; / and /health stay open for probes.
    "/prompt": {
      post: {
        summary: "Text prompt inference (no image)",
        requestBody: {
          required: true,
          content: {
            "application/json": {
              schema: {
                type: "object",
                required: ["prompt"],
                properties: {
                  prompt: { type: "string", description: "Text prompt to send to the model" },
                  max_tokens: { type: "integer", default: 256 },
                },
              },
            },
          },
        },
        responses: {
          200: { description: "Inference result" },
          400: { description: "Invalid input" },
          401: { description: "Invalid or missing API key" },
          503: { description: "Model not loaded" },
        },
        security: [{ ApiKeyAuth: [] }],
      },
    },
  },
};
// Constant-time equality of two secrets. Hashing both sides to a fixed-size
// SHA-256 digest lets timingSafeEqual compare buffers of equal length, so
// neither key length nor content leaks through comparison timing.
function timingSafeKeyMatch(candidate, expected) {
  const a = crypto.createHash("sha256").update(String(candidate)).digest();
  const b = crypto.createHash("sha256").update(String(expected)).digest();
  return crypto.timingSafeEqual(a, b);
}

// Express middleware: enforce the API key from either the X-API-Key header
// or an Authorization: Bearer token. Auth is disabled when API_KEY is unset.
// Responds 401 on mismatch; calls next() on success.
function requireApiKey(req, res, next) {
  if (!API_KEY) return next();
  const auth = req.headers.authorization;
  const bearer = auth?.startsWith("Bearer ") ? auth.slice(7) : null;
  const key = bearer ?? req.headers["x-api-key"] ?? null;
  // Fix: the original compared secrets with `!==`, which is vulnerable to a
  // timing side channel; use a constant-time comparison instead.
  if (key === null || !timingSafeKeyMatch(key, API_KEY)) {
    log("warn", "api_key_rejected", { request_id: req.requestId, path: req.path });
    return res.status(401).json({ detail: "Invalid or missing API key." });
  }
  next();
}
// Interactive Swagger UI; mounted before the request logger, so /docs
// traffic is not logged and never requires an API key.
app.use("/docs", swaggerUi.serve, swaggerUi.setup(swaggerDoc));
// Request-scoped logging middleware: tags every request with a UUID
// (exposed as req.requestId for downstream handlers) and logs both the
// start and the finish of the request with its duration and status code.
app.use((req, res, next) => {
  const requestId = crypto.randomUUID();
  const start = Date.now();
  req.requestId = requestId;
  log("info", "request_started", {
    request_id: requestId,
    method: req.method,
    path: req.originalUrl,
    ip: req.ip,
  });
  // "finish" fires once the response has been handed to the OS.
  res.on("finish", () => {
    log("info", "request_finished", {
      request_id: requestId,
      method: req.method,
      path: req.originalUrl,
      status_code: res.statusCode,
      duration_ms: Date.now() - start,
    });
  });
  next();
});
// GET / — liveness/status endpoint; reports which model this server hosts.
app.get("/", (req, res) => {
  log("info", "root_status", { request_id: req.requestId });
  res.json({ status: "ok", model: MODEL_ID });
});
// GET /health — readiness probe. `model_loaded` is true only once BOTH the
// model and the processor have finished loading, matching the guard used by
// the /prompt handler. (Previously the JSON response checked only `model`,
// disagreeing with the log line.)
app.get("/health", (req, res) => {
  const modelLoaded = model !== null && processor !== null;
  log("info", "health_checked", {
    request_id: req.requestId,
    model_loaded: modelLoaded,
  });
  res.json({ status: "healthy", model_loaded: modelLoaded });
});
// POST /prompt — text-only inference, API-key protected. Validates input,
// queues the generation behind any in-flight request, and (when Langfuse is
// enabled) records the whole exchange as a trace with a child generation.
app.post("/prompt", requireApiKey, express.json(), async (req, res) => {
  // Fix: req.body is undefined when the client sends no JSON body, so the
  // original `req.body.prompt` threw a TypeError; use optional chaining.
  const prompt = req.body?.prompt;
  // `|| 256` intentionally maps NaN/0 to the default token budget.
  const maxTokens = Number.parseInt(req.body?.max_tokens, 10) || 256;
  log("info", "prompt_request_received", {
    request_id: req.requestId,
    prompt_chars: prompt?.length ?? 0,
    max_tokens: maxTokens,
  });
  if (!model || !processor) {
    log("error", "prompt_model_unavailable", { request_id: req.requestId });
    return res.status(503).json({ detail: "Model not loaded yet." });
  }
  // Reject missing, empty, or non-string prompts up front.
  if (typeof prompt !== "string" || prompt.length === 0) {
    log("error", "prompt_validation_failed", {
      request_id: req.requestId,
      reason: "missing_prompt",
    });
    return res.status(400).json({ detail: "No prompt provided." });
  }
  try {
    await withPromptTrace(req, prompt, maxTokens, async (span) => {
      // Child "generation" observation; undefined when tracing is disabled.
      const generation = span?.startObservation(
        "qwen_text_generation",
        {
          model: MODEL_ID,
          input: prompt,
          // Fix: mirror the actual generate() arguments — do_sample is a
          // boolean there, not the 0 the original recorded.
          modelParameters: { max_new_tokens: maxTokens, do_sample: false },
        },
        { asType: "generation" },
      );
      const start = Date.now();
      const response = await queueTextInference(prompt, maxTokens);
      const duration = Date.now() - start;
      generation?.update({ output: response }).end();
      span?.update({
        output: { response_chars: response?.length ?? 0 },
        metadata: { duration_ms: duration },
      });
      log("info", "prompt_completed", {
        request_id: req.requestId,
        duration_ms: duration,
        response_chars: response?.length ?? 0,
      });
      res.json({ response });
    });
  } catch (err) {
    log("error", "prompt_failed", {
      request_id: req.requestId,
      error: err.message,
      stack: err.stack,
    });
    res.status(500).json({ detail: "Inference failed.", error: err.message });
  }
});
// Boot sequence: initialize tracing and load the model in parallel, then
// start serving. Fix: the original chain had no .catch, so a failed model
// load surfaced only as an unhandled promise rejection; it also never
// closed the HTTP listener on shutdown and could run shutdown twice.
Promise.all([setupTracing(), loadModel()])
  .then(() => {
    const server = app.listen(PORT, "0.0.0.0", () => {
      log("info", "server_started", {
        host: "0.0.0.0",
        port: PORT,
        model_id: MODEL_ID,
        langfuse_enabled: LANGFUSE_ENABLED,
      });
    });
    let shuttingDown = false;
    const shutdown = async (signal) => {
      // Guard against SIGINT and SIGTERM arriving back-to-back.
      if (shuttingDown) return;
      shuttingDown = true;
      log("info", "shutdown_started", { signal });
      // Stop accepting new connections before flushing telemetry.
      server.close();
      if (telemetrySdk) {
        // Flushes pending Langfuse spans before the process exits.
        await telemetrySdk.shutdown();
      }
      process.exit(0);
    };
    process.on("SIGINT", () => shutdown("SIGINT"));
    process.on("SIGTERM", () => shutdown("SIGTERM"));
  })
  .catch((err) => {
    log("error", "startup_failed", { error: err.message, stack: err.stack });
    process.exit(1);
  });