benjolo's picture
adding updates to monolingual transcript functionality
9680844 verified
raw
history blame
No virus
9.79 kB
from fastapi import Body, Request, HTTPException, status
from fastapi.encoders import jsonable_encoder
import sys
from ..models.calls import UpdateCall, UserCall, UserCaptions
from ..operations.users import *
from utils.text_rank import extract_terms
from openai import OpenAI
from time import sleep
import os
from dotenv import dotenv_values
# Used within calls to create call record in main.py
def create_calls(collection, user: UserCall = Body(...)):
    """Insert a new call record and return the document as stored.

    The payload is run through ``jsonable_encoder`` before insertion, and the
    record is then read back using the ``_id`` reported by the insert.
    """
    payload = jsonable_encoder(user)
    inserted_id = collection.insert_one(payload).inserted_id
    return collection.find_one({"_id": inserted_id})
def list_calls(collection, limit: int):
    """Return up to ``limit`` call records from ``collection`` as a list.

    Args:
        collection: Object exposing a ``find(limit=...)`` method.
        limit: Passed straight through to ``find`` as its ``limit`` keyword.

    Raises:
        HTTPException: 404 when fetching the records fails.
    """
    try:
        return list(collection.find(limit=limit))
    except Exception as err:
        # Catch Exception instead of using a bare ``except`` so that
        # SystemExit / KeyboardInterrupt are not converted into a 404; the
        # original error is chained so the real cause stays in the traceback.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No existing call records yet.",
        ) from err
'''Finding calls based on call id'''
def find_call(collection, call_id: str):
    """Look up a single call record by its ``call_id``.

    Returns the matching document. Raises a 404 ``HTTPException`` when
    ``find_one`` yields ``None`` for that ID.
    """
    record = collection.find_one({"call_id": call_id})
    if record is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Call with ID: '{call_id}' not found.",
        )
    return record
'''Finding calls based on user id'''
def find_user_calls(collection, user_id: str):
    """Return every call in which ``user_id`` is either the caller or the callee.

    When nothing matches, an empty list is returned rather than an error
    (the original comment notes the TranscriptView frontend component relies
    on this).
    """
    participant_filter = {"$or": [{"caller_id": user_id}, {"callee_id": user_id}]}
    # Materialising an empty result already gives [], so no special case is needed.
    return list(collection.find(participant_filter))
'''Finding calls based on key terms list'''
def list_transcripts_by_key_terms(collection, key_terms_list: list[str] = Body(...)):
    """Return call records whose ``key_terms`` field matches any given term.

    The terms are passed through ``jsonable_encoder`` and used in an ``$in``
    query. Raises a 404 ``HTTPException`` when no record matches.
    """
    terms = jsonable_encoder(key_terms_list)
    # The projection leaves ``_id`` (the ObjectId) out of the returned documents.
    # NOTE(review): term_extraction in this file writes ``key_terms.<user_id>``;
    # confirm this ``$in`` query still matches documents stored in that shape.
    matches = list(collection.find({"key_terms": {"$in": terms}}, {"_id": 0}))
    if not matches:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Call with key terms: '{terms}' not found!",
        )
    return matches
'''Finding calls based on date ranges'''
def list_transcripts_by_dates(collection, start_date: str, end_date: str):
    """Return call records whose ``date`` lies between two day strings.

    Both arguments get ``T00:00:00`` appended and are compared against the
    stored ``date`` values with ``$gte`` / ``$lte``. Raises a 404
    ``HTTPException`` when nothing falls inside the range.
    """
    midnight = "T00:00:00"
    range_start = f"{start_date}{midnight}"
    # NOTE(review): the upper bound is midnight at the *start* of end_date, so
    # calls made later on that day are excluded - confirm this is intended.
    range_end = f"{end_date}{midnight}"

    matches = list(collection.find({"date": {"$gte": range_start, "$lte": range_end}}))
    if not matches:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Call with creation date between: '{range_start} - {range_end}' not found!",
        )
    return matches
'''Finding calls based on call lengths'''
def list_transcripts_by_duration(collection, min_len: int, max_len: int):
    """Return call records whose ``duration`` is within ``[min_len, max_len]``.

    Both bounds are inclusive (``$gte`` / ``$lte``). Raises a 404
    ``HTTPException`` when no record falls inside the range.
    """
    duration_filter = {"duration": {"$gte": min_len, "$lte": max_len}}
    matches = list(collection.find(duration_filter))
    if not matches:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Call with duration between: '{min_len} - {max_len}' milliseconds not found!",
        )
    return matches
