# salesiq/backend/app/utils/utils_opportunity_review.py
# Repository page metadata (scraped from the web view; not code):
#   author: richlai | commit 7781557 "add files" | size 20.1 kB | views: raw, history, blame
import json
import os
import re
from langchain_community.document_loaders import CSVLoader, PyPDFLoader, Docx2txtLoader
from langgraph.graph import StateGraph, END
from langchain.prompts import PromptTemplate
from langchain.schema import Document, AIMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from pathlib import Path
from pydantic import BaseModel, Field
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from typing import List, Dict, Any
from pydantic import BaseModel, Field
from typing import Dict, Any
# Module-level clients shared by every agent below; they are constructed at import time.
llm = ChatOpenAI(model_name="gpt-4o")
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# In-memory Qdrant instance: nothing stored here outlives the process.
qdrant = QdrantClient(":memory:")

# Collection that agent_1 writes chunk embeddings into and agent_2 / agent_2_json read back.
# NOTE(review): size=1536 presumably matches the output dimension of
# text-embedding-3-small -- keep the two in sync if either one changes.
qdrant.create_collection(
    collection_name="opportunities",
    vectors_config=VectorParams(
        size=1536,
        distance=Distance.COSINE,
    ),
)
class State(BaseModel):
    """Shared state threaded through the LangGraph opportunity-review workflow.

    Each node function returns a new ``State`` built from the fields it keeps
    plus the field it produces.
    """

    # Path of the document under review.
    file_path: str
    # Status string returned by agent_1 (it contains "Error" when processing failed).
    document_processed: str = ""
    # Agent 2 output: summary, score and per-criterion MEDDIC evaluation.
    opportunity_evaluation: Dict[str, Any] = Field(default_factory=dict)
    # Agent 3 output: next action and talking points.
    next_action: Dict[str, Any] = Field(default_factory=dict)

    def dict_representation(self) -> Dict[str, Any]:
        """Return the four state fields as a plain dict (values are not copied)."""
        field_names = ("file_path", "document_processed", "opportunity_evaluation", "next_action")
        return {name: getattr(self, name) for name in field_names}
async def prep_opportunity_review(session_state):
    """Run the full opportunity review and store its outputs on ``session_state``.

    Sets ``session_state.opportunity_review_results`` (the dict returned by
    ``run_analysis``) and ``session_state.opportunity_review_report`` (the
    Markdown report). Both are assigned only after the report has been built.

    NOTE(review): nothing below is awaited, so the whole pipeline runs
    synchronously on the event-loop thread.
    """
    results = run_analysis(prep_document())
    report = create_opportunity_review_report(results)
    session_state.opportunity_review_results = results
    session_state.opportunity_review_report = report
def prep_document(file_path: str = "data/HSBC Opportunity Information.docx") -> str:
    """Print diagnostics about the opportunity document and return its path.

    Nothing is raised for a missing, non-regular or unreadable file; problems
    are only printed, so callers must still cope with a bad path
    (``run_analysis`` re-checks that the path exists).

    Args:
        file_path: Document to inspect. Defaults to the HSBC opportunity file
            this module was originally hard-wired to, so ``prep_document()``
            behaves as before.

    Returns:
        ``file_path`` normalised via ``str(Path(file_path))``.
    """
    path = Path(file_path)
    if not path.exists():
        print(f"Error: File not found at {path}")
        print("Please check the following:")
        print("1. Ensure the file path is correct.")
        print("2. Verify that the file exists in the specified location.")
        print("3. Check if you have the necessary permissions to access the file.")
        parent = path.parent
        if not parent.exists():
            print(f"Note: The directory {parent} does not exist.")
        elif not parent.is_dir():
            print(f"Note: {parent} exists but is not a directory.")
    elif not path.is_file():
        print(f"Error: {path} exists but is not a file. It might be a directory.")
    else:
        file_stat = path.stat()  # stat once instead of once per printed field
        print(f"File found: {path}")
        print(f"File size: {file_stat.st_size / 1024:.2f} KB")
        print(f"Last modified: {file_stat.st_mtime}")
        print("File is ready for processing.")
        if os.access(path, os.R_OK):
            print("File is readable.")
        else:
            print("Warning: File exists but may not be readable. Check permissions.")
    return str(path)
