# Source: OMCP / src / usage_utils.py
# Hosting-page header captured with the file:
#   "cameroncameron's picture", "Upload 4 files", commit 6782585 (verified)
import pandas as pd
import numpy as np
# Column layout of the summary frame returned by analyze_and_fill_usage.
_USAGE_SUMMARY_COLUMNS = [
    "BuildingName",
    "CommodityCode",
    "MissingType",
    "SequenceMissingRanges",
    "RandomMissingDates",
    "TotalMonths",
    "RandomMissingMonths",
    "SequenceMissingMonths",
    "RandomMissingRatio",
    "SequenceMissingRatio",
    "FillStartDate",
    "PostTotalMonths",
    "PostRandomMissingMonths",
    "PostSequenceMissingMonths",
    "PostRandomMissingRatio",
    "PostSequenceMissingRatio",
    "NotGonnaUse",
]


def _missing_month_runs(observed_months, first_month, gap_threshold):
    """Return (total_months, runs) for the month grid first_month..last observed month.

    ``runs`` is a chronological list of Series, each holding the month-start
    timestamps of one group of missing months. Two missing months belong to
    the same run when they are at most ``gap_threshold`` days apart.
    """
    full_idx = pd.date_range(first_month, observed_months.max(), freq="MS")
    missing = pd.Series(full_idx.difference(pd.DatetimeIndex(observed_months)))
    # The first missing month has no predecessor; 9999 forces it to open a run.
    day_gaps = missing.diff().dt.days.fillna(9999)
    run_ids = (day_gaps > gap_threshold).cumsum()
    return len(full_idx), [run for _, run in missing.groupby(run_ids)]


def _fill_start_from_runs(sequence_runs, cutoff_dt, min_gap_months):
    """Return the month after the earliest long gap starting at/after the cutoff.

    A gap qualifies when it starts on or after ``cutoff_dt`` and spans at
    least ``min_gap_months`` calendar months (first to last missing month,
    inclusive). Without a qualifying gap the cutoff itself is returned.
    """
    for run in sequence_runs:  # runs are already in chronological order
        first, last = run.min(), run.max()
        span = (last.year - first.year) * 12 + (last.month - first.month) + 1
        if first >= cutoff_dt and span >= min_gap_months:
            return last + pd.offsets.MonthBegin(1)
    return cutoff_dt


