# Source: Hugging Face Space "mlbench123" — app.py (commit ce5f14e).
# NOTE(review): the original paste carried raw HF page text here, which is
# not valid Python; it has been converted to this comment.
"""
FastAPI Service for Construction Scope Validation - FIXED VERSION
Includes semantic validation to prevent wrong tasks being assigned to stages
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any, Tuple
import json
import numpy as np
import os
import torch
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import re
# --- Global PyTorch performance tuning (module import side effects) ---
# Let cuDNN auto-select the fastest kernels for the observed input shapes.
torch.backends.cudnn.benchmark = True
# Allow TF32 matmuls on Ampere+ GPUs: faster at slightly reduced precision.
torch.backends.cuda.matmul.allow_tf32 = True
torch.set_float32_matmul_precision('high')
# Limit CUDA allocator block splitting to reduce fragmentation in a
# long-running service. NOTE(review): this env var must be set before the
# CUDA context is created — confirm nothing touches CUDA earlier on import.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:512'

app = FastAPI(
    title="Construction Scope Validator API - Fixed",
    description="Validates with semantic task-stage checking",
    version="2.1.0"
)

# Wide-open CORS. NOTE(review): browsers reject allow_origins=["*"] combined
# with allow_credentials=True for credentialed requests — confirm intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ============= MODEL LOADING =============
print("="*60)
print("LOADING MODEL...")
print("="*60)
def setup_model_structure():
    """Create the 1_Pooling/ and 2_Normalize/ folders sentence-transformers
    expects when the model files live flattened in the repository root.

    Idempotent: does nothing when both directories already exist.
    """
    if os.path.exists('1_Pooling') and os.path.exists('2_Normalize'):
        return
    print("Creating temporary model structure...")
    for folder in ('1_Pooling', '2_Normalize'):
        os.makedirs(folder, exist_ok=True)
    # Mean-pooling configuration matching all-MiniLM-L6-v2 (384-dim vectors).
    pooling_config = {
        "word_embedding_dimension": 384,
        "pooling_mode_cls_token": False,
        "pooling_mode_mean_tokens": True,
        "pooling_mode_max_tokens": False,
        "pooling_mode_mean_sqrt_len_tokens": False
    }
    with open('1_Pooling/config.json', 'w') as f:
        json.dump(pooling_config, f, indent=2)
    # The Normalize module takes no options; an empty config is sufficient.
    with open('2_Normalize/config.json', 'w') as f:
        json.dump({}, f)
    print("✓ Created model structure")


setup_model_structure()
# Report GPU availability before loading the embedding model.
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU device: {torch.cuda.get_device_name(0)}")
try:
    # A fine-tuned model is considered present when both config files and at
    # least one weights file exist in the repository root.
    model_files = ['config.json', 'sentence_bert_config.json']
    has_weights = os.path.exists('pytorch_model.bin') or os.path.exists('model.safetensors')
    has_model = all(os.path.exists(f) for f in model_files) and has_weights
    if has_model:
        print("✓ Loading trained model...")
        # NOTE(review): device='cuda' raises when CUDA is unavailable, and the
        # fallback below also requests 'cuda', which would fail again —
        # confirm this service is only deployed on GPU hardware.
        embedding_model = SentenceTransformer('./', device='cuda')
        print("✅ Trained model loaded!")
    else:
        print("⚠️ Loading base model...")
        embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cuda')
        print("✅ Base model loaded!")
except Exception as e:
    # Last-resort fallback: pull the public base model from the Hub.
    print(f"❌ Error: {e}")
    embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cuda')

# Batch size used for bulk embedding of the reference database.
BATCH_SIZE = 4096
print(f"✓ Batch Size: {BATCH_SIZE}")
print("="*60)
# ============= DATA MODELS =============
class ScopeItem(BaseModel):
    """One line item of a construction scope: what to do, where, with what.

    The first five fields come from the LLM request; every Optional field is
    filled in by the validation pipeline (validate_scope).
    """
    stage: str
    task: str
    material: str
    quantity: float
    unit: str
    # Enrichment fields
    stageId: Optional[int] = None
    taskId: Optional[int] = None
    materialId: Optional[int] = None
    stage_confidence: Optional[float] = None
    task_confidence: Optional[float] = None
    material_confidence: Optional[float] = None
    validated_stage: Optional[str] = None
    validated_task: Optional[str] = None
    validated_material: Optional[str] = None
    material_price: Optional[float] = None
    material_margin: Optional[float] = None
    # NEW: Validation flags
    task_semantic_valid: Optional[bool] = None  # task passed the semantic stage check
    task_database_stageId: Optional[int] = None  # stageId the matched task has in the DB
class AreaScope(BaseModel):
    """A room/area with its scope items; the Optional fields are enrichment."""
    area: str
    items: List[ScopeItem]
    roomId: Optional[int] = None
    roomType: Optional[str] = None
    area_confidence: Optional[float] = None
    validated_area: Optional[str] = None
class ScopeRequest(BaseModel):
    # Request payload: the raw LLM-generated scope of work.
    scope_of_work: List[AreaScope]
class ScopeResponse(BaseModel):
    # Response payload: enriched scope plus validation statistics.
    scope_of_work: List[AreaScope]
    metadata: Optional[Dict[str, Any]] = None
# ============= HELPER FUNCTIONS =============
def parse_room_area(room_area_value):
    """Normalize a roomArea value (None, list, JSON string, or scalar) to a list."""
    if room_area_value is None:
        return []
    if isinstance(room_area_value, list):
        return room_area_value
    if not isinstance(room_area_value, str):
        return [str(room_area_value)]
    # Strings may carry an embedded JSON value; try to decode it first.
    try:
        decoded = json.loads(room_area_value)
    except json.JSONDecodeError:
        return [room_area_value]
    return decoded if isinstance(decoded, list) else [str(decoded)]
# ============= DATABASE LOADER =============
class DatabaseLoader:
    """Holds the reference database (stages, tasks, materials, rooms) and the
    pre-computed sentence embeddings derived from it."""

    def __init__(self):
        # Raw records, loaded from JSON-lines files by load_data().
        self.stages = []
        self.tasks = []
        self.materials = []
        self.rooms = []
        # Numpy arrays of normalized embeddings, set by initialize_embeddings().
        self.stage_embeddings = None
        self.task_embeddings = None
        self.material_embeddings = None

    @staticmethod
    def _read_jsonl(path):
        """Read one JSON object per non-blank line from *path*."""
        print(f"Loading {path}...")
        with open(path, 'r', encoding='utf-8') as handle:
            return [json.loads(row) for row in handle if row.strip()]

    def load_data(self, stages_file: str, tasks_file: str, materials_file: str, rooms_file: str):
        """Load the four JSON-lines database files into memory."""
        self.stages = self._read_jsonl(stages_file)
        self.tasks = self._read_jsonl(tasks_file)
        self.materials = self._read_jsonl(materials_file)
        self.rooms = self._read_jsonl(rooms_file)
        print(f"✅ Loaded: {len(self.stages)} stages, {len(self.tasks)} tasks, "
              f"{len(self.materials)} materials, {len(self.rooms)} rooms")

    @staticmethod
    def _embed(texts):
        """Encode *texts* into L2-normalized numpy embeddings in large batches."""
        return embedding_model.encode(
            texts,
            batch_size=BATCH_SIZE,
            show_progress_bar=True,
            convert_to_numpy=True,
            normalize_embeddings=True
        )

    def initialize_embeddings(self):
        """Pre-compute embeddings for every stage, task and material name."""
        banner = "=" * 60
        print("\n" + banner)
        print("INITIALIZING EMBEDDINGS")
        print(banner)
        print(f"Computing stage embeddings...")
        self.stage_embeddings = self._embed([s['stage'] for s in self.stages])
        print(f"Computing task embeddings...")
        self.task_embeddings = self._embed([t['task'] for t in self.tasks])
        print(f"Computing material embeddings...")
        self.material_embeddings = self._embed([m['material'] for m in self.materials])
        print(banner)
        print("✅ Embeddings ready!")
        print(banner)


db = DatabaseLoader()
# ============= SEMANTIC VALIDATOR =============
class SemanticValidator:
    """Validates if tasks semantically belong to stages."""

    # Minimum stage/task cosine similarity to accept the pairing.
    # Lowered for more lenient matching.
    SEMANTIC_THRESHOLD = 0.25

    def validate_task_for_stage(self, task: dict, stage: dict,
                                task_confidence: float) -> Tuple[bool, float]:
        """Return (is_valid, adjusted_confidence) for a task/stage pairing.

        Looks up both records' pre-computed embeddings in the global ``db``
        and compares them with cosine similarity; the task confidence is
        scaled down when similarity is modest.
        """
        stage_idx = None
        for i, record in enumerate(db.stages):
            if record['stageId'] == stage['stageId']:
                stage_idx = i
                break
        task_idx = None
        for i, record in enumerate(db.tasks):
            if record['taskId'] == task['taskId']:
                task_idx = i
                break
        if stage_idx is None or task_idx is None:
            return False, 0.0
        stage_vec = db.stage_embeddings[stage_idx].reshape(1, -1)
        task_vec = db.task_embeddings[task_idx].reshape(1, -1)
        similarity = cosine_similarity(stage_vec, task_vec)[0][0]
        if similarity < self.SEMANTIC_THRESHOLD:
            return False, 0.0
        # Scale confidence by similarity/0.4, capped at 1.0 (no boost).
        return True, task_confidence * min(similarity / 0.4, 1.0)


validator = SemanticValidator()
# ============= MATCHING FUNCTIONS =============
def find_best_stage(llm_stage: str, threshold: float = 0.5) -> tuple:
    """Return (stage_record, score) for the DB stage most similar to
    *llm_stage*, or (None, 0.0) when the best score is below *threshold*."""
    query = embedding_model.encode(
        [llm_stage],
        batch_size=BATCH_SIZE,
        convert_to_numpy=True,
        normalize_embeddings=True
    )
    scores = cosine_similarity(query, db.stage_embeddings)[0]
    winner = int(np.argmax(scores))
    if scores[winner] >= threshold:
        return db.stages[winner], scores[winner]
    return None, 0.0
def find_best_room(llm_area: str, threshold: float = 0.6) -> tuple:
    """Match an LLM area name to a DB room.

    Tries a case-insensitive exact name match first (confidence 1.0), then
    falls back to embedding similarity over all room names.

    Returns:
        (room_record, confidence) or (None, 0.0) when below *threshold*.
    """
    llm_area_lower = llm_area.lower()
    for room in db.rooms:
        if room['name'].lower() == llm_area_lower:
            return room, 1.0
    room_texts = [r['name'] for r in db.rooms]
    # Fix: the original re-encoded every room name on every request. Cache the
    # room embeddings and recompute only when the name list changes.
    cached = getattr(find_best_room, '_room_cache', None)
    if cached is None or cached[0] != room_texts:
        room_embeddings = embedding_model.encode(
            room_texts,
            batch_size=BATCH_SIZE,
            convert_to_numpy=True,
            normalize_embeddings=True
        )
        find_best_room._room_cache = (room_texts, room_embeddings)
    else:
        room_embeddings = cached[1]
    query_embedding = embedding_model.encode(
        [llm_area],
        batch_size=BATCH_SIZE,
        convert_to_numpy=True,
        normalize_embeddings=True
    )
    similarities = cosine_similarity(query_embedding, room_embeddings)[0]
    best_idx = np.argmax(similarities)
    best_score = similarities[best_idx]
    if best_score >= threshold:
        return db.rooms[best_idx], best_score
    return None, 0.0
def find_best_task_with_semantic_validation(
    stage_id: int,
    llm_task: str,
    stage: dict,
    fallback_to_global: bool = True
) -> Tuple[Optional[dict], float, bool, Optional[int]]:
    """
    Enhanced task matching with semantic validation.

    Searches tasks belonging to *stage_id* first; when none passes the
    semantic check, optionally falls back to searching the whole task table
    (still validated against the matched stage).

    Returns: (task, confidence, is_semantically_valid, original_db_stageId)
    """
    # Fix: encode the query once; the original encoded it again for the
    # global fallback pass.
    query_embedding = embedding_model.encode(
        [llm_task],
        batch_size=1,
        convert_to_numpy=True,
        normalize_embeddings=True
    )
    # Try stage-specific tasks first.
    # Fix: build indices with enumerate — the original used list.index(),
    # which is O(n) per task and mis-resolves duplicate records.
    task_indices = [i for i, t in enumerate(db.tasks) if t['stageId'] == stage_id]
    if task_indices:
        stage_tasks = [db.tasks[i] for i in task_indices]
        stage_task_embeddings = db.task_embeddings[task_indices]
        similarities = cosine_similarity(query_embedding, stage_task_embeddings)[0]
        # Consider the top 3 candidates in descending similarity order.
        top_indices = np.argsort(similarities)[-3:][::-1]
        for idx in top_indices:
            candidate_task = stage_tasks[idx]
            is_valid, adjusted_confidence = validator.validate_task_for_stage(
                candidate_task, stage, similarities[idx]
            )
            if is_valid and adjusted_confidence > 0.35:
                return (candidate_task, adjusted_confidence, True,
                        candidate_task['stageId'])
    # Fallback: search ALL tasks with a slightly lower acceptance bar.
    if fallback_to_global:
        all_similarities = cosine_similarity(query_embedding, db.task_embeddings)[0]
        top_global_indices = np.argsort(all_similarities)[-5:][::-1]
        for idx in top_global_indices:
            candidate_task = db.tasks[idx]
            is_valid, adjusted_confidence = validator.validate_task_for_stage(
                candidate_task, stage, all_similarities[idx]
            )
            if is_valid and adjusted_confidence > 0.3:
                return (candidate_task, adjusted_confidence, True,
                        candidate_task['stageId'])
    return None, 0.0, False, None
def extract_keywords(text: str) -> List[str]:
    """Return lowercase keywords from *text*: words of three or more
    characters that are not construction-scope stop words."""
    stop_words = {'and', 'or', 'the', 'to', 'a', 'of', 'for', 'in', 'on', 'supply', 'install'}
    keywords = []
    for word in re.findall(r'\b\w+\b', text.lower()):
        if len(word) > 2 and word not in stop_words:
            keywords.append(word)
    return keywords
def find_best_material(task: dict, llm_material: str, unit: str) -> tuple:
    """Find the single best material for *task* / *llm_material*.

    Score = 2.0 per keyword found in the material name, 1.0 per keyword found
    in its categories, plus 5.0 × embedding similarity to *llm_material*.

    Returns:
        (material_record, score) or (None, 0.0) when nothing scores above 0.
    """
    all_keywords = set(extract_keywords(task['task']) + extract_keywords(llm_material))
    # Keep materials with a matching unit, plus unit-agnostic ones; carry the
    # index so the embedding lookup below is O(1).
    # Fix: the original called db.materials.index(material) inside the scoring
    # loop, making the whole pass O(n^2).
    candidates = [
        (i, m) for i, m in enumerate(db.materials)
        if m['unit'] == unit or m['unit'] == 'unit' or m['unit'] is None
    ]
    if not candidates:
        candidates = list(enumerate(db.materials))
    query_embedding = embedding_model.encode(
        [llm_material],
        batch_size=1,
        convert_to_numpy=True,
        normalize_embeddings=True
    )
    best = None
    best_score = 0.0
    for material_idx, material in candidates:
        score = 0.0
        material_text = material['material'].lower()
        for keyword in all_keywords:
            if keyword in material_text:
                score += 2.0
        categories_str = ' '.join(material.get('categories', [])).lower()
        for keyword in all_keywords:
            if keyword in categories_str:
                score += 1.0
        material_embedding = db.material_embeddings[material_idx].reshape(1, -1)
        semantic_score = cosine_similarity(query_embedding, material_embedding)[0][0]
        score += semantic_score * 5.0
        # Keep the first-encountered maximum (matches the original's stable sort).
        if score > 0 and (best is None or score > best_score):
            best, best_score = material, score
    if best is None:
        return None, 0.0
    return best, best_score
# ============= VALIDATION PIPELINE =============
def validate_scope(request: ScopeRequest) -> ScopeResponse:
    """Validate and enrich an LLM scope with database IDs and confidences.

    For every item: match the stage, then a task (with semantic stage
    validation), then a material. Unmatched enrichment fields stay None.
    Material matching runs only when a task matched, since its scoring
    needs the matched task's text.
    """
    enriched_areas = []
    semantic_mismatches = 0
    for area_scope in request.scope_of_work:
        matched_room, room_confidence = find_best_room(area_scope.area)
        enriched_items = []
        for item in area_scope.items:
            enriched_item = item.model_copy()
            # Match stage
            matched_stage, stage_confidence = find_best_stage(item.stage)
            if matched_stage:
                enriched_item.stageId = matched_stage['stageId']
                enriched_item.validated_stage = matched_stage['stage']
                enriched_item.stage_confidence = round(stage_confidence, 2)
                # Match task with semantic validation
                (matched_task, task_confidence,
                 is_semantic_valid, db_stage_id) = find_best_task_with_semantic_validation(
                    matched_stage['stageId'],
                    item.task,
                    matched_stage,
                    fallback_to_global=True
                )
                if matched_task:
                    enriched_item.taskId = matched_task['taskId']
                    enriched_item.validated_task = matched_task['task']
                    enriched_item.task_confidence = round(task_confidence, 2)
                    enriched_item.task_semantic_valid = is_semantic_valid
                    enriched_item.task_database_stageId = db_stage_id
                    if not is_semantic_valid:
                        semantic_mismatches += 1
                    # Match material (needs the matched task for keyword scoring)
                    matched_material, material_score = find_best_material(
                        matched_task,
                        item.material,
                        item.unit
                    )
                    if matched_material:
                        enriched_item.materialId = matched_material['materialId']
                        enriched_item.validated_material = matched_material['material']
                        enriched_item.material_confidence = round(material_score / 10.0, 2)
                        enriched_item.material_price = float(matched_material['price'])
                        enriched_item.material_margin = float(matched_material['margin'])
                        enriched_item.material = matched_material['material']
            enriched_items.append(enriched_item)
        enriched_area = AreaScope(
            area=area_scope.area,
            items=enriched_items,
            roomId=matched_room['id'] if matched_room else None,
            roomType=matched_room['roomType'] if matched_room else None,
            validated_area=matched_room['name'] if matched_room else area_scope.area,
            area_confidence=round(room_confidence, 2) if matched_room else 0.0
        )
        enriched_areas.append(enriched_area)
    # Aggregate validation statistics.
    total_items = sum(len(area.items) for area in enriched_areas)
    # Fix: count with "is not None" — the original used truthiness, which
    # would skip a legitimate ID of 0.
    validated_stages = sum(
        1 for area in enriched_areas for item in area.items if item.stageId is not None)
    validated_tasks = sum(
        1 for area in enriched_areas for item in area.items if item.taskId is not None)
    validated_materials = sum(
        1 for area in enriched_areas for item in area.items if item.materialId is not None)
    metadata = {
        'total_areas': len(enriched_areas),
        'total_items': total_items,
        'validated_stages': validated_stages,
        'validated_tasks': validated_tasks,
        'validated_materials': validated_materials,
        'semantic_mismatches': semantic_mismatches,
        'validation_rate': {
            'stages': round(validated_stages / total_items * 100, 1) if total_items > 0 else 0,
            'tasks': round(validated_tasks / total_items * 100, 1) if total_items > 0 else 0,
            'materials': round(validated_materials / total_items * 100, 1) if total_items > 0 else 0
        }
    }
    return ScopeResponse(scope_of_work=enriched_areas, metadata=metadata)
# ============= API ENDPOINTS =============
@app.get("/")
async def root():
    """Service banner: version, feature flags and runtime environment."""
    info = {
        "service": "Construction Scope Validator - FIXED",
        "version": "2.1.0",
        "status": "running",
        "features": ["semantic_task_validation", "fallback_search"],
    }
    info["data_loaded"] = len(db.stages) > 0
    info["model_type"] = "trained" if os.path.exists('model.safetensors') else "base"
    info["gpu_enabled"] = torch.cuda.is_available()
    info["batch_size"] = BATCH_SIZE
    return info
@app.get("/health")
async def health():
    """Liveness probe reporting database record counts and embedding state."""
    report = {"status": "healthy"}
    report["stages_loaded"] = len(db.stages)
    report["tasks_loaded"] = len(db.tasks)
    report["materials_loaded"] = len(db.materials)
    report["rooms_loaded"] = len(db.rooms)
    report["embeddings_ready"] = db.stage_embeddings is not None
    return report
@app.post("/validate", response_model=ScopeResponse)
async def validate_scope_endpoint(request: ScopeRequest):
    """Validate with semantic checking.

    Returns the enriched scope plus a metadata summary.
    """
    # Fix: the readiness check used to sit inside the try, so its
    # HTTPException was swallowed by `except Exception` and re-wrapped
    # with a traceback. Raise it cleanly instead.
    if not db.stages:
        raise HTTPException(status_code=500, detail="Database not loaded")
    try:
        return validate_scope(request)
    except HTTPException:
        raise
    except Exception as e:
        import traceback
        # NOTE(review): returning the traceback in the response body leaks
        # internals to clients — consider logging server-side instead.
        error_detail = f"Validation error: {str(e)}\n{traceback.format_exc()}"
        raise HTTPException(status_code=500, detail=error_detail)
@app.post("/validate-simple", response_model=ScopeRequest)
async def validate_scope_simple(request: ScopeRequest):
    """Returns only enriched scope without metadata"""
    # Fix: raise the readiness error outside the try so it is not swallowed
    # by `except Exception` and re-wrapped with a traceback.
    if not db.stages:
        raise HTTPException(status_code=500, detail="Database not loaded")
    try:
        result = validate_scope(request)
        # Repackage as a request model to drop the metadata field.
        return ScopeRequest(scope_of_work=result.scope_of_work)
    except HTTPException:
        raise
    except Exception as e:
        import traceback
        error_detail = f"Validation error: {str(e)}\n{traceback.format_exc()}"
        raise HTTPException(status_code=500, detail=error_detail)
# ============= STARTUP =============
@app.on_event("startup")
async def startup_event():
    """Load the reference database and pre-compute embeddings at boot.

    Errors are printed rather than raised so the app still starts; endpoints
    then report an unloaded database.
    """
    separator = "=" * 60
    try:
        print("\n" + separator)
        print("STARTING UP - FIXED VERSION")
        print(separator)
        if torch.cuda.is_available():
            print(f"\n🚀 GPU ENABLED: {torch.cuda.get_device_name(0)}")
        db.load_data(
            stages_file='stages.json',
            tasks_file='tasks.json',
            materials_file='materials.json',
            rooms_file='rooms.json'
        )
        db.initialize_embeddings()
        print("\n✅ SERVICE READY WITH SEMANTIC VALIDATION!")
        print(separator)
    except Exception as e:
        print(f"\n❌ STARTUP ERROR: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    # Local entry point; port 7860 is the Hugging Face Spaces default.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
# ---------------------------------------------------------------------------
# NOTE(review): everything below this line is commented-out legacy code
# (v1.0.0 of this service, plus an even older draft). Prefer deleting it —
# version control already preserves the history.
# ---------------------------------------------------------------------------
# """
# FastAPI Service for Construction Scope Validation
# Deploy on Hugging Face Spaces - Flattened File Structure
# """
# from fastapi import FastAPI, HTTPException
# from fastapi.middleware.cors import CORSMiddleware
# from pydantic import BaseModel, Field
# from typing import List, Optional, Dict, Any
# import json
# import numpy as np
# import os
# import shutil
# from sentence_transformers import SentenceTransformer
# from sklearn.metrics.pairwise import cosine_similarity
# import re
# app = FastAPI(
# title="Construction Scope Validator API",
# description="Validates and enriches LLM-generated construction scope with DB data",
# version="1.0.0"
# )
# #---------------------------
# # CORS middleware
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"],
# allow_credentials=True,
# allow_methods=["*"],
# allow_headers=["*"],
# )
# # ============= MODEL LOADING WITH FLAT STRUCTURE =============
# print("="*60)
# print("LOADING MODEL...")
# print("="*60)
# def setup_model_structure():
# """
# Create temporary folder structure for sentence-transformers
# if files are in root (flattened structure)
# """
# # Check if we need to create structure
# if not os.path.exists('1_Pooling') or not os.path.exists('2_Normalize'):
# print("Creating temporary model structure...")
# # Create directories
# os.makedirs('1_Pooling', exist_ok=True)
# os.makedirs('2_Normalize', exist_ok=True)
# # Pooling config
# pooling_config = {
# "word_embedding_dimension": 384,
# "pooling_mode_cls_token": False,
# "pooling_mode_mean_tokens": True,
# "pooling_mode_max_tokens": False,
# "pooling_mode_mean_sqrt_len_tokens": False
# }
# with open('1_Pooling/config.json', 'w') as f:
# json.dump(pooling_config, f, indent=2)
# # Normalize config (empty is fine)
# with open('2_Normalize/config.json', 'w') as f:
# json.dump({}, f)
# print("✓ Created 1_Pooling/config.json")
# print("✓ Created 2_Normalize/config.json")
# # Setup structure before loading model
# setup_model_structure()
# try:
# model_files = ['config.json', 'sentence_bert_config.json']
# has_weights = os.path.exists('pytorch_model.bin') or os.path.exists('model.safetensors')
# has_model = all(os.path.exists(f) for f in model_files) and has_weights
# if has_model:
# print("✓ Model files found in root directory")
# print("Loading trained model...")
# embedding_model = SentenceTransformer('./', device='cpu')
# print("✅ Trained model loaded successfully!")
# else:
# print("⚠️ Model not found, using base model...")
# embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
# print("✅ Base model loaded successfully!")
# except Exception as e:
# print(f"❌ Error loading trained model: {e}")
# print("Falling back to base model...")
# embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
# print("✅ Base model loaded successfully!")
# print("="*60)
# # ============= DATA MODELS =============
# class LLMScopeItem(BaseModel):
# stage: str
# task: str
# material: str
# quantity: float
# unit: str
# class LLMAreaScope(BaseModel):
# area: str
# items: List[LLMScopeItem]
# class LLMScopeRequest(BaseModel):
# scope_of_work: List[LLMAreaScope]
# class ValidatedMaterial(BaseModel):
# materialId: int
# name: str
# material: str
# unit: str
# price: float
# margin: float
# categories: List[str]
# confidence_score: float
# class ValidatedTask(BaseModel):
# taskId: int
# task: str
# displayName: str
# unit: str
# stageId: int
# roomArea: List[str]
# confidence_score: float
# recommended_materials: List[ValidatedMaterial]
# class ValidatedStage(BaseModel):
# stageId: int
# stage: str
# priority: int
# confidence_score: float
# tasks: List[ValidatedTask]
# class ValidatedArea(BaseModel):
# roomId: Optional[int]
# name: str
# roomType: str
# matched: bool
# confidence_score: float
# stages: List[ValidatedStage]
# class ValidatedResponse(BaseModel):
# areas: List[ValidatedArea]
# summary: Dict[str, Any]
# # ============= HELPER FUNCTION =============
# def parse_room_area(room_area_value):
# """Parse roomArea field which might be a string, list, or None"""
# if room_area_value is None:
# return []
# if isinstance(room_area_value, list):
# return room_area_value
# if isinstance(room_area_value, str):
# try:
# parsed = json.loads(room_area_value)
# if isinstance(parsed, list):
# return parsed
# return [str(parsed)]
# except json.JSONDecodeError:
# return [room_area_value]
# return [str(room_area_value)]
# # ============= DATABASE LOADERS =============
# class DatabaseLoader:
# def __init__(self):
# self.stages = []
# self.tasks = []
# self.materials = []
# self.rooms = []
# self.stage_embeddings = None
# self.task_embeddings = None
# self.material_embeddings = None
# def load_data(self, stages_file: str, tasks_file: str, materials_file: str, rooms_file: str):
# """Load JSON data files"""
# print(f"Loading {stages_file}...")
# with open(stages_file, 'r', encoding='utf-8') as f:
# self.stages = [json.loads(line) for line in f if line.strip()]
# print(f"Loading {tasks_file}...")
# with open(tasks_file, 'r', encoding='utf-8') as f:
# self.tasks = [json.loads(line) for line in f if line.strip()]
# print(f"Loading {materials_file}...")
# with open(materials_file, 'r', encoding='utf-8') as f:
# self.materials = [json.loads(line) for line in f if line.strip()]
# print(f"Loading {rooms_file}...")
# with open(rooms_file, 'r', encoding='utf-8') as f:
# self.rooms = [json.loads(line) for line in f if line.strip()]
# print(f"✅ Loaded: {len(self.stages)} stages, {len(self.tasks)} tasks, "
# f"{len(self.materials)} materials, {len(self.rooms)} rooms")
# def initialize_embeddings(self):
# """Pre-compute embeddings for fast lookup"""
# print("Computing stage embeddings...")
# stage_texts = [s['stage'] for s in self.stages]
# self.stage_embeddings = embedding_model.encode(stage_texts, show_progress_bar=True)
# print("Computing task embeddings...")
# task_texts = [t['task'] for t in self.tasks]
# self.task_embeddings = embedding_model.encode(task_texts, show_progress_bar=True)
# print("Computing material embeddings...")
# material_texts = [m['material'] for m in self.materials]
# self.material_embeddings = embedding_model.encode(material_texts, show_progress_bar=True)
# print("✅ Embeddings ready!")
# # Global DB instance
# db = DatabaseLoader()
# # ============= MATCHING FUNCTIONS =============
# def find_best_stage(llm_stage: str, threshold: float = 0.5) -> tuple:
# """Find closest matching stage from DB"""
# query_embedding = embedding_model.encode([llm_stage])
# similarities = cosine_similarity(query_embedding, db.stage_embeddings)[0]
# best_idx = np.argmax(similarities)
# best_score = similarities[best_idx]
# if best_score >= threshold:
# return db.stages[best_idx], best_score
# return None, 0.0
# def find_best_room(llm_area: str, threshold: float = 0.6) -> tuple:
# """Find closest matching room from DB"""
# llm_area_lower = llm_area.lower()
# for room in db.rooms:
# if room['name'].lower() == llm_area_lower:
# return room, 1.0
# room_texts = [r['name'] for r in db.rooms]
# query_embedding = embedding_model.encode([llm_area])
# room_embeddings = embedding_model.encode(room_texts)
# similarities = cosine_similarity(query_embedding, room_embeddings)[0]
# best_idx = np.argmax(similarities)
# best_score = similarities[best_idx]
# if best_score >= threshold:
# return db.rooms[best_idx], best_score
# return None, 0.0
# def find_tasks_for_stage(stage_id: int, llm_task: str, top_k: int = 5) -> List[tuple]:
# """Find relevant tasks for a stage matching LLM task description"""
# stage_tasks = [t for t in db.tasks if t['stageId'] == stage_id]
# if not stage_tasks:
# return []
# task_indices = [db.tasks.index(t) for t in stage_tasks]
# query_embedding = embedding_model.encode([llm_task])
# stage_task_embeddings = db.task_embeddings[task_indices]
# similarities = cosine_similarity(query_embedding, stage_task_embeddings)[0]
# top_indices = np.argsort(similarities)[-top_k:][::-1]
# results = [(stage_tasks[idx], similarities[idx]) for idx in top_indices]
# return results
# def extract_keywords(text: str) -> List[str]:
# """Extract meaningful keywords from text"""
# stop_words = {'and', 'or', 'the', 'to', 'a', 'of', 'for', 'in', 'on', 'supply', 'install'}
# words = re.findall(r'\b\w+\b', text.lower())
# return [w for w in words if w not in stop_words and len(w) > 2]
# def find_materials_for_task(task: dict, llm_material: str, unit: str, top_k: int = 10) -> List[tuple]:
# """Find materials matching task requirements"""
# task_keywords = extract_keywords(task['task'])
# llm_keywords = extract_keywords(llm_material)
# all_keywords = set(task_keywords + llm_keywords)
# compatible_materials = [
# m for m in db.materials
# if m['unit'] == unit or m['unit'] == 'unit' or m['unit'] is None
# ]
# if not compatible_materials:
# compatible_materials = db.materials
# scored_materials = []
# for material in compatible_materials:
# score = 0.0
# material_text = material['material'].lower()
# for keyword in all_keywords:
# if keyword in material_text:
# score += 2.0
# categories_str = ' '.join(material.get('categories', [])).lower()
# for keyword in all_keywords:
# if keyword in categories_str:
# score += 1.0
# material_idx = db.materials.index(material)
# query_embedding = embedding_model.encode([llm_material])
# material_embedding = db.material_embeddings[material_idx].reshape(1, -1)
# semantic_score = cosine_similarity(query_embedding, material_embedding)[0][0]
# score += semantic_score * 5.0
# if score > 0:
# scored_materials.append((material, score))
# scored_materials.sort(key=lambda x: x[1], reverse=True)
# return scored_materials[:top_k]
# # ============= VALIDATION PIPELINE =============
# def validate_scope(llm_scope: LLMScopeRequest) -> ValidatedResponse:
# """Main validation pipeline"""
# validated_areas = []
# for area_scope in llm_scope.scope_of_work:
# matched_room, room_confidence = find_best_room(area_scope.area)
# validated_stages_dict = {}
# for item in area_scope.items:
# matched_stage, stage_confidence = find_best_stage(item.stage)
# if not matched_stage:
# continue
# stage_id = matched_stage['stageId']
# if stage_id not in validated_stages_dict:
# validated_stages_dict[stage_id] = {
# 'stage_data': matched_stage,
# 'confidence': stage_confidence,
# 'tasks': []
# }
# task_matches = find_tasks_for_stage(stage_id, item.task, top_k=3)
# if not task_matches:
# continue
# best_task, task_confidence = task_matches[0]
# material_matches = find_materials_for_task(
# best_task, item.material, item.unit, top_k=5
# )
# validated_materials = [
# ValidatedMaterial(
# materialId=m['materialId'],
# name=m['name'],
# material=m['material'],
# unit=m['unit'] or 'unit',
# price=float(m['price']),
# margin=float(m['margin']),
# categories=m['categories'],
# confidence_score=round(score / 10.0, 2)
# )
# for m, score in material_matches
# ]
# validated_task = ValidatedTask(
# taskId=best_task['taskId'],
# task=best_task['task'],
# displayName=best_task['displayName'],
# unit=best_task['unit'],
# stageId=best_task['stageId'],
# roomArea=parse_room_area(best_task['roomArea']),
# confidence_score=round(task_confidence, 2),
# recommended_materials=validated_materials
# )
# validated_stages_dict[stage_id]['tasks'].append(validated_task)
# validated_stages = [
# ValidatedStage(
# stageId=stage_data['stage_data']['stageId'],
# stage=stage_data['stage_data']['stage'],
# priority=stage_data['stage_data']['priority'],
# confidence_score=round(stage_data['confidence'], 2),
# tasks=stage_data['tasks']
# )
# for stage_data in validated_stages_dict.values()
# ]
# validated_stages.sort(key=lambda x: x.priority)
# validated_area = ValidatedArea(
# roomId=matched_room['id'] if matched_room else None,
# name=matched_room['name'] if matched_room else area_scope.area,
# roomType=matched_room['roomType'] if matched_room else 'unknown',
# matched=matched_room is not None,
# confidence_score=round(room_confidence, 2),
# stages=validated_stages
# )
# validated_areas.append(validated_area)
# summary = {
# 'total_areas': len(validated_areas),
# 'total_stages': sum(len(a.stages) for a in validated_areas),
# 'total_tasks': sum(len(s.tasks) for a in validated_areas for s in a.stages),
# 'total_materials': sum(
# len(t.recommended_materials)
# for a in validated_areas
# for s in a.stages
# for t in s.tasks
# ),
# 'matched_areas': sum(1 for a in validated_areas if a.matched),
# 'avg_confidence': round(
# np.mean([a.confidence_score for a in validated_areas]), 2
# ) if validated_areas else 0.0
# }
# return ValidatedResponse(areas=validated_areas, summary=summary)
# # ============= API ENDPOINTS =============
# @app.get("/")
# async def root():
# return {
# "service": "Construction Scope Validator",
# "version": "1.0.0",
# "status": "running",
# "data_loaded": len(db.stages) > 0,
# "model_type": "trained" if os.path.exists('model.safetensors') else "base"
# }
# @app.get("/health")
# async def health():
# return {
# "status": "healthy",
# "stages_loaded": len(db.stages),
# "tasks_loaded": len(db.tasks),
# "materials_loaded": len(db.materials),
# "rooms_loaded": len(db.rooms),
# "embeddings_ready": db.stage_embeddings is not None,
# "model_type": "trained" if os.path.exists('model.safetensors') else "base"
# }
# @app.post("/validate", response_model=ValidatedResponse)
# async def validate_scope_endpoint(request: LLMScopeRequest):
# """Validate LLM-generated scope against database"""
# try:
# if not db.stages:
# raise HTTPException(status_code=500, detail="Database not loaded")
# result = validate_scope(request)
# return result
# except Exception as e:
# import traceback
# error_detail = f"Validation error: {str(e)}\n{traceback.format_exc()}"
# raise HTTPException(status_code=500, detail=error_detail)
# @app.post("/match-stage")
# async def match_stage(stage_name: str):
# """Test endpoint: match a single stage name"""
# matched_stage, confidence = find_best_stage(stage_name)
# if matched_stage:
# return {
# "input": stage_name,
# "matched": matched_stage,
# "confidence": round(confidence, 2)
# }
# return {"input": stage_name, "matched": None, "confidence": 0.0}
# @app.post("/match-room")
# async def match_room(room_name: str):
# """Test endpoint: match a single room name"""
# matched_room, confidence = find_best_room(room_name)
# if matched_room:
# return {
# "input": room_name,
# "matched": matched_room,
# "confidence": round(confidence, 2)
# }
# return {"input": room_name, "matched": None, "confidence": 0.0}
# # ============= STARTUP =============
# @app.on_event("startup")
# async def startup_event():
# """Load data and initialize embeddings on startup"""
# try:
# print("\n" + "="*60)
# print("STARTING UP...")
# print("="*60)
# db.load_data(
# stages_file='stages.json',
# tasks_file='tasks.json',
# materials_file='materials.json',
# rooms_file='rooms.json'
# )
# db.initialize_embeddings()
# print("\n" + "="*60)
# print("✅ SERVICE READY!")
# print("="*60)
# except Exception as e:
# print(f"\n❌ STARTUP ERROR: {e}")
# import traceback
# traceback.print_exc()
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=7860)
# # """
# # FastAPI Service for Construction Scope Validation
# # Deploy on Hugging Face Spaces
# # """
# # from fastapi import FastAPI, HTTPException
# # from fastapi.middleware.cors import CORSMiddleware
# # from pydantic import BaseModel, Field
# # from typing import List, Optional, Dict, Any
# # import json
# # import numpy as np
# # import os
# # from sentence_transformers import SentenceTransformer
# # from sklearn.metrics.pairwise import cosine_similarity
# # import re
# # app = FastAPI(
# # title="Construction Scope Validator API",
# # description="Validates and enriches LLM-generated construction scope with DB data",
# # version="1.0.0"
# # )
# # # CORS middleware
# # app.add_middleware(
# # CORSMiddleware,
# # allow_origins=["*"],
# # allow_credentials=True,
# # allow_methods=["*"],
# # allow_headers=["*"],
# # )
# # # Load embedding model (cached globally)
# # print("="*60)
# # print("LOADING MODEL...")
# # print("="*60)
# # try:
# # model_files = ['config.json', 'sentence_bert_config.json']
# # has_weights = os.path.exists('pytorch_model.bin') or os.path.exists('model.safetensors')
# # has_model = all(os.path.exists(f) for f in model_files) and has_weights
# # if has_model:
# # print("✓ Trained model files found in root directory")
# # print("Loading trained model...")
# # embedding_model = SentenceTransformer('./', device='cpu')
# # print("✅ Trained model loaded successfully!")
# # else:
# # print("⚠️ Trained model not found, using base model...")
# # embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
# # print("✅ Base model loaded successfully!")
# # except Exception as e:
# # print(f"❌ Error loading trained model: {e}")
# # print("Falling back to base model...")
# # embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
# # print("✅ Base model loaded successfully!")
# # print("="*60)
# # # ============= DATA MODELS =============
# # class LLMScopeItem(BaseModel):
# # stage: str
# # task: str
# # material: str
# # quantity: float
# # unit: str
# # class LLMAreaScope(BaseModel):
# # area: str
# # items: List[LLMScopeItem]
# # class LLMScopeRequest(BaseModel):
# # scope_of_work: List[LLMAreaScope]
# # class ValidatedMaterial(BaseModel):
# # materialId: int
# # name: str
# # material: str
# # unit: str
# # price: float
# # margin: float
# # categories: List[str]
# # confidence_score: float
# # class ValidatedTask(BaseModel):
# # taskId: int
# # task: str
# # displayName: str
# # unit: str
# # stageId: int
# # roomArea: List[str]
# # confidence_score: float
# # recommended_materials: List[ValidatedMaterial]
# # class ValidatedStage(BaseModel):
# # stageId: int
# # stage: str
# # priority: int
# # confidence_score: float
# # tasks: List[ValidatedTask]
# # class ValidatedArea(BaseModel):
# # roomId: Optional[int]
# # name: str
# # roomType: str
# # matched: bool
# # confidence_score: float
# # stages: List[ValidatedStage]
# # class ValidatedResponse(BaseModel):
# # areas: List[ValidatedArea]
# # summary: Dict[str, Any]
# # # ============= HELPER FUNCTION =============
# # def parse_room_area(room_area_value):
# # """
# # Parse roomArea field which might be a string, list, or None
# # Returns a proper list of strings
# # """
# # if room_area_value is None:
# # return []
# # # If it's already a list, return it
# # if isinstance(room_area_value, list):
# # return room_area_value
# # # If it's a string, try to parse it as JSON
# # if isinstance(room_area_value, str):
# # try:
# # parsed = json.loads(room_area_value)
# # if isinstance(parsed, list):
# # return parsed
# # return [str(parsed)]
# # except json.JSONDecodeError:
# # # If JSON parsing fails, treat it as a single item
# # return [room_area_value]
# # # Fallback: convert to string and wrap in list
# # return [str(room_area_value)]
# # # ============= DATABASE LOADERS =============
# # class DatabaseLoader:
# # def __init__(self):
# # self.stages = []
# # self.tasks = []
# # self.materials = []
# # self.rooms = []
# # self.stage_embeddings = None
# # self.task_embeddings = None
# # self.material_embeddings = None
# # def load_data(self, stages_file: str, tasks_file: str, materials_file: str, rooms_file: str):
# # """Load JSON data files"""
# # print(f"Loading {stages_file}...")
# # with open(stages_file, 'r', encoding='utf-8') as f:
# # self.stages = [json.loads(line) for line in f if line.strip()]
# # print(f"Loading {tasks_file}...")
# # with open(tasks_file, 'r', encoding='utf-8') as f:
# # self.tasks = [json.loads(line) for line in f if line.strip()]
# # print(f"Loading {materials_file}...")
# # with open(materials_file, 'r', encoding='utf-8') as f:
# # self.materials = [json.loads(line) for line in f if line.strip()]
# # print(f"Loading {rooms_file}...")
# # with open(rooms_file, 'r', encoding='utf-8') as f:
# # self.rooms = [json.loads(line) for line in f if line.strip()]
# # print(f"✅ Loaded: {len(self.stages)} stages, {len(self.tasks)} tasks, "
# # f"{len(self.materials)} materials, {len(self.rooms)} rooms")
# # def initialize_embeddings(self):
# # """Pre-compute embeddings for fast lookup"""
# # print("Computing stage embeddings...")
# # stage_texts = [s['stage'] for s in self.stages]
# # self.stage_embeddings = embedding_model.encode(stage_texts, show_progress_bar=True)
# # print("Computing task embeddings...")
# # task_texts = [t['task'] for t in self.tasks]
# # self.task_embeddings = embedding_model.encode(task_texts, show_progress_bar=True)
# # print("Computing material embeddings...")
# # material_texts = [m['material'] for m in self.materials]
# # self.material_embeddings = embedding_model.encode(material_texts, show_progress_bar=True)
# # print("✅ Embeddings ready!")
# # # Global DB instance
# # db = DatabaseLoader()
# # # ============= MATCHING FUNCTIONS =============
# # def find_best_stage(llm_stage: str, threshold: float = 0.5) -> tuple:
# # """Find closest matching stage from DB"""
# # query_embedding = embedding_model.encode([llm_stage])
# # similarities = cosine_similarity(query_embedding, db.stage_embeddings)[0]
# # best_idx = np.argmax(similarities)
# # best_score = similarities[best_idx]
# # if best_score >= threshold:
# # return db.stages[best_idx], best_score
# # return None, 0.0
# # def find_best_room(llm_area: str, threshold: float = 0.6) -> tuple:
# # """Find closest matching room from DB"""
# # llm_area_lower = llm_area.lower()
# # # Exact match first
# # for room in db.rooms:
# # if room['name'].lower() == llm_area_lower:
# # return room, 1.0
# # # Fuzzy match
# # room_texts = [r['name'] for r in db.rooms]
# # query_embedding = embedding_model.encode([llm_area])
# # room_embeddings = embedding_model.encode(room_texts)
# # similarities = cosine_similarity(query_embedding, room_embeddings)[0]
# # best_idx = np.argmax(similarities)
# # best_score = similarities[best_idx]
# # if best_score >= threshold:
# # return db.rooms[best_idx], best_score
# # return None, 0.0
# # def find_tasks_for_stage(stage_id: int, llm_task: str, top_k: int = 5) -> List[tuple]:
# # """Find relevant tasks for a stage matching LLM task description"""
# # stage_tasks = [t for t in db.tasks if t['stageId'] == stage_id]
# # if not stage_tasks:
# # return []
# # task_indices = [db.tasks.index(t) for t in stage_tasks]
# # query_embedding = embedding_model.encode([llm_task])
# # stage_task_embeddings = db.task_embeddings[task_indices]
# # similarities = cosine_similarity(query_embedding, stage_task_embeddings)[0]
# # top_indices = np.argsort(similarities)[-top_k:][::-1]
# # results = [(stage_tasks[idx], similarities[idx]) for idx in top_indices]
# # return results
# # def extract_keywords(text: str) -> List[str]:
# # """Extract meaningful keywords from text"""
# # stop_words = {'and', 'or', 'the', 'to', 'a', 'of', 'for', 'in', 'on', 'supply', 'install'}
# # words = re.findall(r'\b\w+\b', text.lower())
# # return [w for w in words if w not in stop_words and len(w) > 2]
# # def find_materials_for_task(task: dict, llm_material: str, unit: str, top_k: int = 10) -> List[tuple]:
# # """Find materials matching task requirements"""
# # task_keywords = extract_keywords(task['task'])
# # llm_keywords = extract_keywords(llm_material)
# # all_keywords = set(task_keywords + llm_keywords)
# # compatible_materials = [
# # m for m in db.materials
# # if m['unit'] == unit or m['unit'] == 'unit' or m['unit'] is None
# # ]
# # if not compatible_materials:
# # compatible_materials = db.materials
# # scored_materials = []
# # for material in compatible_materials:
# # score = 0.0
# # material_text = material['material'].lower()
# # for keyword in all_keywords:
# # if keyword in material_text:
# # score += 2.0
# # categories_str = ' '.join(material.get('categories', [])).lower()
# # for keyword in all_keywords:
# # if keyword in categories_str:
# # score += 1.0
# # material_idx = db.materials.index(material)
# # query_embedding = embedding_model.encode([llm_material])
# # material_embedding = db.material_embeddings[material_idx].reshape(1, -1)
# # semantic_score = cosine_similarity(query_embedding, material_embedding)[0][0]
# # score += semantic_score * 5.0
# # if score > 0:
# # scored_materials.append((material, score))
# # scored_materials.sort(key=lambda x: x[1], reverse=True)
# # return scored_materials[:top_k]
# # # ============= VALIDATION PIPELINE =============
# # def validate_scope(llm_scope: LLMScopeRequest) -> ValidatedResponse:
# # """Main validation pipeline"""
# # validated_areas = []
# # for area_scope in llm_scope.scope_of_work:
# # matched_room, room_confidence = find_best_room(area_scope.area)
# # validated_stages_dict = {}
# # for item in area_scope.items:
# # matched_stage, stage_confidence = find_best_stage(item.stage)
# # if not matched_stage:
# # continue
# # stage_id = matched_stage['stageId']
# # if stage_id not in validated_stages_dict:
# # validated_stages_dict[stage_id] = {
# # 'stage_data': matched_stage,
# # 'confidence': stage_confidence,
# # 'tasks': []
# # }
# # task_matches = find_tasks_for_stage(stage_id, item.task, top_k=3)
# # if not task_matches:
# # continue
# # best_task, task_confidence = task_matches[0]
# # material_matches = find_materials_for_task(
# # best_task, item.material, item.unit, top_k=5
# # )
# # validated_materials = [
# # ValidatedMaterial(
# # materialId=m['materialId'],
# # name=m['name'],
# # material=m['material'],
# # unit=m['unit'] or 'unit',
# # price=float(m['price']),
# # margin=float(m['margin']),
# # categories=m['categories'],
# # confidence_score=round(score / 10.0, 2)
# # )
# # for m, score in material_matches
# # ]
# # # FIX: Parse roomArea properly
# # validated_task = ValidatedTask(
# # taskId=best_task['taskId'],
# # task=best_task['task'],
# # displayName=best_task['displayName'],
# # unit=best_task['unit'],
# # stageId=best_task['stageId'],
# # roomArea=parse_room_area(best_task['roomArea']), # <-- FIXED HERE
# # confidence_score=round(task_confidence, 2),
# # recommended_materials=validated_materials
# # )
# # validated_stages_dict[stage_id]['tasks'].append(validated_task)
# # validated_stages = [
# # ValidatedStage(
# # stageId=stage_data['stage_data']['stageId'],
# # stage=stage_data['stage_data']['stage'],
# # priority=stage_data['stage_data']['priority'],
# # confidence_score=round(stage_data['confidence'], 2),
# # tasks=stage_data['tasks']
# # )
# # for stage_data in validated_stages_dict.values()
# # ]
# # validated_stages.sort(key=lambda x: x.priority)
# # validated_area = ValidatedArea(
# # roomId=matched_room['id'] if matched_room else None,
# # name=matched_room['name'] if matched_room else area_scope.area,
# # roomType=matched_room['roomType'] if matched_room else 'unknown',
# # matched=matched_room is not None,
# # confidence_score=round(room_confidence, 2),
# # stages=validated_stages
# # )
# # validated_areas.append(validated_area)
# # summary = {
# # 'total_areas': len(validated_areas),
# # 'total_stages': sum(len(a.stages) for a in validated_areas),
# # 'total_tasks': sum(len(s.tasks) for a in validated_areas for s in a.stages),
# # 'total_materials': sum(
# # len(t.recommended_materials)
# # for a in validated_areas
# # for s in a.stages
# # for t in s.tasks
# # ),
# # 'matched_areas': sum(1 for a in validated_areas if a.matched),
# # 'avg_confidence': round(
# # np.mean([a.confidence_score for a in validated_areas]), 2
# # ) if validated_areas else 0.0
# # }
# # return ValidatedResponse(areas=validated_areas, summary=summary)
# # # ============= API ENDPOINTS =============
# # @app.get("/")
# # async def root():
# # return {
# # "service": "Construction Scope Validator",
# # "version": "1.0.0",
# # "status": "running",
# # "data_loaded": len(db.stages) > 0,
# # "model_type": "trained" if os.path.exists('model.safetensors') else "base"
# # }
# # @app.get("/health")
# # async def health():
# # return {
# # "status": "healthy",
# # "stages_loaded": len(db.stages),
# # "tasks_loaded": len(db.tasks),
# # "materials_loaded": len(db.materials),
# # "rooms_loaded": len(db.rooms),
# # "embeddings_ready": db.stage_embeddings is not None,
# # "model_type": "trained" if os.path.exists('model.safetensors') else "base"
# # }
# # @app.post("/validate", response_model=ValidatedResponse)
# # async def validate_scope_endpoint(request: LLMScopeRequest):
# # """
# # Validate LLM-generated scope against database
# # Returns enriched data with matched stages, tasks, materials, and confidence scores
# # """
# # try:
# # if not db.stages:
# # raise HTTPException(status_code=500, detail="Database not loaded")
# # result = validate_scope(request)
# # return result
# # except Exception as e:
# # import traceback
# # error_detail = f"Validation error: {str(e)}\n{traceback.format_exc()}"
# # raise HTTPException(status_code=500, detail=error_detail)
# # @app.post("/match-stage")
# # async def match_stage(stage_name: str):
# # """Test endpoint: match a single stage name"""
# # matched_stage, confidence = find_best_stage(stage_name)
# # if matched_stage:
# # return {
# # "input": stage_name,
# # "matched": matched_stage,
# # "confidence": round(confidence, 2)
# # }
# # return {"input": stage_name, "matched": None, "confidence": 0.0}
# # @app.post("/match-room")
# # async def match_room(room_name: str):
# # """Test endpoint: match a single room name"""
# # matched_room, confidence = find_best_room(room_name)
# # if matched_room:
# # return {
# # "input": room_name,
# # "matched": matched_room,
# # "confidence": round(confidence, 2)
# # }
# # return {"input": room_name, "matched": None, "confidence": 0.0}
# # # ============= STARTUP =============
# # @app.on_event("startup")
# # async def startup_event():
# # """Load data and initialize embeddings on startup"""
# # try:
# # print("\n" + "="*60)
# # print("STARTING UP...")
# # print("="*60)
# # db.load_data(
# # stages_file='stages.json',
# # tasks_file='tasks.json',
# # materials_file='materials.json',
# # rooms_file='rooms.json'
# # )
# # db.initialize_embeddings()
# # print("\n" + "="*60)
# # print("✅ SERVICE READY!")
# # print("="*60)
# # except Exception as e:
# # print(f"\n❌ STARTUP ERROR: {e}")
# # print("Make sure JSON files are in the correct location")
# # import traceback
# # traceback.print_exc()
# # if __name__ == "__main__":
# # import uvicorn
# # uvicorn.run(app, host="0.0.0.0", port=7860)
# # """
# # FastAPI Service for Construction Scope Validation
# # Deploy on Hugging Face Spaces
# # """
# # from fastapi import FastAPI, HTTPException
# # from fastapi.middleware.cors import CORSMiddleware
# # from pydantic import BaseModel, Field
# # from typing import List, Optional, Dict, Any
# # import json
# # import numpy as np
# # import os
# # from sentence_transformers import SentenceTransformer
# # from sklearn.metrics.pairwise import cosine_similarity
# # import re
# # app = FastAPI(
# # title="Construction Scope Validator API",
# # description="Validates and enriches LLM-generated construction scope with DB data",
# # version="1.0.0"
# # )
# # # CORS middleware
# # app.add_middleware(
# # CORSMiddleware,
# # allow_origins=["*"],
# # allow_credentials=True,
# # allow_methods=["*"],
# # allow_headers=["*"],
# # )
# # # Load embedding model (cached globally)
# # # Try to load trained model from root, fallback to base model
# # print("="*60)
# # print("LOADING MODEL...")
# # print("="*60)
# # try:
# # # Check if trained model files exist in root
# # model_files = ['config.json', 'sentence_bert_config.json']
# # # Check for either pytorch_model.bin or model.safetensors
# # has_weights = os.path.exists('pytorch_model.bin') or os.path.exists('model.safetensors')
# # has_model = all(os.path.exists(f) for f in model_files) and has_weights
# # if has_model:
# # print("✓ Trained model files found in root directory")
# # print("Loading trained model...")
# # embedding_model = SentenceTransformer('./', device='cpu')
# # print("✅ Trained model loaded successfully!")
# # else:
# # print("⚠️ Trained model not found, using base model...")
# # embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
# # print("✅ Base model loaded successfully!")
# # except Exception as e:
# # print(f"❌ Error loading trained model: {e}")
# # print("Falling back to base model...")
# # embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
# # print("✅ Base model loaded successfully!")
# # print("="*60)
# # # ============= DATA MODELS =============
# # class LLMScopeItem(BaseModel):
# # stage: str
# # task: str
# # material: str
# # quantity: float
# # unit: str
# # class LLMAreaScope(BaseModel):
# # area: str
# # items: List[LLMScopeItem]
# # class LLMScopeRequest(BaseModel):
# # scope_of_work: List[LLMAreaScope]
# # class ValidatedMaterial(BaseModel):
# # materialId: int
# # name: str
# # material: str
# # unit: str
# # price: float
# # margin: float
# # categories: List[str]
# # confidence_score: float
# # class ValidatedTask(BaseModel):
# # taskId: int
# # task: str
# # displayName: str
# # unit: str
# # stageId: int
# # roomArea: List[str]
# # confidence_score: float
# # recommended_materials: List[ValidatedMaterial]
# # class ValidatedStage(BaseModel):
# # stageId: int
# # stage: str
# # priority: int
# # confidence_score: float
# # tasks: List[ValidatedTask]
# # class ValidatedArea(BaseModel):
# # roomId: Optional[int]
# # name: str
# # roomType: str
# # matched: bool
# # confidence_score: float
# # stages: List[ValidatedStage]
# # class ValidatedResponse(BaseModel):
# # areas: List[ValidatedArea]
# # summary: Dict[str, Any]
# # # ============= DATABASE LOADERS =============
# # class DatabaseLoader:
# # def __init__(self):
# # self.stages = []
# # self.tasks = []
# # self.materials = []
# # self.rooms = []
# # self.stage_embeddings = None
# # self.task_embeddings = None
# # self.material_embeddings = None
# # def load_data(self, stages_file: str, tasks_file: str, materials_file: str, rooms_file: str):
# # """Load JSON data files"""
# # print(f"Loading {stages_file}...")
# # with open(stages_file, 'r', encoding='utf-8') as f:
# # self.stages = [json.loads(line) for line in f if line.strip()]
# # print(f"Loading {tasks_file}...")
# # with open(tasks_file, 'r', encoding='utf-8') as f:
# # self.tasks = [json.loads(line) for line in f if line.strip()]
# # print(f"Loading {materials_file}...")
# # with open(materials_file, 'r', encoding='utf-8') as f:
# # self.materials = [json.loads(line) for line in f if line.strip()]
# # print(f"Loading {rooms_file}...")
# # with open(rooms_file, 'r', encoding='utf-8') as f:
# # self.rooms = [json.loads(line) for line in f if line.strip()]
# # print(f"✅ Loaded: {len(self.stages)} stages, {len(self.tasks)} tasks, "
# # f"{len(self.materials)} materials, {len(self.rooms)} rooms")
# # def initialize_embeddings(self):
# # """Pre-compute embeddings for fast lookup"""
# # print("Computing stage embeddings...")
# # stage_texts = [s['stage'] for s in self.stages]
# # self.stage_embeddings = embedding_model.encode(stage_texts, show_progress_bar=True)
# # print("Computing task embeddings...")
# # task_texts = [t['task'] for t in self.tasks]
# # self.task_embeddings = embedding_model.encode(task_texts, show_progress_bar=True)
# # print("Computing material embeddings...")
# # material_texts = [m['material'] for m in self.materials]
# # self.material_embeddings = embedding_model.encode(material_texts, show_progress_bar=True)
# # print("✅ Embeddings ready!")
# # # Global DB instance
# # db = DatabaseLoader()
# # # ============= MATCHING FUNCTIONS =============
# # def find_best_stage(llm_stage: str, threshold: float = 0.5) -> tuple:
# # """Find closest matching stage from DB"""
# # query_embedding = embedding_model.encode([llm_stage])
# # similarities = cosine_similarity(query_embedding, db.stage_embeddings)[0]
# # best_idx = np.argmax(similarities)
# # best_score = similarities[best_idx]
# # if best_score >= threshold:
# # return db.stages[best_idx], best_score
# # return None, 0.0
# # def find_best_room(llm_area: str, threshold: float = 0.6) -> tuple:
# # """Find closest matching room from DB"""
# # llm_area_lower = llm_area.lower()
# # # Exact match first
# # for room in db.rooms:
# # if room['name'].lower() == llm_area_lower:
# # return room, 1.0
# # # Fuzzy match
# # room_texts = [r['name'] for r in db.rooms]
# # query_embedding = embedding_model.encode([llm_area])
# # room_embeddings = embedding_model.encode(room_texts)
# # similarities = cosine_similarity(query_embedding, room_embeddings)[0]
# # best_idx = np.argmax(similarities)
# # best_score = similarities[best_idx]
# # if best_score >= threshold:
# # return db.rooms[best_idx], best_score
# # return None, 0.0
# # def find_tasks_for_stage(stage_id: int, llm_task: str, top_k: int = 5) -> List[tuple]:
# # """Find relevant tasks for a stage matching LLM task description"""
# # # Filter tasks by stage
# # stage_tasks = [t for t in db.tasks if t['stageId'] == stage_id]
# # if not stage_tasks:
# # return []
# # # Compute similarities
# # task_indices = [db.tasks.index(t) for t in stage_tasks]
# # query_embedding = embedding_model.encode([llm_task])
# # stage_task_embeddings = db.task_embeddings[task_indices]
# # similarities = cosine_similarity(query_embedding, stage_task_embeddings)[0]
# # # Get top K
# # top_indices = np.argsort(similarities)[-top_k:][::-1]
# # results = [(stage_tasks[idx], similarities[idx]) for idx in top_indices]
# # return results
# # def extract_keywords(text: str) -> List[str]:
# # """Extract meaningful keywords from text"""
# # # Remove common words
# # stop_words = {'and', 'or', 'the', 'to', 'a', 'of', 'for', 'in', 'on', 'supply', 'install'}
# # words = re.findall(r'\b\w+\b', text.lower())
# # return [w for w in words if w not in stop_words and len(w) > 2]
# # def find_materials_for_task(task: dict, llm_material: str, unit: str, top_k: int = 10) -> List[tuple]:
# # """Find materials matching task requirements"""
# # task_keywords = extract_keywords(task['task'])
# # llm_keywords = extract_keywords(llm_material)
# # all_keywords = set(task_keywords + llm_keywords)
# # # Filter by unit compatibility
# # compatible_materials = [
# # m for m in db.materials
# # if m['unit'] == unit or m['unit'] == 'unit' or m['unit'] is None
# # ]
# # if not compatible_materials:
# # # Fallback: allow any unit
# # compatible_materials = db.materials
# # # Score materials
# # scored_materials = []
# # for material in compatible_materials:
# # score = 0.0
# # material_text = material['material'].lower()
# # # Keyword matching
# # for keyword in all_keywords:
# # if keyword in material_text:
# # score += 2.0
# # # Category matching
# # categories_str = ' '.join(material.get('categories', [])).lower()
# # for keyword in all_keywords:
# # if keyword in categories_str:
# # score += 1.0
# # # Embedding similarity
# # material_idx = db.materials.index(material)
# # query_embedding = embedding_model.encode([llm_material])
# # material_embedding = db.material_embeddings[material_idx].reshape(1, -1)
# # semantic_score = cosine_similarity(query_embedding, material_embedding)[0][0]
# # score += semantic_score * 5.0
# # if score > 0:
# # scored_materials.append((material, score))
# # # Sort and return top K
# # scored_materials.sort(key=lambda x: x[1], reverse=True)
# # return scored_materials[:top_k]
# # # ============= VALIDATION PIPELINE =============
# # def validate_scope(llm_scope: LLMScopeRequest) -> ValidatedResponse:
# # """Main validation pipeline"""
# # validated_areas = []
# # for area_scope in llm_scope.scope_of_work:
# # # Match room/area
# # matched_room, room_confidence = find_best_room(area_scope.area)
# # validated_stages_dict = {}
# # for item in area_scope.items:
# # # Match stage
# # matched_stage, stage_confidence = find_best_stage(item.stage)
# # if not matched_stage:
# # continue # Skip if stage not found
# # stage_id = matched_stage['stageId']
# # # Initialize stage if new
# # if stage_id not in validated_stages_dict:
# # validated_stages_dict[stage_id] = {
# # 'stage_data': matched_stage,
# # 'confidence': stage_confidence,
# # 'tasks': []
# # }
# # # Match task
# # task_matches = find_tasks_for_stage(stage_id, item.task, top_k=3)
# # if not task_matches:
# # continue
# # best_task, task_confidence = task_matches[0]
# # # Match materials
# # material_matches = find_materials_for_task(
# # best_task,
# # item.material,
# # item.unit,
# # top_k=5
# # )
# # validated_materials = [
# # ValidatedMaterial(
# # materialId=m['materialId'],
# # name=m['name'],
# # material=m['material'],
# # unit=m['unit'] or 'unit',
# # price=float(m['price']),
# # margin=float(m['margin']),
# # categories=m['categories'],
# # confidence_score=round(score / 10.0, 2)
# # )
# # for m, score in material_matches
# # ]
# # validated_task = ValidatedTask(
# # taskId=best_task['taskId'],
# # task=best_task['task'],
# # displayName=best_task['displayName'],
# # unit=best_task['unit'],
# # stageId=best_task['stageId'],
# # roomArea=best_task['roomArea'],
# # confidence_score=round(task_confidence, 2),
# # recommended_materials=validated_materials
# # )
# # validated_stages_dict[stage_id]['tasks'].append(validated_task)
# # # Build validated stages list
# # validated_stages = [
# # ValidatedStage(
# # stageId=stage_data['stage_data']['stageId'],
# # stage=stage_data['stage_data']['stage'],
# # priority=stage_data['stage_data']['priority'],
# # confidence_score=round(stage_data['confidence'], 2),
# # tasks=stage_data['tasks']
# # )
# # for stage_data in validated_stages_dict.values()
# # ]
# # # Sort stages by priority
# # validated_stages.sort(key=lambda x: x.priority)
# # validated_area = ValidatedArea(
# # roomId=matched_room['id'] if matched_room else None,
# # name=matched_room['name'] if matched_room else area_scope.area,
# # roomType=matched_room['roomType'] if matched_room else 'unknown',
# # matched=matched_room is not None,
# # confidence_score=round(room_confidence, 2),
# # stages=validated_stages
# # )
# # validated_areas.append(validated_area)
# # # Build summary
# # summary = {
# # 'total_areas': len(validated_areas),
# # 'total_stages': sum(len(a.stages) for a in validated_areas),
# # 'total_tasks': sum(len(s.tasks) for a in validated_areas for s in a.stages),
# # 'total_materials': sum(
# # len(t.recommended_materials)
# # for a in validated_areas
# # for s in a.stages
# # for t in s.tasks
# # ),
# # 'matched_areas': sum(1 for a in validated_areas if a.matched),
# # 'avg_confidence': round(
# # np.mean([a.confidence_score for a in validated_areas]), 2
# # ) if validated_areas else 0.0
# # }
# # return ValidatedResponse(areas=validated_areas, summary=summary)
# # # ============= API ENDPOINTS =============
# # @app.get("/")
# # async def root():
# # return {
# # "service": "Construction Scope Validator",
# # "version": "1.0.0",
# # "status": "running",
# # "data_loaded": len(db.stages) > 0,
# # "model_type": "trained" if os.path.exists('pytorch_model.bin') else "base"
# # }
# # @app.get("/health")
# # async def health():
# # return {
# # "status": "healthy",
# # "stages_loaded": len(db.stages),
# # "tasks_loaded": len(db.tasks),
# # "materials_loaded": len(db.materials),
# # "rooms_loaded": len(db.rooms),
# # "embeddings_ready": db.stage_embeddings is not None,
# # "model_type": "trained" if os.path.exists('pytorch_model.bin') else "base"
# # }
# # @app.post("/validate", response_model=ValidatedResponse)
# # async def validate_scope_endpoint(request: LLMScopeRequest):
# # """
# # Validate LLM-generated scope against database
# # Returns enriched data with:
# # - Matched stages from DB
# # - Matched tasks from DB
# # - Recommended materials with pricing
# # - Confidence scores for all matches
# # """
# # try:
# # if not db.stages:
# # raise HTTPException(status_code=500, detail="Database not loaded")
# # result = validate_scope(request)
# # return result
# # except Exception as e:
# # raise HTTPException(status_code=500, detail=f"Validation error: {str(e)}")
# # @app.post("/match-stage")
# # async def match_stage(stage_name: str):
# # """Test endpoint: match a single stage name"""
# # matched_stage, confidence = find_best_stage(stage_name)
# # if matched_stage:
# # return {
# # "input": stage_name,
# # "matched": matched_stage,
# # "confidence": round(confidence, 2)
# # }
# # return {"input": stage_name, "matched": None, "confidence": 0.0}
# # @app.post("/match-room")
# # async def match_room(room_name: str):
# # """Test endpoint: match a single room name"""
# # matched_room, confidence = find_best_room(room_name)
# # if matched_room:
# # return {
# # "input": room_name,
# # "matched": matched_room,
# # "confidence": round(confidence, 2)
# # }
# # return {"input": room_name, "matched": None, "confidence": 0.0}
# # # ============= STARTUP =============
# # @app.on_event("startup")
# # async def startup_event():
# # """Load data and initialize embeddings on startup"""
# # try:
# # print("\n" + "="*60)
# # print("STARTING UP...")
# # print("="*60)
# # # Check what files are available
# # print("\nFiles in root directory:")
# # for file in os.listdir('.'):
# # print(f" - {file}")
# # # Load data
# # db.load_data(
# # stages_file='stages.json',
# # tasks_file='tasks.json',
# # materials_file='materials.json',
# # rooms_file='rooms.json'
# # )
# # db.initialize_embeddings()
# # print("\n" + "="*60)
# # print("✅ SERVICE READY!")
# # print("="*60)
# # except Exception as e:
# # print(f"\n❌ STARTUP ERROR: {e}")
# # print("Make sure JSON files are in the correct location")
# # import traceback
# # traceback.print_exc()
# # if __name__ == "__main__":
# # import uvicorn
# # uvicorn.run(app, host="0.0.0.0", port=7860)