memvid-mcp / modal_memvid_service.py
eldarski
🎥 Memvid MCP Server - Hackathon Submission - Complete MCP server with 24 tools for video-based AI memory storage - Dual storage with Modal GPU acceleration - Ready for Agents-MCP-Hackathon Track 1
168b0da
"""
Modal Memvid Service - GPU-accelerated video memory processing
This service provides:
- GPU-accelerated video processing using memvid library
- QR code generation and decoding optimization
- Modal object storage for MP4 files
- Auto-scaling based on video processing workload
"""
import os
import time
import json
import modal
from typing import List, Dict, Any, Optional
# Modal App Configuration
# Every @app.function in this module registers itself on this app, and
# ModalMemvidClient looks the deployed functions up again by this same name.
app = modal.App("memvid-video-service")
# Docker image with all video processing dependencies
# The Python packages are installed first; the system libraries they load at
# runtime (zbar, ffmpeg, GL/glib) are added in a second step.
memvid_image = (
    modal.Image.debian_slim()
    .pip_install(
        [
            "memvid>=0.1.0",
            "opencv-python-headless>=4.8.0",
            "pillow>=9.5.0",
            "qrcode>=7.4.2",
            "pyzbar>=0.1.9", # QR code decoding
            "numpy>=1.24.0",
            "torch>=2.0.0", # PyTorch for GPU acceleration
        ]
    )
    .apt_install(
        [
            "libzbar0", # For QR code decoding
            "ffmpeg", # For video processing
            "libgl1-mesa-glx", # OpenCV dependencies
            "libglib2.0-0",
        ]
    )
)
# Volume for persistent video storage
# Looked up by name and created on first use; the functions below mount it
# at /storage.
videos_volume = modal.Volume.from_name("memvid-videos", create_if_missing=True)
def _list_files_with_sizes(directory: str, suffix: str) -> List[Dict[str, Any]]:
    """Describe the files in ``directory`` whose names end with ``suffix``.

    Args:
        directory: Directory to scan (not recursive).
        suffix: File-name suffix to keep, e.g. ".mp4".

    Returns:
        One dict per matching file with the keys "filename", "size_bytes"
        and "path"; an empty list when ``directory`` does not exist.
    """
    if not os.path.isdir(directory):
        return []
    entries = []
    for name in os.listdir(directory):
        if not name.endswith(suffix):
            continue
        path = os.path.join(directory, name)
        entries.append(
            {"filename": name, "size_bytes": os.path.getsize(path), "path": path}
        )
    return entries


