import os
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from nltk.probability import FreqDist
from flask import Flask, request, jsonify, render_template
import PyPDF2
import docx
import re
import heapq

# Download the NLTK data needed for tokenization and stop-word removal
# (newer NLTK releases may additionally require nltk.download('punkt_tab'))
nltk.download('punkt')
nltk.download('stopwords')

app = Flask(__name__)

class SimpleDocumentAgent:
    def __init__(self):
        """Initialize a simple document processing agent using free libraries."""
        self.current_document_text = ""
        self.document_name = ""
        self.stop_words = set(stopwords.words('english'))

    def load_document(self, file_path):
        """Load document text from a PDF or DOCX file."""
        try:
            if file_path.endswith('.pdf'):
                self.document_name = os.path.basename(file_path)
                with open(file_path, 'rb') as file:
                    pdf_reader = PyPDF2.PdfReader(file)
                    self.current_document_text = ""
                    for page_num in range(len(pdf_reader.pages)):
                        page = pdf_reader.pages[page_num]
                        self.current_document_text += page.extract_text()
            elif file_path.endswith('.docx'):
                self.document_name = os.path.basename(file_path)
                doc = docx.Document(file_path)
                self.current_document_text = "\n".join([para.text for para in doc.paragraphs])
            else:
                return "Unsupported file format. Please use PDF or DOCX."
            return f"Successfully loaded {self.document_name}"
        except Exception as e:
            return f"Error loading document: {str(e)}"

    def summarize_document(self, sentences_count=5):
        """Generate a summary using frequency-based sentence extraction."""
        if not self.current_document_text:
            return "No document loaded. Please load a document first."

        # Tokenize the text into sentences
        sentences = sent_tokenize(self.current_document_text)

        # Calculate word frequencies, ignoring punctuation and stop words
        words = word_tokenize(self.current_document_text.lower())
        words = [word for word in words if word.isalnum() and word not in self.stop_words]
        freq_dist = FreqDist(words)

        # Score each sentence by the frequencies of the words it contains
        sentence_scores = {}
        for i, sentence in enumerate(sentences):
            for word in word_tokenize(sentence.lower()):
                if word in freq_dist:
                    if i in sentence_scores:
                        sentence_scores[i] += freq_dist[word]
                    else:
                        sentence_scores[i] = freq_dist[word]

        # Pick the highest-scoring sentences and restore their original order
        summary_sentences_indices = heapq.nlargest(sentences_count,
                                                   sentence_scores,
                                                   key=sentence_scores.get)
        summary_sentences_indices.sort()

        summary = [sentences[i] for i in summary_sentences_indices]
        return " ".join(summary)

    def extract_information(self, info_type):
        """Extract specific information such as dates, emails, or phone numbers."""
        if not self.current_document_text:
            return "No document loaded. Please load a document first."

        results = []
        info_type_lower = info_type.lower()

        if info_type_lower in ("email", "emails"):
            # Pattern for email addresses
            email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
            results = re.findall(email_pattern, self.current_document_text)
        elif info_type_lower in ("phone", "phones", "phone numbers"):
            # Pattern for phone numbers; the optional country code is a
            # non-capturing group so findall returns the full match
            phone_pattern = r'\b(?:\+\d{1,2}\s)?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}\b'
            results = re.findall(phone_pattern, self.current_document_text)
        elif info_type_lower in ("date", "dates"):
            # Simple pattern for numeric dates (can be improved)
            date_pattern = r'\b\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4}\b'
            results = re.findall(date_pattern, self.current_document_text)
        elif info_type_lower in ("url", "urls", "website", "websites"):
            # Pattern for URLs
            url_pattern = r'https?://[^\s]+'
            results = re.findall(url_pattern, self.current_document_text)
        else:
            # If no specific pattern matches, return sentences containing the term
            results = [sentence for sentence in sent_tokenize(self.current_document_text)
                       if info_type_lower in sentence.lower()]

        if not results:
            return f"No {info_type} found in the document."
        return results

    def answer_question(self, question):
        """Attempt to answer questions about the document using keyword matching."""
        if not self.current_document_text:
            return "No document loaded. Please load a document first."

        # Tokenize the question and remove stop words
        question_words = [w.lower() for w in word_tokenize(question)
                          if w.lower() not in self.stop_words and w.isalnum()]

        # Tokenize the document into sentences
        sentences = sent_tokenize(self.current_document_text)

        # Score sentences by how many question words they contain
        sentence_scores = {}
        for i, sentence in enumerate(sentences):
            words = [w.lower() for w in word_tokenize(sentence)]
            score = sum(1 for word in question_words if word in words)
            if score > 0:
                sentence_scores[i] = score

        # If no matches were found
        if not sentence_scores:
            return "I couldn't find information related to your question in the document."

        # Return the three most relevant sentences in their original order
        top_indices = heapq.nlargest(3, sentence_scores, key=sentence_scores.get)
        relevant_sentences = [sentences[i] for i in sorted(top_indices)]
        return " ".join(relevant_sentences)
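
# Standalone usage sketch (not part of the web app): the agent class can be
# exercised directly from a Python shell. The file name "example.docx" below
# is a placeholder for any local PDF or DOCX file.
#
#   agent = SimpleDocumentAgent()
#   print(agent.load_document("example.docx"))
#   print(agent.summarize_document(sentences_count=3))
#   print(agent.extract_information("email"))
#   print(agent.answer_question("What is the main topic?"))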

# Set up Flask routes
@app.route('/')
def home():
    return render_template('index.html')

@app.route('/upload', methods=['POST'])
def upload_file():
    # Check that the POST request actually contains a file
    if 'file' not in request.files:
        return jsonify({"error": "No file part"})
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file"})
    if file:
        # Save the file temporarily
        file_path = os.path.join("temp", file.filename)
        os.makedirs("temp", exist_ok=True)
        file.save(file_path)
        # Process the file
        result = agent.load_document(file_path)
        # Remove the temporary file
        os.remove(file_path)
        return jsonify({"message": result})

@app.route('/summarize', methods=['POST'])
def summarize():
    sentences = request.json.get('sentences', 5)
    result = agent.summarize_document(sentences)
    return jsonify({"summary": result})

@app.route('/extract', methods=['POST'])
def extract():
    info_type = request.json.get('info_type', '')
    result = agent.extract_information(info_type)
    return jsonify({"extracted": result})

@app.route('/question', methods=['POST'])
def question():
    query = request.json.get('question', '')
    result = agent.answer_question(query)
    return jsonify({"answer": result})

# Initialize the agent
agent = SimpleDocumentAgent()

# Create a basic HTML template
def get_index_template():
    html_content = """
<!DOCTYPE html>
<html>
<head>
    <title>Document Processing Agent</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 0; padding: 20px; line-height: 1.6; }
        h1 { color: #333; }
        .container { max-width: 800px; margin: 0 auto; }
        .section { margin-bottom: 20px; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }
        button { background-color: #4CAF50; color: white; padding: 10px 15px; border: none; border-radius: 4px; cursor: pointer; }
        button:hover { background-color: #45a049; }
        input, select { padding: 8px; margin: 10px 0; width: 100%; }
        textarea { width: 100%; height: 150px; }
        .result { background-color: #f9f9f9; padding: 10px; border-radius: 5px; margin-top: 10px; }
    </style>
</head>
<body>
    <div class="container">
        <h1>Document Processing Agent</h1>

        <div class="section">
            <h2>Upload Document</h2>
            <form id="uploadForm">
                <input type="file" id="documentFile" accept=".pdf,.docx">
                <button type="submit">Upload</button>
            </form>
            <div id="uploadResult" class="result"></div>
        </div>

        <div class="section">
            <h2>Summarize Document</h2>
            <label for="sentenceCount">Number of sentences:</label>
            <input type="number" id="sentenceCount" value="5" min="1" max="20">
            <button onclick="summarizeDocument()">Generate Summary</button>
            <div id="summaryResult" class="result"></div>
        </div>

        <div class="section">
            <h2>Extract Information</h2>
            <select id="infoType">
                <option value="email">Emails</option>
                <option value="phone">Phone Numbers</option>
                <option value="date">Dates</option>
                <option value="url">URLs</option>
            </select>
            <button onclick="extractInfo()">Extract</button>
            <div id="extractResult" class="result"></div>
        </div>

        <div class="section">
            <h2>Ask Questions</h2>
            <input type="text" id="question" placeholder="Enter your question about the document">
            <button onclick="askQuestion()">Ask</button>
            <div id="questionResult" class="result"></div>
        </div>
    </div>

    <script>
        // Upload document
        document.getElementById('uploadForm').addEventListener('submit', function(event) {
            event.preventDefault();
            const fileInput = document.getElementById('documentFile');
            const file = fileInput.files[0];
            if (!file) {
                alert('Please select a file to upload');
                return;
            }
            const formData = new FormData();
            formData.append('file', file);

            fetch('/upload', {
                method: 'POST',
                body: formData
            })
            .then(response => response.json())
            .then(data => {
                document.getElementById('uploadResult').textContent = data.message;
            })
            .catch(error => {
                console.error('Error:', error);
                document.getElementById('uploadResult').textContent = 'Error uploading file';
            });
        });

        // Summarize
        function summarizeDocument() {
            const sentences = document.getElementById('sentenceCount').value;

            fetch('/summarize', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({ sentences: parseInt(sentences) })
            })
            .then(response => response.json())
            .then(data => {
                document.getElementById('summaryResult').textContent = data.summary;
            })
            .catch(error => {
                console.error('Error:', error);
                document.getElementById('summaryResult').textContent = 'Error generating summary';
            });
        }

        // Extract info
        function extractInfo() {
            const infoType = document.getElementById('infoType').value;

            fetch('/extract', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({ info_type: infoType })
            })
            .then(response => response.json())
            .then(data => {
                if (Array.isArray(data.extracted)) {
                    document.getElementById('extractResult').textContent = data.extracted.join('\\n');
                } else {
                    document.getElementById('extractResult').textContent = data.extracted;
                }
            })
            .catch(error => {
                console.error('Error:', error);
                document.getElementById('extractResult').textContent = 'Error extracting information';
            });
        }

        // Ask question
        function askQuestion() {
            const question = document.getElementById('question').value;

            fetch('/question', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({ question: question })
            })
            .then(response => response.json())
            .then(data => {
                document.getElementById('questionResult').textContent = data.answer;
            })
            .catch(error => {
                console.error('Error:', error);
                document.getElementById('questionResult').textContent = 'Error processing question';
            });
        }
    </script>
</body>
</html>
"""
    return html_content

if __name__ == "__main__":
    # Create a templates folder and write index.html
    os.makedirs("templates", exist_ok=True)
    with open("templates/index.html", "w") as f:
        f.write(get_index_template())
    # Run the app
    app.run(debug=True)
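
# Example requests (a sketch, assuming the app is running locally on Flask's
# default port 5000; "report.pdf" is a placeholder for any local document):
#
#   curl -X POST -F "file=@report.pdf" http://127.0.0.1:5000/upload
#   curl -X POST -H "Content-Type: application/json" -d '{"sentences": 3}' http://127.0.0.1:5000/summarize
#   curl -X POST -H "Content-Type: application/json" -d '{"info_type": "email"}' http://127.0.0.1:5000/extract
#   curl -X POST -H "Content-Type: application/json" -d '{"question": "Who is the author?"}' http://127.0.0.1:5000/question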