def analyze_and_fill_usage(
    usage_data: pd.DataFrame,
    gap_threshold: int = 62,
    fill_earliest_cutoff: str = "2013-01-01",
    min_fill_gap_months: int = 9,
    rolling_window_size: int = 4,
    rolling_centered: bool = True,
    sequence_fill_method: str = "mean",
    post_missing_threshold: float = 0.1,
) -> pd.DataFrame:
    """Classify missing months per (building, commodity) and derive fill metadata.

    No time-series filling happens here; only summary statistics are returned.

    Parameters
    ----------
    usage_data : frame with ``StartDate``, ``CommodityCode`` and either
        ``BuildingName`` or ``Building Name``. Rows whose ``StartDate`` is
        missing are ignored because they cannot be placed on the month grid.
    gap_threshold : maximum distance in days between two missing months for
        them to be counted as one run.
    fill_earliest_cutoff : default ``FillStartDate``; only gaps starting on or
        after this date can move the fill start.
    min_fill_gap_months : minimum span (in months) of a sequence gap that
        moves ``FillStartDate`` to the month after the gap.
    rolling_window_size, rolling_centered, sequence_fill_method : accepted for
        interface compatibility; not used by this function.
    post_missing_threshold : a group is flagged ``NotGonnaUse = 1`` when its
        random or sequence missing ratio after ``FillStartDate`` exceeds this.

    Returns
    -------
    One row per (BuildingName, CommodityCode) with the missing-month
    statistics, ``FillStartDate``, the ``Post*`` statistics measured from
    ``FillStartDate`` onward (NaN when the group has no rows on or after it)
    and ``NotGonnaUse``. An input without usable rows yields an empty frame
    that still carries every column.

    Raises
    ------
    ValueError
        If neither ``BuildingName`` nor ``Building Name`` is present.
    """
    df = usage_data.copy()
    df["StartDate"] = pd.to_datetime(df["StartDate"])

    # Accept both spellings of the building column; work on "BuildingName".
    if "BuildingName" not in df.columns:
        if "Building Name" not in df.columns:
            raise ValueError("Neither 'BuildingName' nor 'Building Name' column found in usage data")
        df["BuildingName"] = df["Building Name"]

    df = df.dropna(subset=["StartDate"])
    cutoff_dt = pd.to_datetime(fill_earliest_cutoff)
    nan = float("nan")

    records = []
    for (bld, util), grp in df.groupby(["BuildingName", "CommodityCode"]):
        # Month-start timestamp of every reading in this group.
        months = grp["StartDate"].dt.to_period("M").dt.to_timestamp()

        # ---- whole-history statistics --------------------------------
        total_months, runs = _missing_month_runs(months, months.min(), gap_threshold)
        sequence_runs = [run for run in runs if len(run) > 1]
        random_runs = [run for run in runs if len(run) == 1]

        seq_ranges = [
            f"From {run.min().strftime('%Y-%m')} to {run.max().strftime('%Y-%m')}"
            for run in sequence_runs
        ]
        rand_dates = [run.iloc[0].strftime("%Y-%m") for run in random_runs]
        seq_months = sum(len(run) for run in sequence_runs)
        rand_months = len(random_runs)

        if not runs:
            mtype = "No Missing"
        elif sequence_runs and random_runs:
            mtype = "Both"
        elif random_runs:
            mtype = "Random"
        else:
            mtype = "Sequence"

        # ---- fill start ----------------------------------------------
        fill_start = _fill_start_from_runs(sequence_runs, cutoff_dt, min_fill_gap_months)

        # ---- statistics from the fill start onward -------------------
        after = months[grp["StartDate"] >= fill_start]
        if after.empty:
            post_total = post_rand = post_seq = nan
            post_rand_ratio = post_seq_ratio = nan
        else:
            post_total, post_runs = _missing_month_runs(
                after, fill_start.replace(day=1).normalize(), gap_threshold
            )
            post_seq = sum(len(run) for run in post_runs if len(run) > 1)
            post_rand = sum(1 for run in post_runs if len(run) == 1)
            post_rand_ratio = post_rand / post_total if post_total else 0
            post_seq_ratio = post_seq / post_total if post_total else 0

        records.append(
            {
                "BuildingName": bld,
                "CommodityCode": util,
                "MissingType": mtype,
                "SequenceMissingRanges": "; ".join(seq_ranges),
                "RandomMissingDates": "; ".join(rand_dates),
                "TotalMonths": total_months,
                "RandomMissingMonths": rand_months,
                "SequenceMissingMonths": seq_months,
                "RandomMissingRatio": rand_months / total_months if total_months else 0,
                "SequenceMissingRatio": seq_months / total_months if total_months else 0,
                "FillStartDate": fill_start,
                "PostTotalMonths": post_total,
                "PostRandomMissingMonths": post_rand,
                "PostSequenceMissingMonths": post_seq,
                "PostRandomMissingRatio": post_rand_ratio,
                "PostSequenceMissingRatio": post_seq_ratio,
            }
        )

    if not records:
        # Nothing to summarise: keep the schema so callers can still select columns.
        return pd.DataFrame(columns=_USAGE_SUMMARY_COLUMNS)

    summary_df = pd.DataFrame(records)
    # NaN ratios compare as False, so groups without post data are not flagged.
    summary_df["NotGonnaUse"] = (
        (summary_df["PostRandomMissingRatio"] > post_missing_threshold)
        | (summary_df["PostSequenceMissingRatio"] > post_missing_threshold)
    ).astype(int)
    return summary_df
