Spaces:
Running
Running
import gradio as gr | |
from huggingface_hub import InferenceClient, HfHubHTTPError | |
import os | |
import re | |
import traceback | |
# --- Configuration --- | |
API_TOKEN = os.getenv("HF_TOKEN", None) | |
# MODEL = "Qwen/Qwen3-32B" # This is a very large model, might require specific inference endpoint/hardware | |
# Let's try a smaller, generally available model for testing first, e.g., Mixtral | |
# You can change this back if you are sure Qwen3-32B is available and configured for your space/token | |
# MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1" | |
# Or uncomment the Qwen model if you are certain it's correctly set up for inference: | |
MODEL = "Qwen/Qwen3-32B" | |
# i have used Qwen3 because its quiet compatible | |
# --- Hugging Face Client Initialization --- | |
print("--- App Start ---") | |
if not API_TOKEN: | |
print("Warning: HF_TOKEN environment variable not set. Using anonymous access.") | |
print("Certain models might require a token for access.") | |
else: | |
print(f"HF_TOKEN found (length={len(API_TOKEN)}).") # Don't print the token itself | |
try: | |
print(f"Initializing Inference Client for model: {MODEL}") | |
# Explicitly pass token=None if not found, though InferenceClient handles it. | |
client = InferenceClient(model=MODEL, token=API_TOKEN if API_TOKEN else None) | |
print("Inference Client Initialized Successfully.") | |
# Optional: Add a quick test call if feasible, but be mindful of potential costs/rate limits | |
# try: | |
# client.text_generation("test", max_new_tokens=1) | |
# print("Test generation successful.") | |
# except Exception as test_e: | |
# print(f"Warning: Test generation failed. Client might be initialized but model access could be problematic. Error: {test_e}") | |
except HfHubHTTPError as http_err: | |
# More specific error handling for HTTP errors (like 401 Unauthorized, 403 Forbidden, 404 Not Found) | |
error_message = ( | |
f"Failed to initialize model client for {MODEL} due to an HTTP error.\n" | |
f"Status Code: {http_err.response.status_code}\n" | |
f"Error: {http_err}\n" | |
f"Check:\n" | |
f"1. If '{MODEL}' is a valid model ID on Hugging Face Hub.\n" | |
f"2. If the model requires gating or specific permissions.\n" | |
f"3. If your HF_TOKEN is correct and has the necessary permissions (set as a Secret in your Space).\n" | |
f"4. If the default Inference API supports this model or if a dedicated Inference Endpoint is needed." | |
) | |
print(f"ERROR: {error_message}") | |
raise gr.Error(error_message) | |
except Exception as e: | |
error_message = ( | |
f"An unexpected error occurred while initializing the model client for {MODEL}.\n" | |
f"Error Type: {type(e).__name__}\n" | |
f"Error: {e}\n" | |
f"Traceback:\n{traceback.format_exc()}\n" # Add traceback | |
f"Check HF_TOKEN, model availability, network connection, and Space resources." | |
) | |
print(f"ERROR: {error_message}") | |
raise gr.Error(error_message) | |
# --- Helper Functions --- | |
# Parse all ```filename.ext\n<code>``` blocks | |
def parse_code_blocks(response: str) -> list: | |
pattern = r"```([^\n]+)\n(.*?)```" | |
blocks = re.findall(pattern, response, re.DOTALL) | |
files = [] | |
for filename, code in blocks: | |
filename = filename.strip() | |
code = code.strip() | |
# Basic language detection (can be expanded) | |
lang = None | |
if filename.endswith(".py"): | |
lang = "python" | |
elif filename.endswith(".js"): | |
lang = "javascript" | |
elif filename.endswith(".html"): | |
lang = "html" | |
elif filename.endswith(".css"): | |
lang = "css" | |
elif filename.endswith(".json"): | |
lang = "json" | |
elif filename.endswith(".md"): | |
lang = "markdown" | |
elif filename.endswith(".sh") or filename.endswith(".bash"): | |
lang = "bash" | |
elif filename.endswith(".java"): | |
lang = "java" | |
# Add more extensions as needed | |
files.append({ | |
"filename": filename, | |
"language": lang, | |
"code": code | |
}) | |
# Add logging to see what's parsed | |
# print(f"Parsed {len(files)} code blocks.") | |
# for i, f in enumerate(files): | |
# print(f" Block {i}: filename='{f['filename']}', lang='{f['language']}', code_len={len(f['code'])}") | |
return files | |
def strip_think_tags(text: str) -> str: | |
return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip() | |
def extract_thoughts(text: str) -> str: | |
matches = re.findall(r"<think>(.*?)</think>", text, flags=re.DOTALL) | |
# Only return the last thought block for cleaner display? Or join all? Let's join. | |
return "\n---\n".join(match.strip() for match in matches).strip() | |
# --- System Message --- | |
system_message = ( | |
"You are a helpful AI assistant specialized in generating website code. " | |
"Generate all the necessary files based on the user's request. " | |
"Output each file within a separate markdown code block formatted exactly like this:\n" | |
"```filename.ext\n" | |
"<code>\n" | |
"```\n" | |
"Do not add any explanatory text outside the code blocks. Ensure the filenames have appropriate extensions. " | |
"If you need to think step-by-step, use <think>...</think> tags. These tags will be hidden from the final user output but help guide your generation process." | |
) | |
# --- Code Generation Function --- | |
def generate_code(prompt, backend_choice, max_tokens, temperature, top_p): | |
if not prompt: | |
# Handle empty prompt case | |
yield [], gr.update(value="Please enter a description for the website.", visible=True) | |
return | |
# Use f-string formatting for clarity | |
user_prompt = f"USER_PROMPT: {prompt}\nUSER_BACKEND_PREFERENCE: {backend_choice}" | |
messages = [ | |
{"role": "system", "content": system_message}, | |
{"role": "user", "content": user_prompt} | |
] | |
full_response = "" | |
current_thoughts = "" | |
accumulated_error = "" # Accumulate errors during stream | |
# Reset outputs: Clear previous code blocks and show/clear thinking box | |
# Yield an empty list to the gr.Column to clear it. | |
# Make thinking box visible but empty. | |
yield [], gr.update(visible=True, value="Generating code...") | |
print(f"\n--- Generating Code ---") | |
print(f"Prompt: {prompt[:100]}...") # Log truncated prompt | |
print(f"Backend: {backend_choice}, Max Tokens: {max_tokens}, Temp: {temperature}, Top-P: {top_p}") | |
try: | |
stream = client.chat_completion( | |
messages=messages, | |
max_tokens=max_tokens, | |
stream=True, | |
temperature=temperature if temperature > 0 else 0.01, # Ensure temp is positive | |
top_p=top_p, | |
# Consider adding stop sequences if the model tends to run on | |
# stop=["```\n\n", "\n\nHuman:", "\n\nUSER:"] # Example stop sequences | |
) | |
code_updates = [] # Store the gr.Code components to yield | |
for i, message in enumerate(stream): | |
# Check for errors in the stream message (some providers might include error info) | |
if hasattr(message, 'error') and message.error: | |
accumulated_error += f"Error in stream chunk {i}: {message.error}\n" | |
print(f"ERROR in stream chunk {i}: {message.error}") | |
continue # Skip this chunk if it's an error indicator | |
# Ensure the path to content is correct | |
try: | |
# Common path: message.choices[0].delta.content | |
token = message.choices[0].delta.content | |
# Handle potential None token at the end of the stream or in error cases | |
if token is None: | |
token = "" | |
# print(f"Token {i}: '{token}'") # DEBUG: print each token | |
except (AttributeError, IndexError, TypeError) as e: | |
# Handle unexpected message structure | |
print(f"Warning: Could not extract token from stream message {i}. Structure: {message}. Error: {e}") | |
token = "" # Assign empty string to avoid breaking accumulation | |
if isinstance(token, str): | |
full_response += token | |
# Update thinking box periodically (e.g., every 10 tokens or if thoughts change) | |
if i % 10 == 0 or "<think>" in token or "</think>" in token: | |
thoughts = extract_thoughts(full_response) | |
if thoughts != current_thoughts: | |
current_thoughts = thoughts | |
# Don't yield code_updates here yet, only update thoughts | |
yield code_updates, gr.update(value=current_thoughts if current_thoughts else "Thinking...", visible=True) | |
# Update code blocks less frequently or when a block seems complete | |
# Heuristic: update if the response ends with ``` | |
if token.strip().endswith("```") or i % 20 == 0: # Adjust frequency as needed | |
cleaned_response = strip_think_tags(full_response) | |
parsed_files = parse_code_blocks(cleaned_response) | |
# Create gr.Code components for the parsed files | |
# Compare with existing code_updates to avoid redundant updates if content hasn't changed significantly | |
new_code_updates = [] | |
changed = False | |
if len(parsed_files) != len(code_updates): | |
changed = True | |
else: | |
# Quick check if filenames/code lengths differ significantly | |
for idx, f in enumerate(parsed_files): | |
if (idx >= len(code_updates) or | |
f["filename"] != code_updates[idx].label or | |
len(f["code"]) != len(code_updates[idx].value)): # Simple length check | |
changed = True | |
break | |
if changed or not code_updates: # Update if changed or first time | |
code_updates = [] | |
for f in parsed_files: | |
code_updates.append( | |
gr.Code( | |
value=f["code"], | |
label=f["filename"], | |
language=f["language"] | |
) | |
) | |
# Yield the list of gr.Code components to the gr.Column | |
# Also update thoughts (might be slightly out of sync, but acceptable) | |
yield code_updates, gr.update(value=current_thoughts if current_thoughts else "Thinking...", visible=True) | |
# --- Final Update after Stream Ends --- | |
print("Stream finished.") | |
if accumulated_error: | |
print(f"Errors occurred during stream:\n{accumulated_error}") | |
# Decide how to show this to the user, e.g., append to thoughts or show separately | |
current_thoughts += f"\n\n**Streaming Errors:**\n{accumulated_error}" | |
cleaned_response = strip_think_tags(full_response) | |
final_files = parse_code_blocks(cleaned_response) | |
print(f"Final parsed files: {len(final_files)}") | |
final_code_updates = [] | |
if not final_files and not accumulated_error: | |
# Handle case where no code blocks were generated | |
final_code_updates.append(gr.Markdown("No code blocks were generated. The model might have responded with text instead, or the format was incorrect.")) | |
print("Warning: No code blocks found in the final response.") | |
# Optionally show the raw response for debugging | |
# final_code_updates.append(gr.Code(label="Raw Response", value=cleaned_response, language="text")) | |
elif not final_files and accumulated_error: | |
final_code_updates.append(gr.Markdown(f"**Error during generation:**\n{accumulated_error}")) | |
else: | |
for f in final_files: | |
final_code_updates.append( | |
gr.Code( | |
value=f["code"], | |
label=f["filename"], | |
language=f["language"] | |
) | |
) | |
# Yield final code blocks and hide thinking box (or show final thoughts/errors) | |
final_thought_update = gr.update(visible=True if current_thoughts else False, value=current_thoughts) | |
yield final_code_updates, final_thought_update | |
except HfHubHTTPError as http_err: | |
# Handle errors during the streaming call itself | |
error_message = ( | |
f"**Error during code generation (HTTP Error):**\n" | |
f"Status Code: {http_err.response.status_code}\n" | |
f"Error: {http_err}\n" | |
f"This could be due to rate limits, invalid input, model errors, or token issues.\n" | |
f"Check the Hugging Face Space logs for more details." | |
) | |
print(f"ERROR: {error_message}") | |
print(traceback.format_exc()) | |
# Yield error message in the output area | |
yield [gr.Markdown(error_message)], gr.update(visible=False) # Hide thinking box on error | |
except Exception as e: | |
error_message = ( | |
f"**An unexpected error occurred during code generation:**\n" | |
f"Error Type: {type(e).__name__}\n" | |
f"Error: {e}\n\n" | |
f"**Traceback:**\n```\n{traceback.format_exc()}\n```\n" | |
f"Check the Hugging Face Space logs for more details." | |
) | |
print(f"ERROR: {error_message}") | |
# Yield error message in the output area | |
yield [gr.Markdown(error_message)], gr.update(visible=False) # Hide thinking box on error | |
# --- Gradio Interface --- | |
with gr.Blocks(css=".gradio-container { max-width: 90% !important; }") as demo: | |
gr.Markdown("# ✨ Website Code Generator ✨") | |
gr.Markdown("Describe the website you want. Code files will appear below. Uses `mistralai/Mixtral-8x7B-Instruct-v0.1` by default (check code to change).") # Update description | |
with gr.Row(): | |
with gr.Column(scale=2): | |
prompt_input = gr.Textbox(label="Website Description", lines=6, placeholder="e.g., A simple landing page with a title, a paragraph, and a button linking to example.com") | |
backend_radio = gr.Radio(["Static (HTML/CSS/JS)", "Flask", "Node.js"], label="Backend Preference (Influences AI)", value="Static (HTML/CSS/JS)") | |
generate_button = gr.Button("✨ Generate Website Code", variant="primary") | |
with gr.Accordion("Advanced Settings", open=False): | |
max_tokens_slider = gr.Slider(512, 8192, value=4096, step=256, label="Max New Tokens") # Increased max potential tokens | |
temperature_slider = gr.Slider(0.0, 1.2, value=0.6, step=0.05, label="Temperature (0=deterministic, >1=more creative)") # Allow 0 | |
top_p_slider = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top-P (Nucleus Sampling)") | |
with gr.Column(scale=3): | |
thinking_box = gr.Textbox(label="Model Activity / Thoughts", visible=False, interactive=False, lines=2) | |
# Use gr.Column to hold the dynamic code blocks | |
# Remove the update lambda, it's not needed for Column | |
file_outputs = gr.Column(elem_id="code-output-area") | |
generate_button.click( | |
fn=generate_code, | |
inputs=[prompt_input, backend_radio, max_tokens_slider, temperature_slider, top_p_slider], | |
# Output to the Column and the Textbox | |
outputs=[file_outputs, thinking_box], | |
# api_name="generate_code" # Optional: for API access | |
) | |
# --- Launch --- | |
if __name__ == "__main__": | |
print("Starting Gradio App...") | |
# Use queue() for handling multiple users and streaming | |
# Set share=False unless you specifically want a public link from local execution | |
# Set debug=True for more detailed Gradio errors locally (remove/set False for production) | |
demo.queue().launch(debug=False, share=False) | |
print("Gradio App Launched.") |