# SuperExpert/models/llms.py
import requests
import time
import json
import os
import logging
from typing import List, Dict, Optional
from utils.logging import setup_logging
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, wait_exponential
from dotenv import load_dotenv

load_dotenv()
setup_logging(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class BaseModel:
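    """Shared configuration plus a retrying HTTP helper for the model wrappers.

    Subclasses implement invoke(), which expects messages[0] to carry the
    system prompt and messages[1] the user prompt.
    """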
def __init__(self, temperature: float, model: str, json_response: bool, max_retries: int = 3, retry_delay: int = 1):
self.temperature = temperature
self.model = model
self.json_response = json_response
self.max_retries = max_retries
self.retry_delay = retry_delay

    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=15), reraise=True)
def _make_request(self, url, headers, payload):
response = requests.post(url, headers=headers, data=json.dumps(payload))
response.raise_for_status()
return response.json()


class MistralModel(BaseModel):
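    """Wrapper for Mistral's chat completions API (api.mistral.ai)."""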
def __init__(self, temperature: float, model: str, json_response: bool, max_retries: int = 3, retry_delay: int = 1):
super().__init__(temperature, model, json_response, max_retries, retry_delay)
self.api_key = os.environ.get("MISTRAL_API_KEY")
self.headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': f'Bearer {self.api_key}'
}
self.model_endpoint = "https://api.mistral.ai/v1/chat/completions"

    # Overrides BaseModel._make_request with a shorter, fixed-delay retry policy.
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(1), retry=retry_if_exception_type(requests.RequestException))
def _make_request(self, url, headers, payload):
response = requests.post(url, headers=headers, data=json.dumps(payload))
response.raise_for_status()
return response.json()
def invoke(self, messages: List[Dict[str, str]]) -> str:
system = messages[0]["content"]
user = messages[1]["content"]
payload = {
"model": self.model,
"messages": [
{
"role": "system",
"content": system
},
{
"role": "user",
"content": user
}
],
"temperature": self.temperature,
}
if self.json_response:
payload["response_format"] = {"type": "json_object"}
try:
request_response_json = self._make_request(self.model_endpoint, self.headers, payload)
if 'choices' not in request_response_json or len(request_response_json['choices']) == 0:
raise ValueError("No choices in response")
response_content = request_response_json['choices'][0]['message']['content']
            if self.json_response:
                # Round-trip to validate that the model returned well-formed JSON.
                response = json.dumps(json.loads(response_content))
else:
response = response_content
return response
except requests.RequestException as e:
            return json.dumps({"error": f"Request failed after all retry attempts: {str(e)}"})
except (ValueError, KeyError, json.JSONDecodeError) as e:
return json.dumps({"error": f"Error processing response: {str(e)}"})


class ClaudeModel(BaseModel):
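    """Wrapper for Claude models, routed through the 302.ai proxy."""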
def __init__(self, temperature: float, model: str, json_response: bool, max_retries: int = 3, retry_delay: int = 1):
super().__init__(temperature, model, json_response, max_retries, retry_delay)
self.api_key = os.environ.get("ANTHROPIC_API_KEY")
self.headers = {
'Content-Type': 'application/json',
'x-api-key': self.api_key,
'anthropic-version': '2023-06-01'
}
        # NOTE: routes through the 302.ai proxy; the parsing in invoke() assumes
        # Anthropic's native Messages response shape (top-level "content" list).
        self.model_endpoint = "https://api.302.ai/v1/chat/completions"
def invoke(self, messages: List[Dict[str, str]]) -> str:
system = messages[0]["content"]
user = messages[1]["content"]
        content = f"system: {system}\n\nuser: {user}"
        if self.json_response:
            content += "\n\nYour output must be JSON formatted. Return only the specified JSON format, with nothing prepended."
payload = {
"model": self.model,
"messages": [
{
"role": "user",
"content": content
}
],
"max_tokens": 4096,
"temperature": self.temperature,
}
try:
request_response_json = self._make_request(self.model_endpoint, self.headers, payload)
if 'content' not in request_response_json or not request_response_json['content']:
raise ValueError("No content in response")
response_content = request_response_json['content'][0]['text']
if self.json_response:
response = json.dumps(json.loads(response_content))
else:
response = response_content
return response
except requests.RequestException as e:
            return json.dumps({"error": f"Request failed after all retry attempts: {str(e)}"})
except (ValueError, KeyError, json.JSONDecodeError) as e:
return json.dumps({"error": f"Error processing response: {str(e)}"})


class GeminiModel(BaseModel):
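    """Wrapper for Google's Gemini generateContent API."""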
def __init__(self, temperature: float, model: str, json_response: bool, max_retries: int = 3, retry_delay: int = 1):
super().__init__(temperature, model, json_response, max_retries, retry_delay)
self.api_key = os.environ.get("GEMINI_API_KEY")
self.headers = {
'Content-Type': 'application/json'
}
self.model_endpoint = f"https://generativelanguage.googleapis.com/v1/models/{model}:generateContent?key={self.api_key}"
def invoke(self, messages: List[Dict[str, str]]) -> str:
system = messages[0]["content"]
user = messages[1]["content"]
        content = f"system: {system}\n\nuser: {user}"
        if self.json_response:
            content += "\n\nYour output must be JSON formatted. Return only the specified JSON format, with nothing prepended."
payload = {
"contents": [
{
"parts": [
{
"text": content
}
]
}
],
"generationConfig": {
"temperature": self.temperature
},
}
        if self.json_response:
            payload["generationConfig"]["response_mime_type"] = "application/json"
try:
request_response_json = self._make_request(self.model_endpoint, self.headers, payload)
            if 'candidates' not in request_response_json or not request_response_json['candidates']:
                raise ValueError("No candidates in response")
response_content = request_response_json['candidates'][0]['content']['parts'][0]['text']
if self.json_response:
response = json.dumps(json.loads(response_content))
else:
response = response_content
return response
except requests.RequestException as e:
            return json.dumps({"error": f"Request failed after all retry attempts: {str(e)}"})
except (ValueError, KeyError, json.JSONDecodeError) as e:
return json.dumps({"error": f"Error processing response: {str(e)}"})


class GroqModel(BaseModel):
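    """Wrapper for Groq's OpenAI-compatible chat completions API."""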
def __init__(self, temperature: float, model: str, json_response: bool, max_retries: int = 3, retry_delay: int = 1):
super().__init__(temperature, model, json_response, max_retries, retry_delay)
self.api_key = os.environ.get("GROQ_API_KEY")
self.headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}'
}
self.model_endpoint = "https://api.groq.com/openai/v1/chat/completions"
def invoke(self, messages: List[Dict[str, str]]) -> str:
system = messages[0]["content"]
user = messages[1]["content"]
payload = {
"model": self.model,
"messages": [
                {
                    "role": "user",
                    "content": f"system: {system}\n\nuser: {user}"
                }
],
"temperature": self.temperature,
}
        # Crude client-side throttle to avoid hitting Groq rate limits.
        time.sleep(10)
if self.json_response:
payload["response_format"] = {"type": "json_object"}
try:
request_response_json = self._make_request(self.model_endpoint, self.headers, payload)
if 'choices' not in request_response_json or len(request_response_json['choices']) == 0:
raise ValueError("No choices in response")
response_content = request_response_json['choices'][0]['message']['content']
if self.json_response:
response = json.dumps(json.loads(response_content))
else:
response = response_content
return response
except requests.RequestException as e:
            return json.dumps({"error": f"Request failed after all retry attempts: {str(e)}"})
except (ValueError, KeyError, json.JSONDecodeError) as e:
return json.dumps({"error": f"Error processing response: {str(e)}"})


class OllamaModel(BaseModel):
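    """Wrapper for a local Ollama server; pulls the model on demand."""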
def __init__(self, temperature: float, model: str, json_response: bool, max_retries: int = 3, retry_delay: int = 1):
super().__init__(temperature, model, json_response, max_retries, retry_delay)
self.headers = {"Content-Type": "application/json"}
self.ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434")
self.model_endpoint = f"{self.ollama_host}/api/generate"
    def _check_and_pull_model(self):
        # Check whether the model is already available on the Ollama server.
        response = requests.get(f"{self.ollama_host}/api/tags")
        if response.status_code == 200:
            models = response.json().get("models", [])
            # Note: /api/tags returns fully tagged names (e.g. "llama3:latest"),
            # so self.model should include the tag for an exact match.
            if not any(model["name"] == self.model for model in models):
                logger.info(f"Model {self.model} not found. Pulling the model...")
                self._pull_model()
            else:
                logger.info(f"Model {self.model} is already available.")
        else:
            logger.warning(f"Failed to check models. Status code: {response.status_code}")

    def _pull_model(self):
        pull_endpoint = f"{self.ollama_host}/api/pull"
        payload = {"name": self.model}
        response = requests.post(pull_endpoint, json=payload, stream=True)
        if response.status_code == 200:
            # /api/pull streams newline-delimited JSON status updates.
            for line in response.iter_lines():
                if line:
                    status = json.loads(line.decode('utf-8'))
                    logger.info(f"Pulling model: {status.get('status')}")
            logger.info(f"Model {self.model} pulled successfully.")
        else:
            logger.error(f"Failed to pull model. Status code: {response.status_code}")
def invoke(self, messages: List[Dict[str, str]]) -> str:
self._check_and_pull_model() # Check and pull the model if necessary
system = messages[0]["content"]
user = messages[1]["content"]
        payload = {
            "model": self.model,
            "prompt": user,
            "system": system,
            "stream": False,
            # Ollama reads sampling parameters from "options", not the top level.
            "options": {"temperature": self.temperature},
        }
if self.json_response:
payload["format"] = "json"
try:
request_response_json = self._make_request(self.model_endpoint, self.headers, payload)
if self.json_response:
response = json.dumps(json.loads(request_response_json['response']))
else:
response = str(request_response_json['response'])
return response
except requests.RequestException as e:
            return json.dumps({"error": f"Request failed after all retry attempts: {str(e)}"})
except json.JSONDecodeError as e:
return json.dumps({"error": f"Error processing response: {str(e)}"})


class VllmModel(BaseModel):
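    """Wrapper for a vLLM server exposing the OpenAI-compatible chat API."""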
    def __init__(self, temperature: float, model: str, model_endpoint: str, json_response: bool, stop: Optional[str] = None, max_retries: int = 5, retry_delay: int = 1):
super().__init__(temperature, model, json_response, max_retries, retry_delay)
self.headers = {"Content-Type": "application/json"}
        self.model_endpoint = model_endpoint.rstrip('/') + '/v1/chat/completions'
self.stop = stop
def invoke(self, messages: List[Dict[str, str]], guided_json: dict = None) -> str:
system = messages[0]["content"]
user = messages[1]["content"]
        prefix = self.model.split('/')[0]
        # Mistral chat templates may not accept a separate system role, so fold
        # the system prompt into the user turn for mistralai/* models.
        if prefix == "mistralai":
payload = {
"model": self.model,
"messages": [
                    {
                        "role": "user",
                        "content": f"system: {system}\n\nuser: {user}"
                    }
],
"temperature": self.temperature,
"stop": None,
}
else:
payload = {
"model": self.model,
"messages": [
{
"role": "system",
"content": system
},
{
"role": "user",
"content": user
}
],
"temperature": self.temperature,
"stop": self.stop,
}
        if self.json_response:
            payload["response_format"] = {"type": "json_object"}
            if guided_json is not None:
                payload["guided_json"] = guided_json
try:
request_response_json = self._make_request(self.model_endpoint, self.headers, payload)
response_content = request_response_json['choices'][0]['message']['content']
if self.json_response:
response = json.dumps(json.loads(response_content))
else:
response = str(response_content)
return response
except requests.RequestException as e:
            return json.dumps({"error": f"Request failed after all retry attempts: {str(e)}"})
except json.JSONDecodeError as e:
return json.dumps({"error": f"Error processing response: {str(e)}"})


class OpenAIModel(BaseModel):
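    """Wrapper for OpenAI models, routed through the 302.ai proxy."""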
def __init__(self, temperature: float, model: str, json_response: bool, max_retries: int = 3, retry_delay: int = 1):
super().__init__(temperature, model, json_response, max_retries, retry_delay)
load_dotenv()
        # OpenAI-compatible chat completions endpoint via the 302.ai proxy.
        self.model_endpoint = 'https://api.302.ai/v1/chat/completions'
self.api_key = os.environ.get('OPENAI_API_KEY')
self.headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}'
}
def invoke(self, messages: List[Dict[str, str]]) -> str:
system = messages[0]["content"]
user = messages[1]["content"]
        # o1 reasoning models accept neither a system role nor a temperature
        # parameter, so fold everything into a single user message.
        if self.model in ("o1-preview", "o1-mini"):
payload = {
"model": self.model,
"messages": [
{
"role": "user",
"content": f"{system}\n\n{user}"
}
]
}
else:
payload = {
"model": self.model,
"messages": [
{
"role": "system",
"content": system
},
{
"role": "user",
"content": user
}
],
"stream": False,
"temperature": self.temperature,
}
        if self.json_response:
            if self.model in ("o1-preview", "o1-mini"):
                # o1 models reject response_format; append the JSON instruction
                # to the single user message instead of overwriting it.
                payload["messages"][0]["content"] += "\n\nYou must respond in JSON format."
            else:
                payload["response_format"] = {"type": "json_object"}
                payload["messages"][0]["content"] = f"{system}\n\nYou must respond in JSON format."
try:
response_json = self._make_request(self.model_endpoint, self.headers, payload)
if self.json_response:
response = json.dumps(json.loads(response_json['choices'][0]['message']['content']))
else:
response = response_json['choices'][0]['message']['content']
return response
except requests.RequestException as e:
            return json.dumps({"error": f"Request failed after all retry attempts: {str(e)}"})
except json.JSONDecodeError as e:
return json.dumps({"error": f"Error processing response: {str(e)}"})
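

if __name__ == "__main__":
    # Minimal usage sketch. Assumes OPENAI_API_KEY is set in the environment
    # (or in a .env file) and that the 302.ai proxy endpoint above is reachable;
    # the model name here is illustrative, not prescribed by this module.
    llm = OpenAIModel(temperature=0.0, model="gpt-4o-mini", json_response=False)
    reply = llm.invoke([
        {"role": "system", "content": "You are a concise assistant."},
        {"role": "user", "content": "Say hello in five words."},
    ])
    print(reply)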