"""Streamlit PDF utility.

Three sidebar-selectable steps:

1. Upload PDFs and append their extracted text to the knowledge base.
2. Edit the knowledge-base text and count its tokens.
3. Split the text into token-bounded chunks with optional prefix/postfix.
"""

from concurrent.futures import ThreadPoolExecutor
import io
import re

from anthropic import Anthropic
from PyPDF2 import PdfReader
import streamlit as st
import tiktoken

client = Anthropic()
encoding_openAI = tiktoken.get_encoding("cl100k_base")
encoding_anthropic = client.get_tokenizer()

# Model choice and max tokens input
model_choice = st.sidebar.selectbox("Choose a Model", ["OpenAI", "Anthropic"])

# Compiled once at import time; matches every character that is NOT an
# English letter, digit, space, line break, or common punctuation/symbol.
_DISALLOWED_CHARS = re.compile(r'[^a-zA-Z0-9 \r\n.,;!?()\-\'\"&+:%$#@*]')


def _tokenizer_for(tokenizer_name):
    """Return the module-level tokenizer for *tokenizer_name*.

    "OpenAI" selects the tiktoken encoding; any other value selects the
    Anthropic tokenizer.
    """
    return encoding_openAI if tokenizer_name == "OpenAI" else encoding_anthropic


def _token_ids(tokenizer, text):
    """Encode *text* and return the token ids as a plain sequence.

    Some tokenizers return an encoding object exposing the ids via an
    ``ids`` attribute while others return the id list directly; both
    shapes are normalised here.
    """
    encoded = tokenizer.encode(text)
    return encoded.ids if hasattr(encoded, "ids") else encoded


def _rerun():
    """Trigger a script rerun on both current and older Streamlit versions."""
    # Prefer ``st.rerun``; fall back to the legacy experimental name only
    # when the newer API is not available.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


def clean_text_content(text):
    """Return *text* with every disallowed character removed.

    Only English letters, digits, spaces, line breaks and the
    punctuation/symbols ``.,;!?()-'"&+:%$#@*`` are kept.
    """
    return _DISALLOWED_CHARS.sub('', text)


def create_chunks(text, n, tokenizer_name):
    """Yield successive token-id chunks of roughly *n* tokens from *text*.

    Each chunk is a slice of token ids (not decoded text). A chunk ends at
    the last position between ``0.5 * n`` and ``1.5 * n`` tokens whose
    decoded text ends with a full stop or a newline; when no such position
    exists the chunk is exactly *n* tokens long (or whatever remains).

    Args:
        text: The text to tokenize and split.
        n: Target number of tokens per chunk; must be at least 1.
        tokenizer_name: "OpenAI" for the tiktoken encoding, anything else
            for the Anthropic tokenizer.

    Raises:
        ValueError: If *n* is smaller than 1 (the loop could otherwise
            never advance and would yield empty chunks forever).
    """
    if n < 1:
        raise ValueError("n must be a positive number of tokens")

    tokenizer = _tokenizer_for(tokenizer_name)
    tokens = _token_ids(tokenizer, text)
    total = len(tokens)
    lower_span = int(0.5 * n)
    upper_span = int(1.5 * n)

    i = 0
    while i < total:
        floor = i + lower_span
        # Start from the largest allowed chunk and shrink it token by token
        # until the decoded text ends on a sentence/line boundary.
        j = min(i + upper_span, total)
        while j > floor:
            chunk = tokenizer.decode(tokens[i:j])
            if chunk.endswith((".", "\n")):
                break
            j -= 1
        # If no end of sentence was found, use n tokens as the chunk size.
        if j == floor:
            j = min(i + n, total)
        yield tokens[i:j]
        i = j


def convert_pdf_to_text(pdf_file_data, file_name):
    """Extract the text of one PDF and wrap it in a labelled section.

    Args:
        pdf_file_data: A binary file-like object holding the PDF.
        file_name: Name written into the section header.

    Returns:
        The concatenated page texts, preceded by a ``file name`` header
        and delimited by ``---`` lines.
    """
    pdf_reader = PdfReader(pdf_file_data)
    # ``or ""`` keeps the join safe should a page yield no text object.
    body = "".join(page.extract_text() or "" for page in pdf_reader.pages)
    return f"\n---\nfile name: {file_name}\n content: \n{body}\n---\n"


