|
import os |
|
import gradio as gr |
|
import faiss |
|
import pickle |
|
import requests |
|
from bs4 import BeautifulSoup |
|
from urllib.parse import urljoin, urlparse |
|
from sentence_transformers import SentenceTransformer |
|
from huggingface_hub import InferenceClient, HfApi |
|
|
|
|
|
# Repository on the Hugging Face Hub that receives the generated index files.
HF_REPO_ID = "MoslemBot/kajiweb"

# Access token taken from the HF_TOKEN environment variable (None when unset).
HF_API_TOKEN = os.environ.get("HF_TOKEN")

# Hub client used by upload_to_hub().
api = HfApi()
|
|
|
def upload_to_hub(local_path, remote_path):
    """Upload one local file to the Hub repository named by ``HF_REPO_ID``.

    Args:
        local_path: Path of the file on disk to upload.
        remote_path: Destination path of the file inside the repository.
    """
    api.upload_file(
        path_or_fileobj=local_path,
        path_in_repo=remote_path,
        repo_id=HF_REPO_ID,
        repo_type="space",
        token=HF_API_TOKEN,
    )
    print(f"✅ Uploaded to Hub: {remote_path}")
|
|
|
|
|
# Root directory holding one sub-folder per indexed title; created at import.
DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)

# Sentence-embedding model used both when indexing and when querying.
embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Inference client for chat completions, authenticated with HF_TOKEN.
llm = InferenceClient(token=os.environ.get("HF_TOKEN"))
|
|
|
def extract_links_and_text(base_url, max_depth=1, visited=None):
    """Crawl ``base_url`` and its same-host links, collecting page text.

    Text is taken from the ``<p>``, ``<h1>``, ``<h2>`` and ``<h3>`` elements
    of each fetched page. A link is followed only when it is an HTTP(S) URL
    on the same host as the page that contains it.

    Args:
        base_url: URL of the page to fetch.
        max_depth: Number of link hops to follow below ``base_url``. A
            negative value fetches nothing.
        visited: Set of URLs already crawled. It is mutated in place and
            shared across the recursion so that no URL is fetched twice.

    Returns:
        A list of ``(page_text, url)`` tuples, one per page that yielded
        non-blank text. Pages that cannot be fetched or parsed are logged
        and contribute nothing.
    """
    if visited is None:
        visited = set()
    if base_url in visited or max_depth < 0:
        return []

    visited.add(base_url)
    print(f"π Crawling: {base_url}")

    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Referer": base_url,
        "Connection": "keep-alive",
    }

    # Crawling is best-effort: any failure to download or parse this page is
    # logged and the page is skipped. The guard covers this page only; each
    # recursive call protects itself the same way.
    try:
        response = requests.get(base_url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
    except Exception as e:
        print(f"β Failed to fetch {base_url}: {e}")
        return []

    page_text = " ".join(
        node.get_text() for node in soup.find_all(["p", "h1", "h2", "h3"])
    )
    result = [(page_text, base_url)] if page_text.strip() else []

    # At depth 0 every child call would return immediately, so skip the
    # link extraction altogether.
    if max_depth == 0:
        return result

    base_host = urlparse(base_url).netloc
    links = set()
    for anchor in soup.find_all("a", href=True):
        try:
            target = urlparse(urljoin(base_url, anchor["href"]))
        except ValueError:
            # Malformed href (e.g. a broken IPv6 literal): ignore the link.
            continue
        if target.scheme not in ("http", "https") or target.netloc != base_host:
            continue
        # Drop the fragment so "page#a" and "page#b" count as one page.
        links.add(target._replace(fragment="").geturl())

    # Sorted so that the crawl order (and thus the result order) is stable.
    for link in sorted(links):
        result.extend(
            extract_links_and_text(link, max_depth=max_depth - 1, visited=visited)
        )
    return result
|
|
|
|
|
def save_webpage(url, title):
    """Crawl ``url``, index its text with FAISS and store it under ``title``.

    The crawled text is cut into 500-character chunks, embedded with the
    module-level ``embedder`` and written to ``DATA_DIR/<title>`` as
    ``index.faiss`` plus ``meta.pkl`` (a pickled list of ``(chunk, url)``
    pairs). Both files are then uploaded to the Hub.

    Args:
        url: Address of the page to crawl (one level of same-host links is
            followed).
        title: Name under which the index is stored. It is used as a folder
            name and as part of the Hub path.

    Returns:
        A status message for display in the UI.

    Raises:
        ValueError: If the embedder does not return a 2-D array.
    """
    title = (title or "").strip()
    if not title:
        return "Please provide a title for the web page."
    # The title becomes a directory name and part of the Hub path, so it
    # must not be able to point outside DATA_DIR.
    if title in {".", ".."} or any(sep in title for sep in ("/", "\\")):
        return "The title must not contain path separators."

    url = (url or "").strip()
    if not url:
        return "Please provide a URL to index."

    folder = os.path.join(DATA_DIR, title)
    if os.path.exists(folder):
        return f"'{title}' already exists. Use a different title."

    page_data = extract_links_and_text(url, max_depth=1)
    if not page_data:
        # NOTE(review): the leading "β" looks like a mis-decoded emoji;
        # confirm the intended glyph.
        return "β No text extracted from the webpage."

    chunk_size = 500
    chunks = []
    sources = []
    for text, source_url in page_data:
        for start in range(0, len(text), chunk_size):
            chunks.append(text[start:start + chunk_size])
            sources.append(source_url)

    embeddings = embedder.encode(chunks)
    print("Embeddings shape:", embeddings.shape)
    if len(embeddings.shape) != 2:
        raise ValueError(f"Expected 2D embeddings, got shape {embeddings.shape}")

    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)

    # Create the folder only once there is something to store. Creating it
    # before the crawl would leave an empty folder behind on failure, which
    # blocks the title and shows up as a broken entry in the selector.
    os.makedirs(folder, exist_ok=True)
    index_path = os.path.join(folder, "index.faiss")
    meta_path = os.path.join(folder, "meta.pkl")
    faiss.write_index(index, index_path)
    with open(meta_path, "wb") as f:
        pickle.dump(list(zip(chunks, sources)), f)

    # Use the same stripped title remotely as for the local folder.
    upload_to_hub(index_path, f"data/{title}/index.faiss")
    upload_to_hub(meta_path, f"data/{title}/meta.pkl")

    return f"✅ Saved and indexed '{title}', and uploaded to Hub. Please reload (refresh) the page."
