File size: 7,593 Bytes
3ec9549
2609434
 
 
 
3ec9549
2609434
 
 
 
 
 
 
 
3ec9549
 
e4cf87a
3ec9549
 
f6cb372
 
 
 
 
0a6e4e2
3ec9549
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0a6e4e2
f6cb372
15553b2
3ec9549
 
 
 
f6cb372
 
 
 
3ec9549
f6cb372
3ec9549
 
 
 
 
 
 
 
 
 
 
f6cb372
0a6e4e2
 
3ec9549
0a6e4e2
 
 
 
3ec9549
0a6e4e2
 
3ec9549
 
 
 
 
 
 
 
 
 
 
 
 
f6cb372
 
0a6e4e2
f6cb372
 
3ec9549
f6cb372
0a6e4e2
f6cb372
15553b2
 
f6cb372
3ec9549
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f6cb372
3ec9549
15553b2
f6cb372
 
15553b2
 
f6cb372
 
3a52889
3ec9549
 
f6cb372
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3ec9549
f6cb372
 
15553b2
f6cb372
3ec9549
 
 
 
f6cb372
3ec9549
15553b2
f6cb372
 
 
 
 
 
aaab29f
 
15553b2
f6cb372
 
 
3ec9549
 
 
 
 
 
 
f6cb372
 
 
 
dd36097
3ec9549
 
 
 
f6cb372
3ec9549
 
f6cb372
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
from transformers import pipeline, AutoTokenizer, ElectraForPreTraining
import pandas as pd
import numpy as np
import torch
import streamlit as st
from annotated_text import annotated_text

# Set to False to force CPU inference even when a CUDA device is visible.
USE_GPU = True

# Run on the first CUDA device when allowed and available, otherwise on CPU.
device = torch.device("cuda:0" if USE_GPU and torch.cuda.is_available() else "cpu")

# Hugging Face model identifiers: the masked-language model used for scoring
# and the ELECTRA discriminator used for replaced-token detection (RTD).
MODEL_NAME_CHINESE = "IDEA-CCNL/Erlangshen-DeBERTa-v2-710M-Chinese"
RTD_MODEL_NAME_CHINESE = "hfl/chinese-electra-180g-large-discriminator"

# Minimum normalised probability for a sentence to pass, and the number of
# first-character candidates explored when ranking alternative words.
WORD_PROBABILITY_THRESHOLD = 0.05
TOP_K_WORDS = 10

@st.cache_resource
def get_model_chinese():
    """Create the fill-mask pipeline for ``MODEL_NAME_CHINESE`` on ``device``.

    Decorated with ``st.cache_resource`` so the pipeline object is reused
    instead of being rebuilt on every call.
    """
    fill_mask = pipeline(task="fill-mask", model=MODEL_NAME_CHINESE, device=device)
    return fill_mask

@st.cache_resource
def get_rtd_tokenizer_chinese():
    """Load the tokenizer that belongs to ``RTD_MODEL_NAME_CHINESE``.

    Decorated with ``st.cache_resource`` so the tokenizer is reused across calls.
    """
    tokenizer = AutoTokenizer.from_pretrained(RTD_MODEL_NAME_CHINESE)
    return tokenizer

@st.cache_resource
def get_rtd_model_chinese():
    """Load the ELECTRA discriminator named by ``RTD_MODEL_NAME_CHINESE``.

    Decorated with ``st.cache_resource`` so the model is reused across calls.
    """
    discriminator = ElectraForPreTraining.from_pretrained(RTD_MODEL_NAME_CHINESE)
    return discriminator

@st.cache_resource
def get_wordlist_chinese():
    """Return the ``Chinese`` column of the word list as a Python list.

    Only rows of ``wordlist_chinese_v2.csv`` whose ``assess`` column equals
    ``True`` are kept. The result is cached via ``st.cache_resource``.
    """
    table = pd.read_csv('wordlist_chinese_v2.csv')
    is_assessed = table['assess'] == True
    return table.loc[is_assessed, 'Chinese'].tolist()

