Buckets:
| #!/usr/bin/env python3 | |
| """ | |
| STACK – Local Repository Agent with nomic-embed-text | |
| Auto‑bootstraps venv + dependencies on first run. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import math | |
| import time | |
| import threading | |
| import hashlib | |
| import subprocess | |
| import venv | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Any | |
| # ============================================================================ | |
| # 0. VENV BOOTSTRAP – prevent “externally-managed” errors | |
| # ============================================================================ | |
| STACK_DIR = Path(__file__).resolve().parent | |
| VENV_DIR = STACK_DIR / ".stack_venv" | |
| def is_venv() -> bool: | |
| """Check if we are already running inside a virtual environment.""" | |
| return ( | |
| hasattr(sys, 'real_prefix') | |
| or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix) | |
| ) | |
| def create_venv(): | |
| """Create the virtual environment and return the path to its Python binary.""" | |
| print("[*] Creating virtual environment...") | |
| venv.create(str(VENV_DIR), with_pip=True) | |
| if os.name == 'nt': | |
| python = VENV_DIR / 'Scripts' / 'python.exe' | |
| else: | |
| python = VENV_DIR / 'bin' / 'python' | |
| return str(python) | |
| def install_dependencies(python_exe: str): | |
| """Install required packages inside the virtual environment.""" | |
| packages = ["ollama"] | |
| print(f"[*] Installing dependencies inside venv: {packages}") | |
| subprocess.check_call([python_exe, "-m", "pip", "install", "--upgrade", "pip"]) | |
| subprocess.check_call([python_exe, "-m", "pip", "install"] + packages) | |
| def relaunch_in_venv(python_exe: str): | |
| """Replace the current process with one running inside the venv.""" | |
| print("[*] Relaunching inside virtual environment...") | |
| os.execv(python_exe, [python_exe, __file__] + sys.argv[1:]) | |
| # Bootstrap logic | |
| if not is_venv(): | |
| print("[!] Not running in a virtual environment.") | |
| if VENV_DIR.exists(): | |
| # Venv exists – use it | |
| if os.name == 'nt': | |
| python_exe = str(VENV_DIR / 'Scripts' / 'python.exe') | |
| else: | |
| python_exe = str(VENV_DIR / 'bin' / 'python') | |
| if not Path(python_exe).exists(): | |
| print("[!] Venv appears corrupt, recreating...") | |
| python_exe = create_venv() | |
| install_dependencies(python_exe) | |
| relaunch_in_venv(python_exe) | |
| else: | |
| # First run – create venv, install deps, relaunch | |
| python_exe = create_venv() | |
| install_dependencies(python_exe) | |
| relaunch_in_venv(python_exe) | |
| # ============================================================================ | |
| # Now safely inside the venv – import ollama | |
| # ============================================================================ | |
| import ollama | |
| # ============================================================================ | |
| # 2. CONFIGURATION & PATHS | |
| # ============================================================================ | |
| STACK_ROOT = os.path.abspath("./stack_system") | |
| WORKSPACE = os.path.join(STACK_ROOT, "current_workspace") | |
| TEMPLATES_DIR = os.path.join(STACK_ROOT, "templates") | |
| MANIFEST_PATH = os.path.join(TEMPLATES_DIR, "manifest.json") | |
| MEMORY_FILE = os.path.join(STACK_ROOT, ".stack_memory.json") | |
| MAX_TOOL_DEPTH = 10 | |
| os.makedirs(WORKSPACE, exist_ok=True) | |
| os.makedirs(TEMPLATES_DIR, exist_ok=True) | |
| _manifest_lock = threading.RLock() | |
| # ============================================================================ | |
| # 3. OLLAMA CONNECTION & MODEL MANAGEMENT | |
| # ============================================================================ | |
| def check_ollama_running() -> bool: | |
| try: | |
| ollama.list() | |
| return True | |
| except Exception: | |
| return False | |
| def start_ollama() -> bool: | |
| try: | |
| subprocess.Popen(["ollama", "serve"], | |
| stdout=subprocess.DEVNULL, | |
| stderr=subprocess.DEVNULL) | |
| time.sleep(3) | |
| return check_ollama_running() | |
| except Exception: | |
| return False | |
| def ensure_models() -> None: | |
| print("[*] Verifying Ollama models...") | |
| if not check_ollama_running(): | |
| print("[!] Ollama not running. Attempting to start...") | |
| if not start_ollama(): | |
| print("[!] Could not start Ollama. Please run 'ollama serve' manually.") | |
| sys.exit(1) | |
| required_models = ["nomic-embed-text"] | |
| try: | |
| installed = ollama.list() | |
| installed_names = [m.model for m in installed.models] if hasattr(installed, 'models') else [] | |
| except Exception: | |
| installed_names = [] | |
| for model in required_models: | |
| if model not in installed_names: | |
| print(f"[*] Pulling {model}...") | |
| try: | |
| ollama.pull(model) | |
| print(f"[✓] {model} ready") | |
| except Exception as e: | |
| print(f"[!] Failed to pull {model}: {e}") | |
| sys.exit(1) | |
| def get_embedding(text: str) -> Optional[List[float]]: | |
| try: | |
| response = ollama.embed(model="nomic-embed-text", input=text) | |
| if hasattr(response, 'embeddings'): | |
| embeddings = response.embeddings | |
| elif isinstance(response, dict): | |
| embeddings = response.get('embeddings', []) | |
| else: | |
| return None | |
| if embeddings and len(embeddings) > 0: | |
| return embeddings[0] if isinstance(embeddings[0], list) else embeddings[0] | |
| return None | |
| except Exception as e: | |
| print(f"⚠️ Embedding error: {e}") | |
| return None | |
| # ============================================================================ | |
| # 4. VECTOR SIMILARITY & MEMORY SYSTEM | |
| # ============================================================================ | |
| def cosine_similarity(v1: List[float], v2: List[float]) -> float: | |
| if not v1 or not v2 or len(v1) != len(v2): | |
| return 0.0 | |
| dot = sum(a * b for a, b in zip(v1, v2)) | |
| mag1 = math.sqrt(sum(a * a for a in v1)) | |
| mag2 = math.sqrt(sum(b * b for b in v2)) | |
| if mag1 == 0 or mag2 == 0: | |
| return 0.0 | |
| return dot / (mag1 * mag2) | |
| def remember_action(action_summary: str, metadata: Dict = None) -> None: | |
| memory = [] | |
| if os.path.exists(MEMORY_FILE): | |
| try: | |
| with open(MEMORY_FILE, 'r') as f: | |
| memory = json.load(f) | |
| except json.JSONDecodeError: | |
| memory = [] | |
| vector = get_embedding(action_summary) | |
| if vector: | |
| entry = { | |
| "text": action_summary, | |
| "vector": vector, | |
| "timestamp": time.time(), | |
| "metadata": metadata or {} | |
| } | |
| memory.append(entry) | |
| if len(memory) > 1000: | |
| memory = memory[-1000:] | |
| with open(MEMORY_FILE, 'w') as f: | |
| json.dump(memory, f) | |
| def query_memory(query: str, top_k: int = 3) -> List[str]: | |
| if not os.path.exists(MEMORY_FILE): | |
| return [] | |
| try: | |
| with open(MEMORY_FILE, 'r') as f: | |
| memory = json.load(f) | |
| except (json.JSONDecodeError, IOError): | |
| return [] | |
| query_vector = get_embedding(query) | |
| if not query_vector: | |
| return [] | |
| scored = [] | |
| for item in memory: | |
| if "vector" in item and item["vector"]: | |
| score = cosine_similarity(query_vector, item["vector"]) | |
| scored.append((score, item["text"])) | |
| scored.sort(key=lambda x: x[0], reverse=True) | |
| return [text for score, text in scored[:top_k] if score > 0.3] | |
| # ============================================================================ | |
| # 5. TEMPLATE MANAGEMENT & HOT RELOAD | |
| # ============================================================================ | |
| DEFAULT_TEMPLATES = { | |
| "python_web_server": { | |
| "description": "minimal fastapi python web server with api routes", | |
| "files": { | |
| "app.py": "from fastapi import FastAPI\n\napp = FastAPI()\n\n@app.get('/')\ndef root():\n return {'status': 'STACK running'}\n", | |
| "requirements.txt": "fastapi\nuvicorn" | |
| } | |
| }, | |
| "html_login_component": { | |
| "description": "modern tailwind css login form ui component", | |
| "files": { | |
| "login.html": """<!DOCTYPE html> | |
| <html> | |
| <head> | |
| <script src="https://cdn.tailwindcss.com"></script> | |
| </head> | |
| <body class="bg-gray-100 flex items-center justify-center h-screen"> | |
| <div class="bg-white p-8 rounded-lg shadow-md w-96"> | |
| <h2 class="text-2xl font-bold mb-6">Login</h2> | |
| <input type="email" placeholder="Email" class="w-full p-2 border rounded mb-4"> | |
| <input type="password" placeholder="Password" class="w-full p-2 border rounded mb-4"> | |
| <button class="w-full bg-blue-500 text-white p-2 rounded hover:bg-blue-600">Sign In</button> | |
| </div> | |
| </body> | |
| </html>""" | |
| } | |
| } | |
| } | |
| def seed_initial_templates() -> None: | |
| if os.path.exists(MANIFEST_PATH): | |
| return | |
| print("[*] Seeding initial templates...") | |
| manifest = {} | |
| for name, data in DEFAULT_TEMPLATES.items(): | |
| vector = get_embedding(data["description"]) | |
| if vector: | |
| manifest[name] = { | |
| "description": data["description"], | |
| "vector": vector, | |
| "files": data["files"] | |
| } | |
| print(f" ✓ {name}") | |
| with _manifest_lock: | |
| with open(MANIFEST_PATH, 'w') as f: | |
| json.dump(manifest, f, indent=2) | |
| def scan_templates_folder() -> Dict: | |
| discovered = {} | |
| if not os.path.exists(TEMPLATES_DIR): | |
| return discovered | |
| for item in os.listdir(TEMPLATES_DIR): | |
| item_path = os.path.join(TEMPLATES_DIR, item) | |
| if not os.path.isdir(item_path): | |
| continue | |
| files_dict = {} | |
| description = f"code template for {item.replace('_', ' ')}" | |
| for root, _, files in os.walk(item_path): | |
| for filename in files: | |
| if filename == "description.txt": | |
| try: | |
| with open(os.path.join(root, filename), 'r') as f: | |
| description = f.read().strip() | |
| except Exception: | |
| pass | |
| continue | |
| file_path = os.path.join(root, filename) | |
| rel_path = os.path.relpath(file_path, item_path) | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| files_dict[rel_path] = f.read() | |
| except Exception: | |
| pass | |
| if files_dict: | |
| discovered[item] = { | |
| "description": description, | |
| "files": files_dict | |
| } | |
| return discovered | |
| def reload_manifest() -> None: | |
| raw_templates = scan_templates_folder() | |
| with _manifest_lock: | |
| manifest = {} | |
| if os.path.exists(MANIFEST_PATH): | |
| try: | |
| with open(MANIFEST_PATH, 'r') as f: | |
| manifest = json.load(f) | |
| except json.JSONDecodeError: | |
| manifest = {} | |
| updated = False | |
| for name, data in raw_templates.items(): | |
| if name not in manifest or manifest[name]["description"] != data["description"]: | |
| vector = get_embedding(data["description"]) | |
| if vector: | |
| manifest[name] = { | |
| "description": data["description"], | |
| "vector": vector, | |
| "files": data["files"] | |
| } | |
| updated = True | |
| print(f" ✓ Updated template: {name}") | |
| if updated: | |
| with open(MANIFEST_PATH, 'w') as f: | |
| json.dump(manifest, f, indent=2) | |
| def start_template_watcher() -> threading.Thread: | |
| def watcher(): | |
| last_mtime = {} | |
| while True: | |
| time.sleep(2) | |
| try: | |
| current = {} | |
| if os.path.exists(TEMPLATES_DIR): | |
| for root, _, files in os.walk(TEMPLATES_DIR): | |
| for f in files: | |
| path = os.path.join(root, f) | |
| current[path] = os.path.getmtime(path) | |
| if current != last_mtime: | |
| reload_manifest() | |
| last_mtime = current | |
| except Exception: | |
| pass | |
| thread = threading.Thread(target=watcher, daemon=True) | |
| thread.start() | |
| return thread | |
| # ============================================================================ | |
| # 6. SANDBOXED GIT & FILE OPERATIONS | |
| # ============================================================================ | |
| def validate_workspace_path(target_path: str) -> bool: | |
| abs_target = os.path.abspath(os.path.join(WORKSPACE, target_path)) | |
| abs_workspace = os.path.abspath(WORKSPACE) | |
| return abs_target.startswith(abs_workspace) | |
| def run_git_safe(args: List[str], error_ok: bool = False) -> Tuple[bool, str]: | |
| try: | |
| result = subprocess.run( | |
| ["git"] + args, | |
| cwd=WORKSPACE, | |
| capture_output=True, | |
| text=True, | |
| timeout=30 | |
| ) | |
| if result.returncode == 0: | |
| return True, result.stdout or "OK" | |
| elif error_ok: | |
| return False, result.stderr | |
| else: | |
| return False, result.stderr | |
| except subprocess.TimeoutExpired: | |
| return False, "Git operation timed out" | |
| except Exception as e: | |
| return False, str(e) | |
| def cmd_build(repo_name: str) -> str: | |
| global WORKSPACE | |
| new_workspace = os.path.join(STACK_ROOT, repo_name) | |
| try: | |
| os.makedirs(new_workspace, exist_ok=True) | |
| WORKSPACE = new_workspace | |
| success, msg = run_git_safe(["init"]) | |
| if not success: | |
| return f"Git init failed: {msg}" | |
| success, msg = run_git_safe(["checkout", "-b", "main"]) | |
| config_path = os.path.join(WORKSPACE, "STACK.json") | |
| with open(config_path, 'w') as f: | |
| json.dump({ | |
| "name": repo_name, | |
| "created": time.time(), | |
| "managed_by": "STACK" | |
| }, f) | |
| run_git_safe(["add", "."]) | |
| run_git_safe(["commit", "-m", "chore: initial STACK bootstrap"]) | |
| remember_action(f"Created repository '{repo_name}'", {"action": "build", "repo": repo_name}) | |
| return f"✓ Repository '{repo_name}' created at {WORKSPACE}" | |
| except Exception as e: | |
| return f"✗ Build failed: {e}" | |
| def cmd_add(search_query: str) -> str: | |
| workspace_git = os.path.join(WORKSPACE, ".git") | |
| if not os.path.exists(workspace_git): | |
| return "✗ No active repository. Use '/build <name>' first" | |
| with _manifest_lock: | |
| if not os.path.exists(MANIFEST_PATH): | |
| return "✗ No templates available. Run '/import' or add to templates folder" | |
| try: | |
| with open(MANIFEST_PATH, 'r') as f: | |
| manifest = json.load(f) | |
| except Exception: | |
| return "✗ Corrupted manifest file" | |
| if not manifest: | |
| return "✗ Manifest is empty. Seed templates first" | |
| query_vector = get_embedding(search_query) | |
| if not query_vector: | |
| return "✗ Could not generate embedding for query" | |
| best_match = None | |
| best_score = -1.0 | |
| for name, data in manifest.items(): | |
| if "vector" in data: | |
| score = cosine_similarity(query_vector, data["vector"]) | |
| if score > best_score: | |
| best_score = score | |
| best_match = (name, data["files"], data["description"]) | |
| if best_score < 0.35: | |
| return f"✗ No good match (confidence: {best_score:.2f}). Try different phrasing" | |
| template_name, files, description = best_match | |
| branch_name = f"stack/{template_name}" | |
| success, msg = run_git_safe(["checkout", "-b", branch_name], error_ok=True) | |
| files_written = [] | |
| for filename, content in files.items(): | |
| target = os.path.join(WORKSPACE, filename) | |
| if not validate_workspace_path(filename): | |
| continue | |
| os.makedirs(os.path.dirname(target), exist_ok=True) | |
| with open(target, 'w', encoding='utf-8') as f: | |
| f.write(content) | |
| files_written.append(filename) | |
| run_git_safe(["add", "."]) | |
| commit_msg = f"feat: add {template_name} via STACK\n\n{description}" | |
| run_git_safe(["commit", "-m", commit_msg]) | |
| run_git_safe(["checkout", "main"]) | |
| run_git_safe(["merge", branch_name]) | |
| remember_action(f"Added template '{template_name}' to repository", | |
| {"action": "add", "template": template_name, "files": files_written}) | |
| return f"✓ Added '{template_name}' (confidence: {best_score:.2f})\n Files: {', '.join(files_written)}" | |
| def cmd_import(json_path: str) -> str: | |
| if not os.path.exists(json_path): | |
| return f"✗ File not found: {json_path}" | |
| try: | |
| with open(json_path, 'r', encoding='utf-8') as f: | |
| new_templates = json.load(f) | |
| except Exception as e: | |
| return f"✗ Invalid JSON: {e}" | |
| with _manifest_lock: | |
| manifest = {} | |
| if os.path.exists(MANIFEST_PATH): | |
| try: | |
| with open(MANIFEST_PATH, 'r') as f: | |
| manifest = json.load(f) | |
| except Exception: | |
| pass | |
| imported = 0 | |
| for name, data in new_templates.items(): | |
| if "description" not in data or "files" not in data: | |
| continue | |
| vector = get_embedding(data["description"]) | |
| if vector: | |
| manifest[name] = { | |
| "description": data["description"], | |
| "vector": vector, | |
| "files": data["files"] | |
| } | |
| imported += 1 | |
| with open(MANIFEST_PATH, 'w') as f: | |
| json.dump(manifest, f, indent=2) | |
| return f"✓ Imported {imported} templates from {json_path}" | |
| def cmd_status() -> str: | |
| with _manifest_lock: | |
| manifest_count = 0 | |
| if os.path.exists(MANIFEST_PATH): | |
| try: | |
| with open(MANIFEST_PATH, 'r') as f: | |
| manifest_count = len(json.load(f)) | |
| except Exception: | |
| pass | |
| has_git = os.path.exists(os.path.join(WORKSPACE, ".git")) | |
| lines = [ | |
| f"STACK Status", | |
| f" Workspace: {WORKSPACE}", | |
| f" Git initialized: {'✓' if has_git else '✗'}", | |
| f" Templates in manifest: {manifest_count}", | |
| f" Ollama: {'✓' if check_ollama_running() else '✗'}" | |
| ] | |
| return "\n".join(lines) | |
| def cmd_list_templates() -> str: | |
| with _manifest_lock: | |
| if not os.path.exists(MANIFEST_PATH): | |
| return "No templates found" | |
| try: | |
| with open(MANIFEST_PATH, 'r') as f: | |
| manifest = json.load(f) | |
| except Exception: | |
| return "Error reading manifest" | |
| if not manifest: | |
| return "No templates in manifest" | |
| lines = ["Available templates:"] | |
| for name, data in manifest.items(): | |
| desc = data.get("description", "No description")[:60] | |
| lines.append(f" • {name}: {desc}") | |
| return "\n".join(lines) | |
| # ============================================================================ | |
| # 7. CLI INTERFACE | |
| # ============================================================================ | |
| def print_banner() -> None: | |
| banner = """ | |
| ███████╗████████╗ █████╗ ██████╗██╗ ██╗ | |
| ██╔════╝╚══██╔══╝██╔══██╗██╔════╝██║ ██╔╝ | |
| ███████╗ ██║ ███████║██║ █████╔╝ | |
| ╚════██║ ██║ ██╔══██║██║ ██╔═██╗ | |
| ███████║ ██║ ██║ ██║╚██████╗██║ ██╗ | |
| ╚══════╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝ | |
| Embed-Driven Repository Architect | |
| """ | |
| print(banner) | |
| def print_commands() -> None: | |
| print(""" | |
| Commands: | |
| /build <name> - Create new git repository workspace | |
| /add <description> - Find matching template, inject into workspace | |
| /import <file.json> - Import external template bundle | |
| /list - List available templates | |
| /status - Show system status | |
| /help - Show this help | |
| /quit - Exit STACK | |
| Templates folder: ./stack_system/templates/ | |
| - Add folders with code + description.txt for hot-reload | |
| - JSON imports also supported | |
| """) | |
| def main() -> None: | |
| print("[*] Initializing STACK...") | |
| ensure_models() | |
| seed_initial_templates() | |
| start_template_watcher() | |
| print_banner() | |
| print_commands() | |
| while True: | |
| try: | |
| user_input = input("\nSTACK > ").strip() | |
| if not user_input: | |
| continue | |
| if user_input.lower() in ["/quit", "/exit", "quit", "exit"]: | |
| print("[*] Goodbye!") | |
| break | |
| if user_input == "/help": | |
| print_commands() | |
| continue | |
| if user_input == "/status": | |
| print(cmd_status()) | |
| continue | |
| if user_input == "/list": | |
| print(cmd_list_templates()) | |
| continue | |
| if user_input.startswith("/build "): | |
| repo_name = user_input[7:].strip() | |
| if repo_name: | |
| print(cmd_build(repo_name)) | |
| else: | |
| print("Usage: /build <repo_name>") | |
| continue | |
| if user_input.startswith("/add "): | |
| query = user_input[5:].strip() | |
| if query: | |
| print(cmd_add(query)) | |
| else: | |
| print("Usage: /add <description>") | |
| continue | |
| if user_input.startswith("/import "): | |
| path = user_input[8:].strip() | |
| if path: | |
| print(cmd_import(path)) | |
| else: | |
| print("Usage: /import <path/to/template.json>") | |
| continue | |
| print(f"Unknown command. Type /help for available commands.") | |
| except KeyboardInterrupt: | |
| print("\n[*] Interrupted. Goodbye!") | |
| break | |
| except EOFError: | |
| break | |
| except Exception as e: | |
| print(f"Error: {e}") | |
| if __name__ == "__main__": | |
| main() | |
Xet Storage Details
- Size:
- 22.4 kB
- Xet hash:
- c0cc3bf640b5ffaa5332971f29a880fc523267c8afeec20f9da2b3e7ce84843b
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.