Huggingface-Space-Commander / build_logic.py
broadfield-dev's picture
Create build_logic.py
b80b022 verified
import os
import re
import tempfile
import shutil
import logging
from pathlib import Path
from huggingface_hub import (
create_repo,
upload_folder,
list_repo_files,
whoami,
hf_hub_download,
delete_file as hf_delete_file,
HfApi
)
from huggingface_hub.hf_api import CommitOperationDelete
# Import the general HTTP error from huggingface_hub.utils
from huggingface_hub.utils import HfHubHTTPError # For catching specific HF HTTP errors
# Setup basic logging
# Configure the root logger at import time: INFO level, and every record
# carries a timestamp, the logger name and the level name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# --- Helper Function to Get API Token ---
def _get_api_token(ui_token_from_textbox=None):
env_token = os.getenv('HF_TOKEN')
if env_token: return env_token, None
if ui_token_from_textbox: return ui_token_from_textbox, None
return None, "Error: Hugging Face API token not provided in UI or HF_TOKEN env var."
# --- Helper Function to Determine Repo ID ---
def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
if not space_name_ui: return None, "Error: Space Name cannot be empty."
if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field for the owner part."
final_owner = owner_ui
error_message = None
if not final_owner:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, token_err
if not resolved_api_token: return None, "Error: API token required for auto owner determination if Owner field is empty."
try:
user_info = whoami(token=resolved_api_token)
if user_info and 'name' in user_info:
final_owner = user_info['name']
else:
error_message = "Error: Could not retrieve username from token. Check token permissions or specify Owner."
except Exception as e:
error_message = f"Error retrieving username from token: {str(e)}. Specify Owner or check token."
if error_message: return None, error_message
if not final_owner: return None, "Error: Owner could not be determined. Please specify it in the Owner field."
return f"{final_owner}/{space_name_ui}", None
# --- Corrected Markdown Parsing ---
def parse_markdown(markdown_input):
    """Parse the Space-description markdown into repo metadata and files.

    Recognised structure:

    * ``# Space: owner/name`` (or ``# Space: name``) before the first file.
    * ``### File: path`` starts a file section; a trailing ``(description)``
      and surrounding backticks are removed from the path.
    * Inside a file section, lines between ``` fences are the file content
      (the fence lines themselves are dropped). Outside fences only the
      placeholder lines starting with ``[Binary file``,
      ``[Error loading content:`` or ``[Binary or Skipped file]`` are kept;
      everything else is ignored.

    Every line starting with ``` toggles the fence state, so file content
    that itself contains fence lines is not supported.

    Args:
        markdown_input: The markdown text. ``None`` or an empty string yields
            an empty result. Windows (CRLF) line endings are normalised to LF.

    Returns:
        A dict with keys ``"repo_name_md"``, ``"owner_md"`` and ``"files"``;
        ``"files"`` is a list of ``{"path": str, "content": str}`` dicts in
        document order. Entries with an empty path are dropped; entries with
        empty content are kept.
    """
    space_info = {"repo_name_md": "", "owner_md": "", "files": []}
    if not markdown_input:
        return space_info

    placeholder_prefixes = (
        "[Binary file",
        "[Error loading content:",
        "[Binary or Skipped file]",
    )
    # Gradio's Markdown component sometimes prefixes our headings with "# ".
    wrapped_heading_prefixes = ("# ### File:", "# ## File Structure", "# # Space:")

    # Normalise CRLF first so "\r" never ends up inside file content.
    text = markdown_input.replace("\r\n", "\n").strip()

    lines = []
    for raw_line in text.split("\n"):
        stripped = raw_line.strip()
        if stripped.startswith(wrapped_heading_prefixes):
            lines.append(stripped[2:])  # drop the extra leading "# "
        else:
            lines.append(raw_line)

    current_path = None  # None until the first "### File:" heading
    current_lines = []
    in_code_block = False

    for raw_line in lines:
        stripped = raw_line.strip()

        if stripped.startswith("### File:"):
            # A new file starts: store the one collected so far.
            if current_path is not None:
                space_info["files"].append(
                    {"path": current_path, "content": "\n".join(current_lines).strip()}
                )
            path_text = stripped.replace("### File:", "").strip()
            # Drop a trailing description such as "(main application)".
            path_text = re.split(r"\s*\(", path_text, maxsplit=1)[0].strip()
            current_path = path_text.strip("`")
            current_lines = []
            in_code_block = False
            continue

        if current_path is None:
            # Preamble: only the "# Space:" line carries information.
            if stripped.startswith("# Space:"):
                full_name = stripped.replace("# Space:", "").strip()
                owner, separator, repo_name = full_name.partition("/")
                if separator:
                    space_info["owner_md"] = owner.strip()
                    space_info["repo_name_md"] = repo_name.strip()
                else:
                    space_info["repo_name_md"] = full_name
            continue

        if stripped.startswith("```"):
            # Fence lines toggle the state and are never part of the content.
            in_code_block = not in_code_block
            continue

        # Code lines are kept verbatim (indentation matters); outside a fence
        # only the known placeholder markers are kept.
        if in_code_block or stripped.startswith(placeholder_prefixes):
            current_lines.append(raw_line)

    if current_path is not None:
        space_info["files"].append(
            {"path": current_path, "content": "\n".join(current_lines).strip()}
        )

    space_info["files"] = [f for f in space_info["files"] if f.get("path")]
    space_info["owner_md"] = space_info["owner_md"].strip()
    space_info["repo_name_md"] = space_info["repo_name_md"].strip()
    return space_info