def update_calls(collection, call_id: str, calls: UpdateCall = Body(...)):
    """Apply a partial update to the call identified by ``call_id``.

    Entries of ``calls`` whose value is ``None`` are ignored; the remaining
    ones are written with ``$set``. ``calls`` is iterated via ``.items()``,
    so it must be mapping-like.

    Returns:
        The call document as read back after the update.

    Raises:
        HTTPException: 404 if no call with ``call_id`` exists.
    """
    changes = {field: value for field, value in calls.items() if value is not None}

    if changes:
        update_result = collection.update_one({"call_id": call_id}, {"$set": changes})
        # Use matched_count, not modified_count: an update whose values equal
        # what is already stored matches the document but modifies nothing,
        # and that must not be reported as a 404.
        if update_result.matched_count == 0:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not updated!")

    existing_item = collection.find_one({"call_id": call_id})
    if existing_item is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not found!")
    return existing_item
def update_captions(call_collection, user_collection, call_id: str, captions: UserCaptions = Body(...)):
    """Append one caption entry to the ``captions`` array of a call.

    ``None``-valued fields are dropped from ``captions``, the author's name is
    looked up from ``author_id`` via ``find_name_from_id`` and stored under
    ``author_username``, and the entry is ``$push``-ed onto the call.

    Returns the call document read back after the push. Raises a 404
    ``HTTPException`` if the push modified nothing or the call cannot be
    read back.
    """
    caption_entry = {key: val for key, val in captions.items() if val is not None}

    # Attach the author's name so each stored caption carries it alongside the ID.
    caption_entry["author_username"] = find_name_from_id(user_collection, caption_entry["author_id"])

    # The entry always holds at least author_id and author_username here, so
    # no emptiness check is needed before pushing.
    push_result = call_collection.update_one(
        {"call_id": call_id},
        {"$push": {"captions": caption_entry}},
    )
    if push_result.modified_count == 0:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Captions not updated!")

    stored_call = call_collection.find_one({"call_id": call_id})
    if stored_call is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Captions not found!")
    return stored_call
def delete_calls(collection, call_id: str):
    """Delete the call identified by ``call_id``.

    Returns a confirmation string when exactly one document was removed;
    otherwise raises a 404 ``HTTPException``.
    """
    outcome = collection.delete_one({"call_id": call_id})
    if outcome.deleted_count != 1:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not found!")
    # The message spelling is kept exactly as-is; callers may compare against it.
    return "Call deleted sucessfully!"
# def get_caption_text(collection, call_id):
# call_record = find_call((collection), call_id)
# try: # Check if call has any captions first
# caption_records = call_record['captions']
# except KeyError:
# return None
# # iterate through caption embedded document and store original text
# combined_text = [caption['original_text'] for caption in caption_records]
# return " ".join(combined_text)
def get_caption_text(collection, call_id, user_id):
    """Build the transcript of a call as seen by ``user_id``.

    Captions authored by ``user_id`` contribute their ``original_text``;
    captions from anyone else contribute their ``translated_text``. The
    pieces are joined with single spaces.

    Returns ``None`` when the call record has no ``captions`` key. The call
    is fetched with ``find_call``, so its 404 propagates for unknown IDs.
    """
    call_record = find_call(collection, call_id)
    try:
        segments = call_record['captions']
    except KeyError:
        # The call exists but nothing has been captioned yet.
        return None

    pieces = [
        segment['original_text'] if segment['author_id'] == user_id else segment['translated_text']
        for segment in segments
    ]
    return " ".join(pieces)
# standard exact match based full text search
def full_text_search(collection, query):
    """Run a ``$text`` search for ``query`` over caption text.

    A text index named ``captions`` covering ``captions.original_text`` and
    ``captions.translated_text`` is (re)created before the query runs.

    Returns:
        A list of the matching call documents.
    """
    # Existing indexes are dropped first so the create_index call below cannot
    # conflict with a previously created index of the same name.
    # NOTE(review): this drops *every* index on the collection on each search;
    # consider creating the index once at startup instead.
    collection.drop_indexes()
    # Fixed key: was 'captions.tranlated_text', which does not match the
    # 'translated_text' field read elsewhere in this file, so translated
    # captions were never covered by the index.
    collection.create_index(
        [('captions.original_text', 'text'), ('captions.translated_text', 'text')],
        name='captions',
    )
    return list(collection.find({"$text": {"$search": query}}))
