"""Gradio callbacks for asking questions about the PDFs in a Zotero collection.

The callbacks talk to Zotero through ``pyzotero`` and answer questions with
``paperqa``.  Each callback returns (or yields) a tuple of values / Gradio
``update`` objects; which UI component each position is bound to is decided
by the caller that wires these functions up.

NOTE(review): several message/HTML literals below contain ``\\n`` escapes.
They reproduce line breaks that appear inside those literals in the version
of the file this was rewritten from; any markup they may once have carried
is not visible there -- TODO confirm against the rendered UI.
"""
import os
import re
import tempfile
import time

import gradio as gr
import requests
from lxml import html
from paperqa import Docs
from pyzotero import zotero

from models import Icons, Message

# Matches the " (<item count>)" suffix that is appended to collection names
# when they are offered as radio choices.
_ITEM_COUNT_SUFFIX = re.compile(r'\s\(\d+\)$')

# Seconds to wait for a direct attachment download before giving up.
_DOWNLOAD_TIMEOUT = 60

# Used both as the per-query search limit and as the cap on attachments kept.
_MAX_ATTACHMENTS = 10

# Minimum seconds between two "still in progress" notices while answering.
_PROGRESS_INTERVAL = 15


def is_integer(string):
    """Return True if ``int(string)`` succeeds, False otherwise.

    ``None`` and other values ``int()`` rejects with ``TypeError`` are
    reported as False instead of raising.
    """
    try:
        int(string)
    except (TypeError, ValueError):
        return False
    return True


def reset_open_ai(openai_api_key):
    """Store the stripped key in ``OPENAI_API_KEY`` and return an HTML reset.

    A missing (``None``) key is stored as an empty string.
    """
    os.environ['OPENAI_API_KEY'] = (openai_api_key or '').strip()
    return gr.HTML.update(value=None)


def _collection_label(collection):
    """Return the ``"<name> (<item count>)"`` label for a Zotero collection."""
    return f"{collection['data']['name']} ({collection['meta']['numItems']})"


def _attachment_title(attachment):
    """Return the attachment's title, falling back to its key if absent."""
    return attachment['data'].get('title', attachment['key'])


def _fetch_failure(messages, text):
    """Record *text* as an error and return the failure tuple of fetch_collections."""
    messages.append(Message(Icons.ERR, text))
    return (
        None,
        [],
        None,
        gr.Button.update(visible=True),
        gr.HTML.update(visible=True),
        messages,
        gr.HTML.update(value=str(messages)),
    )


def fetch_collections(openai_api_key, id, type, key, messages):
    """Validate the credentials and fetch the top-level Zotero collections.

    Args:
        openai_api_key: OpenAI key; only checked for being non-blank here.
        id: Zotero library id; must be convertible with ``int()``.
        type: Zotero library type, lower-cased before use.
        key: Zotero API key.
        messages: Message log that status lines are appended to.

    Returns:
        A 7-tuple.  On success: the Zotero client, the list of collections,
        a ``gr.Radio`` update listing ``"<name> (<count>)"`` choices, a
        hidden ``gr.Button`` update, a hidden ``gr.HTML`` update, the
        message log and a ``gr.HTML`` update with ``str(messages)``.
        On any failure: ``None``, ``[]``, ``None``, a visible button update,
        a visible HTML update, the message log and its HTML update.
    """
    if not (openai_api_key or '').strip():
        return _fetch_failure(
            messages,
            "Your OpenAI API key is missing. Check out: https://platform.openai.com/overview.")
    if not (key or '').strip():
        return _fetch_failure(
            messages,
            "Your Zotero API key is missing. Click here to create a new one.")
    if not is_integer(id):
        return _fetch_failure(messages, "Your Zotero ID should be an integer.")

    try:
        zot = zotero.Zotero(int(id), type.lower(), key.strip())
        collections = zot.collections_top()
        collection_names = [_collection_label(c) for c in collections]
    except Exception as e:
        # Boundary with the UI: report the problem instead of crashing.
        return _fetch_failure(
            messages, f"Error occurred when fetching Zotero collection: {e}")

    messages.append(
        Message(Icons.INFO, "Please select a Zotero collection to proceed."))
    return (
        zot,
        collections,
        gr.Radio.update(choices=collection_names,
                        visible=True, interactive=True),
        gr.Button.update(visible=False),
        gr.HTML.update(visible=False),
        messages,
        gr.HTML.update(value=str(messages)),
    )


