import json
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

from langchain.prompts import PromptTemplate
from langchain.schema import AIMessage, Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import CSVLoader, Docx2txtLoader, PyPDFLoader
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langgraph.graph import END, StateGraph
from pydantic import BaseModel, Field
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams

# Shared clients: chat model, embedding model, and an in-memory Qdrant instance
# (vectors are lost when the process exits; use a persistent client in production).
llm = ChatOpenAI(model="gpt-4o")
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
qdrant = QdrantClient(":memory:")

# 1536 is the output dimension of text-embedding-3-small.
qdrant.create_collection(
    collection_name="opportunities",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
)
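
# Illustrative helper (an assumption, not part of the original pipeline, which
# reads chunks back via scroll): shows how the stored vectors could be queried
# semantically once agent_1 has populated the collection.
def semantic_search(query: str, top_k: int = 3) -> List[str]:
    """Return the text of the top_k stored chunks most similar to `query`."""
    hits = qdrant.search(
        collection_name="opportunities",
        query_vector=embeddings.embed_query(query),
        limit=top_k,
    )
    return [hit.payload.get("text", "") for hit in hits]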

class State(BaseModel):
    """Shared state threaded through the LangGraph workflow."""

    file_path: str
    document_processed: str = ""
    opportunity_evaluation: Dict[str, Any] = Field(default_factory=dict)
    next_action: Dict[str, Any] = Field(default_factory=dict)

    def dict_representation(self) -> Dict[str, Any]:
        return {
            "file_path": self.file_path,
            "document_processed": self.document_processed,
            "opportunity_evaluation": self.opportunity_evaluation,
            "next_action": self.next_action,
        }
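
# LangGraph passes the current State into each node and merges the State each
# node returns, which is why the node functions below rebuild a full State.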

async def prep_opportunity_review(session_state):
    """Top-level flow: locate the document, run the analysis graph, build the
    Markdown report, and attach both results to the caller's session state."""
    file_path = prep_document()
    structured_results = run_analysis(file_path)
    opportunity_review_report = create_opportunity_review_report(structured_results)
    session_state.opportunity_review_results = structured_results
    session_state.opportunity_review_report = opportunity_review_report

def prep_document():
    """Check that the source document exists and is readable, then return its
    path as a string for downstream processing."""
    file_path = "data/HSBC Opportunity Information.docx"
    path = Path(file_path)

    if path.exists():
        if path.is_file():
            print(f"File found: {path}")
            print(f"File size: {path.stat().st_size / 1024:.2f} KB")
            print(f"Last modified: {datetime.fromtimestamp(path.stat().st_mtime)}")
            print("File is ready for processing.")
            if os.access(path, os.R_OK):
                print("File is readable.")
            else:
                print("Warning: File exists but may not be readable. Check permissions.")
        else:
            print(f"Error: {path} exists but is not a file. It might be a directory.")
    else:
        print(f"Error: File not found at {path}")
        print("Please check the following:")
        print("1. Ensure the file path is correct.")
        print("2. Verify that the file exists in the specified location.")
        print("3. Check if you have the necessary permissions to access the file.")

        parent = path.parent
        if not parent.exists():
            print(f"Note: The directory {parent} does not exist.")
        elif not parent.is_dir():
            print(f"Note: {parent} exists but is not a directory.")

    return str(path)

def load_and_chunk_document(file_path: str) -> List[Document]:
    """Load and chunk the document based on file type."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    _, file_extension = os.path.splitext(file_path.lower())

    if file_extension == '.csv':
        loader = CSVLoader(file_path)
    elif file_extension == '.pdf':
        loader = PyPDFLoader(file_path)
    elif file_extension == '.docx':
        loader = Docx2txtLoader(file_path)
    else:
        raise ValueError(f"Unsupported file type: {file_extension}")

    documents = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    return text_splitter.split_documents(documents)
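
# Note: chunk_size is a soft character budget (a chunk can exceed it when no
# good split point exists), and chunk_overlap keeps 200 characters of shared
# context between consecutive chunks.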

def agent_1(file_path: str) -> str:
    """Agent 1: Load, chunk, embed, and store document in Qdrant."""
    try:
        chunks = load_and_chunk_document(file_path)
        # Embed all chunks in one batched call rather than one request per chunk.
        vectors = embeddings.embed_documents([chunk.page_content for chunk in chunks])
        points = [
            PointStruct(id=i, vector=vector, payload={"text": chunk.page_content})
            for i, (chunk, vector) in enumerate(zip(chunks, vectors))
        ]

        qdrant.upsert(collection_name="opportunities", points=points)
        return f"Document processed and stored in Qdrant. {len(chunks)} chunks created."
    except Exception as e:
        print(f"Error in agent_1: {str(e)}")
        return f"Error processing document: {str(e)}"

def agent_2() -> Dict[str, Any]:
    """Agent 2: Evaluate opportunity based on MEDDIC criteria."""
    try:
        results = qdrant.scroll(collection_name="opportunities", limit=100)
        if not results or len(results[0]) == 0:
            raise ValueError("No documents found in Qdrant")

        full_text = " ".join([point.payload.get("text", "") for point in results[0]])

        meddic_template = """
        Analyze the following opportunity information using the MEDDIC sales methodology:

        {opportunity_info}

        Assign an overall opportunity score (1-100), where 100 means the opportunity is a sure win.

        Provide a Summary of the opportunity. There must always be a summary. Ensure the summary is on the
        same line as the 'Summary:' title.

        Evaluate the opportunity based on each MEDDIC criterion and assign a score for each criterion:
        1. Metrics
        2. Economic Buyer
        3. Decision Criteria
        4. Decision Process
        5. Identify Pain
        6. Champion

        Format your response as follows:
        Summary: [Opportunity Summary]
        Score: [Overall Opportunity Score from 1 to 100 based on MEDDIC criteria]
        MEDDIC Evaluation:
        - Metrics: [Score on Metrics, Evaluation on Metrics criterion]
        - Economic Buyer: [Score on Economic Buyer, Evaluation on Economic Buyer criterion]
        - Decision Criteria: [Score on Decision Criteria, Evaluation on Decision Criteria criterion]
        - Decision Process: [Score on Decision Process, Evaluation on Decision Process criterion]
        - Identify Pain: [Score on Identify Pain, Evaluation on Identify Pain criterion]
        - Champion: [Score on Champion, Evaluation on Champion criterion]
        """

        meddic_prompt = PromptTemplate(template=meddic_template, input_variables=["opportunity_info"])
        meddic_chain = meddic_prompt | llm

        response = meddic_chain.invoke({"opportunity_info": full_text})

        if isinstance(response, AIMessage):
            response_content = response.content
        elif isinstance(response, str):
            response_content = response
        else:
            raise ValueError(f"Unexpected response type: {type(response)}")

        print(response_content)

        # Parse the 'Summary:' and 'Score:' lines, then fold each '- Criterion:'
        # bullet (plus any continuation lines) into a dict.
        lines = response_content.split('\n')
        summary = next((line.split('Summary:')[1].strip() for line in lines if line.startswith('Summary:')), 'N/A')
        print(summary)
        score = next((int(line.split('Score:')[1].strip()) for line in lines if line.startswith('Score:')), 0)
        print(score)
        meddic_eval = {}
        current_criterion = None
        for line in lines:
            if line.strip().startswith('-'):
                parts = line.split(':', 1)
                if len(parts) == 2:
                    current_criterion = parts[0].strip('- ')
                    meddic_eval[current_criterion] = parts[1].strip()
            elif current_criterion and line.strip():
                meddic_eval[current_criterion] += ' ' + line.strip()

        return {
            'summary': summary,
            'score': score,
            'meddic_evaluation': meddic_eval
        }

    except Exception as e:
        print(f"Error in agent_2: {str(e)}")
        return {
            'summary': "Error occurred during evaluation",
            'score': 0,
            'meddic_evaluation': str(e)
        }
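
# Note: agent_2 above parses the free-text reply line by line. agent_2_json
# below is the variant actually wired into the graph; it asks the model for
# JSON and parses it with clean_and_parse_json instead.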

def clean_and_parse_json(json_string):
    """Strip the Markdown code fences that models often wrap around JSON output,
    then parse. Returns None if the cleaned string is still not valid JSON."""
    json_string = re.sub(r'^```(?:json)?\s*', '', json_string)
    json_string = re.sub(r'\s*```$', '', json_string)
    json_string = json_string.strip()

    try:
        return json.loads(json_string)
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return None
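
# Example of the fence-stripping this enables (illustrative input/output):
#   clean_and_parse_json('```json\n{"Score": 72}\n```')  ->  {'Score': 72}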

def agent_2_json() -> Dict[str, Any]:
    """Agent 2 (JSON variant): Evaluate opportunity based on MEDDIC criteria."""
    try:
        results = qdrant.scroll(collection_name="opportunities", limit=100)
        if not results or len(results[0]) == 0:
            raise ValueError("No documents found in Qdrant")

        full_text = " ".join([point.payload.get("text", "") for point in results[0]])

        meddic_template = """
        Analyze the following opportunity information using the MEDDIC sales methodology:

        {opportunity_info}

        Assign an overall opportunity score (1-100), where 100 means the opportunity is a sure win.

        Provide a Summary of the opportunity. There must always be a summary. Ensure the summary is on the
        same line as the 'Summary:' title.

        Evaluate the opportunity based on each MEDDIC criterion and assign a score for each criterion:
        1. Metrics
        2. Economic Buyer
        3. Decision Criteria
        4. Decision Process
        5. Identify Pain
        6. Champion

        Your response must be in JSON format.
        Use the following keys in the JSON:
        Summary: Opportunity Summary
        Score: Overall Opportunity Score from 1 to 100 based on MEDDIC criteria
        Metrics Score: MEDDIC score on Metrics
        Metrics Evaluation: Evaluation on MEDDIC Metrics criterion
        Economic Buyer Score: MEDDIC Score on Economic Buyer
        Economic Buyer Evaluation: Evaluation on MEDDIC Economic Buyer criterion
        Decision Criteria Score: MEDDIC Score on Decision Criteria
        Decision Criteria Evaluation: Evaluation on MEDDIC Decision Criteria criterion
        Decision Process Score: MEDDIC Score on Decision Process
        Decision Process Evaluation: Evaluation on MEDDIC Decision Process criterion
        Identify Pain Score: MEDDIC Score on Identify Pain
        Identify Pain Evaluation: Evaluation on MEDDIC Identify Pain criterion
        Champion Score: MEDDIC Score on Champion
        Champion Evaluation: Evaluation on MEDDIC Champion criterion
        """

        meddic_prompt = PromptTemplate(template=meddic_template, input_variables=["opportunity_info"])
        meddic_chain = meddic_prompt | llm

        response = meddic_chain.invoke({"opportunity_info": full_text})

        if isinstance(response, AIMessage):
            response_content = response.content
        elif isinstance(response, str):
            response_content = response
        else:
            raise ValueError(f"Unexpected response type: {type(response)}")

        print(response_content)

        meddic_data = clean_and_parse_json(response_content)
        if meddic_data is None:
            raise ValueError("Model response could not be parsed as JSON")
        print("jsonified")
        print(meddic_data)

        output = {
            'summary': meddic_data.get('Summary', 'N/A'),
            'score': meddic_data.get('Score', 0),
            'meddic_evaluation': {
                'Metrics': f"{meddic_data.get('Metrics Score', 'N/A')} - {meddic_data.get('Metrics Evaluation', 'N/A')}",
                'Economic Buyer': f"{meddic_data.get('Economic Buyer Score', 'N/A')} - {meddic_data.get('Economic Buyer Evaluation', 'N/A')}",
                'Decision Criteria': f"{meddic_data.get('Decision Criteria Score', 'N/A')} - {meddic_data.get('Decision Criteria Evaluation', 'N/A')}",
                'Decision Process': f"{meddic_data.get('Decision Process Score', 'N/A')} - {meddic_data.get('Decision Process Evaluation', 'N/A')}",
                'Identify Pain': f"{meddic_data.get('Identify Pain Score', 'N/A')} - {meddic_data.get('Identify Pain Evaluation', 'N/A')}",
                'Champion': f"{meddic_data.get('Champion Score', 'N/A')} - {meddic_data.get('Champion Evaluation', 'N/A')}"
            }
        }
        print("output")
        print(output)
        return output

    except Exception as e:
        print(f"Error in agent_2_json: {str(e)}")
        return {
            'summary': "Error occurred during evaluation",
            'score': 0,
            'meddic_evaluation': str(e)
        }

def agent_3(meddic_evaluation: Dict[str, Any]) -> Dict[str, Any]:
    """Agent 3: Suggest next best action and talking points."""
    try:
        next_action_template = """
        Based on the following MEDDIC evaluation of an opportunity:

        {meddic_evaluation}

        Suggest the next best action for the upcoming customer meeting and provide the top 3 talking points.
        Format your response as follows:
        Next Action: [Your suggested action]
        Talking Points:
        1. [First talking point]
        2. [Second talking point]
        3. [Third talking point]
        """

        next_action_prompt = PromptTemplate(template=next_action_template, input_variables=["meddic_evaluation"])
        next_action_chain = next_action_prompt | llm

        response = next_action_chain.invoke({"meddic_evaluation": json.dumps(meddic_evaluation)})

        if isinstance(response, AIMessage):
            response_content = response.content
        elif isinstance(response, str):
            response_content = response
        else:
            raise ValueError(f"Unexpected response type: {type(response)}")

        lines = response_content.split('\n')
        next_action = next((line.split('Next Action:')[1].strip() for line in lines if line.startswith('Next Action:')), 'N/A')
        # Split on the first '.' only, so periods inside a talking point survive.
        talking_points = [line.split('.', 1)[1].strip() for line in lines if line.strip().startswith(('1.', '2.', '3.'))]

        return {
            'next_action': next_action,
            'talking_points': talking_points
        }
    except Exception as e:
        print(f"Error in agent_3: {str(e)}")
        return {
            'next_action': "Error occurred while suggesting next action",
            'talking_points': [str(e)]
        }

def process_document(state: State) -> State:
    print("Agent 1: Processing document...")
    result = agent_1(state.file_path)
    return State(file_path=state.file_path, document_processed=result)


def evaluate_opportunity(state: State) -> State:
    print("Agent 2: Evaluating opportunity...")
    result = agent_2_json()
    return State(
        file_path=state.file_path,
        document_processed=state.document_processed,
        opportunity_evaluation=result,
    )


def suggest_next_action(state: State) -> State:
    print("Agent 3: Suggesting next actions...")
    result = agent_3(state.opportunity_evaluation)
    return State(
        file_path=state.file_path,
        document_processed=state.document_processed,
        opportunity_evaluation=state.opportunity_evaluation,
        next_action=result,
    )

def define_graph() -> StateGraph:
    workflow = StateGraph(State)

    workflow.add_node("process_document", process_document)
    workflow.add_node("evaluate_opportunity", evaluate_opportunity)
    workflow.add_node("suggest_next_action", suggest_next_action)

    workflow.set_entry_point("process_document")
    workflow.add_edge("process_document", "evaluate_opportunity")
    workflow.add_edge("evaluate_opportunity", "suggest_next_action")
    # Terminate the graph explicitly after the last agent runs.
    workflow.add_edge("suggest_next_action", END)

    return workflow
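
# Resulting topology (linear):
#   process_document -> evaluate_opportunity -> suggest_next_action -> END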

def run_analysis(file_path: str) -> Dict[str, Any]:
    if not os.path.exists(file_path):
        return {"error": f"File not found: {file_path}"}

    graph = define_graph()
    initial_state = State(file_path=file_path)

    try:
        app = graph.compile()
        # The compiled graph returns the final state as a mapping of field values.
        final_state = app.invoke(initial_state)

        structured_results = {
            "file_path": final_state["file_path"],
            "document_processed": final_state["document_processed"],
            "opportunity_evaluation": final_state["opportunity_evaluation"],
            "next_action": final_state["next_action"]
        }

        print("\n--- Analysis Results ---")
        print(f"Document Processing: {'Successful' if 'Error' not in structured_results['document_processed'] else 'Failed'}")
        print(f"Details: {structured_results['document_processed']}")

        if isinstance(structured_results['opportunity_evaluation'], dict):
            print("\nOpportunity Evaluation:")
            print(f"Summary: {structured_results['opportunity_evaluation'].get('summary', 'N/A')}")
            print(f"Score: {structured_results['opportunity_evaluation'].get('score', 'N/A')}")
            print("MEDDIC Evaluation:")
            for criterion, evaluation in structured_results['opportunity_evaluation'].get('meddic_evaluation', {}).items():
                print(f"{criterion}: {evaluation}")
        else:
            print("\nOpportunity Evaluation:")
            print(f"Error: {structured_results['opportunity_evaluation']}")

        if isinstance(structured_results['next_action'], dict):
            print("\nNext Action:")
            print(f"Action: {structured_results['next_action'].get('next_action', 'N/A')}")
            print("Talking Points:")
            for i, point in enumerate(structured_results['next_action'].get('talking_points', []), 1):
                print(f"  {i}. {point}")
        else:
            print("\nNext Action:")
            print(f"Error: {structured_results['next_action']}")

        return structured_results

    except Exception as e:
        print(f"An error occurred during analysis: {str(e)}")
        return {"error": str(e)}

def create_opportunity_review_report(structured_results):
    opportunity_review_report = "**Analysis Results**\n\n"
    if 'Error' in structured_results['document_processed']:
        opportunity_review_report += "Opportunity Analysis Failed\n"
    else:
        if isinstance(structured_results['opportunity_evaluation'], dict):
            opportunity_review_report += f"**Summary:** {structured_results['opportunity_evaluation'].get('summary', 'N/A')}\n\n"
            opportunity_review_report += f"**Score:** {structured_results['opportunity_evaluation'].get('score', 'N/A')}\n\n"
            opportunity_review_report += "**MEDDIC Evaluation:**\n\n"
            for criterion, evaluation in structured_results['opportunity_evaluation'].get('meddic_evaluation', {}).items():
                opportunity_review_report += f"**{criterion}:** {evaluation}\n"

        if isinstance(structured_results['next_action'], dict):
            opportunity_review_report += "\n\n**Next Steps**\n\n"
            opportunity_review_report += f"{structured_results['next_action'].get('next_action', 'N/A')}\n\n"
            opportunity_review_report += "**Talking Points:**\n\n"
            for i, point in enumerate(structured_results['next_action'].get('talking_points', []), 1):
                opportunity_review_report += f"  {i}. {point}\n"

    file_path = "./reports/HSBC Opportunity Review Report.md"
    save_md_file(file_path, opportunity_review_report)
    return opportunity_review_report

def save_md_file(file_path, file_content):
    try:
        # Make sure the target directory exists before writing.
        os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)

        if os.path.exists(file_path):
            os.remove(file_path)
            print(f"Existing file deleted: {file_path}")

        with open(file_path, 'w', encoding='utf-8') as md_file:
            md_file.write(file_content)
        print(f"File saved successfully: {file_path}")
    except PermissionError:
        print(f"Permission denied when trying to delete or save file: {file_path}")

    return None
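
# Illustrative entry point (an assumption, not in the original module): run the
# whole pipeline against the sample document path used by prep_document().
if __name__ == "__main__":
    analysis = run_analysis("data/HSBC Opportunity Information.docx")
    if "error" not in analysis:
        create_opportunity_review_report(analysis)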