BioGuideMCP / test_queries.py
stefanjwojcik's picture
Add setup script and comprehensive tests for Congressional Bioguide MCP Server
15de73a
#!/usr/bin/env python3
"""
Test script to validate the Congressional Bioguide database and search functionality.
"""
import sqlite3
import json
from pathlib import Path
def test_database():
    """Check the database structure and run a set of sanity queries.

    Reads ``congress.db`` from the current working directory and prints a
    short report: member and job-position counts, members whose family name
    is 'Lincoln', the ten most common parties and state region codes, up to
    five sample family relationships, and the five longest profile texts.

    Returns:
        bool: True when every query ran successfully; False when the
        database file is missing or a query fails (for example because an
        expected table does not exist).
    """
    print("Testing Database...")
    print("=" * 60)

    if not Path("congress.db").exists():
        print("❌ Database not found. Run ingest_data.py first.")
        return False

    conn = sqlite3.connect("congress.db")
    try:
        cursor = conn.cursor()

        # Test 1: Count members
        cursor.execute("SELECT COUNT(*) FROM members")
        member_count = cursor.fetchone()[0]
        print(f"✓ Members in database: {member_count}")

        # Test 2: Count job positions
        cursor.execute("SELECT COUNT(*) FROM job_positions")
        job_count = cursor.fetchone()[0]
        print(f"✓ Job positions recorded: {job_count}")

        # Test 3: Search by name
        cursor.execute("""
            SELECT bio_id, family_name, given_name, birth_date, death_date
            FROM members
            WHERE unaccented_family_name = 'Lincoln'
            ORDER BY birth_date
        """)
        lincolns = cursor.fetchall()
        print(f"\n✓ Found {len(lincolns)} member(s) with family name 'Lincoln':")
        for bio_id, family, given, birth, death in lincolns:
            # A NULL death date is displayed as 'present'.
            print(f" - {given} {family} ({bio_id}): {birth} - {death or 'present'}")

        # Test 4: Party breakdown
        cursor.execute("""
            SELECT party, COUNT(DISTINCT bio_id) as count
            FROM job_positions
            WHERE party IS NOT NULL
            GROUP BY party
            ORDER BY count DESC
            LIMIT 10
        """)
        parties = cursor.fetchall()
        print("\n✓ Top parties by member count:")
        for party, count in parties:
            print(f" - {party}: {count} members")

        # Test 5: State representation
        cursor.execute("""
            SELECT region_code, COUNT(DISTINCT bio_id) as count
            FROM job_positions
            WHERE region_code IS NOT NULL AND region_type = 'StateRegion'
            GROUP BY region_code
            ORDER BY count DESC
            LIMIT 10
        """)
        states = cursor.fetchall()
        print("\n✓ Top states by member count:")
        for state, count in states:
            print(f" - {state}: {count} members")

        # Test 6: Relationships
        cursor.execute("SELECT COUNT(*) FROM relationships")
        rel_count = cursor.fetchone()[0]
        print(f"\n✓ Family relationships recorded: {rel_count}")
        if rel_count > 0:
            cursor.execute("""
                SELECT m1.given_name, m1.family_name, r.relationship_type,
                       m2.given_name, m2.family_name
                FROM relationships r
                JOIN members m1 ON r.bio_id = m1.bio_id
                JOIN members m2 ON r.related_bio_id = m2.bio_id
                LIMIT 5
            """)
            relationships = cursor.fetchall()
            print(" Sample relationships:")
            for given1, family1, rel_type, given2, family2 in relationships:
                print(f" - {given1} {family1} is {rel_type} of {given2} {family2}")

        # Test 7: Profile text
        cursor.execute("""
            SELECT bio_id, given_name, family_name, LENGTH(profile_text) as text_len
            FROM members
            WHERE profile_text IS NOT NULL
            ORDER BY text_len DESC
            LIMIT 5
        """)
        longest_profiles = cursor.fetchall()
        print("\n✓ Longest biography profiles:")
        for bio_id, given, family, length in longest_profiles:
            print(f" - {given} {family} ({bio_id}): {length} characters")
    except sqlite3.Error as e:
        # Report schema/query problems as a failed check instead of a traceback.
        print(f"❌ Database error: {e}")
        return False
    finally:
        # Always release the connection, including when a query fails.
        conn.close()

    return True
