| import multiprocessing as mp |
| import random |
| import time |
| import unittest |
|
|
| import torch |
| from transformers import AutoConfig, AutoTokenizer |
|
|
| from sglang.test.ci.ci_register import register_cuda_ci |
| from sglang.test.runners import DEFAULT_PROMPTS, HFRunner, SRTRunner |
| from sglang.test.test_utils import CustomTestCase, get_similarities, is_in_ci |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
|
|
# Register this test module with the CUDA CI: suite name plus an estimated
# run time (units are defined by register_cuda_ci — presumably seconds; confirm).
register_cuda_ci(
    est_time=270,
    suite="stage-b-test-small-1-gpu",
)
|
|
# One entry per model under test: (model path, tp_size, prefill tolerance).
MODELS = [
    ("BAAI/bge-small-en", 1, 1e-5),
    ("BAAI/bge-m3", 1, 1e-5),
]

# Parameter grid swept by the test: every backend x batch size x dtype.
ATTENTION_BACKEND = [
    "torch_native",
    "triton",
    "flashinfer",
]
BATCH_SIZE = [
    1,
    2,
]
TORCH_DTYPES = [
    torch.float32,
    torch.float16,
]

# Collects one (SGLang time / HF time) ratio per timed comparison.
sgl_to_st_ratio = list()
|
|
|
|
class TestEncoderEmbeddingModels(CustomTestCase):
    """Compare SGLang encoder-embedding outputs against the HF reference runner.

    For every (model, attention backend, batch size, dtype) combination the
    same prompts are embedded with ``HFRunner`` and ``SRTRunner``; the value
    returned by ``get_similarities`` must be within the model's tolerance
    of 1. A wall-clock ratio (SGLang time / HF time) is recorded for each
    combination and printed once all combinations have passed.
    """

    @classmethod
    def setUpClass(cls):
        """Force the ``spawn`` multiprocessing start method for this class."""
        mp.set_start_method("spawn", force=True)

    def _truncate_prompts(self, prompts, model_path):
        """Return ``prompts`` with over-long entries shortened for the model.

        The token budget is the config's ``max_position_embeddings`` (512 when
        the attribute is absent) minus 20. A prompt whose tokenization exceeds
        the budget is replaced by the decoded text of its first
        ``budget - 1`` token ids, with special tokens skipped; every other
        prompt is kept unchanged.

        Args:
            prompts: Iterable of prompt strings.
            model_path: Model identifier passed to ``AutoConfig`` and
                ``AutoTokenizer``.

        Returns:
            A new list with one (possibly shortened) prompt per input prompt.
        """
        config = AutoConfig.from_pretrained(model_path)
        max_length = getattr(config, "max_position_embeddings", 512) - 20
        tokenizer = AutoTokenizer.from_pretrained(model_path)

        truncated_prompts = []
        for prompt in prompts:
            input_ids = tokenizer(
                prompt, return_tensors="pt", truncation=False
            ).input_ids[0]
            if len(input_ids) > max_length:
                prompt = tokenizer.decode(
                    input_ids[: max_length - 1], skip_special_tokens=True
                )
            truncated_prompts.append(prompt)

        return truncated_prompts

    def assert_close_prefill_logits(
        self,
        prompts,
        model_path,
        tp_size,
        torch_dtype,
        prefill_tolerance,
        attention_backend,
        batch_size,
    ) -> None:
        """Embed the prompts with HF and SGLang and require matching outputs.

        The truncated prompt list is repeated ``batch_size`` times. Each
        runner performs one untimed forward pass followed by one timed pass;
        the ratio ``sgl_time / hf_time`` is appended to the module-level
        ``sgl_to_st_ratio`` list. The outputs of the timed passes are then
        compared prompt by prompt.

        Args:
            prompts: Prompt strings to embed.
            model_path: Model identifier given to both runners.
            tp_size: ``tp_size`` forwarded to ``SRTRunner``.
            torch_dtype: dtype forwarded to both runners.
            prefill_tolerance: Maximum allowed ``|similarity - 1|``.
            attention_backend: ``attention_backend`` forwarded to ``SRTRunner``.
            batch_size: Number of times the prompt list is repeated.

        Raises:
            AssertionError: If any checked similarity deviates from 1 by
                ``prefill_tolerance`` or more.
        """
        truncated_prompts = self._truncate_prompts(prompts, model_path)
        truncated_prompts = truncated_prompts * batch_size

        with HFRunner(
            model_path,
            torch_dtype=torch_dtype,
            model_type="embedding",
        ) as hf_runner:
            # Untimed first pass (warm-up); its result is overwritten below.
            hf_outputs = hf_runner.forward(truncated_prompts)

            st_start_time = time.perf_counter()
            hf_outputs = hf_runner.forward(truncated_prompts)
            st_end_time = time.perf_counter()

        with SRTRunner(
            model_path,
            tp_size=tp_size,
            torch_dtype=torch_dtype,
            model_type="embedding",
            attention_backend=attention_backend,
            chunked_prefill_size=-1,
            disable_radix_cache=True,
        ) as srt_runner:
            # Untimed first pass (warm-up); its result is overwritten below.
            srt_outputs = srt_runner.forward(truncated_prompts)

            sgl_start_time = time.perf_counter()
            srt_outputs = srt_runner.forward(truncated_prompts)
            sgl_end_time = time.perf_counter()

        transformer_time = st_end_time - st_start_time
        sgl_time = sgl_end_time - sgl_start_time
        sgl_to_st_ratio.append(sgl_time / transformer_time)

        for i, prompt in enumerate(truncated_prompts):
            hf_logits = torch.Tensor(hf_outputs.embed_logits[i])
            srt_logits = torch.Tensor(srt_outputs.embed_logits[i])

            similarity = torch.tensor(get_similarities(hf_logits, srt_logits))

            # Only prompts of at most 1000 characters are checked.
            # NOTE(review): presumably longer prompts drift too much
            # numerically to meet the tolerance — confirm.
            if len(prompt) <= 1000:
                # assertTrue instead of a bare ``assert`` so the check is not
                # stripped when Python runs with -O.
                self.assertTrue(
                    bool(torch.all(abs(similarity - 1) < prefill_tolerance)),
                    "embeddings are not all close",
                )

    def test_prefill_logits(self):
        """Run the HF-vs-SGLang comparison over the whole parameter grid.

        In CI a single randomly chosen model is tested; otherwise every entry
        of ``MODELS`` is. After all combinations pass, one timing line is
        printed per executed combination.
        """
        models_to_test = MODELS

        if is_in_ci():
            models_to_test = [random.choice(MODELS)]

        # ``sgl_to_st_ratio`` is a module-level list that may already hold
        # entries from an earlier run in this process; remember where this
        # run's entries start and which combination produced each of them.
        first_new_ratio = len(sgl_to_st_ratio)
        timed_runs = []

        for model, tp_size, prefill_tolerance in models_to_test:
            for attention_backend in ATTENTION_BACKEND:
                for batch_size in BATCH_SIZE:
                    for torch_dtype in TORCH_DTYPES:
                        # flashinfer is skipped for BAAI/bge-small-en and for
                        # float32. NOTE(review): presumably these
                        # configurations are unsupported by that backend —
                        # confirm.
                        if attention_backend == "flashinfer" and (
                            model == "BAAI/bge-small-en"
                            or torch_dtype == torch.float32
                        ):
                            continue

                        self.assert_close_prefill_logits(
                            DEFAULT_PROMPTS,
                            model,
                            tp_size,
                            torch_dtype,
                            prefill_tolerance,
                            attention_backend,
                            batch_size,
                        )
                        timed_runs.append(
                            (model, attention_backend, batch_size, torch_dtype)
                        )

        # Pair every recorded ratio with the combination that produced it,
        # rather than indexing the ratio list by position in BATCH_SIZE.
        new_ratios = sgl_to_st_ratio[first_new_ratio:]
        for run, ratio in zip(timed_runs, new_ratios):
            model, attention_backend, batch_size, torch_dtype = run
            print(
                "model: ",
                model,
                "attention backend: ",
                attention_backend,
                "dtype: ",
                torch_dtype,
                "batch size: ",
                batch_size * len(DEFAULT_PROMPTS),
                "sgl_time/st_time",
                round(ratio, 3),
            )
|
|
|
|
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|
|