Spaces:
Sleeping
Sleeping
from flask import Flask, request, jsonify, render_template | |
import requests | |
from bs4 import BeautifulSoup | |
from googlesearch import search | |
from duckduckgo_search import DDGS | |
import concurrent.futures | |
import re | |
app = Flask(__name__) | |
API_KEY_DEFAULT = '12345' | |
# Function to search DuckDuckGo | |
def duckduckgo_search(query): | |
try: | |
results = DDGS().text(f"{query} manual filetype:pdf", max_results=5) | |
return [res['href'] for res in results] | |
except: | |
return [] | |
# Function to search Google | |
def google_search(query): | |
links = [] | |
try: | |
api_key = 'AIzaSyDV_uJwrgNtawqtl6GDfeUj6NqO-H1tA4c' | |
search_engine_id = 'c4ca951b9fc6949cb' | |
url = f"https://www.googleapis.com/customsearch/v1" | |
params = { | |
"key": api_key, | |
"cx": search_engine_id, | |
"q": query + " manual filetype:pdf" | |
} | |
response = requests.get(url, params=params) | |
results = response.json() | |
for item in results.get('items', []): | |
links.append(item['link']) | |
except: | |
pass | |
try: | |
extension = "ext:pdf" | |
for result in search(query + " manual " + extension, num_results=5): | |
if result.endswith('.pdf'): | |
links.append(result) | |
except: | |
pass | |
return links | |
# Function to search Internet Archive | |
def archive_search(query): | |
try: | |
url = "https://archive.org/advancedsearch.php" | |
params = { | |
'q': f'{query} manual', | |
'fl[]': ['identifier', 'title', 'format'], | |
'rows': 50, | |
'page': 1, | |
'output': 'json' | |
} | |
# Make the request | |
response = requests.get(url, params=params) | |
data = response.json() | |
# Function to extract hyperlinks from a webpage | |
def extract_hyperlinks(url): | |
# Send a GET request to the URL | |
response = requests.get(url) | |
# Check if the request was successful | |
if response.status_code == 200: | |
# Parse the HTML content of the page | |
soup = BeautifulSoup(response.text, 'html.parser') | |
# Find all <a> tags (hyperlinks) | |
for link in soup.find_all('a', href=True): | |
href = link['href'] | |
if href.endswith('.pdf'): | |
pdf_files.append(url+'/'+href) | |
if href.endswith('.iso'): | |
# If the link ends with .iso, follow the link and extract .pdf hyperlinks | |
extract_pdf_from_iso(url+'/'+href+'/') | |
# Function to extract .pdf hyperlinks from an .iso file | |
def extract_pdf_from_iso(iso_url): | |
# Send a GET request to the ISO URL | |
iso_response = requests.get(iso_url) | |
# Check if the request was successful | |
if iso_response.status_code == 200: | |
# Parse the HTML content of the ISO page | |
iso_soup = BeautifulSoup(iso_response.text, 'html.parser') | |
# Find all <a> tags (hyperlinks) in the ISO page | |
for link in iso_soup.find_all('a', href=True): | |
href = link['href'] | |
if href.endswith('.pdf'): | |
pdf_files.append('https:'+href) | |
pdf_files = [] | |
def process_doc(doc): | |
identifier = doc.get('identifier', 'N/A') | |
# title = doc.get('title', 'N/A') | |
# format = doc.get('format', 'N/A') | |
pdf_link = f"https://archive.org/download/{identifier}" | |
extract_hyperlinks(pdf_link) | |
with concurrent.futures.ThreadPoolExecutor() as executor: | |
futures = [executor.submit(process_doc, doc) for doc in data['response']['docs']] | |
# Optionally, wait for all futures to complete and handle any exceptions | |
for future in concurrent.futures.as_completed(futures): | |
try: | |
future.result() # This will raise an exception if the function call raised | |
except Exception as exc: | |
print(f'Generated an exception: {exc}') | |
return pdf_files | |
except: | |
return [] | |
def github_search(query): | |
try: | |
# GitHub Search API endpoint | |
url = f"https://api.github.com/search/code?q={query}+extension:md" | |
headers = { | |
'Authorization': 'Token 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' | |
} | |
# Make the request | |
response = requests.get(url,headers=headers) | |
data = response.json() | |
links = [item['html_url'].replace('/blob','').replace('//github','//raw.github') for item in data['items']] | |
return links | |
except: | |
return [] | |
#Similarity Check | |
def extract_similar_products(query): | |
results = DDGS().chat(f'{query} Similar Products') | |
pattern = r'^\d+\.\s(.+)$' | |
matches = re.findall(pattern, results, re.MULTILINE) | |
matches = [item.split(': ')[0] for item in matches] | |
print(matches) | |
return matches[:5] if matches else [] | |
# Define API routes ------------------------------------------------------- | |
def home(): | |
return render_template('index.html') | |
def search_google(): | |
if request.method == 'POST': | |
data = request.get_json() | |
api_key = data.get('API_KEY') | |
product = data.get('product') | |
else: | |
product = request.args.get('product') | |
api_key = request.args.get('API_KEY') | |
similar_products = extract_similar_products(product) | |
if api_key == API_KEY_DEFAULT: | |
results = {product: google_search(product)} | |
for p in similar_products: | |
results[p] = google_search(p) | |
return jsonify(results) | |
else: | |
return jsonify({'error': 'Invalid API key'}), 401 | |
def search_duckduckgo(): | |
if request.method == 'POST': | |
data = request.get_json() | |
api_key = data.get('API_KEY') | |
product = data.get('product') | |
else: | |
product = request.args.get('product') | |
api_key = request.args.get('API_KEY') | |
similar_products = extract_similar_products(product) | |
if api_key == API_KEY_DEFAULT: | |
results = {product: duckduckgo_search(product)} | |
for p in similar_products: | |
results[p] = duckduckgo_search(p) | |
return jsonify(results) | |
else: | |
return jsonify({'error': 'Invalid API key'}), 401 | |
def search_archive(): | |
if request.method == 'POST': | |
data = request.get_json() | |
api_key = data.get('API_KEY') | |
product = data.get('product') | |
else: | |
product = request.args.get('product') | |
api_key = request.args.get('API_KEY') | |
# Retrieve custom headers if any | |
similar_products = extract_similar_products(product) | |
if api_key == API_KEY_DEFAULT: | |
results = {product: archive_search(product)} | |
def process_product(product): | |
return product, archive_search(product) | |
with concurrent.futures.ThreadPoolExecutor() as executor: | |
# Map the process_product function to similar_products | |
future_to_product = {executor.submit(process_product, p): p for p in similar_products} | |
# Collect results as they complete | |
for future in concurrent.futures.as_completed(future_to_product): | |
product, result = future.result() | |
results[product] = result | |
return jsonify(results) | |
else: | |
return jsonify({'error': 'Invalid API key'}), 401 | |
def search_github(): | |
if request.method == 'POST': | |
data = request.get_json() | |
api_key = data.get('API_KEY') | |
product = data.get('product') | |
else: | |
product = request.args.get('product') | |
api_key = request.args.get('API_KEY') | |
similar_products = extract_similar_products(product) | |
if api_key == API_KEY_DEFAULT: | |
results = {product: github_search(product)} | |
for p in similar_products: | |
results[p] = github_search(p) | |
return jsonify(results) | |
else: | |
return jsonify({'error': 'Invalid API key'}), 401 | |
def search_all(): | |
if request.method == 'POST': | |
data = request.get_json() | |
api_key = data.get('API_KEY') | |
product = data.get('product') | |
else: | |
product = request.args.get('product') | |
api_key = request.args.get('API_KEY') | |
similar_products = extract_similar_products(product) | |
if api_key == API_KEY_DEFAULT: | |
results = { | |
product : [{'duckduckgo': duckduckgo_search(product)},{'google': google_search(product)},{'github': github_search(product)},{'archive': archive_search(product)}] | |
} | |
def search_product(p): | |
return { | |
'product': p, | |
'duckduckgo': duckduckgo_search(p), | |
'google': google_search(p), | |
'github': github_search(p), | |
'archive': archive_search(p) | |
} | |
with concurrent.futures.ThreadPoolExecutor() as executor: | |
future_to_product = {executor.submit(search_product, p): p for p in similar_products} | |
for future in concurrent.futures.as_completed(future_to_product): | |
result = future.result() | |
product = result['product'] | |
results[product] = [ | |
{'duckduckgo': result['duckduckgo']}, | |
{'google': result['google']}, | |
{'github': result['github']}, | |
{'archive': result['archive']} | |
] | |
return jsonify(results) | |
else: | |
return jsonify({'error': 'Invalid API key'}), 401 | |
# Run the Flask app | |
if __name__ == '__main__': | |
app.run(debug=True) | |