# --- Function to Get Space SDK and Files ---
def get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Fetch the SDK and the file list of a Space.

    ``HfApi.repo_info`` is tried first. If it fails with anything other than
    an ``HfHubHTTPError``, ``list_repo_files`` is tried as a fallback; in that
    case the SDK stays ``None`` and the returned message is a warning.

    Returns:
        ``(sdk, files, error)``. ``error`` is ``None`` only when files were
        found without any problem. Token or repo-id resolution failures
        return ``(None, None, message)``.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    sdk, files, error = None, [], None

    try:
        token, token_problem = _get_api_token(ui_api_token_from_textbox)
        if token_problem:
            return None, None, token_problem
        repo_id, repo_problem = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if repo_problem:
            return None, None, repo_problem
        repo_id_for_error_logging = repo_id

        hub = HfApi(token=token)
        space_details = hub.repo_info(repo_id=repo_id, repo_type="space", timeout=10)
        sdk = space_details.sdk
        files = [entry.rfilename for entry in space_details.siblings if entry.rfilename]
        if not files and space_details.siblings:
            logger.warning(f"Repo {repo_id} has siblings but no rfilenames extracted.")

    except HfHubHTTPError as http_exc:
        label = repo_id_for_error_logging or 'unknown repo'
        logger.error(f"HTTP error getting repo info for {label}: {http_exc}")
        http_text = str(http_exc)
        status = http_exc.response.status_code if http_exc.response is not None else None
        if status == 404:
            error = f"Space '{label}' not found (404)."
        elif status in (401, 403):
            error = f"Access denied for '{label}' ({status}). Check token permissions."
        else:
            error = f"HTTP Error {status or 'unknown'} for '{label}': {http_text}"

    except Exception as exc:
        # repo_info failed for a non-HTTP reason: try a plain file listing.
        label = repo_id_for_error_logging or 'unknown repo'
        logger.warning(f"Could not get full repo_info for {label}, attempting list_repo_files fallback: {exc}")
        error = f"Error retrieving Space info for `{label}`: {str(exc)}. Attempting file list fallback."
        try:
            fb_token, fb_token_problem = _get_api_token(ui_api_token_from_textbox)
            if fb_token_problem:
                return None, None, f"{error}\nAPI Token Error during fallback: {fb_token_problem}"
            fb_repo_id, fb_repo_problem = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
            if fb_repo_problem:
                return None, None, f"{error}\nRepo ID Error during fallback: {fb_repo_problem}"

            files = list_repo_files(repo_id=fb_repo_id, token=fb_token, repo_type="space", timeout=10)
            # The listing worked, so downgrade the message to a warning.
            error = f"Warning: Could not fetch full Space info (SDK etc.) for `{label}`: {str(exc)}. File list loaded via fallback."
        except HfHubHTTPError as fb_http_exc:
            logger.error(f"HTTP error during fallback list_repo_files for {label}: {fb_http_exc}")
            fb_http_text = str(fb_http_exc)
            fb_status = fb_http_exc.response.status_code if fb_http_exc.response is not None else None
            if fb_status == 404:
                error = f"Space '{label}' not found during fallback (404)."
            else:
                error = f"HTTP Error {fb_status or 'unknown'} for '{label}' during fallback: {fb_http_text}"
            files = []
        except Exception as fb_exc:
            logger.exception(f"Error listing files for {label} during fallback: {fb_exc}")
            error = f"{error}\nError listing files during fallback for `{label}`: {str(fb_exc)}"
            files = []

    # Nothing found and nothing reported: say so explicitly.
    if not (files or error):
        error = f"No files found in Space `{repo_id_for_error_logging or 'unknown repo'}` (or an issue fetching them)."
    return sdk, files, error
