# fpl-solver / engine.py
# AnayShukla's picture
# Clean Production Release
# f7cecf3
import pandas as pd
import numpy as np
import math
from scipy.stats import nbinom
def poisson_probability_of_conceding_2_or_more_goals(lambd):
    """Return P(X >= 2) for X ~ Poisson(lambd).

    Args:
        lambd: Expected number of goals conceded (the Poisson rate).

    Returns:
        A float in [0.0, 1.0]. Rates below 1e-9 (including negative values)
        are treated as zero, mirroring the stability cut-off used by
        ``poisson_pmf`` in this module, and yield 0.0.
    """
    if lambd < 1e-9:
        return 0.0
    # 1 - P(0) computed via expm1 so that small rates do not lose all their
    # significant digits to cancellation (naive `1 - exp(-x)` does).
    p_at_least_one = -math.expm1(-lambd)
    p_exactly_one = lambd * math.exp(-lambd)
    # Clamp: rounding must never produce a probability outside [0, 1].
    return min(1.0, max(0.0, p_at_least_one - p_exactly_one))
def poisson_pmf(k, lambd):
    """Return the Poisson probability mass P(X = k) for rate ``lambd``.

    Args:
        k: Number of events. Negative or non-integer values lie outside the
            support of the distribution and give 0.0.
        lambd: Expected number of events. Values below 1e-9 are treated as
            zero for stability, i.e. all mass sits on k == 0.

    Returns:
        The probability as a float.
    """
    if k < 0 or k != math.floor(k):
        return 0.0
    if lambd < 1e-9:  # Treat very small lambda as zero for stability
        return 1.0 if k == 0 else 0.0
    # Evaluate in log space: lambd**k and k! overflow long before the
    # probability itself becomes unrepresentable.
    log_pmf = k * math.log(lambd) - lambd - math.lgamma(k + 1)
    return math.exp(log_pmf)
def neg_binom_probability_of_value(expected_mean, value, dispersion=1.0):
    """
    Calculates the exact probability (PMF) of getting exactly 'value' events.
    Used for: Saves, Goals, Assists.

    Args:
        expected_mean: Mean of the count distribution.
        value: Event count whose probability is wanted.
        dispersion: Variance-to-mean ratio. Values <= 1.0 fall back to the
            Poisson PMF; larger values use a negative binomial with the same
            mean and variance ``expected_mean * dispersion``.

    Returns:
        P(X == value). With a non-positive mean the distribution is a point
        mass at zero, so the result is 1.0 for ``value == 0`` and 0.0
        otherwise.
    """
    if expected_mean <= 0:
        # Degenerate case: nothing can happen, so only "zero events" has mass.
        return 1.0 if value == 0 else 0.0
    if dispersion <= 1.0:  # Fallback to Poisson if no dispersion
        return poisson_pmf(value, expected_mean)
    # Convert mean + dispersion to scipy's (n, p):
    #   mean = n(1-p)/p,  variance = mean / p  =>  p = 1/dispersion.
    p = 1.0 / dispersion
    n = (expected_mean * p) / (1.0 - p)
    return nbinom.pmf(value, n, p)
def neg_binom_probability_at_least(expected_mean, threshold, dispersion=1.0):
    """
    Calculates probability of getting 'threshold' OR MORE events.
    Used for: DefCons (CBIT), Recoveries.

    Args:
        expected_mean: Mean of the count distribution.
        threshold: Minimum number of events required.
        dispersion: Variance-to-mean ratio. Values <= 1.0 use the Poisson
            CDF; larger values use a negative binomial with the same mean.

    Returns:
        P(X >= threshold). A threshold of zero or less is always met (1.0);
        otherwise a non-positive mean can never reach the threshold (0.0).
    """
    if threshold <= 0:
        # Counts are non-negative, so "at least 0" is certain.
        return 1.0
    if expected_mean <= 0:
        return 0.0
    if dispersion <= 1.0:
        # Use existing Poisson logic if dispersion is low
        return max(0.0, 1 - poisson_cdf(threshold - 1, expected_mean))
    p = 1.0 / dispersion
    n = (expected_mean * p) / (1.0 - p)
    # P(X >= threshold) = SF(threshold - 1); the survival function avoids the
    # precision loss of 1 - CDF in the far tail.
    return nbinom.sf(threshold - 1, n, p)