def pdf_to_text(pdf_files_data, file_names):
    """Convert several PDFs concurrently, preserving input order.

    Args:
        pdf_files_data: Binary file-like objects, one per PDF.
        file_names: File names paired positionally with *pdf_files_data*.

    Returns:
        A list with one converted text section per input PDF.
    """
    with ThreadPoolExecutor() as executor:
        # Materialise inside the ``with`` so callers get finished results
        # rather than a lazy iterator tied to an already shut-down pool.
        return list(executor.map(convert_pdf_to_text, pdf_files_data, file_names))


st.title("PDF Utility")

# Create tabs
step01 = "Step 01: Upload Files"
step02 = "Step 02: Edit Knowledge Base"
step03 = "Step 03: Split text"
tabs = [step01, step02, step03]

if "selected_tab" not in st.session_state:
    st.session_state.selected_tab = step01

selected_tab = st.sidebar.radio(
    "Choose a tab", tabs, index=tabs.index(st.session_state.selected_tab))
# Remember the user's own choice. Without this the stored value only ever
# changes when a conversion finishes, so a later conversion that sets it to
# the same step would no longer move the radio to Step 02.
st.session_state.selected_tab = selected_tab

if "text_content" not in st.session_state:
    st.session_state.text_content = ""

# Define content for each tab
if selected_tab == step02:
    st.subheader("Knowledge Base Text Area")
    st.session_state.text_content = st.text_area(
        "Knowledge Text Area", st.session_state.text_content, height=400)

    if st.button("Compute Tokens"):
        # model_choice here is the sidebar selection ("OpenAI"/"Anthropic").
        num_tokens = len(_token_ids(
            _tokenizer_for(model_choice), st.session_state.text_content))
        st.write(f"Total number of tokens ({model_choice}): {num_tokens}")

elif selected_tab == step01:
    st.subheader("Upload PDFs to Append to Knowledge Base")
    uploaded_files = st.file_uploader(
        "Upload PDF files", type="pdf", accept_multiple_files=True)

    if uploaded_files and st.button('Convert to text'):
        # getvalue() returns the full upload regardless of the current
        # stream position, so repeated script runs cannot see empty data.
        pdf_files_data = [io.BytesIO(uploaded_file.getvalue())
                          for uploaded_file in uploaded_files]
        file_names = [uploaded_file.name for uploaded_file in uploaded_files]

        converting_message = st.text("Converting PDFs...")
        converted_text = "\n".join(pdf_to_text(pdf_files_data, file_names))
        st.session_state.text_content += converted_text
        converting_message.empty()

        # Jump to the editing step so the new text is shown immediately.
        st.session_state.selected_tab = step02
        _rerun()

elif selected_tab == step03:
    st.subheader("Splitting Options")

    # This selectbox overrides the sidebar model choice for splitting.
    model_choice = st.selectbox(
        "Choose a Model", ["OpenAI", "Anthropic"], key="model_choice_selectbox")
    max_tokens = st.number_input(
        "Max number of tokens per chunk", min_value=100, value=8000,
        key="max_tokens_input")
    clean_text = st.checkbox("Clean text before encoding and splitting?")

    # Add prefix and postfix input options
    prefix = st.text_area("Prefix for each chunk:", "")
    postfix = st.text_area("Postfix for each chunk:", "")

    if clean_text:
        # NOTE: this overwrites the stored knowledge-base text, so the
        # cleaned version is also what the editing step shows afterwards.
        st.session_state.text_content = clean_text_content(
            st.session_state.text_content)

    tokenizer = _tokenizer_for(model_choice)
    chunks = [
        tokenizer.decode(chunk_tokens)
        for chunk_tokens in create_chunks(
            st.session_state.text_content, max_tokens, model_choice)
    ]

    for i, chunk in enumerate(chunks, 1):
        # Add prefix and postfix to each chunk
        chunk_with_affixes = f"{prefix}{chunk}{postfix}"
        st.text_area(f"Chunk {i} content:", chunk_with_affixes, height=200)