ai_agent / app.py
shaheerawan3's picture
Create app.py
1f73729 verified
import os
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from nltk.probability import FreqDist
from flask import Flask, request, jsonify, render_template
import PyPDF2
import docx
import re
import heapq
# Download the NLTK resources used by the tokenizers and the stop-word list.
# 'punkt' is kept for older NLTK releases; newer releases load the sentence and
# word tokenizer data from 'punkt_tab' instead, so both are requested.
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')

# The Flask application object that the route decorators below attach to.
app = Flask(__name__)
class SimpleDocumentAgent:
    """Rule-based document helper built on NLTK, PyPDF2 and python-docx.

    The agent holds the text of one document at a time and offers three
    operations over it: frequency-based extractive summarisation, regex-based
    information extraction, and keyword-overlap question answering.
    """

    # Regular expressions used by extract_information. None of them contains a
    # capturing group, so re.findall returns the full matched text.
    _EMAIL_PATTERN = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    # A lookbehind replaces a leading \b so that numbers starting with "+" or
    # "(" (both non-word characters) can match from their first character.
    _PHONE_PATTERN = (
        r'(?<!\w)(?:\+\d{1,2}\s)?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}\b'
    )
    # Simple numeric dates such as 12/31/2024 or 1.2.24.
    _DATE_PATTERN = r'\b\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4}\b'
    _URL_PATTERN = r'https?://[^\s]+'

    # Maps every accepted (lower-cased) info_type spelling to its pattern.
    _PATTERNS_BY_TYPE = {
        'email': _EMAIL_PATTERN,
        'emails': _EMAIL_PATTERN,
        'phone': _PHONE_PATTERN,
        'phones': _PHONE_PATTERN,
        'phone numbers': _PHONE_PATTERN,
        'date': _DATE_PATTERN,
        'dates': _DATE_PATTERN,
        'url': _URL_PATTERN,
        'urls': _URL_PATTERN,
        'website': _URL_PATTERN,
        'websites': _URL_PATTERN,
    }

    def __init__(self):
        """Initialize a simple document processing agent using free libraries."""
        self.current_document_text = ""
        self.document_name = ""
        self.stop_words = set(stopwords.words('english'))

    def load_document(self, file_path):
        """Load the text of a PDF or DOCX file into the agent.

        The extension check is case-insensitive. The agent's state is only
        replaced after the whole file has been read successfully, so a failed
        load leaves the previously loaded document intact.

        Args:
            file_path: Path to a ``.pdf`` or ``.docx`` file.

        Returns:
            A human-readable status string: a success message, an
            unsupported-format message, or an error message.
        """
        try:
            lowered_path = file_path.lower()
            if lowered_path.endswith('.pdf'):
                with open(file_path, 'rb') as file:
                    pdf_reader = PyPDF2.PdfReader(file)
                    # `or ""` keeps the join safe if a page yields no text
                    # (some PyPDF2 versions may return None -- TODO confirm).
                    text = "".join(
                        page.extract_text() or "" for page in pdf_reader.pages
                    )
            elif lowered_path.endswith('.docx'):
                doc = docx.Document(file_path)
                text = "\n".join(para.text for para in doc.paragraphs)
            else:
                return "Unsupported file format. Please use PDF or DOCX."
        except Exception as e:
            # Deliberate catch-all: callers expect a message, not an exception.
            return f"Error loading document: {str(e)}"

        self.document_name = os.path.basename(file_path)
        self.current_document_text = text
        return f"Successfully loaded {self.document_name}"

    def summarize_document(self, sentences_count=5):
        """Generate a summary using frequency-based sentence extraction.

        Each sentence is scored by the summed document-wide frequency of its
        non-stop-word alphanumeric tokens; the highest-scoring sentences are
        returned in their original order.

        Args:
            sentences_count: Maximum number of sentences in the summary.

        Returns:
            The summary text, or a message when no document is loaded.
        """
        if not self.current_document_text:
            return "No document loaded. Please load a document first."

        sentences = sent_tokenize(self.current_document_text)

        # Document-wide frequencies of meaningful words.
        tokens = word_tokenize(self.current_document_text.lower())
        freq_dist = FreqDist(
            [tok for tok in tokens
             if tok.isalnum() and tok not in self.stop_words]
        )

        # Only sentences containing at least one counted word get a score.
        sentence_scores = {}
        for index, sentence in enumerate(sentences):
            weights = [freq_dist[tok]
                       for tok in word_tokenize(sentence.lower())
                       if tok in freq_dist]
            if weights:
                sentence_scores[index] = sum(weights)

        top_indices = heapq.nlargest(sentences_count,
                                     sentence_scores,
                                     key=sentence_scores.get)
        # Sorting the indices restores document order.
        return " ".join(sentences[i] for i in sorted(top_indices))

    def extract_information(self, info_type):
        """Extract emails, phone numbers, dates, URLs, or matching sentences.

        Args:
            info_type: One of the recognised type names (for example
                ``"email"``, ``"phone numbers"``, ``"dates"``, ``"url"``),
                matched case-insensitively. Any other value is treated as a
                search term and the sentences containing it are returned.

        Returns:
            A list of matched strings, or a message string when no document is
            loaded or nothing was found.
        """
        if not self.current_document_text:
            return "No document loaded. Please load a document first."

        requested = info_type.lower()
        pattern = self._PATTERNS_BY_TYPE.get(requested)
        if pattern is not None:
            results = re.findall(pattern, self.current_document_text)
        else:
            # Not a known pattern type: fall back to a substring search.
            results = [sentence
                       for sentence in sent_tokenize(self.current_document_text)
                       if requested in sentence.lower()]

        if not results:
            return f"No {info_type} found in the document."
        return results

    def answer_question(self, question):
        """Answer a question by returning the best keyword-matching sentences.

        Sentences are scored by how many of the question's non-stop-word
        alphanumeric tokens they contain; up to three top-scoring sentences
        are returned in document order.

        Args:
            question: The user's question as free text.

        Returns:
            The joined relevant sentences, or a message when no document is
            loaded or no sentence shares a keyword with the question.
        """
        if not self.current_document_text:
            return "No document loaded. Please load a document first."

        question_words = [w.lower() for w in word_tokenize(question)
                          if w.lower() not in self.stop_words and w.isalnum()]

        sentences = sent_tokenize(self.current_document_text)

        sentence_scores = {}
        for index, sentence in enumerate(sentences):
            # A set gives O(1) membership tests for each question word.
            sentence_words = {w.lower() for w in word_tokenize(sentence)}
            score = sum(1 for word in question_words if word in sentence_words)
            if score > 0:
                sentence_scores[index] = score

        if not sentence_scores:
            return "I couldn't find information related to your question in the document."

        top_indices = heapq.nlargest(3, sentence_scores, key=sentence_scores.get)
        return " ".join(sentences[i] for i in sorted(top_indices))
