Spaces:
Paused
Paused
File size: 9,788 Bytes
9680844 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
from fastapi import Body, Request, HTTPException, status
from fastapi.encoders import jsonable_encoder
import sys
from ..models.calls import UpdateCall, UserCall, UserCaptions
from ..operations.users import *
from utils.text_rank import extract_terms
from openai import OpenAI
from time import sleep
import os
from dotenv import dotenv_values
# Used within calls to create call record in main.py
def create_calls(collection, user: UserCall = Body(...)):
    """Insert a new call record and return the stored document.

    The payload is passed through ``jsonable_encoder`` before it is written;
    the freshly inserted document is then read back by its ``_id``.
    """
    payload = jsonable_encoder(user)
    inserted_id = collection.insert_one(payload).inserted_id
    return collection.find_one({"_id": inserted_id})
def list_calls(collection, limit: int):
    """Return up to ``limit`` call records from ``collection`` as a list.

    Raises:
        HTTPException: 404 if the query or the cursor iteration fails.
    """
    try:
        return list(collection.find(limit=limit))
    except Exception as exc:
        # Narrowed from a bare ``except:`` so that KeyboardInterrupt,
        # SystemExit and other BaseExceptions are no longer turned into a 404.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No existing call records yet.",
        ) from exc
def find_call(collection, call_id: str):
    """Find a call by its call id.

    Returns:
        The first document whose ``call_id`` field equals ``call_id``.

    Raises:
        HTTPException: 404 when no such document exists.
    """
    record = collection.find_one({"call_id": call_id})
    if record is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call with ID: '{call_id}' not found.")
    return record
def find_user_calls(collection, user_id: str):
    """Find every call in which ``user_id`` is the caller or the callee.

    Returns:
        A list of matching call documents. An empty list (not an error) is
        returned when there are none, for the TranscriptView frontend
        component.
    """
    participant_filter = {"$or": [{"caller_id": user_id}, {"callee_id": user_id}]}
    matches = list(collection.find(participant_filter))
    return matches or []
def list_transcripts_by_key_terms(collection, key_terms_list: list[str] = Body(...)):
    """Find calls whose ``key_terms`` field matches any of the given terms.

    Returns:
        The matching call documents, without their ``_id`` field.

    Raises:
        HTTPException: 404 when no call matches.
    """
    key_terms_list = jsonable_encoder(key_terms_list)
    # NOTE(review): term_extraction stores terms under "key_terms.<user_id>";
    # confirm that this top-level "$in" filter matches that layout.
    term_filter = {"key_terms": {"$in": key_terms_list}}
    projection = {'_id': 0}  # leave the ObjectId out of the returned documents
    matches = list(collection.find(term_filter, projection))
    if not matches:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call with key terms: '{key_terms_list}' not found!")
    return matches
def list_transcripts_by_dates(collection, start_date: str, end_date: str):
    """Find calls whose ``date`` lies between two dates.

    Both bounds are expanded to ``<date>T00:00:00`` strings and compared
    with ``$gte`` / ``$lte``.

    Raises:
        HTTPException: 404 when no call falls inside the range.
    """
    # NOTE(review): the upper bound is midnight at the *start* of end_date, so
    # calls made later on that day are not matched - confirm this is intended.
    lower_bound = "{}T00:00:00".format(start_date)
    upper_bound = "{}T00:00:00".format(end_date)
    date_filter = {"date": {"$gte": lower_bound, "$lte": upper_bound}}
    matches = list(collection.find(date_filter))
    if not matches:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call with creation date between: '{lower_bound} - {upper_bound}' not found!")
    return matches
def list_transcripts_by_duration(collection, min_len: int, max_len: int):
    """Find calls whose ``duration`` lies in ``[min_len, max_len]``.

    Raises:
        HTTPException: 404 when no call has a duration inside the range.
    """
    duration_filter = {"duration": {"$gte": min_len, "$lte": max_len}}
    matches = list(collection.find(duration_filter))
    if not matches:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call with duration between: '{min_len} - {max_len}' milliseconds not found!")
    return matches
