|
import errno |
|
import functools |
|
import logging |
|
import os |
|
import pickle |
|
import signal |
|
import traceback |
|
from typing import Any, List |
|
|
|
import keyring |
|
import yaml |
|
from h2o_wave import Q, ui |
|
from keyring.errors import KeyringLocked, PasswordDeleteError |
|
|
|
from llm_studio.app_utils.config import default_cfg |
|
from llm_studio.app_utils.utils import get_database_dir, get_user_id |
|
|
|
__all__ = [ |
|
"load_user_settings_and_secrets", |
|
"load_default_user_settings", |
|
"save_user_settings_and_secrets", |
|
"Secrets", |
|
] |
|
|
|
logger = logging.getLogger(__name__) |
|
SECRET_KEYS = [ |
|
key |
|
for key in default_cfg.user_settings |
|
if any(password in key for password in ["token", "key"]) |
|
] |
|
USER_SETTING_KEYS = [key for key in default_cfg.user_settings if key not in SECRET_KEYS] |
|
|
|
|
|
async def save_user_settings_and_secrets(q: Q): |
|
await _save_secrets(q) |
|
_save_user_settings(q) |
|
|
|
|
|
def load_user_settings_and_secrets(q: Q): |
|
_maybe_migrate_to_yaml(q) |
|
_load_secrets(q) |
|
_load_user_settings(q) |
|
|
|
|
|
def load_default_user_settings(q: Q, clear_secrets=True): |
|
for key in default_cfg.user_settings: |
|
q.client[key] = default_cfg.user_settings[key] |
|
if clear_secrets: |
|
_clear_secrets(q, key) |
|
|
|
|
|
class NoSaver: |
|
""" |
|
Base class that provides methods for saving, loading, and deleting password entries. |
|
|
|
Attributes: |
|
username (str): The username associated with the password entries. |
|
root_dir (str): The root directory. |
|
|
|
Methods: |
|
save(name: str, password: str) -> None: |
|
Save a password entry with the given name and password. |
|
|
|
load(name: str) -> str: |
|
Load and return the password associated with the given name. |
|
|
|
delete(name: str) -> None: |
|
Delete the password entry with the given name. |
|
|
|
""" |
|
|
|
def __init__(self, username: str, root_dir: str): |
|
self.username = username |
|
self.root_dir = root_dir |
|
|
|
def save(self, name: str, password: str): |
|
pass |
|
|
|
def load(self, name: str) -> str: |
|
return "" |
|
|
|
def delete(self, name: str): |
|
pass |
|
|
|
|
|
class KeyRingSaver(NoSaver): |
|
""" |
|
A class for saving, loading, and deleting passwords using the keyring library. |
|
Some machines may not have keyring installed, so this class may not be available. |
|
""" |
|
|
|
def __init__(self, username: str, root_dir: str): |
|
super().__init__(username, root_dir) |
|
self.namespace = f"{username}_h2o_llmstudio" |
|
|
|
def save(self, name: str, password: str): |
|
keyring.set_password(self.namespace, name, password) |
|
|
|
def load(self, name: str) -> str: |
|
return keyring.get_password(self.namespace, name) or "" |
|
|
|
def delete(self, name: str): |
|
try: |
|
keyring.delete_password(self.namespace, name) |
|
except (KeyringLocked, PasswordDeleteError): |
|
pass |
|
except Exception as e: |
|
logger.warning(f"Error deleting password for keyring: {e}") |
|
|
|
|
|
class EnvFileSaver(NoSaver): |
|
""" |
|
This module provides the EnvFileSaver class, which is used to save, load, |
|
and delete name-password pairs in an environment file. |
|
Only use this class if you are sure that the environment file is secure. |
|
""" |
|
|
|
@property |
|
def filename(self): |
|
return os.path.join(self.root_dir, f"{self.username}.env") |
|
|
|
def save(self, name: str, password: str): |
|
data = {} |
|
if os.path.exists(self.filename): |
|
with open(self.filename, "r") as f: |
|
data = yaml.safe_load(f) |
|
data[name] = password |
|
with open(self.filename, "w") as f: |
|
yaml.safe_dump(data, f) |
|
|
|
def load(self, name: str) -> str: |
|
if not os.path.exists(self.filename): |
|
return "" |
|
|
|
with open(self.filename, "r") as f: |
|
data = yaml.safe_load(f) |
|
return data.get(name, "") |
|
|
|
def delete(self, name: str): |
|
if os.path.exists(self.filename): |
|
with open(self.filename, "r") as f: |
|
data = yaml.safe_load(f) |
|
if data and name in data: |
|
del data[name] |
|
with open(self.filename, "w") as f: |
|
yaml.safe_dump(data, f) |
|
|
|
|
|
|
|
class TimeoutError(Exception): |
|
pass |
|
|
|
|
|
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)): |
|
def decorator(func): |
|
def _handle_timeout(signum, frame): |
|
raise TimeoutError(error_message) |
|
|
|
@functools.wraps(func) |
|
def wrapper(*args, **kwargs): |
|
signal.signal(signal.SIGALRM, _handle_timeout) |
|
signal.alarm(seconds) |
|
try: |
|
result = func(*args, **kwargs) |
|
finally: |
|
signal.alarm(0) |
|
return result |
|
|
|
return wrapper |
|
|
|
return decorator |
|
|
|
|
|
@timeout(3) |
|
def check_if_keyring_works(): |
|
""" |
|
Test if keyring is working. On misconfigured machines, |
|
Keyring may hang up to 2 minutes with the following error: |
|
jeepney.wrappers.DBusErrorResponse: |
|
[org.freedesktop.DBus.Error.TimedOut] |
|
("Failed to activate service 'org.freedesktop.secrets': |
|
timed out (service_start_timeout=120000ms)",) |
|
|
|
To avoid waiting for 2 minutes, we kill the process after 3 seconds. |
|
""" |
|
keyring.get_password("service", "username") |
|
|
|
|
|
class Secrets: |
|
""" |
|
Factory class to get the secrets' handler. |
|
""" |
|
|
|
_secrets = { |
|
"Do not save credentials permanently": NoSaver, |
|
".env File": EnvFileSaver, |
|
} |
|
try: |
|
check_if_keyring_works() |
|
logger.info("Keyring is correctly configured on this machine.") |
|
_secrets["Keyring"] = KeyRingSaver |
|
except TimeoutError: |
|
logger.warning( |
|
"Error loading keyring due to timeout. Disabling keyring save option." |
|
) |
|
except Exception as e: |
|
logger.warning(f"Error loading keyring: {e}. Disabling keyring save option.") |
|
|
|
@classmethod |
|
def names(cls) -> List[str]: |
|
return sorted(cls._secrets.keys()) |
|
|
|
@classmethod |
|
def get(cls, name: str) -> Any: |
|
return cls._secrets.get(name) |
|
|
|
|
|
def _save_user_settings(q: Q): |
|
user_settings = {key: q.client[key] for key in USER_SETTING_KEYS} |
|
with open(_get_usersettings_path(q), "w") as f: |
|
yaml.dump(user_settings, f) |
|
|
|
|
|
def _load_user_settings(q: Q): |
|
if os.path.isfile(_get_usersettings_path(q)): |
|
logger.info("Reading user settings") |
|
with open(_get_usersettings_path(q), "r") as f: |
|
user_settings = yaml.load(f, Loader=yaml.FullLoader) |
|
for key in USER_SETTING_KEYS: |
|
q.client[key] = user_settings.get(key, default_cfg.user_settings[key]) |
|
else: |
|
logger.info("No user settings found. Using default settings.") |
|
|
|
|
|
load_default_user_settings(q, clear_secrets=False) |
|
|
|
|
|
async def _save_secrets(q: Q): |
|
secret_name, secrets_handler = _get_secrets_handler(q) |
|
for key in SECRET_KEYS: |
|
try: |
|
_clear_secrets(q, key, excludes=tuple(secret_name)) |
|
if q.client[key]: |
|
secrets_handler.save(key, q.client[key]) |
|
|
|
except Exception: |
|
exception = str(traceback.format_exc()) |
|
logger.error(f"Could not save password {key} to {secret_name}") |
|
q.page["meta"].dialog = ui.dialog( |
|
title="Could not save secrets. " |
|
"Please choose another Credential Handler.", |
|
name="secrets_error", |
|
items=[ |
|
ui.text( |
|
f"The following error occurred when" |
|
f" using {secret_name}: {exception}." |
|
), |
|
ui.button( |
|
name="settings/close_error_dialog", label="Close", primary=True |
|
), |
|
], |
|
closable=True, |
|
) |
|
q.client["keep_meta"] = True |
|
await q.page.save() |
|
break |
|
else: |
|
|
|
q.client["dataset/import/s3_bucket"] = q.client["default_aws_bucket_name"] |
|
q.client["dataset/import/s3_access_key"] = q.client["default_aws_access_key"] |
|
q.client["dataset/import/s3_secret_key"] = q.client["default_aws_secret_key"] |
|
q.client["dataset/import/kaggle_access_key"] = q.client[ |
|
"default_kaggle_username" |
|
] |
|
q.client["dataset/import/kaggle_secret_key"] = q.client[ |
|
"default_kaggle_secret_key" |
|
] |
|
|
|
|
|
def _load_secrets(q: Q): |
|
secret_name, secrets_handler = _get_secrets_handler(q) |
|
for key in SECRET_KEYS: |
|
try: |
|
q.client[key] = secrets_handler.load(key) or default_cfg.user_settings[key] |
|
except Exception: |
|
logger.error(f"Could not load password {key} from {secret_name}") |
|
q.client[key] = "" |
|
|
|
|
|
def _get_secrets_handler(q: Q): |
|
secret_name = ( |
|
q.client["credential_saver"] or default_cfg.user_settings["credential_saver"] |
|
) |
|
secrets_handler = Secrets.get(secret_name)( |
|
username=get_user_id(q), root_dir=get_database_dir(q) |
|
) |
|
return secret_name, secrets_handler |
|
|
|
|
|
def _clear_secrets(q: Q, name: str, excludes=tuple()): |
|
for secret_name in Secrets.names(): |
|
if secret_name not in excludes: |
|
secrets_handler = Secrets.get(secret_name)( |
|
username=get_user_id(q), root_dir=get_database_dir(q) |
|
) |
|
|
|
secrets_handler.delete(name) |
|
|
|
|
|
def _maybe_migrate_to_yaml(q: Q): |
|
""" |
|
Migrate user settings from a pickle file to a YAML file. |
|
""" |
|
|
|
old_usersettings_path = os.path.join( |
|
get_database_dir(q), f"{get_user_id(q)}.settings" |
|
) |
|
if not os.path.isfile(old_usersettings_path): |
|
return |
|
|
|
try: |
|
with open(old_usersettings_path, "rb") as f: |
|
user_settings = pickle.load(f) |
|
|
|
secret_name, secrets_handler = _get_secrets_handler(q) |
|
logger.info(f"Migrating token using {secret_name}") |
|
for key in SECRET_KEYS: |
|
if key in user_settings: |
|
secrets_handler.save(key, user_settings[key]) |
|
|
|
with open(_get_usersettings_path(q), "w") as f: |
|
yaml.dump( |
|
{ |
|
key: value |
|
for key, value in user_settings.items() |
|
if key in USER_SETTING_KEYS |
|
}, |
|
f, |
|
) |
|
os.remove(old_usersettings_path) |
|
logger.info(f"Successfully migrated tokens to {secret_name}. Old file deleted.") |
|
except Exception as e: |
|
logger.info( |
|
f"Could not migrate tokens. " |
|
f"Please delete {old_usersettings_path} and set your credentials again." |
|
f"Error: \n\n {e} {traceback.format_exc()}" |
|
) |
|
|
|
|
|
def _get_usersettings_path(q: Q): |
|
return os.path.join(get_database_dir(q), f"{get_user_id(q)}.yaml") |
|
|