def fill_usage_with_sequence_check_strict_mean(
    usage_data: pd.DataFrame,
    summary_df: pd.DataFrame,
    method: str = "mean",
    force: bool = False,
    fill_earliest_cutoff: str = "1900-01-01",
) -> pd.DataFrame:
    """Build a gap-free monthly usage series per (building, commodity).

    For every row of ``summary_df`` the readings on or after ``FillStartDate``
    are summed per month, placed on a continuous month grid ending at the last
    observed month, and months without a valid reading are filled with the
    mean (or median) of the observed monthly totals.

    Parameters
    ----------
    usage_data : raw usage table with ``StartDate``, ``CommodityCode``,
        ``Use`` and either ``BuildingName`` or ``Building Name``.
    summary_df : result of ``analyze_and_fill_usage``; the columns
        ``BuildingName``, ``CommodityCode``, ``FillStartDate`` and
        ``NotGonnaUse`` are read.
    method : ``'median'`` selects the median; any other value uses the mean.
    force : when True, ``NotGonnaUse`` is ignored, a missing ``FillStartDate``
        is replaced by ``fill_earliest_cutoff``, and a group with no rows on
        or after its start falls back to its earliest available month.
    fill_earliest_cutoff : fallback start used only when ``force`` is True and
        ``FillStartDate`` is missing.

    Returns
    -------
    Frame with columns ``['BuildingName', 'CommodityCode', 'Date',
    'FilledUse']`` (empty, with the same columns, when nothing qualifies).

    Raises
    ------
    ValueError
        If neither ``BuildingName`` nor ``Building Name`` is present.
    """
    out_columns = ["BuildingName", "CommodityCode", "Date", "FilledUse"]

    df = usage_data.copy()
    # Collapse every reading onto the first day of its month.
    df["StartDate"] = pd.to_datetime(df["StartDate"]).dt.to_period("M").dt.to_timestamp()

    # Accept both spellings of the building column; work on "BuildingName".
    if "BuildingName" not in df.columns:
        if "Building Name" not in df.columns:
            raise ValueError("Neither 'BuildingName' nor 'Building Name' column found in usage data")
        df["BuildingName"] = df["Building Name"]

    # Rows without a start date cannot be assigned to a month.
    df = df.dropna(subset=["StartDate"])

    frames = []
    for _, row in summary_df.iterrows():
        bld, util = row["BuildingName"], row["CommodityCode"]
        # Coerce so a summary re-loaded from text still compares as a date.
        fsd = pd.to_datetime(row["FillStartDate"])
        start_missing = pd.isna(fsd)

        # Gates 1 & 2 (strict mode): skip flagged groups and groups without a start.
        if not force and (row["NotGonnaUse"] == 1 or start_missing):
            continue
        if start_missing:
            fsd = pd.to_datetime(fill_earliest_cutoff)

        combo = df[(df["BuildingName"] == bld) & (df["CommodityCode"] == util)]
        grp = combo[combo["StartDate"] >= fsd]

        # Gate 3: nothing on or after the start.
        if grp.empty:
            if not force or combo.empty:
                continue
            # Forced mode: fall back to the earliest month of this group.
            grp = combo
            fsd = grp["StartDate"].min()

        all_months = pd.date_range(fsd, grp["StartDate"].max(), freq="MS")
        # min_count=1 keeps a month whose readings are all NaN as missing
        # instead of reporting it as zero usage.
        monthly = grp.groupby("StartDate")["Use"].sum(min_count=1).reindex(all_months)
        observed = monthly.dropna()
        fill_val = observed.median() if method == "median" else observed.mean()

        frames.append(
            pd.DataFrame(
                {
                    "BuildingName": bld,
                    "CommodityCode": util,
                    "Date": monthly.index,
                    "FilledUse": monthly.fillna(fill_val).to_numpy(),
                },
                columns=out_columns,
            )
        )

    if not frames:
        return pd.DataFrame(columns=out_columns)
    return pd.concat(frames, ignore_index=True)
