Spaces:
Runtime error
Runtime error
| import logging | |
| import os | |
| import sys | |
| from typing import Any | |
| from telegram import Update | |
| from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes | |
| from dotenv import load_dotenv | |
| from src.financial_news_requester import fetch_comp_financial_news | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| load_dotenv() | |
| TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") | |
| def format_news_for_telegram(news_json: list[dict[str, Any]]) -> str: | |
| message = "" | |
| for item in news_json: | |
| message += ( | |
| f"π° <b>{item.get('headline')}</b>\n" | |
| f"π {item.get('summary')}\n" | |
| f"π·οΈ Source: {item.get('source')}\n" | |
| f"π <a href=\"{item.get('url')}\">Read more</a>\n\n" | |
| ) | |
| return message | |
| async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| await update.message.reply_text("Hello! I'm your Financial Bot.") | |
| async def run_crew(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| await update.message.reply_text("Running ...") | |
| try: | |
| feed = fetch_comp_financial_news() | |
| logging.info(f"processed: {feed} news") | |
| await update.message.reply_text(f"Result:\n{format_news_for_telegram(feed)}", parse_mode='HTML') | |
| except Exception as e: | |
| await update.message.reply_text(f"Error: {e}") | |
| if __name__ == "__main__": | |
| logging.basicConfig(level=logging.INFO) | |
| app = ApplicationBuilder().token(TELEGRAM_TOKEN).build() | |
| app.add_handler(CommandHandler("start", start)) | |
| app.add_handler(CommandHandler("run", run_crew)) | |
| app.run_polling() | |