import os
import gradio as gr
import faiss
import pickle
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from sentence_transformers import SentenceTransformer
from huggingface_hub import InferenceClient, HfApi

# Hugging Face Space persistence
HF_REPO_ID = "MoslemBot/kajiweb"
HF_API_TOKEN = os.getenv("HF_TOKEN")

api = HfApi()

def upload_to_hub(local_path, remote_path):
    api.upload_file(
        path_or_fileobj=local_path,
        path_in_repo=remote_path,
        repo_id=HF_REPO_ID,
        repo_type="space",
        token=HF_API_TOKEN,
    )
    print(f"✅ Uploaded to Hub: {remote_path}")

# Initialize embedder and LLM client
embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
llm = InferenceClient(token=os.getenv("HF_TOKEN"))

DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)

def extract_links_and_text(base_url, max_depth=1, visited=None):
    """Recursively fetch a page and its same-domain links, returning (text, url) pairs."""
    if visited is None:
        visited = set()
    if base_url in visited or max_depth < 0:
        return []
    visited.add(base_url)
    print(f"🔗 Crawling: {base_url}")
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Referer": base_url,
            "Connection": "keep-alive",
        }
        response = requests.get(base_url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        # Collect visible text from headings and paragraphs
        page_text = " ".join(p.get_text() for p in soup.find_all(["p", "h1", "h2", "h3"]))
        result = [(page_text, base_url)] if page_text.strip() else []

        # Follow only links that stay on the same domain
        links = set()
        for a in soup.find_all("a", href=True):
            full_url = urljoin(base_url, a["href"])
            if urlparse(full_url).netloc == urlparse(base_url).netloc:
                links.add(full_url)

        for link in links:
            result.extend(extract_links_and_text(link, max_depth=max_depth - 1, visited=visited))
        return result
    except Exception as e:
        print(f"❌ Failed to fetch {base_url}: {e}")
        return []

# Save webpage content and index it
def save_webpage(url, title):
    title = title.strip()  # strip once so folder names and Hub paths stay consistent
    folder = os.path.join(DATA_DIR, title)
    if os.path.exists(folder):
        return f"'{title}' already exists. Use a different title."
    os.makedirs(folder, exist_ok=True)

    # Extract text from the webpage and its linked pages
    page_data = extract_links_and_text(url, max_depth=1)
    if not page_data:
        return "❌ No text extracted from the webpage."

    # Chunk text into fixed-size windows, remembering each chunk's source URL
    chunks = []
    sources = []
    for text, source_url in page_data:
        for i in range(0, len(text), 500):
            chunks.append(text[i:i + 500])
            sources.append(source_url)

    # Embed and index
    embeddings = embedder.encode(chunks)
    print("Embeddings shape:", embeddings.shape)
    if len(embeddings.shape) != 2:
        raise ValueError(f"Expected 2D embeddings, got shape {embeddings.shape}")
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)

    # Save index and metadata locally
    index_path = os.path.join(folder, "index.faiss")
    meta_path = os.path.join(folder, "meta.pkl")
    faiss.write_index(index, index_path)
    with open(meta_path, "wb") as f:
        pickle.dump(list(zip(chunks, sources)), f)

    # Upload to the Hub so the index survives Space restarts
    upload_to_hub(index_path, f"data/{title}/index.faiss")
    upload_to_hub(meta_path, f"data/{title}/meta.pkl")

    return f"✅ Saved and indexed '{title}', and uploaded to Hub. Please reload (refresh) the page."
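# Illustrative call (the URL and title here are placeholders, not part of the app flow):
#   save_webpage("https://example.com", "example-site")
# would crawl example.com one level deep, write data/example-site/{index.faiss,meta.pkl},
# and push both files to the Space repo.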
# Return all available webpage titles
def list_titles():
    print(f"Listing in: {DATA_DIR} → {os.listdir(DATA_DIR)}")
    return [d for d in os.listdir(DATA_DIR) if os.path.isdir(os.path.join(DATA_DIR, d))]

# Ask a question using the selected webpages as context
def ask_question(message, history, selected_titles):
    if not selected_titles:
        return "❗ Please select at least one webpage."

    combined_answer = ""
    for title in selected_titles:
        folder = os.path.join(DATA_DIR, title)
        try:
            index = faiss.read_index(os.path.join(folder, "index.faiss"))
            with open(os.path.join(folder, "meta.pkl"), "rb") as f:
                chunk_data = pickle.load(f)  # list of (chunk, url)
            chunks = [cd[0] for cd in chunk_data]
            urls = [cd[1] for cd in chunk_data]

            # Retrieve the 3 nearest chunks for the question
            q_embed = embedder.encode([message])
            D, I = index.search(q_embed, k=3)

            response_context = ""
            sources_set = set()
            for idx in I[0]:
                response_context += f"[{urls[idx]}]\n{chunks[idx]}\n\n"
                sources_set.add(urls[idx])

            response = llm.chat_completion(
                messages=[
                    {"role": "system", "content": "You are a helpful assistant. Answer based only on the given context."},
                    {"role": "user", "content": f"Context:\n{response_context}\n\nQuestion: {message}"},
                ],
                model="deepseek-ai/DeepSeek-R1-0528",
                max_tokens=2048,
            )
            answer = response.choices[0].message.content
            combined_answer += f"**{title}** (sources: {', '.join(sources_set)}):\n{answer.strip()}\n\n"
        except Exception as e:
            combined_answer += f"⚠️ Error with {title}: {str(e)}\n\n"
    return combined_answer.strip()

# Gradio UI
with gr.Blocks() as demo:
    with gr.Tab("🌐 Index Web Page"):
        url = gr.Textbox(label="Web Page URL")
        title = gr.Textbox(label="Title for Web Page")
        index_btn = gr.Button("Fetch and Index (with crawl)")
        index_status = gr.Textbox(label="Status")
        index_btn.click(fn=save_webpage, inputs=[url, title], outputs=index_status)

    with gr.Tab("💬 Chat with Web Pages"):
        page_selector = gr.CheckboxGroup(label="Select Indexed Pages", choices=list_titles())

        def refresh_titles():
            # Returning a plain list to a CheckboxGroup output only sets its *selected
            # values*; gr.update is needed to replace the available choices.
            return gr.update(choices=list_titles(), value=[])

        refresh_btn = gr.Button("🔄 Refresh List")
        refresh_btn.click(fn=refresh_titles, outputs=page_selector)

        chat = gr.ChatInterface(fn=ask_question, additional_inputs=[page_selector])

demo.launch()
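# Rough local-run sketch (package names inferred from the imports above; the faiss
# wheel is published on PyPI as "faiss-cpu", and the entry filename is an assumption):
#   pip install gradio faiss-cpu requests beautifulsoup4 sentence-transformers huggingface_hub
#   HF_TOKEN=<your-token> python app.py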