| import express from "express"; |
| import swaggerUi from "swagger-ui-express"; |
| import { |
| AutoProcessor, |
| Qwen3_5ForConditionalGeneration, |
| } from "@huggingface/transformers"; |
| import crypto from "crypto"; |
| import { NodeSDK } from "@opentelemetry/sdk-node"; |
| import { LangfuseSpanProcessor } from "@langfuse/otel"; |
| import { startActiveObservation } from "@langfuse/tracing"; |
|
|
// --- Configuration (overridable via environment variables) ---
const app = express();
const PORT = 7860;
// Hugging Face repo containing the ONNX export loaded by transformers.js.
const MODEL_ID = "huggingworld/Qwen3.5-0.8B-ONNX";
// Optional shared secret; when unset, requireApiKey() disables auth entirely.
const API_KEY = process.env.API_KEY;
const LANGFUSE_PUBLIC_KEY = process.env.LANGFUSE_PUBLIC_KEY;
const LANGFUSE_SECRET_KEY = process.env.LANGFUSE_SECRET_KEY;
const LANGFUSE_BASE_URL = process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com";
const LANGFUSE_ENV = process.env.LANGFUSE_ENV || process.env.NODE_ENV || "development";
// Tracing is active only when BOTH Langfuse keys are configured.
const LANGFUSE_ENABLED = Boolean(LANGFUSE_PUBLIC_KEY && LANGFUSE_SECRET_KEY);


// --- Mutable process state ---
let model = null;     // set once by loadModel()
let processor = null; // set once by loadModel()
// Promise tail used to serialize inference requests (see queueTextInference).
let inferenceQueue = Promise.resolve();
let telemetrySdk = null; // OpenTelemetry NodeSDK, set by setupTracing()
|
|
/**
 * Emit one structured JSON log line with a timestamp.
 * Error-level entries go to stderr; everything else goes to stdout.
 *
 * @param {string} level - e.g. "info", "warn", "error".
 * @param {string} event - machine-readable event name.
 * @param {object} [meta] - extra fields merged into the log entry.
 */
function log(level, event, meta = {}) {
  const entry = JSON.stringify({
    ts: new Date().toISOString(),
    level,
    event,
    ...meta,
  });
  const sink = level === "error" ? console.error : console.log;
  sink(entry);
}
|
|
/**
 * Download and initialize the processor and model, populating the module-level
 * `processor` and `model` globals.
 *
 * The two from_pretrained calls are independent, so they are fetched in
 * parallel rather than serially. Both globals are assigned only after BOTH
 * loads finish, so /health never observes a half-loaded state.
 */
async function loadModel() {
  const start = Date.now();
  log("info", "model_load_started", { model_id: MODEL_ID });
  const [loadedProcessor, loadedModel] = await Promise.all([
    AutoProcessor.from_pretrained(MODEL_ID),
    Qwen3_5ForConditionalGeneration.from_pretrained(MODEL_ID, {
      // Per-component quantization: 4-bit weights, fp16 vision encoder.
      dtype: {
        embed_tokens: "q4",
        vision_encoder: "fp16",
        decoder_model_merged: "q4",
      },
    }),
  ]);
  processor = loadedProcessor;
  model = loadedModel;
  log("info", "model_load_completed", {
    model_id: MODEL_ID,
    duration_ms: Date.now() - start,
  });
}
|
|
/**
 * Initialize OpenTelemetry with a Langfuse span processor, storing the SDK in
 * the module-level `telemetrySdk` so shutdown can flush it. No-op (with a log
 * line explaining why) when the Langfuse keys are not configured.
 */
async function setupTracing() {
  if (!LANGFUSE_ENABLED) {
    log("info", "langfuse_disabled", {
      reason: "missing_langfuse_keys",
    });
    return;
  }

  telemetrySdk = new NodeSDK({
    spanProcessors: [
      new LangfuseSpanProcessor({
        publicKey: LANGFUSE_PUBLIC_KEY,
        secretKey: LANGFUSE_SECRET_KEY,
        baseUrl: LANGFUSE_BASE_URL,
        environment: LANGFUSE_ENV,
      }),
    ],
  });

  await telemetrySdk.start();
  log("info", "langfuse_enabled", {
    base_url: LANGFUSE_BASE_URL,
    environment: LANGFUSE_ENV,
  });
}
|
|
/**
 * Run `handler` inside a Langfuse "http.prompt" observation, recording the
 * prompt, token budget, and request metadata on the span. When tracing is
 * disabled the handler runs directly and receives no span (undefined).
 *
 * @param {object} req - Express request (for requestId/method/path metadata).
 * @param {string} prompt - user prompt, recorded as span input.
 * @param {number} maxTokens - generation budget, recorded as span input.
 * @param {(span?: object) => Promise<any>} handler - work to trace.
 */
