# Spaces status: Runtime error
import gradio as gr | |
import os | |
import re | |
import requests | |
import tempfile | |
import time | |
from pyzotero import zotero | |
from paperqa import Docs | |
from lxml import html | |
from models import Icons, Message | |
def is_integer(string):
    """Return True when ``string`` can be converted with ``int()``.

    Args:
        string: The value to check, normally text typed into a form field.

    Returns:
        ``True`` if ``int(string)`` succeeds, ``False`` otherwise.
    """
    try:
        int(string)
    except (TypeError, ValueError):
        # ValueError: text that is not an integer literal (e.g. "abc", "").
        # TypeError: a value int() cannot accept at all (e.g. None).
        return False
    return True
def reset_open_ai(openai_api_key):
    """Store the OpenAI API key in the process environment.

    The key is stripped of surrounding whitespace and written to the
    ``OPENAI_API_KEY`` environment variable.

    Args:
        openai_api_key: The key text as entered by the user.

    Returns:
        A ``gr.HTML`` update whose value is ``None``.
    """
    cleaned_key = openai_api_key.strip()
    os.environ['OPENAI_API_KEY'] = cleaned_key
    return gr.HTML.update(value=None)
def _fetch_failure(messages):
    """Build the output tuple shared by every failure path of fetch_collections."""
    return (
        None,
        [],
        None,
        gr.Button.update(visible=True),
        gr.HTML.update(visible=True),
        messages,
        gr.HTML.update(value=str(messages)),
    )


def fetch_collections(openai_api_key, id, type, key, messages):
    """Validate the credentials and fetch the top-level Zotero collections.

    Args:
        openai_api_key: OpenAI API key text; must not be blank.
        id: Zotero library ID; must be convertible to an integer.
        type: Zotero library type; lower-cased before being passed to pyzotero.
        key: Zotero API key text; must not be blank.
        messages: Message container; a status ``Message`` is appended to it.

    Returns:
        A 7-tuple of outputs:
        ``(zot, collections, radio_update, button_update, html_update,
        messages, messages_html_update)``.
        On success ``zot`` is the ``zotero.Zotero`` client, ``collections`` is
        the list returned by ``collections_top()`` and the radio update lists
        one ``"<name> (<numItems>)"`` choice per collection. On any failure
        the first three entries are ``None``, ``[]`` and ``None``.
    """
    # Treat None and whitespace-only input the same as an empty field.
    if not (openai_api_key or '').strip():
        messages.append(
            Message(Icons.ERR, "Your OpenAI API key is missing. Check out: <a href='https://platform.openai.com/overview' target='_blank'>https://platform.openai.com/overview</a>."))
        return _fetch_failure(messages)

    if not (key or '').strip():
        messages.append(
            Message(Icons.ERR, "Your Zotero API key is missing. Click <a href='https://www.zotero.org/settings/keys/new' target='_blank'>here</a> to create a new one."))
        return _fetch_failure(messages)

    if not is_integer(id):
        messages.append(
            Message(Icons.ERR, "Your Zotero ID should be an integer."))
        return _fetch_failure(messages)

    zot = zotero.Zotero(int(id), type.lower(), key)

    try:
        collections = zot.collections_top()
        collection_names = [
            f"{x['data']['name']} ({x['meta']['numItems']})" for x in collections]
        messages.append(
            Message(Icons.INFO, "Please select a Zotero collection to proceed."))
        return (
            zot,
            collections,
            gr.Radio.update(choices=collection_names,
                            visible=True, interactive=True),
            gr.Button.update(visible=False),
            gr.HTML.update(visible=False),
            messages,
            gr.HTML.update(value=str(messages)),
        )
    except Exception as e:
        # UI boundary: report any failure to the user instead of raising.
        messages.append(
            Message(Icons.ERR, f"Error occurred when fetching Zotero collection: {e}"))
        # Uses the same output tuple as the validation failures above, so the
        # HTML output stays visible rather than being reset to None.
        return _fetch_failure(messages)
