"""Gradio app that reports data / AI valuation figures for a stock ticker.

Market data is read from Twelve Data first and from yfinance when the
Twelve Data call fails. Valuation multipliers, product definitions and
formulas are read from the ``kb`` schema of a PostgreSQL database.

Several helpers read the module-level ``td`` client and
``connection_string`` that are defined further down in this file; they are
resolved at call time, after the module has finished loading.
"""

import gradio as gr
from twelvedata import TDClient
import os
import pandas as pd
import json
import yfinance as yf
import math
import time
import datetime

# from dotenv import load_dotenv
# load_dotenv('.env')

# Rate-limit settings applied per caller in get_platform_data_valution.
MAX_REQUESTS_PER_WINDOW = 20  # Maximum number of requests
RATE_LIMIT_WINDOW_SECONDS = 24 * 3600  # Timeframe in seconds (24 hours)

# Share of unattributed intangibles reported as "brand".
BRAND_SHARE_OF_INTANGIBLES = 0.345


def valid_ticker(td, ticker):
    """Return True when ``ticker`` is known to Twelve Data or yfinance.

    Shows a Gradio warning and returns False when neither source
    recognises the ticker.
    """
    try:
        td.get_logo(symbol=ticker).as_json()
        return True
    except Exception:
        # Best-effort: any Twelve Data failure falls through to yfinance.
        pass

    try:
        info = yf.Ticker(ticker).info
    except Exception:
        info = None

    # The original heuristic: an info dict with exactly one entry is
    # treated as "ticker not found".
    if info is not None and len(info) != 1:
        return True

    gr.Warning("Please enter valid ticker!")
    return False


def _get_valuation_metric(td, ticker, td_key, yf_key):
    """Read one valuation metric from Twelve Data, falling back to yfinance.

    Returns 0 when the selected source does not carry the key.
    """
    try:
        statistics = td.get_statistics(symbol=ticker).as_json()
        return statistics['statistics']['valuations_metrics'].get(td_key, 0)
    except Exception:
        # Best-effort fallback to the secondary data source.
        return yf.Ticker(ticker).info.get(yf_key, 0)


def get_enterprise_value(td, ticker):
    """Return the enterprise value for ``ticker`` (0 if unavailable)."""
    return _get_valuation_metric(td, ticker, 'enterprise_value', 'enterpriseValue')


def get_market_cap(td, ticker):
    """Return the market capitalization for ``ticker`` (0 if unavailable)."""
    return _get_valuation_metric(td, ticker, 'market_capitalization', 'marketCap')


def get_total_revenue(td, ticker):
    """Return total revenue from the first income statement entry as an int.

    Uses the second entry when the first one has no value. Falls back to
    yfinance when the Twelve Data lookup fails for any reason.
    """
    try:
        statements = td.get_income_statement(symbol=ticker).as_json()['income_statement']
        total_revenue = statements[0]['sales']
        if total_revenue is not None:
            return int(total_revenue)
        return int(statements[1]['sales'])
    except Exception:
        income = yf.Ticker(ticker).income_stmt
        total_revenue = income.iloc[:, 0]['Total Revenue']
        if not math.isnan(total_revenue):
            return int(total_revenue)
        return int(income.iloc[:, 1]['Total Revenue'])


def get_selling_general_admin_exp(td, ticker):
    """Return selling, general and administrative expense.

    Uses the first income statement entry, or the second when the first
    has no value. The result is an int whenever a value exists; when the
    second entry is missing too, the missing marker (``None`` / NaN) is
    returned unchanged.
    """
    try:
        statements = td.get_income_statement(symbol=ticker).as_json()['income_statement']
        expense = statements[0]['operating_expense']['selling_general_and_administrative']
        if expense is not None:
            return int(expense)
        previous = statements[1]['operating_expense']['selling_general_and_administrative']
        # Cast for consistency with the first-entry branch.
        return int(previous) if previous is not None else None
    except Exception:
        income = yf.Ticker(ticker).income_stmt
        label = 'Selling General And Administration'
        expense = income.iloc[:, 0][label]
        if not math.isnan(expense):
            return int(expense)
        previous = income.iloc[:, 1][label]
        return previous if math.isnan(previous) else int(previous)


