Spaces:
Running
Running
import os | |
import shutil | |
import gradio as gr | |
from ingestion import ( | |
load_and_ingest_file, | |
load_and_ingest_url, | |
clear_database, | |
delete_embeddings_by_source, | |
) | |
from qa_pipeline import answer_question | |
INGESTED_URLS_FILE = "./ingested_urls.txt" | |
UPLOAD_DIR = "./uploads" | |
def handle_file_upload(file): | |
filename = os.path.basename(file.name) | |
file_path = f"./uploads/{filename}" | |
upload_dir = "uploads" | |
os.makedirs("./uploads", exist_ok=True) | |
destination = os.path.join(upload_dir, filename) | |
shutil.copy2(file.name, destination) | |
load_and_ingest_file(file_path) | |
return "File processed and embedded successfully." | |
def handle_url_ingestion(url): | |
load_and_ingest_url(url) | |
save_url(url) | |
return "URL content processed and embedded successfully." | |
def handle_file_upload_with_progress(file): | |
"""File upload with progress indicator""" | |
if not file: | |
return "No file selected.", gr.update(visible=False) | |
try: | |
# Copy file | |
filename = os.path.basename(file.name) | |
file_path = f"./uploads/{filename}" | |
upload_dir = "uploads" | |
os.makedirs("./uploads", exist_ok=True) | |
destination = os.path.join(upload_dir, filename) | |
shutil.copy2(file.name, destination) | |
# Process and embed | |
load_and_ingest_file(file_path) | |
return f"File '{filename}' processed and embedded successfully!", gr.update( | |
visible=True | |
) | |
except Exception as e: | |
return f"Error processing file: {str(e)}", gr.update(visible=True) | |
def handle_url_ingestion_with_progress(url): | |
"""URL ingestion with progress indicator""" | |
if not url or not url.strip(): | |
return "No URL provided.", gr.update(visible=False) | |
try: | |
# Ingest URL content | |
load_and_ingest_url(url.strip()) | |
# Save URL to file | |
save_url(url.strip()) | |
return f"URL '{url.strip()}' processed and embedded successfully!", gr.update( | |
visible=True | |
) | |
except Exception as e: | |
return f"Error processing URL: {str(e)}", gr.update(visible=True) | |
def handle_question(question): | |
return answer_question(question) | |
def list_uploaded_files(): | |
files = [] | |
for filename in os.listdir(UPLOAD_DIR): | |
full_path = os.path.join(UPLOAD_DIR, filename) | |
if os.path.isfile(full_path): | |
files.append(full_path) | |
return files | |
def save_url(url: str): | |
with open(INGESTED_URLS_FILE, "a") as f: | |
f.write(url.strip() + "\n") | |
def get_saved_urls() -> str: | |
if not os.path.exists(INGESTED_URLS_FILE): | |
return "<i>No URLs ingested yet.</i>" | |
links_html = "" | |
with open(INGESTED_URLS_FILE, "r") as f: | |
for i, line in enumerate(f): | |
url = line.strip() | |
links_html += f'<div style="margin: 2px 0; padding: 8px; border: 1px solid #ddd; border-radius: 5px; background-color: #f9f9f9;"><a href="{url}" target="_blank">{url}</a></div>' | |
return links_html | |
def get_saved_urls_list(): | |
"""Get list of ingested URLs for dropdown""" | |
urls = [] | |
if os.path.exists(INGESTED_URLS_FILE): | |
with open(INGESTED_URLS_FILE, "r") as f: | |
for line in f: | |
url = line.strip() | |
if url: | |
urls.append(url) | |
return urls | |
def delete_url_by_url(url_to_delete: str): | |
"""Delete URL by its actual URL string and its embeddings""" | |
if not os.path.exists(INGESTED_URLS_FILE): | |
return "No URLs to delete." | |
try: | |
with open(INGESTED_URLS_FILE, "r") as f: | |
urls = f.readlines() | |
# Find and remove the URL | |
found = False | |
for i, url in enumerate(urls): | |
if url.strip() == url_to_delete: | |
urls.pop(i) | |
found = True | |
break | |
if found: | |
with open(INGESTED_URLS_FILE, "w") as f: | |
f.writelines(urls) | |
# Delete embeddings for this URL | |
embeddings_result = delete_embeddings_by_source(url_to_delete) | |
return f"Deleted URL: {url_to_delete}\n{embeddings_result}" | |
else: | |
return f"URL not found: {url_to_delete}" | |
except Exception as e: | |
return f"Error deleting URL: {str(e)}" | |
def delete_uploaded_file(filename: str): | |
"""Delete an uploaded file and its embeddings""" | |
try: | |
file_path = os.path.join(UPLOAD_DIR, filename) | |
if os.path.exists(file_path): | |
# Delete the file | |
os.remove(file_path) | |
# Delete embeddings for this file | |
embeddings_result = delete_embeddings_by_source(file_path) | |
return f"Deleted file: {filename}\n{embeddings_result}" | |
else: | |
return f"File not found: {filename}" | |
except Exception as e: | |
return f"Error deleting file: {str(e)}" | |
def get_uploaded_files_list(): | |
"""Get list of uploaded files with delete buttons""" | |
files = [] | |
if os.path.exists(UPLOAD_DIR): | |
for filename in os.listdir(UPLOAD_DIR): | |
full_path = os.path.join(UPLOAD_DIR, filename) | |
if os.path.isfile(full_path): | |
files.append(filename) | |
return files | |
with gr.Blocks() as demo: | |
gr.Markdown("# π Developer Docs Assistant") | |
with gr.Tab("Upload Document"): | |
with gr.Row(): | |
with gr.Column(scale=2): | |
file = gr.File( | |
label="Upload Document", | |
file_types=[".pdf", ".txt", ".md", ".markdown"], | |
) | |
upload_btn = gr.Button("π€ Ingest File", variant="primary") | |
upload_output = gr.Textbox(label="Upload Result", visible=False) | |
# Progress indicator | |
upload_progress = gr.HTML( | |
value="<div style='text-align: center; color: #666;'>Ready to upload</div>", | |
label="Status", | |
) | |
with gr.Column(scale=1): | |
gr.Markdown("### π Upload Instructions") | |
gr.Markdown( | |
""" | |
1. **Select File**: Choose a PDF, TXT, or Markdown file | |
2. **Click Upload**: The file will be processed and embedded | |
3. **Wait**: Processing may take a few moments | |
4. **Check Status**: Monitor the progress indicator | |
""" | |
) | |
def handle_upload_with_progress(file): | |
if not file: | |
return ( | |
"β οΈ Please select a file first.", | |
gr.update(visible=True), | |
gr.update( | |
value="<div style='text-align: center; color: #ff6b6b;'>β No file selected</div>" | |
), | |
) | |
# Show processing status | |
progress_html = """ | |
<div style='text-align: center; color: #4CAF50;'> | |
<div style='margin-bottom: 10px;'>π Processing file...</div> | |
<div style='display: inline-block; width: 20px; height: 20px; border: 3px solid #f3f3f3; border-top: 3px solid #4CAF50; border-radius: 50%; animation: spin 1s linear infinite;'></div> | |
<style> | |
@keyframes spin { | |
0% { transform: rotate(0deg); } | |
100% { transform: rotate(360deg); } | |
} | |
</style> | |
</div> | |
""" | |
try: | |
result = handle_file_upload_with_progress(file) | |
success_html = f""" | |
<div style='text-align: center; color: #4CAF50;'> | |
β {result[0]} | |
</div> | |
""" | |
return result[0], gr.update(visible=True), gr.update(value=success_html) | |
except Exception as e: | |
error_html = f""" | |
<div style='text-align: center; color: #ff6b6b;'> | |
β Error: {str(e)} | |
</div> | |
""" | |
return ( | |
f"β Error: {str(e)}", | |
gr.update(visible=True), | |
gr.update(value=error_html), | |
) | |
upload_btn.click( | |
handle_upload_with_progress, | |
inputs=file, | |
outputs=[upload_output, upload_output, upload_progress], | |
) | |
with gr.Tab("Ingest from URL"): | |
with gr.Row(): | |
with gr.Column(scale=2): | |
url_input = gr.Textbox( | |
label="Document URL", placeholder="https://example.com/document" | |
) | |
url_btn = gr.Button("π Ingest URL", variant="primary") | |
url_output = gr.Textbox(label="URL Processing Result", visible=False) | |
# Progress indicator | |
url_progress = gr.HTML( | |
value="<div style='text-align: center; color: #666;'>Ready to ingest URL</div>", | |
label="Status", | |
) | |
with gr.Column(scale=1): | |
gr.Markdown("### π URL Ingestion Instructions") | |
gr.Markdown( | |
""" | |
1. **Enter URL**: Paste a valid document URL | |
2. **Click Ingest**: Content will be fetched and processed | |
3. **Wait**: Processing may take a few moments | |
4. **Check Status**: Monitor the progress indicator | |
""" | |
) | |
def handle_url_ingestion_with_progress_ui(url): | |
if not url or not url.strip(): | |
return ( | |
"β οΈ Please enter a valid URL.", | |
gr.update(visible=True), | |
gr.update( | |
value="<div style='text-align: center; color: #ff6b6b;'>β No URL provided</div>" | |
), | |
) | |
# Show processing status | |
progress_html = """ | |
<div style='text-align: center; color: #4CAF50;'> | |
<div style='margin-bottom: 10px;'>π Fetching and processing URL...</div> | |
<div style='display: inline-block; width: 20px; height: 20px; border: 3px solid #f3f3f3; border-top: 3px solid #4CAF50; border-radius: 50%; animation: spin 1s linear infinite;'></div> | |
<style> | |
@keyframes spin { | |
0% { transform: rotate(0deg); } | |
100% { transform: rotate(360deg); } | |
} | |
</style> | |
</div> | |
""" | |
try: | |
result = handle_url_ingestion_with_progress(url.strip()) | |
success_html = f""" | |
<div style='text-align: center; color: #4CAF50;'> | |
β {result[0]} | |
</div> | |
""" | |
return result[0], gr.update(visible=True), gr.update(value=success_html) | |
except Exception as e: | |
error_html = f""" | |
<div style='text-align: center; color: #ff6b6b;'> | |
β Error: {str(e)} | |
</div> | |
""" | |
return ( | |
f"β Error: {str(e)}", | |
gr.update(visible=True), | |
gr.update(value=error_html), | |
) | |
url_btn.click( | |
handle_url_ingestion_with_progress_ui, | |
inputs=url_input, | |
outputs=[url_output, url_output, url_progress], | |
) | |
with gr.Tab("Manage Data"): | |
gr.Markdown("# ποΈ Data Management") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### π Uploaded Files") | |
file_dropdown = gr.Dropdown( | |
label="Select File to Delete", | |
choices=get_uploaded_files_list(), | |
interactive=True, | |
) | |
delete_file_btn = gr.Button("ποΈ Delete Selected File", variant="stop") | |
file_delete_output = gr.Textbox( | |
label="File Delete Result", visible=False | |
) | |
def delete_selected_file(filename): | |
if filename: | |
result = delete_uploaded_file(filename) | |
# Refresh the dropdown | |
new_choices = get_uploaded_files_list() | |
return gr.update(value=result, visible=True), gr.update( | |
choices=new_choices | |
) | |
return ( | |
gr.update(value="No file selected", visible=True), | |
gr.update(), | |
) | |
delete_file_btn.click( | |
delete_selected_file, | |
inputs=file_dropdown, | |
outputs=[file_delete_output, file_dropdown], | |
) | |
refresh_files_btn = gr.Button("π Refresh File List") | |
refresh_files_btn.click( | |
lambda: gr.update(choices=get_uploaded_files_list()), | |
outputs=file_dropdown, | |
) | |
with gr.Column(scale=1): | |
gr.Markdown("### π Ingested URLs") | |
# url_links_display = gr.HTML(value=get_saved_urls()) | |
url_dropdown = gr.Dropdown( | |
label="Select URL to Delete", | |
choices=get_saved_urls_list(), | |
interactive=True, | |
) | |
delete_url_btn = gr.Button("ποΈ Delete Selected URL", variant="stop") | |
url_delete_output = gr.Textbox(label="URL Delete Result", visible=False) | |
def delete_selected_url(url): | |
if url: | |
result = delete_url_by_url(url) | |
# Refresh the dropdown and display | |
new_choices = get_saved_urls_list() | |
new_display = get_saved_urls() | |
return ( | |
gr.update(value=result, visible=True), | |
gr.update(choices=new_choices), | |
gr.update(value=new_display), | |
) | |
return ( | |
gr.update(value="No URL selected", visible=True), | |
gr.update(), | |
gr.update(), | |
) | |
delete_url_btn.click( | |
delete_selected_url, | |
inputs=url_dropdown, | |
outputs=[url_delete_output, url_dropdown], | |
) | |
refresh_urls_btn = gr.Button("π Refresh URL List") | |
refresh_urls_btn.click( | |
lambda: ( | |
gr.update(choices=get_saved_urls_list()), | |
gr.update(value=get_saved_urls()), | |
), | |
outputs=[url_dropdown], | |
) | |
gr.Markdown("---") | |
gr.Markdown("### β οΈ Nuclear Option - Clear All Data") | |
gr.Markdown( | |
"**Warning**: This will delete ALL uploaded files, ingested URLs, and clear the entire vector database. This action cannot be undone." | |
) | |
with gr.Row(): | |
clear_all_btn = gr.Button("π₯ Clear All Data", variant="stop", size="lg") | |
clear_output = gr.Textbox(label="Clear All Result", visible=False) | |
def clear_all_data(): | |
# Clear database | |
db_result = clear_database() | |
# Clear uploaded files | |
file_result = "" | |
if os.path.exists(UPLOAD_DIR): | |
for filename in os.listdir(UPLOAD_DIR): | |
file_path = os.path.join(UPLOAD_DIR, filename) | |
if os.path.isfile(file_path): | |
try: | |
os.remove(file_path) | |
file_result += f"Deleted file: {filename}\n" | |
except Exception as e: | |
file_result += f"Error deleting {filename}: {str(e)}\n" | |
# Clear ingested URLs | |
url_result = "" | |
if os.path.exists(INGESTED_URLS_FILE): | |
try: | |
os.remove(INGESTED_URLS_FILE) | |
url_result = "Deleted ingested URLs file\n" | |
except Exception as e: | |
url_result = f"Error deleting URLs file: {str(e)}\n" | |
return f"Database: {db_result}\nFiles: {file_result}URLs: {url_result}" | |
clear_all_btn.click(clear_all_data, outputs=clear_output) | |
# Load initial data | |
demo.load( | |
fn=lambda: gr.update(choices=get_uploaded_files_list()), | |
outputs=file_dropdown, | |
) | |
demo.load( | |
fn=lambda: gr.update(choices=get_saved_urls_list()), outputs=url_dropdown | |
) | |
# demo.load(fn=get_saved_urls, outputs=url_links_display) | |
with gr.Tab("Ask a Question"): | |
with gr.Row(): | |
with gr.Column(scale=2): | |
question_input = gr.Textbox( | |
label="Your Question", | |
placeholder="Ask a question about your documents...", | |
) | |
ask_btn = gr.Button("π€ Get Answer", variant="primary") | |
answer_output = gr.Markdown( | |
label="Answer", value="Answer will appear here..." | |
) | |
def handle_question_with_sources(question): | |
return answer_question(question) | |
ask_btn.click( | |
handle_question_with_sources, inputs=question_input, outputs=answer_output | |
) | |
demo.launch() | |