|
import os |
|
import pandas as pd |
|
import requests |
|
import json |
|
from io import StringIO |
|
from datetime import datetime |
|
|
|
from src.assets.text_content import REPO |
|
|
|
def get_github_data():
    """
    Read and process data from CSV files hosted on GitHub.
    - https://github.com/clembench/clembench-runs

    The available versions are read from ``benchmark_runs.json`` under
    ``REPO``. For every version the text-only ``<version>/results.csv`` is
    downloaded; for versions numbered 1.6 or higher the
    ``<version>_multimodal/results.csv`` file is requested as well. Each CSV
    is passed through ``process_df`` and sorted by its second column in
    descending order. A CSV that cannot be downloaded is reported on stdout
    and skipped, so the lists below may hold fewer entries than there are
    versions.

    Returns:
        github_data (dict): Dictionary containing:
            - "text": List of DataFrames with each version's textual
              leaderboard data, newest version first.
            - "multimodal": List of DataFrames with each version's multimodal
              leaderboard data, newest version first.
            - "date": Date of the latest version formatted as "DD Mon YYYY"
              (abbreviated month name, e.g. "05 Jun 2024").

        If ``benchmark_runs.json`` cannot be fetched (non-200 status), the
        tuple ``(None, None, None, None)`` is returned instead of a dict.
    """
    base_repo = REPO
    json_url = base_repo + "benchmark_runs.json"
    response = requests.get(json_url)

    if response.status_code != 200:
        print(f"Failed to read JSON file: Status Code: {response.status_code}")
        # NOTE(review): the failure value is a 4-tuple rather than a dict; it
        # is left unchanged because the callers are not visible from here.
        return None, None, None, None

    json_data = response.json()
    versions = json_data['versions']

    # Newest version first. The sort key drops the first character of the
    # name and parses the rest as a float, so names are expected to look like
    # "v1.6". NOTE(review): with this key "v1.10" would sort as 1.1.
    version_names = sorted(
        [ver['version'] for ver in versions],
        key=lambda v: float(v[1:]),
        reverse=True
    )
    print(f"Found {len(version_names)} versions from get_github_data(): {version_names}.")

    # The date shown to the user is the one recorded for the newest version.
    latest_version = version_names[0]
    latest_date = next(
        ver['date'] for ver in versions if ver['version'] == latest_version
    )
    formatted_date = datetime.strptime(latest_date, "%Y/%m/%d").strftime("%d %b %Y")

    text_dfs = []
    mm_dfs = []

    for version in version_names:
        # Text-only leaderboard for this version.
        text_url = f"{base_repo}{version}/results.csv"
        csv_response = requests.get(text_url)
        if csv_response.status_code == 200:
            df = pd.read_csv(StringIO(csv_response.text))
            df = process_df(df)
            df = df.sort_values(by=df.columns[1], ascending=False)
            text_dfs.append(df)
        else:
            print(f"Failed to read Text-only leaderboard CSV file for version: {version}. Status Code: {csv_response.status_code}")

        # Multimodal results are only requested for versions >= 1.6.
        if float(version[1:]) >= 1.6:
            mm_url = f"{base_repo}{version}_multimodal/results.csv"
            mm_response = requests.get(mm_url)
            if mm_response.status_code == 200:
                df = pd.read_csv(StringIO(mm_response.text))
                df = process_df(df)
                df = df.sort_values(by=df.columns[1], ascending=False)
                mm_dfs.append(df)
            else:
                # Report the status of the multimodal request itself, not the
                # status of the text-only request made above.
                print(f"Failed to read multimodal leaderboard CSV file for version: {version}: Status Code: {mm_response.status_code}. Please ignore this message if multimodal results are not available for this version")

    github_data = {
        "text": text_dfs,
        "multimodal": mm_dfs,
        "date": formatted_date,
    }

    return github_data
|
|
|
|
|
def process_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Process a raw leaderboard dataframe:
    - Convert datatypes to sort by "float" instead of "str"
    - Remove repetition in model names
    - Update column names

    The dataframe is modified in place and also returned.

    Args:
        df: Unprocessed Dataframe. The first column holds the model names and
            every other column holds scores. Columns from the fifth onwards
            must be named "<prefix>, <suffix>" (comma separated).

    Returns:
        df: Processed Dataframe with columns 'Model', 'Clemscore',
            '% Played', 'Quality Score', followed by one
            "<Prefix> <suffix>" column per remaining input column.
    """
    # Every column except the first is numeric; values that cannot be parsed
    # become NaN instead of raising.
    for col in df.columns[1:]:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    model_col = df.columns[0]

    # Strip the "-t0.0" suffix as a literal string. With regex matching the
    # "." would act as a wildcard and also strip e.g. "-t0x0".
    df[model_col] = df[model_col].str.replace('-t0.0', '', regex=False)

    # Collapse repeated "--"-separated parts. dict.fromkeys keeps the first
    # occurrence of each part in its original position, so the resulting name
    # is the same on every run (a set would not guarantee any order).
    df[model_col] = df[model_col].apply(
        lambda name: '--'.join(dict.fromkeys(name.split('--')))
    )

    # The first four columns get fixed display names; the remaining ones are
    # rebuilt from their "<prefix>, <suffix>" header.
    custom_column_names = ['Model', 'Clemscore', '% Played', 'Quality Score']
    for col in df.columns[4:]:
        parts = col.split(',')
        custom_name = f"{parts[0].strip().capitalize()} {parts[1].strip()}"
        custom_column_names.append(custom_name)

    df.columns = custom_column_names

    return df
|
|
|
|
|
def query_search(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """
    Filter the dataframe based on the search query.

    Each search term is matched case-insensitively as a plain substring of
    the 'Model' column; a row is kept when it matches at least one term.

    Args:
        df (pd.DataFrame): Unfiltered dataframe with a 'Model' column.
        query (str): A string of queries separated by ";".

    Returns:
        pd.DataFrame: Filtered dataframe containing searched queries in the
        'Model' column. The input dataframe itself is returned when the query
        contains no search terms.
    """
    # Blank terms (e.g. from a trailing ";") are dropped.
    queries = [q.strip().lower() for q in query.split(';') if q.strip()]
    if not queries:
        return df

    model_names = df['Model'].str.lower()

    # Match each term literally: user input such as "(" or "c++" is not a
    # valid regular expression and "." must not act as a wildcard.
    # na=False treats missing model names as non-matching.
    matches = pd.Series(False, index=df.index)
    for term in queries:
        matches = matches | model_names.str.contains(term, regex=False, na=False)

    filtered_df = df[matches]

    return filtered_df
|
|