# CEAR / cear_model.py
# Author: dotoking — update fc19dc9 (verified)
# cear_model.py
import os
import json
import numpy as np
import pandas as pd
# ---------------- Weight loading ---------------- #
def _load_platform_weights() -> dict:
"""
Load platform weights from platform_weights.json.
Supports multiple key schemes:
- W_C / W_A
- trend_weight / risk_weight
- C_weight / A_weight
"""
script_dir = os.path.dirname(os.path.abspath(__file__))
json_path = os.path.join(script_dir, "platform_weights.json")
if not os.path.exists(json_path):
print("WARNING: platform_weights.json not found. Using default weights.")
# Sensible defaults if file missing
return {
"tiktok": {"W_C": 1.00, "W_A": 1.00},
"instagram": {"W_C": 0.80, "W_A": 0.90},
"youtube": {"W_C": 0.60, "W_A": 0.60},
"twitter": {"W_C": 0.70, "W_A": 0.80},
"reddit": {"W_C": 0.50, "W_A": 0.50},
"facebook": {"W_C": 0.30, "W_A": 0.40},
"other": {"W_C": 0.20, "W_A": 0.30},
}
with open(json_path, "r", encoding="utf-8") as f:
raw = json.load(f)
# Normalize key names into W_C and W_A
norm = {}
for platform, vals in raw.items():
if not isinstance(vals, dict):
vals = {}
w_c = (
vals.get("W_C")
or vals.get("c_weight")
or vals.get("C_weight")
or vals.get("trend_weight")
or 0.0
)
w_a = (
vals.get("W_A")
or vals.get("a_weight")
or vals.get("A_weight")
or vals.get("risk_weight")
or 0.0
)
norm[platform.lower()] = {"W_C": float(w_c), "W_A": float(w_a)}
return norm
# Module-level default weights, loaded once at import time; used by
# CEARModel when no explicit weights dict is supplied.
PLATFORM_WEIGHTS = _load_platform_weights()
class CEARModel:
"""
Core CEAR scoring model.
Inputs:
user_df: DataFrame with columns:
- 'platform_name': str
- 'minutes_per_week': numeric
- optional 'variety_score': numeric (0–10)
satisfaction: optional float (0–10)
fomo: optional float (0–10)
Returns dict:
{
"C_Score": float,
"A_Risk": float,
"D_Index": float,
"Avg_Variety": float | None,
"Satisfaction": float | None,
"FOMO": float | None,
"Per_Platform_Efficiency": [
{"platform_name": str, "Cultural_Efficiency": float}, ...
]
}
"""
def __init__(self, weights: dict | None = None) -> None:
self.weights = weights if weights is not None else PLATFORM_WEIGHTS
# ---------- internals ---------- #
@staticmethod
def _diminishing_returns(minutes: float) -> float:
"""Log10-based diminishing returns on minutes."""
minutes = max(float(minutes), 0.0)
return float(np.log10(minutes + 1.0))
def _weights_dataframe(self) -> pd.DataFrame:
if not self.weights:
return pd.DataFrame(columns=["platform_name", "W_C", "W_A"])
w_df = pd.DataFrame.from_dict(self.weights, orient="index")
w_df.index = w_df.index.astype(str).str.lower()
w_df.index.name = "platform_name"
w_df = w_df.reset_index()
# Ensure W_C / W_A exist even if missing
if "W_C" not in w_df.columns:
w_df["W_C"] = 0.0
if "W_A" not in w_df.columns:
w_df["W_A"] = 0.0
return w_df[["platform_name", "W_C", "W_A"]]
# ---------- public API ---------- #
def calculate_scores(
self,
user_df: pd.DataFrame,
satisfaction: float | None = None,
fomo: float | None = None,
) -> dict:
if user_df is None or user_df.empty:
return {
"C_Score": 0.0,
"A_Risk": 0.0,
"D_Index": 0.0,
"Avg_Variety": None,
"Satisfaction": satisfaction,
"FOMO": fomo,
"Per_Platform_Efficiency": [],
}
df = user_df.copy()
# Normalize names and convert minutes
df["platform_name"] = (
df["platform_name"].astype(str).str.strip().str.lower()
)
df["minutes_per_week"] = pd.to_numeric(
df["minutes_per_week"], errors="coerce"
).fillna(0.0)
df["minutes_per_week"] = df["minutes_per_week"].clip(lower=0.0)
# Attach weights
w_df = self._weights_dataframe()
df = df.merge(w_df, on="platform_name", how="left")
df[["W_C", "W_A"]] = df[["W_C", "W_A"]].fillna(0.0)
total_mins = float(df["minutes_per_week"].sum())
# 1. Core contributions
df["C_Contrib"] = df.apply(
lambda row: row["W_C"] * self._diminishing_returns(row["minutes_per_week"]),
axis=1,
)
df["A_Contrib"] = df["W_A"] * df["minutes_per_week"]
C_Score = float(df["C_Contrib"].sum())
A_Risk = float(df["A_Contrib"].sum())
# 2. D-Index (effective number of platforms via inverse Herfindahl)
if total_mins > 0:
shares = df["minutes_per_week"] / total_mins
H = float((shares**2).sum())
D_Index = float(1.0 / H) if H > 0 else 0.0
else:
D_Index = 0.0
# 3. Per-platform cultural efficiency (scaled 0–100)
df["Cultural_Efficiency"] = df["C_Contrib"] / df["minutes_per_week"].replace(
0.0, np.nan
)
eff_df = df.loc[
df["minutes_per_week"] > 0, ["platform_name", "Cultural_Efficiency"]
].copy()
eff_df = eff_df.dropna()
if not eff_df.empty:
max_ce = float(eff_df["Cultural_Efficiency"].max())
if max_ce > 0:
eff_df["Cultural_Efficiency"] = (
eff_df["Cultural_Efficiency"] / max_ce * 100.0
)
else:
eff_df["Cultural_Efficiency"] = 0.0
eff_df = eff_df.sort_values("Cultural_Efficiency", ascending=False)
per_platform_eff = eff_df.to_dict("records")
else:
per_platform_eff = []
# 4. Weighted average variety, if provided
avg_variety = None
if "variety_score" in df.columns and total_mins > 0:
if df["variety_score"].notna().any():
avg_variety = float(
np.average(
df["variety_score"].fillna(0.0),
weights=df["minutes_per_week"],
)
)
return {
"C_Score": C_Score,
"A_Risk": A_Risk,
"D_Index": D_Index,
"Avg_Variety": avg_variety,
"Satisfaction": satisfaction,
"FOMO": fomo,
"Per_Platform_Efficiency": per_platform_eff,
}