File size: 4,979 Bytes
63f2fae
 
 
 
 
 
 
b2ecc50
63f2fae
 
 
 
 
 
 
2218cde
63f2fae
 
 
 
 
 
b2ecc50
 
 
 
 
63f2fae
b2ecc50
 
 
 
 
2218cde
63f2fae
 
2218cde
63f2fae
2218cde
 
63f2fae
2218cde
c4976b4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b2ecc50
c4976b4
 
 
 
 
 
 
 
 
 
 
 
2db7b6b
b2ecc50
 
 
05edc93
 
 
b2ecc50
 
05edc93
 
 
 
 
 
 
 
 
b2ecc50
2218cde
 
63f2fae
 
b2ecc50
 
 
 
 
 
 
 
 
 
 
2218cde
b2ecc50
63f2fae
 
 
2218cde
b2ecc50
2218cde
b2ecc50
2218cde
 
 
63f2fae
 
2218cde
b2ecc50
2218cde
b2ecc50
2218cde
 
 
b2ecc50
2218cde
 
 
63f2fae
 
 
 
2218cde
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import gradio as gr
import os
import re
import PyPDF2
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import json

class PDFAnalyzer:
    """Embedding-based question answering over a single uploaded PDF.

    The PDF text is split into fixed 500-character chunks, each chunk is
    embedded with a SentenceTransformer model, and queries are answered by
    returning the chunk with the highest cosine similarity to the question.
    All public methods return a JSON string shaped like a REST response
    ({"status": ..., "message": ..., ...}) so the UI layer can render it.
    """

    def __init__(self):
        self.text_chunks = []    # list[str]: 500-char slices of the document
        self.embeddings = None   # ndarray of chunk embeddings, or None before upload
        self.active_doc = None   # basename of the processed PDF, or None
        self.model = SentenceTransformer('all-MiniLM-L6-v2')

    def process_pdf(self, filepath):
        """Extract, chunk and embed the PDF at *filepath*.

        Returns a JSON string: status 200 with a document_id on success,
        status 500 with the error message on any failure.
        """
        try:
            text = self._extract_text(filepath)
            self.text_chunks = self._chunk_text(text)
            self.embeddings = self.model.encode(self.text_chunks)
            self.active_doc = os.path.basename(filepath)
            return json.dumps({
                "status": 200,
                "message": f"Document {self.active_doc} processed successfully",
                "document_id": hash(self.active_doc)
            })
        except Exception as e:
            # Broad catch is deliberate here: any parse/embed failure must be
            # reported to the UI as a structured error, never as a traceback.
            return json.dumps({
                "status": 500,
                "error": str(e),
                "message": "Document processing failed"
            })

    def _extract_text(self, filepath):
        """Concatenate the text of every page in the PDF.

        PyPDF2's ``extract_text()`` returns None for pages with no text
        layer (e.g. scanned images); treat those as empty strings instead
        of letting ``str.join`` raise a TypeError.
        """
        with open(filepath, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            return ''.join(page.extract_text() or '' for page in reader.pages)

    def _chunk_text(self, text):
        """Split *text* into consecutive 500-character chunks (last may be shorter)."""
        return [text[i:i + 500] for i in range(0, len(text), 500)]

    def query(self, question):
        """Return the best-matching chunk for *question* as a JSON string.

        Responds with status 400 when no document has been processed yet,
        otherwise status 200 with one result entry (text, confidence,
        document_id, metadata).
        """
        if not self.active_doc:
            return json.dumps({
                "status": 400,
                "message": "No document uploaded",
                "results": []
            })

        ques_emb = self.model.encode(question)
        similarities = cosine_similarity([ques_emb], self.embeddings)[0]
        # Convert NumPy scalars to native types so json.dumps accepts them.
        best_idx = int(np.argmax(similarities))
        confidence = float(similarities[best_idx])

        full_answer = self.text_chunks[best_idx]

        return json.dumps({
            "status": 200,
            "message": "Success",
            "results": [{
                "text": self._format_answer(full_answer, question),
                "confidence": confidence,
                "document_id": str(hash(self.active_doc)),  # string for JSON stability
                "metadata": {
                    "chunk_index": best_idx,
                    "document": self.active_doc
                }
            }]
        }, default=str)  # fallback: stringify any remaining non-JSON type

    def _format_answer(self, text, question):
        """Return an ~100-word window of *text* centred on the sentence that
        shares the most words with *question*, with a trailing ellipsis when
        the window is truncated."""
        sentences = re.split(r'(?<=[.!?]) +', text)
        question_words = set(question.lower().split())

        best_sentence = max(sentences,
                            key=lambda s: len(set(s.lower().split()) & question_words),
                            default="")

        all_words = ' '.join(sentences).split()
        try:
            # Anchor the window on the first word of the best sentence.
            start = max(0, all_words.index(best_sentence.split()[0]) - 50)
            end = start + 100
        except (IndexError, ValueError):
            # IndexError: best sentence is empty; ValueError: anchor word not
            # found in the flattened word list. Fall back to the chunk start.
            start = 0
            end = 100

        return ' '.join(all_words[start:end]) + ("..." if end < len(all_words) else "")

def create_app():
    """Build and return the Gradio Blocks UI bound to one PDFAnalyzer.

    The analyzer instance is shared by all event handlers via closure, so
    one app instance serves one active document at a time.
    """
    analyzer = PDFAnalyzer()

    def format_response(response):
        """Convert the analyzer's JSON reply into chat-friendly Markdown."""
        try:
            data = json.loads(response)
            if data['status'] != 200:
                return f"Error: {data.get('message', 'Unknown error')}"

            result = data['results'][0]
            return f"**Answer** ({result['confidence']:.2f} confidence):\n{result['text']}"
        except (json.JSONDecodeError, KeyError, IndexError, TypeError, ValueError):
            # Narrowed from a bare except: catch only malformed or incomplete
            # payloads, not programming errors like NameError/AttributeError.
            return "Error processing response"

    with gr.Blocks(theme=gr.themes.Soft()) as app:
        gr.Markdown("# πŸ“‘ PDF QA Assistant (Cohere-style API)")

        with gr.Row():
            with gr.Column(scale=1):
                pdf_upload = gr.File(label="Upload PDF", file_types=[".pdf"])
                status = gr.Markdown("**Status:** Idle")
                # The raw JSON response is shown directly in the status pane;
                # a missing file produces a 400-style payload without calling
                # the analyzer.
                gr.Button("Process PDF").click(
                    lambda f: analyzer.process_pdf(f.name) if f else json.dumps({"status": 400, "error": "No file"}),
                    inputs=pdf_upload,
                    outputs=status
                )

            with gr.Column(scale=2):
                chatbot = gr.Chatbot(height=400)
                question = gr.Textbox(label="Query", placeholder="Enter your question...")
                # Append a (question, formatted answer) pair to the history.
                question.submit(
                    lambda q, h: h + [(q, format_response(analyzer.query(q)))],
                    inputs=[question, chatbot],
                    outputs=chatbot
                )
                # Reset chat history, uploaded file and status in one click.
                gr.Button("Clear Session").click(
                    lambda: [None, None, "**Status:** Session cleared"],
                    outputs=[chatbot, pdf_upload, status]
                )

    return app

if __name__ == "__main__":
    # Launch the Gradio server only when run as a script, not on import.
    create_app().launch()