// Cron Registry service — 5AM-anchored job scheduler backed by Supabase.
// (Stray "Spaces / Paused" hosting-dashboard text removed from this header.)
import express from 'express';
import cors from 'cors';
import { createClient } from '@supabase/supabase-js';

// ==========================================
// βοΈ CONFIGURATION & SETTINGS
// ==========================================
// HTTP port; 7860 is the Hugging Face Spaces default.
const PORT = process.env.PORT || 7860;
const SUPABASE_URL = process.env.SUPABASE_URL;
// Service-role key (full read/write on `system_jobs`) — server-side only.
const SUPABASE_KEY = process.env.SUPABASE_SERVICE_KEY;
// Shared secret checked by /register and /deregister.
const CRON_SECRET = process.env.CRON_SECRET || "default_secret";

// π΄ VARIABLE BOOLEAN: Should past/missed jobs fire immediately on server startup?
const RUN_PAST_EVENTS_ON_STARTUP = true;

// Fail fast: the registry cannot function without DB credentials.
if (!SUPABASE_URL || !SUPABASE_KEY) {
  console.error("β Missing Supabase Credentials");
  process.exit(1);
}

const app = express();
const supabase = createClient(SUPABASE_URL, SUPABASE_KEY);
// In-memory schedule: jobId -> { url, payload, intervalMs, offset, nextRunAt }.
const activeJobs = new Map();

app.use(express.json());
app.use(cors());
// ==========================================
// β±οΈ BULLETPROOF TIMEZONE MATH
// ==========================================
/**
 * Compute the delay until the next 5:00 AM in the caller's timezone.
 *
 * @param {number} rawOffset - Either a JS `Date.getTimezoneOffset()` value in
 *   minutes (sign-inverted: EST => 300, CET => -60) or a plain UTC offset in
 *   hours (e.g. -5). Magnitudes >= 30 are treated as minutes — no real
 *   timezone has an hour offset that large, and minute offsets are >= 30.
 * @returns {{delayMs: number, offsetHours: number}} delayMs is always in
 *   (0, 24h]; offsetHours is the normalized UTC offset in hours.
 */
function getNextFiveAMDetails(rawOffset = 0) {
  let offsetHours;
  if (Math.abs(rawOffset) >= 30) {
    // Minutes from JS getTimezoneOffset(); invert the sign to get UTC offset.
    offsetHours = -(rawOffset / 60);
  } else {
    offsetHours = Number(rawOffset); // Already in hours (e.g. -5)
  }

  const now = new Date();
  const midnightUTC = new Date(now.getTime());
  midnightUTC.setUTCHours(0, 0, 0, 0);

  // 5 AM local == (5 - offsetHours) hours UTC.
  // Example (EST / UTC-5): 5 - (-5) = 10:00 AM UTC
  // Example (CET / UTC+1): 5 - (+1) = 04:00 AM UTC
  // Use millisecond arithmetic instead of setUTCHours(): setUTCHours
  // truncates non-integer arguments (ECMA-262 MakeTime), which silently
  // dropped the half hour for fractional offsets like UTC+5:30 (India).
  let targetMs = midnightUTC.getTime() + (5 - offsetHours) * 3600000;

  // If 5 AM has ALREADY PASSED in that timezone today, schedule for tomorrow.
  if (targetMs <= now.getTime()) {
    targetMs += 86400000;
  }

  return {
    delayMs: targetMs - now.getTime(),
    offsetHours: offsetHours
  };
}
// ==========================================
// π JOB EXECUTION & SCHEDULING
// ==========================================
/**
 * Fire one scheduled job: POST its payload to the stored webhook URL and,
 * on an HTTP 2xx response, stamp `updated_at` in `system_jobs` so the
 * hydration pass can detect missed cycles after a restart.
 *
 * @param {string} jobId - Key into the in-memory `activeJobs` map.
 */
async function executeJob(jobId) {
  const job = activeJobs.get(jobId);
  if (!job) return;

  console.log(`β° [${new Date().toLocaleTimeString()}] Executing: ${jobId}`);

  try {
    const res = await fetch(job.url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(job.payload)
    });
    console.log(` ββ Response: ${res.status}`);

    // Only a successful delivery counts as a completed run.
    if (!res.ok) return;

    const { error } = await supabase
      .from('system_jobs')
      .update({ updated_at: new Date().toISOString() })
      .eq('id', jobId);

    if (error) {
      console.error(` ββ β Failed to update DB:`, error.message);
    } else {
      console.log(` ββ πΎ Database 'updated_at' successfully synced.`);
    }
  } catch (e) {
    console.error(`β Job ${jobId} HTTP Failed:`, e.message);
  }
}
/**
 * Register (or refresh) a job in the in-memory schedule.
 *
 * @param {string} jobId
 * @param {number} intervalMs - Stored on the job record (scheduling itself is
 *   anchored to 5AM via delayData, not this interval).
 * @param {string} url - Webhook to POST on each run.
 * @param {object} payload - Body forwarded to the webhook.
 * @param {{delayMs: number, offsetHours: number}} delayData - From
 *   getNextFiveAMDetails() (delayMs may have been overridden by the caller).
 * @param {boolean} [runImmediately=false] - Fire once right now (missed run),
 *   then wait for the scheduled slot.
 */
