# financial_news_bot/src/services/async_trading_grid_calculator.py
# Dmitry Beresnev: refactor AsyncTradingGridGenerator and cmd (db46b9f)
# Source page residue: raw | history blame | 21.8 kB
import asyncio
import aiohttp
import yfinance as yf
import numpy as np
import pandas as pd
import warnings
from typing import Dict, List, Tuple, Optional, Union
from datetime import datetime, timedelta
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
import io
import base64
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import logging
# Suppress every warning category for the whole process.
warnings.filterwarnings('ignore')
# Configure the root logger at INFO level; this runs as an import-time side effect.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class AsyncTradingGridGenerator:
    """
    Asynchronous trading grid generator for Telegram bots.

    Downloads price history through yfinance, derives technical indicators,
    builds a ladder of buy levels below the current price (Fibonacci,
    evenly spaced and ATR-based candidates merged together), sizes one order
    per level and renders the result as a Telegram HTML message and a PNG
    chart.

    Blocking work is kept off the event loop: the data download is submitted
    to ``self.executor`` (a process pool) and chart rendering to
    ``self.chart_executor`` (a thread pool). Call :meth:`close` once the
    instance is no longer needed to release both pools.

    NOTE(review): the emoji literals in this class look mis-encoded
    (mojibake). They are runtime strings and were left untouched - confirm
    against the original UTF-8 source.
    """

    # Colour per level type, shared by every chart panel so that the bar,
    # scatter and pie plots agree; any other type uses the default colour.
    _TYPE_COLORS = {'Fibonacci': '#FF6B6B', 'Geometric': '#4ECDC4'}
    _DEFAULT_COLOR = '#45B7D1'

    def __init__(self):
        """Set up the strategy presets and the two worker pools."""
        self.strategies = {
            "conservative": {
                "growth": 1.15,
                "drop_pct": 0.2,
                "levels": 5,
                "risk_factor": 0.8,
                "description": "πŸ›‘οΈ Conservative strategy with minimal risk",
                "emoji": "🐌",
            },
            "medium": {
                "growth": 1.3,
                "drop_pct": 0.35,
                "levels": 8,
                "risk_factor": 1.0,
                "description": "βš–οΈ Balanced strategy with medium aggression",
                "emoji": "🎯",
            },
            "aggressive": {
                "growth": 1.5,
                "drop_pct": 0.5,
                "levels": 12,
                "risk_factor": 1.3,
                "description": "πŸš€ Aggressive strategy with high profitability",
                "emoji": "πŸ”₯",
            },
            "ultra_aggressive": {
                "growth": 1.8,
                "drop_pct": 0.7,
                "levels": 15,
                "risk_factor": 1.6,
                "description": "⚑ Maximum aggressive strategy",
                "emoji": "πŸ’₯",
            },
        }
        self.executor = ProcessPoolExecutor(max_workers=4)
        self.chart_executor = ThreadPoolExecutor(max_workers=2)

    def close(self) -> None:
        """Shut down both worker pools without waiting for pending work."""
        self.executor.shutdown(wait=False)
        self.chart_executor.shutdown(wait=False)

    @staticmethod
    def fetch_data(ticker: str, period: str = "1y") -> pd.DataFrame:
        """
        Download price history for ``ticker`` and attach indicator columns.

        Adds ``SMA_20`` and ``SMA_50`` (rolling means of ``Close``) and
        ``Volatility`` (20-row rolling std of ``Close`` percentage changes,
        scaled by ``sqrt(252)``).

        Kept static so it can be submitted to the process pool without
        pickling the generator instance.

        Raises:
            ValueError: if no data is returned or the download fails; the
                underlying exception is attached as ``__cause__``.
        """
        try:
            data = yf.Ticker(ticker).history(period=period)
            if data.empty:
                raise ValueError(f"Data for ticker {ticker} not found")
            closes = data['Close']
            data['SMA_20'] = closes.rolling(20).mean()
            data['SMA_50'] = closes.rolling(50).mean()
            data['Volatility'] = closes.pct_change().rolling(20).std() * np.sqrt(252)
            return data
        except Exception as e:
            logger.error(f"Error getting data for {ticker}: {e}")
            raise ValueError(f"Data retrieval error: {e}") from e

    async def get_stock_data_async(self, ticker: str, period: str = "1y") -> pd.DataFrame:
        """Asynchronous stock data retrieval (runs ``fetch_data`` in the process pool)."""
        # get_running_loop() is the supported call from inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor, AsyncTradingGridGenerator.fetch_data, ticker, period
        )

    @staticmethod
    def _last_or(series: pd.Series, default):
        """Return the last value of ``series``, or ``default`` when it is NaN."""
        value = series.iloc[-1]
        return default if pd.isna(value) else value

    def calculate_technical_indicators(self, data: pd.DataFrame) -> Dict[str, float]:
        """
        Technical indicators calculation.

        Expects the columns produced by :meth:`fetch_data` (``High``, ``Low``,
        ``Close``, ``SMA_20``, ``SMA_50``, ``Volatility``).

        Returns:
            Dict with ``current_price``, ``atr`` (14-row mean true range),
            ``support`` / ``resistance`` (20-row low / high), ``volatility``
            (falls back to 0.25), ``rsi`` (falls back to 50) and ``sma_20`` /
            ``sma_50`` (fall back to the current price).
        """
        close, high, low = data['Close'], data['High'], data['Low']
        current_price = close.iloc[-1]

        # ATR (Average True Range): element-wise maximum of the three ranges.
        prev_close = close.shift()
        true_range = np.maximum.reduce([
            high - low,
            np.abs(high - prev_close),
            np.abs(low - prev_close),
        ])
        atr = pd.Series(true_range, index=data.index).rolling(14).mean().iloc[-1]

        # Support and resistance
        support = low.rolling(20).min().iloc[-1]
        resistance = high.rolling(20).max().iloc[-1]

        # RSI from 14-row rolling means of gains and losses
        delta = close.diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rsi = 100 - (100 / (1 + gain / loss))

        return {
            'current_price': current_price,
            'atr': atr,
            'support': support,
            'resistance': resistance,
            'volatility': self._last_or(data['Volatility'], 0.25),
            'rsi': self._last_or(rsi, 50),
            'sma_20': self._last_or(data['SMA_20'], current_price),
            'sma_50': self._last_or(data['SMA_50'], current_price),
        }

    def fibonacci_levels(self, current_price: float, low: float, high: float) -> List[float]:
        """Fibonacci retracement levels of ``[low, high]`` strictly below ``current_price``, highest first."""
        ratios = (0.236, 0.382, 0.5, 0.618, 0.786, 0.886)
        candidates = (high - (high - low) * ratio for ratio in ratios)
        return sorted((lvl for lvl in candidates if lvl < current_price), reverse=True)

    def geometric_levels(self, current_price: float, drop_pct: float, levels: int) -> List[float]:
        """
        Evenly spaced levels from just below ``current_price`` down to
        ``current_price * (1 - drop_pct)``.

        Returns an empty list when ``levels`` is not positive (the original
        code divided by zero for ``levels == 0``).
        """
        if levels <= 0:
            return []
        min_price = current_price * (1 - drop_pct)
        step = (current_price - min_price) / levels
        return [current_price - step * (i + 1) for i in range(levels)]

    def volatility_adjusted_levels(self, current_price: float, atr: float,
                                   volatility: float, levels: int) -> List[float]:
        """Levels spaced ``atr * (1 + volatility)`` apart below ``current_price``."""
        vol_step = atr * (1 + volatility)
        return [current_price - vol_step * (i + 1) for i in range(levels)]

    def combine_and_optimize_levels(self, fib_levels: List[float],
                                    geom_levels: List[float],
                                    vol_levels: List[float],
                                    current_price: float,
                                    min_distance: float = 0.02) -> List[float]:
        """
        Merge the candidate levels into one descending ladder.

        Candidates outside ``(0, current_price)`` are dropped (NaN fails both
        comparisons, so it is dropped too), the rest are rounded to 2 decimals
        and de-duplicated. A level is kept only if it is at least
        ``min_distance`` (as a fraction of ``current_price``) away from the
        previously kept level, starting from ``current_price`` itself.
        """
        candidates = {
            round(level, 2)
            for level in (*fib_levels, *geom_levels, *vol_levels)
            if 0 < level < current_price
        }
        optimized_levels = []
        last_level = current_price
        for level in sorted(candidates, reverse=True):
            if abs(last_level - level) / current_price >= min_distance:
                optimized_levels.append(level)
                last_level = level
        return optimized_levels

    def calculate_position_sizes(self, levels: List[float], current_price: float,
                                 capital: float, growth: float,
                                 risk_factor: float) -> List[float]:
        """
        Split ``capital`` across ``levels``.

        The weight of level ``i`` is ``growth ** i * (1 + drop * risk_factor)``
        where ``drop`` is the level's fractional distance below
        ``current_price``; the weights are normalised so the sizes sum to
        ``capital``.
        """
        if not levels:
            return []
        weights = [
            (growth ** i) * (1 + (current_price - level) / current_price * risk_factor)
            for i, level in enumerate(levels)
        ]
        total_weight = sum(weights)
        return [(weight / total_weight) * capital for weight in weights]

    def calculate_grid_metrics(self, df: pd.DataFrame, current_price: float) -> Dict[str, float]:
        """
        Grid metrics calculation.

        ``potential_profit`` is the gain if every order fills and the price
        then returns to ``current_price``. Returns ``{}`` for an empty grid.
        """
        if df.empty:
            return {}
        total_capital = df['OrderSize'].sum()
        # Vectorised: shares bought at each level times the gain per share.
        shares = df['OrderSize'] / df['Price']
        potential_profit = (shares * (current_price - df['Price'])).sum()
        return {
            'total_capital': total_capital,
            'max_drawdown': df['%Drop'].max(),
            'avg_order_size': df['OrderSize'].mean(),
            'potential_profit': potential_profit,
            'profit_margin': (potential_profit / total_capital) * 100 if total_capital > 0 else 0,
            'number_of_orders': len(df),
        }

    @staticmethod
    def _label_level_types(levels: List[float], fib_levels: List[float],
                           geom_levels: List[float]) -> List[str]:
        """
        Label each grid level as 'Fibonacci', 'Geometric' or 'Combined'.

        Grid prices are rounded to 2 decimals when the ladder is built, so the
        Fibonacci candidates are rounded the same way before the membership
        test; comparing against the raw floats almost never matched.
        """
        fib_prices = {round(level, 2) for level in fib_levels}
        labels = []
        for price in levels:
            if price in fib_prices:
                labels.append('Fibonacci')
            elif any(abs(price - geom) < 0.01 for geom in geom_levels):
                labels.append('Geometric')
            else:
                labels.append('Combined')
        return labels

    async def generate_grid_async(self, ticker: str, capital: float = 10000,
                                  strategy: str = "medium") -> Tuple[Dict, pd.DataFrame, Dict]:
        """
        Asynchronous grid generation.

        Returns:
            ``(indicators, df, metrics)`` where ``df`` has one row per level
            with the columns ``Level``, ``Price``, ``OrderSize``, ``Shares``,
            ``%Drop``, ``Distance_ATR`` and ``Type``.

        Raises:
            ValueError: unknown strategy, data retrieval failure, or no level
                survived the merge step.
        """
        if strategy not in self.strategies:
            raise ValueError(f"Unknown strategy: {strategy}")

        # Get data asynchronously
        data = await self.get_stock_data_async(ticker)
        indicators = self.calculate_technical_indicators(data)
        strategy_params = self.strategies[strategy]
        current_price = indicators['current_price']
        atr = indicators['atr']

        # Generate candidate levels
        fib_levels = self.fibonacci_levels(
            current_price, indicators['support'], indicators['resistance']
        )
        geom_levels = self.geometric_levels(
            current_price, strategy_params['drop_pct'], strategy_params['levels']
        )
        vol_levels = self.volatility_adjusted_levels(
            current_price, atr, indicators['volatility'], strategy_params['levels'] // 2
        )

        # Combine levels
        combined_levels = self.combine_and_optimize_levels(
            fib_levels, geom_levels, vol_levels, current_price
        )
        if not combined_levels:
            raise ValueError("Failed to generate grid levels")

        # Calculate position sizes
        position_sizes = self.calculate_position_sizes(
            combined_levels,
            current_price,
            capital,
            strategy_params['growth'],
            strategy_params['risk_factor'],
        )

        df = pd.DataFrame({
            'Level': range(1, len(combined_levels) + 1),
            'Price': combined_levels,
            'OrderSize': position_sizes,
            'Shares': [size / price for size, price in zip(position_sizes, combined_levels)],
            '%Drop': [(current_price - price) / current_price * 100 for price in combined_levels],
            'Distance_ATR': [(current_price - price) / atr for price in combined_levels],
        })
        df['Type'] = self._label_level_types(combined_levels, fib_levels, geom_levels)

        metrics = self.calculate_grid_metrics(df, current_price)
        return indicators, df, metrics

    def format_telegram_message(self, ticker: str, indicators: Dict, df: pd.DataFrame,
                                metrics: Dict, strategy: str) -> str:
        """
        Telegram message formatting (HTML markup).

        The ticker is HTML-escaped because it originates from user input and
        real tickers may contain ``&``.

        NOTE(review): the level table is wrapped in Markdown-style triple
        backticks inside an otherwise HTML message - confirm the parse mode
        used by the sender.
        """
        from html import escape  # local import: stdlib only, keeps this block self-contained

        strategy_info = self.strategies[strategy]
        safe_ticker = escape(ticker.upper(), quote=False)
        rsi = indicators['rsi']
        shown = min(5, len(df))

        # Determine trend
        trend_emoji = "πŸ“ˆ" if indicators['current_price'] > indicators['sma_20'] else "πŸ“‰"
        if rsi < 30:
            rsi_status = "πŸ”΄ Oversold"
        elif rsi > 70:
            rsi_status = "🟒 Overbought"
        else:
            rsi_status = "🟑 Neutral"

        lines = [
            f"🎯 <b>TRADING GRID {safe_ticker}</b>",
            f"{strategy_info['emoji']} <b>Strategy:</b> {strategy.upper()}",
            f"{strategy_info['description']}",
            "πŸ“Š <b>CURRENT INDICATORS:</b>",
            f"πŸ’° Price: <code>${indicators['current_price']:.2f}</code> {trend_emoji}",
            f"πŸ“Š RSI: <code>{rsi:.1f}</code> {rsi_status}",
            f"⚑ Volatility: <code>{indicators['volatility']:.1%}</code>",
            f"🎯 ATR: <code>${indicators['atr']:.2f}</code>",
            f"πŸ”» Support: <code>${indicators['support']:.2f}</code>",
            f"πŸ”Ί Resistance: <code>${indicators['resistance']:.2f}</code>",
            "πŸ“‹ <b>GRID METRICS:</b>",
            f"πŸ’΅ Capital: <code>${metrics['total_capital']:.2f}</code>",
            f"πŸ“‰ Max drawdown: <code>{metrics['max_drawdown']:.2f}%</code>",
            f"🎯 Orders: <code>{metrics['number_of_orders']}</code>",
            f"πŸ’° Potential: <code>${metrics['potential_profit']:.2f}</code>",
            f"πŸ“ˆ Margin: <code>{metrics['profit_margin']:.2f}%</code>",
            f"🎯 <b>TOP-{shown} LEVELS:</b>",
            "```",
            "β„– Price Size Drop Type",
        ]

        # Add top-5 levels
        for _, row in df.head(5).iterrows():
            if row['Type'] == 'Fibonacci':
                type_emoji = "πŸŒ€"
            elif row['Type'] == 'Geometric':
                type_emoji = "πŸ“"
            else:
                type_emoji = "⚑"
            lines.append(
                f"{row['Level']:2.0f} ${row['Price']:6.2f} ${row['OrderSize']:7.0f} "
                f"{row['%Drop']:6.2f}% {type_emoji}"
            )

        message = "\n".join(lines) + "\n```"
        if len(df) > 5:
            message += f"\nπŸ“ <i>Showing {shown} of {len(df)} levels</i>"
        return message

    def format_comparison_message(self, ticker: str, comparison_data: List[Dict]) -> str:
        """
        Strategy comparison message formatting for Telegram.

        Each entry of ``comparison_data`` needs the keys ``strategy``,
        ``orders``, ``max_drawdown``, ``potential_profit`` and
        ``profit_margin``. The recommendation is the entry with the highest
        ``profit_margin``; it is omitted when the list is empty.
        """
        from html import escape  # local import: stdlib only, keeps this block self-contained

        parts = [f"πŸ“Š <b>STRATEGY COMPARISON {escape(ticker.upper(), quote=False)}</b>\n"]
        for data in comparison_data:
            name = data['strategy']
            parts.append(
                f"{self.strategies[name]['emoji']} <b>{name.upper()}</b>\n"
                f"🎯 Orders: <code>{data['orders']}</code>\n"
                f"πŸ“‰ Drawdown: <code>{data['max_drawdown']:.2f}%</code>\n"
                f"πŸ’° Profit: <code>${data['potential_profit']:.0f}</code>\n"
                f"πŸ“ˆ Margin: <code>{data['profit_margin']:.1f}%</code>\n"
            )

        # Recommendation (max() would raise on an empty list)
        if comparison_data:
            best = max(comparison_data, key=lambda item: item['profit_margin'])['strategy']
            parts.append(
                f"πŸ’‘ <b>Recommendation:</b> {self.strategies[best]['emoji']} {best.upper()}"
            )
        return "".join(parts)

    def create_chart(self, ticker: str, indicators: Dict, df: pd.DataFrame,
                     strategy: str) -> bytes:
        """
        Render the 2x2 grid overview chart and return it as PNG bytes.

        Panels: order size per level, price vs. % drop, cumulative capital and
        the share of each level type.

        This runs in a thread pool, so only figure-level methods are used for
        layout, saving and closing: pyplot's implicit "current figure" is
        shared between threads and may belong to another render. The figure
        is closed even when rendering fails.
        """
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 10))
        try:
            fig.suptitle(f'{ticker.upper()} - {strategy}', fontsize=14, fontweight='bold')
            current_price = indicators['current_price']
            colors = [self._TYPE_COLORS.get(t, self._DEFAULT_COLOR) for t in df['Type']]

            # 1. Order distribution
            ax1.barh(df['Level'], df['OrderSize'], color=colors, alpha=0.8)
            ax1.set_xlabel('Order Size ($)')
            ax1.set_ylabel('Level')
            ax1.set_title('πŸ“Š Order Distribution')
            ax1.grid(True, alpha=0.3)

            # 2. Level prices
            ax2.scatter(df['Price'], df['%Drop'], c=colors, s=df['OrderSize'] / 30, alpha=0.8)
            ax2.axvline(x=current_price, color='red', linestyle='--', alpha=0.7,
                        label=f'${current_price:.2f}')
            ax2.set_xlabel('Price ($)')
            ax2.set_ylabel('% Drop')
            ax2.set_title('🎯 Levels and Drops')
            ax2.legend()
            ax2.grid(True, alpha=0.3)

            # 3. Cumulative capital
            cumulative = df['OrderSize'].cumsum()
            ax3.plot(df['Level'], cumulative, marker='o', linewidth=2, color='#FF6B6B')
            ax3.fill_between(df['Level'], cumulative, alpha=0.3, color='#FF6B6B')
            ax3.set_xlabel('Level')
            ax3.set_ylabel('Capital ($)')
            ax3.set_title('πŸ’° Cumulative Capital')
            ax3.grid(True, alpha=0.3)

            # 4. Level types - coloured by label so the wedges match panels 1 and 2
            type_counts = df['Type'].value_counts()
            pie_colors = [self._TYPE_COLORS.get(label, self._DEFAULT_COLOR)
                          for label in type_counts.index]
            ax4.pie(type_counts.values, labels=type_counts.index, autopct='%1.1f%%',
                    startangle=90, colors=pie_colors)
            ax4.set_title('πŸ“ˆ Level Types')

            fig.tight_layout()

            # Save to bytes
            with io.BytesIO() as buf:
                fig.savefig(buf, format='png', dpi=150, bbox_inches='tight')
                return buf.getvalue()
        finally:
            plt.close(fig)

    async def create_grid_chart_async(self, ticker: str, indicators: Dict,
                                      df: pd.DataFrame, strategy: str) -> bytes:
        """Asynchronous grid chart creation (runs ``create_chart`` in the thread pool)."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.chart_executor, self.create_chart, ticker, indicators, df, strategy
        )
# --- Telegram Bot Integration Functions ---
async def generate_grid_message(ticker: str, capital: float = 10000,
                                strategy: str = "medium") -> Tuple[str, Optional[bytes]]:
    """
    Grid message and chart generation for Telegram bot.

    Never raises for generation failures: any exception is logged and turned
    into an HTML error message with ``None`` in place of the chart.

    Returns:
        Tuple[str, Optional[bytes]]: (message, chart in bytes)
    """
    from html import escape  # local import: stdlib only, keeps this block self-contained

    generator = AsyncTradingGridGenerator()
    try:
        indicators, df, metrics = await generator.generate_grid_async(ticker, capital, strategy)
        message = generator.format_telegram_message(ticker, indicators, df, metrics, strategy)
        chart = await generator.create_grid_chart_async(ticker, indicators, df, strategy)
        return message, chart
    except Exception as e:
        logger.error(f"Grid generation error for {ticker}: {e}", exc_info=True)
        # Both values come from outside (user input / library errors) and are
        # embedded in HTML markup, so they must be escaped.
        safe_ticker = escape(str(ticker), quote=False)
        safe_error = escape(str(e), quote=False)
        error_message = (
            f"❌ <b>Grid generation error</b>\n\n"
            f"πŸ” Ticker: <code>{safe_ticker}</code>\n"
            f"πŸ’« Error: <i>{safe_error}</i>"
        )
        return error_message, None
    finally:
        # A fresh generator (and its worker pools) is created per call;
        # release the pools instead of waiting for garbage collection.
        generator.executor.shutdown(wait=False)
        generator.chart_executor.shutdown(wait=False)
async def compare_strategies_message(ticker: str, capital: float = 10000,
                                     strategies: Optional[List[str]] = None) -> str:
    """
    Strategy comparison for Telegram bot.

    Generates one grid per strategy (default: conservative, medium,
    aggressive). A strategy whose generation fails is logged and skipped; if
    none succeeds an error message is returned instead.

    Returns:
        str: Formatted message for Telegram
    """
    from html import escape  # local import: stdlib only, keeps this block self-contained

    if strategies is None:
        strategies = ["conservative", "medium", "aggressive"]

    generator = AsyncTradingGridGenerator()
    comparison_data = []
    try:
        for strategy in strategies:
            try:
                _, _, metrics = await generator.generate_grid_async(ticker, capital, strategy)
            except Exception as e:
                logger.error(f"Error for strategy {strategy}: {e}")
                continue
            comparison_data.append({
                'strategy': strategy,
                'orders': metrics['number_of_orders'],
                'max_drawdown': metrics['max_drawdown'],
                'potential_profit': metrics['potential_profit'],
                'profit_margin': metrics['profit_margin'],
            })

        if comparison_data:
            return generator.format_comparison_message(ticker, comparison_data)
        # The ticker is user input embedded in HTML markup: escape it.
        return f"❌ <b>Error</b>\n\nCould not retrieve data for {escape(ticker.upper(), quote=False)}"
    except Exception as e:
        logger.error(f"Strategy comparison error for {ticker}: {e}")
        safe_ticker = escape(str(ticker), quote=False)
        safe_error = escape(str(e), quote=False)
        return (
            f"❌ <b>Comparison error</b>\n\n"
            f"πŸ” Ticker: <code>{safe_ticker}</code>\n"
            f"πŸ’« Error: <i>{safe_error}</i>"
        )
    finally:
        # Release the per-call worker pools instead of waiting for garbage collection.
        generator.executor.shutdown(wait=False)
        generator.chart_executor.shutdown(wait=False)
async def get_available_strategies() -> Dict[str, Dict]:
    """Return the strategy presets of a freshly created generator, keyed by name."""
    return AsyncTradingGridGenerator().strategies
async def validate_ticker(ticker: str) -> bool:
    """
    Ticker validation.

    Returns True when five days of price history can be downloaded for
    ``ticker`` and False when the download fails.
    """
    generator = AsyncTradingGridGenerator()
    try:
        await generator.get_stock_data_async(ticker, period="5d")
        return True
    except Exception:
        # Deliberately not a bare ``except``: task cancellation and
        # KeyboardInterrupt must propagate instead of reading as "invalid".
        return False
    finally:
        # Release the per-call worker pools instead of waiting for garbage collection.
        generator.executor.shutdown(wait=False)
        generator.chart_executor.shutdown(wait=False)
# --- Example usage for Telegram bot ---
class TelegramGridBot:
    """
    Example class for Telegram bot integration.

    Each handler takes the raw text arguments of a command and returns the
    text (and, for the grid command, the optional chart bytes) to send back.
    """

    def __init__(self):
        """Create the generator instance held by the bot."""
        self.generator = AsyncTradingGridGenerator()

    @staticmethod
    def _parse_capital(capital: str) -> Tuple[Optional[float], Optional[str]]:
        """
        Parse the user-supplied capital.

        Returns ``(value, None)`` on success or ``(None, error_message)``.
        ``nan`` and ``inf`` parse as floats and would slip past a plain
        ``<= 0`` check, so non-finite values are rejected explicitly.
        """
        import math  # local import: stdlib only, keeps this block self-contained

        try:
            value = float(capital)
        except (TypeError, ValueError):
            return None, "❌ Invalid capital format. Enter a number."
        if not math.isfinite(value):
            return None, "❌ Invalid capital format. Enter a number."
        if value <= 0:
            return None, "❌ Capital must be a positive number"
        return value, None

    async def handle_grid_command(self, ticker: str, capital: str = "10000",
                                  strategy: str = "medium") -> Tuple[str, Optional[bytes]]:
        """Grid generation command handler; returns ``(message, chart or None)``."""
        capital_float, error = self._parse_capital(capital)
        if error is not None:
            return error, None
        # Only the generation call is guarded here, so a ValueError raised
        # downstream is not misreported as a capital format problem.
        try:
            return await generate_grid_message(ticker.upper(), capital_float, strategy.lower())
        except Exception as e:
            return f"❌ An error occurred: {str(e)}", None

    async def handle_compare_command(self, ticker: str, capital: str = "10000") -> str:
        """Strategy comparison command handler; returns the message text."""
        capital_float, error = self._parse_capital(capital)
        if error is not None:
            return error
        try:
            return await compare_strategies_message(ticker.upper(), capital_float)
        except Exception as e:
            return f"❌ An error occurred: {str(e)}"

    async def handle_strategies_command(self) -> str:
        """Available strategies list, one five-line section per strategy."""
        strategies = await get_available_strategies()
        sections = ["πŸ“‹ <b>AVAILABLE STRATEGIES:</b>\n\n"]
        for name, info in strategies.items():
            sections.append(
                f"{info['emoji']} <b>{name.upper()}</b>\n"
                f"{info['description']}\n"
                f"πŸ“ˆ Growth: <code>{info['growth']}</code>\n"
                f"πŸ“‰ Max drop: <code>{info['drop_pct']:.0%}</code>\n"
                f"🎯 Levels: <code>{info['levels']}</code>\n"
            )
        return "".join(sections)
# --- Example execution ---
async def main():
    """Usage example: build an aggressive NVDA grid, save its chart, then compare strategies."""
    ticker = "NVDA"

    # Grid generation
    message, chart = await generate_grid_message(ticker, 10000, "aggressive")
    # chart is None when generation failed; writing None would raise TypeError.
    if chart is not None:
        with open('chart.png', 'wb') as f:
            f.write(chart)
    print("Message:", message)
    print("Chart generated:", chart is not None)

    # Strategy comparison
    comparison = await compare_strategies_message(ticker, 10000)
    print("\nComparison:", comparison)


if __name__ == "__main__":
    asyncio.run(main())