Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| import dotenv | |
| import pandas as pd | |
| import requests | |
class FetchForecast:
    """Fetch past prices and forecasts for one ticker from the forecast API.

    Two endpoints are supported, selected by whether historic data is given:

    * ``v1/forecast/from_symbol`` - only the ticker symbol is sent.
    * ``v1/forecast/from_data``   - the ticker plus a date/close series built
      from ``df_hist`` is sent.

    The base URL is built from two environment variables:
    ``API_URL_TEMPLATE`` (a URL containing the placeholder text ``ENV``) and
    ``FORECAST_API_ENV`` (the value substituted for that placeholder).
    """

    def __init__(self, ticker: str, df_hist: "pd.DataFrame | None" = None, debug=False) -> None:
        """Configure logging, pick the endpoint and resolve the API URL.

        Args:
            ticker: Ticker symbol to forecast.
            df_hist: Optional historic data. When ``None`` (the default) the
                ``from_symbol`` endpoint is used, otherwise ``from_data``.
            debug: When true the logging level is DEBUG, otherwise INFO.

        Raises:
            RuntimeError: If ``FORECAST_API_ENV`` or ``API_URL_TEMPLATE`` is
                not set in the environment.
        """
        self.logger_level = logging.DEBUG if debug else logging.INFO
        self.logger = logging.getLogger(__name__)
        # NOTE: basicConfig only has an effect the first time the root logger
        # is configured in the process.
        logging.basicConfig(level=self.logger_level)

        # args
        self.ticker = ticker
        self.df_hist = df_hist
        if df_hist is None:
            self.endpoint = "v1/forecast/from_symbol"
            logdatasuffix = "without data"
        else:
            self.endpoint = "v1/forecast/from_data"
            logdatasuffix = "with historic data"
        self.logger.info(f"Initialized FetchForecast for ticker: {self.ticker} {logdatasuffix}")

        # constants
        self.past_horizon = 5  # number of past business days

        # Build the API URL from environment variables. Fail early with a
        # clear message instead of an opaque error on ``None.replace``.
        self.api_env = os.environ.get("FORECAST_API_ENV")
        api_url_temp = os.environ.get("API_URL_TEMPLATE")
        missing = [
            name
            for name, value in (
                ("FORECAST_API_ENV", self.api_env),
                ("API_URL_TEMPLATE", api_url_temp),
            )
            if value is None
        ]
        if missing:
            raise RuntimeError(
                "Missing required environment variable(s): " + ", ".join(missing)
            )
        self.api_url = api_url_temp.replace("ENV", self.api_env)

    def run(self):
        """Call the API and return ``(past_df, fcst_df)``.

        Both elements are ``None`` when the API answers with a non-200 status.
        """
        return self.call_api()

    def call_api(self) -> tuple:
        """POST the request to the configured endpoint.

        Returns:
            ``(past_df, fcst_df)`` as produced by :meth:`transform_data` on a
            200 response, or ``(None, None)`` for any other status code (the
            status is logged as an error).

        Raises:
            ValueError: If ``self.endpoint`` is not one of the two known
                endpoints, or if the ``from_data`` payload cannot be built.

        Exceptions raised by ``requests.post`` (connection errors, timeouts)
        and by decoding the response body are not caught here.
        """
        endpoint_kind = self.endpoint.split("/")[-1]
        if endpoint_kind == "from_symbol":
            self.logger.info(
                f"Sending the ticker symbol to the forecast API ({self.api_env}) {self.endpoint} endpoint"
            )
            pl_in = {"ticker": self.ticker, "past_horizon": self.past_horizon}
        elif endpoint_kind == "from_data":
            self.logger.info(
                f"Formatting and sending ticker data to the forecast API ({self.api_env}) {self.endpoint} endpoint"
            )
            pl_in = self.build_payload_with_data(ticker=self.ticker, past_horizon=self.past_horizon)
        else:
            # Without this branch the payload variable would be unbound below.
            raise ValueError(f"Unsupported endpoint: {self.endpoint}")

        resp = requests.post(f"{self.api_url}/{self.endpoint}", json=pl_in, timeout=30)
        if resp.status_code != 200:
            self.logger.error(f"Error (status: {resp.status_code}) fetching stock info for {self.ticker}.")
            return None, None
        return self.transform_data(resp.json())

    def transform_data(self, data) -> tuple:
        """Turn the API response into two DataFrames with ``Date`` first.

        Args:
            data: Decoded response; must provide ``"past"`` and ``"forecast"``
                entries that ``pd.DataFrame`` accepts and that carry an
                ``"index"`` field holding the date.

        Returns:
            ``(past_df, fcst_df)``. In each frame ``index`` is renamed to
            ``Date``, parsed as UTC, made timezone-naive and moved to be the
            first column.
        """
        frames = []
        for key in ("past", "forecast"):
            frame = pd.DataFrame(data[key]).rename(columns={"index": "Date"})
            # Parse as UTC, then drop the timezone to get naive timestamps.
            frame["Date"] = pd.to_datetime(frame["Date"], utc=True).dt.tz_convert(None)
            # Move Date to the front, keeping the other columns in order.
            frame = frame[["Date"] + [col for col in frame.columns if col != "Date"]]
            frames.append(frame)
        past_df, fcst_df = frames
        return past_df, fcst_df

    def build_payload_with_data(self, ticker: str, past_horizon: int) -> dict:
        """Build the columnar JSON dict expected by the /from_data endpoint.

        The historic DataFrame held in ``self.df_hist`` is read but never
        modified, so this method can be called repeatedly.

        Args:
            ticker: Ticker symbol placed in the payload.
            past_horizon: Value placed in the payload's ``past_horizon`` field.

        Returns:
            ``{"ticker": ..., "series": {"date": [...], "close": [...]},
            "past_horizon": ...}`` with dates formatted ``YYYY-MM-DD``, sorted
            ascending, rows with a missing date/close removed and, for
            duplicate dates, the last row in the original order kept.

        Raises:
            ValueError: If no historic data is available, or if it lacks a
                ``Date`` (column or index) or a ``Close`` column.
        """
        if self.df_hist is None:
            raise ValueError("No historic data available to build the payload.")

        # reset_index() returns a new frame: 'Date' becomes a regular column
        # while the caller's DataFrame stays untouched.
        df = self.df_hist.reset_index()
        if "Date" not in df.columns or "Close" not in df.columns:
            raise ValueError("DataFrame must contain 'Date' and 'Close' columns.")

        df = df[["Date", "Close"]].copy()
        df["Date"] = pd.to_datetime(df["Date"], utc=True, errors="coerce")

        # Clean + order. The sort must be stable so that keep="last" really
        # keeps the last occurrence of a duplicated date.
        df = (
            df.dropna(subset=["Date", "Close"])
            .sort_values("Date", kind="mergesort")
            .drop_duplicates(subset=["Date"], keep="last")
        )

        return {
            "ticker": ticker,
            "series": {
                "date": df["Date"].dt.strftime("%Y-%m-%d").tolist(),
                "close": df["Close"].astype(float).tolist(),
            },
            "past_horizon": past_horizon,
        }
if __name__ == "__main__":
    # Load the environment variables from the .env file.
    dotenv.load_dotenv(dotenv.find_dotenv())
    # No historic data is supplied, so the request goes to the from_symbol
    # endpoint; df_hist is passed explicitly because the constructor needs it.
    past_df, fcst_df = FetchForecast("AAPL", df_hist=None).run()
    if past_df is None or fcst_df is None:
        # run() returns (None, None) when the API answers with a non-200
        # status; stop here instead of failing on None.tail()/None.head().
        raise SystemExit("No forecast data returned for AAPL.")
    print("Last available price:\n", past_df.tail(1))
    print("Forecasts:\n", fcst_df.head())