Spaces:
Running
Running
#Import Libraries | |
import spacy | |
nlp = spacy.load("en_core_web_sm") | |
from transformer_srl import dataset_readers, models, predictors | |
predictor = predictors.SrlTransformersPredictor.from_path("srl_bert_base_conll2012.tar.gz","transformer_srl") | |
import re | |
import nltk | |
nltk.download('stopwords') | |
from nltk.corpus import stopwords | |
stop_words = set(stopwords.words('english')) | |
import numpy as np | |
import pandas as pd | |
import networkx as nx | |
from IPython.core.display import display, HTML | |
from pyvis.network import Network | |
from somajo import SoMaJo | |
import requests | |
import json | |
import gradio as gr | |
import matplotlib | |
import matplotlib.pyplot as plt | |
from statistics import mean | |
from statistics import stdev | |
import math | |
#Semantic Graph Function
# Core argument labels; every non-empty one becomes an 'Argument' node.
_CORE_ARGUMENTS = ('ARG0', 'ARG1', 'ARG2', 'ARG3', 'ARG4', 'ARG5')

# Modifier layers as (role labels, node/edge category, color). The order is
# significant: when the same text occurs under several roles, the layer that is
# processed last decides the attributes of the shared node.
_MODIFIER_LAYERS = (
    (('ARGM-LOC', 'ARGM-DIR', 'ARGM-GOL', 'ARGM-TMP'), 'Setting', 'lightgreen'),
    (('ARGM-PRP', 'ARGM-CAU'), 'Cause', 'gold'),
    (('ARGM-MNR', 'ARGM-COM', 'ARGM-EXT', 'ARGM-ADV', 'ARGM-ADJ', 'ARGM-PRD'), 'Manner', 'grey'),
    (('ARGM-DIS', 'ARGM-MOD', 'ARGM-REC'), 'Orientation', 'aqua'),
    (('ARGM-NEG',), 'Negation', 'black'),
)

# Edge categories, aligned position by position with the boolean parameters
# (flow, predication, action, setting, cause, manner, negation, orientation).
_LAYER_NAMES = ('Flow', 'Predication', 'Argument', 'Setting', 'Cause', 'Manner', 'Negation', 'Orientation')

# Keys of a predicate record that are bookkeeping, not semantic roles.
_RECORD_META_KEYS = frozenset({'sentence_id', 'pred_id', 'frame', 'lemma', 'Pred'})

# Maps pronoun forms to their root form; pronouns missing from this table are dropped.
_PRONOUN_ROOTS = {'i': 'i', 'I': 'i', 'he': 'he', 'she': 'she', 'you': 'you', 'we': 'we', 'they': 'they', 'me': 'i',
                  'my': 'i', 'mine': 'i', 'your': 'you', 'yours': 'you', 'him': 'he', 'his': 'he', 'her': 'she',
                  'hers': 'she', 'us': 'we', 'ours': 'we', 'our': 'we', 'their': 'they', 'theirs': 'they',
                  'them': 'they', 'its': 'it', 'it': 'it', "'em": 'they', 'myself': 'i', 'that': 'that',
                  'this': 'this', 'those': 'those', 'these': 'these'}

# Part-of-speech tags whose tokens are removed from role texts.
_DROPPED_POS = frozenset({'ADP', 'PART', 'PUNCT', 'DET', 'SPACE'})

_SUPPORTED_SRL_MODELS = ('Transformer_SRL', 'VerbAtlas')
_VERBATLAS_URL = "https://verbatlas.org/api/model"
_VERBATLAS_TIMEOUT = 60  # seconds; without a timeout a stalled request would hang the callback
_N_RANDOM_GRAPHS = 1000  # size of the random-graph baseline used for the z-scores


def _has_text(value):
    """Return True when *value* is a non-empty string, i.e. the role is present."""
    return isinstance(value, str) and value != ''


