# Source: healthsea-demo / support_functions.py
# Page metadata captured with this file:
#   thomashacker - "Improve graph viz" - 5511f86 - raw - history blame - 11.6 kB
import pandas as pd
import difflib
from spacy.tokens import Doc
import numpy as np
from numpy import dot
from numpy.linalg import norm
from pyvis.network import Network
import streamlit.components.v1 as components
class HealthseaSearch:
    """Query helpers over the health-aspect, product, condition and benefit tables.

    All lookups accept a free-text aspect name. Spaces are replaced by
    underscores, an exact key match is tried first, and otherwise the closest
    key according to ``difflib.get_close_matches`` is used.
    """

    # Substance ids that are never reported by ``get_substances``.
    _EXCLUDED_SUBSTANCES = ("sodium", "sugar", "sugar_alcohol")

    def __init__(self, _health_aspects, _products, _conditions, _benefits):
        """Store the lookup tables.

        Args:
            _health_aspects: Mapping of aspect key to an entry with the fields
                ``"products"``, ``"substance"`` and ``"alias"``. The first two
                are sequences of items whose index 0 is a score and index 1 an
                id; ``"alias"`` is a sequence of other aspect keys.
            _products: Mapping of product id to product metadata (the fields
                ``"name"``, ``"rating"`` and ``"review_count"`` are read).
            _conditions: Mapping of condition key to metadata with a
                ``"frequency"`` field.
            _benefits: Mapping of benefit key to metadata with a
                ``"frequency"`` field.
        """
        self.health_aspects = _health_aspects
        self.products = _products
        self.conditions = _conditions
        self.benefits = _benefits

    def __call__(self, query):
        """Return ``query`` unchanged."""
        return query

    # ------------------------------------------------------------------
    # Key resolution
    # ------------------------------------------------------------------
    @staticmethod
    def _closest_key(key, table):
        """Return the key of ``table`` closest to ``key``.

        Raises:
            IndexError: If no key is similar enough to ``key``.
        """
        # The actual query must be passed here; matching a fixed placeholder
        # string would ignore what the caller asked for.
        matches = difflib.get_close_matches(key, table.keys())
        if not matches:
            raise IndexError("No entry found that is close to {!r}".format(key))
        return matches[0]

    def _resolve_aspect_key(self, _aspect):
        """Normalize ``_aspect`` and map it to an existing health-aspect key."""
        key = _aspect.replace(" ", "_")
        if key in self.health_aspects:
            return key
        return self._closest_key(key, self.health_aspects)

    # ------------------------------------------------------------------
    # Shared collection logic for products and substances
    # ------------------------------------------------------------------
    def _collect_scored(self, _aspect, n, field, make_entry, exclude=()):
        """Gather scored items of ``field`` for an aspect and its aliases.

        Args:
            _aspect: Free-text aspect name.
            n: Maximum number of items taken per aspect and returned overall;
                ``0`` disables the limit.
            field: Name of the aspect field to read (``"products"`` or
                ``"substance"``).
            make_entry: Callable turning an item id into the value stored in
                the result tuple.
            exclude: Item ids to skip.

        Returns:
            List of ``(score, entry, aspect_key)`` tuples sorted by score,
            highest first.
        """
        key = self._resolve_aspect_key(_aspect)
        aspect = self.health_aspects[key]

        collected = []
        seen = set()
        # The requested aspect comes first, followed by its aliases, so an id
        # found in several places is attributed to the earliest one.
        for source in [key] + list(aspect["alias"]):
            scoring = self.health_aspects[source][field]
            if n != 0:
                scoring = scoring[:n]
            for item in scoring:
                score, item_id = item[0], item[1]
                if item_id in exclude or item_id in seen:
                    continue
                collected.append((score, make_entry(item_id), source))
                seen.add(item_id)

        # The limit is applied in collection order (requested aspect before
        # aliases) and only then is the result ordered by score.
        if n != 0 and len(collected) > n:
            collected = collected[:n]
        return sorted(collected, key=lambda tup: tup[0], reverse=True)

    # ------------------------------------------------------------------
    # Products
    # ------------------------------------------------------------------
    def get_products(self, _aspect, n):
        """Return scored products for a health aspect and its aliases.

        Args:
            _aspect: Free-text aspect name.
            n: Maximum number of products; ``0`` means no limit.

        Returns:
            List of ``(score, product_meta, aspect_key)`` tuples sorted by
            score, highest first.

        Raises:
            IndexError: If ``_aspect`` matches no known health aspect.
        """
        return self._collect_scored(
            _aspect, n, "products", lambda product_id: self.products[product_id]
        )

    def get_products_df(self, _aspect, n):
        """Return the result of ``get_products`` as a DataFrame.

        Columns: ``product``, ``score``, ``health_aspect``, ``rating``,
        ``reviews``.
        """
        product_list = self.get_products(_aspect, n)
        product_data = {
            "product": [product[1]["name"] for product in product_list],
            "score": [product[0] for product in product_list],
            "health_aspect": [product[2] for product in product_list],
            "rating": [product[1]["rating"] for product in product_list],
            "reviews": [product[1]["review_count"] for product in product_list],
        }
        datatypes = {
            "product": str,
            "score": int,
            "health_aspect": str,
            "rating": str,
            "reviews": int,
        }
        return pd.DataFrame(data=product_data).astype(datatypes)

    # ------------------------------------------------------------------
    # Health aspects
    # ------------------------------------------------------------------
    def get_aspect(self, _aspect):
        """Return the health-aspect entry for ``_aspect``.

        Raises:
            IndexError: If ``_aspect`` matches no known health aspect.
        """
        return self.health_aspects[self._resolve_aspect_key(_aspect)]

    def get_aspect_meta(self, _aspect):
        """Return condition or benefit metadata for ``_aspect``.

        Exact matches are looked up in the conditions first and the benefits
        second. Without an exact match the closest condition key is used, and
        the closest benefit key only if no condition is similar enough.

        Raises:
            IndexError: If nothing similar exists in either table.
        """
        key = _aspect.replace(" ", "_")
        if key in self.conditions:
            return self.conditions[key]
        if key in self.benefits:
            return self.benefits[key]

        for table in (self.conditions, self.benefits):
            matches = difflib.get_close_matches(key, table.keys())
            if matches:
                return table[matches[0]]
        raise IndexError(
            "No condition or benefit found that is close to {!r}".format(key)
        )

    # ------------------------------------------------------------------
    # Visualization
    # ------------------------------------------------------------------
    def pyvis(self, vectors):
        """Render a similarity graph of ``vectors`` inside the Streamlit page.

        Args:
            vectors: Sequence of items whose index 0 is the node label and
                index 1 the vector. The first item is drawn as the
                highlighted main node.

        Side effects:
            Writes ``viz.html`` to the current working directory and embeds
            its content with ``streamlit.components.v1.html``.
        """
        net = Network(
            height="500px", width="700px", bgcolor="#0E1117", font_color="#ffffff"
        )
        net.barnes_hut(gravity=-2500)
        net.set_edge_smooth("dynamic")
        net.toggle_stabilization(False)

        # Main node (first vector) is larger and uses its own color.
        net.add_node(
            vectors[0][0],
            label=vectors[0][0],
            color="#4EA0DB",
            value=100,
            shape="circle",
        )
        for vector in vectors[1:]:
            net.add_node(
                vector[0],
                label=vector[0],
                color="#FE51B9",
                value=70,
                shape="circle",
            )

        # Connect every pair of nodes once, weighted by cosine similarity.
        for i, vector in enumerate(vectors):
            for other in vectors[i + 1:]:
                sim = self.calculate_cosine_sim(vector[1], other[1])
                # Edges that do not touch the main node get half the value.
                edge_value = sim if i == 0 else sim / 2
                net.add_edge(
                    vector[0], other[0], weight=sim, value=edge_value, title=sim
                )

        net.save_graph("viz.html")
        # Context manager guarantees the file handle is released.
        with open("viz.html", "r", encoding="utf-8") as html_file:
            source_code = html_file.read()
        components.html(source_code, height=500, width=700)

    def calculate_cosine_sim(self, a, b):
        """Return the cosine similarity ``dot(a, b) / (norm(a) * norm(b))``."""
        return dot(a, b) / (norm(a) * norm(b))

    # ------------------------------------------------------------------
    # Substances
    # ------------------------------------------------------------------
    def get_substances(self, _aspect, n):
        """Return scored substances for a health aspect and its aliases.

        Substances listed in ``_EXCLUDED_SUBSTANCES`` are skipped. They are
        filtered after the per-aspect limit is applied, so they still count
        towards ``n``.

        Args:
            _aspect: Free-text aspect name.
            n: Maximum number of substances; ``0`` means no limit.

        Returns:
            List of ``(score, substance_id, aspect_key)`` tuples sorted by
            score, highest first.

        Raises:
            IndexError: If ``_aspect`` matches no known health aspect.
        """
        return self._collect_scored(
            _aspect,
            n,
            "substance",
            lambda substance_id: substance_id,
            exclude=self._EXCLUDED_SUBSTANCES,
        )

    def get_substances_df(self, _aspect, n):
        """Return the result of ``get_substances`` as a DataFrame.

        Columns: ``substance``, ``score``, ``health_aspect``.
        """
        substance_list = self.get_substances(_aspect, n)
        substance_data = {
            "substance": [substance[1] for substance in substance_list],
            "score": [substance[0] for substance in substance_list],
            "health_aspect": [substance[2] for substance in substance_list],
        }
        datatypes = {"substance": str, "score": int, "health_aspect": str}
        return pd.DataFrame(data=substance_data).astype(datatypes)

    # ------------------------------------------------------------------
    # Conditions and benefits overview
    # ------------------------------------------------------------------
    def _rank_by_frequency(self, table):
        """Return ``(frequency, key, alias_count)`` tuples, most frequent first.

        ``alias_count`` is the number of aliases of the matching health
        aspect, or ``0`` when the key has no health-aspect entry.
        """
        ranking = []
        for key in table:
            if key in self.health_aspects:
                alias_count = len(self.health_aspects[key]["alias"])
            else:
                alias_count = 0
            ranking.append((table[key]["frequency"], key, alias_count))
        return sorted(ranking, key=lambda tup: tup[0], reverse=True)

    @staticmethod
    def _ranking_to_df(ranking, label):
        """Build a DataFrame with the columns ``label``, ``Frequency``, ``Alias``."""
        data = {
            label: [row[1] for row in ranking],
            "Frequency": [row[0] for row in ranking],
            "Alias": [row[2] for row in ranking],
        }
        datatypes = {"Frequency": int, label: str, "Alias": int}
        return pd.DataFrame(data=data).astype(datatypes)

    def get_all_conditions(self):
        """Return all conditions as ``(frequency, key, alias_count)`` tuples."""
        return self._rank_by_frequency(self.conditions)

    def get_all_conditions_df(self):
        """Return the 100 most frequent conditions as a DataFrame."""
        return self._ranking_to_df(self.get_all_conditions()[:100], "Condition")

    def get_all_benefits(self):
        """Return all benefits as ``(frequency, key, alias_count)`` tuples."""
        return self._rank_by_frequency(self.benefits)

    def get_all_benefits_df(self):
        """Return the 100 most frequent benefits as a DataFrame."""
        return self._ranking_to_df(self.get_all_benefits()[:100], "Benefit")
