// edtech/apps/api/src/routes/internal.ts
// CognxSafeTrack
// chore: stabilization, audit fixes and project synchronization
// 7b0c22b
import { FastifyInstance } from 'fastify';
import { whatsappService } from '../services/whatsapp';
import { z } from 'zod';
import { getOrganizationByPhoneNumberId } from '../services/organization';
import { logger } from '../logger';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { FastifyAdapter } from '@bull-board/fastify';
import { whatsappQueue, notificationQueue } from '../services/queue';
/**
 * One `changes[]` item of a forwarded webhook: the `field` name plus a
 * `value` object that may carry `metadata.phone_number_id` and a list of
 * messages. Individual messages are accepted as-is (`z.any()`).
 */
const WebhookChangeSchema = z.object({
  value: z.object({
    metadata: z.object({ phone_number_id: z.string() }).optional(),
    messages: z.array(z.any()).optional(),
  }),
  field: z.string(),
});

/** One `entry[]` item of a forwarded webhook: an id and its list of changes. */
const WebhookEntrySchema = z.object({
  id: z.string(),
  changes: z.array(WebhookChangeSchema),
});

/**
 * Shape accepted by the inbound webhook route. The top-level `object` field
 * must be exactly `'whatsapp_business_account'`.
 */
const WebhookPayloadSchema = z.object({
  object: z.literal('whatsapp_business_account'),
  entry: z.array(WebhookEntrySchema),
});
/**
 * The subset of an inbound message that this module reads. Messages are
 * validated with `z.any()`, so every field is treated as optional.
 */
interface InboundMessage {
  from?: string;
  type?: string;
  text?: { body?: string };
  interactive?: {
    button_reply?: { id?: string };
    list_reply?: { id?: string };
  };
  audio?: { id?: string };
  image?: { id?: string };
}

/** Content fields placed on the `handle-inbound` queue job. */
interface InboundContent {
  text: string;
  audioUrl?: string;
  imageUrl?: string;
}

/**
 * Maps a message to the text/media fields of the queue job.
 *
 * `text` is always a string: a missing text body or missing interactive
 * reply ids fall back to `''` instead of leaking `undefined` into the job.
 * Note that `audioUrl` / `imageUrl` carry the message's media `id`, not a URL.
 */
function extractInboundContent(message: InboundMessage): InboundContent {
  switch (message.type) {
    case 'text':
      return { text: message.text?.body ?? '' };
    case 'interactive':
      // `||` (not `??`) on purpose: an empty button id falls through to the list id.
      return {
        text:
          message.interactive?.button_reply?.id ||
          message.interactive?.list_reply?.id ||
          '',
      };
    case 'audio':
      // Text stays empty; the consumer is expected to transcribe the audio.
      return { text: '', audioUrl: message.audio?.id };
    case 'image':
      // Text stays empty; the consumer is expected to analyse the image.
      return { text: '', imageUrl: message.image?.id };
    default:
      // Other message types are still enqueued, with empty text and no media.
      return { text: '' };
  }
}

/**
 * Registers the internal (service-to-service) routes:
 *
 * - the BullBoard dashboard under `/v1/internal/queues`,
 * - `POST /v1/internal/whatsapp/inbound` — validates a forwarded webhook
 *   payload and enqueues one `handle-inbound` job per message,
 * - `POST /v1/internal/handle-message` — hands a single message to
 *   `whatsappService.handleIncomingMessage`,
 * - `GET /v1/internal/ping`.
 *
 * Every route is flagged with `config.requireAuth = true`.
 *
 * @param fastify Fastify instance the routes are attached to.
 */
export async function internalRoutes(fastify: FastifyInstance): Promise<void> {
  // ── BullBoard dashboard ──────────────────────────────────────────────────
  const serverAdapter = new FastifyAdapter();
  createBullBoard({
    queues: [
      new BullMQAdapter(whatsappQueue),
      new BullMQAdapter(notificationQueue),
    ],
    serverAdapter,
  });
  serverAdapter.setBasePath('/v1/internal/queues');

  fastify.register(async (instance) => {
    // `config` passed as a register() option is NOT applied to the plugin's
    // routes, so the flag is stamped on every dashboard route here. The hook
    // must be added before the routes are registered.
    // NOTE(review): confirm the auth hook accepts browser requests, otherwise
    // the dashboard UI will be rejected once the flag takes effect.
    instance.addHook('onRoute', (routeOptions) => {
      routeOptions.config = { ...routeOptions.config, requireAuth: true };
    });
    instance.register(serverAdapter.registerPlugin(), {
      prefix: '/',
    });
  }, {
    prefix: '/v1/internal/queues',
  });

  // ── Webhook forwarding from the gateway ──────────────────────────────────
  fastify.post('/v1/internal/whatsapp/inbound', {
    config: { requireAuth: true },
  }, async (request, reply) => {
    try {
      const parsed = WebhookPayloadSchema.safeParse(request.body);
      if (!parsed.success) {
        return reply.code(400).send({ error: 'Invalid Payload', details: parsed.error.flatten() });
      }

      for (const entry of parsed.data.entry) {
        for (const change of entry.changes) {
          // The organization is resolved once per change and shared by all of
          // its messages; a missing phone_number_id is looked up as 'unknown'.
          const phoneNumberId = change.value.metadata?.phone_number_id || 'unknown';
          const organizationId = await getOrganizationByPhoneNumberId(phoneNumberId);

          for (const message of change.value.messages || []) {
            const phone = message.from;
            // Messages without a sender cannot be answered; skip them.
            if (!phone) continue;

            const { text, audioUrl, imageUrl } = extractInboundContent(message);
            await whatsappQueue.add('handle-inbound', {
              phone,
              text,
              audioUrl,
              imageUrl,
              organizationId,
            }, {
              attempts: 3,
              backoff: { type: 'exponential', delay: 1000 },
              removeOnComplete: true,
            });
          }
        }
      }

      return reply.code(200).send({ ok: true, status: 'queued' });
    } catch (error) {
      logger.error(`[INTERNAL-WEBHOOK] Enqueue error: ${error}`);
      return reply.code(500).send({ error: 'Failed to enqueue messages' });
    }
  });

  // ── Single message handed over by the worker ─────────────────────────────
  type HandleMessageBody = {
    phone: string;
    text: string;
    audioUrl?: string;
    imageUrl?: string;
    organizationId?: string;
  };

  fastify.post<{ Body: HandleMessageBody }>('/v1/internal/handle-message', {
    config: { requireAuth: true },
  }, async (request, reply) => {
    // No schema is attached to this route, so the body may be absent or
    // incomplete at runtime despite its static type.
    const {
      phone,
      text = '',
      audioUrl,
      imageUrl,
      organizationId,
    }: Partial<HandleMessageBody> = request.body ?? {};

    if (typeof phone !== 'string' || phone === '') {
      return reply.code(400).send({ error: 'Invalid Payload', details: 'phone is required' });
    }

    try {
      await whatsappService.handleIncomingMessage(phone, text, audioUrl, imageUrl, undefined, organizationId);
      return reply.send({ ok: true });
    } catch (err: unknown) {
      const reason = err instanceof Error ? err.message : String(err);
      logger.error(`[INTERNAL-HANDLE-MESSAGE] Handling error: ${reason}`);
      return reply.code(500).send({ error: reason });
    }
  });

  fastify.get('/v1/internal/ping', { config: { requireAuth: true } }, async () => ({ ok: true }));
}