# Streamlit classes
import streamlit as st
from streamlit_agraph import agraph, Node, Edge, Config
from streamlit_chat import message
# Data manipulation and embeddings
import pandas as pd
import numpy as np
import openai
from openai.embeddings_utils import distances_from_embeddings
import whisper
# Exec tasks
import os, json
import math
import re
from threading import Thread
# Custom classes
from transcription import *
from keywords import Keywords
from summary import TextSummarizer
from takeaways import KeyTakeaways
from mindmap import MindMap
import models as md


def get_initial_message():
    """Return the seed conversation history for the Q&A tutor bot."""
    messages = [
        {"role": "system", "content": "You are a helpful AI Tutor. Who anwers brief questions about AI."},
        {"role": "user", "content": "I want to learn AI"},
        {"role": "assistant", "content": "Thats awesome, what do you want to know aboout AI"}
    ]
    return messages


# FIX: raw string — the plain string relied on the backslash escapes being
# passed through unchanged, which also triggers DeprecationWarnings.
REGEXP_YOUTUBE_URL = r"^(https?\:\/\/)?((www\.)?youtube\.com|youtu\.be)\/.+$"

model = whisper.load_model('base')

# Module-level state shared between the sidebar pipeline, the worker threads
# and the result tabs (the whole app runs at module scope on each rerun).
output = ''
data = []                                       # list of {"text","embedding"} dicts, or a DataFrame once cached
data_transcription = {"title": "", "text": ""}  # filled by the chosen *Transcription class
embeddings = []
text_chunks_lib = dict()                        # title -> list of token-bounded text chunks
user_input = None

title_entry = ""
tldr = ""
summary = ""
takeaways = []
keywords = []

folder_name = "./tests"
input_accepted = False
is_completed_analysis = False

if not os.path.exists(folder_name):
    os.mkdir(folder_name)

user_secret = os.getenv("OPENAI_API_KEY")
# FIX: the key was read from the environment but never handed to the client.
if user_secret:
    openai.api_key = user_secret

# Define the purpose of the application
st.header('Almithal')
st.subheader('Almithal is a comprehensive video and PDF study buddy.')
st.write('It provides a summary, transcription, key insights, a mind map and a Q&A feature where you can actually "talk" to the datasource.')

bar = st.progress(0)


def generate_word_embeddings():
    """Embed each transcript segment with ada-002 and cache the result.

    Reads the module-level ``segments`` list; fills ``data`` and writes/reads
    ``{folder_name}/word_embeddings.csv``.
    """
    global data
    if not os.path.exists(f"{folder_name}/word_embeddings.csv"):
        for i, segment in enumerate(segments):
            # First half of the progress bar (0-50); never drop below 1.
            # NOTE(review): this runs on a worker thread — Streamlit widgets
            # are not officially thread-safe; confirm bar.progress works here.
            bar.progress(max(math.ceil((i / len(segments) * 50)), 1))
            response = openai.Embedding.create(
                input=segment["text"].strip(),
                model="text-embedding-ada-002"
            )
            # FIX: use a local name — the original assigned to `embeddings`,
            # silently shadowing the module-level list of the same name.
            embedding_vector = response['data'][0]['embedding']
            meta = {
                "text": segment["text"].strip(),
                "embedding": embedding_vector
            }
            data.append(meta)
        pd.DataFrame(data).to_csv(f'{folder_name}/word_embeddings.csv')
    else:
        # FIX: index_col=0 so the index column written by to_csv above is not
        # read back as a spurious "Unnamed: 0" data column (matches the
        # commented-out read in the Q&A tab).
        data = pd.read_csv(f'{folder_name}/word_embeddings.csv', index_col=0)


def generate_text_chunks_lib():
    """Split the transcription into token-bounded chunks and extract keywords.

    Populates ``text_chunks_lib[title_entry]`` and the module-level
    ``keywords`` list.
    """
    global data_transcription
    global title_entry, text_chunks_lib
    global keywords
    global tldr
    global summary
    global takeaways
    global input_accepted

    # For each body of text, create text chunks of a certain token size required for the transformer
    text_df = pd.DataFrame.from_dict({"title": [data_transcription["title"]], "text": [data_transcription["text"]]})
    input_accepted = True
    title_entry = text_df['title'][0]
    print("\n\nFIRST TITLE_ENTRY", title_entry)

    for i in range(0, len(text_df)):
        nested_sentences = md.create_nest_sentences(document=text_df['text'][i], token_max_length=1024)
        # For each chunk of sentences (within the token max)
        text_chunks = []
        for n in range(0, len(nested_sentences)):
            tc = " ".join(map(str, nested_sentences[n]))
            text_chunks.append(tc)
        text_chunks_lib[title_entry] = text_chunks

    # Generate key takeaways
    key_engine = Keywords(title_entry)
    keywords = key_engine.get_keywords(text_chunks_lib)


