| import { loadDocument } from "@midday/documents/loader"; |
| import { |
| getContentSample, |
| isMimeTypeSupportedForProcessing, |
| } from "@midday/documents/utils"; |
| import { triggerJob, triggerJobAndWait } from "@midday/job-client"; |
| import { createClient } from "@midday/supabase/job"; |
| import type { Job } from "bullmq"; |
| import type { ProcessDocumentPayload } from "../../schemas/documents"; |
| import { getDb } from "../../utils/db"; |
| import { detectFileTypeFromBlob } from "../../utils/detect-file-type"; |
| import { updateDocumentWithRetry } from "../../utils/document-update"; |
| import { |
| NonRetryableError, |
| UnsupportedFileTypeError, |
| } from "../../utils/error-classification"; |
| import { |
| convertHeicToJpeg, |
| MAX_HEIC_FILE_SIZE, |
| } from "../../utils/image-processing"; |
| import { TIMEOUTS, withTimeout } from "../../utils/timeout"; |
| import { BaseProcessor } from "../base"; |
|
|
| |
| |
| |
| |
/** Upper bound, in milliseconds, for text extraction via `loadDocument`. */
const DOCUMENT_PARSE_TIMEOUT_MS = 60_000;

/**
 * Builds a unique custom job ID for a classification job.
 *
 * BullMQ v5 rejects custom job IDs containing ":" (outside its legacy
 * three-part repeatable-job format), and user-supplied file names may contain
 * colons, so every ":" is replaced with "_". The `Date.now()` suffix keeps IDs
 * unique across re-runs for the same file.
 * NOTE(review): assumes `triggerJobAndWait` passes `jobId` through to BullMQ
 * unchanged — confirm in @midday/job-client.
 *
 * @param prefix - Job-kind prefix, e.g. "classify-doc".
 * @param teamId - Owning team.
 * @param fileName - Vault object key ("/"-joined path tokens).
 * @returns The colon-free job ID.
 */
function buildClassificationJobId(
  prefix: string,
  teamId: string,
  fileName: string,
): string {
  return `${prefix}_${teamId}_${fileName}_${Date.now()}`.replace(/:/g, "_");
}

/**
 * Processes a newly uploaded vault document.
 *
 * Pipeline: send a `document_uploaded` notification, download the file
 * (converting HEIC to JPEG when needed), resolve `application/octet-stream`
 * to a concrete MIME type, then either hand images to the `classify-image`
 * job or extract text and hand a content sample to the `classify-document`
 * job.
 *
 * Outcomes that skip AI classification:
 * - Oversized or unconvertible HEIC files, and empty or unparseable documents,
 *   are marked `completed` without AI metadata.
 * - Octet-stream files whose type cannot be detected are marked `failed`.
 */