async function withPromptTrace(req, prompt, maxTokens, handler) {
  if (!LANGFUSE_ENABLED) {
    return handler();
  }

  const observed = async (span) => {
    span.update({
      input: { prompt, max_tokens: maxTokens },
      metadata: {
        request_id: req.requestId,
        method: req.method,
        path: req.originalUrl,
      },
    });
    return handler(span);
  };

  return startActiveObservation("http.prompt", observed, { endOnExit: true });
}
|
|
/**
 * Run one greedy text-only generation through the loaded model.
 * Assumes `model` and `processor` are already populated by loadModel().
 *
 * @param {string} prompt - user message text.
 * @param {number} maxTokens - cap on newly generated tokens.
 * @returns {Promise<string>} the decoded completion (prompt stripped).
 */
async function runTextInference(prompt, maxTokens) {
  // Single-turn conversation with one text part.
  const messages = [
    {
      role: "user",
      content: [{ type: "text", text: prompt }],
    },
  ];

  const chatText = processor.apply_chat_template(messages, {
    add_generation_prompt: true,
    chat_template_kwargs: { enable_thinking: false },
  });

  const modelInputs = await processor(chatText);
  const generated = await model.generate({
    ...modelInputs,
    max_new_tokens: maxTokens,
    do_sample: false, // greedy decoding for deterministic output
  });

  // Slice off the prompt tokens so only newly generated text is decoded.
  const promptTokenCount = modelInputs.input_ids.dims.at(-1);
  const completions = processor.batch_decode(
    generated.slice(null, [promptTokenCount, null]),
    { skip_special_tokens: true },
  );
  return completions[0];
}
|
|
/**
 * Serialize inferences: chain this request onto the tail of the module-level
 * queue so only one generation runs at a time.
 *
 * The queue tail swallows rejections (so one failure does not poison later
 * requests), but the promise returned to the caller still rejects normally.
 */
function queueTextInference(prompt, maxTokens) {
  const result = inferenceQueue.then(() => runTextInference(prompt, maxTokens));
  inferenceQueue = result.catch(() => {});
  return result;
}
|
|
// OpenAPI 3.0 document served by swagger-ui-express at /docs.
const swaggerDoc = {
  openapi: "3.0.0",
  info: {
    title: "Qwen3.5-0.8B Text API (ONNX)",
    version: "1.0.0",
    description: "Text inference API using Qwen3.5-0.8B ONNX with transformers.js",
  },
  components: {
    securitySchemes: {
      // Mirrors requireApiKey(): key accepted via X-API-Key header or
      // Authorization: Bearer; auth is skipped entirely when API_KEY is unset.
      ApiKeyAuth: {
        type: "apiKey",
        in: "header",
        name: "X-API-Key",
        description: "Set API_KEY env var; send as X-API-Key or Authorization: Bearer <key>",
      },
    },
  },
  paths: {
    "/": {
      get: {
        summary: "Root",
        responses: { 200: { description: "API status" } },
      },
    },
    "/health": {
      get: {
        summary: "Health check",
        responses: { 200: { description: "Model load status" } },
      },
    },
    // Only /prompt requires the API key (see `security` below).
    "/prompt": {
      post: {
        summary: "Text prompt inference (no image)",
        requestBody: {
          required: true,
          content: {
            "application/json": {
              schema: {
                type: "object",
                required: ["prompt"],
                properties: {
                  prompt: { type: "string", description: "Text prompt to send to the model" },
                  max_tokens: { type: "integer", default: 256 },
                },
              },
            },
          },
        },
        responses: {
          200: { description: "Inference result" },
          400: { description: "Invalid input" },
          401: { description: "Invalid or missing API key" },
          503: { description: "Model not loaded" },
        },
        security: [{ ApiKeyAuth: [] }],
      },
    },
  },
};
|
|
// Constant-time string comparison; returns false on length mismatch without
// leaking which prefix matched (crypto.timingSafeEqual requires equal lengths).
function timingSafeEqualStr(a, b) {
  const bufA = Buffer.from(String(a));
  const bufB = Buffer.from(String(b));
  if (bufA.length !== bufB.length) return false;
  return crypto.timingSafeEqual(bufA, bufB);
}

/**
 * Express middleware enforcing the configured API key.
 * Accepts the key via `Authorization: Bearer <key>` (preferred) or the
 * `X-API-Key` header. When API_KEY is unset, authentication is disabled.
 *
 * Uses a timing-safe comparison instead of `!==` so the check does not leak
 * key prefixes through response timing.
 */
