substra / substra_template /mlflow_live_performances.py
NimaBoscarino's picture
WIP: Substra orchestrator
04a30fc
raw
history blame
1.49 kB
import pandas as pd
import json
from pathlib import Path
from mlflow import log_metric
import time
import os
from glob import glob
TIMEOUT = 60  # Seconds without any update of the json file after which the script stops
POLLING_FREQUENCY = 10  # Seconds to wait between two checks of the json file

# Glob pattern matching a performances.json in any sub-directory of local-worker/live_performances
PERFORMANCES_PATTERN = str(Path("local-worker") / "live_performances" / "*" / "performances.json")


def wait_for_performance_file(
    pattern: str = PERFORMANCES_PATTERN,
    timeout: float = TIMEOUT,
    polling_frequency: float = POLLING_FREQUENCY,
) -> Path:
    """Block until a file matching ``pattern`` exists and return its path.

    The pattern is evaluated once per poll and the first match is returned,
    so the file that ended the wait is the file that gets used.

    Raises:
        TimeoutError: if, after a sleep, ``timeout`` seconds or more have
            elapsed since the call started.
    """
    start = time.time()
    matches = glob(pattern)
    while not matches:
        time.sleep(polling_frequency)
        if time.time() - start >= timeout:
            raise TimeoutError("The performance file does not exist, maybe no test task has been executed yet.")
        matches = glob(pattern)
    return Path(matches[0])


def resolve_step(row) -> int:
    """Return the MLflow step for one performance row.

    ``round_idx`` is used when it is set, ``testtask_rank`` otherwise.
    ``pd.isna`` is used rather than ``is not None`` because pandas turns a
    missing value in a numeric column into NaN, and ``int(NaN)`` raises.
    """
    round_idx = row["round_idx"]
    if pd.isna(round_idx):
        return int(row["testtask_rank"])
    return int(round_idx)


def log_new_rows(df, logged_keys: set, log=None) -> None:
    """Log every row of ``df`` whose ``testtask_key`` has not been logged yet.

    Args:
        df: DataFrame with the columns ``testtask_key``, ``round_idx``,
            ``testtask_rank``, ``metric_name``, ``worker`` and ``performance``.
        logged_keys: set of already logged ``testtask_key`` values; updated in place.
        log: callable invoked as ``log(name, value, step)``. Defaults to
            ``mlflow.log_metric``.
    """
    if log is None:
        log = log_metric
    for _, row in df.iterrows():
        # NOTE(review): rows are de-duplicated on testtask_key only, so a second
        # row sharing the same key (e.g. another metric) is skipped - confirm intended.
        key = row["testtask_key"]
        if key in logged_keys:
            continue
        logged_keys.add(key)
        log(f"{row['metric_name']}_{row['worker']}", row["performance"], resolve_step(row))


def main() -> None:
    """Wait for the performance file, then forward its new rows to MLflow.

    The file is re-read each time its modification time changes. The loop ends
    once more than ``TIMEOUT`` seconds have passed since the last modification
    time that was processed.
    """
    path_to_json = wait_for_performance_file()

    logged_keys = set()
    last_update = time.time()
    while (time.time() - last_update) <= TIMEOUT:
        mtime = os.path.getmtime(str(path_to_json))
        if mtime == last_update:
            time.sleep(POLLING_FREQUENCY)
            continue

        last_update = mtime
        time.sleep(1)  # Waiting for the json to be fully written

        with path_to_json.open() as json_file:
            dict_perf = json.load(json_file)

        log_new_rows(pd.DataFrame(dict_perf), logged_keys)


if __name__ == "__main__":
    main()