"""Gradio app for crowdsourcing recordings of the digits 0-9 in African languages.

Each submitted recording is written to a local folder together with a
``metadata.jsonl`` file and both are uploaded to a Hugging Face dataset
repository. A second tab shows how many samples exist per language.
"""
# NOTE(review): LANGID and default are not referenced anywhere in this file;
# they are kept only because other code may rely on them being importable here.
from ctypes.wintypes import LANGID
from email.policy import default
import pycountry
import os
import csv
import random
import pandas as pd
import numpy as np
import gradio as gr
from collections import Counter
from article import ARTICLE
from utils import *
import matplotlib.pyplot as plt
import scipy.io.wavfile as wavf
from huggingface_hub import Repository, upload_file
# Imported last so the wildcard import above cannot shadow it.
from html import escape as html_escape

HF_TOKEN = os.environ.get("HF_TOKEN")

NUMBER_DIR = './number'
number_files = [f.name for f in os.scandir(NUMBER_DIR)]

DEFAULT_LIST_OF_COUNTRIES = [country.name for country in pycountry.countries]

DATASET_REPO_URL = "https://huggingface.co/datasets/chrisjay/crowd-speech-africa"
EMAILS_REPO_URL = "https://huggingface.co/datasets/chrisjay/african-digits-recording-sprint-email"
DATASET_REPO_ID = 'chrisjay/crowd-speech-africa'
EMAILS_REPO_ID = 'chrisjay/african-digits-recording-sprint-email'
REPOSITORY_DIR = "data"
LOCAL_DIR = 'data_local'
os.makedirs(LOCAL_DIR, exist_ok=True)

GENDER = ['Choose Gender', 'Male', 'Female', 'Other', 'Prefer not to say']
LANGUAGE_PLACEHOLDER = 'Choose language'
FINISHED_IMAGE = 'number/best.gif'

# ------------------Work on Languages--------------------
languages = read_json_lines('clean_languages.json')
languages_lower = list(languages)
# Maps lower-cased full language name -> lower-cased language id.
# TODO: also register each entry's alternate names (its 'others' field).
DEFAULT_LANGS = {
    entry['full'].lower(): entry['id'].lower() for entry in languages_lower
}
# ------------------Work on Languages--------------------

repo = Repository(
    local_dir=REPOSITORY_DIR,
    clone_from=DATASET_REPO_URL,
    use_auth_token=HF_TOKEN,
)
repo.git_pull()

with open('app.css', 'r', encoding='utf-8') as f:
    BLOCK_CSS = f.read()


def _status_html(message):
    """Wrap a status message the way the HTML output component receives it."""
    return "\n" + message + "\n"


def _upload_to_hub(local_path, path_in_repo, repo_id):
    """Upload one local file to a Hugging Face dataset repository."""
    upload_file(
        path_or_fileobj=local_path,
        path_in_repo=path_in_repo,
        repo_id=repo_id,
        repo_type='dataset',
        token=HF_TOKEN,
    )


def _store_recording(record, language, lang_id, text, current_number,
                     speaker_metadata, country):
    """Write the audio and its metadata locally, then upload both files."""
    audio_name = get_unique_name()
    save_dir = os.path.join(LOCAL_DIR, audio_name)
    os.makedirs(save_dir, exist_ok=True)

    # record[0] is passed to scipy as the sample rate and record[1] as the
    # sample data, matching wavfile.write(filename, rate, data).
    sample_rate, samples = record[0], record[1]
    audio_path = os.path.join(save_dir, 'audio.wav')
    wavf.write(audio_path, sample_rate, samples)

    metadata_path = os.path.join(save_dir, 'metadata.jsonl')
    metadata = {
        'id': audio_name,
        'file_name': 'audio.wav',
        'language_name': language,
        'language_id': lang_id,
        'number': current_number,
        'text': text,
        'frequency': sample_rate,
        'age': speaker_metadata['age'],
        'gender': speaker_metadata['gender'],
        'accent': speaker_metadata['accent'],
        'country': country,
    }
    dump_json(metadata, metadata_path)

    # Repository paths always use '/', independent of the host OS.
    for file_name, local_path in (('audio.wav', audio_path),
                                  ('metadata.jsonl', metadata_path)):
        _upload_to_hub(
            local_path,
            f'{REPOSITORY_DIR}/{audio_name}/{file_name}',
            DATASET_REPO_ID,
        )