class HealthseaPipe:
    """Helpers for post-processing documents produced by the Healthsea pipeline."""

    def get_clauses(self, doc):
        """Rebuild every clause of ``doc`` as a standalone ``Doc``.

        Each item of ``doc._.clauses`` is read for ``"split_indices"`` (token
        bounds of the clause) and ``"has_ent"``. For clauses with an entity,
        ``"ent_indices"`` and ``"blinder"`` are read as well: the tokens in
        the entity range are replaced by the blinder text with ``<`` and ``>``
        removed, followed by a space.

        Returns:
            List of ``Doc`` objects, one per clause, sharing ``doc.vocab``.
        """
        clause_docs = []
        for clause in doc._.clauses:
            span = doc[clause["split_indices"][0] : clause["split_indices"][1]]

            if not clause["has_ent"]:
                # No entity: copy the tokens of the clause unchanged.
                words = [token.text for token in span]
                spaces = [token.whitespace_ for token in span]
                clause_docs.append(Doc(doc.vocab, words=words, spaces=spaces))
                continue

            words = []
            spaces = []
            for token in span:
                ent_start = clause["ent_indices"][0]
                if token.i == ent_start:
                    # The first entity token stands in for the whole entity.
                    placeholder = clause["blinder"].replace(">", "").replace("<", "")
                    words.append(placeholder)
                    spaces.append(True)
                    continue
                if ent_start <= token.i < clause["ent_indices"][1]:
                    # Remaining entity tokens are covered by the placeholder.
                    continue
                words.append(token.text)
                spaces.append(token.whitespace_)
            clause_docs.append(Doc(doc.vocab, words=words, spaces=spaces))
        return clause_docs