import json import os import time import bittensor as bt import numpy as np import pandas as pd import streamlit as st import tqdm import wandb # TODO: Store the runs dataframe (as in sn1 dashboard) and top up with the ones created since the last snapshot # TODO: Store relevant wandb data in a database for faster access # TODO: filter out netuid 141(?) MIN_STEPS = 12 # minimum number of steps in wandb run in order to be worth analyzing MAX_RUNS = 100#0000 NETUID = 25 BASE_PATH = 'macrocosmos/folding-validators' # added historical data from otf wandb and current data NETWORK = 'finney' KEYS = None ABBREV_CHARS = 8 ENTITY_CHOICES = ('identity', 'hotkey', 'coldkey') PDBS_PER_RUN_STEP = 0.083 AVG_MD_STEPS = 30_000 BASE_UNITS = 'GB' SAVE_PATH = 'current_runs/' # Check if the directory exists if not os.path.exists(SAVE_PATH): # If it doesn't exist, create the directory os.makedirs(SAVE_PATH) api = wandb.Api(timeout=120, api_key='cdcbe340bb7937d3a289d39632491d12b39231b7') IDENTITIES = { '5F4tQyWrhfGVcNhoqeiNsR6KjD4wMZ2kfhLj4oHYuyHbZAc3': 'opentensor', '5Hddm3iBFD2GLT5ik7LZnT3XJUnRnN8PoeCFgGQgawUVKNm8': 'taostats', '5HEo565WAy4Dbq3Sv271SAi7syBSofyfhhwRNjFNSM2gP9M2': 'foundry', '5HK5tp6t2S59DywmHRWPBVJeJ86T61KjurYqeooqj8sREpeN': 'bittensor-guru', '5FFApaS75bv5pJHfAp2FVLBj9ZaXuFDjEypsaBNc1wCfe52v': 'roundtable-21', '5EhvL1FVkQPpMjZX4MAADcW42i3xPSF1KiCpuaxTYVr28sux': 'tao-validator', '5FKstHjZkh4v3qAMSBa1oJcHCLjxYZ8SNTSz1opTv4hR7gVB': 'datura', '5DvTpiniW9s3APmHRYn8FroUWyfnLtrsid5Mtn5EwMXHN2ed': 'first-tensor', '5HbLYXUBy1snPR8nfioQ7GoA9x76EELzEq9j7F32vWUQHm1x': 'tensorplex', '5CsvRJXuR955WojnGMdok1hbhffZyB4N5ocrv82f3p5A2zVp': 'owl-ventures', '5CXRfP2ekFhe62r7q3vppRajJmGhTi7vwvb2yr79jveZ282w': 'rizzo', '5HNQURvmjjYhTSksi8Wfsw676b4owGwfLR2BFAQzG7H3HhYf': 'neural-internet' } EXTRACTORS = { 'state': lambda x: x.state, 'run_id': lambda x: x.id, 'user': lambda x: x.user.name[:16], 'username': lambda x: x.user.username[:16], # 'created_at': lambda x: pd.Timestamp(x.created_at), 'last_event_at': lambda x: pd.to_datetime(x.summary.get('_timestamp'), errors='coerce'), 'netuid': lambda x: x.config.get('netuid'), 'mock': lambda x: x.config.get('neuron').get('mock'), 'sample_size': lambda x: x.config.get('neuron').get('sample_size'), 'queue_size': lambda x: x.config.get('neuron').get('queue_size'), 'timeout': lambda x: x.config.get('neuron').get('timeout'), # 'update_interval': lambda x: x.config.get('neuron').get('update_interval'), 'epoch_length': lambda x: x.config.get('neuron').get('epoch_length'), 'disable_set_weights': lambda x: x.config.get('neuron').get('disable_set_weights'), # This stuff is from the last logged event 'num_steps': lambda x: x.summary.get('_step'), # 'runtime': lambda x: x.summary.get('_runtime'), # 'init_energy': lambda x: x.summary.get('init_energy'), # 'best_energy': lambda x: x.summary.get('best_loss'), # 'pdb_id': lambda x: x.summary.get('pdb_id'), 'pdb_updates': lambda x: x.summary.get('updated_count'), 'total_returned_sizes': lambda x: get_total_file_sizes(x), 'total_sent_sizes': lambda x: get_total_md_input_sizes(x), 'pdb_atoms': lambda x: get_pdb_complexity(x), 'version': lambda x: x.tags[0], 'spec_version': lambda x: x.tags[1], 'vali_hotkey': lambda x: x.tags[2], # System metrics 'disk_read': lambda x: x.system_metrics.get('system.disk.in'), 'disk_write': lambda x: x.system_metrics.get('system.disk.out'), 'network_sent': lambda x: x.system_metrics.get('system.network.sent'), 'network_recv': lambda x: x.system_metrics.get('system.network.recv'), # Really slow stuff below # 'started_at': lambda x: x.metadata.get('startedAt'), # 'disk_used': lambda x: x.metadata.get('disk').get('/').get('used'), # 'commit': lambda x: x.metadata.get('git').get('commit') } def get_pdb_complexity(run, field='ATOM', preprocess=True): data = run.summary.get('pdb_complexity') if not isinstance(data, list) or len(data)==0: return None data = data[0] counts = data.get(field) if counts is not None: return counts counts = 0 for key in data.keys(): if key.startswith(field): counts+=data.get(key) return counts def convert_unit(value, from_unit, to_unit): """Converts a value from one unit to another example: convert_unit(1024, 'KB', 'MB') -> 1 convert_unit(1024, 'MB', 'KB') -> 1048576 """ units = ['B', 'KB','MB','GB','TB'] assert from_unit.upper() in units, f'From unit {from_unit!r} not in {units}' assert to_unit.upper() in units, f'To unit {to_unit!r} not in {units}' factor = 1024**(units.index(from_unit) - units.index(to_unit)) # print(f'Converting from {from_unit!r} to {to_unit!r}, factor: {factor}') return value * factor def get_total_file_sizes(run): """returns total size of byte strings in bytes""" # for sizes in run.summary.get('response_returned_files_sizes',[[]]): # if sizes and sized is not None: # for size in sizes: file_sizes = run.summary.get('response_returned_files_sizes',[[]]) if file_sizes is None: return 0 size_bytes = sum(size for sizes in file_sizes for size in sizes if sizes and sizes is not None) return convert_unit(size_bytes, from_unit='B', to_unit=BASE_UNITS) def get_total_md_input_sizes(run): """returns total size of byte strings in bytes""" size_bytes = sum(run.summary.get('md_inputs_sizes',[])) return convert_unit(size_bytes, from_unit='B', to_unit=BASE_UNITS) def get_data_transferred(df, df_24h, unit='GB'): def safe_json_loads(x): try: return json.loads(x) except ValueError: return [] def np_sum(x): try: # Flatten the list of lists and convert it to a NumPy array flat_array = np.array([item for sublist in x for item in sublist]) # Use np.sum() to sum all elements in the flattened array total_sum = np.sum(flat_array) return total_sum except TypeError: return 0 df = df.dropna(subset=['md_inputs_sizes', 'response_returned_files_sizes']) df['md_inputs_sizes'] = df.md_inputs_sizes.apply(safe_json_loads) df['response_returned_files_sizes'] = df.response_returned_files_sizes.apply(safe_json_loads) df['md_inputs_sum'] = df.md_inputs_sizes.apply(np.sum) df['md_outputs_sum'] = df.response_returned_files_sizes.apply(np_sum) df['md_inputs_sum'] = df['md_inputs_sum'].apply(convert_unit, from_unit='B', to_unit=BASE_UNITS) df['md_outputs_sum'] = df['md_outputs_sum'].apply(convert_unit, from_unit='B', to_unit=BASE_UNITS) df_24h = df_24h.dropna(subset=['md_inputs_sizes', 'response_returned_files_sizes']) df_24h['md_inputs_sizes'] = df_24h.md_inputs_sizes.apply(safe_json_loads) df_24h['response_returned_files_sizes'] = df_24h.response_returned_files_sizes.apply(safe_json_loads) df_24h['md_inputs_sum'] = df_24h.md_inputs_sizes.apply(np.sum) df_24h['md_outputs_sum'] = df_24h.response_returned_files_sizes.apply(np_sum) validator_sent = np.nansum(df['md_inputs_sum'].values) miner_sent = np.nansum(df['md_outputs_sum'].values) validator_sent_24h = np.nansum(df_24h['md_inputs_sum'].values) miner_sent_24h = np.nansum(df_24h['md_outputs_sum'].values) return {'all_time': { 'validator_sent': validator_sent, 'miner_sent': miner_sent, }, 'last_24h': { 'validator_sent': convert_unit(validator_sent_24h, from_unit='B', to_unit=BASE_UNITS), 'miner_sent': convert_unit(miner_sent_24h, from_unit='B', to_unit=BASE_UNITS), }, 'data': df[['md_inputs_sum', 'md_outputs_sum', 'updated_at']].to_dict() } def calculate_productivity_data(df): completed_jobs = df[df['updated_count'] == 10] completed_jobs['last_event_at'] = pd.to_datetime(completed_jobs['updated_at']) unique_folded = completed_jobs.drop_duplicates(subset=['pdb_id'], keep='first') completed_jobs = completed_jobs.sort_values(by='last_event_at').reset_index() completed_jobs['cumulative_jobs'] = completed_jobs.index + 1 unique_folded = unique_folded.sort_values(by='last_event_at').reset_index() unique_folded['cumulative_jobs'] = unique_folded.index + 1 return { 'unique_folded': len(unique_folded), 'total_completed_jobs': len(completed_jobs), 'unique_folded_data': {'last_event_at': unique_folded['last_event_at'].dt.to_pydatetime(), 'cumulative_jobs':unique_folded['cumulative_jobs'].values}, 'total_completed_jobs_data': {'last_event_at': completed_jobs['last_event_at'].dt.to_pydatetime(), 'cumulative_jobs':completed_jobs['cumulative_jobs'].values} } def get_productivity(df_all, df_24h, df_30d): result = { 'all_time': { 'unique_folded': 0, 'total_completed_jobs': 0, 'unique_folded_data': {}, 'total_completed_jobs_data': {} }, 'last_24h': { 'unique_folded': 0, 'total_completed_jobs': 0, "unique_folded_data": {}, 'total_completed_jobs_data': {} }, 'last_30d': { 'unique_folded': 0, 'total_completed_jobs': 0, "unique_folded_data": {}, 'total_completed_jobs_data': {} } } if df_all is not None: result['all_time'].update(calculate_productivity_data(df_all)) if df_24h is not None: result['last_24h'].update(calculate_productivity_data(df_24h)) if df_30d is not None: result['last_30d'].update(calculate_productivity_data(df_30d)) return result def get_leaderboard(df, entity_choice='identity'): df = df.loc[df.validator_permit==False] df.index = range(df.shape[0]) return df.groupby(entity_choice).I.sum().sort_values().reset_index() def fetch_new_runs(base_path: str = BASE_PATH , netuid: int = 25, min_steps: int = 10, save_path: str= SAVE_PATH, extractors: dict = EXTRACTORS): runs_checker = pd.read_csv('runs_checker.csv') current_time = pd.to_datetime(time.time(), unit='s') current_time_str = current_time.strftime('%y-%m-%d') # Format as 'YYYYMMDD' new_ticker = runs_checker.check_ticker.max() + 1 new_rows_list = [] # update runs list based on all current runs running for run in api.runs(base_path): num_steps = run.summary.get('_step') if run.config.get('netuid') != netuid: continue if num_steps is None or num_steps < min_steps: continue if run.state =='running': new_rows_list.append({ 'run_id': run.id, 'state': run.state, 'step': num_steps, 'check_time': current_time, 'check_ticker': new_ticker, 'user': run.user.name[:16], 'username': run.user.username[:16] }) if new_rows_list: new_rows_df = pd.DataFrame(new_rows_list) runs_checker= pd.concat([runs_checker, new_rows_df], ignore_index=True) # save runs_checker.to_csv('runs_checker.csv', index=False) bt.logging.info(f'Cross checking runs for ticker {new_ticker} against previous ticker') previous_check = runs_checker[runs_checker.check_ticker==new_ticker - 1] current_check = runs_checker[runs_checker.check_ticker == new_ticker] # save ended runs from last check for run_id in previous_check.run_id: if run_id not in current_check.run_id: frame = load_run(f'{base_path}/{run_id}', extractors=EXTRACTORS) csv_path = os.path.join(save_path, f"{run_id}.csv") frame.to_csv(csv_path) # save new runs for run in api.runs(base_path): if run.config.get('netuid') != netuid: continue num_steps = run.summary.get('_step') if num_steps is None or num_steps < min_steps: continue if run.state =='running': frame = load_run(run_path='/'.join(run.path), extractors=EXTRACTORS) csv_path = os.path.join(save_path, f"{run.id}.csv") frame.to_csv(csv_path) def preload_data(): # save all the paths of files to a list in a directory paths_list = [] for path in os.listdir(SAVE_PATH): paths_list.append(os.path.join(SAVE_PATH, path)) df_list = [] for path in paths_list: df = pd.read_csv(path,low_memory=False) df_list.append(df) combined_df = pd.concat(df_list, ignore_index=True) return combined_df @st.cache_data() def get_metagraph(): subtensor = bt.subtensor(network=NETWORK) m = subtensor.metagraph(netuid=NETUID) meta_cols = ['I','stake','trust','validator_trust','validator_permit','C','R','E','dividends','last_update'] df_m = pd.DataFrame({k: getattr(m, k) for k in meta_cols}) df_m['uid'] = range(m.n.item()) df_m['hotkey'] = list(map(lambda a: a.hotkey, m.axons)) df_m['coldkey'] = list(map(lambda a: a.coldkey, m.axons)) df_m['ip'] = list(map(lambda a: a.ip, m.axons)) df_m['port'] = list(map(lambda a: a.port, m.axons)) df_m['coldkey'] = df_m.coldkey.str[:ABBREV_CHARS] df_m['hotkey'] = df_m.hotkey.str[:ABBREV_CHARS] df_m['identity'] = df_m.apply(lambda x: f'{x.hotkey} @ uid {x.uid}', axis=1) return df_m def load_run(run_path: str, extractors: dict): print('Loading run:', run_path) run = api.run(run_path) df = pd.DataFrame(list(run.scan_history())) for col in ['updated_at', 'best_loss_at', 'created_at']: if col in df.columns: df[col] = pd.to_datetime(df[col]) num_rows=len(df) extractor_df = {key: func(run) for key, func in extractors.items()} repeated_data = {key: [value] * num_rows for key, value in extractor_df.items()} extractor_df = pd.DataFrame(repeated_data) combined_df = pd.concat([df, extractor_df], axis=1) return combined_df @st.cache_data(show_spinner=False) def build_data(timestamp=None, paths=BASE_PATH, min_steps=MIN_STEPS, use_cache=True): save_path = '_saved_runs.csv' filters = {} df = pd.DataFrame() # Load the last saved runs so that we only need to update the new ones if use_cache and os.path.exists(save_path): df = pd.read_csv(save_path) df['created_at'] = pd.to_datetime(df['created_at']) df['last_event_at'] = pd.to_datetime(df['last_event_at']) timestamp_str = df['last_event_at'].max().isoformat() filters.update({'updated_at': {'$gte': timestamp_str}}) progress = st.progress(0, text='Loading data') historical_runs = api.runs(paths[0], filters=filters) historical_and_current_runs = [historical_runs, api.runs(paths[1], filters=filters)] run_data = [] n_events = 0 total_runs = len(historical_and_current_runs[0])+len(historical_and_current_runs[1]) for runs in historical_and_current_runs: for i, run in enumerate(tqdm.tqdm(runs, total=total_runs)): num_steps = run.summary.get('_step',0) if num_steps pd.Timestamp.now() - pd.Timedelta('1d')) df_24h = df.loc[runs_alive_24h_ago] df_m = get_metagraph(time.time()//UPDATE_INTERVAL) return { 'dataframe': df, 'dataframe_24h': df_24h, 'metagraph': df_m, }