def get_valuation(td, ticker, driver):
    """Return ``(value, currency, company_name)`` for a valuation driver.

    ``driver`` must be ``'Market Capitalization'`` or ``'Enterprise Value'``.

    Raises:
        ValueError: for any other driver (previously the function returned
            ``None`` and callers failed while unpacking the result).
    """
    try:
        meta = td.get_statistics(symbol=ticker).as_json()['meta']
        currency_symbol = meta.get('currency')
        company_name = meta.get('name')
    except Exception:
        info = yf.Ticker(ticker).info
        currency_symbol = info.get('currency')
        company_name = info.get('shortName')

    if driver == 'Market Capitalization':
        value = get_market_cap(td, ticker)
    elif driver == 'Enterprise Value':
        value = get_enterprise_value(td, ticker)
    else:
        raise ValueError(f"Unsupported valuation driver: {driver!r}")

    if value is None:
        value = 0
    return value, currency_symbol, company_name


def get_industry_name(td, ticker):
    """Return the ``ydc_template`` mapped to the ticker's industry.

    Reads the module-level ``connection_string``.

    Raises:
        gr.Error: when the industry has no row in ``kb.ai_industry_mapping``.
    """
    if ticker == 'AAPL':
        # Hard-coded override carried over from the original implementation.
        return 'Information Technology'

    try:
        profile = td.get_profile(symbol=ticker).as_json()
        # A missing industry raises here and triggers the yfinance fallback.
        industry = profile.get('industry').replace('—', ' - ')
    except Exception:
        industry = yf.Ticker(ticker).info.get('industry')

    # Parameterised: industry names come from an external API and may
    # contain quotes.
    query = (
        "SELECT ydc_template FROM kb.ai_industry_mapping aim "
        "WHERE twelve_data_industry = %(industry)s"
    )
    df = pd.read_sql(query, connection_string, params={'industry': industry})
    if df.empty:
        raise gr.Error(f"No industry template is mapped for industry '{industry}' ({ticker}).")
    return df['ydc_template'].iloc[0]


def get_balance_sheet(td, ticker):
    """Return ``(shareholders_equity, goodwill)`` from the first balance sheet entry.

    In the yfinance fallback, goodwill defaults to 0 when it cannot be read.
    """
    try:
        latest = td.get_balance_sheet(symbol=ticker).as_json()['balance_sheet'][0]
        goodwill = latest['assets']['non_current_assets']['goodwill']
        shareholders_equity = latest['shareholders_equity']['total_shareholders_equity']
        return shareholders_equity, goodwill
    except Exception:
        latest = yf.Ticker(ticker).balance_sheet.iloc[:, 0]
        try:
            goodwill = int(latest['Goodwill'])
        except Exception:
            # Row missing or NaN: treat as no goodwill.
            goodwill = 0
        shareholders_equity = int(latest['Total Equity Gross Minority Interest'])
        return shareholders_equity, goodwill


def get_data_valuation_from_industry(td, connection_string, ticker):
    """Estimate data valuation from the industry's average DMI.

    Returns ``(data_valuation, industry_dmi, ydc_template)`` where
    ``data_valuation`` is the driver value multiplied by ``avg_dmi``.

    Raises:
        gr.Error: when the template has no row in ``kb.template_dmi``.
    """
    ydc_template = get_industry_name(td, ticker)
    # 'Thrifts & Mortgage Finance' reuses the 'Banks' multipliers.
    template_name = 'Banks' if ydc_template == 'Thrifts & Mortgage Finance' else ydc_template

    industry_query = (
        "SELECT dmi_driver, avg_dmi FROM kb.template_dmi "
        "WHERE template_name = %(template_name)s"
    )
    industry = pd.read_sql(industry_query, connection_string, params={'template_name': template_name})
    if industry.empty:
        raise gr.Error(f"No DMI data is available for industry template '{template_name}'.")

    driver = industry['dmi_driver'].iloc[0]
    industry_dmi = industry['avg_dmi'].iloc[0]
    driver_valuation, _currency_code, _company_name = get_valuation(td, ticker, driver)
    data_valuation = driver_valuation * industry_dmi
    return int(data_valuation), industry_dmi, ydc_template


