import re

import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    AutoModelForTokenClassification,
    Pipeline,
    pipeline,
)

# Seq2seq model that restores Turkish diacritics, plus a pretrained Turkish NER
# model used later to recover the casing of named entities in the output.
model = AutoModelForSeq2SeqLM.from_pretrained("emirhangazi77/Turkish-T5")
tokenizer = AutoTokenizer.from_pretrained("emirhangazi77/Turkish-T5")
ner_model = AutoModelForTokenClassification.from_pretrained("akdeniz27/bert-base-turkish-cased-ner")
ner_tokenizer = AutoTokenizer.from_pretrained("akdeniz27/bert-base-turkish-cased-ner")
ner = pipeline("ner", model=ner_model, tokenizer=ner_tokenizer, aggregation_strategy="first")



device = torch.device("cpu")  # inference runs on CPU

class Diacritic_Pipe(Pipeline):

    def __init__(self, ner, model, tokenizer):
        super().__init__(model=model, tokenizer=tokenizer)
        self.ner_pipe = ner

    def generate_result(self, text):
        # Wrap the input in the prompt format the T5 model expects.
        prefix = "Correct diacritics for : "
        postfix = " </s>"
        text = prefix + text + postfix

        # Truncate long prompts from the left so the most recent words survive.
        self.tokenizer.truncation_side = "left"
        batch = self.tokenizer(text, return_tensors="pt", max_length=64, truncation=True).to(device)
        result = self.model.generate(**batch, max_new_tokens=128)
        result = self.tokenizer.batch_decode(result)

        return str(result[0])
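
    # Illustrative: generate_result("ogrenciler sinifa girdi") feeds the model
    # "Correct diacritics for : ogrenciler sinifa girdi </s>" and returns the
    # first decoded sequence, special tokens ("<pad>", "</s>") included.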

    def ner_predict_mapping(self, text, threshold=0.3):
        # Keep entity surface forms the NER model is reasonably confident about.
        entities = self.ner_pipe(text)
        if not entities:
            return []

        special_words = [ent["word"] for ent in entities if ent["score"] > threshold]
        special_words_ = []
        for word_ in special_words:
            # Words that start with an "i" sound should use the Turkish dotted
            # capital "İ" rather than the dotless "I".
            if word_.lower()[0] == "i":
                word_ = word_.replace("I", "İ")
            # Split multi-word entities so each token can be matched later.
            if len(word_.split()) > 1:
                special_words_.extend(word_.split())
            else:
                special_words_.append(word_)

        return special_words_
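
    # Illustrative example (hypothetical entities, not a verified model output):
    # for "ali izmir'e gitti", the NER pipe might return "Ali" and "İzmir";
    # those above the threshold are kept for casing restoration later.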

    def split_text_into_n_worded_chunks(self, text, n):
        # Greedily group the words into chunks of n; the final chunk simply
        # keeps whatever remainder is left.
        words = text.split()
        chunks = []
        for i in range(0, len(words), n):
            chunks.append(" ".join(words[i:i + n]))
        return chunks
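
    # For example: split_text_into_n_worded_chunks("a b c d e", 2)
    # returns ["a b", "c d", "e"].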

    def chunk_2(self, text):
        # Run the model over two-word chunks, then delete the spurious space the
        # decoder emits after characters such as quotes and brackets.
        chunks = self.split_text_into_n_worded_chunks(text, 2)
        processed_chunks = [re.sub(r'(["q(°\[\]{}&´])\s+', r"\1", self.generate_result(chunk)) for chunk in chunks]
        result = " ".join(processed_chunks)
        return result.replace("<pad>", "").replace("</s>", "").replace("  ", " ")

    def chunk_1(self, text):
        # Run the model word by word; the "<pad>" markers between outputs
        # become the spaces that rejoin the words.
        chunks = self.split_text_into_n_worded_chunks(text, 1)
        processed_chunks = [self.generate_result(chunk).replace(" ", "") for chunk in chunks]
        result = "".join(processed_chunks)
        return result.replace("<pad>", " ").replace("</s>", "")

    def process_text(self, text):
        # Prefer the two-word chunking; fall back to the word-by-word result
        # when the two disagree on word count (i.e. chunking broke alignment).
        words = self.ner_predict_mapping(text)
        two_chunk = self.chunk_2(text)
        one_chunk = self.chunk_1(text)
        chosen = one_chunk if len(one_chunk.split()) != len(two_chunk.split()) else two_chunk
        # Restore the casing of recognized entities. The "i̇" -> "i" replacement
        # undoes Python's lowercasing of "İ", which yields "i" plus a combining dot.
        for word in words:
            chosen = chosen.replace(word.lower().replace("i̇", "i"), word)
        return chosen
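
    # Illustrative: with words == ["İzmir"], an all-lowercase "izmir" in the
    # chosen output is replaced by "İzmir", restoring the entity's casing.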

    # The methods below implement the transformers Pipeline interface.
    def _sanitize_parameters(self, **kwargs):
        preprocess_kwargs = {}
        if "maybe_arg" in kwargs:
            preprocess_kwargs["maybe_arg"] = kwargs["maybe_arg"]
        return preprocess_kwargs, {}, {}

    def preprocess(self, inputs, maybe_arg=2):
        return {"model_input": inputs}

    def _forward(self, model_inputs):
        # The whole generate-and-merge procedure runs here on the raw string.
        return self.process_text(model_inputs["model_input"])

    def postprocess(self, model_outputs):
        return model_outputs
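
# Illustrative usage (expected behavior, not a verified output):
#   pipe = Diacritic_Pipe(ner=ner, model=model, tokenizer=tokenizer)
#   pipe("Bugun hava cok guzel")  # -> "Bugün hava çok güzel"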

import gradio as gr

diacritics = Diacritic_Pipe(ner=ner, model=model, tokenizer=tokenizer)

def fn(query):
    response = diacritics(query)
    return str(response)

def my_chatbot(user_input, history):
    # The model only needs the current message; history is kept so the Chatbot
    # component can display the running conversation.
    history = history or []
    output = fn(user_input)
    history.append((user_input, output))
    return history, history

import logging
import sys

# Route logging to stdout at INFO level.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

with gr.Blocks() as demo:
    gr.Markdown("""<h1><center>Turkish Diacritics Restoration</center></h1>""")
    chatbot = gr.Chatbot()
    state = gr.State()
    txt = gr.Textbox(show_label=False, placeholder="Ask me a question and press enter.")
    txt.submit(my_chatbot, inputs=[txt, state], outputs=[chatbot, state])

demo.launch(share=True)