import os import time import json import joblib import math import itertools import argparse import multiprocessing as mp import pandas as pd from dotenv import load_dotenv from serpapi import GoogleSearch import tiktoken from openai import OpenAI from tqdm import tqdm from model import llm from utils import parse_json_garbage load_dotenv() ORGANIZATION_ID = os.getenv('OPENAI_ORGANIZATION_ID') SERP_API_KEY = os.getenv('SERP_APIKEY') def get_leads( file_path: str, names: list = ['營業地址', '統一編號', '總機構統一編號', '營業人名稱', '資本額', '設立日期', '組織別名稱', '使用統一發票', '行業代號', '名稱', '行業代號1', '名稱1', '行業代號2', '名稱2', '行業代號3', '名稱3']): """ """ assert os.path.exists(file_path) data = pd.read_csv( file_path, names=names) return data def get_serp( query: str, google_domain: str, gl: str, lr: str) -> dict: """ """ results = [] search = GoogleSearch({ "q": query, 'google_domain': google_domain, 'gl': gl, 'lr': lr, "api_key": SERP_API_KEY }) result = search.get_dict() # print(result['organic_results'][0]) # return result['organic_results'][0] return result def get_condensed_result(result): """ Argument result Return condensed_result: Example: result['knowledge_graph'].keys() # 'title', 'thumbnail', 'type', 'entity_type', 'kgmid', 'knowledge_graph_search_link', 'serpapi_knowledge_graph_search_link', 'tabs', 'place_id', 'directions', 'local_map', 'rating', 'review_count', '服務項目', '地址', '地址_links', 'raw_hours', 'hours', '電話號碼', '電話號碼_links', 'popular_times', 'user_reviews', 'reviews_from_the_web', 'unclaimed_listing', '個人資料', '其他人也搜尋了以下項目', '其他人也搜尋了以下項目_link', '其他人也搜尋了以下項目_stick' """ filtered_results = [ {"title": r.get('title',""), 'snippet': r.get('snippet',"")} for r in result['organic_results'] ] if 'knowledge_graph' in result: if 'user_reviews' in result['knowledge_graph']: filtered_results.append( {'title': result['knowledge_graph']['title'], '顧客評價': "\t".join([ _.get('summary', '') for _ in result['knowledge_graph']['user_reviews']]) }) if '其他人也搜尋了以下項目' in result['knowledge_graph']: filtered_results.append( {'title': "類似的店", 'snippet': "\t".join([ str(_.get('extensions', '')) for _ in result['knowledge_graph']['其他人也搜尋了以下項目']]) }) if '暫停營業' in result['knowledge_graph']: filtered_results.append( {'status': '暫停營業' if result['knowledge_graph']['暫停營業'] else '營業中'}) if '電話號碼' in result['knowledge_graph']: filtered_results.append( {'telephone_number': result['knowledge_graph']['電話號碼']}) condensed_result = json.dumps(filtered_results, ensure_ascii=False) # print( condensed_results ) return condensed_result def compose_extraction( query, search_results, classes: list, provider: str, model: str): """ Argument query: str search_results: str system_prompt: str classes: list, `小吃店`, `日式料理(含居酒屋,串燒)`, `火(鍋/爐)`, `東南亞料理(不含日韓)`, `海鮮熱炒`, `特色餐廳(含雞、鵝、牛、羊肉)`, `傳統餐廳`, `燒烤`, `韓式料理(含火鍋,烤肉)`, `西餐廳(含美式,義式,墨式)`, `西餐廳(餐酒館、酒吧、飛鏢吧、pub、lounge bar)`, `西餐廳(土耳其、漢堡、薯條、法式、歐式、印度)` or `早餐` provider: "openai" model: "gpt-4-0125-preview" or 'gpt-3.5-turbo-0125' Return response: str """ classes = ", ".join([ "`"+x+"`" for x in classes if x!='早餐' ])+ " or " + "`早餐`" system_prompt = f''' As a helpful and rigorous retail analyst, given the provided query and a list of search results for the query, your task is to first identify relevant information of the identical store based on store name and proxmity of address if known. After that, extract `store_name`, `address`, `description`, `category` and `phone_number` from the found relevant information, where `category` can only be {classes}. It's very important to omit unrelated results. Do not make up any assumption. Please think step by step, and output in json format. An example output json is like {{"store_name": "...", "address": "...", "description": "... products, service or highlights ...", "category": "...", "phone_number": "..."}} If no relevant information has been found, simply output json with empty values. I'll tip you and guarantee a place in heaven you do a great job completely according to my instruction. ''' user_content = f"`query`: `{query}`\n`search_results`: {search_results}" response = llm( provider = provider, model = model, system_prompt = system_prompt, user_content = user_content ) return response def compose_classication( user_content, classes: list, backup_classes: list, provider: str, model: str) -> str: """ Argument client: evidence: str classes: list provider: e.g. 'openai' model: e.g. 'gpt-3.5-turbo-0125', 'gpt-4-0125-preview' Return response: str """ if isinstance(classes, list): classes = ", ".join([ f"`{x}`" for x in classes]) elif isinstance(classes, str): pass else: raise Exception(f"Incorrect classes type: {type(classes)}") system_prompt = f""" As a helpful and rigorous retail analyst, given the provided information about a store, your task is two-fold. First, classify provided evidence below into the mostly relevant category from the following: {classes}. Second, if no relevant information has been found, classify the evidence into the mostly relevant supercategory from the following: {backup_classes}. It's very important to omit unrelated piece of evidence and don't make up any assumption. Please think step by step, and must output in json format. An example output json is like {{"category": "..."}} If no relevant piece of information can ever be found at all, simply output json with empty string "". I'll tip you and guarantee a place in heaven you do a great job completely according to my instruction. """ response = llm( provider = provider, model = model, system_prompt = system_prompt, user_content = user_content, ) return response def classify_results( analysis_results: pd.DataFrame, classes: list, backup_classes: list, provider: str, model: str, input_column: str = 'evidence', output_column: str = 'classified_category', ): """Classify the results Argument analysis_results: dataframe input_column: str output_column: str classes: list Return analysis_results: dataframe """ classified_results = analysis_results.copy() labels, empty_indices = [], [] for idx, evidence in zip( analysis_results['index'], analysis_results[input_column]): try: user_content = f'''`evidence`: `{evidence}`''' pred_cls = compose_classication( user_content, classes=classes, backup_classes=backup_classes, provider=provider, model=model) label = parse_json_garbage(pred_cls)['category'] labels.append(label) except Exception as e: print(f"# CLASSIFICATION error: e -> {e}, user_content -> {user_content}, evidence: {evidence}") labels.append("") empty_indices.append(idx) classified_results[output_column] = labels return { "classified_results": classified_results, "empty_indices": empty_indices } def classify_results_mp( extracted_results: pd.DataFrame, classified_file_path: str, classes: list, backup_classes: list, provider: str, model: str, n_processes: int = 4): """ Argument extracted_results: classified_file_path: classes: e.g. ['小吃店', '日式料理(含居酒屋,串燒)', '火(鍋/爐)', '東南亞料理(不含日韓)', '海鮮熱炒', '特色餐廳(含雞、鵝、牛、羊肉)', '傳統餐廳', '燒烤', '韓式料理(含火鍋,烤肉)', '西餐廳(含美式,義式,墨式)'] backup_classes: e.g. [ '中式', '西式'] provider: model: n_processes: int Return classified_results: dataframe Reference 200 records, 4 processes, 122.4695s """ st = time.time() # classified_file_path = "data/classified_result.joblib" if not os.path.exists(classified_file_path): split_data = split_dataframe(extracted_results) with mp.Pool(args.n_processes) as pool: classified_results = pool.starmap( classify_results, [ ( d, classes, backup_classes, provider, model, 'evidence', 'classified_category', ) for d in split_data] ) classified_results = merge_results( classified_results, dataframe_columns=['classified_results'], list_columns=['empty_indices']) with open( classified_file_path, "wb") as f: joblib.dump( classified_results, f) else: with open( classified_file_path, "rb") as f: classified_results = joblib.load(f) print( f"total time: {time.time() - st}") return classified_results def compose_query( address, name, with_index: bool = True, exclude: str = "-inurl:twincn.com -inurl:findcompany.com.tw -inurl:iyp.com.tw -inurl:twypage.com -inurl:alltwcompany.com -inurl:zhupiter.com -inurl:twinc.com.tw"): """ Argumemnt # d: series with d[1]: 地址, d[4]: 營業人名稱 # address: str name: str with_index: bool Return query: `縣市` `營業人名稱` """ # if with_index: # .itertuples() # query = f"{d[1][:3]} {d[4]}" # else: # query = f"{d[0][:3]} {d[3]}" query = f"{address[:3]} {name} {exclude}" return query def crawl_results( data: pd.DataFrame, google_domain: str = 'google.com.tw', gl: str = 'tw', lr: str = 'lang_zh-TW'): """ Argument data: dataframe google_domain: str gl: str lr: str Return crawled_results Reference 200 records, 4 processes, 171.36490321159363 """ serp_results = [] condensed_results = [] crawled_results = [] empty_indices = [] for i, d in tqdm(enumerate(data.itertuples())): idx = d[0] address = d[1] business_id = d[2] business_name = d[4] query = compose_query(address, business_name) try: res = get_serp( query, google_domain, gl, lr) serp_results.append(res) except: print( f"# SERP error: i = {i}, idx = {idx}, query = {query}") empty_indices.append(i) continue try: cond_res = get_condensed_result(res) condensed_results.append(cond_res) except: print(f"# CONDENSE error: i = {i}, idx = {idx}, res = {res}") empty_indices.append(i) continue crawled_results.append( { "index": idx, "business_id": business_id, "business_name": business_name, "serp": res, "evidence": cond_res, "address": address, } ) crawled_results = pd.DataFrame(crawled_results) return { "crawled_results": crawled_results, "empty_indices": empty_indices } def crawl_results_mp( data: pd.DataFrame, crawl_file_path: str, n_processes: int = 4): st = time.time() # crawl_file_path = "data/crawled_results.joblib" if not os.path.exists(crawl_file_path): split_data = split_dataframe( data ) with mp.Pool(n_processes) as pool: crawled_results = pool.map( crawl_results, split_data) crawled_results = merge_results( crawled_results, dataframe_columns=['crawled_results'], list_columns=['empty_indices']) with open( crawl_file_path, "wb") as f: joblib.dump( crawled_results, f) else: with open( crawl_file_path, "rb") as f: crawled_results = joblib.load(f) print( f"total time: {time.time() - st}") return crawled_results def extract_results( data: pd.DataFrame, classes: list, provider: str, model: str): """ Argument data: `evidence`, `result` Return extracted_results: dataframe of `extracted_evidence` """ extracted_results, empty_indices, ext_res = [], [], [] for i, d in tqdm(enumerate(data.itertuples())): idx = d[1] evidence = d.evidence business_id = d[2] business_name = d[3] address = d[6] ana_res = None query = compose_query( address, business_name) try: ext_res = compose_extraction( query = query, search_results = evidence, classes = classes, provider = provider, model = model) ext_res = parse_json_garbage(ext_res) except Exception as e: print(f"# ANALYSIS error: e = {e}, i = {i}, q = {query}, ext_res = {ext_res}") empty_indices.append(i) continue extracted_results.append( { "index": idx, "business_id": business_id, "business_name": business_name, "evidence": evidence, ** ext_res } ) extracted_results = pd.DataFrame(extracted_results) return { "extracted_results": extracted_results, "empty_indices": empty_indices } def extract_results_mp( crawled_results, extracted_file_path, classes: list, provider: str, model: str, n_processes: int = 4): """ Argument crawled_results: dataframe extracted_file_path classes: list Return Reference 200 records, 4 processes, 502.26914715766907 """ st = time.time() # args.extracted_file_path = "data/extracted_results.joblib" if not os.path.exists(extracted_file_path): split_data = split_dataframe( crawled_results) with mp.Pool(n_processes) as pool: extracted_results = pool.starmap( extract_results, [ (x, classes, provider, model) for x in split_data]) extracted_results = merge_results( extracted_results, dataframe_columns=['extracted_results'], list_columns=['empty_indices']) with open( extracted_file_path, "wb") as f: joblib.dump( extracted_results, f) else: with open( extracted_file_path, "rb") as f: extracted_results = joblib.load(f) print( f"total time: {time.time() - st}") return extracted_results def postprocess_result( results: pd.DataFrame, postprocessed_results_path, category_hierarchy: dict, column_name: str = 'category'): """ Argument analysis_result: `evidence`, `result` postprocessed_results_path Return """ # index = analysis_result['result']['index'] # store_name = data.loc[index]['營業人名稱'] if len(analysis_result['result'].get('store_name',''))==0 else analysis_result['result']['store_name'] # address = data.loc[index]['營業地址'] if len(analysis_result['result'].get('address',''))==0 else analysis_result['result']['address'] # post_res = { # "evidence": analysis_result['evidence'], # "index": index, # "begin_date": data.loc[index]['設立日期'], # "store_name": store_name, # "address": address, # "description": analysis_result['result'].get('description', ""), # "phone_number": analysis_result['result'].get('phone_number', ""), # "category": analysis_result['result'].get('category', ""), # "supercategory": category_hierarchy.get(analysis_result['result'].get('category', ""), analysis_result['result'].get('category',"")), # } if not os.path.exists(postprocessed_results_path): postprocessed_results = results.copy() postprocessed_results['supercategory'] = postprocessed_results[column_name].apply(lambda x: category_hierarchy.get(x, '')) with open( postprocessed_results_path, "wb") as f: joblib.dump( postprocessed_results, f) else: with open( postprocessed_results_path, "rb") as f: postprocessed_results = joblib.load(f) return postprocessed_results def combine_results( results: pd.DataFrame, combined_results_path: str, src_column: str = 'classified_category', tgt_column: str = 'category', strategy: str = 'replace'): """ Argument classified_results_df: dataframe combined_results_path src_column: str strategy: str, 'replace' or 'patch' Return combined_results: dataframe """ if not os.path.exists(combined_results_path): combined_results = results.copy() if strategy == 'replace': condition = (combined_results[tgt_column]=='') | (combined_results[src_column]!=combined_results[tgt_column]) combined_results.loc[ condition, tgt_column] = combined_results[condition][src_column].values elif strategy == 'patch': condition = (combined_results[tgt_column]=='') combined_results.loc[ condition, tgt_column] = combined_results[condition][src_column].values else: raise Exception(f"Strategy {strategy} not implemented") with open( combined_results_path, "wb") as f: joblib.dump( combined_results, f) else: with open( combined_results_path, "rb") as f: combined_results = joblib.load(f) return combined_results def format_evidence(evidence): """ """ formatted = [] evidence = json.loads(evidence) # print( len(evidence) ) for i in range(len(evidence)): if 'title' in evidence[i] and '顧客評價' in evidence[i]: f = f"\n> 顧客評價: {evidence[i]['顧客評價']}" elif 'title' in evidence[i] and evidence[i]['title']=='類似的店': f = f"\n> 類似的店: {evidence[i]['snippet']}" elif 'status' in evidence[i]: f = f"\n> 經營狀態: {evidence[i]['status']}" elif 'telephone_number' in evidence[i]: f = f"\n> 電話號碼: {evidence[i]['telephone_number']}" else: try: f = f"{i+1}. {evidence[i]['title']} ({evidence[i].get('snippet','')})" except KeyError: print( evidence[i] ) raise KeyError formatted.append(f) return "\n".join(formatted) def format_output( df: pd.DataFrame, input_column: str = 'evidence', output_column: str = 'formatted_evidence', format_func = format_evidence): """ Argument df: `evidence`, `result` input_column: output_column: format_func: Return formatted_df: dataframe of `formatted_evidence` """ formatted_df = df.copy() formatted_df[output_column] = formatted_df[input_column].apply(format_evidence) return formatted_df def merge_results( results: list, dataframe_columns: list, list_columns: list): """ Argument results: a list of dataframes dataframe_columns: list list_columns: list """ assert len(results) > 0, "No results to merge" merged_results = {} for result in results: for key in dataframe_columns: mer_res = pd.concat([ r[key] for r in results], ignore_index=True) merged_results[key] = mer_res for key in list_columns: mer_res = list(itertools.chain(*[ r[key] for r in results])) merged_results[key] = mer_res return merged_results def split_dataframe( df: pd.DataFrame, n_processes: int = 4) -> list: """ """ n = df.shape[0] n_per_process = math.ceil(n / n_processes) return [ df.iloc[i:i+n_per_process] for i in range(0, n, n_per_process)] def continue_missing(args): """ """ data = get_leads(args.data_path) n_data = data.shape[0] formatted_results_path = os.path.join( args.output_dir, args.formatted_results_path) formatted_results = pd.read_csv(formatted_results_path) missing_indices = [] for i in range(n_data): if i not in formatted_results['index'].unique(): print(f"{i} is not found") missing_indices.append(i) crawled_results_path = os.path.join( args.output_dir, args.crawled_file_path) crawled_results = joblib.load( open( crawled_results_path, "rb")) crawled_results = crawled_results['crawled_results'].query( f"index in {missing_indices}") print( crawled_results) er = extract_results( crawled_results, classes = args.classes, provider = args.provider, model = args.model) er = er['extracted_results'] print(er['category']) postprossed_results = postprocess_result( er, "/tmp/postprocessed_results.joblib", category2supercategory ) out_formatted_results = format_output( postprossed_results, input_column = 'evidence', output_column = 'formatted_evidence', format_func = format_evidence ) out_formatted_results.to_csv( "/tmp/formatted_results.missing.csv", index=False) formatted_results = pd.concat([formatted_results, out_formatted_results], ignore_index=True) formatted_results.sort_values(by='index', ascending=True, inplace=True) formatted_results.to_csv( "/tmp/formatted_results.csv", index=False) def main(args): """ Argument args: argparse Note 200 records crawl: 585.3285548686981 extract: 2791.631685256958(delay = 10) classify: 2374.4915606975555(delay = 10) """ crawled_file_path = os.path.join( args.output_dir, args.crawled_file_path) extracted_file_path = os.path.join( args.output_dir, args.extracted_file_path) classified_file_path = os.path.join( args.output_dir, args.classified_file_path) combined_file_path = os.path.join( args.output_dir, args.combined_file_path) postprocessed_results = os.path.join( args.output_dir, args.postprocessed_results) formatted_results_path = os.path.join( args.output_dir, args.formatted_results_path) ## 讀取資料名單 ## data = get_leads(args.data_path) ## 進行爬蟲與分析 ## crawled_results = crawl_results_mp( data, crawled_file_path, n_processes=args.n_processes) # crawled_results = { k:v[-5:] for k,v in crawled_results.items()} ## 方法 1: 擷取關鍵資訊與分類 ## extracted_results = extract_results_mp( crawled_results = crawled_results['crawled_results'], extracted_file_path = extracted_file_path, classes = args.classes, provider = args.provider, model = args.model, n_processes = args.n_processes ) ## 方法2: 直接對爬蟲結果分類 ## classified_results = classify_results_mp( extracted_results['extracted_results'], classified_file_path, classes = args.classes, backup_classes = args.backup_classes, provider = args.provider, model = args.model, n_processes = args.n_processes ) ## 合併分析結果 ## combined_results = combine_results( classified_results['classified_results'], combined_file_path, src_column = 'classified_category', tgt_column = 'category', strategy = args.strategy ) ## 後處理分析結果 ## postprossed_results = postprocess_result( combined_results, postprocessed_results, category2supercategory ) formatted_results = format_output( postprossed_results, input_column = 'evidence', output_column = 'formatted_evidence', format_func = format_evidence) formatted_results.to_csv( formatted_results_path, index=False) category2supercategory = { "小吃店": "中式", "日式料理(含居酒屋,串燒)": "中式", "火(鍋/爐)": "中式", "東南亞料理(不含日韓)": "中式", "海鮮熱炒": "中式", "特色餐廳(含雞、鵝、牛、羊肉)": "中式", "傳統餐廳": "中式", "燒烤": "中式", "韓式料理(含火鍋,烤肉)": "中式", "西餐廳(含美式,義式,墨式)": "西式", "中式": "中式", "西式": "西式", "西餐廳(餐酒館、酒吧、飛鏢吧、pub、lounge bar)": "西式", "西餐廳(土耳其、漢堡、薯條、法式、歐式、印度)": "西式", "早餐": "" } supercategory2category = { "中式": [ "小吃店", "日式料理(含居酒屋,串燒)", "火(鍋/爐)", "東南亞料理(不含日韓)", "海鮮熱炒", "特色餐廳(含雞、鵝、牛、羊肉)", "傳統餐廳", "燒烤", "韓式料理(含火鍋,烤肉)" ], "西式": ["西餐廳(含美式,義式,墨式)", "西餐廳(餐酒館、酒吧、飛鏢吧、pub、lounge bar)", "西餐廳(土耳其、漢堡、薯條、法式、歐式、印度)"], "": ["早餐"] } if __name__=='__main__': base = "https://serpapi.com/search.json" engine = 'google' # query = "Coffee" google_domain = 'google.com.tw' gl = 'tw' lr = 'lang_zh-TW' # url = f"{base}?engine={engine}&q={query}&google_domain={google_domain}&gl={gl}&lr={lr}" n_processes = 4 client = OpenAI( organization = ORGANIZATION_ID) parser = argparse.ArgumentParser() parser.add_argument("--data_path", type=str, default="data/餐廳類型分類.xlsx - 測試清單.csv") parser.add_argument("--task", type=str, default="new", choices = ["new", "continue"], help="new or continue") parser.add_argument("--output_dir", type=str, help='output directory') parser.add_argument("--classified_file_path", type=str, default="classified_results.joblib") parser.add_argument("--extracted_file_path", type=str, default="extracted_results.joblib") parser.add_argument("--crawled_file_path", type=str, default="crawled_results.joblib") parser.add_argument("--combined_file_path", type=str, default="combined_results.joblib") parser.add_argument("--postprocessed_results", type=str, default="postprocessed_results.joblib") parser.add_argument("--formatted_results_path", type=str, default="formatted_results.csv") parser.add_argument("--classes", type=list, default=['小吃店', '日式料理(含居酒屋,串燒)', '火(鍋/爐)', '東南亞料理(不含日韓)', '海鮮熱炒', '特色餐廳(含雞、鵝、牛、羊肉)', '傳統餐廳', '燒烤', '韓式料理(含火鍋,烤肉)', '西餐廳(含美式,義式,墨式)', '西餐廳(餐酒館、酒吧、飛鏢吧、pub、lounge bar)', '西餐廳(土耳其、漢堡、薯條、法式、歐式、印度)', '早餐']) parser.add_argument("--backup_classes", type=list, default=['中式', '西式']) parser.add_argument("--strategy", type=str, default='patch', choices=['replace', 'patch']) parser.add_argument("--provider", type=str, default='openai', choices=['openai', 'anthropic']) parser.add_argument("--model", type=str, default='gpt-4-0125-preview', choices=['claude-3-sonnet-20240229', 'claude-3-haiku-20240307', 'gpt-3.5-turbo-0125', 'gpt-4-0125-preview']) parser.add_argument("--n_processes", type=int, default=4) args = parser.parse_args() if args.task == 'new': main(args) elif args.task == 'continue': continue_missing(args) else: raise Exception(f"Task {args.task} not implemented")