def get_data_value(connection_string, ticker):
    """Return ``(data_valuation, dmi, present_in_database)`` for ``ticker``.

    Uses the stored row in ``kb.company_dmi`` when there is one; otherwise
    derives the figures from the industry average (using the module-level
    ``td`` client).
    """
    query = "SELECT data_value, dmi FROM kb.company_dmi WHERE company_ticker = %(ticker)s"
    df = pd.read_sql(query, connection_string, params={'ticker': ticker})
    if df.empty:
        data_valuation, company_dmi, _ydc_template = get_data_valuation_from_industry(
            td, connection_string, ticker
        )
        return data_valuation, company_dmi, False
    return df['data_value'].iloc[0], df['dmi'].iloc[0], True


def get_value_by_key(json_data, key):
    """Depth-first search of nested dicts/lists for ``key``.

    Returns the first non-``None`` value found, or ``None`` when the key is
    absent (a key whose value is ``None`` at a nested level is skipped).
    """
    if isinstance(json_data, dict):
        if key in json_data:
            return json_data[key]
        children = json_data.values()
    elif isinstance(json_data, list):
        children = json_data
    else:
        return None

    for child in children:
        found = get_value_by_key(child, key)
        if found is not None:
            return found
    return None


def get_banks_average_dmi(connection_string, ydc_template='Banks'):
    """Return ``avg_dmi`` for ``ydc_template`` (defaults to ``'Banks'``).

    The template argument used to be ignored; it is now honoured, which is
    identical for the default value.
    """
    query = "SELECT avg_dmi FROM kb.template_dmi WHERE template_name = %(template_name)s"
    banks_data = pd.read_sql(query, connection_string, params={'template_name': ydc_template})
    return banks_data['avg_dmi'].iloc[0]


def _load_company_details(connection_string, ticker):
    """Return the ``details`` value stored for ``ticker``, or None if no row exists."""
    query = "SELECT details FROM kb.company_dmi WHERE company_ticker = %(ticker)s"
    records_json = pd.read_sql(query, connection_string, params={'ticker': ticker}).to_json(orient='records')
    # json.loads instead of eval: the payload is JSON (null/true/false are
    # not Python literals) and must never be executed.
    records = json.loads(records_json)
    return records[0]['details'] if records else None


def get_data_products_driver(connection_string, id, ticker, data_valuation, type,
                             sum_of_other_products_data, sum_of_other_products_ai):
    """Load the drivers of one product and fill in the missing driver values.

    Drivers whose ``driver_value`` is null are resolved by ``driver_name``:
    market data, the running product sums, the banks average DMI, or, for
    any other name, a lookup in the company's stored ``details``.

    Returns ``(drivers, values_by_code)`` where ``drivers`` is the list of
    driver records (without ``id``) and ``values_by_code`` maps
    ``driver_code`` to the resolved value. Uses the module-level ``td``.
    """
    driver_query = "SELECT * FROM kb.master_data_and_ai_products WHERE id = %(product_id)s"
    # str(): the id may be a NumPy integer, which the DB driver cannot adapt.
    driver_frame = pd.read_sql(driver_query, connection_string, params={'product_id': str(id)})
    drivers = json.loads(driver_frame.to_json(orient='records'))

    # Lazily evaluated so that only the drivers actually needed hit the APIs.
    resolvers = {
        'market_capitalization': lambda: get_market_cap(td, ticker),
        'enterprise_value': lambda: get_enterprise_value(td, ticker),
        'total_revenue': lambda: get_total_revenue(td, ticker),
        'sales': lambda: get_total_revenue(td, ticker),
        'selling_general_admin_exp': lambda: get_selling_general_admin_exp(td, ticker),
        'total_data_valuation': lambda: data_valuation,
        'banks_average_dmi': lambda: get_banks_average_dmi(connection_string, 'Banks'),
    }
    if type == 'Data':
        resolvers['sum_of_other_products'] = lambda: sum_of_other_products_data
    elif type == 'AI':
        resolvers['sum_of_other_products'] = lambda: sum_of_other_products_ai

    details_loaded = False
    company_details = None
    values_by_code = {}

    for driver in drivers:
        driver.pop('id')
        if driver['driver_value'] is None:
            resolver = resolvers.get(driver['driver_name'])
            if resolver is not None:
                driver['driver_value'] = resolver()
            else:
                # The details row depends only on the ticker: fetch it once.
                if not details_loaded:
                    company_details = _load_company_details(connection_string, ticker)
                    details_loaded = True
                driver['driver_value'] = get_value_by_key(company_details, driver['driver_name'])
        values_by_code[driver['driver_code']] = driver['driver_value']

    return drivers, values_by_code