# =========== SIDEBAR FOR GENERATION ===========
with st.sidebar:
    youtube_link = st.text_input(label="Type in your Youtube link", placeholder="", key="url")
    st.markdown("OR")
    pdf_file = st.file_uploader("Upload your PDF", type="pdf")
    st.markdown("OR")
    audio_file = st.file_uploader("Upload your MP3 audio file", type=["wav", "mp3"])

    gen_keywords = st.radio(
        "Generate keywords from text?",
        ('Yes', 'No')
    )

    gen_summary = st.radio(
        "Generate summary from text? (recommended for label matching below, but will take longer)",
        ('Yes', 'No')
    )

    if st.button("Start Analysis"):

        # Youtube Transcription
        if re.search(REGEXP_YOUTUBE_URL, youtube_link):
            vte = VideoTranscription(youtube_link)
            # FIX: the original youtube_link.split("=")[1] raised IndexError
            # on youtu.be/<id> short links, which the URL regex accepts.
            if "=" in youtube_link:
                YOUTUBE_VIDEO_ID = youtube_link.split("=")[1]
            else:
                YOUTUBE_VIDEO_ID = youtube_link.rstrip("/").split("/")[-1]
            folder_name = f"./tests/{YOUTUBE_VIDEO_ID}"
            if not os.path.exists(folder_name):
                os.mkdir(folder_name)

            with st.spinner('Running transcription...'):
                data_transcription = vte.transcribe()
                segments = data_transcription['segments']

        # PDF Transcription
        elif pdf_file is not None:
            pte = PDFTranscription(pdf_file)
            folder_name = pte.get_redacted_name()
            if not os.path.exists(folder_name):
                os.mkdir(folder_name)

            with st.spinner('Running transcription...'):
                data_transcription = pte.transcribe()
                segments = data_transcription['segments']

        # Audio transcription
        elif audio_file is not None:
            ate = AudioTranscription(audio_file)
            folder_name = ate.get_redacted_name()
            # FIX: the original tested os.path.exists(f"") — an empty path —
            # so mkdir ran unconditionally and raised FileExistsError on
            # any second run against the same audio file.
            if not os.path.exists(folder_name):
                os.mkdir(folder_name)

            with st.spinner('Running transcription...'):
                data_transcription = ate.transcribe()
                segments = data_transcription['segments']

            with open(f"{folder_name}/data.json", "w") as f:
                json.dump(data_transcription, f, indent=4)

        else:
            st.error("Please type in your youtube link or upload the PDF")
            st.experimental_rerun()

        # Generate embeddings
        thread1 = Thread(target=generate_word_embeddings)
        thread1.start()
        # Generate text chunks
        thread2 = Thread(target=generate_text_chunks_lib)
        thread2.start()

        # Wait for them to complete
        thread1.join()
        thread2.join()

        # Placeholder hooks kept for interface stability; the unused Thread
        # objects that targeted them (constructed but never start()-ed in the
        # original) were removed as dead code.
        def generate_summary():
            pass

        def generate_key_takeaways():
            pass

        # Generate the summary
        if gen_summary == 'Yes':
            se = TextSummarizer(title_entry)
            text_transcription = data_transcription['text']
            with st.spinner("Generating summary and TLDR..."):
                summary = se.generate_full_summary(text_chunks_lib)
                summary_list = summary.split("\n\n")
                tldr = se.generate_short_summary(summary_list)

        # Generate key takeaways
        kt = KeyTakeaways()
        with st.spinner("Generating key takeaways ... "):
            takeaways = kt.generate_key_takeaways(text_chunks_lib)

        is_completed_analysis = True
        bar.progress(100)

        with open(f"{folder_name}/data.json", "w") as f:
            json.dump(data_transcription, f, indent=4)

if is_completed_analysis:
    st.header("Key Takeaways")
    st.write("Here are some of the key takeaways from the data:")
    for takeaway in takeaways:
        st.markdown(f"- {takeaway}")

tab1, tab2, tab3, tab4, tab5, tab6 = st.tabs(["Introduction", "Summary", "Transcription", "Mind Map", "Keywords", "Q&A"])

# =========== INTRODUCTION ===========
with tab1:
    st.markdown("## How do I use this?")
    st.markdown("Do one of the following")
    st.markdown('* Type in your youtube URL that you want worked on')
    st.markdown('* Place the PDF file that you want worked on')
    st.markdown('* Place the audio file that you want worked on')
    st.markdown("**Once the file / url has finished saving, a 'Start Analysis' button will appear. Click on this button to begin the note generation**")
    st.warning("NOTE: This is just a demo product in alpha testing. Any and all bugs will soon be fixed")
    st.warning("After the note taking is done, you will see multiple tabs for more information")

# =========== SUMMARIZATION ===========
with tab2:
    if is_completed_analysis:
        st.header("TL;DR")
        for point in tldr:
            st.markdown(f"- {point}")
        st.header("Summary")
        st.write(summary)
    else:
        st.warning("Please wait for the analysis to finish")