export class ProcessDocumentProcessor extends BaseProcessor<ProcessDocumentPayload> {
  /**
   * Runs the processing pipeline for one job.
   *
   * @param job - Job whose data carries `mimetype`, `filePath` (path tokens of
   *   the object in the "vault" storage bucket) and `teamId`.
   * @throws NonRetryableError when a storage download returns no data.
   * @throws UnsupportedFileTypeError when the resolved MIME type is rejected by
   *   `isMimeTypeSupportedForProcessing`.
   * @throws Any other error (timeouts, upload/re-download failures, the
   *   classification job); such errors are logged and re-thrown.
   */
  async process(job: Job<ProcessDocumentPayload>): Promise<void> {
    const processStartTime = Date.now();
    const { mimetype, filePath, teamId } = job.data;
    const supabase = createClient();
    const db = getDb();
    // Object key inside the "vault" bucket; computed once and reused below.
    const fileName = filePath.join("/");

    this.logger.info("Starting process-document job", {
      jobId: job.id,
      teamId,
      fileName,
      mimetype,
    });

    // Best-effort: a failed notification must not block processing.
    try {
      await triggerJob(
        "notification",
        {
          type: "document_uploaded",
          teamId,
          fileName,
          filePath: filePath,
          mimeType: mimetype,
        },
        "notifications",
      );
    } catch (error) {
      this.logger.warn("Failed to trigger document_uploaded notification", {
        teamId,
        fileName,
        error: error instanceof Error ? error.message : "Unknown error",
      });
    }

    try {
      let fileData: Blob | null = null;
      let processedMimetype = mimetype;

      if (mimetype === "image/heic") {
        const jpeg = await this.prepareHeicImage(
          job,
          supabase,
          db,
          filePath,
          teamId,
          fileName,
        );
        // null => the document has already been completed via a fallback.
        if (!jpeg) {
          return;
        }
        fileData = jpeg;
        processedMimetype = "image/jpeg";
      } else {
        const downloadStartTime = Date.now();
        this.logger.info("Downloading file from storage", {
          jobId: job.id,
          fileName,
          teamId,
          mimetype: processedMimetype,
        });

        const data = await this.downloadFromVault(
          supabase,
          fileName,
          `File download timed out after ${TIMEOUTS.FILE_DOWNLOAD}ms`,
        );

        const downloadDuration = Date.now() - downloadStartTime;
        this.logger.info("File downloaded", {
          jobId: job.id,
          fileName,
          teamId,
          fileSize: data?.size,
          duration: `${downloadDuration}ms`,
        });

        if (!data) {
          throw new NonRetryableError(
            "File not found",
            undefined,
            "validation",
          );
        }

        fileData = data;
      }

      // Generic binary uploads: sniff the real type from the file contents.
      if (processedMimetype === "application/octet-stream" && fileData) {
        try {
          const detectionResult = await detectFileTypeFromBlob(fileData);

          if (detectionResult.detected) {
            this.logger.info(
              "Detected file type from application/octet-stream",
              {
                fileName,
                teamId,
                detectedMimetype: detectionResult.mimetype,
              },
            );
            processedMimetype = detectionResult.mimetype;
            fileData = new Blob([detectionResult.buffer], {
              type: detectionResult.mimetype,
            });
          } else {
            this.logger.warn(
              "application/octet-stream file type could not be detected - skipping processing",
              {
                fileName,
                teamId,
                header: detectionResult.buffer.subarray(0, 8).toString("hex"),
              },
            );

            await updateDocumentWithRetry(
              db,
              {
                pathTokens: filePath,
                teamId,
                processingStatus: "failed",
              },
              this.logger,
            );
            return;
          }
        } catch (error) {
          this.logger.error(
            "Failed to detect file type for application/octet-stream - will attempt to process as PDF",
            {
              fileName,
              teamId,
              error: error instanceof Error ? error.message : "Unknown error",
            },
          );

          // Fetch a fresh copy before falling back to PDF parsing.
          // NOTE(review): unclear why the existing blob is not reused — confirm.
          const redownloadedData = await this.downloadFromVault(
            supabase,
            fileName,
            `File re-download timed out after ${TIMEOUTS.FILE_DOWNLOAD}ms`,
          );
          if (redownloadedData) {
            fileData = redownloadedData;
            processedMimetype = "application/pdf";
          } else {
            throw new Error("Failed to re-download file for type detection");
          }
        }
      }

      if (!isMimeTypeSupportedForProcessing(processedMimetype)) {
        throw new UnsupportedFileTypeError(processedMimetype, fileName);
      }

      // Images go straight to the image classifier; no text extraction here.
      if (processedMimetype.startsWith("image/")) {
        this.logger.info("Triggering image classification", {
          fileName,
          teamId,
        });

        await triggerJobAndWait(
          "classify-image",
          {
            fileName,
            teamId,
          },
          "documents",
          {
            jobId: buildClassificationJobId("classify-img", teamId, fileName),
            timeout: TIMEOUTS.CLASSIFICATION_JOB_WAIT,
          },
        );

        return;
      }

      let documentText: string | null = null;
      let documentLoadFailed = false;

      try {
        const parseStartTime = Date.now();
        this.logger.info("Parsing document content (extracting text)", {
          jobId: job.id,
          fileName,
          teamId,
          mimetype: processedMimetype,
          fileSize: fileData?.size,
        });

        const loadedDoc = await withTimeout(
          loadDocument({
            content: fileData,
            metadata: { mimetype: processedMimetype },
          }),
          DOCUMENT_PARSE_TIMEOUT_MS,
          `Document parsing timed out after ${DOCUMENT_PARSE_TIMEOUT_MS}ms`,
        );

        // Note: an empty-string result also lands here and is treated as a
        // load failure.
        if (!loadedDoc) {
          throw new Error("Failed to load document");
        }

        documentText = loadedDoc;
        const parseDuration = Date.now() - parseStartTime;
        this.logger.info("Document parsed successfully", {
          jobId: job.id,
          fileName,
          teamId,
          contentLength: documentText.length,
          duration: `${parseDuration}ms`,
        });
      } catch (error) {
        // Extraction failures are not fatal: the document is completed below.
        documentLoadFailed = true;
        this.logger.warn(
          "Failed to extract document content - completing with fallback",
          {
            jobId: job.id,
            fileName,
            teamId,
            mimetype: processedMimetype,
            error: error instanceof Error ? error.message : "Unknown error",
          },
        );
      }

      if (documentLoadFailed || !documentText) {
        this.logger.info(
          "Completing document with null values - user can retry classification",
          {
            fileName,
            teamId,
            documentLoadFailed,
          },
        );
        await this.markCompletedWithoutClassification(db, filePath, teamId);
        return;
      }

      if (documentText.trim().length === 0) {
        this.logger.warn("Document loaded but has no extractable content", {
          fileName,
          teamId,
        });
        await this.markCompletedWithoutClassification(db, filePath, teamId);
        return;
      }

      const sample = getContentSample(documentText);

      if (!sample || sample.trim().length === 0) {
        this.logger.warn(
          "Document sample is empty, marking as completed without classification",
          {
            fileName,
            teamId,
            contentLength: documentText.length,
          },
        );

        await updateDocumentWithRetry(
          db,
          {
            pathTokens: filePath,
            teamId,
            processingStatus: "completed",
          },
          this.logger,
        );
        return;
      }

      const classificationStartTime = Date.now();
      this.logger.info("Triggering document classification", {
        jobId: job.id,
        fileName,
        teamId,
        contentLength: documentText.length,
        sampleLength: sample.length,
      });

      const classificationJobResult = await triggerJobAndWait(
        "classify-document",
        {
          content: sample,
          fileName,
          teamId,
        },
        "documents",
        {
          jobId: buildClassificationJobId("classify-doc", teamId, fileName),
          timeout: TIMEOUTS.CLASSIFICATION_JOB_WAIT,
        },
      );

      const classificationDuration = Date.now() - classificationStartTime;
      this.logger.info("Document classification job completed", {
        jobId: job.id,
        fileName,
        teamId,
        triggeredJobId: classificationJobResult.id,
        triggeredJobName: "classify-document",
        duration: `${classificationDuration}ms`,
      });

      // Best-effort, like the upload notification above.
      try {
        await triggerJob(
          "notification",
          {
            type: "document_processed",
            teamId,
            fileName,
            filePath: filePath,
            mimeType: mimetype,
            contentLength: documentText.length,
            sampleLength: sample.length,
          },
          "notifications",
        );
      } catch (error) {
        this.logger.warn("Failed to trigger document_processed notification", {
          teamId,
          fileName,
          error: error instanceof Error ? error.message : "Unknown error",
        });
      }

      const totalDuration = Date.now() - processStartTime;
      this.logger.info("process-document job completed successfully", {
        jobId: job.id,
        fileName,
        teamId,
        contentLength: documentText.length,
        sampleLength: sample.length,
        totalDuration: `${totalDuration}ms`,
      });
    } catch (error) {
      this.logger.error("Document processing failed", {
        fileName,
        teamId,
        error: error instanceof Error ? error.message : "Unknown error",
      });
      throw error;
    }
  }