def _split_sentences(text_input):
    """Split *text_input* into sentences with SoMaJo; each sentence is its tokens joined by spaces."""
    tokenizer = SoMaJo("en_PTB", split_camel_case=True)
    return [' '.join(token.text for token in sentence)
            for sentence in tokenizer.tokenize_text([text_input])]


def _transformer_srl_records(sentences):
    """Run the module-level transformer_srl ``predictor`` and return one record per predicate.

    A record is a dict with 'sentence_id', 'pred_id', 'frame', 'lemma', 'Pred'
    (the lemma) and one entry per role found in the verb description.
    Predicates without any role other than the frame itself are skipped.
    """
    records = []
    for sentence_id, content in enumerate(sentences, start=1):
        prediction = predictor.predict(content)
        for verb in prediction['verbs']:
            record = {'sentence_id': sentence_id, 'pred_id': len(records) + 1,
                      'frame': verb['frame'], 'lemma': verb['lemma'], 'Pred': verb['lemma']}
            has_role = False
            for element in re.findall(r'\[.*?\]', verb['description']):
                # Split on the first ':' only, so role text that itself contains a
                # colon is kept whole; bracketed chunks without a label are ignored.
                label, separator, span_text = element[1:-1].partition(':')
                if not separator or label == verb['frame']:
                    continue
                record[label.strip()] = span_text.strip()
                has_role = True
            if has_role:
                records.append(record)
    return records


def _verbatlas_records(sentences):
    """Query the VerbAtlas API once per sentence and return one record per predicate.

    A record is a dict with 'sentence_id', 'pred_id', 'frame', 'Pred' (the frame
    name) and one entry per role. The lookup is best effort: a sentence whose
    request or response cannot be processed is logged and skipped, keeping the
    records collected up to that point.
    """
    import logging  # local import: the file header does not import logging

    records = []
    for sentence_id, content in enumerate(sentences, start=1):
        try:
            response = requests.post(url=_VERBATLAS_URL, json=[{"text": content, "lang": "EN"}],
                                     timeout=_VERBATLAS_TIMEOUT)
            for item in json.loads(response.text):
                tokens = {token['index']: token['rawText'] for token in item['tokens']}
                for head in item['annotations']:
                    propbank = head['englishPropbank']
                    frame = propbank['frameName']
                    record = {'sentence_id': sentence_id, 'pred_id': len(records) + 1,
                              'frame': frame, 'Pred': frame}
                    roles = propbank['roles']
                    for role in roles:
                        start, end = role['span'][0], role['span'][1]
                        record[role['role']] = ' '.join(tokens[i] for i in range(start, end))
                    if roles:
                        records.append(record)
        except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as error:
            logging.getLogger(__name__).warning(
                "VerbAtlas SRL failed for sentence %d: %s", sentence_id, error)
    return records


def _clean_role_text(text):
    """Normalize a role text: map pronouns to their root and drop function tokens.

    Uses the module-level spaCy pipeline ``nlp`` (loaded once at import time)
    to tag each whitespace-separated token in isolation.
    """
    kept = []
    for token in text.split(' '):
        doc = nlp(token)
        if len(doc) == 0:  # empty token, e.g. produced by consecutive spaces
            continue
        pos = doc[0].pos_
        if pos == 'PRON':
            root = _PRONOUN_ROOTS.get(token.lower())
            if root is not None:
                kept.append(root)
        elif pos not in _DROPPED_POS:
            kept.append(token)
    return ' '.join(kept)