def select_collection(collection, messages):
    """React to the user picking a collection from the radio list.

    Args:
        collection: The selected choice in ``"<name> (<count>)"`` form, or
            ``None`` when nothing is selected.
        messages: Message container; replaced with a single confirmation
            message when a collection is selected.

    Returns:
        A 4-tuple ``(text_update, messages, messages_html_update,
        html_update)``. When ``collection`` is ``None`` the tuple is
        ``(None, messages, gr.HTML.update(), None)`` and ``messages`` is left
        untouched.
    """
    if collection is None:
        return None, messages, gr.HTML.update(), None

    # Raw string: "\s", "\(" and "\d" are regex escapes, not string escapes.
    # The pattern strips the trailing " (<count>)" suffix from the choice.
    collection_name = re.sub(r'\s\(\d+\)$', '', collection)
    messages.set([Message(
        Icons.OK, f"Selected collection: <span style='font-weight: bold'>{collection_name}</span>. Please type your question and hit \"Enter\".")])
    return (
        gr.Text.update(
            placeholder="Please type your question and hit \"Enter\".", interactive=True),
        messages,
        gr.HTML.update(value=str(messages)),
        gr.HTML.update(value=None),
    )
def search_attachments(id, type, key, collection, queries=None, limit=10):
    """Search a Zotero collection for PDF attachments matching the queries.

    One ``collection_items`` search is run per query; results are
    de-duplicated by item key, filtered to ``application/pdf`` attachments and
    truncated to ``limit`` entries.

    Args:
        id: Zotero library ID (converted with ``int``).
        type: Zotero library type; lower-cased before use.
        key: Zotero API key.
        collection: Collection dict; only ``collection['key']`` is read.
        queries: Iterable of search strings. ``None`` means no queries.
        limit: Per-query result limit and overall cap on returned attachments.

    Returns:
        ``(parents, attachments, message)``: the set of parent item keys, the
        list of attachment items and an HTML status string. On any error the
        result is ``([], [], message)`` with the error text in ``message``.
    """
    try:
        zot = zotero.Zotero(int(id), type.lower(), key)

        # Keyed by item key so an attachment matched by several queries is
        # only counted once (first-seen order is preserved).
        unique = {}
        for q in (queries or []):
            found = zot.collection_items(
                collection['key'],
                q=q,
                limit=limit,
                itemType='attachment',
                qmode='everything'
            )
            for item in found:
                if item['data'].get('contentType') == 'application/pdf':
                    unique[item['key']] = item

        attachments = list(unique.values())[:limit]
        # Attachments without a parent item are skipped here instead of
        # failing the whole search with a KeyError.
        parents = {a['data']['parentItem']
                   for a in attachments if a['data'].get('parentItem')}

        if attachments:
            message = f"<div>✅ Found {len(attachments)} PDF {'attachments' if len(attachments) > 1 else 'attachment'} from {len(parents)} {'articles' if len(parents) != 1 else 'article'}.</div>"
        else:
            message = "<div>❔ No results. Make sure to index your PDF attachments in Zotero.</div>"
        return parents, attachments, message
    except Exception as e:
        # UI boundary: any failure becomes a message rather than an exception.
        message = f"<div>⚠️ Error occurred when searching in Zotero: {e}</div>"
        return [], [], message
