Spaces:
Sleeping
Sleeping
| """ | |
| RAG (Retrieval Augmented Generation) implementation for project assistant. | |
| """ | |
| from pathlib import Path | |
| from typing import List, Dict, Any | |
| from datetime import datetime | |
| import chromadb | |
| from chromadb.config import Settings | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from src.parsers import MeetingNote, load_meetings_from_directory | |
class ProjectRAG:
    """RAG system for project meeting notes.

    Meetings are loaded from ``data_dir``, rendered to plain text, split into
    overlapping chunks, embedded and stored in a persistent ChromaDB
    collection. The parsed meetings are also kept in ``self.meetings`` so the
    structured queries (action items, blockers, decisions, per-project
    documents) are answered from memory without touching the vector store.
    """

    _COLLECTION_NAME = "meeting_notes"
    _COLLECTION_METADATA = {"hnsw:space": "cosine"}
    # Chroma rejects a single add() above a backend-dependent maximum batch
    # size, so chunks are written in conservative slices.
    _ADD_BATCH_SIZE = 100

    def __init__(self, data_dir: Path, persist_dir: Path = None):
        """Initialize the RAG system.

        Args:
            data_dir: Directory the meeting notes are loaded from.
            persist_dir: Directory for the ChromaDB files; defaults to
                ``./chroma_db`` when omitted.
        """
        self.data_dir = data_dir
        self.persist_dir = persist_dir or Path("./chroma_db")

        # Initialize embeddings
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )

        # Initialize ChromaDB
        self.client = chromadb.PersistentClient(path=str(self.persist_dir))
        self.collection = self.client.get_or_create_collection(
            name=self._COLLECTION_NAME,
            metadata=dict(self._COLLECTION_METADATA),
        )

        # Text splitter for chunking
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            separators=["\n\n", "\n", ". ", " ", ""],
        )

        self.meetings: List[MeetingNote] = []

    @staticmethod
    def _date_sort_key(meeting):
        """Return a sort key that places undated meetings at the earliest position."""
        # NOTE(review): assumes meeting.date is a naive datetime (or None);
        # a plain date or a tz-aware datetime would not compare with
        # datetime.min -- confirm against the parser.
        return meeting.date or datetime.min

    @staticmethod
    def _build_index_text(meeting) -> str:
        """Render one meeting as the plain-text document that gets chunked and embedded."""
        date_text = meeting.date.strftime('%Y-%m-%d') if meeting.date else 'Unknown'
        doc_parts = [
            f"Project: {meeting.project_name}",
            f"Meeting: {meeting.title}",
            f"Date: {date_text}",
        ]
        if meeting.participants:
            doc_parts.append(f"Participants: {', '.join(meeting.participants)}")
        if meeting.discussion:
            doc_parts.append(f"Discussion:\n{meeting.discussion}")
        if meeting.decisions:
            doc_parts.append("Decisions:")
            doc_parts.extend(f"- {d}" for d in meeting.decisions)
        if meeting.action_items:
            doc_parts.append("Action Items:")
            for item in meeting.action_items:
                status = "✓" if item.completed else "○"
                assignee = f"{item.assignee}: " if item.assignee else ""
                deadline = f" (by {item.deadline})" if item.deadline else ""
                doc_parts.append(f"{status} {assignee}{item.task}{deadline}")
        if meeting.blockers:
            doc_parts.append("Blockers:")
            doc_parts.extend(f"- {b}" for b in meeting.blockers)
        return "\n".join(doc_parts)

    def load_and_index(self):
        """Load all meetings and index them in the vector store.

        Replaces the contents of the ``meeting_notes`` collection with chunks
        of the freshly loaded meetings. When no meetings are found the
        existing collection is left untouched.
        """
        print("Loading meetings from directory...")
        self.meetings = load_meetings_from_directory(self.data_dir)
        print(f"Loaded {len(self.meetings)} meetings")

        if not self.meetings:
            print("No meetings found. Please add meeting notes to the data directory.")
            return

        print("Indexing meetings...")
        documents = []
        metadatas = []
        ids = []

        for idx, meeting in enumerate(self.meetings):
            full_doc = self._build_index_text(meeting)
            for chunk_idx, chunk in enumerate(self.text_splitter.split_text(full_doc)):
                documents.append(chunk)
                metadatas.append({
                    "meeting_idx": idx,
                    "project": meeting.project_name,
                    "title": meeting.title,
                    "date": meeting.date.isoformat() if meeting.date else "",
                    # Chroma metadata values must be primitives; file_path may
                    # be a Path depending on the parser (not visible here).
                    "file_path": str(meeting.file_path) if meeting.file_path else "",
                    "chunk_idx": chunk_idx,
                })
                ids.append(f"meeting_{idx}_chunk_{chunk_idx}")

        # Embed before touching the collection so a failure here does not
        # leave the store empty.
        embeddings_list = self.embeddings.embed_documents(documents) if documents else []

        # Clear existing collection
        self.client.delete_collection(self._COLLECTION_NAME)
        self.collection = self.client.create_collection(
            name=self._COLLECTION_NAME,
            metadata=dict(self._COLLECTION_METADATA),
        )

        # Add to ChromaDB in slices to stay under the per-call batch limit.
        for start in range(0, len(documents), self._ADD_BATCH_SIZE):
            stop = start + self._ADD_BATCH_SIZE
            self.collection.add(
                embeddings=embeddings_list[start:stop],
                documents=documents[start:stop],
                metadatas=metadatas[start:stop],
                ids=ids[start:stop],
            )

        print(f"Indexed {len(documents)} chunks from {len(self.meetings)} meetings")

    def search(self, query: str, n_results: int = 5, project_filter: str = None) -> List[Dict[str, Any]]:
        """Search for relevant meeting content.

        Args:
            query: Free-text query; it is embedded with the same model as the
                indexed chunks.
            n_results: Maximum number of chunks to request.
            project_filter: When given, only chunks whose ``project`` metadata
                equals this value are considered.

        Returns:
            A list of dicts with keys ``content``, ``metadata`` and
            ``distance``; ``metadata``/``distance`` are ``None`` when the
            store did not return them.
        """
        query_embedding = self.embeddings.embed_query(query)

        where = {"project": project_filter} if project_filter else None

        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            where=where,
        )

        # Every field holds one inner list per query; any of them may be
        # missing or None, so fall back to an empty inner list.
        documents = (results.get('documents') or [[]])[0]
        if not documents:
            return []
        metadatas = (results.get('metadatas') or [[]])[0] or []
        distances = (results.get('distances') or [[]])[0] or []

        formatted_results = []
        for i, content in enumerate(documents):
            formatted_results.append({
                'content': content,
                'metadata': metadatas[i] if i < len(metadatas) else None,
                'distance': distances[i] if i < len(distances) else None,
            })
        return formatted_results

    def get_all_projects(self) -> List[str]:
        """Get the distinct project names, in order of first appearance."""
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        return list(dict.fromkeys(m.project_name for m in self.meetings))

    def get_open_action_items(self, project: str = None) -> List[Dict[str, Any]]:
        """Get all open (not completed) action items, optionally filtered by project.

        Returns:
            Dicts with keys ``project``, ``meeting``, ``date``, ``assignee``,
            ``task`` and ``deadline``, in meeting order.
        """
        action_items = []
        for meeting in self.meetings:
            if project and meeting.project_name != project:
                continue
            for item in meeting.action_items:
                if item.completed:
                    continue
                action_items.append({
                    'project': meeting.project_name,
                    'meeting': meeting.title,
                    'date': meeting.date,
                    'assignee': item.assignee,
                    'task': item.task,
                    'deadline': item.deadline,
                })
        return action_items

    def get_blockers(self, project: str = None) -> List[Dict[str, Any]]:
        """Get all blockers, optionally filtered by project.

        Returns:
            Dicts with keys ``project``, ``meeting``, ``date`` and
            ``blocker``, in meeting order.
        """
        blockers = []
        for meeting in self.meetings:
            if project and meeting.project_name != project:
                continue
            for blocker in meeting.blockers:
                blockers.append({
                    'project': meeting.project_name,
                    'meeting': meeting.title,
                    'date': meeting.date,
                    'blocker': blocker,
                })
        return blockers

    def get_recent_decisions(self, project: str = None, limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent decisions, optionally filtered by project.

        Meetings are visited newest first (undated meetings last) and at most
        ``limit`` decisions are returned; a non-positive ``limit`` yields an
        empty list.

        Returns:
            Dicts with keys ``project``, ``meeting``, ``date`` and
            ``decision``.
        """
        if limit <= 0:
            return []

        decisions = []
        for meeting in sorted(self.meetings, key=self._date_sort_key, reverse=True):
            if project and meeting.project_name != project:
                continue
            for decision in meeting.decisions:
                decisions.append({
                    'project': meeting.project_name,
                    'meeting': meeting.title,
                    'date': meeting.date,
                    'decision': decision,
                })
                if len(decisions) >= limit:
                    return decisions
        return decisions

    def get_project_documents(self, project: str) -> List:
        """Get all meeting documents for a specific project.

        Each matching meeting is rendered as Markdown and wrapped in a
        LangChain ``Document`` with ``project``, ``title`` and ``date``
        metadata. Documents are ordered oldest first (undated meetings first).
        """
        from langchain_core.documents import Document

        documents = []
        for meeting in sorted(self.meetings, key=self._date_sort_key):
            if meeting.project_name != project:
                continue

            # Build full meeting content
            doc_parts = [
                f"# Meeting: {meeting.title}",
                f"**Date:** {meeting.date.strftime('%Y-%m-%d') if meeting.date else 'Unknown'}",
            ]
            if meeting.participants:
                doc_parts.append(f"**Participants:** {', '.join(meeting.participants)}")
            if meeting.discussion:
                doc_parts.append(f"\n## Discussion\n{meeting.discussion}")
            if meeting.decisions:
                doc_parts.append("\n## Decisions")
                doc_parts.extend(f"- {d}" for d in meeting.decisions)
            if meeting.action_items:
                doc_parts.append("\n## Action Items")
                for item in meeting.action_items:
                    status = "[x]" if item.completed else "[ ]"
                    assignee = f"{item.assignee}: " if item.assignee else ""
                    deadline = f" (by {item.deadline})" if item.deadline else ""
                    doc_parts.append(f"- {status} {assignee}{item.task}{deadline}")
            if meeting.blockers:
                doc_parts.append("\n## Blockers")
                doc_parts.extend(f"- {b}" for b in meeting.blockers)

            documents.append(Document(
                page_content="\n".join(doc_parts),
                metadata={
                    "project": meeting.project_name,
                    "title": meeting.title,
                    "date": meeting.date.isoformat() if meeting.date else "",
                },
            ))
        return documents