from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware # Define the FastAPI app app = FastAPI(docs_url="/") # Add the CORS middleware to the app app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/search") def search( query: str, classification: bool = True, summarization: bool = True, similarity: bool = False, add_chatgpt_results: bool = True, n_results: int = 10, ): import time import requests start_time = time.time() # Initialize the lists to store the results titles = [] authors = [] publishers = [] descriptions = [] images = [] def gbooks_search(query, n_results=30): """ Access the Google Books API and return the results. """ # Set the API endpoint and query parameters url = "https://www.googleapis.com/books/v1/volumes" params = {"q": str(query), "printType": "books", "maxResults": n_results} # Send a GET request to the API with the specified parameters response = requests.get(url, params=params) # Parse the response JSON and append the results data = response.json() # Initialize the lists to store the results titles = [] authors = [] publishers = [] descriptions = [] images = [] for item in data["items"]: volume_info = item["volumeInfo"] try: titles.append(f"{volume_info['title']}: {volume_info['subtitle']}") except KeyError: titles.append(volume_info["title"]) try: descriptions.append(volume_info["description"]) except KeyError: descriptions.append("Null") try: publishers.append(volume_info["publisher"]) except KeyError: publishers.append("Null") try: authors.append(volume_info["authors"][0]) except KeyError: authors.append("Null") try: images.append(volume_info["imageLinks"]["thumbnail"]) except KeyError: images.append( "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png" ) return titles, authors, publishers, descriptions, images # Run the gbooks_search function ( titles_placeholder, authors_placeholder, publishers_placeholder, descriptions_placeholder, images_placeholder, ) = gbooks_search(query, n_results=n_results) # Append the results to the lists [titles.append(title) for title in titles_placeholder] [authors.append(author) for author in authors_placeholder] [publishers.append(publisher) for publisher in publishers_placeholder] [descriptions.append(description) for description in descriptions_placeholder] [images.append(image) for image in images_placeholder] # Get the time since the start first_checkpoint = time.time() first_checkpoint_time = int(first_checkpoint - start_time) def openalex_search(query, n_results=10): """ Run a search on OpenAlex and return the results. """ import pyalex from pyalex import Works # Add email to the config pyalex.config.email = "ber2mir@gmail.com" # Define a pager object with the same query pager = Works().search(str(query)).paginate(per_page=n_results, n_max=n_results) # Generate a list of the results openalex_results = list(pager) # Initialize the lists to store the results titles = [] authors = [] publishers = [] descriptions = [] images = [] # Get the titles, descriptions, and publishers and append them to the lists for result in openalex_results[0]: try: titles.append(result["title"]) except KeyError: titles.append("Null") try: descriptions.append(result["abstract"]) except KeyError: descriptions.append("Null") try: publishers.append(result["host_venue"]["publisher"]) except KeyError: publishers.append("Null") try: authors.append(result["authorships"][0]["author"]["display_name"]) except KeyError: authors.append("Null") images.append( "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png" ) return titles, authors, publishers, descriptions, images # Run the openalex_search function ( titles_placeholder, authors_placeholder, publishers_placeholder, descriptions_placeholder, images_placeholder, ) = openalex_search(query, n_results=n_results) # Append the results to the lists [titles.append(title) for title in titles_placeholder] [authors.append(author) for author in authors_placeholder] [publishers.append(publisher) for publisher in publishers_placeholder] [descriptions.append(description) for description in descriptions_placeholder] [images.append(image) for image in images_placeholder] # Calculate the elapsed time between the first and second checkpoints second_checkpoint = time.time() second_checkpoint_time = int(second_checkpoint - first_checkpoint) def openai_search(query, n_results=10): """ Create a query to the OpenAI ChatGPT API and return the results. """ import openai # Initialize the lists to store the results titles = [] authors = [] publishers = [] descriptions = [] images = [] # Set the OpenAI API key openai.api_key = "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" # Create ChatGPT query chatgpt_response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[ { "role": "system", "content": "You are a librarian. You are helping a patron find a book.", }, { "role": "user", "content": f"Recommend me {n_results} books about {query}. Your response should be like: 'title: , author: <author>, publisher: <publisher>, summary: <summary>'", }, ], ) # Split the response into a list of results chatgpt_results = chatgpt_response["choices"][0]["message"]["content"].split( "\n" )[2::2] # Define a function to parse the results def parse_result( result, ordered_keys=["Title", "Author", "Publisher", "Summary"] ): # Create a dict to store the key-value pairs parsed_result = {} for key in ordered_keys: # Split the result string by the key and append the value to the list if key != ordered_keys[-1]: parsed_result[key] = result.split(f"{key}: ")[1].split(",")[0] else: parsed_result[key] = result.split(f"{key}: ")[1] return parsed_result ordered_keys = ["Title", "Author", "Publisher", "Summary"] for result in chatgpt_results: try: # Parse the result parsed_result = parse_result(result, ordered_keys=ordered_keys) # Append the parsed result to the lists titles.append(parsed_result["Title"]) authors.append(parsed_result["Author"]) publishers.append(parsed_result["Publisher"]) descriptions.append(parsed_result["Summary"]) images.append( "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png" ) # In case the OpenAI API hits the limit except IndexError: break return titles, authors, publishers, descriptions, images if add_chatgpt_results: # Run the openai_search function ( titles_placeholder, authors_placeholder, publishers_placeholder, descriptions_placeholder, images_placeholder, ) = openai_search(query) # Append the results to the lists [titles.append(title) for title in titles_placeholder] [authors.append(author) for author in authors_placeholder] [publishers.append(publisher) for publisher in publishers_placeholder] [descriptions.append(description) for description in descriptions_placeholder] [images.append(image) for image in images_placeholder] # Calculate the elapsed time between the second and third checkpoints third_checkpoint = time.time() third_checkpoint_time = int(third_checkpoint - second_checkpoint) # Combine title, description, and publisher into a single string combined_data = [ f"The book's title is {title}. It is published by {publisher}. This book is about {description}" for title, description, publisher in zip(titles, descriptions, publishers) ] def find_similar(combined_data, top_k=10): """ Calculate the similarity between the books and return the top_k results. """ from sentence_transformers import SentenceTransformer from sentence_transformers import util sentence_transformer = SentenceTransformer("all-MiniLM-L6-v2") book_embeddings = sentence_transformer.encode( combined_data, convert_to_tensor=True ) # Make sure that the top_k value is not greater than the number of books top_k = len(combined_data) if top_k > len(combined_data) else top_k similar_books = [] for i in range(len(combined_data)): # Get the embedding for the ith book current_embedding = book_embeddings[i] # Calculate the similarity between the ith book and the rest of the books similarity_sorted = util.semantic_search( current_embedding, book_embeddings, top_k=top_k ) # Append the results to the list similar_books.append( { "sorted_by_similarity": similarity_sorted[0][1:], } ) return similar_books def summarize(descriptions): """ Summarize the descriptions and return the results. """ from transformers import ( AutoTokenizer, AutoModelForSeq2SeqLM, pipeline, ) # Define the summarizer model and tokenizer tokenizer = AutoTokenizer.from_pretrained("sshleifer/distilbart-cnn-12-6") model = AutoModelForSeq2SeqLM.from_pretrained("sshleifer/distilbart-cnn-12-6") # Create the summarizer pipeline summarizer_pipe = pipeline( "summarization", model=model, tokenizer=tokenizer, min_length=10, max_length=128, ) # Summarize the descriptions summaries = [ summarizer_pipe(description) if (len(description) > 0) else [{"summary_text": "No summary text is available."}] for description in descriptions ] return summaries def classify(combined_data, parallel=False): """ Create classifier pipeline and return the results. """ from transformers import ( AutoTokenizer, AutoModelForSequenceClassification, pipeline, ) # Define the zero-shot classifier tokenizer = AutoTokenizer.from_pretrained( "sileod/deberta-v3-base-tasksource-nli" ) model = AutoModelForSequenceClassification.from_pretrained( "sileod/deberta-v3-base-tasksource-nli" ) classifier_pipe = pipeline( "zero-shot-classification", model=model, tokenizer=tokenizer, hypothesis_template="This book is {}.", batch_size=1, device=-1, multi_label=True, ) # Define the candidate labels candidate_labels = [ "Introductory", "Advanced", "Academic", "Not Academic", "Manual", ] if parallel: import ray import psutil # Define the number of cores to use num_cores = psutil.cpu_count(logical=True) # Initialize Ray ray.init(num_cpus=num_cores, ignore_reinit_error=True) classifier_id = ray.put(classifier_pipe) # Define the function to be parallelized @ray.remote def classify_parallel(classifier_id, doc, candidate_labels): classifier = ray.get(classifier_id) return classifier(doc, candidate_labels) # Get the predicted labels classes = [ classify_parallel.remote(classifier_id, doc, candidate_labels) for doc in combined_data ] else: # Get the predicted labels classes = [classifier_pipe(doc, candidate_labels) for doc in combined_data] return classes # If true then run the similarity, summarize, and classify functions if classification: classes = classify(combined_data, parallel=False) else: classes = [ {"labels": ["No labels available."], "scores": [0]} for i in range(len(combined_data)) ] # Calculate the elapsed time between the third and fourth checkpoints fourth_checkpoint = time.time() classification_time = int(fourth_checkpoint - third_checkpoint) if summarization: summaries = summarize(descriptions) else: summaries = [ [{"summary_text": description}] if (len(description) > 0) else [{"summary_text": "No summary text is available."}] for description in descriptions ] # Calculate the elapsed time between the fourth and fifth checkpoints fifth_checkpoint = time.time() summarization_time = int(fifth_checkpoint - fourth_checkpoint) if similarity: similar_books = find_similar(combined_data) else: similar_books = [ {"sorted_by_similarity": ["No similar books available."]} for i in range(len(combined_data)) ] # Calculate the elapsed time between the fifth and sixth checkpoints sixth_checkpoint = time.time() similarity_time = int(sixth_checkpoint - fifth_checkpoint) # Calculate the total elapsed time end_time = time.time() runtime = f"{end_time - start_time:.2f} seconds" # Create a list of dictionaries to store the results results = [ { "id": i, "title": titles[i], "author": authors[i], "publisher": publishers[i], "image_link": images[i], "labels": classes[i]["labels"][0:2], "label_confidences": classes[i]["scores"][0:2], "summary": summaries[i][0]["summary_text"], "similar_books": similar_books[i]["sorted_by_similarity"], "checkpoints": [ { "Google Books Time": first_checkpoint_time, "OpenAlex Time": second_checkpoint_time, "OpenAI Time": third_checkpoint_time, "Classification Time": classification_time, "Summarization Time": summarization_time, "Similarity Computing Time": similarity_time, } ], "total_runtime": runtime, } for i in range(len(combined_data)) ] return results