# Spaces: Sleeping (hosting-page status text captured together with the source; not part of the program)
import json
import os
from json import JSONEncoder

import gradio as gr
import numpy as np
import pandas as pd
import torch
from faker import Faker
from keras.utils import pad_sequences
from transformers import AutoTokenizer, RobertaForTokenClassification
from transformers import AutoTokenizer, AutoModelForTokenClassification
from transformers import AutoModelForSequenceClassification, AutoTokenizer
class out_json:
    """Plain record pairing a word with the label assigned to it.

    Instances only carry the two attributes ``word`` and ``label``; they are
    serialised through their ``__dict__``.
    """

    def __init__(self, w, l):
        # Store both fields in one step; attribute names are part of the
        # serialised output and must stay ``word`` / ``label``.
        self.word, self.label = w, l
class MyEncoder(JSONEncoder):
    """JSON encoder that serialises arbitrary objects through their attributes."""

    def default(self, o):
        """Return the attribute dictionary of *o* for objects json cannot encode.

        Objects without a ``__dict__`` are delegated to the base class, which
        raises the standard ``TypeError`` expected from ``JSONEncoder.default``
        (instead of leaking an ``AttributeError``).
        """
        try:
            return vars(o)
        except TypeError:
            # vars() raises TypeError when the object has no __dict__.
            return super().default(o)