def get_data_products_with_drivers(connection_string, ticker, present_in_database, data_valuation):
    """Evaluate every configured product formula for the ticker's industry.

    Returns ``(data_products, sum_of_ai_products)``, or ``(None, 0)`` when
    no product is configured for the industry. Uses the module-level ``td``.
    """
    ydc_template = get_industry_name(td, ticker)
    # Column name is chosen from two fixed literals, so interpolating it is safe.
    filter_column = 'in_db' if present_in_database else 'out_db'
    industries_query = (
        "SELECT * FROM kb.data_and_ai_products "
        f"WHERE industry = %(industry)s AND {filter_column} = 'Y'"
    )
    df = pd.read_sql(industries_query, connection_string, params={'industry': ydc_template})
    if df.empty:
        return None, 0

    data_products = []
    sum_of_other_products_data = 0
    sum_of_other_products_ai = 0
    for _, row in df.iterrows():
        product_id = row['id']
        product_name = row['product_name']
        product_type = row['product_type']
        formula = row['formula']

        driver_json, driver_code_value_json = get_data_products_driver(
            connection_string, product_id, ticker, data_valuation, product_type,
            sum_of_other_products_data, sum_of_other_products_ai,
        )
        # SECURITY NOTE(review): the formula text comes from the database and
        # is executed with eval(); anyone able to write to
        # kb.data_and_ai_products can run arbitrary code. Consider a
        # restricted expression evaluator.
        product_value = int(eval(formula, driver_code_value_json))

        if product_type == 'Data':
            sum_of_other_products_data += product_value
        else:
            sum_of_other_products_ai += product_value

        data_products.append({
            "product_name": product_name,
            "product_type": product_type,
            "product_value": product_value,
            "formula": formula,
            "drivers": driver_json,
        })

    return data_products, sum_of_other_products_ai


def get_data_products(td, connection_string, ticker, present_in_database):
    """Return the list of data products without per-driver formulas.

    For tickers present in the database the products are read from the
    stored ``analysis_data_product`` object; otherwise they are derived from
    ``kb.ai_data_product_mapping`` for the ticker's industry template.
    """
    if present_in_database:
        data_products_query = (
            "SELECT (json_each(details -> 'analysis_data_product')).key AS json_key, "
            "(json_each(details -> 'analysis_data_product')).value AS json_value "
            "FROM kb.company_dmi WHERE company_ticker = %(ticker)s"
        )
        df = pd.read_sql(data_products_query, connection_string, params={'ticker': ticker.upper()})
        return [
            {
                "product_name": key.replace('_', ' '),
                "type": "Data",
                "product_value": int(value),
            }
            for key, value in zip(df['json_key'], df['json_value'])
        ]

    data_valuation, _industry_dmi, ydc_template = get_data_valuation_from_industry(
        td, connection_string, ticker
    )
    data_products_query = (
        "SELECT data_product, data_valuation_driver, percentage, data_product_type "
        "FROM kb.ai_data_product_mapping WHERE ydc_template = %(ydc_template)s"
    )
    df = pd.read_sql(data_products_query, connection_string, params={'ydc_template': ydc_template})
    data_products = json.loads(df.to_json(orient='records'))

    for data_product in data_products:
        data_product['product_name'] = data_product.pop('data_product')
        data_valuation_driver = data_product.pop('data_valuation_driver')
        percentage = data_product.pop('percentage')
        data_product.pop('data_product_type')
        data_product['type'] = 'Data'

        if data_valuation_driver in ('Market Capitalization', 'Enterprise Value'):
            driver_valuation, _currency, _name = get_valuation(td, ticker, data_valuation_driver)
            data_product['product_value'] = int(driver_valuation * percentage)
        elif data_valuation_driver == 'Residual':
            # Whatever is left after the products valued so far; products
            # later in the list have no value yet and count as 0.
            allocated = sum(
                int(other['product_value'])
                for other in data_products
                if other.get('product_value', 0) != 0
            )
            data_product['product_value'] = int(data_valuation - allocated)
        elif data_valuation_driver == 'Data Value':
            data_product['product_value'] = int(data_valuation * percentage)

    return data_products