def select_collection(collection, messages):
    """React to a collection being picked from the radio choices.

    Args:
        collection: Selected ``"<name> (<count>)"`` label, or ``None``.
        messages: Message log; replaced with a single confirmation line.

    Returns:
        A 4-tuple: a ``gr.Text`` update enabling the question box, the
        message log, a ``gr.HTML`` update with ``str(messages)`` and a
        ``gr.HTML`` update clearing its value.  When *collection* is
        ``None`` the log is left untouched and ``(None, messages,
        gr.HTML.update(), None)`` is returned.
    """
    if collection is None:
        return None, messages, gr.HTML.update(), None

    collection_name = _ITEM_COUNT_SUFFIX.sub('', collection)
    messages.set([Message(
        Icons.OK,
        f"Selected collection: {collection_name}. Please type your question and hit \"Enter\".")])
    return (
        gr.Text.update(
            placeholder="Please type your question and hit \"Enter\".",
            interactive=True),
        messages,
        gr.HTML.update(value=str(messages)),
        gr.HTML.update(value=None),
    )


# Earlier stand-alone search helper, kept commented out as found; the same
# search is performed inline by handle_submit().
#
# def search_attachments(id, type, key, collection, queries=[], limit=10):
#     try:
#         zot = zotero.Zotero(int(id), type.lower(), key)
#         searches = [zot.collection_items(
#             collection['key'],
#             q=q,
#             limit=limit,
#             itemType='attachment',
#             qmode='everything'
#         ) for q in queries]
#         attachments = [x for x in {item['key']: item for search in searches for item in search if item['data']
#                                    ['contentType'] == 'application/pdf'}.values()][:limit]
#         parents = set([a['data']['parentItem'] for a in attachments])
#         message = f"\n✅ Found {len(attachments)} PDF {'attachments' if len(attachments) > 1 else 'attachment'} from {len(parents)} {'articles' if len(parents) > 1 else 'article'}.\n" if len(
#             attachments) else "\n❔ No results. Make sure to index your PDF attachments in Zotero.\n"
#         return parents, attachments, message
#     except Exception as e:
#         message = f"\n⚠️ Error occurred when searching in Zotero: {e}\n"
#         return [], [], message


def download_attachment(id, type, key, attachment):
    """Return the raw bytes of a Zotero attachment.

    ``imported_file`` attachments are read through the Zotero client;
    ``imported_url`` attachments are downloaded from their ``url`` field.

    Raises:
        ValueError: If the attachment's link mode is neither of the above.
        requests.RequestException: If the URL download times out or the
            server answers with an HTTP error status.
    """
    zot = zotero.Zotero(int(id), type.lower(), key)
    link_mode = attachment['data']['linkMode']
    if link_mode == 'imported_file':
        return zot.file(attachment['key'])
    if link_mode == 'imported_url':
        res = requests.get(attachment['data']['url'],
                           timeout=_DOWNLOAD_TIMEOUT)
        # Without this an HTTP error page would be returned as file content.
        res.raise_for_status()
        return res.content
    raise ValueError(
        f'Unsupported link mode: {link_mode} for {attachment["key"]}.')


def reset_collection(messages):
    """Reset the collection-dependent outputs to their initial state.

    Returns:
        A 7-tuple: a hidden, empty ``gr.Radio`` update, a visible
        ``gr.HTML`` update, a disabled ``gr.Text`` update, a ``gr.HTML``
        update clearing its value, the message log (replaced with a single
        info line), a ``gr.HTML`` update with ``str(messages)`` and ``None``.
    """
    messages.set([Message(
        Icons.INFO,
        "Please provide all the required OpenAI and Zotero information in the left panel.")])
    return (
        gr.Radio.update(choices=[], visible=False),
        gr.HTML.update(visible=True),
        gr.Text.update(
            placeholder="You have to select a Zotero collection to proceed",
            interactive=False),
        gr.HTML.update(value=None),
        messages,
        gr.HTML.update(value=str(messages)),
        None,
    )


