|
import pandas as pd |
|
import difflib |
|
|
|
import numpy as np |
|
from numpy import dot |
|
from numpy.linalg import norm |
|
|
|
from pyvis.network import Network |
|
import streamlit.components.v1 as components |
|
|
|
# Maps a node's nesting level (0-11) to the hex colour used when drawing it.
# Position in the tuple is the level.
color_code_node = dict(
    enumerate(
        (
            '#4B9EFF',
            '#4BD4FF',
            '#3CDFCB',
            '#37DF8E',
            '#A0C159',
            '#CA804B',
            '#CA524B',
            '#CA4B97',
            '#C04BCA',
            '#5D4BCA',
            '#213ABA',
            '#0E6697',
        )
    )
)
|
|
|
class HealthseaSearch:
    """Query helper over the health-aspect, product, condition and benefit tables.

    The four tables are plain mappings supplied by the caller:

    * ``health_aspects[key]`` exposes ``"name"``, ``"alias"`` (keys of other
      health aspects), ``"products"`` and ``"substance"``. The latter two are
      sequences of entries whose item ``0`` is a score and item ``1`` an
      identifier.
    * ``products[product_id]`` exposes ``"name"``, ``"rating"`` and
      ``"review_count"``.
    * ``conditions[key]`` and ``benefits[key]`` expose ``"frequency"``.

    Aspect names given by callers are normalised by replacing spaces with
    underscores. Unknown names are resolved to the closest known key with
    ``difflib.get_close_matches``.
    """

    def __init__(self, _health_aspects, _products, _conditions, _benefits):
        """Store references to the four lookup tables (no copies are made)."""
        self.health_aspects = _health_aspects
        self.products = _products
        self.conditions = _conditions
        self.benefits = _benefits

    def __call__(self, query):
        """Return ``query`` unchanged."""
        return query

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _resolve_key(self, raw_name, *tables):
        """Resolve ``raw_name`` to ``(key, table)`` for the first table holding it.

        Exact matches are tried in table order first; only then is a fuzzy
        match attempted, again in table order.

        Raises:
            IndexError: if no table has an exact or close match. IndexError is
                what the previous ``get_close_matches(...)[0]`` lookup raised,
                so it is kept for callers that already catch it.
        """
        key = raw_name.replace(" ", "_")
        for table in tables:
            if key in table:
                return key, table
        for table in tables:
            # Match against the actual key, not a string literal.
            close = difflib.get_close_matches(key, list(table), n=1)
            if close:
                return close[0], table
        raise IndexError(f"no entry matching {raw_name!r}")

    def _collect_ranked(self, _aspect, n, field, payload, excluded=()):
        """Gather scored entries of ``field`` for an aspect and its aliases.

        For the resolved aspect and then each of its aliases, the first ``n``
        entries of ``field`` are read (all of them when ``n == 0``). Entries
        whose identifier is in ``excluded`` or was already collected are
        skipped. ``payload(identifier)`` provides the middle element of each
        resulting ``(score, payload, aspect_key)`` tuple.
        """
        key, _ = self._resolve_key(_aspect, self.health_aspects)
        aspect = self.health_aspects[key]

        seen = set()
        ranked = []
        for source_key in (key, *aspect["alias"]):
            scoring = self.health_aspects[source_key][field]
            if n != 0:
                # Slicing already copes with n larger than the sequence.
                scoring = scoring[:n]
            for entry in scoring:
                score, ident = entry[0], entry[1]
                if ident in excluded or ident in seen:
                    continue
                seen.add(ident)
                ranked.append((score, payload(ident), source_key))

        # NOTE(review): truncation happens before sorting (as it always has),
        # so entries from the requested aspect take precedence over alias
        # entries regardless of score. Confirm this is the intended ranking.
        if n != 0:
            ranked = ranked[:n]
        return sorted(ranked, key=lambda tup: tup[0], reverse=True)

    def _frequency_rows(self, table):
        """Return ``(frequency, key, alias_count)`` rows, highest frequency first.

        ``alias_count`` is the number of aliases of the health aspect with the
        same key, or ``0`` when no such health aspect exists.
        """
        rows = []
        for key in table:
            if key in self.health_aspects:
                alias_count = len(self.health_aspects[key]["alias"])
            else:
                alias_count = 0
            rows.append((table[key]["frequency"], key, alias_count))
        return sorted(rows, key=lambda tup: tup[0], reverse=True)

    @staticmethod
    def _frequency_frame(rows, label):
        """Build a DataFrame with columns ``label``, ``Frequency``, ``Alias``."""
        data = {
            label: [row[1] for row in rows],
            "Frequency": [row[0] for row in rows],
            "Alias": [row[2] for row in rows],
        }
        datatypes = {"Frequency": int, label: str, "Alias": int}
        return pd.DataFrame(data=data).astype(datatypes)

    @staticmethod
    def _render(net):
        """Write ``net`` to ``viz.html`` and embed that file in the Streamlit page."""
        net.save_graph("viz.html")
        # Context manager so the handle is closed after reading.
        with open("viz.html", 'r', encoding='utf-8') as html_file:
            source_code = html_file.read()
        components.html(source_code, height=500, width=700)

    # ------------------------------------------------------------------
    # Products
    # ------------------------------------------------------------------
    def get_products(self, _aspect, n):
        """Return scored products for an aspect and its aliases.

        Args:
            _aspect: Aspect name; spaces are treated as underscores and an
                unknown name is fuzzy-matched against the known keys.
            n: Maximum number of results and per-aspect read limit; ``0``
                means no limit.

        Returns:
            A list of ``(score, product_record, aspect_key)`` tuples sorted by
            descending score, each product appearing at most once.

        Raises:
            IndexError: if the aspect cannot be resolved.
        """
        return self._collect_ranked(
            _aspect, n, "products", lambda product_id: self.products[product_id]
        )

    def get_products_df(self, _aspect, n):
        """Return ``get_products`` results as a DataFrame.

        Columns: ``product``, ``score`` (cast to int), ``health_aspect``,
        ``rating`` (cast to str) and ``reviews`` (cast to int).
        """
        product_list = self.get_products(_aspect, n)
        product_data = {
            "product": [item[1]["name"] for item in product_list],
            "score": [item[0] for item in product_list],
            "health_aspect": [item[2] for item in product_list],
            "rating": [item[1]["rating"] for item in product_list],
            "reviews": [item[1]["review_count"] for item in product_list],
        }
        datatypes = {
            "product": str,
            "score": int,
            "health_aspect": str,
            "rating": str,
            "reviews": int,
        }
        return pd.DataFrame(data=product_data).astype(datatypes)

    # ------------------------------------------------------------------
    # Aspect lookup
    # ------------------------------------------------------------------
    def get_aspect(self, _aspect):
        """Return the health-aspect record for ``_aspect`` (exact or closest key).

        Raises:
            IndexError: if nothing matches.
        """
        key, table = self._resolve_key(_aspect, self.health_aspects)
        return table[key]

    def get_aspect_meta(self, _aspect):
        """Return the condition or benefit record for ``_aspect``.

        Exact matches are looked up in conditions, then benefits. Failing
        that, the closest condition key is used, then the closest benefit key.

        Raises:
            IndexError: if nothing matches.
        """
        key, table = self._resolve_key(_aspect, self.conditions, self.benefits)
        return table[key]

    # ------------------------------------------------------------------
    # Visualisation
    # ------------------------------------------------------------------
    def pyvis(self, vectors):
        """Render a similarity graph for ``vectors`` in the Streamlit page.

        Args:
            vectors: Sequence of entries whose item ``0`` is a node label and
                item ``1`` a vector. The first entry is drawn as the
                highlighted node. Every pair of entries is connected by an
                edge carrying their cosine similarity.
        """
        net = Network(height='500px', width='700px', bgcolor="#0E1117", font_color="#ffffff")
        net.barnes_hut(gravity=-2500)
        net.set_edge_smooth("dynamic")
        net.toggle_stabilization(False)

        # All nodes are added before any edge; the first one is highlighted.
        for position, vector in enumerate(vectors):
            if position == 0:
                net.add_node(vector[0], label=vector[0], color="#4EA0DB", value=100, shape="circle")
            else:
                net.add_node(vector[0], label=vector[0], color="#FE51B9", value=70, shape="circle")

        for position, current_vector in enumerate(vectors):
            for other_vector in vectors[position + 1:]:
                # Plain float keeps the value serialisable whatever the dtype.
                sim = float(self.calculate_cosine_sim(current_vector[1], other_vector[1]))
                # Edges leaving the first node are drawn at full strength,
                # all others at half.
                strength = sim if position == 0 else sim / 2
                net.add_edge(current_vector[0], other_vector[0], weight=sim, value=strength, title=sim)

        self._render(net)

    def get_recursive_alias(self, _aspect, n, node_list, edge_list, _max):
        """Walk the alias graph from ``_aspect``, filling the given containers.

        Args:
            _aspect: Aspect name to start from (resolved via ``get_aspect``).
            n: Level assigned to this aspect if it is not yet in ``node_list``.
            node_list: Dict mutated in place; maps aspect name to
                ``{"level": int}``.
            edge_list: List mutated in place; receives
                ``(aspect_name, alias, level)`` tuples.
            _max: Aliases are only followed while ``n <= _max``.

        Returns:
            The same ``(node_list, edge_list)`` objects that were passed in.
        """
        aspect = self.get_aspect(_aspect)
        aspect_name = aspect["name"].replace(" ", "_")

        if aspect_name not in node_list:
            node_list[aspect_name] = {"level": n}

        if n <= _max:
            for alias in aspect["alias"]:
                # Visited aliases are skipped, which also stops cycles.
                if alias not in node_list:
                    edge_list.append((aspect_name, alias, n))
                    self.get_recursive_alias(alias, n + 1, node_list, edge_list, _max)

        return node_list, edge_list

    def add_to_network(self, network, node_list, edge_list):
        """Add the nodes and edges from ``get_recursive_alias`` to ``network``.

        Node size shrinks by 15 per level and edge weight by 0.15 per level.
        Colours come from ``color_code_node`` and wrap around when the level
        exceeds the palette size instead of raising ``KeyError``.
        """
        palette_size = len(color_code_node)
        for node, meta in node_list.items():
            level = meta["level"]
            network.add_node(
                node,
                label=node,
                color=color_code_node[level % palette_size],
                value=100 - (15 * level),
                shape="dot",
                title=str(level),
            )

        for edge in edge_list:
            network.add_edge(edge[0], edge[1], value=1 - (0.15 * edge[2]))

    def pyvis2(self, node_list, edge_list):
        """Render the alias graph described by ``node_list`` and ``edge_list``."""
        net = Network(height='500px', width='700px', bgcolor="#0E1117", font_color="#ffffff")
        net.barnes_hut(gravity=-2500 - (len(node_list) * 2))
        net.set_edge_smooth("dynamic")

        self.add_to_network(net, node_list, edge_list)
        self._render(net)

    def calculate_cosine_sim(self, a, b):
        """Return the cosine similarity of ``a`` and ``b``.

        Returns ``0.0`` when either vector has zero norm, rather than dividing
        by zero and producing NaN.
        """
        denominator = norm(a) * norm(b)
        if denominator == 0:
            return 0.0
        return dot(a, b) / denominator

    # ------------------------------------------------------------------
    # Substances
    # ------------------------------------------------------------------
    def get_substances(self, _aspect, n):
        """Return scored substances for an aspect and its aliases.

        Same contract as ``get_products`` except that the middle tuple element
        is the substance identifier itself, and the substances ``sodium``,
        ``sugar`` and ``sugar_alcohol`` are always left out. Exclusion happens
        after the per-aspect limit is applied, so fewer than ``n`` results may
        be returned.

        Raises:
            IndexError: if the aspect cannot be resolved.
        """
        exclude = ["sodium", "sugar", "sugar_alcohol"]
        return self._collect_ranked(
            _aspect, n, "substance", lambda substance: substance, excluded=exclude
        )

    def get_substances_df(self, _aspect, n):
        """Return ``get_substances`` results as a DataFrame.

        Columns: ``substance``, ``score`` (cast to int) and ``health_aspect``.
        """
        substance_list = self.get_substances(_aspect, n)
        substance_data = {
            "substance": [item[1] for item in substance_list],
            "score": [item[0] for item in substance_list],
            "health_aspect": [item[2] for item in substance_list],
        }
        datatypes = {"substance": str, "score": int, "health_aspect": str}
        return pd.DataFrame(data=substance_data).astype(datatypes)

    # ------------------------------------------------------------------
    # Conditions / benefits overview
    # ------------------------------------------------------------------
    def get_all_conditions(self):
        """Return ``(frequency, condition_key, alias_count)`` tuples, most frequent first."""
        return self._frequency_rows(self.conditions)

    def get_all_conditions_df(self):
        """Return the 100 most frequent conditions as a DataFrame.

        Columns: ``Condition``, ``Frequency`` and ``Alias``.
        """
        return self._frequency_frame(self.get_all_conditions()[:100], "Condition")

    def get_all_benefits(self):
        """Return ``(frequency, benefit_key, alias_count)`` tuples, most frequent first."""
        return self._frequency_rows(self.benefits)

    def get_all_benefits_df(self):
        """Return the 100 most frequent benefits as a DataFrame.

        Columns: ``Benefit``, ``Frequency`` and ``Alias``.
        """
        return self._frequency_frame(self.get_all_benefits()[:100], "Benefit")
|
|