import urllib.request import fitz import re import numpy as np import tensorflow_hub as hub import openai import gradio as gr import os from sklearn.neighbors import NearestNeighbors import requests from cachetools import cached, TTLCache def download_pdf(url, output_path): urllib.request.urlretrieve(url, output_path) def preprocess(text): text = text.replace('\n', ' ') text = re.sub('\s+', ' ', text) return text def pdf_to_text(path, start_page=1, end_page=None): doc = fitz.open(path) total_pages = doc.page_count if end_page is None: end_page = total_pages text_list = [] for i in range(start_page - 1, end_page): text = doc.load_page(i).get_text("text") text = preprocess(text) text_list.append(text) doc.close() return text_list def text_to_chunks(texts, word_length=150, start_page=1): text_toks = [t.split(' ') for t in texts] page_nums = [] chunks = [] for idx, words in enumerate(text_toks): for i in range(0, len(words), word_length): chunk = words[i:i + word_length] if (i + word_length) > len(words) and (len(chunk) < word_length) and ( len(text_toks) != (idx + 1)): text_toks[idx + 1] = chunk + text_toks[idx + 1] continue chunk = ' '.join(chunk).strip() chunk = f'[Page no. {idx + start_page}]' + ' ' + '"' + chunk + '"' chunks.append(chunk) return chunks class SemanticSearch: def __init__(self): self.use = hub.load('https://tfhub.dev/google/universal-sentence-encoder/4') self.fitted = False def fit(self, data, batch=1000, n_neighbors=5): self.data = data self.embeddings = self.get_text_embedding(data, batch=batch) n_neighbors = min(n_neighbors, len(self.embeddings)) self.nn = NearestNeighbors(n_neighbors=n_neighbors) self.nn.fit(self.embeddings) self.fitted = True def __call__(self, text, return_data=True): inp_emb = self.use([text]) neighbors = self.nn.kneighbors(inp_emb, return_distance=False)[0] if return_data: return [self.data[i] for i in neighbors] else: return neighbors def get_text_embedding(self, texts, batch=1000): embeddings = [] for i in range(0, len(texts), batch): text_batch = texts[i:(i + batch)] emb_batch = self.use(text_batch) embeddings.append(emb_batch) embeddings = np.vstack(embeddings) return embeddings def load_recommender(path, start_page=1): global recommender texts = pdf_to_text(path, start_page=start_page) chunks = text_to_chunks(texts, start_page=start_page) recommender.fit(chunks) return 'Corpus Loaded.' def generate_text(openAI_key, prompt, model="gpt-3.5-turbo"): openai.api_key = openAI_key temperature = 0.7 max_tokens = 256 top_p = 1 frequency_penalty = 0 presence_penalty = 0 if model == "text-davinci-003": completions = openai.Completion.create( engine=model, prompt=prompt, max_tokens=max_tokens, n=1, stop=None, temperature=temperature, ) message = completions.choices[0].text else: message = openai.ChatCompletion.create( model=model, messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "assistant", "content": "Here is some initial assistant message."}, {"role": "user", "content": prompt} ], temperature=.3, max_tokens=max_tokens, top_p=top_p, frequency_penalty=frequency_penalty, presence_penalty=presence_penalty, ).choices[0].message['content'] return message def generate_answer(question, openAI_key, model): topn_chunks = recommender(question) prompt = 'search results:\n\n' for c in topn_chunks: prompt += c + '\n\n' prompt += "Instructions: Compose a comprehensive reply to the query using the search results given. " \ "Cite each reference using [ Page Number] notation. " \ "Only answer what is asked. The answer should be short and concise. \n\nQuery: " prompt += f"{question}\nAnswer:" answer = generate_text(openAI_key, prompt, model) return answer def question_answer(chat_history, url, file, question, openAI_key, model): try: if openAI_key.strip() == '': return '[ERROR]: Please enter your Open AI Key. Get your key here : https://platform.openai.com/account/api-keys' if url.strip() == '' and file is None: return '[ERROR]: Both URL and PDF is empty. Provide at least one.' if url.strip() != '' and file is not None: return '[ERROR]: Both URL and PDF is provided. Please provide only one (either URL or PDF).' if model is None or model == '': return '[ERROR]: You have not selected any model. Please choose an LLM model.' if url.strip() != '': glob_url = url download_pdf(glob_url, 'corpus.pdf') load_recommender('corpus.pdf') else: old_file_name = file.name file_name = file.name file_name = file_name[:-12] + file_name[-4:] os.rename(old_file_name, file_name) load_recommender(file_name) if question.strip() == '': return '[ERROR]: Question field is empty' if model == "text-davinci-003" or model == "gpt-4" or model == "gpt-4-32k": answer = generate_answer_text_davinci_003(question, openAI_key) else: answer = generate_answer(question, openAI_key, model) chat_history.append([question, answer]) return chat_history except openai.error.InvalidRequestError as e: return f'[ERROR]: Either you do not have access to GPT4 or you have exhausted your quota!' def generate_text_text_davinci_003(openAI_key, prompt, engine="text-davinci-003"): openai.api_key = openAI_key completions = openai.Completion.create( engine=engine, prompt=prompt, max_tokens=512, n=1, stop=None, temperature=0.7, ) message = completions.choices[0].text return message def generate_answer_text_davinci_003(question, openAI_key): topn_chunks = recommender(question) prompt = "" prompt += 'search results:\n\n' for c in topn_chunks: prompt += c + '\n\n' prompt += "Instructions: Compose a comprehensive reply to the query using the search results given. " \ "Cite each reference using [ Page Number] notation (every result has this number at the beginning). " \ "Citation should be done at the end of each sentence. If the search results mention multiple subjects " \ "with the same name, create separate answers for each. Only include information found in the results and " \ "don't add any additional information. Make sure the answer is correct and don't output false content. " \ "If the text does not relate to the query, simply state 'Found Nothing'. Ignore outlier " \ "search results which has nothing to do with the question. Only answer what is asked. The " \ "answer should be short and concise. \n\nQuery: {question}\nAnswer: " prompt += f"Query: {question}\nAnswer:" answer = generate_text_text_davinci_003(openAI_key, prompt, "text-davinci-003") return answer # pre-defined questions questions = ["这项研究调查了什么?", "你能提供这篇论文的摘要吗?", "这项研究使用了哪些方法论?", "这项研究使用了哪些数据间隔?请告诉我开始日期和结束日期?", "这项研究的主要局限性是什么?", "这项研究的主要缺点是什么?", "这项研究的主要发现是什么?", "这项研究的主要结果是什么?", "这项研究的主要贡献是什么?", "这篇论文的结论是什么?", "这项研究中使用了哪些输入特征?", "这项研究中的因变量是什么?", ] # ============================================================================= CACHE_TIME = 60 * 60 * 6 # 6 hours def parse_arxiv_id_from_paper_url(url): return url.split("/")[-1] @cached(cache=TTLCache(maxsize=500, ttl=CACHE_TIME)) def get_recommendations_from_semantic_scholar(semantic_scholar_id: str): try: r = requests.post( "https://api.semanticscholar.org/recommendations/v1/papers/", json={ "positivePaperIds": [semantic_scholar_id], }, params={"fields": "externalIds,title,year", "limit": 10}, ) return r.json()["recommendedPapers"] except KeyError as e: raise gr.Error( "Error getting recommendations, if this is a new paper it may not yet have" " been indexed by Semantic Scholar." ) from e def filter_recommendations(recommendations, max_paper_count=5): # include only arxiv papers arxiv_paper = [ r for r in recommendations if r["externalIds"].get("ArXiv", None) is not None ] if len(arxiv_paper) > max_paper_count: arxiv_paper = arxiv_paper[:max_paper_count] return arxiv_paper @cached(cache=TTLCache(maxsize=500, ttl=CACHE_TIME)) def get_paper_title_from_arxiv_id(arxiv_id): try: return requests.get(f"https://huggingface.co/api/papers/{arxiv_id}").json()[ "title" ] except Exception as e: print(f"Error getting paper title for {arxiv_id}: {e}") raise gr.Error("Error getting paper title for {arxiv_id}: {e}") from e def format_recommendation_into_markdown(arxiv_id, recommendations): # title = get_paper_title_from_arxiv_id(arxiv_id) # url = f"https://huggingface.co/papers/{arxiv_id}" # comment = f"Recommended papers for [{title}]({url})\n\n" comment = "The following papers were recommended by the Semantic Scholar API \n\n" for r in recommendations: hub_paper_url = f"https://huggingface.co/papers/{r['externalIds']['ArXiv']}" comment += f"* [{r['title']}]({hub_paper_url}) ({r['year']})\n" return comment def return_recommendations(url): arxiv_id = parse_arxiv_id_from_paper_url(url) recommendations = get_recommendations_from_semantic_scholar(f"ArXiv:{arxiv_id}") filtered_recommendations = filter_recommendations(recommendations) return format_recommendation_into_markdown(arxiv_id, filtered_recommendations) # ============================================================================================== recommender = SemanticSearch() # 第一个文件的内容 title_1 = "相关文献导航系统" description_1 = ( "将一篇论文的链接粘贴到下方方框处,然后从文献导航系统获取类似论文的推荐。" "注意:如果论文是新的或尚未被文献导航系统索引,可能无法推荐。" ) examples_1 = [ "https://huggingface.co/papers/2309.12307", "https://huggingface.co/papers/2211.10086", ] # 第二个文件的内容 title_2 = "论文解读系统" description_2 = ( "论文解读系统允许你与你的 PDF 文件进行对话。它使用谷歌的通用句子编码器和深度平均网络(DAN)来提供无幻觉的响应,通过提高 OpenAI 的嵌入质量。" "它在方括号中注明页码([页码]),并显示信息的位置,增加了回应的可信度。" ) with gr.Blocks() as tab1: interface = gr.Interface( return_recommendations, gr.Textbox(lines=1), gr.Markdown(), examples=examples_1, title=title_1, description=description_1, ) with gr.Blocks() as tab2: gr.Markdown(f'

