| | """World state management for HR Onboarding/Offboarding environment. |
| | |
| | Manages all entities (employees, departments, assets, etc.) and enforces |
| | business rules like RBAC, approval chains, and headcount limits. |
| | """ |
| |
|
| | import json |
| | import copy |
| | import os |
| | from datetime import datetime, timedelta |
| | from typing import Any, Optional |
| | from pathlib import Path |
| |
|
| | DATA_DIR = Path(__file__).parent / "data" |
| |
|
| |
|
| | class WorldState: |
| | """Manages the full world state: all entities and their relationships.""" |
| |
|
| | def __init__(self): |
| | self._initial_state: dict[str, Any] = {} |
| | self.state: dict[str, Any] = {} |
| | self._action_log: list[dict] = [] |
| | self._load_data() |
| |
|
| | def _load_data(self): |
| | """Load all entity data from JSON files.""" |
| | data_files = { |
| | "employees": "employees.json", |
| | "departments": "departments.json", |
| | "policies": "policies.json", |
| | "it_assets": "it_assets.json", |
| | "access_roles": "access_roles.json", |
| | "templates": "templates.json", |
| | } |
| | for key, filename in data_files.items(): |
| | path = DATA_DIR / filename |
| | if path.exists(): |
| | with open(path) as f: |
| | data = json.load(f) |
| | |
| | if isinstance(data, dict) and key in data: |
| | data = data[key] |
| | self.state[key] = data |
| | else: |
| | self.state[key] = [] |
| |
|
| | |
| | self.state.setdefault("onboarding_requests", []) |
| | self.state.setdefault("offboarding_requests", []) |
| | self.state.setdefault("approvals", []) |
| | self.state.setdefault("emails", []) |
| | self.state.setdefault("slack_messages", []) |
| | self.state.setdefault("meetings", []) |
| | self.state.setdefault("badges", []) |
| | self.state.setdefault("security_groups", self._init_security_groups()) |
| |
|
| | |
| | self._build_indexes() |
| |
|
| | |
| | self._initial_state = copy.deepcopy(self.state) |
| |
|
| | def _init_security_groups(self) -> list[dict]: |
| | """Create default security groups.""" |
| | groups = [ |
| | {"group_id": "sg_001", "name": "all_employees", "members": [], "resources_accessible": ["email", "slack", "intranet"]}, |
| | {"group_id": "sg_002", "name": "engineering_team", "members": [], "resources_accessible": ["github", "aws_dev", "ci_cd", "jira"]}, |
| | {"group_id": "sg_003", "name": "prod_access", "members": [], "resources_accessible": ["aws_prod", "monitoring", "pagerduty"]}, |
| | {"group_id": "sg_004", "name": "data_team", "members": [], "resources_accessible": ["databricks", "snowflake", "jupyter"]}, |
| | {"group_id": "sg_005", "name": "finance_access", "members": [], "resources_accessible": ["netsuite", "expense_system", "payroll"]}, |
| | {"group_id": "sg_006", "name": "hr_access", "members": [], "resources_accessible": ["workday", "benefits_portal", "recruiting"]}, |
| | {"group_id": "sg_007", "name": "security_team", "members": [], "resources_accessible": ["siem", "vault", "firewall_mgmt"]}, |
| | {"group_id": "sg_008", "name": "sales_team", "members": [], "resources_accessible": ["salesforce", "hubspot", "linkedin_sales"]}, |
| | {"group_id": "sg_009", "name": "server_room_access", "members": [], "resources_accessible": ["server_room_a", "server_room_b"]}, |
| | {"group_id": "sg_010", "name": "vpn_users", "members": [], "resources_accessible": ["vpn_corporate", "vpn_dev"]}, |
| | {"group_id": "sg_011", "name": "admin_access", "members": [], "resources_accessible": ["admin_console", "user_mgmt", "audit_logs"]}, |
| | {"group_id": "sg_012", "name": "product_team", "members": [], "resources_accessible": ["figma", "amplitude", "productboard"]}, |
| | {"group_id": "sg_013", "name": "marketing_team", "members": [], "resources_accessible": ["hubspot", "canva", "google_analytics"]}, |
| | {"group_id": "sg_014", "name": "executives", "members": [], "resources_accessible": ["board_docs", "exec_dashboard", "all_financials"]}, |
| | {"group_id": "sg_015", "name": "contractors", "members": [], "resources_accessible": ["email", "slack", "jira"]}, |
| | ] |
| | |
| | return groups |
| |
|
| | def _build_indexes(self): |
| | """Build lookup indexes for fast entity access.""" |
| | self._emp_by_id = {} |
| | self._emp_by_email = {} |
| | self._emp_by_dept = {} |
| | for emp in self.state.get("employees", []): |
| | self._emp_by_id[emp["emp_id"]] = emp |
| | self._emp_by_email[emp.get("email", "")] = emp |
| | dept = emp.get("department", "") |
| | self._emp_by_dept.setdefault(dept, []).append(emp) |
| |
|
| | self._dept_by_id = {d["dept_id"]: d for d in self.state.get("departments", [])} |
| | self._dept_by_name = {d["name"]: d for d in self.state.get("departments", [])} |
| | self._asset_by_id = {a["asset_id"]: a for a in self.state.get("it_assets", [])} |
| | self._role_by_id = {r["role_id"]: r for r in self.state.get("access_roles", [])} |
| | self._policy_by_id = {p["policy_id"]: p for p in self.state.get("policies", [])} |
| |
|
| | def reset(self): |
| | """Reset world state to initial conditions and clear action log.""" |
| | self.state = copy.deepcopy(self._initial_state) |
| | self._action_log = [] |
| | self._build_indexes() |
| |
|
| | def snapshot(self) -> dict: |
| | """Return a deep copy of the current state (for debugging/evaluation).""" |
| | return copy.deepcopy(self.state) |
| |
|
| | def log_action(self, tool_name: str, params: dict, result: Any): |
| | """Log a tool call for rubric evaluation.""" |
| | self._action_log.append({ |
| | "tool": tool_name, |
| | "params": params, |
| | "result": result, |
| | "timestamp": datetime.now().isoformat(), |
| | }) |
| |
|
| | @property |
| | def action_log(self) -> list[dict]: |
| | return list(self._action_log) |
| |
|
| | |
| |
|
| | def get_employee(self, emp_id: str) -> Optional[dict]: |
| | return self._emp_by_id.get(emp_id) |
| |
|
| | def get_employee_by_email(self, email: str) -> Optional[dict]: |
| | return self._emp_by_email.get(email) |
| |
|
| | def search_employees(self, **filters) -> list[dict]: |
| | results = list(self.state["employees"]) |
| | for key, value in filters.items(): |
| | if value is not None: |
| | results = [e for e in results if str(e.get(key, "")).lower() == str(value).lower()] |
| | return results |
| |
|
| | def get_department(self, dept_id: str = None, name: str = None) -> Optional[dict]: |
| | if dept_id: |
| | return self._dept_by_id.get(dept_id) |
| | if name: |
| | return self._dept_by_name.get(name) |
| | return None |
| |
|
| | def get_employees_in_dept(self, department: str) -> list[dict]: |
| | return self._emp_by_dept.get(department, []) |
| |
|
| | def get_manager(self, emp_id: str) -> Optional[dict]: |
| | emp = self.get_employee(emp_id) |
| | if emp and emp.get("manager_id"): |
| | return self.get_employee(emp["manager_id"]) |
| | return None |
| |
|
| | def get_skip_level_manager(self, emp_id: str) -> Optional[dict]: |
| | manager = self.get_manager(emp_id) |
| | if manager: |
| | return self.get_manager(manager["emp_id"]) |
| | return None |
| |
|
| | def get_direct_reports(self, emp_id: str) -> list[dict]: |
| | return [e for e in self.state["employees"] if e.get("manager_id") == emp_id] |
| |
|
| | def get_org_chart(self, department: str) -> dict: |
| | """Build org chart for a department as a tree.""" |
| | dept_emps = self.get_employees_in_dept(department) |
| | if not dept_emps: |
| | return {} |
| |
|
| | |
| | def build_tree(emp): |
| | reports = [e for e in dept_emps if e.get("manager_id") == emp["emp_id"]] |
| | return { |
| | "emp_id": emp["emp_id"], |
| | "name": emp["name"], |
| | "level": emp["level"], |
| | "role": emp["role"], |
| | "status": emp["status"], |
| | "reports": [build_tree(r) for r in reports], |
| | } |
| |
|
| | roots = [e for e in dept_emps if e.get("manager_id") is None |
| | or e["manager_id"] not in {x["emp_id"] for x in dept_emps}] |
| | if not roots: |
| | roots = sorted(dept_emps, key=lambda e: e.get("level", "L1"), reverse=True)[:1] |
| |
|
| | return {"department": department, "org_tree": [build_tree(r) for r in roots]} |
| |
|
| | |
| |
|
| | def get_available_assets(self, asset_type: str = None) -> list[dict]: |
| | assets = [a for a in self.state["it_assets"] if a["status"] == "available"] |
| | if asset_type: |
| | assets = [a for a in assets if a["type"].lower() == asset_type.lower()] |
| | return assets |
| |
|
| | def assign_asset(self, asset_id: str, emp_id: str) -> dict: |
| | asset = self._asset_by_id.get(asset_id) |
| | if not asset: |
| | return {"success": False, "error": f"Asset {asset_id} not found"} |
| | if asset["status"] != "available": |
| | return {"success": False, "error": f"Asset {asset_id} is not available (status: {asset['status']})"} |
| | emp = self.get_employee(emp_id) |
| | if not emp: |
| | return {"success": False, "error": f"Employee {emp_id} not found"} |
| |
|
| | asset["status"] = "assigned" |
| | asset["assigned_to"] = emp_id |
| | return {"success": True, "asset_id": asset_id, "assigned_to": emp_id} |
| |
|
| | def reclaim_asset(self, asset_id: str) -> dict: |
| | asset = self._asset_by_id.get(asset_id) |
| | if not asset: |
| | return {"success": False, "error": f"Asset {asset_id} not found"} |
| | if asset["status"] != "assigned": |
| | return {"success": False, "error": f"Asset {asset_id} is not currently assigned"} |
| | prev_owner = asset["assigned_to"] |
| | asset["status"] = "available" |
| | asset["assigned_to"] = None |
| | return {"success": True, "asset_id": asset_id, "reclaimed_from": prev_owner} |
| |
|
| | def get_assets_for_employee(self, emp_id: str) -> list[dict]: |
| | return [a for a in self.state["it_assets"] if a.get("assigned_to") == emp_id] |
| |
|
| | |
| |
|
| | def get_software_licenses(self, software_name: str = None) -> list[dict]: |
| | licenses = self.state.get("access_roles", []) |
| | |
| | |
| | if not self.state.get("software_licenses"): |
| | self.state["software_licenses"] = self._init_software_licenses() |
| | result = self.state["software_licenses"] |
| | if software_name: |
| | result = [l for l in result if l["software_name"].lower() == software_name.lower()] |
| | return result |
| |
|
| | def _init_software_licenses(self) -> list[dict]: |
| | return [ |
| | {"license_id": "lic_001", "software_name": "Jira", "total_seats": 80, "used_seats": 72, "department_restriction": None}, |
| | {"license_id": "lic_002", "software_name": "GitHub", "total_seats": 60, "used_seats": 55, "department_restriction": "Engineering"}, |
| | {"license_id": "lic_003", "software_name": "AWS", "total_seats": 40, "used_seats": 35, "department_restriction": "Engineering"}, |
| | {"license_id": "lic_004", "software_name": "Slack", "total_seats": 250, "used_seats": 195, "department_restriction": None}, |
| | {"license_id": "lic_005", "software_name": "Salesforce", "total_seats": 35, "used_seats": 30, "department_restriction": "Sales"}, |
| | {"license_id": "lic_006", "software_name": "Hubspot", "total_seats": 30, "used_seats": 28, "department_restriction": "Marketing"}, |
| | {"license_id": "lic_007", "software_name": "Figma", "total_seats": 25, "used_seats": 20, "department_restriction": "Product"}, |
| | {"license_id": "lic_008", "software_name": "Databricks", "total_seats": 20, "used_seats": 18, "department_restriction": "Data Science"}, |
| | {"license_id": "lic_009", "software_name": "Netsuite", "total_seats": 15, "used_seats": 15, "department_restriction": "Finance"}, |
| | {"license_id": "lic_010", "software_name": "Workday", "total_seats": 20, "used_seats": 12, "department_restriction": "HR"}, |
| | {"license_id": "lic_011", "software_name": "Canva", "total_seats": 25, "used_seats": 20, "department_restriction": "Marketing"}, |
| | {"license_id": "lic_012", "software_name": "Google Analytics", "total_seats": 30, "used_seats": 22, "department_restriction": None}, |
| | {"license_id": "lic_013", "software_name": "VSCode License", "total_seats": 70, "used_seats": 60, "department_restriction": None}, |
| | {"license_id": "lic_014", "software_name": "Amplitude", "total_seats": 20, "used_seats": 15, "department_restriction": "Product"}, |
| | {"license_id": "lic_015", "software_name": "LinkedIn Sales Navigator", "total_seats": 25, "used_seats": 25, "department_restriction": "Sales"}, |
| | ] |
| |
|
| | def consume_license(self, license_id: str) -> dict: |
| | if not self.state.get("software_licenses"): |
| | self.state["software_licenses"] = self._init_software_licenses() |
| | for lic in self.state["software_licenses"]: |
| | if lic["license_id"] == license_id: |
| | if lic["used_seats"] >= lic["total_seats"]: |
| | return {"success": False, "error": f"No available seats for {lic['software_name']} (all {lic['total_seats']} seats in use)"} |
| | lic["used_seats"] += 1 |
| | return {"success": True, "software": lic["software_name"], "remaining_seats": lic["total_seats"] - lic["used_seats"]} |
| | return {"success": False, "error": f"License {license_id} not found"} |
| |
|
| | def release_license(self, license_id: str) -> dict: |
| | if not self.state.get("software_licenses"): |
| | return {"success": False, "error": "No licenses initialized"} |
| | for lic in self.state["software_licenses"]: |
| | if lic["license_id"] == license_id: |
| | if lic["used_seats"] > 0: |
| | lic["used_seats"] -= 1 |
| | return {"success": True, "software": lic["software_name"], "remaining_seats": lic["total_seats"] - lic["used_seats"]} |
| | return {"success": False, "error": f"License {license_id} not found"} |
| |
|
| | |
| |
|
| | def create_employee(self, data: dict) -> dict: |
| | required = ["name", "department", "level", "role"] |
| | for field in required: |
| | if field not in data: |
| | return {"success": False, "error": f"Missing required field: {field}"} |
| |
|
| | dept = self.get_department(name=data["department"]) |
| | if not dept: |
| | return {"success": False, "error": f"Department '{data['department']}' not found"} |
| |
|
| | |
| | current = len([e for e in self.state["employees"] |
| | if e["department"] == data["department"] and e["status"] in ("active", "pending")]) |
| | if current >= dept.get("headcount_limit", 999): |
| | return {"success": False, "error": f"Department '{data['department']}' has reached its headcount limit ({dept['headcount_limit']})"} |
| |
|
| | |
| | existing_ids = [int(e["emp_id"].split("_")[1]) for e in self.state["employees"]] |
| | new_id = f"emp_{max(existing_ids) + 1:04d}" if existing_ids else "emp_0001" |
| |
|
| | email = f"{data['name'].lower().replace(' ', '.')}@acmecorp.com" |
| |
|
| | employee = { |
| | "emp_id": new_id, |
| | "name": data["name"], |
| | "email": data.get("email", email), |
| | "department": data["department"], |
| | "level": data["level"], |
| | "role": data["role"], |
| | "manager_id": data.get("manager_id"), |
| | "status": "pending", |
| | "date_of_joining": data.get("date_of_joining", datetime.now().strftime("%Y-%m-%d")), |
| | "date_of_leaving": None, |
| | "is_contractor": data.get("is_contractor", False), |
| | "phone": data.get("phone", ""), |
| | "location": data.get("location", "San Francisco"), |
| | } |
| |
|
| | self.state["employees"].append(employee) |
| | self._emp_by_id[new_id] = employee |
| | self._emp_by_email[employee["email"]] = employee |
| | self._emp_by_dept.setdefault(employee["department"], []).append(employee) |
| |
|
| | return {"success": True, "employee": employee} |
| |
|
| | def update_employee(self, emp_id: str, updates: dict) -> dict: |
| | emp = self.get_employee(emp_id) |
| | if not emp: |
| | return {"success": False, "error": f"Employee {emp_id} not found"} |
| |
|
| | protected_fields = {"emp_id"} |
| | for key, value in updates.items(): |
| | if key in protected_fields: |
| | return {"success": False, "error": f"Cannot modify protected field: {key}"} |
| | emp[key] = value |
| |
|
| | return {"success": True, "employee": emp} |
| |
|
| | |
| |
|
| | def create_onboarding_request(self, emp_id: str) -> dict: |
| | emp = self.get_employee(emp_id) |
| | if not emp: |
| | return {"success": False, "error": f"Employee {emp_id} not found"} |
| | if emp["status"] == "active": |
| | return {"success": False, "error": f"Employee {emp_id} is already active"} |
| |
|
| | existing = [r for r in self.state["onboarding_requests"] |
| | if r["employee_id"] == emp_id and r["status"] != "completed"] |
| | if existing: |
| | return {"success": False, "error": f"Active onboarding request already exists for {emp_id}"} |
| |
|
| | dept = self.get_department(name=emp["department"]) |
| | steps = dept.get("onboarding_steps", [ |
| | "hr_paperwork", "it_account_setup", "asset_assignment", |
| | "access_provisioning", "orientation_scheduled", "manager_intro", |
| | "welcome_communications" |
| | ]) if dept else ["hr_paperwork", "it_account_setup", "asset_assignment", |
| | "access_provisioning", "orientation_scheduled", "manager_intro", |
| | "welcome_communications"] |
| |
|
| | req_ids = [int(r["request_id"].split("_")[1]) for r in self.state["onboarding_requests"]] or [0] |
| | new_id = f"onb_{max(req_ids) + 1:04d}" |
| |
|
| | request = { |
| | "request_id": new_id, |
| | "employee_id": emp_id, |
| | "department": emp["department"], |
| | "status": "in_progress", |
| | "steps": {step: "pending" for step in steps}, |
| | "steps_completed": [], |
| | "approvals_required": self._get_required_approvals(emp), |
| | "approvals_received": [], |
| | "created_date": datetime.now().strftime("%Y-%m-%d"), |
| | } |
| |
|
| | self.state["onboarding_requests"].append(request) |
| | return {"success": True, "request": request} |
| |
|
| | def create_offboarding_request(self, emp_id: str, reason: str = "resignation", exit_date: str = None) -> dict: |
| | emp = self.get_employee(emp_id) |
| | if not emp: |
| | return {"success": False, "error": f"Employee {emp_id} not found"} |
| | if emp["status"] == "offboarded": |
| | return {"success": False, "error": f"Employee {emp_id} is already offboarded"} |
| |
|
| | existing = [r for r in self.state["offboarding_requests"] |
| | if r["employee_id"] == emp_id and r["status"] != "completed"] |
| | if existing: |
| | return {"success": False, "error": f"Active offboarding request already exists for {emp_id}"} |
| |
|
| | dept = self.get_department(name=emp["department"]) |
| | steps = dept.get("offboarding_steps", [ |
| | "access_revocation", "asset_return", "knowledge_transfer", |
| | "exit_interview", "final_payroll", "farewell_communications" |
| | ]) if dept else ["access_revocation", "asset_return", "knowledge_transfer", |
| | "exit_interview", "final_payroll", "farewell_communications"] |
| |
|
| | if reason == "termination": |
| | steps = ["access_revocation", "asset_return", "final_payroll", "legal_review"] |
| |
|
| | req_ids = [int(r["request_id"].split("_")[1]) for r in self.state["offboarding_requests"]] or [0] |
| | new_id = f"off_{max(req_ids) + 1:04d}" |
| |
|
| | if not exit_date: |
| | exit_date = (datetime.now() + timedelta(days=14)).strftime("%Y-%m-%d") |
| |
|
| | request = { |
| | "request_id": new_id, |
| | "employee_id": emp_id, |
| | "department": emp["department"], |
| | "reason": reason, |
| | "status": "in_progress", |
| | "exit_date": exit_date, |
| | "steps": {step: "pending" for step in steps}, |
| | "steps_completed": [], |
| | "handover_status": "pending", |
| | "created_date": datetime.now().strftime("%Y-%m-%d"), |
| | } |
| |
|
| | self.state["offboarding_requests"].append(request) |
| | return {"success": True, "request": request} |
| |
|
| | def complete_onboarding_step(self, request_id: str, step: str) -> dict: |
| | req = next((r for r in self.state["onboarding_requests"] if r["request_id"] == request_id), None) |
| | if not req: |
| | return {"success": False, "error": f"Onboarding request {request_id} not found"} |
| | if req["status"] == "completed": |
| | return {"success": False, "error": f"Onboarding request {request_id} is already completed"} |
| | if step not in req["steps"]: |
| | return {"success": False, "error": f"Invalid step '{step}'. Valid steps: {list(req['steps'].keys())}"} |
| | if req["steps"][step] == "completed": |
| | return {"success": False, "error": f"Step '{step}' is already completed"} |
| |
|
| | req["steps"][step] = "completed" |
| | req["steps_completed"].append(step) |
| |
|
| | |
| | if all(v == "completed" for v in req["steps"].values()): |
| | req["status"] = "completed" |
| | emp = self.get_employee(req["employee_id"]) |
| | if emp: |
| | emp["status"] = "active" |
| |
|
| | return {"success": True, "request_id": request_id, "step": step, |
| | "all_complete": req["status"] == "completed", |
| | "remaining_steps": [k for k, v in req["steps"].items() if v != "completed"]} |
| |
|
| | def complete_offboarding_step(self, request_id: str, step: str) -> dict: |
| | req = next((r for r in self.state["offboarding_requests"] if r["request_id"] == request_id), None) |
| | if not req: |
| | return {"success": False, "error": f"Offboarding request {request_id} not found"} |
| | if req["status"] == "completed": |
| | return {"success": False, "error": f"Offboarding request {request_id} is already completed"} |
| | if step not in req["steps"]: |
| | return {"success": False, "error": f"Invalid step '{step}'. Valid steps: {list(req['steps'].keys())}"} |
| | if req["steps"][step] == "completed": |
| | return {"success": False, "error": f"Step '{step}' is already completed"} |
| |
|
| | req["steps"][step] = "completed" |
| | req["steps_completed"].append(step) |
| |
|
| | if all(v == "completed" for v in req["steps"].values()): |
| | req["status"] = "completed" |
| | emp = self.get_employee(req["employee_id"]) |
| | if emp: |
| | emp["status"] = "offboarded" |
| | emp["date_of_leaving"] = req["exit_date"] |
| |
|
| | return {"success": True, "request_id": request_id, "step": step, |
| | "all_complete": req["status"] == "completed", |
| | "remaining_steps": [k for k, v in req["steps"].items() if v != "completed"]} |
| |
|
| | def get_onboarding_status(self, request_id: str = None, emp_id: str = None) -> Optional[dict]: |
| | if request_id: |
| | return next((r for r in self.state["onboarding_requests"] if r["request_id"] == request_id), None) |
| | if emp_id: |
| | reqs = [r for r in self.state["onboarding_requests"] if r["employee_id"] == emp_id] |
| | return reqs[-1] if reqs else None |
| | return None |
| |
|
| | def get_offboarding_status(self, request_id: str = None, emp_id: str = None) -> Optional[dict]: |
| | if request_id: |
| | return next((r for r in self.state["offboarding_requests"] if r["request_id"] == request_id), None) |
| | if emp_id: |
| | reqs = [r for r in self.state["offboarding_requests"] if r["employee_id"] == emp_id] |
| | return reqs[-1] if reqs else None |
| | return None |
| |
|
| | |
| |
|
| | def _get_required_approvals(self, emp: dict) -> list[str]: |
| | """Determine what approvals are needed based on employee level and department.""" |
| | approvals = ["manager_approval"] |
| | level_num = int(emp["level"][1]) |
| | if level_num >= 3: |
| | approvals.append("it_approval") |
| | if emp["department"] == "Security" or level_num >= 4: |
| | approvals.append("security_approval") |
| | if emp.get("is_contractor"): |
| | approvals.append("legal_approval") |
| | return approvals |
| |
|
| | def create_approval(self, request_id: str, approver_id: str, approval_type: str) -> dict: |
| | emp = self.get_employee(approver_id) |
| | if not emp: |
| | return {"success": False, "error": f"Approver {approver_id} not found"} |
| |
|
| | level_num = int(emp["level"][1]) |
| | if approval_type == "manager_approval" and level_num < 3: |
| | return {"success": False, "error": f"Approver must be L3+ for manager approval (current: {emp['level']})"} |
| | if approval_type == "security_approval" and level_num < 4: |
| | return {"success": False, "error": f"Approver must be L4+ for security approval (current: {emp['level']})"} |
| |
|
| | approval_ids = [int(a["approval_id"].split("_")[1]) for a in self.state["approvals"]] or [0] |
| | new_id = f"apr_{max(approval_ids) + 1:04d}" |
| |
|
| | approval = { |
| | "approval_id": new_id, |
| | "request_id": request_id, |
| | "approver_id": approver_id, |
| | "type": approval_type, |
| | "status": "approved", |
| | "timestamp": datetime.now().isoformat(), |
| | } |
| | self.state["approvals"].append(approval) |
| |
|
| | |
| | for req in self.state["onboarding_requests"] + self.state["offboarding_requests"]: |
| | if req["request_id"] == request_id: |
| | if approval_type not in req.get("approvals_received", []): |
| | req.setdefault("approvals_received", []).append(approval_type) |
| |
|
| | return {"success": True, "approval": approval} |
| |
|
| | |
| |
|
| | def assign_role(self, emp_id: str, role_id: str) -> dict: |
| | emp = self.get_employee(emp_id) |
| | if not emp: |
| | return {"success": False, "error": f"Employee {emp_id} not found"} |
| | role = self._role_by_id.get(role_id) |
| | if not role: |
| | return {"success": False, "error": f"Role {role_id} not found"} |
| |
|
| | |
| | emp_level = int(emp["level"][1]) |
| | req_level = int(role.get("level_requirement", "L1")[1]) |
| | if emp_level < req_level: |
| | return {"success": False, "error": f"Employee level {emp['level']} does not meet minimum {role['level_requirement']} for role {role['name']}"} |
| |
|
| | |
| | if role["department"] != "all" and role["department"] != emp["department"]: |
| | return {"success": False, "error": f"Role {role['name']} is restricted to {role['department']} department"} |
| |
|
| | emp.setdefault("assigned_roles", []).append(role_id) |
| | return {"success": True, "emp_id": emp_id, "role": role["name"], "permissions": role["permissions"]} |
| |
|
| | def revoke_role(self, emp_id: str, role_id: str) -> dict: |
| | emp = self.get_employee(emp_id) |
| | if not emp: |
| | return {"success": False, "error": f"Employee {emp_id} not found"} |
| | roles = emp.get("assigned_roles", []) |
| | if role_id not in roles: |
| | return {"success": False, "error": f"Employee {emp_id} does not have role {role_id}"} |
| | roles.remove(role_id) |
| | return {"success": True, "emp_id": emp_id, "revoked_role": role_id} |
| |
|
| | def revoke_all_access(self, emp_id: str) -> dict: |
| | emp = self.get_employee(emp_id) |
| | if not emp: |
| | return {"success": False, "error": f"Employee {emp_id} not found"} |
| | revoked_roles = emp.get("assigned_roles", []).copy() |
| | emp["assigned_roles"] = [] |
| |
|
| | |
| | for sg in self.state["security_groups"]: |
| | if emp_id in sg["members"]: |
| | sg["members"].remove(emp_id) |
| |
|
| | return {"success": True, "emp_id": emp_id, "revoked_roles": revoked_roles} |
| |
|
| | |
| |
|
| | def create_badge(self, emp_id: str, access_zones: list[str] = None) -> dict: |
| | emp = self.get_employee(emp_id) |
| | if not emp: |
| | return {"success": False, "error": f"Employee {emp_id} not found"} |
| |
|
| | |
| | if access_zones and "server_room" in access_zones: |
| | level_num = int(emp["level"][1]) |
| | if level_num < 4: |
| | approvals = [a for a in self.state["approvals"] |
| | if a["type"] == "security_approval" |
| | and a["status"] == "approved"] |
| | relevant = [a for a in approvals |
| | if any(r["employee_id"] == emp_id |
| | for r in self.state["onboarding_requests"] |
| | if r["request_id"] == a["request_id"])] |
| | if not relevant: |
| | return {"success": False, "error": "Server room access requires L4+ security approval"} |
| |
|
| | badge_ids = [int(b["badge_id"].split("_")[1]) for b in self.state["badges"]] or [0] |
| | new_id = f"badge_{max(badge_ids) + 1:04d}" |
| |
|
| | if not access_zones: |
| | access_zones = ["main_entrance", "floor_" + emp.get("location", "sf").lower().replace(" ", "_")] |
| |
|
| | badge = { |
| | "badge_id": new_id, |
| | "employee_id": emp_id, |
| | "access_zones": access_zones, |
| | "status": "active", |
| | "issued_date": datetime.now().strftime("%Y-%m-%d"), |
| | } |
| | self.state["badges"].append(badge) |
| | return {"success": True, "badge": badge} |
| |
|
| | def revoke_badge(self, badge_id: str) -> dict: |
| | badge = next((b for b in self.state["badges"] if b["badge_id"] == badge_id), None) |
| | if not badge: |
| | return {"success": False, "error": f"Badge {badge_id} not found"} |
| | badge["status"] = "revoked" |
| | return {"success": True, "badge_id": badge_id, "status": "revoked"} |
| |
|
| | def get_badges_for_employee(self, emp_id: str) -> list[dict]: |
| | return [b for b in self.state["badges"] if b["employee_id"] == emp_id and b["status"] == "active"] |
| |
|
| | |
| |
|
| | def send_email(self, from_addr: str, to_addr: str, subject: str, body: str) -> dict: |
| | email_ids = [int(e["email_id"].split("_")[1]) for e in self.state["emails"]] or [0] |
| | new_id = f"email_{max(email_ids) + 1:04d}" |
| | email = { |
| | "email_id": new_id, |
| | "from": from_addr, |
| | "to": to_addr, |
| | "subject": subject, |
| | "body": body, |
| | "timestamp": datetime.now().isoformat(), |
| | } |
| | self.state["emails"].append(email) |
| | return {"success": True, "email": email} |
| |
|
| | def send_slack_message(self, channel: str, sender: str, text: str) -> dict: |
| | msg_ids = [int(m["msg_id"].split("_")[1]) for m in self.state["slack_messages"]] or [0] |
| | new_id = f"msg_{max(msg_ids) + 1:04d}" |
| | message = { |
| | "msg_id": new_id, |
| | "channel": channel, |
| | "sender": sender, |
| | "text": text, |
| | "timestamp": datetime.now().isoformat(), |
| | } |
| | self.state["slack_messages"].append(message) |
| | return {"success": True, "message": message} |
| |
|
| | def schedule_meeting(self, title: str, attendees: list[str], meeting_datetime: str, |
| | meeting_type: str = "general") -> dict: |
| | meeting_ids = [int(m["meeting_id"].split("_")[1]) for m in self.state["meetings"]] or [0] |
| | new_id = f"mtg_{max(meeting_ids) + 1:04d}" |
| | meeting = { |
| | "meeting_id": new_id, |
| | "title": title, |
| | "attendees": attendees, |
| | "datetime": meeting_datetime, |
| | "type": meeting_type, |
| | } |
| | self.state["meetings"].append(meeting) |
| | return {"success": True, "meeting": meeting} |
| |
|
| | |
| |
|
| | def lookup_policy(self, topic: str = None, department: str = None, policy_id: str = None) -> list[dict]: |
| | policies = self.state.get("policies", []) |
| | if policy_id: |
| | return [p for p in policies if p["policy_id"] == policy_id] |
| | results = policies |
| | if topic: |
| | results = [p for p in results if topic.lower() in p.get("title", "").lower() |
| | or topic.lower() in p.get("content", "").lower()] |
| | if department: |
| | results = [p for p in results if p.get("department") in (department, "all")] |
| | return results |
| |
|
| | |
| |
|
| | def create_it_account(self, emp_id: str, account_types: list[str] = None) -> dict: |
| | emp = self.get_employee(emp_id) |
| | if not emp: |
| | return {"success": False, "error": f"Employee {emp_id} not found"} |
| |
|
| | if not account_types: |
| | account_types = ["email", "slack", "vpn"] |
| |
|
| | created = [] |
| | for acct_type in account_types: |
| | created.append({ |
| | "type": acct_type, |
| | "username": emp["email"].split("@")[0], |
| | "status": "active", |
| | }) |
| |
|
| | emp.setdefault("it_accounts", []).extend(created) |
| | return {"success": True, "emp_id": emp_id, "accounts_created": created} |
| |
|
| | def revoke_it_access(self, emp_id: str) -> dict: |
| | emp = self.get_employee(emp_id) |
| | if not emp: |
| | return {"success": False, "error": f"Employee {emp_id} not found"} |
| |
|
| | accounts = emp.get("it_accounts", []) |
| | for acct in accounts: |
| | acct["status"] = "revoked" |
| |
|
| | return {"success": True, "emp_id": emp_id, "accounts_revoked": len(accounts)} |
| |
|
| | |
| |
|
| | def reassign_reports(self, from_emp_id: str, to_emp_id: str) -> dict: |
| | from_emp = self.get_employee(from_emp_id) |
| | to_emp = self.get_employee(to_emp_id) |
| | if not from_emp: |
| | return {"success": False, "error": f"Employee {from_emp_id} not found"} |
| | if not to_emp: |
| | return {"success": False, "error": f"Employee {to_emp_id} not found"} |
| |
|
| | reports = self.get_direct_reports(from_emp_id) |
| | for report in reports: |
| | report["manager_id"] = to_emp_id |
| |
|
| | self._build_indexes() |
| | return {"success": True, "reassigned_count": len(reports), |
| | "from": from_emp_id, "to": to_emp_id, |
| | "reassigned_employees": [r["emp_id"] for r in reports]} |
| |
|