import os import re import time import json import random import finnhub import torch import gradio as gr import pandas as pd import yfinance as yf from pynvml import * from peft import PeftModel from collections import defaultdict from datetime import date, datetime, timedelta from transformers import AutoTokenizer, AutoModelForCausalLM, TextStreamer access_token = os.environ["HF_TOKEN"] finnhub_client = finnhub.Client(api_key=os.environ["FINNHUB_API_KEY"]) base_model = AutoModelForCausalLM.from_pretrained( 'meta-llama/Llama-2-7b-chat-hf', token=access_token, trust_remote_code=True, device_map="auto", torch_dtype=torch.float16, offload_folder="offload/" ) model = PeftModel.from_pretrained( base_model, 'FinGPT/fingpt-forecaster_dow30_llama2-7b_lora', offload_folder="offload/" ) model = model.eval() tokenizer = AutoTokenizer.from_pretrained( 'meta-llama/Llama-2-7b-chat-hf', token=access_token ) streamer = TextStreamer(tokenizer) B_INST, E_INST = "[INST]", "[/INST]" B_SYS, E_SYS = "<>\n", "\n<>\n\n" SYSTEM_PROMPT = "You are a seasoned stock market analyst. Your task is to list the positive developments and potential concerns for companies based on relevant news and basic financials from the past weeks, then provide an analysis and prediction for the companies' stock price movement for the upcoming week. " \ "Your answer format should be as follows:\n\n[Positive Developments]:\n1. ...\n\n[Potential Concerns]:\n1. ...\n\n[Prediction & Analysis]\nPrediction: ...\nAnalysis: ..." def print_gpu_utilization(): nvmlInit() handle = nvmlDeviceGetHandleByIndex(0) info = nvmlDeviceGetMemoryInfo(handle) print(f"GPU memory occupied: {info.used//1024**2} MB.") def get_curday(): return date.today().strftime("%Y-%m-%d") def n_weeks_before(date_string, n): date = datetime.strptime(date_string, "%Y-%m-%d") - timedelta(days=7*n) return date.strftime("%Y-%m-%d") def get_stock_data(stock_symbol, steps): stock_data = yf.download(stock_symbol, steps[0], steps[-1]) # print(stock_data) dates, prices = [], [] available_dates = stock_data.index.format() for date in steps[:-1]: for i in range(len(stock_data)): if available_dates[i] >= date: prices.append(stock_data['Close'][i]) dates.append(datetime.strptime(available_dates[i], "%Y-%m-%d")) break dates.append(datetime.strptime(available_dates[-1], "%Y-%m-%d")) prices.append(stock_data['Close'][-1]) return pd.DataFrame({ "Start Date": dates[:-1], "End Date": dates[1:], "Start Price": prices[:-1], "End Price": prices[1:] }) def get_news(symbol, data): news_list = [] for end_date, row in data.iterrows(): start_date = row['Start Date'].strftime('%Y-%m-%d') end_date = row['End Date'].strftime('%Y-%m-%d') # print(symbol, ': ', start_date, ' - ', end_date) time.sleep(1) # control qpm weekly_news = finnhub_client.company_news(symbol, _from=start_date, to=end_date) weekly_news = [ { "date": datetime.fromtimestamp(n['datetime']).strftime('%Y%m%d%H%M%S'), "headline": n['headline'], "summary": n['summary'], } for n in weekly_news ] weekly_news.sort(key=lambda x: x['date']) news_list.append(json.dumps(weekly_news)) data['News'] = news_list return data def get_company_prompt(symbol): profile = finnhub_client.company_profile2(symbol=symbol) company_template = "[Company Introduction]:\n\n{name} is a leading entity in the {finnhubIndustry} sector. Incorporated and publicly traded since {ipo}, the company has established its reputation as one of the key players in the market. As of today, {name} has a market capitalization of {marketCapitalization:.2f} in {currency}, with {shareOutstanding:.2f} shares outstanding." \ "\n\n{name} operates primarily in the {country}, trading under the ticker {ticker} on the {exchange}. As a dominant force in the {finnhubIndustry} space, the company continues to innovate and drive progress within the industry." formatted_str = company_template.format(**profile) return formatted_str def get_prompt_by_row(symbol, row): start_date = row['Start Date'] if isinstance(row['Start Date'], str) else row['Start Date'].strftime('%Y-%m-%d') end_date = row['End Date'] if isinstance(row['End Date'], str) else row['End Date'].strftime('%Y-%m-%d') term = 'increased' if row['End Price'] > row['Start Price'] else 'decreased' head = "From {} to {}, {}'s stock price {} from {:.2f} to {:.2f}. Company news during this period are listed below:\n\n".format( start_date, end_date, symbol, term, row['Start Price'], row['End Price']) news = json.loads(row["News"]) news = ["[Headline]: {}\n[Summary]: {}\n".format( n['headline'], n['summary']) for n in news if n['date'][:8] <= end_date.replace('-', '') and \ not n['summary'].startswith("Looking for stock market analysis and research with proves results?")] basics = json.loads(row['Basics']) if basics: basics = "Some recent basic financials of {}, reported at {}, are presented below:\n\n[Basic Financials]:\n\n".format( symbol, basics['period']) + "\n".join(f"{k}: {v}" for k, v in basics.items() if k != 'period') else: basics = "[Basic Financials]:\n\nNo basic financial reported." return head, news, basics def sample_news(news, k=5): return [news[i] for i in sorted(random.sample(range(len(news)), k))] def get_current_basics(symbol, curday): basic_financials = finnhub_client.company_basic_financials(symbol, 'all') final_basics, basic_list, basic_dict = [], [], defaultdict(dict) for metric, value_list in basic_financials['series']['quarterly'].items(): for value in value_list: basic_dict[value['period']].update({metric: value['v']}) for k, v in basic_dict.items(): v.update({'period': k}) basic_list.append(v) basic_list.sort(key=lambda x: x['period']) for basic in basic_list[::-1]: if basic['period'] <= curday: break return basic def get_all_prompts_online(symbol, data, curday, with_basics=True): company_prompt = get_company_prompt(symbol) prev_rows = [] for row_idx, row in data.iterrows(): head, news, _ = get_prompt_by_row(symbol, row) prev_rows.append((head, news, None)) prompt = "" for i in range(-len(prev_rows), 0): prompt += "\n" + prev_rows[i][0] sampled_news = sample_news( prev_rows[i][1], min(5, len(prev_rows[i][1])) ) if sampled_news: prompt += "\n".join(sampled_news) else: prompt += "No relative news reported." period = "{} to {}".format(curday, n_weeks_before(curday, -1)) if with_basics: basics = get_current_basics(symbol, curday) basics = "Some recent basic financials of {}, reported at {}, are presented below:\n\n[Basic Financials]:\n\n".format( symbol, basics['period']) + "\n".join(f"{k}: {v}" for k, v in basics.items() if k != 'period') else: basics = "[Basic Financials]:\n\nNo basic financial reported." info = company_prompt + '\n' + prompt + '\n' + basics prompt = info + f"\n\nBased on all the information before {curday}, let's first analyze the positive developments and potential concerns for {symbol}. Come up with 2-4 most important factors respectively and keep them concise. Most factors should be inferred from company related news. " \ f"Then make your prediction of the {symbol} stock price movement for next week ({period}). Provide a summary analysis to support your prediction." return info, prompt def construct_prompt(ticker, curday, n_weeks, use_basics): steps = [n_weeks_before(curday, n) for n in range(n_weeks + 1)][::-1] data = get_stock_data(ticker, steps) data = get_news(ticker, data) data['Basics'] = [json.dumps({})] * len(data) print(data) info, prompt = get_all_prompts_online(ticker, data, curday, use_basics) prompt = B_INST + B_SYS + SYSTEM_PROMPT + E_SYS + prompt + E_INST print(prompt) return info, prompt def predict(ticker, date, n_weeks, use_basics): print_gpu_utilization() info, prompt = construct_prompt(ticker, date, n_weeks, use_basics) inputs = tokenizer( prompt, return_tensors='pt', padding=False ) inputs = {key: value.to(model.device) for key, value in inputs.items()} print("Inputs loaded onto devices.") res = model.generate( **inputs, max_length=4096, do_sample=True, eos_token_id=tokenizer.eos_token_id, use_cache=True, streamer=streamer ) output = tokenizer.decode(res[0], skip_special_tokens=True) answer = re.sub(r'.*\[/INST\]\s*', '', output, flags=re.DOTALL) torch.cuda.empty_cache() return info, answer demo = gr.Interface( predict, inputs=[ gr.Textbox( label="Ticker", value="AAPL", info="Companys from Dow-30 are recommended" ), gr.Textbox( label="Date", value=get_curday, info="Date from which the prediction is made, use format yyyy-mm-dd" ), gr.Slider( minimum=1, maximum=4, value=3, step=1, label="n_weeks", info="Information of the past n weeks will be utilized, choose between 1 and 4" ), gr.Checkbox( label="Use Latest Financial Basics", value=False, info="If checked, the latest quarterly reported financial basics of the company is taken into account." ) ], outputs=[ gr.Textbox( label="Information" ), gr.Textbox( label="Response" ) ] ) demo.launch()