from matplotlib.colors import ListedColormap
import streamlit as st
from pathlib import Path
import pandas as pd
import altair as alt
import subprocess
import os
import numpy as np
import datetime
import colorcet as cc
## Paths to cached competition results
COMP_CACHE = Path("competition_cache/safe-challenge")
results_path = Path("competition_cache/cached_results")
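# Cached score files are expected to follow the naming pattern
#   {results_path}/{task}_{score}_{split}_score.csv   (see load_results below).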
TASKS = {
"video-challenge-pilot-config": ["source"],
"video-challenge-task-1-config": ["source"],
"video-challenge-task-2-config": ["source", "category"],
}
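# Each competition config maps to the score breakdowns cached for it:
# "source" is always available; "category" (augmentations) only exists for task 2.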
if os.environ.get("WORKSHOP_MODE", None):
    WORKSHOP_TEAMS = ["GRIP-UNINA", "ISPL-Realynx", "DASH", "Lemma1727", "Truebees", "Shahidmuneer", "Baseline", "DX"]
def get_default_teams(teams):
    try:
        return [t for t in WORKSHOP_TEAMS if t in teams]
    except NameError:
        # WORKSHOP_TEAMS is only defined when WORKSHOP_MODE is set
        return [t for t in teams if "test" not in t.lower()]
valid_splits = ["public", "private", "private_only"]
#####################################################################
## Data loading ##
#####################################################################
def get_max_score(group: pd.DataFrame, metric: str, use_selection: bool = True) -> pd.Series:
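    """Return the row of `group` with the highest `metric`; if `use_selection` is True
    and any row is flagged `selected`, only the selected rows are considered."""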
if use_selection:
if group["selected"].any():
subset = group[group["selected"]]
else:
subset = group
else:
subset = group
max_idx = subset[metric].idxmax()
return group.loc[max_idx]
@st.cache_data
def get_cmap(name):
ccmap = cc.cm[name]
    mpl_cmap = ListedColormap(ccmap(np.linspace(0, 1, 256)), name=name)
return mpl_cmap
def select_rows(df, metric: str = "balanced_accuracy"):
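    """Pick one submission per team: the highest `metric` among rows flagged `selected`,
    otherwise the row with the best public score (`{metric}_public`)."""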
def select(group):
if group["selected"].any():
return group[group["selected"]].loc[group[group["selected"]][metric].idxmax()]
else:
return group.loc[group[f"{metric}_public"].idxmax()]
return df.groupby("team", group_keys=False).apply(select)
@st.cache_data
def load_results(task_key, best_only, metric="balanced_accuracy", check_discrepancies=False):
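    """Load cached score CSVs for `task_key`, keyed by "{split}_{score}_score".

    With `best_only`, each team is reduced to its single best submission (public split:
    highest `metric` overall; private splits: the submission chosen by `select_rows`).
    With `check_discrepancies`, the public-split gap between the best overall and the
    best explicitly selected submission is stored under "desc_{split}_{score}_score".
    """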
to_return = {}
for split in valid_splits:
for score in TASKS.get(task_key):
file_path = f"{results_path}/{task_key}_{score}_{split}_score.csv"
if os.path.exists(file_path):
df = pd.read_csv(file_path)
public_df = pd.read_csv(f"{results_path}/{task_key}_{score}_public_score.csv")
if not best_only:
to_return[f"{split}_{score}_score"] = df
else:
if split == "public":
df = df.sort_values(["team", metric], ascending=False).reset_index(drop=True)
selected_max = (
df.copy()
.groupby("team", group_keys=False)
.apply(get_max_score, metric=metric, use_selection=True)
.sort_values([metric], ascending=False)
.set_index("team")
)
df = (
df.copy()
.groupby("team", group_keys=False)
.apply(get_max_score, metric=metric, use_selection=False)
.sort_values([metric], ascending=False)
.set_index("team")
)
if check_discrepancies:
to_return[f"desc_{split}_{score}_score"] = df[metric] - selected_max[metric]
else:
public_df = (
public_df.sort_values(["team", metric], ascending=False)
.reset_index(drop=True)
.set_index("submission_id")[metric]
)
tmp = df.set_index("submission_id").copy()
tmp = tmp.join(public_df, on=["submission_id"], rsuffix="_public")
tmp = tmp.reset_index()
df = select_rows(tmp,metric = metric)
df = df.sort_values([metric], ascending=False).set_index("team")
to_return[f"{split}_{score}_score"] = df
return to_return
@st.cache_data
def load_submission():
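    """Concatenate the per-task submission CSVs into a single DataFrame with a `task` column."""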
out = []
for task in TASKS:
data = pd.read_csv(f"{results_path}/{task}_source_submissions.csv")
data["task"] = task
out.append(data)
return pd.concat(out, ignore_index=True)
def get_updated_time(file="competition_cache/updated.txt"):
    if os.path.exists(file):
        # Read and close the file rather than leaving an open handle around
        return Path(file).read_text()
    else:
        return "no time file found"
@st.cache_data
def get_volume():
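    """Daily submission counts per status_reason across all tasks (wide format for st.bar_chart)."""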
subs = pd.concat(
[pd.read_csv(f"{results_path}/{task}_source_submissions.csv") for task in TASKS],
ignore_index=True,
)
subs["datetime"] = pd.DatetimeIndex(subs["datetime"])
subs["date"] = subs["datetime"].dt.date
subs = subs.groupby(["date", "status_reason"]).size().unstack().fillna(0).reset_index()
return subs
@st.cache_data
def make_heatmap(results, label="generated", symbol="👤"):
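    """Altair heatmap of the per-source columns prefixed with `label`, one row per team,
    with the numeric values overlaid as text."""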
    # `results` is a wide-format DataFrame with a "team" column and per-source score columns
    df_long = results.set_index("team")
    team_order = df_long.index.tolist()
df_long = df_long.loc[:, [c for c in df_long.columns if c.startswith(label) and "accuracy" not in c]]
df_long.columns = [c.replace(f"{label}_", "") for c in df_long.columns]
if "none" in df_long.columns:
df_long = df_long.drop(columns=["none"])
df_long = df_long.reset_index().melt(id_vars="team", var_name="source", value_name="acc")
# Base chart for rectangles
base = alt.Chart(df_long).encode(
x=alt.X("source:O", title="Source", axis=alt.Axis(orient="top", labelAngle=-60)),
y=alt.Y("team:O", title="Team", sort=team_order),
)
# Heatmap rectangles
heatmap = base.mark_rect().encode(
color=alt.Color("acc:Q", scale=alt.Scale(scheme="greens"), title=f"{label} Accuracy")
)
# Text labels
text = base.mark_text(baseline="middle", fontSize=16).encode(
text=alt.Text("acc:Q", format=".2f"),
color=alt.condition(
alt.datum.acc < 0.5, # you can tune this for readability
alt.value("black"),
alt.value("white"),
),
)
# Combine heatmap and text
    chart = (heatmap + text).properties(width=600, height=500, title=f"Accuracy heatmap on {symbol} {label} sources")
return chart
@st.cache_data
def load_roc_file(task, submission_ids):
rocs = pd.read_csv(f"{results_path}/{task}_source_rocs.csv")
rocs = rocs[rocs["submission_id"].isin(submission_ids)]
return rocs
@st.cache_data
def get_unique_teams(teams):
return teams.unique().tolist()
@st.cache_data
def filter_teams(temp, selected_team):
mask = temp.loc[:, "team"].isin(selected_team)
return temp.loc[mask]
def make_roc_curves(task, submission_ids):
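    """Overlay the cached ROC curves (fpr vs tpr) for the given submissions, colored by team."""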
rocs = load_roc_file(task, submission_ids)
# if rocs["team"].nunique() > 1:
color_field = "team:N"
roc_chart = (
alt.Chart(rocs)
.mark_line()
.encode(
x="fpr", y="tpr", color=alt.Color(color_field, scale=alt.Scale(scheme=color_map)), detail="submission_id:N"
)
)
return roc_chart
#####################################################################
## Page definition ##
#####################################################################
## Set title
st.set_page_config(
page_title="Leaderboard",
initial_sidebar_state="collapsed",
layout="wide", # This makes the app use the full width of the screen
)
## Pull new results or toggle private public if you are an owner
with st.sidebar:
    color_map = st.selectbox("Chart colormap", ["paired", "category20", "category20b", "category20c", "set2", "set3"])
    st.session_state["colormap"] = color_map
    temp = list(cc.cm_n.keys())
    colormap_heatmap = st.selectbox("Table colormap", options=temp, index=temp.index("gouldian"))
    st.session_state["colormap_heatmap"] = colormap_heatmap
top_n_value = st.slider(
"Mean of top N elements",
min_value=2,
max_value=10,
value=5,
step=1,
help="Calculate the mean of the top N elements in each column",
key="top_n_value",
)
st.session_state["top_n"] = top_n_value
hf_token = os.getenv("HF_TOKEN")
st.session_state["hf_token"] = hf_token
password = st.text_input("Admin login:", type="password")
dataset_options = ["public"]
if password == hf_token:
dataset_options = ["public", "private", "private_only"]
if st.button("Pull New Results"):
with st.spinner("Pulling new results", show_time=True):
try:
process = subprocess.Popen(
["python3", "utils.py"],
text=True, # Decode stdout/stderr as text
)
st.info(f"Background task started with PID: {process.pid}")
process.wait()
process.kill()
if process.returncode != 0:
st.error("The process did not finish successfully.")
else:
st.success(f"PID {process.pid} finished!")
# If a user has the right perms, then this clears the cache
load_results.clear()
get_volume.clear()
load_submission.clear()
st.rerun()
except Exception as e:
st.error(f"Error starting background task: {e}")
## Initialize the dataset view state in session_state if it doesn't exist
if "dataset_view" not in st.session_state:
st.session_state.dataset_view = "public"
# Create the selectbox, ensuring the index is valid
current_view = st.session_state.dataset_view
valid_index = dataset_options.index(current_view) if current_view in dataset_options else 0
dataset_view = st.selectbox("Dataset View", options=dataset_options, index=valid_index, key="dataset_view")
# Display the current dataset view
if dataset_view == "private":
st.success("Showing **PRIVATE** scores (all data).")
# Visual indicator for admins in the UI
if password == hf_token:
st.info("🔐 Admin View: You have access to all data")
# Initialize the top_n parameter if not in session_state
# if "top_n_value" not in st.session_state:
# st.session_state.top_n_value = 3
# Add a slider to select the number of top elements to average
elif dataset_view == "private_only":
st.success("Showing **PRIVATE ONLY** scores (excluding public data).")
# Visual indicator for admins in the UI
if password == hf_token:
st.info("🔒 Admin View: You have access to private-only data")
# Initialize the top_n parameter if not in session_state
# if "top_n_value" not in st.session_state:
# st.session_state.top_n_value = 5
# Add a slider to select the number of top elements to average
else:
st.info("Showing **PUBLIC** scores.")
st.session_state["top_n"] = None
# Ensure only admin users can access private data
if dataset_view in ["private", "private_only"] and password == hf_token:
split = dataset_view
# Clear the cache when the dataset view changes
previous_view = st.session_state.get("previous_dataset_view")
if previous_view != dataset_view:
load_results.clear()
st.session_state["previous_dataset_view"] = dataset_view
else:
split = "public"
else:
split = "public"
st.session_state["split"] = split
def show_dataframe_w_format(df, format="compact", top_n=None,colormap_table = False, sort_columns = True, sort_by_top = True, transpose = False, subset = None):
"""
Display a dataframe with formatted columns. If in private mode and top_n is provided,
adds a row showing the mean of the top n values for each column.
Args:
df: Pandas dataframe to display
format: Format string for number columns (default: "compact")
top_n: Optional number of top values to average per column
"""
split = st.session_state.get("split", "public")
# Only add top-n mean row in private mode
if split in ["private", "private_only"] and top_n is not None and isinstance(top_n, int) and top_n > 0:
# Create a copy to avoid modifying the original
df_display = df.copy()
# Calculate the mean of top n values for each column
top_n_means = {}
for col in df.columns:
sorted_values = df[col] # .sort_values(ascending=False)
# Ensure we don't try to take more values than available
actual_n = min(top_n, len(sorted_values))
if actual_n > 0:
top_n_means[col] = sorted_values.iloc[:actual_n].mean()
else:
top_n_means[col] = float("nan")
# Add the mean row as a new row in the dataframe
top_n_means_df = pd.DataFrame([top_n_means], index=[f"Top-{top_n} Mean"])
df_display = pd.concat([top_n_means_df, df_display])
else:
df_display = df
sort_by_top = sort_by_top and (top_n is not None)
if sort_columns:
df_display = df_display.sort_index(axis=1)
if sort_by_top:
sorted_top_n = df_display.iloc[0].sort_values(ascending=False)
df_display = df_display.loc[:,sorted_top_n.index]
if sort_columns and sort_by_top:
public = [c for c in df_display.columns if c.startswith("📢")]
private = [c for c in df_display.columns if not c.startswith("📢")]
sorted_top_n_pub = df_display.iloc[0].loc[public].sort_values(ascending=False)
sorted_top_n_pri = df_display.iloc[0].loc[private].sort_values(ascending=False)
df_display = df_display.loc[:,sorted_top_n_pub.index.tolist() + sorted_top_n_pri.index.tolist()]
if transpose:
df_display = df_display.transpose()
column_config = {c: st.column_config.NumberColumn(c, format=format) for c in df_display.columns}
if colormap_table:
cmap = st.session_state.get("colormap_heatmap")
df_display = df_display.style.highlight_max(axis=int(transpose) , props="font-weight: bold;")
df_display = df_display.background_gradient(cmap=get_cmap(cmap),subset = subset, axis=int(transpose),vmin = 0.5,vmax=1)
# df_display = df_display.set_table_styles([{'selector':'th',
# 'props':[('word-wrap', ' break-word'),
# ('max-width','10px'),
# ( 'text-align', 'left')
# ]
# }])
return st.dataframe(df_display, column_config=column_config)
@st.fragment
def show_leaderboard(task, score: str = "source"):
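    """Leaderboard for one task: a summary table plus per-source breakdowns of
    TPR/TNR, conditional balanced accuracy, and conditional AUC."""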
split = st.session_state.get("split", "public")
results = load_results(task, best_only=True)
source_split_map = {}
if split in ["private", "private_only"]:
_sol_df = pd.read_csv(COMP_CACHE / task / "solution-processed.csv")
pairs_df = _sol_df[["source_og", "split"]].drop_duplicates()
source_split_map = {x: y for x, y in zip(pairs_df["source_og"], pairs_df["split"])}
cols = [
"balanced_accuracy",
"generated_accuracy",
"real_accuracy",
# "pristine_accuracy",
"auc",
"total_time",
"datetime",
"fail_rate",
]
results_for_split_score = results[f"{split}_{score}_score"]
all_teams = get_unique_teams(results_for_split_score.index.to_series())
default = get_default_teams(all_teams)
# st.write(default)
with st.expander("Display Options",expanded=False):
teams = st.multiselect("Teams", options=all_teams, default=default,key=f"ms_lead_{task}")
colormap_table = st.checkbox("Colormap",value=True, key = f"{task}_colormap_table")
transpose = st.checkbox("Transpose", value=True, key = f"{task}_transpose_table")
results_for_split_score = results_for_split_score.loc[results_for_split_score.index.isin(teams)]
column_config = {
"balanced_accuracy": st.column_config.NumberColumn(
"⚖️ Balanced Accruacy",
format="compact",
min_value=0,
# pinned=True,
max_value=1.0,
# width="small",
),
"generated_accuracy": st.column_config.NumberColumn(
"👤 True Postive Rate",
format="compact",
min_value=0,
# pinned=True,
max_value=1.0,
# width="small",
),
"real_accuracy": st.column_config.NumberColumn(
"🧑‍🎤 True Negative Rate",
format="compact",
min_value=0,
# pinned=True,
max_value=1.0,
# width="small",
),
"auc": st.column_config.NumberColumn(
"📐 AUC",
format="compact",
min_value=0,
# pinned=True,
max_value=1.0,
# width="small",
),
"total_time": st.column_config.NumberColumn(
"🕒 Inference Time (s)",
format="compact",
# pinned=True,
# width="small",
),
"datetime": st.column_config.DatetimeColumn(
"🗓️ Submission Date",
format="YYYY-MM-DD",
# width="small",
),
"fail_rate": st.column_config.NumberColumn(
"❌ Fail Rate",
format="compact",
# width="small",
),
}
labels = {"real": "🧑‍🎤", "generated": "👤"}
for c in results_for_split_score.columns:
if "accuracy" in c:
continue
if any(p in c for p in ["generated", "real"]):
s = c.split("_")
pred = s[0]
source = " ".join(s[1:])
column_config[c] = st.column_config.NumberColumn(
labels[pred] + " " + source,
help=c,
format="compact",
min_value=0,
max_value=1.0,
)
sum_tab, by_source_tab = st.tabs(["Summary","By Source"])
with sum_tab:
"#### Summary"
df_summary = results_for_split_score.loc[:, cols]
if colormap_table:
cmap = st.session_state.get("colormap_heatmap")
df_summary = df_summary.style.highlight_max(axis=0 , props="font-weight: bold;", subset = cols[:4])
df_summary = df_summary.background_gradient(cmap=get_cmap(cmap),axis=0,vmin = 0.5,vmax=1,subset = cols[:4] )
st.dataframe(df_summary, column_config=column_config)
with by_source_tab:
f"##### Accuracy Breakdown by Source"
accuracy_types = {
"True positive/negative rate": 0,
"Conditional balanced accuracy": 1,
"AUC": 2,
}
granularity = st.radio(
"accuracy type",
list(accuracy_types.keys()),
key=f"granularity-{task}-{score}",
horizontal=True,
label_visibility="collapsed",
index=0,
)
## Subset the dataset
cols = [
c
for c in results_for_split_score.columns
if "generated_" in c and "accuracy" not in c and "conditional" not in c
]
col_names = [
(
f"📢 {c.replace('generated_', '')}"
if source_split_map.get(c.replace("generated_", ""), "public") == "public"
else f"🔐 {c.replace('generated_', '')}"
)
for c in results_for_split_score.columns
if "generated_" in c and "accuracy" not in c and "conditional" not in c
]
gen_tmp = results_for_split_score.loc[:, cols].copy()
gen_tmp.columns = col_names
cols = [
c
for c in results_for_split_score.columns
if "real_" in c and "accuracy" not in c and "conditional" not in c
]
col_names = [
(
f"📢 {c.replace('real_', '')}"
if source_split_map.get(c.replace("real_", ""), "public") == "public"
else f"🔐 {c.replace('real_', '')}"
)
for c in results_for_split_score.columns
if "real_" in c and "accuracy" not in c and "conditional" not in c
]
real_tmp = results_for_split_score.loc[:, cols].copy()
real_tmp.columns = col_names
## Check cases
if accuracy_types[granularity] == 0:
"#### 👤 True Positive Rate | Generated Source"
# st.dataframe(gen_tmp, column_config=column_config)
top_n = st.session_state.get("top_n", None)
show_dataframe_w_format(gen_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose)
"#### 🧑‍🎤 True Negative Rate | Real Source"
# st.dataframe(real_tmp, column_config=column_config)
show_dataframe_w_format(real_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose)
elif accuracy_types[granularity] == 1:
"#### 👤 Balanced Accuracy | Generated Source"
tnr = results_for_split_score.loc[:, ["real_accuracy"]]
gen_tmp[:] = (gen_tmp.values + tnr.values) / 2.0
# st.dataframe(gen_tmp, column_config=column_config)
top_n = st.session_state.get("top_n", None)
show_dataframe_w_format(gen_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose)
"#### 🧑‍🎤 Balanced Accuracy | Real Source"
tpr = results_for_split_score.loc[:, ["generated_accuracy"]]
real_tmp[:] = (real_tmp.values + tpr.values) / 2.0
# st.dataframe(real_tmp, column_config=column_config)
show_dataframe_w_format(real_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose)
else:
cols = [c for c in results_for_split_score.columns if "generated_conditional_auc" in c]
col_names = [
(
f"📢 {c.replace('generated_conditional_auc_', '')}"
if source_split_map.get(c.replace("generated_conditional_auc_", ""), "public") == "public"
else f"🔐 {c.replace('generated_conditional_auc_', '')}"
)
for c in results_for_split_score.columns
if "generated_conditional_auc_" in c
]
gen_tmp = results_for_split_score.loc[:, cols].copy()
gen_tmp.columns = col_names
cols = [c for c in results_for_split_score.columns if "real_conditional_auc" in c]
col_names = [
(
f"📢 {c.replace('real_conditional_auc_', '')}"
if source_split_map.get(c.replace("real_conditional_auc_", ""), "public") == "public"
else f"🔐 {c.replace('real_conditional_auc_', '')}"
)
for c in results_for_split_score.columns
if "real_conditional_auc" in c
]
real_tmp = results_for_split_score.loc[:, cols].copy()
real_tmp.columns = col_names
"#### 👤 Conditional AUC | Generated Source"
# st.dataframe(gen_tmp, column_config=column_config)
top_n = st.session_state.get("top_n", None)
show_dataframe_w_format(gen_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose)
"#### 🧑‍🎤 Conditional AUC | Real Source"
# st.dataframe(real_tmp, column_config=column_config)
show_dataframe_w_format(real_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose)
def make_roc(results, show_text=False, log_x=False):
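    """Scatter each submission's operating point (FPR vs TPR), sized by inference time,
    with dashed chance and anti-diagonal reference lines."""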
results["FA"] = 1.0 - results["real_accuracy"]
chart = (
alt.Chart(results)
.mark_point(filled=True)
.encode(
x=alt.X("FA:Q", title="🧑‍🎤 False Positive Rate", scale=alt.Scale(type = "log" if log_x else "linear",domain=[.005, 1])),
y=alt.Y("generated_accuracy:Q", title="👤 True Positive Rate", scale=alt.Scale(domain=[0.0, 1.0])),
color=alt.Color("team:N", scale=alt.Scale(scheme=color_map)), # Color by categorical field
size=alt.Size(
"total_time:Q", title="🕒 Inference Time", scale=alt.Scale(rangeMin=100)
), # Size by quantitative field
shape=alt.Shape("split:N", title="Split"),
detail=["submission_id", "auc", "balanced_accuracy"],
)
.properties(width=400, height=400, title="Detection vs False Alarm vs Inference Time")
)
if show_text:
text = (
alt.Chart(results)
.mark_text(
align="right",
fontSize=14,
dx=-5, # shift text to right of point
dy=-5, # shift text slightly up
)
.encode(
x=alt.X("FA:Q"),#, title="🧑‍🎤 False Positive Rate", scale=alt.Scale(domain=[0, 1])),
y=alt.Y("generated_accuracy:Q", title="👤 True Positive Rate", scale=alt.Scale(domain=[0, 1])),
color=alt.Color("team:N", scale=alt.Scale(scheme=color_map)), # Color by categorical field
text="team",
)
)
chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(tpr=np.linspace(0,1,100), fpr=np.linspace(0,1,100))))
.mark_line(color="lightgray", strokeDash=[8, 4], size=1)
.encode(x="fpr", y="tpr")
)
diag_line2 = (
alt.Chart(pd.DataFrame(dict(tpr=np.linspace(1,0,100), fpr=np.linspace(0,1,100))))
.mark_line(color="lightblue", strokeDash=[8, 4], size=1)
.encode(x="fpr", y="tpr")
)
return chart + diag_line + diag_line2
def make_acc(results, show_text=False, metric_spec=("balanced_accuracy", "Balanced Accuracy")):
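    """Scatter inference time (log x-axis) against the chosen metric, with dashed reference lines."""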
metric, metric_title = metric_spec
results = results.loc[results["total_time"] >= 0]
chart = (
alt.Chart(results)
.mark_point(size=200, filled=True)
.encode(
x=alt.X("total_time:Q", title="🕒 Inference Time (sec)", scale=alt.Scale(type="log", domain=[1000, 20000])),
y=alt.Y(
f"{metric}:Q",
title=metric_title,
scale=alt.Scale(domain=[0.4, 1]),
),
shape=alt.Shape("split:N", title="Split"),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
)
.properties(width=400, height=400, title=f"Inference Time vs {metric_title}")
)
if show_text:
text = (
alt.Chart(results)
.mark_text(
align="right",
dx=-5, # shift text to right of point
dy=-5, # shift text slightly up
fontSize=14,
)
.encode(
x="total_time:Q",
y=alt.Y(
f"{metric}:Q",
title=metric_title,
scale=alt.Scale(domain=[0.4, 1]),
),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
text="team",
)
)
chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(t=[100, 20000], y=[0.5, 0.5])))
.mark_line(color="lightgray", strokeDash=[8, 4])
.encode(x="t", y="y")
)
diag_line2 = (
alt.Chart(pd.DataFrame(dict(t=np.linspace(1000,20000,100), y=np.linspace(.5,1.,100))))
.mark_line(color="lightgray", strokeDash=[8, 4])
.encode(x="t", y="y")
)
return chart + diag_line+diag_line2
def make_acc_vs_auc(results, show_text=False, flip=False):
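    """Scatter AUC against balanced accuracy (axes swapped when `flip` is True), with a y=x reference line."""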
# results = results.loc[results["total_time"] >= 0]
chart = (
alt.Chart(results)
.mark_point(size=200, filled=True)
.encode(
x=alt.X("auc:Q", title="Area Under Curve", scale=alt.Scale(domain=[0.4, 1])),
y=alt.Y(
"balanced_accuracy:Q",
title="Balanced Accuracy",
scale=alt.Scale(domain=[0.4, 1]),
),
shape=alt.Shape("split:N", title="Split"),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
)
.properties(width=400, height=400, title="AUC vs Balanced Accuracy")
)
if flip:
chart = chart.encode(x=chart.encoding.y, y=chart.encoding.x)
if show_text:
text = (
alt.Chart(results)
.mark_text(
align="right",
dx=-5, # shift text to right of point
dy=-5, # shift text slightly up
fontSize=14,
)
.encode(
x=alt.X("auc:Q", title="Area Under Curve", scale=alt.Scale(domain=[0.4, 1])),
y=alt.Y(
"balanced_accuracy:Q",
title="Balanced Accuracy",
scale=alt.Scale(domain=[0.4, 1]),
),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
text="team",
)
)
if flip:
text = text.encode(x=text.encoding.y, y=text.encoding.x)
chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(x=[0.4, 1.0], y=[0.4, 1.0])))
.mark_line(color="lightgray", strokeDash=[8, 4])
.encode(x="x", y="y")
)
if flip:
diag_line = diag_line.encode(x=diag_line.encoding.y, y=diag_line.encoding.x)
full_chart = chart + diag_line
return full_chart
def make_vs_public(results, show_text=False, other_split=None):
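    """Scatter each team's public score against its score on `other_split`, with a y=x reference line."""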
# results = results.loc[results["total_time"] >= 0]
# results.groupby()
chart = (
alt.Chart(results)
.mark_point(size=200, filled=True)
.encode(
x=alt.X("public:Q", title="public", scale=alt.Scale(domain=[0.6, 1])),
y=alt.Y(f"{other_split}:Q", title=f"{other_split}", scale=alt.Scale(domain=[0.6, 1])),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
)
.properties(width=500, height=500, title=f"public vs {other_split}")
)
if show_text:
text = (
alt.Chart(results)
.mark_text(
align="right",
dx=-5, # shift text to right of point
dy=-5, # shift text slightly up
fontSize=14,
)
.encode(
x=alt.X("public:Q"),
y=alt.Y(f"{other_split}:Q"),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
text="team",
)
)
chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(x=[0.4, 1.0], y=[0.4, 1.0])))
.mark_line(color="lightgray", strokeDash=[8, 4])
.encode(x="x", y="y")
)
full_chart = chart + diag_line
return full_chart
def show_aug_plot(results,log_x = False,show_text=True):
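    """Scatter TPR vs FPR per augmentation; labels are only drawn for the aggregated "top-N" rows."""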
chart = (
alt.Chart(results)
.mark_point(filled=True,size = 200)
.encode(
x=alt.X("fpr:Q", title="🧑‍🎤 False Positive Rate", scale=alt.Scale(type = "log" if log_x else "linear",domain=[.1, 1])),
y=alt.Y("tpr:Q", title="👤 True Positive Rate", scale=alt.Scale(domain=[0.4, 1.0])),
color=alt.Color("aug:N", scale=alt.Scale(scheme=color_map)), # Color by categorical field
detail=["fpr", "tpr", "aug","team"],
)
.properties(width=800, height=600, title="Detection vs False Alarm Per Augmentation")
)
if show_text:
text = (
alt.Chart(results.loc[results.team.str.startswith("top")])
.mark_text(
align="right",
fontSize=14,
dx=-8, # shift text to right of point
dy=-5, # shift text slightly up
)
.encode(
x=alt.X("fpr:Q"),#, title="🧑‍🎤 False Positive Rate", scale=alt.Scale(domain=[0, 1])),
y=alt.Y("tpr:Q"),#, title="👤 True Positive Rate", scale=alt.Scale(domain=[0, 1])),
color=alt.Color("aug:N", scale=alt.Scale(scheme=color_map)), # Color by categorical field
text="aug",
)
)
chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(tpr=np.linspace(0,1,100), fpr=np.linspace(0,1,100))))
.mark_line(color="lightgray", strokeDash=[8, 4], size=1)
.encode(x="fpr", y="tpr")
)
diag_line2 = (
alt.Chart(pd.DataFrame(dict(tpr=np.linspace(1,0,100), fpr=np.linspace(0,1,100))))
.mark_line(color="lightblue", strokeDash=[8, 4], size=1)
.encode(x="fpr", y="tpr")
)
return (chart + diag_line + diag_line2).interactive()
def get_heatmaps(temp):
h1 = make_heatmap(temp, "generated", symbol="👤")
h2 = make_heatmap(temp, "real", symbol="🧑‍🎤")
st.altair_chart(h1, use_container_width=True)
st.altair_chart(h2, use_container_width=True)
if temp.columns.str.contains("aug", case=False).any():
h3 = make_heatmap(temp, "aug", symbol="🛠️")
st.altair_chart(h3, use_container_width=True)
@st.fragment
def show_augmentations(task, score):
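    """Break down accuracy (or conditional AUC) by augmentation category, with optional
    deltas from the unaugmented "none" column and a TPR/FPR plot for the top teams."""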
split = st.session_state.get("split", "public")
results = load_results(task, best_only=True)
results_for_split_score = results[f"{split}_{score}_score"]
all_teams = get_unique_teams(results_for_split_score.index.to_series())
teams = st.multiselect("Teams", options=all_teams, default=get_default_teams(all_teams),key=f"ms_aug_{task}")
results_for_split_score = results_for_split_score.loc[results_for_split_score.index.isin(teams)]
# st.dataframe(results_for_split_score)
f"##### Accuracy Breakdown by Category"
accuracy_types = {
"Accuracy": 0,
"AUC": 1,
}
# Create a row with two columns for controls
col1, col2 = st.columns([0.1, 0.9])
with col1:
granularity = st.radio(
"accuracy type",
list(accuracy_types.keys()),
key=f"granularity-{task}-{score}",
horizontal=True,
label_visibility="collapsed",
index=0,
)
show_deltas = False
if split in ["private", "private_only"]:
with col2:
# Add toggle for showing deltas from "none" column
show_deltas = st.toggle(
"Show deltas from 'none' (higher values mean 'none' was **lower**)",
value=False,
key=f"deltas-{task}-{score}",
)
with col2:
colormap_table = st.checkbox("Colormap",value=True, key = f"{task}_colormap_table_aug")
sort_by_top = st.checkbox("Sort by Top N",value=False )
transpose = st.checkbox("Transpose", value=True, key = f"{task}_transpose_aug_table")
## Check cases
if accuracy_types[granularity] == 0:
"#### Balanced Accuracy"
gen_cols = [
c
for c in results_for_split_score.columns
if "generated_" in c and "accuracy" not in c and "conditional" not in c
]
gen_tmp = results_for_split_score.loc[:, gen_cols].copy()
gen_tmp.columns = [
c.replace("generated_", "")
for c in results_for_split_score.columns
if "generated_" in c and "accuracy" not in c and "conditional" not in c
]
real_cols = [
c
for c in results_for_split_score.columns
if "real_" in c and "accuracy" not in c and "conditional" not in c
]
real_tmp = results_for_split_score.loc[:, real_cols].copy()
real_tmp.columns = [
c.replace("real_", "")
for c in results_for_split_score.columns
if "real_" in c and "accuracy" not in c and "conditional" not in c
]
tmp = (gen_tmp + real_tmp) / 2.0
# If toggle is on and "none" column exists, calculate deltas from "none" column
if show_deltas and "none" in tmp.columns:
# Get the "none" column values
none_values = tmp["none"].copy()
            # Calculate deltas: current_column - none
            for col in tmp.columns:
                if col != "none":
                    tmp[col] = tmp[col] - none_values
# st.dataframe(tmp)
top_n = st.session_state.get("top_n", None)
show_dataframe_w_format(tmp, top_n=top_n, colormap_table=colormap_table,sort_columns=False, sort_by_top=sort_by_top,transpose=transpose)
# st.dataframe(tmp)
top_n_teams = tmp.sort_values("none", ascending = False).index[:top_n]
# gen_tmp = gen_tmp.sort_values("none", ascending = False)
gen_tmp.loc[f"top-{top_n}",:] = gen_tmp.loc[top_n_teams,:].mean(0)
gen_tmp.columns.name = "aug"
gen_tmp = gen_tmp.stack().to_frame("tpr")#.set_index(["team","aug"])
real_tmp = real_tmp.sort_values("none", ascending = False)
real_tmp.loc[f"top-{top_n}",:] = real_tmp.loc[top_n_teams,:].mean(0)
real_tmp.columns.name = "aug"
real_tmp = real_tmp.stack()
real_tmp = 1-real_tmp
real_tmp = real_tmp.to_frame("fpr")#.set_index(["team","aug"])
tmp = pd.concat([real_tmp,gen_tmp],axis = 1 ).reset_index()
# st.write(tmp)
only_top = st.toggle("Only Top")
if only_top:
tmp = tmp.loc[tmp.team == f"top-{top_n}"]
else:
tmp = tmp.loc[tmp.team.isin( [f"top-{top_n}"] + top_n_teams.tolist())]
def short_names(n):
n = n.replace("none","NONE")
n = n.replace("compression","cm")
n = n.replace("interpolation","interp")
n = n.replace("adjustment","adj")
return n
tmp["aug"] = tmp["aug"].apply(short_names)
show_text = st.toggle("Show Labels")
log_x = st.toggle("FPR on log scale")
tpr_fpr = show_aug_plot(tmp,show_text = show_text, log_x = log_x)
st.altair_chart(tpr_fpr,use_container_width=False)
else:
cols = [c for c in results_for_split_score.columns if "conditional_auc" in c]
col_names = [
c.replace("conditional_auc_", "")
for c in results_for_split_score.columns
if "conditional_auc" in c
]
tmp = results_for_split_score.loc[:, cols].copy()
tmp.columns = col_names
"#### Conditional AUC"
# If toggle is on and "none" column exists, calculate deltas from "none" column
if show_deltas and "none" in tmp.columns:
# Get the "none" column values
none_values = tmp["none"].copy()
            # Calculate deltas: current_column - none
            for col in tmp.columns:
                if col != "none":
                    tmp[col] = tmp[col] - none_values
# st.dataframe(tmp)
top_n = st.session_state.get("top_n", None)
show_dataframe_w_format(tmp, top_n=top_n, colormap_table=colormap_table,sort_columns=False, sort_by_top=sort_by_top,transpose=transpose)
@st.fragment
def show_charts(task, score="source"):
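    """ROC-style scatter, time-vs-metric, and AUC-vs-accuracy charts for the selected teams,
    with optional full ROC curves (private split) and a public-vs-private comparison."""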
show_auc = st.toggle("Show Best w.r.t. AUC", value=False, key=f"toggle auc {task}")
log_x = st.toggle("FPR on Log Scale",value=False,key=f"toggle log {task}")
metric = "auc" if show_auc else "balanced_accuracy"
split = st.session_state.get("split", "public")
hf_token = st.session_state.get("hf_token", None)
results = load_results(task, best_only=True, metric=metric)
temp = results[f"{split}_source_score"].reset_index()
temp_public = results[f"public_source_score"].reset_index()
temp["split"] = split
temp_public["split"] = "public"
teams = get_unique_teams(temp["team"])
default = get_default_teams(teams)
best_only = True
compare = False
if split != "public":
b1, b2 = st.columns([0.2, 0.8])
with b1:
best_only = st.toggle("Best Only", value=True, key=f"best only {task} {score} {split}")
full_curves = st.toggle("Full curve", value=True, key=f"all curves {task}")
compare = st.toggle(f"Compare vs Public",value=False, key=f"compare {task}")
if not best_only:
results = load_results(task, best_only=best_only, metric=metric)
temp = results[f"{split}_source_score"].reset_index()
temp_public = results["public_source_score"].reset_index()
# selected_team = st.pills(
# "Team", ["ALL"] + teams, key=f"teams {task} 1", default=["ALL"], selection_mode="multi"
# )
with b2:
# selected_team = st.pills(
# "Team", ["ALL"] + teams, key=f"teams {task} 2", default=default, selection_mode="multi"
# )
default = get_default_teams(teams)
selected_team = st.multiselect("Teams", options=teams, default=default,key=f"charts_{task}")
if selected_team is None or len(selected_team) == 0:
return
# if "ALL" in selected_team:
# selected_team = ["ALL"]
# if "ALL" not in selected_team:
temp = filter_teams(temp, selected_team)
temp_public = filter_teams(temp_public, selected_team)
# with st.spinner("making plots...", show_time=True):
# st.write(temp)
roc_scatter = make_roc(temp, show_text=best_only & (not compare), log_x = log_x)
acc_vs_time = make_acc(
temp,
show_text=best_only & (not compare),
metric_spec=("auc", "Area Under Curve") if show_auc else ("balanced_accuracy", "Balanced Accuracy"),
)
acc_vs_auc = make_acc_vs_auc(temp, show_text=best_only & (not compare), flip=show_auc)
if split == "private" and hf_token is not None:
if full_curves:
roc_scatter = make_roc_curves(task, temp["submission_id"].values.tolist()) + roc_scatter
st.altair_chart((roc_scatter | acc_vs_time | acc_vs_auc).interactive(), use_container_width=False)
# if compare:
# st.altair_chart(public_vs_private, use_container_width=False)
if compare:
temp["split"] = split
temp_public["split"] = "public"
temp = pd.concat([temp, temp_public], ignore_index=True)
# metric = "balanced_accuracy" if not show_auc else "auc"
temp_vs_public = temp.set_index(["team", "submission_id", "split"])[metric].unstack().reset_index()
best = st.toggle("best")
if best:
temp_vs_public = temp_vs_public.sort_values("public",ascending = False).drop_duplicates("team")
c1,c2 = st.columns(2)
with c1:
public_vs_private = make_vs_public(temp_vs_public, show_text=best, other_split=split)
st.altair_chart(public_vs_private.interactive(), use_container_width=False)
with c2:
diff = "% drop"
temp_vs_public[diff] = 100*(temp_vs_public["public"] - temp_vs_public["private_only"])/temp_vs_public["public"]
cmap = st.session_state.get("colormap_heatmap")
temp_vs_public_style = temp_vs_public.set_index("team").loc[:,["public","private_only",diff]].sort_values("private_only",ascending = False).style.highlight_max(axis=0 , props="font-weight: bold;")
temp_vs_public_style = temp_vs_public_style.background_gradient(cmap=get_cmap(cmap),subset = [diff], axis=1, vmin = 0,vmax = 10)
st.dataframe(temp_vs_public_style, column_config={c:st.column_config.NumberColumn(format= "compact") for c in ["public","private_only",diff]})
st.info(f"loading {temp['submission_id'].nunique()} submissions")
@st.cache_data
def compute_running_max(result_df, teams, metric):
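    """Running maximum of `metric` over submission time, per team."""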
# Group by team and sort by datetime
result_df = result_df.copy()
result_df = result_df.loc[result_df["team"].isin(teams)]
result_df["datetime"] = pd.to_datetime(result_df["datetime"])
return (
result_df.groupby("team")
.apply(lambda a: a.sort_values("datetime").set_index("datetime")[metric].cummax())
.reset_index()
)
@st.fragment
def show_timeline(task, score="source"):
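    """Step chart of each team's best score so far over time, with Baseline and chance (0.5) reference lines."""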
split = st.session_state.get("split", "public")
hf_token = st.session_state.get("hf_token", None)
results = load_results(task, best_only=False)
temp = results[f"{split}_source_score"].reset_index()
all_teams = get_unique_teams(temp["team"])
all_teams = list(filter(lambda a: a!="Baseline",all_teams))
default = [t for t in all_teams if ("test" not in t.lower())]
teams = st.multiselect("Teams", options=all_teams, default=default)
metric = st.selectbox("Metric", ["auc", "balanced_accuracy"], key=f"time {task}")
baseline_val = temp.query("team=='Baseline'")[metric].max()
df = compute_running_max(temp, teams, metric).dropna()
# team_best = df.groupby("team")[metric].max().sort_values(ascending = False)
team_best = df.sort_values([metric,"datetime"],ascending = False).drop_duplicates(["team"])
team_order = team_best["team"].tolist() + ["Baseline"]
random_guess = (
alt.Chart(pd.DataFrame({"datetime": [df["datetime"].min(), df["datetime"].max()], metric: [0.5, 0.5]}))
.mark_line(strokeDash=[4, 4], color="grey", strokeWidth=2)
.encode(
x="datetime:T",
y=f"{metric}:Q",
)
)
# st.write(st.session_state)
baseline_chart = (
alt.Chart(pd.DataFrame({"datetime": [df["datetime"].min(), df["datetime"].max()], "team": "Baseline", metric: [baseline_val,baseline_val]}))
.mark_line(strokeDash=[8, 8], color="darkgray", strokeWidth=2)
.encode(
x="datetime:T",
y=f"{metric}:Q",
color=alt.Color("team:N", scale=alt.Scale(scheme=st.session_state.get("colormap", "paired")),sort=team_order),
)
)
# Create main chart
task_chart = (
alt.Chart(df)
.mark_line(point=True, interpolate='step-after')
.encode(
x=alt.X(
"datetime:T",
title="Submission Date",
),
y=alt.Y(f"{metric}:Q", scale=alt.Scale(domain=[0.5, 1.0])),
color=alt.Color("team:N", scale=alt.Scale(scheme=st.session_state.get("colormap", "paired")),
sort=team_order),
)
.properties(width=800, height=500, title="Best Performance Over Time (Original Content)")
.interactive()
)
if st.checkbox("Show Labels",value=True,key = f"{task} check show timeline"):
team_best.loc[len(team_best)] = {"team":"Baseline", metric:baseline_val, "datetime": df["datetime"].max()}
# st.write(team_best)
text_chart = (
alt.Chart(team_best)
.mark_text(
align="left",
fontSize=14,
dx=5, # shift text to right of point
dy=-5, # shift text slightly up
)
.encode(
x=alt.X(
"datetime:T",
title="Submission Date",
scale = alt.Scale(domain=[df["datetime"].min(),
df["datetime"].max() + datetime.timedelta(days = 4)]),
),
y=alt.Y(f"{metric}:Q", scale=alt.Scale(domain=[0.5, 1.0])),
color=alt.Color("team:N", scale=alt.Scale(scheme=st.session_state.get("colormap", "paired")),
sort=team_order),
text="team",
)
)
# Combine charts and display
st.altair_chart((task_chart +baseline_chart+text_chart).configure_legend(disable=True), use_container_width=True)
# st.altair_chart(task_chart, use_container_width=True)
def make_plots_for_task(task):
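    """Lay out the Tables / Charts / Timeline tabs for one task, plus Augmentations when the task has a "category" breakdown."""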
if len(TASKS.get(task)) > 1:
t1, t2, t3, t4 = st.tabs(["Tables", "Charts", "Timeline", "Augmentations"])
else:
t1, t2, t3 = st.tabs(["Tables", "Charts", "Timeline"])
t4 = None
with t1:
show_leaderboard(task)
with t2:
show_charts(task, score="source")
with t3:
split = st.session_state.get("split", "public")
if split != "public":
show_timeline(task, score="source")
else:
st.info(f"not available in {split} in mode")
if t4 is not None:
with t4:
show_augmentations(task, score="category")
updated = get_updated_time()
st.markdown(updated)
@st.fragment
def show_task_comparison():
"""Show summary tables for Task 1 and Task 2 side by side."""
split = st.session_state.get("split", "public")
color_map = st.session_state.get("colormap", "paired")
metric = st.selectbox("Metric", ["balanced_accuracy", "auc"])
task1_key = list(TASKS.keys())[1] # video-challenge-task-1-config
task2_key = list(TASKS.keys())[2] # video-challenge-task-2-config
task1_results = load_results(task1_key, best_only=True, metric=metric)
task2_results = load_results(task2_key, best_only=True, metric=metric)
cols = ["balanced_accuracy", "auc","total_time","generated_accuracy","real_accuracy"]
# st.write(task1_results[f"{split}_source_score"])
task1_results_split_source_score = task1_results[f"{split}_source_score"].loc[:,cols]
task2_results_split_source_score = task2_results[f"{split}_source_score"].loc[:,cols]
combined = pd.concat([task1_results_split_source_score, task2_results_split_source_score], axis=1, keys = ["Task 1", "Task 2"])
combined.columns.names = ["Task", "Metric"]
combined = combined.sort_index(level = "Metric",axis = 1).swaplevel(axis=1)
combined = combined.rename(columns={"generated_accuracy":"tpr","real_accuracy":"tnr"})
# .swaplevel(axis = 1)
# st.write(combined.loc[:,["tpr"]])
# st.write(combined)
all_teams = get_unique_teams(combined.index.to_series())
# all_teams_2 = get_unique_teams(task2_results_split_source_score.index.to_series())
# all_teams = list(set(all_teams_1 + all_teams_2))
default = get_default_teams(all_teams)
teams = st.multiselect("Teams", options=all_teams, default=default,key=f"comp_lead")
combined = combined.loc[combined.index.isin(teams)]
task1_results_split_source_score = task1_results_split_source_score.loc[task1_results_split_source_score.index.isin(teams)]
task2_results_split_source_score = task2_results_split_source_score.loc[task2_results_split_source_score.index.isin(teams)]
column_config = {
"balanced_accuracy": st.column_config.NumberColumn(
"⚖️ Balanced Accuracy",
format="compact",
min_value=0,
max_value=1.0,
),
"generated_accuracy": st.column_config.NumberColumn(
"👤 True Positive Rate",
format="compact",
min_value=0,
max_value=1.0,
),
"real_accuracy": st.column_config.NumberColumn(
"🧑‍🎤 True Negative Rate",
format="compact",
min_value=0,
max_value=1.0,
),
"auc": st.column_config.NumberColumn(
"📐 AUC",
format="compact",
min_value=0,
max_value=1.0,
),
"total_time": st.column_config.NumberColumn(
"🕒 Inference Time (s)",
format="compact",
),
"datetime": st.column_config.DatetimeColumn(
"🗓️ Submission Date",
format="YYYY-MM-DD",
),
"fail_rate": st.column_config.NumberColumn(
"❌ Fail Rate",
format="compact",
),
"task1_balanced_accuracy": st.column_config.NumberColumn(
"⚖️ Task 1 Balanced Accuracy",
format="compact",
min_value=0,
max_value=1.0,
),
"task2_balanced_accuracy": st.column_config.NumberColumn(
"⚖️ Task 2 Balanced Accuracy",
format="compact",
min_value=0,
max_value=1.0,
),
"difference": st.column_config.NumberColumn(
"⚖️ Difference (T1-T2)",
format="compact",
),
"percent_change": st.column_config.NumberColumn(
"% Change",
format="+.2%",
),
}
# Create tabs for different views
tables_tab, charts_tab = st.tabs(["Tables", "Charts"])
with tables_tab:
# Create two columns for side-by-side tables
# st.subheader("Performance Comparison: Task 1 vs Task 2")
# col1, col2 = st.columns(2)
# with col1:
# st.subheader("Task 1: Original Content")
# st.dataframe(
# task1_results_split_source_score.loc[:, cols],
# column_config=column_config,
# use_container_width=True,
# )
# with col2:
# st.subheader("Task 2: Post-processed Content")
# st.dataframe(
# task2_results_split_source_score.loc[:, cols],
# column_config=column_config,
# use_container_width=True,
# )
# Add a section for comparison of task performance differences
st.subheader("Performance Analysis")
st.markdown(
"""
Performance comparison between Task 1 (original content) and
Task 2 (post-processed content). A positive difference indicates degraded performance
on post-processed content.
"""
)
# st.write(combined)
# index = combined.columns.get_loc("auc")
combined_styled = combined.loc[:,["balanced_accuracy", "auc","total_time"]].rename(columns={"auc":"📐 AUC", "balanced_accuracy":"⚖️ Balanced Accuracy","total_time":"🕒 Run Time"})
if st.checkbox("Colormap",value=True):
cmap = st.session_state.get("colormap_heatmap")
combined_styled = combined_styled.style.highlight_max(axis=0, subset = ["📐 AUC","⚖️ Balanced Accuracy"] , props="font-weight: bold;")
combined_styled = combined_styled.background_gradient(cmap=get_cmap(cmap),axis=0,vmin = 0.5,vmax=1, subset =["📐 AUC","⚖️ Balanced Accuracy"] )
# optional: bold headers
st.dataframe(combined_styled,column_config={"Task 1" :st.column_config.NumberColumn(format="compact"),"Task 2":st.column_config.NumberColumn(format="compact") } ,use_container_width=True)
# show_dataframe_w_format(combined, top_n=0)
# # Get the datasets for both tasks
# task1_df = task1_results_split_source_score.reset_index()
# task2_df = task2_results_split_source_score.reset_index()
# # Create a combined dataframe for analysis
# common_teams = set(task1_df["team"]) & set(task2_df["team"])
# if common_teams:
# # Filter to teams that appear in both tasks
# task1_filtered = task1_df[task1_df["team"].isin(common_teams)]
# task2_filtered = task2_df[task2_df["team"].isin(common_teams)]
# # Create a comparison dataframe
# comparison_df = pd.DataFrame(
# {
# "team": list(common_teams),
# "task1_balanced_accuracy": [
# task1_filtered[task1_filtered["team"] == team]["balanced_accuracy"].values[0]
# for team in common_teams
# ],
# "task2_balanced_accuracy": [
# task2_filtered[task2_filtered["team"] == team]["balanced_accuracy"].values[0]
# for team in common_teams
# ],
# }
# )
# # Calculate differences and percentage changes
# comparison_df["difference"] = (
# comparison_df["task1_balanced_accuracy"] - comparison_df["task2_balanced_accuracy"]
# )
# comparison_df["percent_change"] = comparison_df["difference"] / comparison_df["task1_balanced_accuracy"]
# # Sort by the absolute difference (to show biggest performance changes first)
# comparison_df = (
# comparison_df.sort_values(by="difference", ascending=False).reset_index(drop=True).set_index("team")
# )
# # Display the comparison table
# show_dataframe_w_format(comparison_df, top_n=0)
# else:
# st.warning("No common teams found across both tasks.")
with charts_tab:
st.subheader("Team Performance Across Tasks")
# Get the datasets for both tasks if not already done
# if "task1_df" not in locals():
# task1_df = task1_results_split_source_score.reset_index()
# task2_df = task2_results_split_source_score.reset_index()
# common_teams = set(task1_df["team"]) & set(task2_df["team"])
# if common_teams:
# Prepare data for the plot
# plot_data = []
# for team in common_teams:
# # Get team's balanced accuracy for each task
# task1_acc = task1_df[task1_df["team"] == team][metric].values[0]
# task2_acc = task2_df[task2_df["team"] == team][metric].values[0]
# # Add points for Task 1
# plot_data.append({"team": team, "task": "Task 1", metric: task1_acc})
# # Add points for Task 2
# plot_data.append({"team": team, "task": "Task 2", metric: task2_acc})
# plot_df = pd.DataFrame(plot_data).set_index(["team", "task"])[metric].unstack().reset_index()
# st.write(combined)
plot_df = combined.loc[:,"auc"]
# plot_df.index.name = "team"
plot_df = plot_df.reset_index()
# st.write(plot_df)
chart = (
alt.Chart(plot_df)
.mark_circle(size=200)
.encode(
x=alt.X("Task 1:Q", title=f"Task 1 AUC", scale=alt.Scale(domain=[0.4, 1])),
y=alt.Y("Task 2:Q", title=f"Task 2 AUC", scale=alt.Scale(domain=[0.4, 1])),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
)
.properties(width=400, height=400, title="Task 1 vs Task 2: AUC")
.interactive()
)
# if show_text:
text = (
alt.Chart(plot_df)
.mark_text(
align="right",
dx=-5, # shift text to right of point
dy=-5, # shift text slightly up
fontSize=14,
)
.encode(
x=alt.X("Task 1:Q"),
y=alt.Y("Task 2:Q"),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
text="team",
)
)
chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(x=[0.4, 1.0], y=[0.4, 1.0])))
.mark_line(color="lightgray", strokeDash=[8, 4])
.encode(x="x", y="y")
)
# combined[:,"fpr"] = 1 - combined[:,"tpr"]
chart1 = chart + diag_line
# st.altair_chart(, use_container_width=False)
plot_df = combined.unstack().reset_index().set_index(["Task","team","Metric"]).loc[:,0].unstack().reset_index()
plot_df["fpr"] = 1. - plot_df["tnr"]
# st.write(plot_df)
base = (
alt.Chart(plot_df)
.encode(
x=alt.X("fpr", title=f"False Positive Rate", scale=alt.Scale(type = "linear", domain=[0.001, 1])),
y=alt.Y("tpr", title=f"True Positive Rate", scale=alt.Scale(domain=[0., 1])),
shape = alt.Shape("Task:N",scale=alt.Scale(domain=['Task 2', 'Task 1'])),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
)
.properties(width=400, height=400, title="Task 1 vs Task 2: TPR vs FPR")
.interactive()
)
chart = base.mark_line()
point = base.mark_point(filled=True, size = 200)
chart = chart + point
# if show_text:
text = (
alt.Chart(plot_df)
.mark_text(
align="right",
dx=-5, # shift text to right of point
dy=-5, # shift text slightly up
fontSize=14,
)
.encode(
x=alt.X("fpr", title=f"False Positive Rate", scale=alt.Scale(domain=[0., 1])),
y=alt.Y("tpr", title=f"True Positive Rate", scale=alt.Scale(domain=[0., 1])),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
text="team",
)
)
# chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(tpr=np.linspace(0,1,100), fpr=np.linspace(0,1,100))))
.mark_line(color="lightgray", strokeDash=[8, 4], size=1)
.encode(x="fpr", y="tpr")
)
diag_line2 = (
alt.Chart(pd.DataFrame(dict(tpr=np.linspace(1,0,100), fpr=np.linspace(0,1,100))))
.mark_line(color="lightblue", strokeDash=[8, 4], size=1)
.encode(x="fpr", y="tpr")
)
# combined[:,"fpr"] = 1 - combined[:,"tpr"]
chart2 = chart + diag_line + diag_line2
st.altair_chart(chart1 | chart2, use_container_width=False)
# Create line chart connecting team performances
# lines = (
# alt.Chart(plot_df)
# .mark_line(point=alt.OverlayMarkDef(filled=True, size=100), strokeDash=[4, 2], strokeWidth=2)
# .encode(
# x=alt.X("task:N", title="Task", sort=["Task 1", "Task 2"]),
# y=alt.Y("balanced_accuracy:Q", title="Balanced Accuracy", scale=alt.Scale(domain=[0.4, 1.0])),
# color=alt.Color(
# "team:N", scale=alt.Scale(scheme=color_map_choice), legend=alt.Legend(title="Teams")
# ),
# tooltip=["team:N", "task:N", "balanced_accuracy:Q"],
# )
# .properties(width=700, height=500, title="Performance Changes Across Tasks")
# )
# st.altair_chart(lines, use_container_width=False)
t1, t2, tp, comparison_tab, volume_tab, all_submission_tab, san_check, data_desc = st.tabs(
["**Task 1**", "**Task 2**", "**Pilot Task**", "**Compare Tasks**", "**Submission Volume**", "**All Submissions**","**Sanity Check**","**Data Description**"]
)
with t1:
"*Detection of Synthetic Video Content. Video files are unmodified from the original output from the models or the real sources.*"
make_plots_for_task(list(TASKS.keys())[1])
with t2:
"*Detection of Post-processed Synthetic Video Content. A subset of Task 1 data files are modified with standard post-processing techniques (compression, resizing, etc).*"
make_plots_for_task(list(TASKS.keys())[2])
with tp:
"*Detection of Synthetic Video Content. Video files are unmodified from the original output from the models or the real sources.*"
make_plots_for_task(list(TASKS.keys())[0])
if split in ["private", "private_only"]:
with comparison_tab:
"**Task 1 to Task 2 performance comparison.**"
show_task_comparison()
with volume_tab:
subs = get_volume()
status_lookup = "QUEUED,PROCESSING,SUCCESS,FAILED".split(",")
found_columns = subs.columns.values.tolist()
status_lookup = list(set(status_lookup) & set(found_columns))
st.bar_chart(subs, x="date", y=status_lookup, stack=True)
total_submissions = int(subs.loc[:, status_lookup].fillna(0).values.sum())
st.metric("Total Submissions", value=total_submissions)
st.metric("Duration", f'{(subs["date"].max() - subs["date"].min()).days} days')
@st.fragment
def show_all_submissions():
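    """Filterable table of all submissions, with per-field multiselects and free-text search."""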
show_all = st.toggle("Show All Columns", value=False)
data = load_submission()
fields = ["task", "team", "status_reason"]
field_values = {f: data[f].unique().tolist() for f in fields}
selected_fields = {}
for f, v in field_values.items():
selected_fields[f] = st.multiselect(f"Select {f} to Display", v, default=v)
mask = np.ones(len(data)).astype(bool)
for fs, vs in selected_fields.items():
mask &= data[fs].isin(vs)
data = data.loc[mask]
search_str = st.text_input("search", value="")
if search_str != "":
mask_search = (
data.select_dtypes(include=["object"])
.apply(lambda x: x.str.contains(search_str, case=False, na=False))
.any(axis=1)
)
data = data.loc[mask_search]
if not show_all:
columns_to_show = "task,team,datetime,status_reason,submission_repo,submission_id,space_id".split(",")
data = data.loc[:, columns_to_show]
data = data.sort_values("datetime", ascending=False)
# st.write(",".join(data.columns))
st.dataframe(data, hide_index=True)
@st.fragment
def show_san_check():
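    """For each task, show the public-split gap between the best overall and the best selected submission per team."""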
for task in list(TASKS.keys()):
f"## {task}"
out = load_results(task,best_only=True, metric="balanced_accuracy",check_discrepancies=True)
for k,v in out.items():
if k.startswith("desc"):
f"### {k}"
st.write(v)
if split == "private":
with all_submission_tab:
show_all_submissions()
with san_check:
show_san_check()
@st.fragment
def show_data_desc():
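    """Dataset statistics for real sources, synthetic sources, and augmentations, tagged by public/private split."""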
sources = pd.read_csv("competition_cache/safe-challenge/video-challenge-task-1-config/solution-processed.csv")
sources = sources.drop_duplicates(subset = ["source","source_og"])
def fix(el):
s = el["source"]
if s == "r_09":
return "documentary-2"
elif s == "r_07":
return "documentary-1"
else:
return el["source_og"]
sources["source_og"] = sources.apply(fix,axis = 1)
sources = sources.set_index("source_og")["split"]
# st.write(sources)
def color_rows(row):
if row["Split"] == "public":
return ["background-color: darkblue"] * len(row)
else:
return ["background-color: lightcoral"] * len(row)
tab_real, tab_gen, tab_aug = tabs = st.tabs(["Real","Generated","Augmentations"])
with tab_real:
"### Real Sources"
data =pd.read_csv("competition_cache/data_desc/real_video_stats.csv")
data["Avg Duration"] = data["Avg Duration"].apply(lambda a: float(a[:-1]))
data["Source"] = data["Source"].apply(lambda a: a.replace(" ","-"))
data["Split"] = sources.loc[data["Source"].values].values
data = data.sort_values(["Split","Source"],ascending = False)
# data_styled = data.style.apply(color_rows,axis=1)
st.dataframe(data, hide_index = True, height = 800)
with tab_gen:
"### Synthetic Sources"
data =pd.read_csv("competition_cache/data_desc/generated_video_stats.csv")
data = data.drop(columns=["Description"])
data["Avg Duration"] = data["Avg Duration"].apply(lambda a: float(a[:-1]))
data["Source"] = data["Source"].apply(lambda a: a.replace(" ","-").lower())
data["Split"] = sources.loc[data["Source"].values].values
data = data.sort_values(["Split","Source"],ascending = False)
st.dataframe(data, hide_index = True, height = 800)
with tab_aug:
"### Augmentations"
data =pd.read_csv("competition_cache/data_desc/post_processing_stats.csv",on_bad_lines="warn")
st.dataframe(data, hide_index = True, height = 800)
if split == "private":
with data_desc:
show_data_desc()