class Model:
    """Single-text pipeline: language detection, categorisation and NER tagging.

    Typical use is ``identificacion_idioma(text)`` followed by
    ``predict(etiquetas)``. The first call stores the text, picks the NER
    model that matches the detected language and categorises the text; the
    second call tags the stored text and renders the result.
    """

    def __init__(self):
        self.texto = ""
        self.idioma = ""
        self.modelo_ner = ""
        self.categoria_texto = ""
        # Filled in by identificacion_idioma() / predict(). Declared here so
        # that using the instance too early fails with a clear message
        # instead of an AttributeError.
        self.model = None
        self.faker_ = None
        self.tokenizer = None

    @staticmethod
    def _mejor_etiqueta(logits, id2label):
        """Return ``(label, probability)`` of the most confident row in *logits*."""
        preds = torch.softmax(logits, dim=-1)
        vals, idxs = torch.max(preds, dim=1)
        mejor = int(torch.argmax(vals))
        return id2label[idxs[mejor].item()], vals[mejor].item()

    ###
    ### Applies the language-detection model and selects the NER model
    ###
    def identificacion_idioma(self, text):
        """Detect the language of *text* and load the matching NER model.

        Stores *text* in ``self.texto``. When the detected language is
        ``"es"`` the Spanish model and an ``es_MX`` Faker are selected; any
        other language is handled as English (``"en"``). Finally the text is
        categorised, which sets ``self.categoria_texto``.
        """
        self.texto = text
        nombre = "papluca/xlm-roberta-base-language-detection"
        tokenizer = AutoTokenizer.from_pretrained(nombre)
        model = AutoModelForSequenceClassification.from_pretrained(nombre)
        inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt")
        with torch.no_grad():
            logits = model(**inputs).logits
        idioma, _ = self._mejor_etiqueta(logits, model.config.id2label)

        if idioma == "es":
            self.idioma = "es"
            self.modelo_ner = "BSC-LT/roberta_model_for_anonimization"
            self.faker_ = Faker("es_MX")
            self.model = RobertaForTokenClassification.from_pretrained(self.modelo_ner)
        else:
            self.idioma = "en"
            self.faker_ = Faker("en_US")
            self.modelo_ner = "FacebookAI/xlm-roberta-large-finetuned-conll03-english"
            self.model = AutoModelForTokenClassification.from_pretrained(self.modelo_ner)
        self.categorizar_texto(self.texto)

    def reordenacion_tokens(self, tokens, caracter):
        """Merge sub-word pieces into whole words.

        A piece starting with *caracter* (the tokenizer's word-start marker)
        opens a new word; any other piece is glued to the previous word.

        Returns:
            ``(new_tokens, ig_tokens)``: the merged words, and the positions
            in *tokens* of the pieces that were glued to a preceding word.
        """
        new_tokens = []
        ig_tokens = []
        for i, token in enumerate(tokens):
            # The very first piece always opens a word, even without the
            # marker; previously this case indexed an empty list and raised
            # IndexError.
            if token.startswith(caracter) or not new_tokens:
                new_tokens.append(token)
            else:
                new_tokens[-1] += token
                ig_tokens.append(i)
        return new_tokens, ig_tokens

    def reordenacion_identificadores(self, ig_tokens, predicted_tokens_classes):
        """Drop the predicted classes whose position is listed in *ig_tokens*."""
        ignorados = set(ig_tokens)
        return [
            clase
            for i, clase in enumerate(predicted_tokens_classes)
            if i not in ignorados
        ]

    def salida_json(self, tokens, pre_tokens):
        """Return a JSON array with one ``{word, label}`` entry per non-``O`` token."""
        entidades = [
            out_json(
                token.replace("▁", "").replace("Ġ", "").replace("Ċ", ""),
                etiqueta.replace("▁", ""),
            )
            for token, etiqueta in zip(tokens, pre_tokens)
            if etiqueta != "O"
        ]
        return MyEncoder().encode(entidades)

    def salida_texto(self, tokens, pre_tokens):
        """Render the text, replacing every tagged word by its label.

        Words labelled ``O`` or with a label containing ``MISC`` are kept
        (without tokenizer markers); every other word is replaced by the
        label itself. Each item is preceded by a single space.
        """
        piezas = []
        for token, etiqueta in zip(tokens, pre_tokens):
            if etiqueta == "O" or "MISC" in etiqueta:
                piezas.append(" " + token.replace("▁", "").replace("Ġ", ""))
            else:
                piezas.append(" " + etiqueta)
        return "".join(piezas)

    def salida_texto_anonimizado(self, ids, pre_tokens):
        """Render the text from token ids, replacing tagged tokens by their label.

        Tokens labelled ``O`` or with a label containing ``OTH`` are decoded
        with ``self.tokenizer``; the rest are replaced by the label.
        """
        piezas = []
        for token_id, identificador in zip(ids, pre_tokens):
            if identificador == "O" or "OTH" in identificador:
                piezas.append(self.tokenizer.decode(token_id))
            else:
                piezas.append(" " + identificador)
        return "".join(piezas)

    def formato_salida(self, out):
        """Join the items of *out* with spaces after stripping ``▁`` and inner spaces."""
        return "".join(i.replace("▁", "").replace(" ", "") + " " for i in out)

    def fake_pers(self):
        """Return a fake full name."""
        # Faker.name() takes no positional argument; passing ``self`` raised
        # TypeError.
        return self.faker_.name()

    def fake_word(self):
        """Return a fake word."""
        return self.faker_.word()

    def fake_first_name(self):
        """Return a fake first name."""
        return self.faker_.first_name()

    def fake_last_name(self):
        """Return a fake last name."""
        return self.faker_.last_name()

    def fake_address(self):
        """Return a fake address."""
        return self.faker_.address()

    def fake_sentence(self, n):
        """Return a fake sentence of about *n* words."""
        return self.faker_.sentence(nb_words=n)

    def fake_text(self):
        """Return a fake paragraph of text."""
        return self.faker_.text()

    def fake_company(self):
        """Return a fake company name."""
        return self.faker_.company()

    def fake_city(self):
        """Return a fake city name."""
        return self.faker_.city()

    def reemplazo_fake(self, identificadores):
        """Replace PER / ORG / LOC labels by fake values; keep other labels as is."""
        new_iden = []
        for etiqueta in identificadores:
            if "PER" in etiqueta:
                new_iden.append(self.fake_first_name())
            elif "ORG" in etiqueta:
                new_iden.append(self.fake_company())
            elif "LOC" in etiqueta:
                new_iden.append(self.fake_city())
            else:
                new_iden.append(etiqueta)
        return new_iden

    ###
    ### Applies the model that categorises the text according to its context
    ###
    def categorizar_texto(self, texto):
        """Categorise *texto* and store the label in ``self.categoria_texto``.

        Returns:
            ``(category, probability)`` of the most likely category.
        """
        name = "elozano/bert-base-cased-news-category"
        tokenizer = AutoTokenizer.from_pretrained(name)
        model_ = AutoModelForSequenceClassification.from_pretrained(name)
        inputs_ = tokenizer(texto, padding=True, truncation=True, return_tensors="pt")
        with torch.no_grad():
            logits = model_(**inputs_).logits
        cat, porcentaje = self._mejor_etiqueta(logits, model_.config.id2label)
        self.categoria_texto = cat
        return cat, porcentaje

    ###
    ### Applies the NER model to the stored text
    ###
    def predict(self, etiquetas):
        """Tag ``self.texto`` and return ``(entities_json, processed_text)``.

        Args:
            etiquetas: when truthy, tagged words are replaced by their label;
                otherwise PER / ORG / LOC words are replaced by fake values.

        Raises:
            RuntimeError: if ``identificacion_idioma`` has not been called.
        """
        if self.model is None:
            raise RuntimeError(
                "identificacion_idioma() must be called before predict()"
            )
        # The category is already stored by identificacion_idioma(), so the
        # category model is not loaded and run a second time here.
        self.tokenizer = AutoTokenizer.from_pretrained(self.modelo_ner)
        tokens = self.tokenizer.tokenize(self.texto)
        if not tokens:
            # Nothing to tag (e.g. whitespace-only text); an empty id tensor
            # cannot be fed to the model.
            return MyEncoder().encode([]), ""
        ids = self.tokenizer.convert_tokens_to_ids(tokens)
        input_ids = torch.tensor([ids])
        with torch.no_grad():
            logits = self.model(input_ids).logits
        predicted_token_class_ids = logits.argmax(-1)
        id2label = self.model.config.id2label
        predicted_tokens_classes = [
            id2label[t.item()] for t in predicted_token_class_ids[0]
        ]

        # Word-start marker used by each tokenizer family.
        caracter = "Ġ" if self.idioma == "es" else "▁"
        new_tokens, ig_tokens = self.reordenacion_tokens(tokens, caracter)
        new_identificadores = self.reordenacion_identificadores(
            ig_tokens, predicted_tokens_classes
        )
        out1 = self.salida_json(new_tokens, new_identificadores)
        if etiquetas:
            # Labels only.
            out2 = self.salida_texto(new_tokens, new_identificadores)
        else:
            out2 = self.salida_texto(
                new_tokens, self.reemplazo_fake(new_identificadores)
            )
        return out1, str(out2)
