import os  # For SambaNova environment variables
from functools import partial
from typing import List, Tuple

import numpy as np
import tiktoken
from sklearn.metrics.pairwise import cosine_similarity

import litellm
from litellm import completion

from .utils.execute_code import extract_and_run_python_code
from .utils.extractor import extract_answer, extract_cheatsheet

litellm._turn_on_debug()  # Enable verbose LiteLLM debug logging

class LanguageModel:
    def __init__(self,
                 model_name: str,
                 ) -> None:
        """
        LanguageModel class to interact with different language models via LiteLLM.

        Arguments:
            model_name : str : The name of the language model to use.

        Raises:
            ValueError : If the model name is not found or supported.
        """
        self.model_name = model_name
        # Models known to work with this wrapper out of the box.
        known_model_list = [
            "openai/gpt-4o-mini", "openai/gpt-4o-mini-2024-07-18",
            "openai/gpt-4o", "openai/gpt-4o-2024-08-06", "openai/gpt-4o-2024-11-20",
            "openai/gpt-3.5-turbo",
            "together_ai/meta-llama/Llama-3.3-70B-Instruct-Turbo",
            "meta-llama/Llama-3.3-70B-Instruct-Turbo",
            "openai/o3-mini", "openai/o3-mini-2025-01-31",
            "openai/o1", "openai/o1-2024-12-17",
            "anthropic/claude-3-5-sonnet-latest", "anthropic/claude-3-5-sonnet-20241022",
            "anthropic/claude-3-5-haiku-latest", "anthropic/claude-3-5-haiku-20241022",
            "anthropic/claude-3-7-sonnet-latest", "anthropic/claude-3-7-sonnet-20250219",
            "together_ai/meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
            "together_ai/deepseek-ai/DeepSeek-R1",
            "together_ai/deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
            "together_ai/deepseek-ai/DeepSeek-R1-Distill-Qwen-14B",
            "together_ai/Qwen/Qwen2.5-Coder-32B-Instruct",
            "together_ai/Qwen/QwQ-32B",
            "together_ai/Qwen/Qwen2-72B-Instruct",
            "together_ai/Qwen/Qwen2.5-7B-Instruct-Turbo",
            "together_ai/Qwen/Qwen2.5-72B-Instruct-Turbo",
            "gemini/gemini-2.0-flash",
            "ollama/llama3:70b",
        ]
        # Load the client for the model based on the model name.
        if self.model_name.startswith("sambanova/"):
            samba_api_key = os.environ.get("SAMBANOVA_API_KEY")
            samba_base_url = os.environ.get("SAMBANOVA_BASE_URL", "https://api.sambanova.ai/v1")  # Default if not set
            if not samba_api_key:
                raise ValueError("SAMBANOVA_API_KEY environment variable not set for SambaNova model.")
            # SambaNova exposes an OpenAI-compatible endpoint, but LiteLLM does not
            # treat the "sambanova/" prefix here as a provider on its own. So we strip
            # the prefix, send the bare model ID in the request body, and pass
            # api_key/api_base explicitly with custom_llm_provider="openai".
            actual_model_name = self.model_name.split("sambanova/", 1)[1] if "sambanova/" in self.model_name else self.model_name
            self.client = partial(
                completion,
                model=actual_model_name,
                api_key=samba_api_key,
                api_base=samba_base_url,
                custom_llm_provider="openai",
            )
            print(f"Initialized SambaNova model '{actual_model_name}' via custom OpenAI provider settings with api_base: {samba_base_url}")
        elif self.model_name in known_model_list:
            self.client = partial(completion, model=self.model_name)
        else:
            print(f"Warning: Model '{self.model_name}' is not in the known model list and does not start with a recognized prefix. Attempting to initialize it with LiteLLM directly.")
            try:
                self.client = partial(completion, model=self.model_name)
                print(f"Successfully initialized model '{self.model_name}' via LiteLLM fallback.")
            except Exception as e:
                raise ValueError(f"Model '{self.model_name}' is not in the known list, does not start with a recognized prefix, and could not be initialized by LiteLLM directly: {e}")

        self.gpt4Tokenizer = tiktoken.encoding_for_model("gpt-4o")
    def count_tokens(self, text: str) -> int:
        """
        Count the number of tokens in the text using the GPT-4o tokenizer.
        """
        tokens = self.gpt4Tokenizer.encode(text)
        return len(tokens)
    def generate(self,
                 history: List[dict],
                 temperature: float = 0.1,
                 max_tokens: int = 2048,
                 current_depth: int = 1,
                 max_depth_num_rounds: int = 3,
                 allow_code_execution: bool = True,
                 code_execution_flag: str = "EXECUTE CODE!",
                 final_output: str = ""
                 ) -> str:
        """
        Generate a response from the language model, optionally executing any
        Python code the model requests (up to max_depth_num_rounds rounds).
        """
        if len(history) == 0:
            raise ValueError("History must contain at least one message.")
        print('history\n', history)
        # Estimate the prompt size with LiteLLM's token counter (debugging aid only).
        try:
            token_count = litellm.token_counter(model=self.model_name, messages=history)
            print(f"DEBUG: litellm token_counter for '{self.model_name}' estimates: {token_count} tokens")
        except Exception as e:
            print(f"DEBUG: Error using litellm.token_counter: {e}")
        # self.client is a partial with model (and, for SambaNova, api_key/api_base) pre-filled.
        response = self.client(
            messages=history,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        output = response.choices[0].message.content
        print('output\n', output)

        pre_code_execution_flag = output.split(code_execution_flag)[0].strip()
        if allow_code_execution and code_execution_flag in output and pre_code_execution_flag.endswith("```"):
            # The model asked for code execution: run the code block that precedes the
            # flag and append the execution result to the running transcript.
            output_prefix = pre_code_execution_flag
            executed_code = extract_and_run_python_code(output_prefix).strip()
            current_output = f"{output_prefix}\n{code_execution_flag}\n\n{executed_code}"
            final_output = f"{final_output}\n\n{current_output}".strip()
            if current_depth <= max_depth_num_rounds:
                warning_txt = ""
                if current_depth == max_depth_num_rounds:
                    warning_txt = " (This is the last round. No more code execution will be allowed. Please present your final solution now.)"
                new_messages = [
                    {"role": "assistant", "content": current_output},
                    {"role": "user", "content": f"Proceed with any additional steps required and provide the completed solution. If everything is already complete, type FINAL ANSWER and submit it in the expected format. If you are stuck, please try alternative methods to solve the problem and provide the final solution.{warning_txt}"},
                ]
                history += new_messages
                return self.generate(
                    history=history,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    current_depth=current_depth + 1,
                    max_depth_num_rounds=max_depth_num_rounds,
                    allow_code_execution=allow_code_execution,
                    code_execution_flag=code_execution_flag,
                    final_output=final_output,
                )
            else:
                return final_output
        else:
            final_output = f"{final_output}\n\n{output}".strip()
            return final_output
    def advanced_generate(self,
                          approach_name: str,
                          input_txt: str,
                          cheatsheet: str = None,
                          generator_template: str = None,
                          cheatsheet_template: str = None,
                          temperature: float = 0.0,
                          max_tokens: int = 2048,
                          max_num_rounds: int = 1,
                          allow_code_execution: bool = True,
                          code_execution_flag: str = "EXECUTE CODE!",
                          add_previous_answers_to_cheatsheet: bool = True,
                          original_input_corpus: List[str] = None,
                          original_input_embeddings: np.ndarray = None,
                          generator_outputs_so_far: List[str] = None,
                          retrieve_top_k: int = 3,
                          ) -> dict:
        """
        Generate a response from the language model using one of the supported
        approaches ("default", "DynamicCheatsheet_Cumulative", or "FullHistoryAppending").

        Returns a dict (rather than a Tuple) for clarity.
        """
        if approach_name == "default":
            generator_prompt = generator_template.replace("[[QUESTION]]", input_txt).replace("[[CHEATSHEET]]", "(empty)")
            generator_history = [
                {"role": "user", "content": generator_prompt},
            ]
            generator_output = self.generate(
                history=generator_history,
                temperature=temperature,
                max_tokens=max_tokens,
                allow_code_execution=allow_code_execution,
                code_execution_flag=code_execution_flag,
            )
            generator_answer = extract_answer(generator_output)

            return {
                "input_txt": input_txt,
                "steps": [
                    {
                        "round": 0,
                        "generator_prompt": generator_prompt,
                        "generator_output": generator_output,
                        "generator_answer": generator_answer,
                        "current_cheatsheet": None,
                        "new_cheatsheet": None,
                    }
                ],
                "previous_answers": None,
                "final_answer": generator_answer,
                "final_output": generator_output,
                "final_cheatsheet": None,
            }
        elif approach_name == "DynamicCheatsheet_Cumulative":
            if cheatsheet is None:
                raise ValueError("Cheatsheet must be provided for the DynamicCheatsheet_Cumulative approach.")
            if generator_template is None or cheatsheet_template is None:
                raise ValueError("Generator and cheatsheet templates must be provided for the DynamicCheatsheet_Cumulative approach.")

            steps = []
            previous_answers = []
            current_cheatsheet_in_round = cheatsheet  # Local copy, updated after each round
            for round_num in range(max(1, max_num_rounds)):
                generator_cheatsheet_content = current_cheatsheet_in_round
                if round_num > 0 and add_previous_answers_to_cheatsheet and previous_answers:
                    previous_answers_txt = f"PREVIOUS ANSWERS:\n{'; '.join(previous_answers)}"
                    generator_cheatsheet_content = f"{generator_cheatsheet_content}\n\n{previous_answers_txt}"

                generator_prompt = generator_template.replace("[[QUESTION]]", input_txt).replace("[[CHEATSHEET]]", generator_cheatsheet_content)
                generator_history = [{"role": "user", "content": generator_prompt}]
                generator_output = self.generate(
                    history=generator_history,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    allow_code_execution=allow_code_execution,
                    code_execution_flag=code_execution_flag,
                )
                generator_answer = extract_answer(generator_output)

                # Curate a new cheatsheet from the model's answer. The curator currently
                # uses the same model instance (and therefore the same provider settings).
                cheatsheet_prompt = cheatsheet_template.replace("[[QUESTION]]", input_txt).replace("[[MODEL_ANSWER]]", generator_output).replace("[[PREVIOUS_CHEATSHEET]]", current_cheatsheet_in_round)
                cheatsheet_history = [{"role": "user", "content": cheatsheet_prompt}]
                cheatsheet_model_output = self.generate(
                    history=cheatsheet_history,
                    temperature=temperature,
                    max_tokens=2 * max_tokens,
                    allow_code_execution=False,
                )
                new_cheatsheet = extract_cheatsheet(response=cheatsheet_model_output, old_cheatsheet=current_cheatsheet_in_round)

                steps.append({
                    "round": round_num,
                    "generator_prompt": generator_prompt,
                    "generator_output": generator_output,
                    "generator_answer": generator_answer,
                    "current_cheatsheet": current_cheatsheet_in_round,
                    "new_cheatsheet": new_cheatsheet,
                })
                current_cheatsheet_in_round = new_cheatsheet  # Carry forward to the next round
                if generator_answer:
                    previous_answers.append(f"Round {round_num + 1}: {generator_answer}")

            print("input_txt", input_txt)
            print("steps", steps)
            print("previous_answers", previous_answers)
            print("final_answer", generator_answer)
            print("final_cheatsheet", current_cheatsheet_in_round)
            print("final_output", generator_output)

            return {
                "input_txt": input_txt,
                "steps": steps,
                "previous_answers": previous_answers,
                "final_answer": generator_answer,  # Answer from the last round
                "final_cheatsheet": current_cheatsheet_in_round,  # Cheatsheet after the last round
                "final_output": generator_output,  # Full output from the last generator call
            }
        elif approach_name == "FullHistoryAppending":
            # Build a pseudo-cheatsheet from all previous inputs and model solutions.
            length_of_history = len(generator_outputs_so_far) if generator_outputs_so_far else 0
            curated_cheatsheet = "(empty)"
            if length_of_history > 0 and original_input_corpus and generator_outputs_so_far:
                curated_cheatsheet = "### PREVIOUS SOLUTIONS (START)\n\n"
                for i, (prev_input, prev_output) in enumerate(zip(original_input_corpus[:length_of_history], generator_outputs_so_far[:length_of_history])):
                    curated_cheatsheet += f"#### Previous Input #{i+1}:\n\n{prev_input}\n\n#### Model Solution to Previous Input #{i+1}:\n\n{prev_output}\n---\n---\n\n"
                curated_cheatsheet += "#### PREVIOUS SOLUTIONS (END)"

            generator_prompt = generator_template.replace("[[QUESTION]]", input_txt).replace("[[CHEATSHEET]]", curated_cheatsheet)
            generator_history = [{"role": "user", "content": generator_prompt}]
            generator_output = self.generate(
                history=generator_history,
                temperature=temperature,
                max_tokens=max_tokens,
                allow_code_execution=allow_code_execution,
                code_execution_flag=code_execution_flag,
            )
            generator_answer = extract_answer(generator_output)

            return {
                "input_txt": input_txt,
                "steps": [],
                "previous_answers": [],
                "final_answer": generator_answer,
                "final_cheatsheet": curated_cheatsheet,
                "final_output": generator_output,
            }
        else:
            raise ValueError(f"Unknown approach_name: {approach_name}")
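

# --- Hedged usage sketch (illustrative, not part of the original module) ----
# A minimal example of how this class might be driven, assuming OPENAI_API_KEY
# is set and that "openai/gpt-4o-mini" (from the known model list above) is
# acceptable. The template string below is a stand-in; the project's real
# generator/cheatsheet templates live elsewhere. Because this module uses
# relative imports, it must be run as part of its package, e.g.
# `python -m <package>.<module>` (package/module names assumed).
if __name__ == "__main__":
    lm = LanguageModel(model_name="openai/gpt-4o-mini")

    # Hypothetical template with the [[QUESTION]]/[[CHEATSHEET]] placeholders
    # that advanced_generate substitutes.
    demo_generator_template = (
        "You are a careful problem solver.\n\n"
        "CHEATSHEET:\n[[CHEATSHEET]]\n\n"
        "QUESTION:\n[[QUESTION]]\n\n"
        "End with 'FINAL ANSWER:' followed by your answer."
    )

    result = lm.advanced_generate(
        approach_name="default",
        input_txt="What is 17 * 24?",
        generator_template=demo_generator_template,
        temperature=0.0,
        max_tokens=512,
        allow_code_execution=False,  # Keep the demo self-contained
    )
    print("Final answer:", result["final_answer"])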