import os
import re
import time
from collections import defaultdict

from pinecone import Pinecone
from langchain.text_splitter import RecursiveCharacterTextSplitter

PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')
# Host for the raw REST endpoint (used only by the legacy version kept below).
PINECONE_HOST = 'prorata-postman-ds-ul-dp9xwvt.svc.aped-4627-b74a.pinecone.io'

pc = Pinecone(api_key=PINECONE_API_KEY)
pc_ul = pc.Index('prorata-postman-ds-ul')
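# A minimal connectivity check for the index handle above (hypothetical
# usage, not part of the pipeline): describe_index_stats() confirms the API
# key and index name are valid and reports the vector count.
#
#   stats = pc_ul.describe_index_stats()
#   print(stats.total_vector_count)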
# Style template strings (left empty here).
style1_str = """
"""
style2_str = """
"""
# Separator shown between non-contiguous chunks in formatted output.
chunk_separator = '[...]'

# Markers wrapped around matched quotes when formatting chunk text
# (empty by default, so quotes are left unmarked).
quote_start = ''
quote_end = ''

# Splitter used to trim long chunks down to paragraph/sentence boundaries
# for display.
sentence_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1024,
    chunk_overlap=0,
    separators=["\n\n", "\n", "."],
    keep_separator=False,
)
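# Quick illustration of the splitter's behavior on made-up text: the first
# element of split_text() is the leading piece (at most 1024 characters),
# cut preferentially at paragraph, then line, then sentence boundaries.
#
#   long_text = ("First paragraph. " * 40) + "\n\n" + ("Second paragraph. " * 40)
#   lead = sentence_splitter.split_text(long_text)[0]
#   assert len(lead) <= 1024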
# Legacy REST-based version, kept for reference (requires the `requests`
# package, which is not imported in this file):
# def get_article_from_url(url):
#     headers = {
#         "Content-Type": "application/json",
#         "Api-Key": PINECONE_API_KEY
#     }
#     data = {
#         "id": url,
#         "topK": 1,
#         "includeMetadata": True,
#     }
#     res = requests.post(f"https://{PINECONE_HOST}/query", headers=headers, json=data)
#     if not res:
#         return {}
#     top_match_metadata = res.json()['matches'][0]['metadata']
#     return {
#         'title': top_match_metadata['title'],
#         'url': top_match_metadata['url'],
#         'text': top_match_metadata['text'],
#     }
def get_article_from_url(url):
    """Fetch an article's metadata from the index, where the vector id is the URL."""
    res = pc_ul.query(id=url, top_k=1, include_metadata=True)
    if not res['matches']:
        return {}
    top_match_metadata = res['matches'][0]['metadata']
    return {
        'title': top_match_metadata['title'],
        'url': top_match_metadata['url'],
        'text': top_match_metadata['text'],
    }
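# Hypothetical usage (the URL below is illustrative, not a real id in the
# index); an empty dict signals that no match was found:
#
#   article = get_article_from_url("https://example.com/some-article")
#   if article:
#       print(article['title'], len(article['text']))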
def print_w_time_elapsed(msg, start_time, file=None):
    """Print msg along with the elapsed time since start_time (a perf_counter value)."""
    print(f"{msg} ({time.perf_counter()-start_time:.2f} secs)", file=file)
# Legacy per-determination formatter, kept for reference:
# def _add_chunk_text_formatted_l_aggmatch_determination(aggmatch_determination):
#     chunk_text_l = aggmatch_determination['chunk_text_l']
#     n_chunks = len(chunk_text_l)
#     if 'quote_matches_l' not in aggmatch_determination:
#         aggmatch_determination['chunk_support_flags'] = n_chunks*[True]
#         aggmatch_determination['chunk_text_formatted_l'] = chunk_text_l
#         return
#     quote_matches_l = aggmatch_determination['quote_matches_l']
#     last_end, coffset = 0, 0
#     chunk_support_flags = [False]*n_chunks
#     chunk_text_formatted_l = []
#     for cidx, ctext in enumerate(chunk_text_l):
#         ctext_formatted = ""
#         for quote_match in quote_matches_l:
#             if quote_match['start'] > coffset and quote_match['end'] <= coffset + len(ctext):
#                 chunk_support_flags[cidx] = True
#                 # TODO: handle case where quote spans across chunks
#                 ctext_formatted += ctext[last_end-coffset:quote_match['start']-coffset]
#                 ctext_formatted += quote_start + ctext[quote_match['start']-coffset:quote_match['end']-coffset] + quote_end
#                 last_end = quote_match['end']
#         ctext_formatted += ctext[last_end-coffset:]
#         chunk_text_formatted_l.append(ctext_formatted)
#         coffset += len(ctext) + 2
#         last_end = coffset
#     aggmatch_determination['chunk_support_flags'] = chunk_support_flags
#     aggmatch_determination['chunk_text_formatted_l'] = chunk_text_formatted_l
#
# # TODO: need to operate on a single copy of each chunk (so all quotes are kept)
# def _add_chunk_text_formatted_l(atom_support_l):
#     for atom_support in atom_support_l:
#         for url, aggmatch_determination in atom_support.items():
#             _add_chunk_text_formatted_l_aggmatch_determination(aggmatch_determination)
def create_url_to_cid_to_ctext_formatted_map(atom_support_l):
    """Build per-URL maps from chunk id to raw text, quote-formatted text,
    and the number of quote matches found in that chunk."""
    url_to_cid_to_ctext_map = defaultdict(dict)
    url_to_cid_to_ctext_formatted_map = defaultdict(dict)
    url_to_cid_to_nquotes_map = defaultdict(dict)
    for atom_support in atom_support_l:
        for url, aggmatch_determination in atom_support.items():
            cid_to_ctext_map = url_to_cid_to_ctext_map[url]
            cid_to_ctext_formatted_map = url_to_cid_to_ctext_formatted_map[url]
            cid_to_nquotes_map = url_to_cid_to_nquotes_map[url]
            chunk_id_l = aggmatch_determination['id_l']
            chunk_text_l = aggmatch_determination['chunk_text_l']
            for cid, ctext in zip(chunk_id_l, chunk_text_l):
                cid_to_ctext_map[cid] = ctext
            quote_matches_l = aggmatch_determination.get('quote_matches_l', None)
            if quote_matches_l:
                # Quote offsets appear to be relative to the chunks joined with a
                # two-character separator, so track a running offset (coffset).
                last_end, coffset = 0, 0
                chunk_text_formatted_l = []
                for cid, ctext in zip(chunk_id_l, chunk_text_l):
                    nquotes = 0
                    ctext_formatted = ""
                    for quote_match in quote_matches_l:
                        if quote_match['start'] >= coffset and quote_match['end'] <= coffset + len(ctext):
                            nquotes += 1
                            # TODO: handle case where quote spans across chunks
                            ctext_formatted += ctext[last_end-coffset:quote_match['start']-coffset]
                            ctext_formatted += quote_start + ctext[quote_match['start']-coffset:quote_match['end']-coffset] + quote_end
                            last_end = quote_match['end']
                    ctext_formatted += ctext[last_end-coffset:]
                    chunk_text_formatted_l.append(ctext_formatted)
                    coffset += len(ctext) + 2
                    last_end = coffset
                    # this one is used in the per-claim breakdown
                    aggmatch_determination['chunk_text_formatted_l'] = chunk_text_formatted_l
                    # these are for the main view; keep the formatting with the
                    # most quote matches seen for each chunk id
                    if cid not in cid_to_nquotes_map or nquotes > cid_to_nquotes_map[cid]:
                        print(f"\n\n### {url} storing formatted cid={cid} ctext:")
                        print(f"quote_matches_l={quote_matches_l}")
                        print(f"nquotes={nquotes}, ctext_formatted={ctext_formatted}")
                        cid_to_nquotes_map[cid] = nquotes
                        cid_to_ctext_formatted_map[cid] = ctext_formatted
    return url_to_cid_to_ctext_map, url_to_cid_to_ctext_formatted_map, url_to_cid_to_nquotes_map
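# Sketch of the expected input shape, inferred from the accesses above (the
# URL, ids, texts, and offsets are illustrative):
#
#   atom_support_l = [
#       {
#           "https://example.com/a": {
#               'id_l': ['a-0', 'a-1'],
#               'chunk_text_l': ['First chunk text.', 'Second chunk text.'],
#               'quote_matches_l': [{'start': 0, 'end': 11}],  # offsets into the joined chunk text
#           },
#       },
#   ]
#   ctext_map, formatted_map, nquotes_map = create_url_to_cid_to_ctext_formatted_map(atom_support_l)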
# Legacy chunk-selection helper, kept for reference:
# def get_url_to_supporting_cid_ctext_tuples(atom_support_l):
#     url_to_supporting_cid_quote_flag_map = defaultdict(dict)
#     url_to_supporting_cid_ctext_map = defaultdict(dict)
#     for atom_support in atom_support_l:
#         for url, aggmatch_determination in atom_support.items():
#             if aggmatch_determination['true']:
#                 use_formatted = 'chunk_text_formatted_l' in aggmatch_determination
#                 include_only_formatted = use_formatted and any(aggmatch_determination['chunk_support_flags'])
#                 chunk_text_l_key = 'chunk_text_formatted_l' if use_formatted else 'chunk_text_l'
#                 for lidx, (cid, ctext) in enumerate(zip(aggmatch_determination['id_l'], aggmatch_determination[chunk_text_l_key])):
#                     chunk_has_quote = aggmatch_determination['chunk_support_flags'][lidx]
#                     if cid not in url_to_supporting_cid_quote_flag_map[url] or not url_to_supporting_cid_quote_flag_map[url][cid]:
#                         if not include_only_formatted or chunk_has_quote:
#                             url_to_supporting_cid_quote_flag_map[url][cid] = chunk_has_quote
#                             url_to_supporting_cid_ctext_map[url][cid] = ctext
#     # now sort each list of chunks
#     url_to_supporting_cid_ctext_tuples = {}
#     for url, cid_ctext_map in url_to_supporting_cid_ctext_map.items():
#         # url_to_supporting_cid_ctext_tuples[url] = sorted(cid_ctext_tuple_l, key=lambda x: x[0])
#         url_to_supporting_cid_ctext_tuples[url] = sorted(list(cid_ctext_map.items()), key=lambda x: x[0])
#     # pprint.pp(url_to_supporting_cid_ctext_tuples)
#     return url_to_supporting_cid_ctext_tuples
def format_chunk_texts_for_display(cid_ctext_tuples):
    """Join chunk texts for display, inserting chunk_separator where chunks
    are non-contiguous and trimming chunks longer than 512 characters."""
    ids_l = [int(x[0].split('-')[1]) for x in cid_ctext_tuples]
    match_text = ""
    n_chunks = len(cid_ctext_tuples)
    for j, cid_ctext_tuple in enumerate(cid_ctext_tuples):
        ctext = cid_ctext_tuple[1]
        need_ellipsis = False
        if j < n_chunks-1 and ids_l[j] != ids_l[j+1]:
            need_ellipsis = True
        if len(ctext) > 512:
            ctext = sentence_splitter.split_text(ctext)[0]
            need_ellipsis = True
        if j > 0:
            match_text += '\n\n'
        match_text += ctext
        if need_ellipsis:
            match_text += chunk_separator
    return match_text
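# Hypothetical usage (chunk ids follow the '<prefix>-<index>' convention the
# split above assumes):
#
#   tuples = [('a-0', 'Opening chunk text.'), ('a-2', 'A later, non-adjacent chunk.')]
#   print(format_chunk_texts_for_display(tuples))
#   # -> "Opening chunk text.[...]\n\nA later, non-adjacent chunk."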
# Progressively looser patterns for locating a double-quoted passage inside a
# chunk; tried in order, first match wins.
quote_pattern_l = [
    r"(\n[\s]*){1}\"[\S\s]*\"",
    r"(\n\s*[A-Z“\"]){1}.*\"[\S\s]*\"",
    r"(\n\s*[A-Z“\"]){1}[\S\s]*\"[\S\s]*\"",
    r"(\n|^){1}[\S\s]*\"[\S\s]*\"",
]
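# Quick illustration on made-up text: the first (strictest) pattern matches a
# quoted passage that starts on its own line.
#
#   sample = 'Lead-in sentence.\n  "A quoted passage," the source said.'
#   m = re.search(quote_pattern_l[0], sample)
#   # m.span()[0] points at the newline preceding the quote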
def format_chunk_texts_for_display2(url, cid_ctext_tuples):
    """Like format_chunk_texts_for_display, but long chunks are first trimmed
    to start at the first quoted passage found (when one is found)."""
    ids_l = [int(x[0].split('-')[1]) for x in cid_ctext_tuples]
    n_chunks = len(cid_ctext_tuples)
    print(f"Formatting {url} n_chunks={n_chunks}...")
    ctext_formatted_l, has_quote_l, needs_ellipsis_l = [], [], []
    for j, cid_ctext_tuple in enumerate(cid_ctext_tuples):
        ctext = cid_ctext_tuple[1]
        print(f"cid={cid_ctext_tuple[0]}:")
        print(f"```{ctext}```")
        first_quote_idx, needs_ellipsis = -1, False
        if j < n_chunks-1 and ids_l[j] != ids_l[j+1]:
            needs_ellipsis = True
        if len(ctext) > 512:
            _first_quote_idx = ctext.find(quote_start)  # TODO: remove with a better set of regexes
            print(f"_first_quote_idx={_first_quote_idx}")
            for pidx, quote_pattern in enumerate(quote_pattern_l):
                match = re.search(quote_pattern, ctext)
                if match:
                    print(f"pidx={pidx} found match: {match}")
                    first_quote_idx = match.span()[0]
                    break
            first_quote_idx = min(first_quote_idx, _first_quote_idx)
            print(f"first_quote_idx={first_quote_idx}")
            if first_quote_idx >= 0:
                ctext = ctext[first_quote_idx:]
            ctext = sentence_splitter.split_text(ctext)[0]
            needs_ellipsis = True
        ctext_formatted_l.append(ctext)
        has_quote_l.append(first_quote_idx >= 0)
        needs_ellipsis_l.append(needs_ellipsis)
    # If any chunk contains a quote, keep only the chunks that do.
    if any(has_quote_l):
        ctext_formatted_l = [ctext_formatted_l[i] for i in range(n_chunks) if has_quote_l[i]]
        needs_ellipsis_l = [needs_ellipsis_l[i] for i in range(n_chunks) if has_quote_l[i]]
    match_text = ""
    for j, ctext_formatted in enumerate(ctext_formatted_l):
        if j > 0:
            match_text += '\n\n'
        match_text += ctext_formatted
        if needs_ellipsis_l[j]:
            match_text += chunk_separator
    return match_text
def format_chunk_texts_for_display3(url, cid_to_ctext_map, cid_to_ctext_formatted_map, cid_to_nquotes_map):
    """Prefer quote-formatted chunks when any exist; otherwise fall back to
    the raw chunk texts, then delegate to format_chunk_texts_for_display2."""
    cid_w_quotes_map = { cid: cid_to_ctext_formatted_map[cid] for cid, nquotes in cid_to_nquotes_map.items() if nquotes > 0 }
    if cid_w_quotes_map:
        cid_ctext_tuples = sorted(list(cid_w_quotes_map.items()), key=lambda x: x[0])
    else:
        cid_ctext_tuples = sorted(list(cid_to_ctext_map.items()), key=lambda x: x[0])
    # print(f"{url}:")
    # print(f"cid_ctext_tuples={cid_ctext_tuples}")
    return format_chunk_texts_for_display2(url, cid_ctext_tuples)
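# End-to-end sketch tying the pieces together (input shape as in the
# create_url_to_cid_to_ctext_formatted_map example above; all values are
# illustrative):
#
#   ctext_map, formatted_map, nquotes_map = create_url_to_cid_to_ctext_formatted_map(atom_support_l)
#   for url in ctext_map:
#       match_text = format_chunk_texts_for_display3(
#           url, ctext_map[url], formatted_map[url], nquotes_map[url])
#       print(match_text)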