# Spaces:
# Sleeping
# Sleeping
import os | |
import gradio as gr | |
import pymongo | |
import pandas as pd | |
from bson import ObjectId | |
from dotenv import load_dotenv | |
import requests | |
from datetime import timedelta | |
from collections import defaultdict | |
load_dotenv() | |
# Status labels shown in the dashboard, mapped to the status codes that are
# interpolated into the review API paths. "Not Assigned" and "Under Review"
# deliberately share the same code.
video_status = dict(
    [
        ("Not Assigned", "NOT_STARTED"),
        ("Under Review", "NOT_STARTED"),
        ("Dubbing Started", "DUBBING_STARTED"),
        ("Waiting QA", "WAITING_QA"),
        ("Sent to Client", "SENT_TO_CLIENT"),
        ("Approved", "APPROVED"),
        ("Rejected", "REJECTED"),
        ("Churned", "CHURNED"),
        ("Re-review Requested", "RE_REVIEW_REQUESTED"),
    ]
)

# Headers passed to the dashboard API requests made in this file; the key is
# read from the X-API-KEY environment variable.
HEADERS = {
    "x-api-key": os.environ.get('X-API-KEY'),
    "Content-Type": "application/json",
}

# Language names offered in the source/target checkbox groups.
ALL_LANGUAGES = [
    'Hindi',
    'English',
    'Bengali',
    'Kannada',
    'Spanish',
    'French',
    'German',
    'Italian',
    'Tamil',
    'Telugu',
]

# Language name -> two-letter code.
_LANGUAGE_CODES = {
    "Hindi": "hi",
    "English": "en",
    "Spanish": "es",
    "French": "fr",
    "German": "de",
    "Italian": "it",
    "Tamil": "ta",
    "Telugu": "te",
    "Bengali": "bn",
    "Kannada": "kn",
}

# Bidirectional lookup: every name maps to its code and every code maps back
# to its name (names first, then codes).
language_maps = dict(_LANGUAGE_CODES)
language_maps.update({code: name for name, code in _LANGUAGE_CODES.items()})
def print_slack(platform, video_id, translator_name, assignment="translator"):
    """Post a Slack message announcing a translator or QA assignment.

    Nothing is sent for the "dev" platform. For any other platform the video
    is read from the ``deepsync_prod`` database (MONGO_URI) and a summary is
    posted to the webhook URL held in the SLACK_WEBHOOK environment variable.

    Args:
        platform: "dev" skips the notification; any other value sends it.
        video_id: String form of the dubbed video's ObjectId.
        translator_name: "/"-separated label; only its first three parts are
            included in the message.
        assignment: "translator" titles the message "Translator Assigned";
            any other value titles it "QA Assigned".

    Returns:
        None.
    """
    if platform == "dev":
        return

    client = pymongo.MongoClient(os.environ.get("MONGO_URI"))
    try:
        video = client.deepsync_prod.dubbedvideos.find_one({"_id": ObjectId(video_id)})
    finally:
        # Release the connection even when the lookup raises.
        client.close()

    if video is None:
        # Without the video document there is nothing meaningful to report.
        print(f"Video {video_id} not found; Slack notification skipped.")
        return

    # Language fields may carry a region suffix; only the first two
    # characters are used as the lookup code.
    source_language = language_maps[video['sourceLanguage'][:2]]
    target_language = language_maps[video['targetLanguage'][:2]]
    video_duration = str(timedelta(seconds=video['duration']))
    translator_details = "/".join(translator_name.split("/")[:3])
    role = 'Translator' if assignment == 'translator' else 'QA'

    message = "\n".join([
        f"```{role} Assigned",
        f"Video Name : {video['title']}",
        f"Source Language : {source_language}",
        f"Target Language : {target_language}",
        f"Video Duration : {video_duration} hours",
        f"Details : {translator_details}",
        "```",
        "",
    ])
    requests.post(
        os.environ.get('SLACK_WEBHOOK'),
        headers={'Content-type': 'application/json'},
        json={'text': message},
    )
