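"""Single-GPU executor, adapted from vLLM's GPUExecutor to load the model via
the custom serve.worker.Worker, forwarding the command-line ``args`` to the
worker's ``load_model``.
"""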
import argparse
from typing import Dict, List, Optional, Set, Tuple

from vllm.config import (CacheConfig, DeviceConfig, LoadConfig, LoRAConfig,
                         ModelConfig, ParallelConfig, SchedulerConfig,
                         SpeculativeConfig, VisionLanguageConfig)
from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.sequence import SamplerOutput, SequenceGroupMetadata
from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
                        make_async)

logger = init_logger(__name__)


class GPUExecutor(ExecutorBase):

    def __init__(
        self,
        args: argparse.Namespace,  # parsed CLI args, forwarded to load_model
        model_config: ModelConfig,
        cache_config: CacheConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        load_config: LoadConfig,
        lora_config: Optional[LoRAConfig],
        vision_language_config: Optional[VisionLanguageConfig],
        speculative_config: Optional[SpeculativeConfig],
    ) -> None:
        self.args = args
        self.model_config = model_config
        self.cache_config = cache_config
        self.lora_config = lora_config
        self.load_config = load_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.vision_language_config = vision_language_config
        self.speculative_config = speculative_config

        self._init_executor()

    def _init_executor(self) -> None:
        """Initialize the worker and load the model.

        If speculative decoding is enabled, create the speculative worker
        instead.
        """
        if self.speculative_config is None:
            self._init_non_spec_worker()
        else:
            self._init_spec_worker()

    def _init_non_spec_worker(self):
        # Lazy import the Worker to avoid importing torch.cuda/xformers
        # before CUDA_VISIBLE_DEVICES is set in the Worker.
        # NOTE: the custom serve.worker.Worker is used here instead of the
        # upstream vllm.worker.worker.Worker.
        from serve.worker import Worker

        assert self.parallel_config.world_size == 1, (
            "GPUExecutor only supports single GPU.")

        distributed_init_method = get_distributed_init_method(
            get_ip(), get_open_port())
        self.driver_worker = Worker(
            model_config=self.model_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
            device_config=self.device_config,
            cache_config=self.cache_config,
            load_config=self.load_config,
            local_rank=0,
            rank=0,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            is_driver_worker=True,
        )
        self.driver_worker.init_device()
        self.driver_worker.load_model(self.args)

    def _init_spec_worker(self):
        """Initialize a SpecDecodeWorker, using a draft model for proposals."""
        assert self.speculative_config is not None

        from vllm.spec_decode.multi_step_worker import MultiStepWorker
        from vllm.spec_decode.spec_decode_worker import SpecDecodeWorker
        from vllm.worker.worker import Worker

        distributed_init_method = get_distributed_init_method(
            get_ip(), get_open_port())

        target_worker = Worker(
            model_config=self.model_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
            device_config=self.device_config,
            cache_config=self.cache_config,
            load_config=self.load_config,
            local_rank=0,
            rank=0,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            is_driver_worker=True,
        )

        draft_worker = MultiStepWorker(
            model_config=self.speculative_config.draft_model_config,
            parallel_config=self.speculative_config.draft_parallel_config,
            scheduler_config=self.scheduler_config,
            device_config=self.device_config,
            cache_config=self.cache_config,
            load_config=self.load_config,
            local_rank=0,
            rank=0,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            is_driver_worker=True,
        )

        spec_decode_worker = SpecDecodeWorker.from_workers(
            proposer_worker=draft_worker, scorer_worker=target_worker)

        assert self.parallel_config.world_size == 1, (
            "GPUExecutor only supports single GPU.")

        self.driver_worker = spec_decode_worker

        # Model loading is handled inside the spec decode worker.
        self.driver_worker.init_device()

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks by invoking the
        underlying worker.
        """
        return self.driver_worker.determine_num_available_blocks()

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache by invoking the underlying worker."""
        # NOTE: This is logged in the executor because there can be >1 worker
        # with other executors. We could log at the engine level, but work
        # remains to abstract away the device for non-GPU configurations.
        logger.info(f"# GPU blocks: {num_gpu_blocks}, "
                    f"# CPU blocks: {num_cpu_blocks}")
        self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)

    def execute_model(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        num_lookahead_slots: int,
    ) -> List[SamplerOutput]:
        output = self.driver_worker.execute_model(
            seq_group_metadata_list=seq_group_metadata_list,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            num_lookahead_slots=num_lookahead_slots,
        )
        return output

    def add_lora(self, lora_request: LoRARequest) -> bool:
        assert lora_request.lora_int_id > 0, "lora_id must be greater than 0."
        return self.driver_worker.add_lora(lora_request)

    def remove_lora(self, lora_id: int) -> bool:
        assert lora_id > 0, "lora_id must be greater than 0."
        return self.driver_worker.remove_lora(lora_id)

    def list_loras(self) -> Set[int]:
        return self.driver_worker.list_loras()

    def check_health(self) -> None:
        # GPUExecutor will always be healthy as long as it's running.
        return


class GPUExecutorAsync(GPUExecutor, ExecutorAsyncBase):

    async def execute_model_async(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
    ) -> List[SamplerOutput]:
        output = await make_async(self.driver_worker.execute_model)(
            seq_group_metadata_list=seq_group_metadata_list,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy)
        return output
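

# A minimal usage sketch (hypothetical; not part of this module). In practice
# the engine builds the config objects and drives the executor, roughly:
#
#     executor = GPUExecutor(args, model_config, cache_config, parallel_config,
#                            scheduler_config, device_config, load_config,
#                            lora_config=None, vision_language_config=None,
#                            speculative_config=None)
#     num_gpu_blocks, num_cpu_blocks = executor.determine_num_available_blocks()
#     executor.initialize_cache(num_gpu_blocks, num_cpu_blocks)
#     outputs = executor.execute_model(seq_group_metadata_list,
#                                      blocks_to_swap_in={},
#                                      blocks_to_swap_out={},
#                                      blocks_to_copy={},
#                                      num_lookahead_slots=0)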