# approximate string matching
def fuzzy_search(collection, query):
    """Run an approximate (fuzzy) text search for ``query``.

    A text index named ``captions`` covering ``captions.original_text`` and
    ``captions.translated_text`` is (re)created, then an aggregation with a
    ``$search`` stage (``text`` operator, wildcard path, default fuzzy
    options) is executed.

    Returns:
        A list of the documents produced by the aggregation.
    """
    # NOTE(review): this drops *every* index on the collection on each search;
    # consider creating the index once at startup instead.
    collection.drop_indexes()
    # Fixed key: was 'captions.tranlated_text', which does not match the
    # 'translated_text' field read elsewhere in this file.
    collection.create_index(
        [('captions.original_text', 'text'), ('captions.translated_text', 'text')],
        name='captions',
    )

    # NOTE(review): ``$search`` is an Atlas Search stage; confirm the deployment
    # has a search index configured, since the text index above is separate.
    pipeline = [
        {
            "$search": {
                "text": {
                    "query": query,
                    "path": {"wildcard": "*"},
                    "fuzzy": {},
                }
            }
        }
    ]
    return list(collection.aggregate(pipeline))
def summarise(collection, call_id, user_id, target_language):
    """Summarise a call transcript with the OpenAI chat API and store the result.

    The transcript is built with ``get_caption_text`` from ``user_id``'s point
    of view, sent to ``gpt-3.5-turbo`` with a request to summarise it in
    ``target_language``, and the parsed summary is saved on the call document
    under ``summaries.<user_id>``.

    Returns:
        The summary text, or ``None`` when the call has no caption text or the
        model's reply does not contain a ``label: summary`` style answer.

    Raises:
        HTTPException: 404 if no call with ``call_id`` exists to store the
            summary on (also raised by the underlying call lookup).
    """
    # get caption text using call_id
    caption_text = get_caption_text(collection, call_id, user_id)
    if not caption_text:
        # No captions (None) or empty text: nothing to summarise, so skip the
        # API request instead of sending the literal string "None".
        return None

    config = dotenv_values(".env")
    client = OpenAI(api_key=config["OPENAI_API_KEY"])

    # Built from adjacent literals rather than a backslash line-continuation;
    # the original contained the invalid escape "\C", which put a stray
    # backslash in front of "Call Transcript".
    prompt = (
        "The following is an extract from a call transcript. "
        f"Rewrite this as a structured, clear summary in {target_language}. "
        f"\n\nCall Transcript: \"\"\"\n{caption_text}\n\"\"\"\n"
    )

    # Gpt-3.5 turbo has 4096 token limit -> request will fail if exceeded
    chat_completion = client.chat.completions.create(
        messages=[{"role": "user", "content": prompt}],
        model="gpt-3.5-turbo",
    )

    try:
        content = chat_completion.choices[0].message.content
        # Split on the FIRST colon only: the text after the leading label is
        # the summary, and it may itself contain further colons.
        _label, separator, summary_body = content.partition(":")
    except (IndexError, AttributeError):
        # No choices returned, or the message carried no text content.
        return None
    if not separator:
        return None
    result = summary_body.strip()  # parse summary

    # BO - add result to mongodb -> should be done asynchronously
    update_result = collection.update_one(
        {"call_id": call_id},
        {"$set": {f"summaries.{user_id}": result}},
    )
    # matched_count, not modified_count: storing a summary identical to the
    # existing one matches the call but modifies nothing, which is not an error.
    if update_result.matched_count == 0:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not updated!")

    return result
def term_extraction(collection, call_id, user_id, target_language):
    """Extract key terms from a call transcript and store them on the call.

    The transcript is built with ``get_caption_text`` from ``user_id``'s point
    of view; terms are produced by ``extract_terms`` and saved under
    ``key_terms.<user_id>``.

    Returns:
        The extracted key terms, or ``None`` when the call has no captions or
        the transcript is 50 characters or shorter.

    Raises:
        HTTPException: 404 if no call with ``call_id`` exists to store the
            terms on (also raised by the underlying call lookup).
    """
    combined_text = get_caption_text(collection, call_id, user_id)

    # get_caption_text returns None for a call without captions, and term
    # extraction is poor on short transcripts; in both cases return early
    # instead of failing on len(None) or on an unassigned key_terms.
    if combined_text is None or len(combined_text) <= 50:
        return None

    # Extract Key Terms from Concatenated Caption Field
    key_terms = extract_terms(combined_text, target_language, len(combined_text))

    update_result = collection.update_one(
        {"call_id": call_id},
        {"$set": {f"key_terms.{user_id}": key_terms}},
    )
    # matched_count, not modified_count: re-running extraction on unchanged
    # captions stores identical terms, which modifies nothing but is not an error.
    if update_result.matched_count == 0:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not updated!")

    return key_terms