modelx / src /graphs /combinedAgentGraph.py
nivakaran's picture
Upload folder using huggingface_hub
b4c4175 verified
"""
combinedAgentGraph.py - Main entry point for the Combined Agent System.
"""
from __future__ import annotations
from typing import Dict, Any
import logging
from datetime import datetime
from langgraph.graph import StateGraph, START, END
from src.llms.groqllm import GroqLLM
from src.states.combinedAgentState import CombinedAgentState
from src.nodes.combinedAgentNode import CombinedAgentNode
try:
from src.config.langsmith_config import LangSmithConfig
_langsmith = LangSmithConfig()
_langsmith.configure()
except ImportError:
pass
from src.graphs.socialAgentGraph import SocialGraphBuilder
from src.graphs.intelligenceAgentGraph import IntelligenceGraphBuilder
from src.graphs.economicalAgentGraph import EconomicalGraphBuilder
from src.graphs.politicalAgentGraph import PoliticalGraphBuilder
from src.graphs.meteorologicalAgentGraph import MeteorologicalGraphBuilder
logger = logging.getLogger("main_graph")
logger.setLevel(logging.INFO)
if not logger.handlers:
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
logger.addHandler(ch)
class CombinedAgentGraphBuilder:
def __init__(self, llm):
self.llm = llm
def build_graph(self):
social_graph = SocialGraphBuilder(self.llm).build_graph()
intelligence_graph = IntelligenceGraphBuilder(self.llm).build_graph()
economical_graph = EconomicalGraphBuilder(self.llm).build_graph()
political_graph = PoliticalGraphBuilder(self.llm).build_graph()
meteorological_graph = MeteorologicalGraphBuilder(self.llm).build_graph()
def run_social_agent(state: CombinedAgentState) -> Dict[str, Any]:
logger.info("[CombinedGraph] Invoking SocialAgent...")
try:
result = social_graph.invoke({})
insights = result.get("domain_insights", [])
logger.info(
f"[CombinedGraph] SocialAgent returned {len(insights)} insights"
)
return {"domain_insights": insights}
except Exception as e:
logger.error(f"[CombinedGraph] SocialAgent FAILED: {e}")
return {"domain_insights": []}
def run_intelligence_agent(state: CombinedAgentState) -> Dict[str, Any]:
logger.info("[CombinedGraph] Invoking IntelligenceAgent...")
try:
result = intelligence_graph.invoke({})
insights = result.get("domain_insights", [])
logger.info(
f"[CombinedGraph] IntelligenceAgent returned {len(insights)} insights"
)
return {"domain_insights": insights}
except Exception as e:
logger.error(f"[CombinedGraph] IntelligenceAgent FAILED: {e}")
return {"domain_insights": []}
def run_economical_agent(state: CombinedAgentState) -> Dict[str, Any]:
logger.info("[CombinedGraph] Invoking EconomicalAgent...")
try:
result = economical_graph.invoke({})
insights = result.get("domain_insights", [])
logger.info(
f"[CombinedGraph] EconomicalAgent returned {len(insights)} insights"
)
return {"domain_insights": insights}
except Exception as e:
logger.error(f"[CombinedGraph] EconomicalAgent FAILED: {e}")
return {"domain_insights": []}
def run_political_agent(state: CombinedAgentState) -> Dict[str, Any]:
logger.info("[CombinedGraph] Invoking PoliticalAgent...")
try:
result = political_graph.invoke({})
insights = result.get("domain_insights", [])
logger.info(
f"[CombinedGraph] PoliticalAgent returned {len(insights)} insights"
)
return {"domain_insights": insights}
except Exception as e:
logger.error(f"[CombinedGraph] PoliticalAgent FAILED: {e}")
return {"domain_insights": []}
def run_meteorological_agent(state: CombinedAgentState) -> Dict[str, Any]:
logger.info("[CombinedGraph] Invoking MeteorologicalAgent...")
try:
result = meteorological_graph.invoke({})
insights = result.get("domain_insights", [])
logger.info(
f"[CombinedGraph] MeteorologicalAgent returned {len(insights)} insights"
)
return {"domain_insights": insights}
except Exception as e:
logger.error(f"[CombinedGraph] MeteorologicalAgent FAILED: {e}")
return {"domain_insights": []}
orchestrator = CombinedAgentNode(self.llm)
workflow = StateGraph(CombinedAgentState)
workflow.add_node("SocialAgent", run_social_agent)
workflow.add_node("IntelligenceAgent", run_intelligence_agent)
workflow.add_node("EconomicalAgent", run_economical_agent)
workflow.add_node("PoliticalAgent", run_political_agent)
workflow.add_node("MeteorologicalAgent", run_meteorological_agent)
workflow.add_node("GraphInitiator", orchestrator.graph_initiator)
workflow.add_node("FeedAggregatorAgent", orchestrator.feed_aggregator_agent)
workflow.add_node("DataRefresherAgent", orchestrator.data_refresher_agent)
workflow.add_node("DataRefreshRouter", orchestrator.data_refresh_router)
workflow.add_edge(START, "GraphInitiator")
sub_agents = [
"SocialAgent",
"IntelligenceAgent",
"EconomicalAgent",
"PoliticalAgent",
"MeteorologicalAgent",
]
for agent in sub_agents:
workflow.add_edge("GraphInitiator", agent)
workflow.add_edge(agent, "FeedAggregatorAgent")
workflow.add_edge("FeedAggregatorAgent", "DataRefresherAgent")
workflow.add_edge("DataRefresherAgent", "DataRefreshRouter")
workflow.add_conditional_edges(
"DataRefreshRouter",
lambda x: x.route if x.route else "END",
{"GraphInitiator": "GraphInitiator", "END": END},
)
return workflow.compile()
print("Building Combined Agent Graph...")
llm = GroqLLM().get_llm()
builder = CombinedAgentGraphBuilder(llm)
graph = builder.build_graph()
print("Combined Graph ready")