afro-speech / app.py
import pycountry
import os
import csv
import random
import pandas as pd
import numpy as np
import gradio as gr
from collections import Counter
from article import ARTICLE
from utils import *
import matplotlib.pyplot as plt
import scipy.io.wavfile as wavf
from huggingface_hub import Repository, upload_file
HF_TOKEN = os.environ.get("HF_TOKEN")
NUMBER_DIR = './number'
number_files = [f.name for f in os.scandir(NUMBER_DIR)]
DEFAULT_LIST_OF_COUNTRIES = [country.name for country in pycountry.countries]
DATASET_REPO_URL = "https://huggingface.co/datasets/chrisjay/crowd-speech-africa"
EMAILS_REPO_URL="https://huggingface.co/datasets/chrisjay/african-digits-recording-sprint-email"
REPOSITORY_DIR = "data"
LOCAL_DIR = 'data_local'
os.makedirs(LOCAL_DIR,exist_ok=True)
#DEFAULT_LANGS = {'Igbo':'ibo','Yoruba':'yor','Hausa':'hau'}
GENDER = ['Choose Gender','Male','Female','Other','Prefer not to say']
#------------------Work on Languages--------------------
DEFAULT_LANGS = {}
languages = read_json_lines('clean_languages.json')
languages_lower=[l for l in languages]
_ = [DEFAULT_LANGS.update({l['full'].lower():l['id'].lower()}) for l in languages_lower]
#_ = [DEFAULT_LANGS.update({l_other.lower():[l['id'].lower()]}) for l in languages_lower for l_other in l['others'] if l_other.lower()!=l['full'].lower()]
#------------------Work on Languages--------------------
repo = Repository(
local_dir="data", clone_from=DATASET_REPO_URL, use_auth_token=HF_TOKEN
)
repo.git_pull()
with open('app.css','r') as f:
BLOCK_CSS = f.read()
def save_record(language,text,record,number,age,gender,accent,number_history,current_number,country,email,done_recording):
# set default
number_history = number_history if number_history is not None else [0]
current_number = current_number if current_number is not None else 0
done_recording = done_recording if done_recording is not None else False
#----
# Save text and its corresponding record to flag
speaker_metadata={}
speaker_metadata['gender'] = gender if gender!=GENDER[0] else ''
speaker_metadata['age'] = age if age !='' else ''
speaker_metadata['accent'] = accent if accent!='' else ''
default_record = None
if not done_recording:
if language!=None and language!='Choose language' and record is not None and number is not None:
language = language.lower()
lang_id = DEFAULT_LANGS[language]
text =text.strip()
# Write audio to file
audio_name = get_unique_name()
SAVE_FILE_DIR = os.path.join(LOCAL_DIR,audio_name)
os.makedirs(SAVE_FILE_DIR,exist_ok=True)
audio_output_filename = os.path.join(SAVE_FILE_DIR,'audio.wav')
wavf.write(audio_output_filename,record[0],record[1])
# Write metadata.json to file
json_file_path = os.path.join(SAVE_FILE_DIR,'metadata.jsonl')
metadata= {'id':audio_name,'file_name':'audio.wav',
'language_name':language,'language_id':lang_id,
'number':current_number, 'text':text,'frequency':record[0],
'age': speaker_metadata['age'],'gender': speaker_metadata['gender'],
'accent': speaker_metadata['accent'],
'country':country
}
dump_json(metadata,json_file_path)
# Simply upload the audio file and metadata using the hub's upload_file
# Upload the audio
repo_audio_path = os.path.join(REPOSITORY_DIR,os.path.join(audio_name,'audio.wav'))
_ = upload_file(path_or_fileobj = audio_output_filename,
path_in_repo =repo_audio_path,
repo_id='chrisjay/crowd-speech-africa',
repo_type='dataset',
token=HF_TOKEN
)
# Upload the metadata
repo_json_path = os.path.join(REPOSITORY_DIR,os.path.join(audio_name,'metadata.jsonl'))
_ = upload_file(path_or_fileobj = json_file_path,
path_in_repo =repo_json_path,
repo_id='chrisjay/crowd-speech-africa',
repo_type='dataset',
token=HF_TOKEN
)
output = f'Recording successfully saved! On to the next one...'
# Choose the next number
number_history.append(current_number)
number_choices = [num for num in [i for i in range(10)] if num not in number_history]
if number_choices!=[]:
next_number = random.choice(number_choices)
next_number_image = f'number/{next_number}.jpg'
else:
email_metadata_name = get_unique_name()
EMAIL_SAVE_FILE = os.path.join(LOCAL_DIR,f"{email_metadata_name}.json")
# Write metadata.json to file
email_metadata = {'id':email_metadata_name,'email':email,
'language_name':language,'language_id':lang_id,
'age': speaker_metadata['age'],'gender': speaker_metadata['gender'],
'accent': speaker_metadata['accent'],
'country':country
}
dump_json(email_metadata,EMAIL_SAVE_FILE)
# Upload the metadata
repo_json_path = os.path.join('emails',f"{email_metadata_name}.json")
_ = upload_file(path_or_fileobj = EMAIL_SAVE_FILE,
path_in_repo =repo_json_path,
repo_id='chrisjay/african-digits-recording-sprint-email',
repo_type='dataset',
token=HF_TOKEN
)
# Delete the email from local repo
if os.path.exists(EMAIL_SAVE_FILE):
os.remove(EMAIL_SAVE_FILE)
#-------------------
done_recording=True
next_number = 0 # the default number
next_number_image = f'number/best.gif'
output = "You have finished all recording! You can reload to start again."
output_string = "<html> <body> <div class='output' style='color:green; font-size:13px'>"+output+"</div> </body> </html>"
return output_string,next_number_image,number_history,next_number,done_recording,default_record
if number is None:
output = "Number must be specified!"
if record is None:
output="No recording found!"
if language is None or language=='Choose language':
output = 'Language must be specified!'
output_string = "<html> <body> <div class='output' style='color:green; font-size:13px'>"+output+"</div> </body> </html>"
# return output_string, previous image and state
return output_string, number,number_history,current_number,done_recording,default_record
else:
# Stop submitting recording (best.gif is displaying)
output = '🙌 You have finished all recording! Thank You. You can reload to start again (maybe in another language).'
output_string = "<div class='finished'>"+output+"</div>"
next_number = 0 # the default number
next_number_image = f'number/best.gif'
return output_string,next_number_image,number_history,next_number,done_recording,default_record
def get_metadata_json(path):
try:
return read_json_lines(path)[0]
except Exception:
return []
def plot_bar(value,name,x_name,y_name,title):
fig, ax = plt.subplots(figsize=(10,4),tight_layout=True)
ax.set(xlabel=x_name, ylabel=y_name,title=title)
ax.barh(name, value)
return ax.figure
def get_metadata_of_dataset():
repo.git_pull()
REPOSITORY_DATA_DIR = os.path.join(REPOSITORY_DIR,'data')
repo_recordings = [os.path.join(REPOSITORY_DATA_DIR,f.name) for f in os.scandir(REPOSITORY_DATA_DIR)] if os.path.isdir(REPOSITORY_DATA_DIR) else []
audio_repo = [os.path.join(f,'audio.wav') for f in repo_recordings]
audio_repo = [a.replace('data/data/','https://huggingface.co/datasets/chrisjay/crowd-speech-africa/resolve/main/data/') for a in audio_repo]
metadata_all = [get_metadata_json(os.path.join(f,'metadata.jsonl')) for f in repo_recordings]
metadata_all = [m for m in metadata_all if m!=[]]
return metadata_all
def display_records():
repo.git_pull()
REPOSITORY_DATA_DIR = os.path.join(REPOSITORY_DIR,'data')
repo_recordings = [os.path.join(REPOSITORY_DATA_DIR,f.name) for f in os.scandir(REPOSITORY_DATA_DIR)] if os.path.isdir(REPOSITORY_DATA_DIR) else []
audio_repo = [os.path.join(f,'audio.wav') for f in repo_recordings]
audio_repo = [a.replace('data/data/','https://huggingface.co/datasets/chrisjay/crowd-speech-africa/resolve/main/data/') for a in audio_repo]
metadata_repo = [read_json_lines(os.path.join(f,'metadata.jsonl'))[0] for f in repo_recordings]
audios_all = audio_repo
metadata_all = metadata_repo
langs=[m['language_name'] for m in metadata_all]
audios = [a for a in audios_all]
texts = [m['text'] for m in metadata_all]
numbers = [m['number'] for m in metadata_all]
html = f"""<div class="infoPoint">
<h1> Hooray! We have collected {len(metadata_all)} samples!</h1>
<table style="width:100%; text-align:center">
<tr>
<th>language</th>
<th>audio</th>
<th>number</th>
<th>text</th>
</tr>"""
for lang, audio, text,num_ in zip(langs,audios,texts,numbers):
html+= f"""<tr>
<td>{lang}</td>
<td><audio controls><source src="{audio}" type="audio/wav"> </audio></td>
<td>{num_}</td>
<td>{text}</td>
</tr>"""
html+="</table></div>"
return html
# NUMBERS = [{'image':os.path.join(NUMBER_DIR,f),'number':int(f.split('.')[0])} for f in number_files]
markdown = """<div style="text-align: center"><p style="font-size: 40px"> Africa Crowdsource Speech </p> <br>
This is a platform to contribute to your African language by recording your voice </div>"""
markdown="""
# 🌍 African Digits Recording Sprint
> Record numbers 0-9 in your African language.
1. Fill in your email. This is completely optional. We need this to track your progress for the prize.
__Note:__ You should record all numbers shown till the end. It does not count if you stop mid-way.
2. Choose your African language
3. Fill in the speaker metadata (age, gender, accent). This is optional but important to build better speech models.
4. You will see the image of a number __(this is the number you will record)__.
5. Fill in the word of that number (optional). You can leave this blank.
6. Click record and say the number in your African language.
7. Click ‘Submit’. It will save your record and go to the next number.
8. Repeat 4-7
9. Leave a ❤ in the Space, if you found it fun.
> Please Note: Record as many as times as possible (minimum of 20 and maximum of 200).
"""
PLOTS_FOR_GRADIO = []
FUNCTIONS_FOR_GRADIO = []
# Interface design begins
block = gr.Blocks(css=BLOCK_CSS)
with block:
gr.Markdown(markdown)
email = gr.inputs.Textbox(placeholder='your email',label="Email (Your email is not made public. We need it to consider you for the prize.)",default='')
with gr.Tabs():
with gr.TabItem('Record'):
with gr.Row():
language = gr.inputs.Dropdown(choices = sorted([lang_.title() for lang_ in list(DEFAULT_LANGS.keys())]),label="Choose language",default="Choose language")
age = gr.inputs.Textbox(placeholder='e.g. 21',label="Your age (optional)",default='')
gender = gr.inputs.Dropdown(choices=GENDER, type="value", default=None, label="Gender (optional)")
accent = gr.inputs.Textbox(label="Accent (optional)",default='')
country = gr.Dropdown(choices=[''] + sorted(DEFAULT_LIST_OF_COUNTRIES),type='value',default=None,label="Country you are recording from (optional)")
number = gr.Image('number/0.jpg',image_mode="L")
text = gr.inputs.Textbox(placeholder='e.g. `one` is `otu` in Igbo or `ọkan` in Yoruba',label="How is the number called in your language (optional)")
record = gr.Audio(source="microphone",label='Record your voice')
output_result = gr.outputs.HTML()
state = gr.Variable()
current_number = gr.Variable()
done_recording = gr.Variable() # Signifies when to stop submitting records even if `submit`` is clicked
save = gr.Button("Submit")
save.click(save_record, inputs=[language,text,record,number,age,gender,accent,state,current_number,country,email,done_recording],outputs=[output_result,number,state,current_number,done_recording,record])
with gr.TabItem('Dashboard') as listen_tab:
gr.Markdown("Statistics on the recordings contributed. You can find the dataset <a href='https://huggingface.co/datasets/chrisjay/crowd-speech-africa' target='blank'>here</a>.")
display_html = gr.HTML("""<div style="color: green">
<p> ⌛ Please wait. Loading dashboard... </p>
</div>
""")
plot = gr.Plot(type="matplotlib")
metadata_all = get_metadata_of_dataset()
def show_records():
global PLOTS_FOR_GRADIO
global FUNCTIONS_FOR_GRADIO
assert len(PLOTS_FOR_GRADIO) == len(FUNCTIONS_FOR_GRADIO), f"Function output and gradio plots must be the same length! \n Found: function => {len(FUNCTIONS_FOR_GRADIO)} and gradio plots => {len(PLOTS_FOR_GRADIO)}."
langs=[m['language_name'] for m in metadata_all]
all_genders = [m['gender'] for m in metadata_all
]
lang_dict = Counter(langs)
lang_dict.update({'All others':0})
all_langs = list(lang_dict.keys())
langs_count = [lang_dict[k] for k in all_langs]
plt_ = plot_bar(langs_count,all_langs,'Number of audio samples',"Language",'Distribution of audio samples over languages')
html = f"""<div class="infoPoint">
<h1> Hooray! We have collected {len(metadata_all)} samples!</h1>
"""
return [html,plt_]+FUNCTIONS_FOR_GRADIO
languages = list(Counter([m['language_name'] for m in metadata_all]).keys())
for language in languages:
with gr.Row() as row_lang:
metadata_for_language = [m for m in metadata_all if m['language_name']==language]
gender_for_language = [m['gender'] for m in metadata_for_language]
digits_for_language = [m['number'] for m in metadata_for_language]
gender_for_language = [g if g!="" else 'Not given' for g in gender_for_language]
digits_dict = Counter(digits_for_language)
gender_dict = Counter(gender_for_language)
digits_name_for_language = list(digits_dict.keys())
digits_count_for_language = [digits_dict[k] for k in digits_name_for_language]
gender_name_for_language = list(gender_dict.keys())
gender_count_for_language = [gender_dict[k] for k in gender_name_for_language]
plot_digits = gr.Plot(type="matplotlib")
plot_gender = gr.Plot(type="matplotlib")
PLOTS_FOR_GRADIO.append(plot_digits)
PLOTS_FOR_GRADIO.append(plot_gender)
def plot_metadata_for_language():
plt_digits = plot_bar(digits_count_for_language,digits_name_for_language,'Number of audio samples',"Digit",f"Distribution of audio samples over digits for {language.upper()} ")
plt_gender = plot_bar(gender_count_for_language,gender_name_for_language,'Number of audio samples',"Gender",f"Distribution of audio samples over gender for {language.upper()}")
return plt_digits, plt_gender
output_digits,ouput_gender = plot_metadata_for_language()
FUNCTIONS_FOR_GRADIO.append(output_digits)
FUNCTIONS_FOR_GRADIO.append(ouput_gender)
#listen = gr.Button("Listen")
listen_tab.select(show_records,inputs=[],outputs=[display_html,plot]+PLOTS_FOR_GRADIO)
# Have a list of the languages. lang
# We want digits per language and gender per language
# for l in range(len(lang),step =4)
# with Row().... d
gr.Markdown(ARTICLE)
block.launch()