import os from typing import Optional, Dict from dotenv import load_dotenv import chainlit as cl from phi.assistant import Assistant from phi.llm.openai import OpenAIChat from phi.tools.duckduckgo import DuckDuckGo from phi.tools.yfinance import YFinanceTools from src.databases.postgres import sqlalchemy_engine from src.knowledge_bases.combined import knowledge_base from src.tools.crypto_swap_toolkit import CryptoSwapTools from src.tools.crypto_bridge_toolkit import CrossChainSwapTools from src.tools.crypto_data_toolkit import CryptoDataTools from portkey_ai import PORTKEY_GATEWAY_URL, createHeaders from src.config.portkey_config import generate_portkey_config from src.tools.user_profile_toolkit import UserProfileToolkit from phi.storage.assistant.postgres import PgAssistantStorage from src.tools.crypto_evm_wallet_toolkit import CryptoEVMWalletTools from src.tools.user_confirmation_pin_toolkit import UserConfirmationPinToolkit # Load environment variables load_dotenv() # Initialize storage storage = PgAssistantStorage( table_name="assistant_runs", db_engine=sqlalchemy_engine ) # OAuth callback @cl.oauth_callback def oauth_callback(provider_id: str, token: str, raw_user_data: Dict[str, str], default_user: cl.User) -> Optional[cl.User]: return cl.User( identifier=raw_user_data.get('email'), email=raw_user_data.get('email'), name=raw_user_data.get('name'), metadata={**raw_user_data, **({'image': default_user.metadata.get('image')})} ) # Password authentication callback @cl.password_auth_callback def auth_callback(username: str, password: str) -> Optional[cl.User]: if (username, password) == ("admin", "admin"): user = { "identifier": "admin", "email": username, "metadata": {"role": "admin", "provider": "credentials"} } cl.user_session.set("user_profile", user) return cl.User( identifier="admin", email=username, metadata={"role": "admin", "provider": "credentials"} ) return None # Set starters @cl.set_starters async def set_starters(): return [ cl.Starter( label="Create a crypto wallet for me", message="create an evm crypto wallet for me", icon="/public/wallet-svgrepo-com.svg", ), cl.Starter( label="Latest News on defi, crypto and solana", message="What news are currently trending on defi.", icon="/public/news-svgrepo-com.svg", ), cl.Starter( label="Get price of BTC, ETH and PEPE", message="Get me the price of BTC, ETH and PEPE", icon="/public/coins-electronics-svgrepo-com.svg", ), cl.Starter( label="Get trending stocks", message="Get latest stock", icon="/public/stockchart-svgrepo-com.svg", ) ] # On chat start @cl.on_chat_start async def start(): is_dev_mode = bool(os.getenv("DEV_MODE")) portkey_local_gateway = bool(os.getenv("PORTKEY_LOCAL_GATEWAY_URL")) portkey_config = generate_portkey_config(local=portkey_local_gateway) XBTfinancialmarketresearcher = Assistant( name="XBT-financial-market-researcher", llm=OpenAIChat(model="gpt-4o"), description="A specialized assistant for researching the financial market, with a focus on crypto, Web3, and DeFi news and trends.", instructions=[ "Use DuckDuckGo for general web searches and recent news.", "Use YFinanceTools for obtaining stock prices and related financial data.", "Use CryptoDataTools for accessing cryptocurrency prices and market information.", "Incorporate relevant news and trends in crypto, Web3, and DeFi into your responses.", "Provide accurate and up-to-date information with citations when necessary.", ], markdown=True, role="Helps research the financial market and get news and trends on crypto, Web3, and DeFi", tools=[ DuckDuckGo(), YFinanceTools(stock_price=True), CryptoDataTools(), ], show_tool_calls=is_dev_mode, knowledge_base=knowledge_base, storage=storage, search_knowledge=True, read_chat_history=True, add_references_to_prompt=True, add_chat_history_to_prompt=True, ) # Initialize the assistant cxbt_assistant = Assistant( introduction="Hi, I'm ChatXBT, your AI assistant for Web3 and DeFi. I can help you with your queries related to Web3 and DeFi.", llm=OpenAIChat( model="gpt-4o" # api_key=os.getenv("PORTKEY_API_KEY"), # # base_url=os.getenv("PORTKEY_LOCAL_GATEWAY_URL") or PORTKEY_GATEWAY_URL, # base_url=PORTKEY_GATEWAY_URL, # default_headers=createHeaders( # api_key=os.getenv("PORTKEY_API_KEY") or None, # config=portkey_config # ), ), tools=[ DuckDuckGo(), # CryptoDataTools(), CryptoSwapTools(), CrossChainSwapTools(), UserProfileToolkit(), CryptoEVMWalletTools(), # UserConfirmationPinToolkit(), # YFinanceTools(stock_price=True) ], team=[XBTfinancialmarketresearcher], show_tool_calls=False, markdown=True, knowledge_base=knowledge_base, storage=storage, search_knowledge=True, read_chat_history=True, add_references_to_prompt=True, add_chat_history_to_prompt=True, prevent_hallucinations=True, prevent_prompt_injection=True, add_datetime_to_instructions=True ) cxbt_assistant.knowledge_base.load(recreate=False) # Set the assistant in the user session cl.user_session.set("agent", cxbt_assistant) # On message @cl.on_message async def main(message: cl.Message): msg = cl.Message(content="") # Retrieve the assistant from the user session agent = cl.user_session.get("agent") # Process the user message using the assistant try: response_generator = agent.run(message=message.content, stream=True) for delta in response_generator: await msg.stream_token(str(delta)) except TypeError as e: # Handle specific TypeError and log or print additional information for debugging print(f"Error occurred: {e}") await msg.stream_token(f"\n\n I encountetrd an error, please try again later.") await msg.send() # Run the Chainlit application if __name__ == "__main__": cl.run_sync()