"""
Optimized Modal functions with better container lifecycle management
"""
import modal
from typing import Dict, Any
from datetime import datetime
import requests
import base64
import json
import os

app = modal.App("smartledger")

# Optimized image with model pre-loading
model_image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install([
        "sentence-transformers>=2.2.0",
        "transformers>=4.35.0",
        "torch>=2.0.0",
        "pandas>=2.0.0",
        "numpy>=1.24.0",
        "llama-index>=0.10.0",
        "scikit-learn>=1.3.0",
        "accelerate>=0.20.0",
    ])
    .run_commands([
        # Pre-download models during build (faster cold starts)
        "python -c 'from sentence_transformers import SentenceTransformer; SentenceTransformer(\"all-MiniLM-L6-v2\")'",
        "python -c 'from transformers import AutoTokenizer, AutoModelForCausalLM; AutoTokenizer.from_pretrained(\"microsoft/DialoGPT-medium\"); AutoModelForCausalLM.from_pretrained(\"microsoft/DialoGPT-medium\")'",
    ])
)

# Persistent storage that survives container restarts
session_storage = modal.Dict.from_name("smartledger-sessions", create_if_missing=True)

# Vision model configuration (now using Hyperbolic endpoint)
# Inject HYPERBOLIC_API_KEY as a Modal secret for all relevant functions
HYPERBOLIC_API_URL = "https://api.hyperbolic.xyz/v1/chat/completions"
HYPERBOLIC_MODEL_NAME = "Qwen/Qwen2.5-VL-7B-Instruct"
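
# Illustrative sketch (not part of the original source): for the UI's
# `modal.Function.lookup("smartledger", "create_index")` call to resolve, the
# plain functions below would typically be registered on `app` with the
# pre-built image and the Hyperbolic secret. The decorator parameters here
# (secret name, timeout) are assumptions, not confirmed by this file:
#
#     @app.function(
#         image=model_image,
#         secrets=[modal.Secret.from_name("hyperbolic-api-key")],  # assumed secret name
#         timeout=600,
#     )
#     def create_index_remote(csv_data: str, session_id: str) -> Dict[str, Any]:
#         return create_index(csv_data, session_id)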

# Global variables to cache models
_embedding_model = None
_tokenizer = None
_llm_model = None

def _load_models():
    """Load models once and cache globally"""
    global _embedding_model, _tokenizer, _llm_model

    if _embedding_model is None:
        print("Loading embedding model...")
        from sentence_transformers import SentenceTransformer
        _embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        print("Embedding model loaded")

    if _tokenizer is None or _llm_model is None:
        print("Loading LLM model...")
        from transformers import AutoTokenizer, AutoModelForCausalLM
        _tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")
        _tokenizer.pad_token = _tokenizer.eos_token
        _llm_model = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-medium")
        print("LLM model loaded")

    return _embedding_model, _tokenizer, _llm_model
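
# Note (added for clarity): caching the models in module-level globals means a
# warm container reuses the already-loaded weights across invocations; only a
# cold start pays the full download/load cost.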

def create_index(csv_data: str, session_id: str) -> Dict[str, Any]:
    """Create index using cached models"""
    try:
        import pandas as pd
        from io import StringIO

        # Load models
        embedding_model, tokenizer, llm_model = _load_models()

        print(f"Creating index for session: {session_id}")

        # Parse CSV
        df = pd.read_csv(StringIO(csv_data))

        # Create embeddings
        doc_texts = []
        doc_metadata = []
        for _, row in df.iterrows():
            doc_text = f"Date: {row.get('date', '')} Amount: ${row.get('amount', 0)} Description: {row.get('description', '')} Category: {row.get('category', '')}"
            doc_texts.append(doc_text)
            doc_metadata.append(row.to_dict())

        # Generate embeddings
        print(f"Generating embeddings for {len(doc_texts)} transactions...")
        embeddings = embedding_model.encode(doc_texts)

        # Store in persistent storage
        index_data = {
            "embeddings": embeddings.tolist(),
            "doc_texts": doc_texts,
            "doc_metadata": doc_metadata,
            "total_transactions": len(doc_texts),
            "created_at": datetime.now().isoformat()
        }
        session_storage[session_id] = index_data
        session_storage[f"session_active_{session_id}"] = {
            "session_id": session_id,
            "created_at": datetime.now().isoformat(),
            "status": "active"
        }

        print(f"Index created for {len(doc_texts)} transactions")
        return {
            "status": "success",
            "session_id": session_id,
            "total_transactions": len(doc_texts),
            "created_at": datetime.now().isoformat()
        }
    except Exception as e:
        print(f"Error creating index: {e}")
        return {"status": "error", "error": str(e)}
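
# Example (illustrative only; the session ID is hypothetical and the CSV
# columns are assumed from the fields referenced above: date, amount,
# description, category):
#
#     sample_csv = (
#         "date,amount,description,category\n"
#         "2024-01-05,-4.50,Blue Bottle Coffee,Dining\n"
#         "2024-01-06,2500.00,Payroll deposit,Income\n"
#     )
#     result = create_index(sample_csv, session_id="demo-session")
#     # result -> {"status": "success", "total_transactions": 2, ...}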

def query_data(query: str, session_id: str) -> Dict[str, Any]:
    """Query using cached models"""
    try:
        import numpy as np
        from sklearn.metrics.pairwise import cosine_similarity
        import torch

        # Load models
        embedding_model, tokenizer, llm_model = _load_models()

        print(f"Processing query: '{query}' for session: {session_id}")

        # Get session data
        if session_id not in session_storage:
            return {"status": "error", "error": "Session not found"}
        session_data = session_storage[session_id]

        # Generate query embedding
        query_embedding = embedding_model.encode([query])

        # Find similar transactions
        doc_embeddings = np.array(session_data["embeddings"])
        similarities = cosine_similarity(query_embedding, doc_embeddings)[0]

        # Get top matches
        top_indices = np.argsort(similarities)[::-1][:5]
        matching_transactions = []
        relevant_context = []
        for idx in top_indices:
            if similarities[idx] > 0.1:
                doc_text = session_data["doc_texts"][idx]
                metadata = session_data["doc_metadata"][idx]
                matching_transactions.append({
                    "text": doc_text,
                    "metadata": metadata,
                    "similarity": float(similarities[idx])
                })
                relevant_context.append(doc_text)

        # Generate LLM analysis
        llm_analysis = ""
        if matching_transactions:
            context = f"Financial Query: {query}\n\nRelevant Transactions:\n{chr(10).join(relevant_context[:3])}\n\nAnalysis:"
            inputs = tokenizer.encode(context, return_tensors="pt", max_length=512, truncation=True)
            with torch.no_grad():
                outputs = llm_model.generate(
                    inputs,
                    max_new_tokens=80,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=tokenizer.eos_token_id
                )
            full_response = tokenizer.decode(outputs[0], skip_special_tokens=True)
            llm_analysis = full_response[len(context):].strip()

        # Calculate summary
        total_amount = sum(float(t["metadata"].get("amount", 0)) for t in matching_transactions)

        return {
            "status": "success",
            "query": query,
            "llm_analysis": llm_analysis,
            "matching_transactions": len(matching_transactions),
            "total_amount": total_amount,
            "results": matching_transactions,
            "processed_at": datetime.now().isoformat()
        }
    except Exception as e:
        print(f"Error processing query: {e}")
        return {"status": "error", "error": str(e)}
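
# Example (illustrative only; assumes an index was already created for the
# same session via create_index):
#
#     answer = query_data("How much did I spend on coffee?", session_id="demo-session")
#     print(answer["matching_transactions"], answer["total_amount"])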

def check_health() -> Dict[str, Any]:
    """Quick health check"""
    try:
        # Load models
        embedding_model, tokenizer, llm_model = _load_models()

        # Test embedding model
        _ = embedding_model.encode(["test transaction"])

        # Test LLM
        _ = tokenizer.encode("Test", return_tensors="pt")

        return {
            "status": "healthy",
            "embedding_model": "ready",
            "llm_model": "ready",
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

def process_image_transactions(image_data, session_id: str, filename: str = "uploaded_image") -> Dict[str, Any]:
    """
    Process an image to extract transaction data using the Qwen2.5-VL vision
    model served by the Hyperbolic API.

    Args:
        image_data: File object, raw bytes, or base64-encoded string
        session_id: Session ID for storing extracted transactions
        filename: Original filename for reference

    Returns:
        Dictionary containing extracted transactions and metadata
    """
    try:
        print(f"Processing image for session: {session_id}")
        print(f"Debug - image_data type: {type(image_data)}")
        print(f"Debug - image_data dir: {dir(image_data)}")
        print(f"Debug - hasattr read: {hasattr(image_data, 'read')}")

        # Handle file object vs base64 string vs raw bytes
        if hasattr(image_data, 'read'):
            # It's a file object: read and encode
            print("Reading file object...")
            image_bytes = image_data.read()
            print(f"Read {len(image_bytes)} bytes")
            image_b64 = base64.b64encode(image_bytes).decode('utf-8')
            print(f"Encoded to base64 length: {len(image_b64)}")
        elif isinstance(image_data, str):
            # It's already base64 encoded
            print("Using string as base64...")
            image_b64 = image_data
        else:
            # It's raw bytes
            print(f"Converting bytes to base64... type: {type(image_data)}")
            image_b64 = base64.b64encode(image_data).decode('utf-8')

        # Create optimized prompt for transaction extraction
        prompt = """You are a financial document analyzer. Extract transaction data from this bank statement, receipt, or financial document.

For each transaction you find, return ONLY a valid JSON array with this exact format:
[
  {
    "date": "YYYY-MM-DD",
    "amount": 123.45,
    "vendor": "Vendor Name",
    "description": "Transaction description",
    "type": "debit"
  }
]

Rules:
- Extract ALL transactions visible in the image
- Use negative amounts for debits/expenses, positive for credits/income
- Parse dates to YYYY-MM-DD format
- Clean vendor names (remove extra spaces, standardize)
- Include meaningful descriptions
- Return ONLY the JSON array, no other text
- If no transactions found, return: []

Analyze the image and extract all transaction data:"""

        # Call Hyperbolic vision model endpoint
        hyperbolic_api_key = os.environ["HYPERBOLIC_API_KEY"]
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {hyperbolic_api_key}",
        }
        payload = {
            "messages": [{
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
                    },
                ],
            }],
            "model": HYPERBOLIC_MODEL_NAME,
            "max_tokens": 2000,
            "temperature": 0.1,
            "top_p": 0.001,
        }

        print("Calling Hyperbolic vision model...")
        print(f"Hyperbolic API URL: {HYPERBOLIC_API_URL}")
        response = requests.post(
            HYPERBOLIC_API_URL,
            headers=headers,
            json=payload,
            timeout=60
        )

        if response.status_code != 200:
            return {
                "status": "error",
                "error": f"Vision model API failed: {response.status_code} - {response.text}",
                "transactions": [],
                "total_transactions": 0
            }

        result = response.json()
        response_text = result["choices"][0]["message"]["content"]

        # Parse JSON response
        try:
            # Find JSON array in response
            response_text = response_text.strip()
            start_idx = response_text.find('[')
            end_idx = response_text.rfind(']') + 1
            if start_idx == -1 or end_idx == 0:
                print("Warning: no JSON array found in response")
                transactions = []
            else:
                json_str = response_text[start_idx:end_idx]
                transactions = json.loads(json_str)

            # Clean and validate transactions
            cleaned_transactions = []
            for txn in transactions:
                if isinstance(txn, dict) and "amount" in txn and "vendor" in txn:
                    try:
                        txn["amount"] = float(txn["amount"])
                        txn["source"] = "image_extraction"
                        txn["extracted_at"] = datetime.now().isoformat()
                        cleaned_transactions.append(txn)
                    except (ValueError, TypeError):
                        print(f"Warning: invalid transaction amount: {txn}")
            transactions = cleaned_transactions
        except json.JSONDecodeError as e:
            print(f"JSON parsing error: {e}")
            transactions = []

        # Store extracted transactions for this session
        image_session_key = f"{session_id}_image_transactions"
        session_storage[image_session_key] = {
            "transactions": transactions,
            "total_transactions": len(transactions),
            "filename": filename,
            "extracted_at": datetime.now().isoformat(),
            "raw_response": response_text[:500] if len(response_text) > 500 else response_text
        }

        print(f"Extracted {len(transactions)} transactions from image")
        return {
            "status": "success",
            "transactions": transactions,
            "total_transactions": len(transactions),
            "filename": filename,
            "extracted_at": datetime.now().isoformat(),
            "session_id": session_id
        }
    except Exception as e:
        print(f"Error processing image: {e}")
        return {
            "status": "error",
            "error": str(e),
            "transactions": [],
            "total_transactions": 0,
            "filename": filename,
            "extracted_at": datetime.now().isoformat()
        }
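
# Example (illustrative only; the file path is hypothetical). Requires
# HYPERBOLIC_API_KEY in the environment, e.g. via a Modal secret:
#
#     with open("statement_page1.jpg", "rb") as f:
#         extracted = process_image_transactions(
#             f, session_id="demo-session", filename="statement_page1.jpg"
#         )
#     print(extracted["total_transactions"])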

def reconcile_transactions(session_id: str) -> Dict[str, Any]:
    """
    Reconcile image-extracted transactions with CSV ledger entries.

    Args:
        session_id: Session ID containing both CSV and image transaction data

    Returns:
        Dictionary containing matches, unmatched transactions, and confidence scores
    """
    try:
        print(f"Reconciling transactions for session: {session_id}")

        # Get CSV transactions
        if session_id not in session_storage:
            return {
                "status": "error",
                "error": "No CSV data found for session. Please upload and analyze CSV first."
            }
        csv_data = session_storage[session_id]
        csv_transactions = csv_data.get("doc_metadata", [])

        # Get image transactions
        image_session_key = f"{session_id}_image_transactions"
        if image_session_key not in session_storage:
            return {
                "status": "error",
                "error": "No image data found for session. Please upload and process an image first."
            }
        image_data = session_storage[image_session_key]
        image_transactions = image_data.get("transactions", [])

        if not csv_transactions or not image_transactions:
            return {
                "status": "error",
                "error": f"Insufficient data: {len(csv_transactions)} CSV transactions, {len(image_transactions)} image transactions"
            }

        print(f"Reconciling {len(image_transactions)} image transactions with {len(csv_transactions)} CSV transactions")

        # Simple reconciliation algorithm
        matches = []
        matched_csv_indices = set()
        matched_image_indices = set()

        for i, image_txn in enumerate(image_transactions):
            best_match = None
            best_score = 0
            best_csv_idx = -1

            for j, csv_txn in enumerate(csv_transactions):
                if j in matched_csv_indices:
                    continue

                score = 0
                reasons = []
                discrepancies = []

                # Amount matching (40% weight)
                try:
                    image_amount = float(image_txn.get("amount", 0))
                    csv_amount = float(csv_txn.get("amount", 0))
                    amount_diff = abs(image_amount - csv_amount)
                    if amount_diff <= 0.01:  # Exact match
                        score += 0.4
                        reasons.append(f"Exact amount match: ${csv_amount}")
                    elif amount_diff <= max(0.01, abs(csv_amount) * 0.02):  # 2% tolerance
                        score += 0.35
                        reasons.append(f"Close amount match: ${csv_amount} ~ ${image_amount}")
                    elif amount_diff <= abs(csv_amount) * 0.05:  # 5% tolerance
                        score += 0.2
                        reasons.append(f"Approximate amount match: ${csv_amount} ~ ${image_amount}")
                    else:
                        discrepancies.append(f"Amount difference: ${csv_amount} vs ${image_amount}")
                except (ValueError, TypeError):
                    discrepancies.append("Invalid amount data")

                # Date matching (25% weight) - simplified
                try:
                    csv_date_str = str(csv_txn.get("date", ""))
                    image_date_str = str(image_txn.get("date", ""))
                    if csv_date_str[:10] == image_date_str[:10]:  # Same date (YYYY-MM-DD)
                        score += 0.25
                        reasons.append(f"Exact date match: {csv_date_str[:10]}")
                    elif abs(len(csv_date_str) - len(image_date_str)) <= 3:  # Close dates
                        score += 0.15
                        reasons.append(f"Similar dates: {csv_date_str[:10]} ~ {image_date_str[:10]}")
                    else:
                        discrepancies.append(f"Date difference: {csv_date_str[:10]} vs {image_date_str[:10]}")
                except Exception:
                    discrepancies.append("Invalid date data")

                # Vendor matching (25% weight) - simplified
                try:
                    csv_vendor = str(csv_txn.get("vendor", "")).upper().strip()
                    image_vendor = str(image_txn.get("vendor", "")).upper().strip()
                    if csv_vendor == image_vendor:
                        score += 0.25
                        reasons.append(f"Exact vendor match: {csv_vendor}")
                    elif csv_vendor in image_vendor or image_vendor in csv_vendor:
                        score += 0.2
                        reasons.append(f"Partial vendor match: {csv_vendor} ~ {image_vendor}")
                    elif len(set(csv_vendor.split()).intersection(set(image_vendor.split()))) > 0:
                        score += 0.1
                        reasons.append(f"Vendor keyword match: {csv_vendor} ~ {image_vendor}")
                    else:
                        discrepancies.append(f"Vendor difference: {csv_vendor} vs {image_vendor}")
                except Exception:
                    discrepancies.append("Invalid vendor data")

                # Description matching (10% weight) - simplified
                try:
                    csv_desc = str(csv_txn.get("description", "")).upper().strip()
                    image_desc = str(image_txn.get("description", "")).upper().strip()
                    if csv_desc and image_desc:
                        if csv_desc == image_desc:
                            score += 0.1
                            reasons.append(f"Description match: {csv_desc}")
                        elif csv_desc in image_desc or image_desc in csv_desc:
                            score += 0.05
                            reasons.append("Partial description match")
                except Exception:
                    pass

                # Store best match for this image transaction
                if score > best_score and score > 0.3:  # Minimum threshold
                    best_score = score
                    best_match = {
                        "csv_transaction": csv_txn,
                        "image_transaction": image_txn,
                        "confidence_score": round(score, 3),
                        "match_reasons": reasons,
                        "discrepancies": discrepancies,
                        "match_type": "high" if score >= 0.85 else "medium" if score >= 0.65 else "low"
                    }
                    best_csv_idx = j

            # Add best match if found
            if best_match:
                matches.append(best_match)
                matched_csv_indices.add(best_csv_idx)
                matched_image_indices.add(i)

        # Calculate unmatched transactions
        unmatched_csv = [csv_transactions[i] for i in range(len(csv_transactions)) if i not in matched_csv_indices]
        unmatched_image = [image_transactions[i] for i in range(len(image_transactions)) if i not in matched_image_indices]

        # Categorize matches
        high_confidence = [m for m in matches if m["confidence_score"] >= 0.85]
        medium_confidence = [m for m in matches if 0.65 <= m["confidence_score"] < 0.85]
        low_confidence = [m for m in matches if m["confidence_score"] < 0.65]

        # Calculate summary
        match_rate = len(matches) / len(image_transactions) if image_transactions else 0
        total_image_amount = sum(float(txn.get("amount", 0)) for txn in image_transactions)
        total_matched_amount = sum(float(m["image_transaction"].get("amount", 0)) for m in matches)

        result = {
            "status": "success",
            "summary": {
                "total_image_transactions": len(image_transactions),
                "total_csv_transactions": len(csv_transactions),
                "total_matches": len(matches),
                "match_rate": round(match_rate * 100, 1),
                "high_confidence_matches": len(high_confidence),
                "medium_confidence_matches": len(medium_confidence),
                "low_confidence_matches": len(low_confidence),
                "unmatched_image_transactions": len(unmatched_image),
                "unmatched_csv_transactions": len(unmatched_csv),
                "total_image_amount": round(total_image_amount, 2),
                "total_matched_amount": round(total_matched_amount, 2),
                "reconciliation_percentage": round((total_matched_amount / total_image_amount * 100) if total_image_amount else 0, 1)
            },
            "matches": {
                "high_confidence": high_confidence,
                "medium_confidence": medium_confidence,
                "low_confidence": low_confidence
            },
            "unmatched": {
                "image_transactions": unmatched_image,
                "csv_transactions": unmatched_csv
            },
            "reconciled_at": datetime.now().isoformat(),
            "session_id": session_id
        }

        # Store reconciliation results
        reconciliation_key = f"{session_id}_reconciliation"
        session_storage[reconciliation_key] = result

        print(f"Reconciliation complete: {len(matches)} matches found ({match_rate*100:.1f}% match rate)")
        return result
    except Exception as e:
        print(f"Error during reconciliation: {e}")
        return {
            "status": "error",
            "error": str(e),
            "summary": {},
            "matches": {"high_confidence": [], "medium_confidence": [], "low_confidence": []},
            "unmatched": {"image_transactions": [], "csv_transactions": []},
            "reconciled_at": datetime.now().isoformat()
        }
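
# Worked example of the weighting above (illustrative numbers): an exact amount
# match (0.40) plus an exact date match (0.25) plus a partial vendor match
# (0.20) scores 0.85 and is classified "high"; the same pair with only a
# vendor keyword match (0.10) scores 0.75 and lands in "medium".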

def list_sessions():
    """Debug function to list all active sessions"""
    try:
        return {
            "status": "success",
            "sessions": dict(session_storage),
            "session_count": len(session_storage)
        }
    except Exception as e:
        return {"status": "error", "error": str(e)}


if __name__ == "__main__":
    print("SmartLedger Modal functions with image processing and reconciliation")
    print("Features: CSV indexing, image processing, transaction reconciliation")
    print("UI calls: modal.Function.lookup('smartledger', 'create_index')")