Spaces:
Runtime error
Runtime error
| import html | |
| import threading | |
| import time | |
| import cProfile | |
| from modules import shared, progress, errors | |
| queue_lock = threading.Lock() | |
| def wrap_queued_call(func): | |
| def f(*args, **kwargs): | |
| with queue_lock: | |
| res = func(*args, **kwargs) | |
| return res | |
| return f | |
| def wrap_gradio_gpu_call(func, extra_outputs=None): | |
| name = func.__name__ | |
| def f(*args, **kwargs): | |
| # if the first argument is a string that says "task(...)", it is treated as a job id | |
| if len(args) > 0 and type(args[0]) == str and args[0][0:5] == "task(" and args[0][-1] == ")": | |
| id_task = args[0] | |
| progress.add_task_to_queue(id_task) | |
| else: | |
| id_task = None | |
| with queue_lock: | |
| progress.start_task(id_task) | |
| res = [None, '', '', ''] | |
| try: | |
| res = func(*args, **kwargs) | |
| progress.record_results(id_task, res) | |
| except Exception as e: | |
| shared.log.error(f"Exception: {e}") | |
| shared.log.error(f"Arguments: args={str(args)[:10240]} kwargs={str(kwargs)[:10240]}") | |
| errors.display(e, 'gradio call') | |
| res[-1] = f"<div class='error'>{html.escape(str(e))}</div>" | |
| finally: | |
| progress.finish_task(id_task) | |
| return res | |
| return wrap_gradio_call(f, extra_outputs=extra_outputs, add_stats=True, name=name) | |
| def wrap_gradio_call(func, extra_outputs=None, add_stats=False, name=None): | |
| job_name = name if name is not None else func.__name__ | |
| def f(*args, extra_outputs_array=extra_outputs, **kwargs): | |
| t = time.perf_counter() | |
| shared.mem_mon.reset() | |
| shared.state.begin(job_name) | |
| try: | |
| if shared.cmd_opts.profile: | |
| pr = cProfile.Profile() | |
| pr.enable() | |
| res = func(*args, **kwargs) | |
| if res is None: | |
| msg = "No result returned from function" | |
| shared.log.warning(msg) | |
| res = [None, '', '', f"<div class='error'>{html.escape(msg)}</div>"] | |
| else: | |
| res = list(res) | |
| if shared.cmd_opts.profile: | |
| errors.profile(pr, 'Wrap') | |
| except Exception as e: | |
| errors.display(e, 'gradio call') | |
| if extra_outputs_array is None: | |
| extra_outputs_array = [None, ''] | |
| res = extra_outputs_array + [f"<div class='error'>{html.escape(type(e).__name__+': '+str(e))}</div>"] | |
| shared.state.end() | |
| if not add_stats: | |
| return tuple(res) | |
| elapsed = time.perf_counter() - t | |
| elapsed_m = int(elapsed // 60) | |
| elapsed_s = elapsed % 60 | |
| elapsed_text = f"{elapsed_m}m {elapsed_s:.2f}s" if elapsed_m > 0 else f"{elapsed_s:.2f}s" | |
| vram_html = '' | |
| if not shared.mem_mon.disabled: | |
| vram = {k: -(v//-(1024*1024)) for k, v in shared.mem_mon.read().items()} | |
| if vram.get('active_peak', 0) > 0: | |
| vram_html = " | <p class='vram'>" | |
| vram_html += f"GPU active {max(vram['active_peak'], vram['reserved_peak'])} MB reserved {vram['reserved']} | used {vram['used']} MB free {vram['free']} MB total {vram['total']} MB" | |
| vram_html += f" | retries {vram['retries']} oom {vram['oom']}" if vram.get('retries', 0) > 0 or vram.get('oom', 0) > 0 else '' | |
| vram_html += "</p>" | |
| if isinstance(res, list): | |
| res[-1] += f"<div class='performance'><p class='time'>Time: {elapsed_text}</p>{vram_html}</div>" | |
| return tuple(res) | |
| return f | |