import spaces import gradio as gr import logging import os import datamapplot import duckdb import numpy as np import requests from dotenv import load_dotenv from gradio_huggingfacehub_search import HuggingfaceHubSearch from bertopic import BERTopic from bertopic.representation import KeyBERTInspired from bertopic.representation import TextGeneration from huggingface_hub import HfApi, SpaceCard from sklearn.feature_extraction.text import CountVectorizer from sentence_transformers import SentenceTransformer from templates import REPRESENTATION_PROMPT, SPACE_REPO_CARD_CONTENT from torch import cuda, bfloat16 from transformers import ( BitsAndBytesConfig, AutoTokenizer, AutoModelForCausalLM, pipeline, ) """ TODOs: - Improve representation layer (Try with llamacpp or TextGeneration) - Make it run on Zero GPU - Try with more rows (Current: 50_000/10_000 -> Minimal Targett: 1_000_000/20_000) - Export interactive plots and serve their HTML content (It doesn't work with gr.HTML) """ load_dotenv() HF_TOKEN = os.getenv("HF_TOKEN") assert HF_TOKEN is not None, "You need to set HF_TOKEN in your environment variables" EXPORTS_REPOSITORY = os.getenv("EXPORTS_REPOSITORY") assert ( EXPORTS_REPOSITORY is not None ), "You need to set EXPORTS_REPOSITORY in your environment variables" MAX_ROWS = int(os.getenv("MAX_ROWS", "8_000")) CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", "2_000")) DATASET_VIEWE_API_URL = "https://datasets-server.huggingface.co/" DATASETS_TOPICS_ORGANIZATION = os.getenv( "DATASETS_TOPICS_ORGANIZATION", "datasets-topics" ) USE_ARROW_STYLE = int(os.getenv("USE_ARROW_STYLE", "0")) USE_CUML = int(os.getenv("USE_CUML", "0")) if USE_CUML: from cuml.manifold import UMAP from cuml.cluster import HDBSCAN else: from umap import UMAP from hdbscan import HDBSCAN USE_LLM_TEXT_GENERATION = int(os.getenv("USE_LLM_TEXT_GENERATION", "1")) logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) api = HfApi(token=HF_TOKEN) session = requests.Session() sentence_model = SentenceTransformer("all-MiniLM-L6-v2") # Representation model if USE_LLM_TEXT_GENERATION: bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_use_double_quant=True, bnb_4bit_compute_dtype=bfloat16, ) model_id = "meta-llama/Llama-2-7b-chat-hf" tokenizer = AutoTokenizer.from_pretrained(model_id) model = AutoModelForCausalLM.from_pretrained( model_id, trust_remote_code=True, quantization_config=bnb_config, device_map="auto", ) model.eval() generator = pipeline( model=model, tokenizer=tokenizer, task="text-generation", temperature=0.1, max_new_tokens=500, repetition_penalty=1.1, ) representation_model = TextGeneration(generator, prompt=REPRESENTATION_PROMPT) else: representation_model = KeyBERTInspired() vectorizer_model = CountVectorizer(stop_words="english") def get_split_rows(dataset, config, split): config_size = session.get( f"{DATASET_VIEWE_API_URL}/size?dataset={dataset}&config={config}", timeout=20, ).json() if "error" in config_size: raise Exception(f"Error fetching config size: {config_size['error']}") split_size = next( (s for s in config_size["size"]["splits"] if s["split"] == split), None, ) if split_size is None: raise Exception(f"Error fetching split {split} in config {config}") return split_size["num_rows"] def get_parquet_urls(dataset, config, split): parquet_files = session.get( f"{DATASET_VIEWE_API_URL}/parquet?dataset={dataset}&config={config}&split={split}", timeout=20, ).json() if "error" in parquet_files: raise Exception(f"Error fetching parquet files: {parquet_files['error']}") parquet_urls = [file["url"] for file in parquet_files["parquet_files"]] logging.debug(f"Parquet files: {parquet_urls}") return ",".join(f"'{url}'" for url in parquet_urls) def get_docs_from_parquet(parquet_urls, column, offset, limit): SQL_QUERY = f"SELECT {column} FROM read_parquet([{parquet_urls}]) LIMIT {limit} OFFSET {offset};" df = duckdb.sql(SQL_QUERY).to_df() return df[column].tolist() # @spaces.GPU def calculate_embeddings(docs): return sentence_model.encode(docs, show_progress_bar=True, batch_size=32) def calculate_n_neighbors_and_components(n_rows): n_neighbors = min(max(n_rows // 20, 15), 100) n_components = 10 if n_rows > 1000 else 5 # Higher components for larger datasets return n_neighbors, n_components # @spaces.GPU def fit_model(docs, embeddings, n_neighbors, n_components): umap_model = UMAP( n_neighbors=n_neighbors, n_components=n_components, min_dist=0.0, metric="cosine", random_state=42, ) hdbscan_model = HDBSCAN( min_cluster_size=max( 5, n_neighbors // 2 ), # Reducing min_cluster_size for fewer outliers metric="euclidean", cluster_selection_method="eom", prediction_data=True, ) new_model = BERTopic( language="english", # Sub-models embedding_model=sentence_model, # Step 1 - Extract embeddings umap_model=umap_model, # Step 2 - UMAP model hdbscan_model=hdbscan_model, # Step 3 - Cluster reduced embeddings vectorizer_model=vectorizer_model, # Step 4 - Tokenize topics representation_model=representation_model, # Step 5 - Label topics # Hyperparameters top_n_words=10, verbose=True, min_topic_size=n_neighbors, # Coherent with n_neighbors? ) logging.info("Fitting new model") new_model.fit(docs, embeddings) logging.info("End fitting new model") return new_model def _push_to_hub( dataset_id, file_path, ): logging.info(f"Pushing file to hub: {dataset_id} on file {file_path}") file_name = file_path.split("/")[-1] try: logging.info(f"About to push {file_path} - {dataset_id}") api.upload_file( path_or_fileobj=file_path, path_in_repo=file_name, repo_id=EXPORTS_REPOSITORY, repo_type="dataset", ) except Exception as e: logging.info("Failed to push file", e) raise def create_space_with_content(dataset_id, html_file_path): repo_id = f"{DATASETS_TOPICS_ORGANIZATION}/{dataset_id.replace('/', '-')}" logging.info(f"Creating space with content: {repo_id} on file {html_file_path}") api.create_repo( repo_id=repo_id, repo_type="space", private=False, exist_ok=True, token=HF_TOKEN, space_sdk="static", ) SpaceCard( content=SPACE_REPO_CARD_CONTENT.format(dataset_id=dataset_id) ).push_to_hub(repo_id=repo_id, repo_type="space", token=HF_TOKEN) api.upload_file( path_or_fileobj=html_file_path, path_in_repo="index.html", repo_type="space", repo_id=repo_id, token=HF_TOKEN, ) logging.info(f"Space creation done") return repo_id @spaces.GPU(duration=60*5) def generate_topics(dataset, config, split, column, nested_column, plot_type): logging.info( f"Generating topics for {dataset=} {config=} {split=} {column=} {nested_column=} {plot_type=}" ) parquet_urls = get_parquet_urls(dataset, config, split) split_rows = get_split_rows(dataset, config, split) logging.info(f"Split rows: {split_rows}") limit = min(split_rows, MAX_ROWS) n_neighbors, n_components = calculate_n_neighbors_and_components(limit) reduce_umap_model = UMAP( n_neighbors=n_neighbors, n_components=2, # For visualization, keeping it for 2D min_dist=0.0, metric="cosine", random_state=42, ) offset = 0 rows_processed = 0 base_model = None all_docs = [] reduced_embeddings_list = [] topics_info, topic_plot = None, None full_processing = split_rows <= MAX_ROWS message = ( f"⚙️ Processing full dataset: 0 of ({split_rows} rows)" if full_processing else f"⚙️ Processing partial dataset 0 of ({limit} rows)" ) yield ( gr.Accordion(open=False), gr.DataFrame(value=[], interactive=False, visible=True), gr.Plot(value=None, visible=True), gr.Label({message: rows_processed / limit}, visible=True), "", "", ) while offset < limit: docs = get_docs_from_parquet(parquet_urls, column, offset, CHUNK_SIZE) if not docs: break logging.info( f"----> Processing chunk: {offset=} {CHUNK_SIZE=} with {len(docs)} docs" ) embeddings = calculate_embeddings(docs) new_model = fit_model(docs, embeddings, n_neighbors, n_components) if base_model is None: base_model = new_model else: updated_model = BERTopic.merge_models([base_model, new_model]) nr_new_topics = len(set(updated_model.topics_)) - len( set(base_model.topics_) ) new_topics = list(updated_model.topic_labels_.values())[-nr_new_topics:] logging.info(f"The following topics are newly found: {new_topics}") base_model = updated_model reduced_embeddings = reduce_umap_model.fit_transform(embeddings) reduced_embeddings_list.append(reduced_embeddings) all_docs.extend(docs) reduced_embeddings_array = np.vstack(reduced_embeddings_list) topics_info = base_model.get_topic_info() all_topics, _ = base_model.transform(all_docs) all_topics = np.array(all_topics) sub_title = ( f"Data map for the entire dataset ({limit} rows) using the column '{column}'" if full_processing else f"Data map for a sample of the dataset (first {limit} rows) using the column '{column}'" ) topic_plot = ( base_model.visualize_document_datamap( docs=all_docs, reduced_embeddings=reduced_embeddings_array, title=dataset, sub_title=sub_title, width=800, height=700, arrowprops={ "arrowstyle": "wedge,tail_width=0.5", "connectionstyle": "arc3,rad=0.05", "linewidth": 0, "fc": "#33333377", }, dynamic_label_size=True, label_wrap_width=12, label_over_points=True, max_font_size=36, min_font_size=4, ) if plot_type == "DataMapPlot" else base_model.visualize_documents( docs=all_docs, reduced_embeddings=reduced_embeddings_array, custom_labels=True, title=dataset, ) ) rows_processed += len(docs) progress = min(rows_processed / limit, 1.0) logging.info(f"Progress: {progress} % - {rows_processed} of {limit}") message = ( f"⚙️ Processing full dataset: {rows_processed} of {limit}" if full_processing else f"⚙️ Processing partial dataset: {rows_processed} of {limit} rows" ) yield ( gr.Accordion(open=False), topics_info, topic_plot, gr.Label({message: progress}, visible=True), "", "", ) offset += CHUNK_SIZE logging.info("Finished processing all data") plot_png = f"{dataset.replace('/', '-')}-{plot_type.lower()}.png" if plot_type == "DataMapPlot": topic_plot.savefig(plot_png, format="png", dpi=300) else: topic_plot.write_image(plot_png) _push_to_hub(dataset, plot_png) all_topics, _ = base_model.transform(all_docs) topic_info = base_model.get_topic_info() topic_names = {row["Topic"]: row["Name"] for index, row in topic_info.iterrows()} topic_names_array = np.array( [ topic_names.get(topic, "No Topic").split("_")[1].strip("-") for topic in all_topics ] ) dataset_clear_name = dataset.replace("/", "-") interactive_plot = datamapplot.create_interactive_plot( reduced_embeddings_array, topic_names_array, hover_text=all_docs, title=dataset, sub_title=sub_title.replace( "dataset", f"dataset", ), enable_search=True, # TODO: Export data to .arrow and also serve it inline_data=True, # offline_data_prefix=dataset_clear_name, initial_zoom_fraction=0.8, ) html_content = str(interactive_plot) html_file_path = f"{dataset_clear_name}.html" with open(html_file_path, "w", encoding="utf-8") as html_file: html_file.write(html_content) space_id = create_space_with_content(dataset, html_file_path) plot_png_link = ( f"https://huggingface.co/datasets/{EXPORTS_REPOSITORY}/blob/main/{plot_png}" ) space_link = f"https://huggingface.co/spaces/{space_id}" yield ( gr.Accordion(open=False), topics_info, topic_plot, gr.Label( {f"✅ Done: {rows_processed} rows have been processed": 1.0}, visible=True ), f"[![Download as PNG](https://img.shields.io/badge/Download_as-PNG-red)]({plot_png_link})", f"[![Go to interactive plot](https://img.shields.io/badge/%F0%9F%A4%97%20Hugging%20Face-Space-blue)]({space_link})", ) cuda.empty_cache() with gr.Blocks() as demo: gr.Markdown("# 💠 Dataset Topic Discovery 🔭") gr.Markdown("## Select dataset and text column") data_details_accordion = gr.Accordion("Data details", open=True) with data_details_accordion: with gr.Row(): with gr.Column(scale=3): dataset_name = HuggingfaceHubSearch( label="Hub Dataset ID", placeholder="Search for dataset id on Huggingface", search_type="dataset", ) subset_dropdown = gr.Dropdown(label="Subset", visible=False) split_dropdown = gr.Dropdown(label="Split", visible=False) with gr.Accordion("Dataset preview", open=False): @gr.render(inputs=[dataset_name, subset_dropdown, split_dropdown]) def embed(name, subset, split): html_code = f""" """ return gr.HTML(value=html_code) with gr.Row(): text_column_dropdown = gr.Dropdown(label="Text column name") nested_text_column_dropdown = gr.Dropdown( label="Nested text column name", visible=False ) plot_type_radio = gr.Radio( ["DataMapPlot", "Plotly"], value="DataMapPlot", label="Choose the plot type", interactive=True, ) generate_button = gr.Button("Generate Topics", variant="primary") gr.Markdown("## Data map") full_topics_generation_label = gr.Label(visible=False, show_label=False) with gr.Row(): open_png_label = gr.Markdown() open_space_label = gr.Markdown() topics_plot = gr.Plot() with gr.Accordion("Topics Info", open=False): topics_df = gr.DataFrame(interactive=False, visible=True) generate_button.click( generate_topics, inputs=[ dataset_name, subset_dropdown, split_dropdown, text_column_dropdown, nested_text_column_dropdown, plot_type_radio, ], outputs=[ data_details_accordion, topics_df, topics_plot, full_topics_generation_label, open_png_label, open_space_label, ], ) def _resolve_dataset_selection( dataset: str, default_subset: str, default_split: str, text_feature ): if "/" not in dataset.strip().strip("/"): return { subset_dropdown: gr.Dropdown(visible=False), split_dropdown: gr.Dropdown(visible=False), text_column_dropdown: gr.Dropdown(label="Text column name"), nested_text_column_dropdown: gr.Dropdown(visible=False), } info_resp = session.get( f"{DATASET_VIEWE_API_URL}/info?dataset={dataset}", timeout=20 ).json() if "error" in info_resp: return { subset_dropdown: gr.Dropdown(visible=False), split_dropdown: gr.Dropdown(visible=False), text_column_dropdown: gr.Dropdown(label="Text column name"), nested_text_column_dropdown: gr.Dropdown(visible=False), } subsets: list[str] = list(info_resp["dataset_info"]) subset = default_subset if default_subset in subsets else subsets[0] splits: list[str] = list(info_resp["dataset_info"][subset]["splits"]) split = default_split if default_split in splits else splits[0] features = info_resp["dataset_info"][subset]["features"] def _is_string_feature(feature): return isinstance(feature, dict) and feature.get("dtype") == "string" text_features = [ feature_name for feature_name, feature in features.items() if _is_string_feature(feature) ] nested_features = [ feature_name for feature_name, feature in features.items() if isinstance(feature, dict) and isinstance(next(iter(feature.values())), dict) ] nested_text_features = [ feature_name for feature_name in nested_features if any( _is_string_feature(nested_feature) for nested_feature in features[feature_name].values() ) ] if not text_feature: return { subset_dropdown: gr.Dropdown( value=subset, choices=subsets, visible=len(subsets) > 1 ), split_dropdown: gr.Dropdown( value=split, choices=splits, visible=len(splits) > 1 ), text_column_dropdown: gr.Dropdown( choices=text_features + nested_text_features, label="Text column name", ), nested_text_column_dropdown: gr.Dropdown(visible=False), } if text_feature in nested_text_features: nested_keys = [ feature_name for feature_name, feature in features[text_feature].items() if _is_string_feature(feature) ] return { subset_dropdown: gr.Dropdown( value=subset, choices=subsets, visible=len(subsets) > 1 ), split_dropdown: gr.Dropdown( value=split, choices=splits, visible=len(splits) > 1 ), text_column_dropdown: gr.Dropdown( choices=text_features + nested_text_features, label="Text column name", ), nested_text_column_dropdown: gr.Dropdown( value=nested_keys[0], choices=nested_keys, label="Nested text column name", visible=True, ), } return { subset_dropdown: gr.Dropdown( value=subset, choices=subsets, visible=len(subsets) > 1 ), split_dropdown: gr.Dropdown( value=split, choices=splits, visible=len(splits) > 1 ), text_column_dropdown: gr.Dropdown( choices=text_features + nested_text_features, label="Text column name" ), nested_text_column_dropdown: gr.Dropdown(visible=False), } @dataset_name.change( inputs=[dataset_name], outputs=[ subset_dropdown, split_dropdown, text_column_dropdown, nested_text_column_dropdown, ], ) def show_input_from_subset_dropdown(dataset: str) -> dict: return _resolve_dataset_selection( dataset, default_subset="default", default_split="train", text_feature=None ) @subset_dropdown.change( inputs=[dataset_name, subset_dropdown], outputs=[ subset_dropdown, split_dropdown, text_column_dropdown, nested_text_column_dropdown, ], ) def show_input_from_subset_dropdown(dataset: str, subset: str) -> dict: return _resolve_dataset_selection( dataset, default_subset=subset, default_split="train", text_feature=None ) @split_dropdown.change( inputs=[dataset_name, subset_dropdown, split_dropdown], outputs=[ subset_dropdown, split_dropdown, text_column_dropdown, nested_text_column_dropdown, ], ) def show_input_from_split_dropdown(dataset: str, subset: str, split: str) -> dict: return _resolve_dataset_selection( dataset, default_subset=subset, default_split=split, text_feature=None ) @text_column_dropdown.change( inputs=[dataset_name, subset_dropdown, split_dropdown, text_column_dropdown], outputs=[ subset_dropdown, split_dropdown, text_column_dropdown, nested_text_column_dropdown, ], ) def show_input_from_text_column_dropdown( dataset: str, subset: str, split: str, text_column ) -> dict: return _resolve_dataset_selection( dataset, default_subset=subset, default_split=split, text_feature=text_column, ) demo.launch()