# tele_bot / models.py
# PRC142004's picture
# Upload 21 files
# 945d0f8 verified
"""
Database Models & Schemas for Climate-Resilient Agricultural Platform
======================================================================
Handles user profiles, alert configurations, and historical data
"""
from pydantic import BaseModel, Field
from typing import Optional, List, Dict
from datetime import datetime
from enum import Enum
# ════════════════════════════════════════════════════════════════════════════
# ENUMS
# ════════════════════════════════════════════════════════════════════════════
class AlertSeverity(str, Enum):
    """Severity label attached to an alert.

    The ``str`` mix-in means each member is also a plain string equal to
    its value, so members compare equal to their literal names.
    """

    # Declared from the most urgent level down to purely informational.
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    INFO = "INFO"
class CropType(str, Enum):
    """Crops a farmer profile can be registered with.

    Each member is also a plain string equal to its value (``str`` mix-in).
    """

    RICE = "RICE"
    WHEAT = "WHEAT"
    CORN = "CORN"
    COTTON = "COTTON"
    SUGARCANE = "SUGARCANE"
    POTATO = "POTATO"
    TOMATO = "TOMATO"
    ONION = "ONION"
    SOYBEAN = "SOYBEAN"
    BANANA = "BANANA"
    CITRUS = "CITRUS"

    # Catch-all for crops that have no dedicated member above.
    OTHER = "OTHER"
class Season(str, Enum):
    """Growing season recorded on a farmer profile.

    Each member is also a plain string equal to its value (``str`` mix-in).
    """

    RABI = "RABI"
    KHARIF = "KHARIF"
    ANNUAL = "ANNUAL"
class IrrigationType(str, Enum):
    """Irrigation method recorded on a farmer profile.

    Each member is also a plain string equal to its value (``str`` mix-in).
    """

    DRIP = "DRIP"
    FLOOD = "FLOOD"
    SPRINKLER = "SPRINKLER"
    FURROW = "FURROW"
# ════════════════════════════════════════════════════════════════════════════
# FARMER PROFILE
# ════════════════════════════════════════════════════════════════════════════
def _default_alert_preferences() -> Dict[str, bool]:
    """Return a fresh mapping with every alert category enabled."""
    return {
        "weather": True,
        "pest": True,
        "irrigation": True,
        "sustainability": True,
        "climate_warning": True,
    }


def _default_threshold_settings() -> Dict[str, float]:
    """Return a fresh mapping of the default alert thresholds.

    The values are written as floats on purpose: pydantic does not validate
    (and therefore does not coerce) default values unless default validation
    is explicitly enabled, so integer literals here would leave ``int``
    values in a field declared as ``Dict[str, float]``, while any
    user-supplied mapping would be coerced to ``float``.
    """
    # NOTE(review): units are not stated in this file (temperatures are
    # presumably degrees Celsius) - confirm against the code that reads them.
    return {
        "temp_high": 40.0,
        "temp_low": 5.0,
        "wind_speed": 30.0,
        "wind_alert": 15.0,
        "rain_threshold": 10.0,
    }


class FarmerProfile(BaseModel):
    """Profile of a farmer: identity, contact details, farm and alert setup.

    Fields without a default are required when the model is constructed.
    """

    # --- Identity and contact -------------------------------------------
    farmer_id: str
    name: str
    phone: str
    telegram_chat_id: Optional[str] = None
    email: Optional[str] = None

    # --- Location ---------------------------------------------------------
    village: str
    latitude: float
    longitude: float

    # --- Farm description -------------------------------------------------
    crop_type: CropType
    soil_type: str
    # NOTE(review): the unit of motor_capacity is not stated in this file.
    motor_capacity: float = 10.0
    irrigation_type: IrrigationType = IrrigationType.DRIP
    season: Season = Season.ANNUAL
    # NOTE(review): the unit of crop_age is not stated in this file.
    crop_age: int = 0
    farm_size_hectares: float = 1.0

    # --- Alert configuration ----------------------------------------------
    # Per-category on/off switches; a factory gives each profile its own dict.
    alert_preferences: Dict[str, bool] = Field(
        default_factory=_default_alert_preferences
    )
    contact_preference: str = "telegram"
    # Numeric trigger levels; a factory gives each profile its own dict.
    threshold_settings: Dict[str, float] = Field(
        default_factory=_default_threshold_settings
    )

    # datetime.now is called without a timezone, so this is a naive
    # local-time timestamp taken when the instance is created.
    registered_at: datetime = Field(default_factory=datetime.now)