@st.cache_resource
def get_allowed_words():
    """Return the ``word`` column of ``allowed_words.csv`` as a set.

    The result is cached via ``st.cache_resource``.
    """
    allowed_table = pd.read_csv('allowed_words.csv')
    return set(allowed_table['word'])

def assess_chinese(word, sentence):
    """Score how well the two-character ``word`` fits its slot in ``sentence``.

    The first occurrence of ``word`` (matched case-insensitively) is replaced
    by two ``[MASK]`` tokens. The fill-mask pipeline then proposes candidates
    for the first character; for each candidate the second character is
    predicted and the two probabilities are multiplied. Candidates that are
    not in the allowed-word set are dropped, and the probability mass they
    held (capped at 0.5) is used to rescale the remaining scores.

    Args:
        word: The target word; must be exactly two characters long.
        sentence: The sentence that should contain ``word``.

    Returns:
        ``(predictions, score)`` where ``predictions`` is a list of
        pipeline-style dicts (``token_str`` holds the two-character word and
        ``score`` the rescaled probability) that always includes an entry for
        ``word``, and ``score`` is the rescaled probability of ``word``
        itself. Returns ``None`` when ``sentence`` does not contain ``word``.

    Raises:
        ValueError: If ``word`` is not exactly two characters long.
    """
    import re  # local import: only needed to locate the word case-insensitively

    print("Assessing Chinese")
    number_of_chars = len(word)
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if number_of_chars != 2:
        raise ValueError(f"expected a two-character word, got {word!r}")

    allowed_words = get_allowed_words()
    found = re.search(re.escape(word), sentence, flags=re.IGNORECASE)
    if found is None:
        print('Sentence does not contain the word!')
        return None

    # Mask only the first occurrence, and mask exactly the span that matched.
    # Masking every occurrence would leave more than one mask in the second
    # pass, where the pipeline returns a list of lists instead of a flat list.
    text = sentence[:found.start()] + "[MASK]" * number_of_chars + sentence[found.end():]

    top_k_prediction = []
    # Two masks are present, so the pipeline returns one candidate list per
    # mask; index 0 is the list for the first character.
    candidates = mask_filler_chinese(text, top_k=TOP_K_WORDS)[0]
    for candidate in candidates:
        temp_text = text.replace("[MASK]", candidate['token_str'], 1)
        # One mask left: the pipeline returns a flat list of predictions.
        second_predictions = mask_filler_chinese(temp_text, top_k=5)
        for prediction in second_predictions:
            prediction['token_str'] = candidate['token_str'] + prediction['token_str']
            prediction['score'] = candidate['score'] * prediction['score']

        top_k_prediction.extend(second_predictions)
    top_k_prediction = sorted(top_k_prediction, key=lambda x: x['score'], reverse=True)[:(TOP_K_WORDS * 5)]

    # Probability mass assigned to strings that are not allowed words.
    norm_factor = 0
    for output in top_k_prediction:
        if output['token_str'] not in allowed_words:
            norm_factor += output['score']

    # Never rescale by more than a factor of two.
    scale = 1 - min(0.5, norm_factor)

    top_k_prediction_new = []
    for output in top_k_prediction:
        if output['token_str'] in allowed_words:
            output['score'] = output['score'] / scale
            top_k_prediction_new.append(output)
    print(f"NORM_FACTOR: {norm_factor}")

    # Get target word prediction: probability of the first character, then of
    # the second character given the first.
    output1 = mask_filler_chinese(text, targets=word[0])[0][0]
    temp_text = text.replace("[MASK]", word[0], 1)
    output2 = mask_filler_chinese(temp_text, targets=word[1])[0]
    output2['token_str'] = output1['token_str'] + output2['token_str']
    output2['score'] = output1['score'] * output2['score']
    target_word_prediction = output2

    target_word_prediction['score'] = target_word_prediction['score'] / scale
    score = target_word_prediction['score']

    # Append the original word if it is not already among the results.
    if not any(output['token_str'] == word for output in top_k_prediction_new):
        top_k_prediction_new.append(target_word_prediction)

    return top_k_prediction_new, score

