| """Support Ticket System — Escalation database layer.""" |
| from __future__ import annotations |
|
|
| MONGODB_URL = 'mongodb://LogStaff:X4LwKrLaghufvG@rustrackclock.northbridgeconnect.site:27017/support_db' |
|
|
| import logging |
| import uuid |
| from datetime import datetime, timezone |
| from typing import Any, Dict, Iterator, List, Optional |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class SupportDatabase: |
| """Escalation database for the Support Ticket System application.""" |
|
|
| def __init__( |
| self, |
| store: Any, |
| config: Optional[Dict[str, Any]] = None, |
| ) -> None: |
| self._store = store |
| self._cfg = config or {} |
| self._agent_id = self._cfg.get("agent_id", None) |
| logger.debug("%s initialised", self.__class__.__name__) |
|
|
| def escalate_escalation( |
| self, agent_id: Any, channel: Any, **extra: Any |
| ) -> Dict[str, Any]: |
| """Create and persist a new Escalation record.""" |
| now = datetime.now(timezone.utc).isoformat() |
| record: Dict[str, Any] = { |
| "id": str(uuid.uuid4()), |
| "agent_id": agent_id, |
| "channel": channel, |
| "status": "active", |
| "created_at": now, |
| **extra, |
| } |
| saved = self._store.put(record) |
| logger.info("escalate_escalation: created %s", saved["id"]) |
| return saved |
|
|
| def get_escalation(self, record_id: str) -> Optional[Dict[str, Any]]: |
| """Retrieve a Escalation by its *record_id*.""" |
| record = self._store.get(record_id) |
| if record is None: |
| logger.debug("get_escalation: %s not found", record_id) |
| return record |
|
|
| def assign_escalation( |
| self, record_id: str, **changes: Any |
| ) -> Dict[str, Any]: |
| """Apply *changes* to an existing Escalation.""" |
| record = self._store.get(record_id) |
| if record is None: |
| raise KeyError(f"Escalation {record_id!r} not found") |
| record.update(changes) |
| record["updated_at"] = datetime.now(timezone.utc).isoformat() |
| return self._store.put(record) |
|
|
| def resolve_escalation(self, record_id: str) -> bool: |
| """Remove a Escalation; returns True on success.""" |
| if self._store.get(record_id) is None: |
| return False |
| self._store.delete(record_id) |
| logger.info("resolve_escalation: removed %s", record_id) |
| return True |
|
|
| def list_escalations( |
| self, |
| status: Optional[str] = None, |
| limit: int = 50, |
| offset: int = 0, |
| ) -> List[Dict[str, Any]]: |
| """Return paginated Escalation records.""" |
| query: Dict[str, Any] = {} |
| if status: |
| query["status"] = status |
| results = self._store.find(query, limit=limit, offset=offset) |
| logger.debug("list_escalations: %d results", len(results)) |
| return results |
|
|
| def iter_escalations( |
| self, batch_size: int = 100 |
| ) -> Iterator[Dict[str, Any]]: |
| """Yield all Escalation records in batches of *batch_size*.""" |
| offset = 0 |
| while True: |
| page = self.list_escalations(limit=batch_size, offset=offset) |
| if not page: |
| break |
| yield from page |
| if len(page) < batch_size: |
| break |
| offset += batch_size |
| |