# Scaper_search / app.py
# Hugging Face Space entry point — uploaded by gaur3009
# (commit d48a106, "Update app.py", verified)
import gradio as gr
from search import search_google
from scraper import scrape_url
from summarizer import summarize_text
from rag import VectorStore
from llm import generate_answer
import asyncio
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor
# Module-level vector store shared by every request; ask_agent() fills it
# with per-query summaries and the "Clear Conversation" button empties it.
vs = VectorStore()
# Cached scraping function: memoize up to 100 recently scraped URLs so
# repeat questions that hit the same pages skip the network round-trip.
@lru_cache(maxsize=100)
def cached_scrape(url):
    """Return the scraped text for *url* (delegates to scraper.scrape_url)."""
    return scrape_url(url)
async def process_search_results(query):
    """Search the web for *query* and scrape every hit concurrently.

    Returns:
        (search_results, texts) where texts[i] is the scraped content of
        search_results[i]['url'] ("" if that page failed to scrape), or
        (None, None) when the search produced no hits.
    """
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=5) as executor:
        # search_google is a blocking call; run it in the pool so the
        # event loop keeps serving other requests while we wait on it.
        search_results = await loop.run_in_executor(
            executor, lambda: search_google(query, num_results=5)
        )
        if not search_results:
            return None, None

        # Scrape every result URL in parallel (cached_scrape memoizes).
        scrape_tasks = [
            loop.run_in_executor(executor, cached_scrape, result['url'])
            for result in search_results
        ]
        # return_exceptions=True keeps texts index-aligned with
        # search_results even when one page fails to scrape; a single
        # bad URL no longer aborts the whole request.
        texts = await asyncio.gather(*scrape_tasks, return_exceptions=True)

    texts = ["" if isinstance(t, Exception) else t for t in texts]
    return search_results, texts
async def ask_agent(question, progress=gr.Progress()):
    """Answer *question* by searching the web, summarizing the hits, and
    generating an answer with cited sources.

    Args:
        question: the user's natural-language query.
        progress: Gradio progress callback, invoked as progress(frac, desc=...).

    Returns:
        A markdown string (answer plus source links), or an apology
        message when the search produced nothing.
    """
    progress(0.1, desc="πŸ” Searching the web...")
    search_results, texts = await process_search_results(question)
    if not search_results:
        return "I couldn't find any relevant information. Please try a different question."

    progress(0.3, desc="πŸ“š Processing content...")
    # Summarize each scraped page in a bounded pool.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as executor:
        summarize_tasks = [
            loop.run_in_executor(executor, summarize_text, text, 100)
            for text in texts
        ]
        summaries = await asyncio.gather(*summarize_tasks)

    # Reset the store before indexing this query's summaries: retrieve()
    # returns indices into the store's *entire* contents, so leftovers
    # from earlier questions would misalign with — or overflow — the
    # search_results lookup below (IndexError after the first question).
    vs.clear()
    vs.add_texts(summaries)

    progress(0.6, desc="🧠 Finding relevant information...")
    relevant_texts, indices = vs.retrieve(question, top_k=3)
    context = "\n\n".join(relevant_texts)

    progress(0.8, desc="πŸ’‘ Generating answer...")
    answer = generate_answer(context, question)

    # Build the markdown response: answer first, then cited sources.
    response = f"### πŸ€– Assistant\n{answer}\n\n"
    response += "### πŸ” Sources Used in This Answer:\n"
    # Defensive bounds filter in case store and results ever disagree.
    used = [idx for idx in indices if 0 <= idx < len(search_results)]
    for idx in used:
        result = search_results[idx]
        response += f"- [{result['title']}]({result['url']})\n"

    # List the remaining search hits that were not cited in the answer.
    other_indices = [i for i in range(len(search_results)) if i not in used]
    if other_indices:
        response += "\n### πŸ“š Other Useful Sources:\n"
        for idx in other_indices:
            result = search_results[idx]
            response += f"- [{result['title']}]({result['url']})\n"

    progress(1.0, desc="βœ… Response ready")
    return response
# Gradio interface with progress tracking.
with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 800px}") as demo:
    # Static header shown above the chat widget.
    gr.Markdown("""
    <div style="text-align: center">
    <h1>πŸ” AI Research Assistant</h1>
    <p>I'll search the web and summarize information for you!</p>
    </div>
    """)
    # Tuple-style history and bubble_full_width are Gradio 3.x Chatbot usage.
    chatbot = gr.Chatbot(height=400, bubble_full_width=False)
    msg = gr.Textbox(label="Your Question", placeholder="Ask me anything...")
    clear = gr.Button("Clear Conversation")
    # NOTE(review): respond() writes status.value directly, which does not
    # push an update to the browser unless status is an event output —
    # confirm whether this box is meant to update live.
    status = gr.Textbox("", label="Status", interactive=False)
async def respond(message, chat_history):
tracker = []
try:
response = await ask_agent(
message,
progress=lambda p, d, t=tracker: tracker.append((p, d)))
# Update status
if tracker:
status.value = tracker[-1][1]
except Exception as e:
response = f"⚠️ Sorry, I encountered an error: {str(e)[:100]}"
status.value = "Error occurred"
chat_history.append((message, response))
return "", chat_history
msg.submit(respond, [msg, chatbot], [msg, chatbot])
clear.click(lambda: (vs.clear(), None), None, chatbot, queue=False)
if __name__ == "__main__":
    # Bound request concurrency via the queue. NOTE(review):
    # concurrency_count is a Gradio 3.x queue() parameter (removed in
    # 4.x) — confirm the pinned gradio version before upgrading.
    demo.queue(concurrency_count=4).launch()