|
|
import asyncio
import base64
import html
import io
import logging
import math
import warnings
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Union

import aiohttp
import matplotlib

# Select the non-interactive backend before pyplot is imported.
matplotlib.use('Agg')

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import yfinance as yf
|
|
|
|
|
# Silence every warning category process-wide.
warnings.filterwarnings('ignore')

# Configure root logging at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class AsyncTradingGridGenerator:
    """Asynchronous trading grid generator for Telegram bots.

    Downloads price history with yfinance, derives buy levels below the
    current price from three sources (Fibonacci retracements, evenly spaced
    drop levels and ATR/volatility steps), sizes an order for each level and
    renders the result as a Telegram HTML message and a PNG chart.

    Blocking work runs in executors: ``executor`` (process pool) for the
    download and ``chart_executor`` (thread pool) for chart rendering.
    Call :meth:`close` once the instance is no longer needed.
    """

    # Number of grid rows listed in the Telegram message.
    _TOP_LEVELS = 5
    # Colour per level type, shared by all chart panels.
    _TYPE_COLORS = {'Fibonacci': '#FF6B6B', 'Geometric': '#4ECDC4'}
    _DEFAULT_TYPE_COLOR = '#45B7D1'

    def __init__(self):
        # NOTE(review): the emoji in the literals of this class look
        # mis-encoded in this copy of the file (UTF-8 read as a Greek
        # single-byte charset?). They are reproduced unchanged; restore them
        # from the original source if it is available.
        self.strategies = {
            "conservative": {
                "growth": 1.15,
                "drop_pct": 0.2,
                "levels": 5,
                "risk_factor": 0.8,
                "description": "π‘οΈ Conservative strategy with minimal risk",
                "emoji": "π",
            },
            "medium": {
                "growth": 1.3,
                "drop_pct": 0.35,
                "levels": 8,
                "risk_factor": 1.0,
                "description": "βοΈ Balanced strategy with medium aggression",
                "emoji": "π―",
            },
            "aggressive": {
                "growth": 1.5,
                "drop_pct": 0.5,
                "levels": 12,
                "risk_factor": 1.3,
                "description": "π Aggressive strategy with high profitability",
                "emoji": "π₯",
            },
            "ultra_aggressive": {
                "growth": 1.8,
                "drop_pct": 0.7,
                "levels": 15,
                "risk_factor": 1.6,
                "description": "β‘ Maximum aggressive strategy",
                "emoji": "π₯",
            },
        }
        self.executor = ProcessPoolExecutor(max_workers=4)
        self.chart_executor = ThreadPoolExecutor(max_workers=2)

    def close(self) -> None:
        """Shut down both executors without waiting for them to finish."""
        self.executor.shutdown(wait=False)
        self.chart_executor.shutdown(wait=False)

    @staticmethod
    def fetch_data(ticker: str, period: str = "1y") -> pd.DataFrame:
        """Download price history and add SMA/volatility columns.

        Adds ``SMA_20``, ``SMA_50`` and ``Volatility`` (20-row rolling std of
        daily returns scaled by ``sqrt(252)``) to the frame.

        Raises:
            ValueError: if the download fails or returns no rows. The
                original exception is chained as ``__cause__``.
        """
        try:
            data = yf.Ticker(ticker).history(period=period)
            if data.empty:
                raise ValueError(f"Data for ticker {ticker} not found")

            close = data['Close']
            data['SMA_20'] = close.rolling(20).mean()
            data['SMA_50'] = close.rolling(50).mean()
            data['Volatility'] = close.pct_change().rolling(20).std() * np.sqrt(252)
            return data
        except Exception as e:
            logger.error("Error getting data for %s: %s", ticker, e)
            raise ValueError(f"Data retrieval error: {e}") from e

    async def get_stock_data_async(self, ticker: str, period: str = "1y") -> pd.DataFrame:
        """Run :meth:`fetch_data` in ``self.executor`` and await the result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor, AsyncTradingGridGenerator.fetch_data, ticker, period
        )

    @staticmethod
    def _last_or_default(series: pd.Series, default):
        """Return the last value of ``series``, or ``default`` if it is NaN."""
        last = series.iloc[-1]
        return default if pd.isna(last) else last

    def calculate_technical_indicators(self, data: pd.DataFrame) -> Dict[str, float]:
        """Compute the indicators used to build and describe the grid.

        Args:
            data: frame with ``Close``, ``High``, ``Low``, ``Volatility``,
                ``SMA_20`` and ``SMA_50`` columns and at least one row.

        Returns:
            Dict with ``current_price``, ``atr`` (14-row mean true range),
            ``support``/``resistance`` (lowest low / highest high of the last
            20 rows), ``volatility`` (default 0.25), ``rsi`` (default 50),
            ``sma_20`` and ``sma_50`` (default: current price).
        """
        close, high, low = data['Close'], data['High'], data['Low']
        current_price = close.iloc[-1]

        # True range; the first row is NaN because it has no previous close.
        prev_close = close.shift()
        true_range = pd.Series(
            np.maximum.reduce([
                high - low,
                np.abs(high - prev_close),
                np.abs(low - prev_close),
            ]),
            index=data.index,
        )
        atr = true_range.rolling(14).mean().iloc[-1]
        if pd.isna(atr):
            # Too little history for a full window: average what exists.
            atr = true_range.mean()
        if pd.isna(atr):
            atr = (high - low).mean()

        # tail(20) equals the 20-row rolling window on complete data and
        # still yields a value when fewer than 20 rows are available.
        support = low.tail(20).min()
        resistance = high.tail(20).max()

        # RSI from 14-row simple averages of gains and losses.
        delta = close.diff()
        gain = delta.where(delta > 0, 0).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rsi = 100 - (100 / (1 + gain / loss))

        return {
            'current_price': current_price,
            'atr': atr,
            'support': support,
            'resistance': resistance,
            'volatility': self._last_or_default(data['Volatility'], 0.25),
            'rsi': self._last_or_default(rsi, 50),
            'sma_20': self._last_or_default(data['SMA_20'], current_price),
            'sma_50': self._last_or_default(data['SMA_50'], current_price),
        }

    def fibonacci_levels(self, current_price: float, low: float, high: float) -> List[float]:
        """Return retracement levels of ``[low, high]`` below the current price.

        The result is sorted from highest to lowest.
        """
        ratios = (0.236, 0.382, 0.5, 0.618, 0.786, 0.886)
        span = high - low
        candidates = (high - span * ratio for ratio in ratios)
        return sorted(
            (level for level in candidates if level < current_price),
            reverse=True,
        )

    def geometric_levels(self, current_price: float, drop_pct: float, levels: int) -> List[float]:
        """Return ``levels`` prices down to ``current_price * (1 - drop_pct)``.

        Despite the name, the levels are evenly (linearly) spaced. A
        non-positive ``levels`` yields an empty list.
        """
        if levels <= 0:
            return []
        min_price = current_price * (1 - drop_pct)
        step = (current_price - min_price) / levels
        return [current_price - step * (i + 1) for i in range(levels)]

    def volatility_adjusted_levels(self, current_price: float, atr: float,
                                   volatility: float, levels: int) -> List[float]:
        """Return ``levels`` prices spaced ``atr * (1 + volatility)`` apart."""
        vol_step = atr * (1 + volatility)
        return [current_price - vol_step * (i + 1) for i in range(levels)]

    def combine_and_optimize_levels(self, fib_levels: List[float],
                                    geom_levels: List[float],
                                    vol_levels: List[float],
                                    current_price: float,
                                    min_distance: float = 0.02) -> List[float]:
        """Merge the candidate levels into one descending, thinned-out list.

        Levels are rounded to 2 decimals and de-duplicated; values outside
        ``(0, current_price)`` (including NaN) are dropped. A level is kept
        only if it lies at least ``min_distance`` (as a fraction of the
        current price) below the previously kept one.
        """
        candidates = sorted(
            {
                round(level, 2)
                for level in fib_levels + geom_levels + vol_levels
                if 0 < level < current_price
            },
            reverse=True,
        )

        optimized: List[float] = []
        last_level = current_price
        for level in candidates:
            if abs(last_level - level) / current_price >= min_distance:
                optimized.append(level)
                last_level = level
        return optimized

    def calculate_position_sizes(self, levels: List[float], current_price: float,
                                 capital: float, growth: float,
                                 risk_factor: float) -> List[float]:
        """Split ``capital`` across ``levels``.

        The weight of level ``i`` is ``growth ** i * (1 + drop * risk_factor)``
        where ``drop`` is its fractional distance below the current price.
        The returned sizes sum to ``capital``.
        """
        if not levels:
            return []

        weights = [
            (growth ** i) * (1 + (current_price - level) / current_price * risk_factor)
            for i, level in enumerate(levels)
        ]
        total_weight = sum(weights)
        return [weight / total_weight * capital for weight in weights]

    def calculate_grid_metrics(self, df: pd.DataFrame, current_price: float) -> Dict[str, float]:
        """Summarise a grid frame (``OrderSize``, ``Price``, ``%Drop`` columns).

        ``potential_profit`` is the gain if every order fills and the price
        then returns to ``current_price``. Returns ``{}`` for an empty frame.
        """
        if df.empty:
            return {}

        order_sizes = df['OrderSize']
        prices = df['Price']
        total_capital = order_sizes.sum()
        potential_profit = ((order_sizes / prices) * (current_price - prices)).sum()

        return {
            'total_capital': total_capital,
            'max_drawdown': df['%Drop'].max(),
            'avg_order_size': order_sizes.mean(),
            'potential_profit': potential_profit,
            'profit_margin': (potential_profit / total_capital) * 100 if total_capital > 0 else 0,
            'number_of_orders': len(df),
        }

    @staticmethod
    def _classify_levels(combined_levels: List[float], fib_levels: List[float],
                         geom_levels: List[float]) -> List[str]:
        """Label each combined level 'Fibonacci', 'Geometric' or 'Combined'."""
        # Combined levels are rounded to 2 decimals, so the Fibonacci levels
        # must be rounded the same way before testing membership.
        fib_rounded = {round(level, 2) for level in fib_levels}
        labels = []
        for price in combined_levels:
            if price in fib_rounded:
                labels.append('Fibonacci')
            elif any(abs(price - level) < 0.01 for level in geom_levels):
                labels.append('Geometric')
            else:
                labels.append('Combined')
        return labels

    async def generate_grid_async(self, ticker: str, capital: float = 10000,
                                  strategy: str = "medium") -> Tuple[Dict, pd.DataFrame, Dict]:
        """Build the order grid for ``ticker``.

        Returns:
            ``(indicators, df, metrics)``. ``df`` has one row per level with
            ``Level``, ``Price``, ``OrderSize``, ``Shares``, ``%Drop``,
            ``Distance_ATR`` and ``Type`` columns.

        Raises:
            ValueError: unknown strategy, non-positive capital, data
                retrieval failure, or no usable level could be produced.
        """
        if strategy not in self.strategies:
            raise ValueError(f"Unknown strategy: {strategy}")
        if not capital > 0:  # also rejects NaN
            raise ValueError(f"Capital must be positive, got {capital}")

        data = await self.get_stock_data_async(ticker)
        indicators = self.calculate_technical_indicators(data)
        params = self.strategies[strategy]
        current_price = indicators['current_price']
        atr = indicators['atr']

        fib_levels = self.fibonacci_levels(
            current_price, indicators['support'], indicators['resistance']
        )
        geom_levels = self.geometric_levels(
            current_price, params['drop_pct'], params['levels']
        )
        vol_levels = self.volatility_adjusted_levels(
            current_price, atr, indicators['volatility'], params['levels'] // 2
        )

        combined_levels = self.combine_and_optimize_levels(
            fib_levels, geom_levels, vol_levels, current_price
        )
        if not combined_levels:
            raise ValueError("Failed to generate grid levels")

        position_sizes = self.calculate_position_sizes(
            combined_levels, current_price, capital,
            params['growth'], params['risk_factor'],
        )

        # A zero or NaN ATR cannot serve as a distance unit; report NaN.
        atr_unit = atr if atr > 0 else float('nan')

        df = pd.DataFrame({
            'Level': range(1, len(combined_levels) + 1),
            'Price': combined_levels,
            'OrderSize': position_sizes,
            'Shares': [size / price for size, price in zip(position_sizes, combined_levels)],
            '%Drop': [(current_price - price) / current_price * 100 for price in combined_levels],
            'Distance_ATR': [(current_price - price) / atr_unit for price in combined_levels],
        })
        df['Type'] = self._classify_levels(combined_levels, fib_levels, geom_levels)

        metrics = self.calculate_grid_metrics(df, current_price)
        return indicators, df, metrics

    def format_telegram_message(self, ticker: str, indicators: Dict, df: pd.DataFrame,
                                metrics: Dict, strategy: str) -> str:
        """Render the grid as a Telegram message in HTML formatting.

        The ticker is HTML-escaped and the level table is wrapped in
        ``<pre>`` so the whole message uses one markup style. At most
        ``_TOP_LEVELS`` rows of ``df`` are listed.
        """
        strategy_info = self.strategies[strategy]
        shown = min(self._TOP_LEVELS, len(df))

        trend_emoji = "π" if indicators['current_price'] > indicators['sma_20'] else "π"
        if indicators['rsi'] < 30:
            rsi_status = "π΄ Oversold"
        elif indicators['rsi'] > 70:
            rsi_status = "π’ Overbought"
        else:
            rsi_status = "π‘ Neutral"

        lines = [
            f"π― <b>TRADING GRID {html.escape(ticker.upper())}</b>",
            f"{strategy_info['emoji']} <b>Strategy:</b> {strategy.upper()}",
            f"{strategy_info['description']}",
            "",
            "π <b>CURRENT INDICATORS:</b>",
            f"π° Price: <code>${indicators['current_price']:.2f}</code> {trend_emoji}",
            f"π RSI: <code>{indicators['rsi']:.1f}</code> {rsi_status}",
            f"β‘ Volatility: <code>{indicators['volatility']:.1%}</code>",
            f"π― ATR: <code>${indicators['atr']:.2f}</code>",
            f"π» Support: <code>${indicators['support']:.2f}</code>",
            f"πΊ Resistance: <code>${indicators['resistance']:.2f}</code>",
            "",
            "π <b>GRID METRICS:</b>",
            f"π΅ Capital: <code>${metrics['total_capital']:.2f}</code>",
            f"π Max drawdown: <code>{metrics['max_drawdown']:.2f}%</code>",
            f"π― Orders: <code>{metrics['number_of_orders']}</code>",
            f"π° Potential: <code>${metrics['potential_profit']:.2f}</code>",
            f"π Margin: <code>{metrics['profit_margin']:.2f}%</code>",
            "",
            f"π― <b>TOP-{shown} LEVELS:</b>",
        ]

        top = df.head(shown)
        table = ["β Price Size Drop Type"]
        for level, price, size, drop, kind in zip(
                top['Level'], top['Price'], top['OrderSize'], top['%Drop'], top['Type']):
            type_emoji = "π" if kind == 'Fibonacci' else "π" if kind == 'Geometric' else "β‘"
            table.append(f"{level:2.0f} ${price:6.2f} ${size:7.0f} {drop:6.2f}% {type_emoji}")
        lines.append("<pre>" + "\n".join(table) + "</pre>")

        if len(df) > self._TOP_LEVELS:
            lines.append(f"π <i>Showing {shown} of {len(df)} levels</i>")
        return "\n".join(lines)

    def format_comparison_message(self, ticker: str, comparison_data: List[Dict]) -> str:
        """Render a per-strategy comparison as a Telegram HTML message.

        Each entry needs ``strategy``, ``orders``, ``max_drawdown``,
        ``potential_profit`` and ``profit_margin``. The recommendation is
        the entry with the highest ``profit_margin``; it is omitted when
        ``comparison_data`` is empty.
        """
        parts = [f"π <b>STRATEGY COMPARISON {html.escape(ticker.upper())}</b>\n"]
        for entry in comparison_data:
            name = entry['strategy']
            parts.append(
                f"{self.strategies[name]['emoji']} <b>{name.upper()}</b>\n"
                f"π― Orders: <code>{entry['orders']}</code>\n"
                f"π Drawdown: <code>{entry['max_drawdown']:.2f}%</code>\n"
                f"π° Profit: <code>${entry['potential_profit']:.0f}</code>\n"
                f"π Margin: <code>{entry['profit_margin']:.1f}%</code>\n"
            )

        if comparison_data:
            best = max(comparison_data, key=lambda x: x['profit_margin'])['strategy']
            parts.append(
                f"π‘ <b>Recommendation:</b> {self.strategies[best]['emoji']} {best.upper()}"
            )
        return "".join(parts)

    def create_chart(self, ticker: str, indicators: Dict, df: pd.DataFrame,
                     strategy: str) -> bytes:
        """Draw a 2x2 overview of the grid and return it as PNG bytes.

        Panels: order size per level, price vs. drop, cumulative capital and
        the share of each level type. Only figure-level calls are used after
        creation because this runs in a worker thread, where pyplot's
        "current figure" may belong to another chart.
        """
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 10))
        try:
            fig.suptitle(f'{ticker.upper()} - {strategy}', fontsize=14, fontweight='bold')
            current_price = indicators['current_price']

            def color_of(kind):
                return self._TYPE_COLORS.get(kind, self._DEFAULT_TYPE_COLOR)

            colors = [color_of(kind) for kind in df['Type']]

            ax1.barh(df['Level'], df['OrderSize'], color=colors, alpha=0.8)
            ax1.set_xlabel('Order Size ($)')
            ax1.set_ylabel('Level')
            ax1.set_title('π Order Distribution')
            ax1.grid(True, alpha=0.3)

            ax2.scatter(df['Price'], df['%Drop'], c=colors, s=df['OrderSize'] / 30, alpha=0.8)
            ax2.axvline(x=current_price, color='red', linestyle='--', alpha=0.7,
                        label=f'${current_price:.2f}')
            ax2.set_xlabel('Price ($)')
            ax2.set_ylabel('% Drop')
            ax2.set_title('π― Levels and Drops')
            ax2.legend()
            ax2.grid(True, alpha=0.3)

            cumulative = df['OrderSize'].cumsum()
            ax3.plot(df['Level'], cumulative, marker='o', linewidth=2, color='#FF6B6B')
            ax3.fill_between(df['Level'], cumulative, alpha=0.3, color='#FF6B6B')
            ax3.set_xlabel('Level')
            ax3.set_ylabel('Capital ($)')
            ax3.set_title('π° Cumulative Capital')
            ax3.grid(True, alpha=0.3)

            # Colour each slice by its type so it matches the other panels.
            type_counts = df['Type'].value_counts()
            ax4.pie(type_counts.values, labels=type_counts.index, autopct='%1.1f%%',
                    startangle=90, colors=[color_of(kind) for kind in type_counts.index])
            ax4.set_title('π Level Types')

            fig.tight_layout()
            with io.BytesIO() as buf:
                fig.savefig(buf, format='png', dpi=150, bbox_inches='tight')
                return buf.getvalue()
        finally:
            # Always release the figure, including when drawing fails.
            plt.close(fig)

    async def create_grid_chart_async(self, ticker: str, indicators: Dict,
                                      df: pd.DataFrame, strategy: str) -> bytes:
        """Run :meth:`create_chart` in ``self.chart_executor``."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.chart_executor, self.create_chart, ticker, indicators, df, strategy
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def generate_grid_message(ticker: str, capital: float = 10000,
                                strategy: str = "medium") -> Tuple[str, Optional[bytes]]:
    """Generate the grid message and chart for a Telegram bot.

    Never raises for generation failures: they are logged with a traceback
    and reported as an HTML error message with a ``None`` chart.

    Returns:
        Tuple[str, Optional[bytes]]: (message, PNG chart bytes or None).
    """
    generator = AsyncTradingGridGenerator()
    try:
        indicators, df, metrics = await generator.generate_grid_async(ticker, capital, strategy)
        message = generator.format_telegram_message(ticker, indicators, df, metrics, strategy)
        chart = await generator.create_grid_chart_async(ticker, indicators, df, strategy)
        return message, chart
    except Exception as e:
        logger.exception("Grid generation error for %s: %s", ticker, e)
        # Ticker and exception text are untrusted: escape them so they
        # cannot break the HTML markup of the message.
        error_message = (
            "β <b>Grid generation error</b>\n\n"
            f"π Ticker: <code>{html.escape(str(ticker))}</code>\n"
            f"π« Error: <i>{html.escape(str(e))}</i>"
        )
        return error_message, None
    finally:
        # The generator is private to this call; release its worker pools.
        generator.executor.shutdown(wait=False)
        generator.chart_executor.shutdown(wait=False)
