# anonimizador/app.py
import gradio as gr
import torch
import json
import pandas as pd
import numpy as np
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    AutoModelForSequenceClassification,
    RobertaForTokenClassification,
)
from json import JSONEncoder
from faker import Faker
from keras.utils import pad_sequences
# Simple container for one (word, label) pair in the JSON output
class out_json:
def __init__(self, w,l):
self.word = w
self.label = l
# JSON encoder that serializes out_json instances via their __dict__
class MyEncoder(JSONEncoder):
def default(self, o):
return o.__dict__
class Model:
def __init__(self):
self.texto=""
self.idioma=""
self.modelo_ner=""
self.categoria_texto=""
    ###
    ### Applies the language-detection model and identifies the language of the text
    ###
def identificacion_idioma(self,text):
self.texto=text
tokenizer = AutoTokenizer.from_pretrained("papluca/xlm-roberta-base-language-detection")
model = AutoModelForSequenceClassification.from_pretrained("papluca/xlm-roberta-base-language-detection")
inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt")
with torch.no_grad():
logits = model(**inputs).logits
preds = torch.softmax(logits, dim=-1)
id2lang = model.config.id2label
vals, idxs = torch.max(preds, dim=1)
        # returns the language with the highest probability
maximo=vals.max()
idioma=''
porcentaje=0
for k, v in zip(idxs, vals):
if v.item()==maximo:
idioma,porcentaje=id2lang[k.item()],v.item()
if idioma=='es':
self.idioma="es"
self.modelo_ner='BSC-LT/roberta_model_for_anonimization'
self.faker_ = Faker('es_MX')
self.model = RobertaForTokenClassification.from_pretrained(self.modelo_ner)
else:
self.idioma="en"
self.faker_ = Faker('en_US')
self.modelo_ner="FacebookAI/xlm-roberta-large-finetuned-conll03-english"
self.model = AutoModelForTokenClassification.from_pretrained(self.modelo_ner)
self.categorizar_texto(self.texto)
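    ###
    ### Merges subword tokens back into whole words using the tokenizer's word-start marker,
    ### recording the indices of the pieces that were merged into the previous token
    ###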
def reordenacion_tokens(self,tokens,caracter):
i=0
new_tokens=[]
ig_tokens=[]
for token in tokens:
print('token_texto:',token,caracter)
ind=len(new_tokens)
if i<len(tokens):
if token.startswith(caracter):
new_tokens.append(token)
i=i+1
else:
new_tokens[ind-1] = (new_tokens[ind-1] + token)
ig_tokens.append(i)
i=i+1
return (
new_tokens,
ig_tokens
)
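    ###
    ### Drops the predicted labels at the positions of subword pieces that were merged away
    ###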
def reordenacion_identificadores(self,ig_tokens,predicted_tokens_classes):
x=0
new_identificadores=[]
for token in predicted_tokens_classes:
if x not in ig_tokens:
new_identificadores.append(token)
x=x+1
else:
x=x+1
return new_identificadores
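    ###
    ### Collects every token whose predicted label is not 'O' into the JSON output
    ###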
def salida_json(self,tokens,pre_tokens):
        lista=[]
        i=0
        for t in tokens:
            if pre_tokens[i]!='O':
                a = out_json(t.replace('▁','').replace('Ġ','').replace('Ċ',''),pre_tokens[i].replace('▁',''))
                lista.append(a)
            i=i+1
        return MyEncoder().encode(lista)
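    ###
    ### Rebuilds the text, keeping tokens labelled 'O' or MISC and replacing the rest with their predicted label (or its fake replacement)
    ###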
def salida_texto( self,tokens,pre_tokens):
new_labels = []
current_word = None
i=0
for token in tokens:
if pre_tokens[i]=='O' or 'MISC' in pre_tokens[i]:
new_labels.append(' ' +token.replace('▁','').replace('Ġ',''))
else:
new_labels.append(' ' + pre_tokens[i])
i=i+1
a=''
for i in new_labels:
a = a+i
return a
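    ###
    ### Rebuilds the text from token ids, keeping tokens labelled 'O' or OTH and replacing the rest with their label
    ###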
def salida_texto_anonimizado(self, ids,pre_tokens):
new_labels = []
current_word = None
i=0
for identificador in pre_tokens:
if identificador=='O' or 'OTH' in identificador:
new_labels.append(self.tokenizer.decode(ids[i]))
else:
new_labels.append(' ' + identificador)
i=i+1
a=''
for i in new_labels:
a = a+i
return a
def formato_salida(self,out):
a=""
for i in out:
a = a + i.replace('▁','').replace(' ','') + ' '
return a
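    ###
    ### Helpers that generate different kinds of fake data depending on the entity category
    ###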
    def fake_pers(self):
        return self.faker_.name()
def fake_word(self):
return self.faker_.word()
def fake_first_name(self):
return self.faker_.first_name()
def fake_last_name(self):
return self.faker_.last_name()
def fake_address(self):
return self.faker_.address()
def fake_sentence(self,n):
return self.faker_.sentence(nb_words=n)
def fake_text(self):
return self.faker_.text()
def fake_company(self):
return self.faker_.company()
def fake_city(self):
return self.faker_.city()
def reemplazo_fake(self,identificadores):
new_iden=[]
for id in identificadores:
if 'PER' in id:
new_iden.append(self.fake_first_name())
elif 'ORG' in id:
new_iden.append(self.fake_company())
elif 'LOC' in id:
new_iden.append(self.fake_city())
else:
new_iden.append(id)
return new_iden
    ###
    ### Applies a classification model to categorize the text according to its context
    ###
def categorizar_texto(self,texto):
name="elozano/bert-base-cased-news-category"
tokenizer = AutoTokenizer.from_pretrained(name)
model_ = AutoModelForSequenceClassification.from_pretrained(name)
inputs_ = tokenizer(texto, padding=True, truncation=True, return_tensors="pt")
with torch.no_grad():
logits = model_(**inputs_).logits
preds = torch.softmax(logits, dim=-1)
id2lang = model_.config.id2label
vals, idxs = torch.max(preds, dim=1)
        # returns the category with the highest probability
maximo=vals.max()
cat=''
self.categoria_texto=''
porcentaje=0
for k, v in zip(idxs, vals):
if v.item()==maximo:
cat,porcentaje=id2lang[k.item()],v.item()
self.categoria_texto=cat
return cat, porcentaje
    ###
    ### Applies the models to a text
    ###
def predict(self,etiquetas):
categoria, porcentaje = self.categorizar_texto(self.texto)
print(categoria, porcentaje)
self.tokenizer = AutoTokenizer.from_pretrained(self.modelo_ner)
tokens = self.tokenizer.tokenize(self.texto)
ids = self.tokenizer.convert_tokens_to_ids(tokens)
input_ids = torch.tensor([ids])
with torch.no_grad():
logits = self.model(input_ids).logits
predicted_token_class_ids = logits.argmax(-1)
predicted_tokens_classes = [self.model.config.id2label[t.item()] for t in predicted_token_class_ids[0]]
        # labels here are the model's own predictions; the resulting loss is not used downstream
        labels = predicted_token_class_ids
        loss = self.model(input_ids, labels=labels).loss
if (self.idioma=='es'):
new_tokens,ig_tokens=self.reordenacion_tokens(tokens,'Ġ')
else:
new_tokens,ig_tokens=self.reordenacion_tokens(tokens,'▁')
new_identificadores = self.reordenacion_identificadores(ig_tokens,predicted_tokens_classes)
out1 = self.salida_json(new_tokens,new_identificadores)
if etiquetas:
            out2 = self.salida_texto(new_tokens,new_identificadores)  # labels only
else:
out2 = self.salida_texto(new_tokens,self.reemplazo_fake(new_identificadores))
return (
out1,
str(out2)
)
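# Minimal usage sketch for the Model class (kept as comments so it does not run on
# import; the sample sentence and variable names are made up): identificacion_idioma
# detects the language and loads the matching NER model, after which predict returns
# the JSON of detected entities and the processed text.
#   m = Model()
#   m.identificacion_idioma("Juan vive en Madrid y trabaja en Acme.")
#   entidades_json, texto_procesado = m.predict(etiquetas=True)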
class ModeloDataset:
def __init__(self):
self.texto=""
self.idioma=""
self.modelo_ner=""
self.categoria_texto=""
self.tokenizer = AutoTokenizer.from_pretrained("BSC-LT/roberta_model_for_anonimization")
def reordenacion_tokens(self,tokens,caracter):
i=0
new_tokens=[]
ig_tokens=[]
for token in tokens:
            print('token:',token,caracter)
ind=len(new_tokens)
if i<len(tokens):
if token.startswith(caracter):
new_tokens.append(token)
i=i+1
else:
new_tokens[ind-1] = (new_tokens[ind-1] + token)
ig_tokens.append(i)
i=i+1
return (
new_tokens,
ig_tokens
)
def reordenacion_identificadores(self,ig_tokens,predicted_tokens_classes, tamano):
x=0
new_identificadores=[]
for token in predicted_tokens_classes:
if x not in ig_tokens:
if len(new_identificadores) < tamano:
new_identificadores.append(token)
x=x+1
else:
x=x+1
return new_identificadores
    ###
    ### Helpers that generate different kinds of fake data depending on the category
    ###
    def fake_pers(self):
        return self.faker_.name()
def fake_word(self):
return self.faker_.word()
def fake_first_name(self):
return self.faker_.first_name()
def fake_last_name(self):
return self.faker_.last_name()
def fake_address(self):
return self.faker_.address()
def fake_sentence(self,n):
return self.faker_.sentence(nb_words=n)
def fake_text(self):
return self.faker_.text()
def fake_company(self):
return self.faker_.company()
def fake_city(self):
return self.faker_.city()
def reemplazo_fake(self,identificadores):
if self.idioma=='es':
self.faker_ = Faker('es_MX')
else:
self.faker_ = Faker('en_US')
new_iden=[]
for id in identificadores:
if 'PER' in id:
new_iden.append(self.fake_first_name())
elif 'ORG' in id:
new_iden.append(self.fake_company())
elif 'LOC' in id:
new_iden.append(self.fake_city())
else:
new_iden.append(id)
return new_iden
    ###
    ### Applies the models according to the detected language
    ###
def aplicar_modelo(self,_sentences,idioma, etiquetas):
if idioma=="es":
self.tokenizer = AutoTokenizer.from_pretrained("BSC-LT/roberta_model_for_anonimization")
tokenized_text=[self.tokenizer.tokenize(sentence[:500]) for sentence in _sentences]
ids = [self.tokenizer.convert_tokens_to_ids(x) for x in tokenized_text]
MAX_LEN=128
ids=pad_sequences(ids,maxlen=MAX_LEN,dtype="long",truncating="post", padding="post")
input_ids = torch.tensor(ids)
self.model = RobertaForTokenClassification.from_pretrained("BSC-LT/roberta_model_for_anonimization")
with torch.no_grad():
logits = self.model(input_ids).logits
predicted_token_class_ids = logits.argmax(-1)
i=0
_predicted_tokens_classes=[]
for a in predicted_token_class_ids:
_predicted_tokens_classes.append([self.model.config.id2label[t.item()] for t in predicted_token_class_ids[i]])
i=i+1
labels = predicted_token_class_ids
loss = self.model(input_ids, labels=labels).loss
new_tokens=[]
ig_tok=[]
i=0
new_identificadores=[]
for item in tokenized_text:
aux1, aux2= self.reordenacion_tokens(item,"Ġ")
new_tokens.append(aux1)
ig_tok.append(aux2)
for items in _predicted_tokens_classes:
aux=self.reordenacion_identificadores(ig_tok[i],items,len(new_tokens[i]))
new_identificadores.append(aux)
i=i+1
return new_identificadores, new_tokens
else:
print('idioma:',idioma)
self.tokenizer = AutoTokenizer.from_pretrained("FacebookAI/xlm-roberta-large-finetuned-conll03-english")
tokenized_text=[self.tokenizer.tokenize(sentence[:500]) for sentence in _sentences]
ids = [self.tokenizer.convert_tokens_to_ids(x) for x in tokenized_text]
MAX_LEN=128
ids=pad_sequences(ids,maxlen=MAX_LEN,dtype="long",truncating="post", padding="post")
input_ids = torch.tensor(ids)
self.model = AutoModelForTokenClassification.from_pretrained("FacebookAI/xlm-roberta-large-finetuned-conll03-english")
with torch.no_grad():
logits = self.model(input_ids).logits
predicted_token_class_ids = logits.argmax(-1)
i=0
_predicted_tokens_classes=[]
for a in predicted_token_class_ids:
_predicted_tokens_classes.append([self.model.config.id2label[t.item()] for t in predicted_token_class_ids[i]])
i=i+1
labels = predicted_token_class_ids
loss = self.model(input_ids, labels=labels).loss
new_tokens=[]
ig_tok=[]
i=0
new_identificadores=[]
for item in tokenized_text:
aux1, aux2= self.reordenacion_tokens(item,"▁")
new_tokens.append(aux1)
ig_tok.append(aux2)
for items in _predicted_tokens_classes:
aux=self.reordenacion_identificadores(ig_tok[i],items,len(new_tokens[i]))
new_identificadores.append(aux)
i=i+1
return new_identificadores, new_tokens
    ###
    ### Processes the tokens generated from the input text together with the predicted tokens, producing per-word tokens
    ###
def salida_texto( self,tokens,pre_tokens):
new_labels = []
current_word = None
i=0
for token in tokens:
if pre_tokens[i]=='O' or 'MISC' in pre_tokens[i]:
new_labels.append(' ' +token.replace('▁','').replace('Ġ',''))
else:
new_labels.append(' ' + pre_tokens[i])
i=i+1
a=''
for i in new_labels:
a = a+i
return a
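    ###
    ### Applies salida_texto sentence by sentence; the first argument receives the label lists and
    ### the second the token lists, matching the (identificadores, tokens) order returned by aplicar_modelo
    ###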
def salida_texto2(self, tokens,labels,etiquetas):
i=0
out=[]
for iden in labels:
if etiquetas:
out.append(self.salida_texto( iden,np.array(tokens[i])))
else:
out.append(self.salida_texto(iden,self.reemplazo_fake(np.array(tokens[i]))))
i=i+1
return out
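    ###
    ### Joins each processed sentence's pieces into a single string
    ###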
def unir_array(self,_out):
i=0
salida=[]
for item in _out:
salida.append("".join(str(x) for x in _out[i]))
i=i+1
return salida
def unir_columna_valores(self,df,columna):
out = ','.join(df[columna])
return out
###
### Utility class for processing JSON files; receives the file path
###
class utilJSON:
def __init__(self,archivo):
with open(archivo, encoding='utf-8') as f:
self.data = json.load(f)
def obtener_keys_json(self,data):
out=[]
for key in data:
out.append(key)
return(out)
    ###
    ### "flatten_json" function taken from https://levelup.gitconnected.com/a-deep-dive-into-nested-json-to-data-frame-with-python-69bdabb41938
    ### Renu Khandelwal, Jul 23, 2023
def flatten_json(self,y):
try:
out = {}
def flatten(x, name=''):
if type(x) is dict:
for a in x:
flatten(x[a], name + a + '_')
elif type(x) is list:
i = 0
for a in x:
flatten(a, name + str(i) + '_')
i += 1
else:
out[name[:-1]] = x
flatten(y)
return out
except json.JSONDecodeError:
print("Error: The JSON document could not be decoded.")
except TypeError:
print("Error: Invalid operation or function argument type.")
except KeyError:
print("Error: One or more keys do not exist.")
except ValueError:
print("Error: Invalid value detected.")
except Exception as e:
print(f"An unexpected error occurred: {str(e)}")
def obtener_dataframe(self,data):
claves=self.obtener_keys_json(data)
if len(claves)==1:
data_flattened = [self.flatten_json(class_info) for class_info in data[claves[0]]]
df = pd.DataFrame(data_flattened)
else:
data_flattened = [self.flatten_json(class_info) for class_info in data]
df = pd.DataFrame(data_flattened)
return df
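# Small illustration (kept as comments; the sample record below is made up) of what
# utilJSON.flatten_json produces for a nested JSON value:
#   {"cliente": {"nombre": "Ana", "telefonos": ["1", "2"]}}
#   -> {"cliente_nombre": "Ana", "cliente_telefonos_0": "1", "cliente_telefonos_1": "2"}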
modelo = ModeloDataset()
model = Model()
def get_model():
return model
###
### Function wired to the Gradio interface for processing text, CSV, or JSON input
###
def procesar(texto,archivo, etiquetas):
if len(texto)>0:
print('text')
model.identificacion_idioma(texto[:1869])
return model.idioma + "/" + model.categoria_texto, model.predict(etiquetas),gr.Dataframe(),gr.File()
else:
        if archivo.name.split(".")[-1]=="csv":
print('csv')
df=pd.read_csv(archivo.name,delimiter=";",encoding='latin-1')
df_new = pd.DataFrame( columns=df.columns.values)
            model.identificacion_idioma(df.iloc[0, 0])
modelo.idioma=model.idioma
print(model.idioma)
for item in df.columns.values:
sentences=df[item]
ides, predicted = modelo.aplicar_modelo(sentences,model.idioma,etiquetas)
out=modelo.salida_texto2( ides,predicted,etiquetas)
print('out es:',out)
df_new[item] = modelo.unir_array(out)
return modelo.idioma,"", df_new, df_new.to_csv(sep='\t', encoding='utf-8',index=False)
else:
print('json')
            if archivo.name.split(".")[-1]=="json":
util = utilJSON(archivo.name)
df=util.obtener_dataframe(util.data)
df_new = pd.DataFrame( columns=df.columns.values)
                model.identificacion_idioma(df.iloc[0, 0])
modelo.idioma=model.idioma
for item in df.columns.values:
sentences=df[item]
ides, predicted = modelo.aplicar_modelo(sentences,modelo.idioma,etiquetas)
out=modelo.salida_texto2( ides,predicted,etiquetas)
print('out:',out)
df_new[item] = modelo.unir_array(out)
return modelo.idioma,"", df_new, df_new.to_csv(sep='\t', encoding='utf-8',index=False)
demo = gr.Interface(
    fn=procesar,
    inputs=["text", gr.File(), "checkbox"],
    outputs=[
        gr.Label(label="idioma/categoría"),
        gr.Textbox(label="texto procesado"),
        gr.Dataframe(label="Datos procesados en dataframe", interactive=False),
        gr.Textbox(label="datos csv"),
    ],
)
demo.launch(share=True)