def _status(messages, answer=None):
    """Build the 3-tuple yielded by handle_submit for the current log state."""
    return messages, gr.HTML.update(value=str(messages)), answer


def handle_submit(zot, collection_name, collections, style, question, messages):
    """Answer *question* from the PDFs of the selected Zotero collection.

    This is a generator: after every step it yields
    ``(messages, gr.HTML.update(value=str(messages)), answer)`` where
    ``answer`` is ``None`` until the final yield, which carries a
    ``gr.HTML`` update holding the formatted answer.  Any failing step is
    appended to *messages* and ends the generator early.

    Args:
        zot: Zotero client used for searching and downloading.
        collection_name: Selected ``"<name> (<count>)"`` label.
        collections: Collections the label is looked up in.
        style: Citation style passed to Zotero when fetching bibliographies.
        question: The user's question.
        messages: Message log; reset at the start, then appended to.
    """
    if collection_name is None:
        messages.append(
            Message(Icons.ERR, "Please select a Zotero collection to proceed."))
        yield _status(messages)
        return

    collection_name_only = _ITEM_COUNT_SUFFIX.sub('', collection_name)
    messages.set([Message(
        Icons.OK, f"Selected collection: {collection_name_only}.")])
    yield _status(messages)

    docs = Docs()

    # Generate search queries from the question by Paper QA
    try:
        question_prompt = 'A "keyword search" is a list of no more than 3 words, which separated by whitespace only and with no boolean operators (e.g. "dog canine puppy"). Avoid adding any new words not in the question unless they are synonyms to the existing words.'
        queries = [
            q.strip('"').lower()
            for q in docs.generate_search_query(question + '\n' + question_prompt)
        ]
        query_str = ", ".join(queries)
        messages.append(Message(
            Icons.WAIT, f"Searching your Zotero collection for {query_str}."))
        yield _status(messages)
    except Exception as e:
        messages.append(Message(
            Icons.ERR, f"Error occurred when generating search queries: {e}"))
        yield _status(messages)
        return

    # Search for attachments in Zotero
    try:
        collection = next(
            (c for c in collections if _collection_label(c) == collection_name),
            None)
        if collection is None:
            raise LookupError(
                f"collection {collection_name_only} was not found in the fetched collections")

        searches = [
            zot.collection_items(
                collection['key'],
                q=q,
                limit=_MAX_ATTACHMENTS,
                itemType='attachment',
                qmode='everything',
            )
            for q in queries
        ]
        # Keyed by item key so an attachment matched by several queries is
        # only kept once.
        pdf_by_key = {
            item['key']: item
            for search in searches
            for item in search
            if item['data'].get('contentType') == 'application/pdf'
        }
        attachments = list(pdf_by_key.values())[:_MAX_ATTACHMENTS]
        # Attachments without a parent item count as their own article.
        parent_keys = {
            a['data'].get('parentItem', a['key']) for a in attachments
        }

        if attachments:
            messages.append(Message(
                Icons.SUCCESS,
                f"Found {len(attachments)} PDF {'attachments' if len(attachments) > 1 else 'attachment'} from {len(parent_keys)} {'articles' if len(parent_keys) > 1 else 'article'}."))
            yield _status(messages)
        else:
            messages.append(Message(
                Icons.ERR,
                "No results. \nMake sure to index your PDF attachments in Zotero and try rephrasing your question."))
            yield _status(messages)
            return
    except Exception as e:
        messages.append(Message(
            Icons.ERR, f"Error occurred when searching in Zotero: {e}"))
        yield _status(messages)
        return

    # Compile citation metadata
    citation_dict = {}
    bib_by_parent = {}
    messages.append(Message(
        Icons.WAIT, "Fetching attachment bibliography information."))
    yield _status(messages)

    for attachment in attachments:
        parent_id = attachment['data'].get('parentItem', attachment['key'])
        try:
            if parent_id not in bib_by_parent:
                parent = zot.item(parent_id, content="bib", style=style)[0]
                citation_text = html.fragment_fromstring(
                    parent).xpath("normalize-space(//*)")
                bib_by_parent[parent_id] = f" {citation_text} Open in Zotero "
            citation_dict[attachment['key']] = bib_by_parent[parent_id]
        except Exception as e:
            messages.append(Message(
                Icons.WARN,
                f"Failed to retrieve bibliography for PDF attachment {_attachment_title(attachment)}: {e}"))
            yield _status(messages)

    # Load attachments
    available_attachments = 0
    for attachment in attachments:
        title = _attachment_title(attachment)
        try:
            link_mode = attachment['data']['linkMode']
            if link_mode == 'imported_file':
                attachment_content = zot.file(attachment['key'])
            elif link_mode == 'imported_url':
                response = requests.get(
                    attachment['data']['url'], timeout=_DOWNLOAD_TIMEOUT)
                # Do not index an HTTP error page as if it were the PDF.
                response.raise_for_status()
                attachment_content = response.content
            else:
                messages.append(Message(
                    Icons.WARN,
                    f"Unable to access linked PDF attachment {title}: The file is not in Zotero online \nstorage."))
                yield _status(messages)
                continue

            # The temporary file only has to exist while docs.add() runs.
            with tempfile.NamedTemporaryFile(suffix=".pdf") as temp_file:
                temp_file.write(attachment_content)
                temp_file.flush()
                # An attachment whose bibliography lookup failed above has no
                # entry here; the KeyError is reported by the handler below.
                docs.add(temp_file.name, citation_dict[attachment['key']])

            messages.append(Message(
                Icons.INDEX, f"Loaded PDF attachment: {title}."))
            available_attachments += 1
            yield _status(messages)
        except Exception as e:
            messages.append(Message(
                Icons.WARN, f"Failed to retrieve PDF attachment {title}: {e}"))
            yield _status(messages)

    # Build vector index
    if available_attachments == 0:
        messages.append(Message(
            Icons.ERR,
            "No answer. Unable to access any PDF attachments from your Zotero online storage or public URLs."))
        yield _status(messages)
        return

    if docs._faiss_index is None:
        try:
            messages.append(Message(
                Icons.WAIT,
                f"Building vector index based on {available_attachments} available PDF {'attachment' if available_attachments == 1 else 'attachments'}."))
            yield _status(messages)
            docs._build_faiss_index()
        except Exception as e:
            messages.append(Message(
                Icons.ERR, f"Unable to build vector index: {e}"))
            yield _status(messages)
            return

    # Synthesize response
    if available_attachments == 1:
        duration_hint = "This should be done within a minute."
    else:
        duration_hint = "This will loop through all available PDF attachments and may take a couple of minutes."
    messages.append(Message(Icons.WAIT, f"Creating answer. {duration_hint}"))
    yield _status(messages)

    try:
        answer = None
        start_time = time.time()
        total_time = 0
        for answer in docs.query_gen(question):
            end_time = time.time()
            time_dif = end_time - start_time
            if time_dif > _PROGRESS_INTERVAL:
                start_time = end_time
                total_time += time_dif
                messages.append(Message(
                    Icons.INFO,
                    f"Still in progress: {total_time:.1f} seconds"))
                yield _status(messages)

        if answer is None:
            raise RuntimeError("no answer was produced")

        # Only the last answer produced by query_gen() is formatted.
        answer_text = "\n".join(
            f"\n{line}\n" for line in answer.answer.split("\n"))
        if answer.references == "":
            reference_list = ""
        else:
            # Drop everything up to the first period of each entry; an entry
            # without a period is kept whole.
            reference_list = "\n".join(
                f"\n  • {ref.split('.', 1)[-1]}\n  • "
                for ref in answer.references.split('\n\n'))
        if reference_list == "":
            references = ""
        else:
            references = f"\n\n    References:\n\n      {reference_list}\n    "
        formatted_answer = (
            f"\n    {answer_text}"
            f"\n    {references}"
            f"\n    Tokens Used: {answer.tokens} Cost: ${answer.tokens/1000 * 0.002:.2f}"
            "\n    "
        ).strip()

        messages.append(Message(Icons.OK, "Answer created."))
        yield _status(messages, gr.HTML.update(value=formatted_answer))
    except Exception as e:
        messages.append(Message(
            Icons.ERR, f"Error occurred when creating answer: {e}"))
        yield _status(messages)
        return