Spaces:
Sleeping
Sleeping
# file: app.py | |
import gradio as gr | |
import requests | |
import json | |
import concurrent.futures | |
from concurrent.futures import ThreadPoolExecutor | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain_community.document_loaders import WebBaseLoader | |
from langdetect import detect_langs | |
from PyPDF2 import PdfReader | |
from io import BytesIO | |
import logging | |
from dotenv import load_dotenv | |
import os | |
load_dotenv() | |
data = False | |
seen = set() | |
main_url = "https://similar-products-api.vercel.app/search/all" | |
main_product = "Samsung Galaxy" | |
API_URL = "https://api-inference.huggingface.co/models/google/flan-t5-xxl" | |
headers = {"Authorization": f"Bearer {os.getenv('HUGGINGFACE_API_TOKEN')}"} | |
logging.basicConfig(level=logging.INFO) | |
def get_links(product): | |
params = { | |
"API_KEY": "12345", | |
"product": f"{product}", | |
} | |
response = requests.get(main_url, params=params) | |
if response.status_code == 200: | |
results = response.json() | |
return results | |
else: | |
return {} | |
def language_preprocess(text): | |
try: | |
if detect_langs(text)[0].lang == 'en': | |
return True | |
return False | |
except Exception as e: | |
logging.error(f"Language detection error: {e}") | |
return False | |
def relevant(product, similar_product, content): | |
try: | |
payload = {"inputs": f'''Do you think that the given content is similar to {similar_product} and {product}, just Respond True or False \nContent for similar product: {content[:700]}'''} | |
response = requests.post(API_URL, headers=headers, json=payload) | |
output = response.json() | |
return bool(output[0]['generated_text']) | |
except Exception as e: | |
logging.error(f"Relevance checking error: {e}") | |
return False | |
def download_pdf(url, timeout=10): | |
try: | |
response = requests.get(url, timeout=timeout) | |
response.raise_for_status() | |
return BytesIO(response.content) | |
except requests.RequestException as e: | |
logging.error(f"PDF download error: {e}") | |
return None | |
def extract_text_from_pdf(pdf_file, pages): | |
reader = PdfReader(pdf_file) | |
extracted_text = "" | |
try: | |
for page_num in pages: | |
if page_num < len(reader.pages): | |
page = reader.pages[page_num] | |
extracted_text += page.extract_text() + "\n" | |
else: | |
logging.warning(f"Page {page_num} does not exist in the document.") | |
return extracted_text | |
except Exception as e: | |
logging.error(f"PDF text extraction error: {e}") | |
return 'हे चालत नाही' | |
def extract_text_online(link): | |
loader = WebBaseLoader(link) | |
pages = loader.load_and_split() | |
text = '' | |
for page in pages[:3]: | |
text+=page.page_content | |
return text | |
def process_link(link, similar_product): | |
if link in seen: | |
return None | |
seen.add(link) | |
try: | |
if link[-3:]=='.md': | |
text = extract_text_online(link) | |
else: | |
pdf_file = download_pdf(link) | |
text = extract_text_from_pdf(pdf_file, [0, 2, 4]) | |
if language_preprocess(text): | |
if relevant(main_product, similar_product, text): | |
return link | |
except: | |
pass | |
return None | |
def filtering(urls, similar_product): | |
res = [] | |
with ThreadPoolExecutor() as executor: | |
futures = {executor.submit(process_link, link, similar_product): link for link in urls} | |
for future in concurrent.futures.as_completed(futures): | |
result = future.result() | |
if result is not None: | |
res.append(result) | |
return res | |
def wikipedia_url(product): | |
api_url = "https://en.wikipedia.org/w/api.php" | |
params = { | |
"action": "opensearch", | |
"search": product, | |
"limit": 5, | |
"namespace": 0, | |
"format": "json" | |
} | |
try: | |
response = requests.get(api_url, params=params) | |
response.raise_for_status() | |
data = response.json() | |
if data and len(data) > 3 and len(data[3]) > 0: | |
return data[3] | |
else: | |
return [] | |
except requests.RequestException as e: | |
logging.error(f"Error fetching Wikipedia URLs: {e}") | |
return [] | |
def preprocess_initial(product): | |
return get_links(product) | |
def preprocess_filter(product, data): | |
for similar_product in data: | |
# if similar_product != product: | |
if list(data[similar_product][0])[0] == 'duckduckgo': | |
s = set(('duckduckgo', 'google', 'archive')) | |
temp = [] | |
for idx, item in enumerate(data[similar_product]): | |
if list(item)[0] in s: | |
urls = data[similar_product][idx][list(item)[0]] | |
temp += filtering(urls, similar_product) | |
else: | |
temp += data[similar_product][idx][list(item)[0]] | |
data[similar_product] = temp | |
data[similar_product] += wikipedia_url(similar_product) | |
else: | |
urls = data[similar_product] | |
data[similar_product] = filtering(urls, similar_product) | |
data[similar_product] += wikipedia_url(similar_product) | |
logging.info('Filtering completed') | |
return data | |
def main(product_name): | |
return preprocess_initial(product_name) | |
def filter_links(product_name, initial_data): | |
return preprocess_filter(product_name, initial_data) | |
with gr.Blocks() as demo: | |
product_name = gr.Textbox(label="Product Name") | |
get_links_btn = gr.Button("Get Links") | |
initial_links_output = gr.JSON() | |
filter_btn = gr.Button("Filter Links") | |
filtered_links_output = gr.JSON() | |
get_links_btn.click(fn=main, inputs=product_name, outputs=initial_links_output) | |
filter_btn.click(fn=filter_links, inputs=[product_name, initial_links_output], outputs=filtered_links_output) | |
if __name__ == "__main__": | |
demo.launch() | |