Spaces:
Runtime error
Runtime error
# utils.py - các hàm tiện ích chung cho toàn bộ ứng dụng | |
import pandas as pd | |
import numpy as np | |
import pandas_ta as ta | |
import scipy.signal | |
import mplfinance as mpf | |
import matplotlib.pyplot as plt | |
from datetime import datetime | |
import os | |
from groq import Groq | |
# Đường dẫn thư mục dữ liệu tài chính | |
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__))) | |
# Đặt API key Groq | |
os.environ["GROQ_API_KEY"] = "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" | |
client = Groq(api_key=os.environ.get("GROQ_API_KEY")) | |
# Tìm và thay thế tên model Groq AI | |
GROQ_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct" | |
def read_csv_safely(path): | |
if not os.path.exists(path): | |
return None | |
try: | |
return pd.read_csv(path) | |
except Exception as e: | |
print(f"[utils] Lỗi đọc file {path}: {e}") | |
return None | |
def save_csv_safely(df, path): | |
try: | |
df.to_csv(path, index=False) | |
except Exception as e: | |
print(f"[utils] Lỗi ghi file {path}: {e}") | |
def get_today_str(): | |
return datetime.now().strftime('%Y-%m-%d') | |
def detect_candlestick_patterns(df): | |
""" | |
Detects various candlestick patterns in the given dataframe using pandas-ta. | |
""" | |
import pandas_ta as ta | |
# Ensure that the necessary columns are present | |
if not all(col in df.columns for col in ['open', 'high', 'low', 'close']): | |
print("Missing OHLC data") | |
return {} | |
patterns = {} | |
# Hammer (đảo chiều tăng) | |
if hasattr(ta, 'cdl_hammer'): | |
patterns['hammer'] = ta.cdl_hammer(df['open'], df['high'], df['low'], df['close']) | |
else: | |
patterns['hammer'] = None | |
# Shooting Star (đảo chiều giảm) | |
if hasattr(ta, 'cdl_shootingstar'): | |
patterns['shooting_star'] = ta.cdl_shootingstar(df['open'], df['high'], df['low'], df['close']) | |
else: | |
patterns['shooting_star'] = None | |
# Engulfing (Bullish/Bearish) | |
if hasattr(ta, 'cdl_engulfing'): | |
engulfing = ta.cdl_engulfing(df['open'], df['high'], df['low'], df['close']) | |
patterns['bullish_engulfing'] = (engulfing == 100).astype(int) | |
patterns['bearish_engulfing'] = (engulfing == -100).astype(int) | |
else: | |
patterns['bullish_engulfing'] = None | |
patterns['bearish_engulfing'] = None | |
return patterns | |
def calculate_fibonacci_levels(df, period=250): | |
""" | |
Calculates Fibonacci retracement levels based on the most recent significant swing high and low. | |
""" | |
df_period = df.tail(period) | |
if df_period.empty: | |
return None, None, None | |
# Find the index of the highest high and lowest low in the period | |
idx_high = df_period['high'].idxmax() | |
idx_low = df_period['low'].idxmin() | |
# Determine if the trend is up (low then high) or down (high then low) | |
if idx_low < idx_high: | |
# Uptrend: swing low to swing high | |
swing_low = df_period.loc[idx_low]['low'] | |
swing_high = df_period.loc[idx_high]['high'] | |
is_uptrend = True | |
else: | |
# Downtrend: swing high to swing low | |
swing_high = df_period.loc[idx_high]['high'] | |
swing_low = df_period.loc[idx_low]['low'] | |
is_uptrend = False | |
if pd.isna(swing_high) or pd.isna(swing_low): | |
return None, None, None | |
diff = swing_high - swing_low | |
# Standard Fibonacci retracement levels | |
fibo_ratios = [0.236, 0.382, 0.5, 0.618, 0.786] | |
levels = {} | |
if is_uptrend: # Retracement levels are below the swing high | |
for ratio in fibo_ratios: | |
levels[f'{ratio*100:.1f}%'] = round(swing_high - diff * ratio, 2) | |
else: # Retracement levels are above the swing low | |
for ratio in fibo_ratios: | |
levels[f'{ratio*100:.1f}%'] = round(swing_low + diff * ratio, 2) | |
# Add extension levels | |
fibo_ext_ratios = [1.272, 1.618] | |
if is_uptrend: | |
for ratio in fibo_ext_ratios: | |
levels[f'Ext {ratio*100:.1f}%'] = round(swing_high + diff * (ratio - 1), 2) | |
else: | |
for ratio in fibo_ext_ratios: | |
levels[f'Ext {ratio*100:.1f}%'] = round(swing_low - diff * (ratio - 1), 2) | |
return levels, swing_low, swing_high | |
def calculate_money_flow(df): | |
""" | |
Calculates money flow for each day and compares it to the 20-day average. | |
""" | |
df['money_flow'] = df['volume'] * df['close'] | |
df['money_flow_20d_avg'] = df['money_flow'].rolling(window=20).mean() | |
return df | |
def find_double_top_bottom(df): | |
import scipy.signal | |
peaks, _ = scipy.signal.find_peaks(df['close']) | |
troughs, _ = scipy.signal.find_peaks(-df['close']) | |
double_tops = [] | |
double_bottoms = [] | |
for i in range(1, len(peaks)): | |
if peaks[i] - peaks[i-1] < 10: | |
double_tops.append((str(df['time'].iloc[peaks[i-1]]), str(df['time'].iloc[peaks[i]]))) | |
for i in range(1, len(troughs)): | |
if troughs[i] - troughs[i-1] < 10: | |
double_bottoms.append((str(df['time'].iloc[troughs[i-1]]), str(df['time'].iloc[troughs[i]]))) | |
return double_tops, double_bottoms | |
def detect_w_double_bottom(df, min_distance=5, max_distance=40, tolerance=0.03): | |
import scipy.signal | |
closes = df['close'].values | |
troughs, _ = scipy.signal.find_peaks(-closes) | |
results = [] | |
for i in range(len(troughs)-1): | |
idx1, idx2 = troughs[i], troughs[i+1] | |
if min_distance <= idx2 - idx1 <= max_distance: | |
val1, val2 = closes[idx1], closes[idx2] | |
if abs(val1-val2)/max(val1, val2) <= tolerance: | |
mid_idx = idx1 + (idx2-idx1)//2 | |
peak_between = closes[idx1:idx2+1].max() | |
if peak_between > max(val1, val2) * 1.05: | |
results.append((str(df['time'].iloc[idx1]), str(df['time'].iloc[idx2]))) | |
return results | |
def detect_m_double_top(df, min_distance=5, max_distance=40, tolerance=0.03): | |
import scipy.signal | |
closes = df['close'].values | |
peaks, _ = scipy.signal.find_peaks(closes) | |
results = [] | |
for i in range(len(peaks)-1): | |
idx1, idx2 = peaks[i], peaks[i+1] | |
if min_distance <= idx2 - idx1 <= max_distance: | |
val1, val2 = closes[idx1], closes[idx2] | |
if abs(val1-val2)/max(val1, val2) <= tolerance: | |
mid_idx = idx1 + (idx2-idx1)//2 | |
trough_between = closes[idx1:idx2+1].min() | |
if trough_between < min(val1, val2) * 0.95: | |
results.append((str(df['time'].iloc[idx1]), str(df['time'].iloc[idx2]))) | |
return results | |
def detect_cup_and_handle(df, window=40, min_cup_depth=0.08, min_handle_depth=0.03): | |
closes = df['close'].values | |
n = len(closes) | |
results = [] | |
for i in range(window, n - window): | |
left = i - window | |
right = i | |
cup_bottom_idx = left + closes[left:right].argmin() | |
cup_bottom = closes[cup_bottom_idx] | |
cup_left = closes[left] | |
cup_right = closes[right] | |
cup_top = max(cup_left, cup_right) | |
cup_depth = (cup_top - cup_bottom) / cup_top if cup_top > 0 else 0 | |
if cup_depth < min_cup_depth: | |
continue | |
handle_start = right | |
handle_end = min(n-1, handle_start + window//2) | |
handle_min_idx = handle_start + closes[handle_start:handle_end].argmin() | |
handle_min = closes[handle_min_idx] | |
handle_depth = (cup_top - handle_min) / cup_top if cup_top > 0 else 0 | |
if handle_depth < min_handle_depth: | |
continue | |
if closes[handle_end-1] > cup_top: | |
results.append({ | |
'cup_start': str(df['time'].iloc[left]), | |
'cup_bottom': str(df['time'].iloc[cup_bottom_idx]), | |
'cup_end': str(df['time'].iloc[right]), | |
'handle_start': str(df['time'].iloc[handle_start]), | |
'handle_end': str(df['time'].iloc[handle_end-1]) | |
}) | |
return results | |
def plot_candlestick_with_fibo_patterns(df, fibonacci_levels, pattern_results, symbol, chart_path, double_tops=None, double_bottoms=None, cup_handle_patterns=None, w_double_bottoms=None, m_double_tops=None, color_map=None): | |
import mplfinance as mpf | |
import numpy as np | |
import logging | |
# Remove unused imports | |
# import matplotlib.pyplot as plt | |
# import pandas as pd | |
# Parameter Validation | |
required_columns = ['open', 'high', 'low', 'close', 'volume', 'time'] | |
if not all(col in df.columns for col in required_columns): | |
raise ValueError(f"DataFrame must contain columns: {required_columns}") | |
df_plot = df.copy() | |
df_plot['time'] = pd.to_datetime(df_plot['time']) | |
df_plot.set_index('time', inplace=True) | |
df_plot = df_plot[['open', 'high', 'low', 'close', 'volume']] | |
fibo_lines = [] | |
if fibonacci_levels: | |
for level in fibonacci_levels: | |
fibo_lines.append(mpf.make_addplot([level]*len(df_plot), color='purple', linestyle='--')) | |
pattern_markers = [] | |
# Use default color map if none is provided | |
default_color_map = { | |
'hammer': 'red', | |
'shooting_star': 'blue', | |
'bullish_engulfing': 'green', | |
'bearish_engulfing': 'orange' | |
} | |
color_map = color_map or default_color_map | |
for pattern, pattern_data in pattern_results.items(): | |
if pattern_data is not None: | |
indices = df_plot.index[df[pattern] != 0] if pattern in df else [] | |
if len(indices) > 0: | |
marker_color = color_map.get(pattern, 'black') | |
marker_vals = [df_plot['close'].loc[idx] if idx in indices else None for idx in df_plot.index] | |
pattern_markers.append( | |
mpf.make_addplot( | |
marker_vals, | |
type='scatter', | |
markersize=40, | |
marker='o', | |
color=marker_color, | |
alpha=0.8, | |
secondary_y=False # Ensure markers are plotted on the primary y-axis | |
) | |
) | |
if double_tops: | |
top_dates = [pd.to_datetime(t[1]) for t in double_tops if pd.to_datetime(t[1]) in df_plot.index] | |
marker_vals = [float(df_plot['high'].loc[date]) if date in top_dates else np.nan for date in df_plot.index] | |
if any(marker_vals): | |
pattern_markers.append( | |
mpf.make_addplot( | |
marker_vals, | |
type='scatter', | |
markersize=80, | |
marker='^', | |
color='magenta', | |
alpha=0.9, | |
secondary_y=False, | |
label=""#data['label'] + ': ' + ', '.join([str(date.date()) for date in top_dates]) if top_dates else None | |
) | |
) | |
cup_handle_lines = [] | |
if cup_handle_patterns: | |
for pattern in cup_handle_patterns: | |
try: | |
cup_x = [pd.to_datetime(pattern['cup_start']), pd.to_datetime(pattern['cup_bottom']), pd.to_datetime(pattern['cup_end'])] | |
cup_y = [df_plot['close'].loc[cup_x[0]], df_plot['close'].loc[cup_x[1]], df_plot['close'].loc[cup_x[2]]] | |
cup_handle_lines.append( | |
mpf.make_addplot( | |
[cup_y[0] if date == cup_x[0] else cup_y[1] if date == cup_x[1] else cup_y[2] if date == cup_x[2] else np.nan for date in df_plot.index], | |
color='blue', width=2, secondary_y=False, | |
label='Cup: ' + ', '.join([str(x.date()) for x in cup_x]) | |
) | |
) | |
handle_x = [pd.to_datetime(pattern['handle_start']), pd.to_datetime(pattern['handle_end'])] | |
handle_y = [df_plot['close'].loc[handle_x[0]], df_plot['close'].loc[handle_x[1]]] | |
cup_handle_lines.append( | |
mpf.make_addplot( | |
[handle_y[0] if date == handle_x[0] else handle_y[1] if date == handle_x[1] else np.nan for date in df_plot.index], | |
color='green', width=2, secondary_y=False, | |
label='Handle: ' + ', '.join([str(x.date()) for x in handle_x]) | |
) | |
) | |
marker_vals = [df_plot['close'].loc[cup_x[1]] if date == cup_x[1] else np.nan for date in df_plot.index] | |
cup_handle_lines.append( | |
mpf.make_addplot( | |
marker_vals, | |
type='scatter', | |
markersize=120, | |
marker='o', | |
color='red', | |
alpha=0.5, | |
secondary_y=False, | |
label='Cup Bottom: ' + str(cup_x[1].date()) | |
) | |
) | |
except Exception as e: | |
logging.exception("Error plotting cup and handle pattern") | |
continue | |
addplots = [] | |
if fibo_lines: | |
addplots.extend([ap for ap in fibo_lines if ap is not None]) | |
if pattern_markers: | |
addplots.extend([ap for ap in pattern_markers if ap is not None]) | |
if cup_handle_lines: | |
addplots.extend([ap for ap in cup_handle_lines if ap is not None]) | |
plot_kwargs = dict( | |
type='candle', | |
style='yahoo', | |
volume=True, | |
title=f'Biểu đồ nến, Fibonacci, mẫu hình nến và Cup & Handle: {symbol}', | |
ylabel='Giá', | |
ylabel_lower='Khối lượng', | |
returnfig=True, | |
figsize=(12, 8) | |
) | |
#if addplots: | |
# plot_kwargs['addplot'] = addplots | |
fig, axlist = mpf.plot(df_plot, **plot_kwargs) | |
fig.savefig(chart_path) | |
plt.close(fig) | |
def get_financial_valuation(stock): | |
if stock is None or not hasattr(stock, 'finance') or stock.finance is None: | |
return {'error': 'Không có dữ liệu tài chính cho mã này hoặc API trả về lỗi.'} | |
try: | |
# Lấy bảng ratio (tỷ số tài chính) | |
ratio = stock.finance.ratio(period='year', lang='vi', dropna=True) | |
bs = stock.finance.balance_sheet(period='year', lang='vi', dropna=True) | |
is_ = stock.finance.income_statement(period='year', lang='vi', dropna=True) | |
# Kiểm tra tồn tại cột 'year' và không rỗng | |
for df, name in zip([ratio, bs, is_], ['ratio', 'balance_sheet', 'income_statement']): | |
if not isinstance(df, pd.DataFrame) or df.empty or 'year' not in df.columns: | |
return {'error': f'Dữ liệu {name} không hợp lệ hoặc thiếu cột year.'} | |
# Lấy năm gần nhất có đủ dữ liệu | |
years = set(ratio['year']).intersection(bs['year']).intersection(is_['year']) | |
if not years: | |
return {'error': 'Không đủ dữ liệu tài chính để định giá.'} | |
latest_year = max(years) | |
# Lấy dữ liệu năm gần nhất | |
ratio_row = ratio[ratio['year'] == latest_year].iloc[0] | |
bs_row = bs[bs['year'] == latest_year].iloc[0] | |
is_row = is_[is_['year'] == latest_year].iloc[0] | |
# Giá đóng cửa gần nhất | |
close_price = stock.quote.history(start=f"{latest_year}-01-01", end=f"{latest_year}-12-31", interval='1D')['close'].iloc[-1] | |
# Lấy chỉ số từ ratio nếu có | |
pe = ratio_row.get('P/E', None) | |
pb = ratio_row.get('P/B', None) | |
roe = ratio_row.get('ROE (%)', None) | |
eps = ratio_row.get('EPS', None) | |
bvps = ratio_row.get('BVPS', None) | |
# Fallback tự tính nếu thiếu | |
equity = bs_row.get('Vốn chủ sở hữu', None) | |
net_income = is_row.get('Lợi nhuận sau thuế', None) | |
shares = bs_row.get('Vốn chủ sở hữu', None) | |
if shares is None or shares == 0: | |
shares = bs_row.get('Vốn góp của chủ sở hữu', None) | |
if roe is None and net_income and equity and equity != 0: | |
roe = round(net_income / equity * 100, 2) | |
if pe is None and net_income and shares and shares != 0: | |
pe = round(close_price * shares / net_income, 2) | |
if pb is None and equity and shares and shares != 0: | |
pb = round(close_price * shares / equity, 2) | |
if eps is None and net_income and shares and shares != 0: | |
eps = round(net_income / shares, 2) | |
if bvps is None and equity and shares and shares != 0: | |
bvps = round(equity / shares, 2) | |
# Định giá nội tại đơn giản theo phương pháp chiết khấu ROE/PB | |
intrinsic_value = None | |
if roe and pb and pb != 0 and equity: | |
intrinsic_value = round((roe / 100) * equity / pb, 2) | |
return { | |
'year': latest_year, | |
'close_price': close_price, | |
'pe': pe, | |
'pb': pb, | |
'roe': roe, | |
'eps': eps, | |
'bvps': bvps, | |
'intrinsic_value': intrinsic_value, | |
'revenue': is_row.get('Doanh thu thuần', None), | |
'net_income': net_income, | |
'equity': equity | |
} | |
except Exception as e: | |
return {'error': f'Lỗi khi lấy dữ liệu tài chính: {e}'} | |
def calculate_dcf_valuation(fcf, growth_rate, wacc, years=5, terminal_growth=0.03): | |
""" | |
Tính giá trị nội tại theo phương pháp DCF (Discounted Cash Flow). | |
fcf: Free Cash Flow năm gần nhất | |
growth_rate: tốc độ tăng trưởng FCF dự kiến (vd: 0.1 = 10%) | |
wacc: Weighted Average Cost of Capital (tỷ lệ chiết khấu) | |
years: số năm dự báo | |
terminal_growth: tốc độ tăng trưởng dài hạn (vd: 0.03 = 3%) | |
""" | |
import numpy_financial as npf | |
cash_flows = [] | |
for i in range(1, years+1): | |
cash_flows.append(fcf * (1 + growth_rate) ** i) | |
terminal_value = cash_flows[-1] * (1 + terminal_growth) / (wacc - terminal_growth) | |
cash_flows[-1] += terminal_value | |
dcf_value = npf.npv(wacc, cash_flows) | |
return dcf_value | |
def calculate_ddm_valuation(dividend, growth_rate, required_return): | |
""" | |
Định giá theo mô hình chiết khấu cổ tức (DDM) với tăng trưởng không đổi. | |
""" | |
if required_return <= growth_rate: | |
return None | |
return dividend * (1 + growth_rate) / (required_return - growth_rate) | |
def calculate_nav(equity, debt=0): | |
""" | |
Định giá theo giá trị tài sản ròng (NAV). | |
""" | |
return equity - debt | |
def calculate_residual_income(net_income, equity, cost_of_equity): | |
""" | |
Tính Residual Income = Lợi nhuận sau thuế - chi phí vốn chủ sở hữu | |
""" | |
return net_income - cost_of_equity * equity | |
def calculate_eva(net_income, equity, debt, cost_of_equity, cost_of_debt): | |
""" | |
EVA = Lợi nhuận sau thuế - chi phí tổng vốn (vốn chủ + nợ) | |
""" | |
return net_income - (cost_of_equity * equity + cost_of_debt * debt) | |
def safe_float(val, default=0.0): | |
try: | |
return float(val) | |
except (TypeError, ValueError): | |
return default | |
def analyze_financial_csv_with_groq(csv_content, user_question=None): | |
prompt = ( | |
"Bạn là chuyên gia tài chính. Hãy phân tích dữ liệu tài chính sau (dưới dạng CSV):\n" | |
f"{csv_content}\n" | |
"1. Tóm tắt các điểm nổi bật: doanh thu, lợi nhuận, tăng trưởng, rủi ro.\n" | |
"2. Phát hiện xu hướng, bất thường, cảnh báo sớm nếu có.\n" | |
"3. Đưa ra nhận định, gợi ý chiến lược đầu tư.\n" | |
"4. Định giá cổ phiếu theo các phương pháp: P/E, P/B, Book Value, DDM, DCF (nếu đủ dữ liệu).\n" | |
"5. Thống kê quý nào trong năm thường có doanh thu và lợi nhuận sau thuế cao nhất (nếu có dữ liệu quý).\n" | |
"\nTrả lời toàn bộ bằng tiếng Việt." | |
) | |
if user_question: | |
prompt += f"\n6. Trả lời câu hỏi: {user_question}\n" | |
chat_completion = client.chat.completions.create( | |
messages=[{"role": "user", "content": prompt}], | |
model=GROQ_MODEL, | |
) | |
return chat_completion.choices[0].message.content | |
def fetch_vietstock_news(limit=15): | |
""" | |
Fetches the latest financial news from Vietstock. | |
""" | |
import requests | |
from bs4 import BeautifulSoup | |
try: | |
url = "https://vietstock.vn/chung-khoan.htm" | |
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'} | |
res = requests.get(url, headers=headers, timeout=10) | |
res.raise_for_status() | |
soup = BeautifulSoup(res.content, 'html.parser') | |
news_items = [] | |
articles = soup.find_all('div', class_='article-content', limit=limit) | |
for article in articles: | |
title_tag = article.find('a', class_='channel-title') | |
if title_tag: | |
title = title_tag.text.strip() | |
link = "https://vietstock.vn" + title_tag['href'] | |
news_items.append({'title': title, 'link': link}) | |
return news_items | |
except Exception as e: | |
print(f"Error fetching Vietstock news: {e}") | |
return [] | |
def analyze_news_with_groq(news_items): | |
""" | |
Analyzes a list of news headlines with Groq AI and returns a summary. | |
""" | |
if not news_items: | |
return "Không có tin tức để phân tích." | |
headlines = "- " + "\n- ".join([item['title'] for item in news_items]) | |
prompt = ( | |
"Bạn là một chuyên gia phân tích thị trường chứng khoán Việt Nam. " | |
"Dưới đây là các tiêu đề tin tức mới nhất:\n" | |
f"{headlines}\n\n" | |
"Dựa vào các tin tức này, hãy:\n" | |
"1. Tóm tắt các xu hướng chính của thị trường (tích cực, tiêu cực, trung lập).\n" | |
"2. Nhận định các ngành hoặc nhóm cổ phiếu nào có thể bị ảnh hưởng.\n" | |
"3. Đưa ra một bình luận ngắn gọn về tâm lý thị trường hiện tại.\n" | |
"Trình bày súc tích, chuyên nghiệp và hoàn toàn bằng tiếng Việt." | |
) | |
try: | |
chat_completion = client.chat.completions.create( | |
messages=[{"role": "user", "content": prompt}], | |
model=GROQ_MODEL, | |
max_tokens=500, | |
temperature=0.3, | |
) | |
return chat_completion.choices[0].message.content | |
except Exception as e: | |
print(f"Error analyzing news with Groq: {e}") | |
return "Lỗi khi phân tích tin tức với AI." | |
# Thêm các hàm tiện ích khác nếu cần | |