# Set up Flask routes
@app.route('/')
def home():
    """Serve the single-page UI by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept an uploaded document and load it into the shared agent.

    Expects a multipart form with a ``file`` part. The upload is written to a
    private temporary directory, handed to ``agent.load_document`` and removed
    again, even if processing raises.

    Returns:
        JSON ``{"message": ...}`` with the agent's status string, or
        ``{"error": ...}`` when no usable file was supplied.
    """
    import tempfile

    # Check if the post request has the file part
    if 'file' not in request.files:
        return jsonify({"error": "No file part"})

    file = request.files['file']

    # The filename is client-controlled: drop any directory components
    # (either separator style) so the upload cannot be written outside the
    # temporary directory.
    raw_name = file.filename or ''
    safe_name = os.path.basename(raw_name.replace('\\', '/'))
    if safe_name in ('', '.', '..'):
        return jsonify({"error": "No selected file"})

    # A fresh directory per request avoids collisions between uploads that
    # share a name, and the context manager guarantees cleanup.
    with tempfile.TemporaryDirectory() as temp_dir:
        file_path = os.path.join(temp_dir, safe_name)
        file.save(file_path)
        result = agent.load_document(file_path)

    return jsonify({"message": result})
@app.route('/summarize', methods=['POST'])
def summarize():
    """Return an extractive summary of the currently loaded document.

    Reads an optional ``sentences`` value from the JSON body. A missing,
    non-numeric, ``null`` or non-positive value falls back to the default of 5
    instead of failing inside the summariser. The bundled page sends ``null``
    when the number field is left empty.

    Returns:
        JSON ``{"summary": ...}`` with the summary or the agent's message.
    """
    default_count = 5

    payload = request.get_json(silent=True)
    requested = (payload.get('sentences', default_count)
                 if isinstance(payload, dict) else default_count)

    try:
        sentences = int(requested)
    except (TypeError, ValueError, OverflowError):
        sentences = default_count
    if sentences < 1:
        sentences = default_count

    result = agent.summarize_document(sentences)
    return jsonify({"summary": result})
@app.route('/extract', methods=['POST'])
def extract():
    """Run information extraction for the ``info_type`` given in the JSON body.

    Returns:
        JSON ``{"extracted": ...}`` holding whatever
        ``agent.extract_information`` returned.
    """
    requested_type = request.json.get('info_type', '')
    return jsonify({"extracted": agent.extract_information(requested_type)})
@app.route('/question', methods=['POST'])
def question():
    """Answer the ``question`` given in the JSON body from the loaded document.

    Returns:
        JSON ``{"answer": ...}`` holding whatever ``agent.answer_question``
        returned.
    """
    user_question = request.json.get('question', '')
    return jsonify({"answer": agent.answer_question(user_question)})
