Spaces:
Runtime error
Runtime error
from typing import Any, Dict | |
from fastapi import Depends | |
from modules import shared | |
from modules.api import models, helpers | |
def post_shutdown():
    """Log that a shutdown was requested, then raise ``SystemExit`` with status 0."""
    shared.log.info('Shutdown request received')
    # Equivalent to sys.exit(0): both raise SystemExit carrying exit status 0.
    raise SystemExit(0)
def get_motd():
    """Build the message-of-the-day HTML snippet.

    The result starts with a version banner when the version info returned by
    ``shared.get_version()`` has a non-None ``updated`` entry.  When
    ``shared.opts.motd`` is truthy, the text served by the remote MOTD page is
    appended if that page answers with HTTP 200.

    Returns:
        str: The assembled message; an empty string when neither part applies.
    """
    import requests

    motd = ''
    ver = shared.get_version()
    if ver.get('updated', None) is not None:
        motd = f"version <b>{ver['hash']} {ver['updated']}</b> <span style='color: var(--primary-500)'>{ver['url'].split('/')[-1]}</span><br>"
    if shared.opts.motd:
        try:
            res = requests.get('https://vladmandic.github.io/automatic/motd', timeout=10)
        except requests.exceptions.RequestException as err:
            # The remote message is optional: a connection error or timeout
            # must not fail the request, so fall back to the local banner only.
            shared.log.info(f'MOTD: request failed: {err}')
            return motd
        if res.status_code == 200:
            msg = (res.text or '').strip()
            shared.log.info(f'MOTD: {msg if len(msg) > 0 else "N/A"}')
            motd += res.text
    return motd
def get_version():
    """Return the version information reported by ``shared.get_version()``."""
    version_info = shared.get_version()
    return version_info
def get_platform():
    """Return the installer's platform info merged with the loader's package info.

    Both sources are unpacked into a single new dict; on duplicate keys the
    entry coming from the loader packages overrides the installer one.
    """
    from installer import get_platform as query_platform
    from modules.loader import get_packages as query_packages

    platform_info = query_platform()
    package_info = query_packages()
    return {**platform_info, **package_info}
def get_log_buffer(req: models.ReqLog = Depends()):
    """Return entries from the in-memory log buffer.

    When ``req.lines`` is positive the buffer is sliced as ``[:req.lines]``;
    otherwise a copy of the whole buffer is returned.  If ``req.clear`` is set,
    the buffer is emptied after the result has been taken, so the returned
    entries are not affected by the clear.
    """
    if req.lines > 0:
        lines = shared.log.buffer[:req.lines]
    else:
        lines = shared.log.buffer.copy()
    if req.clear:
        shared.log.buffer.clear()
    return lines
def get_config():
    """Return the current option values as a plain dict.

    Every key of ``shared.opts.data`` is looked up with a fallback: the
    ``default`` of its entry in ``shared.opts.data_labels`` when such an entry
    exists, otherwise ``None``.  The ``sd_lyco`` and ``sd_lora`` keys are
    removed from the result.
    """
    options = {}
    for key in shared.opts.data.keys():
        if shared.opts.data_labels.get(key) is None:
            fallback = None
        else:
            fallback = shared.opts.data_labels.get(key).default
        options[key] = shared.opts.data.get(key, fallback)
    # These two entries are never exposed through this endpoint.
    for hidden in ('sd_lyco', 'sd_lora'):
        options.pop(hidden, None)
    return options
def set_config(req: Dict[str, Any]):
    """Apply the given option values and save the configuration.

    Each key/value pair of ``req`` is passed to ``shared.opts.set``; afterwards
    the options are saved to ``shared.config_filename``.

    Returns:
        dict: ``{"updated": [...]}`` holding one single-entry dict per option,
        mapping the option name to whatever ``shared.opts.set`` returned.
    """
    updated = [{name: shared.opts.set(name, value)} for name, value in req.items()]
    shared.opts.save(shared.config_filename)
    return {"updated": updated}
def get_cmd_flags():
    """Return the attribute dict (``vars``) of ``shared.cmd_opts``."""
    flags = vars(shared.cmd_opts)
    return flags
