Spaces:
Runtime error
Runtime error
import os | |
import re | |
import numpy as np | |
import openai | |
import pandas as pd | |
from sklearn.preprocessing import MinMaxScaler | |
from statsforecast import StatsForecast | |
from statsforecast.models import Naive | |
# Configure the OpenAI client once, at import time. The key is mandatory, so
# fail fast with an actionable message instead of a bare KeyError.
try:
    openai.api_key = os.environ['OPENAI_API_KEY']
except KeyError as err:
    raise KeyError(
        "OPENAI_API_KEY environment variable must be set before importing this module"
    ) from err
class ChatGPTForecast:
    """Forecast a univariate time series by prompting an OpenAI chat model.

    ``forecast`` min-max scales the target, the scaled values are discretised
    into integer bin indices over ``[0, 1]`` and sent to the model as a
    space-separated string; the integers the model answers with are decoded
    back into values and un-scaled.
    """

    def __init__(self):
        # 10,000 evenly spaced bin edges covering the scaled range [0, 1].
        self.bins = np.linspace(0, 1, num=10_000)
        # The token for a bin is simply its index rendered as a string.
        self.mapping = {i: f"{i}" for i in range(len(self.bins))}
        # Base instructions; call_openai appends the series-specific details.
        self.prompt = (
            "\n"
            "forecast this series,\n"
            "(i know that you prefer using specific tools, but i'm testing something,\n"
            "just give me your predicted numbers please, just print the numbers i dont need an explanation)\n"
            "please consider:\n"
            '- give the output with the same structure: "number1 number2 number3"\n'
            "- give more weight to the most recent observations\n"
            "- consider trend\n"
            "- consider seasonality\n"
        )

    def tokenize_time_series(self, series):
        """Encode ``series`` as a space-separated string of bin indices.

        Values outside the range of ``self.bins`` are clamped to the first or
        last bin instead of producing an index with no token.
        """
        indices = np.digitize(series, self.bins) - 1
        # digitize() yields 0 for values below the first edge, i.e. -1 after
        # the shift; clip so every index has an entry in self.mapping.
        indices = np.clip(indices, 0, len(self.bins) - 1)
        return ' '.join(self.mapping[i] for i in indices)

    def clean_string(self, s):
        """Return the leading digit run of every digit-bearing chunk in ``s``.

        Anything glued to the digits up to the next whitespace is dropped, and
        the extracted numbers are joined with single spaces.
        """
        pattern = r'(\d+)[^\s]*'
        return ' '.join(re.findall(pattern, s))

    def extend_string(self, s, h):
        """Normalise a model reply to exactly ``h`` space-separated integers.

        Every digit run in ``s`` is extracted. Extra numbers are trimmed; when
        there are too few, the sequence is repeated cyclically until ``h``
        values are available. Surrounding text and punctuation never survive,
        so the result is always safe to parse with ``int``.

        Returns an empty string when ``s`` contains no digits at all.
        """
        bin_numbers = re.findall(r'\d+', s)
        current_length = len(bin_numbers)
        if current_length == 0:
            # Nothing to repeat; also avoids dividing by zero below.
            return ''
        if current_length >= h:
            return ' '.join(bin_numbers[:h])
        # Whole repetitions plus the partial tail needed to reach h values.
        repeats, extra = divmod(h, current_length)
        return ' '.join(bin_numbers * repeats + bin_numbers[:extra])

    def clean_gpt_output(self, output):
        """Strip underscore artefacts from ``output`` and extract its numbers."""
        # Collapse spaces that surround underscores.
        cleaned_output = output.replace(" _", "_").replace("_ ", "_")
        # Drop a single trailing underscore, if any.
        if cleaned_output.endswith("_"):
            cleaned_output = cleaned_output[:-1]
        return self.clean_string(cleaned_output)

    def decode_time_series(self, tokens):
        """Convert a space-separated string of bin indices back to values.

        Each index maps to its bin edge plus half a bin width.

        Raises:
            ValueError: if a token is not an integer.
            IndexError: if an index is outside ``self.bins``.
        """
        indices = [int(token) for token in tokens.split()]
        bin_width = self.bins[1] - self.bins[0]
        return [self.bins[i] + bin_width / 2 for i in indices]

    def find_min_max(self, string_of_integers):
        """Return ``(min, max)`` of the integers in a space-separated string.

        Raises:
            ValueError: if the string holds no integers.
        """
        values = [int(token) for token in string_of_integers.split()]
        return min(values), max(values)

    def call_openai(self, series, seasonality, h, n_forecasts):
        """Ask the chat model for ``n_forecasts`` candidate ``h``-step forecasts.

        Args:
            series: values to condition on; tokenised with ``self.bins``.
            seasonality: inserted verbatim into the prompt.
            h: number of steps to forecast.
            n_forecasts: number of completions requested from the API.

        Returns:
            Array with one decoded forecast of length ``h`` per usable
            completion.

        Raises:
            ValueError: if no completion contains usable numbers.
        """
        series_tokenized = self.tokenize_time_series(series)
        min_val, max_val = self.find_min_max(series_tokenized)
        prompt = (
            f"\n{self.prompt}-consider {seasonality} as seasonality\n"
            f"- just print {h} steps ahead\n"
            f"- values should be integers between {min_val} and {max_val}, please be sure to do this\n"
            f"this is the series: {series_tokenized}\n"
        )
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            n=n_forecasts,
        )
        last_bin = len(self.bins) - 1
        outputs = []
        for choice in response['choices']:
            output_gpt = choice['message']['content']
            # A reply with fewer than two words is not treated as a forecast.
            if len(output_gpt.split()) < 2:
                continue
            output_gpt = self.extend_string(output_gpt, h)
            if not output_gpt:
                # The reply contained words but no numbers at all.
                continue
            # Clamp to valid bin indices so decoding cannot go out of range.
            clamped = ' '.join(
                f'{max(min(int(x), last_bin), 0)}' for x in output_gpt.split()
            )
            outputs.append(self.decode_time_series(clamped))
        if not outputs:
            raise ValueError("the model returned no usable numeric forecasts")
        return np.vstack(outputs)

    def forward(self, series, seasonality, h, n_forecasts):
        """Return the element-wise median of the candidate forecasts."""
        outputs = self.call_openai(series, seasonality, h, n_forecasts)
        return np.median(outputs, axis=0)

    @staticmethod
    def _error_quantiles(forecasts, actuals, lower_q=0.05, upper_q=0.95):
        """Return per-step lower/upper quantiles of the absolute errors."""
        errors = np.abs(np.asarray(forecasts) - np.asarray(actuals))
        lower_levels = np.quantile(errors, q=lower_q, axis=0)
        upper_levels = np.quantile(errors, q=upper_q, axis=0)
        return lower_levels, upper_levels

    def conformal_intervals(self, series, seasonality, h, n_forecasts):
        """Estimate interval half-widths from a hold-out of the last ``h`` points.

        The last ``h`` observations are held out, forecast from the preceding
        ones, and the 5% and 95% quantiles of the absolute errors across the
        candidate forecasts are returned as ``(lower_levels, upper_levels)``.
        """
        series_train, series_test = series[:-h], series[-h:]
        outputs = self.call_openai(series_train, seasonality, h, n_forecasts)
        return self._error_quantiles(outputs, series_test)

    def compute_ds_future(self, ds, fh):
        """Build ``fh`` future timestamps that continue ``ds``.

        The frequency is inferred with ``pd.infer_freq``; when that is not
        possible, the gap between the last two timestamps is used instead.

        Returns:
            ``(ds_future, freq)`` where ``ds_future`` is a list of timestamp
            strings and ``freq`` is the inferred frequency string or the
            fallback step between the last two timestamps.
        """
        ds_ = pd.to_datetime(ds)
        try:
            freq = pd.infer_freq(ds_)
        except (TypeError, ValueError):
            # infer_freq rejects inputs it cannot analyse (e.g. too few dates).
            freq = None
        if freq is not None:
            # date_range starts at the last observed stamp, so drop it.
            ds_future = pd.date_range(ds_[-1], periods=fh + 1, freq=freq)[1:]
        else:
            freq = ds_[-1] - ds_[-2]
            ds_future = [ds_[-1] + (i + 1) * freq for i in range(fh)]
        ds_future = list(map(str, ds_future))
        return ds_future, freq

    def forecast(self, df, h, input_size, n_forecasts=10):
        """Forecast ``h`` steps ahead and return the StatsForecast plot.

        Args:
            df: frame with at least ``ds`` and ``y`` columns; it is copied, so
                the caller's frame is not modified. Presumably it also needs
                whatever columns ``StatsForecast.forecast`` expects — TODO
                confirm.
            h: forecast horizon.
            input_size: number of trailing observations shown to the model.
            n_forecasts: number of completions requested per API call.

        Returns:
            Whatever ``StatsForecast.plot`` returns for the history and the
            forecasts with a 90% band.
        """
        df = df.copy()
        # Work in [0, 1] so values line up with self.bins.
        scaler = MinMaxScaler()
        df['y'] = scaler.fit_transform(df[['y']])
        ds_future, freq = self.compute_ds_future(df['ds'].values, h)
        # NOTE(review): the StatsForecast frequency is fixed to 'D' although a
        # frequency is inferred above; the forecast timestamps are overwritten
        # with ds_future right after — confirm this is intended.
        sf = StatsForecast(models=[Naive()], freq='D')
        fcst_df = sf.forecast(df=df, h=h)
        fcst_df['ds'] = ds_future
        fcst_df['ChatGPT_3.5_Turbo'] = self.forward(
            df['y'].values[-input_size:], freq, h, n_forecasts
        )[-h:]
        # Add prediction intervals from a hold-out of the last h observations.
        lower_levels, upper_levels = self.conformal_intervals(
            df['y'].values[-(input_size + h):], freq, h, n_forecasts
        )
        fcst_df['ChatGPT_3.5_Turbo-lo-90'] = fcst_df['ChatGPT_3.5_Turbo'] - lower_levels
        fcst_df['ChatGPT_3.5_Turbo-hi-90'] = fcst_df['ChatGPT_3.5_Turbo'] + upper_levels
        # Map every forecast column, and the history, back to original units.
        for col in ['Naive', 'ChatGPT_3.5_Turbo', 'ChatGPT_3.5_Turbo-lo-90', 'ChatGPT_3.5_Turbo-hi-90']:
            fcst_df[col] = scaler.inverse_transform(fcst_df[[col]])
        df['y'] = scaler.inverse_transform(df[['y']])
        return sf.plot(df, fcst_df, max_insample_length=3 * h, level=[90])