def _build_semantic_graph(records):
    """Build the full multi-layer graph (all categories) from the cleaned predicate records."""
    graph = nx.MultiDiGraph()
    predicates = [record['Pred'].lower() for record in records]
    for index, (record, predicate) in enumerate(zip(records, predicates)):
        # Flow layer: link each predicate to the one that follows it.
        if index < len(records) - 1:
            following = predicates[index + 1]
            for node in (predicate, following):
                if node not in graph:
                    graph.add_node(node, category='Predication', color='salmon', label=node)
            graph.add_edge(predicate, following, turn=index + 0.5, color='brown', category='Flow')
        elif len(records) == 1:
            graph.add_node(predicate, category='Predication', color='salmon', label=predicate)

        # Predication layer: predicate -> each core argument.
        for role in _CORE_ARGUMENTS:
            text = record.get(role)
            if _has_text(text):
                node = text.lower()
                graph.add_node(node, category='Argument', color='lightblue', label=node)
                graph.add_edge(predicate, node, turn=index, category='Predication', color='salmon')

        # Argument layer: ARG0 -> ARG1 / ARG2. ARG0 must be non-empty, otherwise an
        # attribute-less '' node would be created implicitly by add_edge.
        agent = record.get('ARG0')
        if _has_text(agent):
            for role in ('ARG1', 'ARG2'):
                text = record.get(role)
                if _has_text(text):
                    graph.add_edge(agent.lower(), text.lower(), turn=index,
                                   category='Argument', color='lightblue')

        # Modifier layers: modifier -> predicate. Each modifier is tested on its
        # own text (the original tested the last core argument by mistake).
        # NOTE(review): cleaning drops tokens tagged PART; if spaCy tags "not"/"n't"
        # that way, ARGM-NEG is emptied and no Negation node is created - confirm
        # whether negation should bypass the cleaning step.
        for roles, category, color in _MODIFIER_LAYERS:
            for role in roles:
                text = record.get(role)
                if _has_text(text):
                    node = text.lower()
                    graph.add_node(node, category=category, color=color, label=node)
                    graph.add_edge(node, predicate, turn=index, category=category, color=color)
    return graph


def _filter_layers(graph, selected):
    """Return a new graph holding only the edges whose category is in *selected*.

    Nodes are the endpoints of the kept edges; they receive the 'category',
    'title' (same value as the category) and 'color' of the source graph.
    """
    filtered = nx.MultiDiGraph(
        [(u, v, {'turn': e['turn'], 'category': e['category'], 'color': e['color']})
         for u, v, e in graph.edges(data=True) if e['category'] in selected])
    for node in filtered.nodes:
        attrs = graph.nodes[node]
        filtered.nodes[node]['category'] = attrs['category']
        filtered.nodes[node]['title'] = attrs['category']
        filtered.nodes[node]['color'] = attrs['color']
    return filtered


def _draw_static_graph(graph):
    """Draw *graph* with matplotlib (spring layout, node-category legend) and return the figure."""
    fig, ax = plt.subplots(figsize=(8, 8))
    # One legend entry per distinct (color, category) pair, in first-seen order.
    # Empty scatters only register legend handles; they draw nothing.
    legend_entries = dict.fromkeys(
        (attrs['color'], attrs['category']) for _, attrs in graph.nodes(data=True))
    for color, category in legend_entries:
        ax.scatter([], [], color=color, label=category)
    if legend_entries:
        ax.legend(bbox_to_anchor=(1.05, 1.0), loc=1, title='Nodes')
    order = graph.order()
    if order:  # an empty graph has nothing to lay out, and 5/sqrt(0) would raise
        node_colors = [attrs['color'] for _, attrs in graph.nodes(data=True)]
        edge_colors = [attrs['color'] for _, _, attrs in graph.edges(data=True)]
        pos = nx.spring_layout(graph, k=5 / math.sqrt(order))
        nx.draw(graph, pos=pos, ax=ax, node_color=node_colors, edge_color=edge_colors, with_labels=True)
    fig.tight_layout()
    # Detach the figure from pyplot's global registry so repeated calls do not
    # accumulate open figures; the Figure object itself stays usable.
    plt.close(fig)
    return fig


