from transformers import pipeline, AutoTokenizer, ElectraForPreTraining
import pandas as pd
import numpy as np
import torch
import json
import streamlit as st
from annotated_text import annotated_text
USE_GPU = True
if USE_GPU and torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Fill-mask model used to score how natural the target word is in context.
MODEL_NAME_CHINESE = "IDEA-CCNL/Erlangshen-DeBERTa-v2-710M-Chinese"
# ELECTRA discriminator used for replaced-token detection (error spotting).
RTD_MODEL_NAME_CHINESE = "hfl/chinese-electra-180g-large-discriminator"
WORD_PROBABILITY_THRESHOLD = 0.05  # minimum probability for a passing sentence
TOP_K_WORDS = 10
def get_model_chinese():
    return pipeline("fill-mask", MODEL_NAME_CHINESE, device=device)

def get_rtd_tokenizer_chinese():
    return AutoTokenizer.from_pretrained(RTD_MODEL_NAME_CHINESE)

def get_rtd_model_chinese():
    return ElectraForPreTraining.from_pretrained(RTD_MODEL_NAME_CHINESE)
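
# Note: Streamlit re-executes this script on every interaction, so the loaders
# above run again on each rerun. A minimal caching sketch, assuming a Streamlit
# version that provides st.cache_resource:
#
#     @st.cache_resource
#     def get_model_chinese():
#         return pipeline("fill-mask", MODEL_NAME_CHINESE, device=device)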
def get_wordlist_chinese():
    df = pd.read_csv('wordlist_chinese_v2.csv')
    wordlist = df[df.assess]
    return wordlist['Chinese'].tolist()

def get_allowed_words():
    df = pd.read_csv('allowed_words.csv')
    return set(df['word'])
def assess_chinese(word, sentence):
    print("Assessing Chinese")
    number_of_chars = len(word)
    assert number_of_chars == 2, "only two-character words are supported"
    allowed_words = get_allowed_words()
    if sentence.lower().find(word.lower()) == -1:
        print('Sentence does not contain the word!')
        return

    # Mask out the target word, one [MASK] per character.
    text = sentence.replace(word.lower(), "[MASK]" * number_of_chars)

    # First pass: candidates for the first masked character. With two [MASK]
    # tokens the pipeline returns one candidate list per mask; take the first.
    top_k_prediction = []
    candidates = mask_filler_chinese(text, top_k=TOP_K_WORDS)[0]
    for candidate in candidates:
        # Second pass: fill in the first character, then predict the second.
        temp_text = text.replace("[MASK]", candidate['token_str'], 1)
        second_predictions = mask_filler_chinese(temp_text, top_k=5)
        for prediction in second_predictions:
            prediction['token_str'] = candidate['token_str'] + prediction['token_str']
            prediction['score'] = candidate['score'] * prediction['score']
        top_k_prediction.extend(second_predictions)
    top_k_prediction = sorted(top_k_prediction, key=lambda x: x['score'], reverse=True)[:(TOP_K_WORDS * 5)]
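    # The loop above approximates the joint probability of a two-character
    # candidate by the chain rule, P(c1c2) ≈ P(c1 | both masked) * P(c2 | c1
    # filled in), keeping the TOP_K_WORDS * 5 best joint candidates.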
    # Renormalise over the allowed vocabulary: probability mass that went to
    # out-of-list candidates is discounted, capped at 0.5 to avoid blow-ups.
    norm_factor = 0
    for output in top_k_prediction:
        if output['token_str'] not in allowed_words:
            norm_factor += output['score']

    top_k_prediction_new = []
    for output in top_k_prediction:
        if output['token_str'] in allowed_words:
            output['score'] = output['score'] / (1 - min(0.5, norm_factor))
            top_k_prediction_new.append(output)
    print(f"NORM_FACTOR: {norm_factor}")
    # Score the target word itself with the same two-pass procedure.
    output1 = mask_filler_chinese(text, targets=word[0])[0][0]
    temp_text = text.replace("[MASK]", word[0], 1)
    output2 = mask_filler_chinese(temp_text, targets=word[1])[0]
    output2['token_str'] = output1['token_str'] + output2['token_str']
    output2['score'] = output1['score'] * output2['score']
    target_word_prediction = output2
    target_word_prediction['score'] = target_word_prediction['score'] / (1 - min(0.5, norm_factor))
    score = target_word_prediction['score']

    # Append the target word's prediction if it is not already in the results.
    top_k_prediction_filtered = [output for output in top_k_prediction_new
                                 if output['token_str'] == word]
    if len(top_k_prediction_filtered) == 0:
        top_k_prediction_new.append(target_word_prediction)
    return top_k_prediction_new, score
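
# Illustrative call (inputs are hypothetical and the numbers are made up,
# not real model output):
#     preds, score = assess_chinese("朋友", "我的朋友喜欢运动。")
#     preds -> [{'token_str': '朋友', 'score': 0.31, ...}, ...]
#     score -> 0.31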
def assess_sentence(word, sentence):
    return assess_chinese(word, sentence)

def get_annotated_sentence(sentence, errors):
    if len(errors) == 0:
        return sentence
    # Build a mixed list of plain strings and (char, label, colour) tuples,
    # marking every flagged character.
    output = ["Input sentence: "]
    wrong_char_indices = [e[0].item() for e in errors]
    curr_ind = 0
    for i in range(len(wrong_char_indices)):
        output.append(sentence[curr_ind:wrong_char_indices[i]])
        output.append((sentence[wrong_char_indices[i]], "", "#F8C8DC"))
        curr_ind = wrong_char_indices[i] + 1
    output.append(sentence[curr_ind:])
    print(output)
    return output
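
# annotated_text (from the st-annotated-text package) accepts a mix of plain
# strings and (text, label, background_color) tuples, so the list built above
# renders each flagged character on a pink background.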
def get_word_errors(word, sentence):
    # word is unused here; errors are detected over the whole sentence.
    tokens = rtd_tokenizer_chinese(sentence, return_tensors='pt', return_offsets_mapping=True)
    model_inputs = {k: v for k, v in tokens.items() if k != 'offset_mapping'}
    scores = rtd_model_chinese(**model_inputs)[0][0]
    errors = []
    for i in range(len(scores)):
        if scores[i] > 0:  # positive discriminator logit: token looks replaced
            errors.append(tokens['offset_mapping'][0][i])
    print(errors)
    return errors
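
# offset_mapping holds each token's (start, end) character span in the input,
# with (0, 0) for special tokens such as [CLS] and [SEP]; for a four-character
# sentence it would look like [(0, 0), (0, 1), (1, 2), (2, 3), (3, 4), (0, 0)].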
def get_chinese_word():
    possible_words = get_wordlist_chinese()
    word = np.random.choice(possible_words)
    return word

def get_word():
    return get_chinese_word()

# Instantiate the models and tokenizer at import time. Without caching these
# are reloaded on every rerun (see the st.cache_resource sketch above).
mask_filler_chinese = get_model_chinese()
rtd_tokenizer_chinese = get_rtd_tokenizer_chinese()
rtd_model_chinese = get_rtd_model_chinese()
def highlight_given_word(row):
    # Row styler for st.table: highlight the target word's row.
    color = '#ACE5EE' if row.Words == target_word else 'white'
    return [f'background-color:{color}'] * len(row)

def get_top_5_results(top_k_prediction):
    predictions_df = pd.DataFrame(top_k_prediction)
    predictions_df = predictions_df.drop(columns=["token", "sequence"])
    predictions_df = predictions_df.rename(columns={"score": "Probability", "token_str": "Words"})
    if (predictions_df[:5].Words == target_word).sum() == 0:
        # The target word missed the top 5; append its row so it is always shown.
        print("target word not in top 5")
        top_5_df = predictions_df[:5]
        target_word_df = predictions_df[predictions_df.Words == target_word]
        print(target_word_df)
        top_5_df = pd.concat([top_5_df, target_word_df])
    else:
        top_5_df = predictions_df[:5]
    top_5_df = top_5_df.copy()  # work on a copy to avoid SettingWithCopyWarning
    top_5_df['Probability'] = top_5_df['Probability'].apply(lambda x: f"{x:.2%}")
    return top_5_df
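
# Illustrative result (made-up values):
#        Words Probability
#     0   朋友      31.24%
#     1   同学       8.02%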
#### Streamlit Page
st.title("造句 Self-marking Demo")

if 'target_word' not in st.session_state:
    st.session_state['target_word'] = get_word()
target_word = st.session_state['target_word']
target_word_ind = get_wordlist_chinese().index(target_word)
target_word = st.selectbox("Choose a word:", get_wordlist_chinese(), index=target_word_ind)

if st.button("Get random word"):
    st.session_state['target_word'] = get_word()
    st.experimental_rerun()
st.subheader("Form your sentence and input below!")
sentence = st.text_input('Enter your sentence here', placeholder="Enter your sentence here!")

if st.button("Grade"):
    if sentence.find(target_word) == -1:
        st.error("Error: Sentence must include the target word!")
        st.stop()  # halt this rerun so an invalid sentence is never graded
    top_k_prediction, score = assess_sentence(target_word, sentence)
    # Dump the predictions for debugging; json.dump writes valid JSON
    # (str() would write a Python repr with single quotes).
    with open('./result01.json', 'w') as outfile:
        json.dump(top_k_prediction, outfile, ensure_ascii=False)
    errors = get_word_errors(target_word, sentence)
    annotated_sentence = get_annotated_sentence(sentence, errors)
    annotated_text(annotated_sentence)
    st.write(f"Probability score: {score:.1%}. (Target: {WORD_PROBABILITY_THRESHOLD:.1%})")
    predictions_df = get_top_5_results(top_k_prediction)
    df_style = predictions_df.style.apply(highlight_given_word, axis=1)
    if score >= WORD_PROBABILITY_THRESHOLD:
        if len(errors) == 0:
            st.success("Yay, good job! 🍺 Practice again with other words", icon="✅")
        else:
            st.warning("Potential word errors detected. Try again?")
    else:
        st.warning("Probability score too low. Maybe try again?")
    st.table(df_style)
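
# To run locally (assuming this file is saved as app.py and the two CSV files
# are in the working directory): streamlit run app.py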