@app.function(
    image=memvid_image,
    gpu="T4",  # GPU optimized for video processing
    volumes={"/storage": videos_volume},
    timeout=900,  # 15 minutes timeout for video processing
    cpu=4.0,  # More CPU for video encoding
    memory=8192,  # 8GB RAM for video processing
)
def process_video_memory(
    text: str, client_id: str, metadata: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Encode ``text`` into a memvid video stored on the Modal volume.

    The video and its index are written to
    ``/storage/<client_id>/videos/memory_<unix-ts>.mp4`` and
    ``..._index.json``.  Both names share a single timestamp so the index can
    always be derived from the video name.

    Args:
        text: Text content to store as video memory
        client_id: Unique identifier for the client/user; it must resolve to
            a directory below /storage
        metadata: Additional metadata for the memory; it is echoed back in
            the result and not written to the volume by this function

    Returns:
        On success a dict with "success": True, a generated "memory_id", the
        lists of all video/chunk files currently stored for the client,
        "processing_metrics", "metadata", "storage_path" and
        "infrastructure".  On failure a dict with "success": False, "error",
        "processing_metrics" and "infrastructure"; exceptions raised inside
        the processing steps are not propagated.
    """
    import sys

    # NOTE(review): kept from the original. memvid is pip-installed in the
    # image, so confirm whether anything is expected to be importable from
    # the volume.
    sys.path.append("/storage")
    from memvid import MemvidEncoder
    import uuid

    start_time = time.time()
    processing_metrics = {"gpu_used": "T4", "cpu_count": 4, "memory_gb": 8}
    try:
        # Setup storage paths in Modal volume.  Normalising first lets us
        # reject ids ("", "..", "a/../..") that would escape /storage.
        client_storage_path = os.path.normpath(f"/storage/{client_id}")
        if not client_storage_path.startswith("/storage/"):
            raise ValueError(f"Invalid client_id: {client_id!r}")
        videos_dir = os.path.join(client_storage_path, "videos")
        os.makedirs(videos_dir, exist_ok=True)
        print(f"🎬 Processing video memory for client: {client_id}")
        print(f"πŸ“ Text content: {text[:100]}...")

        encoder = MemvidEncoder()
        video_start_time = time.time()
        encoder.add_text(text)

        # One timestamp for both files: two separate time.time() calls could
        # straddle a second boundary and leave the video without a matching
        # "<name>_index.json".
        created_at = int(time.time())
        video_file = os.path.join(videos_dir, f"memory_{created_at}.mp4")
        index_file = os.path.join(videos_dir, f"memory_{created_at}_index.json")

        # Build video with QR codes
        encoder.build_video(video_file, index_file)
        processing_metrics["video_processing_time"] = time.time() - video_start_time

        # Persist the new files so other containers can see them without
        # waiting for a background commit.
        videos_volume.commit()

        # Inventory of everything stored for this client (not only the file
        # that was just written).
        video_files = _list_files_with_sizes(videos_dir, ".mp4")
        chunk_files = _list_files_with_sizes(
            os.path.join(client_storage_path, "chunks"), ".txt"
        )

        # Calculate storage metrics
        total_video_size = sum(f["size_bytes"] for f in video_files)
        total_chunks_size = sum(f["size_bytes"] for f in chunk_files)
        processing_metrics.update(
            {
                "video_files_count": len(video_files),
                "chunk_files_count": len(chunk_files),
                "total_video_size": total_video_size,
                "total_chunks_size": total_chunks_size,
                "total_storage_size": total_video_size + total_chunks_size,
            }
        )

        # Generate unique memory ID
        memory_id = f"modal_video_{client_id}_{int(time.time())}_{uuid.uuid4().hex[:8]}"
        total_time = time.time() - start_time
        processing_metrics["total_time"] = total_time
        print(f"βœ… Video memory processed successfully")
        print(f"πŸ“Š Created {len(video_files)} videos, {len(chunk_files)} chunks")
        print(f"πŸ’Ύ Total storage: {total_video_size + total_chunks_size} bytes")
        print(f"⏱️ Processing time: {total_time:.2f}s")
        return {
            "success": True,
            "memory_id": memory_id,
            "client_id": client_id,
            "video_files": video_files,
            "chunk_files": chunk_files,
            "processing_metrics": processing_metrics,
            "metadata": metadata,
            "storage_path": client_storage_path,
            "infrastructure": "Modal + T4 GPU + Volume Storage",
        }
    except Exception as e:
        # Service boundary: report the failure in the payload instead of
        # raising into the remote caller.
        print(f"❌ Error in video processing: {str(e)}")
        processing_metrics["error_time"] = time.time() - start_time
        return {
            "success": False,
            "error": str(e),
            "processing_metrics": processing_metrics,
            "infrastructure": "Modal + T4 GPU + Volume Storage",
        }
@app.function(
    image=memvid_image,
    gpu="T4",
    volumes={"/storage": videos_volume},
    timeout=600,  # 10 minutes timeout for search operations
    cpu=2.0,
    memory=4096,  # 4GB RAM for search
)
def search_video_memory(
    query: str, client_id: str, memory_name: Optional[str] = None, top_k: int = 5
) -> Dict[str, Any]:
    """
    Search every stored video memory of a client on Modal.

    Each ``*.mp4`` under ``/storage/<client_id>/videos`` that has an index
    file (``<name>_index.json`` or ``<name>.json``) is searched, newest file
    name first.  The per-video hits are interleaved by rank and the first
    ``top_k`` are returned.

    Args:
        query: Search query text
        client_id: Client identifier to search within; it must resolve to a
            directory below /storage
        memory_name: Optional specific memory name filter.  NOTE(review): the
            value is accepted but not used to filter yet -- confirm the
            intended matching rule before implementing it.
        top_k: Number of top results to return

    Returns:
        Dict with "success", "query", "client_id", "results",
        "total_results" and "processing_metrics".  Each result has
        "memory_id", "text", "metadata", "similarity_score", "video_file"
        and "chunk_file".  On failure a dict with "success": False, "error",
        "processing_time", "results" (empty) and "infrastructure";
        exceptions raised inside the search steps are not propagated.
    """
    import sys

    # NOTE(review): kept from the original. memvid is pip-installed in the
    # image, so confirm whether anything is expected to be importable from
    # the volume.
    sys.path.append("/storage")
    from itertools import zip_longest
    from memvid import MemvidRetriever

    start_time = time.time()
    try:
        print(f"πŸ” Searching video memory for query: {query}")
        print(f"πŸ‘€ Client: {client_id}")

        # A warm container keeps the volume state it started with; reload so
        # memories stored since then are visible.  Best effort: fall back to
        # the current view rather than failing the search.
        try:
            videos_volume.reload()
        except Exception as e:
            print(f"Volume reload failed, searching current view: {e}")

        # Normalising first lets us reject ids ("", "..", "a/../..") that
        # would escape /storage.
        client_storage_path = os.path.normpath(f"/storage/{client_id}")
        if not client_storage_path.startswith("/storage/"):
            raise ValueError(f"Invalid client_id: {client_id!r}")

        # Find video files for this client.  os.listdir order is arbitrary,
        # so sort; file names written by process_video_memory embed a Unix
        # timestamp, so reverse name order puts the most recent first.
        videos_dir = os.path.join(client_storage_path, "videos")
        video_files = []
        if os.path.isdir(videos_dir):
            video_files = sorted(
                (
                    os.path.join(videos_dir, name)
                    for name in os.listdir(videos_dir)
                    if name.endswith(".mp4")
                ),
                reverse=True,
            )
        if not video_files:
            return {
                "success": True,
                "query": query,
                "client_id": client_id,
                "results": [],
                "total_results": 0,
                "message": "No video memories found for this client",
                "processing_metrics": {
                    "search_time": 0,
                    "total_time": time.time() - start_time,
                    "gpu_used": "T4",
                    "infrastructure": "Modal + Video Processing",
                },
            }

        # Perform video-based search over every video, not just one of them.
        search_start_time = time.time()
        per_video_hits = []  # one list of (hit, source video path) per video
        for video_file in video_files:
            try:
                # Derive the index from the extension only; a plain
                # str.replace would also rewrite ".mp4" inside directory names.
                stem = os.path.splitext(video_file)[0]
                index_file = next(
                    (
                        candidate
                        for candidate in (f"{stem}_index.json", f"{stem}.json")
                        if os.path.exists(candidate)
                    ),
                    None,
                )
                if index_file is None:
                    print(f"No index file found for {video_file}")
                    continue
                retriever = MemvidRetriever(video_file, index_file)
                video_results = retriever.search(query, top_k=top_k)
                if not video_results:
                    continue
                if isinstance(video_results, str):
                    # A bare string is one hit; iterating it would split it
                    # into single characters.
                    video_results = [video_results]
                per_video_hits.append([(hit, video_file) for hit in video_results])
            except Exception as e:
                # One unreadable video must not abort the whole search.
                print(f"Error searching video {video_file}: {e}")
                continue
        search_time = time.time() - search_start_time

        # Interleave by rank (best hit of each video, then second best, ...)
        # so one video cannot crowd out the others.  With a single video this
        # is simply that video's own order.
        merged = [
            pair
            for rank in zip_longest(*per_video_hits, fillvalue=None)
            for pair in rank
            if pair is not None
        ]

        # Format results for consistency
        formatted_results = []
        for i, (result, source_video) in enumerate(merged[:top_k]):
            if isinstance(result, dict):
                formatted_results.append(
                    {
                        "memory_id": result.get("id", f"video_result_{i}"),
                        "text": result.get("text", result.get("content", "")),
                        "metadata": result.get("metadata", {}),
                        # Placeholder when the hit carries no score
                        "similarity_score": result.get("score", 0.8),
                        "video_file": result.get("video_file", source_video),
                        "chunk_file": result.get("chunk_file", ""),
                    }
                )
            elif isinstance(result, str):
                formatted_results.append(
                    {
                        "memory_id": f"video_result_{i}",
                        "text": result,
                        "metadata": {},
                        # Placeholder: plain-text hits carry no score
                        "similarity_score": 0.75,
                        "video_file": source_video,
                        "chunk_file": "",
                    }
                )

        total_time = time.time() - start_time
        print(f"βœ… Video search completed")
        print(f"πŸ“Š Found {len(formatted_results)} results")
        print(f"⏱️ Search time: {search_time:.2f}s, Total time: {total_time:.2f}s")
        return {
            "success": True,
            "query": query,
            "client_id": client_id,
            "results": formatted_results,
            "total_results": len(formatted_results),
            "processing_metrics": {
                "search_time": search_time,
                "total_time": total_time,
                "gpu_used": "T4",
                "infrastructure": "Modal + Video Processing",
            },
        }
    except Exception as e:
        # Service boundary: report the failure in the payload instead of
        # raising into the remote caller.
        print(f"❌ Error in video search: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "processing_time": time.time() - start_time,
            "results": [],
            "infrastructure": "Modal + T4 GPU + Volume Storage",
        }
def _matching_file_sizes(directory: str, suffix: str) -> List[int]:
    """Return the size in bytes of each file in ``directory`` ending in ``suffix``.

    The scan is not recursive.  An empty list is returned when ``directory``
    does not exist.
    """
    if not os.path.isdir(directory):
        return []
    return [
        os.path.getsize(os.path.join(directory, name))
        for name in os.listdir(directory)
        if name.endswith(suffix)
    ]


@app.function(
    image=memvid_image,
    volumes={"/storage": videos_volume},
    timeout=60,
)
def get_video_stats(client_id: str) -> Dict[str, Any]:
    """
    Get statistics for a client's video storage on the Modal volume.

    Counts the ``*.mp4`` files under ``/storage/<client_id>/videos`` and the
    ``*.txt`` files under ``/storage/<client_id>/chunks``, and reads the
    optional ``first_memory`` / ``last_memory`` values from
    ``/storage/<client_id>/metadata.json``.

    Args:
        client_id: Client identifier; it must resolve to a directory below
            /storage

    Returns:
        Dict with "client_id", "storage_type", "memory_count",
        "total_video_size", "total_chunks", "total_chunks_size",
        "total_storage_size", "first_memory", "last_memory",
        "infrastructure" and "storage_path".  A client without a storage
        directory gets the same keys with zero / None values.  On failure the
        dict carries "client_id", "storage_type", "error" and
        "infrastructure" instead; exceptions are not propagated.
    """
    try:
        # A warm container keeps the volume state it started with; reload so
        # recently stored memories are counted.  Best effort only.
        try:
            videos_volume.reload()
        except Exception as e:
            print(f"Volume reload failed, reporting current view: {e}")

        # Normalising first lets us reject ids ("", "..", "a/../..") that
        # would escape /storage.
        client_storage_path = os.path.normpath(f"/storage/{client_id}")
        if not client_storage_path.startswith("/storage/"):
            raise ValueError(f"Invalid client_id: {client_id!r}")

        if not os.path.exists(client_storage_path):
            # Same shape as the regular result so callers need no special case.
            return {
                "client_id": client_id,
                "storage_type": "modal_video",
                "memory_count": 0,
                "total_video_size": 0,
                "total_chunks": 0,
                "total_chunks_size": 0,
                "total_storage_size": 0,
                "first_memory": None,
                "last_memory": None,
                "infrastructure": "Modal + T4 GPU + Volume Storage",
                "storage_path": client_storage_path,
            }

        video_sizes = _matching_file_sizes(
            os.path.join(client_storage_path, "videos"), ".mp4"
        )
        chunk_sizes = _matching_file_sizes(
            os.path.join(client_storage_path, "chunks"), ".txt"
        )
        total_video_size = sum(video_sizes)
        total_chunks_size = sum(chunk_sizes)

        # Get metadata if available.  This stays best effort: an unreadable
        # or malformed file leaves both values as None, but only the expected
        # failure modes are swallowed.
        first_memory = None
        last_memory = None
        metadata_file = os.path.join(client_storage_path, "metadata.json")
        if os.path.exists(metadata_file):
            try:
                with open(metadata_file, "r", encoding="utf-8") as f:
                    metadata = json.load(f)
            except (OSError, ValueError) as e:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
                print(f"Could not read {metadata_file}: {e}")
            else:
                if isinstance(metadata, dict):
                    first_memory = metadata.get("first_memory")
                    last_memory = metadata.get("last_memory")

        return {
            "client_id": client_id,
            "storage_type": "modal_video",
            "memory_count": len(video_sizes),
            "total_video_size": total_video_size,
            "total_chunks": len(chunk_sizes),
            "total_chunks_size": total_chunks_size,
            "total_storage_size": total_video_size + total_chunks_size,
            "first_memory": first_memory,
            "last_memory": last_memory,
            "infrastructure": "Modal + T4 GPU + Volume Storage",
            "storage_path": client_storage_path,
        }
    except Exception as e:
        # Service boundary: report the failure in the payload instead of
        # raising into the remote caller.
        return {
            "client_id": client_id,
            "storage_type": "modal_video",
            "error": str(e),
            "infrastructure": "Modal + T4 GPU + Volume Storage",
        }
# Client class for easy integration with DualStorageManager
class ModalMemvidClient:
    """Client for interacting with Modal Memvid Service

    Every call looks up a function of the deployed "memvid-video-service"
    Modal app by name and runs it remotely.  Failures are reported through
    the return value (an error dict or an error string) rather than raised.
    """

    # Name of the deployed Modal app the remote functions are looked up in.
    _APP_NAME = "memvid-video-service"

    def __init__(self, modal_token: Optional[str] = None):
        """
        Initialize Modal Memvid Client

        Args:
            modal_token: Optional Modal token (uses environment if not
                provided).  When given it is exported as the MODAL_TOKEN
                environment variable.
        """
        if modal_token:
            # NOTE(review): Modal's own client is documented to read
            # MODAL_TOKEN_ID / MODAL_TOKEN_SECRET -- confirm that something
            # actually consumes MODAL_TOKEN.
            os.environ["MODAL_TOKEN"] = modal_token
        # Only checks that the modal package can be imported; no remote call
        # is made here.
        try:
            import modal  # noqa: F401

            print("βœ… Modal Memvid Client initialized successfully")
        except Exception as e:
            print(f"⚠️ Modal Memvid Client initialization warning: {e}")

    def _remote(self, function_name: str, *args: Any) -> Any:
        """Look up ``function_name`` in the deployed app and call it remotely.

        Raises whatever the import, the lookup or the remote call raises; the
        public methods turn that into their own error payloads.
        """
        import modal

        func = modal.Function.from_name(self._APP_NAME, function_name)
        return func.remote(*args)

    def store_memory(
        self, text: str, client_id: str, metadata: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Store memory using Modal memvid service

        Returns the remote function's result dict, or
        ``{"success": False, "error": ...}`` when the call itself fails.
        """
        try:
            return self._remote("process_video_memory", text, client_id, metadata)
        except Exception as e:
            return {"success": False, "error": f"Modal memvid storage failed: {e}"}

    def search_memory(
        self,
        query: str,
        client_id: str,
        memory_name: Optional[str] = None,
        top_k: int = 5,
    ) -> Dict[str, Any]:
        """Search memory using Modal memvid service

        Returns the remote function's result dict, or
        ``{"success": False, "error": ..., "results": []}`` when the call
        itself fails.
        """
        try:
            return self._remote(
                "search_video_memory", query, client_id, memory_name, top_k
            )
        except Exception as e:
            return {
                "success": False,
                "error": f"Modal memvid search failed: {e}",
                "results": [],
            }

    def get_stats(self, client_id: str) -> Dict[str, Any]:
        """Get statistics using Modal memvid service

        Returns the remote function's result dict, or
        ``{"success": False, "error": ...}`` when the call itself fails.
        """
        try:
            return self._remote("get_video_stats", client_id)
        except Exception as e:
            return {"success": False, "error": f"Modal memvid stats failed: {e}"}

    def list_memories(self, client_id: str) -> str:
        """List memories for client (Modal implementation)

        Returns a JSON string: a summary built from the client's stats, or
        ``{"error": ...}`` when the stats could not be obtained.
        """
        try:
            stats = self.get_stats(client_id)
            # Two failure shapes exist: get_stats() itself reports
            # "success": False, while the remote get_video_stats error
            # payload only carries "error" (no "success" field).  Checking
            # "success" alone would present a remote failure as an empty,
            # successful listing.
            failed = "error" in stats or not stats.get("success", True)
            if failed:
                return json.dumps(
                    {
                        "error": f"Failed to list memories: {stats.get('error', 'Unknown error')}"
                    }
                )
            memory_list = {
                "client_id": client_id,
                "storage_type": "modal_video",
                "memory_count": stats.get("memory_count", 0),
                "memories": [],  # Modal doesn't currently track individual memory names
                "total_size": stats.get("total_storage_size", 0),
                "infrastructure": "Modal + T4 GPU + Volume Storage",
            }
            return json.dumps(memory_list, indent=2)
        except Exception as e:
            return json.dumps({"error": f"Modal memvid list_memories failed: {e}"})

    def build_memory_video(self, client_id: str, memory_name: str) -> str:
        """Build memory video (Modal implementation)

        No work is done here; the method only returns an explanatory message
        because videos are built when the memory is stored.
        """
        return f"Memory videos are automatically built during storage in Modal for client {client_id}. Memory name: {memory_name}"

    def chat_with_memory(self, query: str, client_id: str, memory_name: str) -> str:
        """Chat with memory using Modal memvid service

        Runs a search (top 3) and answers with a fixed template that quotes
        the text of the first two hits.  Returns an explanatory string when
        nothing was found or the search failed.
        """
        try:
            search_results = self.search_memory(query, client_id, memory_name, top_k=3)
            if not search_results.get("success", False):
                return f"Error accessing memories: {search_results.get('error', 'Unknown error')}"
            results = search_results.get("results", [])
            if not results:
                return f"I couldn't find any relevant memories for '{query}' in your video storage."
            # Only the first two hits are quoted to keep the reply short.
            context = "\n".join(result.get("text", "") for result in results[:2])
            return f"Based on your memories: {context}\n\nYour query '{query}' relates to the stored information above."
        except Exception as e:
            return f"Modal memvid chat failed: {e}"

    def delete_memory(self, client_id: str, memory_name: str) -> str:
        """Delete memory (Modal implementation)

        Selective deletion is not implemented; nothing is deleted and a
        message saying so is returned.
        """
        return f"Memory deletion not yet implemented in Modal for client {client_id}, memory {memory_name}"

    def get_memory_stats(self, client_id: str) -> str:
        """Get memory statistics as JSON string"""
        try:
            stats = self.get_stats(client_id)
            return json.dumps(stats, indent=2)
        except Exception as e:
            return json.dumps({"error": f"Modal memvid get_memory_stats failed: {e}"})
def _run_smoke_test() -> None:
    """Exercise store, search and stats once through ModalMemvidClient.

    Each call goes to the deployed Modal app and its result is printed; this
    runs only when the module is executed as a script.
    """
    print("πŸ§ͺ Testing Modal Memvid Service...")
    client = ModalMemvidClient()

    # Store one sample memory
    sample_text = "This is a test memory for Modal video storage with GPU acceleration"
    sample_metadata = {"test": True, "timestamp": time.time()}
    result = client.store_memory(sample_text, "test_client", sample_metadata)
    print(f"🎬 Storage result: {result}")

    # Search for it
    search_result = client.search_memory("test memory GPU", "test_client", top_k=3)
    print(f"πŸ” Search result: {search_result}")

    # Fetch storage statistics
    stats = client.get_stats("test_client")
    print(f"οΏ½οΏ½ Stats: {stats}")


if __name__ == "__main__":
    _run_smoke_test()