# --- Function to list files ---
def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Return ``(files, error)`` for a Space, discarding the SDK value."""
    sdk_files_error = get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui)
    # The helper returns (sdk, files, error); callers here only need the tail.
    return sdk_files_error[1:]
# --- Function to Fetch File Content from Hub ---
def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
    """Download one file from a Space and return its text.

    The file is fetched with ``hf_hub_download`` and decoded as UTF-8.

    Returns:
        ``(content, None)`` on success, otherwise ``(None, error_message)``.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        token, token_problem = _get_api_token(ui_api_token_from_textbox)
        if token_problem:
            return None, token_problem
        repo_id, repo_problem = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if repo_problem:
            return None, repo_problem
        repo_id_for_error_logging = repo_id

        if not file_path_in_repo:
            return None, "Error: File path cannot be empty."
        # The Hub expects forward slashes.
        file_path_in_repo = file_path_in_repo.replace("\\", "/")

        download_options = dict(
            repo_id=repo_id,
            filename=file_path_in_repo,
            repo_type="space",
            token=token,
            local_dir_use_symlinks=False,
            cache_dir=None,
        )
        local_copy = Path(hf_hub_download(**download_options))
        return local_copy.read_text(encoding="utf-8"), None

    except FileNotFoundError:
        return None, f"Error: File '{file_path_in_repo}' not found locally after download attempt."
    except UnicodeDecodeError:
        # Decoding failed: binary data or text in another encoding.
        return None, f"Error: File '{file_path_in_repo}' is not valid UTF-8 text. Cannot display."
    except HfHubHTTPError as http_exc:
        label = repo_id_for_error_logging or 'unknown repo'
        logger.error(f"HTTP error fetching file {file_path_in_repo} from {label}: {http_exc}")
        http_text = str(http_exc)
        status = http_exc.response.status_code if http_exc.response is not None else None
        if status == 404:
            return None, f"Error: File '{file_path_in_repo}' not found in Space '{label}' (404)."
        if status in (401, 403):
            return None, f"Error: Access denied or authentication required for '{label}' ({status}). Check token permissions."
        return None, f"HTTP Error {status or 'unknown'} fetching file '{file_path_in_repo}': {http_text}"
    except Exception as exc:
        logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return None, f"Error fetching file content: {str(exc)}"
# --- Create/Update Space ---
def _is_placeholder_content(content):
    """Return True if *content* is a binary/error placeholder, not real file text."""
    return content.startswith(
        ("[Binary file", "[Error loading content:", "[Binary or Skipped file]")
    )


def _normalize_repo_path(raw_path):
    """Return *raw_path* as a clean POSIX repo-relative path, or None if unsafe.

    Backslashes become forward slashes, empty and "." segments (including a
    leading "/") are dropped, and any path containing ".." is rejected.
    """
    segments = [
        part for part in str(raw_path).replace("\\", "/").split("/")
        if part not in ("", ".")
    ]
    if not segments or ".." in segments:
        return None
    return "/".join(segments)