def _store_email(email, language, lang_id, speaker_metadata, country):
    """Upload the participant's email record and remove the local copy."""
    email_metadata_name = get_unique_name()
    email_save_file = os.path.join(LOCAL_DIR, f"{email_metadata_name}.json")
    email_metadata = {
        'id': email_metadata_name,
        'email': email,
        'language_name': language,
        'language_id': lang_id,
        'age': speaker_metadata['age'],
        'gender': speaker_metadata['gender'],
        'accent': speaker_metadata['accent'],
        'country': country,
    }
    try:
        dump_json(email_metadata, email_save_file)
        _upload_to_hub(
            email_save_file,
            f'emails/{email_metadata_name}.json',
            EMAILS_REPO_ID,
        )
    finally:
        # The email must not linger on local disk, even if the upload failed.
        if os.path.exists(email_save_file):
            os.remove(email_save_file)


def save_record(language, text, record, number, age, gender, accent,
                number_history, current_number, country, email,
                done_recording):
    """Save one submitted recording and pick the next number to record.

    Args:
        language: Selected language name, or None / 'Choose language'.
        text: Optional spelling of the number in the chosen language.
        record: Value of the audio component; indexed as
            (sample_rate, samples). None when nothing was recorded.
        number: Value of the number image component currently displayed.
        age, gender, accent, country: Optional speaker metadata.
        number_history: Numbers already recorded in this session
            (None on the first call).
        current_number: The number being recorded (None on the first call).
        email: Optional participant email.
        done_recording: True once all ten numbers have been recorded.

    Returns:
        A 6-tuple matching the click handler outputs: status text, image to
        display, updated number history, next number, done flag, and None
        for the `record` output.
    """
    number_history = [0] if number_history is None else number_history
    current_number = 0 if current_number is None else current_number
    done_recording = False if done_recording is None else done_recording

    default_record = None

    if done_recording:
        # Stop accepting recordings (best.gif is displaying).
        output = '🙌 You have finished all recording! Thank You. You can reload to start again (maybe in another language).'
        return (_status_html(output), FINISHED_IMAGE, number_history, 0,
                done_recording, default_record)

    language_chosen = language is not None and language != LANGUAGE_PLACEHOLDER
    # An unknown language is treated like a missing one instead of raising.
    lang_id = DEFAULT_LANGS.get(language.lower()) if language_chosen else None

    if lang_id is None or record is None or number is None:
        # Same precedence as before: language, then recording, then number.
        if lang_id is None:
            output = 'Language must be specified!'
        elif record is None:
            output = "No recording found!"
        else:
            output = "Number must be specified!"
        # Return the previous image and state unchanged.
        return (_status_html(output), number, number_history, current_number,
                done_recording, default_record)

    language = language.lower()
    text = (text or '').strip()
    speaker_metadata = {
        'gender': '' if gender == GENDER[0] else gender,
        'age': age,
        'accent': accent,
    }

    _store_recording(record, language, lang_id, text, current_number,
                     speaker_metadata, country)
    output = f'Recording successfully saved! On to the next one...'

    # Choose the next number among those not recorded yet.
    number_history.append(current_number)
    remaining = [num for num in range(10) if num not in number_history]
    if remaining:
        next_number = random.choice(remaining)
        next_number_image = f'number/{next_number}.jpg'
    else:
        _store_email(email, language, lang_id, speaker_metadata, country)
        done_recording = True
        next_number = 0  # the default number
        next_number_image = FINISHED_IMAGE
        output = "You have finished all recording! You can reload to start again."

    return (_status_html(output), next_number_image, number_history,
            next_number, done_recording, default_record)


def _load_repo_records():
    """Pull the dataset clone and return (audio_urls, metadata) in lockstep.

    Recording folders without a readable, non-empty metadata.jsonl are
    skipped so that one incomplete upload cannot break the dashboard.
    """
    repo.git_pull()
    data_dir = os.path.join(REPOSITORY_DIR, 'data')
    audio_urls, metadata_all = [], []
    if not os.path.isdir(data_dir):
        return audio_urls, metadata_all

    for entry in os.scandir(data_dir):
        metadata_path = os.path.join(data_dir, entry.name, 'metadata.jsonl')
        if not os.path.isfile(metadata_path):
            continue
        records = read_json_lines(metadata_path)
        if not records:
            continue
        metadata_all.append(records[0])
        audio_urls.append(
            f'{DATASET_REPO_URL}/resolve/main/data/{entry.name}/audio.wav'
        )
    return audio_urls, metadata_all


def show_records():
    """Return the dashboard banner and a per-language bar chart.

    Returns:
        (html, plt): the banner text and the pyplot module holding the
        freshly drawn figure.
    """
    _, metadata_all = _load_repo_records()

    lang_dict = Counter(m['language_name'] for m in metadata_all)
    lang_dict.update({'All others': 0})
    all_langs = list(lang_dict)
    langs_count = [lang_dict[k] for k in all_langs]

    # Clear the implicit figure; otherwise each call draws on top of the
    # bars left over from the previous call.
    plt.clf()
    plt.barh(all_langs, langs_count)
    plt.ylabel("Language")
    plt.xlabel('Number of audio samples')
    plt.title('Distribution of audio samples over languages')

    html = f"\n\nHooray! We have collected {len(metadata_all)} samples!\n\n"
    return html, plt


