LiamKhoaLe committed
Commit 5411a7d · 1 Parent(s): ec9f00b

Push ingestion Py README

ingestion_js/app/api/files/chunks/route.ts ADDED
@@ -0,0 +1,16 @@
+ import { NextRequest, NextResponse } from 'next/server'
+ import { getFileChunks } from '@/lib/mongo'
+
+ export const dynamic = 'force-dynamic'
+ export const runtime = 'nodejs'
+
+ export async function GET(req: NextRequest) {
+   const { searchParams } = new URL(req.url)
+   const user_id = searchParams.get('user_id') || ''
+   const project_id = searchParams.get('project_id') || ''
+   const filename = searchParams.get('filename') || ''
+   const limit = parseInt(searchParams.get('limit') || '20', 10)
+   if (!user_id || !project_id || !filename) return NextResponse.json({ error: 'user_id, project_id and filename are required' }, { status: 400 })
+   const chunks = await getFileChunks(user_id, project_id, filename, limit)
+   return NextResponse.json({ chunks })
+ }
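For reference, a minimal client-side sketch of how this endpoint could be called; the base URL and helper name below are illustrative assumptions, not part of the commit.

```ts
// Hypothetical caller for GET /api/files/chunks (base URL is an assumption).
async function fetchChunks(user_id: string, project_id: string, filename: string, limit = 20) {
  const params = new URLSearchParams({ user_id, project_id, filename, limit: String(limit) })
  const res = await fetch(`http://localhost:3000/api/files/chunks?${params}`)
  if (!res.ok) throw new Error(`chunks request failed: ${res.status}`)
  const { chunks } = await res.json()
  return chunks
}
```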
ingestion_js/app/api/files/route.ts ADDED
@@ -0,0 +1,14 @@
+ import { NextRequest, NextResponse } from 'next/server'
+ import { listFiles } from '@/lib/mongo'
+
+ export const dynamic = 'force-dynamic'
+ export const runtime = 'nodejs'
+
+ export async function GET(req: NextRequest) {
+   const { searchParams } = new URL(req.url)
+   const user_id = searchParams.get('user_id') || ''
+   const project_id = searchParams.get('project_id') || ''
+   if (!user_id || !project_id) return NextResponse.json({ error: 'user_id and project_id are required' }, { status: 400 })
+   const files = await listFiles(user_id, project_id)
+   return NextResponse.json({ files, filenames: files.map((f: any) => f.filename) })
+ }
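A similar sketch for the file-listing endpoint (again, the base URL and helper name are assumed):

```ts
// Hypothetical caller for GET /api/files; returns just the filenames array.
async function listProjectFiles(user_id: string, project_id: string): Promise<string[]> {
  const params = new URLSearchParams({ user_id, project_id })
  const res = await fetch(`http://localhost:3000/api/files?${params}`)
  if (!res.ok) throw new Error(`files request failed: ${res.status}`)
  const { filenames } = await res.json()
  return filenames
}
```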
ingestion_js/app/api/upload/route.ts ADDED
@@ -0,0 +1,93 @@
+ import { NextRequest, NextResponse } from 'next/server'
+ import { randomUUID } from 'crypto'
+ import { extractPages } from '@/lib/parser'
+ import { captionImage } from '@/lib/captioner'
+ import { buildCardsFromPages } from '@/lib/chunker'
+ import { embedRemote } from '@/lib/embedder'
+ import { deleteFileData, storeCards, upsertFileSummary } from '@/lib/mongo'
+ import { cheapSummarize } from '@/lib/summarizer'
+ import { createJob, getJob, updateJob } from '@/lib/jobs'
+
+ export const dynamic = 'force-dynamic'
+ export const runtime = 'nodejs'
+
+ export async function GET(req: NextRequest) {
+   // Status endpoint: /api/upload?job_id=...
+   const { searchParams } = new URL(req.url)
+   const job_id = searchParams.get('job_id')
+   if (!job_id) return NextResponse.json({ error: 'job_id is required' }, { status: 400 })
+   const job = await getJob(job_id)
+   if (!job) return NextResponse.json({ error: 'job not found' }, { status: 404 })
+   return NextResponse.json({ job_id, status: job.status, total: job.total, completed: job.completed, last_error: job.last_error })
+ }
+
+ export async function POST(req: NextRequest) {
+   const form = await req.formData()
+   const user_id = String(form.get('user_id') || '')
+   const project_id = String(form.get('project_id') || '')
+   const fileEntries = form.getAll('files') as File[]
+   const replaceRaw = form.get('replace_filenames') as string | null
+   const renameRaw = form.get('rename_map') as string | null
+
+   if (!user_id || !project_id || fileEntries.length === 0) {
+     return NextResponse.json({ error: 'user_id, project_id and files are required' }, { status: 400 })
+   }
+
+   const maxFiles = parseInt(process.env.MAX_FILES_PER_UPLOAD || '15', 10)
+   const maxMb = parseInt(process.env.MAX_FILE_MB || '50', 10)
+   if (fileEntries.length > maxFiles) return NextResponse.json({ error: `Too many files. Max ${maxFiles} allowed per upload.` }, { status: 400 })
+
+   let replaceSet = new Set<string>()
+   try { if (replaceRaw) replaceSet = new Set<string>(JSON.parse(replaceRaw)) } catch {}
+   let renameMap: Record<string, string> = {}
+   try { if (renameRaw) renameMap = JSON.parse(renameRaw) } catch {}
+
+   const preloaded: Array<{ name: string; buf: Buffer }> = []
+   for (const f of fileEntries) {
+     const arr = Buffer.from(await f.arrayBuffer())
+     const sizeMb = arr.byteLength / (1024 * 1024)
+     if (sizeMb > maxMb) return NextResponse.json({ error: `${f.name} exceeds ${maxMb} MB limit` }, { status: 400 })
+     const eff = renameMap[f.name] || f.name
+     preloaded.push({ name: eff, buf: arr })
+   }
+
+   const job_id = randomUUID()
+   await createJob(job_id, preloaded.length)
+
+   // Fire-and-forget background processing; respond immediately
+   processAll(job_id, user_id, project_id, preloaded, replaceSet).catch(async (e) => {
+     await updateJob(job_id, { status: 'failed', last_error: String(e) })
+   })
+
+   return NextResponse.json({ job_id, status: 'processing', total_files: preloaded.length })
+ }
+
+ async function processAll(job_id: string, user_id: string, project_id: string, files: Array<{ name: string; buf: Buffer }>, replaceSet: Set<string>) {
+   for (let i = 0; i < files.length; i++) {
+     const { name: fname, buf } = files[i]
+     try {
+       if (replaceSet.has(fname)) {
+         await deleteFileData(user_id, project_id, fname)
+       }
+
+       const pages = await extractPages(fname, buf)
+
+       // Best-effort captioning: the parser doesn't expose images, so keep behavior parity by skipping, or integrate captionImage here if images become available.
+       // If images were available, we would append [Image] caption lines to the page text here.
+
+       const cards = await buildCardsFromPages(pages, fname, user_id, project_id)
+       const vectors = await embedRemote(cards.map(c => c.content))
+       for (let k = 0; k < cards.length; k++) (cards[k] as any).embedding = vectors[k]
+
+       await storeCards(cards)
+
+       const fullText = pages.map(p => p.text || '').join('\n\n')
+       const summary = await cheapSummarize(fullText, 6)
+       await upsertFileSummary(user_id, project_id, fname, summary)
+
+       await updateJob(job_id, { completed: i + 1, status: (i + 1) < files.length ? 'processing' as const : 'completed' as const })
+     } catch (e: any) {
+       await updateJob(job_id, { completed: i + 1, last_error: String(e) })
+     }
+   }
+ }
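Because the POST handler kicks off processAll in the background and returns a job_id immediately, callers are expected to poll the same route with ?job_id=... until the status leaves 'processing'. An end-to-end sketch of that flow follows; the base URL, file paths, polling interval and wrapper function are illustrative assumptions.

```ts
// Hypothetical uploader: POST files, then poll GET /api/upload for job status.
import { readFile } from 'fs/promises'

async function uploadAndWait(user_id: string, project_id: string, paths: string[]) {
  const form = new FormData()
  form.set('user_id', user_id)
  form.set('project_id', project_id)
  for (const p of paths) {
    const buf = await readFile(p)
    // Attach each file under the 'files' field, keeping its original basename.
    form.append('files', new Blob([buf]), p.split('/').pop() || p)
  }
  const res = await fetch('http://localhost:3000/api/upload', { method: 'POST', body: form })
  const { job_id } = await res.json()

  // Poll until the background job reports 'completed' or 'failed'.
  for (;;) {
    const statusRes = await fetch(`http://localhost:3000/api/upload?job_id=${job_id}`)
    const job = await statusRes.json()
    if (job.status !== 'processing') return job
    await new Promise(r => setTimeout(r, 2000))
  }
}
```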
ingestion_js/lib/chunker.ts ADDED
@@ -0,0 +1,62 @@
+ import slugify from 'slugify'
+ import type { Page } from './parser'
+ import { cheapSummarize, cleanChunkText } from './summarizer'
+
+ const MAX_WORDS = 220
+ const OVERLAP_WORDS = 40
+
+ function byHeadings(text: string): string[] {
+   const lines = text.split('\n')
+   const parts: string[] = []
+   let current: string[] = []
+   const flush = () => { if (current.length) { parts.push(current.join('\n')); current = [] } }
+   const headingRe = /^(#+\s+|\d+\.|[A-Z][A-Za-z\s\-]{0,40}:?|^\s*\[[A-Za-z ]+\]\s*$)/
+   for (const ln of lines) {
+     if (headingRe.test(ln)) flush()
+     current.push(ln)
+   }
+   flush()
+   return parts.filter(p => p.trim().length > 0)
+ }
+
+ function createOverlappingChunks(blocks: string[]): string[] {
+   const out: string[] = []
+   let words: string[] = []
+   for (const b of blocks) {
+     words.push(...b.split(/\s+/))
+     while (words.length > MAX_WORDS) {
+       const chunk = words.slice(0, MAX_WORDS).join(' ')
+       out.push(chunk)
+       words = words.slice(MAX_WORDS - OVERLAP_WORDS)
+     }
+   }
+   if (words.length) out.push(words.join(' '))
+   return out
+ }
+
+ export async function buildCardsFromPages(pages: Page[], filename: string, user_id: string, project_id: string) {
+   let full = ''
+   for (const p of pages) full += `\n\n[[Page ${p.page_num}]]\n${(p.text || '').trim()}\n`
+   const coarse = byHeadings(full)
+   const chunks = createOverlappingChunks(coarse)
+
+   const out: any[] = []
+   for (let i = 0; i < chunks.length; i++) {
+     const cleaned = await cleanChunkText(chunks[i])
+     const topic = (await cheapSummarize(cleaned, 1)) || (cleaned.slice(0, 80) + '...')
+     const summary = await cheapSummarize(cleaned, 3)
+     const firstPage = pages[0]?.page_num ?? 1
+     const lastPage = pages[pages.length - 1]?.page_num ?? 1
+     out.push({
+       user_id,
+       project_id,
+       filename,
+       topic_name: topic.slice(0, 120),
+       summary,
+       content: cleaned,
+       page_span: [firstPage, lastPage],
+       card_id: `${slugify(filename)}-c${String(i + 1).padStart(4, '0')}`
+     })
+   }
+   return out
+ }
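With MAX_WORDS = 220 and OVERLAP_WORDS = 40, each emitted chunk repeats the last 40 words of the previous one, so the window advances 180 words at a time. A standalone sketch for exercising the chunker is shown below; it assumes Page needs only page_num and text, which may not match the parser's full Page type.

```ts
// Illustrative driver for buildCardsFromPages; the synthetic pages are long
// enough (500 + 300 words) to force several overlapping chunks.
import { buildCardsFromPages } from './chunker'
import type { Page } from './parser'

async function chunkerDemo() {
  const pages = [
    { page_num: 1, text: Array.from({ length: 500 }, (_, i) => `word${i}`).join(' ') },
    { page_num: 2, text: 'Summary:\n' + Array.from({ length: 300 }, (_, i) => `token${i}`).join(' ') },
  ] as Page[] // assumption: Page has at least { page_num, text }
  const cards = await buildCardsFromPages(pages, 'demo.pdf', 'user-1', 'project-1')
  for (const c of cards) console.log(c.card_id, c.page_span, c.content.split(' ').length)
}

chunkerDemo().catch(console.error)
```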
ingestion_js/lib/jobs.ts ADDED
@@ -0,0 +1,26 @@
+ import { getMongo } from './mongo'
+
+ export type JobDoc = {
+   _id: string
+   created_at: number
+   total: number
+   completed: number
+   status: 'processing' | 'completed' | 'failed'
+   last_error: string | null
+ }
+
+ export async function createJob(job_id: string, total: number) {
+   const { db } = await getMongo()
+   const doc: JobDoc = { _id: job_id, created_at: Date.now() / 1000, total, completed: 0, status: 'processing', last_error: null }
+   await db.collection('jobs').insertOne(doc)
+ }
+
+ export async function updateJob(job_id: string, fields: Partial<JobDoc>) {
+   const { db } = await getMongo()
+   await db.collection('jobs').updateOne({ _id: job_id }, { $set: fields })
+ }
+
+ export async function getJob(job_id: string) {
+   const { db } = await getMongo()
+   return db.collection('jobs').findOne({ _id: job_id })
+ }
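The upload route drives these helpers in a simple create → update → read cycle. Condensed, the lifecycle looks roughly like this; the counts and the standalone wrapper are illustrative.

```ts
// Sketch of the job lifecycle as the upload route uses it.
import { randomUUID } from 'crypto'
import { createJob, updateJob, getJob } from './jobs'

async function jobLifecycleDemo() {
  const job_id = randomUUID()
  await createJob(job_id, 3)                                      // three files queued
  await updateJob(job_id, { completed: 1 })                       // first file processed
  await updateJob(job_id, { completed: 3, status: 'completed' })  // all done
  console.log(await getJob(job_id))                               // -> { _id, total: 3, completed: 3, status: 'completed', ... }
}
```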
ingestion_js/lib/summarizer.ts ADDED
@@ -0,0 +1,19 @@
+ export async function cheapSummarize(text: string, maxSentences = 3): Promise<string> {
+   if (!text || text.trim().length < 50) return text.trim()
+   try {
+     const sentences = text.split(/(?<=[.!?])\s+/).filter(Boolean)
+     if (sentences.length <= maxSentences) return text.trim()
+     let out = sentences.slice(0, maxSentences).join(' ')
+     if (!/[.!?]$/.test(out)) out += '.'
+     return out
+   } catch {
+     return text.length > 200 ? text.slice(0, 200) + '...' : text
+   }
+ }
+
+ export async function cleanChunkText(text: string): Promise<string> {
+   let t = text
+   t = t.replace(/\n\s*Page \d+\s*\n/gi, '\n')
+   t = t.replace(/\s{3,}/g, ' ')
+   return t.trim()
+ }
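Both helpers are plain string transforms: cheapSummarize keeps the first maxSentences sentences (falling back to a 200-character prefix if splitting throws), and cleanChunkText strips standalone page-number lines and collapses runs of three or more whitespace characters. A small illustration with made-up input:

```ts
import { cheapSummarize, cleanChunkText } from './summarizer'

async function summarizerDemo() {
  const text = 'First sentence. Second sentence! Third sentence? Fourth sentence. Fifth sentence.'
  console.log(await cheapSummarize(text, 3))
  // -> 'First sentence. Second sentence! Third sentence?'

  console.log(await cleanChunkText('Intro   text\n Page 12 \nmore   text'))
  // -> 'Intro text\nmore text' (page-number line removed, triple spaces collapsed)
}

summarizerDemo().catch(console.error)
```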
ingestion_js/tsconfig.json CHANGED
@@ -14,7 +14,11 @@
   "isolatedModules": true,
   "jsx": "preserve",
   "incremental": true,
- "types": ["node"]
+ "types": ["node"],
+ "baseUrl": ".",
+ "paths": {
+   "@/*": ["./*"]
+ }
  },
  "include": ["next-env.d.ts", "**/*.ts", "**/*.tsx"],
  "exclude": ["node_modules"]
ingestion_python/README.md CHANGED
@@ -13,7 +13,7 @@ short_description: 'backend for data ingestion'
 
  A dedicated service for processing file uploads and storing them in MongoDB Atlas. This service mirrors the main system's file processing functionality while running as a separate service to share the processing load.
 
- [API docs](API.md) | [System docs](COMPATIBILITY.md)
+ [API docs](CURL.md)
 
  ## 🏗️ Architecture
 
19