Phase 6 Implementation Spec: Embeddings & Semantic Search
Goal: Add vector search for semantic evidence retrieval. Philosophy: "Find what you mean, not just what you type." Prerequisite: Phase 5 complete (Magentic working)
1. Why Embeddings?
Current limitation: Keyword-only search misses semantically related papers.
Example problem:
- User searches: "metformin alzheimer"
- PubMed returns: Papers with exact keywords
- MISSED: Papers about "AMPK activation neuroprotection" (same mechanism, different words)
With embeddings:
- Embed the query AND all evidence
- Find semantically similar papers even without keyword match
- Deduplicate by meaning, not just URL
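A minimal sketch of the idea, using sentence-transformers directly (the model matches the one chosen later in this spec; the example paper titles are illustrative only):
from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer("all-MiniLM-L6-v2")

query = "metformin alzheimer"
papers = [
    "AMPK activation and neuroprotection in neurodegenerative disease",
    "Seasonal rainfall patterns in coastal regions",
]

# Rank papers by cosine similarity to the query embedding.
query_emb = model.encode(query, convert_to_tensor=True)
paper_embs = model.encode(papers, convert_to_tensor=True)
scores = util.cos_sim(query_emb, paper_embs)[0]

# The mechanism paper scores higher than the unrelated one even though it
# shares no keywords with the query.
for paper, score in zip(papers, scores):
    print(f"{score.item():.3f}  {paper}")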
2. Architecture
Current (Phase 5)
Query → SearchAgent → PubMed/Web (keyword) → Evidence
Phase 6
Query → Embed(Query) → SearchAgent
    ├── PubMed/Web (keyword)  → Evidence
    └── VectorDB (semantic)   → Related Evidence
                 ↓
    Evidence → Embed → Store
Shared Context Enhancement
# Current
evidence_store = {"current": []}
# Phase 6
evidence_store = {
"current": [], # Raw evidence
"embeddings": {}, # URL -> embedding vector
"vector_index": None, # ChromaDB collection
}
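One possible way the new fields get populated (a sketch; the helper name and the exact wiring into the app are assumptions, while the Evidence attributes follow the usage elsewhere in this spec):
from src.services.embeddings import EmbeddingService

async def index_evidence(evidence_store: dict, service: EmbeddingService) -> None:
    """Sketch: embed each piece of gathered evidence and record it in the shared store."""
    evidence_store["vector_index"] = service  # or the service's ChromaDB collection
    for evidence in evidence_store["current"]:
        evidence_store["embeddings"][evidence.citation.url] = await service.embed(
            evidence.content
        )
        # add_evidence embeds the content again internally; acceptable for a sketch
        await service.add_evidence(
            evidence_id=evidence.citation.url,
            content=evidence.content,
            metadata={"source": evidence.citation.source},
        )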
3. Technology Choice
ChromaDB (Recommended)
- Free, open-source, local-first
- No API keys, no cloud dependency
- Supports sentence-transformers out of the box
- Perfect for hackathon (no infra setup)
Embedding Model
- sentence-transformers/all-MiniLM-L6-v2 (fast, good quality)
- Or BAAI/bge-small-en-v1.5 (better quality, still fast)
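Either model drops straight into the EmbeddingService defined in section 4.2 (a usage sketch; the constructor parameter is shown there):
from src.services.embeddings import EmbeddingService

service = EmbeddingService()                                     # all-MiniLM-L6-v2 (default)
service = EmbeddingService(model_name="BAAI/bge-small-en-v1.5")  # higher quality, still fast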
4. Implementation
4.1 Dependencies
Add to pyproject.toml:
[project.optional-dependencies]
embeddings = [
"chromadb>=0.4.0",
"sentence-transformers>=2.2.0",
]
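Install the extra for local development with, for example (assuming a pip-based editable install; adjust for uv or poetry as appropriate):
pip install -e ".[embeddings]"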
4.2 Embedding Service (src/services/embeddings.py)
CRITICAL: Async Pattern Required
sentence-transformers is synchronous and CPU-bound. Running it directly in async code will block the event loop, freezing the UI and halting all concurrent operations.
Solution: Use asyncio.run_in_executor() to offload work to a thread pool. This pattern already exists in src/tools/websearch.py:28-34.
"""Embedding service for semantic search.
IMPORTANT: All public methods are async to avoid blocking the event loop.
The sentence-transformers model is CPU-bound, so we use run_in_executor().
"""
import asyncio
from typing import List
import chromadb
from sentence_transformers import SentenceTransformer
class EmbeddingService:
"""Handles text embedding and vector storage.
All embedding operations run in a thread pool to avoid blocking
the async event loop. See src/tools/websearch.py for the pattern.
"""
def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
self._model = SentenceTransformer(model_name)
self._client = chromadb.Client() # In-memory for hackathon
self._collection = self._client.create_collection(
name="evidence",
metadata={"hnsw:space": "cosine"}
)
# ─────────────────────────────────────────────────────────────────
# Sync internal methods (run in thread pool)
# ─────────────────────────────────────────────────────────────────
def _sync_embed(self, text: str) -> List[float]:
"""Synchronous embedding - DO NOT call directly from async code."""
return self._model.encode(text).tolist()
def _sync_batch_embed(self, texts: List[str]) -> List[List[float]]:
"""Batch embedding for efficiency - DO NOT call directly from async code."""
return [e.tolist() for e in self._model.encode(texts)]
# ─────────────────────────────────────────────────────────────────
# Async public methods (safe for event loop)
# ─────────────────────────────────────────────────────────────────
async def embed(self, text: str) -> List[float]:
"""Embed a single text (async-safe).
Uses run_in_executor to avoid blocking the event loop.
"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, self._sync_embed, text)
async def embed_batch(self, texts: List[str]) -> List[List[float]]:
"""Batch embed multiple texts (async-safe, more efficient)."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, self._sync_batch_embed, texts)
async def add_evidence(self, evidence_id: str, content: str, metadata: dict) -> None:
"""Add evidence to vector store (async-safe)."""
embedding = await self.embed(content)
# ChromaDB operations are fast, but wrap for consistency
loop = asyncio.get_running_loop()
await loop.run_in_executor(
None,
lambda: self._collection.add(
ids=[evidence_id],
embeddings=[embedding],
metadatas=[metadata],
documents=[content]
)
)
async def search_similar(self, query: str, n_results: int = 5) -> List[dict]:
"""Find semantically similar evidence (async-safe)."""
query_embedding = await self.embed(query)
loop = asyncio.get_running_loop()
results = await loop.run_in_executor(
None,
lambda: self._collection.query(
query_embeddings=[query_embedding],
n_results=n_results
)
)
# Handle empty results gracefully
if not results["ids"] or not results["ids"][0]:
return []
return [
{"id": id, "content": doc, "metadata": meta, "distance": dist}
for id, doc, meta, dist in zip(
results["ids"][0],
results["documents"][0],
results["metadatas"][0],
results["distances"][0]
)
]
async def deduplicate(self, new_evidence: List, threshold: float = 0.9) -> List:
"""Remove semantically duplicate evidence (async-safe)."""
unique = []
for evidence in new_evidence:
similar = await self.search_similar(evidence.content, n_results=1)
if not similar or similar[0]["distance"] > (1 - threshold):
unique.append(evidence)
await self.add_evidence(
evidence_id=evidence.citation.url,
content=evidence.content,
metadata={"source": evidence.citation.source}
)
return unique
4.3 Enhanced SearchAgent (src/agents/search_agent.py)
Update SearchAgent to use embeddings. Note: All embedding calls are awaited:
class SearchAgent(BaseAgent):
def __init__(
self,
search_handler: SearchHandlerProtocol,
evidence_store: dict,
embedding_service: EmbeddingService | None = None, # NEW
):
# ... existing init ...
self._embeddings = embedding_service
async def run(self, messages, *, thread=None, **kwargs) -> AgentRunResponse:
# ... extract query ...
# Execute keyword search
result = await self._handler.execute(query, max_results_per_tool=10)
# Semantic deduplication (NEW) - ALL CALLS ARE AWAITED
if self._embeddings:
# Deduplicate by semantic similarity (async-safe)
unique_evidence = await self._embeddings.deduplicate(result.evidence)
# Also search for semantically related evidence (async-safe)
related = await self._embeddings.search_similar(query, n_results=5)
# Merge related evidence not already in results
existing_urls = {e.citation.url for e in unique_evidence}
for item in related:
if item["id"] not in existing_urls:
# Reconstruct Evidence from stored data
# ... merge logic ...
# ... rest of method ...
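The elided merge step could look roughly like the following (a hypothetical sketch that slots into run() above; the Evidence and Citation constructors and their field names are assumptions based on the attributes used elsewhere in this spec):
# Hypothetical merge sketch - Evidence/Citation constructors are assumed.
for item in related:
    if item["id"] not in existing_urls:
        unique_evidence.append(
            Evidence(
                content=item["content"],
                citation=Citation(
                    url=item["id"],  # evidence is keyed by URL in the vector store
                    source=item["metadata"].get("source", "unknown"),
                ),
            )
        )
        existing_urls.add(item["id"])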
4.4 Semantic Expansion in Orchestrator
The MagenticOrchestrator can use embeddings to expand queries:
# In task instruction
task = f"""Research drug repurposing opportunities for: {query}
The system has semantic search enabled. When evidence is found:
1. Related concepts will be automatically surfaced
2. Duplicates are removed by meaning, not just URL
3. Use the surfaced related concepts to refine searches
"""
4.5 HuggingFace Spaces Deployment
⚠️ Important for HF Spaces
sentence-transformers downloads model weights (tens to hundreds of MB, depending on the model) to the Hugging Face cache directory (by default under `~/.cache`) on first use. HuggingFace Spaces have ephemeral storage - the cache is wiped on restart. This causes slow cold starts and bandwidth usage.
Solution: Pre-download the model in your Dockerfile:
# In Dockerfile
FROM python:3.11-slim
# Set cache directory
ENV HF_HOME=/app/.cache
ENV TRANSFORMERS_CACHE=/app/.cache
# Pre-download the embedding model during build
RUN pip install sentence-transformers && \
python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('all-MiniLM-L6-v2')"
# ... rest of Dockerfile
Alternative: Use an environment variable to point the cache at a persistent path:
# In HF Spaces settings or app.yaml
env:
- name: HF_HOME
value: /data/.cache # Persistent volume
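If neither option is available, the cache path can also be set in code before sentence-transformers is first imported (a sketch; `/data` being a persistent mount is an assumption):
# At app startup, before importing sentence_transformers
import os

os.environ.setdefault("HF_HOME", "/data/.cache")  # assumes /data persists across restarts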
5. Directory Structure After Phase 6
src/
├── services/              # NEW
│   ├── __init__.py
│   └── embeddings.py      # EmbeddingService
├── agents/
│   ├── search_agent.py    # Updated with embeddings
│   └── judge_agent.py
└── ...
6. Tests
6.1 Unit Tests (tests/unit/services/test_embeddings.py)
Note: All tests are async since the EmbeddingService methods are async.
"""Unit tests for EmbeddingService."""
import pytest
from src.services.embeddings import EmbeddingService
class TestEmbeddingService:
@pytest.mark.asyncio
async def test_embed_returns_vector(self):
"""Embedding should return a float vector."""
service = EmbeddingService()
embedding = await service.embed("metformin diabetes")
assert isinstance(embedding, list)
assert len(embedding) > 0
assert all(isinstance(x, float) for x in embedding)
@pytest.mark.asyncio
async def test_similar_texts_have_close_embeddings(self):
"""Semantically similar texts should have similar embeddings."""
service = EmbeddingService()
e1 = await service.embed("metformin treats diabetes")
e2 = await service.embed("metformin is used for diabetes treatment")
e3 = await service.embed("the weather is sunny today")
# Cosine similarity helper
from numpy import dot
from numpy.linalg import norm
cosine = lambda a, b: dot(a, b) / (norm(a) * norm(b))
# Similar texts should be closer
assert cosine(e1, e2) > cosine(e1, e3)
@pytest.mark.asyncio
async def test_batch_embed_efficient(self):
"""Batch embedding should be more efficient than individual calls."""
service = EmbeddingService()
texts = ["text one", "text two", "text three"]
# Batch embed
batch_results = await service.embed_batch(texts)
assert len(batch_results) == 3
assert all(isinstance(e, list) for e in batch_results)
@pytest.mark.asyncio
async def test_add_and_search(self):
"""Should be able to add evidence and search for similar."""
service = EmbeddingService()
await service.add_evidence(
evidence_id="test1",
content="Metformin activates AMPK pathway",
metadata={"source": "pubmed"}
)
results = await service.search_similar("AMPK activation drugs", n_results=1)
assert len(results) == 1
assert "AMPK" in results[0]["content"]
@pytest.mark.asyncio
async def test_search_similar_empty_collection(self):
"""Search on empty collection should return empty list, not error."""
service = EmbeddingService()
results = await service.search_similar("anything", n_results=5)
assert results == []
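Since chromadb and sentence-transformers live behind an optional extra, the test module can also skip itself cleanly when they are not installed (an optional guard, not required by the spec):
import pytest

# Skip the whole module if the optional "embeddings" extra is missing.
pytest.importorskip("chromadb")
pytest.importorskip("sentence_transformers")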
7. Definition of Done
Phase 6 is COMPLETE when:
- EmbeddingService implemented with ChromaDB
- SearchAgent uses embeddings for deduplication
- Semantic search surfaces related evidence
- All unit tests pass
- Integration test shows improved recall (finds related papers)
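One minimal way to exercise the last item (a smoke-test sketch using only the EmbeddingService rather than a true recall benchmark; the file path is an assumption):
# tests/integration/test_semantic_recall.py (hypothetical path)
import pytest

from src.services.embeddings import EmbeddingService

@pytest.mark.asyncio
async def test_semantic_search_surfaces_related_mechanism_paper():
    """A mechanism paper sharing no keywords with the query is still returned."""
    service = EmbeddingService()
    await service.add_evidence(
        evidence_id="https://example.org/ampk-neuroprotection",
        content="AMPK activation confers neuroprotection in neurodegenerative disease",
        metadata={"source": "pubmed"},
    )
    results = await service.search_similar("metformin alzheimer", n_results=1)
    assert len(results) == 1
    assert "AMPK" in results[0]["content"]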
8. Value Delivered
| Before (Phase 5) | After (Phase 6) |
|---|---|
| Keyword-only search | Semantic + keyword search |
| URL-based deduplication | Meaning-based deduplication |
| Miss related papers | Surface related concepts |
| Exact match required | Fuzzy semantic matching |
Real example improvement:
- Query: "metformin alzheimer"
- Before: Only papers mentioning both words
- After: Also finds "AMPK neuroprotection", "biguanide cognitive", etc.