Spaces:
Running
on
Zero
Running
on
Zero
File size: 6,144 Bytes
29f7f08 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
import json
import uuid
import re
from typing import List
import subprocess
import sys
def install(package):
    """Install ``package`` with pip under the interpreter running this script.

    Args:
        package: Requirement string handed to ``pip install``.

    Raises:
        subprocess.CalledProcessError: If the pip process exits with a
            non-zero status.
    """
    # Go through ``sys.executable -m pip`` so the package lands in the same
    # environment that is executing this module.
    pip_command = [sys.executable, "-m", "pip", "install", package]
    subprocess.check_call(pip_command)
# pythonmonkey is a third-party dependency: when the import fails, install it
# with pip through install() and retry the import once.
try:
    import pythonmonkey
except ImportError:
    install('pythonmonkey')
    import pythonmonkey
# Resolve the `jsonrepair` module through pythonmonkey.require() and keep a
# module-level reference to its `jsonrepair` attribute for parse_json_safely().
# NOTE(review): presumably the `jsonrepair` package must already be resolvable
# by pythonmonkey at import time — confirm against the deployment setup.
jsonrepair = pythonmonkey.require('jsonrepair').jsonrepair
def clean_command_string(command_str):
    r"""Strip stray escaping and one pair of wrapping quotes from a string.

    The cleanup runs in three steps:

    1. Remove every backslash that does not begin a valid JSON escape
       sequence (``\"``, ``\\``, ``\/``, ``\b``, ``\f``, ``\n``, ``\r``,
       ``\t`` or ``\uXXXX``).
    2. Turn each remaining ``\"`` into a plain ``"``.
    3. If the result is wrapped in a pair of double quotes, drop that pair.

    Args:
        command_str: The text to clean.

    Returns:
        The cleaned string.
    """
    cleaned_command = re.sub(r'\\(?!["\\/bfnrt]|u[a-fA-F0-9]{4})', '', command_str)
    cleaned_command = cleaned_command.replace('\\"', '"')
    # Require at least two characters: a lone '"' both starts and ends with a
    # quote, and slicing it with [1:-1] would silently erase it.
    if (
        len(cleaned_command) >= 2
        and cleaned_command.startswith('"')
        and cleaned_command.endswith('"')
    ):
        cleaned_command = cleaned_command[1:-1]
    return cleaned_command
def parse_json_safely(json_str):
    """Parse ``json_str`` as JSON, falling back to a repair pass.

    Strict parsing with ``json.loads`` is tried first. If that raises
    ``json.JSONDecodeError``, the text is passed through the module-level
    ``jsonrepair`` callable and the repaired text is parsed instead.

    Args:
        json_str: The JSON text to parse.

    Returns:
        The decoded value, or ``json_str`` itself, unchanged, when the
        repair attempt fails for any reason.
    """
    try:
        return json.loads(json_str)
    except json.JSONDecodeError:
        # Not strictly valid JSON; fall through to the repair attempt.
        pass

    try:
        return json.loads(jsonrepair(json_str))
    except Exception:
        # Best effort by design: hand the raw text back to the caller rather
        # than fail on unrepairable input.
        return json_str
def clean_json_object(obj):
    """Recursively clean the string values inside a decoded JSON structure.

    Dict values and list items are processed recursively; dict keys are left
    untouched. Each string is passed through ``clean_command_string`` and,
    when the cleaned text starts with ``{`` or ``[``, through
    ``parse_json_safely``. Any other value is returned unchanged.

    Args:
        obj: A decoded JSON value (dict, list, str or scalar).

    Returns:
        A new structure with cleaned strings; dicts and lists are rebuilt
        rather than modified in place.
    """
    if isinstance(obj, str):
        text = clean_command_string(obj)
        # Text that looks like an embedded JSON object or array is decoded.
        if text.startswith(('{', '[')):
            return parse_json_safely(text)
        return text
    if isinstance(obj, list):
        return [clean_json_object(element) for element in obj]
    if isinstance(obj, dict):
        return {key: clean_json_object(value) for key, value in obj.items()}
    return obj
def extract_tool_calls(output_str):
    """Return the raw payload of every tool call found in ``output_str``.

    A tool call starts at ``starttoolcall`` and runs up to the next
    ``endtoolcall``; when the closing marker is missing, the payload runs to
    the end of the string.

    Args:
        output_str: The text to scan.

    Returns:
        A list of payload strings, in order of appearance, without the
        markers and without any stripping.
    """
    pattern = r'starttoolcall(.*?)(?:endtoolcall|$)'
    return [found.group(1) for found in re.finditer(pattern, output_str, re.DOTALL)]
