from collections import defaultdict
import os
import re
import time

PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')
PINECONE_HOST = 'prorata-postman-ds-ul-dp9xwvt.svc.aped-4627-b74a.pinecone.io'

from pinecone import Pinecone

pc = Pinecone(api_key=PINECONE_API_KEY)
pc_ul = pc.Index('prorata-postman-ds-ul')

style1_str = """
"""
style2_str = """
"""

chunk_separator = '[...]'

from langchain.text_splitter import RecursiveCharacterTextSplitter

sentence_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1024,
    chunk_overlap=0,
    separators=["\n\n", "\n", "."],
    keep_separator=False,
)


# def get_article_from_url(url):
#     headers = {
#         "Content-Type": "application/json",
#         "Api-Key": PINECONE_API_KEY
#     }
#     data = {
#         "id": url,
#         "topK": 1,
#         "includeMetadata": True,
#     }
#     res = requests.post(f"https://{PINECONE_HOST}/query", headers=headers, json=data)
#     if not res:
#         return {}
#     top_match_metadata = res.json()['matches'][0]['metadata']
#     return {
#         'title': top_match_metadata['title'],
#         'url': top_match_metadata['url'],
#         'text': top_match_metadata['text'],
#     }


def get_article_from_url(url):
    """Look up an article by URL; the URL doubles as the vector id in the index."""
    res = pc_ul.query(id=url, top_k=1, include_metadata=True)
    if not res['matches']:
        return {}
    top_match_metadata = res['matches'][0]['metadata']
    return {
        'title': top_match_metadata['title'],
        'url': top_match_metadata['url'],
        'text': top_match_metadata['text'],
    }


def print_w_time_elapsed(msg, start_time, file=None):
    print(f"{msg} ({time.perf_counter()-start_time:.2f} secs)", file=file)


# def _add_chunk_text_formatted_l_aggmatch_determination(aggmatch_determination):
#     chunk_text_l = aggmatch_determination['chunk_text_l']
#     n_chunks = len(chunk_text_l)
#     if 'quote_matches_l' not in aggmatch_determination:
#         aggmatch_determination['chunk_support_flags'] = n_chunks*[True]
#         aggmatch_determination['chunk_text_formatted_l'] = chunk_text_l
#         return
#     quote_matches_l = aggmatch_determination['quote_matches_l']
#     last_end, coffset = 0, 0
#     chunk_support_flags = [False]*n_chunks
#     chunk_text_formatted_l = []
#     for cidx, ctext in enumerate(chunk_text_l):
#         ctext_formatted = ""
#         for quote_match in quote_matches_l:
#             if quote_match['start'] > coffset and quote_match['end'] <= coffset + len(ctext):
#                 chunk_support_flags[cidx] = True
#                 # TODO: handle case where quote spans across chunks
#                 ctext_formatted += ctext[last_end-coffset:quote_match['start']-coffset]
#                 ctext_formatted += quote_start + ctext[quote_match['start']-coffset:quote_match['end']-coffset] + quote_end
#                 last_end = quote_match['end']
#         ctext_formatted += ctext[last_end-coffset:]
#         chunk_text_formatted_l.append(ctext_formatted)
#         coffset += len(ctext) + 2
#         last_end = coffset
#     aggmatch_determination['chunk_support_flags'] = chunk_support_flags
#     aggmatch_determination['chunk_text_formatted_l'] = chunk_text_formatted_l


# # TODO: need to operate on single copy of each chunk (so all quotes are kept)
# def _add_chunk_text_formatted_l(atom_support_l):
#     for atom_support in atom_support_l:
#         for url, aggmatch_determination in atom_support.items():
#             _add_chunk_text_formatted_l_aggmatch_determination(aggmatch_determination)
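# Example usage of get_article_from_url (a sketch; the URL below is
# hypothetical, and the lookup assumes the index stores one record per
# article with the article URL as its vector id):
#
#   article = get_article_from_url('https://example.com/2024/some-article')
#   if article:
#       print(article['title'])
#       print(article['text'][:200])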
def create_url_to_cid_to_ctext_formatted_map(atom_support_l):
    url_to_cid_to_ctext_map = defaultdict(dict)
    url_to_cid_to_ctext_formatted_map = defaultdict(dict)
    url_to_cid_to_nquotes_map = defaultdict(dict)
    for atom_support in atom_support_l:
        for url, aggmatch_determination in atom_support.items():
            cid_to_ctext_map = url_to_cid_to_ctext_map[url]
            cid_to_ctext_formatted_map = url_to_cid_to_ctext_formatted_map[url]
            cid_to_nquotes_map = url_to_cid_to_nquotes_map[url]
            chunk_id_l = aggmatch_determination['id_l']
            chunk_text_l = aggmatch_determination['chunk_text_l']
            for cid, ctext in zip(chunk_id_l, chunk_text_l):
                cid_to_ctext_map[cid] = ctext
            quote_matches_l = aggmatch_determination.get('quote_matches_l', None)
            if quote_matches_l:
                last_end, coffset = 0, 0
                chunk_text_formatted_l = []
                for cid, ctext in zip(chunk_id_l, chunk_text_l):
                    nquotes = 0
                    ctext_formatted = ""
                    for quote_match in quote_matches_l:
                        if quote_match['start'] >= coffset and quote_match['end'] <= coffset + len(ctext):
                            nquotes += 1
                            # TODO: handle case where quote spans across chunks
                            ctext_formatted += ctext[last_end-coffset:quote_match['start']-coffset]
                            ctext_formatted += quote_start + ctext[quote_match['start']-coffset:quote_match['end']-coffset] + quote_end
                            last_end = quote_match['end']
                    ctext_formatted += ctext[last_end-coffset:]
                    chunk_text_formatted_l.append(ctext_formatted)
                    coffset += len(ctext) + 2
                    last_end = coffset
                    # this one is used in the per-claim breakdown
                    aggmatch_determination['chunk_text_formatted_l'] = chunk_text_formatted_l
                    # these are for the main view
                    if cid not in cid_to_nquotes_map or nquotes > cid_to_nquotes_map[cid]:
                        print(f"\n\n### {url} storing formatted cid={cid} ctext:")
                        print(f"quote_matches_l={quote_matches_l}")
                        print(f"nquotes={nquotes}, ctext_formatted={ctext_formatted}")
                        cid_to_nquotes_map[cid] = nquotes
                        cid_to_ctext_formatted_map[cid] = ctext_formatted
    return url_to_cid_to_ctext_map, url_to_cid_to_ctext_formatted_map, url_to_cid_to_nquotes_map


# def get_url_to_supporting_cid_ctext_tuples(atom_support_l):
#     url_to_supporting_cid_quote_flag_map = defaultdict(dict)
#     url_to_supporting_cid_ctext_map = defaultdict(dict)
#     for atom_support in atom_support_l:
#         for url, aggmatch_determination in atom_support.items():
#             if aggmatch_determination['true']:
#                 use_formatted = 'chunk_text_formatted_l' in aggmatch_determination
#                 include_only_formatted = use_formatted and any(aggmatch_determination['chunk_support_flags'])
#                 chunk_text_l_key = 'chunk_text_formatted_l' if use_formatted else 'chunk_text_l'
#                 for lidx, (cid, ctext) in enumerate(zip(aggmatch_determination['id_l'], aggmatch_determination[chunk_text_l_key])):
#                     chunk_has_quote = aggmatch_determination['chunk_support_flags'][lidx]
#                     if cid not in url_to_supporting_cid_quote_flag_map[url] or not url_to_supporting_cid_quote_flag_map[url][cid]:
#                         if not include_only_formatted or chunk_has_quote:
#                             url_to_supporting_cid_quote_flag_map[url][cid] = chunk_has_quote
#                             url_to_supporting_cid_ctext_map[url][cid] = ctext
#     # now sort each list of chunks
#     url_to_supporting_cid_ctext_tuples = {}
#     for url, cid_ctext_map in url_to_supporting_cid_ctext_map.items():
#         # url_to_supporting_cid_ctext_tuples[url] = sorted(cid_ctext_tuple_l, key=lambda x: x[0])
#         url_to_supporting_cid_ctext_tuples[url] = sorted(list(cid_ctext_map.items()), key=lambda x: x[0])
#     # pprint.pp(url_to_supporting_cid_ctext_tuples)
#     return url_to_supporting_cid_ctext_tuples


def format_chunk_texts_for_display(cid_ctext_tuples):
    # chunk ids follow a '<doc>-<chunk_index>' convention; the numeric index
    # tells us whether two chunks are contiguous in the source article
    ids_l = [int(x[0].split('-')[1]) for x in cid_ctext_tuples]
    match_text = ""
    n_chunks = len(cid_ctext_tuples)
    for j, cid_ctext_tuple in enumerate(cid_ctext_tuples):
        ctext = cid_ctext_tuple[1]
        need_ellipsis = False
        # insert the separator when the next chunk is not contiguous with this one
        if j < n_chunks-1 and ids_l[j] + 1 != ids_l[j+1]:
            need_ellipsis = True
        if len(ctext) > 512:
            ctext = sentence_splitter.split_text(ctext)[0]
            need_ellipsis = True
        if j > 0:
            match_text += '\n\n'
        match_text += ctext
        if need_ellipsis:
            match_text += chunk_separator
    return match_text


# markers wrapped around matched quote spans when formatting chunk text
quote_start = ''
quote_end = ''
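# Example usage of format_chunk_texts_for_display (a sketch with made-up
# chunk ids and text; ids follow the '<doc>-<chunk_index>' convention, and
# the [...] separator marks the gap between non-contiguous chunks):
#
#   tuples = [
#       ('doc1-0', 'First chunk of the article.'),
#       ('doc1-2', 'A later, non-contiguous chunk.'),
#   ]
#   format_chunk_texts_for_display(tuples)
#   # -> 'First chunk of the article.[...]\n\nA later, non-contiguous chunk.'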
# heuristics for locating the start of a quoted passage within a chunk:
# a fresh line (optionally starting with a capital letter or an opening
# quote mark), followed by a double-quoted span
quote_pattern_l = [
    r"(\n[\s]*){1}\"[\S\s]*\"",
    r"(\n\s*[A-Z“\"]){1}.*\"[\S\s]*\"",
    r"(\n\s*[A-Z“\"]){1}[\S\s]*\"[\S\s]*\"",
    r"(\n|^){1}[\S\s]*\"[\S\s]*\"",
]


def format_chunk_texts_for_display2(url, cid_ctext_tuples):
    ids_l = [int(x[0].split('-')[1]) for x in cid_ctext_tuples]
    n_chunks = len(cid_ctext_tuples)
    print(f"Formatting {url} n_chunks={n_chunks}...")
    ctext_formatted_l, has_quote_l, needs_ellipsis_l = [], [], []
    for j, cid_ctext_tuple in enumerate(cid_ctext_tuples):
        ctext = cid_ctext_tuple[1]
        print(f"cid={cid_ctext_tuple[0]}:")
        print(f"```{ctext}```")
        first_quote_idx, needs_ellipsis = -1, False
        # insert the separator when the next chunk is not contiguous with this one
        if j < n_chunks-1 and ids_l[j] + 1 != ids_l[j+1]:
            needs_ellipsis = True
        if len(ctext) > 512:
            _first_quote_idx = ctext.find(quote_start)  # TODO: remove with better set of regex
            print(f"_first_quote_idx={_first_quote_idx}")
            for pidx, quote_pattern in enumerate(quote_pattern_l):
                match = re.search(quote_pattern, ctext)
                if match:
                    print(f"pidx={pidx} found match: {match}")
                    first_quote_idx = match.span()[0]
                    break
            # prefer the earlier of the two hits, but don't let a find() miss
            # (-1) discard a regex hit
            if first_quote_idx >= 0 and 0 <= _first_quote_idx < first_quote_idx:
                first_quote_idx = _first_quote_idx
            print(f"first_quote_idx={first_quote_idx}")
            if first_quote_idx >= 0:
                ctext = ctext[first_quote_idx:]
            ctext = sentence_splitter.split_text(ctext)[0]
            needs_ellipsis = True
        ctext_formatted_l.append(ctext)
        has_quote_l.append(first_quote_idx >= 0)
        needs_ellipsis_l.append(needs_ellipsis)
    # if any chunk contains a quote, keep only the chunks that do
    if any(has_quote_l):
        ctext_formatted_l = [ctext_formatted_l[i] for i in range(n_chunks) if has_quote_l[i]]
        needs_ellipsis_l = [needs_ellipsis_l[i] for i in range(n_chunks) if has_quote_l[i]]
    match_text = ""
    for j, ctext_formatted in enumerate(ctext_formatted_l):
        if j > 0:
            match_text += '\n\n'
        match_text += ctext_formatted
        if needs_ellipsis_l[j]:
            match_text += chunk_separator
    return match_text


def format_chunk_texts_for_display3(url, cid_to_ctext_map, cid_to_ctext_formatted_map, cid_to_nquotes_map):
    # prefer chunks that contain at least one matched quote; fall back to all chunks
    cid_w_quotes_map = {
        cid: cid_to_ctext_formatted_map[cid]
        for cid, nquotes in cid_to_nquotes_map.items() if nquotes > 0
    }
    if cid_w_quotes_map:
        cid_ctext_tuples = sorted(list(cid_w_quotes_map.items()), key=lambda x: x[0])
    else:
        cid_ctext_tuples = sorted(list(cid_to_ctext_map.items()), key=lambda x: x[0])
    # print(f"{url}:")
    # print(f"cid_ctext_tuples={cid_ctext_tuples}")
    return format_chunk_texts_for_display2(url, cid_ctext_tuples)
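# Minimal end-to-end sketch on hand-built inputs (the data shapes below are
# assumptions inferred from the functions above, not real pipeline output;
# running it also requires PINECONE_API_KEY, since the module builds the
# index client at import time):
if __name__ == '__main__':
    atom_support_l = [
        {
            'https://example.com/a': {
                'id_l': ['a-0', 'a-1'],
                'chunk_text_l': ['Alpha beta gamma.', 'Delta epsilon.'],
                # one quote match, expressed as offsets into the chunks'
                # concatenated text (chunks assumed joined by a 2-char separator)
                'quote_matches_l': [{'start': 6, 'end': 10}],
            }
        }
    ]
    (url_to_cid_to_ctext_map,
     url_to_cid_to_ctext_formatted_map,
     url_to_cid_to_nquotes_map) = create_url_to_cid_to_ctext_formatted_map(atom_support_l)
    url = 'https://example.com/a'
    print(format_chunk_texts_for_display3(
        url,
        url_to_cid_to_ctext_map[url],
        url_to_cid_to_ctext_formatted_map[url],
        url_to_cid_to_nquotes_map[url],
    ))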