# =========== TRANSCRIPTION ===========
with tab3:
    st.header("Transcription")
    if is_completed_analysis:
        with st.spinner("Generating transcript ..."):
            st.write("")
            for text in text_chunks_lib[title_entry]:
                st.write(text)
    else:
        st.warning("Please wait for the analysis to finish")

# =========== MIND MAP ===========
with tab4:
    st.header("Mind Map")
    if is_completed_analysis:
        mindmap = MindMap()
        with st.spinner("Generating mind map..."):
            mindmap.generate_graph(text_chunks_lib)
    else:
        st.warning("Please wait for the analysis to finish")

# =========== KEYWORDS ===========
with tab5:
    st.header("Keywords:")
    # FIX: gen_keywords is the radio's 'Yes'/'No' string, which is always
    # truthy — the original condition could never honor a 'No' answer.
    if is_completed_analysis and gen_keywords == 'Yes':
        for i, keyword in enumerate(keywords):
            st.markdown(f"{i+1}. {keyword}")
    else:
        st.warning("Please wait for the analysis to finish")

# =========== QUERY BOT ===========
with tab6:
    if 'generated' not in st.session_state:
        st.session_state['generated'] = []

    if 'past' not in st.session_state:
        st.session_state['past'] = []

    def get_text():
        """Render the chat input box and return the typed prompt."""
        st.header("Ask me something about the video:")
        input_text = st.text_input("You: ", key="prompt")
        return input_text

    def get_embedding_text(prompt):
        """Return the 4 transcript chunks most similar to *prompt*.

        Embeds the prompt with ada-002 and ranks the cached segment
        embeddings by cosine distance.
        """
        response = openai.Embedding.create(
            input=prompt.strip(),
            model="text-embedding-ada-002"
        )
        q_embedding = response['data'][0]['embedding']
        print("the folder name at got here 1.5 is ", folder_name)

        # FIX: on a fresh run `data` is still the list of dicts built by
        # generate_word_embeddings (the original assumed a DataFrame read
        # back from CSV and crashed); eval() is only needed for the CSV
        # round-trip, where the embedding lists come back as strings.
        df = data if isinstance(data, pd.DataFrame) else pd.DataFrame(data)
        if len(df) and isinstance(df['embedding'].iloc[0], str):
            # NOTE: eval() only ever sees our own cached CSV, never user input.
            df['embedding'] = df['embedding'].apply(eval)
        df['embedding'] = df['embedding'].apply(np.array)
        df['distances'] = distances_from_embeddings(q_embedding, df['embedding'].values, distance_metric='cosine')

        returns = []
        # Sort by distance with 2 hints
        for i, row in df.sort_values('distances', ascending=True).head(4).iterrows():
            # Else add it to the text that is being returned
            returns.append(row["text"])
        # Return the context
        return "\n\n###\n\n".join(returns)

    def generate_response(prompt):
        """Send the one-shot QA prompt to the completion model and return its text."""
        one_shot_prompt = '''I am YoutubeGPT, a highly intelligent question answering bot. If you ask me a question that is rooted in truth, I will give you the answer.
Q: What is human life expectancy in the United States?
A: Human life expectancy in the United States is 78 years.
Q: ''' + prompt + '''
A: '''
        # NOTE(review): text-davinci-003 is a deprecated engine — confirm it
        # is still available on the account before shipping.
        completions = openai.Completion.create(
            engine="text-davinci-003",
            prompt=one_shot_prompt,
            max_tokens=1024,
            n=1,
            stop=["Q:"],
            temperature=0.5,
        )
        # FIX: local renamed — the original `message = ...` shadowed the
        # streamlit_chat.message import used further down in this tab.
        answer = completions.choices[0].text
        return answer

    user_input = get_text()
    print("user input is ", user_input)
    print("the folder name at got here 0.5 is ", folder_name)

    if user_input:
        print("got here 1")
        print("the folder name at got here 1.5 is ", folder_name)
        text_embedding = get_embedding_text(user_input)
        print("the folder name at got here 1.5 is ", folder_name)
        print("got here 2")
        title = data_transcription['title']
        # FIX: the original '"\n\n###\n\n".join(title)' iterated the title
        # STRING character by character, interleaving the separator between
        # every letter. The title is a single string; use it directly.
        string_title = title
        user_input_embedding = 'Using this context: "' + string_title + '. ' + text_embedding + '", answer the following question. \n' + user_input
        print("got here 3")
        output = generate_response(user_input_embedding)
        st.session_state.past.append(user_input)
        st.session_state.generated.append(output)

    if st.session_state['generated']:
        for i in range(len(st.session_state['generated']) - 1, -1, -1):
            message(st.session_state["generated"][i], key=str(i))
            message(st.session_state['past'][i], is_user=True, key=str(i) + '_user')
# st.header("What else")