def calculate_expected_conceded_points(lambd, max_goals_to_check=10):
    """
    Calculates the expected fantasy points from goals conceded based on a
    -1 point penalty for every 2 goals.

    Args:
        lambd: Expected goals conceded (Poisson rate).
        max_goals_to_check: Largest goal count included in the expectation;
            the Poisson tail beyond it is ignored. Defaults to 10.

    Returns:
        The (non-positive) expected points, summed over 0..max_goals_to_check
        goals conceded.
    """
    return sum(
        poisson_pmf(k=goals, lambd=lambd) * -(goals // 2)
        for goals in range(max_goals_to_check + 1)
    )
def poisson_cdf(k, lambd):
    """Return the Poisson cumulative probability P(X <= k) for rate ``lambd``.

    Args:
        k: Upper bound on the event count; non-integer values are floored.
        lambd: Expected number of events. Values below 1e-9 are treated as
            zero for stability.

    Returns:
        A float in [0.0, 1.0]; 0.0 when ``k`` is negative.
    """
    if k < 0:
        return 0.0
    if lambd < 1e-9:  # Treat very small lambda as zero for stability
        # All mass sits on zero events and k >= 0 is already established.
        return 1.0
    cumulative = math.fsum(
        poisson_pmf(i, lambd) for i in range(math.floor(k) + 1)
    )
    # Rounding may push the sum a hair above 1, which would make callers'
    # `1 - cdf` survival probabilities negative.
    return min(1.0, cumulative)
def apply_team_skepticism(df, skepticism_factors):
    """
    Scale the "base_pts" of every player by the multiplier configured for
    their team.

    Args:
        df: DataFrame with "team" and "base_pts" columns; modified in place.
        skepticism_factors: Mapping of team id -> multiplier. When empty or
            None nothing is changed.

    Returns:
        The same DataFrame object that was passed in.
    """
    if skepticism_factors:
        for team, factor in skepticism_factors.items():
            # Select by index label, exactly like filtering the frame first.
            team_rows = df.index[df["team"] == team]
            df.loc[team_rows, "base_pts"] *= factor
    return df
def calculate_single_match_points(
    player,
    match_row,
    xMins_in_match,
    points_config,
    player_penalty_shares,
    is_gk=False,
    is_def=False,
    is_mid=False,
    is_fwd=False,
):
    """
    Calculates points for a single match given the xMins and match projections.
    Includes full logic for CBIT, CBITR, Penalty Saves, and dynamic BPS.

    Args:
        player: Mapping/row with "team", "element_type", "id", the share
            columns ("xG_share", "xA_share", "xCBIT_share", "xCBITR_share",
            "YC_share", "RC_share"), "baseline_bps_floor_p90" and, for
            goalkeepers, "baseline_xSaves_p90" / "baseline_pksave_p90".
        match_row: Mapping/row with "home_team_num", the "mc_{home,away}_*_mean"
            projections and "{home,away}_clean_sheet_odds".
        xMins_in_match: Expected minutes in this match.
        points_config: Scoring table with keys "goal" and "clean_sheet"
            (indexed by position), "assist", "saves_per_3" and
            "penalty_points_per_position".
        player_penalty_shares: Mapping of player id -> penalty share; may be
            empty or None.
        is_gk, is_def, is_mid, is_fwd: Position flags selecting which
            position-specific components apply.

    Returns:
        Dict with keys "pts", "xG", "xA", "CS", "cbit" and "cbitr". All values
        are 0.0 when ``xMins_in_match`` is not positive.
    """
    if xMins_in_match <= 0:
        return {"pts": 0.0, "xG": 0.0, "xA": 0.0, "CS": 0.0, "cbit": 0.0, "cbitr": 0.0}

    scaling_factor = xMins_in_match / 90.0
    # One shared 60-minute test for appearance, clean sheet and BPS so the
    # three components can never disagree at exactly 60 minutes.
    played_60 = xMins_in_match >= 60
    player_pos = player["element_type"]

    # 1. Identify Home/Away and get Opponent Stats
    if player["team"] == match_row["home_team_num"]:
        side, opp_side = "home", "away"
    else:
        side, opp_side = "away", "home"
    team_proj_goals = match_row[f"mc_{side}_goals_mean"]
    team_conc_goals = match_row[f"mc_{opp_side}_goals_mean"]
    team_proj_assists = match_row[f"mc_{side}_assists_xa_mean"]
    team_proj_cbit = match_row[f"mc_{side}_CBIT_mean"]
    team_proj_cbitr = match_row[f"mc_{side}_CBITR_mean"]
    team_proj_saves = match_row[f"mc_{side}_keeper_saves_mean"]
    team_proj_yc = match_row[f"mc_{side}_yc_mean"]
    team_proj_rc = match_row[f"mc_{side}_rc_mean"]
    cs_odds = match_row[f"{side}_clean_sheet_odds"]

    # 2. Player Share Calculations
    proj_goals = player["xG_share"] * team_proj_goals
    proj_assists = player["xA_share"] * team_proj_assists
    proj_cbit = player["xCBIT_share"] * team_proj_cbit
    proj_cbitr = player["xCBITR_share"] * team_proj_cbitr
    proj_yc = player["YC_share"] * team_proj_yc
    proj_rc = player["RC_share"] * team_proj_rc
    proj_saves = 0
    proj_pen_saves = 0
    if is_gk:
        # Blend the keeper's own baseline with the team-level projection.
        proj_saves = (player["baseline_xSaves_p90"] + team_proj_saves) / 2
        proj_pen_saves = player["baseline_pksave_p90"]

    # --- GOALS & ASSISTS ---
    goal_value = points_config["goal"][player_pos]
    assist_value = points_config["assist"]
    pts_goals = (
        sum(poisson_pmf(k, proj_goals) * k * goal_value for k in range(9))
        * scaling_factor
    )
    pts_assists = (
        sum(poisson_pmf(k, proj_assists) * k * assist_value for k in range(9))
        * scaling_factor
    )

    # --- CLEAN SHEET & CONCEDED ---
    # Full clean-sheet value from 60 minutes; pro-rated below that.
    pts_cs = cs_odds * points_config["clean_sheet"][player_pos]
    if not played_60:
        pts_cs = pts_cs * scaling_factor
    if (is_gk or is_def) and team_conc_goals is not None:
        pts_conc = calculate_expected_conceded_points(team_conc_goals) * scaling_factor
    else:
        pts_conc = 0.0

    # --- CARDS ---
    pts_yc = (proj_yc * -1) * scaling_factor
    pts_rc = (proj_rc * -3) * scaling_factor

    # --- SAVES & PENALTY SAVES (GK) ---
    pts_saves = 0.0
    pts_pen_save = 0.0
    if is_gk:
        expected_saves_pts_unscaled = sum(
            neg_binom_probability_of_value(proj_saves, k, dispersion=1.5)
            * ((k // 3) * points_config["saves_per_3"])
            for k in range(21)
        )
        pts_saves = expected_saves_pts_unscaled * scaling_factor
        expected_pen_saved_pts_unscaled = sum(
            poisson_pmf(k, proj_pen_saves) * (k * 5) for k in range(3)
        )
        pts_pen_save = expected_pen_saved_pts_unscaled * scaling_factor

    # --- CBIT & CBITR ---
    # 2 points for reaching the defensive-contribution threshold:
    # 10 (CBIT) for defenders, 12 (CBITR) for midfielders and forwards.
    pts_cbit = 0.0
    if is_def:
        pts_cbit = (
            neg_binom_probability_at_least(proj_cbit, 10, dispersion=3.2)
            * 2
            * scaling_factor
        )
    pts_cbitr = 0.0
    if is_mid:
        pts_cbitr = (
            neg_binom_probability_at_least(proj_cbitr, 12, dispersion=2.8)
            * 2
            * scaling_factor
        )
    elif is_fwd:
        pts_cbitr = (
            neg_binom_probability_at_least(proj_cbitr, 12, dispersion=1.7)
            * 2
            * scaling_factor
        )

    # --- PENALTY POINTS (Taker) ---
    pts_penalty = 0.0
    if player_penalty_shares and player["id"] in player_penalty_shares:
        pen_share = player_penalty_shares[player["id"]]
        base_pen_pts = points_config["penalty_points_per_position"].get(player_pos, 0)
        pts_penalty = (base_pen_pts * pen_share) * scaling_factor

    # --- APPEARANCE ---
    # 2 points from 60 minutes inclusive (was `> 60`, which disagreed with
    # the `>= 60` test used for clean sheets and BPS).
    pts_app = 2 if played_60 else (1 if xMins_in_match > 0 else 0)

    # --- BONUS POINTS ---
    bps_floor = player["baseline_bps_floor_p90"] * scaling_factor
    bps_mins = 6 if played_60 else (3 if xMins_in_match > 0 else 0)
    scaled_goals = proj_goals * scaling_factor
    scaled_assists = proj_assists * scaling_factor
    scaled_saves = proj_saves * scaling_factor if is_gk else 0
    scaled_pen_saves = proj_pen_saves * scaling_factor if is_gk else 0
    scaled_yc = proj_yc * scaling_factor
    scaled_rc = proj_rc * scaling_factor
    bps_goals = scaled_goals * (24 if is_fwd else (18 if is_mid else 12))
    bps_assists = scaled_assists * 9
    bps_cs = cs_odds * 12 if (is_gk or is_def) and played_60 else 0
    bps_saves = scaled_saves * 2
    bps_pen_saves = scaled_pen_saves * 15
    bps_cards = (scaled_yc * -3) + (scaled_rc * -9)
    total_projected_bps = (
        bps_floor
        + bps_mins
        + bps_goals
        + bps_assists
        + bps_cs
        + bps_saves
        + bps_pen_saves
        + bps_cards
    )
    # Projected BPS is converted to bonus points with a fixed divisor;
    # goalkeepers get no bonus projection.
    pts_bonus = total_projected_bps / 29.4 if not is_gk else 0.0

    # --- FINAL SUM ---
    total_pts = (
        pts_goals
        + pts_assists
        + pts_cs
        + pts_conc
        + pts_yc
        + pts_rc
        + pts_saves
        + pts_pen_save
        + pts_cbit
        + pts_cbitr
        + pts_penalty
        + pts_app
        + pts_bonus
    )
    return {
        "pts": total_pts,
        "xG": scaled_goals,
        "xA": scaled_assists,
        "CS": cs_odds if played_60 else cs_odds * scaling_factor,
        "cbit": proj_cbit * scaling_factor,
        "cbitr": proj_cbitr * scaling_factor,
    }
def _project_gw_minutes(
    gw,
    is_first_gw,
    baseline_mins,
    prev_mins,
    weeks_out,
    is_not_starter,
    is_suspended,
    is_injured,
    is_gk,
    mins_threshold,
    ramp_up_period,
    decay_rates,
    ramp_up_rates,
):
    """Project every player's expected minutes for one gameweek, clipped to [0, 90].

    All array arguments are positionally aligned, one entry per player.
    ``prev_mins`` is the previous gameweek's projection (the baseline before
    the first gameweek). Players flagged ``is_not_starter`` always get 0.
    """
    is_default = ~(is_not_starter | is_suspended | is_injured)
    at_or_above = prev_mins >= mins_threshold
    below = prev_mins < mins_threshold
    decay_rate_default = decay_rates.get("default", 0.99)
    ramp_rate_default = ramp_up_rates.get("default", ramp_up_rates.get("injured", 0))

    # Everything not touched below (including "not_a_starter") stays at 0.
    xmins = np.zeros(len(baseline_mins), dtype=float)

    # CASE 1: First GW + Available -> start from the baseline.
    if is_first_gw:
        xmins[is_default] = baseline_mins[is_default]

    # CASE 3: Suspended -> 0 while out, baseline on the return week, then the
    # usual decay / ramp progression.
    susp_return = is_suspended & (gw == weeks_out + 1)
    susp_after = is_suspended & (gw > weeks_out + 1)
    xmins[susp_return] = baseline_mins[susp_return]
    decay_rate_susp = decay_rates.get("suspended", decay_rates.get("default", 0.99))
    ramp_rate_susp = ramp_up_rates.get("suspended", ramp_up_rates.get("default", 0))
    susp_decay = susp_after & at_or_above
    susp_ramp = susp_after & below
    xmins[susp_decay] = prev_mins[susp_decay] * decay_rate_susp
    xmins[susp_ramp] = np.minimum(prev_mins[susp_ramp] + ramp_rate_susp, 90)

    # CASE 4: Injured -> 0 while out, linear ramp back to baseline over
    # `ramp_up_period` weeks, then the default decay / ramp progression.
    recovering = is_injured & (gw > weeks_out)
    weeks_since_injury = np.maximum(0, gw - weeks_out)
    ramp_phase = recovering & (weeks_since_injury <= ramp_up_period)
    xmins[ramp_phase] = (
        baseline_mins[ramp_phase] / ramp_up_period
    ) * weeks_since_injury[ramp_phase]
    post_ramp = recovering & (weeks_since_injury > ramp_up_period)
    post_decay = post_ramp & at_or_above
    post_ramp_up = post_ramp & below
    xmins[post_decay] = prev_mins[post_decay] * decay_rate_default
    xmins[post_ramp_up] = np.minimum(prev_mins[post_ramp_up] + ramp_rate_default, 90)

    # CASE 5: Default/healthy after the first GW. Goalkeepers carry their
    # minutes forward unchanged; outfielders decay or ramp.
    if not is_first_gw:
        gk_default = is_default & is_gk
        xmins[gk_default] = prev_mins[gk_default]
        outfield_default = is_default & (~is_gk)
        outf_decay = outfield_default & at_or_above
        xmins[outf_decay] = prev_mins[outf_decay] * decay_rate_default
        outf_ramp = outfield_default & below & (baseline_mins > 0)
        xmins[outf_ramp] = np.minimum(prev_mins[outf_ramp] + ramp_rate_default, 90)

    return np.clip(xmins, 0, 90)


def calculate_all_points(
    player_df_base,
    match_df,
    player_penalty_shares,
    MINS_SCALING_BONUS,
    pos_map,
    teams_dict_1,
    teams_dict,
    points_config,
    effective_xmins_overrides,
    MINS_THRESHOLD,
    RAMP_UP_PERIOD,
    decay_rates,
    ramp_up_rates,
    user_player_status_overrides,
    team_skepticism,
    effective_availability_multipliers,
):
    """Project expected minutes and fantasy points per player for every gameweek.

    For each gameweek found in ``match_df["GW"]`` the function projects each
    player's minutes (status, decay/ramp, availability multipliers and manual
    overrides), scores every fixture of the player's team with
    ``calculate_single_match_points``, applies ``apply_team_skepticism`` and
    stores the rounded results.

    Args:
        player_df_base: Player table; copied, never modified. Reads "id",
            "team", "Team", "element_type", "web_name", "now_cost",
            "baseline_xMins" and the columns used by
            ``calculate_single_match_points``. Its ``attrs`` may carry
            "has_baseline_xmins_override" and "all_baseline_overrides".
        match_df: Fixture table with "GW", "home_team_num", "away_team_num"
            and the per-match projection columns.
        player_penalty_shares: Mapping of player id -> penalty share.
        MINS_SCALING_BONUS, teams_dict_1, teams_dict: Not used by this
            function; kept for interface compatibility.
        pos_map: Mapping of "element_type" -> position label.
        points_config: Scoring table passed to the match scorer.
        effective_xmins_overrides: {player id: {gw: minutes}}; replaces the
            projected minutes for that gameweek.
        MINS_THRESHOLD: Minutes level separating decay (at or above) from
            ramp-up (below).
        RAMP_UP_PERIOD: Number of weeks an injured player takes to ramp back
            to baseline minutes. Falls back to 3 when not a positive number.
        decay_rates, ramp_up_rates: Per-status rate tables ("default",
            "suspended", "injured").
        user_player_status_overrides: {player id: {"status": ...,
            "weeks_out": ...}}.
        team_skepticism: Mapping of team id -> points multiplier.
        effective_availability_multipliers: {player id: {gw: multiplier}}
            applied to the projected minutes.

    Returns:
        DataFrame with identity columns ("Pos", "ID", "Name", "BV", "SV",
        "Team"), per-gameweek "{gw}_xMins", "{gw}_Pts", "{gw}_xG", "{gw}_xA",
        "{gw}_CS", "{gw}_cbit", "{gw}_cbitr" columns, "Total Points",
        "Average Points" and a "match_projections" dict per player.
    """
    # Honour the caller's ramp-up period (it used to be unconditionally
    # overwritten with 3); keep 3 as the fallback for unusable values.
    try:
        ramp_up_period = float(RAMP_UP_PERIOD)
    except (TypeError, ValueError):
        ramp_up_period = 3.0
    if not ramp_up_period > 0:
        ramp_up_period = 3.0

    player_df = player_df_base.copy()
    final_df_output = pd.DataFrame(
        {
            "Pos": player_df["element_type"].map(pos_map),
            "ID": player_df["id"],
            "Name": player_df["web_name"],
            "BV": player_df["now_cost"],
            "SV": player_df["now_cost"],
            "Team": player_df["Team"],
        }
    )
    continuous_xMins_progression = player_df["baseline_xMins"].copy()
    df_attrs = getattr(player_df, "attrs", {})
    has_baseline_xmins_override = df_attrs.get("has_baseline_xmins_override", False)
    all_baseline_overrides = df_attrs.get("all_baseline_overrides", {})

    unique_gws = sorted(match_df["GW"].unique())
    first_gw = unique_gws[0] if unique_gws else None
    match_projections_col = {index: {} for index in player_df.index}

    # Per-player inputs that do not change between gameweeks.
    player_ids_array = player_df["id"].values
    n_players = len(player_ids_array)
    status_array = np.array(
        [
            user_player_status_overrides.get(pid, {"status": "default"})["status"]
            for pid in player_ids_array
        ],
        dtype=object,
    )
    weeks_out_array = np.array(
        [
            user_player_status_overrides.get(pid, {}).get("weeks_out", 0)
            for pid in player_ids_array
        ]
    )
    is_not_starter = status_array == "not_a_starter"
    is_suspended = status_array == "suspended"
    is_injured = status_array == "injured"
    baseline_mins_array = player_df["baseline_xMins"].values
    is_gk_array = player_df["element_type"].values == 1
    team_array = player_df["team"].values

    for gw in unique_gws:
        # NOTE(review): this compares against the literal gameweek 1 rather
        # than the first gameweek of the horizon — confirm that is intended.
        if has_baseline_xmins_override and gw == 1:
            for index, player_id in player_df["id"].items():
                if (
                    player_id in all_baseline_overrides
                    and "baseline_xMins" in all_baseline_overrides[player_id]
                ):
                    continuous_xMins_progression.loc[index] = all_baseline_overrides[
                        player_id
                    ]["baseline_xMins"]

        # VECTORIZED XMINS CALCULATION
        calculated_xmins_array = _project_gw_minutes(
            gw=gw,
            is_first_gw=(gw == first_gw),
            baseline_mins=baseline_mins_array,
            prev_mins=continuous_xMins_progression.values,
            weeks_out=weeks_out_array,
            is_not_starter=is_not_starter,
            is_suspended=is_suspended,
            is_injured=is_injured,
            is_gk=is_gk_array,
            mins_threshold=MINS_THRESHOLD,
            ramp_up_period=ramp_up_period,
            decay_rates=decay_rates,
            ramp_up_rates=ramp_up_rates,
        )

        # APPLY OVERRIDES AND AVAILABILITY
        # These only affect this gameweek's displayed/scored minutes; the
        # week-to-week progression continues from the unadjusted projection.
        gw_xmins = calculated_xmins_array.copy()
        for pos, player_id in enumerate(player_ids_array):
            gw_xmins[pos] *= effective_availability_multipliers.get(
                player_id, {}
            ).get(gw, 1.0)
            if (
                player_id in effective_xmins_overrides
                and gw in effective_xmins_overrides[player_id]
            ):
                gw_xmins[pos] = effective_xmins_overrides[player_id][gw]

        # STREAMLINED MATCH SCORING LOOP
        gw_matches = match_df[match_df["GW"] == gw]
        matches_by_team = {}  # team id -> that team's fixtures this gameweek
        gw_pts = np.zeros(n_players, dtype=float)
        gw_xg = np.zeros(n_players, dtype=float)
        gw_xa = np.zeros(n_players, dtype=float)
        gw_cs = np.zeros(n_players, dtype=float)
        gw_cbit = np.zeros(n_players, dtype=float)
        gw_cbitr = np.zeros(n_players, dtype=float)

        for pos, (index, player) in enumerate(player_df.iterrows()):
            player_team_num = player["team"]
            my_matches = matches_by_team.get(player_team_num)
            if my_matches is None:
                my_matches = gw_matches[
                    (gw_matches["home_team_num"] == player_team_num)
                    | (gw_matches["away_team_num"] == player_team_num)
                ]
                matches_by_team[player_team_num] = my_matches

            if my_matches.empty:
                # Blank gameweek: no minutes and no points (arrays are
                # already zero for the scoring columns).
                gw_xmins[pos] = 0
                continue

            base_gw_mins = gw_xmins[pos]
            # With more than one fixture, trim the per-match minutes of
            # players projected above 35 minutes by 3%.
            mins_per_match = (
                base_gw_mins * 0.97
                if len(my_matches) > 1 and base_gw_mins > 35
                else base_gw_mins
            )
            element_type = player["element_type"]
            total_gw_pts = 0
            total_gw_xg = 0
            total_gw_xa = 0
            total_gw_cs = 0
            total_gw_cbit = 0
            total_gw_cbitr = 0
            for _, match_row in my_matches.iterrows():
                stats = calculate_single_match_points(
                    player=player,
                    match_row=match_row,
                    xMins_in_match=mins_per_match,
                    points_config=points_config,
                    player_penalty_shares=player_penalty_shares,
                    is_gk=(element_type == 1),
                    is_def=(element_type == 2),
                    is_mid=(element_type == 3),
                    is_fwd=(element_type == 4),
                )
                total_gw_pts += stats["pts"]
                total_gw_xg += stats["xG"]
                total_gw_xa += stats["xA"]
                total_gw_cs += stats["CS"]
                total_gw_cbit += stats["cbit"]
                total_gw_cbitr += stats["cbitr"]

                is_home = player_team_num == match_row["home_team_num"]
                opp_num = (
                    match_row["away_team_num"]
                    if is_home
                    else match_row["home_team_num"]
                )
                match_id = (
                    f"{match_row['home_team_num']}_vs_{match_row['away_team_num']}"
                )
                match_projections_col[index][match_id] = {
                    "opponent_team_id": int(opp_num),
                    "is_home": bool(is_home),
                    "default_gw": int(gw),
                    "Pts": round(stats["pts"], 3),
                    "xMins": round(mins_per_match, 1),
                    "xG": round(stats["xG"], 3),
                    "xA": round(stats["xA"], 3),
                    "CS": round(stats["CS"], 3),
                }

            gw_pts[pos] = total_gw_pts
            gw_xg[pos] = total_gw_xg
            gw_xa[pos] = total_gw_xa
            gw_cs[pos] = total_gw_cs
            gw_cbit[pos] = total_gw_cbit
            gw_cbitr[pos] = total_gw_cbitr

        gw_calc_df = pd.DataFrame(
            {
                "team": team_array,
                "base_pts": gw_pts,
                f"{gw}_xMins": gw_xmins,
                "gw_xG": gw_xg,
                "gw_xA": gw_xa,
                "gw_CS": gw_cs,
                "gw_cbit": gw_cbit,
                "gw_cbitr": gw_cbitr,
            },
            index=player_df.index,
        )
        gw_calc_df = apply_team_skepticism(gw_calc_df, team_skepticism)

        final_df_output[f"{gw}_xMins"] = gw_calc_df[f"{gw}_xMins"].round(0)
        final_df_output[f"{gw}_Pts"] = gw_calc_df["base_pts"].round(2)
        final_df_output[f"{gw}_xG"] = gw_calc_df["gw_xG"].round(2)
        final_df_output[f"{gw}_xA"] = gw_calc_df["gw_xA"].round(2)
        final_df_output[f"{gw}_CS"] = gw_calc_df["gw_CS"]
        final_df_output[f"{gw}_cbit"] = gw_calc_df["gw_cbit"]
        final_df_output[f"{gw}_cbitr"] = gw_calc_df["gw_cbitr"]

        # Next gameweek continues from this week's unadjusted projection.
        continuous_xMins_progression = pd.Series(
            calculated_xmins_array, index=player_df.index
        )

    gw_points_total = final_df_output.filter(like="_Pts").sum(axis=1)
    final_df_output["Total Points"] = gw_points_total
    final_df_output["Average Points"] = (gw_points_total / len(unique_gws)).round(2)
    final_df_output["match_projections"] = pd.Series(match_projections_col)
    return final_df_output