def _write_interactive_graph(graph, path):
    """Render *graph* with pyvis into the HTML file *path* and return *path*."""
    # NOTE(review): the path is fixed by the caller, so concurrent requests
    # overwrite each other's file - consider a per-request temporary file.
    net = Network("600px", "600px", notebook=True, directed=True, cdn_resources="remote",
                  filter_menu=True, select_menu=True)
    net.from_nx(graph, show_edge_weights=False)
    net.show_buttons(filter_=['physics'])
    net.show(path)
    return path


def _component_sizes(directed_graph):
    """Return (number of connected components, largest connected component size,
    largest strongly connected component size); sizes are 0 for an empty graph."""
    components = list(nx.connected_components(nx.Graph(directed_graph)))
    largest = max(map(len, components), default=0)
    largest_strong = max(map(len, nx.strongly_connected_components(directed_graph)), default=0)
    return len(components), largest, largest_strong


def _z_score(value, samples):
    """Return (value - mean) / stdev of *samples*, or NaN when that is undefined."""
    if len(samples) < 2:
        return np.nan
    spread = stdev(samples)
    if spread == 0:
        return np.nan
    return (value - mean(samples)) / spread


def _graph_metrics(graph):
    """Compute the one-row metrics table for *graph*.

    Parallel edges are collapsed first (``nx.DiGraph``). Metrics that are
    undefined for the graph (empty or disconnected) are NaN; the average
    degrees of an empty graph are 0. The component counts are also compared
    with ``_N_RANDOM_GRAPHS`` seeded random directed graphs with the same
    number of nodes and edges, and reported as z-scores.
    """
    stat_graph = nx.DiGraph(graph)
    undirected = nx.Graph(stat_graph)
    nn = stat_graph.number_of_nodes()
    ne = stat_graph.number_of_edges()

    diameter = aspl = cc = np.nan
    ad = awd = 0
    if nn:
        try:
            diameter = nx.diameter(undirected)
        except (nx.NetworkXException, ValueError):  # e.g. graph is not connected
            diameter = np.nan
        try:
            aspl = nx.average_shortest_path_length(undirected)
        except (nx.NetworkXException, ValueError):
            aspl = np.nan
        try:
            cc = nx.average_clustering(undirected)
        except (nx.NetworkXException, ZeroDivisionError):
            cc = np.nan
        ad = sum(degree for _, degree in stat_graph.degree()) / float(nn)
        # Edges carry no 'weight' attribute, so every edge counts as weight 1 here.
        awd = sum(degree for _, degree in stat_graph.degree(weight='weight')) / float(nn)
    gd = nx.density(stat_graph)
    ncc, lcc, lscc = _component_sizes(stat_graph)

    ncc_list = []
    lcc_list = []
    lscc_list = []
    for seed in range(_N_RANDOM_GRAPHS):
        random_graph = nx.gnm_random_graph(nn, ne, seed=seed, directed=True)
        rand_ncc, rand_lcc, rand_lscc = _component_sizes(random_graph)
        ncc_list.append(rand_ncc)
        lcc_list.append(rand_lcc)
        lscc_list.append(rand_lscc)
    nccz = _z_score(ncc, ncc_list)
    lccz = _z_score(lcc, lcc_list)
    lsccz = _z_score(lscc, lscc_list)

    cols = ['#Nodes', '#Edges', 'Diameter', 'ASPL', 'AD', 'AWD', 'Density', 'CC', 'NCC', 'LCC', 'LSCC',
            'NCCZ', 'LCCZ', 'LSCCZ']
    return pd.DataFrame(data=[[nn, ne, diameter, aspl, ad, awd, gd, cc, ncc, lcc, lscc, nccz, lccz, lsccz]],
                        columns=cols)


