import json
import logging
from pathlib import Path

import openai
from dotenv import load_dotenv
from openai.error import (APIConnectionError, APIError, InvalidRequestError,
                          RateLimitError, ServiceUnavailableError, Timeout)
from tenacity import (before_sleep_log, retry, retry_if_exception_type,
                      stop_after_attempt, wait_random_exponential)
from tiktoken import Encoding, encoding_for_model

logger = logging.getLogger(__name__)

# Load environment variables from a `.env` file, if one is present.
load_dotenv()

# Context window size, in tokens, of the target chat model (4,096 matches,
# e.g., gpt-3.5-turbo).
MAX_MODEL_TOKEN_COUNT = 4096
# Number of tokens to reserve for the model's response.
MAX_RESPONSE_TOKEN_COUNT = 512
RESPONSES_DIRECTORY_PATH = Path('../openai-api-responses-new')


def get_openai_model_encoding(model_id: str) -> Encoding:
    """Get the encoding (tokenizer) for the given OpenAI model."""
    return encoding_for_model(model_id)
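
# Example usage (a sketch; 'gpt-3.5-turbo' is an assumed model id):
#     encoding = get_openai_model_encoding('gpt-3.5-turbo')
#     token_count = len(encoding.encode_ordinary('Call me Ishmael.'))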


def get_max_chapter_segment_token_count(prompt: str, model_id: str) -> int:
    """
    Calculate the maximum number of tokens that a chapter segment may
    contain, given the prompt.
    """
    encoding = get_openai_model_encoding(model_id)
    # `encode_ordinary()` ignores special tokens.
    prompt_token_count = len(encoding.encode_ordinary(prompt))
    # Subtract the reserved response tokens and the prompt's tokens from the
    # context window, along with a small fixed allowance (8 + 1 tokens) for
    # the metadata tokens the chat message format adds around the content.
    max_chapter_segment_token_count = (MAX_MODEL_TOKEN_COUNT
                                       - MAX_RESPONSE_TOKEN_COUNT
                                       - prompt_token_count - 8 - 1)
    return max_chapter_segment_token_count
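
# For example, with a 200-token prompt the remaining budget for a chapter
# segment is 4096 - 512 - 200 - 8 - 1 = 3375 tokens.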


# Retry transient API failures up to 10 attempts, with randomized
# exponential backoff capped at 60 seconds between attempts.
# `InvalidRequestError` is not retried: it is handled inside the function,
# since an invalid request fails identically on every attempt.
@retry(retry=retry_if_exception_type((APIError, Timeout, RateLimitError,
                                      ServiceUnavailableError,
                                      APIConnectionError)),
       wait=wait_random_exponential(max=60), stop=stop_after_attempt(10),
       before_sleep=before_sleep_log(logger, logging.WARNING))
def save_openai_api_response(prompt_messages):
    """
    Use a prompt to make a request to the OpenAI API and return the response
    data.
    """
    # The API key and model id are passed inside the first message. Strip
    # them out of a copy so that they are neither sent to the API nor saved,
    # and so that a retried call still sees the original messages intact.
    first_message = dict(prompt_messages[0])
    openai.api_key = first_message.pop('api_key')
    model_id = first_message.pop('model_id')
    request_messages = [first_message, *prompt_messages[1:]]

    try:
        logger.info('Calling OpenAI API...')
        # `temperature=0` makes the output essentially deterministic.
        response = openai.ChatCompletion.create(
            model=model_id, messages=request_messages, temperature=0)
        finish_reason = response.choices[0].finish_reason
        if finish_reason != 'stop':
            logger.error(f'`finish_reason` is `{finish_reason}`.')

        save_data = {
            'model': response.model,
            'usage': response.usage,
            'finish_reason': finish_reason,
            'prompt_messages': request_messages,
            'response': response.choices[0].message.content
        }
    except InvalidRequestError:
        # An invalid request (for example, a prompt that exceeds the model's
        # context window) cannot succeed on retry, so return a placeholder
        # response instead of raising.
        logger.error('InvalidRequestError encountered. '
                     'Returning an empty response.')
        save_data = {
            'model': None,
            'usage': None,
            'finish_reason': 'invalid_request',
            'prompt_messages': request_messages,
            'response': ' '
        }

    return save_data
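
# Example usage (a sketch; the key, model id, file name, and prompt text are
# all hypothetical). The first message carries 'api_key' and 'model_id',
# which are stripped before the request:
#     prompt_messages = [
#         {'role': 'system', 'content': 'You are a helpful assistant.',
#          'api_key': 'sk-...', 'model_id': 'gpt-3.5-turbo'},
#         {'role': 'user', 'content': 'Summarize this chapter segment: ...'}]
#     save_data = save_openai_api_response(prompt_messages)
#     RESPONSES_DIRECTORY_PATH.mkdir(parents=True, exist_ok=True)
#     with open(RESPONSES_DIRECTORY_PATH / 'chapter-1.json', 'w') as file:
#         json.dump(save_data, file, indent=4)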


def load_response_text(save_path):
    """
    Load the response text from a JSON file containing response data from
    the OpenAI API.
    """
    with open(save_path, 'r') as save_file:
        save_data = json.load(save_file)
    return save_data['response']
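
# For instance, to read back the response saved in the sketch above (the
# file name is hypothetical):
#     response_text = load_response_text(
#         RESPONSES_DIRECTORY_PATH / 'chapter-1.json')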