Spaces:
Sleeping
Sleeping
import json
import os
import time
import unicodedata
import urllib
import urllib.error
import urllib.request
from urllib.parse import quote
from urllib.request import Request

import pandas as pd
import requests
import spacy
import tiktoken
from bs4 import BeautifulSoup
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import DataFrameLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnableParallel
from langchain.text_splitter import CharacterTextSplitter
# from langchain.vectorstores.deeplake import DeepLake
from openai import OpenAI
class MLSalesPitch:
    """Collect Mercado Livre product data and generate sales pitches with an LLM.

    The pipeline visible in this class is:

    1. ``get_ml_product_descriptions`` downloads product titles/descriptions.
    2. ``enrich_with_google_search`` adds an ``about`` text scraped from Google.
    3. ``cleans_and_preprocesses_the_data`` cleans and truncates the texts.
    4. ``embedding`` opens (and optionally fills) two DeepLake vector stores.
    5. ``generate_sales_pitch`` retrieves context and asks the chat model.
    """

    # File locations used by the data-collection steps.
    CATEGORIES_PATH = '/data/ml_categories.json'
    PRODUCTS_CSV = '/data/mercado_livre_products.csv'
    ENRICHED_CSV = '/data/mercado_livre_products_enriched_with_google_about.csv'

    # Texts are cut so that they stay strictly below this many tokens.
    MAX_TOKENS = 1000
    # Paging limits for the Mercado Livre search endpoint.
    MAX_SEARCH_OFFSET = 1000
    PAGE_SIZE = 50
    # Seconds before an HTTP request is abandoned instead of hanging forever.
    REQUEST_TIMEOUT = 30

    # Prompt sent to the chat model (runtime text, intentionally in Portuguese).
    PROMPT_TEMPLATE = (
        "Com base nas seguintes informações de produtos fornecidas abaixo:\n"
        "{about}\n"
        "Crie um discurso muito convincente e interessante de venda para os seguintes produtos:\n"
        "{products}\n"
        "Que fazem parte da sub_categoria:\n"
        "{sub_category}\n"
        "Pontue bem as vantagens dos produtos e suas caracteristicas bem como a grande oportunidade que o cliente está tendo em adquiri-los\n"
        "Tenha como base os seguintes discursos de venda:\n"
        "{sales_pitch}\n"
        "Não fique preso apenas um discurso de venda. Leve mais em consideração a construção dos discrusos bem como as vantagens, caracterisitcas e descrição dos produtos.\n"
        "Adicione preços para os produtos como variáveis com prefixo _PRECO_\n"
        "Não se identifique e não coloque o nome da empresa\n"
    )

    # Lazily created tiktoken encoding, shared by all instances.
    _token_encoding = None

    def __init__(self):
        """Create the OpenAI/LangChain clients and the prompt template.

        Raises:
            KeyError: if the ``OPENAI_KEY`` environment variable is not set.
        """
        self.retriever_sales_pitch = None
        self.retriever_about = None
        # Optional Mercado Livre token; stays None when the variable is unset.
        self.TOKEN_ML = os.environ.get('TOKEN_ML')
        self.OPENAI_API_KEY = os.environ['OPENAI_KEY']
        # The key is passed explicitly: the client otherwise only looks at the
        # OPENAI_API_KEY environment variable, which this project does not use.
        self.client = OpenAI(api_key=self.OPENAI_API_KEY)
        # The spaCy pipeline is loaded on first use by __text_to_chunks.
        self.nlp = None
        self.output_parser = StrOutputParser()
        self.model = ChatOpenAI(openai_api_key=self.OPENAI_API_KEY, model="gpt-3.5-turbo")
        self.prompt = ChatPromptTemplate.from_template(self.PROMPT_TEMPLATE)

    def get_ml_product_descriptions(self):
        """Download products of every sub-category and append them to the products CSV.

        For each sub-category listed in ``CATEGORIES_PATH`` the search endpoint
        is paged in steps of ``PAGE_SIZE`` up to ``MAX_SEARCH_OFFSET``; every
        result's description is fetched and the accumulated table is written
        to ``PRODUCTS_CSV`` after each page, so an interrupted run keeps what
        was already downloaded.
        """
        with open(self.CATEGORIES_PATH, 'r', encoding='utf-8') as f:
            categories_json = json.load(f)
        # Start from the existing table when there is one, otherwise from scratch.
        if os.path.exists(self.PRODUCTS_CSV):
            df = pd.read_csv(self.PRODUCTS_CSV)
        else:
            df = pd.DataFrame()
        headers = {'Authorization': f'Bearer {self.TOKEN_ML}'} if self.TOKEN_ML else {}

        for category in categories_json.values():
            for sub_category in category['children_categories']:
                # Advance by exactly one page; the previous "+ limit + 1" step
                # skipped one product between consecutive pages.
                for offset in range(0, self.MAX_SEARCH_OFFSET, self.PAGE_SIZE):
                    try:
                        ans = requests.get(
                            "https://api.mercadolibre.com/sites/MLB/search",
                            params={
                                'category': sub_category['id'],
                                'search_type': 'scan',
                                'offset': offset,
                                'limit': self.PAGE_SIZE,
                            },
                            headers=headers,
                            timeout=self.REQUEST_TIMEOUT,
                        )
                    except requests.RequestException as error:
                        print(f'FAIL! {error}')
                        continue
                    if not ans.ok:
                        print(f'FAIL! Error {ans.status_code}: {ans.content}')
                        continue

                    data_ans = ans.json()
                    results = data_ans['results']
                    if not results:
                        break
                    # Checked after the empty-results test so that a total of
                    # zero can never be used as a divisor.
                    total = int(data_ans['paging']['total'])
                    progress = 100.0 * (offset / total) if total else 100.0
                    print(f"[{sub_category['name']}]: {progress}"
                          f" downloaded...")

                    lt_prod_info = [{'id': info['id'], 'title': info['title']} for info in results]
                    for info in lt_prod_info:
                        description = self._fetch_description(info['id'], headers)
                        if description is not None:
                            info['description'] = description

                    df_tmp = pd.DataFrame(lt_prod_info)
                    df_tmp['category'] = category['name']
                    df_tmp['sub_category'] = sub_category['name']
                    df = pd.concat([df, df_tmp])
                    df.to_csv(self.PRODUCTS_CSV, header=True, index=False)

    def _fetch_description(self, item_id, headers):
        """Return the ``plain_text`` description of one item, or None if unavailable."""
        try:
            resp = requests.get(f"https://api.mercadolibre.com/items/{item_id}/description",
                                headers=headers, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as error:
            print(error)
            return None
        if not resp.ok:
            return None
        return resp.json().get('plain_text')

    @staticmethod
    def enrich_with_google_search():
        """Fill the ``about`` column of the products table from Google results.

        Reads ``PRODUCTS_CSV``, looks up every distinct title once, stores the
        scraped text in rows whose ``about`` is still empty and rewrites
        ``ENRICHED_CSV`` after each title (a checkpoint for long runs).
        Declared static because it uses no instance state; it can be called on
        the class or on an instance.
        """
        df = pd.read_csv(MLSalesPitch.PRODUCTS_CSV, low_memory=False)
        if 'about' not in df.columns:
            df['about'] = None
        # Object dtype so text can be stored even if the column was read as all-NaN floats.
        df['about'] = df['about'].astype(object)
        df.to_csv(MLSalesPitch.ENRICHED_CSV, header=True, index=False)

        p_names = df['title'].dropna().unique().tolist()
        for k, name in enumerate(p_names, start=1):
            about = MLSalesPitch._fetch_google_about(str(name))
            # Only rows without an existing "about" are filled (NaN-aware test).
            missing = (df['title'] == name) & df['about'].isna()
            df.loc[missing, 'about'] = about
            df.to_csv(MLSalesPitch.ENRICHED_CSV, header=True, index=False)
            print(f'[{k} of {len(p_names)}]: {(k / len(p_names)) * 100.0}% completed')
            time.sleep(1)

    @staticmethod
    def _fetch_google_about(name, retry_on_429=True):
        """Return the comma-joined snippet texts Google shows for *name* ('' on failure).

        An HTTP 429 answer arrives as ``HTTPError``; in that case the method
        waits for the ``Retry-After`` delay (60 s if absent or not a number)
        and retries once.
        """
        url = 'https://www.google.com/search?q=' + quote('sobre ou descrição: ' + name)
        req = Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        try:
            with urllib.request.urlopen(req, timeout=MLSalesPitch.REQUEST_TIMEOUT) as response:
                content = response.read().decode('UTF-8').replace(u'\xa0', u' ')
        except urllib.error.HTTPError as error:
            if error.code == 429 and retry_on_429:
                try:
                    wait = int((error.headers or {}).get('Retry-After', 60))
                except (TypeError, ValueError):
                    wait = 60
                print(f'Sleeping {wait} seconds...')
                time.sleep(wait)
                return MLSalesPitch._fetch_google_about(name, retry_on_429=False)
            print(error)
            return ''
        except (OSError, ValueError) as error:
            # Best effort: network and decoding problems leave the text empty.
            print(error)
            return ''
        soup = BeautifulSoup(content, 'html.parser')
        div_bs4 = soup.find_all('div', {"class": "BNeawe s3v9rd AP7Wnd"})
        return ', '.join(p.get_text() for p in div_bs4)

    def cleans_and_preprocesses_the_data(self) -> "pd.DataFrame":
        """Load the enriched CSV and build the texts that get embedded.

        Rows lacking a description or an about text are dropped, descriptions
        are cleaned, both texts are truncated below ``MAX_TOKENS`` tokens, rows
        are ordered by description token count (largest first) and the
        ``sales_pitch`` / ``about`` documents are composed.

        Returns:
            DataFrame with columns ``title``, ``category``, ``sub_category``,
            ``sales_pitch`` and ``about``.
        """
        df_ml = pd.read_csv(self.ENRICHED_CSV, low_memory=False)
        # copy() so the assignments below act on an independent frame.
        df_ml = df_ml.dropna(subset=['description', 'about']).copy()
        df_ml['description'] = (
            df_ml['description'].astype(str).map(self.__clean_txt).map(self.__truncate)
        )
        df_ml['about'] = df_ml['about'].astype(str).map(self.__truncate)
        df_ml['size'] = df_ml['description'].map(self.__count_tokens)
        df_ml = df_ml.sort_values(by=['size'], ascending=False).reset_index(drop=True)
        df_ml['sales_pitch'] = [
            f'Nome do produto:{title}\nCategoria do produto:{sub}\nSugestão de como vender:{text}'
            for title, sub, text in zip(df_ml['title'], df_ml['sub_category'], df_ml['description'])
        ]
        df_ml['about'] = [
            f'Nome do produto:{title}\nCategoria do produto:{sub}\nSobre o produto:{text}'
            for title, sub, text in zip(df_ml['title'], df_ml['sub_category'], df_ml['about'])
        ]
        # NOTE: summarising these texts with __chat_gpt_summarize is currently disabled.
        return df_ml[['title', 'category', 'sub_category', 'sales_pitch', 'about']]

    def embedding(self, df_ml: "pd.DataFrame | None" = None, add_docs=False):
        """Open the two DeepLake stores and expose them as retrievers.

        Args:
            df_ml: table with ``sales_pitch`` and ``about`` columns; only read
                when ``add_docs`` is true. ``None`` means an empty table.
            add_docs: when true, the rows of ``df_ml`` are split into chunks
                and added to the stores before the retrievers are created.
        """
        if df_ml is None:
            df_ml = pd.DataFrame()
        docs_sales_pitch = None
        docs_about = None
        if add_docs:
            text_splitter = CharacterTextSplitter(chunk_size=2000, separator='\n', chunk_overlap=0)
            # Each column is loaded once; loading and then extending with
            # load_and_split() stored every document twice.
            docs_sales_pitch = text_splitter.split_documents(
                DataFrameLoader(df_ml, page_content_column="sales_pitch").load())
            # The "about" store is built from the "about" documents.
            docs_about = text_splitter.split_documents(
                DataFrameLoader(df_ml, page_content_column="about").load())

        embeddings = HuggingFaceEmbeddings()
        from langchain.vectorstores.deeplake import DeepLake
        # The stores must be writable when documents are going to be added.
        read_only = not add_docs
        vector_store_sales_pitch = DeepLake(
            dataset_path="data/my_deeplake/sales_pitch/", embedding_function=embeddings,
            read_only=read_only)
        vector_store_about = DeepLake(
            dataset_path="data/my_deeplake/about/", embedding_function=embeddings,
            read_only=read_only)
        if add_docs:
            vector_store_sales_pitch.add_documents(docs_sales_pitch)
            vector_store_about.add_documents(docs_about)
        self.retriever_sales_pitch = vector_store_sales_pitch.as_retriever()
        self.retriever_about = vector_store_about.as_retriever()

    def generate_sales_pitch(self, query: dict) -> str:
        """Generate a sales pitch for ``query``.

        Args:
            query: dict with the keys ``products`` and ``sub_category``.

        Returns:
            The text produced by the chat model.

        Raises:
            RuntimeError: if ``embedding()`` has not been called yet.
        """
        if self.retriever_sales_pitch is None or self.retriever_about is None:
            raise RuntimeError("embedding() must be called before generate_sales_pitch()")
        chain = RunnableParallel({
            # Only the single most relevant document of each store is used.
            "sales_pitch": lambda x: self.retriever_sales_pitch.get_relevant_documents(x["products"])[0:1],
            "about": lambda x: self.retriever_about.get_relevant_documents(x["products"])[0:1],
            "products": lambda x: x["products"],
            "sub_category": lambda x: x["sub_category"]
        }) | self.prompt | self.model | self.output_parser
        return chain.invoke(query)

    @staticmethod
    def __count_tokens(text):
        """Return the number of gpt-3.5-turbo tokens in *text*."""
        # Static so that both self.__count_tokens(x) and a direct call work;
        # the encoding is built once and reused.
        if MLSalesPitch._token_encoding is None:
            MLSalesPitch._token_encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
        return len(MLSalesPitch._token_encoding.encode(text))

    def __text_to_chunks(self, text, max_words=2000):
        """Group the sentences of *text* into chunks of at most *max_words* words.

        Returns:
            A list of chunks, each a list of sentence strings. There is always
            at least one chunk (empty when *text* has no sentences).
        """
        if self.nlp is None:
            # Loaded on first use; this is the model named in the original setup code.
            self.nlp = spacy.load("pt_core_news_sm")
        chunks = [[]]
        chunk_total_words = 0
        for sentence in self.nlp(text).sents:
            n_words = len(sentence.text.split(" "))
            chunk_total_words += n_words
            # A new chunk is opened only when the current one has content, so
            # an oversized first sentence does not leave an empty chunk behind.
            if chunk_total_words > max_words and chunks[-1]:
                chunks.append([])
                chunk_total_words = n_words
            chunks[-1].append(sentence.text)
        return chunks

    def __chat_gpt_summarize(self, text):
        """Ask the completion model for a summary of *text* in at most five sentences."""
        prompt = f"Resuma o seguinte texto em no máximo 5 frases:\n{text}"
        response = self.client.completions.create(
            model="gpt-3.5-turbo-instruct",
            prompt=prompt,
            temperature=0.3,
            max_tokens=150,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=1
        )
        return response.choices[0].text

    def __summarize_text(self, text):
        """Return a summary of the first chunk of *text*.

        NOTE(review): the original loop stopped unconditionally after the
        first chunk, so later chunks were never summarised. That behaviour is
        kept here; confirm whether all chunks should be summarised.
        """
        first_chunk = self.__text_to_chunks(text)[0]
        return self.__chat_gpt_summarize(" ".join(first_chunk))

    def __truncate(self, text):
        """Return *text* cut at the position chosen by __find_best_position_to_cut."""
        return text[0: self.__find_best_position_to_cut(text) + 1]

    def __find_best_position_to_cut(self, text, max_tokens=None, count_tokens=None):
        """Return the index of the last character to keep to stay under the token limit.

        The result ``p`` is such that ``text[0:p + 1]`` is the longest prefix
        found by binary search whose token count is strictly below the limit
        (``-1`` when nothing can be kept).

        Args:
            text: the string to cut.
            max_tokens: token limit; defaults to ``MAX_TOKENS``.
            count_tokens: callable returning the token count of a string;
                defaults to the tiktoken-based counter of this class.
        """
        limit = self.MAX_TOKENS if max_tokens is None else max_tokens
        count = self.__count_tokens if count_tokens is None else count_tokens
        if not text:
            return -1
        if count(text) < limit:
            # Common case: the whole text already fits.
            return len(text) - 1
        # Search over prefix lengths; the full length is known not to fit.
        lo, hi = 0, len(text) - 1
        best = 0
        while lo <= hi:
            mid = (lo + hi) // 2
            if count(text[:mid]) < limit:
                best = mid
                lo = mid + 1
            else:
                hi = mid - 1
        return best - 1

    @staticmethod
    def __clean_txt(txt):
        """Collapse repeated newlines, dashes, spaces and underscores in *txt*.

        Also removes lines consisting only of ``_`` or a single space. The
        replacements are repeated until the text stops changing.
        """
        replacements = (
            ('\n\n', '\n'),
            ('--', '-'),
            # Two spaces -> one; replacing a space by itself would never terminate.
            ('  ', ' '),
            ('__', '_'),
            ('\n_\n', '\n'),
            ('\n \n', '\n'),
        )
        previous = None
        while previous != txt:
            previous = txt
            for old, new in replacements:
                while old in txt:
                    txt = txt.replace(old, new)
        return txt