| | import re |
| | import json |
| | import uuid |
| |
|
def parse_arguments(json_value):
    """
    Attempt to parse a string as JSON.

    Args:
        json_value: String to parse.

    Returns:
        tuple: (parsed_value, is_valid_json). On success ``parsed_value`` is
        the decoded JSON value and the flag is True; on failure
        ``json_value`` is returned unchanged and the flag is False.
    """
    try:
        return json.loads(json_value), True
    except (TypeError, ValueError, RecursionError):
        # TypeError: input is not str/bytes/bytearray (e.g. None).
        # ValueError: malformed JSON (json.JSONDecodeError subclasses it).
        # RecursionError: pathologically deep nesting.
        # Anything else (KeyboardInterrupt, SystemExit, ...) must propagate.
        return json_value, False
| |
|
def get_argument_type(func_name: str, arg_key: str, defined_tools: list):
    """
    Get the type definition of a tool parameter.

    Args:
        func_name: Name of the function/tool.
        arg_key: Parameter key name.
        defined_tools: List of tool definitions. Each usable definition is a
            mapping with a "name" and, optionally,
            "parameters" -> "properties" -> <arg_key> -> "type".

    Returns:
        The value stored under the parameter's "type" key (e.g. 'string',
        'object', 'array', 'integer', 'number', 'boolean'), or None when the
        tool, the parameter, or its type cannot be found.
    """
    from collections.abc import Mapping

    # Scan instead of building a name->tool dict: entries that are malformed
    # (not a mapping, or missing "name") are skipped rather than raising.
    tool = None
    for candidate in defined_tools or ():
        if isinstance(candidate, Mapping) and candidate.get("name") == func_name:
            tool = candidate  # keep scanning: the last definition wins
    if tool is None:
        return None

    parameters = tool.get("parameters")
    if not isinstance(parameters, Mapping):
        return None

    properties = parameters.get("properties")
    if not isinstance(properties, Mapping):
        return None

    schema = properties.get(arg_key)
    if not isinstance(schema, Mapping):
        return None

    return schema.get("type")
| |
|
def _require(condition, message):
    """Raise AssertionError(message) when condition is falsy.

    Used instead of the ``assert`` statement so the validation still runs
    under ``python -O`` while callers keep seeing the same exception type.
    """
    if not condition:
        raise AssertionError(message)


def _parse_tool_call(call, formatted_tools):
    """Turn the text found between one pair of tool-call tags into a tool_call dict."""
    # The function name is everything up to the first newline or '<'.
    func_name_match = re.match(r'([^\n<]+)', call.strip())
    _require(func_name_match, f"Missing function name in tool call: {call[:100]}")

    func_name = func_name_match.group(1).strip()
    _require(func_name, "Empty function name in tool call")

    arg_key_count = call.count('<longcat_arg_key>')
    arg_key_close_count = call.count('</longcat_arg_key>')
    arg_value_count = call.count('<longcat_arg_value>')
    arg_value_close_count = call.count('</longcat_arg_value>')

    _require(
        arg_key_count == arg_key_close_count,
        f"Mismatched arg_key tags in function {func_name}: {arg_key_count} opening, {arg_key_close_count} closing",
    )
    _require(
        arg_value_count == arg_value_close_count,
        f"Mismatched arg_value tags in function {func_name}: {arg_value_count} opening, {arg_value_close_count} closing",
    )
    _require(
        arg_key_count == arg_value_count,
        f"Mismatched arg_key and arg_value count in function {func_name}: {arg_key_count} keys, {arg_value_count} values",
    )

    pairs = re.findall(
        r'<longcat_arg_key>(.*?)</longcat_arg_key>\s*<longcat_arg_value>(.*?)</longcat_arg_value>',
        call,
        re.DOTALL,
    )

    # Balanced tag counts do not guarantee key/value adjacency, so confirm
    # that the pair regex consumed every key.
    _require(
        len(pairs) == arg_key_count,
        f"Failed to parse all arguments in function {func_name}: expected {arg_key_count}, got {len(pairs)}",
    )

    arguments = {}
    for arg_key, arg_value in pairs:
        arg_key = arg_key.strip()
        arg_value = arg_value.strip()

        _require(arg_key, f"Empty argument key in function {func_name}")
        _require(
            arg_key not in arguments,
            f"Duplicate argument key '{arg_key}' in function {func_name}",
        )

        arg_type = get_argument_type(func_name, arg_key, formatted_tools)

        # Only parameters declared with a non-string type are JSON-decoded;
        # if decoding fails the raw string is kept.
        if arg_type and arg_type != 'string':
            arg_value, _ = parse_arguments(arg_value)

        arguments[arg_key] = arg_value

    return {
        'id': "tool-call-" + str(uuid.uuid4()),
        'type': "function",
        'function': {
            'name': func_name,
            'arguments': arguments,
        },
    }