def extract_tool_calls_and_text(output_str):
    """Split ``output_str`` into ordered text and tool-call segments.

    A tool call starts at ``starttoolcall`` and runs up to the next
    ``endtoolcall``, or to the end of the string when the closing marker is
    missing. Everything outside tool calls is emitted as text.

    Args:
        output_str: The text to split.

    Returns:
        A list of dicts in order of appearance. Text segments look like
        ``{"text": ..., "type": "text"}`` and tool calls look like
        ``{"tool_call": ..., "type": "function"}``. All content is
        whitespace-stripped, and text that is empty after stripping is
        omitted.
    """
    pattern = r'(starttoolcall(.*?)(?:endtoolcall|$))'
    segments = []

    def add_text(chunk):
        # Whitespace-only gaps between tool calls carry no content.
        stripped = chunk.strip()
        if stripped:
            segments.append({"text": stripped, "type": "text"})

    cursor = 0  # index just past the previously consumed tool call
    for found in re.finditer(pattern, output_str, re.DOTALL):
        begin, finish = found.span(1)
        add_text(output_str[cursor:begin])
        segments.append({"tool_call": found.group(2).strip(), "type": "function"})
        cursor = finish

    # Whatever follows the last tool call (or the whole input if none matched).
    add_text(output_str[cursor:])
    return segments
def postprocess_output(output_str: str):
    """Convert raw model output into a list of text and function-call entries.

    The output is split with ``extract_tool_calls_and_text``. Each tool-call
    payload is parsed with ``parse_json_safely`` and cleaned with
    ``clean_json_object``. A payload counts as a function call only when the
    cleaned value is a dict containing both ``name`` and ``arguments``; a dict
    ``arguments`` value is re-serialized to a JSON string. Any other payload,
    or one whose processing raises, is emitted as text holding the raw
    payload.

    Each processed segment is also printed to stdout.

    Args:
        output_str: The raw model output.

    Returns:
        A list of dicts, each with an 8-character hex ``id``, a ``type`` of
        ``"text"`` or ``"function"``, and a matching ``text`` or ``function``
        entry.
    """
    def as_text(content):
        # Shared shape for every text entry, including tool-call fallbacks.
        return {
            "id": uuid.uuid4().hex[:8],
            "text": content,
            "type": "text",
        }

    results = []
    for segment in extract_tool_calls_and_text(output_str):
        print("processing segment")
        print(segment)

        if segment['type'] != 'function':
            results.append(as_text(segment['text']))
            continue

        call = segment['tool_call']
        try:
            cleaned_call = clean_json_object(parse_json_safely(call))
            looks_like_call = (
                isinstance(cleaned_call, dict)
                and 'name' in cleaned_call
                and 'arguments' in cleaned_call
            )
            if not looks_like_call:
                results.append(as_text(call))
                continue
            # Dict arguments are stored in their JSON-encoded string form.
            if isinstance(cleaned_call.get('arguments'), dict):
                cleaned_call['arguments'] = json.dumps(cleaned_call['arguments'])
            results.append({
                "id": uuid.uuid4().hex[:8],
                "function": cleaned_call,
                "type": "function",
            })
        except Exception:
            # Best effort: an unprocessable tool call is surfaced as raw text.
            results.append(as_text(call))
    return results
def json_to_markdown(json_obj):
    """Render a list of result entries as a markdown string.

    Entries whose ``type`` is ``"text"`` contribute their ``text`` value;
    entries whose ``type`` is ``"function"`` contribute their ``function``
    value as an indented JSON document inside a fenced ``json`` code block.
    Entries of any other type are skipped.

    Args:
        json_obj: An iterable of dict entries.

    Returns:
        The rendered blocks separated by blank lines, with leading and
        trailing whitespace stripped.
    """
    chunks = []
    for entry in json_obj:
        kind = entry.get("type")
        if kind == "text":
            chunks.append(entry.get("text", "") + "\n\n")
        elif kind == "function":
            payload = json.dumps(entry.get("function", {}), indent=2)
            chunks.append("```json\n" + payload + "\n```\n\n")
    return "".join(chunks).strip()
if __name__ == "__main__":
    # Manual smoke test: post-process a sample model output, then print the
    # structured result followed by its markdown rendering.
    #
    # Alternative sample inputs, kept for manual experimentation:
    # output_str = '''Some text before starttoolcall{"name": "funcA", "arguments": {"param1": 1}endtoolcall
    # More text starttoolcall{"name": "funcB", "arguments": {"param2": "test"}}endtoolcall'''
    # output_str = '''starttoolcall{"name": "get_current_weather", "arguments": {"location": "San Francisco", "unit": "celsius"}}endtoolcall starttoolcall{"name": "get_current_weather", "arguments": {"location": "Tokyo", "unit": "celsius"}}endtoolcall okay great '''
    #
    # The active sample ends in the middle of its third tool call (no closing
    # marker and an unfinished JSON object), which exercises the
    # unterminated-call path.
    output_str = '''starttoolcall{"name": "get_current_weather", "arguments": {"location": "San Francisco", "unit": "celsius"}}endtoolcall starttoolcall{"name": "get_current_weather", "arguments": {"location": "Tokyo", "unit": "celsius"}}endtoolcall starttoolcall{"name": "get_current_weather", "arguments": {"location": "Paris", "unit": '''
    parsed_json = postprocess_output(output_str)
    print(json.dumps(parsed_json, indent=2))
    print("-----")
    print(json_to_markdown(parsed_json))