"""Wrapper around HuggingFace Pipeline API to run on self-hosted remote hardware."""
import importlib.util
import logging
from typing import Any, Callable, List, Mapping, Optional

from pydantic import BaseModel, Extra

from langchain.llms.self_hosted import SelfHostedPipeline
from langchain.llms.utils import enforce_stop_tokens

DEFAULT_MODEL_ID = "gpt2"
DEFAULT_TASK = "text-generation"
VALID_TASKS = ("text2text-generation", "text-generation")

# Module-scoped logger so records are attributable to this module and can be
# filtered independently of the root logger.
logger = logging.getLogger(__name__)


def _generate_text(
    pipeline: Any,
    prompt: str,
    *args: Any,
    stop: Optional[List[str]] = None,
    **kwargs: Any,
) -> str:
    """Inference function to send to the remote hardware.

    Accepts a Hugging Face pipeline (or more likely,
    a key pointing to such a pipeline on the cluster's object store)
    and returns generated text.

    Args:
        pipeline: Callable pipeline object exposing a ``task`` attribute.
        prompt: Text to feed to the pipeline.
        *args: Extra positional arguments forwarded to the pipeline call.
        stop: Optional stop sequences; when non-empty the generated text is
            passed through ``enforce_stop_tokens``.
        **kwargs: Extra keyword arguments forwarded to the pipeline call.

    Returns:
        The generated text. For ``text-generation`` the leading ``prompt`` is
        stripped from the pipeline output.

    Raises:
        ValueError: If ``pipeline.task`` is not one of ``VALID_TASKS``.
    """
    response = pipeline(prompt, *args, **kwargs)
    if pipeline.task == "text-generation":
        # Text generation return includes the starter text.
        text = response[0]["generated_text"][len(prompt) :]
    elif pipeline.task == "text2text-generation":
        text = response[0]["generated_text"]
    else:
        raise ValueError(
            f"Got invalid task {pipeline.task}, "
            f"currently only {VALID_TASKS} are supported"
        )
    # An empty stop list means "no stop tokens": skip enforcement rather than
    # handing the helper an empty list of sequences to split on.
    if stop:
        text = enforce_stop_tokens(text, stop)
    return text


def _load_transformer(
    model_id: str = DEFAULT_MODEL_ID,
    task: str = DEFAULT_TASK,
    device: int = 0,
    model_kwargs: Optional[dict] = None,
) -> Any:
    """Model load function to send to the remote hardware.

    Accepts a huggingface model_id and returns a pipeline for the task.

    Args:
        model_id: Hugging Face model identifier to load.
        task: Pipeline task; must be one of ``VALID_TASKS``.
        device: Device index handed to the pipeline. When ``torch`` is
            importable it must lie in ``[-1, torch.cuda.device_count())``.
        model_kwargs: Optional keyword arguments forwarded to the tokenizer,
            the model and the pipeline constructor.

    Returns:
        The constructed Hugging Face pipeline.

    Raises:
        ValueError: If the task is unsupported, the model cannot be loaded
            because of missing dependencies, or ``device`` is out of range.
    """
    from transformers import AutoModelForCausalLM, AutoModelForSeq2SeqLM, AutoTokenizer
    from transformers import pipeline as hf_pipeline

    # Fail fast on an unsupported task before any tokenizer/model is loaded.
    if task not in VALID_TASKS:
        raise ValueError(
            f"Got invalid task {task}, "
            f"currently only {VALID_TASKS} are supported"
        )

    _model_kwargs = model_kwargs or {}
    tokenizer = AutoTokenizer.from_pretrained(model_id, **_model_kwargs)

    try:
        if task == "text-generation":
            model = AutoModelForCausalLM.from_pretrained(model_id, **_model_kwargs)
        else:
            # Only "text2text-generation" remains after the validation above.
            model = AutoModelForSeq2SeqLM.from_pretrained(model_id, **_model_kwargs)
    except ImportError as e:
        raise ValueError(
            f"Could not load the {task} model due to missing dependencies."
        ) from e

    # Device validation is only possible when torch is installed.
    if importlib.util.find_spec("torch") is not None:
        import torch

        cuda_device_count = torch.cuda.device_count()
        if device < -1 or (device >= cuda_device_count):
            raise ValueError(
                f"Got device=={device}, "
                f"device is required to be within [-1, {cuda_device_count})"
            )
        if device < 0 and cuda_device_count > 0:
            logger.warning(
                "Device has %d GPUs available. "
                "Provide device={deviceId} to `from_model_id` to use available "
                "GPUs for execution. deviceId is -1 for CPU and "
                "can be a positive integer associated with CUDA device id.",
                cuda_device_count,
            )

    pipeline = hf_pipeline(
        task=task,
        model=model,
        tokenizer=tokenizer,
        device=device,
        model_kwargs=_model_kwargs,
    )
    # Re-check the task reported by the constructed pipeline itself.
    if pipeline.task not in VALID_TASKS:
        raise ValueError(
            f"Got invalid task {pipeline.task}, "
            f"currently only {VALID_TASKS} are supported"
        )
    return pipeline


