Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Header, HTTPException | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| from backend.api.storage.rules_store import RulesStore | |
# Router object for this module's red-flag rule handlers.
router = APIRouter()

# Single module-level store instance shared by every handler below; all
# reads and writes of tenant rules go through it.
rules_store = RulesStore()
class RulePayload(BaseModel):
    """JSON request body carrying a single red-flag rule."""

    # Raw rule text as sent by the client; add_redflag_rule strips it
    # before it is stored.
    rule: str
class BulkRulePayload(BaseModel):
    """JSON request body carrying several red-flag rules at once."""

    # Raw rule texts as sent by the client; add_redflag_rules_bulk strips
    # each entry and drops the ones that end up empty.
    rules: List[str]
def get_rules_for_tenant(tenant_id: str) -> List[str]:
    """Return the rules the module-level ``rules_store`` holds for a tenant.

    Args:
        tenant_id: Identifier of the tenant whose rules are requested.

    Returns:
        Whatever ``rules_store.get_rules`` yields for ``tenant_id``.
    """
    tenant_rules = rules_store.get_rules(tenant_id)
    return tenant_rules
async def get_redflag_rules(
    x_tenant_id: str = Header(None)
):
    """
    List every red-flag rule stored for the calling tenant.

    Args:
        x_tenant_id: Tenant identifier taken from a request header
            (declared via ``Header``; defaults to ``None`` when absent).

    Returns:
        A dict with the keys ``tenant_id`` and ``rules``.

    Raises:
        HTTPException: 400 when the tenant header is missing or empty.
    """
    if x_tenant_id:
        tenant_rules = get_rules_for_tenant(x_tenant_id)
        return {
            "tenant_id": x_tenant_id,
            "rules": tenant_rules,
        }

    # Reached only when no usable tenant identifier was supplied.
    raise HTTPException(status_code=400, detail="Missing tenant ID")
async def add_redflag_rule(
    payload: Optional[RulePayload] = None,
    rule: Optional[str] = None,
    x_tenant_id: str = Header(None)
):
    """
    Store one new red-flag rule for the calling tenant.

    The rule text may arrive either as a JSON body ``{"rule": "..."}`` or as
    the query parameter ``?rule=...``. When a body is present it takes
    precedence over the query parameter.

    Args:
        payload: Optional JSON body holding the rule text.
        rule: Optional rule text, used only when no body was sent.
        x_tenant_id: Tenant identifier taken from a request header.

    Returns:
        A dict with the keys ``tenant_id``, ``added_rule`` (the stripped
        text that was passed to the store) and ``rules`` (the tenant's rules
        read back after the insert).

    Raises:
        HTTPException: 400 when the tenant header is missing, when no rule
            text was supplied, or when the text is blank after stripping.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # Body first, query parameter as the fallback.
    if payload:
        raw_text = payload.rule
    else:
        raw_text = rule

    if not raw_text:
        raise HTTPException(status_code=400, detail="Missing rule text")

    normalized = raw_text.strip()
    if not normalized:
        # The text consisted solely of whitespace.
        raise HTTPException(status_code=400, detail="Rule cannot be empty")

    rules_store.add_rule(x_tenant_id, normalized)

    return {
        "tenant_id": x_tenant_id,
        "added_rule": normalized,
        "rules": get_rules_for_tenant(x_tenant_id),
    }
async def add_redflag_rules_bulk(
    payload: BulkRulePayload,
    x_tenant_id: str = Header(None)
):
    """
    Store several red-flag rules for the calling tenant in one call.

    Each submitted rule is stripped of surrounding whitespace and entries
    that end up empty are dropped before anything is sent to the store.

    Args:
        payload: JSON body holding the list of rule texts.
        x_tenant_id: Tenant identifier taken from a request header.

    Returns:
        A dict with the keys ``tenant_id``, ``added_rules`` (the value
        returned by ``rules_store.add_rules_bulk``) and ``rules`` (the
        tenant's rules read back after the insert).

    Raises:
        HTTPException: 400 when the tenant header is missing, or when the
            list contains no usable rule (empty list or only blank strings).
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # Strip each entry exactly once and keep only the non-blank results.
    stripped = (rule.strip() for rule in payload.rules)
    cleaned = [text for text in stripped if text]

    # Validate *after* cleaning: this rejects both an empty list and a list
    # made up solely of blank strings, matching the single-rule endpoint,
    # which also refuses blank text instead of reporting success.
    if not cleaned:
        raise HTTPException(status_code=400, detail="No rules provided")

    added = rules_store.add_rules_bulk(x_tenant_id, cleaned)
    rules = get_rules_for_tenant(x_tenant_id)
    return {
        "tenant_id": x_tenant_id,
        "added_rules": added,
        "rules": rules,
    }
async def delete_redflag_rule(
    rule: str,
    x_tenant_id: str = Header(None)
):
    """
    Delete one red-flag rule of the calling tenant.

    The rule is first looked up by the exact text supplied. If that fails
    and the text has surrounding whitespace, the lookup is retried with the
    stripped text, because the add endpoints in this module strip rules
    before storing them.

    Args:
        rule: Text of the rule to delete.
        x_tenant_id: Tenant identifier taken from a request header.

    Returns:
        A dict with the keys ``tenant_id``, ``deleted_rule`` (the text that
        actually matched) and ``rules`` (the tenant's rules read back after
        the deletion).

    Raises:
        HTTPException: 400 when the tenant header is missing; 404 when the
            store reports that no matching rule was deleted.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # Exact match first, so every call that succeeded before this change
    # still succeeds with an identical response.
    matched_rule = rule
    deleted = rules_store.delete_rule(x_tenant_id, matched_rule)

    if not deleted:
        # Fall back to the normalisation applied when rules are added.
        # NOTE(review): assumes a delete_rule call that finds nothing has no
        # side effects - confirm against RulesStore.
        normalized = rule.strip()
        if normalized and normalized != rule:
            matched_rule = normalized
            deleted = rules_store.delete_rule(x_tenant_id, matched_rule)

    if not deleted:
        raise HTTPException(status_code=404, detail="Rule not found")

    rules = get_rules_for_tenant(x_tenant_id)
    return {
        "tenant_id": x_tenant_id,
        "deleted_rule": matched_rule,
        "rules": rules,
    }