"""
CTF Calculations Module

This module contains the CTFCalculator class for calculating Conduction Transfer Function (CTF)
coefficients for HVAC load calculations using the implicit Finite Difference Method, enhanced
with sol-air temperature calculations accounting for solar radiation, longwave radiation, and
a dynamic outdoor heat transfer coefficient.

Developed by: Dr Majed Abuseif, Deakin University
© 2025
"""
import numpy as np | |
import scipy.sparse as sparse | |
import scipy.sparse.linalg as sparse_linalg | |
import hashlib | |
import logging | |
import threading | |
from typing import List, Dict, Any, NamedTuple | |
import streamlit as st | |
from enum import Enum | |
# Logging setup. basicConfig() runs at import time, so importing this module
# installs a root handler (INFO level, timestamped format) if none exists yet.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class ComponentType(Enum):
    """Closed set of building-envelope component kinds.

    Each member's value is the human-readable label for that kind.
    """

    # Opaque components.
    WALL = "Wall"
    ROOF = "Roof"
    FLOOR = "Floor"
    # Glazed components.
    WINDOW = "Window"
    SKYLIGHT = "Skylight"
class CTFCoefficients(NamedTuple):
    """Immutable bundle of the four CTF coefficient series.

    Field order is X, Y, Z, F; each field is a list of floats.
    """

    # Exterior temperature coefficients
    X: List[float]
    # Cross coefficients
    Y: List[float]
    # Interior temperature coefficients
    Z: List[float]
    # Flux history coefficients
    F: List[float]
