# Helper funcs for LLM_XXXXX.py
import json
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from functools import lru_cache

import tiktoken
import yaml
from langchain_core.output_parsers.format_instructions import JSON_FORMAT_INSTRUCTIONS
from transformers import AutoTokenizer
import GPUtil
import psutil
import torch

from vouchervision.tool_taxonomy_WFO import validate_taxonomy_WFO, WFONameMatcher
from vouchervision.tool_geolocate_HERE import validate_coordinates_here
from vouchervision.tool_wikipedia import validate_wikipedia


def run_tools(output, tool_WFO, tool_GEO, tool_wikipedia, json_file_path_wiki):
    """Run the taxonomy, geolocation and Wikipedia validators concurrently.

    Each validator is submitted to its own worker thread. A validator that
    raises is reported with ``print`` and its result is replaced by a
    fallback, so one failing tool never aborts the others.

    Args:
        output: The record handed to every validator. The same object is
            passed to all three threads.
            NOTE(review): if the validators mutate it in place they may
            race with each other - confirm against their implementations.
        tool_WFO: First argument of ``validate_taxonomy_WFO``; also used to
            build the ``WFONameMatcher`` that supplies the null record.
        tool_GEO: First argument of ``validate_coordinates_here``.
        tool_wikipedia: First argument of ``validate_wikipedia``.
        json_file_path_wiki: Second argument of ``validate_wikipedia``.

    Returns:
        ``(output_WFO, WFO_record, output_GEO, GEO_record)``. When a
        validator failed, its pair falls back to the unmodified ``output``
        plus a null record. The Wikipedia validator's return value is
        collected but not returned.
    """
    tasks = [
        (validate_taxonomy_WFO, (tool_WFO, output, False)),
        (validate_coordinates_here, (tool_GEO, output, False)),
        (validate_wikipedia, (tool_wikipedia, json_file_path_wiki, output)),
    ]

    results = {}
    # Leaving the ``with`` block waits for every worker to finish.
    with ThreadPoolExecutor() as executor:
        future_to_name = {
            executor.submit(func, *args): func.__name__ for func, args in tasks
        }
        for future in as_completed(future_to_name):
            func_name = future_to_name[future]
            try:
                results[func_name] = future.result()
            except Exception as exc:
                # Best effort: report the failure and use the fallback below.
                print(f'{func_name} generated an exception: {exc}')

    Matcher = WFONameMatcher(tool_WFO)
    # Built per call so callers never share one mutable fallback dict.
    GEO_dict_null = {
        'GEO_override_OCR': False,
        'GEO_method': '',
        'GEO_formatted_full_string': '',
        'GEO_decimal_lat': '',
        'GEO_decimal_long': '',
        'GEO_city': '',
        'GEO_county': '',
        'GEO_state': '',
        'GEO_state_code': '',
        'GEO_country': '',
        'GEO_country_code': '',
        'GEO_continent': '',
    }

    output_WFO, WFO_record = results.get('validate_taxonomy_WFO', (output, Matcher.NULL_DICT))
    output_GEO, GEO_record = results.get('validate_coordinates_here', (output, GEO_dict_null))

    return output_WFO, WFO_record, output_GEO, GEO_record


def save_individual_prompt(prompt_template, txt_file_path_ind_prompt):
    """Write ``prompt_template`` to ``txt_file_path_ind_prompt`` as UTF-8 text.

    An existing file at that path is overwritten.
    """
    with open(txt_file_path_ind_prompt, 'w', encoding='utf-8') as file:
        file.write(prompt_template)


def sanitize_prompt(data):
    """Recursively strip characters that cannot be encoded as UTF-8.

    Dicts (keys and values) and lists are rebuilt with every contained
    string sanitized. Strings are round-tripped through UTF-8 with
    ``errors='ignore'``, which drops unencodable code points such as lone
    surrogates. Any other type, tuples included, is returned unchanged.
    """
    if isinstance(data, dict):
        return {sanitize_prompt(key): sanitize_prompt(value) for key, value in data.items()}
    if isinstance(data, list):
        return [sanitize_prompt(element) for element in data]
    if isinstance(data, str):
        return data.encode('utf-8', 'ignore').decode('utf-8')
    return data


@lru_cache(maxsize=1)
def _get_mistral_tokenizer():
    """Load the Mistral tokenizer once and reuse it on later calls.

    ``lru_cache`` does not cache exceptions, so a failed load is retried on
    the next call.
    """
    return AutoTokenizer.from_pretrained("mistralai/Mistral-7B-v0.1")