def assess_sentence(word, sentence):
    """Grade ``sentence`` for ``word`` by delegating to ``assess_chinese``."""
    result = assess_chinese(word, sentence)
    return result
    
def get_annotated_sentence(sentence, errors):
    """Build the argument for ``annotated_text`` with flagged characters highlighted.

    Args:
        sentence: The sentence that was checked.
        errors: Sequence of ``(start, end)`` offset pairs; only ``start`` is
            used, and the single character at that index is highlighted.
            Items may be tensors or plain tuples of ints.

    Returns:
        ``sentence`` unchanged when ``errors`` is empty. Otherwise a list that
        starts with ``"Input sentence: "`` and alternates plain text pieces
        with ``(char, "", "#F8C8DC")`` tuples for the flagged characters.
    """
    if len(errors) == 0:
        return sentence

    # Deduplicate and sort the start offsets so that repeated or unordered
    # entries cannot emit the same character twice. ``int()`` accepts both
    # tensor scalars and plain ints.
    wrong_char_indices = sorted({int(e[0]) for e in errors})

    output = ["Input sentence: "]
    curr_ind = 0
    for index in wrong_char_indices:
        if not 0 <= index < len(sentence):
            # Offset points outside the sentence; there is nothing to highlight.
            continue
        output.append(sentence[curr_ind:index])
        output.append((sentence[index], "", "#F8C8DC"))
        curr_ind = index + 1
    output.append(sentence[curr_ind:])
    print(output)

    return output

def get_word_errors(word, sentence):
    """Return the character offsets of tokens the RTD model flags in ``sentence``.

    The sentence is tokenised once with offsets and a special-tokens mask.
    The discriminator is run without gradient tracking, and every
    non-special token whose score is positive is reported.

    Args:
        word: Unused; kept so existing callers keep working.
        sentence: The sentence to check.

    Returns:
        A list with one ``offset_mapping`` entry (a ``(start, end)`` tensor)
        per flagged token.
    """
    encoded = rtd_tokenizer_chinese(
        sentence,
        return_tensors='pt',
        return_offsets_mapping=True,
        return_special_tokens_mask=True,
    )
    offsets = encoded['offset_mapping'][0]
    special_mask = encoded['special_tokens_mask'][0]

    # The model's forward() does not take the two bookkeeping fields, so pass
    # only the remaining entries.
    model_inputs = {
        key: value
        for key, value in encoded.items()
        if key not in ('offset_mapping', 'special_tokens_mask')
    }
    with torch.no_grad():  # inference only; no gradients needed
        scores = rtd_model_chinese(**model_inputs)[0][0]

    errors = []
    for offset, score, is_special in zip(offsets, scores, special_mask):
        # Special tokens do not correspond to any character of the sentence,
        # so flagging them would highlight the wrong character.
        if score > 0 and not is_special:
            errors.append(offset)

    print(errors)
    return errors


def get_chinese_word():
    """Pick one entry at random from the list returned by ``get_wordlist_chinese``."""
    return np.random.choice(get_wordlist_chinese())

def get_word():
    """Return a random practice word by delegating to ``get_chinese_word``."""
    chosen = get_chinese_word()
    return chosen

# Module-level handles used by the assessment functions. Each getter is
# decorated with st.cache_resource.
mask_filler_chinese = get_model_chinese()  # fill-mask pipeline
#wordlist_chinese = get_wordlist_chinese()
rtd_tokenizer_chinese = get_rtd_tokenizer_chinese()  # tokenizer for the RTD model
rtd_model_chinese = get_rtd_model_chinese()  # ELECTRA discriminator

