Spaces:
Runtime error
Runtime error
import pandas as pd | |
import difflib | |
from spacy.tokens import Doc | |
import numpy as np | |
from numpy import dot | |
from numpy.linalg import norm | |
from pyvis.network import Network | |
import streamlit.components.v1 as components | |
class HealthseaSearch:
    """Query helper over the Healthsea health-aspect, product and meta tables.

    The methods of this class read the following keys from the mappings
    handed to the constructor:

    * ``health_aspects[key]`` -> ``"products"`` and ``"substance"`` (sequences
      whose items carry a score at index 0 and an identifier at index 1) and
      ``"alias"`` (an iterable of other ``health_aspects`` keys).
    * ``products[product_id]`` -> ``"name"``, ``"rating"``, ``"review_count"``.
    * ``conditions[key]`` / ``benefits[key]`` -> ``"frequency"``.

    Aspect names given by callers are normalized by replacing spaces with
    underscores. Unknown names fall back to the closest key according to
    ``difflib.get_close_matches``.
    """

    # Substances that are never reported by ``get_substances``.
    _EXCLUDED_SUBSTANCES = ("sodium", "sugar", "sugar_alcohol")

    # Maximum number of rows in the conditions/benefits overview frames.
    _OVERVIEW_LIMIT = 100

    def __init__(self, _health_aspects, _products, _conditions, _benefits):
        """Store the four lookup tables; no copies are made."""
        self.health_aspects = _health_aspects
        self.products = _products
        self.conditions = _conditions
        self.benefits = _benefits

    def __call__(self, query):
        """Return ``query`` unchanged."""
        return query

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _closest_key(name, candidates):
        """Return the candidate key that is most similar to ``name``.

        Raises:
            IndexError: if no candidate is similar enough. The exception type
                matches what the previous ``[...][0]`` lookup raised, so
                existing handlers keep working, but now carries a message.
        """
        # The original code passed the literal string "_aspect" here, so the
        # fuzzy match never looked at the caller's query.
        matches = difflib.get_close_matches(name, list(candidates))
        if not matches:
            raise IndexError(f"no entry closely matches {name!r}")
        return matches[0]

    def _resolve_aspect(self, _aspect):
        """Return ``(key, entry)`` of the health aspect named ``_aspect``."""
        key = _aspect.replace(" ", "_")
        if key not in self.health_aspects:
            key = self._closest_key(key, self.health_aspects)
        return key, self.health_aspects[key]

    def _collect_scored(self, _aspect, n, field, build_entry, excluded=()):
        """Gather scored items of ``field`` for an aspect and all its aliases.

        For the aspect itself and then each alias, the first ``n`` items of
        ``field`` are visited (all items when ``n == 0``). Identifiers listed
        in ``excluded`` and identifiers that were already collected are
        skipped. ``build_entry(score, identifier, source_key)`` creates the
        tuple that is stored for each remaining item.
        """
        key, aspect = self._resolve_aspect(_aspect)
        collected = []
        seen = set()
        for source in (key, *aspect["alias"]):
            scoring = self.health_aspects[source][field]
            if n != 0:
                # Slicing already clamps to the available length.
                scoring = scoring[:n]
            for item in scoring:
                score, identifier = item[0], item[1]
                if identifier in excluded or identifier in seen:
                    continue
                seen.add(identifier)
                collected.append(build_entry(score, identifier, source))

        # NOTE(review): the cut to ``n`` happens *before* sorting, so entries
        # contributed by aliases can be dropped even when they score higher
        # than entries of the main aspect. Kept as-is to preserve existing
        # results -- confirm whether sort-then-cut was intended.
        if n != 0 and len(collected) > n:
            collected = collected[:n]
        return sorted(collected, key=lambda entry: entry[0], reverse=True)

    def _rank_by_frequency(self, meta):
        """Return ``(frequency, key, alias_count)`` tuples, most frequent first.

        ``alias_count`` is 0 for keys that have no ``health_aspects`` entry.
        """
        ranking = []
        for key in meta:
            if key in self.health_aspects:
                alias_count = len(self.health_aspects[key]["alias"])
            else:
                alias_count = 0
            ranking.append((meta[key]["frequency"], key, alias_count))
        return sorted(ranking, key=lambda entry: entry[0], reverse=True)

    @classmethod
    def _ranking_df(cls, ranking, label):
        """Turn the top rows of a frequency ranking into a typed DataFrame.

        The columns are ``label``, ``"Frequency"`` and ``"Alias"``.
        """
        top = ranking[: cls._OVERVIEW_LIMIT]
        df = pd.DataFrame(
            data={
                label: [key for _freq, key, _alias in top],
                "Frequency": [freq for freq, _key, _alias in top],
                "Alias": [alias for _freq, _key, alias in top],
            }
        )
        return df.astype({"Frequency": int, label: str, "Alias": int})

    # ------------------------------------------------------------------
    # Products
    # ------------------------------------------------------------------
    def get_products(self, _aspect, n):
        """Load product meta for a health aspect and its aliases.

        Args:
            _aspect: Aspect name; spaces are treated as underscores and
                unknown names are resolved to the closest known aspect.
            n: Maximum number of products to take per aspect and to return
                overall; ``0`` means no limit.

        Returns:
            A list of ``(score, product_meta, aspect_key)`` tuples sorted by
            score in descending order. Each product id appears at most once.

        Raises:
            IndexError: if ``_aspect`` matches no known health aspect.
        """
        return self._collect_scored(
            _aspect,
            n,
            "products",
            lambda score, product_id, source: (
                score,
                self.products[product_id],
                source,
            ),
        )

    def get_products_df(self, _aspect, n):
        """Load product meta and return it as a DataFrame.

        The columns are ``product``, ``score``, ``health_aspect``, ``rating``
        and ``reviews``; ``score`` and ``reviews`` are cast to ``int``, the
        remaining columns to ``str``.
        """
        rows = self.get_products(_aspect, n)
        df = pd.DataFrame(
            data={
                "product": [meta["name"] for _score, meta, _source in rows],
                "score": [score for score, _meta, _source in rows],
                "health_aspect": [source for _score, _meta, source in rows],
                "rating": [meta["rating"] for _score, meta, _source in rows],
                "reviews": [
                    meta["review_count"] for _score, meta, _source in rows
                ],
            }
        )
        return df.astype(
            {
                "product": str,
                "score": int,
                "health_aspect": str,
                "rating": str,
                "reviews": int,
            }
        )

    # ------------------------------------------------------------------
    # Health aspects
    # ------------------------------------------------------------------
    def get_aspect(self, _aspect):
        """Return the health-aspect entry for ``_aspect``.

        Raises:
            IndexError: if ``_aspect`` matches no known health aspect.
        """
        _key, entry = self._resolve_aspect(_aspect)
        return entry

    def get_aspect_meta(self, _aspect):
        """Return the condition or benefit meta entry for ``_aspect``.

        Exact matches are looked up in ``conditions`` first, then in
        ``benefits``. If neither contains the name, the closest key of
        ``conditions`` (only) is used.

        Raises:
            IndexError: if the name is unknown and no condition is close.
        """
        key = _aspect.replace(" ", "_")
        if key in self.conditions:
            return self.conditions[key]
        if key in self.benefits:
            return self.benefits[key]
        return self.conditions[self._closest_key(key, self.conditions)]

    # ------------------------------------------------------------------
    # Visualization
    # ------------------------------------------------------------------
    def pyvis(self, vectors):
        """Render a similarity graph of ``vectors`` into the Streamlit page.

        Args:
            vectors: Non-empty sequence whose items hold a node label at
                index 0 and a vector at index 1. The first item is drawn as
                the highlighted main node.

        Every pair of nodes is connected by an edge weighted with the cosine
        similarity of their vectors. The graph is written to ``viz.html`` in
        the working directory and then embedded via
        ``streamlit.components.v1.html``.
        """
        net = Network(
            height="500px",
            width="700px",
            bgcolor="#0E1117",
            font_color="#ffffff",
        )
        net.barnes_hut(gravity=-2500)
        net.set_edge_smooth("dynamic")
        net.toggle_stabilization(False)

        main_label = vectors[0][0]
        net.add_node(
            main_label,
            label=main_label,
            color="#4EA0DB",
            value=100,
            shape="circle",
        )
        for vector in vectors[1:]:
            net.add_node(
                vector[0],
                label=vector[0],
                color="#FE51B9",
                value=70,
                shape="circle",
            )

        for i, current_vector in enumerate(vectors):
            for _vector in vectors[i + 1:]:
                sim = self.calculate_cosine_sim(current_vector[1], _vector[1])
                # Edges leaving the main node are drawn at full strength,
                # all other edges at half strength.
                strength = sim if i == 0 else sim / 2
                net.add_edge(
                    current_vector[0],
                    _vector[0],
                    weight=sim,
                    value=strength,
                    title=sim,
                )

        net.save_graph("viz.html")
        # Context manager so the file handle is released right after reading.
        with open("viz.html", "r", encoding="utf-8") as html_file:
            source_code = html_file.read()
        components.html(source_code, height=500, width=700)

    def calculate_cosine_sim(self, a, b):
        """Return the cosine similarity of ``a`` and ``b``.

        Returns ``0.0`` when either vector has zero norm, instead of the NaN
        that a division by zero would produce.
        """
        denominator = norm(a) * norm(b)
        if denominator == 0:
            return 0.0
        return dot(a, b) / denominator

    # ------------------------------------------------------------------
    # Substances
    # ------------------------------------------------------------------
    def get_substances(self, _aspect, n):
        """Load substance meta for a health aspect and its aliases.

        Works like ``get_products`` but reads the ``"substance"`` field and
        skips sodium, sugar and sugar alcohol.

        Returns:
            A list of ``(score, substance, aspect_key)`` tuples sorted by
            score in descending order. Each substance appears at most once.

        Raises:
            IndexError: if ``_aspect`` matches no known health aspect.
        """
        return self._collect_scored(
            _aspect,
            n,
            "substance",
            lambda score, substance, source: (score, substance, source),
            excluded=self._EXCLUDED_SUBSTANCES,
        )

    def get_substances_df(self, _aspect, n):
        """Load substance meta and return it as a DataFrame.

        The columns are ``substance``, ``score`` and ``health_aspect``;
        ``score`` is cast to ``int``, the other columns to ``str``.
        """
        rows = self.get_substances(_aspect, n)
        df = pd.DataFrame(
            data={
                "substance": [name for _score, name, _source in rows],
                "score": [score for score, _name, _source in rows],
                "health_aspect": [source for _score, _name, source in rows],
            }
        )
        return df.astype({"substance": str, "score": int, "health_aspect": str})

    # ------------------------------------------------------------------
    # Overviews
    # ------------------------------------------------------------------
    def get_all_conditions(self):
        """Return ``(frequency, condition, alias_count)`` for all conditions.

        The list is sorted by frequency in descending order.
        """
        return self._rank_by_frequency(self.conditions)

    def get_all_conditions_df(self):
        """Return the 100 most frequent conditions as a DataFrame.

        The columns are ``Condition``, ``Frequency`` and ``Alias``.
        """
        return self._ranking_df(self.get_all_conditions(), "Condition")

    def get_all_benefits(self):
        """Return ``(frequency, benefit, alias_count)`` for all benefits.

        The list is sorted by frequency in descending order.
        """
        return self._rank_by_frequency(self.benefits)

    def get_all_benefits_df(self):
        """Return the 100 most frequent benefits as a DataFrame.

        The columns are ``Benefit``, ``Frequency`` and ``Alias``.
        """
        return self._ranking_df(self.get_all_benefits(), "Benefit")