def get_translators(platform):
    """Build a table of the non-deleted translators for a platform.

    Translators are fetched from the dashboard API, entries flagged
    ``isDeleted`` are dropped, and each remaining entry is joined to its
    document in the ``companies`` collection to obtain a name and email.

    Args:
        platform: "dev" selects the dev API and ``deepsync_stage`` database;
            any other value selects the production API and ``deepsync_prod``.

    Returns:
        A pandas DataFrame with the columns ``_id``, ``Name``, ``Email``,
        ``Source Language`` and ``Target Language`` (empty when there are no
        translators).
    """
    if platform == "dev":
        api_url = "https://api.dev-env.deepsync.co/api/v1"
        client = pymongo.MongoClient(os.environ.get("MONGO_DEV_URI"))
        source_db = client.deepsync_stage
    else:
        api_url = "https://api.deepsync.co/api/v1"
        client = pymongo.MongoClient(os.environ.get("MONGO_URI"))
        source_db = client.deepsync_prod

    translators_data = []
    try:
        response = requests.get(f"{api_url}/dashboard/translators", headers=HEADERS)
        translators = [t for t in response.json()['data'] if not t.get('isDeleted', False)]
        if translators:
            desired_ids = [ObjectId(t["translator"]) for t in translators]
            companies_cursor = source_db.companies.find({'_id': {'$in': desired_ids}})
            companies = {c["_id"]: c for c in companies_cursor}
            for translator in translators:
                # Best effort: a translator with a missing company or an
                # unmapped language is reported and left out of the table.
                try:
                    data = companies[ObjectId(translator["translator"])]
                    translators_data.append({
                        "_id": str(translator["_id"]),
                        "Name": data["name"],
                        "Email": data["email"],
                        "Source Language": ", ".join(language_maps[x] for x in translator["sourceAccent"]),
                        "Target Language": ", ".join(language_maps[x] for x in translator["targetAccent"]),
                    })
                except Exception as e:
                    print(e)
    finally:
        # Close the connection on every path, including failed requests.
        client.close()
    return pd.DataFrame(translators_data)
def add_translator(platform, email_id, source_langs, target_langs, contact_num, pricing):
    """Register the company with the given email as a translator.

    The company is looked up by email in the ``companies`` collection and a
    translator is created for it through the dashboard API. When the API
    answers "Translator already exists", the existing record is patched
    instead (``isDeleted`` reset to False together with the new details).

    Args:
        platform: "dev" selects the dev API and ``deepsync_stage`` database;
            any other value selects the production API and ``deepsync_prod``.
        email_id: Email address stored on the company document.
        source_langs: Language names for the source side, or None.
        target_langs: Language names for the target side, or None.
        contact_num: Contact number text; blank or None is stored as None.
        pricing: Pricing value; None or -1 is stored as None.

    Returns:
        A human-readable status message for the log box.
    """
    if platform == "dev":
        api_url = "https://api.dev-env.deepsync.co/api/v1"
        client = pymongo.MongoClient(os.environ.get("MONGO_DEV_URI"))
        source_db = client.deepsync_stage
    else:
        api_url = "https://api.deepsync.co/api/v1"
        client = pymongo.MongoClient(os.environ.get("MONGO_URI"))
        source_db = client.deepsync_prod

    def is_error(resp):
        # 4xx and 5xx responses are treated as failures.
        return 400 <= resp.status_code < 600

    try:
        try:
            company = source_db.companies.find_one({"email": email_id})
        except Exception:
            # A failed lookup is reported the same way as an unknown email.
            company = None
        if company is None:
            return "Invalid Company ID. Unable to add translator."

        contact = (contact_num or "").strip()
        main_data = {
            "targetAccent": [language_maps[l] for l in target_langs or []],
            "sourceAccent": [language_maps[l] for l in source_langs or []],
            "contactNum": contact or None,
            "pricing": None if pricing is None or pricing == -1 else pricing,
        }
        data = {"translator": str(company["_id"]), **main_data}
        response = requests.post(f"{api_url}/dashboard/translator", json=data, headers=HEADERS)
        if not is_error(response):
            return f"Translator added succesfully, ID : {response.json()['data']['_id']}"

        failure = f"{response.json()['message']} ({response.status_code})"
        if response.json()['message'].strip() != "Translator already exists":
            return failure

        # The translator exists already: revive and update that record.
        existing = source_db.translators.find_one({"translator": company['_id']})
        if existing is None:
            return failure
        response = requests.patch(
            f"{api_url}/dashboard/translators/{existing['_id']}",
            json={"isDeleted": False, **main_data},
            headers=HEADERS,
        )
        if is_error(response):
            return f"{response.json()['message']} ({response.status_code})"
        return f"Translator added succesfully, ID : {response.json()['data']['_id']}"
    finally:
        # Every return path above releases the connection.
        client.close()
