|
import ast |
|
import json |
|
import xml.etree.ElementTree as ET |
|
|
|
import numpy as np |
|
import pandas as pd |
|
from datasets import Dataset |
|
|
|
from opencompass.openicl.icl_evaluator import BaseEvaluator |
|
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET |
|
|
|
from ..base import BaseDataset |
|
from .prompts import tspPrompts |
|
|
|
|
|
def q2text(q, p=tspPrompts): |
|
total_cities = q.shape[0] |
|
prompt_text = p['Intro'] + '\n' \ |
|
+ p['Initial_question'].format(total_cities=total_cities) + '\n' \ |
|
+ p['Output_content'] + '\n' \ |
|
+ p['Output_format'] + \ |
|
'\n The distances between cities are below: \n' |
|
for i in range(q.shape[0]): |
|
for j in range(q.shape[1]): |
|
if i < j: |
|
this_line = 'The path between City {} and City {} is with distance {}.'.format(i, j, q.iloc[i, j]) |
|
prompt_text += this_line + '\n' |
|
return prompt_text |
|
|
|
|
|
@LOAD_DATASET.register_module(force=True) |
|
class hard_TSP_Dataset(BaseDataset): |
|
|
|
@staticmethod |
|
def load(path: str): |
|
raw_data = [] |
|
data_path = path |
|
all_data = [] |
|
for level in range(10): |
|
for file_num in range(10): |
|
|
|
df = pd.read_csv(data_path + 'synthesized_data_TSP_level_{}_instance_{}.csv'.format(level, file_num + 1), |
|
header=None, |
|
index_col=False) |
|
|
|
all_data.append((level + 1, df)) |
|
for (level, q) in all_data: |
|
prompt = q2text(q) |
|
raw_data.append({ |
|
'prompt': prompt, |
|
'q': str(level) + '####\n' + json.dumps(q.to_json()), |
|
'level': level |
|
}) |
|
dataset = Dataset.from_list(raw_data) |
|
return dataset |
|
|
|
|
|
@ICL_EVALUATORS.register_module(force=True) |
|
class hard_TSP_Evaluator(BaseEvaluator): |
|
|
|
def score(self, predictions, references): |
|
assert len(predictions) == len(references) |
|
|
|
result = {'pass': 0, 'fail': 0} |
|
for index, (q, output) in enumerate(zip(references, predictions)): |
|
output_dict = {} |
|
level = int(q.split('####\n')[0]) |
|
q = json.loads(q.split('####\n')[-1]) |
|
q = pd.DataFrame(eval(q)) |
|
|
|
output_dict['output'] = output |
|
try: |
|
output_dict['correctness'], _ = self.tspCheck(q, output) |
|
except Exception as e: |
|
print(f'Check failed: {e}') |
|
output_dict['correctness'] = False |
|
output_dict['level'] = level |
|
|
|
if output_dict['correctness']: |
|
r = 'pass' |
|
else: |
|
r = 'fail' |
|
result[r] += level |
|
|
|
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100 |
|
final_result = {'Weighted Accuracy': result['score']} |
|
return final_result |
|
|
|
def parse_xml_to_dict(self, xml_string): |
|
try: |
|
|
|
root = ET.fromstring(xml_string) |
|
|
|
|
|
final_answer_element = root.find('final_answer') |
|
|
|
|
|
reasoning_element = root.find('reasoning') |
|
except: |
|
try: |
|
assert '<final_answer>' in xml_string |
|
assert '</final_answer>' in xml_string |
|
assert '<reasoning>' in xml_string |
|
assert '</reasoning>' in xml_string |
|
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>') |
|
final_answer_end = xml_string.index('</final_answer>') |
|
reasoning_start = xml_string.index('<reasoning>') + len('<reasoning>') |
|
reasoning_end = xml_string.index('</reasoning>') |
|
final_answer_element = xml_string[final_answer_start:final_answer_end] |
|
reasoning_element = xml_string[reasoning_start:reasoning_end] |
|
except: |
|
final_answer_element = '' |
|
reasoning_element = '' |
|
|
|
return final_answer_element, reasoning_element |
|
|
|
def tspCheck(self, distance_matrix, llm_string): |
|
"""Check if the TSP solution is complete and if the distance matches |
|
the greedy solution. |
|
|
|
:param tour_string: String representing the TSP tour in the format "0->1->2->...->N->0" |
|
:param distance_matrix: 2D numpy array representing the distances between cities |
|
:return: Boolean indicating whether the tour is complete and matches the greedy distance |
|
""" |
|
|
|
distance_matrix = np.array(distance_matrix) |
|
|
|
|
|
|
|
final_answer_element, reasoning_element = self.parse_xml_to_dict(llm_string) |
|
|
|
if final_answer_element == '': |
|
return False, '' |
|
elif final_answer_element is None: |
|
return False, '' |
|
else: |
|
if isinstance(final_answer_element, str): |
|
try: |
|
tour_string = ast.literal_eval(final_answer_element)['Path'] |
|
if tour_string is None: |
|
return False, '' |
|
except Exception: |
|
try: |
|
tour_string = ast.literal_eval('{' + final_answer_element + '}')['Path'] |
|
if tour_string is None: |
|
return False, '' |
|
except Exception: |
|
return False, '' |
|
else: |
|
try: |
|
tour_string = ast.literal_eval(final_answer_element.text)['Path'] |
|
if tour_string is None: |
|
return False, '' |
|
except Exception: |
|
return False, '' |
|
try: |
|
tour = list(map(int, tour_string.split('->'))) |
|
except Exception: |
|
return False, '' |
|
|
|
|
|
|
|
|
|
if tour[0] != tour[-1]: |
|
return False, 'The tour must start and end at the same city.' |
|
|
|
|
|
if len(tour) != len(distance_matrix) + 1: |
|
return False, 'The tour does not visit all cities exactly once.' |
|
|
|
|
|
tour_distance = sum(distance_matrix[tour[i]][tour[i + 1]] |
|
for i in range(len(tour) - 1)) |
|
|
|
|
|
greedy_tour, greedy_distance = self.greedy_tsp(distance_matrix) |
|
|
|
|
|
if tour_distance != greedy_distance: |
|
return False, f'The tour distance ({tour_distance}) does not match the greedy solution ({greedy_distance}).' |
|
|
|
return True, 'The solution is complete and matches the greedy solution distance.' |
|
|
|
def greedy_tsp(self, distance_matrix): |
|
"""Solve the Traveling Salesman Problem using a greedy algorithm. |
|
|
|
:param distance_matrix: 2D numpy array where the element at [i, j] is the distance between city i and j |
|
:return: A tuple containing a list of the cities in the order they were visited and the total distance |
|
""" |
|
num_cities = distance_matrix.shape[0] |
|
unvisited_cities = set(range(num_cities)) |
|
current_city = np.random.choice(list(unvisited_cities)) |
|
tour = [current_city] |
|
total_distance = 0 |
|
|
|
while unvisited_cities: |
|
unvisited_cities.remove(current_city) |
|
if unvisited_cities: |
|
|
|
distances_to_unvisited = distance_matrix[current_city][list(unvisited_cities)] |
|
nearest_city = list(unvisited_cities)[np.argmin(distances_to_unvisited)] |
|
tour.append(nearest_city) |
|
|
|
total_distance += distance_matrix[current_city, nearest_city] |
|
current_city = nearest_city |
|
|
|
|
|
total_distance += distance_matrix[current_city, tour[0]] |
|
tour.append(tour[0]) |
|
|
|
return tour, total_distance |
|
|