|
import gc
import os
from typing import Any, Dict, List, Tuple

import torch
from loguru import logger
from peft import PeftConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from vllm import LLM, SamplingParams
|
|
|
class EndpointHandler:
    """Callable request handler serving a LoRA-merged causal LM through vLLM.

    Construction checks for CUDA, merges the LoRA adapter into the base model,
    saves the merged checkpoint (weights and tokenizer) under
    ``<path>/merged_model`` and then starts a vLLM engine on that checkpoint.
    Calling the instance with a request dict returns a one-element list holding
    either ``{"generated_text": ...}`` or ``{"error": ...}``.
    """

    def __init__(self, path=""):
        """Prepare the merged checkpoint and the vLLM engine.

        Args:
            path: Directory under which the ``merged_model`` folder is created.

        Raises:
            RuntimeError: If CUDA is not available.
            Exception: Any error raised while merging the adapter or starting
                vLLM is logged and re-raised.
        """
        logger.info("Initializing EndpointHandler")

        # Hub identifiers of the base model and of the LoRA adapter merged into it.
        self.base_model_name = "deepseek-ai/DeepSeek-Coder-V2-Lite-Instruct"
        self.lora_model_id = "cobolcopilot/Order-DS-lora"
        # Context window handed to vLLM; prompt + completion must fit inside it.
        self.max_model_len = 1024
        self.merged_model_path = os.path.join(path, "merged_model")

        self.check_gpu()
        self.merge_lora_adapter()
        self.initialize_vllm()

    def check_gpu(self):
        """Log the visible GPUs, failing fast when CUDA is unavailable.

        Raises:
            RuntimeError: If ``torch.cuda.is_available()`` is false.
        """
        if not torch.cuda.is_available():
            logger.error("CUDA is not available. GPUs are required for this setup.")
            raise RuntimeError("CUDA is not available. GPUs are required for this setup.")

        num_gpus = torch.cuda.device_count()
        logger.info(f"Number of GPUs available: {num_gpus}")
        logger.info(f"GPU names: {[torch.cuda.get_device_name(i) for i in range(num_gpus)]}")

    def merge_lora_adapter(self):
        """Merge the LoRA adapter into the base model and save the result.

        The merged weights and the base model's tokenizer are both written to
        ``self.merged_model_path`` so that the directory is a self-contained
        checkpoint for ``initialize_vllm``.

        Raises:
            Exception: Any loading, merging or saving error is logged and
                re-raised unchanged.
        """
        logger.info("Merging LoRA adapter with base model")
        # Pre-bind the names so the cleanup in ``finally`` is valid even when
        # loading fails half-way through.
        base_model = peft_model = merged_model = None
        try:
            base_model = AutoModelForCausalLM.from_pretrained(
                self.base_model_name,
                trust_remote_code=True,
                device_map="auto",
                torch_dtype=torch.bfloat16,
            )
            logger.info("Base model loaded successfully")

            peft_model = PeftModel.from_pretrained(base_model, self.lora_model_id)
            logger.info("PeftModel created successfully")

            merged_model = peft_model.merge_and_unload()
            logger.info("Models merged successfully")

            merged_model.save_pretrained(self.merged_model_path)
            logger.info(f"Merged model saved to {self.merged_model_path}")

            # The tokenizer is later loaded from the merged directory, so it
            # has to be written there as well; saving the model alone does not
            # produce tokenizer files.
            # NOTE(review): assumes the adapter did not change the vocabulary,
            # so the base model's tokenizer is the right one - confirm.
            tokenizer = AutoTokenizer.from_pretrained(
                self.base_model_name, trust_remote_code=True
            )
            tokenizer.save_pretrained(self.merged_model_path)
            logger.info(f"Tokenizer saved to {self.merged_model_path}")
        except Exception as e:
            logger.error(f"Error during LoRA merging: {str(e)}")
            raise
        finally:
            # Drop the Hugging Face copies of the model and hand cached CUDA
            # blocks back to the driver, so the memory is free again before
            # vLLM starts and claims its share of the GPU.
            del base_model, peft_model, merged_model
            gc.collect()
            torch.cuda.empty_cache()

    def initialize_vllm(self):
        """Load the tokenizer and start the vLLM engine on the merged checkpoint.

        Sets ``self.tokenizer`` and ``self.llm``.

        Raises:
            Exception: Any initialization error is logged and re-raised.
        """
        logger.info("Initializing vLLM")
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.merged_model_path, trust_remote_code=True
            )
            logger.info("Tokenizer initialized successfully")

            self.llm = LLM(
                model=self.merged_model_path,
                max_model_len=self.max_model_len,
                trust_remote_code=True,
                enforce_eager=True,
                tensor_parallel_size=1,
                dtype="auto",
                gpu_memory_utilization=0.9,
                max_num_batched_tokens=4096,
                max_num_seqs=8,
            )
            logger.info("vLLM engine initialized successfully")
        except Exception as e:
            logger.error(f"Error during vLLM initialization: {str(e)}")
            raise

    @staticmethod
    def _parse_request(data: Dict[str, Any]) -> Tuple[str, int, float]:
        """Extract and validate the prompt and sampling options from a request.

        ``max_new_tokens`` and ``temperature`` are read from the top level of
        ``data`` first and, when absent there, from a nested ``"parameters"``
        dict; defaults are 1024 and 0.7.

        Args:
            data: Request payload with an ``"inputs"`` string.

        Returns:
            ``(input_text, max_new_tokens, temperature)``.

        Raises:
            ValueError: If the payload is not a dict, ``inputs`` is not a
                non-empty string, or an option has an unusable value.
        """
        if not isinstance(data, dict):
            raise ValueError("request payload must be a JSON object")

        nested = data.get("parameters")
        if not isinstance(nested, dict):
            nested = {}

        input_text = data.get("inputs", "")
        if not isinstance(input_text, str) or not input_text:
            raise ValueError("'inputs' must be a non-empty string")

        raw_max_new_tokens = data.get("max_new_tokens", nested.get("max_new_tokens", 1024))
        raw_temperature = data.get("temperature", nested.get("temperature", 0.7))
        try:
            max_new_tokens = int(raw_max_new_tokens)
            temperature = float(raw_temperature)
        except (TypeError, ValueError) as err:
            raise ValueError(
                "'max_new_tokens' must be an integer and 'temperature' a number"
            ) from err

        if max_new_tokens < 1:
            raise ValueError("'max_new_tokens' must be at least 1")
        if temperature < 0:
            raise ValueError("'temperature' must be non-negative")

        return input_text, max_new_tokens, temperature

    def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate a completion for one request.

        Args:
            data: Request payload; see ``_parse_request`` for the accepted keys.

        Returns:
            ``[{"generated_text": text}]`` on success, otherwise
            ``[{"error": message}]``. Request and generation failures are
            reported in the return value rather than raised.
        """
        logger.info("Processing input request")

        try:
            input_text, max_new_tokens, temperature = self._parse_request(data)
        except ValueError as e:
            logger.error(f"Invalid request: {str(e)}")
            return [{"error": f"Invalid request: {str(e)}"}]

        try:
            prompt_token_ids = self.tokenizer.encode(input_text, add_special_tokens=False)
            logger.info(f"Input tokenized. Token count: {len(prompt_token_ids)}")

            # Prompt and completion share the engine's context window, so at
            # least one position must remain for a generated token.
            remaining = self.max_model_len - len(prompt_token_ids)
            if not prompt_token_ids or remaining < 1:
                raise ValueError(
                    f"prompt has {len(prompt_token_ids)} tokens; it must contain "
                    f"between 1 and {self.max_model_len - 1} tokens"
                )

            eos_token_id = self.tokenizer.eos_token_id
            sampling_params = SamplingParams(
                temperature=temperature,
                # Never request more tokens than still fit in the window.
                max_tokens=min(max_new_tokens, remaining),
                # A tokenizer without an EOS id must not yield ``[None]``.
                stop_token_ids=[eos_token_id] if eos_token_id is not None else [],
            )

            logger.info("Generating output")
            # NOTE(review): the ``prompt_token_ids=`` keyword is kept exactly as
            # the original call used it; confirm it matches the installed vLLM
            # version's ``LLM.generate`` signature.
            outputs = self.llm.generate(
                prompt_token_ids=[prompt_token_ids],
                sampling_params=sampling_params,
            )

            generated_text = outputs[0].outputs[0].text
            logger.info(f"Output generated successfully. Length: {len(generated_text)}")

            return [{"generated_text": generated_text}]
        except Exception as e:
            # The exception is swallowed here, so log it with its traceback.
            logger.exception(f"Error during generation: {str(e)}")
            return [{"error": f"Generation failed: {str(e)}"}]
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run: build the handler (this merges the adapter and starts
    # vLLM) and print the response to a single sample request.
    sample_request = {
        "inputs": "Write a quicksort algorithm in Python.",
        "max_new_tokens": 500,
    }
    endpoint = EndpointHandler()
    print(endpoint(sample_request))