"""HTTP API that searches for books and runs NLP models over the results.

Endpoints:
    GET  /search            -- Google Books, optionally OpenAlex and OpenAI.
    POST /classify          -- audience and level classification of books.
    POST /find_similar      -- embedding-based similarity between books.
    POST /summarize         -- abstractive summaries of book descriptions.
    GET  /get_server_status -- stage of the hosting Hugging Face Space.

Heavy third-party packages (transformers, sentence-transformers, ...) are
imported lazily inside the helpers that need them so that importing this
module stays cheap.
"""
import os
import re

from fastapi import FastAPI, HTTPException, Query
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware

# Define the FastAPI app
app = FastAPI(docs_url="/")

# Add the CORS middleware to the app
# NOTE(review): a wildcard origin combined with allow_credentials=True is a
# very permissive setup; left untouched because clients may depend on it, but
# it should be narrowed to the real front-end origin(s).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

key = os.environ.get("GOOGLE_BOOKS_API_KEY")

GOOGLE_BOOKS_URL = "https://www.googleapis.com/books/v1/volumes"
# The Google Books API rejects maxResults values above 40.
GOOGLE_BOOKS_MAX_RESULTS = 40
REQUEST_TIMEOUT_SECONDS = 10

# Sentinel string the API uses for "value not available".
MISSING = "Null"
PLACEHOLDER_IMAGE = "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png"

_RECOMMENDATION_FIELDS = ("Title", "Author", "Publisher", "Summary")


def _book_record(title, author, publisher, description, image_link=PLACEHOLDER_IMAGE):
    """Build one search result (without its ``id``) in the response key order."""
    return {
        "title": title,
        "author": author,
        "publisher": publisher,
        "description": description,
        "image_link": image_link,
    }


def _dig(container, *path, default=MISSING):
    """Follow ``path`` through nested dicts/lists, returning ``default`` on a dead end."""
    current = container
    for step in path:
        try:
            current = current[step]
        except (KeyError, IndexError, TypeError):
            return default
    return current


def _require_runtime(runtime, allowed):
    """Raise HTTP 400 unless ``runtime`` is one of ``allowed``.

    ``Query(enum=...)`` only documents the accepted values in the OpenAPI
    schema; it does not validate them, so the check has to be explicit.
    """
    if runtime not in allowed:
        raise HTTPException(
            status_code=400,
            detail=f"runtime must be one of {list(allowed)}",
        )


def _combine_book_fields(data):
    """Turn each book dict into the single sentence fed to the NLP models.

    Raises:
        HTTPException: 422 when an entry lacks ``title``, ``description`` or
            ``publisher`` (or is not a mapping at all).
    """
    combined = []
    for book in data:
        try:
            title = book["title"]
            description = book["description"]
            publisher = book["publisher"]
        except (KeyError, TypeError) as err:
            raise HTTPException(
                status_code=422,
                detail="Each book needs 'title', 'description' and 'publisher' fields.",
            ) from err
        combined.append(
            f"The book's title is {title}. It is published by {publisher}. This book is about {description}"
        )
    return combined


