"""
SensorDataLoader: load and filter sensors_wide.csv for Stage 1 (Farquhar model).

Uses only on-site sensor data from the sensor data directory.
"""

from pathlib import Path
from typing import Optional

import pandas as pd

# Columns returned by SensorDataLoader.get_stage1_columns(), i.e. the set that
# load() requests (and requires to be present) when no explicit column list
# is passed.
STAGE1_COLUMNS = [
    "Air1_PAR_ref",
    "Air1_leafTemperature_ref",
    "Air1_airTemperature_ref",
    "Air1_CO2_ref",
    "Air1_VPD_ref",
    "Air1_airHumidity_ref",
]

# Additional column names that are not part of the default request.
# NOTE(review): nothing in this module reads this list; presumably it is
# consumed by callers that pass an explicit `columns` argument -- confirm.
STAGE1_OPTIONAL = ["Air1_NDVI_ref", "Air1_PRI_ref", "Air1_rNDVI_ref", "Air1_RENDVI_ref"]

# Timestamp column name used by load() when the caller does not supply one.
DEFAULT_TIMESTAMP_COL = "time"


class SensorDataLoader:
    """Load sensors_wide.csv and provide Stage 1 columns and a daytime filter.

    Attributes:
        data_path: CSV file read by ``load``.
        metadata_path: Path stored at construction time; none of the methods
            defined in this class read it.
    """

    # Column that load() rescales after reading, and the factor applied to it.
    # NOTE(review): the code gives no rationale for 0.7 -- presumably a sensor
    # calibration or an ambient-to-intercellular CO2 conversion for the
    # Farquhar model. Confirm before changing; downstream results depend on it.
    CO2_COLUMN = "Air1_CO2_ref"
    CO2_SCALE = 0.7

    def __init__(
        self,
        data_path: Optional[Path] = None,
        metadata_path: Optional[Path] = None,
    ):
        """Resolve the data and metadata paths, falling back to project settings.

        Args:
            data_path: CSV to load. When falsy, ``settings.SENSORS_WIDE_PATH``
                is used, or ``settings.SENSORS_WIDE_SAMPLE_PATH`` if the former
                does not exist on disk and the sample does.
            metadata_path: Metadata file. When falsy,
                ``settings.SENSORS_WIDE_METADATA_PATH`` is used.
        """
        # Imported here rather than at module level, as in the original code.
        from config import settings

        default_data = settings.SENSORS_WIDE_PATH
        if not default_data.exists() and settings.SENSORS_WIDE_SAMPLE_PATH.exists():
            default_data = settings.SENSORS_WIDE_SAMPLE_PATH
        self.data_path = data_path or default_data
        self.metadata_path = metadata_path or settings.SENSORS_WIDE_METADATA_PATH

    def get_stage1_columns(self) -> list[str]:
        """Return list of column names required for Stage 1 (Farquhar + CWSI).

        A fresh list is returned on every call, so callers may mutate it
        without affecting the module-level constant.
        """
        return list(STAGE1_COLUMNS)

    def load(
        self,
        columns: Optional[list[str]] = None,
        timestamp_col: Optional[str] = None,
    ) -> pd.DataFrame:
        """Load the requested columns of the sensor CSV, sorted by timestamp.

        Args:
            columns: Column names to load. ``None`` means the Stage 1 columns
                from ``get_stage1_columns()``. The timestamp column is always
                loaded in addition, whether or not it is listed here.
            timestamp_col: Name of the timestamp column. A falsy value selects
                ``DEFAULT_TIMESTAMP_COL``.

        Returns:
            A DataFrame with the timestamp column parsed to timezone-aware UTC
            datetimes, rows sorted ascending by timestamp, and a fresh
            0..n-1 index. If ``Air1_CO2_ref`` was loaded, its values are
            multiplied by ``CO2_SCALE``.

        Raises:
            ValueError: If any requested column (including the timestamp
                column) is absent from the CSV header.
        """
        ts_col = timestamp_col or DEFAULT_TIMESTAMP_COL
        requested = columns if columns is not None else self.get_stage1_columns()
        # Timestamp first, then every other requested column exactly once
        # relative to the timestamp (a caller-listed timestamp is not doubled).
        use_cols = [ts_col] + [c for c in requested if c != ts_col]

        # Read only the header first: this validates the request before the
        # whole file is parsed and lets the error report the columns that are
        # really in the file, not just the requested ones that were found.
        available = list(pd.read_csv(self.data_path, nrows=0).columns)
        present = set(available)
        missing = [c for c in use_cols if c not in present]
        if missing:
            raise ValueError(
                f"Sensor data missing required columns: {missing}. "
                f"Available: {available[:20]}{'...' if len(available) > 20 else ''}"
            )

        wanted = set(use_cols)
        df = pd.read_csv(self.data_path, usecols=lambda c: c in wanted)

        df[ts_col] = pd.to_datetime(df[ts_col], utc=True)
        df = df.sort_values(ts_col).reset_index(drop=True)

        if self.CO2_COLUMN in df.columns:
            df[self.CO2_COLUMN] = df[self.CO2_COLUMN] * self.CO2_SCALE
        return df

    def filter_daytime(
        self,
        df: pd.DataFrame,
        par_threshold: float = 50.0,
        par_column: str = "Air1_PAR_ref",
    ) -> pd.DataFrame:
        """Keep only rows where PAR > par_threshold (daytime, umol m-2 s-1).

        Args:
            df: Frame to filter; it is never modified.
            par_threshold: Rows are kept only when PAR is strictly greater
                than this value. Rows whose PAR is NaN are dropped because the
                comparison evaluates to False for them.
            par_column: Name of the PAR column.

        Returns:
            A new DataFrame. When ``par_column`` is not present, no filtering
            is possible and an unfiltered copy is returned, so the result is
            always independent of the input frame.
        """
        if par_column not in df.columns:
            return df.copy()
        return df.loc[df[par_column] > par_threshold].copy()