danghungithp's picture
Upload 1398 files
bec48e1 verified
# utils.py - các hàm tiện ích chung cho toàn bộ ứng dụng
import pandas as pd
import numpy as np
import pandas_ta as ta
import scipy.signal
import mplfinance as mpf
import matplotlib.pyplot as plt
from datetime import datetime
import os
from groq import Groq
# Đường dẫn thư mục dữ liệu tài chính
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)))
# Đặt API key Groq
os.environ["GROQ_API_KEY"] = "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"
client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
# Tìm và thay thế tên model Groq AI
GROQ_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct"
def read_csv_safely(path):
if not os.path.exists(path):
return None
try:
return pd.read_csv(path)
except Exception as e:
print(f"[utils] Lỗi đọc file {path}: {e}")
return None
def save_csv_safely(df, path):
try:
df.to_csv(path, index=False)
except Exception as e:
print(f"[utils] Lỗi ghi file {path}: {e}")
def get_today_str():
return datetime.now().strftime('%Y-%m-%d')
def detect_candlestick_patterns(df):
"""
Detects various candlestick patterns in the given dataframe using pandas-ta.
"""
import pandas_ta as ta
# Ensure that the necessary columns are present
if not all(col in df.columns for col in ['open', 'high', 'low', 'close']):
print("Missing OHLC data")
return {}
patterns = {}
# Hammer (đảo chiều tăng)
if hasattr(ta, 'cdl_hammer'):
patterns['hammer'] = ta.cdl_hammer(df['open'], df['high'], df['low'], df['close'])
else:
patterns['hammer'] = None
# Shooting Star (đảo chiều giảm)
if hasattr(ta, 'cdl_shootingstar'):
patterns['shooting_star'] = ta.cdl_shootingstar(df['open'], df['high'], df['low'], df['close'])
else:
patterns['shooting_star'] = None
# Engulfing (Bullish/Bearish)
if hasattr(ta, 'cdl_engulfing'):
engulfing = ta.cdl_engulfing(df['open'], df['high'], df['low'], df['close'])
patterns['bullish_engulfing'] = (engulfing == 100).astype(int)
patterns['bearish_engulfing'] = (engulfing == -100).astype(int)
else:
patterns['bullish_engulfing'] = None
patterns['bearish_engulfing'] = None
return patterns
def calculate_fibonacci_levels(df, period=250):
"""
Calculates Fibonacci retracement levels based on the most recent significant swing high and low.
"""
df_period = df.tail(period)
if df_period.empty:
return None, None, None
# Find the index of the highest high and lowest low in the period
idx_high = df_period['high'].idxmax()
idx_low = df_period['low'].idxmin()
# Determine if the trend is up (low then high) or down (high then low)
if idx_low < idx_high:
# Uptrend: swing low to swing high
swing_low = df_period.loc[idx_low]['low']
swing_high = df_period.loc[idx_high]['high']
is_uptrend = True
else:
# Downtrend: swing high to swing low
swing_high = df_period.loc[idx_high]['high']
swing_low = df_period.loc[idx_low]['low']
is_uptrend = False
if pd.isna(swing_high) or pd.isna(swing_low):
return None, None, None
diff = swing_high - swing_low
# Standard Fibonacci retracement levels
fibo_ratios = [0.236, 0.382, 0.5, 0.618, 0.786]
levels = {}
if is_uptrend: # Retracement levels are below the swing high
for ratio in fibo_ratios:
levels[f'{ratio*100:.1f}%'] = round(swing_high - diff * ratio, 2)
else: # Retracement levels are above the swing low
for ratio in fibo_ratios:
levels[f'{ratio*100:.1f}%'] = round(swing_low + diff * ratio, 2)
# Add extension levels
fibo_ext_ratios = [1.272, 1.618]
if is_uptrend:
for ratio in fibo_ext_ratios:
levels[f'Ext {ratio*100:.1f}%'] = round(swing_high + diff * (ratio - 1), 2)
else:
for ratio in fibo_ext_ratios:
levels[f'Ext {ratio*100:.1f}%'] = round(swing_low - diff * (ratio - 1), 2)
return levels, swing_low, swing_high
def calculate_money_flow(df):
"""
Calculates money flow for each day and compares it to the 20-day average.
"""
df['money_flow'] = df['volume'] * df['close']
df['money_flow_20d_avg'] = df['money_flow'].rolling(window=20).mean()
return df
def find_double_top_bottom(df):
import scipy.signal
peaks, _ = scipy.signal.find_peaks(df['close'])
troughs, _ = scipy.signal.find_peaks(-df['close'])
double_tops = []
double_bottoms = []
for i in range(1, len(peaks)):
if peaks[i] - peaks[i-1] < 10:
double_tops.append((str(df['time'].iloc[peaks[i-1]]), str(df['time'].iloc[peaks[i]])))
for i in range(1, len(troughs)):
if troughs[i] - troughs[i-1] < 10:
double_bottoms.append((str(df['time'].iloc[troughs[i-1]]), str(df['time'].iloc[troughs[i]])))
return double_tops, double_bottoms
def detect_w_double_bottom(df, min_distance=5, max_distance=40, tolerance=0.03):
import scipy.signal
closes = df['close'].values
troughs, _ = scipy.signal.find_peaks(-closes)
results = []
for i in range(len(troughs)-1):
idx1, idx2 = troughs[i], troughs[i+1]
if min_distance <= idx2 - idx1 <= max_distance:
val1, val2 = closes[idx1], closes[idx2]
if abs(val1-val2)/max(val1, val2) <= tolerance:
mid_idx = idx1 + (idx2-idx1)//2
peak_between = closes[idx1:idx2+1].max()
if peak_between > max(val1, val2) * 1.05:
results.append((str(df['time'].iloc[idx1]), str(df['time'].iloc[idx2])))
return results
def detect_m_double_top(df, min_distance=5, max_distance=40, tolerance=0.03):
import scipy.signal
closes = df['close'].values
peaks, _ = scipy.signal.find_peaks(closes)
results = []
for i in range(len(peaks)-1):
idx1, idx2 = peaks[i], peaks[i+1]
if min_distance <= idx2 - idx1 <= max_distance:
val1, val2 = closes[idx1], closes[idx2]
if abs(val1-val2)/max(val1, val2) <= tolerance:
mid_idx = idx1 + (idx2-idx1)//2
trough_between = closes[idx1:idx2+1].min()
if trough_between < min(val1, val2) * 0.95:
results.append((str(df['time'].iloc[idx1]), str(df['time'].iloc[idx2])))
return results
def detect_cup_and_handle(df, window=40, min_cup_depth=0.08, min_handle_depth=0.03):
closes = df['close'].values
n = len(closes)
results = []
for i in range(window, n - window):
left = i - window
right = i
cup_bottom_idx = left + closes[left:right].argmin()
cup_bottom = closes[cup_bottom_idx]
cup_left = closes[left]
cup_right = closes[right]
cup_top = max(cup_left, cup_right)
cup_depth = (cup_top - cup_bottom) / cup_top if cup_top > 0 else 0
if cup_depth < min_cup_depth:
continue
handle_start = right
handle_end = min(n-1, handle_start + window//2)
handle_min_idx = handle_start + closes[handle_start:handle_end].argmin()
handle_min = closes[handle_min_idx]
handle_depth = (cup_top - handle_min) / cup_top if cup_top > 0 else 0
if handle_depth < min_handle_depth:
continue
if closes[handle_end-1] > cup_top:
results.append({
'cup_start': str(df['time'].iloc[left]),
'cup_bottom': str(df['time'].iloc[cup_bottom_idx]),
'cup_end': str(df['time'].iloc[right]),
'handle_start': str(df['time'].iloc[handle_start]),
'handle_end': str(df['time'].iloc[handle_end-1])
})
return results
def plot_candlestick_with_fibo_patterns(df, fibonacci_levels, pattern_results, symbol, chart_path, double_tops=None, double_bottoms=None, cup_handle_patterns=None, w_double_bottoms=None, m_double_tops=None, color_map=None):
import mplfinance as mpf
import numpy as np
import logging
# Remove unused imports
# import matplotlib.pyplot as plt
# import pandas as pd
# Parameter Validation
required_columns = ['open', 'high', 'low', 'close', 'volume', 'time']
if not all(col in df.columns for col in required_columns):
raise ValueError(f"DataFrame must contain columns: {required_columns}")
df_plot = df.copy()
df_plot['time'] = pd.to_datetime(df_plot['time'])
df_plot.set_index('time', inplace=True)
df_plot = df_plot[['open', 'high', 'low', 'close', 'volume']]
fibo_lines = []
if fibonacci_levels:
for level in fibonacci_levels:
fibo_lines.append(mpf.make_addplot([level]*len(df_plot), color='purple', linestyle='--'))
pattern_markers = []
# Use default color map if none is provided
default_color_map = {
'hammer': 'red',
'shooting_star': 'blue',
'bullish_engulfing': 'green',
'bearish_engulfing': 'orange'
}
color_map = color_map or default_color_map
for pattern, pattern_data in pattern_results.items():
if pattern_data is not None:
indices = df_plot.index[df[pattern] != 0] if pattern in df else []
if len(indices) > 0:
marker_color = color_map.get(pattern, 'black')
marker_vals = [df_plot['close'].loc[idx] if idx in indices else None for idx in df_plot.index]
pattern_markers.append(
mpf.make_addplot(
marker_vals,
type='scatter',
markersize=40,
marker='o',
color=marker_color,
alpha=0.8,
secondary_y=False # Ensure markers are plotted on the primary y-axis
)
)
if double_tops:
top_dates = [pd.to_datetime(t[1]) for t in double_tops if pd.to_datetime(t[1]) in df_plot.index]
marker_vals = [float(df_plot['high'].loc[date]) if date in top_dates else np.nan for date in df_plot.index]
if any(marker_vals):
pattern_markers.append(
mpf.make_addplot(
marker_vals,
type='scatter',
markersize=80,
marker='^',
color='magenta',
alpha=0.9,
secondary_y=False,
label=""#data['label'] + ': ' + ', '.join([str(date.date()) for date in top_dates]) if top_dates else None
)
)
cup_handle_lines = []
if cup_handle_patterns:
for pattern in cup_handle_patterns:
try:
cup_x = [pd.to_datetime(pattern['cup_start']), pd.to_datetime(pattern['cup_bottom']), pd.to_datetime(pattern['cup_end'])]
cup_y = [df_plot['close'].loc[cup_x[0]], df_plot['close'].loc[cup_x[1]], df_plot['close'].loc[cup_x[2]]]
cup_handle_lines.append(
mpf.make_addplot(
[cup_y[0] if date == cup_x[0] else cup_y[1] if date == cup_x[1] else cup_y[2] if date == cup_x[2] else np.nan for date in df_plot.index],
color='blue', width=2, secondary_y=False,
label='Cup: ' + ', '.join([str(x.date()) for x in cup_x])
)
)
handle_x = [pd.to_datetime(pattern['handle_start']), pd.to_datetime(pattern['handle_end'])]
handle_y = [df_plot['close'].loc[handle_x[0]], df_plot['close'].loc[handle_x[1]]]
cup_handle_lines.append(
mpf.make_addplot(
[handle_y[0] if date == handle_x[0] else handle_y[1] if date == handle_x[1] else np.nan for date in df_plot.index],
color='green', width=2, secondary_y=False,
label='Handle: ' + ', '.join([str(x.date()) for x in handle_x])
)
)
marker_vals = [df_plot['close'].loc[cup_x[1]] if date == cup_x[1] else np.nan for date in df_plot.index]
cup_handle_lines.append(
mpf.make_addplot(
marker_vals,
type='scatter',
markersize=120,
marker='o',
color='red',
alpha=0.5,
secondary_y=False,
label='Cup Bottom: ' + str(cup_x[1].date())
)
)
except Exception as e:
logging.exception("Error plotting cup and handle pattern")
continue
addplots = []
if fibo_lines:
addplots.extend([ap for ap in fibo_lines if ap is not None])
if pattern_markers:
addplots.extend([ap for ap in pattern_markers if ap is not None])
if cup_handle_lines:
addplots.extend([ap for ap in cup_handle_lines if ap is not None])
plot_kwargs = dict(
type='candle',
style='yahoo',
volume=True,
title=f'Biểu đồ nến, Fibonacci, mẫu hình nến và Cup & Handle: {symbol}',
ylabel='Giá',
ylabel_lower='Khối lượng',
returnfig=True,
figsize=(12, 8)
)
#if addplots:
# plot_kwargs['addplot'] = addplots
fig, axlist = mpf.plot(df_plot, **plot_kwargs)
fig.savefig(chart_path)
plt.close(fig)
def get_financial_valuation(stock):
if stock is None or not hasattr(stock, 'finance') or stock.finance is None:
return {'error': 'Không có dữ liệu tài chính cho mã này hoặc API trả về lỗi.'}
try:
# Lấy bảng ratio (tỷ số tài chính)
ratio = stock.finance.ratio(period='year', lang='vi', dropna=True)
bs = stock.finance.balance_sheet(period='year', lang='vi', dropna=True)
is_ = stock.finance.income_statement(period='year', lang='vi', dropna=True)
# Kiểm tra tồn tại cột 'year' và không rỗng
for df, name in zip([ratio, bs, is_], ['ratio', 'balance_sheet', 'income_statement']):
if not isinstance(df, pd.DataFrame) or df.empty or 'year' not in df.columns:
return {'error': f'Dữ liệu {name} không hợp lệ hoặc thiếu cột year.'}
# Lấy năm gần nhất có đủ dữ liệu
years = set(ratio['year']).intersection(bs['year']).intersection(is_['year'])
if not years:
return {'error': 'Không đủ dữ liệu tài chính để định giá.'}
latest_year = max(years)
# Lấy dữ liệu năm gần nhất
ratio_row = ratio[ratio['year'] == latest_year].iloc[0]
bs_row = bs[bs['year'] == latest_year].iloc[0]
is_row = is_[is_['year'] == latest_year].iloc[0]
# Giá đóng cửa gần nhất
close_price = stock.quote.history(start=f"{latest_year}-01-01", end=f"{latest_year}-12-31", interval='1D')['close'].iloc[-1]
# Lấy chỉ số từ ratio nếu có
pe = ratio_row.get('P/E', None)
pb = ratio_row.get('P/B', None)
roe = ratio_row.get('ROE (%)', None)
eps = ratio_row.get('EPS', None)
bvps = ratio_row.get('BVPS', None)
# Fallback tự tính nếu thiếu
equity = bs_row.get('Vốn chủ sở hữu', None)
net_income = is_row.get('Lợi nhuận sau thuế', None)
shares = bs_row.get('Vốn chủ sở hữu', None)
if shares is None or shares == 0:
shares = bs_row.get('Vốn góp của chủ sở hữu', None)
if roe is None and net_income and equity and equity != 0:
roe = round(net_income / equity * 100, 2)
if pe is None and net_income and shares and shares != 0:
pe = round(close_price * shares / net_income, 2)
if pb is None and equity and shares and shares != 0:
pb = round(close_price * shares / equity, 2)
if eps is None and net_income and shares and shares != 0:
eps = round(net_income / shares, 2)
if bvps is None and equity and shares and shares != 0:
bvps = round(equity / shares, 2)
# Định giá nội tại đơn giản theo phương pháp chiết khấu ROE/PB
intrinsic_value = None
if roe and pb and pb != 0 and equity:
intrinsic_value = round((roe / 100) * equity / pb, 2)
return {
'year': latest_year,
'close_price': close_price,
'pe': pe,
'pb': pb,
'roe': roe,
'eps': eps,
'bvps': bvps,
'intrinsic_value': intrinsic_value,
'revenue': is_row.get('Doanh thu thuần', None),
'net_income': net_income,
'equity': equity
}
except Exception as e:
return {'error': f'Lỗi khi lấy dữ liệu tài chính: {e}'}
def calculate_dcf_valuation(fcf, growth_rate, wacc, years=5, terminal_growth=0.03):
"""
Tính giá trị nội tại theo phương pháp DCF (Discounted Cash Flow).
fcf: Free Cash Flow năm gần nhất
growth_rate: tốc độ tăng trưởng FCF dự kiến (vd: 0.1 = 10%)
wacc: Weighted Average Cost of Capital (tỷ lệ chiết khấu)
years: số năm dự báo
terminal_growth: tốc độ tăng trưởng dài hạn (vd: 0.03 = 3%)
"""
import numpy_financial as npf
cash_flows = []
for i in range(1, years+1):
cash_flows.append(fcf * (1 + growth_rate) ** i)
terminal_value = cash_flows[-1] * (1 + terminal_growth) / (wacc - terminal_growth)
cash_flows[-1] += terminal_value
dcf_value = npf.npv(wacc, cash_flows)
return dcf_value
def calculate_ddm_valuation(dividend, growth_rate, required_return):
"""
Định giá theo mô hình chiết khấu cổ tức (DDM) với tăng trưởng không đổi.
"""
if required_return <= growth_rate:
return None
return dividend * (1 + growth_rate) / (required_return - growth_rate)
def calculate_nav(equity, debt=0):
"""
Định giá theo giá trị tài sản ròng (NAV).
"""
return equity - debt
def calculate_residual_income(net_income, equity, cost_of_equity):
"""
Tính Residual Income = Lợi nhuận sau thuế - chi phí vốn chủ sở hữu
"""
return net_income - cost_of_equity * equity
def calculate_eva(net_income, equity, debt, cost_of_equity, cost_of_debt):
"""
EVA = Lợi nhuận sau thuế - chi phí tổng vốn (vốn chủ + nợ)
"""
return net_income - (cost_of_equity * equity + cost_of_debt * debt)
def safe_float(val, default=0.0):
try:
return float(val)
except (TypeError, ValueError):
return default
def analyze_financial_csv_with_groq(csv_content, user_question=None):
prompt = (
"Bạn là chuyên gia tài chính. Hãy phân tích dữ liệu tài chính sau (dưới dạng CSV):\n"
f"{csv_content}\n"
"1. Tóm tắt các điểm nổi bật: doanh thu, lợi nhuận, tăng trưởng, rủi ro.\n"
"2. Phát hiện xu hướng, bất thường, cảnh báo sớm nếu có.\n"
"3. Đưa ra nhận định, gợi ý chiến lược đầu tư.\n"
"4. Định giá cổ phiếu theo các phương pháp: P/E, P/B, Book Value, DDM, DCF (nếu đủ dữ liệu).\n"
"5. Thống kê quý nào trong năm thường có doanh thu và lợi nhuận sau thuế cao nhất (nếu có dữ liệu quý).\n"
"\nTrả lời toàn bộ bằng tiếng Việt."
)
if user_question:
prompt += f"\n6. Trả lời câu hỏi: {user_question}\n"
chat_completion = client.chat.completions.create(
messages=[{"role": "user", "content": prompt}],
model=GROQ_MODEL,
)
return chat_completion.choices[0].message.content
def fetch_vietstock_news(limit=15):
"""
Fetches the latest financial news from Vietstock.
"""
import requests
from bs4 import BeautifulSoup
try:
url = "https://vietstock.vn/chung-khoan.htm"
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'}
res = requests.get(url, headers=headers, timeout=10)
res.raise_for_status()
soup = BeautifulSoup(res.content, 'html.parser')
news_items = []
articles = soup.find_all('div', class_='article-content', limit=limit)
for article in articles:
title_tag = article.find('a', class_='channel-title')
if title_tag:
title = title_tag.text.strip()
link = "https://vietstock.vn" + title_tag['href']
news_items.append({'title': title, 'link': link})
return news_items
except Exception as e:
print(f"Error fetching Vietstock news: {e}")
return []
def analyze_news_with_groq(news_items):
"""
Analyzes a list of news headlines with Groq AI and returns a summary.
"""
if not news_items:
return "Không có tin tức để phân tích."
headlines = "- " + "\n- ".join([item['title'] for item in news_items])
prompt = (
"Bạn là một chuyên gia phân tích thị trường chứng khoán Việt Nam. "
"Dưới đây là các tiêu đề tin tức mới nhất:\n"
f"{headlines}\n\n"
"Dựa vào các tin tức này, hãy:\n"
"1. Tóm tắt các xu hướng chính của thị trường (tích cực, tiêu cực, trung lập).\n"
"2. Nhận định các ngành hoặc nhóm cổ phiếu nào có thể bị ảnh hưởng.\n"
"3. Đưa ra một bình luận ngắn gọn về tâm lý thị trường hiện tại.\n"
"Trình bày súc tích, chuyên nghiệp và hoàn toàn bằng tiếng Việt."
)
try:
chat_completion = client.chat.completions.create(
messages=[{"role": "user", "content": prompt}],
model=GROQ_MODEL,
max_tokens=500,
temperature=0.3,
)
return chat_completion.choices[0].message.content
except Exception as e:
print(f"Error analyzing news with Groq: {e}")
return "Lỗi khi phân tích tin tức với AI."
# Thêm các hàm tiện ích khác nếu cần