|
|
|
|
|
|
|
|
async def compare_strategies_message(ticker: str, capital: float = 10000,
                                     strategies: Optional[List[str]] = None) -> str:
    """Compare several strategies for a Telegram bot.

    The strategies are evaluated concurrently; one that fails is logged and
    left out of the comparison. ``strategies`` defaults to conservative,
    medium and aggressive.

    Returns:
        str: formatted HTML message for Telegram (an error message if no
        strategy could be evaluated).
    """
    if strategies is None:
        strategies = ["conservative", "medium", "aggressive"]

    generator = AsyncTradingGridGenerator()

    async def evaluate(strategy: str) -> Dict:
        _, _, metrics = await generator.generate_grid_async(ticker, capital, strategy)
        return {
            'strategy': strategy,
            'orders': metrics['number_of_orders'],
            'max_drawdown': metrics['max_drawdown'],
            'potential_profit': metrics['potential_profit'],
            'profit_margin': metrics['profit_margin'],
        }

    try:
        outcomes = await asyncio.gather(
            *(evaluate(strategy) for strategy in strategies),
            return_exceptions=True,
        )

        comparison_data = []
        for strategy, outcome in zip(strategies, outcomes):
            if isinstance(outcome, Exception):
                logger.error("Error for strategy %s: %s", strategy, outcome)
            elif isinstance(outcome, BaseException):
                # Cancellation and similar must not be swallowed.
                raise outcome
            else:
                comparison_data.append(outcome)

        if comparison_data:
            return generator.format_comparison_message(ticker, comparison_data)
        return f"β <b>Error</b>\n\nCould not retrieve data for {html.escape(ticker.upper())}"
    except Exception as e:
        logger.error("Strategy comparison error for %s: %s", ticker, e)
        # Escape untrusted text so it cannot break the HTML markup.
        return (
            "β <b>Comparison error</b>\n\n"
            f"π Ticker: <code>{html.escape(str(ticker))}</code>\n"
            f"π« Error: <i>{html.escape(str(e))}</i>"
        )
    finally:
        # The generator is private to this call; release its worker pools.
        generator.executor.shutdown(wait=False)
        generator.chart_executor.shutdown(wait=False)