def _gbooks_search(query, n_results=30):
    """Access the Google Books API and return the results as book records.

    Raises:
        HTTPException: 502 when the request fails, times out, returns an
            error status, or the body is not valid JSON.
    """
    import requests

    params = {
        "q": str(query),
        "printType": "books",
        "maxResults": max(0, min(n_results, GOOGLE_BOOKS_MAX_RESULTS)),
        "key": key,
    }
    try:
        response = requests.get(
            GOOGLE_BOOKS_URL, params=params, timeout=REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as err:
        raise HTTPException(
            status_code=502, detail="Google Books request failed."
        ) from err

    records = []
    # "items" is absent from the payload when the query has no matches.
    for item in data.get("items", []):
        info = item.get("volumeInfo", {})
        title = info.get("title", MISSING)
        if "title" in info and "subtitle" in info:
            title = f"{title}: {info['subtitle']}"
        records.append(
            _book_record(
                title=title,
                author=_dig(info, "authors", 0),
                publisher=info.get("publisher", MISSING),
                description=info.get("description", MISSING),
                image_link=_dig(info, "imageLinks", "thumbnail", default=PLACEHOLDER_IMAGE),
            )
        )
    return records


def _openalex_search(query, n_results=10):
    """Run a search on OpenAlex and return the first page as book records.

    Each field is extracted independently, so one missing value can never
    shift the other fields of the same record. When OpenAlex yields no result
    page at all, a single all-"Null" placeholder record is returned, which is
    what this service has always done in that situation.
    """
    import pyalex
    from pyalex import Works

    # Add email to the config
    pyalex.config.email = "ber2mir@gmail.com"

    pager = Works().search(str(query)).paginate(per_page=n_results, n_max=n_results)
    pages = list(pager)
    if not pages:
        return [_book_record(MISSING, MISSING, MISSING, MISSING)]

    return [
        _book_record(
            title=_dig(work, "title"),
            author=_dig(work, "authorships", 0, "author", "display_name"),
            publisher=_dig(work, "host_venue", "publisher"),
            description=_dig(work, "abstract"),
        )
        for work in pages[0]
    ]


def _parse_recommendation(result, ordered_keys=_RECOMMENDATION_FIELDS):
    """Split one ``'Title: ..., Author: ..., ...'`` line into a dict.

    Field names are matched case-insensitively. A value runs up to the next
    field marker, so commas inside a title or publisher are preserved; if the
    next marker cannot be found the value is cut at its first comma.

    Raises:
        ValueError: when one of ``ordered_keys`` does not occur in ``result``.
    """
    parsed = {}
    last_position = len(ordered_keys) - 1
    for position, field in enumerate(ordered_keys):
        start = re.search(rf"{re.escape(field)}: ", result, flags=re.IGNORECASE)
        if start is None:
            raise ValueError(f"field {field!r} not found in {result!r}")
        remainder = result[start.end():]
        if position == last_position:
            parsed[field] = remainder
            continue
        following = ordered_keys[position + 1]
        stop = re.search(
            rf",\s*{re.escape(following)}: ", remainder, flags=re.IGNORECASE
        )
        parsed[field] = remainder[: stop.start()] if stop else remainder.split(",")[0]
    return parsed


def _openai_search(query, n_results=10):
    """Create a query to the OpenAI ChatGPT API and return the results.

    The API key is read from the ``OPENAI_API_KEY`` environment variable.

    Raises:
        HTTPException: 503 when no API key is configured.
    """
    import openai

    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise HTTPException(
            status_code=503, detail="OpenAI API key is not configured."
        )
    openai.api_key = api_key

    # Create ChatGPT query
    chatgpt_response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "You are a librarian. You are helping a patron find a book.",
            },
            {
                "role": "user",
                "content": (
                    f"Recommend me {n_results} books about {query}. "
                    "Your response should be like: 'title: <title>, author: <author>, "
                    "publisher: <publisher>, summary: <summary>'"
                ),
            },
        ],
    )

    # Keep every second line starting at the third one; the reply is expected
    # to be an intro line followed by blank-line-separated recommendations.
    reply = chatgpt_response["choices"][0]["message"]["content"]
    candidate_lines = reply.split("\n")[2::2]

    records = []
    for line in candidate_lines:
        try:
            parsed = _parse_recommendation(line)
        except ValueError:
            # Truncated or free-text line (e.g. the token limit was hit).
            continue
        records.append(
            _book_record(
                title=parsed["Title"],
                author=parsed["Author"],
                publisher=parsed["Publisher"],
                description=parsed["Summary"],
            )
        )
    return records