def _dmi_by_name(connection_string, query, name_column, dmi_column):
    """Run ``query`` and map ``name_column`` to ``dmi_column``; missing values become None."""
    df = pd.read_sql(query, connection_string)
    return {
        name: (None if pd.isna(dmi) else dmi)
        for name, dmi in zip(df[name_column], df[dmi_column])
    }


def get_industries_dmi(connection_string):
    """Return ``{template_name: avg_dmi}`` for every row of ``kb.template_dmi``."""
    industries_dmi_query = "SELECT template_name as industry, avg_dmi as dmi FROM kb.template_dmi"
    return _dmi_by_name(connection_string, industries_dmi_query, 'industry', 'dmi')


def get_sectors_dmi(connection_string):
    """Return ``{sector: avg_dmi}`` for every row of ``kb.sector_dmi``."""
    sectors_dmi_query = "SELECT sector, avg_dmi FROM kb.sector_dmi"
    return _dmi_by_name(connection_string, sectors_dmi_query, 'sector', 'avg_dmi')


def _json_default(value):
    """Let json.dumps serialise NumPy scalars that pandas lookups return."""
    if hasattr(value, 'item'):
        return value.item()
    raise TypeError(f"Object of type {value.__class__.__name__} is not JSON serializable")


def get_platform_data_valution(ticker, aws_percentage, azure_percentage, databricks_percentage,
                               google_cloud_percentage, snowflake_percentage,
                               other_cloud_platforms_percentage, on_premise_percentage,
                               request: gr.Request):
    """Gradio handler: build the JSON valuation report for ``ticker``.

    The seven percentage arguments describe how the company's data is split
    across platforms and must add up to 100.

    Returns:
        A JSON string, or ``None`` when the ticker is empty or unknown
        (a Gradio warning is shown in that case).

    Raises:
        gr.Error: when the caller exceeded the rate limit or the
            percentages do not add up to 100.
    """
    if ticker is None or not ticker.strip():
        gr.Warning("Please fill the ticker!")
        return None

    ticker = ticker.strip().upper()
    user_id = get_public_ip(request)
    if not rate_limit_check(user_id, MAX_REQUESTS_PER_WINDOW, RATE_LIMIT_WINDOW_SECONDS):
        raise gr.Error("Maximum number of hits exceeded!")

    if not valid_ticker(td, ticker):
        return None

    platforms = {
        "aws": aws_percentage,
        "azure": azure_percentage,
        "databricks": databricks_percentage,
        "google_cloud": google_cloud_percentage,
        "snowflake": snowflake_percentage,
        "other_cloud_platforms": other_cloud_platforms_percentage,
        "on_premise": on_premise_percentage,
    }
    # Tolerant comparison: slider values may be floats.
    if not math.isclose(sum(platforms.values()), 100, abs_tol=1e-6):
        raise gr.Error("Sum of all platform percentages should be equal to 100!")

    market_cap, currency_code, company_name = get_valuation(td, ticker, driver='Market Capitalization')
    enterprise_value = get_enterprise_value(td, ticker)
    data_valuation, company_dmi, present_in_database = get_data_value(connection_string, ticker)

    data_products, sum_of_ai_products = get_data_products_with_drivers(
        connection_string, ticker, present_in_database, data_valuation
    )
    if data_products is None:
        data_products = get_data_products(td, connection_string, ticker, present_in_database)

    # Platforms with a 0% share are left out of the report.
    platforms_percentage = {
        f"{platform}_data_valuation": int(data_valuation * percentage / 100)
        for platform, percentage in platforms.items()
        if percentage != 0
    }

    shareholders_equity, goodwill = get_balance_sheet(td, ticker)
    try:
        tangible_net_worth = shareholders_equity - goodwill
    except TypeError:
        # Goodwill not reported (None): use equity unchanged.
        tangible_net_worth = shareholders_equity
    unattributed_intangibles = market_cap - tangible_net_worth
    brand = int(BRAND_SHARE_OF_INTANGIBLES * unattributed_intangibles)

    result = {
        "info": {
            "company": company_name,
            "ticker": ticker,
            "currency": currency_code,
        },
        "company_dmi": company_dmi,
        "enterprise_value": enterprise_value,
        "market_cap": market_cap,
        "data_valuation": data_valuation,
        "ai_valuation": sum_of_ai_products,
        "data_products": data_products,
        "platform_data_valuation": platforms_percentage,
        "financial": {
            "shareholders_equity": shareholders_equity,
            "goodwill": goodwill,
            "tangible_net_worth": tangible_net_worth,
            "unattributed_intangibles": unattributed_intangibles,
            "brand": brand,
        },
        "industries_dmi": get_industries_dmi(connection_string),
        "sectors_dmi": get_sectors_dmi(connection_string),
        "note": {
            "disclaimer": "YDC analysis, U.S.-listed companies only"
        },
    }
    return json.dumps(result, indent=4, default=_json_default)


