| import { Router, Request, Response } from 'express'; |
| import { v4 as uuidv4 } from 'uuid'; |
| import db from './db'; |
| import { fetchPage } from './fetcher'; |
| import { extractAndNormalize } from './extractor'; |
| import { sliceAndDiff } from './differ'; |
| import { |
| CreateSourceRequest, |
| CreateSourceResponse, |
| GetSourcesQuery, |
| GetSourcesResponse, |
| CreateJobsRequest, |
| CreateJobsResponse, |
| GetJobResponse, |
| SourceRegistry |
| } from './models'; |
|
|
// Single Router instance carrying every endpoint below; exported at the
// bottom of the file (mount path is decided by the consuming app).
const router = Router();
|
|
| |
| router.post('/sources', (req: Request<{}, {}, CreateSourceRequest>, res: Response<CreateSourceResponse | { error: string }>) => { |
| try { |
| const data = req.body; |
| |
| const source_id = `source_${uuidv4().replace(/-/g, '').substring(0, 16)}`; |
|
|
| const stmt = db.prepare(` |
| INSERT INTO source_registry ( |
| source_id, source_name, source_type, domain, entry_url, url_pattern, |
| parser_type, crawl_frequency, priority, enabled, topic_tags |
| ) VALUES ( |
| @source_id, @source_name, @source_type, @domain, @entry_url, @url_pattern, |
| @parser_type, @crawl_frequency, @priority, @enabled, @topic_tags |
| ) |
| `); |
|
|
| stmt.run({ |
| ...data, |
| source_id, |
| enabled: data.enabled ? 1 : 0, |
| topic_tags: JSON.stringify(data.topic_tags || []) |
| }); |
|
|
| res.json({ source_id, success: true }); |
| } catch (error: any) { |
| res.status(500).json({ error: error.message }); |
| } |
| }); |
|
|
| |
| router.get('/sources', (req: Request<{}, {}, {}, GetSourcesQuery>, res: Response<GetSourcesResponse | { error: string }>) => { |
| try { |
| const { source_type, enabled, priority } = req.query; |
| let query = 'SELECT * FROM source_registry WHERE 1=1'; |
| const params: any[] = []; |
|
|
| if (source_type) { |
| query += ' AND source_type = ?'; |
| params.push(source_type); |
| } |
| if (enabled !== undefined) { |
| query += ' AND enabled = ?'; |
| params.push(String(enabled) === 'true' ? 1 : 0); |
| } |
| if (priority) { |
| query += ' AND priority = ?'; |
| params.push(priority); |
| } |
|
|
| const items = db.prepare(query).all(...params) as any[]; |
| |
| |
| const formattedItems: SourceRegistry[] = items.map(item => ({ |
| ...item, |
| enabled: Boolean(item.enabled), |
| topic_tags: item.topic_tags ? JSON.parse(item.topic_tags) : [] |
| })); |
|
|
| res.json({ |
| items: formattedItems, |
| total: formattedItems.length |
| }); |
| } catch (error: any) { |
| res.status(500).json({ error: error.message }); |
| } |
| }); |
|
|
| |
| async function runCrawlerJob(job_id: string, source_id: string) { |
| try { |
| |
| db.prepare("UPDATE crawl_job SET status = 'running' WHERE job_id = ?").run(job_id); |
|
|
| |
| const snapshot = await fetchPage(job_id, source_id); |
|
|
| |
| const newDoc = extractAndNormalize(snapshot); |
|
|
| |
| if (newDoc) { |
| sliceAndDiff(newDoc); |
| } |
|
|
| |
| db.prepare("UPDATE crawl_job SET status = 'success', ended_at = ? WHERE job_id = ?").run(new Date().toISOString(), job_id); |
| } catch (error: any) { |
| |
| console.error(`Job ${job_id} failed:`, error); |
| db.prepare("UPDATE crawl_job SET status = 'failed', ended_at = ?, error_message = ? WHERE job_id = ?") |
| .run(new Date().toISOString(), error.message || 'Unknown error', job_id); |
| } |
| } |
|
|
| |
| router.post('/jobs', (req: Request<{}, {}, CreateJobsRequest>, res: Response<CreateJobsResponse | { error: string }>) => { |
| try { |
| const { source_ids, trigger_type } = req.body; |
| if (!source_ids || !Array.isArray(source_ids)) { |
| return res.status(400).json({ error: 'source_ids is required and must be an array' }); |
| } |
|
|
| const job_ids: string[] = []; |
| const now = new Date().toISOString(); |
|
|
| const stmt = db.prepare(` |
| INSERT INTO crawl_job ( |
| job_id, source_id, trigger_type, status, started_at |
| ) VALUES ( |
| @job_id, @source_id, @trigger_type, @status, @started_at |
| ) |
| `); |
|
|
| const insertMany = db.transaction((sources: string[]) => { |
| for (const source_id of sources) { |
| const job_id = `job_${uuidv4().replace(/-/g, '').substring(0, 16)}`; |
| stmt.run({ |
| job_id, |
| source_id, |
| trigger_type: trigger_type || 'manual', |
| status: 'queued', |
| started_at: now |
| }); |
| job_ids.push(job_id); |
| } |
| }); |
|
|
| insertMany(source_ids); |
|
|
| |
| for (let i = 0; i < source_ids.length; i++) { |
| runCrawlerJob(job_ids[i] as string, source_ids[i] as string).catch(err => console.error("Worker error:", err)); |
| } |
|
|
| res.json({ job_ids, status: 'queued' }); |
| } catch (error: any) { |
| res.status(500).json({ error: error.message }); |
| } |
| }); |
|
|
| |
| router.get('/jobs/:jobId', (req: Request<{ jobId: string }>, res: Response<GetJobResponse | { error: string }>) => { |
| try { |
| const { jobId } = req.params; |
| const job = db.prepare('SELECT * FROM crawl_job WHERE job_id = ?').get(jobId) as any; |
|
|
| if (!job) { |
| return res.status(404).json({ error: 'Job not found' }); |
| } |
|
|
| res.json({ |
| job_id: job.job_id, |
| source_id: job.source_id, |
| status: job.status, |
| started_at: job.started_at, |
| ended_at: job.ended_at, |
| error_code: job.error_code, |
| retry_count: job.retry_count |
| }); |
| } catch (error: any) { |
| res.status(500).json({ error: error.message }); |
| } |
| }); |
|
|
| |
| router.get('/documents', (req: Request<{}, {}, {}, { source_id?: string }>, res: Response) => { |
| try { |
| const { source_id } = req.query; |
| let query = 'SELECT doc_id, version_date, effective_date, normalized_hash FROM normalized_document WHERE 1=1'; |
| const params: any[] = []; |
| if (source_id) { |
| query += ' AND source_id = ?'; |
| params.push(source_id); |
| } |
| const items = db.prepare(query).all(...params); |
| res.json({ items }); |
| } catch (error: any) { |
| res.status(500).json({ error: error.message }); |
| } |
| }); |
|
|
| |
| router.get('/diff-events', (req: Request<{}, {}, {}, { source_id?: string }>, res: Response) => { |
| try { |
| const { source_id } = req.query; |
| let query = 'SELECT event_id, section_title, change_type, impact_level, topic_tags FROM diff_event WHERE 1=1'; |
| const params: any[] = []; |
| if (source_id) { |
| query += ' AND source_id = ?'; |
| params.push(source_id); |
| } |
| const items = db.prepare(query).all(...params) as any[]; |
| const formattedItems = items.map(item => ({ |
| ...item, |
| topic_tags: item.topic_tags ? JSON.parse(item.topic_tags) : [] |
| })); |
| res.json({ items: formattedItems }); |
| } catch (error: any) { |
| res.status(500).json({ error: error.message }); |
| } |
| }); |
|
|
| |
| router.get('/updates', (req: Request<{}, {}, {}, { app_name?: string, business_line?: string, topics?: string, since?: string, limit?: string }>, res: Response) => { |
| try { |
| const { app_name, business_line, topics, since, limit } = req.query; |
| const limitNum = parseInt(limit || '50', 10); |
| |
| let query = ` |
| SELECT |
| d.event_id, d.section_title, d.change_type, d.new_excerpt as excerpt, d.topic_tags as diff_topic_tags, |
| s.source_name, s.source_type, |
| n.version_date |
| FROM diff_event d |
| JOIN source_registry s ON d.source_id = s.source_id |
| JOIN normalized_document n ON d.to_doc_id = n.doc_id |
| WHERE 1=1 |
| `; |
| const params: any[] = []; |
| |
| if (since) { |
| query += ' AND d.detected_at >= ?'; |
| params.push(since); |
| } |
| |
| query += ' ORDER BY d.detected_at DESC LIMIT ?'; |
| params.push(limitNum); |
| |
| const rows = db.prepare(query).all(...params) as any[]; |
| |
| let topicFilters: string[] = []; |
| if (topics) { |
| topicFilters = topics.split(',').map((t: string) => t.trim()); |
| } |
| |
| const updatesMap = { |
| peer_bank: {} as Record<string, any>, |
| regulator: {} as Record<string, any>, |
| sdk_vendor: {} as Record<string, any> |
| }; |
| |
| for (const row of rows) { |
| const rowTopics = row.diff_topic_tags ? JSON.parse(row.diff_topic_tags) : []; |
| |
| if (topicFilters.length > 0) { |
| const hasTopic = topicFilters.some((t: string) => rowTopics.includes(t)); |
| if (!hasTopic) continue; |
| } |
| |
| const stype = row.source_type as string; |
| const mapKey = ((stype === 'peer_bank' || stype === 'regulator' || stype === 'sdk_vendor') ? stype : 'peer_bank') as 'peer_bank' | 'regulator' | 'sdk_vendor'; |
| |
| const key = `${row.source_name}_${row.version_date}`; |
| if (!updatesMap[mapKey][key]) { |
| updatesMap[mapKey][key] = { |
| source_name: row.source_name, |
| source_type: row.source_type, |
| version_date: row.version_date, |
| topic_tags: new Set<string>(), |
| changed_sections: [] |
| }; |
| } |
| |
| rowTopics.forEach((t: string) => updatesMap[mapKey][key].topic_tags.add(t)); |
| updatesMap[mapKey][key].changed_sections.push({ |
| section_title: row.section_title, |
| change_type: row.change_type, |
| excerpt: row.excerpt |
| }); |
| } |
| |
| const formatUpdates = (map: Record<string, any>) => Object.values(map).map(v => ({ |
| ...v, |
| topic_tags: Array.from(v.topic_tags) |
| })); |
| |
| res.json({ |
| peer_updates: formatUpdates(updatesMap.peer_bank), |
| regulatory_updates: formatUpdates(updatesMap.regulator), |
| sdk_updates: formatUpdates(updatesMap.sdk_vendor) |
| }); |
| } catch (error: any) { |
| res.status(500).json({ error: error.message }); |
| } |
| }); |
|
|
// The router is consumed elsewhere; its mount path is not visible in this file.
export default router;
|
|