class SelfHostedHuggingFaceLLM(SelfHostedPipeline, BaseModel):
    """Wrapper around HuggingFace Pipeline API to run on self-hosted remote hardware.

    Supported hardware includes auto-launched instances on AWS, GCP, Azure,
    and Lambda, as well as servers specified
    by IP address and SSH credentials (such as on-prem, or another cloud
    like Paperspace, Coreweave, etc.).

    To use, you should have the ``runhouse`` python package installed.

    Only supports `text-generation` and `text2text-generation` for now.

    Example using from_model_id:
        .. code-block:: python

            from langchain.llms import SelfHostedHuggingFaceLLM
            import runhouse as rh
            gpu = rh.cluster(name="rh-a10x", instance_type="A100:1")
            hf = SelfHostedHuggingFaceLLM(
                model_id="google/flan-t5-large", task="text2text-generation",
                hardware=gpu
            )
    Example passing fn that generates a pipeline (bc the pipeline is not serializable):
        .. code-block:: python

            from langchain.llms import SelfHostedHuggingFaceLLM
            from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
            import runhouse as rh

            def get_pipeline():
                model_id = "gpt2"
                tokenizer = AutoTokenizer.from_pretrained(model_id)
                model = AutoModelForCausalLM.from_pretrained(model_id)
                pipe = pipeline(
                    "text-generation", model=model, tokenizer=tokenizer
                )
                return pipe
            hf = SelfHostedHuggingFaceLLM(
                model_load_fn=get_pipeline, model_id="gpt2", hardware=gpu)
    """

    model_id: str = DEFAULT_MODEL_ID
    """Hugging Face model_id to load the model."""
    task: str = DEFAULT_TASK
    """Hugging Face task (either "text-generation" or "text2text-generation")."""
    device: int = 0
    """Device to use for inference. -1 for CPU, 0 for GPU, 1 for second GPU, etc."""
    model_kwargs: Optional[dict] = None
    """Key word arguments to pass to the model."""
    hardware: Any
    """Remote hardware to send the inference function to."""
    model_reqs: List[str] = ["./", "transformers", "torch"]
    """Requirements to install on hardware to inference the model."""
    model_load_fn: Callable = _load_transformer
    """Function to load the model remotely on the server."""
    inference_fn: Callable = _generate_text  #: :meta private:
    """Inference function to send to the remote hardware."""

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    def __init__(self, **kwargs: Any):
        """Construct the pipeline remotely using an auxiliary function.

        The load function needs to be importable to be imported
        and run on the server, i.e. in a module and not a REPL or closure.
        Then, initialize the remote inference function.

        The ``model_id``, ``task``, ``device`` and ``model_kwargs`` entries of
        ``kwargs`` (or their defaults) are collected into ``load_fn_kwargs``
        and passed to the parent constructor together with ``kwargs``.
        """
        load_fn_kwargs = {
            "model_id": kwargs.get("model_id", DEFAULT_MODEL_ID),
            "task": kwargs.get("task", DEFAULT_TASK),
            "device": kwargs.get("device", 0),
            "model_kwargs": kwargs.get("model_kwargs"),
        }
        super().__init__(load_fn_kwargs=load_fn_kwargs, **kwargs)

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {
            "model_id": self.model_id,
            "model_kwargs": self.model_kwargs,
        }

    @property
    def _llm_type(self) -> str:
        """Return the identifier string for this LLM type."""
        return "selfhosted_huggingface_pipeline"

    def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        """Run the remote inference client on ``prompt`` and return its text."""
        return self.client(pipeline=self.pipeline_ref, prompt=prompt, stop=stop)