from ctypes.wintypes import LANGID
from email.policy import default
import pycountry
import os
import csv
import random
import pandas as pd
import numpy as np
import gradio as gr
from collections import Counter
from article import ARTICLE
from utils import *
import matplotlib.pyplot as plt
import scipy.io.wavfile as wavf
from huggingface_hub import Repository, upload_file
# Hugging Face access token, read from the environment (None when unset).
HF_TOKEN = os.environ.get("HF_TOKEN")

# Names of the entries found in the local number directory.
NUMBER_DIR = './number'
number_files = [entry.name for entry in os.scandir(NUMBER_DIR)]

# Country names offered in the "country" dropdown.
DEFAULT_LIST_OF_COUNTRIES = [country.name for country in pycountry.countries]

# Dataset repositories: one for the recordings, one for the e-mail metadata.
DATASET_REPO_URL = "https://huggingface.co/datasets/chrisjay/crowd-speech-africa"
EMAILS_REPO_URL="https://huggingface.co/datasets/chrisjay/african-digits-recording-sprint-email"

# REPOSITORY_DIR is the local clone; LOCAL_DIR holds files staged for upload.
REPOSITORY_DIR = "data"
LOCAL_DIR = 'data_local'
os.makedirs(LOCAL_DIR,exist_ok=True)

#DEFAULT_LANGS = {'Igbo':'ibo','Yoruba':'yor','Hausa':'hau'}
# The first entry is the placeholder shown before a gender is picked.
GENDER = ['Choose Gender','Male','Female','Other','Prefer not to say']

#------------------Work on Languages--------------------
# Map each lower-cased full language name to its lower-cased id.
DEFAULT_LANGS = {}
languages = read_json_lines('clean_languages.json')
languages_lower = list(languages)
DEFAULT_LANGS.update(
    {entry['full'].lower(): entry['id'].lower() for entry in languages_lower}
)
#_ = [DEFAULT_LANGS.update({l_other.lower():[l['id'].lower()]}) for l in languages_lower for l_other in l['others'] if l_other.lower()!=l['full'].lower()]
#------------------Work on Languages--------------------

# Clone the recordings dataset locally and bring it up to date.
repo = Repository(
    local_dir="data", clone_from=DATASET_REPO_URL, use_auth_token=HF_TOKEN
)
repo.git_pull()

# Stylesheet handed to the Gradio Blocks interface.
with open('app.css','r') as css_file:
    BLOCK_CSS = css_file.read()
def _format_output_message(output):
    """Wrap a status message in the form `save_record` hands to the UI."""
    # NOTE(review): in the source this literal is split across lines, which
    # suggests surrounding markup may have been lost; it is reproduced here
    # as a leading and trailing newline -- confirm against the real file.
    return "\n" + output + "\n"


def _upload_to_dataset(local_path, path_in_repo, repo_id):
    """Upload one local file to the given dataset repository on the Hub."""
    return upload_file(path_or_fileobj=local_path,
                       path_in_repo=path_in_repo,
                       repo_id=repo_id,
                       repo_type='dataset',
                       token=HF_TOKEN)