def get_translator_names(platform):
    """Populate the translator dropdown with the non-deleted translators.

    Each choice has the form ``"<name> / <email> / <translator id>"``, where
    name and email come from the translator's company document.

    Args:
        platform: "dev" selects the dev API and ``deepsync_stage`` database;
            any other value selects the production API and ``deepsync_prod``.

    Returns:
        A ``gr.Dropdown`` update carrying the list of choices.
    """
    if platform == "dev":
        api_url = "https://api.dev-env.deepsync.co/api/v1"
        client = pymongo.MongoClient(os.environ.get("MONGO_DEV_URI"))
        source_db = client.deepsync_stage
    else:
        api_url = "https://api.deepsync.co/api/v1"
        client = pymongo.MongoClient(os.environ.get("MONGO_URI"))
        source_db = client.deepsync_prod

    translators_names = []
    try:
        response = requests.get(f"{api_url}/dashboard/translators", headers=HEADERS)
        translators = [t for t in response.json()['data'] if not t.get('isDeleted', False)]
        if translators:
            desired_ids = [ObjectId(t["translator"]) for t in translators]
            companies_cursor = source_db.companies.find({'_id': {'$in': desired_ids}})
            companies = {c["_id"]: c for c in companies_cursor}
            for translator in translators:
                # Best effort: translators whose company cannot be resolved
                # are reported and left out of the dropdown.
                try:
                    data = companies[ObjectId(translator["translator"])]
                    translators_names.append(f'{data["name"]} / {data["email"]} / {translator["_id"]}')
                except Exception as e:
                    print(e)
    finally:
        # Close the connection on every path, including failed requests.
        client.close()
    return gr.Dropdown.update(choices=translators_names)
def select_current_data(platform, translator_name):
    """Load the stored details of the selected translator into the form.

    Args:
        platform: "dev" selects the dev API; any other value selects the
            production API.
        translator_name: Dropdown label whose last "/"-separated part is the
            translator ID, or None when nothing is selected.

    Returns:
        A 4-tuple of component updates: source languages, target languages,
        contact number and pricing. With no selection the fields are reset
        (empty lists, empty text, -1).

    Raises:
        Exception: If no non-deleted translator with that ID is returned by
            the API.
    """
    if translator_name is None:
        # This handler is wired to four output components, so four values
        # must be returned even when there is nothing to show.
        return (
            gr.CheckboxGroup.update(value=[]),
            gr.CheckboxGroup.update(value=[]),
            gr.Textbox.update(value=""),
            gr.Textbox.update(value=-1),
        )

    # The ID is the last part of the label; taking it from the right keeps
    # this working when the name itself contains a "/".
    translator_id = translator_name.rsplit("/", 1)[-1].strip()
    if platform == "dev":
        api_url = "https://api.dev-env.deepsync.co/api/v1"
    else:
        api_url = "https://api.deepsync.co/api/v1"

    response = requests.get(f"{api_url}/dashboard/translators", headers=HEADERS)
    translator = next(
        (
            t for t in response.json()['data']
            if not t.get('isDeleted', False) and t["_id"] == translator_id
        ),
        None,
    )
    if translator is None:
        raise Exception(f"Translator {translator_name} not found.")

    # Records may lack the optional fields; treat a missing key like None.
    contact_num = translator.get("contactNum")
    pricing = translator.get("pricing")
    return (
        gr.CheckboxGroup.update(value=[language_maps[l] for l in translator.get("sourceAccent") or []]),
        gr.CheckboxGroup.update(value=[language_maps[l] for l in translator.get("targetAccent") or []]),
        gr.Textbox.update(value="" if contact_num is None else contact_num),
        gr.Textbox.update(value=-1 if pricing is None else pricing),
    )
