import gradio as gr import json import posixpath from fastapi import HTTPException, Path, Query, Request from fastapi.responses import StreamingResponse from gradio_huggingfacehub_search import HuggingfaceHubSearch from huggingface_hub import HfApi, HfFileSystem from typing import Annotated, Any, NamedTuple from urllib.parse import urlencode from _hf_explorer import FileExplorer from _hf_gguf import standard_metadata, GGUFValueType, HuggingGGUFstream hfapi = HfApi() class MetadataState(NamedTuple): var: dict[str, Any] key: dict[str, tuple[int, Any]] add: dict[str, Any] rem: set def init_state( ): return MetadataState( var = {}, key = {}, add = {}, rem = set(), ) def human_readable_metadata( typ: int, val: Any, ) -> tuple[str, Any]: typ = GGUFValueType(typ).name if typ == 'ARRAY': val = '[[...], ...]' elif isinstance(val, list): typ = f'[{typ}][{len(val)}]' if len(val) > 8: val = str(val[:8])[:-1] + ', ...]' else: val = str(val) return typ, val with gr.Blocks( ) as blocks: with gr.Row(): hf_search = HuggingfaceHubSearch( label = "Search Huggingface Hub", placeholder = "Search for models on Huggingface", search_type = "model", sumbit_on_select = True, scale = 2, ) hf_branch = gr.Dropdown( None, label = "Branch", scale = 1, ) gr.LoginButton( "Sign in to access gated/private repos", scale = 1, ) hf_file = FileExplorer( visible=False, ) with gr.Row(): with gr.Column(): meta_keys = gr.Dropdown( None, label = "Modify Metadata", allow_custom_value = True, visible = False, ) with gr.Column(): meta_types = gr.Dropdown( [e.name for e in GGUFValueType], label = "Metadata Type", type = "index", visible = False, ) with gr.Column(): btn_delete = gr.Button( "Remove Key", variant = 'stop', visible = False, ) meta_boolean = gr.Checkbox( visible = False, ) with gr.Row(): # Too slow unfortunately, needs a proper search box # meta_lookup = gr.Dropdown( # label = 'Lookup token', # type = 'index', # visible = False, # ) meta_number = gr.Number( visible = False, ) meta_string = gr.Textbox( visible = False, ) meta_array = gr.Matrix( None, label = "Unsupported", row_count = (1, 'fixed'), height = "1rem", interactive = False, visible = False, ) meta_changes = gr.HighlightedText( None, label = "Metadata Changes", color_map = {'add': 'green', 'rem': 'red'}, interactive = False, visible = False, ) btn_download = gr.Button( "Download GGUF", variant = "primary", visible = False, ) file_meta = gr.Matrix( None, col_count = (3, "fixed"), headers = [ "Metadata Name", "Type", "Value", ], datatype = ["str", "str", "str"], column_widths = ["35%", "15%", "50%"], wrap = True, interactive = False, visible = False, ) meta_state = gr.State() # init_state # BUG: For some reason using gr.State initial value turns tuple to list? meta_state.value = init_state() file_change_components = [ meta_changes, file_meta, meta_keys, btn_download, ] state_change_components = [ meta_state, ] + file_change_components @gr.on( triggers = [ hf_search.submit, ], inputs = [ hf_search, ], outputs = [ hf_branch, ], ) def get_branches( repo: str, oauth_token: gr.OAuthToken | None = None, ): branches = [] try: refs = hfapi.list_repo_refs( repo, token = oauth_token.token if oauth_token else False, ) branches = [b.name for b in refs.branches] except Exception as e: raise gr.Error(e) return { hf_branch: gr.Dropdown( branches or None, value = "main" if "main" in branches else None, ), } @gr.on( triggers = [ hf_search.submit, hf_branch.input, ], inputs = [ hf_search, hf_branch, ], outputs = [ hf_file, ] + file_change_components, ) def get_files( repo: str, branch: str | None, oauth_token: gr.OAuthToken | None = None, ): return { hf_file: FileExplorer( "**/*.gguf", file_count = "single", root_dir = repo, branch = branch, token = oauth_token.token if oauth_token else None, visible = True, ), meta_changes: gr.HighlightedText( None, visible = False, ), file_meta: gr.Matrix( # None, # FIXME (see Dataframe bug below) visible = False, ), meta_keys: gr.Dropdown( None, visible = False, ), btn_download: gr.Button( visible = False, ), } @gr.on( triggers = [ hf_file.change, ], inputs = [ hf_file, hf_branch, ], outputs = [ meta_state, ] + file_change_components, show_progress = 'minimal', ) def load_metadata( repo_file: str | None, branch: str | None, progress: gr.Progress = gr.Progress(), oauth_token: gr.OAuthToken | None = None, ): m = [] meta = init_state() yield { meta_state: meta, file_meta: gr.Matrix( [['', '', '']] * 100, # FIXME: Workaround for Dataframe bug when user has selected data visible = True, ), meta_changes: gr.HighlightedText( None, visible = False, ), meta_keys: gr.Dropdown( None, visible = False, ), btn_download: gr.Button( visible = False, ), } if not repo_file: return fs = HfFileSystem( token = oauth_token.token if oauth_token else None, ) try: progress(0, desc = 'Loading file...') with fs.open( repo_file, "rb", revision = branch, block_size = 8 * 1024 * 1024, cache_type = "readahead", ) as fp: progress(0, desc = 'Reading header...') gguf = HuggingGGUFstream(fp) num_metadata = gguf.header['metadata'].value metadata = gguf.read_metadata() meta.var['repo_file'] = repo_file meta.var['branch'] = branch for k, v in progress.tqdm(metadata, desc = 'Reading metadata...', total = num_metadata, unit = f' of {num_metadata} metadata keys...'): m.append([k, *human_readable_metadata(v.type, v.value)]) meta.key[k] = (v.type, v.value) # FIXME # yield { # file_meta: gr.Matrix( # m, # ), # } except Exception as e: raise gr.Error(e) yield { meta_state: meta, file_meta: gr.Matrix( m, ), meta_keys: gr.Dropdown( sorted(meta.key.keys() | standard_metadata.keys()), value = '', visible = True, ), } @gr.on( triggers = [ meta_keys.change, ], inputs = [ meta_state, meta_keys, ], outputs = [ meta_types, btn_delete, ], ) def update_metakey( meta: MetadataState, key: str | None, ): typ = None if (val := meta.key.get(key, standard_metadata.get(key))) is not None: typ = GGUFValueType(val[0]).name elif key and key.startswith('tokenizer.chat_template.'): typ = GGUFValueType.STRING.name return { meta_types: gr.Dropdown( value = typ, interactive = False if key in meta.key else True, visible = True if key else False, ), btn_delete: gr.Button( visible = True if key in meta.key else False, ), } @gr.on( triggers = [ meta_keys.change, meta_types.input, ], inputs = [ meta_state, meta_keys, meta_types, ], outputs = [ meta_boolean, meta_number, meta_string, meta_array, ], ) def update_metatype( meta: MetadataState, key: str, typ: int, ): val = None if (data := meta.key.get(key, standard_metadata.get(key))) is not None: typ = data[0] val = data[1] elif not key: typ = None if isinstance(val, list): # TODO: Support arrays? typ = GGUFValueType.ARRAY match typ: case GGUFValueType.INT8 | GGUFValueType.INT16 | GGUFValueType.INT32 | GGUFValueType.INT64 | GGUFValueType.UINT8 | GGUFValueType.UINT16 | GGUFValueType.UINT32 | GGUFValueType.UINT64 | GGUFValueType.FLOAT32 | GGUFValueType.FLOAT64: is_number = True case _: is_number = False return { meta_boolean: gr.Checkbox( value = val if typ == GGUFValueType.BOOL and data is not None else False, visible = True if typ == GGUFValueType.BOOL else False, ), meta_number: gr.Number( value = val if is_number and data is not None else 0, precision = 10 if typ == GGUFValueType.FLOAT32 or typ == GGUFValueType.FLOAT64 else 0, visible = True if is_number else False, ), meta_string: gr.Textbox( value = val if typ == GGUFValueType.STRING else '', visible = True if typ == GGUFValueType.STRING else False, ), meta_array: gr.Matrix( visible = True if typ == GGUFValueType.ARRAY else False, ), } # FIXME: Disabled for now due to Dataframe bug when user has selected data # @gr.on( # triggers = [ # file_meta.select, # ], # inputs = [ # ], # outputs = [ # meta_keys, # ], # ) # def select_metakey( # evt: gr.SelectData, # ): # return { # meta_keys: gr.Dropdown( # value = evt.row_value[0] if evt.selected else '', # ), # } def notify_state_change( meta: MetadataState, request: gr.Request, ): changes = [(k, 'rem') for k in meta.rem] for k, v in meta.add.items(): changes.append((k, 'add')) changes.append((str(v[1]), None)) m = [] for k, v in meta.key.items(): m.append([k, *human_readable_metadata(v[0], v[1])]) link = str(request.request.url_for('download', repo_file = meta.var['repo_file']).include_query_params(branch = meta.var['branch'])) if link.startswith('http:'): link = 'https' + link[4:] if meta.rem or meta.add: link += '&' + urlencode( { 'rem': meta.rem, 'add': [json.dumps([k, *v], ensure_ascii = False) for k, v in meta.add.items()], }, doseq = True, safe = '[]{}:"\',', ) return { meta_state: meta, meta_changes: gr.HighlightedText( changes, visible = True if changes else False, ), file_meta: gr.Matrix( m, ), meta_keys: gr.Dropdown( sorted(meta.key.keys() | standard_metadata.keys()), value = '', ), btn_download: gr.Button( link = link, visible = True if changes else False, ), } @gr.on( triggers = [ btn_delete.click, ], inputs = [ meta_state, meta_keys, ], outputs = [ ] + state_change_components, ) def rem_metadata( meta: MetadataState, key: str, request: gr.Request, ): if key in meta.add: del meta.add[key] if key in meta.key: del meta.key[key] meta.rem.add(key) return notify_state_change( meta, request, ) def add_metadata( meta: MetadataState, key: str, typ: int | None, val: Any, request: gr.Request, ): if not key or typ is None: if key: gr.Warning('Missing required value type') return { meta_changes: gr.HighlightedText( ) } if key in meta.rem: meta.rem.remove(key) meta.key[key] = meta.add[key] = (typ, val) if key.startswith('tokenizer.chat_template.'): template = key[24:] if template not in meta.key.get('tokenizer.chat_templates', []): templates = [x[24:] for x in meta.key.keys() if x.startswith('tokenizer.chat_template.')] meta.key['tokenizer.chat_templates'] = meta.add['tokenizer.chat_templates'] = (GGUFValueType.STRING, templates) return notify_state_change( meta, request, ) meta_boolean.input( add_metadata, inputs = [ meta_state, meta_keys, meta_types, meta_boolean, ], outputs = [ ] + state_change_components, ) # meta_lookup.input( # lambda token: gr.Number(value = token), # inputs = meta_lookup, # outputs = meta_number, # ) meta_number.submit( add_metadata, inputs = [ meta_state, meta_keys, meta_types, meta_number, ], outputs = [ ] + state_change_components, ) meta_string.submit( add_metadata, inputs = [ meta_state, meta_keys, meta_types, meta_string, ], outputs = [ ] + state_change_components, ) meta_array.input( add_metadata, inputs = [ meta_state, meta_keys, meta_types, meta_array, ], outputs = [ ] + state_change_components, ) def stream_repo_file( repo_file: str, branch: str, add_meta: list[str] | None, rem_meta: list[str] | None, token: str | None = None, ): fs = HfFileSystem( token = token, ) with fs.open( repo_file, "rb", revision = branch, block_size = 8 * 1024 * 1024, cache_type = "readahead", ) as fp: if not rem_meta: rem_meta = [] if not add_meta: add_meta = [] gguf = HuggingGGUFstream(fp) for _ in gguf.read_metadata(): pass for k in rem_meta: gguf.remove_metadata(k) for k in add_meta: k = json.loads(k) if isinstance(k, list) and len(k) == 3: gguf.add_metadata(*k) yield gguf.filesize yield b''.join((v.data for k, v in gguf.header.items())) for k, v in gguf.metadata.items(): yield v.data while True: if not (data := fp.read(65536)): break yield data if __name__ == "__main__": blocks.queue( max_size = 10, default_concurrency_limit = 10, ) app, local_url, share_url = blocks.launch( show_api = False, prevent_thread_lock = True, ) async def download( request: Request, repo_file: Annotated[str, Path()], branch: Annotated[str, Query()] = "main", add: Annotated[list[str] | None, Query()] = None, rem: Annotated[list[str] | None, Query()] = None, ): token = request.session.get('oauth_info', {}).get('access_token') if posixpath.normpath(repo_file) != repo_file or '\\' in repo_file or repo_file.startswith('../') or repo_file.startswith('/') or repo_file.count('/') < 2: raise HTTPException( status_code = 404, detail = 'Invalid repository', ) stream = stream_repo_file( repo_file, branch, add, rem, token = token, ) size = next(stream) return StreamingResponse( stream, headers = { 'Content-Length': str(size), }, media_type = 'application/octet-stream', ) app.add_api_route( "/download/{repo_file:path}", download, methods = ["GET"], ) # app.openapi_schema = None # app.setup() blocks.block_thread()