Spaces:
Runtime error
Runtime error
File size: 4,141 Bytes
1d75522 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
"""Main entry point for the Advanced Agentic System."""
import asyncio
import logging
from typing import Dict, Any, Optional
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import gradio as gr
import uvicorn
from agentic_system import AgenticSystem
from reasoning.unified_engine import UnifiedReasoningEngine
from api.openai_compatible import OpenAICompatibleAPI
from api.venture_api import VentureAPI
from ui.venture_ui import VentureUI
from config import Config
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AgenticSystemApp:
"""Main application class integrating all components."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
self.config = Config(config)
# Initialize core components
self.reasoning_engine = UnifiedReasoningEngine(
min_confidence=self.config.get('min_confidence', 0.7),
parallel_threshold=self.config.get('parallel_threshold', 3),
learning_rate=self.config.get('learning_rate', 0.1)
)
self.agentic_system = AgenticSystem(
config=self.config.get('agentic_system', {})
)
# Initialize APIs
self.venture_api = VentureAPI(self.reasoning_engine)
self.openai_api = OpenAICompatibleAPI(self.reasoning_engine)
# Initialize FastAPI
self.app = FastAPI(
title="Advanced Agentic System",
description="Venture Strategy Optimizer with OpenAI-compatible API",
version="1.0.0"
)
# Setup middleware
self._setup_middleware()
# Setup routes
self._setup_routes()
# Initialize UI
self.ui = VentureUI(self.venture_api)
self.interface = self.ui.create_interface()
# Mount Gradio app
self.app = gr.mount_gradio_app(self.app, self.interface, path="/")
def _setup_middleware(self):
"""Setup FastAPI middleware."""
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def _setup_routes(self):
"""Setup API routes."""
# Include OpenAI-compatible routes
self.app.include_router(
self.openai_api.router,
tags=["OpenAI Compatible"]
)
# Health check
@self.app.get("/api/health")
async def health_check():
return {
"status": "healthy",
"version": "1.0.0",
"components": {
"reasoning_engine": "active",
"agentic_system": "active",
"openai_api": "active",
"venture_api": "active"
}
}
# System status
@self.app.get("/api/system/status")
async def system_status():
return await self.agentic_system.get_system_status()
# Reasoning endpoint
@self.app.post("/api/reason")
async def reason(query: str, context: Optional[Dict[str, Any]] = None):
return await self.reasoning_engine.reason(query, context or {})
# Venture analysis
@self.app.post("/api/venture/analyze")
async def analyze_venture(
venture_type: str,
description: str,
metrics: Optional[Dict[str, Any]] = None
):
return await self.venture_api.analyze_venture(
venture_type=venture_type,
description=description,
metrics=metrics or {}
)
def run(self, host: str = "0.0.0.0", port: int = 7860):
"""Run the application."""
uvicorn.run(
self.app,
host=host,
port=port,
workers=4
)
def main():
"""Main entry point."""
app = AgenticSystemApp()
app.run()
if __name__ == "__main__":
main()
|