import re
import duckdb
import pandas as pd
import gradio as gr
from io import StringIO
from langchain_community.vectorstores.duckdb import DuckDB
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_community.document_loaders import RecursiveUrlLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_transformers import Html2TextTransformer
from langsmith import traceable

TAB_LINES = 22

# Embedding model args
model_name = "BAAI/bge-small-en-v1.5"
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': True}

# HuggingFace embeddings
hf = HuggingFaceBgeEmbeddings(
    model_name=model_name,
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs
)

# DuckDB connection
con = duckdb.connect('Collections.duckdb')

# DuckDB vector store
vector_store = DuckDB(connection=con, embedding=hf)
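
# The LangChain DuckDB vector store persists documents and their embeddings in a
# DuckDB table (named "embeddings" by default, which is the table queried in
# create_embeddings() below), so indexed data survives in Collections.duckdb.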

def html_only_metadata_extractor(raw_html, url, response):
    content_type = response.headers.get("Content-Type", "")
    if "text/html" in content_type:
        return {"source": url, "content_type": content_type}
    return {}

def scrape_text(url, max_depth):
    try:
        loader = RecursiveUrlLoader(
            url=url,
            max_depth=max_depth,
            check_response_status=True,
            metadata_extractor=html_only_metadata_extractor,
            prevent_outside=True,
            use_async=True
        )
        documents = loader.load()
    except Exception as e:
        print(f"Error loading URL: {e}")
        return None
    return documents
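
# scrape_text() delegates the crawl to RecursiveUrlLoader: it follows links from the
# start page up to `max_depth` levels, prevent_outside=True keeps the crawl under the
# starting URL, and check_response_status=True skips pages that return error codes.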

def clean_text(docs):
    html2text = Html2TextTransformer()
    docs_transformed = html2text.transform_documents(docs)
    for doc in docs_transformed:
        doc.page_content = re.sub(r'\n\n+|\n+|\s+', ' ', doc.page_content)
    return docs_transformed

def remove_tables(docs):
    table_pattern = re.compile(r'<table.*?>.*?</table>', re.DOTALL)
    for doc in docs:
        doc.page_content = table_pattern.sub('', doc.page_content)
    return docs
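
# remove_tables() strips <table> blocks from the raw HTML before the HTML-to-text
# conversion, so tabular data is not flattened into the cleaned text; tables are
# instead extracted separately with pandas in get_tables() below.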

def format_chunks_with_spaces(chunks):
    separator = "\n\n---\n\n"
    formatted_chunks = ""
    for i, chunk in enumerate(chunks):
        formatted_chunks += f"Chunk {i+1}: \n\n"
        formatted_chunks += chunk.page_content
        formatted_chunks += separator
    return formatted_chunks

def format_metadata(docs):
    formatted_metadata = ""
    for i, doc in enumerate(docs):
        formatted_metadata += f"Metadata {i+1}: \n\n"
        formatted_metadata += str(doc.metadata)
        formatted_metadata += "\n\n---\n\n"
    return formatted_metadata

def format_page_content(docs):
    formatted_docs = ""
    for i, doc in enumerate(docs):
        formatted_docs += f"Page Content {i+1}: \n\n"
        formatted_docs += str(doc.page_content)
        formatted_docs += "\n\n---\n\n"
    return formatted_docs

def get_tables(raw_docs):
    tables_list = []
    for raw_doc in raw_docs:
        try:
            tables = pd.read_html(StringIO(str(raw_doc.page_content)))
            tables_list.extend(tables)
        except Exception as e:
            print(f"Error reading table: {e}")
            continue
    return tables_list
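
# get_tables() uses pandas.read_html to parse every <table> element in each page's
# raw HTML into a DataFrame; pages without parseable tables raise and are skipped.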

def concat_dfs(df_list):
    concatenated_df = pd.concat(df_list, ignore_index=True)
    return concatenated_df

def create_embeddings(docs):
    ids = vector_store.add_documents(docs)
    result = con.execute("SELECT * FROM embeddings").fetchdf()
    return result[result['id'].isin(ids)]
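
# vector_store.add_documents() returns the ids of the newly stored rows; filtering
# the embeddings table on those ids shows only the documents added in this run,
# not everything previously stored in Collections.duckdb.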

def get_docs(url, max_depth):
    raw_html = scrape_text(url, max_depth)
    if raw_html is None:
        return None, None, None, None, None, None

    tables_list = get_tables(raw_html)
    if tables_list:
        concat_tables = concat_dfs(tables_list)
    else:
        concat_tables = None

    tables_rmv_html = remove_tables(raw_html)
    clean_docs = clean_text(tables_rmv_html)

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=200)
    documents_splits = text_splitter.split_documents(clean_docs)

    formatted_chunks = format_chunks_with_spaces(documents_splits)
    embeddings = create_embeddings(documents_splits)

    return (format_page_content(raw_html), format_page_content(clean_docs), concat_tables,
            format_metadata(raw_html), formatted_chunks, embeddings)
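
# Gradio UI: a URL box and crawl-depth slider on the left, and tabs on the right
# showing the raw HTML, cleaned text, extracted tables, chunks, metadata, and the
# newly stored embeddings returned by get_docs().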

with gr.Blocks(theme=gr.themes.Soft(primary_hue="purple", secondary_hue="indigo")) as demo:
    gr.Image("logo.png", label=None, show_label=False, container=False, height=100)
    gr.Markdown("""
    <div style='text-align: center;'>
        <strong style='font-size: 36px;'>Domain Document Indexing</strong>
    </div>
    """)

    with gr.Row():
        with gr.Column(scale=1):
            url_input = gr.Textbox(lines=5, label="URL", placeholder="Enter your URL here...")
            with gr.Row():
                max_depth = gr.Slider(1, 50, value=1, step=1, label="Max Depth", interactive=True)
            scrape_url_button = gr.Button(value="Scrape & Create Embeddings", variant="primary")

        with gr.Column(elem_id="col_container", scale=2):
            with gr.Tabs():
                with gr.Tab("RAW HTML"):
                    raw_page_content = gr.Textbox(lines=TAB_LINES, label="Page Content HTML", value="",
                                                  interactive=False, autoscroll=False)
                with gr.Tab("Clean Content"):
                    page_content = gr.Textbox(lines=TAB_LINES, label="Clean Page Content", value="",
                                              interactive=False, autoscroll=False)
                with gr.Tab("Tables"):
                    tables = gr.Textbox(lines=TAB_LINES, label="Tables", value="",
                                        interactive=False, autoscroll=False)
                with gr.Tab("Chunks"):
                    parsed_chunks = gr.Textbox(lines=TAB_LINES, label="Parsed Chunks", value="",
                                               interactive=False, autoscroll=False)
                with gr.Tab("Metadata"):
                    metadata = gr.Textbox(lines=TAB_LINES, label="Metadata", value="",
                                          interactive=False, autoscroll=False)
                with gr.Tab("Embeddings"):
                    embeddings = gr.Dataframe(label="Vector Store", interactive=False)

    scrape_url_button.click(get_docs, inputs=[url_input, max_depth],
                            outputs=[raw_page_content, page_content, tables,
                                     metadata, parsed_chunks, embeddings])

if __name__ == "__main__":
    demo.launch()