# Gemma-3 / app.py
import os
import gc
import logging
import traceback
import time
import json
import asyncio
from datetime import datetime, timedelta
from pathlib import Path
import transformers
import torch
import gradio as gr
import pandas as pd
import PyPDF2
import io
# For asynchronous file I/O
import aiofiles
from transformers import Gemma3ForConditionalGeneration, AutoProcessor
###############################################################################
# Configuration and Global Variables
###############################################################################
# Load configuration from environment variables if available
MODEL_ID = os.getenv("MODEL_ID", "google/gemma-3-4b-it")
FIXED_TEMPERATURE = float(os.getenv("FIXED_TEMPERATURE", "0.2"))
FIXED_TOP_P = float(os.getenv("FIXED_TOP_P", "0.95"))
MAX_NEW_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "4096"))
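# The defaults above can be overridden via environment variables before launch.
# Example (values are illustrative, not tuned recommendations):
#   export MODEL_ID="google/gemma-3-12b-it"
#   export MAX_NEW_TOKENS=2048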
# Global model and processor placeholders
model = None
processor = None
# Cache for reference documents
REFERENCE_CACHE = {}
###############################################################################
# Logging Configuration
###############################################################################
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("NedByDamageScan")
###############################################################################
# System Prompt and Model Personality
###############################################################################
SYSTEM_PROMPT = """You are Ned by DamageScan, a highly specialized water remediation consultant with extensive expertise in water damage restoration, labor planning, equipment selection, and cost analysis. Your responses are precise, data-driven, and grounded in real-world engineering principles.
The input you receive is in CSV format and is structured in a specific way: the first row contains column headers and the subsequent rows contain the data for processing. The first several columns contain identical site-level information that applies to the entire property (such as Claim_ID, Site_Name, Address, City, State, Structure, Date_of_Loss, Damage_Description, etc.), while subsequent columns provide room-specific details (e.g., Room_Name, Room_Temp_F, Room_Humidity_Percent, Room_GPP, dimensions, damage specifics, etc.).
Before generating your recommendations, please first extract and consolidate the site-level data from the repeated columns, then process the room-level data from the remaining columns. Use the complete dataset to inform your calculations and recommendations for drying times, equipment needs, labor scheduling, and cost estimations. Your explanation should clearly reference both the common site information and the unique characteristics of each room. Always ensure that your analysis is accurate and adheres precisely to your internal reference knowledge base and industry standards. Explain your reasoning in clear, concise language using appropriate technical terminology."""
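# Illustrative header row matching the structure described in the prompt above
# (site-level columns first, then room-level columns; the exact set and order
# beyond the names referenced elsewhere in this file are assumptions):
# Claim_ID,Site_Name,Address,City,State,Structure,Date_of_Loss,Damage_Description,
# Room_Name,Room_Temp_F,Room_Humidity_Percent,Room_GPP,Category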
###############################################################################
# Memory Management
###############################################################################
def optimize_memory():
"""Clear caches and perform garbage collection."""
if torch.cuda.is_available():
torch.cuda.empty_cache()
gc.collect()
logger.info("Memory optimized.")
###############################################################################
# Reference Files Loader with Asynchronous I/O and Caching
###############################################################################
async def load_reference_files():
"""Load reference documents from 'CDI.json' in the main directory and from the 'reference' folder (if available)."""
global REFERENCE_CACHE
if REFERENCE_CACHE:
return REFERENCE_CACHE
reference_content = {}
# Load CDI.json from the main directory
cdi_path = Path("CDI.json")
if cdi_path.exists():
try:
async with aiofiles.open(cdi_path, "r", encoding="utf-8") as f:
content = await f.read()
reference_content[cdi_path.name] = content
logger.info(f"Loaded reference file: {cdi_path.name}")
except Exception as e:
logger.error(f"Error reading {cdi_path.name}: {str(e)}")
# Load additional reference files from the "reference" folder, if it exists
reference_dir = Path("reference")
if reference_dir.exists():
for file_path in reference_dir.glob("**/*"):
if file_path.is_file():
try:
async with aiofiles.open(file_path, "r", encoding="utf-8") as file:
content = await file.read()
reference_content[file_path.name] = content
logger.info(f"Loaded reference file: {file_path.name}")
except Exception as e:
logger.error(f"Error reading {file_path.name}: {str(e)}")
REFERENCE_CACHE = reference_content
return reference_content
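# Note: REFERENCE_CACHE is filled once per process; changes to CDI.json or the
# "reference" folder on disk are not picked up until the app restarts.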
###############################################################################
# Prompt Formatting Functions
###############################################################################
def _append_reference_docs(prompt, reference_content, max_chars=500):
    """Append reference documents to a prompt, truncating any file longer than max_chars."""
    if reference_content:
        prompt += "\nREFERENCE DOCUMENTS:\n"
        for fname, content in reference_content.items():
            if len(content) > max_chars:
                prompt += f"--- {fname} (truncated) ---\n{content[:max_chars]}...\n\n"
            else:
                prompt += f"--- {fname} ---\n{content}\n\n"
    return prompt
def format_cdi_calculation_prompt(df, reference_content):
    prompt = "Generate CDI (Certified Drying Inspector) calculations for water damage restoration.\n\n"
    prompt += "CSV DATA SUMMARY:\n"
    prompt += f"- Total Rooms: {df['Room_Name'].nunique()}\n"
    # map(str, ...) guards against numeric Claim_ID values, which would break str.join
    prompt += f"- Claim IDs: {', '.join(map(str, df['Claim_ID'].unique()))}\n\n"
    prompt += "Please analyze each room and provide detailed CDI metrics following industry standards.\n"
    return _append_reference_docs(prompt, reference_content)
def format_drying_time_prompt(df, reference_content):
    prompt = "Generate drying time calculations for water-damaged materials using the following CSV data.\n\n"
    prompt += "CSV DATA SUMMARY:\n"
    prompt += f"- Total Rooms: {df['Room_Name'].nunique()}\n"
    prompt += f"- Categories: {', '.join(map(str, df['Category'].unique()))}\n\n"
    prompt += "Please provide detailed drying time estimates for each room.\n"
    return _append_reference_docs(prompt, reference_content)
def format_equipment_needs_prompt(df, reference_content):
    prompt = "Generate equipment needs calculations for water damage restoration using the following CSV data.\n\n"
    prompt += "CSV DATA SUMMARY:\n"
    prompt += f"- Total Rooms: {df['Room_Name'].nunique()}\n"
    prompt += f"- Claim IDs: {', '.join(map(str, df['Claim_ID'].unique()))}\n\n"
    prompt += "Please provide detailed recommendations for equipment quantities and placement.\n"
    return _append_reference_docs(prompt, reference_content)
def format_labor_needs_prompt(df, reference_content):
    prompt = "Generate labor needs calculations for water damage restoration using the following CSV data.\n\n"
    prompt += "CSV DATA SUMMARY:\n"
    prompt += f"- Total Rooms: {df['Room_Name'].nunique()}\n"
    prompt += f"- Claim IDs: {', '.join(map(str, df['Claim_ID'].unique()))}\n\n"
    prompt += "Please provide detailed labor scheduling and cost estimates for each room.\n"
    return _append_reference_docs(prompt, reference_content)
def format_materials_removal_prompt(df, reference_content):
    prompt = "Generate materials removal recommendations for water damage restoration using the following CSV data.\n\n"
    prompt += "CSV DATA SUMMARY:\n"
    prompt += f"- Total Rooms: {df['Room_Name'].nunique()}\n"
    prompt += f"- Claim IDs: {', '.join(map(str, df['Claim_ID'].unique()))}\n\n"
    prompt += "Please provide detailed recommendations for material removal and associated costs.\n"
    return _append_reference_docs(prompt, reference_content)
###############################################################################
# CSV Parsing Utility
###############################################################################
def safe_parse_csv(csv_text):
"""Attempt to parse CSV text into a DataFrame; return None if parsing fails."""
try:
df = pd.read_csv(io.StringIO(csv_text))
return df
except Exception as e:
logger.warning(f"CSV parsing failed: {str(e)}")
return None
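# Example (illustrative): safe_parse_csv("Claim_ID,Room_Name\nC-100,Kitchen")
# returns a one-row DataFrame, while malformed input returns None instead of raising.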
def process_csv_file(file_path):
"""Process an uploaded CSV file and return the DataFrame."""
try:
df = pd.read_csv(file_path)
logger.info(f"Successfully loaded CSV file: {file_path}")
return df, None
except Exception as e:
error_msg = f"Error processing CSV file: {str(e)}"
logger.error(error_msg)
return None, error_msg
###############################################################################
# Model Loading and Text Generation
###############################################################################
def load_model_and_processor():
"""Load the Gemma 3 model and its processor."""
global model, processor
logger.info(f"Loading model: {MODEL_ID}")
logger.info(f"Transformers version: {transformers.__version__}")
logger.info(f"PyTorch version: {torch.__version__}")
device_info = {
"cuda_available": torch.cuda.is_available(),
"mps_available": hasattr(torch.backends, "mps") and torch.backends.mps.is_available()
}
try:
processor_local = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)
logger.info("Processor loaded successfully.")
if device_info["cuda_available"]:
device_map = "auto"
torch_dtype = torch.bfloat16
elif device_info["mps_available"]:
device_map = {"": "mps"}
torch_dtype = torch.float16
else:
device_map = {"": "cpu"}
torch_dtype = torch.float32
model_local = Gemma3ForConditionalGeneration.from_pretrained(
MODEL_ID,
torch_dtype=torch_dtype,
device_map=device_map,
trust_remote_code=True,
)
model_local.eval()
logger.info("Model loaded successfully.")
except Exception as e:
logger.error(f"Failed to load model: {str(e)}")
raise RuntimeError(f"Failed to load model: {str(e)}")
return model_local, processor_local
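# Note on the dtype choices above: bfloat16 is preferred on CUDA, float16 on
# MPS (bfloat16 support on Apple MPS is limited), and float32 on CPU, where
# half precision is typically slower and less numerically stable.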
def generate_text(model, processor, messages, max_new_tokens=MAX_NEW_TOKENS):
"""Generate text output using the Gemma 3 model."""
try:
inputs = processor.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_dict=True,
return_tensors="pt"
)
inputs = {k: v.to(model.device) for k, v in inputs.items()}
input_len = inputs["input_ids"].shape[-1]
with torch.no_grad():
outputs = model.generate(
**inputs,
temperature=FIXED_TEMPERATURE,
top_p=FIXED_TOP_P,
do_sample=True,
max_new_tokens=max_new_tokens,
repetition_penalty=1.1,
)
if outputs.shape[0] > 0:
generation = outputs[0][input_len:]
response = processor.decode(generation, skip_special_tokens=True)
else:
response = "No response generated."
return response
except Exception as e:
logger.error(f"Error in generate_text: {str(e)}")
return "An error occurred during generation. Please try again."
# Asynchronous wrapper for text generation using a thread
async def async_generate_text(model, processor, messages, max_new_tokens=MAX_NEW_TOKENS):
return await asyncio.to_thread(generate_text, model, processor, messages, max_new_tokens)
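# Running the blocking generate() call via asyncio.to_thread keeps the Gradio
# event loop responsive, so the queue can continue serving other requests
# while the model is generating.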
###############################################################################
# Calculation Generation via Prompting (Asynchronous)
###############################################################################
async def generate_calculation_from_df(df, calculation_type):
"""Generate calculation based on a DataFrame."""
reference_content = await load_reference_files()
if calculation_type == "CDI Calculations":
prompt = format_cdi_calculation_prompt(df, reference_content)
elif calculation_type == "Drying Time":
prompt = format_drying_time_prompt(df, reference_content)
elif calculation_type == "Equipment Needs":
prompt = format_equipment_needs_prompt(df, reference_content)
elif calculation_type == "Labor Needs":
prompt = format_labor_needs_prompt(df, reference_content)
elif calculation_type == "Materials Removal":
prompt = format_materials_removal_prompt(df, reference_content)
else:
prompt = f"Please generate {calculation_type} based on the provided CSV data."
messages = [
{"role": "system", "content": [{"type": "text", "text": SYSTEM_PROMPT}]},
{"role": "user", "content": [{"type": "text", "text": prompt}]}
]
response = await async_generate_text(model, processor, messages, max_new_tokens=MAX_NEW_TOKENS)
optimize_memory()
    # Return the generated response along with a preview (first 5 rows) of the
    # CSV data; a pandas DataFrame is rendered directly by gr.Dataframe.
    preview_table = df.head(5)
    return response, preview_table
async def generate_calculation_from_text(csv_text, calculation_type):
"""Generate calculation from CSV text."""
df = safe_parse_csv(csv_text)
if df is not None:
return await generate_calculation_from_df(df, calculation_type)
else:
        reference_content = await load_reference_files()
        prompt = f"CSV Data:\n{csv_text}\n\nPlease generate {calculation_type} for water damage restoration."
        # Include reference documents so the raw-text path matches the DataFrame path.
        prompt = _append_reference_docs(prompt, reference_content)
messages = [
{"role": "system", "content": [{"type": "text", "text": SYSTEM_PROMPT}]},
{"role": "user", "content": [{"type": "text", "text": prompt}]}
]
response = await async_generate_text(model, processor, messages, max_new_tokens=MAX_NEW_TOKENS)
optimize_memory()
return response, None
###############################################################################
# Gradio Interface Handler Functions (Asynchronous)
###############################################################################
async def handle_file_upload(file_path, calculation_type):
"""Process an uploaded CSV file and generate calculations."""
if not file_path or file_path.strip() == "":
return "Please upload a CSV file.", None
df, error = process_csv_file(file_path)
if error:
return f"Error: {error}", None
if df is not None:
logger.info(f"CSV Preview:\n{df.head(5).to_string()}")
return await generate_calculation_from_df(df, calculation_type)
else:
return "Failed to process the CSV file. Please check the format and try again.", None
async def handle_text_input(csv_text, calculation_type):
"""Process CSV text input and generate calculations."""
if not csv_text or csv_text.strip() == "":
return "Please enter CSV data.", None
return await generate_calculation_from_text(csv_text, calculation_type)
async def combined_input_handler(csv_text, file, calculation_type):
"""Handle either file upload or text input, with file taking precedence.
Returns a tuple: (generated_answer, CSV preview data)
"""
if file and isinstance(file, str) and file.strip() != "":
return await handle_file_upload(file, calculation_type)
elif csv_text and csv_text.strip() != "":
return await handle_text_input(csv_text, calculation_type)
else:
return "Please either upload a CSV file or enter CSV data.", None
###############################################################################
# Gradio Interface
###############################################################################
def build_interface():
with gr.Blocks(css="footer {visibility: hidden}") as demo:
gr.Markdown("<h1 align='center'>Ned by DamageScan: Water Remediation Calculator</h1>")
gr.Markdown(
"Enter your CSV data below or upload a CSV file and select the type of calculation to generate recommendations. "
"Each row of the CSV represents a room's water damage details."
)
with gr.Row():
with gr.Column(scale=2):
with gr.Tab("CSV Text Input"):
csv_input = gr.TextArea(
label="CSV Data",
placeholder="Paste your CSV data here...\n\nExample:\nClaim_ID,Site_Name,Address,...",
lines=10
)
with gr.Tab("CSV File Upload"):
file_input = gr.File(
label="Upload CSV File",
file_types=[".csv"],
type="filepath"
)
calc_type = gr.Dropdown(
label="Calculation Type",
choices=[
"CDI Calculations",
"Drying Time",
"Equipment Needs",
"Labor Needs",
"Materials Removal"
],
value="CDI Calculations"
)
generate_button = gr.Button("Generate Answer", variant="primary")
status = gr.Textbox(label="Status", value="Ready", interactive=False)
with gr.Column(scale=1):
preview_table = gr.Dataframe(label="CSV Preview", interactive=False)
output_box = gr.TextArea(
label="Generated Answer",
lines=15,
interactive=False
)
# Event handlers with asynchronous functions
generate_button.click(
lambda: "Processing...",
outputs=status
).then(
combined_input_handler,
inputs=[csv_input, file_input, calc_type],
outputs=[output_box, preview_table]
).then(
lambda: "Ready",
outputs=status
)
return demo
###############################################################################
# Main Application Entry Point
###############################################################################
if __name__ == "__main__":
try:
logger.info("Starting Ned by DamageScan: Water Remediation Calculator")
model, processor = load_model_and_processor()
demo = build_interface()
# Note: share=True is not supported on Hugging Face Spaces; it will be ignored there.
demo.queue().launch(share=True, debug=True, show_error=True)
except Exception as e:
logger.error(f"Application startup failed: {str(e)}")
traceback.print_exc()