"""Gradio chat front-end for a remote prediction API, plus example aiser server stubs."""

import asyncio
import logging
import os
import typing
from typing import List

import gradio as gr
import requests
from aiser import RestAiServer, KnowledgeBase, SemanticSearchResult, Agent
from aiser.models import ChatMessage

# Configure logging
logging.basicConfig(level=logging.DEBUG)

# Define environment variables
# SECURITY NOTE(review): the fallback values below embed an endpoint and a
# bearer token directly in source. They are kept so existing runs keep
# working; rotate the token and supply both values via the environment.
API_URL = os.getenv("API_URL", "http://ec2-54-166-81-166.compute-1.amazonaws.com:3000/api/v1/prediction/117a5076-c05e-4208-91d9-d0e772bf981e")
API_TOKEN = os.getenv("API_TOKEN", "Bearer 0Ouk5cgljCYuuF3LDfBkIAcuqj9hgWaaK5qRCLfbfrg=")

# Upper bound (seconds) on how long a single prediction request may take.
REQUEST_TIMEOUT_SECONDS = 60.0


class ChatBot:
    """Forward user questions to the prediction API and remember the answers."""

    def __init__(self, timeout: float = REQUEST_TIMEOUT_SECONDS):
        """Create a bot with an empty history.

        Args:
            timeout: Seconds to wait for the API before giving up.
        """
        # Every text returned by predict(), in call order (errors included).
        self.history = []
        self.timeout = timeout

    def predict(self, input: str) -> str:
        """Send ``input`` as a question to the API and return the reply text.

        The parameter name shadows the ``input`` builtin; it is kept so
        existing keyword callers keep working.

        Args:
            input: The question typed by the user.

        Returns:
            The raw response body on HTTP 200, otherwise a human-readable
            error message. Network failures are reported as text too rather
            than raised, matching how HTTP errors are reported.
        """
        payload = {"question": input}
        headers = {"Authorization": API_TOKEN}

        # The Authorization value is a credential, so it is never logged.
        logging.debug(
            "API Request: %s, Payload: %s, Headers: %s",
            API_URL,
            payload,
            {"Authorization": "<redacted>"},
        )

        try:
            # Without a timeout, requests waits forever on a stalled server.
            response = requests.post(
                API_URL, headers=headers, json=payload, timeout=self.timeout
            )
        except requests.RequestException as exc:
            response_text = f"API call failed: {exc}"
            logging.warning("API request failed: %s", exc)
        else:
            if response.status_code == 200:
                response_text = response.text  # Raw text response
            else:
                response_text = (
                    f"API call failed with status code {response.status_code}"
                )
            logging.debug(
                "API Response: %s, Content: %s",
                response.status_code,
                response_text,
            )

        self.history.append(response_text)
        return response_text


bot = ChatBot()

title = "👋🏻النور اسلام میں خوش آمدید🌠"
description = "یہاں آپ اسلام یا اپنی زندگی کے بارے میں سوالات پوچھ سکتے ہیں:"
examples = ["محنت کے بارے میں قرآن کیا کہتا ہے؟"]

iface = gr.Interface(
    fn=bot.predict,
    title=title,
    description=description,
    examples=examples,
    inputs="text",
    outputs="text",
)

# NOTE(review): per the Gradio docs, launch() blocks the main thread when run
# as a script unless prevent_thread_lock=True, so the server start-up at the
# bottom of this file would only run once the UI stops. Confirm this ordering
# is intended.
iface.launch()


# Placeholder classes, replace with actual implementations
class KnowledgeBaseExample(KnowledgeBase):
    """Placeholder knowledge base that returns a canned search result."""

    def perform_semantic_search(
        self, query_text: str, desired_number_of_results: int
    ) -> List[SemanticSearchResult]:
        """Return ``desired_number_of_results`` references to one fixed result.

        ``query_text`` is ignored by this placeholder.
        """
        result_example = SemanticSearchResult(
            content="This is an example of a semantic search result",
            score=0.5,
        )
        return [result_example for _ in range(desired_number_of_results)]


class AgentExample(Agent):
    """Placeholder agent that streams a fixed reply one character at a time."""

    async def reply(
        self, messages: typing.List[ChatMessage]
    ) -> typing.AsyncGenerator[ChatMessage, None]:
        """Yield the canned reply character by character.

        ``messages`` is ignored by this placeholder.
        """
        reply_message = "This is an example of a reply from an agent"
        for character in reply_message:
            yield ChatMessage(text_content=character)
            # Short pause between characters.
            await asyncio.sleep(0.1)


if __name__ == '__main__':
    server = RestAiServer(
        agents=[
            AgentExample(
                agent_id='10209b93-2dd0-47a0-8eb2-33fb018a783b'  # replace with your agent id
            ),
        ],
        knowledge_bases=[
            KnowledgeBaseExample(
                knowledge_base_id='85bc1c72-b8e0-4042-abcf-8eb2d478f207'  # replace with your knowledge base id
            ),
        ],
        port=5000
    )
    server.run()