class CTFCalculator:
    """Calculate and cache CTF coefficients for building components.

    Every method is a static or class method; the class keeps no per-instance
    state. Computed coefficients are memoised in ``_ctf_cache`` under a hash
    of the construction (name, adiabatic flag, layer materials/thicknesses).

    NOTE(review): the cache key covers the construction only, while the values
    computed by ``calculate_ctf_coefficients`` also depend on the component
    type, the weather inputs and the radiant load. Confirm that reusing a
    cached result across those inputs is intended.
    """

    # Construction hash -> CTFCoefficients computed for that construction.
    _ctf_cache = {}
    # Serialises every read and write of ``_ctf_cache``.
    _cache_lock = threading.Lock()

    @staticmethod
    def calculate_sky_temperature(T_out: float, dew_point: float, total_sky_cover: float = 0.5) -> float:
        """Calculate sky temperature using a cloud-cover-dependent emissivity.

        Sky emissivity is ``0.9 + 0.04 * total_sky_cover`` and the sky
        temperature is the outdoor absolute temperature scaled by the fourth
        root of that emissivity.

        Args:
            T_out (float): Outdoor dry-bulb temperature (°C).
            dew_point (float): Dew point temperature (°C).
            total_sky_cover (float): Sky cover fraction (0 to 1). Not clamped.

        Returns:
            float: Sky temperature (°C). When ``dew_point`` exceeds ``T_out``
            the dew point itself is returned instead.

        References:
            ASHRAE Handbook—Fundamentals (2021), Chapter 26.
        """
        # NOTE(review): the dew point only takes effect when it is above T_out;
        # if a lower bound (max(T_sky, dew_point)) was intended, this differs.
        epsilon_sky = 0.9 + 0.04 * total_sky_cover
        T_sky = (epsilon_sky * (T_out + 273.15) ** 4) ** 0.25 - 273.15
        return T_sky if dew_point <= T_out else dew_point

    @staticmethod
    def calculate_h_o(wind_speed: float, surface_type: "ComponentType") -> float:
        """Calculate the outdoor heat transfer coefficient from wind speed and surface type.

        Args:
            wind_speed (float): Wind speed (m/s); clamped to [0, 20] before use.
            surface_type (ComponentType): Type of surface (WALL, ROOF, FLOOR, WINDOW, SKYLIGHT).

        Returns:
            float: Outdoor heat transfer coefficient (W/m²·K), never below 5.0.

        References:
            ASHRAE Handbook—Fundamentals (2021), Chapter 26.
        """
        wind_speed = max(min(wind_speed, 20.0), 0.0)  # Bound for stability
        if surface_type in (ComponentType.WALL, ComponentType.FLOOR):
            h_o = 8.3 + 4.0 * (wind_speed ** 0.6)  # ASHRAE Ch. 26
        elif surface_type == ComponentType.ROOF:
            h_o = 9.1 + 2.8 * wind_speed  # ASHRAE Ch. 26
        else:  # WINDOW, SKYLIGHT
            # Imported only on the branch that needs it: keeps the delayed
            # import (circular dependency) off the wall/roof/floor paths.
            from app.m_c_data import DEFAULT_WINDOW_PROPERTIES
            h_o = DEFAULT_WINDOW_PROPERTIES["h_o"]
        return max(h_o, 5.0)  # Minimum for stability

    @staticmethod
    def calculate_sol_air_temperature(T_out: float, I_t: float, absorptivity: float, emissivity: float,
                                      h_o: float, dew_point: float, total_sky_cover: float = 0.5) -> float:
        """Calculate sol-air temperature for a surface.

        The outdoor temperature is raised by absorbed solar radiation and
        lowered by net longwave exchange with the sky, both divided by ``h_o``.

        Args:
            T_out (float): Outdoor dry-bulb temperature (°C).
            I_t (float): Total incident solar radiation (W/m²).
            absorptivity (float): Surface absorptivity.
            emissivity (float): Surface emissivity.
            h_o (float): Outdoor heat transfer coefficient (W/m²·K). Must be non-zero.
            dew_point (float): Dew point temperature (°C).
            total_sky_cover (float): Sky cover fraction (0 to 1).

        Returns:
            float: Sol-air temperature (°C).

        Raises:
            ZeroDivisionError: If ``h_o`` is zero.

        References:
            ASHRAE Handbook—Fundamentals (2021), Chapter 26.
        """
        sigma = 5.67e-8  # Stefan-Boltzmann constant (W/m²·K⁴)
        T_sky = CTFCalculator.calculate_sky_temperature(T_out, dew_point, total_sky_cover)
        solar_gain = absorptivity * I_t
        longwave_loss = emissivity * sigma * ((T_out + 273.15) ** 4 - (T_sky + 273.15) ** 4)
        return T_out + (solar_gain - longwave_loss) / h_o

    @staticmethod
    def _hash_construction(construction: Dict[str, Any]) -> str:
        """Generate a hash identifying a construction by name, adiabatic flag and layers.

        The fields are hashed as the ``repr`` of a structured tuple so that
        field boundaries stay unambiguous (plain concatenation would map
        ``"Brick1" + 0.1`` and ``"Brick" + 10.1`` to the same key).

        Args:
            construction: Dictionary containing construction properties (name, layers, adiabatic).

        Returns:
            str: SHA-256 hex digest of the construction properties.
        """
        # NOTE(review): material thermal properties are not part of the key, so
        # editing a material does not invalidate cached coefficients.
        layers = tuple(
            (layer.get('material', ''), layer.get('thickness', 0.0))
            for layer in construction.get('layers', [])
        )
        key = repr((construction.get('name', ''), construction.get('adiabatic', False), layers))
        return hashlib.sha256(key.encode()).hexdigest()

    @staticmethod
    def _zero_coefficients() -> "CTFCoefficients":
        """Return the single-term, all-zero coefficient set used by every skip path."""
        return CTFCoefficients(X=[0.0], Y=[0.0], Z=[0.0], F=[0.0])

    @classmethod
    def _get_material_properties(cls, material_name: str) -> Dict[str, Any]:
        """Retrieve material properties from the Streamlit session state.

        The library materials are searched first, then the project materials.

        Args:
            material_name: Name of the material.

        Returns:
            Dict containing name, conductivity, density, specific_heat,
            absorptivity and emissivity. Missing thermal properties default to
            0.0, absorptivity to 0.6 and emissivity to 0.9.
            Returns an empty dict if the material is not found or any error
            occurs while reading it.
        """
        try:
            materials = st.session_state.project_data.get('materials', {})
            material = materials.get('library', {}).get(material_name, materials.get('project', {}).get(material_name))
            if not material:
                logger.error(f"Material '{material_name}' not found in library or project materials.")
                return {}
            # Extract required properties
            thermal_props = material.get('thermal_properties', {})
            return {
                'name': material_name,
                'conductivity': thermal_props.get('conductivity', 0.0),
                'density': thermal_props.get('density', 0.0),
                'specific_heat': thermal_props.get('specific_heat', 0.0),
                'absorptivity': material.get('absorptivity', 0.6),
                'emissivity': material.get('emissivity', 0.9),
            }
        except Exception as e:
            # Deliberate best-effort boundary: callers treat {} as "skip layer".
            logger.error(f"Error retrieving material '{material_name}' properties: {str(e)}")
            return {}

    @staticmethod
    def _discretize_layers(thicknesses: List[float], alpha: List[float], dt: float,
                           nodes_per_layer: int = 3, min_fourier: float = 0.33) -> List[Any]:
        """Choose the node count and node spacing of every layer.

        Each layer starts with ``nodes_per_layer`` nodes. If its Fourier number
        ``alpha * dt / dx**2`` is below ``min_fourier`` the layer alone is
        refined until the Fourier number reaches that threshold; other layers
        are unaffected.

        Args:
            thicknesses: Layer thicknesses (m), all positive.
            alpha: Layer thermal diffusivities (m²/s), one per layer.
            dt: Time step (s).
            nodes_per_layer: Default node count per layer.
            min_fourier: Fourier-number threshold that triggers refinement.

        Returns:
            One ``(node_count, spacing, initial_fourier)`` tuple per layer, where
            ``node_count * spacing`` equals the layer thickness and
            ``initial_fourier`` is the value before any refinement.
        """
        plan = []
        for thickness, diffusivity in zip(thicknesses, alpha):
            nodes = nodes_per_layer
            spacing = thickness / nodes
            initial_fourier = diffusivity * dt / spacing ** 2
            # A non-positive Fourier number has no finite refinement target.
            if 0 < initial_fourier < min_fourier:
                target = np.sqrt(diffusivity * dt / min_fourier)
                nodes = max(2, int(np.ceil(thickness / target)))
                spacing = thickness / nodes
            plan.append((nodes, spacing, initial_fourier))
        return plan

    @staticmethod
    def _assemble_system(k: List[float], rho: List[float], c: List[float], dx: List[float],
                         layer_nodes: List[int], dt: float, R_out: float, R_in: float,
                         T_sol_air: float, radiant_load: float = 0.0):
        """Build the implicit finite-difference system ``A @ T_new = b + T_old``.

        Nodes are numbered from the outdoor surface (index 0) to the indoor
        surface (last index). The first node of the first layer is coupled to
        ``T_sol_air`` through ``R_out``; the last node of the last layer is
        coupled to a unit indoor temperature through ``R_in`` and receives the
        radiant load. The last node of a layer and the first node of the
        following layer both use a mixed-property interface equation.

        Args:
            k, rho, c, dx: Per-layer conductivity, density, specific heat and
                node spacing; all values must be positive.
            layer_nodes: Node count of every layer (each at least 2).
            dt: Time step (s).
            R_out: Outdoor surface resistance (m²·K/W).
            R_in: Indoor surface resistance (m²·K/W).
            T_sol_air: Sol-air temperature driving the outdoor node (°C).
            radiant_load: Load added to the indoor node's source term.

        Returns:
            ``(A, b)`` with ``A`` a CSR matrix and ``b`` the source vector.

        Raises:
            ValueError: If there are no layers or a layer has fewer than 2 nodes.
        """
        if not layer_nodes or min(layer_nodes) < 2:
            raise ValueError("Every layer needs at least 2 nodes.")

        total_nodes = sum(layer_nodes)
        A = sparse.lil_matrix((total_nodes, total_nodes))
        b = np.zeros(total_nodes)
        last_layer = len(layer_nodes) - 1

        idx = 0  # global node index
        for layer_idx, n_nodes in enumerate(layer_nodes):
            k_i, rho_i, c_i, dx_i = k[layer_idx], rho[layer_idx], c[layer_idx], dx[layer_idx]
            for node_j in range(n_nodes):
                first_in_layer = node_j == 0
                last_in_layer = node_j == n_nodes - 1

                if first_in_layer and layer_idx == 0:  # Outdoor surface node
                    A[idx, idx] = 1.0 + 2 * dt * k_i / (dx_i * rho_i * c_i * dx_i) + dt / (rho_i * c_i * dx_i * R_out)
                    A[idx, idx + 1] = -2 * dt * k_i / (dx_i * rho_i * c_i * dx_i)
                    b[idx] = dt / (rho_i * c_i * dx_i * R_out) * T_sol_air  # Use sol-air temperature
                elif last_in_layer and layer_idx == last_layer:  # Indoor surface node
                    A[idx, idx] = 1.0 + 2 * dt * k_i / (dx_i * rho_i * c_i * dx_i) + dt / (rho_i * c_i * dx_i * R_in)
                    A[idx, idx - 1] = -2 * dt * k_i / (dx_i * rho_i * c_i * dx_i)
                    b[idx] = dt / (rho_i * c_i * dx_i * R_in)  # Indoor temp contribution
                    if radiant_load != 0:
                        b[idx] += dt / (rho_i * c_i * dx_i) * radiant_load
                elif last_in_layer:  # Interface towards the next layer
                    k_next, dx_next = k[layer_idx + 1], dx[layer_idx + 1]
                    rho_next, c_next = rho[layer_idx + 1], c[layer_idx + 1]
                    capacity = 0.5 * (rho_i * c_i * dx_i + rho_next * c_next * dx_next)
                    A[idx, idx] = 1.0 + dt * (k_i / dx_i + k_next / dx_next) / capacity
                    A[idx, idx - 1] = -dt * k_i / (dx_i * capacity)
                    A[idx, idx + 1] = -dt * k_next / (dx_next * capacity)
                elif first_in_layer:  # Interface from the previous layer
                    k_prev, dx_prev = k[layer_idx - 1], dx[layer_idx - 1]
                    rho_prev, c_prev = rho[layer_idx - 1], c[layer_idx - 1]
                    capacity = 0.5 * (rho_prev * c_prev * dx_prev + rho_i * c_i * dx_i)
                    A[idx, idx] = 1.0 + dt * (k_prev / dx_prev + k_i / dx_i) / capacity
                    A[idx, idx - 1] = -dt * k_prev / (dx_prev * capacity)
                    A[idx, idx + 1] = -dt * k_i / (dx_i * capacity)
                else:  # Internal node
                    A[idx, idx] = 1.0 + 2 * dt * k_i / (dx_i * rho_i * c_i * dx_i)
                    A[idx, idx - 1] = -dt * k_i / (dx_i * rho_i * c_i * dx_i)
                    A[idx, idx + 1] = -dt * k_i / (dx_i * rho_i * c_i * dx_i)
                idx += 1

        return A.tocsr(), b  # CSR for efficient solving

    @staticmethod
    def _march(A, base_rhs, impulse_index: int, impulse_value: float, steps: int) -> List[Any]:
        """Advance the implicit scheme ``steps`` times from an all-zero field.

        Every step solves ``A @ T = rhs + T_previous`` where ``rhs`` is a copy
        of ``base_rhs``; on the first step only, ``rhs[impulse_index]`` is
        replaced by ``impulse_value``.

        Returns:
            One ``(T[0], T[-1])`` pair per step: the outdoor-surface and
            indoor-surface node temperatures.
        """
        T_prev = np.zeros(A.shape[0])
        history = []
        for step in range(steps):
            rhs = base_rhs.copy()
            if step == 0:
                rhs[impulse_index] = impulse_value
            T = sparse_linalg.spsolve(A, rhs + T_prev)
            history.append((T[0], T[-1]))
            T_prev = T
        return history

    @classmethod
    def calculate_ctf_coefficients(cls, component: Dict[str, Any], hourly_data: Dict[str, Any] = None) -> "CTFCoefficients":
        """Calculate CTF coefficients using implicit Finite Difference Method with sol-air temperature.

        Note: Per ASHRAE, CTF calculations are skipped for WINDOW and SKYLIGHT components,
        as they use typical material properties. CTF tables for these components will be added later.

        Zero coefficients are also returned for an unknown component type, an
        adiabatic component, a missing construction, or a construction without
        any usable layer. Layers with a non-positive thickness, an unknown
        material, or non-positive conductivity/density/specific heat are
        skipped with a warning.

        Side effects: if the component is both adiabatic and in ground contact,
        ``component['ground_contact']`` is set to False. Results are stored in
        and served from the class-level cache keyed by the construction hash.

        Args:
            component: Dictionary containing component properties from st.session_state.project_data["components"].
            hourly_data: Dictionary containing hourly weather data (dry_bulb, dew_point, wind_speed,
                total_sky_cover, total_incident_radiation). Missing keys, or a missing/empty
                dictionary, fall back to 25.0 °C, dry_bulb - 5.0, 4.0 m/s, 0.5 and 0.0 respectively.

        Returns:
            CTFCoefficients: Named tuple containing X, Y, Z, and F coefficients.
        """
        component_name = component.get('name', 'Unknown')

        # Determine component type
        comp_type_str = component.get('type', '').lower()  # Expected from component dictionary key (e.g., 'walls')
        comp_type_map = {
            'walls': ComponentType.WALL,
            'roofs': ComponentType.ROOF,
            'floors': ComponentType.FLOOR,
            'windows': ComponentType.WINDOW,
            'skylights': ComponentType.SKYLIGHT,
        }
        component_type = comp_type_map.get(comp_type_str)
        if component_type is None:
            logger.warning(f"Invalid component type '{comp_type_str}' for component '{component_name}'. Returning zero CTFs.")
            return cls._zero_coefficients()

        # Validate adiabatic and ground_contact mutual exclusivity
        if component.get('adiabatic', False) and component.get('ground_contact', False):
            logger.warning(f"Component {component_name} has both adiabatic and ground_contact set to True. Treating as adiabatic, setting ground_contact to False.")
            component['ground_contact'] = False

        # Skip CTF for adiabatic components
        if component.get('adiabatic', False):
            logger.info(f"Skipping CTF calculation for adiabatic {component_type.value} component '{component_name}'. Returning zero coefficients.")
            return cls._zero_coefficients()

        # Skip CTF for WINDOW, SKYLIGHT as per ASHRAE; return zero coefficients
        if component_type in (ComponentType.WINDOW, ComponentType.SKYLIGHT):
            logger.info(f"Skipping CTF calculation for {component_type.value} component '{component_name}'. Using zero coefficients until CTF tables are implemented.")
            return cls._zero_coefficients()

        # Retrieve construction (library first, then project)
        construction_name = component.get('construction', '')
        if not construction_name:
            logger.warning(f"No construction specified for component '{component_name}' ({component_type.value}). Returning zero CTFs.")
            return cls._zero_coefficients()
        constructions = st.session_state.project_data.get('constructions', {})
        construction = constructions.get('library', {}).get(construction_name, constructions.get('project', {}).get(construction_name))
        if not construction or not construction.get('layers'):
            logger.warning(f"No valid construction or layers found for construction '{construction_name}' in component '{component_name}' ({component_type.value}). Returning zero CTFs.")
            return cls._zero_coefficients()

        # Check cache with thread-safe access
        construction_hash = cls._hash_construction(construction)
        with cls._cache_lock:
            if construction_hash in cls._ctf_cache:
                logger.info(f"Using cached CTF coefficients for construction '{construction_name}'")
                return cls._ctf_cache[construction_hash]

        # Collect layer properties, dropping every layer that cannot be modelled
        thicknesses = []
        material_props = []
        for layer in construction.get('layers', []):
            material_name = layer.get('material', '')
            thickness = layer.get('thickness', 0.0)
            if thickness <= 0.0:
                logger.warning(f"Invalid thickness {thickness} for material '{material_name}' in construction '{construction_name}'. Skipping layer.")
                continue
            material = cls._get_material_properties(material_name)
            if not material:
                logger.warning(f"Skipping layer with material '{material_name}' in construction '{construction_name}' due to missing properties.")
                continue
            # A zero conductivity, density or specific heat would leave empty
            # rows in the system matrix (singular system, NaN coefficients).
            if material['conductivity'] <= 0 or material['density'] <= 0 or material['specific_heat'] <= 0:
                logger.warning(
                    f"Invalid thermal properties (k={material['conductivity']}, rho={material['density']}, "
                    f"c={material['specific_heat']}) for material '{material_name}' in construction "
                    f"'{construction_name}'. Skipping layer."
                )
                continue
            thicknesses.append(thickness)
            material_props.append(material)
        if not thicknesses:
            logger.warning(f"No valid layers with material properties for construction '{construction_name}' in component '{component_name}'. Returning zero CTFs.")
            return cls._zero_coefficients()

        # Extract material properties
        k = [m['conductivity'] for m in material_props]  # W/m·K
        rho = [m['density'] for m in material_props]  # kg/m³
        c = [m['specific_heat'] for m in material_props]  # J/kg·K
        alpha = [k_i / (rho_i * c_i) for k_i, rho_i, c_i in zip(k, rho, c)]  # Thermal diffusivity (m²/s)
        absorptivity = material_props[0].get('absorptivity', 0.6)  # Use first layer's absorptivity
        emissivity = material_props[0].get('emissivity', 0.9)  # Use first layer's emissivity

        # Discretization parameters
        dt = 3600  # 1-hour time step (s)
        nodes_per_layer = 3  # Default nodes per layer; refined per layer below
        R_in = 0.12  # Indoor surface resistance (m²·K/W, ASHRAE)
        num_ctf = 12  # Standard number of coefficients

        # Get weather data for sol-air temperature. The sky-cover fallback is
        # 0.5 in every case, matching the defaults of the helper signatures.
        weather = hourly_data or {}
        T_out = weather.get('dry_bulb', 25.0)
        dew_point = weather.get('dew_point', T_out - 5.0)
        wind_speed = weather.get('wind_speed', 4.0)
        total_sky_cover = weather.get('total_sky_cover', 0.5)
        I_t = weather.get('total_incident_radiation', 0.0)

        # Calculate dynamic h_o and sol-air temperature
        h_o = cls.calculate_h_o(wind_speed, component_type)
        T_sol_air = cls.calculate_sol_air_temperature(T_out, I_t, absorptivity, emissivity, h_o, dew_point, total_sky_cover)
        R_out = 1.0 / h_o  # Outdoor surface resistance based on dynamic h_o
        logger.info(f"Calculated h_o={h_o:.2f} W/m²·K, T_sol_air={T_sol_air:.2f}°C for component '{component_name}'")

        # Node counts and spacing are settled per layer BEFORE the mesh is
        # built, so a refined layer really gets its extra nodes.
        plan = cls._discretize_layers(thicknesses, alpha, dt, nodes_per_layer, 0.33)
        layer_nodes = [nodes for nodes, _, _ in plan]
        dx = [spacing for _, spacing, _ in plan]
        for i, (_, spacing, initial_fourier) in enumerate(plan):
            if initial_fourier < 0.33:
                logger.warning(f"Fourier number {initial_fourier:.3f} < 0.33 for layer {i} ({material_props[i]['name']}). Adjusting node spacing.")
                Fo = alpha[i] * dt / (spacing ** 2)
                logger.info(f"Adjusted node spacing for layer {i}: dx={spacing:.4f} m, Fo={Fo:.3f}")

        # Radiant load applied to the indoor surface node (convert kW to W)
        radiant_load = component.get("radiant_load", 0.0) * 1000
        if radiant_load != 0:
            logger.debug(f"Added radiant load {radiant_load:.2f} W to indoor node for component '{component_name}'")

        # Build system matrices
        A, b = cls._assemble_system(k, rho, c, dx, layer_nodes, dt, R_out, R_in, T_sol_air, radiant_load)

        # Response to the exterior excitation (X from the outdoor node, Y from the indoor node)
        exterior = cls._march(A, b, 0, dt / (rho[0] * c[0] * dx[0] * R_out) * T_sol_air, num_ctf)
        X = [float((0.0 - T_first) / R_out) for T_first, _ in exterior]  # Outdoor heat flux
        Y = [float((T_last - 0.0) / R_in) for _, T_last in exterior]  # Indoor heat flux (W/m²)

        # Response to the interior temperature excitation (Z)
        interior = cls._march(A, b, -1, dt / (rho[-1] * c[-1] * dx[-1] * R_in), num_ctf)
        Z = [float((T_last - 0.0) / R_in) for _, T_last in interior]

        # Flux history coefficients (F): unit flux impulse on an otherwise zero source
        flux = cls._march(A, np.zeros_like(b), -1, -1.0 / (rho[-1] * c[-1] * dx[-1]), num_ctf)
        F = [float((T_last - 0.0) / R_in) for _, T_last in flux]

        ctf = CTFCoefficients(X=X, Y=Y, Z=Z, F=F)
        with cls._cache_lock:
            cls._ctf_cache[construction_hash] = ctf
        logger.info(f"Calculated CTF coefficients for construction '{construction_name}' in component '{component_name}'")
        return ctf

    @classmethod
    def calculate_ctf_tables(cls, component: Dict[str, Any]) -> "CTFCoefficients":
        """Placeholder for future implementation of CTF table lookups for windows and skylights.

        Args:
            component: Dictionary containing component properties.

        Returns:
            CTFCoefficients: Placeholder zero coefficients until implementation.
        """
        logger.info(f"CTF table calculation for {component.get('type', 'Unknown')} component '{component.get('name', 'Unknown')}' not yet implemented. Returning zero coefficients.")
        return cls._zero_coefficients()