def semantic_graph(text_input, srl_model, flow, predication, action, setting, cause, manner, negation,
                   orientation):
    """Build, draw and measure a multi-layer semantic graph of a speech sample.

    The text is split into sentences, semantic roles are extracted with the
    chosen model, role texts are cleaned, and a directed multigraph is built
    whose edges belong to one of eight categories. Only the selected
    categories are drawn and measured.

    Args:
        text_input: The speech sample; ``None`` is treated as an empty text.
        srl_model: 'Transformer_SRL' (module-level ``predictor``) or
            'VerbAtlas' (remote API).
        flow: Keep 'Flow' edges (predicate -> next predicate).
        predication: Keep 'Predication' edges (predicate -> core argument).
        action: Keep 'Argument' edges (ARG0 -> ARG1 / ARG2).
        setting: Keep 'Setting' edges.
        cause: Keep 'Cause' edges.
        manner: Keep 'Manner' edges.
        negation: Keep 'Negation' edges.
        orientation: Keep 'Orientation' edges.

    Returns:
        A tuple ``(fig, df_stats, 'network.html')``: the matplotlib figure, a
        one-row DataFrame of graph metrics, and the path of the interactive
        pyvis HTML file written to the working directory.

    Raises:
        ValueError: If *srl_model* is not one of the supported model names.
    """
    # Validate first so a missing radio selection fails with a clear message
    # before any model or network work is done.
    if srl_model not in _SUPPORTED_SRL_MODELS:
        raise ValueError(
            f"Unknown srl_model {srl_model!r}; expected one of {list(_SUPPORTED_SRL_MODELS)}")

    #Parse text into sentences:
    sentences = _split_sentences(text_input or '')

    #Branch for SRL models:
    if srl_model == 'Transformer_SRL':
        records = _transformer_srl_records(sentences)
    else:
        records = _verbatlas_records(sentences)

    #Iterate on roles and clean:
    for record in records:
        for role in list(record):
            if role not in _RECORD_META_KEYS and isinstance(record[role], str):
                record[role] = _clean_role_text(record[role])

    #Define Graph and keep only the requested layers:
    full_graph = _build_semantic_graph(records)
    layer_flags = (flow, predication, action, setting, cause, manner, negation, orientation)
    selected_layers = [name for name, enabled in zip(_LAYER_NAMES, layer_flags) if enabled]
    filtered_graph = _filter_layers(full_graph, selected_layers)

    #Visualize static graph:
    fig = _draw_static_graph(filtered_graph)
    #Visualize interactive graph:
    html_path = _write_interactive_graph(filtered_graph, 'network.html')
    #Compute Graph metrics:
    df_stats = _graph_metrics(filtered_graph)
    return fig, df_stats, html_path
#Build gradio interface
# Input components, in the positional order of semantic_graph's parameters:
# the text, the SRL model choice, then one checkbox per semantic layer.
input_components = [
    gr.Textbox(label='Insert speech sample:', placeholder="Type or paste..."),
    gr.Radio(["Transformer_SRL", "VerbAtlas"],
             label='Select Semantic Role Labeling model and semantic layers:'),
]
input_components += [
    gr.Checkbox(label=layer_label)
    for layer_label in ('Flow of Predicates', 'Predication', 'Action', 'Setting',
                        'Cause', 'Manner', 'Negation', 'Orientation')
]

# One pre-filled example row; its values line up with input_components.
# A second, longer example that is currently disabled:
# ['A large hall, numerous guests, whom we were receiving. Among them was Irma. I at once took her to one side, as though to answer her letter and to reproach her for not having accepted my solution yet.','Transformer_SRL', False, True, True, True, False, False, False, False]
example_rows = [
    ['The dog is chasing the cat.', 'VerbAtlas', False, True, True, False, False, False, False, False],
]

# Output components, matching the (figure, metrics table, html path) tuple
# returned by semantic_graph.
output_components = [
    gr.Plot(label='Graph Representation', scroll_to_output=True),
    gr.Dataframe(label='Graph Metrics'),
    gr.File(label='Interactive Graph'),
]

demo = gr.Interface(
    fn=semantic_graph,
    title='Multi-Layered Semantic Speech Graph - Tang Lab',
    inputs=input_components,
    examples=example_rows,
    outputs=output_components,
)
demo.launch() #share=True