class ModeloDataset:
    """Batch pipeline that tags and anonymises whole columns of sentences."""

    def __init__(self):
        self.texto = ""
        self.idioma = ""
        self.modelo_ner = ""
        self.categoria_texto = ""
        # Loaded lazily by aplicar_modelo() / reemplazo_fake().
        self.model = None
        self.faker_ = None
        self.tokenizer = AutoTokenizer.from_pretrained("BSC-LT/roberta_model_for_anonimization")

    def reordenacion_tokens(self, tokens, caracter):
        """Merge sub-word pieces into whole words.

        A piece starting with *caracter* (the tokenizer's word-start marker)
        opens a new word; any other piece is glued to the previous word.

        Returns:
            ``(new_tokens, ig_tokens)``: the merged words, and the positions
            in *tokens* of the pieces that were glued to a preceding word.
        """
        new_tokens = []
        ig_tokens = []
        for i, token in enumerate(tokens):
            # The very first piece always opens a word, even without the
            # marker; previously this case indexed an empty list and raised
            # IndexError.
            if token.startswith(caracter) or not new_tokens:
                new_tokens.append(token)
            else:
                new_tokens[-1] += token
                ig_tokens.append(i)
        return new_tokens, ig_tokens

    def reordenacion_identificadores(self, ig_tokens, predicted_tokens_classes, tamano):
        """Drop classes at the positions in *ig_tokens*, keeping at most *tamano*."""
        ignorados = set(ig_tokens)
        conservados = [
            clase
            for i, clase in enumerate(predicted_tokens_classes)
            if i not in ignorados
        ]
        return conservados[:tamano]

    ###
    ### Helpers that generate fake data for each entity category
    ###
    def fake_pers(self):
        """Return a fake full name."""
        # Faker.name() takes no positional argument; passing ``self`` raised
        # TypeError.
        return self.faker_.name()

    def fake_word(self):
        """Return a fake word."""
        return self.faker_.word()

    def fake_first_name(self):
        """Return a fake first name."""
        return self.faker_.first_name()

    def fake_last_name(self):
        """Return a fake last name."""
        return self.faker_.last_name()

    def fake_address(self):
        """Return a fake address."""
        return self.faker_.address()

    def fake_sentence(self, n):
        """Return a fake sentence of about *n* words."""
        return self.faker_.sentence(nb_words=n)

    def fake_text(self):
        """Return a fake paragraph of text."""
        return self.faker_.text()

    def fake_company(self):
        """Return a fake company name."""
        return self.faker_.company()

    def fake_city(self):
        """Return a fake city name."""
        return self.faker_.city()

    def reemplazo_fake(self, identificadores):
        """Replace PER / ORG / LOC labels by fake values; keep other labels as is.

        The Faker locale follows ``self.idioma`` (``es_MX`` for ``"es"``,
        ``en_US`` otherwise); the generator is rebuilt only when the locale
        changes.
        """
        locale = "es_MX" if self.idioma == "es" else "en_US"
        if (
            getattr(self, "faker_", None) is None
            or getattr(self, "_faker_locale", None) != locale
        ):
            self.faker_ = Faker(locale)
            self._faker_locale = locale
        new_iden = []
        for etiqueta in identificadores:
            if "PER" in etiqueta:
                new_iden.append(self.fake_first_name())
            elif "ORG" in etiqueta:
                new_iden.append(self.fake_company())
            elif "LOC" in etiqueta:
                new_iden.append(self.fake_city())
            else:
                new_iden.append(etiqueta)
        return new_iden

    def _cargar_modelo(self, nombre, clase_modelo):
        """Load tokenizer and model *nombre*, reusing them when already loaded."""
        if getattr(self, "model", None) is None or self.modelo_ner != nombre:
            self.tokenizer = AutoTokenizer.from_pretrained(nombre)
            self.model = clase_modelo.from_pretrained(nombre)
            self.modelo_ner = nombre

    ###
    ### Applies the NER model that matches the detected language
    ###
    def aplicar_modelo(self, _sentences, idioma, etiquetas, max_len=128, max_chars=500):
        """Tag every sentence in *_sentences* with the NER model for *idioma*.

        Args:
            _sentences: iterable of sentences; each item is converted with
                ``str`` so non-text cells (numbers, NaN) do not crash.
            idioma: ``"es"`` selects the Spanish model; anything else selects
                the English one.
            etiquetas: unused here; kept for interface compatibility.
            max_len: maximum number of tokens kept per sentence.
            max_chars: maximum number of characters read per sentence.

        Returns:
            ``(new_identificadores, new_tokens)``: per sentence, the list of
            predicted labels and the list of merged words, aligned by index.
        """
        if idioma == "es":
            nombre = "BSC-LT/roberta_model_for_anonimization"
            clase_modelo = RobertaForTokenClassification
            caracter = "Ġ"
        else:
            nombre = "FacebookAI/xlm-roberta-large-finetuned-conll03-english"
            clase_modelo = AutoModelForTokenClassification
            caracter = "▁"
        self._cargar_modelo(nombre, clase_modelo)

        # Tokens are cut to max_len as well, so that words and predictions
        # stay aligned for long sentences.
        tokenized_text = [
            self.tokenizer.tokenize(str(sentence)[:max_chars])[:max_len]
            for sentence in _sentences
        ]
        if not tokenized_text:
            return [], []

        ids = [self.tokenizer.convert_tokens_to_ids(x) for x in tokenized_text]
        pad_id = self.tokenizer.pad_token_id
        if pad_id is None:
            pad_id = 0
        ids = pad_sequences(
            ids,
            maxlen=max_len,
            dtype="long",
            truncating="post",
            padding="post",
            value=pad_id,
        )
        input_ids = torch.tensor(ids)
        # Mask out the padding so it does not take part in attention. At
        # least one position stays visible even for an empty sentence.
        attention_mask = torch.zeros_like(input_ids)
        for fila, piezas in enumerate(tokenized_text):
            attention_mask[fila, : max(len(piezas), 1)] = 1

        with torch.no_grad():
            logits = self.model(input_ids, attention_mask=attention_mask).logits
        predicted_token_class_ids = logits.argmax(-1)
        id2label = self.model.config.id2label
        _predicted_tokens_classes = [
            [id2label[t.item()] for t in fila] for fila in predicted_token_class_ids
        ]

        new_tokens = []
        new_identificadores = []
        for piezas, clases in zip(tokenized_text, _predicted_tokens_classes):
            palabras, ignorados = self.reordenacion_tokens(piezas, caracter)
            new_tokens.append(palabras)
            new_identificadores.append(
                self.reordenacion_identificadores(ignorados, clases, len(palabras))
            )
        return new_identificadores, new_tokens

    ###
    ### Combines the words of the input text with the predicted labels
    ###
    def salida_texto(self, tokens, pre_tokens):
        """Render the text, replacing every tagged word by its label.

        Words labelled ``O`` or with a label containing ``MISC`` are kept
        (without tokenizer markers); every other word is replaced by the
        label itself. Each item is preceded by a single space.
        """
        piezas = []
        for token, etiqueta in zip(tokens, pre_tokens):
            if etiqueta == "O" or "MISC" in etiqueta:
                piezas.append(" " + token.replace("▁", "").replace("Ġ", ""))
            else:
                piezas.append(" " + etiqueta)
        return "".join(piezas)

    def salida_texto2(self, tokens, labels, etiquetas):
        """Render every sentence of a batch with ``salida_texto``.

        NOTE: the parameter names are the reverse of their contents. *labels*
        holds the per-sentence word lists and *tokens* the per-sentence label
        lists, because each item of *labels* is passed to ``salida_texto`` as
        its words and the matching item of *tokens* as its labels.

        Args:
            tokens: per-sentence lists of predicted labels.
            labels: per-sentence lists of words.
            etiquetas: when truthy the labels are shown; otherwise
                PER / ORG / LOC labels are first replaced by fake values.
        """
        out = []
        for palabras, identificadores in zip(labels, tokens):
            if not etiquetas:
                identificadores = self.reemplazo_fake(identificadores)
            out.append(self.salida_texto(palabras, identificadores))
        return out

    def unir_array(self, _out):
        """Concatenate the elements of every item of *_out* into one string."""
        return ["".join(str(x) for x in item) for item in _out]

    def unir_columna_valores(self, df, columna):
        """Return the values of column *columna* joined by commas."""
        # Values are converted with str so numeric columns do not raise.
        return ",".join(str(valor) for valor in df[columna])