def update_translator(platform, translator_name, source_languages, target_languages, contact_num, pricing):
    """Save edited languages, contact number and pricing for a translator.

    Args:
        platform: "dev" selects the dev API; any other value selects the
            production API.
        translator_name: Dropdown label whose last "/"-separated part is the
            translator ID, or None when nothing is selected.
        source_languages: Selected source language names (None means none).
        target_languages: Selected target language names (None means none).
        contact_num: Contact number text; None is sent as an empty string.
        pricing: Pricing value; None is sent as 0.

    Returns:
        A status message for the log box.

    Raises:
        Exception: If no non-deleted translator with that ID is returned by
            the API.
    """
    if translator_name is None:
        return "Please select a translator."

    # The ID is the last part of the label; taking it from the right keeps
    # this working when the name itself contains a "/".
    translator_id = translator_name.rsplit("/", 1)[-1].strip()
    if platform == "dev":
        api_url = "https://api.dev-env.deepsync.co/api/v1"
    else:
        api_url = "https://api.deepsync.co/api/v1"

    # Confirm that the selected translator (not merely any translator) is
    # still active before patching it.
    response = requests.get(f"{api_url}/dashboard/translators", headers=HEADERS)
    exists = any(
        not t.get('isDeleted', False) and t["_id"] == translator_id
        for t in response.json()['data']
    )
    if not exists:
        raise Exception(f"Translator {translator_name} not found.")

    updateable_data = {
        "sourceAccent": [language_maps[l] for l in source_languages or []],
        "targetAccent": [language_maps[l] for l in target_languages or []],
        "contactNum": "" if contact_num is None else contact_num.strip(),
        "pricing": 0 if pricing is None else pricing,
    }
    response = requests.patch(
        f"{api_url}/dashboard/translators/{translator_id}",
        json=updateable_data,
        headers=HEADERS,
    )
    if 400 <= response.status_code < 600:
        return response.json()['message']
    return "Updated translator details succesfully."
def get_videos_under_review(platform, status_key):
    """List the videos awaiting review, optionally filtered by status.

    Args:
        platform: "dev" selects the dev API; any other value selects the
            production API.
        status_key: A key of ``video_status`` (or an iterable of such keys).
            None or an empty value requests the unfiltered list.

    Returns:
        A tuple ``(dropdown_update, summary)``. The dropdown choices have the
        form ``"<video id> / <title>"``; videos without a title are skipped.
        ``summary`` has one ``"<status> : <n> videos"`` line per status that
        was fetched successfully and is empty for the unfiltered list.
    """
    # An unselected filter arrives as None; wrapping it in a list would make
    # the unfiltered branch unreachable and fail the status lookup.
    if not status_key:
        status_keys = []
    elif isinstance(status_key, str):
        status_keys = [status_key]
    else:
        status_keys = list(status_key)

    if platform == "dev":
        api_url = "https://api.dev-env.deepsync.co/api/v1"
    else:
        api_url = "https://api.deepsync.co/api/v1"

    if not status_keys:
        response = requests.get(f"{api_url}/dashboard/dubbed/videos/review/requested", headers=HEADERS)
        if response.status_code != 200:
            return gr.Dropdown.update(choices=[], value=None), ""
        options = [f"{str(v['_id'])} / {v['title']}" for v in response.json()['data'] if 'title' in v]
        return gr.Dropdown.update(choices=options, value=None), ""

    def assigned_translator(item):
        # Tolerate entries whose review information is absent or null.
        return (item.get("videoReviewInformation") or {}).get("assignedTranslator")

    options = []
    count = {}
    for status in status_keys:
        response = requests.get(
            f"{api_url}/dashboard/dubbed/videos/review/requested/{video_status[status]}",
            headers=HEADERS,
        )
        if response.status_code != 200:
            continue
        response_data = response.json()['data']
        # Both labels map to the same API status; the assigned translator
        # tells them apart.
        if status == "Not Assigned":
            response_data = [v for v in response_data if assigned_translator(v) is None]
        elif status == "Under Review":
            response_data = [v for v in response_data if assigned_translator(v) is not None]
        current_options = [f"{str(v['_id'])} / {v['title']}" for v in response_data if 'title' in v]
        count[status] = len(current_options)
        options.extend(current_options)

    data_text = "\n".join(f"{k} : {v} videos" for k, v in count.items())
    return gr.Dropdown.update(choices=options, value=None), data_text
def delete_translator_func(platform, translator_name):
    """Mark the selected translator as deleted through the dashboard API.

    Args:
        platform: "dev" selects the dev API; any other value selects the
            production API.
        translator_name: Label of the form "name / email / id", or None when
            nothing is selected.

    Returns:
        "Translator deleted." on success, the API's error message on a 4xx
        or 5xx response, or a prompt when no translator is selected.
    """
    if translator_name is None:
        return "Please select a translator."

    # The label must consist of exactly three "/"-separated parts.
    _name, _email, translator_id = [part.strip() for part in translator_name.split("/")]

    base = "https://api.dev-env.deepsync.co/api/v1" if platform == "dev" else "https://api.deepsync.co/api/v1"
    endpoint = f"{base}/dashboard/translators/{translator_id}"
    response = requests.patch(endpoint, json={"isDeleted": True}, headers=HEADERS)

    failed = str(response.status_code).startswith(('4', '5'))
    return response.json()['message'] if failed else "Translator deleted."