def update_calls(collection, call_id: str, calls: UpdateCall = Body(...)):
    """Apply a partial update to the call identified by ``call_id``.

    ``calls`` is read through ``.items()``, i.e. it is treated as a mapping.
    Entries whose value is ``None`` are ignored, so only the supplied fields
    are written with ``$set``.

    Returns:
        The call document as stored after the update.

    Raises:
        HTTPException: 404 if no call with ``call_id`` exists.
    """
    changes = {field: value for field, value in calls.items() if value is not None}

    if changes:
        update_result = collection.update_one({"call_id": call_id}, {"$set": changes})
        # Check matched_count rather than modified_count: re-sending values
        # that are already stored matches the document but modifies nothing,
        # and that must not be reported as a 404.
        if update_result.matched_count == 0:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not updated!")

    existing_item = collection.find_one({"call_id": call_id})
    if existing_item is not None:
        return existing_item

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not found!")
def update_captions(call_collection, user_collection, call_id: str, captions: UserCaptions = Body(...)):
    """Append one caption entry to the ``captions`` array of a call.

    ``captions`` is read through ``.items()`` (treated as a mapping) and
    entries whose value is ``None`` are dropped. The author's name is looked
    up from ``author_id`` and stored with the caption as ``author_username``.

    Returns:
        The call document as stored after the caption was pushed.

    Raises:
        HTTPException: 404 if the push modified no document, or if the call
            cannot be read back afterwards.
    """
    caption_entry = {key: value for key, value in captions.items() if value is not None}

    # Resolve the author's name so it is stored alongside the caption.
    caption_entry["author_username"] = find_name_from_id(user_collection, caption_entry["author_id"])

    # caption_entry always holds at least "author_username" at this point, so
    # the push is unconditional.
    push_result = call_collection.update_one(
        {"call_id": call_id},
        {"$push": {"captions": caption_entry}},
    )
    if push_result.modified_count == 0:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Captions not updated!")

    stored_call = call_collection.find_one({"call_id": call_id})
    if stored_call is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Captions not found!")
    return stored_call
def delete_calls(collection, call_id: str):
    """Delete the call identified by ``call_id``.

    Returns:
        A confirmation message when exactly one document was deleted.

    Raises:
        HTTPException: 404 when nothing was deleted.
    """
    outcome = collection.delete_one({"call_id": call_id})
    if outcome.deleted_count != 1:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Call not found!")
    # NOTE(review): message spelling kept as-is; clients may match on it.
    return f"Call deleted sucessfully!"
# def get_caption_text(collection, call_id):
# call_record = find_call((collection), call_id)
# try: # Check if call has any captions first
# caption_records = call_record['captions']
# except KeyError:
# return None
# # iterate through caption embedded document and store original text
# combined_text = [caption['original_text'] for caption in caption_records]
# return " ".join(combined_text)
def get_caption_text(collection, call_id, user_id):
    """Build the transcript text of a call from one user's point of view.

    Captions authored by ``user_id`` contribute their ``original_text``;
    captions from anyone else contribute their ``translated_text``.

    Returns:
        The caption texts joined with single spaces, or ``None`` when the
        call record has no ``captions`` key.
    """
    call_record = find_call(collection, call_id)

    # A call without any captions has no "captions" key at all.
    try:
        caption_records = call_record['captions']
    except KeyError:
        return None

    segments = [
        segment['original_text'] if segment['author_id'] == user_id else segment['translated_text']
        for segment in caption_records
    ]
    return " ".join(segments)
# standard exact match based full text search
def full_text_search(collection, query):
    """Run a MongoDB ``$text`` search over the caption texts of all calls.

    A text index named ``captions`` covering ``captions.original_text`` and
    ``captions.translated_text`` is (re)created before the query runs.

    Returns:
        A list of the call documents matched by ``$text``.
    """
    # NOTE(review): this drops *every* index on the collection on each search,
    # which is expensive; kept because an index built with an older key
    # specification must be removed before the one below can be created.
    collection.drop_indexes()
    # Fixed: the second field was misspelt "captions.tranlated_text", so the
    # translated captions were never part of the text index.
    collection.create_index(
        [('captions.original_text', 'text'), ('captions.translated_text', 'text')],
        name='captions',
    )
    return list(collection.find({"$text": {"$search": query}}))