def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input, private):
    """Create or update a Space from the markdown description.

    Steps:
      1. Parse the markdown and write every real file into a temporary
         staging directory (plus a ``.gitattributes`` forcing LF endings).
      2. Create the Space if it does not exist yet.
      3. Best effort: delete Hub files that the markdown no longer mentions.
         ``.git*`` files and ``README.md`` are never deleted, and files that
         the markdown lists only as binary/error placeholders are kept as
         they are. A failure here is logged and does not stop the upload.
      4. Upload the staging directory.

    File paths come from user/AI-provided markdown, so any path that would
    leave the staging directory is rejected before anything is pushed.

    Args:
        ui_api_token_from_textbox: Token typed in the UI (may be empty).
        space_name_ui: Space name without owner.
        owner_ui: Owner; looked up from the token when empty.
        sdk_ui: Space SDK passed to ``create_repo``.
        markdown_input: Markdown describing the Space files.
        private: Whether a newly created Space is private.

    Returns:
        A human-readable status string (success or error); never raises.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return token_err
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id:
            return err_repo_id
        repo_id_for_error_logging = repo_id

        space_info = parse_markdown(markdown_input)

        with tempfile.TemporaryDirectory() as temp_dir:
            repo_staging_path = Path(temp_dir) / "repo_staging_content"
            repo_staging_path.mkdir(exist_ok=True)
            staging_root = repo_staging_path.resolve()

            # Always stage .gitattributes so the repo uses LF line endings.
            with open(repo_staging_path / ".gitattributes", "w", encoding="utf-8") as f:
                f.write("* text=auto eol=lf\n")

            staged_paths = {".gitattributes"}  # repo paths written to staging
            preserved_paths = set()            # listed only as placeholders

            for file_info in space_info["files"]:
                raw_path = file_info.get("path")
                if not raw_path:
                    logger.warning(f"Skipping file_info with no path: {file_info}")
                    continue
                if file_info.get("is_structure_block"):
                    continue

                repo_path = _normalize_repo_path(raw_path)
                if repo_path is None:
                    logger.error(f"Rejected unsafe file path during staging: {raw_path!r}")
                    return f"Error staging file {raw_path}: path must stay inside the repository."

                content_to_write = file_info.get("content") or ""
                if _is_placeholder_content(content_to_write):
                    # The real bytes are not available, so nothing is
                    # uploaded, but the existing Hub file must not be deleted.
                    logger.info(f"Skipping binary/error placeholder file from build: {raw_path}")
                    preserved_paths.add(repo_path)
                    continue

                file_path_abs = repo_staging_path / repo_path
                # Second line of defence (e.g. drive-qualified paths on Windows).
                if staging_root not in file_path_abs.resolve().parents:
                    logger.error(f"Rejected file path escaping staging dir: {raw_path!r}")
                    return f"Error staging file {raw_path}: path must stay inside the repository."

                try:
                    file_path_abs.parent.mkdir(parents=True, exist_ok=True)
                    with open(file_path_abs, "w", encoding="utf-8") as f:
                        f.write(content_to_write)
                except Exception as file_write_error:
                    logger.error(f"Error writing file {raw_path} during staging: {file_write_error}")
                    return f"Error staging file {raw_path}: {file_write_error}"
                staged_paths.add(repo_path)

            if staged_paths == {".gitattributes"}:
                logger.info(f"Markdown contained no standard files. Staging only .gitattributes for {repo_id}.")

            create_repo(
                repo_id=repo_id,
                token=resolved_api_token,
                repo_type="space",
                space_sdk=sdk_ui,
                private=private,
                exist_ok=True,
            )
            api = HfApi(token=resolved_api_token)

            # Best-effort cleanup of files that are no longer described.
            try:
                current_hub_files = set(api.list_repo_files(repo_id=repo_id, repo_type="space"))
                keep = staged_paths | preserved_paths
                files_to_delete_on_hub = sorted(
                    f for f in current_hub_files - keep
                    if not f.startswith('.git') and f != "README.md"
                )
                if files_to_delete_on_hub:
                    logger.info(
                        f"Deleting {len(files_to_delete_on_hub)} files from {repo_id} "
                        f"not in new markdown structure: {files_to_delete_on_hub}"
                    )
                    api.create_commit(
                        repo_id=repo_id,
                        repo_type="space",
                        operations=[CommitOperationDelete(path_in_repo=f) for f in files_to_delete_on_hub],
                        commit_message=f"AI Space Builder: Removed {len(files_to_delete_on_hub)} files not in updated structure.",
                    )
                    logger.info("Successfully committed deletions.")
            except Exception as e_delete:
                # Stale files are an annoyance, not a reason to skip the upload.
                logger.error(f"Error removing old files from {repo_id}: {e_delete}. Proceeding with upload.")

            logger.info(f"Uploading staged files from {str(repo_staging_path)} to {repo_id}")
            upload_folder(
                repo_id=repo_id,
                folder_path=str(repo_staging_path),
                path_in_repo=".",
                token=resolved_api_token,
                repo_type="space",
                commit_message=f"AI Space Builder: Space content update for {repo_id}",
            )
            return f"Successfully created/updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id})"

    except HfHubHTTPError as e_http:
        label = repo_id_for_error_logging or 'unknown repo'
        logger.error(f"HTTP error during create_space for {label}: {e_http}")
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 409:
            # Conflict: the repo usually exists with a different type/owner.
            return f"Error creating/updating Space '{label}': Conflict (Space might exist with different owner/settings)."
        if status_code in (401, 403):
            return f"Error creating/updating Space '{label}': Access denied or authentication required ({status_code}). Check token permissions."
        return f"HTTP Error {status_code or 'unknown'} during Space creation/update: {e_http}"
    except Exception as e:
        logger.exception(f"Error in create_space for {repo_id_for_error_logging or 'unknown repo'}:")
        return f"Error during Space creation/update: {str(e)}"
# --- Update Single File ---
def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    """Create or overwrite a single text file in a Space.

    The content is encoded as UTF-8 and handed to ``HfApi.upload_file`` as
    bytes, so no temporary file is created on disk.

    Args:
        ui_api_token_from_textbox: Token typed in the UI (may be empty).
        space_name_ui: Space name without owner.
        owner_ui: Owner; looked up from the token when empty.
        file_path_in_repo: Target path inside the repo. Backslashes are
            converted to ``/`` and leading slashes are removed.
        file_content: New text content. An empty string uploads an empty
            file; ``None`` is rejected.
        commit_message_ui: Commit message; a default is used when empty.

    Returns:
        A human-readable status string (success or error); never raises.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return token_err
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id:
            return err_repo_id
        repo_id_for_error_logging = repo_id

        if not file_path_in_repo:
            return "Error: File Path to update cannot be empty."
        # Hub paths are POSIX-style and relative to the repo root.
        file_path_in_repo = file_path_in_repo.replace("\\", "/").lstrip('/')
        if not file_path_in_repo:
            return "Error: File Path to update cannot be empty."
        if file_content is None:
            return f"Error: No content provided for '{file_path_in_repo}'."

        commit_msg = commit_message_ui or f"Update {file_path_in_repo} via AI Space Editor"
        api = HfApi(token=resolved_api_token)
        api.upload_file(
            path_or_fileobj=file_content.encode("utf-8"),
            path_in_repo=file_path_in_repo,
            repo_id=repo_id,
            repo_type="space",
            commit_message=commit_msg,
        )
        return f"Successfully updated `{file_path_in_repo}` in Space [{repo_id}](https://huggingface.co/spaces/{repo_id})"

    except UnicodeEncodeError:
        # e.g. lone surrogate characters that have no UTF-8 representation.
        return f"Error: Content for '{file_path_in_repo}' cannot be encoded as UTF-8."
    except HfHubHTTPError as e_http:
        label = repo_id_for_error_logging or 'unknown repo'
        logger.error(f"HTTP error in update_space_file for {label}, file {file_path_in_repo}: {e_http}")
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return f"Error: Space '{label}' or file '{file_path_in_repo}' not found (404)."
        if status_code in (401, 403):
            return f"Error: Access denied or authentication required for '{label}' ({status_code}). Check token permissions."
        return f"HTTP Error {status_code or 'unknown'} updating file '{file_path_in_repo}': {e_http}"
    except Exception as e:
        label = repo_id_for_error_logging or 'unknown repo'
        logger.exception(f"Error in update_space_file for {label}, file {file_path_in_repo}:")
        return f"Error updating file for `{label}`: {str(e)}"
