import asyncio
import json
import logging
import os
import tempfile
import time

import gradio as gr
import numpy as np
import requests
from aiohttp import ClientSession
from datasets import Dataset, load_dataset
from langchain.text_splitter import RecursiveCharacterTextSplitter
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio

HF_TOKEN = os.getenv("HF_TOKEN")
SEMAPHORE_BOUND = os.getenv("SEMAPHORE_BOUND", "5")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Chunker:
    """Wraps the selected chunking strategy behind a single `split(text)` callable."""

    def __init__(self, strategy, split_seq=".", chunk_len=512):
        self.split_seq = split_seq
        # chunk_len arrives as a string from the UI textbox; fall back to the default when empty
        self.chunk_len = int(chunk_len) if chunk_len else 512
        if strategy == "recursive":
            # Langchain's recursive character splitter with its default separators and sizes
            self.split = RecursiveCharacterTextSplitter().split_text
        if strategy == "sequence":
            self.split = self.seq_splitter
        if strategy == "constant":
            self.split = self.const_splitter

    def seq_splitter(self, text):
        # Split on a literal separator sequence
        return text.split(self.split_seq)

    def const_splitter(self, text):
        # Split into fixed-size character windows
        return [
            text[i * self.chunk_len:(i + 1) * self.chunk_len]
            for i in range(int(np.ceil(len(text) / self.chunk_len)))
        ]


def generator(input_ds, input_text_col, chunker):
    # Yield one row per non-empty chunk, keeping the original text column name
    for i in tqdm(range(len(input_ds))):
        chunks = chunker.split(input_ds[i][input_text_col])
        for chunk in chunks:
            if chunk:
                yield {input_text_col: chunk}


def chunk(input_ds, input_splits, input_text_col, output_ds, strategy, split_seq, chunk_len, private):
    input_splits = [spl.strip() for spl in input_splits.split(",") if spl]
    input_ds = load_dataset(input_ds, split="+".join(input_splits))

    chunker = Chunker(strategy, split_seq, chunk_len)

    gen_kwargs = {
        "input_ds": input_ds,
        "input_text_col": input_text_col,
        "chunker": chunker,
    }
    dataset = Dataset.from_generator(generator, gen_kwargs=gen_kwargs)
    dataset.push_to_hub(
        output_ds,
        private=private,
        token=HF_TOKEN,
    )

    logger.info("Done chunking")


async def embed_sent(sentence, embed_in_text_col, semaphore, tei_url, tmp_file):
    # The semaphore caps the number of concurrent requests sent to the TEI endpoint
    async with semaphore:
        payload = {
            "inputs": sentence,
            "truncate": True,
        }

        async with ClientSession(
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {HF_TOKEN}",
            }
        ) as session:
            async with session.post(tei_url, json=payload) as resp:
                if resp.status != 200:
                    raise RuntimeError(await resp.text())
                result = await resp.json()

                # One JSON line per chunk: its embedding vector plus the original text
                tmp_file.write(
                    json.dumps({"vector": result[0], embed_in_text_col: sentence}) + "\n"
                )


async def embed_ds(input_ds, tei_url, embed_in_text_col, temp_file):
    semaphore = asyncio.BoundedSemaphore(int(SEMAPHORE_BOUND))
    jobs = [
        asyncio.create_task(embed_sent(row[embed_in_text_col], embed_in_text_col, semaphore, tei_url, temp_file))
        for row in input_ds if row[embed_in_text_col].strip()
    ]
    logger.info(f"num chunks to embed: {len(jobs)}")

    tic = time.time()
    await tqdm_asyncio.gather(*jobs)
    logger.info(f"embed time: {time.time() - tic}")


def wake_up_endpoint(url):
    # Poll the endpoint until it answers 200, e.g. while it scales up from zero
    n_loop = 0
    while requests.get(
        url=url,
        headers={"Authorization": f"Bearer {HF_TOKEN}"},
    ).status_code != 200:
        time.sleep(2)
        n_loop += 1
        if n_loop > 30:
            raise TimeoutError("TEI endpoint is unavailable")
    logger.info("TEI endpoint is up")


def run_embed(input_ds, input_splits, embed_in_text_col, output_ds, tei_url, private):
    wake_up_endpoint(tei_url)

    input_splits = [spl.strip() for spl in input_splits.split(",") if spl]
    input_ds = load_dataset(input_ds, split="+".join(input_splits))

    with tempfile.NamedTemporaryFile(mode="a", suffix=".jsonl") as temp_file:
        asyncio.run(embed_ds(input_ds, tei_url, embed_in_text_col, temp_file))
        # Make sure buffered rows are on disk before reading the file back
        temp_file.flush()

        dataset = Dataset.from_json(temp_file.name)
        dataset.push_to_hub(
            output_ds,
            private=private,
            token=HF_TOKEN,
        )

    logger.info("Done embedding")
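# Usage sketch (illustrative only; the dataset names and endpoint URL below are
# placeholders, not values this app ships with). The helpers above can also be
# driven programmatically, bypassing the Gradio UI:
#
#   chunker = Chunker("constant", chunk_len=512)
#   chunker.split("a very long document ...")            # fixed 512-character windows
#
#   chunker = Chunker("sequence", split_seq="\n\n")
#   chunker.split("para one\n\npara two")                # -> ["para one", "para two"]
#
#   chunk("my-org/raw-docs", "train", "text", "my-org/raw-docs-chunked",
#         "recursive", ".", 512, private=True)
#   run_embed("my-org/raw-docs-chunked", "train", "text", "my-org/raw-docs-embedded",
#             "https://<tei-endpoint-url>", private=True)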
def change_dropdown(choice):
    # Show the input field that matches the selected chunking strategy and hide the other one
    if choice == "recursive" or choice == "sequence":
        return [
            gr.Textbox(visible=True),
            gr.Textbox(visible=False),
        ]
    else:
        return [
            gr.Textbox(visible=False),
            gr.Textbox(visible=True),
        ]


with gr.Blocks() as demo:
    gr.Markdown(
        """
        ## Chunk your dataset before embedding
        """
    )
    with gr.Tab("Chunk"):
        chunk_in_ds = gr.Textbox(lines=1, label="Input dataset name")
        with gr.Row():
            chunk_in_splits = gr.Textbox(lines=1, label="Input dataset splits", placeholder="train, test")
            chunk_in_text_col = gr.Textbox(lines=1, label="Input text column name", placeholder="text")
        with gr.Row():
            chunk_out_ds = gr.Textbox(lines=1, label="Output dataset name", scale=6)
            chunk_private = gr.Checkbox(label="Make chunked dataset private")
        with gr.Row():
            dropdown = gr.Dropdown(
                ["recursive", "sequence", "constant"],
                label="Chunking strategy",
                info="'recursive' uses Langchain's recursive character splitter, "
                     "'sequence' splits texts on a chosen sequence, "
                     "'constant' makes chunks of a constant size",
                scale=2,
            )
            split_seq = gr.Textbox(
                lines=1,
                interactive=True,
                visible=False,
                label="Sequence",
                info="A text sequence to split on",
                placeholder="\n\n",
            )
            chunk_len = gr.Textbox(
                lines=1,
                interactive=True,
                visible=False,
                label="Length",
                info="The chunk length, in characters",
                placeholder="512",
            )
            dropdown.change(fn=change_dropdown, inputs=dropdown, outputs=[split_seq, chunk_len])
        with gr.Row():
            gr.ClearButton(
                components=[
                    chunk_in_ds, chunk_in_splits, chunk_in_text_col, chunk_out_ds,
                    dropdown, split_seq, chunk_len, chunk_private,
                ]
            )
            chunk_btn = gr.Button("Chunk")
            chunk_btn.click(
                fn=chunk,
                inputs=[chunk_in_ds, chunk_in_splits, chunk_in_text_col, chunk_out_ds,
                        dropdown, split_seq, chunk_len, chunk_private],
            )

    with gr.Tab("Embed"):
        embed_in_ds = gr.Textbox(lines=1, label="Input dataset name")
        with gr.Row():
            embed_in_splits = gr.Textbox(lines=1, label="Input dataset splits", placeholder="train, test")
            embed_in_text_col = gr.Textbox(lines=1, label="Input text column name", placeholder="text")
        with gr.Row():
            embed_out_ds = gr.Textbox(lines=1, label="Output dataset name", scale=6)
            embed_private = gr.Checkbox(label="Make embedded dataset private")
        tei_url = gr.Textbox(lines=1, label="TEI endpoint URL")
        with gr.Row():
            gr.ClearButton(
                components=[embed_in_ds, embed_in_splits, embed_in_text_col, embed_out_ds, tei_url, embed_private]
            )
            embed_btn = gr.Button("Run embed")
            embed_btn.click(
                fn=run_embed,
                inputs=[embed_in_ds, embed_in_splits, embed_in_text_col, embed_out_ds, tei_url, embed_private],
            )


demo.launch(debug=True)