### | |
### Funcion para procesar archivos json, recibe archivo | |
### | |
class utilJSON:
    """Load a JSON file and turn its content into a flat DataFrame."""

    def __init__(self, archivo):
        """Read and parse the UTF-8 JSON file at path *archivo* into ``self.data``."""
        with open(archivo, encoding="utf-8") as f:
            self.data = json.load(f)

    def obtener_keys_json(self, data):
        """Return the items obtained by iterating *data* (the keys, for a dict)."""
        return list(data)

    ###
    ### "flatten_json" adapted from https://levelup.gitconnected.com/a-deep-dive-into-nested-json-to-data-frame-with-python-69bdabb41938
    ### Renu Khandelwal Jul 23, 2023
    def flatten_json(self, y):
        """Flatten nested dicts/lists into a single-level dict.

        Keys of nested dicts and indices of lists are joined with ``_``,
        e.g. ``{"a": {"b": 1}, "c": [2]}`` becomes ``{"a_b": 1, "c_0": 2}``.
        A scalar *y* is returned under the empty key.
        """
        out = {}

        def flatten(x, name=""):
            if isinstance(x, dict):
                for a in x:
                    # f-string so that non-string keys cannot raise.
                    flatten(x[a], f"{name}{a}_")
            elif isinstance(x, list):
                for i, a in enumerate(x):
                    flatten(a, f"{name}{i}_")
            else:
                # Drop the trailing separator.
                out[name[:-1]] = x

        flatten(y)
        return out

    def obtener_dataframe(self, data):
        """Build a DataFrame with one flattened row per record in *data*.

        Accepted layouts:
            * a list of records;
            * a dict with a single key whose value is a list of records;
            * any other dict (or scalar), treated as one single record.
        """
        if isinstance(data, list):
            registros = data
        elif isinstance(data, dict):
            claves = self.obtener_keys_json(data)
            if len(claves) == 1 and isinstance(data[claves[0]], list):
                registros = data[claves[0]]
            else:
                registros = [data]
        else:
            registros = [data]
        data_flattened = [self.flatten_json(registro) for registro in registros]
        return pd.DataFrame(data_flattened)