def display_records():
    """Return an HTML table listing every recording with an audio player."""
    audios, metadata_all = _load_repo_records()

    # NOTE(review): the table markup below is reconstructed from the visible
    # column names (language, audio, number, text); confirm it against the
    # intended styling.
    parts = [
        f"\n\nHooray! We have collected {len(metadata_all)} samples!\n\n",
        "<table><tr><th>language</th><th>audio</th>"
        "<th>number</th><th>text</th></tr>",
    ]
    for audio, meta in zip(audios, metadata_all):
        # Metadata is user-submitted, so escape it before embedding in HTML.
        lang = html_escape(str(meta['language_name']))
        num_ = html_escape(str(meta['number']))
        text = html_escape(str(meta['text']))
        src = html_escape(audio, quote=True)
        parts.append(
            f'<tr><td>{lang}</td>'
            f'<td><audio controls src="{src}"></audio></td>'
            f'<td>{num_}</td><td>{text}</td></tr>'
        )
    parts.append("</table>")
    return "".join(parts)


# NUMBERS = [{'image':os.path.join(NUMBER_DIR,f),'number':int(f.split('.')[0])} for f in number_files]

markdown = """
# 🌍 African Digits Recording Sprint

> Record numbers 0-9 in your African language. The ten people who record the most from 19th May - 19th June will each receive a prize.

1. Fill in your email. This is completely optional. We need this to track your progress for the prize.
   __Note:__ You should record all numbers shown till the end. It does not count if you stop mid-way.
2. Choose your African language
3. Fill in the speaker metadata (age, gender, accent). This is optional but important to build better speech models.
4. You will see the image of a number __(this is the number you will record)__.
5. Fill in the word of that number (optional). You can leave this blank.
6. Click record and say the number in your African language.
7. Click ‘Submit’. It will save your record and go to the next number.
8. Repeat 4-7
9. Leave a ❤ in the Space, if you found it fun.
"""

# Interface design begins
# NOTE(review): the nesting of the layout blocks below is inferred from the
# order of the statements; confirm against the rendered layout.
block = gr.Blocks(css=BLOCK_CSS)
with block:
    gr.Markdown(markdown)
    email = gr.inputs.Textbox(
        placeholder='your email',
        label="Email (Your email is not made public. We need it to consider you for the prize.)",
        default='',
    )
    with gr.Tabs():
        with gr.TabItem('Record'):
            with gr.Row():
                language = gr.inputs.Dropdown(
                    choices=sorted(lang_.title() for lang_ in DEFAULT_LANGS),
                    label="Choose language",
                    default="Choose language",
                )
                age = gr.inputs.Textbox(placeholder='e.g. 21', label="Your age (optional)", default='')
                gender = gr.inputs.Dropdown(choices=GENDER, type="value", default=None, label="Gender (optional)")
                accent = gr.inputs.Textbox(label="Accent (optional)", default='')
                country = gr.Dropdown(
                    choices=[''] + sorted(DEFAULT_LIST_OF_COUNTRIES),
                    type='value',
                    default=None,
                    label="Country you are recording from (optional)",
                )

            number = gr.Image('number/0.jpg', image_mode="L")
            text = gr.inputs.Textbox(
                placeholder='e.g. `one` is `otu` in Igbo or `ọkan` in Yoruba',
                label="How is the number called in your language (optional)",
            )
            record = gr.Audio(source="microphone", label='Record your voice')

            output_result = gr.outputs.HTML()
            state = gr.Variable()
            current_number = gr.Variable()
            # Signifies when to stop submitting records even if `submit` is clicked
            done_recording = gr.Variable()

            save = gr.Button("Submit")
            save.click(
                save_record,
                inputs=[language, text, record, number, age, gender, accent,
                        state, current_number, country, email, done_recording],
                outputs=[output_result, number, state, current_number,
                         done_recording, record],
            )

        with gr.TabItem('Dashboard') as listen_tab:
            gr.Markdown("Listen to the recordings contributed. You can find them here.")
            display_html = gr.HTML("""

⌛ Please wait. Loading dashboard...

""")
            plot = gr.Plot(type="matplotlib")

    listen_tab.select(show_records, inputs=[], outputs=[display_html, plot])

    gr.Markdown(ARTICLE)

block.launch()