def _no_video_outputs():
    """Return the six output values used when no video can be shown."""
    disabled = gr.Button.update(interactive=False)
    return (
        gr.Dropdown.update(choices=[], value=None),
        gr.Dropdown.update(choices=[], value=None),
        "",
        "",
        disabled,
        disabled,
    )


def get_traslators_assign(platform, video):
    """Prepare the assignment controls for the selected video.

    Finds the translators whose source/target accents match the video's
    languages, counts how many pending videos (NOT_STARTED, DUBBING_STARTED,
    WAITING_QA) each is assigned to, and preselects the video's current
    translator and QA when they appear in the list.

    Args:
        platform: "dev" selects the dev API and ``deepsync_stage`` database;
            any other value selects the production API and ``deepsync_prod``.
        video: Dropdown label of the form "<video id> / <title>", or None
            when the selection has been cleared.

    Returns:
        A 6-tuple: translator dropdown update, QA dropdown update, source
        language name, target language name, and the same button update
        twice (enabled only while the video's review status is pending).
        Empty/disabled values are returned when no video is selected or the
        video is not found.
    """
    if not video:
        return _no_video_outputs()

    video_id = video.split("/", maxsplit=1)[0].strip()
    if platform == "dev":
        api_url = "https://api.dev-env.deepsync.co/api/v1"
        client = pymongo.MongoClient(os.environ.get("MONGO_DEV_URI"))
        source_db = client.deepsync_stage
    else:
        api_url = "https://api.deepsync.co/api/v1"
        client = pymongo.MongoClient(os.environ.get("MONGO_URI"))
        source_db = client.deepsync_prod

    try:
        video_doc = source_db.dubbedvideos.find_one({"_id": ObjectId(video_id)})
        if video_doc is None:
            return _no_video_outputs()
        source_language = video_doc["sourceLanguage"].split("-")[0]
        target_language = video_doc["targetLanguage"].split("-")[0]
        matching = source_db.translators.find({
            "sourceAccent": {"$elemMatch": {"$eq": source_language}},
            "targetAccent": {"$elemMatch": {"$eq": target_language}},
        })
        # Soft-deleted translators are hidden, as in the other listings.
        translators = [t for t in matching if not t.get("isDeleted", False)]
        companies = {}
        if translators:
            desired_ids = [t["translator"] for t in translators]
            companies_cursor = source_db.companies.find({'_id': {'$in': desired_ids}})
            companies = {c["_id"]: c for c in companies_cursor}
    finally:
        # All database reads are done; release the connection on every path.
        client.close()

    # Count the pending videos currently assigned to each translator.
    translator_pv_count = defaultdict(int)
    for status in ("NOT_STARTED", "DUBBING_STARTED", "WAITING_QA"):
        response = requests.get(f"{api_url}/dashboard/dubbed/videos/review/requested/{status}", headers=HEADERS)
        if response.status_code != 200:
            continue
        for pv in response.json()['data']:
            assigned = (pv.get("videoReviewInformation") or {}).get("assignedTranslator")
            if assigned is not None:
                translator_pv_count[str(assigned)] += 1

    translators_list = []
    for translator in translators:
        company = companies.get(translator["translator"])
        if company is None:
            # Skip translators whose company document is missing rather than
            # failing the whole listing.
            print(f"Company not found for translator {translator['_id']}")
            continue
        translators_list.append(f"{company['name']} / {company['email']} / {str(translator['_id'])} / Pending Videos: {translator_pv_count[str(translator['_id'])]}")

    def current_choice(assigned_id):
        # Return the dropdown label containing this ID, if there is one.
        if assigned_id is None:
            return None
        str_id = str(assigned_id)
        return next((label for label in translators_list if str_id in label), None)

    review_info = video_doc.get("videoReviewInformation") or {}
    is_pending = review_info.get("status") in {"NOT_STARTED", "DUBBING_STARTED", "WAITING_QA"}
    button = gr.Button.update(interactive=is_pending)
    current_translator = current_choice(review_info.get("assignedTranslator"))
    current_qa = current_choice(review_info.get("assignedQA"))

    return (
        gr.Dropdown.update(choices=translators_list, value=current_translator),
        gr.Dropdown.update(choices=translators_list, value=current_qa),
        # Fall back to the raw code for languages missing from the map.
        language_maps.get(source_language, source_language),
        language_maps.get(target_language, target_language),
        button,
        button,
    )
