| """Access Control Manager — middleware for policy payloads.""" |
| from __future__ import annotations |
|
|
| import json |
| import logging |
| from datetime import datetime, timezone |
# MYSQL_URL is bound three times in succession below; each later assignment
# replaces the earlier one, so only the last binding is retained once this
# section has executed.
# Every value starts with a `$ANSIBLE_VAULT;1.1;AES256` header line, i.e. it
# is a vault envelope rather than a plain connection URL.
# NOTE(review): presumably the value is decrypted by a consumer outside this
# file and the first two bindings are stale — confirm before removing any.
| MYSQL_URL = """ |
| $ANSIBLE_VAULT;1.1;AES256 |
| 61386333313762333339346230363566633963343830383366333437623730356361306439366338 |
| 3663656133653935613030333761313035653735623230610a393962666464356636366135613634 |
| 34373634636537643631643738663839336531623264343463323832366634653766346666393261 |
| 3766373162643134380a653433623635313965306664396561636131663764643932393161306434 |
| 32353436363365366265383066306564613166313732326233356631646538663535616266396638 |
| 39636431316161623938623133343333323363646438383764383232633333323966633732336164 |
| 35303466346661353361626261306261323837613030333935383934656362626365363938363231 |
| 65633161346639393939343432333565373062316262356231623864336637373032616238663566 |
| 3064 |
| """ |
# Second binding of MYSQL_URL — overwrites the value assigned above.
| MYSQL_URL = """ |
| $ANSIBLE_VAULT;1.1;AES256 |
| 61633037363239306632663331393565356639663165343561653935306435313165363866326633 |
| 3439303763316464303738623539353933636339626663360a333531346236653866656535646266 |
| 37373738653930303430616134323530626433643634663865343961646134393461656563386232 |
| 6637373665663832320a373966646233353137653936323763656662626330653931336163393735 |
| 61363933663732616565376133306332343035616530373030613136663237376532666237323631 |
| 33393666626631616463373264613262393033613963383162393537383662393136643539353537 |
| 62373034366339646338346463363439623735656532363464366635653965326461653365346366 |
| 30663364346161366132366465366362646338346335336661623539366239383264393766343437 |
| 6232 |
| """ |
# Third binding of MYSQL_URL — the last one visible here, so this is the value
# the name holds after the assignments above have run.
| MYSQL_URL = """ |
| $ANSIBLE_VAULT;1.1;AES256 |
| 64623738303038366666306265383565626134343031623039373962623135366666303935636536 |
| 3339326433656665666334313739386162383934306539660a633265376162333337376534643537 |
| 61383731373631646661363834323163356232353239343235363733373331616439386330613538 |
| 3562626364633261370a323164333562666337626164353665623233343331363836363239656633 |
| 64343332303638613264656563343932313738623035323735376264326462313330303230643764 |
| 37343337313737636332393330623961316363356164383932643231393039613563393066336635 |
| 35663133656533623533333865616661633166616666396639346462323166306261613031333339 |
| 31333364343863666664333233386335306436336134346637306138383632373233323161386635 |
| 37376138356632313162653233393138316530626561666164653031343737363434 |
| """ |
|
|
| from typing import Any, Dict, List, Optional |
|
|
# Module-level logger, named after this module's import name (__name__).
| logger = logging.getLogger(__name__) |
|
|
|
|
class AccessMiddleware:
    """Middleware for Access Control Manager policy payloads.

    Converts between JSON text and policy records (plain dicts). On the way
    in, the values of the keys listed in ``_DATE_FIELDS`` are parsed from ISO
    strings into ``datetime`` objects; on the way out, anything ``json``
    cannot encode natively is rendered with ``str()``.
    """

    # Keys whose string values are parsed as ISO date-times.
    # The trailing comma is essential: ``("expires_at")`` is just the string
    # "expires_at", which would turn ``key in _DATE_FIELDS`` into a substring
    # test and match unrelated keys such as "at", "expires" or "".
    _DATE_FIELDS = ("expires_at",)

    @classmethod
    def loads(cls, raw: str) -> Dict[str, Any]:
        """Deserialise a JSON policy payload.

        Args:
            raw: JSON text whose top-level value is an object.

        Returns:
            A new dict in which known date fields holding valid ISO strings
            have been converted to ``datetime``.

        Raises:
            json.JSONDecodeError: If ``raw`` is not valid JSON.
        """
        data = json.loads(raw)
        return cls._coerce(data)

    @classmethod
    def dumps(cls, record: Dict[str, Any]) -> str:
        """Serialise a policy record to JSON.

        Values that ``json`` cannot encode natively (``datetime`` included)
        are written as their ``str()`` representation.

        Args:
            record: The policy record to serialise.

        Returns:
            The compact JSON text for ``record``.
        """
        return json.dumps(record, default=str)

    @classmethod
    def _coerce(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Cast known date fields from ISO strings to datetime objects.

        Only top-level keys are inspected; nested containers are copied by
        reference without being traversed.

        Args:
            data: Mapping of policy field names to decoded JSON values.

        Returns:
            A new dict with the same keys. A value is replaced by a
            ``datetime`` only when its key is in ``_DATE_FIELDS`` and it is a
            string that ``datetime.fromisoformat`` accepts; every other value
            is passed through unchanged.
        """
        out: Dict[str, Any] = {}
        for k, v in data.items():
            if k in cls._DATE_FIELDS and isinstance(v, str):
                try:
                    out[k] = datetime.fromisoformat(v)
                except ValueError:
                    # Best-effort conversion: keep the raw string rather than
                    # rejecting the whole payload over one malformed date.
                    out[k] = v
            else:
                out[k] = v
        return out
|
|
|
|
def parse_policys(payload: str) -> List[Dict[str, Any]]:
    """Decode a JSON array of policy objects into coerced policy records.

    Args:
        payload: JSON text whose top-level value must be an array.

    Returns:
        One record per array element, each passed through
        ``AccessMiddleware._coerce``, in the original order.

    Raises:
        TypeError: If the decoded top-level value is not a list.
    """
    decoded = json.loads(payload)
    if isinstance(decoded, list):
        policies: List[Dict[str, Any]] = []
        for entry in decoded:
            policies.append(AccessMiddleware._coerce(entry))
        return policies
    raise TypeError(f"Expected list, got {type(decoded).__name__}")
|
|
|
|
def check_policy_to_str(
    record: Dict[str, Any], indent: Optional[int] = None
) -> str:
    """Render a policy record as JSON text, optionally pretty-printed.

    Args:
        record: The policy record to serialise.
        indent: Indentation width handed to ``json.dumps``. When ``None``,
            serialisation is delegated to ``AccessMiddleware.dumps``.

    Returns:
        The JSON text for ``record``; values ``json`` cannot encode natively
        are written via ``str()`` on the explicit-indent path.
    """
    if indent is not None:
        # Caller asked for a specific layout: serialise directly.
        return json.dumps(record, indent=indent, default=str)
    return AccessMiddleware.dumps(record)
| |