def save_record(language,text,record,number,age,gender,accent,number_history,current_number,country,email,done_recording):
    """Save one recording with its metadata and pick the next number.

    Parameters:
        language: language name chosen in the UI; ``None`` or
            ``'Choose language'`` means nothing was chosen.
        text: optional word for the number; stripped before saving.
        record: indexable pair whose item 0 is passed to ``wavf.write`` as
            the rate and item 1 as the data; ``None`` when nothing was
            recorded.
        number: the image currently shown; returned unchanged when the
            submission is rejected.
        age, gender, accent, country: optional speaker metadata.
        number_history: numbers already recorded (defaults to ``[0]``).
        current_number: number being recorded (defaults to ``0``).
        email: stored with the speaker metadata once all numbers are done.
        done_recording: truthy once every number has been recorded.

    Returns:
        A 6-tuple ``(message, image, number_history, number,
        done_recording, record)`` where the last item is always ``None``.
    """
    # Fall back to the initial state when no state has been stored yet.
    number_history = [0] if number_history is None else number_history
    current_number = 0 if current_number is None else current_number
    done_recording = False if done_recording is None else done_recording
    default_record = None

    if done_recording:
        # Stop submitting recording (best.gif is displaying)
        output = '🙌 You have finished all recording! Thank You. You can reload to start again (maybe in another language).'
        return _format_output_message(output),'number/best.gif',number_history,0,done_recording,default_record

    language_chosen = language is not None and language != 'Choose language'
    lang_id = None
    if language_chosen and record is not None and number is not None:
        language = language.lower()
        # .get() instead of indexing: a name that does not round-trip
        # through title()/lower() must not crash the callback.
        lang_id = DEFAULT_LANGS.get(language)

    if lang_id is None:
        # Same precedence as before: language first, then record, then number.
        if not language_chosen:
            output = 'Language must be specified!'
        elif record is None:
            output = "No recording found!"
        elif number is None:
            output = "Number must be specified!"
        else:
            # Everything was supplied but the language is not a known one.
            output = 'Language must be specified!'
        # return output_string, previous image and state
        return _format_output_message(output),number,number_history,current_number,done_recording,default_record

    text = (text or '').strip()
    speaker_metadata = {
        'gender': '' if gender == GENDER[0] else gender,
        'age': age,
        'accent': accent,
    }

    # Write audio to file
    audio_name = get_unique_name()
    save_file_dir = os.path.join(LOCAL_DIR,audio_name)
    os.makedirs(save_file_dir,exist_ok=True)
    audio_output_filename = os.path.join(save_file_dir,'audio.wav')
    wavf.write(audio_output_filename,record[0],record[1])

    # Write metadata.json to file
    json_file_path = os.path.join(save_file_dir,'metadata.jsonl')
    metadata = {'id':audio_name,'file_name':'audio.wav',
                'language_name':language,'language_id':lang_id,
                'number':current_number,'text':text,'frequency':record[0],
                'age':speaker_metadata['age'],'gender':speaker_metadata['gender'],
                'accent':speaker_metadata['accent'],
                'country':country
                }
    dump_json(metadata,json_file_path)

    # Repository paths use '/' explicitly so they do not depend on the
    # separator of the machine running the app.
    _upload_to_dataset(audio_output_filename,
                       f"{REPOSITORY_DIR}/{audio_name}/audio.wav",
                       'chrisjay/crowd-speech-africa')
    _upload_to_dataset(json_file_path,
                       f"{REPOSITORY_DIR}/{audio_name}/metadata.jsonl",
                       'chrisjay/crowd-speech-africa')
    output = 'Recording successfully saved! On to the next one...'

    # Choose the next number among those not recorded yet.
    number_history.append(current_number)
    number_choices = [num for num in range(10) if num not in number_history]
    if number_choices:
        next_number = random.choice(number_choices)
        next_number_image = f'number/{next_number}.jpg'
    else:
        # All numbers recorded: store the e-mail with the speaker metadata.
        email_metadata_name = get_unique_name()
        email_save_file = os.path.join(LOCAL_DIR,f"{email_metadata_name}.json")
        email_metadata = {'id':email_metadata_name,'email':email,
                          'language_name':language,'language_id':lang_id,
                          'age':speaker_metadata['age'],'gender':speaker_metadata['gender'],
                          'accent':speaker_metadata['accent'],
                          'country':country
                          }
        dump_json(email_metadata,email_save_file)
        try:
            _upload_to_dataset(email_save_file,
                               f"emails/{email_metadata_name}.json",
                               'chrisjay/african-digits-recording-sprint-email')
        finally:
            # Delete the local copy of the e-mail even if the upload failed.
            if os.path.exists(email_save_file):
                os.remove(email_save_file)
        done_recording = True
        next_number = 0 # the default number
        next_number_image = 'number/best.gif'
        output = "You have finished all recording! You can reload to start again."

    return _format_output_message(output),next_number_image,number_history,next_number,done_recording,default_record
def show_records():
    """Summarise the collected recordings as a message and a bar chart.

    Pulls the dataset clone, reads the first entry of every recording's
    ``metadata.jsonl`` and counts recordings per ``language_name``.

    Returns:
        A pair ``(html, plt)``: the summary text and the ``matplotlib.pyplot``
        module holding a horizontal bar chart of samples per language.
    """
    repo.git_pull()
    repository_data_dir = os.path.join(REPOSITORY_DIR,'data')
    if os.path.isdir(repository_data_dir):
        with os.scandir(repository_data_dir) as entries:
            repo_recordings = [os.path.join(repository_data_dir,entry.name) for entry in entries]
    else:
        repo_recordings = []

    metadata_all = [read_json_lines(os.path.join(rec_dir,'metadata.jsonl'))[0] for rec_dir in repo_recordings]

    lang_dict = Counter(m['language_name'] for m in metadata_all)
    # Adding a zero count makes sure an 'All others' row is always present.
    lang_dict.update({'All others':0})
    all_langs = list(lang_dict)
    langs_count = [lang_dict[k] for k in all_langs]

    # pyplot draws on a shared "current figure"; clear it first so bars from
    # a previous call are not drawn again underneath the new ones.
    plt.clf()
    plt.barh(all_langs, langs_count)
    plt.ylabel("Language")
    plt.xlabel('Number of audio samples')
    plt.title('Distribution of audio samples over languages')

    html = f"""
Hooray! We have collected {len(metadata_all)} samples!
"""
    return html,plt