def download_attachment(id, type, key, attachment):
    """Return the raw bytes of a Zotero attachment.

    Args:
        id: Zotero library ID (converted with ``int``).
        type: Zotero library type; lower-cased before use.
        key: Zotero API key.
        attachment: Attachment item dict; reads ``['data']['linkMode']``,
            ``['key']`` and, for ``imported_url``, ``['data']['url']``.

    Returns:
        The attachment content: the result of ``zot.file(...)`` for
        ``imported_file``, or the HTTP response body for ``imported_url``.

    Raises:
        ValueError: If the link mode is neither ``imported_file`` nor
            ``imported_url``.
        requests.RequestException: If the URL download fails, times out or
            returns an HTTP error status.
    """
    link_mode = attachment['data']['linkMode']

    if link_mode == 'imported_file':
        # The Zotero client is only needed for files kept in Zotero storage.
        zot = zotero.Zotero(int(id), type.lower(), key)
        return zot.file(attachment['key'])

    if link_mode == 'imported_url':
        # Without a timeout a stalled server would block forever.
        res = requests.get(attachment['data']['url'], timeout=60)
        # Fail loudly rather than returning an error page as "PDF" content.
        res.raise_for_status()
        return res.content

    raise ValueError(
        f'Unsupported link mode: {link_mode} for {attachment["key"]}.')
def reset_collection(messages):
    """Reset the collection-related outputs to their "nothing selected" state.

    ``messages`` is replaced with a single informational message.

    Args:
        messages: Message container to reset.

    Returns:
        A 7-tuple: a radio update with no choices and hidden, an HTML update
        made visible, a non-interactive text update with a placeholder, an
        HTML update with value ``None``, ``messages``, an HTML update carrying
        ``str(messages)``, and ``None``.
    """
    prompt = Message(
        Icons.INFO, "Please provide all the required OpenAI and Zotero information in the left panel.")
    messages.set([prompt])

    hidden_radio = gr.Radio.update(choices=[], visible=False)
    shown_html = gr.HTML.update(visible=True)
    locked_question = gr.Text.update(
        placeholder="You have to select a Zotero collection to proceed", interactive=False)
    cleared_html = gr.HTML.update(value=None)
    messages_html = gr.HTML.update(value=str(messages))

    return (
        hidden_radio,
        shown_html,
        locked_question,
        cleared_html,
        messages,
        messages_html,
        None,
    )
def _submit_status(messages, answer=None):
    """Build one (messages, messages HTML update, answer output) tuple for handle_submit."""
    return (
        messages,
        gr.HTML.update(value=str(messages)),
        answer,
    )


