from langchain_community.llms import Ollama from langchain_community.chat_models import ChatOllama from langchain import hub from agentops.langchain_callback_handler import LangchainCallbackHandler as AgentOpsLangchainCallbackHandler from langchain.chains.conversation.memory import ConversationBufferWindowMemory from tools.cve_avd_tool import CVESearchTool from tools.misp_tool import MispTool from tools.coder_tool import CoderTool from tools.mitre_tool import MitreTool from langchain.agents import initialize_agent, AgentType, load_tools from langchain.evaluation import load_evaluator from dotenv import load_dotenv import os import re load_dotenv(override=True) llm = Ollama(model="openhermes", base_url=os.getenv('OLLAMA_HOST'), temperature=0.3, num_predict=8192, num_ctx=8192) wrn = Ollama(model="openfc", base_url=os.getenv('OLLAMA_HOST')) llama3 = Ollama(model="llama3", base_url=os.getenv('OLLAMA_HOST'), temperature=0.3) command_r = Ollama(model="command-r", base_url=os.getenv('OLLAMA_HOST'), temperature=0.1, num_ctx=8192) hermes_llama3 = Ollama(model="adrienbrault/nous-hermes2pro-llama3-8b:q4_K_M", base_url=os.getenv('OLLAMA_HOST'), temperature=0.3, num_ctx=32768) yarn_mistral_128k = Ollama(model="yarn-mistral-modified", base_url=os.getenv('OLLAMA_HOST'), temperature=0.1, num_ctx=65536, system="""""") chat_llm = ChatOllama(model="openhermes", base_url=os.getenv('OLLAMA_HOST'), num_predict=-1) cve_search_tool = CVESearchTool().cvesearch fetch_cve_tool = CVESearchTool().fetchcve misp_search_tool = MispTool().search misp_search_by_date_tool = MispTool().search_by_date misp_search_by_event_id_tool = MispTool().search_by_event_id coder_tool = CoderTool().code_generation_tool get_technique_by_id = MitreTool().get_technique_by_id get_technique_by_name = MitreTool().get_technique_by_name get_malware_by_name = MitreTool().get_malware_by_name get_tactic_by_keyword = MitreTool().get_tactic_by_keyword tools = [cve_search_tool, fetch_cve_tool, misp_search_tool, misp_search_by_date_tool, misp_search_by_event_id_tool, coder_tool, get_technique_by_id, get_technique_by_name, get_malware_by_name, get_tactic_by_keyword] # conversational agent memory memory = ConversationBufferWindowMemory( memory_key='chat_history', k=4, return_messages=True ) agentops_handler = AgentOpsLangchainCallbackHandler(api_key=os.getenv("AGENTOPS_API_KEY"), tags=['Langchain Example']) #Error handling def _handle_error(error) -> str: pattern = r'```(?!json)(.*?)```' match = re.search(pattern, str(error), re.DOTALL) if match: return "The answer contained a code blob which caused the parsing to fail, i recovered the code blob. Just use it to answer the user question: " + match.group(1) else: return llm.invoke(f"""Try to summarize and explain the following error into 1 short and consice sentence and give a small indication to correct the error: {error} """) prompt = hub.pull("hwchase17/react-chat-json") # create our agent conversational_agent = initialize_agent( # agent="chat-conversational-react-description", agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, tools=tools, prompt=prompt, llm=llm, verbose=True, max_iterations=5, memory=memory, early_stopping_method='generate', # callbacks=[agentops_handler], handle_parsing_errors=_handle_error, return_intermediate_steps=False, max_execution_time=40, ) evaluator = load_evaluator("trajectory", llm=chat_llm) # conversational_agent.agent.llm_chain.prompt.messages[0].prompt.template = """ # 'Respond to the human as helpfully and accurately as possible. # You should use the tools available to you to help answer the question. # Your final answer should be technical, well explained, and accurate. # You have access to the following tools:\n\n\n\nUse a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).\n\nValid "action" values: "Final Answer" or \n\nProvide only ONE action per $JSON_BLOB, as shown:\n\n```\n{{\n "action": $TOOL_NAME,\n "action_input": $INPUT\n}}\n```\n\nFollow this format:\n\nQuestion: input question to answer\nThought: consider previous and subsequent steps\nAction:\n```\n$JSON_BLOB\n```\nObservation: action result\n... (repeat Thought/Action/Observation N times)\nThought: I know what to respond\nAction:\n```\n{{\n "action": "Final Answer",\n "action_input": "Final response to human"\n}}\n```\n\nBegin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation:.\nThought:' # """ template = conversational_agent.agent.llm_chain.prompt.messages[0].prompt.template conversational_agent.agent.llm_chain.prompt.messages[0].prompt.template = """You are a cyber security analyst, you role is to respond to the human queries in a technical way while providing detailed explanations when providing final answer.""" + template def invoke(input_text): results = conversational_agent({"input":input_text}) # evaluation_result = evaluator.evaluate_agent_trajectory( # prediction=results["output"], # input=results["input"], # agent_trajectory=results["intermediate_steps"], # ) # print(evaluation_result) return results['output']