# NOTE(review): removed stray "Spaces" / "Runtime error" page artifacts
# (Hugging Face Spaces scrape residue) that were not valid Python.
import json | |
from dotenv import load_dotenv | |
import gradio as gr | |
import os | |
from PyPDF2 import PdfReader | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.vectorstores import FAISS | |
from langchain.chains.question_answering import load_qa_chain | |
from langchain.chat_models import ChatOpenAI | |
from langchain.callbacks import get_openai_callback | |
# from requests.exceptions import Timeout | |
import requests | |
from bs4 import BeautifulSoup | |
from urllib.parse import urlparse, urljoin | |
import time | |
import random | |
import os | |
import mimetypes | |
from openai.error import Timeout | |
# Load variables from .env BEFORE reading them; the original called
# os.getenv("OPENAI_API_KEY") first, so a key defined only in .env was missed.
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Placeholder for a shared FAISS index (note: get_response currently builds
# its own local index rather than assigning this global).
knowledge_base = None
# Browser-like User-Agent so target sites don't reject the crawler outright.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
}
def is_webpage(url):
    """
    Return True when *url* points at an HTML page.

    Prefers the server-reported Content-Type from a HEAD request (the
    original fetched this header but never actually used it), and falls
    back to guessing from the URL's file extension when the header is
    absent. A 5-second timeout matches the other requests in this file.
    """
    content_type = requests.head(url, headers=headers, timeout=5).headers.get('Content-Type')
    if content_type is not None:
        # Header looks like "text/html; charset=utf-8" — compare the mimetype part.
        return content_type.split(';', 1)[0].strip().lower() == 'text/html'
    # No Content-Type header: fall back to the URL path's extension.
    mimetype, _encoding = mimetypes.guess_type(url, strict=False)
    return mimetype is not None and mimetype.startswith('text/html')
def get_internal_links(url):
    """Collect the unique same-domain links found on *url*'s page."""
    print('start get internal links')
    site_domain = urlparse(url).netloc  # domain of the site being crawled
    page = requests.get(url, headers=headers, timeout=5)
    soup = BeautifulSoup(page.content, 'html.parser')
    found = set()
    for anchor in soup.find_all('a', href=True):
        target = anchor['href']
        # Absolute links are used as-is; relative ones are resolved against *url*.
        candidate = target if target.startswith('http') else urljoin(url, target)
        if urlparse(candidate).netloc == site_domain:
            found.add(candidate)
    internal_links = list(found)
    print(internal_links)
    return internal_links
def get_page_content(url):
    """Fetch *url* and return its visible text, newline-separated."""
    resp = requests.get(url, headers=headers, timeout=5)
    text = BeautifulSoup(resp.content, 'html.parser').get_text('\n')
    # Random 1-3 second pause between fetches to avoid hammering the server.
    time.sleep(random.randint(1, 3))
    return text
def crawl_site(url):
    """Visit the queued pages (currently only *url*) and concatenate their text."""
    # Restricted to the landing page for now; swap in get_internal_links(url)
    # to crawl the whole site.
    pending = [url]
    gathered = []
    while pending:
        current = pending.pop(0)
        gathered.append(get_page_content(current))
        print(f'Page content for {current}:\n')
    return ''.join(gathered)
def decode_pdf(file_path):
    """
    Extract and concatenate the text of every page in the PDF at *file_path*.

    PyPDF2's ``extract_text()`` already returns a decoded ``str``, so the
    original encode/decode round-trip over a codec list was a no-op at best
    and broken at worst: ``.encode()`` raises UnicodeEncodeError, which the
    ``except UnicodeDecodeError`` never caught, and text accumulated before
    a mid-loop failure was kept, duplicating pages on the next attempt.
    Pages with no extractable text contribute an empty string.
    """
    with open(file_path, 'rb') as f:
        pdf_reader = PdfReader(f)
        # Extract while the file is still open — pages may read lazily.
        pages = [page.extract_text() or '' for page in pdf_reader.pages]
    return ''.join(pages)
def get_pdf_response(file):
    """
    Run the QA pipeline over the PDF at path *file*.

    Returns the pipeline result, an error dict when no text could be
    extracted, or None (implicitly) when *file* is None.
    """
    if file is None:
        return None
    text = decode_pdf(file)
    print('pdf text:', text)
    if text:
        return get_response(text)
    # Error-message typo fixed: "covert" -> "convert".
    return {"error": "convert pdf to text failed"}