def assign_translator(platform, translator, video):
    """Assign the selected translator to the selected video.

    On a 2xx response a Slack notification is sent via ``print_slack``.

    Args:
        platform: "dev" selects the dev API; any other value selects the
            production API.
        translator: Dropdown label whose third "/"-separated part is the
            translator ID, or None when nothing is selected.
        video: Dropdown label of the form "<video id> / <title>", or None
            when nothing is selected.

    Returns:
        "Successfully assigned." on success, the HTTP status code as a string
        on failure, or a prompt when a selection is missing.
    """
    # Both dropdowns may be empty when the button is clicked.
    if not translator:
        return "Please select a translator."
    if not video:
        return "Please select a video."
    parts = translator.split("/")
    if len(parts) < 3:
        return "Please select a translator."

    translator_id = parts[2].strip()
    video_id = video.split("/", maxsplit=1)[0].strip()
    role = "translator"
    if platform == "dev":
        api_url = "https://api.dev-env.deepsync.co/api/v1"
    else:
        api_url = "https://api.deepsync.co/api/v1"

    data = {
        "videoId": video_id,
        "userId": translator_id,
    }
    response = requests.post(f"{api_url}/dashboard/dubbed/video/assign/{role}", json=data, headers=HEADERS)
    if 200 <= response.status_code < 300:
        print_slack(platform, video_id, translator, "translator")
        return "Successfully assigned."
    return str(response.status_code)
def assign_qa(platform, qa, video):
    """Assign the selected QA reviewer to the selected video.

    On a 2xx response a Slack notification is sent via ``print_slack``.

    Args:
        platform: "dev" selects the dev API; any other value selects the
            production API.
        qa: Dropdown label whose third "/"-separated part is the user ID, or
            None when nothing is selected.
        video: Dropdown label of the form "<video id> / <title>", or None
            when nothing is selected.

    Returns:
        "Successfully assigned." on success, the HTTP status code as a string
        on failure, or a prompt when a selection is missing.
    """
    # Both dropdowns may be empty when the button is clicked.
    if not qa:
        return "Please select a QA."
    if not video:
        return "Please select a video."
    parts = qa.split("/")
    if len(parts) < 3:
        return "Please select a QA."

    translator_id = parts[2].strip()
    video_id = video.split("/", maxsplit=1)[0].strip()
    role = "QA"
    if platform == "dev":
        api_url = "https://api.dev-env.deepsync.co/api/v1"
    else:
        api_url = "https://api.deepsync.co/api/v1"

    data = {
        "videoId": video_id,
        "userId": translator_id,
    }
    response = requests.post(f"{api_url}/dashboard/dubbed/video/assign/{role}", json=data, headers=HEADERS)
    if 200 <= response.status_code < 300:
        print_slack(platform, video_id, qa, "qa")
        return "Successfully assigned."
    return str(response.status_code)