@app.get("/search")
async def search(
    query: str,
    add_chatgpt_results: bool = False,
    add_articles: bool = False,
    n_results: int = 10,
):
    """
    Get the results from the Google Books API, OpenAlex, and optionally OpenAI.

    Returns a list of dicts with the keys ``id``, ``title``, ``author``,
    ``publisher``, ``description`` and ``image_link``; ``id`` is the position
    of the record in the combined list.
    """
    # The upstream clients are blocking, so run them in the thread pool
    # instead of stalling the event loop.
    records = await run_in_threadpool(_gbooks_search, query, n_results=n_results)
    if add_articles:
        records.extend(
            await run_in_threadpool(_openalex_search, query, n_results=n_results)
        )
    if add_chatgpt_results:
        records.extend(
            await run_in_threadpool(_openai_search, query, n_results=n_results)
        )
    return [{"id": index, **record} for index, record in enumerate(records)]


def _load_text_classifier(model_name):
    """Build a CPU ``text-classification`` pipeline for ``model_name``."""
    from transformers import (
        AutoModelForSequenceClassification,
        AutoTokenizer,
        pipeline,
    )

    tokenizer = AutoTokenizer.from_pretrained(model_name, model_max_length=512)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    return pipeline(
        "text-classification", model=model, tokenizer=tokenizer, device=-1
    )


def _classify_zero_shot(documents):
    """Label each document with a zero-shot NLI model."""
    from transformers import (
        AutoModelForSequenceClassification,
        AutoTokenizer,
        pipeline,
    )

    model_name = "sileod/deberta-v3-base-tasksource-nli"
    classifier_pipe = pipeline(
        "zero-shot-classification",
        model=AutoModelForSequenceClassification.from_pretrained(model_name),
        tokenizer=AutoTokenizer.from_pretrained(model_name),
        hypothesis_template="This book is {}.",
        batch_size=1,
        device=-1,
        multi_label=False,
    )

    # Define the candidate labels
    level = ["Introductory", "Advanced"]
    audience = ["Academic", "Not Academic", "Manual"]

    classes = []
    for doc in documents:
        # One inference per label set; label and score come from the same run.
        audience_prediction = classifier_pipe(doc, audience)
        level_prediction = classifier_pipe(doc, level)
        classes.append(
            {
                "audience": audience_prediction["labels"][0],
                "audience_confidence": audience_prediction["scores"][0],
                "level": level_prediction["labels"][0],
                "level_confidence": level_prediction["scores"][0],
            }
        )
    return classes


def _classify_trained(documents):
    """Label each document with the fine-tuned audience and level models."""
    audience_classifier = _load_text_classifier(
        "bertugmirasyedi/deberta-v3-base-book-classification"
    )
    level_classifier = _load_text_classifier(
        "bertugmirasyedi/deberta-v3-base-level-classification"
    )

    classes = []
    for doc in documents:
        audience_prediction = audience_classifier(doc, padding=True, truncation=True)[0]
        level_prediction = level_classifier(doc, padding=True, truncation=True)[0]
        classes.append(
            {
                "audience": audience_prediction["label"],
                "audience_confidence": audience_prediction["score"],
                "level": level_prediction["label"],
                "level_confidence": level_prediction["score"],
            }
        )
    return classes


@app.post("/classify")
async def classify(
    data: list, runtime: str = Query(default="trained", enum=["trained", "zero-shot"])
):
    """
    Create classifier pipeline and return the results.

    ``data`` is a list of book dicts with ``title``, ``description`` and
    ``publisher``. The response has one dict per book with ``audience``,
    ``audience_confidence``, ``level`` and ``level_confidence``.

    Raises HTTP 400 for an unknown ``runtime`` and HTTP 422 for a book that
    lacks one of the required fields.
    """
    _require_runtime(runtime, ("trained", "zero-shot"))
    documents = _combine_book_fields(data)
    if not documents:
        # Nothing to classify: skip loading the models altogether.
        return []

    classify_documents = (
        _classify_zero_shot if runtime == "zero-shot" else _classify_trained
    )
    return await run_in_threadpool(classify_documents, documents)


