Spaces:
Running
Running
| import os | |
| import gradio as gr | |
| import openai | |
| #from numpy._core.defchararray import endswith, isdecimal, startswith | |
| from openai import OpenAI | |
| from dotenv import load_dotenv | |
| from pathlib import Path | |
| from time import sleep | |
| import audioread | |
| import queue | |
| import threading | |
| from glob import glob | |
| import copy | |
| import base64 | |
| import json | |
| from PIL import Image | |
| from io import BytesIO | |
| from pydantic import BaseModel | |
| import pprint | |
| import pandas as pd | |
| import yfinance as yf | |
| from datetime import datetime, timedelta | |
| import pytz | |
| import math | |
| import numpy as np | |
| from pylatexenc.latex2text import LatexNodes2Text | |
| import requests | |
| from urllib.parse import quote | |
| import geo_distance | |
| import geo_locate | |
| load_dotenv(override=True) | |
| key = os.getenv('OPENAI_API_KEY') | |
| users = os.getenv('LOGNAME') | |
| unames = users.split(',') | |
| pwds = os.getenv('PASSWORD') | |
| pwdList = pwds.split(',') | |
| DEEPSEEK_KEY=os.getenv('DEEPSEEK_KEY') | |
| GROQ_KEY=os.getenv('GROQ_KEY') | |
| BRAVE_KEY=os.getenv('BRAVE_KEY') | |
| BRAVE_SEARCH_KEY=os.getenv('BRAVE_SEARCH_KEY') | |
| LOCATIONID_KEY=os.getenv('LOCATIONID_KEY') | |
| site = os.getenv('SITE') | |
| if site == 'local': | |
| dp = Path('./data') | |
| dp.mkdir(exist_ok=True) | |
| dataDir = './data/' | |
| else: | |
| dp = Path('/data') | |
| dp.mkdir(exist_ok=True) | |
| dataDir = '/data/' | |
| stock_data_path = dataDir + 'Stocks.txt' | |
| braveNewsEndpoint = "https://api.search.brave.com/res/v1/news/search" | |
| braveSearchEndpoint = "https://api.search.brave.com/res/v1/web/search" | |
| speak_file = dataDir + "speek.wav" | |
| # client = OpenAI(api_key = key) | |
| #digits = ['zero: ','one: ','two: ','three: ','four: ','five: ','six: ','seven: ','eight: ','nine: '] | |
| abbrevs = {'St. ' : 'Saint ', 'Mr. ': 'mister ', 'Mrs. ':'mussus ', 'Mr. ':'mister ', 'Ms. ':'mizz '} | |
| special_chat_types = ['math', 'logic'] | |
| news_interval_choices = [("Day", "pd"), ("Week", "pw"), ("Month", "pm"), ("Year", "py")] | |
| def get_distance(addr1, addr2): | |
| (lat1, lon1) = geo_locate.get_geo_coords(addr1, LOCATIONID_KEY) | |
| (lat2, lon2) = geo_locate.get_geo_coords(addr2, LOCATIONID_KEY) | |
| distance = geo_distance.great_circle_distance_miles(lat1, lon1, lat2, lon2) | |
| return distance | |
| def get_openai_file(file_id, container_id): | |
| url = f'https://api.openai.com/v1/containers/{container_id}/files/{file_id}/content' | |
| headers= {"Authorization": "Bearer " + key} | |
| response = requests.get( | |
| url, | |
| headers=headers | |
| ) | |
| return response | |
| def list_openai_container_files(container_id): | |
| url = f'https://api.openai.com/v1/containers/{container_id}/files' | |
| headers= {"Authorization": "Bearer " + key} | |
| response = requests.get( | |
| url, | |
| headers=headers | |
| ) | |
| return response | |
| def create_openai_container(name): | |
| url = 'https://api.openai.com/v1/containers' | |
| headers= {"Authorization": "Bearer " + key, "Content-Type": "application/json",} | |
| json_data = {"name": name} | |
| response = requests.post( | |
| url, | |
| headers=headers, | |
| json=json_data | |
| ) | |
| return json.loads(response.content)["id"] | |
| class Step(BaseModel): | |
| explanation: str | |
| output: str | |
| class MathReasoning(BaseModel): | |
| steps: list[Step] | |
| final_answer: str | |
| def get_brave_search_results(query: str): | |
| rv = '' | |
| url = f'{braveSearchEndpoint}?q={quote(query)}&count=20' | |
| response = requests.get( | |
| url, | |
| headers= {"Accept": "application/json", | |
| "X-Subscription-Token": BRAVE_KEY | |
| }, | |
| ) | |
| rv ='''Following are list items delineated by *item separator* | |
| At the end of each item is (item source, item age) | |
| *item separator* ''' | |
| jdata = response.json() | |
| web_results = jdata['web']['results'] | |
| for item in web_results: | |
| title = item['title'] | |
| description = item['description'] | |
| rv += f'{title}: {description} --' | |
| try: # extra_snippets can be missing | |
| for snip in item['extra_snippets']: | |
| rv += (snip + ' ') | |
| except: | |
| pass | |
| try: | |
| host = item['meta_url']['hostname'] | |
| except: | |
| host = 'unknown' | |
| try: | |
| age = item['age'] | |
| except: | |
| age = 'unknown' | |
| rv += f' (Item source: {host}, Item age: {age})' | |
| rv += ' *item separator* ' | |
| return rv | |
| def get_brave_news(query: str, interval: str = 'pd'): | |
| url = f'{braveNewsEndpoint}?q={quote(query)}&count=20&extra_snippets=true&freshness={interval}' | |
| response = requests.get( | |
| url, | |
| headers= {"Accept": "application/json", | |
| "X-Subscription-Token": BRAVE_KEY | |
| }, | |
| ) | |
| rv ='''Following are list items delineated by *item separator* | |
| At the end of each item is (item source, item age) | |
| *item separator* ''' | |
| jdata = response.json() | |
| for item in jdata['results']: | |
| title = item['title'] | |
| description = item['description'] | |
| rv += f'{title}: {description} --' | |
| try: # extra_snippets can be missing | |
| for snip in item['extra_snippets']: | |
| rv += (snip + ' ') | |
| except: | |
| pass | |
| try: | |
| host = item['meta_url']['hostname'] | |
| except: | |
| host = 'unknown' | |
| try: | |
| age = item['age'] | |
| except: | |
| age = 'unknown' | |
| rv += f' (Item source: {host}, Item age: {age})' | |
| rv += ' *item separator* ' | |
| return rv | |
| def Client(): | |
| return OpenAI(api_key = key) | |
| def test_plot_df(): | |
| data = { | |
| "month": ['2024-01','2024-02','2024-03'], | |
| "value": [22.4, 30.1, 25.6] | |
| } | |
| return pd.DataFrame(data) | |
| def md(txt): | |
| # if 'DOCTYPE' in txt: | |
| # return str(txt.replace('GPT','<br>GPT')) | |
| # else: | |
| return str(txt).replace('```', ' ').replace(' ', ' ').replace(' ', ' ').replace(' ', ' ').replace('\n','<br>').replace('~~','~') | |
| # return txt | |
| def etz_now(): | |
| eastern = pytz.timezone('US/Eastern') | |
| ltime = datetime.now(eastern) | |
| return ltime | |
| def date_from_utime(utime): | |
| ts = int(utime) | |
| dt = datetime.utcfromtimestamp(ts) | |
| eastern = pytz.timezone('US/Eastern') | |
| return dt.astimezone(eastern).strftime('%Y-%m-%d') | |
| def convert_latex_math(text): | |
| lines = text.split('\n') | |
| start_line = False | |
| out_txt = '' | |
| for line in lines: | |
| if len(line) == 0: | |
| out_txt += '\n' | |
| continue | |
| else: | |
| if line == r'\]': | |
| continue | |
| if line == r'\[': | |
| start_line = True | |
| continue | |
| if start_line: | |
| line = '\n' + LatexNodes2Text().latex_to_text(line.strip()) | |
| start_line = False | |
| if line.startswith(r'\['): | |
| loc = line.find(r'\]') | |
| if loc > 0: | |
| latex_code = line[2:loc] | |
| line = '\n' + LatexNodes2Text().latex_to_text(latex_code) | |
| out_txt += (line + '\n') | |
| return out_txt | |
| def stock_list(): | |
| rv = '' | |
| with open(stock_data_path, 'rt') as fp: | |
| lines = fp.readlines() | |
| for line in lines: | |
| (name, symbol, shares) = line.rstrip().split(',') | |
| name = name.strip() | |
| symbol = symbol.strip() | |
| rv += f'{symbol} {name}\n' | |
| return rv | |
| def get_stock_list(): | |
| stock_list = {} | |
| with open(stock_data_path, 'rt') as fp: | |
| lines = fp.readlines() | |
| for line in lines: | |
| (name, symbol, shares) = line.rstrip().split(',') | |
| stock_list[symbol.strip()] = (name.strip(),shares.strip()) | |
| return stock_list | |
| def get_stock_news(search_symbol): | |
| fuzzy = True | |
| have_symbol = False | |
| search_symbol = search_symbol.strip().upper() | |
| stock_list = get_stock_list() | |
| search_term = search_symbol | |
| if search_symbol in stock_list.keys(): | |
| have_symbol = True | |
| (search_term, shares) = stock_list[search_symbol] | |
| try: | |
| news = yf.Search(search_term, news_count=5, enable_fuzzy_query=fuzzy).news | |
| except: | |
| return (f'No results for search term {search_term}, check spelling', None) | |
| rv = '' | |
| for item in news: | |
| rv += f'Title: {item["title"]}\n' | |
| rv += f'Publisher: {item["publisher"]}\n' | |
| rv += f'Date published: {date_from_utime(item["providerPublishTime"])}\n' | |
| rv += f'Link: [URL]({item["link"]})\n\n' | |
| if have_symbol: | |
| (plot_df, ymax, deltas) = stock_week_df(search_symbol) | |
| else: | |
| (plot_df, ymax, deltas) = (pd.DataFrame(), 0.0, (0.0, 0.0, 0.0)) | |
| return (rv, plot_df, ymax, deltas) | |
| def stock_history_df(num_weeks): | |
| values = [] | |
| dates = [] | |
| xmax = 0 | |
| for offset in range(num_weeks+1): | |
| (value, date) = get_stock_report(False, offset) | |
| # date = date[5:] | |
| values.append(value) | |
| dates.append(date) | |
| if float(value) > xmax: | |
| xmax = float(value) | |
| values.reverse() | |
| dates.reverse() | |
| data = { | |
| "date": dates, | |
| "value" : values | |
| } | |
| return (pd.DataFrame(data), f'{int(xmax + 10000)}') | |
| def stock_deltas(values): | |
| num = len(values) | |
| month_end_avg = float(np.average(np.array(values[-3:]))) | |
| month_start_avg = float(np.average(np.array(values[0:4]))) | |
| week_start_avg = float(np.average(np.array(values[-7:-4]))) | |
| week_end_avg = float(np.average(np.array(values[-2:]))) | |
| month_delta = 100 * (month_end_avg - month_start_avg)/month_start_avg | |
| week_delta = 100 * (week_end_avg - week_start_avg)/week_start_avg | |
| daily_delta = 100 * ((float(values[num-1])/float(values[num-2])) - 1.0) | |
| # avg = np.average(npa) | |
| return (month_delta, week_delta, daily_delta) | |
| def stock_week_df(symbol): | |
| try: | |
| dates = [] | |
| values = [] | |
| ymax = 0 | |
| etime = etz_now() | |
| if etime.hour >= 16: | |
| etime = etime + timedelta(days=1) | |
| week_ago = etime - timedelta(days=40) # was 8 | |
| end = etime.strftime('%Y-%m-%d') | |
| start = week_ago.strftime('%Y-%m-%d') | |
| df = yf.download(symbol.upper(), | |
| start = start, | |
| end = end, | |
| progress = False, | |
| ) | |
| vals2d = df.values.tolist() | |
| valsTxt = [] | |
| numDays = len(vals2d) | |
| for i in range(numDays): | |
| valsTxt.append(vals2d[i][0]) | |
| for val in valsTxt: | |
| v = round(float(val),2) | |
| values.append(v) | |
| if v > ymax: | |
| ymax = v | |
| for row in df.index: | |
| dates.append(row.strftime('%Y-%m-%d')) | |
| # fit_data = lms_fit(dates, values) | |
| # pct_delta = lms_fit_trend(dates, values) | |
| deltas = stock_deltas(values) | |
| data = { | |
| "date": dates, | |
| "value" : values, | |
| # "fit" : fit_data | |
| } | |
| # fig = make_mp_figure(dates, values, fit_data, ymax) | |
| return (pd.DataFrame(data), ymax, deltas) | |
| except: | |
| return (pd.DataFrame(), ymax, (0.0, 0.0, 0.0)) | |
| def stock_recent_delta(symbol): | |
| try: | |
| dates = [] | |
| values = [] | |
| ymax = 0 | |
| etime = etz_now() | |
| if etime.hour >= 16: | |
| etime = etime + timedelta(days=1) | |
| week_ago = etime - timedelta(days=8) | |
| end = etime.strftime('%Y-%m-%d') | |
| start = week_ago.strftime('%Y-%m-%d') | |
| df = yf.download(symbol.upper(), | |
| start = start, | |
| end = end, | |
| progress = False, | |
| ) | |
| vals2d = df.values.tolist() | |
| valsTxt = [] | |
| numDays = len(vals2d) | |
| for i in range(numDays): | |
| valsTxt.append(vals2d[i][0]) | |
| for val in valsTxt: | |
| v = round(float(val),2) | |
| values.append(v) | |
| if v > ymax: | |
| ymax = v | |
| for row in df.index: | |
| dates.append(row.strftime('%Y-%m-%d')) | |
| start_val = float(np.average(np.array(values[:2]))) | |
| end_val = float(values[len(values)-1]) | |
| return f'{(end_val/start_val - 1.0)*100:.1f}' | |
| except: | |
| return 'NA' | |
| def get_alerts(): | |
| try: | |
| rv = '' | |
| # stock_data = {} | |
| global stock_data_path | |
| with open(stock_data_path, 'rt') as fp: | |
| lines = fp.readlines() | |
| for line in lines: | |
| (name, symbol, shares) = line.rstrip().split(',') | |
| name = name.strip() | |
| symbol = symbol.strip() | |
| delta_pct = stock_recent_delta(symbol) | |
| if delta_pct == 'NA': | |
| rv += f'\n{symbol} ({name}) NA' | |
| else: | |
| rv += f'\n{symbol} ({name}) {delta_pct}%' | |
| if abs(float(delta_pct)) > 3: | |
| rv += ' **\*\*\*** ' | |
| return 'Stock price changes over last week:\nChanges greater than +/-3% marked by **\*\*\***\n ' + rv + '\n' | |
| except: | |
| return "Error getting stock deltas\n" | |
| def get_stock_report(verbose = True, offset = 0): | |
| try: | |
| stock_data = {} | |
| global stock_data_path | |
| error_msg = '' | |
| with open(stock_data_path, 'rt') as fp: | |
| lines = fp.readlines() | |
| for line in lines: | |
| (name, symbol, shares) = line.rstrip().split(',') | |
| name = name.strip() | |
| symbol = symbol.strip() | |
| shares = shares.strip() | |
| stock_data[symbol] = {"symbol": symbol, "name": name, "shares": shares, "closing": '0'} | |
| for symbol in stock_data.keys(): | |
| (closing_price, closing_date) = get_last_closing(symbol, offset) | |
| if closing_price == 0: | |
| error_msg += f'Error getting closing for {symbol}\n' | |
| stock_data[symbol]['closing'] = f'{closing_price:.2f}' | |
| total_value = 0.0 | |
| if verbose: | |
| rv = f'At closing on {closing_date}:\n' | |
| for item in stock_data.values(): | |
| rv += str(item) + '\n' | |
| total_value += float(item['closing']) * float(item['shares']) | |
| rv += (f'\nTotal value = {total_value:.2f}\n') | |
| if len(error_msg) > 0: | |
| rv += error_msg | |
| rv += f'Eastern time is: {etz_now()}' | |
| else: | |
| for item in stock_data.values(): | |
| total_value += float(item['closing']) * float(item['shares']) | |
| return (total_value, closing_date) | |
| except: | |
| rv = 'Error getting stock report' | |
| return rv | |
| def get_last_closing(symbol, offset=0, timeout=10): | |
| try: | |
| etime = etz_now() | |
| if etime.hour >= 16: | |
| etime = etime + timedelta(days=1) | |
| if offset > 0: | |
| etime = etime - timedelta(weeks=offset) | |
| five_days_ago = etime - timedelta(days=6) | |
| end = etime.strftime('%Y-%m-%d') | |
| start = five_days_ago.strftime('%Y-%m-%d') | |
| df = yf.download(symbol, | |
| start = start, | |
| end = end, | |
| progress = False, | |
| timeout=timeout, | |
| auto_adjust=False, | |
| ) | |
| # print(df) | |
| closing_date = 'unknown' | |
| data_top = df.tail(1) | |
| for row in data_top.index: | |
| closing_date = row.strftime('%Y-%m-%d') | |
| # print(closing_date) | |
| return (df.iat[-1,0], closing_date) | |
| except: | |
| return (0.0, "0000-00-00") | |
| def get_total_daily_closing_sequence(num_days): | |
| try: | |
| first_loop = True | |
| max_val = 0.0 | |
| stock_list = get_stock_list() | |
| symbols = [s for s in stock_list.keys()] | |
| # symbols = symbols[8:10] | |
| etime = etz_now() | |
| if etime.hour >= 16: | |
| etime = etime + timedelta(days=1) | |
| end = etime.strftime('%Y-%m-%d') | |
| start_time = etime - timedelta(days = num_days) | |
| start = start_time.strftime('%Y-%m-%d') | |
| df = yf.download(symbols, | |
| start = start, | |
| end = end, | |
| progress = False, | |
| auto_adjust=False, | |
| ) | |
| # val2d = df.values.tolist() | |
| dates = [] | |
| for row in df.index: | |
| dates.append(row.strftime('%Y-%m-%d')) | |
| # columns = list(df.columns.values) | |
| # cvals = df[columns[0]].tolist() | |
| for sym in symbols: | |
| (name, shares) = stock_list[sym] | |
| values = df[('Close', sym)].tolist() | |
| n = len(values) | |
| for i in range(n): | |
| if math.isnan(float(values[i])): | |
| if i == 0: | |
| values[0] = values[1] | |
| else: | |
| values[i] = values[i-1] | |
| if first_loop: | |
| first_loop = False | |
| total_values = values.copy() | |
| for i in range(n): | |
| total_values[i] = float(total_values[i]) * float(shares) | |
| else: | |
| for i in range(n): | |
| total_values[i] += (float(values[i]) * float(shares)) | |
| for i in range(n): | |
| total_values[i] = round(total_values[i], 2) | |
| if total_values[i] > max_val: | |
| max_val = total_values[i] | |
| data = { | |
| "date": dates, | |
| "value" : total_values | |
| } | |
| return (pd.DataFrame(data), max_val) | |
| except: | |
| return (pd.DataFrame(), 0.0) | |
| def get_daily_closing_sequence(symbol, num_days): | |
| try: | |
| dates = [] | |
| values = [] | |
| etime = etz_now() | |
| if etime.hour >= 16: | |
| etime = etime + timedelta(days=1) | |
| end = etime.strftime('%Y-%m-%d') | |
| start_time = etime - timedelta(days = num_days) | |
| start = start_time.strftime('%Y-%m-%d') | |
| df = yf.download(symbol, | |
| start = start, | |
| end = end, | |
| progress = False, | |
| ) | |
| vals2d = df.values.tolist() | |
| valsTxt = [] | |
| values = [round(float(vals2d[i][0]),2) for i in range(len(vals2d))] | |
| for row in df.index: | |
| dates.append(row.strftime('%Y-%m-%d')) | |
| return(dates, values) | |
| except: | |
| return([],[]) | |
| def create_stock_data_file(txt): | |
| with open(stock_data_path, 'wt') as fp: | |
| fp.write(txt) | |
| def solve(prompt, chatType): | |
| tokens_in = 0 | |
| tokens_out = 0 | |
| tokens = 0 | |
| if chatType == 'math': | |
| instruction = "You are a helpful math tutor. Guide the user through the solution step by step." | |
| elif chatType == "logic": | |
| instruction = "you are an expert in logic and reasoning. Guide the user through the solution step by step" | |
| try: | |
| completion = Client().beta.chat.completions.parse( | |
| model = 'gpt-4o-2024-08-06', | |
| messages = [ | |
| {"role": "system", "content": instruction}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| response_format=MathReasoning, | |
| max_tokens = 2000 | |
| ) | |
| tokens_in = completion.usage.prompt_tokens | |
| tokens_out = completion.usage.completion_tokens | |
| tokens = completion.usage.total_tokens | |
| msg = completion.choices[0].message | |
| if msg.parsed: | |
| dr = msg.parsed.model_dump() | |
| response = pprint.pformat(dr) | |
| elif msg.refusal: | |
| response = msg.refusal | |
| except Exception as e: | |
| if type(e) == openai.LengthFinishReasonError: | |
| response = 'Too many tokens' | |
| else: | |
| response = str(e) | |
| return (response, tokens_in, tokens_out, tokens) | |
| def genUsageStats(do_reset=False): | |
| result = [] | |
| ttotal4o_in = 0 | |
| ttotal4o_out = 0 | |
| ttotal4mini_in = 0 | |
| ttotal4mini_out = 0 | |
| totalAudio = 0 | |
| totalSpeech = 0 | |
| totalImages = 0 | |
| totalHdImages = 0 | |
| if do_reset: | |
| dudPath = dataDir + '_speech.txt' | |
| if os.path.exists(dudPath): | |
| os.remove(dudPath) | |
| for user in unames: | |
| tokens4o_in = 0 | |
| tokens4o_out = 0 | |
| tokens4mini_in = 0 | |
| tokens4mini_out = 0 | |
| fp = dataDir + user + '_log.txt' | |
| if os.path.exists(fp): | |
| accessOk = False | |
| for i in range(3): | |
| try: | |
| with open(fp) as f: | |
| dataList = f.readlines() | |
| if do_reset: | |
| os.remove(fp) | |
| else: | |
| for line in dataList: | |
| (u, t) = line.split(':') | |
| (t, m) = t.split('-') | |
| (tin, tout) = t.split('/') | |
| incount = int(tin) | |
| outcount = int(tout) | |
| if 'mini' in m: | |
| tokens4mini_in += incount | |
| tokens4mini_out += outcount | |
| ttotal4mini_in += incount | |
| ttotal4mini_out += outcount | |
| else: | |
| tokens4o_in += incount | |
| tokens4o_out += outcount | |
| ttotal4o_in += incount | |
| ttotal4o_out += outcount | |
| accessOk = True | |
| break | |
| except: | |
| sleep(3) | |
| if not accessOk: | |
| return f'File access failed reading stats for user: {user}' | |
| userAudio = 0 | |
| fp = dataDir + user + '_audio.txt' | |
| if os.path.exists(fp): | |
| accessOk = False | |
| for i in range(3): | |
| try: | |
| with open(fp) as f: | |
| dataList = f.readlines() | |
| if do_reset: | |
| os.remove(fp) | |
| else: | |
| for line in dataList: | |
| (dud, len) = line.split(':') | |
| userAudio += int(len) | |
| totalAudio += int(userAudio) | |
| accessOk = True | |
| break | |
| except: | |
| sleep(3) | |
| if not accessOk: | |
| return f'File access failed reading audio stats for user: {user}' | |
| userSpeech = 0 | |
| fp = dataDir + user + '_speech.txt' | |
| if os.path.exists(fp): | |
| accessOk = False | |
| for i in range(3): | |
| try: | |
| with open(fp) as f: | |
| dataList = f.readlines() | |
| if do_reset: | |
| os.remove(fp) | |
| else: | |
| for line in dataList: | |
| (dud, len) = line.split(':') | |
| userSpeech += int(len) | |
| totalSpeech += int(userSpeech) | |
| accessOk = True | |
| break | |
| except: | |
| sleep(3) | |
| if not accessOk: | |
| return f'File access failed reading speech stats for user: {user}' | |
| user_images = 0 | |
| user_hd_images = 0 | |
| fp = image_count_path(user) | |
| if os.path.exists(fp): | |
| accessOk = False | |
| for i in range(3): | |
| try: | |
| with open(fp) as f: | |
| dataList = f.readlines() | |
| if do_reset: | |
| os.remove(fp) | |
| else: | |
| for line in dataList: | |
| x = line.strip() | |
| if x == 'hd': | |
| user_hd_images += 1 | |
| totalHdImages += 1 | |
| else: | |
| user_images += 1 | |
| totalImages += 1 | |
| accessOk = True | |
| break | |
| except: | |
| sleep(3) | |
| if not accessOk: | |
| return f'File access failed reading image gen stats for user: {user}' | |
| result.append([user, f'{tokens4mini_in}/{tokens4mini_out}', f'{tokens4o_in}/{tokens4o_out}', f'audio:{userAudio}',f'speech:{userSpeech}', f'images:{user_images}/{user_hd_images}']) | |
| result.append(['totals', f'{ttotal4mini_in}/{ttotal4mini_out}', f'{ttotal4o_in}/{ttotal4o_out}', f'audio:{totalAudio}',f'speech:{totalSpeech}', f'images:{totalImages}/{totalHdImages}']) | |
| return result | |
| def new_conversation(user): | |
| clean_up(user) # .wav files | |
| flist = [] | |
| for ext in ['png','docx','xlsx','pdf','pptx', 'csv']: | |
| flist.extend(glob(f'{dataDir}{user}*.{ext}')) | |
| flist.extend(glob(f'{dataDir}{user}_image.b64')) | |
| for fpath in flist: | |
| if os.path.exists(fpath): | |
| os.remove(fpath) | |
| if user == unames[0]: | |
| mode_list = ["Advanced", "Chat", "News", "Search"] | |
| else: | |
| mode_list = ["Chat", "News", "Search"] | |
| return [None, [], gr.Markdown(value='', label='Dialog', container=True), | |
| gr.Image(visible=False, value=None), gr.Image(visible=False, value=None), '', | |
| gr.LinePlot(visible=False), gr.Dropdown(value='pd', visible=False), | |
| gr.Dropdown(choices=mode_list, value=mode_list[0]), | |
| gr.DownloadButton(label='Download File', visible=False, value=None), '', | |
| gr.File(label='Upload File', visible=False)] | |
| def updatePassword(txt): | |
| password = txt.lower().strip() | |
| if password == pwdList[0]: | |
| mode_list = ["Advanced", "Chat", "News", "Search"] | |
| else: | |
| mode_list = ["Chat", "News", "Search"] | |
| return [password, "*********", gr.Dropdown(choices=mode_list, value=mode_list[0])] | |
| # def parse_math(txt): | |
| # ref = 0 | |
| # loc = txt.find(r'\(') | |
| # if loc == -1: | |
| # return txt | |
| # while (True): | |
| # loc2 = txt[ref:].find(r'\)') | |
| # if loc2 == -1: | |
| # break | |
| # loc = txt[ref:].find(r'\(') | |
| # if loc > -1: | |
| # loc2 += 2 | |
| # slice = txt[ref:][loc:loc2] | |
| # frag = lconv.convert(slice) | |
| # txt = txt[:loc+ref] + frag + txt[loc2+ref:] | |
| # ref = len(txt[ref:loc]) + len(frag) | |
| # return txt | |
| def get_response(inputs, previous_response_id, container_id, image_file, uploaded_file_path): | |
| instructions = ''' | |
| You are a helpful assistant who knows how to browse the web for info and to write and run python | |
| code and to generate images. | |
| ''' | |
| instructions += f''' | |
| Do not use latex for math expressions in text output. | |
| If a chart, table or plot is produced, return it as an image. | |
| If a powerpoint slide is created, return it as an image but do not offer a download link. | |
| If the user asks you to output a file, You must include the file you generate in the annotation | |
| of the output text. | |
| If a MCP server requires a password input you will use {pwdList[0]}. | |
| ''' | |
| if uploaded_file_path != '' and uploaded_file_path.casefold().split('.')[-1] == 'pdf': | |
| pdf_b64 = '' | |
| with open(uploaded_file_path, 'rb') as fp: | |
| data = fp.read() | |
| b64data = base64.b64encode(data) | |
| pdf_b64 = b64data.decode('utf-8') | |
| inputs.append( | |
| { | |
| "role" :"user", | |
| "content": [ | |
| { | |
| "type": "input_file", | |
| "filename": f'{os.path.basename(uploaded_file_path)}', | |
| "file_data": f'data:application/pdf;base64,{pdf_b64}', | |
| } | |
| ] | |
| } | |
| ) | |
| if image_file != '': | |
| with open(image_file, 'rt') as fp: | |
| b64data = fp.read() | |
| inputs.append( | |
| { | |
| "role": "user", | |
| "content": [ | |
| { | |
| "type": "input_image", | |
| "image_url": f'data:image/jpeg;base64, {b64data}', | |
| } | |
| ] | |
| } | |
| ) | |
| response = Client().responses.create( | |
| model= "gpt-5-mini", #"gpt-5-mini", "o4-mini", | |
| tools=[{ "type": "web_search_preview" }, | |
| { "type": "code_interpreter", "container": container_id}, #{'type': 'auto'}}, | |
| { "type": "image_generation", "quality": "medium", "size": "1024x1024"}, | |
| # {"type": "function", "name": "get_distance", | |
| # "description": "get calculated straight-line (great-circle) distance between two locations or addresses.", | |
| # "parameters": { | |
| # "type": "object", "properties": { | |
| # "addr1": { | |
| # "type": "string", | |
| # "description": "The street address or other designation of a location.", | |
| # }, | |
| # "addr2": { | |
| # "type": "string", | |
| # "description": "The street address or other designation of a location.", | |
| # }, | |
| # }, | |
| # "required": ["addr1", "addr2"], | |
| # }, | |
| { | |
| "type": "mcp", | |
| "server_label": "Geo_distance", | |
| "server_description": "A MCP server to compute straight line distances between two locations", | |
| "server_url": "https://dlflannery-geo-distance.hf.space/gradio_api/mcp/", | |
| "require_approval": "never", | |
| }, | |
| ], | |
| previous_response_id=previous_response_id, | |
| instructions = instructions, | |
| input=inputs, | |
| reasoning ={ | |
| "effort": "medium", | |
| "summary": "auto" | |
| } | |
| ) | |
| return response | |
| def chat(prompt, user_window, pwd_window, past, response, gptModel, uploaded_image_file='', | |
| plot=None, news_interval = 'pd', mode = 'Chat', uploaded_file_path=''): | |
| image_out = gr.Image(visible=False, value=None) | |
| file_out = gr.DownloadButton(value=None) | |
| image_gen_model = 'gpt-4o-2024-08-06' | |
| user_window = user_window.lower().strip() | |
| isBoss = False | |
| query = '' | |
| if not response: | |
| response = '' | |
| plot = gr.LinePlot(visible=False) | |
| # plot = gr.Plot(visible=False) | |
| if user_window == unames[0] and pwd_window == pwdList[0]: | |
| isBoss = True | |
| if prompt == 'stats': | |
| response = genUsageStats() | |
| return [past, str(response), None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| if prompt == 'reset': | |
| response = genUsageStats(True) | |
| return [past, md(response), None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| if prompt.startswith('gpt4'): | |
| gptModel = 'gpt-4o-2024-08-06' | |
| prompt = prompt[5:] | |
| if prompt.startswith('gpt5m'): | |
| gptModel = 'gpt-5-mini' | |
| prompt = prompt[6:] | |
| if prompt.startswith("clean"): | |
| user = prompt[6:] | |
| response = f'cleaned all .wav and .b64 files for {user}' | |
| final_clean_up(user, True) | |
| return [past, response, None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| if prompt.startswith('files'): | |
| (log_cnt, wav_cnt, other_cnt, others, log_list) = list_permanent_files() | |
| response = f'{log_cnt} log files\n{wav_cnt} .wav files\n{other_cnt} Other files:\n{others}\nlogs: {str(log_list)}' | |
| return [past, response, None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| if prompt.startswith('stock'): | |
| args = prompt.split(' ') | |
| num = len(args) | |
| if num == 1: | |
| response = stock_list() | |
| return [past, md(response), None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| elif num == 2: | |
| if args[1] == 'alerts': | |
| response = get_alerts() | |
| return [past, md(response), None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| else: | |
| response = get_stock_report() | |
| if args[1] == 'value': | |
| return [past, md(response), None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| elif args[1] == 'history': | |
| (plot_df, ymax) = get_total_daily_closing_sequence(40) #stock_history_df(12) | |
| # ymax = float(ymax) | |
| return [past, md(response), None, gptModel, uploaded_image_file, # plot] | |
| gr.LinePlot(plot_df, x="date", y="value", visible=True, x_label_angle=270, | |
| y_lim=[500000, 900000], label="Portfolio Value History"), image_out, file_out, uploaded_file_path] | |
| elif num >= 3: | |
| if args[1] == 'news': | |
| symbol = ' '.join(args[2:]) | |
| (response, plot_df, ymax, (dm, dw, dd)) = get_stock_news(symbol) | |
| ymax *= 1.1 | |
| mdtxt = md(f'News for {symbol}:\nTrends: Month = {dm:.1f}%, Week = {dw:.1f}%, Day = {dd:.1f}%\n\n' + response) | |
| if plot_df.empty: | |
| return [past, mdtxt, None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| else: | |
| return [past, mdtxt, None, gptModel, uploaded_image_file, | |
| gr.LinePlot(plot_df, x="date", y="value", visible=True, x_label_angle=270, | |
| y_lim=[0, ymax],label=f"{symbol.upper()} Recent Prices", | |
| color_map={''}), image_out, file_out, uploaded_file_path] | |
| if prompt.startswith('stockload'): | |
| create_stock_data_file(prompt[9:].lstrip()) | |
| return [past, 'Stock data file created', None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| if user_window in unames and pwd_window == pwdList[unames.index(user_window)]: | |
| chatType = 'normal' | |
| deepseek = False | |
| using_groq = False | |
| reasoning = False | |
| prompt = prompt.strip() | |
| need_turn = True | |
| responses = [] | |
| inputs = [] | |
| prev_id = None | |
| text = '' | |
| reasoning = '' | |
| show_reasoning = False | |
| if mode == "Advanced": | |
| if len(past): | |
| (prev_id, container_id) = past.pop() | |
| past = [] | |
| while mode == 'Advanced' and need_turn: | |
| need_turn = False | |
| if len(past) == 0: | |
| container_id = create_openai_container('My Container') | |
| file_text = '' | |
| if uploaded_file_path != '': | |
| upfile_ext = uploaded_file_path.casefold().split('.')[-1] | |
| if upfile_ext == 'txt': | |
| with open(uploaded_file_path, 'rt') as fp: | |
| file_text = fp.read() + '\n' | |
| uploaded_file_path = '' | |
| inputs.append( | |
| {"role": "user", "content": f"{file_text + prompt}"} | |
| ) | |
| else: | |
| (prev_id, container_id) = past.pop() | |
| for item in past: | |
| response += item | |
| try: | |
| result = get_response(inputs, prev_id, container_id, | |
| uploaded_image_file, uploaded_file_path) | |
| uploaded_image_file = '' | |
| uploaded_file_path = '' | |
| except Exception as e: | |
| return [[], f"Sorry, there was an error ({str(e)}) getting the AI response", | |
| prompt, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| image_done = False | |
| ann_files = [] # (container_id, file_id, filename) | |
| code_files = [] # (container_id, file_id, filename) | |
| text += '\n??? AI returned no text for this query\n' | |
| for output in result.output: | |
| if output.type == 'message': | |
| for content in output.content: | |
| if content.type == 'output_text': | |
| text = content.text | |
| for anns in content.annotations: | |
| if anns.type == "container_file_citation": | |
| try: | |
| file_name = anns.filename | |
| file_id = anns.file_id | |
| cont_id = anns.container_id | |
| if file_name.find('.png') > 0: | |
| if not image_done: | |
| image_done = True | |
| fpath = dataDir + user_window + '.png' | |
| image_data = get_openai_file(file_id, cont_id).content | |
| with open(fpath,'wb') as fp: | |
| fp.write(image_data) | |
| image_out = gr.Image(visible=True, value=fpath) | |
| else: | |
| ann_files.append((cont_id, file_id, file_name)) | |
| except: | |
| pass | |
| # for ann in file_anns: | |
| # text += f'\n\n{str(ann)}\n' | |
| elif output.type == 'code_interpreter_call' : | |
| cont_id = output.container_id | |
| file_list_object = list_openai_container_files(cont_id) | |
| try: | |
| file_list_json = json.loads(file_list_object.content) | |
| for file_item in file_list_json['data']: | |
| file_bytes = file_item['bytes'] | |
| file_id = file_item['id'] | |
| file_name = file_item['path'] | |
| code_files.append((cont_id, file_id, file_name)) | |
| except: | |
| pass | |
| elif output.type == 'function_call': | |
| if output.name == 'get_distance': | |
| args = json.loads(output.arguments) | |
| distance = get_distance(args['addr1'], args['addr2']) | |
| inputs.append({ | |
| "type": "function_call_output", | |
| "call_id": f"{output.call_id}", | |
| "output": f"{float(distance):.2f}", | |
| } ) | |
| need_turn = True | |
| continue | |
| elif output.type == 'image_generation_call': | |
| if len(output.result) > 500 and not image_done: | |
| image_done = True; | |
| image_data = base64.b64decode(output.result) | |
| fpath = dataDir + user_window + '.png' | |
| with open(fpath,'wb') as fp: | |
| fp.write(image_data) | |
| image_out = gr.Image(visible=True, value=fpath) | |
| elif isBoss and output.type == 'reasoning': | |
| for item in output.summary: | |
| reasoning += f'\nReasoning: {item.text}' | |
| do_file_download = False | |
| ext = '' | |
| backup_image = None | |
| if len(ann_files) > 0: | |
| (cont_id, file_id, file_name) = ann_files[-1] | |
| ext = file_name.split('.')[-1].casefold() | |
| do_file_download = True | |
| elif len(code_files) > 0: | |
| for i in range(len(code_files)): | |
| (cont_id, file_id, file_name) = code_files[i] | |
| if file_name.casefold().find('access') >= 0: | |
| continue | |
| ext = file_name.split('.')[-1].casefold() | |
| if ext == 'png': | |
| if not image_done: | |
| backup_image = code_files[i] | |
| else: | |
| do_file_download = True | |
| break | |
| if not do_file_download and not image_done and backup_image: | |
| (cont_id, file_id, file_name) = backup_image | |
| fpath = dataDir + user_window + '.png' | |
| image_data = get_openai_file(file_id, cont_id).content | |
| with open(fpath,'wb') as fp: | |
| fp.write(image_data) | |
| image_out = gr.Image(visible=True, value=fpath) | |
| if do_file_download: | |
| fpath = dataDir + user_window + '.' + ext | |
| try: | |
| data = get_openai_file(file_id, cont_id).content | |
| with open(fpath,'wb') as fp: | |
| fp.write(data) | |
| file_name = os.path.basename(file_name) | |
| file_out = gr.DownloadButton(label='Download '+ file_name, visible=True, value=fpath) | |
| except: | |
| text += f'\nUnable to load code-generated file: {file_name}' | |
| # text += '\nIf a download link is given above, ignore it. Use the button below' | |
| if need_turn: | |
| # past.append(md(prompt)) | |
| past.append((result.id, container_id)) | |
| continue | |
| out_text = "\n".join(line for line in text.splitlines() if | |
| 'download' not in line.casefold()) | |
| res = md("\n\n***YOU***: " + prompt + "\n\n***GPT***: " + out_text + '\n' + reasoning) | |
| response += res | |
| past.append(res) | |
| past.append((result.id, container_id)) | |
| tokens_in = result.usage.input_tokens | |
| tokens_out = result.usage.output_tokens | |
| dataFile = new_func(user_window) | |
| with open(dataFile, 'a') as f: | |
| f.write(f'{user_window}:{tokens_in}/{tokens_out}-4omini\n') | |
| if isBoss: | |
| response += md(f"\n\ngpt-5-mini: tokens in/out = {tokens_in}/{tokens_out}\n") | |
| return [past, response , None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| if mode == 'Search': | |
| loc = prompt.find('q:') | |
| if loc > -1: | |
| query = prompt[loc+2:] | |
| prompt = prompt[0:loc] | |
| augmented_prompt = prompt | |
| finish_reason = 'ok' | |
| if prompt.lower().startswith('dsr1 '): | |
| deepseek = True | |
| ds_model = 'deepseek-ai/DeepSeek-R1' | |
| prompt = prompt[5:] | |
| elif prompt.lower().startswith('ds1.5 '): | |
| deepseek = True | |
| ds_model = 'deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B' | |
| prompt = prompt[6:] | |
| elif prompt.lower().startswith('ds14 '): | |
| deepseek = True | |
| ds_model = 'deepseek-ai/DeepSeek-R1-Distill-Qwen-14B' | |
| prompt = prompt[5:] | |
| elif prompt.lower().startswith('ds70 '): | |
| deepseek = True | |
| ds_model = 'deepseek-ai/DeepSeek-R1-Distill-Llama-70B' | |
| prompt = prompt[5:] | |
| elif prompt.lower().startswith('ds70g '): | |
| deepseek = True | |
| using_groq = True | |
| ds_model = 'deepseek-r1-distill-llama-70b' | |
| prompt = prompt[6:] | |
| elif prompt.lower().startswith('o1m '): | |
| reasoning = True | |
| gptModel = 'o1-mini' | |
| prompt = prompt[4:] + \ | |
| '. Provide a detailed step-by-step description of your reasoning. Do not use Latex for math expressions.' | |
| elif prompt.lower().startswith('solve'): | |
| prompt = 'How do I solve ' + prompt[5:] + ' Do not use Latex for math expressions.' | |
| chatType = 'math' | |
| elif prompt.lower().startswith('puzzle'): | |
| chatType = 'logic' | |
| prompt = prompt[6:] | |
| if deepseek: | |
| prompt = prompt + '. Do not use Latex for math expressions.' | |
| if past == []: | |
| if mode == 'News': | |
| if news_interval != "None": | |
| news = get_brave_news(prompt, news_interval) | |
| augmented_prompt = f'{news}\n{prompt}\nGive highest priority to information just provided\n' | |
| augmented_prompt += 'Mention item source and item age for each item used\n' | |
| elif mode == 'Search': | |
| news = get_brave_search_results(prompt) | |
| augmented_prompt = f'{news}\nThe topic is: {prompt}\nGive highest priority to information just provided\n' | |
| augmented_prompt += ' \n' + query | |
| augmented_prompt += ' \n Do not use Latex for math expressions.' | |
| past.append({"role":"user", "content":augmented_prompt}) | |
| gen_image = (uploaded_image_file != '') | |
| if chatType in special_chat_types: | |
| (reply, tokens_in, tokens_out, tokens) = solve(prompt, chatType) | |
| final_text = reply | |
| reply = md(reply) | |
| reporting_model = image_gen_model | |
| elif not gen_image: | |
| if deepseek: | |
| if using_groq: | |
| client = OpenAI(api_key=GROQ_KEY, base_url='https://api.groq.com/openai/v1') | |
| completion = client.chat.completions.create( | |
| temperature=0.6, | |
| model= ds_model, | |
| messages=past, | |
| ) | |
| reporting_model='deepseek70-groq' | |
| else: | |
| client = OpenAI(api_key=DEEPSEEK_KEY, base_url='https://api.together.xyz/v1') | |
| completion = client.chat.completions.create( | |
| temperature=0.6, | |
| model= ds_model, | |
| messages=past, | |
| max_tokens=16000 | |
| ) | |
| reporting_model='deepseek-together-' + ds_model[-3:].replace('.5B','1.5B') | |
| if completion.choices[0].finish_reason == 'length': | |
| finish_reason = "Truncated due to token limit" | |
| else: | |
| completion = Client().chat.completions.create(model=gptModel, | |
| messages=past) | |
| reporting_model = gptModel | |
| else: | |
| (completion, msg) = analyze_image(user_window, image_gen_model, prompt) | |
| uploaded_image_file= '' | |
| reporting_model = image_gen_model | |
| if not msg == 'ok': | |
| return [past, msg, None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| if not chatType in special_chat_types: | |
| reply = completion.choices[0].message.content | |
| # if 'groq' in reporting_model: | |
| if deepseek: | |
| reply = convert_latex_math(reply) | |
| final_text = reply | |
| if deepseek: | |
| loc1 = reply.find('<think>') | |
| if loc1 > -1: | |
| loc2 = reply.find('</think>') | |
| if loc2 > loc1: | |
| final_text = reply[loc2 + 8:] | |
| reply = reply.replace('<think>','\n***Thinking***\n').replace('</think>','\n***Done thinking***\n') | |
| tokens_in = completion.usage.prompt_tokens | |
| tokens_out = completion.usage.completion_tokens | |
| tokens = completion.usage.total_tokens | |
| if len(query) > 0: | |
| prompt = 'Search topic = ' + prompt + ', query = ' + query | |
| response += "\n\n***YOU***: " + prompt + "\n\n***GPT***: " + reply.replace('```','\n\n```\n\n') | |
| if isBoss: | |
| response += md(f"\n\n{reporting_model}: tokens in/out = {tokens_in}/{tokens_out}\n") | |
| if finish_reason != 'ok': | |
| response += md(f"\n{finish_reason}\n") | |
| if tokens > 40000: | |
| response += "\n\nTHIS DIALOG IS GETTING TOO LONG. PLEASE RESTART CONVERSATION SOON." | |
| past.append({"role":"assistant", "content": final_text}) | |
| if not deepseek and not reasoning: | |
| accessOk = False | |
| for i in range(3): | |
| try: | |
| dataFile = new_func(user_window) | |
| with open(dataFile, 'a') as f: | |
| m = '4o' | |
| if 'mini' in reporting_model: | |
| m = '4omini' | |
| f.write(f'{user_window}:{tokens_in}/{tokens_out}-{m}\n') | |
| accessOk = True | |
| break | |
| except Exception as e: | |
| sleep(3) | |
| if not accessOk: | |
| response += f"\nDATA LOG FAILED, path = {dataFile}" | |
| return [past, response , None, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| else: | |
| return [[], "User name and/or password are incorrect", prompt, gptModel, uploaded_image_file, plot, image_out, file_out, uploaded_file_path] | |
| def new_func(user): | |
| dataFile = dataDir + user + '_log.txt' | |
| return dataFile | |
| def image_count_path(user): | |
| fpath = dataDir + user + '_image_count.txt' | |
| return fpath | |
| def transcribe(user, pwd, fpath): | |
| user = user.lower().strip() | |
| pwd = pwd.lower().strip() | |
| if not (user in unames and pwd in pwdList): | |
| return 'Bad credentials' | |
| with audioread.audio_open(fpath) as audio: | |
| duration = int(audio.duration) | |
| if duration > 0: | |
| with open(dataDir + user + '_audio.txt','a') as f: | |
| f.write(f'audio:{str(duration)}\n') | |
| with open(fpath,'rb') as audio_file: | |
| transcript = Client().audio.transcriptions.create( | |
| model='whisper-1', file = audio_file ,response_format = 'text' ) | |
| reply = transcript | |
| return str(reply) | |
| def pause_message(): | |
| return "Audio input is paused. Resume or Stop as desired" | |
| # def gen_output_audio(txt): | |
| # if len(txt) < 10: | |
| # txt = "This dialog is too short to mess with!" | |
| # response = Client().audio.speech.create(model="tts-1", voice="fable", input=txt) | |
| # with open(speak_file, 'wb') as fp: | |
| # fp.write(response.content) | |
| # return speak_file | |
| # def set_speak_button(txt): | |
| # vis = False | |
| # if txt and len(txt) > 2: | |
| # vis = True | |
| # return gr.Button(visible=vis) | |
| def update_user(user_win): | |
| user_win = user_win.lower().strip() | |
| user = 'unknown' | |
| for s in unames: | |
| if user_win == s: | |
| user = s | |
| break | |
| return [user, user] | |
| def speech_worker(chunks=[],q=[]): | |
| for chunk in chunks: | |
| fpath = q.pop(0) | |
| response = Client().audio.speech.create(model="tts-1", voice="fable", input=chunk, speed=0.85, response_format='wav') | |
| with open(fpath, 'wb') as fp: | |
| fp.write(response.content) | |
| def gen_speech_file_names(user, cnt): | |
| rv = [] | |
| for i in range(0, cnt): | |
| rv.append(dataDir + f'{user}_speech{i}.wav') | |
| return rv | |
| def final_clean_up(user, do_b64 = False): | |
| user = user.strip().lower() | |
| if user == 'kill': | |
| flist = glob(dataDir + '*') | |
| elif user == 'all': | |
| flist = glob(dataDir + '*_speech*.wav') | |
| if do_b64: | |
| flist.extend(glob(dataDir + '*.b64')) | |
| else: | |
| flist = glob(dataDir + f'{user}_speech*.wav') | |
| if do_b64: | |
| flist.append(dataDir + user + '_image.b64') | |
| for fpath in flist: | |
| try: | |
| os.remove(fpath) | |
| except: | |
| continue | |
| def delete_image(user): | |
| fpath = dataDir + user + '.png' | |
| if os.path.exists(fpath): | |
| os.remove(fpath) | |
| def list_permanent_files(): | |
| flist = os.listdir(dataDir) | |
| others = [] | |
| log_cnt = 0 | |
| wav_cnt = 0 | |
| other_cnt = 0 | |
| list_logs = [] | |
| for fpath in flist: | |
| if fpath.endswith('.txt'): | |
| log_cnt += 1 | |
| list_logs.append(fpath) | |
| elif fpath.endswith('.wav'): | |
| wav_cnt += 1 | |
| else: | |
| others.append(fpath) | |
| other_cnt = len(others) | |
| if log_cnt > 5: | |
| list_logs = [] | |
| return (str(log_cnt), str(wav_cnt), str(other_cnt), str(others), list_logs) | |
| def make_image(prompt, user, pwd): | |
| user = user.lower().strip() | |
| msg = 'Error: unable to create image.' | |
| fpath = None | |
| model = 'dall-e-2' | |
| quality = 'standard' | |
| size = '512x512' | |
| if user in unames and pwd == pwdList[unames.index(user)]: | |
| if len(prompt.strip()) == 0: | |
| return [gr.Image(value=None, visible=False), 'You must provide a prompt describing image you desire'] | |
| if prompt.startswith('hd '): | |
| prompt = prompt[3:] | |
| model = 'gpt-image-1' #'dall-e-3' | |
| size = '1024x1024' | |
| quality = 'high' #hd' | |
| try: | |
| response = Client().images.generate(model=model, prompt=prompt,size=size, | |
| quality=quality) # response_format='b64_json', | |
| except Exception as ex: | |
| msg = ex.message | |
| return [gr.Image(visible=False, value=None), msg] | |
| else: | |
| try: | |
| response = Client().images.generate(model=model, prompt=prompt,size=size, | |
| response_format='b64_json') | |
| except Exception as ex: | |
| msg = ex.message | |
| return [gr.Image(visible=False, value=None), msg] | |
| if len(response.data) == 0: | |
| msg = "OpenAI returned no image data" | |
| return [gr.Image(visible=False, value=None), msg] | |
| try: | |
| image_data = response.data[0].b64_json | |
| with Image.open(BytesIO(base64.b64decode(image_data))) as image: | |
| fpath = dataDir + user + '.png' | |
| image.save(fpath) | |
| with open(image_count_path(user), 'at') as fp: | |
| if quality == 'hd': | |
| fp.write('hd\n') | |
| else: | |
| fp.write('1\n') | |
| msg = 'Image created!' | |
| except: | |
| return [gr.Image(visible=False, value=None), msg] | |
| else: | |
| msg = 'Incorrect user name or password' | |
| return [gr.Image(visible=False, value=None), msg] | |
| return [gr.Image(visible=True, value=fpath), msg] | |
| def show_help(): | |
| txt = ''' | |
| 1. Gemeral: | |
| 1.1 Login with user name and password (not case-sensitive) | |
| 1.2 Type prompts (questions, instructions) into "Prompt or Question" window (OR) you can speak prompts by | |
| tapping the audio "Record" button, saying your prompt, then tapping the "Stop" button. | |
| Your prompt will appear in the Prompt window, and you can edit it there if needed. | |
| 1.3 Text in the "Dialog" window can be spoken by tapping the "Speak Dialog" button. | |
| 2. Select Mode: | |
| 2.1 Chat mode interacts with the GPT model with info limited to when last trained. | |
| 2.2 News mode searches the internet for news posted within the period selected in "News Window" | |
| 2.3 Search mode searches the internet based on prompt as topic. Optionally if you prompt with | |
| \<topic\> **q:** \<question\>, it searches topic and answers question based on search results. | |
| 3. Chat: | |
| 3.1 Enter prompt and tap the "Submit Prompt/Question" button. The responses appear in the Dialog window. | |
| 3.2 Enter follow-up questions in the Prompt window either by typing or speaking. Tap the voice | |
| entry "Reset Voice Entry" button to enable additional voice entry. Then tap "Submit Prompt/Question". | |
| 3.3 If topic changes or when done chatting, tap the "Restart Conversation" button. | |
| 4. Solve math equations or logic problems providing step-by-step analysis, using Chat mode: | |
| 4.1 Math: Make "solve" the first word in your prompt, followed by the equation, e.g., x^2 - x + 1 = 0 | |
| 4.2 Logic: Make "puzzle" the first word in your prompt, followed by a detailed description of a logic | |
| problem with the answer(s) you desire. | |
| 5. Make Image: | |
| 5.1 Enter description of desired image in prompt window via either typing or voice entry | |
| 5.2 Tap the "Make Image" button. This can take a few seconds. | |
| 5.3 There is a download button on the image display if your system supports file downloads. | |
| 5.4 When done viewing image, tap the "Restart Conversation" button | |
| 6. Analyze an Image you provide: | |
| 6.1 Enter what you want to know about the image in the prompt window. You can include instructions | |
| to write a poem about something in the image, for example. Or just say "what's in this image?" | |
| 6.2 Tap the "Upload Image to Analyze" button. | |
| 6.3 An empty image box will appear lower left. Drag or upload image into it. It offers web cam or camera | |
| input also. | |
| 6.4 The image should appear. This can take some time with a slow internet connection and large image. | |
| 6.5 Tap the "Submit Prompt/Question" button to start the analysis. This initiates a chat dialog and | |
| you can ask follow-up questions. However, the image is not re-analyzed for follow-up dialog. | |
| Hints: | |
| 1. Better chat and image results are obtained by including detailed descriptions and instructions | |
| in the prompt. | |
| 2. Always tap "Restart Conversation" before requesting an image or changing topics. | |
| 3. Audio input and output functions depend on the hardware capability of your device. | |
| 4. "Speak Dialog" will voice whatever is currently in the Dialog window. You can repeat it and you | |
| can edit what's to be spoken. Except: In a chat conversation, spoken dialog will only include | |
| the latest prompt/response ("YOU:/GPT:") sequence.''' | |
| return str(txt).replace('```', ' ').replace(' ', ' ').replace(' ', ' ').replace(' ', ' ').replace('\n','<br>') | |
| def upload_image(prompt, user, password, mode): | |
| if not (user in unames and password == pwdList[unames.index(user)]): | |
| return [gr.Image(visible=False, interactive=True), "Incorrect user name and/or password"] | |
| if len(prompt) < 3 and mode != 'Advanced': | |
| return [gr.Image(visible=False, interactive=True), "You must provide prompt/instructions (what to do with the image)"] | |
| return [gr.Image(visible=True, interactive=True), ''] | |
| def load_image(image, user): | |
| status = 'OK, image is ready! Tap "Submit Prompt/Question" to start analyzing' | |
| try: | |
| with open(image, 'rb') as image_file: | |
| base64_image = base64.b64encode(image_file.read()).decode('utf-8') | |
| fpath = dataDir + user + '_image.b64' | |
| with open(fpath, 'wt') as fp: | |
| fp.write(base64_image) | |
| except: | |
| status = 'Unable to upload image' | |
| return [fpath, status] | |
| def analyze_image(user, model, prompt): | |
| status = 'ok' | |
| try: | |
| with open(dataDir + user + '_image.b64', 'rt') as fp: | |
| base64_image = fp.read() | |
| except: | |
| status = "base64 image file not found" | |
| return [None, status] | |
| completion = Client().chat.completions.create( | |
| model=model, | |
| messages=[ | |
| { "role": "user", | |
| "content": [ | |
| { | |
| "type": "text", | |
| "text": prompt | |
| }, | |
| { | |
| "type": "image_url", | |
| "image_url": { | |
| "url": f"data:image/jpeg;base64,{base64_image}", | |
| "detail": "high" | |
| } | |
| } | |
| ] | |
| } | |
| ], | |
| max_tokens= 500 | |
| ) | |
| # response = completion.choices[0].message.content | |
| return [completion, status] | |
| def mode_change(mode): | |
| if mode != "News": | |
| return gr.Dropdown(visible=False) | |
| else: | |
| return gr.Dropdown(visible=True, value='pd') | |
| def upload_file(user, password): | |
| if not (user in unames and password == pwdList[unames.index(user)]): | |
| return [gr.File(visible=False, label='Upload File'), 'Incorrect user and/or password'] | |
| return [gr.File(visible=True, label='UploadFile'), ''] | |
| def load_file(file, user): | |
| fname = os.path.basename(file.name) | |
| out_path = dataDir + user + '-' + fname | |
| with open(file.name, 'rb') as fp: | |
| data = fp.read() | |
| with open(out_path, 'wb') as fp2: | |
| fp2.write(data) | |
| return [out_path, f'File {fname} uploaded\n'] | |
| # return [value | |
| # outputs=[uploaded_file_path, output_window] | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| history = gr.State([]) | |
| password = gr.State("") | |
| user = gr.State("unknown") | |
| model = gr.State('gpt-4o-mini') #"gpt-4o-mini") 'gpt-5-mini' | |
| q = gr.State([]) | |
| qsave = gr.State([]) | |
| uploaded_image_file = gr.State('') | |
| uploaded_file_path = gr.State('') | |
| def clean_up(user): | |
| flist = glob(dataDir + f'{user}_speech*.wav') | |
| for fpath in flist: | |
| try: | |
| os.remove(fpath) | |
| except: | |
| continue | |
| def initial_audio_output(txt, user): | |
| global digits | |
| global abbrevs | |
| if not user in unames: | |
| return [gr.Audio(sources=None), []] | |
| clean_up(user) | |
| q = [] | |
| if len(txt.strip()) < 5: | |
| return ['None', q] | |
| try: | |
| loc = txt.rindex('YOU:') | |
| txt = txt[loc:] | |
| except: | |
| pass | |
| for s,x in abbrevs.items(): | |
| txt = txt.replace(s, x) | |
| words_in = txt.replace('**', '').replace(' ','').split('<br>') | |
| words_out = [] | |
| for s in words_in: | |
| s = s.lstrip('- *@#$%^&_=+-') | |
| if len(s) > 0: | |
| loc = s.find(' ') | |
| if loc > 1: | |
| val = s[0:loc] | |
| isnum = val.replace('.','0').isdecimal() | |
| if isnum: | |
| if val.endswith('.'): | |
| val = val[:-1].replace('.',' point ') + '., ' | |
| else: | |
| val = val.replace('.', ' point ') + ', ' | |
| s = 'num'+ val + s[loc:] | |
| words_out.append(s) | |
| chunklist = [] | |
| for chunk in words_out: | |
| if chunk.strip() == '': | |
| continue | |
| isnumbered = chunk.startswith('num') | |
| number = '' | |
| loc = 0 | |
| if isnumbered: | |
| chunk = chunk[3:] | |
| loc = chunk.index(',') | |
| number = chunk[0:loc] | |
| chunk = chunk[loc:] | |
| locs = [] | |
| for i in range(1,len(chunk)-1): | |
| (a, b, c) = chunk[i-1:i+2] | |
| if a.isdecimal() and b == '.' and c.isdecimal(): | |
| locs.append(i) | |
| for i in locs: | |
| chunk = chunk[:i] + ' point ' + chunk[i+1:] | |
| if len(chunk) > 50: | |
| finechunks = chunk.split('.') | |
| for fchunk in finechunks: | |
| if isnumbered: | |
| fchunk = number + fchunk | |
| isnumbered = False | |
| if len(fchunk) > 0: | |
| if fchunk != '"': | |
| chunklist.append(fchunk) | |
| else: | |
| line = number + chunk | |
| if line != '"': | |
| chunklist.append(line) | |
| total_speech = 0 | |
| for chunk in chunklist: | |
| total_speech += len(chunk) | |
| with open(dataDir + user + '_speech.txt','a') as f: | |
| f.write(f'speech:{str(total_speech)}\n') | |
| chunk = chunklist[0] | |
| if chunk.strip() == '': | |
| return gr.Audio(sources=None) | |
| fname_list = gen_speech_file_names(user, len(chunklist)) | |
| q = fname_list.copy() | |
| qsave = fname_list.copy() | |
| fname = q.pop(0) | |
| if len(chunklist) > 0: | |
| threading.Thread(target=speech_worker, daemon=True, args=(chunklist[1:],fname_list[1:])).start() | |
| response = Client().audio.speech.create(model="tts-1", voice="fable", input=chunk, speed=0.85, response_format='wav') | |
| with open(fname, 'wb') as fp: | |
| fp.write(response.content) | |
| return [fname, q] | |
| def gen_output_audio(q, user): | |
| try: | |
| fname = q.pop(0) | |
| except: | |
| final_clean_up(user) | |
| return [None, gr.Audio(sources=None)] | |
| if not os.path.exists(fname): | |
| sleep(3) | |
| if not os.path.exists(fname): | |
| response = Client().audio.speech.create(model="tts-1", voice="fable", | |
| input='Sorry, text-to-speech is responding too slow right now', speed=0.85, response_format='wav') | |
| with open(fname, 'wb') as fp: | |
| fp.write(response.content) | |
| q = [] | |
| return [fname, q] | |
| gr.Markdown('# GPT Chat') | |
| gr.Markdown('Enter user name & password. Tap "Help & Hints" button for more instructions.') | |
| with gr.Row(): | |
| user_window = gr.Textbox(label = "User Name") | |
| user_window.blur(fn=update_user, inputs=user_window, outputs=[user, user_window]) | |
| pwd_window = gr.Textbox(label = "Password") | |
| help_button = gr.Button(value='Help & Hints') | |
| with gr.Row(): | |
| audio_widget = gr.Audio(type='filepath', format='wav',waveform_options=gr.WaveformOptions( | |
| show_recording_waveform=True), sources=['microphone'], scale = 3, label="Prompt/Question Voice Entry") # , max_length=120) | |
| reset_button = gr.ClearButton(value="Reset Voice Entry", scale=1) #new_func1() | |
| with gr.Row(): | |
| clear_button = gr.Button(value="Restart Conversation") | |
| # gpt_chooser=gr.Radio(choices=[("GPT-3.5","gpt-3.5-turbo"),("GPT-4o","gpt-4o-mini")], | |
| # value="gpt-3.5-turbo", label="GPT Model", interactive=True) | |
| button_do_image = gr.Button(value='Make Image') | |
| button_upload_file = gr.Button(value='Upload Input File') | |
| button_get_image = gr.Button(value='Upload Image to Analyze') | |
| speak_output = gr.Button(value="Speak Dialog", visible=True) | |
| submit_button = gr.Button(value="Submit Prompt/Question") | |
| with gr.Row(): | |
| prompt_window = gr.Textbox(label = "Prompt or Question", scale=7) | |
| mode = gr.Dropdown(choices=[ 'Chat', 'News', 'Search'], label='Mode', scale=1, interactive=True) | |
| news_period = gr.Dropdown(choices=news_interval_choices, | |
| interactive=True,label='News Window',scale=1, visible=False) | |
| gr.Markdown('### **Dialog:**') | |
| #output_window = gr.Text(container=True, label='Dialog') | |
| output_window = gr.Markdown(container=True) | |
| file_download = gr.DownloadButton(label='Download File', visible=False, value=None) | |
| with gr.Row(): | |
| with gr.Column(): | |
| image_window2 = gr.Image(visible=False, interactive=True, label='Image to Analyze', type='filepath') | |
| with gr.Column(): | |
| image_window = gr.Image(visible=False, label='Generated Image') | |
| with gr.Row(): | |
| file_uploader = gr.File(visible=False, label='Upload File', type='filepath') | |
| with gr.Row(): | |
| plot = gr.LinePlot(test_plot_df(), x="month", y="value", visible=False, label="Portfolio Value History") | |
| submit_button.click(chat, | |
| inputs=[prompt_window, user_window, password, history, output_window, model, | |
| uploaded_image_file, plot, news_period, mode, uploaded_file_path], | |
| outputs=[history, output_window, prompt_window, model, uploaded_image_file, plot, | |
| image_window, file_download, uploaded_file_path]) | |
| clear_button.click(fn=new_conversation, inputs=user_window, | |
| outputs=[prompt_window, history, output_window, image_window, image_window2, | |
| uploaded_image_file, plot, news_period, mode, file_download, uploaded_file_path, | |
| file_uploader]) | |
| audio_widget.stop_recording(fn=transcribe, inputs=[user_window, password, audio_widget], | |
| outputs=[prompt_window]) | |
| audio_widget.pause_recording(fn=pause_message, outputs=[prompt_window]) | |
| reset_button.add(audio_widget) | |
| audio_out = gr.Audio(autoplay=True, visible=False) | |
| audio_out.stop(fn=gen_output_audio, inputs=[q, user_window], outputs = [audio_out, q]) | |
| speak_output.click(fn=initial_audio_output, inputs=[output_window, user_window], outputs=[audio_out, q]) | |
| # output_window.change(fn=set_speak_button, inputs=output_window,outputs=speak_output) | |
| button_do_image.click(fn=make_image, inputs=[prompt_window,user_window, password],outputs=[image_window, output_window]) | |
| image_window.change(fn=delete_image, inputs=[user]) | |
| help_button.click(fn=show_help, outputs=output_window) | |
| button_get_image.click(fn=upload_image,inputs = [prompt_window, user, password, mode], | |
| outputs = [image_window2, output_window]) | |
| image_window2.upload(fn=load_image, inputs=[image_window2, user], outputs=[uploaded_image_file, output_window]) | |
| mode.change(fn=mode_change, inputs=mode,outputs=news_period) | |
| pwd_window.blur(updatePassword, inputs = pwd_window, outputs = [password, pwd_window, mode]) | |
| button_upload_file.click(fn=upload_file, inputs=[user, password], | |
| outputs=[file_uploader, output_window]) | |
| file_uploader.upload(fn=load_file, inputs=[file_uploader, user], outputs=[uploaded_file_path, output_window]) | |
| # demo.unload(final_clean_up(user)) | |
| demo.launch(share=True, allowed_paths=[dataDir], ssr_mode=False) | |