# ════════════════════════════════════════════════════════════════════════════
# ALERT MODELS
# ════════════════════════════════════════════════════════════════════════════
class WeatherAlert(BaseModel):
    """Weather alert payload: reported conditions, advised action, severity.

    All fields are required.
    """

    alert_type: str

    # Reported conditions (units are not stated in this file).
    temperature: float
    humidity: float
    wind_speed: float
    condition: str

    # What to do about it, how serious it is, and how much notice is given.
    action: str
    severity: AlertSeverity
    lead_time: str
class PestAlert(BaseModel):
    """Pest alert payload: the pest, its risk level and the suggested response.

    Only ``product_suggestion`` is optional.
    """

    pest_name: str
    risk_level: AlertSeverity
    # NOTE(review): the scale of confidence (0-1 vs 0-100) is not stated here.
    confidence: float

    recommended_action: str
    spray_timing: str
    product_suggestion: Optional[str] = None
class IrrigationAlert(BaseModel):
    """Irrigation recommendation: how much water, for how long, and when.

    All fields are required.
    """

    # Units are carried in the field names themselves.
    water_needed_m3_per_sqm: float
    irrigation_duration_minutes: int

    best_time_window: str
    urgency: AlertSeverity
    reason: str
class SustainabilityAlert(BaseModel):
    """Sustainability tip: a categorised action with its impact and benefit.

    All fields are required free-form strings.
    """

    tip_category: str
    action: str
    impact: str
    implementation_time: str
    expected_benefit: str
class ClimateResilienceScore(BaseModel):
    """Climate-resilience summary: overall score, risk components and advice.

    All fields are required. The ranges noted below are documentation only;
    no validators in this model enforce them.
    """

    # Overall score, 0-100.
    total_score: float

    # Individual risk components.
    weather_risk: float
    pest_risk: float
    water_risk: float

    # One of GREEN, YELLOW, RED.
    resilience_level: str

    critical_factors: List[str]
    improvement_suggestions: List[str]
class FarmAlert(BaseModel):
    """A single alert addressed to one farmer, with its delivery status.

    Everything except ``sent`` and ``sent_via`` is required.
    """

    # --- Identification ---------------------------------------------------
    alert_id: str
    farmer_id: str
    timestamp: datetime

    # --- Classification ---------------------------------------------------
    severity: AlertSeverity
    # One of: weather, pest, irrigation, sustainability, climate.
    category: str

    # --- Content ------------------------------------------------------------
    title: str
    message: str
    detailed_data: Dict
    action_required: bool
    suggested_action: str

    # --- Delivery -----------------------------------------------------------
    sent: bool = False
    # Channels used for delivery, e.g. ["telegram", "sms", "email"].
    sent_via: Optional[List[str]] = None
class AlertHistory(BaseModel):
    """Alert counters for one farmer.

    Only ``last_alert_time`` is optional.
    """

    farmer_id: str
    total_alerts_today: int

    # Counts broken down by severity.
    critical_alerts: int
    high_alerts: int
    medium_alerts: int

    actions_taken: int
    last_alert_time: Optional[datetime] = None
# ════════════════════════════════════════════════════════════════════════════
# DECISION ENGINE
# ════════════════════════════════════════════════════════════════════════════
class DashboardResponse(BaseModel):
    """Dashboard payload bundling a farmer's profile, conditions and alerts.

    Every field is required except ``generated_at`` and
    ``next_critical_window``.
    """

    # --- Who the dashboard is for -----------------------------------------
    farmer_id: str
    farmer_profile: FarmerProfile

    # --- Current situation --------------------------------------------------
    current_conditions: Dict
    climate_resilience_score: ClimateResilienceScore
    active_alerts: List[FarmAlert]

    # --- Forecasts and recommendations --------------------------------------
    weather_forecast: List[Dict]
    pest_predictions: List[PestAlert]
    irrigation_recommendations: IrrigationAlert
    sustainability_tips: List[SustainabilityAlert]

    # --- Metadata -----------------------------------------------------------
    # datetime.now is called without a timezone, so this is a naive
    # local-time timestamp taken when the instance is created.
    generated_at: datetime = Field(default_factory=datetime.now)
    next_critical_window: Optional[str] = None
# ════════════════════════════════════════════════════════════════════════════
# API SCHEMAS
# ════════════════════════════════════════════════════════════════════════════
class HealthResponse(BaseModel):
    """Health-check payload: overall status plus a per-service status map.

    All fields are required.
    """

    status: str
    timestamp: datetime
    # Maps a service name to its status string.
    services: Dict[str, str]
    message: str
class WebSocketAlert(BaseModel):
    """Message envelope for WebSocket traffic concerning one farmer.

    ``timestamp`` is filled in automatically when omitted.
    """

    # One of: "alert", "update", "subscribe", "unsubscribe".
    event_type: str
    farmer_id: str
    data: Dict

    # datetime.now is called without a timezone, so this is a naive
    # local-time timestamp taken when the instance is created.
    timestamp: datetime = Field(default_factory=datetime.now)