  /**
   * Downloads an object from the "vault" bucket, bounded by
   * `TIMEOUTS.FILE_DOWNLOAD`.
   *
   * The storage client reports failures through `error` rather than throwing.
   * That error is logged here so it is not lost when callers turn a missing
   * result into their own error.
   *
   * @param supabase - Storage-capable Supabase client.
   * @param fileName - Object key inside the bucket.
   * @param timeoutMessage - Message used if the download times out.
   * @returns The file contents, or `null` when the download returned no data.
   */
  private async downloadFromVault(
    supabase: ReturnType<typeof createClient>,
    fileName: string,
    timeoutMessage: string,
  ): Promise<Blob | null> {
    const { data, error } = await withTimeout(
      supabase.storage.from("vault").download(fileName),
      TIMEOUTS.FILE_DOWNLOAD,
      timeoutMessage,
    );

    if (!data && error) {
      this.logger.warn("Storage download returned an error", {
        fileName,
        error: error.message,
      });
    }

    return data ?? null;
  }

  /**
   * Downloads a HEIC file, converts it to JPEG and overwrites the stored
   * object (same key, upsert) with the JPEG bytes.
   *
   * @returns The JPEG as a Blob. Returns `null` when the document was instead
   *   marked `completed` with filename-based metadata, because the file
   *   exceeds `MAX_HEIC_FILE_SIZE` or conversion/upload failed. Callers must
   *   stop processing on `null`.
   * @throws NonRetryableError when the download returns no data.
   */
  private async prepareHeicImage(
    job: Job<ProcessDocumentPayload>,
    supabase: ReturnType<typeof createClient>,
    db: ReturnType<typeof getDb>,
    filePath: ProcessDocumentPayload["filePath"],
    teamId: ProcessDocumentPayload["teamId"],
    fileName: string,
  ): Promise<Blob | null> {
    this.logger.info("Converting HEIC to JPG", { filePath: fileName });

    const data = await this.downloadFromVault(
      supabase,
      fileName,
      `File download timed out after ${TIMEOUTS.FILE_DOWNLOAD}ms`,
    );

    if (!data) {
      throw new NonRetryableError("File not found", undefined, "validation");
    }

    await this.updateProgress(
      job,
      this.ProgressMilestones.FETCHED,
      "HEIC file downloaded",
    );

    const buffer = await data.arrayBuffer();

    const fileSizeMB = (buffer.byteLength / (1024 * 1024)).toFixed(2);
    this.logger.info("HEIC file size", {
      fileName,
      sizeBytes: buffer.byteLength,
      sizeMB: fileSizeMB,
    });

    // Oversized files skip conversion entirely and complete with the filename.
    if (buffer.byteLength > MAX_HEIC_FILE_SIZE) {
      this.logger.warn(
        "HEIC file too large for AI classification - completing with filename",
        {
          fileName,
          teamId,
          sizeBytes: buffer.byteLength,
          maxSizeBytes: MAX_HEIC_FILE_SIZE,
        },
      );

      await updateDocumentWithRetry(
        db,
        {
          pathTokens: filePath,
          teamId,
          title: filePath.at(-1) ?? "Large HEIC Image",
          summary: `Large image (${fileSizeMB}MB) - AI classification skipped`,
          processingStatus: "completed",
        },
        this.logger,
      );
      return null;
    }

    try {
      const { buffer: image } = await convertHeicToJpeg(buffer, this.logger);

      await this.updateProgress(
        job,
        this.ProgressMilestones.PROCESSING,
        "HEIC converted to JPEG",
      );

      const { data: uploadedData } = await withTimeout(
        supabase.storage.from("vault").upload(fileName, image, {
          contentType: "image/jpeg",
          upsert: true,
        }),
        TIMEOUTS.FILE_UPLOAD,
        `File upload timed out after ${TIMEOUTS.FILE_UPLOAD}ms`,
      );

      if (!uploadedData) {
        throw new Error("Failed to upload converted image");
      }

      await this.updateProgress(
        job,
        this.ProgressMilestones.HALFWAY,
        "Converted image uploaded",
      );

      return new Blob([image], { type: "image/jpeg" });
    } catch (conversionError) {
      // NOTE: this also catches upload failures/timeouts, not only conversion
      // errors, despite the log wording.
      this.logger.error("HEIC conversion failed - completing with fallback", {
        fileName,
        teamId,
        fileSizeMB,
        error:
          conversionError instanceof Error
            ? conversionError.message
            : "Unknown error",
      });

      await updateDocumentWithRetry(
        db,
        {
          pathTokens: filePath,
          teamId,
          title: filePath.at(-1) ?? "HEIC Image",
          summary: "HEIC conversion failed - original file preserved",
          processingStatus: "completed",
        },
        this.logger,
      );
      return null;
    }
  }

  /**
   * Marks the document `completed` with explicitly-undefined title/summary.
   * Used when no text could be extracted, so classification was never run.
   */
  private async markCompletedWithoutClassification(
    db: ReturnType<typeof getDb>,
    filePath: ProcessDocumentPayload["filePath"],
    teamId: ProcessDocumentPayload["teamId"],
  ): Promise<void> {
    await updateDocumentWithRetry(
      db,
      {
        pathTokens: filePath,
        teamId,
        title: undefined,
        summary: undefined,
        processingStatus: "completed",
      },
      this.logger,
    );
  }
}
|
|