from fastapi import Body, Request, HTTPException, status from fastapi.encoders import jsonable_encoder import sys from ..models.calls import UpdateCall, UserCall, UserCaptions from ..operations.users import * from utils.text_rank import extract_terms from openai import OpenAI from time import sleep import os from dotenv import dotenv_values # Used within calls to create call record in main.py def create_calls(collection, user: UserCall = Body(...)): calls = jsonable_encoder(user) new_calls = collection.insert_one(calls) created_calls = collection.find_one({"_id": new_calls.inserted_id}) return created_calls def list_calls(collection, limit: int): try: calls = collection.find(limit = limit) return list(calls) except: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"No existing call records yet.") '''Finding calls based on call id''' def find_call(collection, call_id: str): user_calls = collection.find_one({"call_id": call_id}) if user_calls is not None: return user_calls else: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call with ID: '{call_id}' not found.") '''Finding calls based on user id''' def find_user_calls(collection, user_id: str): user_calls = list(collection.find({"$or": [{"caller_id": user_id}, {"callee_id": user_id}]})) # match on caller or callee ID if len(user_calls): return user_calls else: return [] # return empty list if no existing calls for TranscriptView frontend component '''Finding calls based on key terms list''' def list_transcripts_by_key_terms(collection, key_terms_list: list[str] = Body(...)): key_terms_list = jsonable_encoder(key_terms_list) call_records = list(collection.find({"key_terms": {"$in": key_terms_list}}, {'_id': 0})) # exclude returning ObjectID in find() # Check if any call records were returned if len(call_records): return call_records else: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call with key terms: '{key_terms_list}' not found!") '''Finding calls based on date ranges''' def list_transcripts_by_dates(collection, start_date: str, end_date: str): # print(start_date, end_date) # Convert strings to date string in YYYY-MM-ddT00:00:00 format start_date = f'{start_date}T00:00:00' end_date = f'{end_date}T00:00:00' call_records = list(collection.find({"date":{"$gte": start_date, "$lte": end_date}})) if len(call_records): return call_records else: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call with creation date between: '{start_date} - {end_date}' not found!") '''Finding calls based on call lengths''' def list_transcripts_by_duration(collection, min_len: int, max_len: int): call_records = list(collection.find({"duration":{"$gte": min_len, "$lte": max_len}})) if len(call_records): return call_records else: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call with duration between: '{min_len} - {max_len}' milliseconds not found!") def update_calls(collection, call_id: str, calls: UpdateCall = Body(...)): # calls = {k: v for k, v in calls.model_dump().items() if v is not None} #loop in the dict calls = {k: v for k, v in calls.items() if v is not None} #loop in the dict print(calls) if len(calls) >= 1: update_result = collection.update_one({"call_id": call_id}, {"$set": calls}) if update_result.modified_count == 0: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call not updated!") if (existing_item := collection.find_one({"call_id": call_id})) is not None: return existing_item raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call not found!") def update_captions(call_collection, user_collection, call_id: str, captions: UserCaptions = Body(...)): # captions = {k: v for k, v in calls.model_dump().items() if v is not None} captions = {k: v for k, v in captions.items() if v is not None} # print(captions) # index user_id from caption object userID = captions["author_id"] # print(userID) # use user id to get user name username = find_name_from_id(user_collection, userID) # print(username) # add user name to captions json/object captions["author_username"] = username # print(captions) if len(captions) >= 1: update_result = call_collection.update_one({"call_id": call_id}, {"$push": {"captions": captions}}) if update_result.modified_count == 0: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Captions not updated!") if (existing_item := call_collection.find_one({"call_id": call_id})) is not None: return existing_item raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Captions not found!") def delete_calls(collection, call_id: str): deleted_calls = collection.delete_one({"call_id": call_id}) if deleted_calls.deleted_count == 1: return f"Call deleted sucessfully!" raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call not found!") # def get_caption_text(collection, call_id): # call_record = find_call((collection), call_id) # try: # Check if call has any captions first # caption_records = call_record['captions'] # except KeyError: # return None # # iterate through caption embedded document and store original text # combined_text = [caption['original_text'] for caption in caption_records] # return " ".join(combined_text) def get_caption_text(collection, call_id, user_id): call_record = find_call((collection), call_id) try: # Check if call has any captions first caption_records = call_record['captions'] except KeyError: return None # iterate through caption embedded document and store original text # combined_text = [caption['original_text'] for caption in caption_records] combined_text = [] for caption_segment in caption_records: if caption_segment['author_id'] == user_id: combined_text.append(caption_segment['original_text']) else: combined_text.append(caption_segment['translated_text']) return " ".join(combined_text) # standard exact match based full text search def full_text_search(collection, query): # drop any existing indexes and create new one collection.drop_indexes() collection.create_index([('captions.original_text', 'text'), ('captions.tranlated_text', 'text')], name='captions') # print(collection.index_information()) results = list(collection.find({"$text": {"$search": query}})) return results # approximate string matching def fuzzy_search(collection, query): # drop any existing indexes and create new one collection.drop_indexes() collection.create_index([('captions.original_text', 'text'), ('captions.tranlated_text', 'text')], name='captions') # print(collection.index_information()) pipeline = [ { "$search": { "text": { "query": query, "path": {"wildcard": "*"}, "fuzzy": {} } } } ] collection_results = list(collection.aggregate(pipeline)) # print(collection_results) return collection_results def summarise(collection, call_id, user_id, target_language): # client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY")) config = dotenv_values(".env") client = OpenAI(api_key=config["OPENAI_API_KEY"]) # get caption text using call_id caption_text = get_caption_text(collection, call_id, user_id) chat_completion = client.chat.completions.create( messages=[ { "role": "user", "content": f"The following is an extract from a call transcript. Rewrite this as a structured, clear summary in {target_language}. \ \n\Call Transcript: \"\"\"\n{caption_text}\n\"\"\"\n" } ], model="gpt-3.5-turbo", ) # Gpt-3.5 turbo has 4096 token limit -> request will fail if exceeded try: result = chat_completion.choices[0].message.content.split(":")[1].strip() # parse summary except: return None # BO - add result to mongodb -> should be done asynchronously # summary_payload = {"summaries": {user_id: result}} update_result = collection.update_one({"call_id": call_id}, {"$set": {f"summaries.{user_id}": result}}) if update_result.modified_count == 0: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call not updated!") # if (existing_item := collection.find_one({"call_id": call_id})) is not None: # print(existing_item) return result def term_extraction(collection, call_id, user_id, target_language): combined_text = get_caption_text(collection, call_id, user_id) if len(combined_text) > 50: # > min_caption_length: -> poor term extraction on short transcripts # Extract Key Terms from Concatenated Caption Field key_terms = extract_terms(combined_text, target_language, len(combined_text)) update_result = collection.update_one({"call_id": call_id}, {"$set": {f"key_terms.{user_id}": key_terms}}) if update_result.modified_count == 0: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call not updated!") return key_terms