|
|
|
|
|
|
|
|
async def get_available_strategies() -> Dict[str, Dict]:
    """Return the strategy table of a fresh generator, keyed by name."""
    generator = AsyncTradingGridGenerator()
    try:
        return generator.strategies
    finally:
        # Only the dict is needed; release the pools the constructor made.
        generator.executor.shutdown(wait=False)
        generator.chart_executor.shutdown(wait=False)
|
|
|
|
|
|
|
|
async def validate_ticker(ticker: str) -> bool:
    """Return True if 5 days of price history can be fetched for ``ticker``.

    Blank or non-string tickers are rejected without a download. Only
    ``Exception`` is treated as "invalid", so task cancellation and
    interpreter shutdown signals still propagate.
    """
    if not isinstance(ticker, str) or not ticker.strip():
        return False

    generator = AsyncTradingGridGenerator()
    try:
        await generator.get_stock_data_async(ticker, period="5d")
        return True
    except Exception:
        return False
    finally:
        # The generator is private to this call; release its worker pools.
        generator.executor.shutdown(wait=False)
        generator.chart_executor.shutdown(wait=False)
|
|
|
|
|
|
|
|
|
|
|
class TelegramGridBot:
    """Example class for Telegram bot integration.

    Each handler takes the raw command arguments as strings and returns the
    text (and, for the grid command, the optional chart bytes) to send back.
    """

    def __init__(self):
        self.generator = AsyncTradingGridGenerator()

    @staticmethod
    def _parse_capital(capital: str) -> Tuple[Optional[float], Optional[str]]:
        """Parse the capital argument.

        Returns ``(value, None)`` on success or ``(None, error_message)``.
        NaN and infinity are rejected as invalid numbers.
        """
        try:
            value = float(capital)
        except (TypeError, ValueError):
            return None, "β Invalid capital format. Enter a number."
        if not math.isfinite(value):
            return None, "β Invalid capital format. Enter a number."
        if value <= 0:
            return None, "β Capital must be a positive number"
        return value, None

    async def handle_grid_command(self, ticker: str, capital: str = "10000",
                                  strategy: str = "medium") -> Tuple[str, Optional[bytes]]:
        """Grid generation command handler; returns (message, chart or None)."""
        capital_float, error = self._parse_capital(capital)
        if error is not None:
            return error, None
        try:
            return await generate_grid_message(ticker.upper(), capital_float, strategy.lower())
        except Exception as e:
            return f"β An error occurred: {str(e)}", None

    async def handle_compare_command(self, ticker: str, capital: str = "10000") -> str:
        """Strategy comparison command handler; returns the message text."""
        capital_float, error = self._parse_capital(capital)
        if error is not None:
            return error
        try:
            return await compare_strategies_message(ticker.upper(), capital_float)
        except Exception as e:
            return f"β An error occurred: {str(e)}"

    async def handle_strategies_command(self) -> str:
        """List the available strategies as a Telegram HTML message."""
        # Read the table from the generator this bot already owns rather
        # than building a throwaway one.
        entries = [
            f"{info['emoji']} <b>{name.upper()}</b>\n"
            f"{info['description']}\n"
            f"π Growth: <code>{info['growth']}</code>\n"
            f"π Max drop: <code>{info['drop_pct']:.0%}</code>\n"
            f"π― Levels: <code>{info['levels']}</code>\n"
            for name, info in self.generator.strategies.items()
        ]
        return "π <b>AVAILABLE STRATEGIES:</b>\n\n" + "".join(entries)
|
|
|
|
|
|
|
|
|
|
|
async def main():
    """Usage example: build one grid, save its chart, compare strategies."""
    ticker = "NVDA"

    message, chart = await generate_grid_message(ticker, 10000, "aggressive")
    # The chart is None when generation failed; only write real bytes.
    if chart is not None:
        with open('chart.png', 'wb') as f:
            f.write(chart)
    print("Message:", message)
    print("Chart generated:", chart is not None)

    comparison = await compare_strategies_message(ticker, 10000)
    print("\nComparison:", comparison)
|
|
|
|
|
|
|
|
# Run the usage example when the module is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
|
|
|