Spaces:
Paused
Paused
from fastapi import Body, Request, HTTPException, status | |
from fastapi.encoders import jsonable_encoder | |
import sys | |
from ..models.calls import UpdateCall, UserCall, UserCaptions | |
from ..operations.users import * | |
from utils.text_rank import extract_terms | |
from openai import OpenAI | |
from time import sleep | |
import os | |
from dotenv import dotenv_values | |
# Used within calls to create call record in main.py
def create_calls(collection, user: UserCall = Body(...)):
    """Insert a new call record and return the document as stored.

    The payload is passed through ``jsonable_encoder`` before ``insert_one``;
    the record is then read back by its generated ``_id`` so the caller gets
    the stored document rather than the submitted payload.
    """
    insert_outcome = collection.insert_one(jsonable_encoder(user))
    return collection.find_one({"_id": insert_outcome.inserted_id})
def find_call(collection, call_id: str):
    """Find a call based on its call id.

    Returns the document whose ``call_id`` field equals ``call_id``.

    Raises:
        HTTPException: 404 when no such document exists.
    """
    match = collection.find_one({"call_id": call_id})
    if match is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Call with ID: '{call_id}' not found.",
        )
    return match
def find_user_calls(collection, user_id: str):
    """Find every call in which the user took part, as caller or callee.

    Returns a list of call documents. The list is empty when the user has no
    calls (the TranscriptView frontend component expects a list either way),
    so no separate empty-result branch is needed.
    """
    # match on caller or callee ID
    participant_filter = {"$or": [{"caller_id": user_id}, {"callee_id": user_id}]}
    return list(collection.find(participant_filter))
def update_calls(collection, call_id: str, calls: UpdateCall = Body(...)):
    """Apply a partial update to a call and return the stored document.

    Args:
        collection: Collection holding the call documents.
        call_id: Value of the ``call_id`` field identifying the call.
        calls: Mapping of field names to new values (``.items()`` is called
            on it). Entries whose value is ``None`` are ignored.

    Returns:
        The call document as read back after the update. When every supplied
        value is ``None`` nothing is written and the current document is
        returned.

    Raises:
        HTTPException: 404 when no document has the given ``call_id``.
    """
    changes = {field: value for field, value in calls.items() if value is not None}

    if changes:
        outcome = collection.update_one({"call_id": call_id}, {"$set": changes})
        # matched_count rather than modified_count: an update whose values
        # equal what is already stored matches the document but modifies
        # nothing, and must not be reported as a missing call.
        if outcome.matched_count == 0:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not updated!")

    existing_item = collection.find_one({"call_id": call_id})
    if existing_item is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not found!")
    return existing_item
def update_captions(call_collection, user_collection, call_id: str, captions: UserCaptions = Body(...)):
    """Append one caption segment to a call and return the updated call.

    Args:
        call_collection: Collection holding the call documents.
        user_collection: Collection passed to ``find_name_from_id`` to resolve
            the author's user name.
        call_id: Value of the ``call_id`` field identifying the call.
        captions: Mapping describing the caption (``.items()`` is called on
            it). Entries whose value is ``None`` are dropped before storing,
            so stored segments may lack optional fields.

    Returns:
        The call document as read back after the caption was pushed onto its
        ``captions`` array.

    Raises:
        HTTPException: 400 when the caption carries no ``author_id``; 404 when
            no document has the given ``call_id``.
    """
    segment = {field: value for field, value in captions.items() if value is not None}

    # A missing (or None) author id previously surfaced as an unhandled
    # KeyError; report it as a client error instead.
    author_id = segment.get("author_id")
    if author_id is None:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Caption is missing 'author_id'.")

    # Store the author's user name alongside the id.
    segment["author_username"] = find_name_from_id(user_collection, author_id)

    outcome = call_collection.update_one({"call_id": call_id}, {"$push": {"captions": segment}})
    if outcome.matched_count == 0:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Captions not updated!")

    existing_item = call_collection.find_one({"call_id": call_id})
    if existing_item is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Captions not found!")
    return existing_item
def delete_calls(collection, call_id: str):
    """Delete the call whose ``call_id`` field equals ``call_id``.

    Returns a confirmation message when exactly one document was deleted.

    Raises:
        HTTPException: 404 when the delete did not remove exactly one document.
    """
    outcome = collection.delete_one({"call_id": call_id})
    if outcome.deleted_count != 1:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not found!")
    return "Call deleted sucessfully!"
def get_caption_text(collection, call_id, user_id):
    """Join a call's captions into one transcript string for a given user.

    Segments authored by ``user_id`` contribute their ``original_text``; all
    other segments contribute their ``translated_text``.

    Returns:
        The space-joined transcript, or ``None`` when the call document has no
        ``captions`` entry.

    Raises:
        HTTPException: 404 (from ``find_call``) when the call does not exist.
    """
    call_record = find_call(collection, call_id)

    # Check if the call has any captions first.
    caption_records = call_record.get("captions")
    if caption_records is None:
        return None

    pieces = []
    for segment in caption_records:
        original = segment.get("original_text")
        if segment.get("author_id") == user_id:
            text = original
        else:
            # update_captions drops None-valued fields before storing, so a
            # segment may have no 'translated_text'; fall back to the original
            # wording rather than failing with a KeyError.
            text = segment.get("translated_text") or original
        if text:
            pieces.append(text)
    return " ".join(pieces)
