# NOTE(review): the three lines below were stray non-Python residue from a
# copy-paste ("Spaces:" / "Runtime error"); preserved here as comments so the
# module parses.
# Spaces:
# Runtime error
# Runtime error
import ast | |
import warnings | |
from collections import Counter | |
import classla | |
import pandas as pd | |
classla.download('bg')  # fetch the Bulgarian models on first run (network side effect at import time)
classla_nlp = classla.Pipeline('bg')  # shared Bulgarian NLP pipeline used by all steps below
warnings.filterwarnings('ignore')  # NOTE(review): globally silences ALL warnings, incl. pandas SettingWithCopyWarning — confirm intended
INPUT_FILE_TYPE = ['.csv', '.json', '.txt']  # presumably accepted input extensions; unused in this chunk — verify against caller
OUTPUT_FILE_TYPE = ['.csv', '.xlsx']  # output formats supported by to_output()
STATS_OUTPUT = 'classla_stats'  # name of the stats column; rebound by classla_stats()
OUTPUT_FILE_NAME = 'result.csv'  # last stats output path; rebound by classla_stats(), read by get_classla_stats_df()
def to_output(df, output_file):
    """Persist *df* to *output_file* and return a preview.

    Args:
        df: pandas DataFrame to write.
        output_file: destination path; the format is chosen by its
            extension (one of OUTPUT_FILE_TYPE: '.csv' or '.xlsx').

    Returns:
        Tuple of (first 10 rows of *df*, *output_file*).
    """
    # Match on the extension, not a substring: with the old `'xlsx' in name` /
    # `'csv' in name` checks a path like 'csv_report.xlsx' fired BOTH branches
    # and the file was written twice in two formats.
    if output_file.endswith('.xlsx'):
        df.to_excel(output_file, index=False)
    elif output_file.endswith('.csv'):
        df.to_csv(output_file, index=False)
    return df.head(10), output_file
def remove_duplicates(df, input_column, output_column, output_file):
    """Drop rows whose *input_column* value repeats, keeping the first.

    *output_column* is unused; it is accepted only so every pipeline step
    shares the same (df, input_column, output_column, output_file) signature
    expected by the ``functions`` dispatch table.

    Returns the (preview, path) tuple produced by ``to_output``.
    """
    # Reassign instead of inplace=True: this avoids mutating the caller's
    # frame and the SettingWithCopyWarning inplace ops can raise on slices.
    df = df.drop_duplicates(subset=[input_column])
    return to_output(df, output_file)
def remove_links(df, input_column, output_column, output_file):
    """Strip URLs out of the text in *input_column*.

    The cleaned text is written to *output_column* (the original column is
    preserved when the two names differ) and the frame is persisted via
    ``to_output``.
    """
    link_regex = r'(https?:\/\/(?:www\.|(?!www))[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]\.[^\s]{2,}|www\.[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]\.[^\s]{2,}|https?:\/\/(?:www\.|(?!www))[a-zA-Z0-9]+\.[^\s]{2,}|www\.[a-zA-Z0-9]+\.[^\s]{2,})'
    if input_column != output_column:
        df[output_column] = df[input_column]
    # Replace directly on the whole column: Series.str.replace leaves rows
    # without a match (and NaN rows) untouched. The previous
    # concat + drop_duplicates(keep=False) round-trip silently DELETED
    # legitimate duplicate rows that contained no link, moved link-bearing
    # rows to the end of the frame, and wrote into a filtered copy.
    df[output_column] = df[output_column].str.replace(link_regex, '', regex=True)
    return to_output(df, output_file)
def remove_emails(df, input_column, output_column, output_file):
    """Replace e-mail addresses in *input_column* with the ``<EMAIL>`` tag.

    The masked text is written to *output_column* (the original column is
    preserved when the two names differ) and the frame is persisted via
    ``to_output``.
    """
    email_regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
    if input_column != output_column:
        df[output_column] = df[input_column]
    # Vectorized replace over the whole column: rows without a match (and
    # NaN rows) pass through unchanged. The previous
    # concat + drop_duplicates(keep=False) round-trip silently DELETED
    # legitimate duplicate rows without an e-mail and reordered the frame.
    df[output_column] = df[output_column].str.replace(email_regex, '<EMAIL>', regex=True)
    return to_output(df, output_file)
