# modal/modal_functions.py
"""
Optimized Modal functions with better container lifecycle management
"""
import modal
from typing import Dict, Any
from datetime import datetime
import requests
import base64
import json
import os
app = modal.App("smartledger")
# Optimized image with model pre-loading
model_image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install([
        "sentence-transformers>=2.2.0",
        "transformers>=4.35.0",
        "torch>=2.0.0",
        "pandas>=2.0.0",
        "numpy>=1.24.0",
        "llama-index>=0.10.0",
        "scikit-learn>=1.3.0",
        "accelerate>=0.20.0",
    ])
    .run_commands([
        # Pre-download models during build (faster cold starts)
        "python -c 'from sentence_transformers import SentenceTransformer; SentenceTransformer(\"all-MiniLM-L6-v2\")'",
        "python -c 'from transformers import AutoTokenizer, AutoModelForCausalLM; AutoTokenizer.from_pretrained(\"microsoft/DialoGPT-medium\"); AutoModelForCausalLM.from_pretrained(\"microsoft/DialoGPT-medium\")'",
    ])
)
# Persistent storage that survives container restarts
session_storage = modal.Dict.from_name("smartledger-sessions", create_if_missing=True)
# Vision model configuration (now using Hyperbolic endpoint)
# Inject HYPERBOLIC_API_KEY as a Modal secret for all relevant functions
HYPERBOLIC_API_URL = "https://api.hyperbolic.xyz/v1/chat/completions"
HYPERBOLIC_MODEL_NAME = "Qwen/Qwen2.5-VL-7B-Instruct"
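
# The image-processing function below expects a Modal secret named
# "HYPERBOLIC_API_KEY" that exposes an env var of the same name. A sketch of
# creating it with the Modal CLI (the key value is a placeholder):
#
#     modal secret create HYPERBOLIC_API_KEY HYPERBOLIC_API_KEY=<your-api-key>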
# Global variables to cache models
_embedding_model = None
_tokenizer = None
_llm_model = None
def _load_models():
    """Load models once and cache globally"""
    global _embedding_model, _tokenizer, _llm_model

    if _embedding_model is None:
        print("πŸš€ Loading embedding model...")
        from sentence_transformers import SentenceTransformer
        _embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        print("βœ… Embedding model loaded")

    if _tokenizer is None or _llm_model is None:
        print("πŸš€ Loading LLM model...")
        from transformers import AutoTokenizer, AutoModelForCausalLM
        _tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")
        _tokenizer.pad_token = _tokenizer.eos_token
        _llm_model = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-medium")
        print("βœ… LLM model loaded")

    return _embedding_model, _tokenizer, _llm_model
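
# NOTE: the caching above works because Modal reuses the Python process between
# invocations on a warm container, so module globals survive across calls. A
# minimal (hypothetical) way to observe this from inside any function here --
# the timings are illustrative, not measured:
#
#     import time
#     t0 = time.monotonic(); _load_models()   # pays the load cost on a cold container
#     t1 = time.monotonic(); _load_models()   # returns the cached objects immediately
#     print(f"first: {t1 - t0:.1f}s, second: {time.monotonic() - t1:.4f}s")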
@app.function(
    image=model_image,
    memory=4096,
    timeout=1800,  # 30-minute timeout
    min_containers=1,  # Keep one container warm
)
def create_index(csv_data: str, session_id: str) -> Dict[str, Any]:
    """Create index using cached models"""
    try:
        import pandas as pd
        from io import StringIO

        # Load models
        embedding_model, tokenizer, llm_model = _load_models()
        print(f"πŸ“Š Creating index for session: {session_id}")

        # Parse CSV
        df = pd.read_csv(StringIO(csv_data))

        # Build one text document per transaction row
        doc_texts = []
        doc_metadata = []
        for _, row in df.iterrows():
            doc_text = f"Date: {row.get('date', '')} Amount: ${row.get('amount', 0)} Description: {row.get('description', '')} Category: {row.get('category', '')}"
            doc_texts.append(doc_text)
            doc_metadata.append(row.to_dict())

        # Generate embeddings
        print(f"πŸ”— Generating embeddings for {len(doc_texts)} transactions...")
        embeddings = embedding_model.encode(doc_texts)

        # Store in persistent storage
        index_data = {
            "embeddings": embeddings.tolist(),
            "doc_texts": doc_texts,
            "doc_metadata": doc_metadata,
            "total_transactions": len(doc_texts),
            "created_at": datetime.now().isoformat(),
        }
        session_storage[session_id] = index_data
        session_storage[f"session_active_{session_id}"] = {
            "session_id": session_id,
            "created_at": datetime.now().isoformat(),
            "status": "active",
        }

        print(f"βœ… Index created for {len(doc_texts)} transactions")
        return {
            "status": "success",
            "session_id": session_id,
            "total_transactions": len(doc_texts),
            "created_at": datetime.now().isoformat(),
        }
    except Exception as e:
        print(f"❌ Error creating index: {e}")
        return {"status": "error", "error": str(e)}
@app.function(
    image=model_image,
    memory=4096,
    timeout=600,
    min_containers=1,  # Keep one container warm
)
def query_data(query: str, session_id: str) -> Dict[str, Any]:
    """Query using cached models"""
    try:
        import numpy as np
        from sklearn.metrics.pairwise import cosine_similarity
        import torch

        # Load models
        embedding_model, tokenizer, llm_model = _load_models()
        print(f"πŸ” Processing query: '{query}' for session: {session_id}")

        # Get session data
        if session_id not in session_storage:
            return {"status": "error", "error": "Session not found"}
        session_data = session_storage[session_id]

        # Generate query embedding
        query_embedding = embedding_model.encode([query])

        # Find similar transactions
        doc_embeddings = np.array(session_data["embeddings"])
        similarities = cosine_similarity(query_embedding, doc_embeddings)[0]

        # Get top matches above a minimum similarity threshold
        top_indices = np.argsort(similarities)[::-1][:5]
        matching_transactions = []
        relevant_context = []
        for idx in top_indices:
            if similarities[idx] > 0.1:
                doc_text = session_data["doc_texts"][idx]
                metadata = session_data["doc_metadata"][idx]
                matching_transactions.append({
                    "text": doc_text,
                    "metadata": metadata,
                    "similarity": float(similarities[idx]),
                })
                relevant_context.append(doc_text)

        # Generate LLM analysis
        llm_analysis = ""
        if matching_transactions:
            context = f"Financial Query: {query}\n\nRelevant Transactions:\n{chr(10).join(relevant_context[:3])}\n\nAnalysis:"
            inputs = tokenizer.encode(context, return_tensors="pt", max_length=512, truncation=True)
            with torch.no_grad():
                outputs = llm_model.generate(
                    inputs,
                    max_new_tokens=80,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=tokenizer.eos_token_id,
                )
            # Decode only the newly generated tokens; slicing the decoded string
            # by len(context) can misalign when tokenization round-trips lossily
            llm_analysis = tokenizer.decode(outputs[0][inputs.shape[-1]:], skip_special_tokens=True).strip()

        # Calculate summary
        total_amount = sum(float(t["metadata"].get("amount", 0)) for t in matching_transactions)

        return {
            "status": "success",
            "query": query,
            "llm_analysis": llm_analysis,
            "matching_transactions": len(matching_transactions),
            "total_amount": total_amount,
            "results": matching_transactions,
            "processed_at": datetime.now().isoformat(),
        }
    except Exception as e:
        print(f"❌ Error processing query: {e}")
        return {"status": "error", "error": str(e)}
@app.function(image=model_image)
def check_health() -> Dict[str, Any]:
    """Quick health check"""
    try:
        # Load models
        embedding_model, tokenizer, llm_model = _load_models()

        # Test embedding model
        _ = embedding_model.encode(["test transaction"])

        # Test LLM tokenizer
        _ = tokenizer.encode("Test", return_tensors="pt")

        return {
            "status": "healthy",
            "embedding_model": "ready",
            "llm_model": "ready",
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}
@app.function(
    image=model_image,
    memory=2048,
    timeout=300,
    secrets=[modal.Secret.from_name("HYPERBOLIC_API_KEY")],
)
def process_image_transactions(image_data, session_id: str, filename: str = "uploaded_image") -> Dict[str, Any]:
    """
    Process an image to extract transaction data using the Qwen2.5-VL vision
    model served by Hyperbolic.

    Args:
        image_data: File object, raw bytes, or base64-encoded string
        session_id: Session ID for storing extracted transactions
        filename: Original filename for reference

    Returns:
        Dictionary containing extracted transactions and metadata
    """
    try:
        print(f"πŸ“· Processing image for session: {session_id}")
        print(f"πŸ” Debug - image_data type: {type(image_data)}")
        print(f"πŸ” Debug - image_data dir: {dir(image_data)}")
        print(f"πŸ” Debug - hasattr read: {hasattr(image_data, 'read')}")

        # Normalize the input to a base64 string
        if hasattr(image_data, 'read'):
            # It's a file object: read and encode
            print("πŸ“– Reading file object...")
            image_bytes = image_data.read()
            print(f"πŸ“ Read {len(image_bytes)} bytes")
            image_b64 = base64.b64encode(image_bytes).decode('utf-8')
            print(f"πŸ”€ Encoded to base64 length: {len(image_b64)}")
        elif isinstance(image_data, str):
            # It's already base64-encoded
            print("πŸ“ Using string as base64...")
            image_b64 = image_data
        else:
            # It's raw bytes
            print(f"πŸ”’ Converting bytes to base64... type: {type(image_data)}")
            image_b64 = base64.b64encode(image_data).decode('utf-8')
        # Create optimized prompt for transaction extraction
        prompt = """You are a financial document analyzer. Extract transaction data from this bank statement, receipt, or financial document.

For each transaction you find, return ONLY a valid JSON array with this exact format:
[
  {
    "date": "YYYY-MM-DD",
    "amount": 123.45,
    "vendor": "Vendor Name",
    "description": "Transaction description",
    "type": "debit"
  }
]

Rules:
- Extract ALL transactions visible in the image
- Use negative amounts for debits/expenses, positive for credits/income
- Parse dates to YYYY-MM-DD format
- Clean vendor names (remove extra spaces, standardize)
- Include meaningful descriptions
- Return ONLY the JSON array, no other text
- If no transactions found, return: []

Analyze the image and extract all transaction data:"""
        # Call the Hyperbolic vision model endpoint
        hyperbolic_api_key = os.environ["HYPERBOLIC_API_KEY"]
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {hyperbolic_api_key}",
        }
        payload = {
            "messages": [{
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
                    },
                ],
            }],
            "model": HYPERBOLIC_MODEL_NAME,
            "max_tokens": 2000,
            "temperature": 0.1,
            "top_p": 0.001,
        }

        print("🧠 Calling Hyperbolic Vision model...")
        print(f"Hyperbolic API URL: {HYPERBOLIC_API_URL}")
        response = requests.post(
            HYPERBOLIC_API_URL,
            headers=headers,
            json=payload,
            timeout=60,
        )

        if response.status_code != 200:
            return {
                "status": "error",
                "error": f"Vision model API failed: {response.status_code} - {response.text}",
                "transactions": [],
                "total_transactions": 0,
            }

        result = response.json()
        response_text = result["choices"][0]["message"]["content"]
        # Parse the JSON response
        try:
            # Find the JSON array in the response
            response_text = response_text.strip()
            start_idx = response_text.find('[')
            end_idx = response_text.rfind(']') + 1
            if start_idx == -1 or end_idx == 0:
                print("⚠️ No JSON array found in response")
                transactions = []
            else:
                json_str = response_text[start_idx:end_idx]
                transactions = json.loads(json_str)

            # Clean and validate transactions
            cleaned_transactions = []
            for txn in transactions:
                if isinstance(txn, dict) and "amount" in txn and "vendor" in txn:
                    try:
                        txn["amount"] = float(txn["amount"])
                        txn["source"] = "image_extraction"
                        txn["extracted_at"] = datetime.now().isoformat()
                        cleaned_transactions.append(txn)
                    except (ValueError, TypeError):
                        print(f"⚠️ Invalid transaction amount: {txn}")
            transactions = cleaned_transactions
        except json.JSONDecodeError as e:
            print(f"❌ JSON parsing error: {e}")
            transactions = []
        # Store extracted transactions for this session
        image_session_key = f"{session_id}_image_transactions"
        session_storage[image_session_key] = {
            "transactions": transactions,
            "total_transactions": len(transactions),
            "filename": filename,
            "extracted_at": datetime.now().isoformat(),
            "raw_response": response_text[:500],  # truncate long responses for storage
        }

        print(f"βœ… Extracted {len(transactions)} transactions from image")
        return {
            "status": "success",
            "transactions": transactions,
            "total_transactions": len(transactions),
            "filename": filename,
            "extracted_at": datetime.now().isoformat(),
            "session_id": session_id,
        }
    except Exception as e:
        print(f"❌ Error processing image: {e}")
        return {
            "status": "error",
            "error": str(e),
            "transactions": [],
            "total_transactions": 0,
            "filename": filename,
            "extracted_at": datetime.now().isoformat(),
        }
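
# Sketch of how a client might call the image pipeline once the app is deployed
# with `modal deploy`. The session id and filename are illustrative, not fixed
# names in this codebase:
#
#     import base64
#     import modal
#
#     process_fn = modal.Function.lookup("smartledger", "process_image_transactions")
#     with open("receipt.jpg", "rb") as f:
#         image_b64 = base64.b64encode(f.read()).decode("utf-8")
#     result = process_fn.remote(image_b64, session_id="demo-session", filename="receipt.jpg")
#     print(result["total_transactions"])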
@app.function(
    image=model_image,
    memory=2048,
    timeout=300,
)
def reconcile_transactions(session_id: str) -> Dict[str, Any]:
    """
    Reconcile image-extracted transactions with CSV ledger entries.

    Args:
        session_id: Session ID containing both CSV and image transaction data

    Returns:
        Dictionary containing matches, unmatched transactions, and confidence scores
    """
    try:
        print(f"πŸ”„ Reconciling transactions for session: {session_id}")

        # Get CSV transactions
        if session_id not in session_storage:
            return {
                "status": "error",
                "error": "No CSV data found for session. Please upload and analyze CSV first.",
            }
        csv_data = session_storage[session_id]
        csv_transactions = csv_data.get("doc_metadata", [])

        # Get image transactions
        image_session_key = f"{session_id}_image_transactions"
        if image_session_key not in session_storage:
            return {
                "status": "error",
                "error": "No image data found for session. Please upload and process an image first.",
            }
        image_data = session_storage[image_session_key]
        image_transactions = image_data.get("transactions", [])

        if not csv_transactions or not image_transactions:
            return {
                "status": "error",
                "error": f"Insufficient data: {len(csv_transactions)} CSV transactions, {len(image_transactions)} image transactions",
            }

        print(f"Reconciling {len(image_transactions)} image transactions with {len(csv_transactions)} CSV transactions")
        # Simple greedy reconciliation: score every unmatched CSV candidate for
        # each image transaction and keep the best match above a threshold
        matches = []
        matched_csv_indices = set()
        matched_image_indices = set()

        for i, image_txn in enumerate(image_transactions):
            best_match = None
            best_score = 0
            best_csv_idx = -1

            for j, csv_txn in enumerate(csv_transactions):
                if j in matched_csv_indices:
                    continue

                score = 0
                reasons = []
                discrepancies = []

                # Amount matching (40% weight)
                try:
                    image_amount = float(image_txn.get("amount", 0))
                    csv_amount = float(csv_txn.get("amount", 0))
                    amount_diff = abs(image_amount - csv_amount)
                    if amount_diff <= 0.01:  # Exact match
                        score += 0.4
                        reasons.append(f"Exact amount match: ${csv_amount}")
                    elif amount_diff <= max(0.01, abs(csv_amount) * 0.02):  # 2% tolerance
                        score += 0.35
                        reasons.append(f"Close amount match: ${csv_amount} β‰ˆ ${image_amount}")
                    elif amount_diff <= abs(csv_amount) * 0.05:  # 5% tolerance
                        score += 0.2
                        reasons.append(f"Approximate amount match: ${csv_amount} β‰ˆ ${image_amount}")
                    else:
                        discrepancies.append(f"Amount difference: ${csv_amount} vs ${image_amount}")
                except (ValueError, TypeError):
                    discrepancies.append("Invalid amount data")
                # Date matching (25% weight)
                try:
                    csv_date_str = str(csv_txn.get("date", ""))
                    image_date_str = str(image_txn.get("date", ""))
                    if csv_date_str[:10] == image_date_str[:10]:  # Same date (YYYY-MM-DD)
                        score += 0.25
                        reasons.append(f"Exact date match: {csv_date_str[:10]}")
                    else:
                        # Fall back to a +/- 3 day window when both parse as ISO dates
                        try:
                            csv_dt = datetime.fromisoformat(csv_date_str[:10])
                            image_dt = datetime.fromisoformat(image_date_str[:10])
                            close_dates = abs((csv_dt - image_dt).days) <= 3
                        except ValueError:
                            close_dates = False
                        if close_dates:
                            score += 0.15
                            reasons.append(f"Similar dates: {csv_date_str[:10]} β‰ˆ {image_date_str[:10]}")
                        else:
                            discrepancies.append(f"Date difference: {csv_date_str[:10]} vs {image_date_str[:10]}")
                except Exception:
                    discrepancies.append("Invalid date data")
                # Vendor matching (25% weight) - simplified
                try:
                    csv_vendor = str(csv_txn.get("vendor", "")).upper().strip()
                    image_vendor = str(image_txn.get("vendor", "")).upper().strip()
                    if not csv_vendor or not image_vendor:
                        # Guard against empty strings, which would otherwise
                        # trivially pass the substring checks below
                        discrepancies.append("Missing vendor data")
                    elif csv_vendor == image_vendor:
                        score += 0.25
                        reasons.append(f"Exact vendor match: {csv_vendor}")
                    elif csv_vendor in image_vendor or image_vendor in csv_vendor:
                        score += 0.2
                        reasons.append(f"Partial vendor match: {csv_vendor} β‰ˆ {image_vendor}")
                    elif set(csv_vendor.split()) & set(image_vendor.split()):
                        score += 0.1
                        reasons.append(f"Vendor keyword match: {csv_vendor} β‰ˆ {image_vendor}")
                    else:
                        discrepancies.append(f"Vendor difference: {csv_vendor} vs {image_vendor}")
                except Exception:
                    discrepancies.append("Invalid vendor data")
                # Description matching (10% weight) - simplified
                try:
                    csv_desc = str(csv_txn.get("description", "")).upper().strip()
                    image_desc = str(image_txn.get("description", "")).upper().strip()
                    if csv_desc and image_desc:
                        if csv_desc == image_desc:
                            score += 0.1
                            reasons.append(f"Description match: {csv_desc}")
                        elif csv_desc in image_desc or image_desc in csv_desc:
                            score += 0.05
                            reasons.append("Partial description match")
                except Exception:
                    pass
                # Keep the best-scoring candidate above the minimum threshold
                if score > best_score and score > 0.3:
                    best_score = score
                    best_match = {
                        "csv_transaction": csv_txn,
                        "image_transaction": image_txn,
                        "confidence_score": round(score, 3),
                        "match_reasons": reasons,
                        "discrepancies": discrepancies,
                        "match_type": "high" if score >= 0.85 else "medium" if score >= 0.65 else "low",
                    }
                    best_csv_idx = j

            # Record the best match (if any) and mark both sides as consumed
            if best_match:
                matches.append(best_match)
                matched_csv_indices.add(best_csv_idx)
                matched_image_indices.add(i)

        # Collect unmatched transactions
        unmatched_csv = [csv_transactions[i] for i in range(len(csv_transactions)) if i not in matched_csv_indices]
        unmatched_image = [image_transactions[i] for i in range(len(image_transactions)) if i not in matched_image_indices]

        # Categorize matches by confidence
        high_confidence = [m for m in matches if m["confidence_score"] >= 0.85]
        medium_confidence = [m for m in matches if 0.65 <= m["confidence_score"] < 0.85]
        low_confidence = [m for m in matches if m["confidence_score"] < 0.65]

        # Calculate summary statistics
        match_rate = len(matches) / len(image_transactions) if image_transactions else 0
        total_image_amount = sum(float(txn.get("amount", 0)) for txn in image_transactions)
        total_matched_amount = sum(float(m["image_transaction"].get("amount", 0)) for m in matches)
        result = {
            "status": "success",
            "summary": {
                "total_image_transactions": len(image_transactions),
                "total_csv_transactions": len(csv_transactions),
                "total_matches": len(matches),
                "match_rate": round(match_rate * 100, 1),
                "high_confidence_matches": len(high_confidence),
                "medium_confidence_matches": len(medium_confidence),
                "low_confidence_matches": len(low_confidence),
                "unmatched_image_transactions": len(unmatched_image),
                "unmatched_csv_transactions": len(unmatched_csv),
                "total_image_amount": round(total_image_amount, 2),
                "total_matched_amount": round(total_matched_amount, 2),
                "reconciliation_percentage": round((total_matched_amount / total_image_amount * 100) if total_image_amount else 0, 1),
            },
            "matches": {
                "high_confidence": high_confidence,
                "medium_confidence": medium_confidence,
                "low_confidence": low_confidence,
            },
            "unmatched": {
                "image_transactions": unmatched_image,
                "csv_transactions": unmatched_csv,
            },
            "reconciled_at": datetime.now().isoformat(),
            "session_id": session_id,
        }

        # Store reconciliation results
        reconciliation_key = f"{session_id}_reconciliation"
        session_storage[reconciliation_key] = result

        print(f"βœ… Reconciliation complete: {len(matches)} matches found ({match_rate*100:.1f}% match rate)")
        return result
    except Exception as e:
        print(f"❌ Error during reconciliation: {e}")
        return {
            "status": "error",
            "error": str(e),
            "summary": {},
            "matches": {"high_confidence": [], "medium_confidence": [], "low_confidence": []},
            "unmatched": {"image_transactions": [], "csv_transactions": []},
            "reconciled_at": datetime.now().isoformat(),
        }
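
# Sketch of consuming the reconciliation report client-side. The keys match the
# result dictionary built above; the session id is illustrative:
#
#     reconcile_fn = modal.Function.lookup("smartledger", "reconcile_transactions")
#     report = reconcile_fn.remote("demo-session")
#     if report["status"] == "success":
#         s = report["summary"]
#         print(f"{s['total_matches']} matches ({s['match_rate']}% match rate)")
#         for m in report["matches"]["high_confidence"]:
#             print(m["confidence_score"], m["match_reasons"])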
@app.function(image=model_image)
def list_sessions():
    """Debug function to list all active sessions"""
    try:
        return {
            "status": "success",
            "sessions": dict(session_storage),
            "session_count": len(session_storage),
        }
    except Exception as e:
        return {"status": "error", "error": str(e)}
if __name__ == "__main__":
    print("SmartLedger Modal functions with image processing and reconciliation")
    print("Features: CSV indexing, image processing, transaction reconciliation")
    print("UI calls: modal.Function.lookup('smartledger', 'create_index')")