def handle_submit(zot, collection_name, collections, question, messages):
    """Answer ``question`` from the PDFs of the selected Zotero collection.

    This is a generator: after every step it yields a 3-tuple
    ``(messages, messages_html_update, answer_update)`` so progress can be
    shown incrementally. ``answer_update`` is ``None`` until the final yield,
    which carries the formatted answer HTML.

    Steps: generate search queries from the question, search the collection
    for PDF attachments, fetch a bibliography entry per attachment, download
    and add each PDF to a ``Docs`` instance, build the vector index, then
    query it. A failure in any step is appended to ``messages`` and ends the
    generator; failures tied to a single attachment only produce a warning.

    Args:
        zot: Zotero client used for searching and downloading.
        collection_name: Selected choice in ``"<name> (<count>)"`` form.
        collections: Collection dicts the choice list was built from.
        question: The user's question.
        messages: Message container that receives the progress messages.

    Yields:
        ``(messages, messages_html_update, answer_update)`` tuples.
    """
    max_attachments = 10      # per-query limit and overall cap
    download_timeout = 60     # seconds, for PDFs fetched from public URLs
    progress_interval = 15    # seconds between "still in progress" messages

    # Raw string: "\s", "\(" and "\d" are regex escapes, not string escapes.
    collection_name_only = re.sub(r'\s\(\d+\)$', '', collection_name)
    messages.set([Message(
        Icons.OK, f"Selected collection: <span style='font-weight: bold'>{collection_name_only}</span>.")])
    yield _submit_status(messages)

    docs = Docs()

    # Generate search queries from the question by Paper QA
    try:
        question_prompt = 'A "keyword search" is a list of no more than 3 words, which separated by whitespace only and with no boolean operators (e.g. "dog canine puppy"). Avoid adding any new words not in the question unless they are synonyms to the existing words.'
        queries = [x.strip('"').lower() for x in
                   docs.generate_search_query(question + '\n' + question_prompt)]
        query_str = ", ".join(
            f"<span style='font-weight: bold;'>{q}</span>" for q in queries)
        messages.append(
            Message(Icons.WAIT, f"Searching your Zotero collection for {query_str}."))
        yield _submit_status(messages)
    except Exception as e:
        messages.append(
            Message(Icons.ERR, f"Error occurred when generating search queries: {e}"))
        yield _submit_status(messages)
        return

    # Search for attachments in Zotero
    try:
        collection = next(
            (x for x in collections
             if f"{x['data']['name']} ({x['meta']['numItems']})" == collection_name),
            None,
        )
        if collection is None:
            raise LookupError(
                f"collection {collection_name!r} was not found")

        # Keyed by item key so a PDF matched by several queries counts once.
        unique = {}
        for q in queries:
            found = zot.collection_items(
                collection['key'],
                q=q,
                limit=max_attachments,
                itemType='attachment',
                qmode='everything'
            )
            for item in found:
                if item['data'].get('contentType') == 'application/pdf':
                    unique[item['key']] = item
        attachments = list(unique.values())[:max_attachments]
        # Standalone attachments have no parent item; do not fail on them.
        parent_ids = {a['data']['parentItem']
                      for a in attachments if a['data'].get('parentItem')}

        if attachments:
            messages.append(Message(
                Icons.SUCCESS, f"Found {len(attachments)} PDF {'attachments' if len(attachments) > 1 else 'attachment'} from {len(parent_ids)} {'articles' if len(parent_ids) != 1 else 'article'}."))
            yield _submit_status(messages)
        else:
            messages.append(Message(
                Icons.ERR, "No results. Make sure to index your PDF attachments in Zotero and try rephrasing your question."))
            yield _submit_status(messages)
            return
    except Exception as e:
        messages.append(
            Message(Icons.ERR, f"Error occurred when searching in Zotero: {e}"))
        yield _submit_status(messages)
        return

    # Compile citation metadata
    citation_dict = {}   # attachment key -> citation HTML
    bib_by_parent = {}   # parent item key -> citation HTML (one fetch per parent)
    messages.append(
        Message(Icons.WAIT, "Fetching attachment bibliography information."))
    yield _submit_status(messages)

    for attachment in attachments:
        parent_id = attachment["data"].get("parentItem")
        try:
            if parent_id is None:
                raise LookupError("the attachment has no parent item")
            if parent_id not in bib_by_parent:
                parent = zot.item(parent_id, content="bib", style="nature")[0]
                bib_text = html.fragment_fromstring(
                    parent).xpath("normalize-space(div[2])")
                link = attachment['links']['alternate']['href']
                bib_by_parent[parent_id] = (
                    "\n"
                    f"{bib_text}\n"
                    f"<a href=\"{link}\" target=\"_blank\" class=\"zotero-link\">Open in Zotero</a>\n"
                )
            citation_dict[attachment["key"]] = bib_by_parent[parent_id]
        except Exception as e:
            messages.append(Message(
                Icons.WARN, f"Failed to retrieve bibliography for PDF attachment <a href='{attachment['links']['alternate']['href']}' target='_blank'>{attachment['data']['title']}</a>: {e}"))
            yield _submit_status(messages)

    # Load attachments
    available_attachments = 0
    for attachment in attachments:
        try:
            link_mode = attachment['data']['linkMode']
            if link_mode in ['imported_file', 'imported_url']:
                if link_mode == 'imported_file':
                    attachment_content = zot.file(attachment['key'])
                else:
                    response = requests.get(
                        attachment['data']['url'], timeout=download_timeout)
                    # Do not index an HTTP error page as if it were the PDF.
                    response.raise_for_status()
                    attachment_content = response.content

                citation = citation_dict.get(attachment["key"])
                if citation is None:
                    # The bibliography lookup failed (already reported above);
                    # cite the attachment by its title instead of skipping it.
                    link = attachment['links']['alternate']['href']
                    title = attachment['data']['title']
                    citation = (
                        "\n"
                        f"{title}\n"
                        f"<a href=\"{link}\" target=\"_blank\" class=\"zotero-link\">Open in Zotero</a>\n"
                    )

                # The context manager closes and removes the temporary file as
                # soon as it has been added, instead of leaving that to GC.
                with tempfile.NamedTemporaryFile(suffix=".pdf") as temp_file:
                    temp_file.write(attachment_content)
                    temp_file.flush()
                    docs.add(temp_file.name, citation)

                messages.append(Message(
                    Icons.INDEX, f"Loaded PDF attachment: <a href='{attachment['links']['alternate']['href']}' target='_blank'>{attachment['data']['title']}</a>."))
                available_attachments += 1
            else:
                messages.append(Message(
                    Icons.WARN, f"Unable to access linked PDF attachment <a href='{attachment['links']['alternate']['href']}' target='_blank'>{attachment['data']['title']}</a>: The file is not in Zotero online storage."))
            yield _submit_status(messages)
        except Exception as e:
            messages.append(Message(
                Icons.WARN, f"Failed to retrieve PDF attachment <a href='{attachment['links']['alternate']['href']}' target='_blank'>{attachment['data']['title']}</a>: {e}"))
            yield _submit_status(messages)

    # Build vector index
    if available_attachments == 0:
        messages.append(Message(
            Icons.ERR, "No answer. Unable to access any PDF attachments from your Zotero online storage or public URLs."))
        yield _submit_status(messages)
        return

    if docs._faiss_index is None:
        try:
            messages.append(Message(
                Icons.WAIT, f"Building vector index based on {available_attachments} available PDF {'attachment' if available_attachments == 1 else 'attachments'}."))
            yield _submit_status(messages)
            docs._build_faiss_index()
        except Exception as e:
            messages.append(Message(
                Icons.ERR, f"Unable to build vector index: {e}"))
            yield _submit_status(messages)
            return

    # Synthesize response
    if available_attachments == 1:
        duration_hint = "This should be done within a minute."
    else:
        duration_hint = "This will loop through all available PDF attachments and may take a couple of minutes."
    messages.append(Message(Icons.WAIT, f"Creating answer. {duration_hint}"))
    yield _submit_status(messages)

    try:
        # monotonic() is unaffected by system clock adjustments.
        last_report = time.monotonic()
        total_time = 0
        answer = None
        for answer in docs.query_gen(question):
            now = time.monotonic()
            since_report = now - last_report
            if since_report > progress_interval:
                last_report = now
                total_time += since_report
                messages.append(Message(
                    Icons.INFO, f"Still in progress: {total_time:.1f} seconds"))
                yield _submit_status(messages)

        if answer is None:
            raise RuntimeError("no answer was generated")

        answer_text = "\n".join(
            f"<div>{x}</div>" for x in answer.answer.split("\n"))

        if answer.references == "":
            references = ""
        else:
            # Drop the leading "<n>." numbering; [-1] keeps an entry without
            # a dot intact instead of raising IndexError.
            reference_list = "\n".join(
                f"<li>{x.split('.', 1)[-1]}</li>"
                for x in answer.references.split('\n\n'))
            references = (
                "\n"
                "<h4 style=\"font-size: 1rem;\">References:</h4>\n"
                "<ol>\n"
                f"{reference_list}\n"
                "</ol>\n"
            )

        formatted_answer = "\n".join([
            f"<div>{answer_text}</div>",
            references,
            f"<div>Tokens Used: {answer.tokens} Cost: ${answer.tokens/1000 * 0.002:.2f}</div>",
        ]).strip()

        messages.append(Message(
            Icons.OK, "Answer created."))
        yield _submit_status(messages, gr.HTML.update(value=formatted_answer))
    except Exception as e:
        messages.append(Message(
            Icons.ERR, f"Error occurred when creating answer: {e}"))
        yield _submit_status(messages)
        return