def test_faiss_index():
    """Load the FAISS index and run a few sample semantic searches.

    Expects ``congress_faiss.index``, ``congress_bio_ids.pkl`` and
    ``congress.db`` in the current working directory. For each sample query
    the top three hits are looked up in the ``members`` table and printed
    with their similarity score.

    Returns:
        bool: True when the index, model and all sample searches ran; False
        when a required file is missing, a dependency cannot be imported, or
        any other error occurs.
    """
    print("\n\nTesting FAISS Index...")
    print("=" * 60)

    if not Path("congress_faiss.index").exists():
        print("❌ FAISS index not found. Run ingest_data.py first.")
        return False
    if not Path("congress_bio_ids.pkl").exists():
        print("❌ Bio ID mapping not found. Run ingest_data.py first.")
        return False
    # sqlite3.connect() would silently create an empty database file, so
    # check for it explicitly before the name lookups below.
    if not Path("congress.db").exists():
        print("❌ Database not found. Run ingest_data.py first.")
        return False

    try:
        # Heavy optional dependencies are imported lazily so a missing
        # package is reported as a failed check rather than an import crash.
        import faiss
        import pickle
        from sentence_transformers import SentenceTransformer

        # Load index
        index = faiss.read_index("congress_faiss.index")
        # NOTE(security): pickle.load can execute arbitrary code. Only load a
        # mapping file that was generated locally (presumably by
        # ingest_data.py), never one from an untrusted source.
        with open("congress_bio_ids.pkl", "rb") as f:
            bio_ids = pickle.load(f)
        print(f"✓ FAISS index loaded: {index.ntotal} vectors")
        print(f"✓ Dimension: {index.d}")

        # Load model
        model = SentenceTransformer('all-MiniLM-L6-v2')
        print("✓ Sentence transformer model loaded")

        # Test search
        queries = [
            "lawyers who became judges",
            "Civil War veterans",
            "served in the military",
            "teachers and educators"
        ]

        # One connection serves every query; it is closed even on error.
        conn = sqlite3.connect("congress.db")
        try:
            cursor = conn.cursor()
            for query in queries:
                print(f"\n✓ Testing query: '{query}'")
                query_embedding = model.encode([query])[0].reshape(1, -1).astype('float32')
                faiss.normalize_L2(query_embedding)
                scores, indices = index.search(query_embedding, 3)

                print(" Top 3 results:")
                for idx, score in zip(indices[0], scores[0]):
                    # FAISS pads missing results with label -1; without the
                    # lower bound, bio_ids[-1] would report the last member.
                    if not 0 <= idx < len(bio_ids):
                        continue
                    bio_id = bio_ids[idx]
                    cursor.execute(
                        "SELECT given_name, family_name FROM members WHERE bio_id = ?",
                        (bio_id,)
                    )
                    result = cursor.fetchone()
                    if result:
                        given, family = result
                        print(f" - {given} {family} ({bio_id}): score={score:.4f}")
        finally:
            conn.close()

        return True
    except ImportError as e:
        print(f"❌ Missing dependency: {e}")
        print(" Run: pip install -r requirements.txt")
        return False
    except Exception as e:
        # Top-level boundary of this check: report any failure and signal it.
        print(f"❌ Error testing FAISS: {e}")
        return False
def test_sample_profile():
    """Print a sample profile (Abraham Lincoln) from the database.

    Looks up the member whose unaccented names are 'Abraham' / 'Lincoln' in
    ``congress.db`` and prints the name, birth and death dates, the first
    300 characters of the profile text, and every recorded job position
    ordered by start date. Nothing is printed after the header when no such
    member exists.

    Returns:
        None. If the database file is missing, a message is printed and the
        function returns without touching the filesystem.
    """
    print("\n\nSample Profile...")
    print("=" * 60)

    # sqlite3.connect() would create an empty database file here and the
    # query below would then fail, so bail out early instead.
    if not Path("congress.db").exists():
        print("❌ Database not found. Run ingest_data.py first.")
        return

    conn = sqlite3.connect("congress.db")
    try:
        # Row factory gives access to columns by name below.
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        # Get a well-known member
        cursor.execute("""
            SELECT * FROM members
            WHERE unaccented_family_name = 'Lincoln' AND unaccented_given_name = 'Abraham'
            LIMIT 1
        """)
        member = cursor.fetchone()
        if member:
            bio_id = member['bio_id']
            print(f"Profile: {member['given_name']} {member['family_name']} ({bio_id})")
            print(f"Birth: {member['birth_date']}")
            print(f"Death: {member['death_date']}")
            print("\nBiography excerpt:")
            # A NULL profile is shown as an empty excerpt.
            profile_text = member['profile_text'] or ""
            print(f" {profile_text[:300]}...")

            # Get positions
            cursor.execute("""
                SELECT job_name, party, congress_number, region_code, start_date, end_date
                FROM job_positions
                WHERE bio_id = ?
                ORDER BY start_date
            """, (bio_id,))
            positions = cursor.fetchall()
            if positions:
                print(f"\nPositions held ({len(positions)}):")
                for pos in positions:
                    print(f" - {pos['job_name']} ({pos['party']}), {pos['region_code']}")
                    print(f" Congress {pos['congress_number']}: {pos['start_date']} - {pos['end_date']}")
    finally:
        # Release the connection even if a query raises.
        conn.close()
def main():
    """Run all checks and print a summary.

    Runs the database check and the FAISS check, shows the sample profile
    when the database check succeeded, then prints an overall verdict.

    Returns:
        bool: True when both the database and FAISS checks passed, False
        otherwise.
    """
    print("Congressional Bioguide Database Test Suite")
    print("=" * 60)
    print()

    db_ok = test_database()
    faiss_ok = test_faiss_index()

    # The sample profile reads the database, so skip it when that check failed.
    if db_ok:
        test_sample_profile()

    print("\n" + "=" * 60)
    all_ok = bool(db_ok and faiss_ok)
    if all_ok:
        print("✓ All tests passed!")
        print("\nThe system is ready to use. Start the MCP server with:")
        print(" python3 server.py")
    else:
        print("❌ Some tests failed. Please check the errors above.")
        if not db_ok:
            print(" Run: python3 ingest_data.py")
    return all_ok


if __name__ == "__main__":
    # Exit non-zero on failure so shells and CI can detect a failed run.
    raise SystemExit(0 if main() else 1)