class HealthseaPipe:
    """Helpers for working with documents processed by the Healthsea pipeline."""

    # Get Clauses and their predictions
    def get_clauses(self, doc):
        """Rebuild every clause of ``doc`` as a standalone ``Doc``.

        Each item of ``doc._.clauses`` is read for ``"split_indices"`` (start
        and end of the clause slice) and ``"has_ent"``. When ``"has_ent"`` is
        truthy, the tokens inside ``"ent_indices"`` are collapsed into a
        single word: the clause's ``"blinder"`` text with ``<`` and ``>``
        removed. All other tokens are copied with their trailing whitespace.

        Returns:
            A list with one new ``Doc`` (sharing ``doc.vocab``) per clause.
        """
        rebuilt = []
        for clause in doc._.clauses:
            bounds = clause["split_indices"]
            span = doc[bounds[0] : bounds[1]]

            if not clause["has_ent"]:
                # No entity in this clause: copy the tokens verbatim.
                words = [token.text for token in span]
                spaces = [token.whitespace_ for token in span]
            else:
                words, spaces = [], []
                for token in span:
                    position = token.i
                    if position == clause["ent_indices"][0]:
                        # First entity token: emit the blinder placeholder.
                        placeholder = clause["blinder"].replace(">", "")
                        words.append(placeholder.replace("<", ""))
                        spaces.append(True)
                        continue
                    entity_positions = range(
                        clause["ent_indices"][0], clause["ent_indices"][1]
                    )
                    if position in entity_positions:
                        # Remaining entity tokens are swallowed by the blinder.
                        continue
                    words.append(token.text)
                    spaces.append(token.whitespace_)

            rebuilt.append(Doc(doc.vocab, words=words, spaces=spaces))
        return rebuilt