{title_2}

') gr.Markdown(description_2) with gr.Row(): with gr.Group(): gr.Markdown(f'

获取你的Open AI API key here

') with gr.Accordion("API Key"): openAI_key = gr.Textbox(label='在这里输入您的API key(老师如果需要测试,可以先用我的key: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)') url = gr.Textbox(label='输入pdf链接 (Example: https://arxiv.org/pdf/1706.03762.pdf )') gr.Markdown("

OR

") file = gr.File(label='在这里上传您的文件', file_types=['.pdf']) question = gr.Textbox(label='输入您的问题') gr.Examples( [[q] for q in questions], inputs=[question], label="您可能想问?", ) model = gr.Radio([ 'gpt-3.5-turbo', 'gpt-3.5-turbo-16k', 'gpt-3.5-turbo-0613', 'gpt-3.5-turbo-16k-0613', 'text-davinci-003', 'gpt-4', 'gpt-4-32k' ], label='Select Model') btn = gr.Button(value='提交') with gr.Group(): chatbot = gr.Chatbot() # Bind the click event of the button to the question_answer function btn.click( question_answer, inputs=[chatbot, url, file, question, openAI_key, model], outputs=[chatbot], ) # 将两个界面放入一个 Tab 应用中 demo = gr.TabbedInterface([tab1, tab2], ["相关文献导航系统", "论文解读系统"]) demo.launch()