|
|
|
|
|
def list_titles():
    """Return the names of the directories found directly inside ``DATA_DIR``.

    The raw directory listing is printed first as a debugging aid.
    """
    entries = os.listdir(DATA_DIR)
    print(f"Listing in: {DATA_DIR} β {entries}")
    titles = []
    for name in entries:
        if os.path.isdir(os.path.join(DATA_DIR, name)):
            titles.append(name)
    return titles
|
|
|
|
|
def ask_question(message, history, selected_titles):
    """Answer ``message`` once per selected title, using that title's index.

    For every title the three closest chunks are retrieved from its FAISS
    index, passed to the LLM as context, and the reply is appended to the
    combined answer together with the URLs the chunks came from.

    Args:
        message: The user's question.
        history: Chat history supplied by the chat UI; not used.
        selected_titles: Titles (sub-folders of ``DATA_DIR``) to query.

    Returns:
        The per-title answers joined into one string, or a prompt to select
        a page when ``selected_titles`` is empty. A failure for one title is
        reported inline and does not stop the remaining titles.
    """
    if not selected_titles:
        return "β Please select at least one webpage."

    # The question embedding is the same for every title: compute it once.
    q_embed = embedder.encode([message])

    answers = []
    for title in selected_titles:
        folder = os.path.join(DATA_DIR, title)
        try:
            index = faiss.read_index(os.path.join(folder, "index.faiss"))
            # NOTE: pickle can execute arbitrary code while loading. This is
            # only safe for meta.pkl files written by save_webpage().
            with open(os.path.join(folder, "meta.pkl"), "rb") as f:
                chunk_data = pickle.load(f)

            # Never ask for more neighbours than exist: FAISS pads missing
            # results with -1, which would silently index the last chunk.
            top_k = min(3, index.ntotal, len(chunk_data))
            if top_k == 0:
                raise ValueError("index is empty")
            _, neighbours = index.search(q_embed, k=top_k)

            context_parts = []
            cited = {}  # dict keeps first-seen order while de-duplicating
            for idx in neighbours[0]:
                if idx < 0 or idx >= len(chunk_data):
                    continue
                chunk, source_url = chunk_data[idx]
                context_parts.append(f"[{source_url}]\n{chunk}\n\n")
                cited[source_url] = None
            response_context = "".join(context_parts)

            response = llm.chat_completion(
                messages=[
                    {"role": "system", "content": "You are a helpful assistant. Answer based only on the given context."},
                    {"role": "user", "content": f"Context:\n{response_context}\n\nQuestion: {message}"},
                ],
                model="deepseek-ai/DeepSeek-R1-0528",
                max_tokens=2048,
            )

            answer = response.choices[0].message["content"]
            answers.append(
                f"**{title}** (sources: {', '.join(cited)}):\n{answer.strip()}\n\n"
            )
        except Exception as e:
            # Per-title boundary: report the problem and carry on.
            answers.append(f"β οΈ Error with {title}: {e}\n\n")

    return "".join(answers).strip()
|
|
|
|
|
def _refresh_titles():
    """Re-read ``DATA_DIR`` and replace the selector's available choices."""
    # Returning a bare list would set the CheckboxGroup's *selected value*;
    # an explicit update is needed to change its choices.
    return gr.update(choices=list_titles())


# Two-tab UI: one tab to crawl and index a page, one to chat with the indexes.
with gr.Blocks() as demo:
    with gr.Tab("π Index Web Page"):
        url = gr.Textbox(label="Web Page URL")
        title = gr.Textbox(label="Title for Web Page")
        index_btn = gr.Button("Fetch and Index (with crawl)")
        index_status = gr.Textbox(label="Status")
        index_btn.click(fn=save_webpage, inputs=[url, title], outputs=index_status)

    with gr.Tab("π¬ Chat with Web Pages"):
        page_selector = gr.CheckboxGroup(label="Select Indexed Pages", choices=list_titles())
        refresh_btn = gr.Button("π Refresh List")
        refresh_btn.click(fn=_refresh_titles, outputs=page_selector)
        # The current selection is passed to ask_question as its third argument.
        chat = gr.ChatInterface(fn=ask_question, additional_inputs=[page_selector])

demo.launch()
|
|