Spaces:
Running
Running
# Fantasy Baseball Optimizer β’ Final Working Version | |
# ------------------------------------------------- | |
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import pulp | |
import os | |
import tempfile | |
import logging | |
import traceback | |
# ---------- Logging ---------- | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s %(levelname)-8s %(message)s", | |
datefmt="%H:%M:%S" | |
) | |
log = logging.getLogger(__name__) | |
# ---------- Helpers ---------- | |
def cache_file(obj, label): | |
if isinstance(obj, str): return obj | |
d = os.path.join(tempfile.gettempdir(), "fb_cache") | |
os.makedirs(d, exist_ok=True) | |
p = os.path.join(d, label) | |
with open(obj.name, 'rb') as s, open(p, 'wb') as t: | |
t.write(s.read()) | |
return p | |
def swap_name(n): | |
if pd.isna(n) or ',' not in str(n): return str(n).strip() | |
last, first = [x.strip() for x in n.split(',', 1)] | |
return f"{first} {last}" | |
def parse_4(txt): | |
if pd.isna(txt): return ["","","",""] | |
parts = txt.split()[:4] | |
outs = [] | |
for w in parts: | |
if '(' in w and ')' in w: | |
outs.append(w.split('(')[-1][0].upper()) | |
else: outs.append('') | |
return outs + ['']*(4-len(outs)) | |
def enforce_util_only(df): | |
df = df.copy() | |
mask = df['Players'].str.contains('Ohtani', case=False, na=False) | |
df.loc[mask, 'Pos'] = 'UTIL' | |
return df | |
def gradient(v, high_good=True): | |
if not isinstance(v, (int, float)): return '' | |
pct = max(0, min(v/5 if high_good else (5-v)/5, 1)) | |
r, g = int(255*(1-pct)), int(255*pct) | |
return f"background-color:rgba({r},{g},0,0.4)" | |
def color_handed(v): | |
if v=='L': return 'background-color:#3b6ff7;color:#fff' | |
if v=='R': return 'background-color:#d93025;color:#fff' | |
return '' | |
# ---------- Standings ---------- | |
def compute_boosts(std, team): | |
cats = ['R','HR','RBI','SB','AVG','K','W','SV','ERA','WHIP'] | |
std['Team'] = std['Team'].astype(str).str.lower().str.strip() | |
if team.lower() not in std['Team'].values: | |
raise ValueError(f"Team {team} not in standings") | |
row = std.loc[std['Team']==team.lower()].iloc[0] | |
raw_r, boosts = {}, {} | |
rank_strs = [] | |
for c in cats: | |
col = pd.to_numeric(std[c], errors='coerce') | |
val = pd.to_numeric(row[c], errors='coerce') | |
higher = c not in ['ERA','WHIP'] | |
rk = 1 + (col>val).sum() if higher else 1 + (col<val).sum() | |
raw_r[c] = rk | |
suf = "tsnrhtdd"[((rk//10)%10!=1)*(rk%10<4)*(rk%10)::4] | |
rank_strs.append(f"{rk}{suf}") | |
base = (rk-1)/(len(col)-1) if len(col)>1 else 0 | |
mult = 10 if c in cats[:5] else 5 | |
boosts[c] = round(mult * base, 1) | |
df = pd.DataFrame([rank_strs, [boosts[c] for c in cats]], | |
index=['Rank','Boost'], columns=cats) | |
return df, pd.Series(raw_r) | |
def style_standings(df, raw): | |
sty = df.style.hide(axis='index').set_table_styles([ | |
{'selector':'th','props':[('text-align','center')]}, | |
{'selector':'td','props':[('text-align','center')]}]) | |
max_r = raw.max() | |
for c in df.columns: | |
rk = raw[c] | |
pct = (rk-1)/(max_r-1) if max_r>1 else 0 | |
r, g = int(255*pct), int(255*(1-pct)) | |
sty = sty.set_properties(subset=pd.IndexSlice['Rank',c], | |
**{'background-color':f'rgba({r},{g},0,0.4)'}) | |
return sty.format(precision=1, subset=pd.IndexSlice['Boost',:]).to_html() | |
# ---------- Scoring ---------- | |
def add_scores(hit, pit, hpj, ppj, boost_df, mult, scale): | |
hstats=['R','HR','RBI','SB','AVG'] | |
pstats=['K','W','SV','ERA','WHIP'] | |
for df, stats in [(hit,hstats),(hpj,hstats)]: | |
for c in stats: | |
df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0) | |
for df, stats in [(pit,pstats),(ppj,pstats)]: | |
for c in stats: | |
df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0) | |
hmu, hsd = hpj[hstats].mean(), hpj[hstats].std().replace(0,1) | |
pmu, psd = ppj[pstats].mean(), ppj[pstats].std().replace(0,1) | |
for s in hstats: | |
hit[f'Z_{s}'] = (hit[s]-hmu[s])/(hsd[s]/scale) | |
for s in pstats: | |
diff = (pmu[s]-pit[s]) if s in ['ERA','WHIP'] else (pit[s]-pmu[s]) | |
pit[f'Z_{s}'] = diff/(psd[s]/scale) | |
hw = boost_df.loc['Boost',hstats].values*10 | |
pw = boost_df.loc['Boost',pstats].values*5 | |
hit['Boost'] = hit[[f'Z_{s}' for s in hstats]].dot(hw)/hw.sum() | |
pit['Boost'] = pit[[f'Z_{s}' for s in pstats]].dot(pw)/pw.sum() | |
hit['MASH'] = hit['$'].fillna(0) + hit['Boost']*mult | |
pit['MASH'] = pit['$'].fillna(0) + pit['Boost']*mult | |
return hit, pit | |
# ---------- Optimization ---------- | |
def optimize_lineup(df): | |
slots=['C1','C2','1B','2B','3B','SS']+[f'OF{i}' for i in range(1,6)]+['CI','MI','UTIL'] | |
elig={} | |
for i,r in df.iterrows(): | |
p=set(str(r['Pos']).split(',')) | |
v={'UTIL'} | |
if 'C' in p: v|={'C1','C2'} | |
if '1B' in p: v|={'1B','CI'} | |
if '2B' in p: v|={'2B','MI'} | |
if '3B' in p: v|={'3B','CI'} | |
if 'SS' in p: v|={'SS','MI'} | |
if 'OF' in p or any(pp.startswith('OF') for pp in p): v|={f'OF{i}' for i in range(1,6)} | |
elig[i]=v | |
for s in slots: | |
if not any(s in v for v in elig.values()): | |
idx=len(df) | |
df.loc[idx,['Players','Pos','$','Boost','MASH']]=[f'No {s}',s,0,0,-0.001] | |
elig[idx]={s} | |
prob=pulp.LpProblem('L',pulp.LpMaximize) | |
x={(i,s):pulp.LpVariable(f'x_{i}_{s}',cat='Binary') for i in elig for s in elig[i]} | |
for s in slots: | |
prob += pulp.lpSum(x[i,s] for i in elig if s in elig[i]) == 1 | |
for i in elig: | |
prob += pulp.lpSum(x[i,s] for s in elig[i]) <= 1 | |
prob += pulp.lpSum(df.loc[i,'MASH']*x[i,s] for i,s in x) | |
prob.solve(pulp.PULP_CBC_CMD(msg=False)) | |
df['Slot']=pd.Series({i:s for(i,s),v in x.items() if v.value()==1}) | |
slot2pos={**{f'OF{i}':'OF' for i in range(1,6)},'C1':'C','C2':'C','1B':'1B', | |
'2B':'2B','3B':'3B','SS':'SS','CI':'CI','MI':'MI','UTIL':'UTIL'} | |
df['Pos']=df['Slot'].map(slot2pos).fillna('Bench') | |
return df | |
# ---------- Main Process ---------- | |
def run(roster,hproj,pproj,std,team,boost_x,scale): | |
try: | |
rost=pd.read_csv(cache_file(roster,'r.csv')) | |
rost['Players']=rost['Players'].apply(swap_name) | |
hpj=pd.read_csv(cache_file(hproj,'h.csv')) | |
ppj=pd.read_csv(cache_file(pproj,'p.csv')) | |
std=pd.read_csv(cache_file(std,'s.csv')) | |
hpj[['SP1','SP2','SP3','SP4']]=hpj['SP'].apply(parse_4).tolist() | |
hpj['PA']=pd.to_numeric(hpj['PA'], errors='coerce').fillna(0) | |
rost['Owner']=rost['Owner'].fillna('').str.lower().str.strip() | |
rows=rost[rost['Owner']==team.lower()].copy() | |
rows['isP']=rows['Pos'].str.contains('P',na=False) | |
hitters=rows[~rows['isP']].merge(hpj,left_on='id',right_on='NFBCID', | |
how='left',suffixes=('','_proj'),indicator=True) | |
hitters['has_proj']=hitters['_merge']=='both' | |
hitters.drop(columns=['_merge'],inplace=True) | |
pitchers=rows[rows['isP']].merge(ppj,left_on='id',right_on='NFBCID', | |
how='left',suffixes=('','_proj'),indicator=True) | |
pitchers['has_proj']=pitchers['_merge']=='both' | |
pitchers.drop(columns=['_merge'],inplace=True) | |
for st in ['IP','ERA','WHIP']: | |
pr=f'{st}_proj' | |
if pr in pitchers: pitchers[st]=pitchers[pr] | |
hitters=enforce_util_only(hitters) | |
boost_df,raw=compute_boosts(std,team) | |
hitters,pitchers=add_scores(hitters,pitchers,hpj,ppj,boost_df,boost_x,scale) | |
hitters=optimize_lineup(hitters) | |
# Get lineup and bench | |
lineup_players = hitters[hitters['Pos'] != 'Bench'].index | |
line = hitters.loc[lineup_players].copy() | |
slot_order=['C1','C2','1B','2B','SS','3B']+[f'OF{i}' for i in range(1,6)]+['CI','MI','UTIL'] | |
line['Slot'] = pd.Categorical(line['Slot'], categories=slot_order, ordered=True) | |
line = line.sort_values('Slot') | |
# ---------- Reassign OF1βOF5 by descending MASH ---------- | |
of_mask = line['Slot'].isin([f'OF{i}' for i in range(1,6)]) | |
ofs = line[of_mask].sort_values('MASH', ascending=False).copy() | |
other = line[~of_mask].copy() | |
# Reassign best OFs to OF1βOF5 | |
ofs['Slot'] = [f'OF{i+1}' for i in range(len(ofs))] | |
line = pd.concat([ofs, other], ignore_index=True) | |
# Re-sort to maintain the original order | |
line['Slot'] = pd.Categorical(line['Slot'], categories=slot_order, ordered=True) | |
line = line.sort_values('Slot') | |
bench = hitters[ | |
(hitters['has_proj']) & | |
(~hitters.index.isin(lineup_players)) | |
].sort_values('MASH', ascending=False) | |
minors=pd.concat([ | |
hitters[~hitters['has_proj']][['Players','Pos']], | |
pitchers[~pitchers['has_proj']][['Players','Pos']] | |
], ignore_index=True) | |
# Process pitchers | |
pproj = pitchers[pitchers['has_proj']] | |
top9 = pproj.sort_values('MASH', ascending=False).head(9) | |
bp = pproj.sort_values('MASH', ascending=False).iloc[9:] | |
# Style columns | |
lc=['Slot','Players','Pos','MASH','$','PA','SP1','SP2','SP3','SP4','Boost'] | |
bc=['Players','Pos','MASH','$','PA','SP1','SP2','SP3','SP4','Boost'] | |
pc=['Players','Pos','MASH','$','IP','ERA','WHIP','Boost'] | |
def style_df(df, grads=None): | |
df=df.copy() | |
if df.empty: return df.to_html(index=False) | |
df['Boost']=df['Boost'].round(1) | |
sty=df.style.hide(axis='index').set_table_styles([ | |
{'selector':'th','props':[('text-align','center')]}, | |
{'selector':'td','props':[('text-align','center')]}]) | |
nums=df.select_dtypes(include=[np.number]).columns | |
if grads: | |
for c in grads: | |
high_good = False if c in ['ERA','WHIP'] else True | |
sty=sty.map(lambda v,hg=high_good: gradient(v,hg), subset=[c]) | |
for sp in ['SP1','SP2','SP3','SP4']: | |
if sp in df.columns: sty=sty.map(color_handed, subset=[sp]) | |
return sty.format('{:.2f}', subset=nums).to_html() | |
return ( | |
style_standings(boost_df,raw), | |
style_df(line[lc], grads=['MASH','$']), | |
style_df(bench[bc], grads=['MASH','$']), | |
style_df(top9[pc], grads=['MASH','$']), | |
style_df(bp[pc], grads=['MASH','$']), | |
minors.to_html(index=False) | |
) | |
except Exception as e: | |
log.error(e) | |
log.debug(traceback.format_exc()) | |
err=f"<pre>{e}</pre>" | |
return err,err,err,err,err,err | |
# ---------- UI Setup ---------- | |
with gr.Blocks() as demo: | |
gr.Markdown("# π Fantasy Baseball Optimizer v4.10") | |
with gr.Row(): | |
with gr.Column(): | |
r_in=gr.File(label="Roster CSV") | |
h_in=gr.File(label="Hitters CSV") | |
p_in=gr.File(label="Pitchers CSV") | |
s_in=gr.File(label="Standings CSV") | |
t_in=gr.Textbox(label="Team Name") | |
b_in=gr.Slider(0.5,10.0,value=3.0,step=0.5,label="Boost Γ") | |
sc_in=gr.Slider(0.1,2.0,value=1.0,step=0.1,label="StdDev Scale") | |
btn=gr.Button("Optimize π") | |
with gr.Column(): | |
std_html=gr.HTML(label="Standings + Boost") | |
line_html=gr.HTML(label="Lineup") | |
bench_html=gr.HTML(label="Bench Hitters") | |
tp_html=gr.HTML(label="Top 9 Pitchers") | |
bp_html=gr.HTML(label="Bullpen & Reserves") | |
min_html=gr.HTML(label="In the Minors") | |
btn.click( | |
run, | |
inputs=[r_in,h_in,p_in,s_in,t_in,b_in,sc_in], | |
outputs=[std_html,line_html,bench_html,tp_html,bp_html,min_html] | |
) | |
if __name__=="__main__": | |
demo.launch(server_name="0.0.0.0", server_port=7860) |