|
import os |
|
import re |
|
import tempfile |
|
import shutil |
|
import logging |
|
from pathlib import Path |
|
|
|
from huggingface_hub import ( |
|
create_repo, |
|
upload_folder, |
|
list_repo_files, |
|
whoami, |
|
hf_hub_download, |
|
delete_file as hf_delete_file, |
|
HfApi |
|
) |
|
from huggingface_hub.hf_api import CommitOperationDelete |
|
|
|
from huggingface_hub.utils import HfHubHTTPError |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def _get_api_token(ui_token_from_textbox=None): |
|
env_token = os.getenv('HF_TOKEN') |
|
if env_token: return env_token, None |
|
if ui_token_from_textbox: return ui_token_from_textbox, None |
|
return None, "Error: Hugging Face API token not provided in UI or HF_TOKEN env var." |
|
|
|
|
|
def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui): |
|
if not space_name_ui: return None, "Error: Space Name cannot be empty." |
|
if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field for the owner part." |
|
|
|
final_owner = owner_ui |
|
error_message = None |
|
|
|
if not final_owner: |
|
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) |
|
if token_err: return None, token_err |
|
if not resolved_api_token: return None, "Error: API token required for auto owner determination if Owner field is empty." |
|
try: |
|
user_info = whoami(token=resolved_api_token) |
|
if user_info and 'name' in user_info: |
|
final_owner = user_info['name'] |
|
else: |
|
error_message = "Error: Could not retrieve username from token. Check token permissions or specify Owner." |
|
except Exception as e: |
|
error_message = f"Error retrieving username from token: {str(e)}. Specify Owner or check token." |
|
if error_message: return None, error_message |
|
|
|
if not final_owner: return None, "Error: Owner could not be determined. Please specify it in the Owner field." |
|
return f"{final_owner}/{space_name_ui}", None |
|
|
|
|
|
|
|
def parse_markdown(markdown_input): |
|
space_info = {"repo_name_md": "", "owner_md": "", "files": []} |
|
current_file_path = None |
|
current_file_content_lines = [] |
|
in_file_definition = False |
|
in_code_block = False |
|
|
|
lines = markdown_input.strip().split("\n") |
|
|
|
|
|
cleaned_lines = [] |
|
for line_content_orig in lines: |
|
if line_content_orig.strip().startswith("# "): |
|
|
|
if line_content_orig.strip().startswith("# ### File:") or \ |
|
line_content_orig.strip().startswith("# ## File Structure") or \ |
|
line_content_orig.strip().startswith("# # Space:"): |
|
cleaned_lines.append(line_content_orig.strip()[2:]) |
|
else: |
|
cleaned_lines.append(line_content_orig) |
|
else: |
|
cleaned_lines.append(line_content_orig) |
|
|
|
lines = cleaned_lines |
|
|
|
|
|
for line_content_orig in lines: |
|
line_content_stripped = line_content_orig.strip() |
|
|
|
if line_content_stripped.startswith("### File:"): |
|
|
|
if current_file_path is not None and in_file_definition: |
|
space_info["files"].append({"path": current_file_path, "content": "\n".join(current_file_content_lines).strip()}) |
|
|
|
current_file_path = line_content_stripped.replace("### File:", "").strip() |
|
|
|
current_file_path = re.split(r'\s*\(', current_file_path, 1)[0].strip() |
|
|
|
current_file_path = current_file_path.strip('`') |
|
|
|
|
|
current_file_content_lines = [] |
|
in_file_definition = True |
|
in_code_block = False |
|
continue |
|
|
|
|
|
if not in_file_definition: |
|
if line_content_stripped.startswith("# Space:"): |
|
full_space_name_md = line_content_stripped.replace("# Space:", "").strip() |
|
if "/" in full_space_name_md: |
|
parts = full_space_name_md.split("/", 1) |
|
if len(parts) == 2: |
|
space_info["owner_md"], space_info["repo_name_md"] = parts[0].strip(), parts[1].strip() |
|
else: |
|
space_info["repo_name_md"] = full_space_name_md |
|
else: |
|
space_info["repo_name_md"] = full_space_name_md |
|
|
|
continue |
|
|
|
|
|
if in_file_definition: |
|
if line_content_stripped.startswith("```"): |
|
|
|
in_code_block = not in_code_block |
|
|
|
if not in_code_block: |
|
|
|
pass |
|
else: |
|
|
|
pass |
|
continue |
|
|
|
|
|
if in_code_block: |
|
current_file_content_lines.append(line_content_orig) |
|
|
|
elif line_content_stripped.startswith("[Binary file") or line_content_stripped.startswith("[Error loading content:") or line_content_stripped.startswith("[Binary or Skipped file]"): |
|
|
|
current_file_content_lines.append(line_content_orig) |
|
|
|
|
|
|
|
|
|
|
|
if current_file_path is not None and in_file_definition: |
|
space_info["files"].append({"path": current_file_path, "content": "\n".join(current_file_content_lines).strip()}) |
|
|
|
|
|
|
|
space_info["files"] = [f for f in space_info["files"] if f.get("path")] |
|
|
|
|
|
|
|
|
|
space_info["owner_md"] = space_info["owner_md"].strip() |
|
space_info["repo_name_md"] = space_info["repo_name_md"].strip() |
|
|
|
|
|
return space_info |
|
|
|
|
|
|
|
def get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui): |
|
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui |
|
sdk = None |
|
files = [] |
|
error = None |
|
repo_id = None |
|
|
|
try: |
|
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) |
|
if token_err: return None, None, token_err |
|
|
|
repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui) |
|
if err_repo_id: return None, None, err_repo_id |
|
repo_id_for_error_logging = repo_id |
|
|
|
api = HfApi(token=resolved_api_token) |
|
|
|
repo_info_obj = api.repo_info(repo_id=repo_id, repo_type="space", timeout=10) |
|
sdk = repo_info_obj.sdk |
|
files = [sibling.rfilename for sibling in repo_info_obj.siblings if sibling.rfilename] |
|
|
|
if not files and repo_info_obj.siblings: |
|
logger.warning(f"Repo {repo_id} has siblings but no rfilenames extracted.") |
|
|
|
except HfHubHTTPError as e_http: |
|
logger.error(f"HTTP error getting repo info for {repo_id_for_error_logging or 'unknown repo'}: {e_http}") |
|
error_message = str(e_http) |
|
status_code = e_http.response.status_code if e_http.response is not None else None |
|
|
|
if status_code == 404: |
|
error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found (404)." |
|
elif status_code in (401,403): |
|
error = f"Access denied for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." |
|
else: |
|
error = f"HTTP Error {status_code or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}" |
|
|
|
except Exception as e: |
|
|
|
logger.warning(f"Could not get full repo_info for {repo_id_for_error_logging or 'unknown repo'}, attempting list_repo_files fallback: {e}") |
|
error = f"Error retrieving Space info for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. Attempting file list fallback." |
|
|
|
try: |
|
|
|
resolved_api_token_fb, token_err_fb = _get_api_token(ui_api_token_from_textbox) |
|
if token_err_fb: return None, None, f"{error}\nAPI Token Error during fallback: {token_err_fb}" |
|
repo_id_fb, err_repo_id_fb = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui) |
|
if err_repo_id_fb: return None, None, f"{error}\nRepo ID Error during fallback: {err_repo_id_fb}" |
|
|
|
|
|
files = list_repo_files(repo_id=repo_id_fb, token=resolved_api_token_fb, repo_type="space", timeout=10) |
|
|
|
error = f"Warning: Could not fetch full Space info (SDK etc.) for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. File list loaded via fallback." |
|
|
|
except HfHubHTTPError as e2_http: |
|
logger.error(f"HTTP error during fallback list_repo_files for {repo_id_for_error_logging or 'unknown repo'}: {e2_http}") |
|
error_message_fb = str(e2_http) |
|
status_code_fb = e2_http.response.status_code if e2_http.response is not None else None |
|
if status_code_fb == 404: |
|
error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found during fallback (404)." |
|
else: |
|
error = f"HTTP Error {status_code_fb or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}' during fallback: {error_message_fb}" |
|
files = [] |
|
|
|
except Exception as e2: |
|
logger.exception(f"Error listing files for {repo_id_for_error_logging or 'unknown repo'} during fallback: {e2}") |
|
error = f"{error}\nError listing files during fallback for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e2)}" |
|
files = [] |
|
|
|
|
|
|
|
if not files and not error: |
|
error = f"No files found in Space `{repo_id_for_error_logging or 'unknown repo'}` (or an issue fetching them)." |
|
|
|
return sdk, files, error |
|
|
|
|
|
|
|
def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui): |
|
_sdk, files, err = get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui) |
|
return files, err |
|
|
|
|
|
|
|
def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo): |
|
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui |
|
repo_id = None |
|
try: |
|
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) |
|
if token_err: return None, token_err |
|
repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui) |
|
if err_repo_id: return None, err_repo_id |
|
repo_id_for_error_logging = repo_id |
|
if not file_path_in_repo: return None, "Error: File path cannot be empty." |
|
|
|
file_path_in_repo = file_path_in_repo.replace("\\", "/") |
|
|
|
|
|
downloaded_file_path = hf_hub_download( |
|
repo_id=repo_id, |
|
filename=file_path_in_repo, |
|
repo_type="space", |
|
token=resolved_api_token, |
|
local_dir_use_symlinks=False, |
|
cache_dir=None |
|
) |
|
content = Path(downloaded_file_path).read_text(encoding="utf-8") |
|
return content, None |
|
except FileNotFoundError: |
|
return None, f"Error: File '{file_path_in_repo}' not found locally after download attempt." |
|
except UnicodeDecodeError: |
|
|
|
return None, f"Error: File '{file_path_in_repo}' is not valid UTF-8 text. Cannot display." |
|
except HfHubHTTPError as e_http: |
|
logger.error(f"HTTP error fetching file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}") |
|
error_message = str(e_http) |
|
status_code = e_http.response.status_code if e_http.response is not None else None |
|
if status_code == 404: |
|
return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)." |
|
if status_code in (401, 403): |
|
return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." |
|
return None, f"HTTP Error {status_code or 'unknown'} fetching file '{file_path_in_repo}': {error_message}" |
|
except Exception as e: |
|
logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:") |
|
return None, f"Error fetching file content: {str(e)}" |
|
|
|
|
|
def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input, private): |
|
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui |
|
repo_id = None |
|
try: |
|
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) |
|
if token_err: return token_err |
|
repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui) |
|
if err_repo_id: return err_repo_id |
|
repo_id_for_error_logging = repo_id |
|
|
|
space_info = parse_markdown(markdown_input) |
|
|
|
with tempfile.TemporaryDirectory() as temp_dir: |
|
repo_staging_path = Path(temp_dir) / "repo_staging_content" |
|
repo_staging_path.mkdir(exist_ok=True) |
|
|
|
|
|
gitattributes_path = repo_staging_path / ".gitattributes" |
|
with open(gitattributes_path, "w") as f: |
|
f.write("* text=auto eol=lf\n") |
|
|
|
|
|
|
|
if not [f for f in space_info["files"] if not f.get("is_structure_block")]: |
|
logger.info(f"Markdown contained no standard files. Staging only .gitattributes for {repo_id}.") |
|
|
|
|
|
for file_info in space_info["files"]: |
|
if not file_info.get("path") or file_info.get("is_structure_block"): |
|
|
|
if not file_info.get("path"): logger.warning(f"Skipping file_info with no path: {file_info}") |
|
continue |
|
|
|
|
|
content_to_write = file_info.get("content", "") |
|
if content_to_write.startswith("[Binary file") or content_to_write.startswith("[Error loading content:") or content_to_write.startswith("[Binary or Skipped file]"): |
|
logger.info(f"Skipping binary/error placeholder file from build: {file_info['path']}") |
|
continue |
|
|
|
|
|
file_path_abs = repo_staging_path / file_info["path"] |
|
file_path_abs.parent.mkdir(parents=True, exist_ok=True) |
|
try: |
|
|
|
with open(file_path_abs, "w", encoding="utf-8") as f: |
|
f.write(content_to_write) |
|
except Exception as file_write_error: |
|
logger.error(f"Error writing file {file_info['path']} during staging: {file_write_error}") |
|
return f"Error staging file {file_info['path']}: {file_write_error}" |
|
|
|
|
|
|
|
create_repo(repo_id=repo_id, token=resolved_api_token, repo_type="space", space_sdk=sdk_ui, private=private, exist_ok=True) |
|
|
|
api = HfApi(token=resolved_api_token) |
|
|
|
|
|
try: |
|
current_hub_files_info = api.list_repo_files(repo_id=repo_id, repo_type="space", recursive=True) |
|
current_hub_files = set(current_hub_files_info) |
|
|
|
markdown_staged_filenames = set(str(Path(temp_dir) / "repo_staging_content" / f.get("path")).relative_to(repo_staging_path) for f in space_info["files"] if f.get("path") and not f.get("is_structure_block") and not (f.get("content", "").startswith("[Binary file") or f.get("content", "").startswith("[Error loading content:") or f.get("content", "").startswith("[Binary or Skipped file]"))) |
|
markdown_staged_filenames.add(".gitattributes") |
|
|
|
files_to_delete_on_hub = list(current_hub_files - markdown_staged_filenames) |
|
|
|
|
|
files_to_delete_on_hub = [f for f in files_to_delete_on_hub if not (f.startswith('.git') or (f == "README.md" and "README.md" not in markdown_staged_filenames))] |
|
|
|
|
|
if files_to_delete_on_hub: |
|
logger.info(f"Deleting {len(files_to_delete_on_hub)} files from {repo_id} not in new markdown structure: {files_to_delete_on_hub}") |
|
delete_operations = [CommitOperationDelete(path_in_repo=f) for f in files_to_delete_on_hub] |
|
if delete_operations: |
|
|
|
if list(repo_staging_path.iterdir()): |
|
|
|
|
|
try: |
|
api.create_commit( |
|
repo_id=repo_id, |
|
repo_type="space", |
|
operations=delete_operations, |
|
commit_message=f"AI Space Builder: Removed {len(files_to_delete_on_hub)} files not in updated structure." |
|
) |
|
logger.info("Successfully committed deletions.") |
|
except Exception as e_delete_commit: |
|
logger.error(f"Error committing deletions in {repo_id}: {e_delete_commit}. Proceeding with upload.") |
|
|
|
|
|
|
|
else: |
|
|
|
try: |
|
api.create_commit( |
|
repo_id=repo_id, |
|
repo_type="space", |
|
operations=delete_operations, |
|
commit_message=f"AI Space Builder: Removed {len(files_to_delete_on_hub)} files." |
|
) |
|
logger.info("Successfully committed deletions (only deletions).") |
|
|
|
return f"Successfully updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id}) (Files deleted)." |
|
except Exception as e_only_delete_commit: |
|
logger.error(f"Error committing deletions (only deletions) in {repo_id}: {e_only_delete_commit}.") |
|
return f"Error during Space update (deletions only): {str(e_only_delete_commit)}" |
|
|
|
|
|
except Exception as e_delete_old_prep: |
|
logger.error(f"Error during preparation for deletion of old files in {repo_id}: {e_delete_old_prep}. Proceeding with upload.") |
|
|
|
|
|
|
|
|
|
logger.info(f"Uploading staged files from {str(repo_staging_path)} to {repo_id}") |
|
|
|
upload_folder( |
|
repo_id=repo_id, |
|
folder_path=str(repo_staging_path), |
|
path_in_repo=".", |
|
token=resolved_api_token, |
|
repo_type="space", |
|
commit_message=f"AI Space Builder: Space content update for {repo_id}" |
|
) |
|
|
|
return f"Successfully created/updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id})" |
|
|
|
except HfHubHTTPError as e_http: |
|
logger.error(f"HTTP error during create_space for {repo_id_for_error_logging or 'unknown repo'}: {e_http}") |
|
error_message = str(e_http) |
|
status_code = e_http.response.status_code if e_http.response is not None else None |
|
if status_code == 409: |
|
return f"Error creating/updating Space '{repo_id_for_error_logging or 'unknown repo'}: Conflict (Space might exist with different owner/settings)." |
|
if status_code in (401, 403): |
|
return f"Error creating/updating Space '{repo_id_for_error_logging or 'unknown repo'}': Access denied or authentication required ({status_code}). Check token permissions." |
|
return f"HTTP Error {status_code or 'unknown'} during Space creation/update: {error_message}" |
|
except Exception as e: |
|
logger.exception(f"Error in create_space for {repo_id_for_error_logging or 'unknown repo'}:") |
|
return f"Error during Space creation/update: {str(e)}" |
|
|
|
|
|
def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui): |
|
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui |
|
repo_id = None |
|
try: |
|
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) |
|
if token_err: return token_err |
|
repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui) |
|
if err_repo_id: return err_repo_id |
|
repo_id_for_error_logging = repo_id |
|
|
|
if not file_path_in_repo: return "Error: File Path to update cannot be empty." |
|
file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/') |
|
commit_msg = commit_message_ui or f"Update {file_path_in_repo} via AI Space Editor" |
|
|
|
api = HfApi(token=resolved_api_token) |
|
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8') as tmp_file_obj: |
|
tmp_file_obj.write(file_content) |
|
tmp_file_path = tmp_file_obj.name |
|
|
|
try: |
|
|
|
api.upload_file( |
|
path_or_fileobj=tmp_file_path, |
|
path_in_repo=file_path_in_repo, |
|
repo_id=repo_id, |
|
repo_type="space", |
|
commit_message=commit_msg |
|
) |
|
return f"Successfully updated `{file_path_in_repo}` in Space [{repo_id}](https://huggingface.co/spaces/{repo_id})" |
|
finally: |
|
|
|
if os.path.exists(tmp_file_path): |
|
os.remove(tmp_file_path) |
|
|
|
except FileNotFoundError: |
|
return f"Error: Local temporary file not found during upload for '{file_path_in_repo}'." |
|
except UnicodeDecodeError: |
|
|
|
return f"Error: File '{file_path_in_repo}' is not valid UTF-8 text. Cannot display or edit." |
|
except HfHubHTTPError as e_http: |
|
logger.error(f"HTTP error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}: {e_http}") |
|
error_message = str(e_http) |
|
status_code = e_http.response.status_code if e_http.response is not None else None |
|
if status_code == 404: |
|
return f"Error: Space '{repo_id_for_error_logging or 'unknown repo'}' or file '{file_path_in_repo}' not found (404)." |
|
if status_code in (401, 403): |
|
return f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." |
|
return f"HTTP Error {status_code or 'unknown'} updating file '{file_path_in_repo}': {error_message}" |
|
except Exception as e: |
|
logger.exception(f"Error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}:") |
|
return f"Error updating file for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}" |
|
|
|
|
|
|
|
def delete_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, commit_message_ui=None): |
|
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui |
|
repo_id = None |
|
try: |
|
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) |
|
if token_err: return f"API Token Error: {token_err}" |
|
repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui) |
|
if err_repo_id: return f"Repo ID Error: {err_repo_id}" |
|
repo_id_for_error_logging = repo_id |
|
|
|
if not file_path_in_repo: return "Error: File path cannot be empty for deletion." |
|
file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/') |
|
|
|
|
|
|
|
|
|
effective_commit_message = commit_message_ui or f"Deleted file: {file_path_in_repo} via AI Space Editor" |
|
|
|
|
|
hf_delete_file( |
|
path_in_repo=file_path_in_repo, |
|
repo_id=repo_id, |
|
repo_type="space", |
|
token=resolved_api_token, |
|
commit_message=effective_commit_message |
|
) |
|
return f"Successfully deleted file: {file_path_in_repo}" |
|
|
|
except HfHubHTTPError as e_http: |
|
logger.error(f"HTTP error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}") |
|
error_message = str(e_http) |
|
status_code = e_http.response.status_code if e_http.response is not None else None |
|
|
|
if status_code == 404: |
|
return f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' for deletion (404)." |
|
if status_code in (401, 403): |
|
return f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." |
|
return f"HTTP Error {status_code or 'unknown'} deleting file '{file_path_in_repo}': {error_message}" |
|
except Exception as e: |
|
logger.exception(f"Error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:") |
|
return f"Error deleting file '{file_path_in_repo}': {str(e)}" |
|
|
|
|
|
def get_space_runtime_status(ui_api_token_from_textbox, space_name_ui, owner_ui): |
|
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui |
|
repo_id = None |
|
try: |
|
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) |
|
if token_err: return None, f"API Token Error: {token_err}" |
|
repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui) |
|
if err_repo_id: return None, f"Repo ID Error: {err_repo_id}" |
|
repo_id_for_error_logging = repo_id |
|
|
|
api = HfApi(token=resolved_api_token) |
|
logger.info(f"Fetching runtime status for Space: {repo_id}") |
|
|
|
|
|
runtime_info = api.get_space_runtime(repo_id=repo_id) |
|
|
|
|
|
status_details = { |
|
"stage": runtime_info.stage, |
|
"hardware": runtime_info.hardware, |
|
"requested_hardware": runtime_info.requested_hardware if hasattr(runtime_info, 'requested_hardware') else None, |
|
"error_message": None, |
|
"full_log_link": f"https://huggingface.co/spaces/{repo_id}/logs", |
|
"raw_data": runtime_info.raw |
|
} |
|
|
|
|
|
if runtime_info.stage == "ERRORED": |
|
error_content = None |
|
|
|
if hasattr(runtime_info, 'error') and runtime_info.error: error_content = str(runtime_info.error) |
|
elif 'message' in runtime_info.raw and isinstance(runtime_info.raw['message'], str) and ('error' in runtime_info.raw['message'].lower() or runtime_info.raw['message'].strip().endswith('!')): |
|
error_content = runtime_info.raw['message'] |
|
elif 'error' in runtime_info.raw: error_content = str(runtime_info.raw['error']) |
|
|
|
|
|
if 'build' in runtime_info.raw and isinstance(runtime_info.raw['build'], dict) and runtime_info.raw['build'].get('status') == 'error': |
|
error_content = f"Build Error: {runtime_info.raw['build'].get('message', error_content or 'Unknown build error')}" |
|
elif 'run' in runtime_info.raw and isinstance(runtime_info.raw['run'], dict) and runtime_info.raw['run'].get('status') == 'error': |
|
error_content = f"Runtime Error: {runtime_info.raw['run'].get('message', error_content or 'Unknown runtime error')}" |
|
|
|
status_details["error_message"] = error_content if error_content else "Space is in an errored state. Check logs for details." |
|
|
|
logger.info(f"Runtime status for {repo_id}: {status_details['stage']}") |
|
return status_details, None |
|
|
|
except HfHubHTTPError as e_http: |
|
logger.error(f"HTTP error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}: {e_http}") |
|
error_message = str(e_http) |
|
status_code = e_http.response.status_code if e_http.response is not None else None |
|
|
|
if status_code == 404: |
|
|
|
return None, f"Error: Space '{repo_id_for_error_logging or 'unknown repo'}' not found or has no active runtime status (404)." |
|
if status_code in (401, 403): |
|
return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." |
|
return None, f"HTTP Error {status_code or 'unknown'} fetching runtime status for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}" |
|
|
|
except Exception as e: |
|
logger.exception(f"Error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}:") |
|
return None, f"Error fetching runtime status: {str(e)}" |