def parse_model_response(response: str, defined_tools: "list | None" = None):
    """
    Parse model response to extract reasoning_content, content, and tool_calls.

    Args:
        response: Raw response text from the model.
        defined_tools: List of tool definitions. Each entry is either the
            tool definition itself or a wrapper holding it under the
            "function" key. ``None`` (the default) means no tools.

    Returns:
        dict: Message containing role, reasoning_content (only when
        non-empty), content, and tool_calls (only when at least one call
        was parsed).

    Raises:
        AssertionError: If the think / tool_call / arg_key / arg_value
            markup is unbalanced, a function name or argument key is
            missing, or an argument key is repeated within one call.
    """
    text = response
    reasoning_content = None

    # Unwrap {"type": "function", "function": {...}} style entries.
    formatted_tools = [
        tool['function'] if "function" in tool else tool
        for tool in (defined_tools or [])
    ]

    if '</longcat_think>' in text:
        # Every opening tag is dropped first, so the reasoning is simply
        # whatever precedes the first closing tag.
        text = text.replace('<longcat_think>', '')
        thinking_end = text.find('</longcat_think>')
        reasoning_content = text[:thinking_end].strip()
        text = text[thinking_end + len('</longcat_think>'):].lstrip()

    _require('<longcat_think>' not in text, "Unclosed <longcat_think> tag found in remaining text")
    _require('</longcat_think>' not in text, "Unexpected </longcat_think> tag found without opening tag")

    # Everything before the first tool call is the visible content.
    if '<longcat_tool_call>' in text:
        index = text.find('<longcat_tool_call>')
        content = text[:index]
        text = text[index:].strip()
    else:
        content = text
        text = ""

    open_tags = text.count('<longcat_tool_call>')
    close_tags = text.count('</longcat_tool_call>')
    _require(
        open_tags == close_tags,
        f"Mismatched tool_call tags: {open_tags} opening tags, {close_tags} closing tags",
    )

    tool_call_strs = re.findall(
        r'<longcat_tool_call>(.*?)</longcat_tool_call>',
        text,
        re.DOTALL,
    )
    tool_calls = [_parse_tool_call(call, formatted_tools) for call in tool_call_strs]

    message = {'role': 'assistant'}
    if reasoning_content:
        message['reasoning_content'] = reasoning_content
    message['content'] = content
    if tool_calls:
        message['tool_calls'] = tool_calls

    return message
| |
|
if __name__ == "__main__":
    # Demo: load the LongCat model, generate one plain answer and one
    # tool-call answer, and print each raw response next to its parsed form.
    # parse_model_response is defined earlier in this file, so no extra
    # import of it is needed here.
    from transformers import AutoModelForCausalLM, AutoTokenizer

    model_name = "meituan-longcat/LongCat-Flash-Lite"
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype="auto",
        device_map="auto",
        trust_remote_code=True
    )
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

    def _generate_response(chat_messages, **template_kwargs):
        """Run one generation and return only the newly generated text."""
        input_ids = tokenizer.apply_chat_template(
            chat_messages,
            add_generation_prompt=True,
            return_tensors="pt",
            **template_kwargs
        ).to(model.device)
        generated_ids = model.generate(inputs=input_ids, max_new_tokens=256)
        # Drop the prompt tokens so only the completion is decoded.
        output_ids = generated_ids[0][len(input_ids[0]):].tolist()
        return tokenizer.decode(output_ids, skip_special_tokens=True).strip("\n")

    # Example 1: plain chat, no tools.
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Give me a brief introduction to large language models."}
    ]
    response = _generate_response(messages)
    print("Example 1: sample response.")
    print("\nRaw response:")
    print(response)
    print("\nParsed result:")
    parsed_message = parse_model_response(response)
    print(json.dumps(parsed_message, indent=2, ensure_ascii=False))

    # Example 2: the same flow with a tool definition passed to both the chat
    # template and the parser.
    tools = [
        {
            "type": "function",
            "function": {
                "name": "func_add",
                "description": "Calculate the sum of two numbers",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "x1": {"type": "number", "description": "The first addend"},
                        "x2": {"type": "number", "description": "The second addend"}
                    },
                    "required": ["x1", "x2"]
                }
            }
        }
    ]
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Please tell me what is $$125679 + 234519$$?"},
    ]
    response = _generate_response(messages, tools=tools)
    print("Example 2: tool call response.")
    print("\nRaw response:")
    print(response)
    print("\nParsed result:")
    parsed_message = parse_model_response(response, tools)
    print(json.dumps(parsed_message, indent=2, ensure_ascii=False))
| |
|