from transformers import AutoTokenizer, PreTrainedTokenizerFast
from transformers.tokenization_utils_base import AddedToken
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import argparse
					
						
def _prompt_split_image(
    image_seq_len,
    image_rows,
    image_cols,
    fake_token_around_image,
    image_token,
    global_img_token,
):
    """Prompt with expanded image tokens for when the image is split into patches."""
    text_split_images = ""
    for n_h in range(image_rows):
        for n_w in range(image_cols):
            text_split_images += (
                f"{fake_token_around_image}"
                + f"<row_{n_h + 1}_col_{n_w + 1}>"
                + f"{image_token}" * image_seq_len
            )
        text_split_images += "\n"

    text_split_images += (
        f"\n{fake_token_around_image}"
        + f"{global_img_token}"
        + f"{image_token}" * image_seq_len
        + f"{fake_token_around_image}"
    )
    return text_split_images
					
						
def _prompt_single_image(
    image_seq_len, fake_token_around_image, image_token, global_img_token
):
    """Prompt with expanded image tokens for a single image."""
    return (
        f"{fake_token_around_image}"
        + f"{global_img_token}"
        + f"{image_token}" * image_seq_len
        + f"{fake_token_around_image}"
    )
					
						
def get_image_prompt_string(
    image_rows,
    image_cols,
    image_seq_len,
    fake_token_around_image,
    image_token,
    global_img_token,
):
    if image_rows == 0 and image_cols == 0:
        return _prompt_single_image(
            image_seq_len,
            fake_token_around_image=fake_token_around_image,
            image_token=image_token,
            global_img_token=global_img_token,
        )
    return _prompt_split_image(
        image_seq_len,
        image_rows,
        image_cols,
        fake_token_around_image,
        image_token,
        global_img_token,
    )
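
# Illustrative sketch only (the token strings below are placeholders, not the
# real specials): with a 2x2 grid and image_seq_len=2, the split prompt emits
# one <row_r_col_c> marker per tile, each followed by two image tokens, then
# the global image block after a final newline:
#
#   get_image_prompt_string(
#       image_rows=2, image_cols=2, image_seq_len=2,
#       fake_token_around_image="<fake>", image_token="<img>",
#       global_img_token="<global-img>",
#   )
#   # -> "<fake><row_1_col_1><img><img><fake><row_1_col_2><img><img>\n..."
#   #    ending with "\n<fake><global-img><img><img><fake>"
#
# With image_rows == image_cols == 0 the single-image prompt is returned instead.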
					
						
class Tokenizer_Http:

    def __init__(self):
        # Load the local Qwen2.5-VL tokenizer (slow tokenizer, remote code allowed).
        path = 'qwen2_5-vl-tokenizer'
        self.tokenizer = AutoTokenizer.from_pretrained(path,
                                                       trust_remote_code=True,
                                                       use_fast=False)
					
						
    def encode(self, content):
        # Wrap the content in the Qwen chat template (system + user turns,
        # ending with the assistant header), then tokenize.
        text = [f'<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\n{content}<|im_end|>\n<|im_start|>assistant\n']
        input_ids = self.tokenizer(text)
        return input_ids["input_ids"][0]
					
						
    def encode_vpm(self, content="Describe this image."):
        # Vision prompt: 256 <|image_pad|> placeholders between
        # <|vision_start|> and <|vision_end|> reserve the positions that the
        # vision encoder's image embeddings will fill.
        text = f'<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\n<|vision_start|>' + '<|image_pad|>' * 256 + f'<|vision_end|>{content}<|im_end|>\n<|im_start|>assistant\n'

        # Only the text kwargs are used below; the remaining entries mirror the
        # processor defaults and are kept for reference.
        output_kwargs = {
            'text_kwargs': {'padding': True, 'return_tensors': 'pt'},
            'images_kwargs': {'return_tensors': 'pt'},
            'audio_kwargs': {'padding': True, 'return_tensors': 'pt'},
            'videos_kwargs': {'fps': 2.0, 'return_tensors': 'pt'},
            'common_kwargs': {'return_tensors': 'pt'},
        }

        text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"])
        return text_inputs["input_ids"].tolist()[0]
					
						
    def decode(self, token_ids):
        return self.tokenizer.decode(token_ids,
                                     clean_up_tokenization_spaces=False)
					
						
    @property
    def bos_id(self):
        return self.tokenizer.bos_token_id

    @property
    def eos_id(self):
        return self.tokenizer.eos_token_id

    @property
    def bos_token(self):
        return self.tokenizer.bos_token

    @property
    def eos_token(self):
        return self.tokenizer.eos_token

    @property
    def img_start_token(self):
        # Token id of the "<|vision_start|>" special token.
        return self.tokenizer.encode("<|vision_start|>")[0]

    @property
    def img_context_token(self):
        # Token id of the "<|image_pad|>" special token.
        return self.tokenizer.encode("<|image_pad|>")[0]
					
						
# Module-level smoke test: instantiate the tokenizer once (shared by the HTTP
# handlers below) and print a few sample encodings.
tokenizer = Tokenizer_Http()

print(tokenizer.bos_id, tokenizer.bos_token, tokenizer.eos_id,
      tokenizer.eos_token)

token_ids = tokenizer.encode_vpm()
print(token_ids)
print(len(token_ids))

token_ids = tokenizer.encode("hello world")
print(token_ids)
print(len(token_ids))
					
						
class Request(BaseHTTPRequestHandler):

    timeout = 5
    server_version = 'Apache'
					
						
    def do_GET(self):
        print(self.path)

        self.send_response(200)
        self.send_header("type", "get")
        self.end_headers()

        # Each endpoint answers with a small JSON object; -1 stands in for a
        # special token the tokenizer does not define.
        if self.path == '/bos_id':
            bos_id = tokenizer.bos_id
            if bos_id is None:
                msg = json.dumps({'bos_id': -1})
            else:
                msg = json.dumps({'bos_id': bos_id})
        elif self.path == '/eos_id':
            eos_id = tokenizer.eos_id
            if eos_id is None:
                msg = json.dumps({'eos_id': -1})
            else:
                msg = json.dumps({'eos_id': eos_id})
        elif self.path == '/img_start_token':
            img_start_token = tokenizer.img_start_token
            if img_start_token is None:
                msg = json.dumps({'img_start_token': -1})
            else:
                msg = json.dumps({'img_start_token': img_start_token})
        elif self.path == '/img_context_token':
            img_context_token = tokenizer.img_context_token
            if img_context_token is None:
                msg = json.dumps({'img_context_token': -1})
            else:
                msg = json.dumps({'img_context_token': img_context_token})
        else:
            msg = 'error'

        print(msg)
        msg = str(msg).encode()

        self.wfile.write(msg)
					
						
    def do_POST(self):
        data = self.rfile.read(int(
            self.headers['content-length']))
        data = data.decode()

        self.send_response(200)
        self.send_header("type", "post")
        self.end_headers()

        if self.path == '/encode':
            req = json.loads(data)
            print(req)
            prompt = req['text']
            # An optional "img_prompt" flag selects the vision-prompt encoding
            # (with <|image_pad|> placeholders) instead of plain text encoding.
            b_img_prompt = False
            if 'img_prompt' in req:
                b_img_prompt = req['img_prompt']
            if b_img_prompt:
                token_ids = tokenizer.encode_vpm(prompt)
            else:
                token_ids = tokenizer.encode(prompt)

            if token_ids is None:
                msg = json.dumps({'token_ids': -1})
            else:
                msg = json.dumps({'token_ids': token_ids})

        elif self.path == '/decode':
            req = json.loads(data)
            token_ids = req['token_ids']
            text = tokenizer.decode(token_ids)
            if text is None:
                msg = json.dumps({'text': ""})
            else:
                msg = json.dumps({'text': text})
        else:
            msg = 'error'

        print(msg)
        msg = str(msg).encode()

        self.wfile.write(msg)
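
# Example client calls (illustrative; paths and JSON shapes match the handlers
# above, host/port match the defaults below):
#
#   curl http://localhost:8080/eos_id
#   # -> {"eos_id": ...}
#
#   curl -X POST http://localhost:8080/encode \
#        -d '{"text": "hello world", "img_prompt": false}'
#   # -> {"token_ids": [...]}
#
#   curl -X POST http://localhost:8080/decode -d '{"token_ids": [...]}'
#   # -> {"text": "..."}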
					
						
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', type=str, default='localhost')
    parser.add_argument('--port', type=int, default=8080)
    args = parser.parse_args()

    host = (args.host, args.port)
    print('http://%s:%s' % host)
    server = HTTPServer(host, Request)
    server.serve_forever()
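
# To run (script name illustrative; the 'qwen2_5-vl-tokenizer' directory must
# sit in the working directory):
#   python tokenizer_http_server.py --host localhost --port 8080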