def load_and_chunk_document(
    file_path: str,
    *,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
) -> List[Document]:
    """Load a CSV, PDF or DOCX document and split it into overlapping chunks.

    Args:
        file_path: Document to load; the loader is chosen from its extension
            (compared case-insensitively).
        chunk_size: ``chunk_size`` passed to RecursiveCharacterTextSplitter
            (default 1000).
        chunk_overlap: ``chunk_overlap`` passed to RecursiveCharacterTextSplitter
            (default 200).

    Returns:
        The split documents.

    Raises:
        FileNotFoundError: ``file_path`` does not exist.
        ValueError: the extension is not ``.csv``, ``.pdf`` or ``.docx``.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    _, file_extension = os.path.splitext(file_path.lower())
    if file_extension == '.csv':
        loader = CSVLoader(file_path)
    elif file_extension == '.pdf':
        loader = PyPDFLoader(file_path)
    elif file_extension == '.docx':
        loader = Docx2txtLoader(file_path)
    else:
        raise ValueError(f"Unsupported file type: {file_extension}")

    documents = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    return text_splitter.split_documents(documents)
def agent_1(file_path: str) -> str:
    """Agent 1: Load, chunk, embed, and store document in Qdrant.

    Returns a human-readable status string. Failures are not raised: they are
    printed and returned as a message starting with "Error processing document:",
    which downstream code detects by looking for "Error" in the string.
    """
    try:
        chunks = load_and_chunk_document(file_path)
        texts = [chunk.page_content for chunk in chunks]
        if not texts:
            raise ValueError("No text could be extracted from the document")
        # One batched embedding request instead of one API round-trip per chunk.
        vectors = embeddings.embed_documents(texts)
        points = [
            PointStruct(id=i, vector=vector, payload={"text": text})
            for i, (text, vector) in enumerate(zip(texts, vectors))
        ]
        # NOTE(review): ids restart at 0 on every call while agent_2 reads the whole
        # collection, so processing a second, shorter document would leave stale
        # chunks from the first one behind.
        qdrant.upsert(collection_name="opportunities", points=points)
        return f"Document processed and stored in Qdrant. {len(chunks)} chunks created."
    except Exception as e:
        print(f"Error in agent_1: {str(e)}")
        return f"Error processing document: {str(e)}"
_MEDDIC_TEXT_TEMPLATE = """
Analyze the following opportunity information using the MEDDIC sales methodology:
{opportunity_info}
Assign an overall opportunity score (1-100) with 100 means that the opportunity is a sure win.
Provide a Summary of the opportunity. There must always be a summary. Ensure the summary is on the
same line as the Summary: title
Evaluate the opportunity based on each MEDDIC criterion and assign a score for each criterion:
1. Metrics
2. Economic Buyer
3. Decision Criteria
4. Decision Process
5. Identify Pain
6. Champion
Format your response as follows:
Summary: [Opportunity Summary]
Score: [Overall Opportunity Score between 1 to 100 based on MEDDIC criteria]
MEDDIC Evaluation:
- Metrics: [Score on Metrics, Evaluation on Metrics criterion]
- Economic Buyer: [Score on Economic Buyer, Evaluation on Economic Buyer criterion]
- Decision Criteria: [Score on Decision Criteria, Evaluation on Decision Criteria criterion]
- Decision Process: [Score on Decision Process, Evaluation on Decision Process criterion]
- Identify Pain: [Score on Identify Pain, Evaluation on Identify Pain criterion]
- Champion: [Score on Champion, Evaluation on Champion criterion]
"""


def _labelled_value(lines: List[str], label: str, default: str) -> str:
    """Return the text after ``label`` on the first line that starts with it.

    Leading Markdown emphasis/heading markers (``*``, ``#``) are ignored, so
    ``Summary: x``, ``**Summary:** x`` and ``## Summary: x`` all yield ``x``.
    Returns ``default`` when no line carries the label.
    """
    for raw_line in lines:
        line = raw_line.strip().lstrip('*#').strip()
        if line.startswith(label):
            return line[len(label):].strip().lstrip('*').strip()
    return default


def _parse_meddic_text(response_content: str) -> Dict[str, Any]:
    """Parse a reply written in the line format of ``_MEDDIC_TEXT_TEMPLATE``.

    Returns a dict with ``summary`` ('N/A' if absent), ``score`` (first integer
    after ``Score:``, 0 if absent) and ``meddic_evaluation`` (criterion name ->
    text, with wrapped continuation lines joined by a space).
    """
    lines = response_content.split('\n')
    summary = _labelled_value(lines, 'Summary:', 'N/A')
    # Replies such as "72/100" made int() raise and discard the whole
    # evaluation; take the first integer instead.
    score_match = re.search(r'\d+', _labelled_value(lines, 'Score:', ''))
    score = int(score_match.group()) if score_match else 0

    meddic_eval: Dict[str, str] = {}
    current_criterion = None
    for line in lines:
        stripped = line.strip()
        if stripped.startswith('-'):
            name, sep, value = stripped.partition(':')
            if sep:
                current_criterion = name.strip('-* ')
                meddic_eval[current_criterion] = value.strip().lstrip('*').strip()
        elif current_criterion and stripped:
            # Wrapped lines belong to the most recent "- Criterion:" entry.
            meddic_eval[current_criterion] += ' ' + stripped
    return {'summary': summary, 'score': score, 'meddic_evaluation': meddic_eval}


def agent_2() -> Dict[str, Any]:
    """Agent 2 (plain-text variant): evaluate the stored opportunity with MEDDIC.

    Reads every chunk stored in the "opportunities" collection, asks the LLM
    for a MEDDIC analysis in the line format of ``_MEDDIC_TEXT_TEMPLATE`` and
    parses the reply. The pipeline graph currently uses ``agent_2_json``.

    Returns:
        ``{'summary': str, 'score': int, 'meddic_evaluation': dict}``. On any
        failure the error is printed and a placeholder result is returned whose
        ``meddic_evaluation`` is ``{'error': message}``.
    """
    try:
        # Page through the whole collection; a single scroll(limit=100) call
        # silently dropped every chunk after the first 100.
        points = []
        offset = None
        while True:
            page, offset = qdrant.scroll(
                collection_name="opportunities", limit=100, offset=offset
            )
            points.extend(page)
            if offset is None:
                break
        if not points:
            raise ValueError("No documents found in Qdrant")
        full_text = " ".join(point.payload.get("text", "") for point in points)

        meddic_prompt = PromptTemplate(template=_MEDDIC_TEXT_TEMPLATE, input_variables=["opportunity_info"])
        meddic_chain = meddic_prompt | llm
        response = meddic_chain.invoke({"opportunity_info": full_text})
        if isinstance(response, AIMessage):
            response_content = response.content
        elif isinstance(response, str):
            response_content = response
        else:
            raise ValueError(f"Unexpected response type: {type(response)}")
        print(response_content)

        result = _parse_meddic_text(response_content)
        print(result['summary'])
        print(result['score'])
        return result
    except Exception as e:
        print(f"Error in agent_2: {str(e)}")
        return {
            'summary': "Error occurred during evaluation",
            'score': 0,
            # Same dict type as the success path, so callers can always call .items().
            'meddic_evaluation': {'error': str(e)},
        }
def clean_and_parse_json(json_string):
    """Parse an LLM reply that should contain JSON, tolerating a Markdown code fence.

    Accepts a bare JSON document or one wrapped in a ```` ``` ```` fence, with
    or without a ``json`` label (any case) and with surrounding whitespace.

    Returns:
        The decoded JSON value, or ``None`` if the input is not a string or
        does not decode (the problem is printed).
    """
    if not isinstance(json_string, str):
        print(f"Error parsing JSON: expected str, got {type(json_string).__name__}")
        return None
    # Strip first so a fence preceded by whitespace/newlines is still recognised.
    cleaned = json_string.strip()
    cleaned = re.sub(r'^```(?:json)?\s*', '', cleaned, flags=re.IGNORECASE)
    cleaned = re.sub(r'\s*```$', '', cleaned)
    cleaned = cleaned.strip()
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return None
_MEDDIC_JSON_TEMPLATE = """
Analyze the following opportunity information using the MEDDIC sales methodology:
{opportunity_info}
Assign an overall opportunity score (1-100) with 100 means that the opportunity is a sure win.
Provide a Summary of the opportunity. There must always be a summary. Ensure the summary is on the
same line as the Summary: title
Evaluate the opportunity based on each MEDDIC criterion and assign a score for each criterion:
1. Metrics
2. Economic Buyer
3. Decision Criteria
4. Decision Process
5. Identify Pain
6. Champion
Your response must be in JSON format.
Use the following keys in the JSON:
Summary: Opportunity Summary
Score: Overall Opportunity Score between 1 to 100 based on MEDDIC criteria
Metrics Score: MEDDIC score on Metrics
Metrics Evaluation: Evaluation on MEDDIC Metrics criterion
Economic Buyer Score: MEDDIC Score on Economic Buyer
Economic Buyer Evaluation: Evaluation on MEDDIC Economic Buyer criterion
Decision Criteria Score: MEDDIC Score on Decision Criteria
Decision Criteria Evaluation: Evaluation on MEDDIC Decision Criteria criterion
Decision Process Score: MEDDIC Score on Decision Process
Decision Process Evaluation: Evaluation on MEDDIC Decision Process criterion
Identify Pain Score: MEDDIC Score on Identify Pain
Identify Pain Evaluation: Evaluation on MEDDIC Identify Pain criterion
Champion Score: MEDDIC Score on Champion
Champion Evaluation: Evaluation on MEDDIC Champion criterion
"""

# Criterion names, in report order; each has "<name> Score" and "<name> Evaluation" JSON keys.
_MEDDIC_JSON_CRITERIA = (
    "Metrics",
    "Economic Buyer",
    "Decision Criteria",
    "Decision Process",
    "Identify Pain",
    "Champion",
)


def _meddic_output_from_json(meddic_data: Dict[str, Any]) -> Dict[str, Any]:
    """Map the LLM's flat JSON keys onto the pipeline's evaluation structure.

    Missing keys fall back to 'N/A' (0 for the overall score); each criterion
    is rendered as ``"<score> - <evaluation>"``.
    """
    evaluation = {}
    for criterion in _MEDDIC_JSON_CRITERIA:
        criterion_score = meddic_data.get(criterion + ' Score', 'N/A')
        criterion_detail = meddic_data.get(criterion + ' Evaluation', 'N/A')
        evaluation[criterion] = f"{criterion_score} - {criterion_detail}"
    return {
        'summary': meddic_data.get('Summary', 'N/A'),
        'score': meddic_data.get('Score', 0),
        'meddic_evaluation': evaluation,
    }


def agent_2_json() -> Dict[str, Any]:
    """Agent 2: Evaluate opportunity based on MEDDIC criteria (JSON reply).

    Reads every chunk stored in the "opportunities" collection, asks the LLM
    for a JSON MEDDIC analysis and converts it with ``_meddic_output_from_json``.

    Returns:
        ``{'summary', 'score', 'meddic_evaluation': dict}``. On any failure the
        error is printed and a placeholder result is returned whose
        ``meddic_evaluation`` is ``{'error': message}``.
    """
    try:
        # Page through the whole collection; a single scroll(limit=100) call
        # silently dropped every chunk after the first 100.
        points = []
        offset = None
        while True:
            page, offset = qdrant.scroll(
                collection_name="opportunities", limit=100, offset=offset
            )
            points.extend(page)
            if offset is None:
                break
        if not points:
            raise ValueError("No documents found in Qdrant")
        full_text = " ".join(point.payload.get("text", "") for point in points)

        meddic_prompt = PromptTemplate(template=_MEDDIC_JSON_TEMPLATE, input_variables=["opportunity_info"])
        meddic_chain = meddic_prompt | llm
        response = meddic_chain.invoke({"opportunity_info": full_text})
        if isinstance(response, AIMessage):
            response_content = response.content
        elif isinstance(response, str):
            response_content = response
        else:
            raise ValueError(f"Unexpected response type: {type(response)}")
        print(response_content)

        meddic_data = clean_and_parse_json(response_content)
        print("jsonified")
        print(meddic_data)
        # clean_and_parse_json returns None on bad JSON; fail with a clear message
        # instead of an AttributeError on .get().
        if not isinstance(meddic_data, dict):
            raise ValueError("LLM response could not be parsed as a JSON object")

        output = _meddic_output_from_json(meddic_data)
        print("output")
        print(output)
        return output
    except Exception as e:
        print(f"Error in agent_2_json: {str(e)}")
        return {
            'summary': "Error occurred during evaluation",
            'score': 0,
            # Same dict type as the success path, so callers can always call .items().
            'meddic_evaluation': {'error': str(e)},
        }
_NEXT_ACTION_TEMPLATE = """
Based on the following MEDDIC evaluation of an opportunity:
{meddic_evaluation}
Suggest the next best action for the upcoming customer meeting and provide the top 3 talking points.
Format your response as follows:
Next Action: [Your suggested action]
Talking Points:
1. [First talking point]
2. [Second talking point]
3. [Third talking point]
"""


def _parse_next_action(response_content: str) -> Dict[str, Any]:
    """Parse a reply written in the format of ``_NEXT_ACTION_TEMPLATE``.

    Returns ``{'next_action': str, 'talking_points': list}``; the action is
    'N/A' when no ``Next Action:`` line is present.
    """
    lines = response_content.split('\n')
    next_action = 'N/A'
    for line in lines:
        # Tolerate Markdown such as "**Next Action:** ..." around the label.
        candidate = line.strip().lstrip('*#').strip()
        if candidate.startswith('Next Action:'):
            next_action = candidate[len('Next Action:'):].strip().lstrip('*').strip()
            break
    # Split only on the list marker's '.': splitting on every '.' cut each point
    # at its first sentence end or at a decimal such as "v2.1".
    talking_points = [
        line.strip().split('.', 1)[1].strip()
        for line in lines
        if line.strip().startswith(('1.', '2.', '3.'))
    ]
    return {'next_action': next_action, 'talking_points': talking_points}


def agent_3(meddic_evaluation: Dict[str, Any]) -> Dict[str, Any]:
    """Agent 3: Suggest next best action and talking points.

    Args:
        meddic_evaluation: The opportunity evaluation from agent 2; it is
            serialised with ``json.dumps`` into the prompt.

    Returns:
        ``{'next_action': str, 'talking_points': list}``. On failure the error
        is printed and its message is returned as the only talking point.
    """
    try:
        next_action_prompt = PromptTemplate(template=_NEXT_ACTION_TEMPLATE, input_variables=["meddic_evaluation"])
        next_action_chain = next_action_prompt | llm
        response = next_action_chain.invoke({"meddic_evaluation": json.dumps(meddic_evaluation)})
        if isinstance(response, AIMessage):
            response_content = response.content
        elif isinstance(response, str):
            response_content = response
        else:
            raise ValueError(f"Unexpected response type: {type(response)}")
        return _parse_next_action(response_content)
    except Exception as e:
        print(f"Error in agent_3: {str(e)}")
        return {
            'next_action': "Error occurred while suggesting next action",
            'talking_points': [str(e)]
        }
def process_document(state: State) -> State:
    """LangGraph node: run agent 1 on ``state.file_path``.

    Returns a fresh State holding the path and agent 1's status message; the
    evaluation and next-action fields fall back to their empty defaults.
    """
    print("Agent 1: Processing document...")
    status = agent_1(state.file_path)
    return State(
        file_path=state.file_path,
        document_processed=status,
    )
def evaluate_opportunity(state: State) -> State:
    """LangGraph node: score the stored document with the JSON MEDDIC agent.

    Carries over the path and processing status; ``next_action`` is left at
    its empty default.
    """
    print("Agent 2: Evaluating opportunity...")
    evaluation = agent_2_json()
    return State(
        file_path=state.file_path,
        document_processed=state.document_processed,
        opportunity_evaluation=evaluation,
    )
def suggest_next_action(state: State) -> State:
    """LangGraph node: ask agent 3 for the next action given the evaluation.

    Returns a fresh State carrying every earlier field plus agent 3's output.
    """
    print("Agent 3: Suggesting next actions...")
    suggestion = agent_3(state.opportunity_evaluation)
    return State(
        file_path=state.file_path,
        document_processed=state.document_processed,
        opportunity_evaluation=state.opportunity_evaluation,
        next_action=suggestion,
    )
def define_graph() -> StateGraph:
    """Build the (uncompiled) three-step opportunity-review workflow.

    process_document -> evaluate_opportunity -> suggest_next_action -> END
    """
    workflow = StateGraph(State)
    workflow.add_node("process_document", process_document)
    workflow.add_node("evaluate_opportunity", evaluate_opportunity)
    workflow.add_node("suggest_next_action", suggest_next_action)
    workflow.set_entry_point("process_document")
    workflow.add_edge("process_document", "evaluate_opportunity")
    workflow.add_edge("evaluate_opportunity", "suggest_next_action")
    # Terminate explicitly: END was imported for this but never wired up,
    # leaving the last node as an implicit dead end.
    workflow.add_edge("suggest_next_action", END)
    return workflow
def run_analysis(file_path: str) -> Dict[str, Any]:
    """Run the three-agent LangGraph pipeline on ``file_path`` and print a summary.

    Returns:
        On success, a dict with ``file_path``, ``document_processed``,
        ``opportunity_evaluation`` and ``next_action``. If the file is missing
        or the graph raises, ``{"error": message}`` instead.
    """
    if not os.path.exists(file_path):
        return {"error": f"File not found: {file_path}"}
    graph = define_graph()
    initial_state = State(file_path=file_path)
    try:
        app = graph.compile()
        final_state = app.invoke(initial_state)
        # Convert the final state to a plain dict manually.
        result_keys = ("file_path", "document_processed", "opportunity_evaluation", "next_action")
        structured_results = {key: final_state[key] for key in result_keys}
        _print_analysis_results(structured_results)
        return structured_results
    except Exception as e:
        print(f"An error occurred during analysis: {str(e)}")
        return {"error": str(e)}


def _print_analysis_results(structured_results: Dict[str, Any]) -> None:
    """Print a console summary of a successful ``run_analysis`` result."""
    processed = structured_results['document_processed']
    print("\n--- Analysis Results ---")
    print(f"Document Processing: {'Successful' if 'Error' not in processed else 'Failed'}")
    print(f"Details: {processed}")

    evaluation = structured_results['opportunity_evaluation']
    print("\nOpportunity Evaluation:")
    if isinstance(evaluation, dict):
        print(f"Summary: {evaluation.get('summary', 'N/A')}")
        print(f"Score: {evaluation.get('score', 'N/A')}")
        print("MEDDIC Evaluation:")
        meddic = evaluation.get('meddic_evaluation', {})
        if isinstance(meddic, dict):
            for criterion, detail in meddic.items():
                print(f"{criterion}: {detail}")
        else:
            # Agent 2 may report failures as a plain string here; calling
            # .items() on it used to abort the analysis and lose all results.
            print(meddic)
    else:
        print(f"Error: {evaluation}")

    next_action = structured_results['next_action']
    print("\nNext Action:")
    if isinstance(next_action, dict):
        print(f"Action: {next_action.get('next_action', 'N/A')}")
        print("Talking Points:")
        for i, point in enumerate(next_action.get('talking_points', []), 1):
            print(f" {i}. {point}")
    else:
        print(f"Error: {next_action}")
def create_opportunity_review_report(
    structured_results: Dict[str, Any],
    report_path: str = "./reports/HSBC Opportunity Review Report.md",
) -> str:
    """Render the analysis results as Markdown, save them, and return the text.

    Args:
        structured_results: Output of ``run_analysis`` -- either the four-key
            result dict or ``{"error": message}``.
        report_path: Where the Markdown report is written via ``save_md_file``.

    Returns:
        The Markdown report.
    """
    opportunity_review_report = _build_opportunity_review_report(structured_results)
    save_md_file(report_path, opportunity_review_report)
    return opportunity_review_report


def _build_opportunity_review_report(structured_results: Dict[str, Any]) -> str:
    """Return the Markdown text of the opportunity review report."""
    parts = ["**Analysis Results**\n\n"]
    processed = structured_results.get('document_processed')
    # run_analysis returns {"error": ...} without a 'document_processed' key when
    # the file is missing or the graph raises; indexing it used to raise KeyError.
    if processed is None or 'Error' in processed:
        parts.append("Opportunity Analysis Failed\n")
        if 'error' in structured_results:
            parts.append(f"\n{structured_results['error']}\n")
        return "".join(parts)

    evaluation = structured_results.get('opportunity_evaluation')
    if isinstance(evaluation, dict):
        parts.append(f"**Summary:** {evaluation.get('summary', 'N/A')}\n\n")
        parts.append(f"**Score:** {evaluation.get('score', 'N/A')}\n\n")
        parts.append("**MEDDIC Evaluation:**\n\n")
        meddic = evaluation.get('meddic_evaluation', {})
        if isinstance(meddic, dict):
            parts.extend(f"**{criterion}:** {detail}\n" for criterion, detail in meddic.items())
        else:
            # Agent 2 may report failures as a plain string here.
            parts.append(f"{meddic}\n")

    next_action = structured_results.get('next_action')
    if isinstance(next_action, dict):
        parts.append("\n\n**Next Steps**\n\n")
        parts.append(f"{next_action.get('next_action', 'N/A')}\n\n")
        parts.append("**Talking Points:**\n\n")
        parts.extend(
            f" {i}. {point}\n"
            for i, point in enumerate(next_action.get('talking_points', []), 1)
        )
    return "".join(parts)
def save_md_file(file_path, file_content):
    """Write ``file_content`` to ``file_path`` as UTF-8, replacing any existing file.

    Missing parent directories (e.g. ``./reports``) are created first. This is
    best-effort: OS errors are printed rather than raised, and the function
    always returns ``None``.
    """
    try:
        parent = os.path.dirname(file_path)
        if parent:
            # Previously a missing directory raised an uncaught FileNotFoundError.
            os.makedirs(parent, exist_ok=True)
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f"Existing file deleted: {file_path}")
        with open(file_path, 'w', encoding='utf-8') as md_file:
            md_file.write(file_content)
        print(f"File saved successfully: {file_path}")
    except PermissionError:
        print(f"Permission denied when trying to delete or save file: {file_path}")
    except OSError as e:
        print(f"Failed to save file {file_path}: {e}")
    return None