def count_tokens(string, vendor, model_name):
    """Count the tokens in ``string`` plus the JSON format instructions.

    Args:
        string: Prompt text; ``JSON_FORMAT_INSTRUCTIONS`` is appended before
            counting.
        vendor: ``'mistral'`` selects the Hugging Face Mistral tokenizer;
            any other value selects tiktoken.
        model_name: Model name passed to ``tiktoken.encoding_for_model``.
            Unused for the Mistral branch.

    Returns:
        The token count, or ``0`` if anything goes wrong (the error is
        printed rather than raised).
    """
    try:
        # Inside the ``try`` so a non-string ``string`` also yields 0.
        full_string = string + JSON_FORMAT_INSTRUCTIONS
        if vendor == 'mistral':
            return len(_get_mistral_tokenizer().tokenize(full_string))
        encoding = tiktoken.encoding_for_model(model_name)
        return len(encoding.encode(full_string))
    except Exception as e:
        print(f"An error occurred: {e}")
        return 0


class SystemLoadMonitor():
    """Sample CPU, RAM and GPU usage on a background thread and report peaks.

    Typical sequence: ``start_monitoring_usage()`` -> ``stop_inference_timer()``
    -> ``stop_monitoring_report_usage()``.
    """

    def __init__(self, logger) -> None:
        """Create an idle monitor.

        Args:
            logger: Object with an ``info`` method; receives the summary lines
                emitted by ``stop_monitoring_report_usage``.
        """
        self.monitoring_thread = None
        self.logger = logger
        # Running peaks plus the loop's on/off switch ('monitoring').
        self.gpu_usage = {'max_cpu_usage': 0, 'max_load': 0, 'max_vram_usage': 0, "max_ram_usage": 0, 'n_gpus': 0, 'monitoring': True}
        self.start_time = None
        self.tool_start_time = None
        # Stays None until stop_inference_timer() has been called.
        self.inference_time = None
        self.has_GPU = torch.cuda.is_available()
        self.monitor_interval = 2
        # Lets a stop request interrupt the wait between samples.
        self._stop_event = threading.Event()

    def start_monitoring_usage(self):
        """Start the inference timer and launch the sampling thread."""
        # Re-arm both stop signals so the monitor can run again after a stop.
        self.gpu_usage['monitoring'] = True
        self._stop_event.clear()
        self.start_time = time.time()
        self.monitoring_thread = threading.Thread(target=self.monitor_usage, args=(self.monitor_interval,))
        self.monitoring_thread.start()

    def stop_inference_timer(self):
        """Record the elapsed inference time and start the tool timer."""
        self.inference_time = time.time() - self.start_time
        # The tool phase starts the moment inference ends.
        self.tool_start_time = time.time()

    def monitor_usage(self, interval):
        """Sampling loop; runs until ``gpu_usage['monitoring']`` is false.

        Args:
            interval: Seconds to wait between samples.
        """
        while self.gpu_usage['monitoring']:
            # GPU monitoring
            if self.has_GPU:
                GPUs = GPUtil.getGPUs()
                n_gpus = len(GPUs)
                self.gpu_usage['n_gpus'] = n_gpus
                if n_gpus > 0:  # Avoid division by zero
                    total_load = sum(gpu.load for gpu in GPUs)
                    total_memory_usage_gb = sum(gpu.memoryUsed / 1024.0 for gpu in GPUs)
                    # Load is averaged across GPUs; VRAM is summed across GPUs.
                    self.gpu_usage['max_load'] = max(self.gpu_usage['max_load'], total_load / n_gpus)
                    self.gpu_usage['max_vram_usage'] = max(self.gpu_usage['max_vram_usage'], total_memory_usage_gb)

            # RAM monitoring (GB)
            ram_usage = psutil.virtual_memory().used / (1024.0 ** 3)
            self.gpu_usage['max_ram_usage'] = max(self.gpu_usage.get('max_ram_usage', 0), ram_usage)

            # CPU monitoring
            cpu_usage = psutil.cpu_percent(interval=None)
            self.gpu_usage['max_cpu_usage'] = max(self.gpu_usage.get('max_cpu_usage', 0), cpu_usage)

            # Waits like time.sleep(interval) but returns early on a stop
            # request, so stopping does not block for a full interval.
            self._stop_event.wait(interval)

    def get_current_datetime(self):
        """Return the current local time as ``YYYY_MM_DDTHH_MM_SS``."""
        return datetime.now().strftime('%Y_%m_%dT%H_%M_%S')

    def stop_monitoring_report_usage(self):
        """Stop sampling, log the peaks and return them as a report dict.

        Returns:
            Dict with timing, CPU/RAM peaks, GPU count, total VRAM, the
            capability class and GPU peaks (``'0'`` strings when CUDA is
            unavailable). Most numeric values are stringified.
        """
        self.gpu_usage['monitoring'] = False
        self._stop_event.set()
        # The thread is None when monitoring was never started.
        if self.monitoring_thread is not None:
            self.monitoring_thread.join()

        now = time.time()
        tool_time = now - self.tool_start_time if self.tool_start_time else 0

        inference_time = self.inference_time
        if inference_time is None:
            # stop_inference_timer() was never called: count the whole run
            # since start as inference, or 0 if the monitor never started.
            inference_time = now - self.start_time if self.start_time else 0

        num_gpus, gpu_dict, total_vram_gb, capability_score = check_system_gpus()

        report = {
            'inference_time_s': str(round(inference_time, 2)),
            'tool_time_s': str(round(tool_time, 2)),
            'max_cpu': str(round(self.gpu_usage['max_cpu_usage'], 2)),
            'max_ram_gb': str(round(self.gpu_usage['max_ram_usage'], 2)),
            'current_time': self.get_current_datetime(),
            'n_gpus': self.gpu_usage['n_gpus'],
            'total_gpu_vram_gb': total_vram_gb,
            'capability_score': capability_score,
        }

        self.logger.info(f"Inference Time: {round(inference_time,2)} seconds")
        self.logger.info(f"Tool Time: {round(tool_time,2)} seconds")
        self.logger.info(f"Max CPU Usage: {round(self.gpu_usage['max_cpu_usage'],2)}%")
        self.logger.info(f"Max RAM Usage: {round(self.gpu_usage['max_ram_usage'],2)}GB")

        if self.has_GPU:
            report.update({'max_gpu_load': str(round(self.gpu_usage['max_load'] * 100, 2))})
            report.update({'max_gpu_vram_gb': str(round(self.gpu_usage['max_vram_usage'], 2))})
            self.logger.info(f"Max GPU Load: {round(self.gpu_usage['max_load'] * 100, 2)}%")
            self.logger.info(f"Max GPU Memory Usage: {round(self.gpu_usage['max_vram_usage'], 2)}GB")
        else:
            report.update({'max_gpu_load': '0'})
            report.update({'max_gpu_vram_gb': '0'})

        return report