# Shared pipeline instances, created once at import time:
# ``modelo`` handles tabular (CSV / JSON) input, ``model`` handles free text.
modelo = ModeloDataset()
model = Model()


def get_model():
    """Return the shared module-level ``Model`` instance."""
    return model
### | |
### Función que interactúa con la interfaz Gradio para el procesamiento de texto, csv o json | |
### | |
def _cargar_dataframe(ruta):
    """Read the CSV (``;``-separated, latin-1) or JSON file at *ruta* into a DataFrame."""
    extension = os.path.splitext(ruta)[1].lower()
    if extension == ".csv":
        return pd.read_csv(ruta, delimiter=";", encoding="latin-1")
    if extension == ".json":
        util = utilJSON(ruta)
        return util.obtener_dataframe(util.data)
    raise gr.Error("Formato de archivo no soportado: use un archivo .csv o .json")


def procesar(texto, archivo, etiquetas):
    """Gradio callback: process free text, or a CSV / JSON file when no text is given.

    Args:
        texto: text typed by the user; takes priority over *archivo*.
        archivo: uploaded file (an object with a ``name`` path attribute, or
            a plain path), or ``None``.
        etiquetas: when truthy, entities are replaced by their label;
            otherwise they are replaced by fake values.

    Returns:
        Four values: language (plus ``/category`` for free text), processed
        text, processed DataFrame, and the DataFrame as tab-separated text.

    Raises:
        gr.Error: when neither text nor file is given, the file type is not
            supported, or the file holds no data.
    """
    if texto:
        # Only the first 1869 characters of the text are processed.
        model.identificacion_idioma(texto[:1869])
        return (
            model.idioma + "/" + model.categoria_texto,
            model.predict(etiquetas),
            gr.Dataframe(),
            gr.File(),
        )

    if archivo is None:
        raise gr.Error("Ingrese un texto o suba un archivo CSV o JSON.")

    # Take the extension from the real end of the path: splitting on the
    # first "." broke for paths containing other dots.
    ruta = getattr(archivo, "name", archivo)
    df = _cargar_dataframe(ruta)
    if df.empty:
        raise gr.Error("El archivo no contiene datos.")

    # The language of the whole file is detected from its first cell.
    model.identificacion_idioma(str(df.iloc[0, 0]))
    modelo.idioma = model.idioma

    df_new = pd.DataFrame(columns=df.columns.values)
    for item in df.columns.values:
        ides, predicted = modelo.aplicar_modelo(df[item], modelo.idioma, etiquetas)
        out = modelo.salida_texto2(ides, predicted, etiquetas)
        df_new[item] = modelo.unir_array(out)
    return (
        modelo.idioma,
        "",
        df_new,
        df_new.to_csv(sep="\t", encoding="utf-8", index=False),
    )
# Gradio UI: free text, an optional file and the "labels only" checkbox go in;
# language/category, processed text, processed table and its CSV text come out.
demo = gr.Interface(
    fn=procesar,
    inputs=["text", gr.File(), "checkbox"],
    outputs=[
        gr.Label(label="idioma/categoría"),
        gr.Textbox(label="texto procesado"),
        gr.Dataframe(label="Datos procesados en dataframe", interactive=False),
        gr.Textbox(label="datos csv"),
    ],
)
#
# Start the web server and request a public share link.
demo.launch(share=True)