# Module-level agent instance used by every route handler in this file;
# it holds the text of the most recently loaded document.
agent = SimpleDocumentAgent()
# Create a basic HTML template
@app.route('/get_index_template')
def get_index_template():
    """Return the HTML for the single-page document-processing UI.

    The page has four sections (upload, summarise, extract, ask) whose
    scripts call the ``/upload``, ``/summarize``, ``/extract`` and
    ``/question`` endpoints with ``fetch``. The function is exposed as a route
    and is also called directly at start-up to write ``templates/index.html``.

    Result boxes use ``white-space: pre-line`` so newline-joined extraction
    results appear one per line, and the upload handler shows the server's
    ``error`` field when no ``message`` is present.

    Returns:
        The complete HTML document as a string.
    """
    html_content = """
<!DOCTYPE html>
<html>
<head>
<title>Document Processing Agent</title>
<style>
body { font-family: Arial, sans-serif; margin: 0; padding: 20px; line-height: 1.6; }
h1 { color: #333; }
.container { max-width: 800px; margin: 0 auto; }
.section { margin-bottom: 20px; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }
button { background-color: #4CAF50; color: white; padding: 10px 15px; border: none; border-radius: 4px; cursor: pointer; }
button:hover { background-color: #45a049; }
input, select { padding: 8px; margin: 10px 0; width: 100%; }
textarea { width: 100%; height: 150px; }
.result { background-color: #f9f9f9; padding: 10px; border-radius: 5px; margin-top: 10px; white-space: pre-line; }
</style>
</head>
<body>
<div class="container">
<h1>Document Processing Agent</h1>
<div class="section">
<h2>Upload Document</h2>
<form id="uploadForm">
<input type="file" id="documentFile" accept=".pdf,.docx">
<button type="submit">Upload</button>
</form>
<div id="uploadResult" class="result"></div>
</div>
<div class="section">
<h2>Summarize Document</h2>
<label for="sentenceCount">Number of sentences:</label>
<input type="number" id="sentenceCount" value="5" min="1" max="20">
<button onclick="summarizeDocument()">Generate Summary</button>
<div id="summaryResult" class="result"></div>
</div>
<div class="section">
<h2>Extract Information</h2>
<select id="infoType">
<option value="email">Emails</option>
<option value="phone">Phone Numbers</option>
<option value="date">Dates</option>
<option value="url">URLs</option>
</select>
<button onclick="extractInfo()">Extract</button>
<div id="extractResult" class="result"></div>
</div>
<div class="section">
<h2>Ask Questions</h2>
<input type="text" id="question" placeholder="Enter your question about the document">
<button onclick="askQuestion()">Ask</button>
<div id="questionResult" class="result"></div>
</div>
</div>
<script>
// Upload document
document.getElementById('uploadForm').addEventListener('submit', function(event) {
event.preventDefault();
const fileInput = document.getElementById('documentFile');
const file = fileInput.files[0];
if (!file) {
alert('Please select a file to upload');
return;
}
const formData = new FormData();
formData.append('file', file);
fetch('/upload', {
method: 'POST',
body: formData
})
.then(response => response.json())
.then(data => {
document.getElementById('uploadResult').textContent = data.message || data.error;
})
.catch(error => {
console.error('Error:', error);
document.getElementById('uploadResult').textContent = 'Error uploading file';
});
});
// Summarize
function summarizeDocument() {
const sentences = document.getElementById('sentenceCount').value;
fetch('/summarize', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ sentences: parseInt(sentences) })
})
.then(response => response.json())
.then(data => {
document.getElementById('summaryResult').textContent = data.summary;
})
.catch(error => {
console.error('Error:', error);
document.getElementById('summaryResult').textContent = 'Error generating summary';
});
}
// Extract info
function extractInfo() {
const infoType = document.getElementById('infoType').value;
fetch('/extract', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ info_type: infoType })
})
.then(response => response.json())
.then(data => {
if (Array.isArray(data.extracted)) {
document.getElementById('extractResult').textContent = data.extracted.join('\\n');
} else {
document.getElementById('extractResult').textContent = data.extracted;
}
})
.catch(error => {
console.error('Error:', error);
document.getElementById('extractResult').textContent = 'Error extracting information';
});
}
// Ask question
function askQuestion() {
const question = document.getElementById('question').value;
fetch('/question', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ question: question })
})
.then(response => response.json())
.then(data => {
document.getElementById('questionResult').textContent = data.answer;
})
.catch(error => {
console.error('Error:', error);
document.getElementById('questionResult').textContent = 'Error processing question';
});
}
</script>
</body>
</html>
"""
    return html_content
if __name__ == "__main__":
    # home() renders 'index.html' from the templates folder, so write the
    # embedded page there before the server starts.
    os.makedirs("templates", exist_ok=True)
    with open("templates/index.html", "w", encoding="utf-8") as f:
        f.write(get_index_template())

    # Flask's debug mode enables the interactive debugger, which must not be
    # reachable on a public deployment. It is therefore opt-in: set
    # FLASK_DEBUG=1 (or "true") in the environment for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(debug=debug_enabled)