def check_system_gpus():
    """Enumerate GPUs through GPUtil and classify the total VRAM.

    GPUs are queried through GPUtil whether or not torch reports CUDA
    support; the torch status is only printed.

    Returns:
        ``(num_gpus, gpu_dict, total_vram_gb, capability_score)`` where
        ``gpu_dict`` maps ``"GPU_<i>"`` to a ``"<size> GB"`` string and
        ``capability_score`` is the *name* of the first class whose
        threshold is at least the total VRAM (``"no_gpu"`` for 0 GB).
    """
    print(f"Torch CUDA: {torch.cuda.is_available()}")

    GPUs = GPUtil.getGPUs()
    num_gpus = len(GPUs)
    gpu_dict = {}
    total_vram = 0

    for i, gpu in enumerate(GPUs):
        gpu_vram = gpu.memoryTotal  # VRAM in MB
        gpu_dict[f"GPU_{i}"] = f"{gpu_vram / 1024} GB"  # Convert to GB
        total_vram += gpu_vram

    total_vram_gb = total_vram / 1024  # Convert total VRAM to GB

    # Thresholds in GB, ordered from smallest to largest.
    capability_score_map = {
        "no_gpu": 0,
        "class_8GB": 10,
        "class_12GB": 14,
        "class_16GB": 18,
        "class_24GB": 26,
        "class_48GB": 50,
        "class_96GB": 100,
        "class_96GBplus": float('inf'),  # Catches anything above 96GB
    }

    # The infinite last threshold always matches, so the "class_max" default
    # is only a safety net.
    capability_score = next(
        (name for name, limit_gb in capability_score_map.items() if total_vram_gb <= limit_gb),
        "class_max",
    )

    return num_gpus, gpu_dict, total_vram_gb, capability_score


if __name__ == '__main__':
    num_gpus, gpu_dict, total_vram_gb, capability_score = check_system_gpus()
    print(f"Number of GPUs: {num_gpus}")
    print(f"GPU Details: {gpu_dict}")
    print(f"Total VRAM: {total_vram_gb} GB")
    print(f"Capability Score: {capability_score}")