#!/usr/bin/env python
# coding=utf-8

# Copyright 2023 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import importlib
import inspect
import io
import json
import os
import tempfile
from typing import Any, Dict, List, Optional, Union

from huggingface_hub import create_repo, hf_hub_download, metadata_update, upload_folder
from huggingface_hub.utils import RepositoryNotFoundError, build_hf_headers, get_session

from ..dynamic_module_utils import custom_object_save, get_class_from_dynamic_module, get_imports
from ..image_utils import is_pil_image
from ..models.auto import AutoProcessor
from ..utils import (
    CONFIG_NAME,
    cached_file,
    is_accelerate_available,
    is_torch_available,
    is_vision_available,
    logging,
)
from .agent_types import handle_agent_inputs, handle_agent_outputs


logger = logging.get_logger(__name__)

if is_torch_available():
    import torch

if is_accelerate_available():
    from accelerate.utils import send_to_device


TOOL_CONFIG_FILE = "tool_config.json"


def get_repo_type(repo_id, repo_type=None, **hub_kwargs):
    if repo_type is not None:
        return repo_type
    try:
        hf_hub_download(repo_id, TOOL_CONFIG_FILE, repo_type="space", **hub_kwargs)
        return "space"
    except RepositoryNotFoundError:
        try:
            hf_hub_download(repo_id, TOOL_CONFIG_FILE, repo_type="model", **hub_kwargs)
            return "model"
        except RepositoryNotFoundError:
            raise EnvironmentError(f"`{repo_id}` does not seem to be a valid repo identifier on the Hub.")
        except Exception:
            return "model"
    except Exception:
        return "space"


# docstyle-ignore
APP_FILE_TEMPLATE = """from transformers import launch_gradio_demo
from {module_name} import {class_name}

launch_gradio_demo({class_name})
"""


class Tool:
    """
    A base class for the functions used by the agent. Subclass this and implement the `__call__` method as well as the
    following class attributes:

    - **description** (`str`) -- A short description of what your tool does, the inputs it expects and the output(s)
      it will return. For instance 'This is a tool that downloads a file from a `url`. It takes the `url` as input,
      and returns the text contained in the file'.
    - **name** (`str`) -- A performative name that will be used for your tool in the prompt to the agent. For instance
      `"text-classifier"` or `"image_generator"`.
    - **inputs** (`List[str]`) -- The list of modalities expected for the inputs (in the same order as in the call).
      Modalities should be `"text"`, `"image"` or `"audio"`. This is only used by `launch_gradio_demo` or to make a
      nice space from your tool.
    - **outputs** (`List[str]`) -- The list of modalities returned by the tool (in the same order as the return of the
      call method). Modalities should be `"text"`, `"image"` or `"audio"`. This is only used by `launch_gradio_demo`
      or to make a nice space from your tool.

    You can also override the method [`~Tool.setup`] if your tool has an expensive operation to perform before being
    usable (such as loading a model). [`~Tool.setup`] will be called the first time you use your tool, but not at
    instantiation.
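
    Example (an illustrative sketch of a custom tool; `TextReverserTool` is hypothetical and not shipped with the
    library):

    ```python
    from transformers import Tool


    class TextReverserTool(Tool):
        name = "text-reverser"
        description = "This is a tool that reverses a text. It takes the text as input and returns the reversed text."
        inputs = ["text"]
        outputs = ["text"]

        def __call__(self, text):
            return text[::-1]
    ```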
""" description: str = "This is a tool that ..." name: str = "" inputs: List[str] outputs: List[str] def __init__(self, *args, **kwargs): self.is_initialized = False def __call__(self, *args, **kwargs): return NotImplemented("Write this method in your subclass of `Tool`.") def setup(self): """ Overwrite this method here for any operation that is expensive and needs to be executed before you start using your tool. Such as loading a big model. """ self.is_initialized = True def save(self, output_dir): """ Saves the relevant code files for your tool so it can be pushed to the Hub. This will copy the code of your tool in `output_dir` as well as autogenerate: - a config file named `tool_config.json` - an `app.py` file so that your tool can be converted to a space - a `requirements.txt` containing the names of the module used by your tool (as detected when inspecting its code) You should only use this method to save tools that are defined in a separate module (not `__main__`). Args: output_dir (`str`): The folder in which you want to save your tool. """ os.makedirs(output_dir, exist_ok=True) # Save module file if self.__module__ == "__main__": raise ValueError( f"We can't save the code defining {self} in {output_dir} as it's been defined in __main__. You " "have to put this code in a separate module so we can include it in the saved folder." ) module_files = custom_object_save(self, output_dir) module_name = self.__class__.__module__ last_module = module_name.split(".")[-1] full_name = f"{last_module}.{self.__class__.__name__}" # Save config file config_file = os.path.join(output_dir, "tool_config.json") if os.path.isfile(config_file): with open(config_file, "r", encoding="utf-8") as f: tool_config = json.load(f) else: tool_config = {} tool_config = {"tool_class": full_name, "description": self.description, "name": self.name} with open(config_file, "w", encoding="utf-8") as f: f.write(json.dumps(tool_config, indent=2, sort_keys=True) + "\n") # Save app file app_file = os.path.join(output_dir, "app.py") with open(app_file, "w", encoding="utf-8") as f: f.write(APP_FILE_TEMPLATE.format(module_name=last_module, class_name=self.__class__.__name__)) # Save requirements file requirements_file = os.path.join(output_dir, "requirements.txt") imports = [] for module in module_files: imports.extend(get_imports(module)) imports = list(set(imports)) with open(requirements_file, "w", encoding="utf-8") as f: f.write("\n".join(imports) + "\n") @classmethod def from_hub( cls, repo_id: str, model_repo_id: Optional[str] = None, token: Optional[str] = None, remote: bool = False, **kwargs, ): """ Loads a tool defined on the Hub. Args: repo_id (`str`): The name of the repo on the Hub where your tool is defined. model_repo_id (`str`, *optional*): If your tool uses a model and you want to use a different model than the default, you can pass a second repo ID or an endpoint url to this argument. token (`str`, *optional*): The token to identify you on hf.co. If unset, will use the token generated when running `huggingface-cli login` (stored in `~/.huggingface`). remote (`bool`, *optional*, defaults to `False`): Whether to use your tool by downloading the model or (if it is available) with an inference endpoint. kwargs (additional keyword arguments, *optional*): Additional keyword arguments that will be split in two: all arguments relevant to the Hub (such as `cache_dir`, `revision`, `subfolder`) will be used when downloading the files for your tool, and the others will be passed along to its init. 
""" if remote and model_repo_id is None: endpoints = get_default_endpoints() if repo_id not in endpoints: raise ValueError( f"Could not infer a default endpoint for {repo_id}, you need to pass one using the " "`model_repo_id` argument." ) model_repo_id = endpoints[repo_id] hub_kwargs_names = [ "cache_dir", "force_download", "resume_download", "proxies", "revision", "repo_type", "subfolder", "local_files_only", ] hub_kwargs = {k: v for k, v in kwargs.items() if k in hub_kwargs_names} # Try to get the tool config first. hub_kwargs["repo_type"] = get_repo_type(repo_id, **hub_kwargs) resolved_config_file = cached_file( repo_id, TOOL_CONFIG_FILE, use_auth_token=token, **hub_kwargs, _raise_exceptions_for_missing_entries=False, _raise_exceptions_for_connection_errors=False, ) is_tool_config = resolved_config_file is not None if resolved_config_file is None: resolved_config_file = cached_file( repo_id, CONFIG_NAME, use_auth_token=token, **hub_kwargs, _raise_exceptions_for_missing_entries=False, _raise_exceptions_for_connection_errors=False, ) if resolved_config_file is None: raise EnvironmentError( f"{repo_id} does not appear to provide a valid configuration in `tool_config.json` or `config.json`." ) with open(resolved_config_file, encoding="utf-8") as reader: config = json.load(reader) if not is_tool_config: if "custom_tool" not in config: raise EnvironmentError( f"{repo_id} does not provide a mapping to custom tools in its configuration `config.json`." ) custom_tool = config["custom_tool"] else: custom_tool = config tool_class = custom_tool["tool_class"] tool_class = get_class_from_dynamic_module(tool_class, repo_id, use_auth_token=token, **hub_kwargs) if len(tool_class.name) == 0: tool_class.name = custom_tool["name"] if tool_class.name != custom_tool["name"]: logger.warning( f"{tool_class.__name__} implements a different name in its configuration and class. Using the tool " "configuration name." ) tool_class.name = custom_tool["name"] if len(tool_class.description) == 0: tool_class.description = custom_tool["description"] if tool_class.description != custom_tool["description"]: logger.warning( f"{tool_class.__name__} implements a different description in its configuration and class. Using the " "tool configuration description." ) tool_class.description = custom_tool["description"] if remote: return RemoteTool(model_repo_id, token=token, tool_class=tool_class) return tool_class(model_repo_id, token=token, **kwargs) def push_to_hub( self, repo_id: str, commit_message: str = "Upload tool", private: Optional[bool] = None, token: Optional[Union[bool, str]] = None, create_pr: bool = False, ) -> str: """ Upload the tool to the Hub. Parameters: repo_id (`str`): The name of the repository you want to push your tool to. It should contain your organization name when pushing to a given organization. commit_message (`str`, *optional*, defaults to `"Upload tool"`): Message to commit while pushing. private (`bool`, *optional*): Whether or not the repository created should be private. token (`bool` or `str`, *optional*): The token to use as HTTP bearer authorization for remote files. If unset, will use the token generated when running `huggingface-cli login` (stored in `~/.huggingface`). create_pr (`bool`, *optional*, defaults to `False`): Whether or not to create a PR with the uploaded files or directly commit. 
""" repo_url = create_repo( repo_id=repo_id, token=token, private=private, exist_ok=True, repo_type="space", space_sdk="gradio" ) repo_id = repo_url.repo_id metadata_update(repo_id, {"tags": ["tool"]}, repo_type="space") with tempfile.TemporaryDirectory() as work_dir: # Save all files. self.save(work_dir) logger.info(f"Uploading the following files to {repo_id}: {','.join(os.listdir(work_dir))}") return upload_folder( repo_id=repo_id, commit_message=commit_message, folder_path=work_dir, token=token, create_pr=create_pr, repo_type="space", ) @staticmethod def from_gradio(gradio_tool): """ Creates a [`Tool`] from a gradio tool. """ class GradioToolWrapper(Tool): def __init__(self, _gradio_tool): super().__init__() self.name = _gradio_tool.name self.description = _gradio_tool.description GradioToolWrapper.__call__ = gradio_tool.run return GradioToolWrapper(gradio_tool) class RemoteTool(Tool): """ A [`Tool`] that will make requests to an inference endpoint. Args: endpoint_url (`str`, *optional*): The url of the endpoint to use. token (`str`, *optional*): The token to use as HTTP bearer authorization for remote files. If unset, will use the token generated when running `huggingface-cli login` (stored in `~/.huggingface`). tool_class (`type`, *optional*): The corresponding `tool_class` if this is a remote version of an existing tool. Will help determine when the output should be converted to another type (like images). """ def __init__(self, endpoint_url=None, token=None, tool_class=None): self.endpoint_url = endpoint_url self.client = EndpointClient(endpoint_url, token=token) self.tool_class = tool_class def prepare_inputs(self, *args, **kwargs): """ Prepare the inputs received for the HTTP client sending data to the endpoint. Positional arguments will be matched with the signature of the `tool_class` if it was provided at instantation. Images will be encoded into bytes. You can override this method in your custom class of [`RemoteTool`]. """ inputs = kwargs.copy() if len(args) > 0: if self.tool_class is not None: # Match args with the signature if issubclass(self.tool_class, PipelineTool): call_method = self.tool_class.encode else: call_method = self.tool_class.__call__ signature = inspect.signature(call_method).parameters parameters = [ k for k, p in signature.items() if p.kind not in [inspect._ParameterKind.VAR_POSITIONAL, inspect._ParameterKind.VAR_KEYWORD] ] if parameters[0] == "self": parameters = parameters[1:] if len(args) > len(parameters): raise ValueError( f"{self.tool_class} only accepts {len(parameters)} arguments but {len(args)} were given." ) for arg, name in zip(args, parameters): inputs[name] = arg elif len(args) > 1: raise ValueError("A `RemoteTool` can only accept one positional input.") elif len(args) == 1: if is_pil_image(args[0]): return {"inputs": self.client.encode_image(args[0])} return {"inputs": args[0]} for key, value in inputs.items(): if is_pil_image(value): inputs[key] = self.client.encode_image(value) return {"inputs": inputs} def extract_outputs(self, outputs): """ You can override this method in your custom class of [`RemoteTool`] to apply some custom post-processing of the outputs of the endpoint. 
""" return outputs def __call__(self, *args, **kwargs): args, kwargs = handle_agent_inputs(*args, **kwargs) output_image = self.tool_class is not None and self.tool_class.outputs == ["image"] inputs = self.prepare_inputs(*args, **kwargs) if isinstance(inputs, dict): outputs = self.client(**inputs, output_image=output_image) else: outputs = self.client(inputs, output_image=output_image) if isinstance(outputs, list) and len(outputs) == 1 and isinstance(outputs[0], list): outputs = outputs[0] outputs = handle_agent_outputs(outputs, self.tool_class.outputs if self.tool_class is not None else None) return self.extract_outputs(outputs) class PipelineTool(Tool): """ A [`Tool`] tailored towards Transformer models. On top of the class attributes of the base class [`Tool`], you will need to specify: - **model_class** (`type`) -- The class to use to load the model in this tool. - **default_checkpoint** (`str`) -- The default checkpoint that should be used when the user doesn't specify one. - **pre_processor_class** (`type`, *optional*, defaults to [`AutoProcessor`]) -- The class to use to load the pre-processor - **post_processor_class** (`type`, *optional*, defaults to [`AutoProcessor`]) -- The class to use to load the post-processor (when different from the pre-processor). Args: model (`str` or [`PreTrainedModel`], *optional*): The name of the checkpoint to use for the model, or the instantiated model. If unset, will default to the value of the class attribute `default_checkpoint`. pre_processor (`str` or `Any`, *optional*): The name of the checkpoint to use for the pre-processor, or the instantiated pre-processor (can be a tokenizer, an image processor, a feature extractor or a processor). Will default to the value of `model` if unset. post_processor (`str` or `Any`, *optional*): The name of the checkpoint to use for the post-processor, or the instantiated pre-processor (can be a tokenizer, an image processor, a feature extractor or a processor). Will default to the `pre_processor` if unset. device (`int`, `str` or `torch.device`, *optional*): The device on which to execute the model. Will default to any accelerator available (GPU, MPS etc...), the CPU otherwise. device_map (`str` or `dict`, *optional*): If passed along, will be used to instantiate the model. model_kwargs (`dict`, *optional*): Any keyword argument to send to the model instantiation. token (`str`, *optional*): The token to use as HTTP bearer authorization for remote files. If unset, will use the token generated when running `huggingface-cli login` (stored in `~/.huggingface`). hub_kwargs (additional keyword arguments, *optional*): Any additional keyword argument to send to the methods that will load the data from the Hub. 
""" pre_processor_class = AutoProcessor model_class = None post_processor_class = AutoProcessor default_checkpoint = None def __init__( self, model=None, pre_processor=None, post_processor=None, device=None, device_map=None, model_kwargs=None, token=None, **hub_kwargs, ): if not is_torch_available(): raise ImportError("Please install torch in order to use this tool.") if not is_accelerate_available(): raise ImportError("Please install accelerate in order to use this tool.") if model is None: if self.default_checkpoint is None: raise ValueError("This tool does not implement a default checkpoint, you need to pass one.") model = self.default_checkpoint if pre_processor is None: pre_processor = model self.model = model self.pre_processor = pre_processor self.post_processor = post_processor self.device = device self.device_map = device_map self.model_kwargs = {} if model_kwargs is None else model_kwargs if device_map is not None: self.model_kwargs["device_map"] = device_map self.hub_kwargs = hub_kwargs self.hub_kwargs["token"] = token super().__init__() def setup(self): """ Instantiates the `pre_processor`, `model` and `post_processor` if necessary. """ if isinstance(self.pre_processor, str): self.pre_processor = self.pre_processor_class.from_pretrained(self.pre_processor, **self.hub_kwargs) if isinstance(self.model, str): self.model = self.model_class.from_pretrained(self.model, **self.model_kwargs, **self.hub_kwargs) if self.post_processor is None: self.post_processor = self.pre_processor elif isinstance(self.post_processor, str): self.post_processor = self.post_processor_class.from_pretrained(self.post_processor, **self.hub_kwargs) if self.device is None: if self.device_map is not None: self.device = list(self.model.hf_device_map.values())[0] else: self.device = get_default_device() if self.device_map is None: self.model.to(self.device) super().setup() def encode(self, raw_inputs): """ Uses the `pre_processor` to prepare the inputs for the `model`. """ return self.pre_processor(raw_inputs) def forward(self, inputs): """ Sends the inputs through the `model`. """ with torch.no_grad(): return self.model(**inputs) def decode(self, outputs): """ Uses the `post_processor` to decode the model output. """ return self.post_processor(outputs) def __call__(self, *args, **kwargs): args, kwargs = handle_agent_inputs(*args, **kwargs) if not self.is_initialized: self.setup() encoded_inputs = self.encode(*args, **kwargs) encoded_inputs = send_to_device(encoded_inputs, self.device) outputs = self.forward(encoded_inputs) outputs = send_to_device(outputs, "cpu") decoded_outputs = self.decode(outputs) return handle_agent_outputs(decoded_outputs, self.outputs) def launch_gradio_demo(tool_class: Tool): """ Launches a gradio demo for a tool. The corresponding tool class needs to properly implement the class attributes `inputs` and `outputs`. Args: tool_class (`type`): The class of the tool for which to launch the demo. """ try: import gradio as gr except ImportError: raise ImportError("Gradio should be installed in order to launch a gradio demo.") tool = tool_class() def fn(*args, **kwargs): return tool(*args, **kwargs) gr.Interface( fn=fn, inputs=tool_class.inputs, outputs=tool_class.outputs, title=tool_class.__name__, article=tool.description, ).launch() # TODO: Migrate to Accelerate for this once `PartialState.default_device` makes its way into a release. 
def get_default_device():
    if not is_torch_available():
        raise ImportError("Please install torch in order to use this tool.")

    if torch.backends.mps.is_available() and torch.backends.mps.is_built():
        return torch.device("mps")
    elif torch.cuda.is_available():
        return torch.device("cuda")
    else:
        return torch.device("cpu")


TASK_MAPPING = {
    "document-question-answering": "DocumentQuestionAnsweringTool",
    "image-captioning": "ImageCaptioningTool",
    "image-question-answering": "ImageQuestionAnsweringTool",
    "image-segmentation": "ImageSegmentationTool",
    "speech-to-text": "SpeechToTextTool",
    "summarization": "TextSummarizationTool",
    "text-classification": "TextClassificationTool",
    "text-question-answering": "TextQuestionAnsweringTool",
    "text-to-speech": "TextToSpeechTool",
    "translation": "TranslationTool",
}


def get_default_endpoints():
    endpoints_file = cached_file("huggingface-tools/default-endpoints", "default_endpoints.json", repo_type="dataset")
    with open(endpoints_file, "r", encoding="utf-8") as f:
        endpoints = json.load(f)
    return endpoints


def supports_remote(task_or_repo_id):
    endpoints = get_default_endpoints()
    return task_or_repo_id in endpoints


def load_tool(task_or_repo_id, model_repo_id=None, remote=False, token=None, **kwargs):
    """
    Main function to quickly load a tool, be it on the Hub or in the Transformers library.

    Args:
        task_or_repo_id (`str`):
            The task for which to load the tool or a repo ID of a tool on the Hub. Tasks implemented in Transformers
            are:

            - `"document-question-answering"`
            - `"image-captioning"`
            - `"image-question-answering"`
            - `"image-segmentation"`
            - `"speech-to-text"`
            - `"summarization"`
            - `"text-classification"`
            - `"text-question-answering"`
            - `"text-to-speech"`
            - `"translation"`

        model_repo_id (`str`, *optional*):
            Use this argument to use a different model than the default one for the tool you selected.
        remote (`bool`, *optional*, defaults to `False`):
            Whether to use your tool by downloading the model or (if it is available) with an inference endpoint.
        token (`str`, *optional*):
            The token to identify you on hf.co. If unset, will use the token generated when running `huggingface-cli
            login` (stored in `~/.huggingface`).
        kwargs (additional keyword arguments, *optional*):
            Additional keyword arguments that will be split in two: all arguments relevant to the Hub (such as
            `cache_dir`, `revision`, `subfolder`) will be used when downloading the files for your tool, and the
            others will be passed along to its init.
    """
    if task_or_repo_id in TASK_MAPPING:
        tool_class_name = TASK_MAPPING[task_or_repo_id]
        main_module = importlib.import_module("transformers")
        tools_module = main_module.tools
        tool_class = getattr(tools_module, tool_class_name)

        if remote:
            if model_repo_id is None:
                endpoints = get_default_endpoints()
                if task_or_repo_id not in endpoints:
                    raise ValueError(
                        f"Could not infer a default endpoint for {task_or_repo_id}, you need to pass one using the "
                        "`model_repo_id` argument."
                    )
                model_repo_id = endpoints[task_or_repo_id]
            return RemoteTool(model_repo_id, token=token, tool_class=tool_class)
        else:
            return tool_class(model_repo_id, token=token, **kwargs)
    else:
        return Tool.from_hub(task_or_repo_id, model_repo_id=model_repo_id, token=token, remote=remote, **kwargs)


def add_description(description):
    """
    A decorator that adds a description to a function.
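
    Example (an illustrative usage; the decorated function is hypothetical):

    ```python
    @add_description("This is a tool that returns the current date. It takes no input and returns a date string.")
    def current_date():
        import datetime

        return datetime.date.today().isoformat()
    ```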
""" def inner(func): func.description = description func.name = func.__name__ return func return inner ## Will move to the Hub class EndpointClient: def __init__(self, endpoint_url: str, token: Optional[str] = None): self.headers = {**build_hf_headers(token=token), "Content-Type": "application/json"} self.endpoint_url = endpoint_url @staticmethod def encode_image(image): _bytes = io.BytesIO() image.save(_bytes, format="PNG") b64 = base64.b64encode(_bytes.getvalue()) return b64.decode("utf-8") @staticmethod def decode_image(raw_image): if not is_vision_available(): raise ImportError( "This tool returned an image but Pillow is not installed. Please install it (`pip install Pillow`)." ) from PIL import Image b64 = base64.b64decode(raw_image) _bytes = io.BytesIO(b64) return Image.open(_bytes) def __call__( self, inputs: Optional[Union[str, Dict, List[str], List[List[str]]]] = None, params: Optional[Dict] = None, data: Optional[bytes] = None, output_image: bool = False, ) -> Any: # Build payload payload = {} if inputs: payload["inputs"] = inputs if params: payload["parameters"] = params # Make API call response = get_session().post(self.endpoint_url, headers=self.headers, json=payload, data=data) # By default, parse the response for the user. if output_image: return self.decode_image(response.content) else: return response.json()