import streamlit as st import pinecone from sentence_transformers import SentenceTransformer import logging PINECONE_KEY = st.secrets["PINECONE_KEY"] # app.pinecone.io INDEX_ID = 'ask-youtube' @st.experimental_singleton def init_pinecone(): pinecone.init(api_key=PINECONE_KEY, environment="us-west1-gcp") return pinecone.Index(INDEX_ID) @st.experimental_singleton def init_retriever(): return SentenceTransformer("multi-qa-mpnet-base-dot-v1") def make_query(query, retriever, top_k=10, include_values=True, include_metadata=True, filter=None): xq = retriever.encode([query]).tolist() logging.info(f"Query: {query}") attempt = 0 while attempt < 3: try: xc = st.session_state.index.query( xq, top_k=top_k, include_values=include_values, include_metadata=include_metadata, filter=filter ) matches = xc['matches'] break except: # force reload pinecone.init(api_key=PINECONE_KEY, environment="us-west1-gcp") st.session_state.index = pinecone.Index(INDEX_ID) attempt += 1 matches = [] if len(matches) == 0: logging.error(f"Query failed") return matches st.session_state.index = init_pinecone() retriever = init_retriever() def card(thumbnail: str, title: str, urls: list, contexts: list, starts: list, ends: list): meta = [(e, s, u, c) for e, s, u, c in zip(ends, starts, urls, contexts)] meta.sort(reverse=False) text_content = [] current_start = 0 current_end = 0 for end, start, url, context in meta: # reformat seconds to timestamp time = start / 60 mins = f"0{int(time)}"[-2:] secs = f"0{int(round((time - int(mins))*60, 0))}"[-2:] timestamp = f"{mins}:{secs}" if start < current_end and start > current_start: # this means it is a continuation of the previous sentence text_content[-1][0] = text_content[-1][0].split(context[:10])[0] text_content.append([f"[{timestamp}] {context.capitalize()}", url]) else: text_content.append(["xxLINEBREAKxx", ""]) text_content.append([f"[{timestamp}] {context}", url]) current_start = start current_end = end html_text = "" for text, url in text_content: if text == "xxLINEBREAKxx": html_text += "
" else: html_text += f"{text.strip()}... " print(text) html = f"""

{title}

{html_text}

""" return st.markdown(html, unsafe_allow_html=True) channel_map = { } st.write(""" # YouTube Q&A """) st.info(""" YouTube search for the Quantum Resistant Ledger Youtube Channel. Credits to: James Briggs. """) st.markdown(""" """, unsafe_allow_html=True) query = st.text_input("Search!", "") if query != "": print(f"query: {query}") matches = make_query( query, retriever, top_k=5, ) results = {} order = [] for context in matches: video_id = context['metadata']['url'].split('/')[-1] if video_id not in results: results[video_id] = { 'title': context['metadata']['title'], 'urls': [f"{context['metadata']['url']}?t={int(context['metadata']['start'])}"], 'contexts': [context['metadata']['text']], 'starts': [int(context['metadata']['start'])], 'ends': [int(context['metadata']['end'])] } order.append(video_id) else: results[video_id]['urls'].append( f"{context['metadata']['url']}?t={int(context['metadata']['start'])}" ) results[video_id]['contexts'].append( context['metadata']['text'] ) results[video_id]['starts'].append(int(context['metadata']['start'])) results[video_id]['ends'].append(int(context['metadata']['end'])) # now display cards for video_id in order: card( thumbnail=f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg", title=results[video_id]['title'], urls=results[video_id]['urls'], contexts=results[video_id]['contexts'], starts=results[video_id]['starts'], ends=results[video_id]['ends'] )