# approximate string matching
def fuzzy_search(collection, query):
    """Run an approximate (fuzzy) ``$search`` aggregation over all fields.

    A text index named ``captions`` covering ``captions.original_text`` and
    ``captions.translated_text`` is (re)created before the pipeline runs.

    Returns:
        A list of the documents produced by the aggregation.
    """
    # NOTE(review): this drops *every* index on the collection on each search;
    # confirm whether the "$search" stage needs this text index at all.
    collection.drop_indexes()
    # Fixed: the second field was misspelt "captions.tranlated_text", so the
    # translated captions were never part of the text index.
    collection.create_index(
        [('captions.original_text', 'text'), ('captions.translated_text', 'text')],
        name='captions',
    )

    pipeline = [
        {
            "$search": {
                "text": {
                    "query": query,
                    "path": {"wildcard": "*"},
                    "fuzzy": {},
                }
            }
        }
    ]
    return list(collection.aggregate(pipeline))
def summarise(collection, call_id, user_id, target_language):
    """Summarise a call transcript with OpenAI and store the summary.

    The transcript is built by ``get_caption_text`` for ``user_id``, sent to
    ``gpt-3.5-turbo`` with a request for a summary in ``target_language``,
    and the result is saved on the call under ``summaries.<user_id>``.

    Returns:
        The summary text, or ``None`` when the call has no caption text or
        the model returned no usable content.

    Raises:
        HTTPException: 404 if no call with ``call_id`` exists.
    """
    caption_text = get_caption_text(collection, call_id, user_id)
    if not caption_text:
        # Nothing to summarise; previously the literal text "None" was sent
        # to the model and its answer stored as the summary.
        return None

    # Prefer the key from .env, fall back to the process environment instead
    # of failing with a KeyError when .env does not define it.
    config = dotenv_values(".env")
    api_key = config.get("OPENAI_API_KEY") or os.environ.get("OPENAI_API_KEY")
    client = OpenAI(api_key=api_key)

    # Fixed: the prompt used the invalid escape "\C" and embedded the source
    # line's continuation whitespace.
    prompt = (
        "The following is an extract from a call transcript. "
        f"Rewrite this as a structured, clear summary in {target_language}.\n\n"
        f'Call Transcript: """\n{caption_text}\n"""\n'
    )

    # NOTE: gpt-3.5-turbo has a 4096-token limit; the request itself raises
    # if the transcript exceeds it.
    chat_completion = client.chat.completions.create(
        messages=[{"role": "user", "content": prompt}],
        model="gpt-3.5-turbo",
    )

    try:
        content = chat_completion.choices[0].message.content
    except (IndexError, AttributeError):
        return None
    if not content:
        return None

    # Drop a leading "<label>:" if there is one, but keep everything after it.
    # The old split(":")[1] cut the summary at the second colon and threw the
    # whole summary away when it contained no colon at all.
    _label, separator, remainder = content.partition(":")
    result = (remainder if separator else content).strip()
    if not result:
        return None

    # TODO: storing the summary should be done asynchronously.
    update_result = collection.update_one(
        {"call_id": call_id},
        {"$set": {f"summaries.{user_id}": result}},
    )
    # matched_count, not modified_count: re-storing an identical summary
    # matches the call but modifies nothing and is not an error.
    if update_result.matched_count == 0:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not updated!")

    return result
def term_extraction(collection, call_id, user_id, target_language, min_caption_length: int = 50):
    """Extract key terms from a call transcript and store them on the call.

    The transcript is built by ``get_caption_text`` for ``user_id``; the
    terms returned by ``extract_terms`` are saved under
    ``key_terms.<user_id>``.

    Args:
        min_caption_length: Transcripts of this many characters or fewer are
            skipped, because term extraction is poor on short transcripts.

    Returns:
        The extracted key terms, or ``None`` when the call has no captions or
        the transcript is too short.

    Raises:
        HTTPException: 404 if no call with ``call_id`` exists.
    """
    combined_text = get_caption_text(collection, call_id, user_id)

    # get_caption_text returns None for a call without captions; guard it so
    # len() is never called on None, and skip transcripts that are too short.
    if combined_text is None or len(combined_text) <= min_caption_length:
        return None

    # Extract key terms from the concatenated caption text.
    key_terms = extract_terms(combined_text, target_language, len(combined_text))

    update_result = collection.update_one(
        {"call_id": call_id},
        {"$set": {f"key_terms.{user_id}": key_terms}},
    )
    # matched_count, not modified_count: re-extracting identical terms
    # matches the call but modifies nothing and is not an error.
    if update_result.matched_count == 0:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Call not updated!")

    return key_terms
|