Spaces:
Sleeping
Sleeping
import logging | |
from typing import Dict, List | |
import torch | |
import torch.nn as nn | |
import numpy as np | |
import pickle | |
from sklearn.preprocessing import StandardScaler | |
from datetime import datetime, timedelta | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# LSTM Model Definition (must match training script) | |
class DelayPredictor(nn.Module): | |
def __init__(self, input_size, hidden_size, num_layers): | |
super(DelayPredictor, self).__init__() | |
self.hidden_size = hidden_size | |
self.num_layers = num_layers | |
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True) | |
self.attention = nn.Linear(hidden_size, 1) | |
self.fc = nn.Linear(hidden_size, 1) | |
self.sigmoid = nn.Sigmoid() | |
def forward(self, x): | |
lstm_out, _ = self.lstm(x) | |
attn_weights = torch.softmax(self.attention(lstm_out).squeeze(-1), dim=1) | |
context = torch.bmm(attn_weights.unsqueeze(1), lstm_out).squeeze(1) | |
out = self.fc(context) | |
return self.sigmoid(out) * 100 | |
# Load model and scaler | |
try: | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
model = DelayPredictor(input_size=7, hidden_size=64, num_layers=2).to(device) | |
model.load_state_dict(torch.load("models/delay_model.pth", map_location=device)) | |
model.eval() | |
with open("models/scaler.pkl", "rb") as f: | |
scaler = pickle.load(f) | |
logger.info("LSTM model and scaler loaded successfully") | |
except Exception as e: | |
logger.error(f"Failed to load model or scaler: {str(e)}") | |
model = None | |
scaler = None | |
def get_weather_condition(score: int) -> str: | |
"""Map weather impact score (0-100) to descriptive weather condition.""" | |
if score <= 10: | |
return "Sunny" | |
elif score <= 30: | |
return "Partly Cloudy" | |
elif score <= 50: | |
return "Cloudy" | |
elif score <= 70: | |
return "Light Rain" | |
elif score <= 85: | |
return "Heavy Rain" | |
else: | |
return "Severe Storm" | |
def call_ai_model_for_insights(input_data: Dict, delay_risk: float) -> List[str]: | |
""" | |
Generate detailed hardcoded insights based on input data and delay risk. | |
Includes a 2-week risk alert if weather_forecast_date is within 14 days. | |
Returns 3-5 prioritized, phase/task-specific insights. | |
""" | |
logger.info("Generating detailed hardcoded AI insights") | |
phase = input_data.get("phase", "") | |
task = input_data.get("task", "") | |
current_progress = input_data.get("current_progress", 0) | |
expected_duration = input_data.get("task_expected_duration", 0) | |
actual_duration = input_data.get("task_actual_duration", 0) | |
workforce_gap = input_data.get("workforce_gap", 0) | |
skill_level = input_data.get("workforce_skill_level", "").lower() | |
shift_hours = input_data.get("workforce_shift_hours", 0) | |
weather_score = input_data.get("weather_impact_score", 0) | |
weather_condition = input_data.get("weather_condition", get_weather_condition(weather_score)) | |
project_location = input_data.get("project_location", "Unknown") | |
weather_forecast_date = input_data.get("weather_forecast_date", "") | |
# Initialize insights with scores for prioritization | |
insights = [] | |
# Helper function to add insight with priority score | |
def add_insight(message: str, priority: float): | |
insights.append((message, priority)) | |
# 2-week risk alert | |
try: | |
forecast_date = datetime.strptime(weather_forecast_date, "%Y-%m-%d") | |
current_date = datetime(2025, 5, 26) # Fixed date as per system | |
two_weeks_later = current_date + timedelta(days=14) | |
if current_date <= forecast_date <= two_weeks_later: | |
if delay_risk > 75 or weather_score > 75: | |
add_insight( | |
f"⚠️ Critical 2-Week Risk Alert: High risk of delay for {phase}: {task} in {project_location} by {weather_forecast_date} due to {'severe weather' if weather_score > 75 else 'high delay probability'}. Implement contingency plans immediately.", | |
1.2 | |
) | |
elif delay_risk > 50 or weather_score > 50: | |
add_insight( | |
f"⚠️ 2-Week Risk Alert: Moderate risk of delay for {phase}: {task} in {project_location} by {weather_forecast_date}. Monitor closely and prepare mitigation measures.", | |
1.1 | |
) | |
except ValueError: | |
logger.warning("Invalid weather_forecast_date format; skipping 2-week risk alert") | |
# Delay risk-based insights | |
if delay_risk > 75: | |
add_insight(f"Urgent: High delay risk detected for {phase}: {task} in {project_location}. Take immediate action.", 1.0) | |
elif delay_risk > 50: | |
add_insight(f"Monitor {phase}: {task} closely in {project_location} to prevent delays.", 0.9) | |
elif delay_risk > 25: | |
add_insight(f"Maintain steady progress for {phase}: {task} in {project_location}.", 0.7) | |
else: | |
add_insight(f"Optimize resources for {phase}: {task} in {project_location} to maintain schedule.", 0.6) | |
# Weather-specific insights | |
if weather_score > 85: | |
add_insight(f"Critical: Severe storm forecast in {project_location} for {phase}: {task}. Consider halting outdoor activities.", 1.1) | |
elif weather_score > 70: | |
add_insight(f"Reschedule outdoor tasks for {phase}: {task} in {project_location} due to heavy rain forecast.", 1.0) | |
elif weather_score > 50: | |
add_insight(f"Shift to indoor or weather-resistant tasks for {phase}: {task} in {project_location} due to light rain.", 0.9) | |
elif weather_score > 30: | |
add_insight(f"Monitor cloudy conditions in {project_location} for {phase}: {task} to avoid unexpected delays.", 0.7) | |
else: | |
add_insight(f"Proceed with {phase}: {task} in {project_location} under favorable weather conditions.", 0.6) | |
# Phase/task-specific insights | |
task_specific = { | |
"Planning": { | |
"Define Scope": f"Ensure stakeholder alignment for Planning: Define Scope in {project_location}, considering weather impacts.", | |
"Resource Allocation": f"Secure budget and resources early for Planning: Resource Allocation in {project_location}.", | |
"Permit Acquisition": f"Expedite permits for Planning: Permit Acquisition in {project_location} to avoid weather-related delays." | |
}, | |
"Design": { | |
"Architectural Drafting": f"Engage architects early for Design: Architectural Drafting in {project_location}, accounting for weather.", | |
"Engineering Analysis": f"Hire additional engineers for Design: Engineering Analysis in {project_location} to meet deadlines.", | |
"Design Review": f"Schedule thorough reviews for Design: Design Review in {project_location}, considering forecast." | |
}, | |
"Construction": { | |
"Foundation Work": f"Optimize material delivery for Construction: Foundation Work in {project_location}, avoiding {weather_condition.lower()}.", | |
"Structural Build": f"Ensure equipment availability for Construction: Structural Build in {project_location} under {weather_condition.lower()}.", | |
"Utility Installation": f"Coordinate subcontractors for Construction: Utility Installation in {project_location}, monitoring weather." | |
} | |
} | |
if phase in task_specific and task in task_specific[phase]: | |
add_insight(task_specific[phase][task], 0.8) | |
# Workforce-based insights | |
if workforce_gap > 30: | |
add_insight(f"Urgently hire subcontractors in {project_location} to address {workforce_gap}% workforce shortage.", 1.0) | |
elif workforce_gap > 15: | |
add_insight(f"Recruit additional workers in {project_location} to reduce {workforce_gap}% workforce gap.", 0.9) | |
elif workforce_gap > 5: | |
add_insight(f"Consider temporary staff in {project_location} to address minor workforce gap.", 0.7) | |
if skill_level == "low": | |
add_insight(f"Provide training in {project_location} to improve low skill levels for {phase}: {task}.", 0.9) | |
elif skill_level == "medium" and delay_risk > 50: | |
add_insight(f"Upskill workforce in {project_location} for efficiency in {phase}: {task}.", 0.8) | |
elif skill_level == "high" and delay_risk < 25: | |
add_insight(f"Leverage high skill levels in {project_location} to maintain {phase}: {task} progress.", 0.6) | |
if shift_hours < 6: | |
add_insight(f"Extend shift hours beyond {shift_hours} in {project_location} to meet {phase}: {task} deadlines.", 0.9) | |
elif shift_hours < 8 and delay_risk > 50: | |
add_insight(f"Increase shift hours to 8 in {project_location} for {phase}: {task}.", 0.8) | |
elif shift_hours > 10: | |
add_insight(f"Balance shifts in {project_location} to prevent burnout during {phase}: {task}.", 0.7) | |
# Progress and duration-based insights | |
if expected_duration > 0 and actual_duration > expected_duration: | |
overrun_pct = ((actual_duration - expected_duration) / expected_duration) * 100 | |
if overrun_pct > 20: | |
add_insight(f"Address significant duration overrun ({overrun_pct:.1f}%) for {phase}: {task} in {project_location}.", 1.0) | |
elif overrun_pct > 10: | |
add_insight(f"Review scheduling to address {overrun_pct:.1f}% overrun in {phase}: {task} in {project_location}.", 0.8) | |
if expected_duration > 0: | |
expected_progress = min((actual_duration / expected_duration) * 100, 100) | |
if current_progress < expected_progress * 0.8: | |
add_insight(f"Accelerate task progress for {phase}: {task} in {project_location} to align with schedule.", 0.9) | |
elif current_progress < 50 and delay_risk > 50: | |
add_insight(f"Increase resources to boost {current_progress}% progress in {phase}: {task} in {project_location}.", 0.8) | |
# Edge cases | |
if workforce_gap >= 90: | |
add_insight(f"Critical: Halt non-essential tasks in {project_location} until workforce gap for {phase}: {task} is resolved.", 1.1) | |
if current_progress == 0 and delay_risk > 50: | |
add_insight(f"Initiate {phase}: {task} in {project_location} immediately to avoid further delays.", 1.0) | |
if expected_duration == 0 or actual_duration == 0: | |
add_insight(f"Provide accurate duration estimates for {phase}: {task} in {project_location} for reliable predictions.", 0.7) | |
if weather_score > 50 and phase == "Construction": | |
add_insight(f"Prepare contingency plans for {phase}: {task} in {project_location} due to adverse weather forecast.", 0.95) | |
# Sort insights by priority and select top 3-5 | |
insights.sort(key=lambda x: x[1], reverse=True) | |
selected_insights = [insight[0] for insight in insights[:5]] | |
logger.info(f"Generated insights: {selected_insights}") | |
return selected_insights or [f"No significant delay factors detected for {phase}: {task} in {project_location}."] | |
def predict_delay(input_data: Dict) -> Dict: | |
""" | |
Predict delay probability using LSTM model. | |
Inputs: Project task data (phase, progress, duration, workforce, weather). | |
Outputs: Delay probability, AI insights, high-risk phases, weather condition. | |
""" | |
logger.info("Starting LSTM delay prediction") | |
if model is None or scaler is None: | |
logger.error("Model or scaler not loaded; falling back to default") | |
return { | |
"project": input_data.get("project_name", "Unnamed Project"), | |
"phase": input_data.get("phase", ""), | |
"task": input_data.get("task", ""), | |
"delay_probability": 50.0, | |
"ai_insights": "Model unavailable; please check deployment.", | |
"high_risk_phases": [], | |
"weather_condition": "Unknown" | |
} | |
phase = input_data.get("phase", "") | |
task = input_data.get("task", "") | |
weather_condition = input_data.get("weather_condition", get_weather_condition(input_data.get("weather_impact_score", 0))) | |
# Prepare input features | |
phase_mapping = {"Planning": 0, "Design": 1, "Construction": 2} | |
skill_mapping = {"Low": 0, "Medium": 1, "High": 2} | |
try: | |
features = np.array([[ | |
input_data.get("current_progress", 0), | |
input_data.get("task_expected_duration", 0), | |
input_data.get("task_actual_duration", 0), | |
input_data.get("workforce_gap", 0), | |
input_data.get("weather_impact_score", 0), | |
skill_mapping.get(input_data.get("workforce_skill_level", "Medium"), 1), | |
phase_mapping.get(phase, 0) | |
]]) | |
except KeyError as e: | |
logger.error(f"Invalid input data: {str(e)}") | |
return { | |
"project": input_data.get("project_name", "Unnamed Project"), | |
"phase": phase, | |
"task": task, | |
"delay_probability": 50.0, | |
"ai_insights": f"Invalid input: {str(e)}", | |
"high_risk_phases": [], | |
"weather_condition": weather_condition | |
} | |
# Standardize and reshape | |
try: | |
features_scaled = scaler.transform(features) | |
features_tensor = torch.tensor(features_scaled.reshape(1, 1, -1), dtype=torch.float32).to(device) | |
except Exception as e: | |
logger.error(f"Feature preprocessing failed: {str(e)}") | |
return { | |
"project": input_data.get("project_name", "Unnamed Project"), | |
"phase": phase, | |
"task": task, | |
"delay_probability": 50.0, | |
"ai_insights": f"Preprocessing error: {str(e)}", | |
"high_risk_phases": [], | |
"weather_condition": weather_condition | |
} | |
# Predict | |
with torch.no_grad(): | |
delay_risk = model(features_tensor).cpu().numpy().item() | |
delay_risk = round(max(0, min(delay_risk, 100)), 1) | |
# Generate high_risk_phases | |
task_options = { | |
"Planning": ["Define Scope", "Resource Allocation", "Permit Acquisition"], | |
"Design": ["Architectural Drafting", "Engineering Analysis", "Design Review"], | |
"Construction": ["Foundation Work", "Structural Build", "Utility Installation"] | |
} | |
high_risk_phases = [] | |
if phase in task_options: | |
for t in task_options[phase]: | |
task_risk = delay_risk | |
if t != task: | |
task_risk = min(max(task_risk + (hash(t) % 10 - 5), 0), 100) | |
high_risk_phases.append({ | |
"phase": phase, | |
"task": t, | |
"risk": round(task_risk, 1) | |
}) | |
# Generate insights | |
insights = call_ai_model_for_insights(input_data, delay_risk) | |
logger.info(f"Prediction completed: Delay risk = {delay_risk:.1f}%") | |
return { | |
"project": input_data.get("project_name", "Unnamed Project"), | |
"phase": phase, | |
"task": task, | |
"delay_probability": delay_risk, | |
"ai_insights": "; ".join(insights) if insights else "No significant delay factors detected.", | |
"high_risk_phases": high_risk_phases, | |
"weather_condition": weather_condition | |
} |