function startJobInternal(jobId, intervalMs, url, payload, delayData, runImmediately = false) {
  const { delayMs, offsetHours } = delayData;

  // Human-readable offset for logs (e.g. UTC+1, UTC-5).
  const tzString = `UTC${offsetHours >= 0 ? '+' : ''}${offsetHours}`;
  const hours = (delayMs / 1000 / 60 / 60).toFixed(2);
  const minutes = (delayMs / 1000 / 60).toFixed(2);

  activeJobs.set(jobId, {
    url,
    payload,
    intervalMs,
    offset: offsetHours,
    nextRunAt: Date.now() + delayMs
  });

  if (!runImmediately) {
    console.log(`β³ Job ${jobId} scheduled. Target: 5AM (${tzString}). Next run in ${hours} hours (${minutes} mins).`);
    return;
  }

  console.log(`β‘ [Startup] Missed event detected! Firing immediately: ${jobId}`);
  executeJob(jobId); // fire-and-forget; executeJob handles its own errors
  console.log(`β³ Job ${jobId} re-scheduled. Target: 5AM (${tzString}). Next run in ${hours} hours (${minutes} mins).`);
}
// ==========================================
// βοΈ THE TICK ENGINE (WATCHDOG)
// ==========================================
// Once a minute, fire every job whose deadline has passed and re-anchor it
// to the next 5AM in its stored timezone offset.
setInterval(() => {
  const now = Date.now();
  activeJobs.forEach((job, jobId) => {
    if (now < job.nextRunAt) return;

    executeJob(jobId); // fire-and-forget; executeJob handles its own errors
    const { delayMs } = getNextFiveAMDetails(job.offset);
    job.nextRunAt = now + delayMs;

    const hours = (delayMs / 1000 / 60 / 60).toFixed(2);
    console.log(`π Job ${jobId} entered 24h cycle. Target: 5AM (UTC${job.offset >= 0 ? '+' : ''}${job.offset}). Next run in ${hours} hours.`);
  });
}, 60000);
// ==========================================
// πΎ DB HYDRATION USING `updated_at`
// ==========================================
/**
 * Rebuild the in-memory schedule from the `system_jobs` table after a
 * restart. A job whose `updated_at` is missing or older than 24h is treated
 * as a missed run and (if RUN_PAST_EVENTS_ON_STARTUP) fired immediately.
 */
async function hydrateJobs() {
  console.log("π§ Hydrating Cron Jobs from DB...");
  const { data, error } = await supabase.from('system_jobs').select('*');
  if (error) {
    console.error("β Hydration Failed:", error.message);
    return;
  }

  const now = Date.now();
  let count = 0;

  for (const job of data) {
    const delayData = getNextFiveAMDetails(job.payload?.timezoneOffset ?? 0);

    // Missed if it has never run, or the last confirmed run is > 24h old.
    const lastRun = job.updated_at ? new Date(job.updated_at).getTime() : null;
    const missed = lastRun === null || now - lastRun >= 86400000;
    const runImmediately = RUN_PAST_EVENTS_ON_STARTUP && missed;

    startJobInternal(job.id, job.interval_ms, job.webhook_url, job.payload, delayData, runImmediately);
    count += 1;
  }

  console.log(`β Successfully hydrated and scheduled ${count} jobs.`);
}
// Health check: liveness plus the size of the in-memory schedule.
app.get('/', (req, res) => {
  res.json({ status: "Cron Registry Active", active_jobs: activeJobs.size });
});
/**
 * POST /register — persist a job row, then (re)schedule it in memory
 * anchored to the next 5AM in the payload's timezone.
 * Body: { secret, jobId, intervalMs, webhookUrl, payload, initialDelay?, leadId? }
 */
app.post('/register', async (req, res) => {
  const { secret, jobId, intervalMs, webhookUrl, payload, initialDelay, leadId } = req.body;
  if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });

  // Reject malformed registrations with a clear 400 instead of letting the
  // upsert fail with an opaque DB error.
  if (!jobId || !webhookUrl) {
    return res.status(400).json({ error: "jobId and webhookUrl are required" });
  }

  const { error } = await supabase.from('system_jobs').upsert({
    id: jobId,
    lead_id: leadId,
    interval_ms: intervalMs,
    webhook_url: webhookUrl,
    payload: payload,
    updated_at: new Date().toISOString()
  });
  if (error) return res.status(500).json({ error: error.message });

  // `??` (not `||`) so an explicit offset of 0/'' isn't coerced — consistent
  // with the hydrateJobs() lookup of the same field.
  const delayData = getNextFiveAMDetails(payload?.timezoneOffset ?? 0);
  if (initialDelay !== undefined) {
    // Caller-supplied override (ms) for the first run only.
    delayData.delayMs = initialDelay;
  }

  startJobInternal(jobId, intervalMs, webhookUrl, payload, delayData, false);
  console.log(`β Registered & Scheduled Job (5AM Anchor): ${jobId}`);
  res.json({ success: true, jobId });
});
/**
 * POST /deregister — delete a job from the DB and drop it from the
 * in-memory schedule. Body: { secret, jobId }
 */
app.post('/deregister', async (req, res) => {
  const { secret, jobId } = req.body;
  if (secret !== CRON_SECRET) return res.status(403).json({ error: "Unauthorized" });

  // Check the delete result instead of ignoring it: a silent DB failure
  // would leave the row behind while the in-memory job disappears, and the
  // job would resurrect on the next restart's hydration.
  const { error } = await supabase.from('system_jobs').delete().eq('id', jobId);
  if (error) return res.status(500).json({ error: error.message });

  activeJobs.delete(jobId); // Map.delete is a safe no-op when absent

  console.log(`β Deregistered Job: ${jobId}`);
  res.json({ success: true });
});
| app.listen(PORT, async () => { | |
| console.log(`π Cron Registry live on port ${PORT}`); | |
| setTimeout(hydrateJobs, 2000); | |
| }); |