def remove_phones(df, input_column, output_column, output_file):
    """Replace phone numbers in *input_column* with the ``<PHONE>`` tag.

    The masked text is written to *output_column* (the original column is
    preserved when the two names differ) and the frame is persisted via
    ``to_output``.
    """
    phone_regex = r'(?<!\S)(\+|0)[1-9][0-9 \-\(\)]{7,32}'
    if input_column != output_column:
        df[output_column] = df[input_column]
    # Vectorized replace over the whole column: rows without a match (and
    # NaN rows) pass through unchanged. The previous
    # concat + drop_duplicates(keep=False) round-trip silently DELETED
    # legitimate duplicate rows without a phone number and reordered the
    # frame.
    df[output_column] = df[output_column].str.replace(phone_regex, '<PHONE>', regex=True)
    return to_output(df, output_file)
def get_sentences(df, input_column, output_column, output_file):
    """Tokenize each row of *input_column* with classla and store, per row,
    a list of sentence strings (punctuation tokens removed) in
    *output_column*; the frame is then persisted via ``to_output``.
    """
    def split_sentences(input_list=None):
        # input_list: [[token_id, token_text], ...] over a whole message.
        # A new sentence starts whenever the token id does not increase,
        # i.e. classla restarted its within-sentence numbering.
        if input_list is None:
            input_list = []
        temp = []  # tokens of the sentence currently being collected
        res = []
        for idx in range(len(input_list)):
            # NOTE: at idx == 0 this compares against input_list[-1] (Python
            # negative indexing), which flushes the still-empty buffer and
            # produces the leading '' entry removed by res.pop(0) below.
            if input_list[idx][0] <= input_list[idx - 1][0]:
                res.append(" ".join([x[1] for x in temp]))
                temp = []
            temp.append(input_list[idx])
        res.append(" ".join([x[1] for x in temp]))
        res.pop(0)  # first element is always [], so it is removed
        return res
    if input_column != output_column:
        df[output_column] = df[input_column]
    sentences_separated = []
    for index in range(df.shape[0]):
        # presumably a classla Document whose .get('<field>') returns one
        # flat per-token list — TODO confirm against the classla API
        row_nlp = classla_nlp(df.iloc[index][input_column])
        row_result_upos = row_nlp.get('upos')
        row_id = row_nlp.get('id')
        row_text = row_nlp.get('text')
        row_result = [[row_id[x], row_text[x]] for x in range(len(row_id)) if
                      row_result_upos[x] != 'PUNCT']  # filter punctuation
        row_result = split_sentences(input_list=row_result)  # splitting messages
        sentences_separated.append(row_result)
    df[output_column] = sentences_separated
    return to_output(df, output_file)
def get_classla_ner(df, input_column, output_column, output_file):
    """Run classla NER over the per-row sentence lists in *input_column* and
    store, per row, one list of per-word NER tags per sentence in
    *output_column*; the frame is then persisted via ``to_output``.
    """
    def sentence_classla(sentence_list):
        # One pipeline call per sentence; keep only the per-word 'ner' tags.
        result_ner = []
        for sentence in sentence_list:
            current_nlp = classla_nlp(sentence).to_dict()
            result_ner.append([word['ner'] for word in current_nlp[0][0]])
        return result_ner
    # The sentence lists round-trip through CSV as their str() form; parse
    # them back into real lists before processing.
    df[input_column] = df[input_column].apply(ast.literal_eval)
    # The old code first copied input->output (a copy that was unconditionally
    # overwritten, i.e. dead) and then recopied the result list element by
    # element; a single .apply produces the same column.
    df[output_column] = df[input_column].apply(sentence_classla)
    return to_output(df, output_file)
def get_classla_all(df, input_column, output_column, output_file):
    """Run the full classla pipeline over the per-row sentence lists in
    *input_column* and store the complete ``to_dict()`` analyses in
    *output_column*; the frame is then persisted via ``to_output``.
    """
    def sentence_classla(sentence_list):
        # Full analysis (tokens, lemmas, POS, NER, ...) for every sentence.
        return [classla_nlp(sentence).to_dict() for sentence in sentence_list]
    # The sentence lists round-trip through CSV as their str() form; parse
    # them back into real lists before processing.
    df[input_column] = df[input_column].apply(ast.literal_eval)
    # The old code first copied input->output (a copy that was unconditionally
    # overwritten, i.e. dead) and then recopied the result list element by
    # element; a single .apply produces the same column.
    df[output_column] = df[input_column].apply(sentence_classla)
    return to_output(df, output_file)