def highlight_given_word(row):
    """Return one CSS background style per cell of ``row``.

    The row is coloured ``#ACE5EE`` when its ``Words`` value equals the
    module-level ``target_word``, and ``white`` otherwise.
    """
    if row.Words == target_word:
        color = '#ACE5EE'
    else:
        color = 'white'
    return [f'background-color:{color}' for _ in range(len(row))]

def get_top_5_results(top_k_prediction, target=None):
    """Build the results table: the first five predictions plus the target word.

    Args:
        top_k_prediction: List of prediction dicts with at least ``score`` and
            ``token_str`` keys. ``token`` and ``sequence`` keys are dropped
            when present.
        target: Word whose row must appear in the table. Defaults to the
            module-level ``target_word``.

    Returns:
        A DataFrame with columns ``Probability`` (formatted as a percentage
        string with two decimals) and ``Words``. It holds the first five rows
        of the predictions, followed by the row(s) for ``target`` when the
        target is not already among those five.
    """
    if target is None:
        target = target_word

    predictions_df = (
        pd.DataFrame(top_k_prediction)
        .drop(columns=["token", "sequence"], errors="ignore")
        .rename(columns={"score": "Probability", "token_str": "Words"})
    )

    top_5_df = predictions_df.head(5)
    if not (top_5_df.Words == target).any():
        print("target word not in top 5")
        target_word_df = predictions_df[predictions_df.Words == target]
        print(target_word_df)
        top_5_df = pd.concat([top_5_df, target_word_df])

    # Work on an independent copy so the column assignment below never writes
    # into a slice of predictions_df (avoids SettingWithCopyWarning).
    top_5_df = top_5_df.copy()
    top_5_df['Probability'] = top_5_df['Probability'].apply(lambda x: f"{x:.2%}")

    return top_5_df

#### Streamlit Page
st.title("造句 Self-marking Demo")

# Keep the current word in session state so it is not re-drawn on every
# script rerun.
if 'target_word' not in st.session_state:
    st.session_state['target_word'] = get_word()
target_word = st.session_state['target_word']
target_word_ind = get_wordlist_chinese().index(target_word)

# The select box starts on the stored word but lets the user pick another one.
target_word = st.selectbox("Choose a word:", get_wordlist_chinese(), index = target_word_ind)

if st.button("Get random word"):
    st.session_state['target_word'] = get_word()
    # st.experimental_rerun is deprecated in newer Streamlit releases; use
    # st.rerun when it exists and fall back otherwise.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()

st.subheader("Form your sentence and input below!")
sentence = st.text_input('Enter your sentence here', placeholder="Enter your sentence here!")

if st.button("Grade"):
    if target_word not in sentence:
        st.error("Error: Sentence must include the target word!")
        # Nothing to grade: stop this run instead of calling the assessment,
        # which returns None for such sentences and would fail to unpack.
        st.stop()
    top_k_prediction, score = assess_sentence(target_word, sentence)
    # Explicit UTF-8: the predictions contain Chinese text.
    with open('./result01.json', 'w', encoding='utf-8') as outfile:
        outfile.write(str(top_k_prediction))

    errors = get_word_errors(target_word, sentence)
    annotated_sentence = get_annotated_sentence(sentence, errors)

    annotated_text(annotated_sentence)

    st.write(f"Probability score: {score:.1%}. (Target: {WORD_PROBABILITY_THRESHOLD:.1%})")
    predictions_df = get_top_5_results(top_k_prediction)
    df_style = predictions_df.style.apply(highlight_given_word, axis=1)

    if score >= WORD_PROBABILITY_THRESHOLD:
        if len(errors) == 0:
            st.success("Yay good job! 🕺 Practice again with other words", icon="✅")
        else:
            st.warning("Potential word errors detected. Try again?")
    else:
        st.warning("Probability score too low. Maybe try again?")
    st.table(df_style)