with gr.Blocks() as demo:
    # Choices shared by the per-tab platform selector and the status filter.
    platform_choices = ("dev", "prod")
    status_filter_choices = [
        'Not Assigned',
        'Under Review',
        'Dubbing Started',
        'Waiting QA',
        'Sent to Client',
        'Approved',
        'Rejected',
        'Churned',
        'Re-review Requested',
    ]

    gr.Markdown("# Translators Admin Dashboard")

    # Tab 1: read-only table of translators.
    with gr.Tab("List"):
        source_platform = gr.Radio(list(platform_choices), value="prod", label="Source Platform")
        refresh_list = gr.Button("Load")
        translators_list = gr.Dataframe(
            label="Translators",
            interactive=False,
            max_rows=20,
            overflow_row_behaviour="paginate",
        )

    # Tab 2: form for registering a new translator.
    with gr.Tab("Add"):
        gr.Markdown("Add a new translator")
        source_platform_add = gr.Radio(list(platform_choices), value="prod", label="Source Platform")
        email_id = gr.Textbox(label="Email ID", lines=1)
        source_languages = gr.CheckboxGroup(ALL_LANGUAGES, label="Source Languages")
        target_languages = gr.CheckboxGroup(ALL_LANGUAGES, label="Target Languages")
        contact_num_box = gr.Textbox(label="Contact Number", lines=1)
        pricing_box = gr.Number(label="Pricing", value=-1)
        submit = gr.Button("Submit")
        result = gr.Textbox(label="Logs")

    # Tab 3: edit or delete an existing translator.
    with gr.Tab("Update"):
        gr.Markdown("Update Translator")
        source_platform_update = gr.Radio(list(platform_choices), value="prod", label="Source Platform")
        refresh_update_platform = gr.Button("Load")
        translator_name = gr.Dropdown([], label="Translators")
        source_languages_update = gr.CheckboxGroup(ALL_LANGUAGES, label="Source Languages")
        target_languages_update = gr.CheckboxGroup(ALL_LANGUAGES, label="Target Languages")
        contact_num_update = gr.Textbox(label="Contact Number", lines=1)
        pricing_update = gr.Number(label="Pricing")
        update_language = gr.Button("Update")
        delete_translator = gr.Button("Delete Translator")
        result_update = gr.Textbox(label="Logs")

    # Tab 4: assign a translator and a QA reviewer to a video.
    with gr.Tab("Assign"):
        gr.Markdown("Assign Translator / QA")
        source_platform_assign = gr.Radio(list(platform_choices), value="prod", label="Source Platform")
        filter_videos_assign = gr.Radio(
            status_filter_choices,
            label="Filter videos. Do not select any of them for videos without request info.",
        )
        video_details = gr.Textbox(label="Video Details")
        refresh_assign = gr.Button("Load Videos")
        under_review_videos_list = gr.Dropdown([], label="Video IDs")
        with gr.Row():
            source_language_box = gr.Textbox(label="Source Language")
            target_language_box = gr.Textbox(label="Target Language")
        with gr.Row():
            translator_name_assign = gr.Dropdown([], label="Translator")
            submit_tr_assign_button = gr.Button("Assign Translator")
        with gr.Row():
            qa_name_assign = gr.Dropdown([], label="QA")
            submit_qa_assign_button = gr.Button("Assign QA")
        assign_logs = gr.Textbox(label="Logs")

    # For list
    refresh_list.click(fn=get_translators, inputs=source_platform, outputs=translators_list)
    source_platform.change(fn=get_translators, inputs=source_platform, outputs=translators_list)

    # For add
    submit.click(
        fn=add_translator,
        inputs=[source_platform_add, email_id, source_languages, target_languages, contact_num_box, pricing_box],
        outputs=result,
    )

    # For Update
    refresh_update_platform.click(fn=get_translator_names, inputs=source_platform_update, outputs=translator_name)
    source_platform_update.change(fn=get_translator_names, inputs=source_platform_update, outputs=translator_name)
    translator_name.change(
        fn=select_current_data,
        inputs=[source_platform_update, translator_name],
        outputs=[source_languages_update, target_languages_update, contact_num_update, pricing_update],
    )
    update_language.click(
        fn=update_translator,
        inputs=[
            source_platform_update,
            translator_name,
            source_languages_update,
            target_languages_update,
            contact_num_update,
            pricing_update,
        ],
        outputs=result_update,
    )
    delete_translator.click(
        fn=delete_translator_func,
        inputs=[source_platform_update, translator_name],
        outputs=result_update,
    )

    # For Assign
    refresh_assign.click(
        fn=get_videos_under_review,
        inputs=[source_platform_assign, filter_videos_assign],
        outputs=[under_review_videos_list, video_details],
    )
    under_review_videos_list.change(
        fn=get_traslators_assign,
        inputs=[source_platform_assign, under_review_videos_list],
        outputs=[
            translator_name_assign,
            qa_name_assign,
            source_language_box,
            target_language_box,
            submit_tr_assign_button,
            submit_qa_assign_button,
        ],
    )
    submit_tr_assign_button.click(
        fn=assign_translator,
        inputs=[source_platform_assign, translator_name_assign, under_review_videos_list],
        outputs=assign_logs,
    )
    submit_qa_assign_button.click(
        fn=assign_qa,
        inputs=[source_platform_assign, qa_name_assign, under_review_videos_list],
        outputs=assign_logs,
    )
if __name__ == "__main__":
    # The login credentials are read from the environment.
    username = os.environ.get("GRADIO_USERNAME")
    password = os.environ.get("GRADIO_PASSWORD")
    demo.launch(auth=(username, password))