import numpy as np import pandas as pd import gradio as gr import plotly.graph_objects as go from plotly.subplots import make_subplots from bertopic._utils import check_documents_type, validate_distance_matrix from bertopic.plotting._hierarchy import _get_annotations import plotly.figure_factory as ff from packaging import version import math from umap import UMAP from typing import List, Union, Callable from scipy.sparse import csr_matrix from scipy.cluster import hierarchy as sch from sklearn.metrics.pairwise import cosine_similarity from sklearn import __version__ as sklearn_version from tqdm import tqdm import itertools import numpy as np # Following adapted from Bertopic original implementation here (Maarten Grootendorst): https://github.com/MaartenGr/BERTopic/blob/master/bertopic/plotting/_documents.py def visualize_documents_custom(topic_model, docs: List[str], hover_labels: List[str], topics: List[int] = None, embeddings: np.ndarray = None, reduced_embeddings: np.ndarray = None, sample: float = None, hide_annotations: bool = False, hide_document_hover: bool = False, custom_labels: Union[bool, str] = False, title: str = "Documents and Topics", width: int = 1200, height: int = 750, progress=gr.Progress(track_tqdm=True)): """ Visualize documents and their topics in 2D Arguments: topic_model: A fitted BERTopic instance. docs: The documents you used when calling either `fit` or `fit_transform` topics: A selection of topics to visualize. Not to be confused with the topics that you get from `.fit_transform`. For example, if you want to visualize only topics 1 through 5: `topics = [1, 2, 3, 4, 5]`. embeddings: The embeddings of all documents in `docs`. reduced_embeddings: The 2D reduced embeddings of all documents in `docs`. sample: The percentage of documents in each topic that you would like to keep. Value can be between 0 and 1. 
Setting this value to, for example, 0.1 (10% of documents in each topic) makes it easier to visualize millions of documents as a subset is chosen. hide_annotations: Hide the names of the traces on top of each cluster. hide_document_hover: Hide the content of the documents when hovering over specific points. Helps to speed up generation of visualization. custom_labels: If bool, whether to use custom topic labels that were defined using `topic_model.set_topic_labels`. If `str`, it uses labels from other aspects, e.g., "Aspect1". title: Title of the plot. width: The width of the figure. height: The height of the figure. Examples: To visualize the topics simply run: ```python topic_model.visualize_documents(docs) ``` Do note that this re-calculates the embeddings and reduces them to 2D. The advised and prefered pipeline for using this function is as follows: ```python from sklearn.datasets import fetch_20newsgroups from sentence_transformers import SentenceTransformer from bertopic import BERTopic from umap import UMAP # Prepare embeddings docs = fetch_20newsgroups(subset='all', remove=('headers', 'footers', 'quotes'))['data'] sentence_model = SentenceTransformer("all-MiniLM-L6-v2") embeddings = sentence_model.encode(docs, show_progress_bar=False) # Train BERTopic topic_model = BERTopic().fit(docs, embeddings) # Reduce dimensionality of embeddings, this step is optional # reduced_embeddings = UMAP(n_neighbors=10, n_components=2, min_dist=0.0, metric='cosine').fit_transform(embeddings) # Run the visualization with the original embeddings topic_model.visualize_documents(docs, embeddings=embeddings) # Or, if you have reduced the original embeddings already: topic_model.visualize_documents(docs, reduced_embeddings=reduced_embeddings) ``` Or if you want to save the resulting figure: ```python fig = topic_model.visualize_documents(docs, reduced_embeddings=reduced_embeddings) fig.write_html("path/to/file.html") ``` """ topic_per_doc = topic_model.topics_ # Add
tags to hover labels to get them to appear on multiple lines def wrap_by_word(s, n): '''returns a string up to 300 words where \\n is inserted between every n words''' a = s.split()[:300] ret = '' for i in range(0, len(a), n): ret += ' '.join(a[i:i+n]) + '
' return ret # Apply the function to every element in the list hover_labels = [wrap_by_word(s, n=20) for s in hover_labels] # Sample the data to optimize for visualization and dimensionality reduction if sample is None or sample > 1: sample = 1 indices = [] for topic in set(topic_per_doc): s = np.where(np.array(topic_per_doc) == topic)[0] size = len(s) if len(s) < 100 else int(len(s) * sample) indices.extend(np.random.choice(s, size=size, replace=False)) indices = np.array(indices) df = pd.DataFrame({"topic": np.array(topic_per_doc)[indices]}) df["doc"] = [docs[index] for index in indices] df["hover_labels"] = [hover_labels[index] for index in indices] df["topic"] = [topic_per_doc[index] for index in indices] # Extract embeddings if not already done if sample is None: if embeddings is None and reduced_embeddings is None: embeddings_to_reduce = topic_model._extract_embeddings(df.doc.to_list(), method="document") else: embeddings_to_reduce = embeddings else: if embeddings is not None: embeddings_to_reduce = embeddings[indices] elif embeddings is None and reduced_embeddings is None: embeddings_to_reduce = topic_model._extract_embeddings(df.doc.to_list(), method="document") # Reduce input embeddings if reduced_embeddings is None: umap_model = UMAP(n_neighbors=10, n_components=2, min_dist=0.0, metric='cosine').fit(embeddings_to_reduce) embeddings_2d = umap_model.embedding_ elif sample is not None and reduced_embeddings is not None: embeddings_2d = reduced_embeddings[indices] elif sample is None and reduced_embeddings is not None: embeddings_2d = reduced_embeddings unique_topics = set(topic_per_doc) if topics is None: topics = unique_topics # Combine data df["x"] = embeddings_2d[:, 0] df["y"] = embeddings_2d[:, 1] # Prepare text and names trace_name_char_length = 60 if isinstance(custom_labels, str): names = [[[str(topic), None]] + topic_model.topic_aspects_[custom_labels][topic] for topic in unique_topics] names = ["_".join([label[0] for label in labels[:4]]) for labels 
in names] names = [label if len(label) < 30 else label[:27] + "..." for label in names] elif topic_model.custom_labels_ is not None and custom_labels: #print("Using custom labels: ", topic_model.custom_labels_) #names = [topic_model.custom_labels_[topic + topic_model._outliers] for topic in unique_topics] # Limit label length to 100 chars names = [label[:trace_name_char_length] for label in (topic_model.custom_labels_[topic + topic_model._outliers] for topic in unique_topics)] else: #print("Not using custom labels") # Limit label length to 100 chars names = [f"{topic} " + ", ".join([word for word, value in topic_model.get_topic(topic)][:3])[:trace_name_char_length] for topic in unique_topics] #names = [f"{topic} " + ", ".join([word for word, value in topic_model.get_topic(topic)][:3]) for topic in unique_topics] #print(names) # Visualize fig = go.Figure() # Outliers and non-selected topics non_selected_topics = set(unique_topics).difference(topics) if len(non_selected_topics) == 0: non_selected_topics = [-1] selection = df.loc[df.topic.isin(non_selected_topics), :] selection["text"] = "" selection.loc[len(selection), :] = [None, None, None, selection.x.mean(), selection.y.mean(), "Other documents"] fig.add_trace( go.Scattergl( x=selection.x, y=selection.y, hovertext=selection.hover_labels if not hide_document_hover else None, hoverinfo="text", mode='markers+text', name="other", showlegend=False, marker=dict(color='#CFD8DC', size=5, opacity=0.5), hoverlabel=dict(align='left') ) ) # Selected topics for name, topic in zip(names, unique_topics): #print(name) #print(topic) if topic in topics and topic != -1: selection = df.loc[df.topic == topic, :] selection["text"] = "" if not hide_annotations: selection.loc[len(selection), :] = [None, None, selection.x.mean(), selection.y.mean(), name] fig.add_trace( go.Scattergl( x=selection.x, y=selection.y, hovertext=selection.hover_labels if not hide_document_hover else None, hoverinfo="text", text=selection.text, 
mode='markers+text', name=name, textfont=dict( size=12, ), marker=dict(size=5, opacity=0.5), hoverlabel=dict(align='left') )) # Add grid in a 'plus' shape x_range = (df.x.min() - abs((df.x.min()) * .15), df.x.max() + abs((df.x.max()) * .15)) y_range = (df.y.min() - abs((df.y.min()) * .15), df.y.max() + abs((df.y.max()) * .15)) fig.add_shape(type="line", x0=sum(x_range) / 2, y0=y_range[0], x1=sum(x_range) / 2, y1=y_range[1], line=dict(color="#CFD8DC", width=2)) fig.add_shape(type="line", x0=x_range[0], y0=sum(y_range) / 2, x1=x_range[1], y1=sum(y_range) / 2, line=dict(color="#9E9E9E", width=2)) fig.add_annotation(x=x_range[0], y=sum(y_range) / 2, text="D1", showarrow=False, yshift=10) fig.add_annotation(y=y_range[1], x=sum(x_range) / 2, text="D2", showarrow=False, xshift=10) # Stylize layout fig.update_layout( template="simple_white", title={ 'text': f"{title}", 'x': 0.5, 'xanchor': 'center', 'yanchor': 'top', 'font': dict( size=22, color="Black") }, hoverlabel_align = 'left', width=width, height=height ) fig.update_xaxes(visible=False) fig.update_yaxes(visible=False) return fig def hierarchical_topics_custom(self, docs: List[str], linkage_function: Callable[[csr_matrix], np.ndarray] = None, distance_function: Callable[[csr_matrix], csr_matrix] = None, progress=gr.Progress(track_tqdm=True)) -> pd.DataFrame: """ Create a hierarchy of topics To create this hierarchy, BERTopic needs to be already fitted once. Then, a hierarchy is calculated on the distance matrix of the c-TF-IDF representation using `scipy.cluster.hierarchy.linkage`. Based on that hierarchy, we calculate the topic representation at each merged step. This is a local representation, as we only assume that the chosen step is merged and not all others which typically improves the topic representation. Arguments: docs: The documents you used when calling either `fit` or `fit_transform` linkage_function: The linkage function to use. 
Default is: `lambda x: sch.linkage(x, 'ward', optimal_ordering=True)` distance_function: The distance function to use on the c-TF-IDF matrix. Default is: `lambda x: 1 - cosine_similarity(x)`. You can pass any function that returns either a square matrix of shape (n_samples, n_samples) with zeros on the diagonal and non-negative values or condensed distance matrix of shape (n_samples * (n_samples - 1) / 2,) containing the upper triangular of the distance matrix. Returns: hierarchical_topics: A dataframe that contains a hierarchy of topics represented by their parents and their children Examples: ```python from bertopic import BERTopic topic_model = BERTopic() topics, probs = topic_model.fit_transform(docs) hierarchical_topics = topic_model.hierarchical_topics(docs) ``` A custom linkage function can be used as follows: ```python from scipy.cluster import hierarchy as sch from bertopic import BERTopic topic_model = BERTopic() topics, probs = topic_model.fit_transform(docs) # Hierarchical topics linkage_function = lambda x: sch.linkage(x, 'ward', optimal_ordering=True) hierarchical_topics = topic_model.hierarchical_topics(docs, linkage_function=linkage_function) ``` """ check_documents_type(docs) if distance_function is None: distance_function = lambda x: 1 - cosine_similarity(x) if linkage_function is None: linkage_function = lambda x: sch.linkage(x, 'ward', optimal_ordering=True) # Calculate distance embeddings = self.c_tf_idf_[self._outliers:] X = distance_function(embeddings) X = validate_distance_matrix(X, embeddings.shape[0]) # Use the 1-D condensed distance matrix as an input instead of the raw distance matrix Z = linkage_function(X) # Calculate basic bag-of-words to be iteratively merged later documents = pd.DataFrame({"Document": docs, "ID": range(len(docs)), "Topic": self.topics_}) documents_per_topic = documents.groupby(['Topic'], as_index=False).agg({'Document': ' '.join}) documents_per_topic = documents_per_topic.loc[documents_per_topic.Topic != -1, :] 
clean_documents = self._preprocess_text(documents_per_topic.Document.values) # Scikit-Learn Deprecation: get_feature_names is deprecated in 1.0 # and will be removed in 1.2. Please use get_feature_names_out instead. if version.parse(sklearn_version) >= version.parse("1.0.0"): words = self.vectorizer_model.get_feature_names_out() else: words = self.vectorizer_model.get_feature_names() bow = self.vectorizer_model.transform(clean_documents) # Extract clusters hier_topics = pd.DataFrame(columns=["Parent_ID", "Parent_Name", "Topics", "Child_Left_ID", "Child_Left_Name", "Child_Right_ID", "Child_Right_Name"]) for index in tqdm(range(len(Z))): # Find clustered documents clusters = sch.fcluster(Z, t=Z[index][2], criterion='distance') - self._outliers nr_clusters = len(clusters) # Extract first topic we find to get the set of topics in a merged topic topic = None val = Z[index][0] while topic is None: if val - len(clusters) < 0: topic = int(val) else: val = Z[int(val - len(clusters))][0] clustered_topics = [i for i, x in enumerate(clusters) if x == clusters[topic]] # Group bow per cluster, calculate c-TF-IDF and extract words grouped = csr_matrix(bow[clustered_topics].sum(axis=0)) c_tf_idf = self.ctfidf_model.transform(grouped) selection = documents.loc[documents.Topic.isin(clustered_topics), :] selection.Topic = 0 words_per_topic = self._extract_words_per_topic(words, selection, c_tf_idf, calculate_aspects=False) # Extract parent's name and ID parent_id = index + len(clusters) parent_name = ", ".join([x[0] for x in words_per_topic[0]][:5]) # Extract child's name and ID Z_id = Z[index][0] child_left_id = Z_id if Z_id - nr_clusters < 0 else Z_id - nr_clusters if Z_id - nr_clusters < 0: child_left_name = ", ".join([x[0] for x in self.get_topic(Z_id)][:5]) else: child_left_name = hier_topics.iloc[int(child_left_id)].Parent_Name # Extract child's name and ID Z_id = Z[index][1] child_right_id = Z_id if Z_id - nr_clusters < 0 else Z_id - nr_clusters if Z_id - nr_clusters < 0: 
child_right_name = ", ".join([x[0] for x in self.get_topic(Z_id)][:5]) else: child_right_name = hier_topics.iloc[int(child_right_id)].Parent_Name # Save results hier_topics.loc[len(hier_topics), :] = [parent_id, parent_name, clustered_topics, int(Z[index][0]), child_left_name, int(Z[index][1]), child_right_name] hier_topics["Distance"] = Z[:, 2] hier_topics = hier_topics.sort_values("Parent_ID", ascending=False) hier_topics[["Parent_ID", "Child_Left_ID", "Child_Right_ID"]] = hier_topics[["Parent_ID", "Child_Left_ID", "Child_Right_ID"]].astype(str) return hier_topics def visualize_hierarchy_custom(topic_model, orientation: str = "left", topics: List[int] = None, top_n_topics: int = None, custom_labels: Union[bool, str] = False, title: str = "Hierarchical Clustering", width: int = 1000, height: int = 600, hierarchical_topics: pd.DataFrame = None, linkage_function: Callable[[csr_matrix], np.ndarray] = None, distance_function: Callable[[csr_matrix], csr_matrix] = None, color_threshold: int = 1) -> go.Figure: """ Visualize a hierarchical structure of the topics A ward linkage function is used to perform the hierarchical clustering based on the cosine distance matrix between topic embeddings. Arguments: topic_model: A fitted BERTopic instance. orientation: The orientation of the figure. Either 'left' or 'bottom' topics: A selection of topics to visualize top_n_topics: Only select the top n most frequent topics custom_labels: If bool, whether to use custom topic labels that were defined using `topic_model.set_topic_labels`. If `str`, it uses labels from other aspects, e.g., "Aspect1". NOTE: Custom labels are only generated for the original un-merged topics. title: Title of the plot. width: The width of the figure. Only works if orientation is set to 'left' height: The height of the figure. Only works if orientation is set to 'bottom' hierarchical_topics: A dataframe that contains a hierarchy of topics represented by their parents and their children. 
NOTE: The hierarchical topic names are only visualized if both `topics` and `top_n_topics` are not set. linkage_function: The linkage function to use. Default is: `lambda x: sch.linkage(x, 'ward', optimal_ordering=True)` NOTE: Make sure to use the same `linkage_function` as used in `topic_model.hierarchical_topics`. distance_function: The distance function to use on the c-TF-IDF matrix. Default is: `lambda x: 1 - cosine_similarity(x)`. You can pass any function that returns either a square matrix of shape (n_samples, n_samples) with zeros on the diagonal and non-negative values or condensed distance matrix of shape (n_samples * (n_samples - 1) / 2,) containing the upper triangular of the distance matrix. NOTE: Make sure to use the same `distance_function` as used in `topic_model.hierarchical_topics`. color_threshold: Value at which the separation of clusters will be made which will result in different colors for different clusters. A higher value will typically lead in less colored clusters. 
Returns: fig: A plotly figure Examples: To visualize the hierarchical structure of topics simply run: ```python topic_model.visualize_hierarchy() ``` If you also want the labels visualized of hierarchical topics, run the following: ```python # Extract hierarchical topics and their representations hierarchical_topics = topic_model.hierarchical_topics(docs) # Visualize these representations topic_model.visualize_hierarchy(hierarchical_topics=hierarchical_topics) ``` If you want to save the resulting figure: ```python fig = topic_model.visualize_hierarchy() fig.write_html("path/to/file.html") ``` """ if distance_function is None: distance_function = lambda x: 1 - cosine_similarity(x) if linkage_function is None: linkage_function = lambda x: sch.linkage(x, 'ward', optimal_ordering=True) # Select topics based on top_n and topics args freq_df = topic_model.get_topic_freq() freq_df = freq_df.loc[freq_df.Topic != -1, :] if topics is not None: topics = list(topics) elif top_n_topics is not None: topics = sorted(freq_df.Topic.to_list()[:top_n_topics]) else: topics = sorted(freq_df.Topic.to_list()) # Select embeddings all_topics = sorted(list(topic_model.get_topics().keys())) indices = np.array([all_topics.index(topic) for topic in topics]) # Select topic embeddings if topic_model.c_tf_idf_ is not None: embeddings = topic_model.c_tf_idf_[indices] else: embeddings = np.array(topic_model.topic_embeddings_)[indices] # Annotations if hierarchical_topics is not None and len(topics) == len(freq_df.Topic.to_list()): annotations = _get_annotations(topic_model=topic_model, hierarchical_topics=hierarchical_topics, embeddings=embeddings, distance_function=distance_function, linkage_function=linkage_function, orientation=orientation, custom_labels=custom_labels) else: annotations = None # wrap distance function to validate input and return a condensed distance matrix distance_function_viz = lambda x: validate_distance_matrix( distance_function(x), embeddings.shape[0]) # Create dendogram 
fig = ff.create_dendrogram(embeddings, orientation=orientation, distfun=distance_function_viz, linkagefun=linkage_function, hovertext=annotations, color_threshold=color_threshold) # Create nicer labels axis = "yaxis" if orientation == "left" else "xaxis" if isinstance(custom_labels, str): new_labels = [[[str(x), None]] + topic_model.topic_aspects_[custom_labels][x] for x in fig.layout[axis]["ticktext"]] new_labels = [", ".join([label[0] for label in labels[:4]]) for labels in new_labels] new_labels = [label if len(label) < 30 else label[:27] + "..." for label in new_labels] elif topic_model.custom_labels_ is not None and custom_labels: new_labels = [topic_model.custom_labels_[topics[int(x)] + topic_model._outliers] for x in fig.layout[axis]["ticktext"]] else: new_labels = [[[str(topics[int(x)]), None]] + topic_model.get_topic(topics[int(x)]) for x in fig.layout[axis]["ticktext"]] new_labels = [", ".join([label[0] for label in labels[:4]]) for labels in new_labels] new_labels = [label if len(label) < 30 else label[:27] + "..." 
for label in new_labels] # Stylize layout fig.update_layout( plot_bgcolor='#ECEFF1', template="plotly_white", title={ 'text': f"{title}", 'x': 0.5, 'xanchor': 'center', 'yanchor': 'top', 'font': dict( size=22, color="Black") }, hoverlabel=dict( bgcolor="white", font_size=16, font_family="Rockwell" ), ) # Stylize orientation if orientation == "left": fig.update_layout(height=200 + (15 * len(topics)), width=width, yaxis=dict(tickmode="array", ticktext=new_labels)) # Fix empty space on the bottom of the graph y_max = max([trace['y'].max() + 5 for trace in fig['data']]) y_min = min([trace['y'].min() - 5 for trace in fig['data']]) fig.update_layout(yaxis=dict(range=[y_min, y_max])) else: fig.update_layout(width=200 + (15 * len(topics)), height=height, xaxis=dict(tickmode="array", ticktext=new_labels)) if hierarchical_topics is not None: for index in [0, 3]: axis = "x" if orientation == "left" else "y" xs = [data["x"][index] for data in fig.data if (data["text"] and data[axis][index] > 0)] ys = [data["y"][index] for data in fig.data if (data["text"] and data[axis][index] > 0)] hovertext = [data["text"][index] for data in fig.data if (data["text"] and data[axis][index] > 0)] fig.add_trace(go.Scatter(x=xs, y=ys, marker_color='black', hovertext=hovertext, hoverinfo="text", mode='markers', showlegend=False)) return fig def visualize_hierarchical_documents_custom(topic_model, docs: List[str], hover_labels: List[str], hierarchical_topics: pd.DataFrame, topics: List[int] = None, embeddings: np.ndarray = None, reduced_embeddings: np.ndarray = None, sample: Union[float, int] = None, hide_annotations: bool = False, hide_document_hover: bool = True, nr_levels: int = 10, level_scale: str = 'linear', custom_labels: Union[bool, str] = False, title: str = "Hierarchical Documents and Topics", width: int = 1200, height: int = 750, progress=gr.Progress(track_tqdm=True)) -> go.Figure: """ Visualize documents and their topics in 2D at different levels of hierarchy Arguments: docs: The 
documents you used when calling either `fit` or `fit_transform` hierarchical_topics: A dataframe that contains a hierarchy of topics represented by their parents and their children topics: A selection of topics to visualize. Not to be confused with the topics that you get from `.fit_transform`. For example, if you want to visualize only topics 1 through 5: `topics = [1, 2, 3, 4, 5]`. embeddings: The embeddings of all documents in `docs`. reduced_embeddings: The 2D reduced embeddings of all documents in `docs`. sample: The percentage of documents in each topic that you would like to keep. Value can be between 0 and 1. Setting this value to, for example, 0.1 (10% of documents in each topic) makes it easier to visualize millions of documents as a subset is chosen. hide_annotations: Hide the names of the traces on top of each cluster. hide_document_hover: Hide the content of the documents when hovering over specific points. Helps to speed up generation of visualizations. nr_levels: The number of levels to be visualized in the hierarchy. First, the distances in `hierarchical_topics.Distance` are split in `nr_levels` lists of distances. Then, for each list of distances, the merged topics are selected that have a distance less or equal to the maximum distance of the selected list of distances. NOTE: To get all possible merged steps, make sure that `nr_levels` is equal to the length of `hierarchical_topics`. level_scale: Whether to apply a linear or logarithmic (log) scale levels of the distance vector. Linear scaling will perform an equal number of merges at each level while logarithmic scaling will perform more mergers in earlier levels to provide more resolution at higher levels (this can be used for when the number of topics is large). custom_labels: If bool, whether to use custom topic labels that were defined using `topic_model.set_topic_labels`. If `str`, it uses labels from other aspects, e.g., "Aspect1". 
NOTE: Custom labels are only generated for the original un-merged topics. title: Title of the plot. width: The width of the figure. height: The height of the figure. Examples: To visualize the topics simply run: ```python topic_model.visualize_hierarchical_documents(docs, hierarchical_topics) ``` Do note that this re-calculates the embeddings and reduces them to 2D. The advised and prefered pipeline for using this function is as follows: ```python from sklearn.datasets import fetch_20newsgroups from sentence_transformers import SentenceTransformer from bertopic import BERTopic from umap import UMAP # Prepare embeddings docs = fetch_20newsgroups(subset='all', remove=('headers', 'footers', 'quotes'))['data'] sentence_model = SentenceTransformer("all-MiniLM-L6-v2") embeddings = sentence_model.encode(docs, show_progress_bar=False) # Train BERTopic and extract hierarchical topics topic_model = BERTopic().fit(docs, embeddings) hierarchical_topics = topic_model.hierarchical_topics(docs) # Reduce dimensionality of embeddings, this step is optional # reduced_embeddings = UMAP(n_neighbors=10, n_components=2, min_dist=0.0, metric='cosine').fit_transform(embeddings) # Run the visualization with the original embeddings topic_model.visualize_hierarchical_documents(docs, hierarchical_topics, embeddings=embeddings) # Or, if you have reduced the original embeddings already: topic_model.visualize_hierarchical_documents(docs, hierarchical_topics, reduced_embeddings=reduced_embeddings) ``` Or if you want to save the resulting figure: ```python fig = topic_model.visualize_hierarchical_documents(docs, hierarchical_topics, reduced_embeddings=reduced_embeddings) fig.write_html("path/to/file.html") ``` NOTE: This visualization was inspired by the scatter plot representation of Doc2Map: https://github.com/louisgeisler/Doc2Map """ topic_per_doc = topic_model.topics_ # Add
tags to hover labels to get them to appear on multiple lines def wrap_by_word(s, n): '''returns a string up to 300 words where \\n is inserted between every n words''' a = s.split()[:300] ret = '' for i in range(0, len(a), n): ret += ' '.join(a[i:i+n]) + '
' return ret # Apply the function to every element in the list hover_labels = [wrap_by_word(s, n=20) for s in hover_labels] # Sample the data to optimize for visualization and dimensionality reduction if sample is None or sample > 1: sample = 1 indices = [] for topic in set(topic_per_doc): s = np.where(np.array(topic_per_doc) == topic)[0] size = len(s) if len(s) < 100 else int(len(s)*sample) indices.extend(np.random.choice(s, size=size, replace=False)) indices = np.array(indices) df = pd.DataFrame({"topic": np.array(topic_per_doc)[indices]}) df["doc"] = [docs[index] for index in indices] df["hover_labels"] = [hover_labels[index] for index in indices] df["topic"] = [topic_per_doc[index] for index in indices] # Extract embeddings if not already done if sample is None: if embeddings is None and reduced_embeddings is None: embeddings_to_reduce = topic_model._extract_embeddings(df.doc.to_list(), method="document") else: embeddings_to_reduce = embeddings else: if embeddings is not None: embeddings_to_reduce = embeddings[indices] elif embeddings is None and reduced_embeddings is None: embeddings_to_reduce = topic_model._extract_embeddings(df.doc.to_list(), method="document") # Reduce input embeddings if reduced_embeddings is None: umap_model = UMAP(n_neighbors=10, n_components=2, min_dist=0.0, metric='cosine').fit(embeddings_to_reduce) embeddings_2d = umap_model.embedding_ elif sample is not None and reduced_embeddings is not None: embeddings_2d = reduced_embeddings[indices] elif sample is None and reduced_embeddings is not None: embeddings_2d = reduced_embeddings # Combine data df["x"] = embeddings_2d[:, 0] df["y"] = embeddings_2d[:, 1] # Create topic list for each level, levels are created by calculating the distance distances = hierarchical_topics.Distance.to_list() if level_scale == 'log' or level_scale == 'logarithmic': log_indices = np.round(np.logspace(start=math.log(1,10), stop=math.log(len(distances)-1,10), num=nr_levels)).astype(int).tolist() 
log_indices.reverse() max_distances = [distances[i] for i in log_indices] elif level_scale == 'lin' or level_scale == 'linear': max_distances = [distances[indices[-1]] for indices in np.array_split(range(len(hierarchical_topics)), nr_levels)][::-1] else: raise ValueError("level_scale needs to be one of 'log' or 'linear'") for index, max_distance in enumerate(max_distances): # Get topics below `max_distance` mapping = {topic: topic for topic in df.topic.unique()} selection = hierarchical_topics.loc[hierarchical_topics.Distance <= max_distance, :] selection.Parent_ID = selection.Parent_ID.astype(int) selection = selection.sort_values("Parent_ID") for row in selection.iterrows(): for topic in row[1].Topics: mapping[topic] = row[1].Parent_ID # Make sure the mappings are mapped 1:1 mappings = [True for _ in mapping] while any(mappings): for i, (key, value) in enumerate(mapping.items()): if value in mapping.keys() and key != value: mapping[key] = mapping[value] else: mappings[i] = False # Create new column df[f"level_{index+1}"] = df.topic.map(mapping) df[f"level_{index+1}"] = df[f"level_{index+1}"].astype(int) # Prepare topic names of original and merged topics trace_names = [] topic_names = {} trace_name_char_length = 60 for topic in range(hierarchical_topics.Parent_ID.astype(int).max()): if topic < hierarchical_topics.Parent_ID.astype(int).min(): if topic_model.get_topic(topic): if isinstance(custom_labels, str): trace_name = f"{topic} " + ", ".join(list(zip(*topic_model.topic_aspects_[custom_labels][topic]))[0][:5]) elif topic_model.custom_labels_ is not None and custom_labels: trace_name = topic_model.custom_labels_[topic + topic_model._outliers] else: trace_name = f"{topic} " + ", ".join([word[:20] for word, _ in topic_model.get_topic(topic)][:5]) topic_names[topic] = {"trace_name": trace_name[:trace_name_char_length], "plot_text": trace_name[:trace_name_char_length]} trace_names.append(trace_name) else: trace_name = f"{topic} " + 
hierarchical_topics.loc[hierarchical_topics.Parent_ID == str(topic), "Parent_Name"].values[0] plot_text = ", ".join([name[:20] for name in trace_name.split(" ")[:5]]) topic_names[topic] = {"trace_name": trace_name[:trace_name_char_length], "plot_text": plot_text[:trace_name_char_length]} trace_names.append(trace_name) # Prepare traces all_traces = [] for level in range(len(max_distances)): traces = [] # Outliers if topic_model._outliers: traces.append( go.Scattergl( x=df.loc[(df[f"level_{level+1}"] == -1), "x"], y=df.loc[df[f"level_{level+1}"] == -1, "y"], mode='markers+text', name="other", hoverinfo="text", hovertext=df.loc[(df[f"level_{level+1}"] == -1), "hover_labels"] if not hide_document_hover else None, showlegend=False, marker=dict(color='#CFD8DC', size=5, opacity=0.5), hoverlabel=dict(align='left') ) ) # Selected topics if topics: selection = df.loc[(df.topic.isin(topics)), :] unique_topics = sorted([int(topic) for topic in selection[f"level_{level+1}"].unique()]) else: unique_topics = sorted([int(topic) for topic in df[f"level_{level+1}"].unique()]) for topic in unique_topics: if topic != -1: if topics: selection = df.loc[(df[f"level_{level+1}"] == topic) & (df.topic.isin(topics)), :] else: selection = df.loc[df[f"level_{level+1}"] == topic, :] if not hide_annotations: selection.loc[len(selection), :] = None selection["text"] = "" selection.loc[len(selection) - 1, "x"] = selection.x.mean() selection.loc[len(selection) - 1, "y"] = selection.y.mean() selection.loc[len(selection) - 1, "text"] = topic_names[int(topic)]["plot_text"] traces.append( go.Scattergl( x=selection.x, y=selection.y, text=selection.text if not hide_annotations else None, hovertext=selection.hover_labels if not hide_document_hover else None, hoverinfo="text", name=topic_names[int(topic)]["trace_name"], mode='markers+text', marker=dict(size=5, opacity=0.5), hoverlabel=dict(align='left') ) ) all_traces.append(traces) # Track and count traces nr_traces_per_set = [len(traces) for traces in 
def visualize_barchart_custom(topic_model,
                              topics: List[int] = None,
                              top_n_topics: int = 8,
                              n_words: int = 5,
                              custom_labels: Union[bool, str] = False,
                              title: str = "Topic Word Scores",
                              width: int = 250,
                              height: int = 250,
                              progress=gr.Progress(track_tqdm=True)) -> go.Figure:
    """ Visualize a barchart of selected topics

    Draws one horizontal bar chart per topic, laid out on a grid that is
    three subplots wide, showing the first `n_words` (word, score) pairs
    returned by `topic_model.get_topic` for each topic.

    Arguments:
        topic_model: A fitted BERTopic instance.
        topics: A selection of topics to visualize. When given, it takes
                precedence over `top_n_topics`.
        top_n_topics: Only used when `topics` is None. Selects the first
                      `top_n_topics` rows of `topic_model.get_topic_freq()`
                      after dropping topic -1. If this is also None, the
                      first 6 rows are used.
        n_words: Number of words to show in a topic
        custom_labels: If bool, whether to use custom topic labels that were defined using
                       `topic_model.set_topic_labels`.
                       If `str`, it uses labels from other aspects, e.g., "Aspect1".
        title: Title of the plot.
        width: Base width in pixels. The whole figure is `width * 4` wide,
               shared by the three subplot columns.
        height: Height per subplot row. The whole figure is `height * rows`
                tall, or `height * 1.3` when there is a single row.
        progress: Gradio progress tracker. It is not referenced in the body;
                  presumably it is only present so Gradio can track tqdm
                  progress for the call.

    Returns:
        fig: A plotly figure. If no topics are selected, the figure contains
             an empty one-row grid rather than raising.

    Examples:

    To visualize the barchart of selected topics
    simply run:

    ```python
    fig = visualize_barchart_custom(topic_model)
    ```

    Or if you want to save the resulting figure:

    ```python
    fig = visualize_barchart_custom(topic_model)
    fig.write_html("path/to/file.html")
    ```
    """
    colors = itertools.cycle(["#D55E00", "#0072B2", "#CC79A7", "#E69F00", "#56B4E9", "#009E73", "#F0E442"])

    # Select topics based on top_n and topics args (topic -1 is never auto-selected)
    freq_df = topic_model.get_topic_freq()
    freq_df = freq_df.loc[freq_df.Topic != -1, :]
    if topics is not None:
        topics = list(topics)
    elif top_n_topics is not None:
        topics = sorted(freq_df.Topic.to_list()[:top_n_topics])
    else:
        topics = sorted(freq_df.Topic.to_list()[0:6])

    # Build one subplot title per topic
    if isinstance(custom_labels, str):
        subplot_titles = []
        for topic in topics:
            # Title is the topic id followed by up to three aspect words, joined with "_"
            labels = [[str(topic), None]] + topic_model.topic_aspects_[custom_labels][topic]
            label = "_".join([item[0] for item in labels[:4]])
            # Keep titles short enough to fit above a subplot
            subplot_titles.append(label if len(label) < 30 else label[:27] + "...")
    elif topic_model.custom_labels_ is not None and custom_labels:
        subplot_titles = [topic_model.custom_labels_[topic + topic_model._outliers] for topic in topics]
    else:
        subplot_titles = [f"Topic {topic}" for topic in topics]

    # Initialize figure. make_subplots rejects rows=0, so keep at least one
    # row even when the topic selection is empty.
    columns = 3
    rows = max(1, int(np.ceil(len(topics) / columns)))
    fig = make_subplots(rows=rows,
                        cols=columns,
                        shared_xaxes=False,
                        horizontal_spacing=.1,
                        vertical_spacing=.4 / rows if rows > 1 else 0,
                        subplot_titles=subplot_titles)

    # Add barchart for each topic, filling the grid left-to-right, top-to-bottom
    for position, topic in enumerate(topics):
        grid_row, grid_col = divmod(position, columns)

        # Fetch the topic representation once; reverse so the first word
        # ends up at the top of the horizontal bar chart.
        top_terms = list(topic_model.get_topic(topic))[:n_words][::-1]
        words = [word + " " for word, _ in top_terms]
        scores = [score for _, score in top_terms]

        fig.add_trace(
            go.Bar(x=scores,
                   y=words,
                   orientation='h',
                   marker_color=next(colors)),
            row=grid_row + 1, col=grid_col + 1)

    # Stylize graph
    fig.update_layout(
        template="plotly_white",
        showlegend=False,
        title={
            'text': f"{title}",
            'x': .5,
            'xanchor': 'center',
            'yanchor': 'top',
            'font': dict(
                size=14,
                color="Black")
        },
        width=width*4,
        height=height*rows if rows > 1 else height * 1.3,
        hoverlabel=dict(
            bgcolor="white",
            font_size=14,
            font_family="Rockwell"
        ),
    )

    fig.update_xaxes(showgrid=True)
    fig.update_yaxes(showgrid=True)

    return fig