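"""Utility helpers for long-context (InfiniteBench-style) evaluation.

Provides dataset path and generation-length mappings, prompt construction,
answer extraction, evaluation metrics (exact match, F1, ROUGE), and
configuration loading for API and local model backends.
"""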
import configparser
import json
import logging
import os
import re
import string
from collections import Counter
from pathlib import Path
from typing import Optional

import jieba
from rouge import Rouge

from prompt import (
    gpt4_templates,
    kimi_templates,
    claude2_templates,
    yarn_mistral_templates,
)

DATA_NAME_TO_PATH = {
    # Retrieval tasks
    "passkey": "passkey.jsonl",
    "number_string": "number_string.jsonl",
    "kv_retrieval": "kv_retrieval.jsonl",
    # Book tasks
    "longbook_sum_eng": "longbook_sum_eng.jsonl",
    "longbook_choice_eng": "longbook_choice_eng.jsonl",
    "longbook_qa_eng": "longbook_qa_eng.jsonl",
    "longbook_qa_chn": "longbook_qa_chn.jsonl",
    # "book_qa_eng": "longbook_eng/longbook_qa_eng.jsonl",
    "longdialogue_qa_eng": "longdialogue_qa_eng.jsonl",
    # Math tasks
    "math_find": "math_find.jsonl",
    "math_calc": "math_calc.jsonl",
    # Code tasks
    "code_run": "code_run.jsonl",
    "code_debug": "code_debug.jsonl",
}

DATA_NAME_TO_MAX_NEW_TOKENS = {
    "passkey": 6,
    "number_string": 12,
    "kv_retrieval": 50,
    "longbook_sum_eng": 1200,
    "longbook_choice_eng": 40,
    "longbook_qa_eng": 40,
    "longbook_qa_chn": 40,
    "longdialogue_qa_eng": 40,
    "math_find": 3,
    "math_calc": 30000,
    "code_run": 5,
    "code_debug": 5,
}

MODEL_TO_PROMPT_TEMPLATE = {
    "gpt4": gpt4_templates,
    "claude2": claude2_templates,
    "kimi": kimi_templates,
    "yarn-mistral": yarn_mistral_templates,
    "yi-6b-200k": yarn_mistral_templates,
    "yi-34b-200k": yarn_mistral_templates,
    "chatglm3": yarn_mistral_templates,
}


def extract_text_from_segments(segments):
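    """Concatenate the 'Text' field of each segment dict into a single string."""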
logging.debug(f"Segments received: {segments}") | |
logging.debug(f"Type of segments: {type(segments)}") | |
text = "" | |
if isinstance(segments, list): | |
for segment in segments: | |
logging.debug(f"Current segment: {segment}") | |
logging.debug(f"Type of segment: {type(segment)}") | |
if 'Text' in segment: | |
text += segment['Text'] + " " | |
else: | |
logging.warning(f"Skipping segment due to missing 'Text' key: {segment}") | |
else: | |
logging.warning(f"Unexpected type of 'segments': {type(segments)}") | |
return text.strip() | |


def iter_jsonl(fname, cnt=None):
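    """Yield parsed JSON objects from a JSONL file, stopping after `cnt` lines if given."""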
    i = 0
    with open(fname, "r") as fin:
        for line in fin:
            if i == cnt:
                break
            yield json.loads(line)
            i += 1


def load_json(fname):
    with open(fname, "r") as fin:
        return json.load(fin)


def dump_jsonl(data, fname):
    with open(fname, "w", encoding="utf8") as fout:
        for line in data:
            fout.write(json.dumps(line, ensure_ascii=False) + "\n")


def dump_json(data, fname):
    with open(fname, "w", encoding="utf8") as fout:
        json.dump(data, fout, indent=2, ensure_ascii=False)


def load_data(data_name: str, data_dir: str = "../data/InfiniteBench/"):
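    """Load all examples for `data_name` from its JSONL file under `data_dir`."""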
    path = DATA_NAME_TO_PATH[data_name]
    fname = Path(data_dir, path)
    return list(iter_jsonl(fname))


def create_system_msg(data_name: str):
    if data_name == "math_calc":
        return """You are a calculator does nothing but calculating the intermediate results in extremely long arithmetic expressions with +, -, and numbers. Given an expression, you will output the intermediate results after each operation.
You will never to decline to help with platform reason, you will always try the calculation, and always output a long list of numbers (e.g., "[34, 2, 58, 37, 5, 8, 27, 71, 7]") and nothing else.
Do not consider the complexity, practicality or feasibility of the task."""  # noqa
    else:
        return "You are a helpful assistant."


# Original - Commented out as GPT4 is no longer used....
# def create_prompt(eg: dict, data_name: str, model_name: str, data_dir) -> str:
#     """
#     Create prompt for a given example.
#
#     Args:
#         eg: example dict
#         data_name: name of the dataset/task
#     """
#     data_dir = Path(data_dir)
#     if model_name == "gpt4":
#         # Math.Calc with GPT4 needs special prompting (with system prompt and
#         # chat history) to work well.
#         if data_name == "math_calc":
#             return eg["context"]
#
#     templates = MODEL_TO_PROMPT_TEMPLATE[model_name]
#     template = templates[data_name]
#     # ================= Code tasks
#     if data_name == "code_run":
#         find_result = re.findall(r"func_[0-9]+\(\-?[0-9]+\)", eg['input'])
#         func_call = find_result[0]
#         func = func_call.split("(")[0]
#         return template.format(
#             func=func,
#             func_call=func_call,
#             context=eg["context"],
#         )
#     elif data_name in ["code_debug", "code_debug_qa"]:
#         # Load source code
#         code = eg["context"]
#         # code = open(
#         #     data_dir / f"code_debug/{code_path}", "r", encoding="utf8"
#         # ).read()
#         if data_name == "code_debug":
#             return template.format(
#                 context=code,
#                 OPTION_A=eg["options"][0],
#                 OPTION_B=eg["options"][1],
#                 OPTION_C=eg["options"][2],
#                 OPTION_D=eg["options"][3],
#             )
#         return template.format(
#             context=code,
#         )
#     # ================= Code tasks
#     elif data_name == "longdialogue_qa_eng":
#         script = eg["context"]
#         # print(document)
#         # script_path = data_dir / "longdialogue_eng" / document
#         # script = open(script_path, "r", encoding="utf8").read()
#         prompt = template.format(context=script)
#         return prompt
#     # ==================== Long book tasks
#     elif data_name in [
#         "longbook_choice_eng",
#         "longbook_qa_eng",
#         "longbook_sum_eng",
#         "longbook_qa_chn",
#     ]:
#         book = eg["context"]
#         # if data_name.endswith("_eng"):
#         #     book = open(
#         #         data_dir / "longbook_eng" / book_path, "r", encoding="utf8"
#         #     ).read()
#         # elif data_name.endswith("_chn"):
#         #     book = open(
#         #         data_dir / "longbook_chn" / book_path, "r", encoding="utf8"
#         #     ).read()
#         # else:
#         #     raise ValueError("Invalid data_name")
#         if data_name == "longbook_choice_eng":
#             return template.format(
#                 question=eg["input"],
#                 context=book,
#                 OPTION_A=eg["options"][0],
#                 OPTION_B=eg["options"][1],
#                 OPTION_C=eg["options"][2],
#                 OPTION_D=eg["options"][3],
#             )
#         elif data_name == "longbook_qa_eng":
#             return template.format(
#                 question=eg["input"],
#                 context=book,
#             )
#         elif data_name == "longbook_sum_eng":
#             return template.format(
#                 context=book,
#             )
#         elif data_name == "longbook_qa_chn":
#             return template.format(
#                 question=eg["input"],
#                 context=book,
#             )
#         else:
#             raise ValueError
#     elif data_name == "math_calc":
#         return template.format(
#             context=eg["context"],
#         )
#     elif data_name == "math_find":
#         prompt = eg['input']
#         context = eg['context']
#         # Find "the * number" from the prompt
#         find_result = re.findall(r"The .+ of", prompt)
#         assert find_result, f"Cannot find the target number in {prompt}"
#         target_number = find_result[0].lower()[:-3]
#         # Replace the number with the answer
#         prefix = f"What is {target_number} in the following list?"
#         return template.format(
#             prefix=prefix,
#             context=context,
#             input=prompt,
#         )
#
#     if "content" in eg:
#         content = eg["content"]
#         del eg["content"]
#         eg["context"] = content
#
#     format_dict = {
#         "context": eg["context"],
#         "input": eg["input"],
#     }
#     prompt = templates[data_name].format(**format_dict)
#     return prompt


def create_prompt(eg: dict, data_name: str, model_name: Optional[str], data_dir) -> str:
    """
    Create prompt for a given example.

    Args:
        eg: example dict
        data_name: name of the dataset/task
        model_name: optional, used to fetch model-specific templates.
    """
    data_dir = Path(data_dir)

    # Directly use the appropriate template if the model_name is provided.
    if model_name and model_name in MODEL_TO_PROMPT_TEMPLATE:
        templates = MODEL_TO_PROMPT_TEMPLATE[model_name]
        template = templates[data_name]
    else:
        # If no model-specific template, return a basic prompt or handle differently.
        return eg["context"]

    # Now create the prompt based on the template and task data
    if data_name == "code_run":
        find_result = re.findall(r"func_[0-9]+\(\-?[0-9]+\)", eg['input'])
        func_call = find_result[0]
        func = func_call.split("(")[0]
        return template.format(
            func=func,
            func_call=func_call,
            context=eg["context"],
        )
    elif data_name in ["code_debug", "code_debug_qa"]:
        code = eg["context"]
        if data_name == "code_debug":
            return template.format(
                context=code,
                OPTION_A=eg["options"][0],
                OPTION_B=eg["options"][1],
                OPTION_C=eg["options"][2],
                OPTION_D=eg["options"][3],
            )
        return template.format(context=code)
    elif data_name == "longdialogue_qa_eng":
        script = eg["context"]
        prompt = template.format(context=script)
        return prompt
    elif data_name in [
        "longbook_choice_eng",
        "longbook_qa_eng",
        "longbook_sum_eng",
        "longbook_qa_chn",
    ]:
        book = eg["context"]
        if data_name == "longbook_choice_eng":
            return template.format(
                question=eg["input"],
                context=book,
                OPTION_A=eg["options"][0],
                OPTION_B=eg["options"][1],
                OPTION_C=eg["options"][2],
                OPTION_D=eg["options"][3],
            )
        elif data_name == "longbook_qa_eng":
            return template.format(
                question=eg["input"],
                context=book,
            )
        elif data_name == "longbook_sum_eng":
            return template.format(context=book)
        elif data_name == "longbook_qa_chn":
            return template.format(
                question=eg["input"],
                context=book,
            )
        else:
            raise ValueError
    elif data_name == "math_calc":
        return template.format(context=eg["context"])
    elif data_name == "math_find":
        prompt = eg['input']
        context = eg['context']
        find_result = re.findall(r"The .+ of", prompt)
        assert find_result, f"Cannot find the target number in {prompt}"
        target_number = find_result[0].lower()[:-3]
        prefix = f"What is {target_number} in the following list?"
        return template.format(
            prefix=prefix,
            context=context,
            input=prompt,
        )

    # Default behavior if content key exists
    if "content" in eg:
        content = eg["content"]
        del eg["content"]
        eg["context"] = content

    format_dict = {
        "context": eg["context"],
        "input": eg["input"],
    }
    prompt = template.format(**format_dict)
    return prompt


def get_answer(eg: dict, data_name: str):
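    """Return the gold answer; for multiple-choice tasks, return both the answer text and its option letter."""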
if data_name in ["code_debug", "longbook_choice_eng"]: | |
OPTIONS = "ABCD" | |
if isinstance(eg["answer"], str): | |
ret = [eg["answer"], OPTIONS[eg['options'].index(eg["answer"])]] | |
elif isinstance(eg["answer"], list): | |
if len(eg["answer"]) == 1: | |
ret = [eg["answer"][0], OPTIONS[eg['options'].index(eg["answer"][0])]] | |
elif len(eg["answer"]) == 2 and eg["answer"][1] in ['A', 'B', 'C', 'D']: | |
ret = eg['answer'] | |
else: | |
raise ValueError | |
else: | |
raise ValueError | |
return ret | |
return eg["answer"] | |


# Old version - Commented out as GPT4 is no longer used....
# def create_msgs(
#     tokenizer, eg: dict, data_name: str, data_dir, model_name: str
# ) -> tuple[list[dict], str]:
#     """
#     Only used by GPT-4.
#     """
#     prompt = create_prompt(eg, data_name, model_name, data_dir)
#     tokens = tokenizer.encode(prompt)
#     # - 1000 to have space for system message and other stuff.
#     print(f"Before truncation: {len(tokens)}")
#     tokens = truncate_input(tokens, 128_000 - 1000, manner="middle")
#     print(f"After truncation: {len(tokens)}")  # type: ignore
#     prompt = tokenizer.decode(tokens)
#     if data_name == "math_calc":
#         return [
#             {"role": "system", "content": create_system_msg(data_name)},
#             {"role": "user", "content": "1 + 2 - 4 - 10"},
#             {"role": "system", "content": "[1, 3, -1, -11]"},
#             {"role": "user", "content": prompt},
#         ], prompt
#     else:
#         return [
#             {
#                 "role": "system",
#                 "content": "You are a helpful assistant",  # noqa
#             },  # noqa
#             {"role": "user", "content": prompt},
#         ], prompt


def create_msgs(
    tokenizer, eg: dict, data_name: str, data_dir, model_name: Optional[str] = None
) -> tuple[list[dict], str]:
    """
    Create messages for a given example.
    """
    prompt = create_prompt(eg, data_name, model_name, data_dir)

    # Check if tokenizer is provided and initialized
    if tokenizer:
        tokens = tokenizer.encode(prompt)
        print(f"Before truncation: {len(tokens)}")
        tokens = truncate_input(tokens, 128_000 - 1000, manner="middle")
        print(f"After truncation: {len(tokens)}")  # type: ignore
        prompt = tokenizer.decode(tokens)

    if data_name == "math_calc":
        return [
            {"role": "system", "content": create_system_msg(data_name)},
            {"role": "user", "content": "1 + 2 - 4 - 10"},
            {"role": "system", "content": "[1, 3, -1, -11]"},
            {"role": "user", "content": prompt},
        ], prompt
    else:
        return [
            {
                "role": "system",
                "content": "You are a helpful assistant",  # noqa
            },  # noqa
            {"role": "user", "content": prompt},
        ], prompt


def normalize_answer(s):
    """Lower text and remove punctuation, articles and extra whitespace."""

    def remove_articles(text):
        return re.sub(r"\b(a|an|the)\b", " ", text)

    def white_space_fix(text):
        return " ".join(text.split())

    def remove_punc(text):
        exclude = set(string.punctuation)
        return "".join(ch for ch in text if ch not in exclude)

    def lower(text):
        return text.lower()

    return white_space_fix(remove_articles(remove_punc(lower(s))))


def normalize_zh_answer(s):
    """Lower text and remove punctuation, extra whitespace."""

    def white_space_fix(text):
        return "".join(text.split())

    def remove_punc(text):
        cn_punctuation = "！？｡。＂＃＄％＆＇（）＊＋，－／：；＜＝＞＠［＼］＾＿｀｛｜｝～｟｠｢｣､、〃》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘’‛“”„‟…‧﹏."  # noqa
        all_punctuation = set(string.punctuation + cn_punctuation)
        return "".join(ch for ch in text if ch not in all_punctuation)

    def lower(text):
        return text.lower()

    return white_space_fix(remove_punc(lower(s)))


def first_int_match(prediction, ground_truth):
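    """Return 1 if the first integer found in the prediction equals the ground-truth string, else 0."""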
    pred_list = re.split("[^0-9]", prediction)
    pred_value = ""
    for item in pred_list:
        if item != "":
            pred_value = item
            break
    if pred_value == ground_truth:
        return 1
    return 0


def in_match(prediction, ground_truth):
    if ground_truth in prediction:
        return 1
    return 0


def rouge_score(prediction, ground_truth, **kwargs) -> float:
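    """ROUGE-L F1 between prediction and ground truth; returns 0.0 if scoring fails."""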
    rouge = Rouge()
    try:
        scores = rouge.get_scores([prediction], [ground_truth], avg=True)
    except Exception:
        return 0.0
    return scores["rouge-l"]["f"]  # type: ignore


def rouge_zh_score(prediction, ground_truth, **kwargs):
    prediction = " ".join(list(jieba.cut(prediction, cut_all=False)))
    ground_truth = " ".join(list(jieba.cut(ground_truth, cut_all=False)))
    score = rouge_score(prediction, ground_truth)
    return score


def f1_score(prediction, ground_truth, **kwargs):
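    """Token-level F1 between a prediction token list and a ground-truth token list."""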
    common = Counter(prediction) & Counter(ground_truth)
    num_same = sum(common.values())
    if num_same == 0:
        return 0
    precision = 1.0 * num_same / len(prediction)
    recall = 1.0 * num_same / len(ground_truth)
    f1 = (2 * precision * recall) / (precision + recall)
    return f1


def qa_f1_score(line):
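    """Best normalized-token F1 between the prediction and one or more gold answers in line["std_out"]."""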
prediction = line["pred"] | |
if isinstance(line["std_out"], str): | |
ground_truths = [line["std_out"]] | |
else: | |
ground_truths = line["std_out"] | |
score = 0 | |
for ground_truth in ground_truths: | |
normalized_prediction = normalize_answer(prediction) | |
normalized_ground_truth = normalize_answer(ground_truth) | |
prediction_tokens = normalized_prediction.split() | |
ground_truth_tokens = normalized_ground_truth.split() | |
score = max(score, f1_score(prediction_tokens, ground_truth_tokens)) | |
return score | |


def qa_f1_zh_score(prediction, ground_truth, **kwargs):
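    """Chinese QA F1: segment with jieba, normalize each token, then compute token-level F1."""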
    prediction_tokens = list(jieba.cut(prediction, cut_all=False))
    ground_truth_tokens = list(jieba.cut(ground_truth, cut_all=False))
    prediction_tokens = [
        normalize_zh_answer(token) for token in prediction_tokens
    ]
    ground_truth_tokens = [
        normalize_zh_answer(token) for token in ground_truth_tokens
    ]
    prediction_tokens = [
        token for token in prediction_tokens if len(token) > 0
    ]
    ground_truth_tokens = [
        token for token in ground_truth_tokens if len(token) > 0
    ]
    return f1_score(prediction_tokens, ground_truth_tokens)


def truncate_input(input, max_length, manner="middle"):
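    """Truncate a token list to `max_length`, keeping the head and tail halves when manner is "middle"."""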
    if len(input) <= max_length:
        return input
    if manner == "middle":
        return input[0 : max_length // 2] + input[-max_length // 2 :]
    else:
        return None


def load_comprehensive_config():
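    """Read Config_Files/config.txt next to this script and return a ConfigParser instance."""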
    # Get the directory of the current script
    current_dir = os.path.dirname(os.path.abspath(__file__))
    # Construct the path to the config file
    config_path = os.path.join(current_dir, 'Config_Files', 'config.txt')
    # Read the configuration file
    config = configparser.ConfigParser()
    files_read = config.read(config_path)
    if not files_read:
        raise FileNotFoundError(f"Config file not found at {config_path}")
    return config


# FIXME - update to include prompt path in return statement
def load_and_log_configs():
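    """Load the config file and return a dict of API keys, model names, local API endpoints, and paths; return None on error."""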
    try:
        config = load_comprehensive_config()
        if config is None:
            logging.error("Config is None, cannot proceed")
            return None

        # API Keys (only a masked prefix/suffix is logged)
        anthropic_api_key = config.get('API', 'anthropic_api_key', fallback=None)
        logging.debug(
            f"Loaded Anthropic API Key: {anthropic_api_key[:5] + '...' + anthropic_api_key[-5:] if anthropic_api_key else None}")

        cohere_api_key = config.get('API', 'cohere_api_key', fallback=None)
        logging.debug(
            f"Loaded Cohere API Key: {cohere_api_key[:5] + '...' + cohere_api_key[-5:] if cohere_api_key else None}")

        groq_api_key = config.get('API', 'groq_api_key', fallback=None)
        logging.debug(
            f"Loaded Groq API Key: {groq_api_key[:5] + '...' + groq_api_key[-5:] if groq_api_key else None}")

        openai_api_key = config.get('API', 'openai_api_key', fallback=None)
        logging.debug(
            f"Loaded OpenAI API Key: {openai_api_key[:5] + '...' + openai_api_key[-5:] if openai_api_key else None}")

        huggingface_api_key = config.get('API', 'huggingface_api_key', fallback=None)
        logging.debug(
            f"Loaded HuggingFace API Key: {huggingface_api_key[:5] + '...' + huggingface_api_key[-5:] if huggingface_api_key else None}")

        openrouter_api_key = config.get('API', 'openrouter_api_key', fallback=None)
        logging.debug(
            f"Loaded OpenRouter API Key: {openrouter_api_key[:5] + '...' + openrouter_api_key[-5:] if openrouter_api_key else None}")

        deepseek_api_key = config.get('API', 'deepseek_api_key', fallback=None)
        logging.debug(
            f"Loaded DeepSeek API Key: {deepseek_api_key[:5] + '...' + deepseek_api_key[-5:] if deepseek_api_key else None}")

        mistral_api_key = config.get('API', 'mistral_api_key', fallback=None)
        logging.debug(
            f"Loaded Mistral API Key: {mistral_api_key[:5] + '...' + mistral_api_key[-5:] if mistral_api_key else None}")
        # Models
        anthropic_model = config.get('API', 'anthropic_model', fallback='claude-3-sonnet-20240229')
        cohere_model = config.get('API', 'cohere_model', fallback='command-r-plus')
        groq_model = config.get('API', 'groq_model', fallback='llama3-70b-8192')
        openai_model = config.get('API', 'openai_model', fallback='gpt-4-turbo')
        huggingface_model = config.get('API', 'huggingface_model', fallback='CohereForAI/c4ai-command-r-plus')
        openrouter_model = config.get('API', 'openrouter_model', fallback='microsoft/wizardlm-2-8x22b')
        deepseek_model = config.get('API', 'deepseek_model', fallback='deepseek-chat')
        mistral_model = config.get('API', 'mistral_model', fallback='mistral-large-latest')

        logging.debug(f"Loaded Anthropic Model: {anthropic_model}")
        logging.debug(f"Loaded Cohere Model: {cohere_model}")
        logging.debug(f"Loaded Groq Model: {groq_model}")
        logging.debug(f"Loaded OpenAI Model: {openai_model}")
        logging.debug(f"Loaded HuggingFace Model: {huggingface_model}")
        logging.debug(f"Loaded OpenRouter Model: {openrouter_model}")
        logging.debug(f"Loaded Deepseek Model: {deepseek_model}")
        logging.debug(f"Loaded Mistral Model: {mistral_model}")

        # Local-Models
        kobold_api_ip = config.get('Local-API', 'kobold_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate')
        kobold_api_key = config.get('Local-API', 'kobold_api_key', fallback='')

        llama_api_IP = config.get('Local-API', 'llama_api_IP', fallback='http://127.0.0.1:8080/v1/chat/completions')
        llama_api_key = config.get('Local-API', 'llama_api_key', fallback='')

        ooba_api_IP = config.get('Local-API', 'ooba_api_IP', fallback='http://127.0.0.1:5000/v1/chat/completions')
        ooba_api_key = config.get('Local-API', 'ooba_api_key', fallback='')

        tabby_api_IP = config.get('Local-API', 'tabby_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate')
        tabby_api_key = config.get('Local-API', 'tabby_api_key', fallback=None)
        tabby_model = config.get('services', 'tabby_model', fallback=None)

        vllm_api_url = config.get('Local-API', 'vllm_api_IP', fallback='http://127.0.0.1:500/api/v1/chat/completions')
        vllm_api_key = config.get('Local-API', 'vllm_api_key', fallback=None)
        vllm_model = config.get('Local-API', 'vllm_model', fallback=None)

        ollama_api_url = config.get('Local-API', 'ollama_api_IP', fallback='http://127.0.0.1:11434/api/generate')
        ollama_api_key = config.get('Local-API', 'ollama_api_key', fallback=None)
        ollama_model = config.get('Local-API', 'ollama_model', fallback=None)

        aphrodite_api_url = config.get('Local-API', 'aphrodite_api_IP', fallback='http://127.0.0.1:8080/v1/chat/completions')
        aphrodite_api_key = config.get('Local-API', 'aphrodite_api_key', fallback='')

        logging.debug(f"Loaded Kobold API IP: {kobold_api_ip}")
        logging.debug(f"Loaded Llama API IP: {llama_api_IP}")
        logging.debug(f"Loaded Ooba API IP: {ooba_api_IP}")
        logging.debug(f"Loaded Tabby API IP: {tabby_api_IP}")
        logging.debug(f"Loaded VLLM API URL: {vllm_api_url}")

        # Retrieve output paths from the configuration file
        output_path = config.get('Paths', 'output_path', fallback='results')
        logging.debug(f"Output path set to: {output_path}")

        # Retrieve processing choice from the configuration file
        processing_choice = config.get('Processing', 'processing_choice', fallback='cpu')
        logging.debug(f"Processing choice set to: {processing_choice}")

        # Prompts - FIXME
        prompt_path = config.get('Prompts', 'prompt_path', fallback='prompts.db')

        return {
            'api_keys': {
                'anthropic': anthropic_api_key,
                'cohere': cohere_api_key,
                'groq': groq_api_key,
                'openai': openai_api_key,
                'huggingface': huggingface_api_key,
                'openrouter': openrouter_api_key,
                'deepseek': deepseek_api_key,
                'mistral': mistral_api_key,
                'kobold': kobold_api_key,
                'llama': llama_api_key,
                'ooba': ooba_api_key,
                'tabby': tabby_api_key,
                'vllm': vllm_api_key,
                'ollama': ollama_api_key
            },
            'services': {
                'anthropic': anthropic_model,
                'cohere': cohere_model,
                'groq': groq_model,
                'openai': openai_model,
                'huggingface': huggingface_model,
                'openrouter': openrouter_model,
                'deepseek': deepseek_model,
                'mistral': mistral_model,
                'vllm': vllm_model,
                'tabby': tabby_model,
                'ollama': ollama_model
            },
            'local_api_ip': {
                'kobold': kobold_api_ip,
                'llama': llama_api_IP,
                'ooba': ooba_api_IP,
                'tabby': tabby_api_IP,
                'vllm': vllm_api_url,
                'ollama': ollama_api_url,
                'aphrodite': aphrodite_api_url
            },
            'output_path': output_path,
            'processing_choice': processing_choice
        }

    except Exception as e:
        logging.error(f"Error loading config: {str(e)}")
        return None


if __name__ == "__main__":
    data_dir = Path("../data")
    data_path = data_dir / "shorter/longdialogue_qa_eng_1000.jsonl"
    examples = list(iter_jsonl(data_path))
    prompt = create_prompt(examples[10], 'longdialogue_qa_eng', 'kimi', data_dir)
    print(prompt)