def fix_url(url):
    """
    Ensure *url* carries a scheme, defaulting to https.

    The original issued a network HEAD request and keyed off a 405 status,
    which wasted a round-trip and, for an already-schemed URL served by a
    host that rejects HEAD with 405, produced "https://https://...".
    Parsing the URL locally is deterministic and needs no network.
    """
    return url if urlparse(url).scheme else "https://" + url
def get_website_response(url):
    """Normalize *url*, crawl its text, and run the QA extraction over it."""
    normalized = fix_url(url)
    site_text = crawl_site(normalized)
    return get_response(site_text)
def get_response(text):
    """Chunk *text*, embed it into an in-memory FAISS index, and query it."""
    # Break the raw text into overlapping chunks small enough to embed.
    splitter = CharacterTextSplitter(
        separator="\n",
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    pieces = splitter.split_text(text)
    # Build a FAISS index over OpenAI embeddings of the chunks, then ask.
    index = FAISS.from_texts(pieces, OpenAIEmbeddings())
    return ask_question(index)
def ask_question(knowledge_base):
    """
    Query *knowledge_base* for the pitch-deck fields and return the answer.

    Returns the raw model response (expected to be a JSON-formatted string)
    on success, or an ``{"error": ...}`` dict when the response is not valid
    JSON or the OpenAI call times out.
    """
    # Double braces escape literal { } in LangChain prompt templates.
    # The example JSON previously omitted the opening quote on
    # "contact_email", making the template JSON invalid; fixed here.
    user_question = """this content is a web3 project pitch deck. return result as JSON format. Please use the following JSON format to return data. if some fields are incomplete or missing, use 'N/A' to replace it.
{{"project_name":"this project name","introduction":"project introduction, less than 200 words","slogan":"project slogan","features":"project features","description":"project description","roadmap":"g","fundraising":"fundraising target,round, valuation etc.","contact_email":"project contact email","website":"project official website","twitter":"official twitter","github":"official github","telegram":"official telegram"}}"""
    print("Question:", user_question)
    # Retrieve the chunks most relevant to the question, then run a "stuff"
    # QA chain (all retrieved docs packed into a single prompt).
    docs = knowledge_base.similarity_search(user_question)
    llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0.7)
    chain = load_qa_chain(llm, chain_type="stuff")
    try:
        with get_openai_callback() as cb:
            response = chain.run(input_documents=docs, question=user_question)
            print(f"Total Tokens: {cb.total_tokens}")
            print(f"Prompt Tokens: {cb.prompt_tokens}")
            print(f"Completion Tokens: {cb.completion_tokens}")
            print(f"Total Cost (USD): ${cb.total_cost}")
        print("Answer:", response)
        json.loads(response)  # validation only; the string itself is returned
    except json.decoder.JSONDecodeError:
        # Typos in the two error messages fixed ("can't found", "Reuest").
        response = {"error": "Data can't be found"}
    except Timeout:
        response = {"error": "Request timeout, please try again"}
    print(json.dumps(response, ensure_ascii=False))
    return response
def upload_file(file):
    """Gradio upload handler: log the deck's size and extract its fields."""
    path = file.name
    print("File size:", os.path.getsize(path))
    return get_pdf_response(path)
# Gradio UI: one tab uploads a PDF pitch deck, the other crawls a project
# website; both feed the extracted text through the same QA pipeline and
# render the resulting fields as JSON.
with gr.Blocks(title="Use AI boost your deal flow - Ventureflow") as demo:
    gr.Markdown("# Use AI boost your deal flow")
    with gr.Tab("Upload Deck"):
        # Label had an unbalanced parenthesis ("Deck(.pdf))"); fixed here.
        upload_button = gr.UploadButton("Click to Upload a Deck (.pdf)", file_types=[".pdf"])
        json_output = gr.JSON()
        upload_button.upload(upload_file, upload_button, json_output)
    with gr.Tab("Enter Project website"):
        text_input = gr.Textbox(label="Enter Project website")
        json_output = gr.JSON()
        submit_button = gr.Button("Click to Submit")
        submit_button.click(get_website_response, text_input, json_output)
    gr.Markdown("""
    ## Links
    - Website: [Ventureflow.xyz](https://ventureflow.xyz)
    - Twitter: [@VentureFlow_xyz](https://twitter.com/VentureFlow_xyz)
    - App: [app.ventureflow.xyz](https://app.ventureflow.xyz)
    - Docs: [docs.ventureflow.xyz](https://docs.ventureflow.xyz)
    """)
demo.launch()