function requireApiKey(req, res, next) {
  if (!API_KEY) return next();
  const auth = req.headers.authorization;
  const bearer = auth?.startsWith("Bearer ") ? auth.slice(7) : null;
  const key = bearer ?? req.headers["x-api-key"] ?? null;
  if (key === null || !timingSafeEqualStr(key, API_KEY)) {
    log("warn", "api_key_rejected", { request_id: req.requestId, path: req.path });
    return res.status(401).json({ detail: "Invalid or missing API key." });
  }
  next();
}
|
|
// Interactive Swagger UI. Mounted before the request-logging middleware below,
// so /docs traffic is not logged.
app.use("/docs", swaggerUi.serve, swaggerUi.setup(swaggerDoc));
|
|
// Request logging middleware: tags every request with a UUID (exposed as
// req.requestId for downstream handlers) and logs start/finish with timing.
app.use((req, res, next) => {
  const startedAt = Date.now();
  req.requestId = crypto.randomUUID();

  const baseFields = {
    request_id: req.requestId,
    method: req.method,
    path: req.originalUrl,
  };

  log("info", "request_started", { ...baseFields, ip: req.ip });

  // "finish" fires once the response has been handed to the OS.
  res.on("finish", () => {
    log("info", "request_finished", {
      ...baseFields,
      status_code: res.statusCode,
      duration_ms: Date.now() - startedAt,
    });
  });

  next();
});
|
|
// Root status endpoint: reports the API is up and which model it serves.
app.get("/", (req, res) => {
  log("info", "root_status", { request_id: req.requestId });
  const status = { status: "ok", model: MODEL_ID };
  res.json(status);
});
|
|
// Health check. The model counts as loaded only when BOTH the weights and the
// processor are ready — previously the log line checked both but the response
// checked only `model`, so the two could disagree; they now share one value.
app.get("/health", (req, res) => {
  const modelLoaded = model !== null && processor !== null;
  log("info", "health_checked", {
    request_id: req.requestId,
    model_loaded: modelLoaded,
  });
  res.json({ status: "healthy", model_loaded: modelLoaded });
});
|
|
/**
 * POST /prompt — run a text-only generation.
 * Body: { prompt: string, max_tokens?: integer (default 256) }.
 * Responses: 200 {response}, 400 invalid input, 503 model not loaded,
 * 500 inference failure. Requires the API key when one is configured.
 */
app.post("/prompt", requireApiKey, express.json(), async (req, res) => {
  // req.body is undefined when the client sends a non-JSON content type;
  // optional chaining turns that into a 400 instead of a TypeError/500.
  const prompt = req.body?.prompt;
  // Radix-10 parse; missing, unparsable, zero, or negative values fall back
  // to the documented default of 256.
  const requestedTokens = Number.parseInt(req.body?.max_tokens, 10);
  const maxTokens = requestedTokens > 0 ? requestedTokens : 256;

  log("info", "prompt_request_received", {
    request_id: req.requestId,
    prompt_chars: prompt?.length ?? 0,
    max_tokens: maxTokens,
  });

  if (!model || !processor) {
    log("error", "prompt_model_unavailable", { request_id: req.requestId });
    return res.status(503).json({ detail: "Model not loaded yet." });
  }
  if (!prompt || typeof prompt !== "string") {
    log("error", "prompt_validation_failed", {
      request_id: req.requestId,
      reason: "missing_prompt",
    });
    return res.status(400).json({ detail: "No prompt provided." });
  }

  try {
    await withPromptTrace(req, prompt, maxTokens, async (span) => {
      // Child "generation" observation for the model call itself; `span` is
      // undefined when Langfuse is disabled, so everything is optional-chained.
      const generation = span?.startObservation(
        "qwen_text_generation",
        {
          model: MODEL_ID,
          input: prompt,
          modelParameters: { max_new_tokens: maxTokens, do_sample: 0 },
        },
        { asType: "generation" },
      );

      const start = Date.now();
      const response = await queueTextInference(prompt, maxTokens);
      const duration = Date.now() - start;

      generation?.update({ output: response }).end();
      span?.update({
        output: { response_chars: response?.length ?? 0 },
        metadata: { duration_ms: duration },
      });

      log("info", "prompt_completed", {
        request_id: req.requestId,
        duration_ms: duration,
        response_chars: response?.length ?? 0,
      });

      res.json({ response });
    });
  } catch (err) {
    log("error", "prompt_failed", {
      request_id: req.requestId,
      error: err.message,
      stack: err.stack,
    });
    // Guard against double-send if the failure happened after res.json().
    if (!res.headersSent) {
      res.status(500).json({ detail: "Inference failed.", error: err.message });
    }
  }
});
|
|
// Startup: initialize tracing and load the model in parallel, then listen.
// Previously this chain had no .catch — a failed model load became an
// unhandled rejection and the process died without a structured log line.
// Shutdown now also closes the HTTP server before flushing telemetry, and
// signal handlers are registered as soon as listening starts (the listen
// callback no longer needs to be async).
Promise.all([setupTracing(), loadModel()])
  .then(() => {
    const server = app.listen(PORT, "0.0.0.0", () => {
      log("info", "server_started", {
        host: "0.0.0.0",
        port: PORT,
        model_id: MODEL_ID,
        langfuse_enabled: LANGFUSE_ENABLED,
      });
    });

    const shutdown = async (signal) => {
      log("info", "shutdown_started", { signal });
      // Stop accepting new connections, then flush pending spans.
      server.close();
      if (telemetrySdk) {
        await telemetrySdk.shutdown();
      }
      process.exit(0);
    };

    process.on("SIGINT", () => shutdown("SIGINT"));
    process.on("SIGTERM", () => shutdown("SIGTERM"));
  })
  .catch((err) => {
    log("error", "startup_failed", { error: err.message, stack: err.stack });
    process.exit(1);
  });
|
|