def classla_stats(df, input_column, output_column, output_file):
    """Count NER tag frequencies per row.

    *input_column* holds (the str() form of) a list of per-sentence tag
    lists; *output_column* receives ``str(dict)`` of the flattened tag
    counts. Also records *output_column*/*output_file* in the module globals
    so ``get_classla_stats_df()`` can locate the result later.
    """
    def count_ner(ner_list: list) -> str:
        # Flatten the per-sentence tag lists into one frequency table.
        # Counter.update is O(total tags); the old `counter += Counter(el)`
        # rebuilt the accumulator on every iteration. (The old annotation
        # `ner_list: []` was also a list *instance*, not a type.)
        counter = Counter()
        for tags in ner_list:
            counter.update(tags)
        return str(dict(counter))
    # Remember where the stats landed for get_classla_stats_df().
    global STATS_OUTPUT, OUTPUT_FILE_NAME
    STATS_OUTPUT = output_column
    OUTPUT_FILE_NAME = output_file
    # The tag lists round-trip through CSV as their str() form; parse back.
    df[input_column] = df[input_column].apply(ast.literal_eval)
    # (The old input->output copy branch was dead: unconditionally overwritten.)
    df[output_column] = df[input_column].apply(count_ner)
    return to_output(df, output_file)
def get_classla_stats_df():
    """Aggregate the per-row NER counts written by ``classla_stats()``.

    Reads the file recorded in OUTPUT_FILE_NAME, sums the ``str(dict)``
    counts stored in the STATS_OUTPUT column, and returns a DataFrame with
    columns ['value', 'count'] (one row per NER tag).
    """
    df = pd.read_csv(OUTPUT_FILE_NAME, encoding='utf-8')
    counter = Counter()
    for stats in df[STATS_OUTPUT]:
        # Each cell is the str() form of a {tag: count} dict; parse and sum.
        counter.update(ast.literal_eval(stats))
    # Direct construction replaces the old detour of building len(counter)
    # identical rows, drop_duplicates()-ing them back to one, and melting;
    # it also drops the leftover debug print of the file name.
    return pd.DataFrame(counter.items(), columns=['value', 'count'])
def run_all(df, input_column, output_column, output_file):
    """Run the whole cleaning pipeline in order.

    Steps: dedup -> strip links -> mask e-mails -> mask phones -> split
    sentences -> full classla analysis -> classla NER -> NER stats. Every
    step persists to *output_file* and the frame is re-read from disk so
    each stage starts from the state the previous one wrote (this also
    forces list columns through their CSV str() form, which the classla
    steps undo with ast.literal_eval).

    Returns the (preview, path) tuple of the final frame.
    """
    def load_file(path):
        # Re-read whatever the previous step just wrote. Match on the
        # extension (not a substring) for the same reason as to_output().
        if path.endswith('.xlsx'):
            return pd.read_excel(path)
        if path.endswith('.csv'):
            return pd.read_csv(path)
        return None
    remove_duplicates(df, input_column, output_column, output_file)
    df = load_file(output_file)
    remove_links(df, input_column, 'removed_links', output_file)
    df = load_file(output_file)
    remove_emails(df, 'removed_links', 'removed_emails', output_file)
    df = load_file(output_file)
    remove_phones(df, 'removed_emails', 'removed_phones', output_file)
    df = load_file(output_file)
    get_sentences(df, 'removed_phones', 'extracted_sentences', output_file)
    df = load_file(output_file)
    get_classla_all(df, 'extracted_sentences', 'classla_all', output_file)
    df = load_file(output_file)
    get_classla_ner(df, 'extracted_sentences', 'classla_ner', output_file)
    df = load_file(output_file)
    classla_stats(df, 'classla_ner', 'classla_stats', output_file)
    df = load_file(output_file)
    return df.head(10), output_file
# Dispatch table: UI operation label -> pipeline handler. Every handler
# shares the (df, input_column, output_column, output_file) signature and
# returns the (preview, path) tuple from to_output().
functions = {
    'remove duplicate rows': remove_duplicates,
    'remove links': remove_links,
    'remove e-mails': remove_emails,
    'remove phone numbers': remove_phones,
    'separate sentences': get_sentences,
    'Classla NER': get_classla_ner,
    'Classla full result': get_classla_all,
    'classla stats': classla_stats,
    'run all': run_all,
}