def _rank_similar_books(documents, top_k):
    """Return, per document, its ``top_k`` most similar *other* documents."""
    if not documents:
        return []

    # A book can have at most len(documents) - 1 neighbours.
    wanted = min(top_k, len(documents) - 1)
    if wanted <= 0:
        return [{"sorted_by_similarity": []} for _ in documents]

    from sentence_transformers import SentenceTransformer
    from sentence_transformers import util

    sentence_transformer = SentenceTransformer("all-MiniLM-L6-v2")
    book_embeddings = sentence_transformer.encode(documents, convert_to_tensor=True)

    # One batched search for all books. Ask for one extra hit because every
    # book also matches itself, and drop that self-match by corpus id rather
    # than by position (a duplicate book may outrank it).
    ranked = util.semantic_search(book_embeddings, book_embeddings, top_k=wanted + 1)
    return [
        {
            "sorted_by_similarity": [
                hit for hit in hits if hit["corpus_id"] != index
            ][:wanted],
        }
        for index, hits in enumerate(ranked)
    ]


@app.post("/find_similar")
async def find_similar(data: list, top_k: int = 5):
    """
    Calculate the similarity between the selected book and the corpus.
    Return the top_k results.

    The response has one entry per input book, in input order; each entry's
    ``sorted_by_similarity`` lists up to ``top_k`` hits (``corpus_id`` and
    ``score``) for the other books, most similar first.

    Raises HTTP 422 for a book that lacks ``title``, ``description`` or
    ``publisher``.
    """
    documents = _combine_book_fields(data)
    return await run_in_threadpool(_rank_similar_books, documents, top_k)


def _summarize_descriptions(descriptions, runtime):
    """Summarize every usable description with the model chosen by ``runtime``."""
    no_summary = [{"summary_text": "No summary text is available."}]

    def is_usable(description):
        return description is not None and description != "Null"

    if not any(is_usable(description) for description in descriptions):
        # Nothing to summarize: skip loading the model altogether.
        return [no_summary for _ in descriptions]

    from transformers import AutoTokenizer, pipeline

    # Define the summarizer model and tokenizer
    if runtime == "normal":
        from optimum.bettertransformer import BetterTransformer
        from transformers import AutoModelForSeq2SeqLM

        tokenizer = AutoTokenizer.from_pretrained("lidiya/bart-base-samsum")
        model = AutoModelForSeq2SeqLM.from_pretrained("lidiya/bart-base-samsum")
        model = BetterTransformer.transform(model)
    else:
        from optimum.onnxruntime import ORTModelForSeq2SeqLM

        tokenizer = AutoTokenizer.from_pretrained("optimum/t5-small")
        model = ORTModelForSeq2SeqLM.from_pretrained("optimum/t5-small")

    # Create the summarizer pipeline
    summarizer_pipe = pipeline("summarization", model=model, tokenizer=tokenizer)

    return [
        summarizer_pipe(description) if is_usable(description) else no_summary
        for description in descriptions
    ]


@app.post("/summarize")
async def summarize(
    descriptions: list,
    runtime: str = Query(default="normal", enum=["normal", "onnxruntime"]),
):
    """
    Summarize the descriptions and return the results.

    Each entry of the response is a one-element list holding a dict with a
    ``summary_text`` key; descriptions that are ``None`` or ``"Null"`` get a
    fixed "No summary text is available." entry instead of a model call.

    Raises HTTP 400 for an unknown ``runtime``.
    """
    _require_runtime(runtime, ("normal", "onnxruntime"))
    return await run_in_threadpool(_summarize_descriptions, descriptions, runtime)


@app.get("/get_server_status")
def get_server_status():
    """
    Return the server status.

    The status is the ``stage`` reported by the Hugging Face Hub for the
    ``bertugmirasyedi/aristotle-api`` Space.
    """
    from huggingface_hub import HfApi

    space_runtime = HfApi().get_space_runtime("bertugmirasyedi/aristotle-api")
    return {"status": space_runtime.stage}