import gradio as gr
import pandas as pd
import plotly.express as px
import time
import datetime  # used by the "Created After" date filter
from datasets import load_dataset
# Using the stable, community-built RangeSlider component
from gradio_rangeslider import RangeSlider

# --- Constants ---
PARAM_CHOICES = ['< 1B', '1B', '5B', '12B', '32B', '64B', '128B', '256B', '> 500B']
PARAM_CHOICES_DEFAULT_INDICES = (0, len(PARAM_CHOICES) - 1)
TOP_K_CHOICES = list(range(5, 51, 5))
HF_DATASET_ID = "evijit/modelverse_daily_data"
TAG_FILTER_CHOICES = ["Audio & Speech", "Time series", "Robotics", "Music", "Video", "Images", "Text", "Biomedical", "Sciences"]
PIPELINE_TAGS = [
    'text-generation', 'text-to-image', 'text-classification', 'text2text-generation', 'audio-to-audio',
    'feature-extraction', 'image-classification', 'translation', 'reinforcement-learning', 'fill-mask',
    'text-to-speech', 'automatic-speech-recognition', 'image-text-to-text', 'token-classification',
    'sentence-similarity', 'question-answering', 'image-feature-extraction', 'summarization',
    'zero-shot-image-classification', 'object-detection', 'image-segmentation', 'image-to-image',
    'image-to-text', 'audio-classification', 'visual-question-answering', 'text-to-video',
    'zero-shot-classification', 'depth-estimation', 'text-ranking', 'image-to-video', 'multiple-choice',
    'unconditional-image-generation', 'video-classification', 'text-to-audio', 'time-series-forecasting',
    'any-to-any', 'video-text-to-text', 'table-question-answering'
]


def load_models_data():
    """Load the pre-processed models dataset from the Hub and normalise key columns."""
    overall_start_time = time.time()
    print(f"Attempting to load dataset from Hugging Face Hub: {HF_DATASET_ID}")
    try:
        dataset_dict = load_dataset(HF_DATASET_ID)
        df = dataset_dict[list(dataset_dict.keys())[0]].to_pandas()
        if 'params' in df.columns:
            df['params'] = pd.to_numeric(df['params'], errors='coerce').fillna(-1)
        else:
            df['params'] = -1
        if 'createdAt' in df.columns:
            df['createdAt'] = pd.to_datetime(df['createdAt'], errors='coerce')
        msg = f"Successfully loaded dataset in {time.time() - overall_start_time:.2f}s."
        print(msg)
        return df, True, msg
    except Exception as e:
        err_msg = f"Failed to load dataset. Error: {e}"
        print(err_msg)
        return pd.DataFrame(), False, err_msg
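
# Columns this app reads from the loaded dataframe (all referenced in the code below):
# 'id', 'organization', 'params', 'createdAt', 'pipeline_tag', 'data_download_timestamp',
# the metric columns 'downloads', 'downloadsAllTime', 'likes', and the boolean tag flags
# used in make_treemap_data's col_map (e.g. 'has_text', 'is_biomed').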


def get_param_range_values(param_range_labels):
    min_label, max_label = param_range_labels
    min_val = 0.0 if '<' in min_label else float(min_label.replace('B', ''))
    max_val = float('inf') if '>' in max_label else float(max_label.replace('B', ''))
    return min_val, max_val
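
# Illustrative sanity check of the label-to-bound mapping (not executed by the app):
#   get_param_range_values(('< 1B', '64B'))   -> (0.0, 64.0)
#   get_param_range_values(('1B', '> 500B'))  -> (1.0, inf)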


def make_treemap_data(df, count_by, top_k=25, tag_filter=None, pipeline_filter=None, param_range=None,
                      skip_orgs=None, include_unknown_param_size=True, created_after_date: float = None):
    """Filter the models dataframe and return the columns needed for the treemap plot."""
    if df is None or df.empty:
        return pd.DataFrame()
    filtered_df = df.copy()
    if not include_unknown_param_size and 'params' in filtered_df.columns:
        filtered_df = filtered_df[filtered_df['params'] != -1]
    col_map = {
        "Audio & Speech": "is_audio_speech", "Music": "has_music", "Robotics": "has_robot",
        "Biomedical": "is_biomed", "Time series": "has_series", "Sciences": "has_science",
        "Video": "has_video", "Images": "has_image", "Text": "has_text",
    }
    if tag_filter and tag_filter in col_map and col_map[tag_filter] in filtered_df.columns:
        filtered_df = filtered_df[filtered_df[col_map[tag_filter]]]
    if pipeline_filter and "pipeline_tag" in filtered_df.columns:
        filtered_df = filtered_df[filtered_df["pipeline_tag"].astype(str) == pipeline_filter]
    if param_range:
        min_params, max_params = get_param_range_values(param_range)
        is_default_range = (param_range[0] == PARAM_CHOICES[0] and param_range[1] == PARAM_CHOICES[-1])
        if not is_default_range and 'params' in filtered_df.columns:
            if min_params is not None:
                filtered_df = filtered_df[filtered_df['params'] >= min_params]
            if max_params is not None and max_params != float('inf'):
                filtered_df = filtered_df[filtered_df['params'] < max_params]
    # --- Date filter: the UI supplies the "Created After" value as a Unix timestamp (float) ---
    if created_after_date is not None and 'createdAt' in filtered_df.columns:
        # Drop rows where 'createdAt' could not be parsed to avoid errors
        filtered_df = filtered_df.dropna(subset=['createdAt'])
        # Convert the Unix timestamp from the UI into a Python date object
        filter_date = datetime.datetime.fromtimestamp(created_after_date).date()
        # and compare it with the date part of the 'createdAt' column.
        filtered_df = filtered_df[filtered_df['createdAt'].dt.date > filter_date]
    if skip_orgs and len(skip_orgs) > 0 and "organization" in filtered_df.columns:
        filtered_df = filtered_df[~filtered_df["organization"].isin(skip_orgs)]
    if filtered_df.empty:
        return pd.DataFrame()
    if count_by not in filtered_df.columns:
        filtered_df[count_by] = 0.0
    filtered_df[count_by] = pd.to_numeric(filtered_df[count_by], errors='coerce').fillna(0.0)
    org_totals = filtered_df.groupby("organization")[count_by].sum().nlargest(top_k, keep='first')
    top_orgs_list = org_totals.index.tolist()
    treemap_data = filtered_df[filtered_df["organization"].isin(top_orgs_list)][["id", "organization", count_by]].copy()
    treemap_data["root"] = "models"
    return treemap_data
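
# Illustrative call (assumed columns: 'organization', 'id', 'likes', 'pipeline_tag', 'createdAt';
# not executed by the app): top 10 organizations by likes among text-generation models
# created after Jan 1, 2024.
#   make_treemap_data(
#       df, "likes", top_k=10,
#       pipeline_filter="text-generation",
#       created_after_date=datetime.datetime(2024, 1, 1).timestamp(),
#   )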


def create_treemap(treemap_data, count_by, title=None):
    if treemap_data.empty:
        fig = px.treemap(names=["No data matches filters"], parents=[""], values=[1])
        fig.update_layout(title="No data matches the selected filters", margin=dict(t=50, l=25, r=25, b=25))
        return fig
    fig = px.treemap(treemap_data, path=["root", "organization", "id"], values=count_by, title=title,
                     color_discrete_sequence=px.colors.qualitative.Plotly)
    fig.update_layout(margin=dict(t=50, l=25, r=25, b=25))
    fig.update_traces(textinfo="label+value+percent root",
                      hovertemplate="<b>%{label}</b><br>%{value:,} " + count_by + "<br>%{percentRoot:.2%} of total<extra></extra>")
    return fig
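
# Illustrative composition of the two helpers (not executed by the app):
#   fig = create_treemap(make_treemap_data(df, "downloads", top_k=10), "downloads",
#                        "Top 10 organizations by downloads")
#   fig.show()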


custom_css = """
.model-parameters-group > .block {
    background: none !important;
    border: none !important;
    box-shadow: none !important;
}
#param-slider-wrapper .head,
#param-slider-wrapper div[data-testid="range-slider"] > span {
    display: none !important;
}
"""

with gr.Blocks(title="🤗 ModelVerse Explorer", fill_width=True, css=custom_css) as demo:
    models_data_state = gr.State(pd.DataFrame())
    loading_complete_state = gr.State(False)

    with gr.Row():
        gr.Markdown("# 🤗 ModelVerse Explorer")

    with gr.Row():
        with gr.Column(scale=1):
            count_by_dropdown = gr.Dropdown(
                label="Metric",
                choices=[("Downloads (last 30 days)", "downloads"), ("Downloads (All Time)", "downloadsAllTime"), ("Likes", "likes")],
                value="downloads",
            )
            filter_choice_radio = gr.Radio(label="Filter Type", choices=["None", "Tag Filter", "Pipeline Filter"], value="None")
            tag_filter_dropdown = gr.Dropdown(label="Select Tag", choices=TAG_FILTER_CHOICES, value=None, visible=False)
            pipeline_filter_dropdown = gr.Dropdown(label="Select Pipeline Tag", choices=PIPELINE_TAGS, value=None, visible=False)
            with gr.Group(elem_classes="model-parameters-group"):
                gr.Markdown("<div style='font-weight: 500;'>Model Parameters</div>")
                param_range_slider = RangeSlider(
                    minimum=0, maximum=len(PARAM_CHOICES) - 1, value=PARAM_CHOICES_DEFAULT_INDICES,
                    step=1, label=None, show_label=False, elem_id="param-slider-wrapper"
                )
                param_range_display = gr.Markdown(f"Range: `{PARAM_CHOICES[0]}` to `{PARAM_CHOICES[-1]}`")
            include_unknown_params_checkbox = gr.Checkbox(label="Include models with unknown parameter size", value=True)
            created_after_datepicker = gr.DateTime(label="Created After")
            top_k_dropdown = gr.Dropdown(label="Number of Top Organizations", choices=TOP_K_CHOICES, value=25)
            skip_orgs_textbox = gr.Textbox(label="Organizations to Skip (comma-separated)", value="TheBloke,MaziyarPanahi,unsloth,modularai,Gensyn,bartowski")
            generate_plot_button = gr.Button(value="Generate Plot", variant="primary", interactive=False)
        with gr.Column(scale=3):
            plot_output = gr.Plot()
            status_message_md = gr.Markdown("Initializing...")
            data_info_md = gr.Markdown("")

    def update_param_display(value: tuple):
        min_idx, max_idx = int(value[0]), int(value[1])
        return f"Range: `{PARAM_CHOICES[min_idx]}` to `{PARAM_CHOICES[max_idx]}`"

    def _toggle_unknown_params_checkbox(param_range_indices):
        min_idx, max_idx = int(param_range_indices[0]), int(param_range_indices[1])
        is_default_range = (min_idx == PARAM_CHOICES_DEFAULT_INDICES[0] and max_idx == PARAM_CHOICES_DEFAULT_INDICES[1])
        if not is_default_range:
            # A narrowed range can only match models with known parameter counts,
            # so force the checkbox off and disable it.
            return gr.update(interactive=False, value=False)
        else:
            return gr.update(interactive=True)

    param_range_slider.change(update_param_display, param_range_slider, param_range_display)
    param_range_slider.change(_toggle_unknown_params_checkbox, param_range_slider, include_unknown_params_checkbox)
    loading_complete_state.change(lambda is_loaded: gr.update(interactive=is_loaded), loading_complete_state, generate_plot_button)
    filter_choice_radio.change(
        lambda choice: (gr.update(visible=choice == "Tag Filter"), gr.update(visible=choice == "Pipeline Filter")),
        filter_choice_radio, [tag_filter_dropdown, pipeline_filter_dropdown]
    )

    def load_and_generate_initial_plot(progress=gr.Progress()):
        """Load the dataset on app start, report its status, and render the default plot."""
        progress(0, desc=f"Loading dataset '{HF_DATASET_ID}'...")
        current_df, load_success_flag, status_msg_from_load = pd.DataFrame(), False, ""
        try:
            current_df, load_success_flag, status_msg_from_load = load_models_data()
            if load_success_flag:
                progress(0.5, desc="Processing data...")
                ts = pd.to_datetime(current_df['data_download_timestamp'].iloc[0], utc=True) if 'data_download_timestamp' in current_df.columns and pd.notna(current_df['data_download_timestamp'].iloc[0]) else None
                date_display = ts.strftime('%B %d, %Y, %H:%M:%S %Z') if ts else "Pre-processed (date unavailable)"
                param_count = (current_df['params'] != -1).sum()
                data_info_text = (f"### Data Information\n- Source: `{HF_DATASET_ID}`\n- Status: {status_msg_from_load}\n"
                                  f"- Total models loaded: {len(current_df):,}\n- Models with known parameter counts: {param_count:,}\n"
                                  f"- Models with unknown parameter counts: {len(current_df) - param_count:,}\n- Data as of: {date_display}\n")
            else:
                data_info_text = f"### Data Load Failed\n- {status_msg_from_load}"
        except Exception as e:
            status_msg_from_load = f"An unexpected error occurred: {str(e)}"
            data_info_text = f"### Critical Error\n- {status_msg_from_load}"
            print(f"Critical error in load_and_generate_initial_plot: {e}")
        progress(0.6, desc="Generating initial plot...")
        initial_plot, initial_status = ui_generate_plot_controller(
            "downloads", "None", None, None, PARAM_CHOICES_DEFAULT_INDICES, 25,
            "TheBloke,MaziyarPanahi,unsloth,modularai,Gensyn,bartowski", True, None, current_df, progress
        )
        return current_df, load_success_flag, data_info_text, initial_status, initial_plot

    def ui_generate_plot_controller(metric_choice, filter_type, tag_choice, pipeline_choice,
                                    param_range_indices, k_orgs, skip_orgs_input, include_unknown_param_size_flag,
                                    created_after_date, df_current_models, progress=gr.Progress()):
        """Translate the UI selections into a filtered treemap figure and a short stats summary."""
        if df_current_models.empty:
            return create_treemap(pd.DataFrame(), metric_choice, "Error: Model Data Not Loaded"), "Model data is not loaded."
        progress(0.1, desc="Preparing data...")
        param_labels = [PARAM_CHOICES[int(param_range_indices[0])], PARAM_CHOICES[int(param_range_indices[1])]]
        treemap_df = make_treemap_data(
            df_current_models, metric_choice, k_orgs,
            tag_choice if filter_type == "Tag Filter" else None,
            pipeline_choice if filter_type == "Pipeline Filter" else None,
            param_labels, [org.strip() for org in skip_orgs_input.split(',') if org.strip()],
            include_unknown_param_size_flag, created_after_date
        )
        progress(0.7, desc="Generating plot...")
        title_labels = {"downloads": "Downloads (last 30 days)", "downloadsAllTime": "Downloads (All Time)", "likes": "Likes"}
        plotly_fig = create_treemap(treemap_df, metric_choice, f"HuggingFace Models - {title_labels.get(metric_choice, metric_choice)} by Organization")
        plot_stats_md = (f"## Plot Statistics\n- **Models shown**: {len(treemap_df['id'].unique()):,}\n"
                         f"- **Total {metric_choice}**: {int(treemap_df[metric_choice].sum()):,}") if not treemap_df.empty else "No data matches the selected filters."
        return plotly_fig, plot_stats_md

    demo.load(load_and_generate_initial_plot, None, [models_data_state, loading_complete_state, data_info_md, status_message_md, plot_output])

    generate_plot_button.click(
        ui_generate_plot_controller,
        [count_by_dropdown, filter_choice_radio, tag_filter_dropdown, pipeline_filter_dropdown,
         param_range_slider, top_k_dropdown, skip_orgs_textbox, include_unknown_params_checkbox,
         created_after_datepicker, models_data_state],
        [plot_output, status_message_md]
    )

if __name__ == "__main__":
    print("Application starting...")
    demo.queue().launch()