# Configuration is read from environment variables.
TWELVEDATA_API = os.environ.get('twelvedata_api')
DB_USER = os.environ.get("db_user")
DB_PASSWORD = os.environ.get("db_password")
DB_HOST = os.environ.get("db_host")
PORT = os.environ.get("db_port")
DB_NAME = os.environ.get("db_name")

td = TDClient(apikey=TWELVEDATA_API)
connection_string = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}?port={PORT}&dbname={DB_NAME}"


# Function to retrieve public IP address
def get_public_ip(request: gr.Request):
    """Return the caller's address used as the rate-limit key.

    Prefers the ``x-forwarded-for`` header; when it is absent (for example
    without a reverse proxy) falls back to the connection's client host,
    which may be ``None``.
    """
    forwarded_for = dict(request.headers).get('x-forwarded-for')
    if forwarded_for:
        return forwarded_for
    client = getattr(request, 'client', None)
    return getattr(client, 'host', None)


# Rate-limiting mechanism: user_id -> timestamps of accepted requests.
user_requests = {}


def rate_limit_check(user_id, max_requests, timeframe):
    """Record a request for ``user_id`` and report whether it is allowed.

    Returns False when ``max_requests`` requests were already accepted
    within the last ``timeframe`` seconds; otherwise records the request,
    logs the current count and returns True.
    """
    current_time = time.time()
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Drop expired timestamps first so the count reflects the window only.
    recent = [
        accepted_at
        for accepted_at in user_requests.get(user_id, [])
        if current_time - accepted_at < timeframe
    ]
    user_requests[user_id] = recent

    if len(recent) >= max_requests:
        return False  # Rate limit exceeded

    recent.append(current_time)
    print(f"[{timestamp}] {user_id} - {len(recent)}/{max_requests}")
    return True  # Rate limit not exceeded


ticker_input = gr.Textbox(label="Ticker (US Only)")
aws_percentage = gr.Slider(label="AWS Percentage", minimum=0, maximum=100)
azure_percentage = gr.Slider(label="Azure Percentage", minimum=0, maximum=100)
databricks_percentage = gr.Slider(label="Databricks Percentage", minimum=0, maximum=100)
google_cloud_percentage = gr.Slider(label="Google Cloud Percentage", minimum=0, maximum=100)
snowflake_percentage = gr.Slider(label="Snowflake Percentage", minimum=0, maximum=100)
other_cloud_platforms_percentage = gr.Slider(label="Other Cloud Platforms Percentage", minimum=0, maximum=100)
on_premise_percentage = gr.Slider(label="On-Premise Percentage", value=100, minimum=0, maximum=100)

demo = gr.Interface(
    fn=get_platform_data_valution,
    inputs=[
        ticker_input,
        aws_percentage,
        azure_percentage,
        databricks_percentage,
        google_cloud_percentage,
        snowflake_percentage,
        other_cloud_platforms_percentage,
        on_premise_percentage,
    ],
    outputs=[gr.Textbox(label="Output", lines=5, show_copy_button=True)],
    allow_flagging="never",
    theme=gr.themes.Soft(),
)

demo.launch()