# --- Delete Single File ---
def delete_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, commit_message_ui=None):
    """Delete a single file from a Space.

    Args:
        ui_api_token_from_textbox: Token typed in the UI (may be empty).
        space_name_ui: Space name without owner.
        owner_ui: Owner; looked up from the token when empty.
        file_path_in_repo: Path of the file inside the repo. Backslashes are
            converted to ``/`` and leading slashes are removed.
        commit_message_ui: Commit message; a default is used when empty.

    Returns:
        A human-readable status string (success or error); never raises.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id:
            return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id

        if not file_path_in_repo:
            return "Error: File path cannot be empty for deletion."
        # Hub paths are POSIX-style and relative to the repo root. Replacing
        # os.sep would be a no-op on POSIX hosts, so convert "\" explicitly.
        file_path_in_repo = file_path_in_repo.replace("\\", "/").lstrip('/')
        if not file_path_in_repo:
            # The path consisted only of slashes.
            return "Error: File path cannot be empty for deletion."

        effective_commit_message = commit_message_ui or f"Deleted file: {file_path_in_repo} via AI Space Editor"
        hf_delete_file(
            path_in_repo=file_path_in_repo,
            repo_id=repo_id,
            repo_type="space",
            token=resolved_api_token,
            commit_message=effective_commit_message,
        )
        return f"Successfully deleted file: {file_path_in_repo}"

    except HfHubHTTPError as e_http:
        label = repo_id_for_error_logging or 'unknown repo'
        logger.error(f"HTTP error deleting file {file_path_in_repo} from {label}: {e_http}")
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return f"Error: File '{file_path_in_repo}' not found in Space '{label}' for deletion (404)."
        if status_code in (401, 403):
            return f"Error: Access denied or authentication required for '{label}' ({status_code}). Check token permissions."
        return f"HTTP Error {status_code or 'unknown'} deleting file '{file_path_in_repo}': {e_http}"
    except Exception as e:
        logger.exception(f"Error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return f"Error deleting file '{file_path_in_repo}': {str(e)}"
# --- Get Space Runtime Status ---
def get_space_runtime_status(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Fetch the runtime status of a Space via ``HfApi.get_space_runtime``.

    Returns:
        ``(status_details, None)`` on success, where ``status_details`` is a
        dict with the keys ``stage``, ``hardware``, ``requested_hardware``,
        ``error_message``, ``full_log_link`` and ``raw_data``; otherwise
        ``(None, error_message)``. ``error_message`` inside the dict is only
        filled when the stage is ``"ERRORED"``.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        token, token_problem = _get_api_token(ui_api_token_from_textbox)
        if token_problem:
            return None, f"API Token Error: {token_problem}"
        repo_id, repo_problem = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if repo_problem:
            return None, f"Repo ID Error: {repo_problem}"
        repo_id_for_error_logging = repo_id

        hub = HfApi(token=token)
        logger.info(f"Fetching runtime status for Space: {repo_id}")
        runtime = hub.get_space_runtime(repo_id=repo_id)

        status_details = {
            "stage": runtime.stage,
            "hardware": runtime.hardware,
            # Not every runtime object carries requested_hardware.
            "requested_hardware": getattr(runtime, 'requested_hardware', None),
            "error_message": None,
            "full_log_link": f"https://huggingface.co/spaces/{repo_id}/logs",
            "raw_data": runtime.raw,
        }

        if runtime.stage == "ERRORED":
            raw = runtime.raw
            detail = None

            # First pick a general error text: the error attribute, then a
            # raw "message" that looks like an error, then a raw "error".
            direct_error = getattr(runtime, 'error', None)
            if direct_error:
                detail = str(direct_error)
            elif 'message' in raw and isinstance(raw['message'], str) and (
                'error' in raw['message'].lower() or raw['message'].strip().endswith('!')
            ):
                detail = raw['message']
            elif 'error' in raw:
                detail = str(raw['error'])

            # A build- or run-specific error overrides the general text.
            build_section = raw['build'] if 'build' in raw else None
            if isinstance(build_section, dict) and build_section.get('status') == 'error':
                detail = f"Build Error: {build_section.get('message', detail or 'Unknown build error')}"
            else:
                run_section = raw['run'] if 'run' in raw else None
                if isinstance(run_section, dict) and run_section.get('status') == 'error':
                    detail = f"Runtime Error: {run_section.get('message', detail or 'Unknown runtime error')}"

            status_details["error_message"] = detail or "Space is in an errored state. Check logs for details."

        logger.info(f"Runtime status for {repo_id}: {status_details['stage']}")
        return status_details, None

    except HfHubHTTPError as http_exc:
        label = repo_id_for_error_logging or 'unknown repo'
        logger.error(f"HTTP error fetching runtime status for {label}: {http_exc}")
        http_text = str(http_exc)
        status = http_exc.response.status_code if http_exc.response is not None else None
        if status == 404:
            # Either the Space is missing or it has no runtime state recorded.
            return None, f"Error: Space '{label}' not found or has no active runtime status (404)."
        if status in (401, 403):
            return None, f"Error: Access denied or authentication required for '{label}' ({status}). Check token permissions."
        return None, f"HTTP Error {status or 'unknown'} fetching runtime status for '{label}': {http_text}"
    except Exception as exc:
        logger.exception(f"Error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}:")
        return None, f"Error fetching runtime status: {str(exc)}"