def display_records():
    """Build the listing of every collected recording.

    Pulls the dataset clone, reads the first entry of every recording's
    ``metadata.jsonl`` and emits one entry per recording with its language,
    number and text.

    Returns:
        The listing as a single string.
    """
    # Local import: the values come from crowd-sourced metadata, so they are
    # escaped before being placed in the returned markup.
    from html import escape

    repo.git_pull()
    repository_data_dir = os.path.join(REPOSITORY_DIR,'data')
    if os.path.isdir(repository_data_dir):
        with os.scandir(repository_data_dir) as entries:
            repo_recordings = [os.path.join(repository_data_dir,entry.name) for entry in entries]
    else:
        repo_recordings = []

    # Turn each local audio path into its download URL on the Hub.
    audios = [
        os.path.join(rec_dir,'audio.wav').replace('data/data/','https://huggingface.co/datasets/chrisjay/crowd-speech-africa/resolve/main/data/')
        for rec_dir in repo_recordings
    ]
    metadata_all = [read_json_lines(os.path.join(rec_dir,'metadata.jsonl'))[0] for rec_dir in repo_recordings]
    langs = [m['language_name'] for m in metadata_all]
    texts = [m['text'] for m in metadata_all]
    numbers = [m['number'] for m in metadata_all]

    parts = [f"""
Hooray! We have collected {len(metadata_all)} samples!
language
audio
number
text
"""]
    # NOTE(review): the header lists an "audio" column but the row template
    # below never renders `audio`; confirm whether markup was lost here.
    for lang, audio, text, num_ in zip(langs,audios,texts,numbers):
        parts.append(f"""
{escape(str(lang))}
{escape(str(num_))}
{escape(str(text))}
""")
    # NOTE(review): this closing literal is split across lines in the
    # source; reproduced as a single newline -- confirm against the real file.
    parts.append("\n")
    return "".join(parts)
# NUMBERS = [{'image':os.path.join(NUMBER_DIR,f),'number':int(f.split('.')[0])} for f in number_files]
# Introductory text rendered at the top of the interface. An earlier
# placeholder assignment to `markdown` was overwritten by this one before it
# could ever be read, so only this value is kept.
markdown="""
# 🌍 African Digits Recording Sprint
> Record numbers 0-9 in your African language. The ten people who record the most from 19th May - 19th June will each receive a prize.
1. Fill in your email. This is completely optional. We need this to track your progress for the prize.
__Note:__ You should record all numbers shown till the end. It does not count if you stop mid-way.
2. Choose your African language
3. Fill in the speaker metadata (age, gender, accent). This is optional but important to build better speech models.
4. You will see the image of a number __(this is the number you will record)__.
5. Fill in the word of that number (optional). You can leave this blank.
6. Click record and say the number in your African language.
7. Click ‘Submit’. It will save your record and go to the next number.
8. Repeat 4-7
9. Leave a ❤ in the Space, if you found it fun.
"""
# Interface design begins
block = gr.Blocks(css=BLOCK_CSS)
with block:
gr.Markdown(markdown)
email = gr.inputs.Textbox(placeholder='your email',label="Email (Your email is not made public. We need it to consider you for the prize.)",default='')
with gr.Tabs():
with gr.TabItem('Record'):
with gr.Row():
language = gr.inputs.Dropdown(choices = sorted([lang_.title() for lang_ in list(DEFAULT_LANGS.keys())]),label="Choose language",default="Choose language")
age = gr.inputs.Textbox(placeholder='e.g. 21',label="Your age (optional)",default='')
gender = gr.inputs.Dropdown(choices=GENDER, type="value", default=None, label="Gender (optional)")
accent = gr.inputs.Textbox(label="Accent (optional)",default='')
country = gr.Dropdown(choices=[''] + sorted(DEFAULT_LIST_OF_COUNTRIES),type='value',default=None,label="Country you are recording from (optional)")
number = gr.Image('number/0.jpg',image_mode="L")
text = gr.inputs.Textbox(placeholder='e.g. `one` is `otu` in Igbo or `ọkan` in Yoruba',label="How is the number called in your language (optional)")
record = gr.Audio(source="microphone",label='Record your voice')
output_result = gr.outputs.HTML()
state = gr.Variable()
current_number = gr.Variable()
done_recording = gr.Variable() # Signifies when to stop submitting records even if `submit`` is clicked
save = gr.Button("Submit")
save.click(save_record, inputs=[language,text,record,number,age,gender,accent,state,current_number,country,email,done_recording],outputs=[output_result,number,state,current_number,done_recording,record])
with gr.TabItem('Dashboard') as listen_tab:
gr.Markdown("Listen to the recordings contributed. You can find them here.")
display_html = gr.HTML("""