def get_progress(req: models.ReqProgress = Depends()):
    """Report the progress of the job tracked in ``shared.state``.

    When ``shared.state.job_count`` is 0 a zeroed response is returned right
    away.  Otherwise progress is computed from the job and sampling-step
    counters as ``(steps * job_no + step) / (steps * job_count)`` and capped
    at 1.0, and the remaining time is extrapolated from the time elapsed since
    ``shared.state.time_start``.

    Args:
        req: Request options; ``skip_current_image`` suppresses encoding of
            the current preview image.

    Returns:
        models.ResProgress: Progress fraction, relative ETA, state dict,
        optional base64-encoded current image and text info.
    """
    import time

    if shared.state.job_count == 0:
        return models.ResProgress(progress=0, eta_relative=0, state=shared.state.dict(), textinfo=shared.state.textinfo)
    shared.state.do_set_current_image()
    current_image = None
    if shared.state.current_image and not req.skip_current_image:
        current_image = helpers.encode_pil_to_base64(shared.state.current_image)
    # Clamp the counters so negative values cannot produce a negative ratio
    # and the divisor factors are never below 1.
    batch_x = max(shared.state.job_no, 0)
    batch_y = max(shared.state.job_count, 1)
    step_x = max(shared.state.sampling_step, 0)
    step_y = max(shared.state.sampling_steps, 1)
    current = step_y * batch_x + step_x
    total = step_y * batch_y
    progress = current / total if current > 0 and total > 0 else 0
    # The counters can overshoot their totals (e.g. job_no reaching job_count);
    # cap the fraction so it never exceeds 1 and the ETA never goes negative.
    progress = min(progress, 1.0)
    time_since_start = time.time() - shared.state.time_start
    eta_relative = (time_since_start / progress) - time_since_start if progress > 0 else 0
    res = models.ResProgress(progress=progress, eta_relative=eta_relative, state=shared.state.dict(), current_image=current_image, textinfo=shared.state.textinfo)
    return res
def post_interrupt():
    """Call ``shared.state.interrupt()`` and return an empty dict."""
    state = shared.state
    state.interrupt()
    return {}
def post_skip():
    """Call ``shared.state.skip()``; nothing is returned."""
    state = shared.state
    state.skip()
def get_memory():
    """Collect RAM usage of this process and CUDA memory statistics.

    Each half is gathered independently inside its own ``try`` block; if
    gathering fails, that half becomes ``{'error': <message>}`` instead of
    raising.  The CUDA half is ``{'error': 'unavailable'}`` when
    ``torch.cuda.is_available()`` is false.

    Returns:
        models.ResMemory: Built from the ``ram`` and ``cuda`` dicts.
    """
    try:
        import os
        import psutil

        proc = psutil.Process(os.getpid())
        # only rss is cross-platform guaranteed, so no other field is used
        mem = proc.memory_info()
        # total is derived from rss and the process memory percentage, as the
        # actual value is not cross-platform safe
        total = 100 * mem.rss / proc.memory_percent()
        ram = {'free': total - mem.rss, 'used': mem.rss, 'total': total}
    except Exception as err:
        ram = {'error': f'{err}'}

    try:
        import torch

        if not torch.cuda.is_available():
            cuda = {'error': 'unavailable'}
        else:
            mem_info = torch.cuda.mem_get_info()
            system = {'free': mem_info[0], 'used': mem_info[1] - mem_info[0], 'total': mem_info[1]}
            stats = dict(torch.cuda.memory_stats(shared.device))

            def current_and_peak(metric):
                # Pair up the '<metric>.all.current' and '<metric>.all.peak' counters.
                return {'current': stats[f'{metric}.all.current'], 'peak': stats[f'{metric}.all.peak']}

            allocated = current_and_peak('allocated_bytes')
            reserved = current_and_peak('reserved_bytes')
            active = current_and_peak('active_bytes')
            inactive = current_and_peak('inactive_split_bytes')
            events = {'retries': stats['num_alloc_retries'], 'oom': stats['num_ooms']}
            cuda = {
                'system': system,
                'active': active,
                'allocated': allocated,
                'reserved': reserved,
                'inactive': inactive,
                'events': events,
            }
    except Exception as err:
        cuda = {'error': f'{err}'}
    return models.ResMemory(ram=ram, cuda=cuda)