# ===============================================================
# LLM-based Weather Variable Selection Functions
# ===============================================================
# Rule table: building type -> weather variables suggested for that type.
# An empty list means no weather variable is suggested for the type.
weather_influence_map = {
    "Office": [
        "temp_mean",
        "temp_std",
        "CDD_sum",
        "clouds_all_mean",
    ],
    "Instructional": [
        "temp_mean",
        "temp_std",
        "CDD_sum",
        "humidity_mean",
    ],
    "Residential": [
        "temp_mean",
        "HDD_sum",
        "CDD_sum",
        "humidity_mean",
    ],
    "Health": [
        "temp_mean",
        "HDD_sum",
        "CDD_sum",
        "humidity_mean",
    ],
    "Library": [
        "temp_mean",
        "temp_std",
        "humidity_mean",
        "clouds_all_mean",
    ],
    "Dining": [
        "temp_mean",
        "rain_sum",
        "CDD_sum",
    ],
    "Recreation": [
        "temp_mean",
        "rain_sum",
        "wind_speed_mean",
    ],
    "Assembly or Theater": [
        "temp_mean",
        "wind_speed_mean",
        "rain_sum",
    ],
    "Affiliate": [
        "temp_mean",
        "CDD_sum",
        "rain_sum",
    ],
    "Parking Structure": [],
    "Infrastructure": [],
    "Container": [],
    "Mixed": [
        "temp_mean",
        "CDD_sum",
        "humidity_mean",
    ],
    "Other": [
        "temp_mean",
        "CDD_sum",
    ],
}
def infer_building_type(text: str) -> str:
    """Guess a building type from a free-text description.

    The description is searched, case-insensitively, for the keywords of each
    type in the order listed below; the first type with a keyword contained
    in the text wins. ``"Mixed"`` is returned when no keyword occurs.
    """
    keyword_table = (
        ("Instructional", ("Teaching", "Classroom", "School", "Lecture Hall", "Academic", "Education")),
        ("Residential", ("Residential", "Apartment", "Dormitory", "Housing", "Student")),
        ("Office", ("Office", "Office Building", "Administrative", "Admin")),
        ("Health", ("Hospital", "Medical", "Clinic", "Health")),
        ("Dining", ("Canteen", "Restaurant", "Dining", "Food", "Kitchen")),
        ("Recreation", ("Fitness", "Sports", "Entertainment", "Recreation", "Gym")),
        ("Library", ("Library",)),
        ("Assembly or Theater", ("Theater", "Auditorium", "Performance", "Assembly")),
        ("Affiliate", ("Affiliate",)),
    )

    haystack = text.lower()
    for building_type, keywords in keyword_table:
        for keyword in keywords:
            # Substring match, so "Admin" also hits "administration".
            if keyword.lower() in haystack:
                return building_type
    return "Mixed"