# approximate string matching
def fuzzy_search(collection, user_id, query):
    """Run a fuzzy text search over calls and keep those involving the user.

    Args:
        collection: Collection holding the call documents.
        user_id: Only calls whose ``caller_id`` or ``callee_id`` equals this
            value are returned.
        query: Search text passed to the ``$search`` stage.

    Returns:
        List of matching call documents in the order the pipeline yields them.
    """
    # NOTE(review): this drops every index on the collection on each search,
    # which is costly and removes indexes other code may depend on. Kept so
    # the text index below can always be rebuilt under the same name; consider
    # creating the index once at startup instead.
    collection.drop_indexes()
    collection.create_index(
        # Field names must match what is stored on caption segments
        # ('original_text' / 'translated_text').
        [("captions.original_text", "text"), ("captions.translated_text", "text")],
        name="captions",
    )

    # NOTE(review): `$search` with a wildcard path searches all fields;
    # confirm whether the text index created above is actually used by it.
    pipeline = [
        {
            "$search": {
                "text": {
                    "query": query,
                    "path": {"wildcard": "*"},
                    "fuzzy": {},
                }
            }
        }
    ]

    # Keep only the records this user took part in.
    return [
        doc
        for doc in collection.aggregate(pipeline)
        if doc["caller_id"] == user_id or doc["callee_id"] == user_id
    ]
def summarise(collection, call_id, user_id, target_language):
    """Summarise a call transcript with the OpenAI chat API and store the result.

    The transcript is built by ``get_caption_text`` from the user's point of
    view, sent to ``gpt-3.5-turbo`` with a request for a summary in
    ``target_language``, and the reply is saved on the call document under
    ``summaries.<user_id>``.

    Returns:
        The summary text with any leading "<label>:" prefix removed, or
        ``None`` when the call has no caption text, the API request fails, or
        the reply is empty.

    Raises:
        HTTPException: 404 when the call does not exist.
    """
    # Only `OpenAI` is imported at module level; the error base class is
    # needed to catch failed requests without a bare `except`.
    from openai import OpenAIError

    # get caption text using call_id
    caption_text = get_caption_text(collection, call_id, user_id)
    if not caption_text:
        # Nothing to summarise; avoid sending the literal text "None".
        return None

    # Prefer the key from .env, falling back to the process environment.
    config = dotenv_values(".env")
    api_key = config.get("OPENAI_API_KEY") or os.environ.get("OPENAI_API_KEY")
    client = OpenAI(api_key=api_key)

    prompt = (
        "The following is an extract from a call transcript. "
        f"Rewrite this as a structured, clear summary in {target_language}. "
        f'\nCall Transcript: """\n{caption_text}\n"""\n'
    )

    # Gpt-3.5 turbo has 4096 token limit -> request will fail if exceeded.
    # The request itself is what raises, so it is the call that is guarded.
    try:
        chat_completion = client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model="gpt-3.5-turbo",
        )
    except OpenAIError:
        return None

    choices = chat_completion.choices
    result = choices[0].message.content if choices else None
    if not result:
        return None

    # Add the result to MongoDB. matched_count is used so that re-storing an
    # identical summary is not mistaken for a missing call.
    update_result = collection.update_one({"call_id": call_id}, {"$set": {f"summaries.{user_id}": result}})
    if update_result.matched_count == 0:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not updated!")

    # Try to remove a leading summary prefix such as "Summary:". Only the
    # first colon splits, so later colons no longer truncate the text.
    # NOTE(review): a colon inside the opening sentence still drops the words
    # before it, as it did originally.
    _, separator, remainder = result.partition(":")
    remainder = remainder.strip()
    if separator and remainder:
        return remainder
    return result
def term_extraction(collection, call_id, user_id, target_language, min_caption_length=50):
    """Extract key terms from a call transcript and store them on the call.

    The transcript is built by ``get_caption_text`` from the user's point of
    view. The extracted terms are saved on the call document under
    ``key_terms.<user_id>``.

    Args:
        collection: Collection holding the call documents.
        call_id: Value of the ``call_id`` field identifying the call.
        user_id: User whose view of the transcript is used and under whose id
            the terms are stored.
        target_language: Passed through to ``extract_terms``.
        min_caption_length: Transcripts must be longer than this many
            characters; term extraction is poor on short transcripts.

    Returns:
        Whatever ``extract_terms`` returns, or ``None`` when the call has no
        captions or the transcript is too short.

    Raises:
        HTTPException: 404 when the call does not exist.
    """
    combined_text = get_caption_text(collection, call_id, user_id)

    # get_caption_text returns None for a call without captions, so guard
    # before taking the length; short transcripts are skipped explicitly.
    if combined_text is None or len(combined_text) <= min_caption_length:
        return None

    # Extract key terms from the concatenated caption text.
    key_terms = extract_terms(combined_text, target_language, len(combined_text))

    # matched_count rather than modified_count: re-running on an unchanged
    # transcript stores identical terms, which modifies nothing but is not an
    # error.
    update_result = collection.update_one({"call_id": call_id}, {"$set": {f"key_terms.{user_id}": key_terms}})
    if update_result.matched_count == 0:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not updated!")
    return key_terms