| from __future__ import annotations
|
| from typing import Literal, TypedDict
|
| import asyncio
|
| import os
|
| import streamlit as st
|
| import json
|
| import datetime
|
| import logfire
|
| from supabase import Client
|
| from pydantic_ai.models.gemini import GeminiModel
|
| from supabase import create_client
|
|
|
|
|
| from pydantic_ai.messages import (
|
| ModelMessage,
|
| ModelRequest,
|
| ModelResponse,
|
| SystemPromptPart,
|
| UserPromptPart,
|
| TextPart,
|
| ToolCallPart,
|
| ToolReturnPart,
|
| RetryPromptPart,
|
| ModelMessagesTypeAdapter
|
| )
|
| from pydantic_ai_agent import PineScript_expert, PydanticAIDeps
|
|
|
|
|
| from dotenv import load_dotenv
|
# Load settings through python-dotenv before any of them are read below.
load_dotenv()

# API key handed to the Gemini model when the agent dependencies are built.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")

# Module-wide Supabase client, shared by query logging and the agent deps.
supabase: Client = create_client(
    os.environ.get("SUPABASE_URL1"),
    os.environ.get("SUPABASE_SERVICE_KEY1"),
)

# Configure Logfire once at import time.
logfire.configure(send_to_logfire='always')
|
|
|
class ChatMessage(TypedDict):
    """Shape of a single chat message as delivered to the browser/API."""

    # Who produced the message: the human ('user') or the LLM ('model').
    role: Literal['user', 'model']
    # When the message was produced, as a string (exact format not fixed here).
    timestamp: str
    # The message text itself.
    content: str
|
|
|
def display_message_part(part):
    """
    Render one message part as a chat bubble in the Streamlit UI.

    Only three part kinds are drawn: 'system-prompt' (shown under the
    "system" role with a bold "System" prefix), 'user-prompt' (shown under
    "user") and 'text' (shown under "assistant"). Any other kind, such as
    tool calls or tool returns, is skipped without output; extend the
    mapping below to display them.
    """
    role_by_kind = {
        'system-prompt': "system",
        'user-prompt': "user",
        'text': "assistant",
    }

    role = role_by_kind.get(part.part_kind)
    if role is None:
        # Not a kind we render.
        return

    with st.chat_message(role):
        body = part.content
        if role == "system":
            # System prompts are labelled so they stand out from normal turns.
            body = f"**System**: {body}"
        st.markdown(body)
|
|
|
async def run_agent_with_streaming(user_input: str):
    """
    Stream the agent's answer to ``user_input`` into the Streamlit UI,
    while maintaining the entire conversation in `st.session_state.messages`.

    Text chunks are rendered incrementally into a single placeholder as they
    arrive. On success, the run's new messages (minus any that carry a
    user-prompt part) followed by a ``ModelResponse`` holding the streamed
    text are appended to ``st.session_state.messages``. On any failure the
    exception is logged through Logfire, a retry hint is shown in the UI, and
    a ``ModelResponse`` containing that hint is appended instead.

    Args:
        user_input: The prompt to send to the agent. ``main()`` appends this
            prompt to the session history before calling, which is why the
            last history entry is left out of ``message_history``.

    Returns:
        None. Results are delivered through the UI and the session state.
    """
    fallback_text = "Could you please ask again?"

    deps = PydanticAIDeps(
        supabase=supabase,
        gemini=GeminiModel('gemini-1.5-flash', api_key=GOOGLE_API_KEY),
    )
    message_placeholder = st.empty()
    partial_text = ""

    try:
        async with PineScript_expert.run_stream(
            user_input,
            deps=deps,
            # Exclude the final entry: the current prompt is passed separately
            # as ``user_input``.
            message_history=st.session_state.messages[:-1],
        ) as result:
            async for chunk in result.stream_text(delta=True):
                partial_text += chunk
                # Re-render the whole accumulated text so the reply grows in place.
                message_placeholder.markdown(partial_text)
    except Exception:
        # UI boundary: keep the app responsive, but record the failure (with
        # traceback) instead of silently discarding it.
        logfire.exception("Agent streaming run failed")
        message_placeholder.markdown(f"**{fallback_text}**")
        st.error(fallback_text)
        st.session_state.messages.append(
            ModelResponse(parts=[TextPart(content=fallback_text)])
        )
        return

    def _has_user_prompt(message) -> bool:
        # Messages without a ``parts`` attribute are treated as having none.
        return any(
            part.part_kind == 'user-prompt'
            for part in getattr(message, 'parts', ())
        )

    # Skip messages carrying a user prompt — presumably because main() has
    # already stored the user's request in the history.
    st.session_state.messages.extend(
        message for message in result.new_messages()
        if not _has_user_prompt(message)
    )

    # Record the streamed text as the final response. NOTE(review): this assumes
    # new_messages() does not already contain the final text when streaming
    # with delta=True — confirm against the installed pydantic_ai version.
    st.session_state.messages.append(
        ModelResponse(parts=[TextPart(content=partial_text)])
    )
|
|
|
|
|
def store_user_query(user_query: str):
    """
    Store the user's query in the 'user_queries' table.

    This is best-effort: any failure while inserting is reported on stdout
    and never propagated to the caller.

    Args:
        user_query: The text the user submitted.

    Returns:
        None.
    """
    row = {
        'query': user_query,
        # Timezone-aware UTC so the stored instant is unambiguous no matter
        # which local zone (or DST state) the host happens to be in.
        'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }
    try:
        supabase.table('user_queries').insert(row).execute()
    except Exception as e:
        # Deliberately swallowed: a logging failure must not break the caller.
        print(f"Error storing user query: {e}")
    else:
        print("User query stored successfully.")
|
|
|
async def main():
    """
    Draw the chat page and handle at most one newly submitted prompt.

    Steps: show the title and intro text, make sure the session has a
    ``messages`` list, replay every stored ``ModelRequest``/``ModelResponse``
    part, then read the chat input. When a prompt was submitted it is stored
    via ``store_user_query``, appended to the history as a ``ModelRequest``,
    echoed in a "user" bubble, and answered inside an "assistant" bubble by
    ``run_agent_with_streaming``.
    """
    st.title("PineScript Agentic RAG")
    st.write("Ask any question about PineScript, the hidden truths of its beauty lie within.")

    # Start with an empty conversation when the session has none yet.
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the conversation so far; entries of any other type are skipped.
    renderable_types = (ModelRequest, ModelResponse)
    for stored in st.session_state.messages:
        if not isinstance(stored, renderable_types):
            continue
        for piece in stored.parts:
            display_message_part(piece)

    user_input = st.chat_input("What questions do you have about PineScript?")
    if not user_input:
        # Nothing submitted: the page is fully drawn.
        return

    store_user_query(user_input)

    # The prompt must be in the history before the agent runs, since
    # run_agent_with_streaming drops the last history entry.
    request = ModelRequest(parts=[UserPromptPart(content=user_input)])
    st.session_state.messages.append(request)

    with st.chat_message("user"):
        st.markdown(user_input)

    with st.chat_message("assistant"):
        await run_agent_with_streaming(user_input)
|
|
|
# Script entry point: run the async main() coroutine to completion.
if __name__ == "__main__":
    asyncio.run(main())