def construct_weather_prompt_static(
    user_description: str, detected_type: str, suggested_vars: list,
    gross_area: float, avg_space_sqft: float, workpoint_count: int, floor_count: int
) -> str:
    """Assemble the static weather-variable selection prompt.

    The prompt contains a fixed weather-scenario note, the building
    description and inferred type, the structural figures (areas rendered
    with thousands separators and no decimals), the rule-based suggested
    variables joined by ", ", the candidate variable list and the task list.
    The text starts and ends with a newline.
    """
    scenario_lines = [
        "",
        "Weather-Scenario (z-score offset, user will pick one):",
        "• Normal → 0 σ offset (historical monthly mean)",
        "• Hot → +1 σ on temp_mean & CDD_sum, −0.5 σ on humidity_mean",
        "• ColdWet → −1 σ on temp_mean, +1 σ on HDD_sum & humidity_mean",
        "• WindyCloudy → +1 σ on wind_speed_mean & clouds_all_mean",
        "LLM only needs to recommend variables; offsets are applied downstream.",
        "",
    ]
    # Leading/trailing empty entries give the note its surrounding newlines.
    scenario_note = "\n".join(scenario_lines)

    suggested_text = ", ".join(suggested_vars)
    prompt_lines = [
        "",
        "You are an expert in building energy modeling and changepoint detection.",
        scenario_note,
        "Building Description (current use only):",
        f"{user_description}",
        f"Inferred Operation Type: {detected_type}",
        "Structural Information:",
        f"- Building Gross Area: {gross_area:,.0f} sq ft",
        f"- Average Space Size: {avg_space_sqft:,.0f} sq ft",
        f"- Total Workpoint Count: {workpoint_count}",
        f"- Floor Count: {floor_count}",
        f"Rule-based Suggested Variables: {suggested_text}",
        "Candidate Weather Variables:",
        "temp_mean · temp_std · HDD_sum · CDD_sum · rain_sum · clouds_all_mean · humidity_mean · wind_speed_mean",
        "Tasks:",
        "1. Select 3–5 variables that best capture energy-use behaviour under the current configuration.",
        "2. Briefly justify each choice.",
        "3. Return a markdown table with columns: Selected Variable | Reason.",
        "",
    ]
    return "\n".join(prompt_lines)
def chat_with_ollama(
    messages: list,
    model: str = "mistral",
    url: str = "http://localhost:11434/api/chat",
    timeout: float = 30,
) -> str:
    """Send a non-streaming chat request to an Ollama server and return the reply text.

    Parameters
    ----------
    messages : chat messages, posted as the ``messages`` field of the request.
    model : model name, posted as the ``model`` field.
    url : chat endpoint; defaults to the local Ollama address.
    timeout : request timeout in seconds passed to ``requests.post``.

    Returns
    -------
    The value found at ``["message"]["content"]`` in the JSON response.

    Raises
    ------
    RuntimeError
        ``"Ollama API error: ..."`` when the HTTP request fails or returns an
        error status; ``"Invalid response format from Ollama API"`` when the
        body is not JSON of the expected shape. The original exception is
        attached as ``__cause__``.
    """
    import requests  # imported lazily, as in the original implementation

    try:
        response = requests.post(
            url,
            json={"model": model, "messages": messages, "stream": False},
            timeout=timeout,
        )
        response.raise_for_status()
        payload = response.json()
    except requests.exceptions.RequestException as exc:
        raise RuntimeError(f"Ollama API error: {str(exc)}") from exc
    except ValueError as exc:
        # NOTE(review): some requests versions surface undecodable JSON as a
        # plain ValueError rather than a RequestException - confirm for the
        # pinned version.
        raise RuntimeError("Invalid response format from Ollama API") from exc

    try:
        return payload["message"]["content"]
    except (KeyError, TypeError) as exc:
        # TypeError covers a payload (or "message") that is not a mapping.
        raise RuntimeError("Invalid response format from Ollama API") from exc
def parse_selected_vars(md: str) -> list:
    """Extract the first-column values from a markdown table.

    Every line that (ignoring surrounding whitespace) starts with ``|`` is
    treated as a table row. The first cell is returned unless it is empty, is
    the ``Selected Variable`` header, or is a separator cell made only of
    dashes and colons (``---``, ``-----``, ``:---:`` ...). Backticks and
    asterisks wrapped around a cell are removed, so ``**temp_mean**`` yields
    ``temp_mean``.

    Returns the values in document order.
    """
    selected = []
    for raw_row in md.strip().splitlines():
        row = raw_row.strip()
        if not row.startswith("|"):
            continue
        cells = row.split("|")
        if len(cells) < 2:
            continue
        # cells[0] is the empty text before the leading pipe.
        cell = cells[1].strip().strip("*`").strip()
        if not cell.strip("-: "):
            # Empty cell or a header/body separator of any width or alignment.
            continue
        if cell.lower() == "selected variable":
            continue
        selected.append(cell)
    return selected