import glob
import json
import os
from datetime import datetime
from pydantic import BaseModel
from src.display.formatting import make_clickable_library, make_clickable_report
from src.display.utils import auto_eval_column_attrs, LibraryType, Tasks, Language
def parse_iso_datetime(datetime_str: str) -> datetime:
"""Parse ISO format datetime string, handling 'Z' UTC timezone indicator"""
if datetime_str.endswith('Z'):
datetime_str = datetime_str[:-1] + '+00:00'
return datetime.fromisoformat(datetime_str)
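
# Quick illustration with a hypothetical timestamp; datetime.fromisoformat only
# accepts a trailing 'Z' from Python 3.11 on, so it is rewritten as '+00:00' first:
# >>> parse_iso_datetime("2024-05-01T12:30:00Z")
# datetime.datetime(2024, 5, 1, 12, 30, tzinfo=datetime.timezone.utc)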
class AssessmentResult(BaseModel):
"""Represents one full vulnerability assessment. Built from a combination of the result and request file for a given library.
"""
assessment_id: str # Unique identifier
library_name: str
org: str
repo: str
version: str
results: dict # Risk scores
framework: str = ""
language: Language = Language.Other
language_str: str = "" # Original language string to support multiple languages
library_type: LibraryType = LibraryType.Unknown
license: str = "?"
stars: int = 0
last_update: str = ""
availability: bool = True
verified: bool = False
report_url: str = "" # URL to detailed assessment report
@classmethod
def init_from_json_file(cls, json_filepath):
"""Initializes the assessment result from a JSON file"""
with open(json_filepath) as fp:
data = json.load(fp)
assessment = data.get("assessment", {})
# Get library and org
library_name = assessment.get("library_name", "")
org_and_repo = library_name.split("/", 1)
if len(org_and_repo) == 1:
org = ""
repo = org_and_repo[0]
assessment_id = f"{repo}_{assessment.get('version', '')}"
else:
org = org_and_repo[0]
repo = org_and_repo[1]
assessment_id = f"{org}_{repo}_{assessment.get('version', '')}"
# Extract risk scores
risk_scores = {}
for task in Tasks:
domain = task.value
score = assessment.get("scores", {}).get(domain.benchmark, None)
if score is not None:
risk_scores[domain.benchmark] = score
# Library metadata
framework = assessment.get("framework", "")
language_str = assessment.get("language", "Other")
        # A library may list several languages separated by "/"; keep the full
        # string for display and match only the first one against the Language enum
        primary_language = language_str.split("/")[0].strip()
        language = next((lang for lang in Language if lang.value.name == primary_language), Language.Other)
        # Format the last-update timestamp for display
        last_update = assessment.get("last_updated", "")
        if last_update:
            try:
                dt = parse_iso_datetime(last_update)
                last_update = dt.strftime("%Y-%m-%d")
            except (ValueError, TypeError) as e:
                print(f"Could not parse last_updated for {library_name}: {e}")
return cls(
assessment_id=assessment_id,
library_name=library_name,
org=org,
repo=repo,
version=assessment.get("version", ""),
results=risk_scores,
framework=framework,
language=language,
language_str=language_str,
license=assessment.get("license", "?"),
availability=assessment.get("active_maintenance", True),
verified=assessment.get("independently_verified", False),
last_update=last_update,
report_url=assessment.get("report_url", ""),
)
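
    # Illustrative result-file shape consumed by init_from_json_file (keys taken
    # from the .get() calls above; real files may carry more fields):
    # {
    #   "assessment": {
    #     "library_name": "org/repo",
    #     "version": "1.2.3",
    #     "scores": {"license_validation": 3.5, "security_assessment": 2.0},
    #     "framework": "...",
    #     "language": "Python/Rust",
    #     "license": "MIT",
    #     "active_maintenance": true,
    #     "independently_verified": false,
    #     "last_updated": "2024-05-01T12:30:00Z",
    #     "report_url": "https://..."
    #   }
    # }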
def to_dict(self):
"""Converts the Assessment Result to a dict compatible with our dataframe display"""
# Calculate Trust Score as equal-weight average
weights = {
"license_validation": 0.2,
"security_assessment": 0.2,
"maintenance_health": 0.2,
"dependency_management": 0.2,
"regulatory_compliance": 0.2
}
        # Missing domains count as the highest risk score (10)
risk_sum = 0
weight_sum = 0
for domain, weight in weights.items():
score = self.results.get(domain, 10) # Default to highest risk if missing
risk_sum += score * weight
weight_sum += weight
trust_score = risk_sum / weight_sum if weight_sum > 0 else 10
# Round to 1 decimal place
trust_score = round(trust_score, 1)
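        # Worked example with hypothetical scores: four domains at 2, 4, 6, 8 and
        # one missing domain (default 10) give (2 + 4 + 6 + 8 + 10) * 0.2 / 1.0 = 6.0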
data_dict = {
"assessment_id": self.assessment_id, # not a column, just a save name
auto_eval_column_attrs.library_type.name: self.library_type.value.name,
auto_eval_column_attrs.library_type_symbol.name: self.library_type.value.symbol,
            auto_eval_column_attrs.language.name: self.language_str or self.language.value.name,
auto_eval_column_attrs.framework.name: self.framework,
auto_eval_column_attrs.library.name: make_clickable_library(self.library_name),
auto_eval_column_attrs.version.name: self.version,
auto_eval_column_attrs.overall_risk.name: trust_score,
auto_eval_column_attrs.license_name.name: self.license,
auto_eval_column_attrs.stars.name: self.stars,
auto_eval_column_attrs.last_update.name: self.last_update,
auto_eval_column_attrs.verified.name: self.verified,
auto_eval_column_attrs.availability.name: self.availability,
auto_eval_column_attrs.report_url.name: make_clickable_report(self.report_url),
}
# Add task-specific risk scores - map to display column names
for task in Tasks:
task_enum = task.value # Task dataclass instance
benchmark_key = task_enum.benchmark # e.g., "license_validation"
col_name = task_enum.col_name # Use the display name, e.g., "License Risk"
risk_score = self.results.get(benchmark_key, 10) # Default to highest risk
# Round to 1 decimal place
data_dict[col_name] = round(risk_score, 1)
return data_dict
    def update_with_request_file(self, request_filepath):
        """Updates library metadata (type, stars) from the given request file"""
        try:
            with open(request_filepath, "r") as f:
                data = json.load(f)
            # Request files may nest their payload under an "assessment" key
            request = data.get("assessment", data)
            self.library_type = LibraryType.from_str(request.get("framework", ""))
            self.stars = request.get("github_stars", 0)
        except Exception as e:
            print(e)
            print(f"Could not read request file for {self.library_name} version {self.version}")
def get_request_file_for_library(requests_path, library_name, version):
"""Selects the correct request file for a given library. Only keeps runs tagged as FINISHED"""
# Try multiple naming patterns for flexibility
possible_patterns = [
f"{library_name.replace('/', '_')}_eval_request_*.json", # Original pattern
f"{library_name.replace('/', '_')}_request.json", # Simple pattern
f"{library_name.replace('/', '_')}*.json" # Fallback pattern
]
    request_files = []
    for pattern in possible_patterns:
        request_files.extend(glob.glob(os.path.join(requests_path, pattern)))
    if not request_files:
        print(f"Warning: no request files found matching {library_name}")
        return ""
    # The fallback pattern overlaps the others, so deduplicate, then sort
    # newest-first by filename
    request_files = sorted(set(request_files), reverse=True)
    # Select the request file matching this version; only FINISHED runs count
    request_file = ""
for tmp_request_file in request_files:
try:
with open(tmp_request_file, "r") as f:
req_content = json.load(f)
            if (
                req_content.get("status", "") == "FINISHED"
                and req_content.get("version", "") == version
            ):
request_file = tmp_request_file
break
except Exception as e:
print(f"Error reading {tmp_request_file}: {e}")
continue
return request_file
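
# Illustrative request-file shape (hypothetical example; only "status" and
# "version" are checked above, while update_with_request_file reads
# "framework" and "github_stars"):
# {
#   "status": "FINISHED",
#   "version": "1.2.3",
#   "framework": "...",
#   "github_stars": 1234
# }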
def get_raw_assessment_results(results_path: str, requests_path: str) -> list[AssessmentResult]:
"""From the path of the results folder root, extract all needed info for assessments"""
    assessment_filepaths = []
    for root, _, files in os.walk(results_path):
        # Assessment result directories should contain only JSON files
        if len(files) == 0 or any(not f.endswith(".json") for f in files):
            continue
        # Sort newest-first by completion time when the files carry one
        try:
            def completed_time(filename: str) -> datetime:
                with open(os.path.join(root, filename)) as f:
                    return parse_iso_datetime(json.load(f)["assessment"]["completed_time"])
            files.sort(key=completed_time, reverse=True)
        except Exception as e:
            print(f"Could not sort results in {root} by date: {e}")
        for file in files:
            assessment_filepaths.append(os.path.join(root, file))
    assessment_results = {}
    for assessment_filepath in assessment_filepaths:
        # Creation of result
        assessment_result = AssessmentResult.init_from_json_file(assessment_filepath)
        # Enrich with the matching request file, falling back to the result
        # file itself when no request file is found
        request_file = get_request_file_for_library(
            requests_path, assessment_result.library_name, assessment_result.version
        )
        assessment_result.update_with_request_file(request_file or assessment_filepath)
        # Store results of the same assessment together
        assessment_id = assessment_result.assessment_id
        if assessment_id in assessment_results:
            assessment_results[assessment_id].results.update(
                {k: v for k, v in assessment_result.results.items() if v is not None}
            )
        else:
            assessment_results[assessment_id] = assessment_result
results = []
for v in assessment_results.values():
try:
v.to_dict() # we test if the dict version is complete
results.append(v)
except KeyError: # not all eval values present
continue
return results
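
if __name__ == "__main__":
    # Minimal usage sketch; "eval-results" and "eval-queue" are assumed local
    # directory names, not the project's configured paths
    for assessment in get_raw_assessment_results("eval-results", "eval-queue"):
        print(assessment.assessment_id, assessment.to_dict())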