# SentinelAI / parsers / parser_agent.py
# Author: iitian
# Commit 8b3905d: "Sync SentinelAI project and add Hugging Face Docker Space layout."
"""Parser agent: raw logs -> structured SecurityEvent."""
from __future__ import annotations
import json
import re
from datetime import datetime, timezone
from typing import Any
from models.schemas import RawLogIngest, SecurityEvent, Severity
_SSH_FAIL = re.compile(
r"^(?P<ts>\w+\s+\d+\s+\d+:\d+:\d+).+sshd.*Failed password for (?:invalid user )?(?P<user>\S+) from (?P<ip>[\d.]+)"
)
_SSH_OK = re.compile(
r"^(?P<ts>\w+\s+\d+\s+\d+:\d+:\d+).+sshd.*Accepted (?P<method>\S+) for (?P<user>\S+) from (?P<ip>[\d.]+)"
)
_SUDO = re.compile(
r"^(?P<ts>\w+\s+\d+\s+\d+:\d+:\d+).+sudo.+USER=(?P<target>\S+).+COMMAND=(?P<cmd>.+)"
)
_NGINX = re.compile(
r'^(?P<ip>[\d.]+).+\[(?P<date>[^\]]+)\]."(?P<method>\S+)\s+(?P<path>\S+).*" (?P<code>\d+)'
)
_K8S = re.compile(r"type=(?P<type>\S+)\s+reason=(?P<reason>\S+).*name=(?P<name>\S+)")
def _parse_ts_ssh(s: str) -> datetime:
"""Best-effort parse for auth.log style timestamps (current year)."""
now = datetime.now(timezone.utc)
try:
dt = datetime.strptime(f"{now.year} {s}", "%Y %b %d %H:%M:%S")
return dt.replace(tzinfo=timezone.utc)
except ValueError:
return now
def parse_raw(ingest: RawLogIngest) -> SecurityEvent:
    """Normalize one raw log line into a ``SecurityEvent``.

    Formats are tried in a fixed order and the first match wins:

    1. JSON object -- when ``ingest.source == "json"`` or the line starts with
       ``{``. Lines that fail to decode, or that decode to something other than
       an object (array, scalar, ``null``), fall through to the text parsers.
    2. sshd failed password  -> ``auth.ssh_failed``  (MEDIUM)
    3. sshd accepted login   -> ``auth.ssh_success`` (LOW)
    4. sudo command          -> ``privilege.sudo``   (HIGH)
    5. nginx access log      -> ``web.request``      (HIGH for 401/403, else INFO)
    6. Kubernetes event      -> ``k8s.event``        (MEDIUM)

    Anything else becomes an INFO ``generic.syslog`` event, so an event is
    always returned.

    Args:
        ingest: The raw line plus its ``source`` label. ``metadata`` may be
            None; when present it supplies fallbacks for ``host`` and, in some
            branches, ``source_ip``.

    Returns:
        The normalized event. ``raw`` always holds the source label and the
        whitespace-stripped line.
    """
    line = ingest.raw_line.strip()
    meta = ingest.metadata or {}
    raw = {"source": ingest.source, "line": line}

    if ingest.source == "json" or line.startswith("{"):
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            data = None
        # Only an object supports .get(); a JSON array/scalar/null (possible
        # when source == "json") used to crash here with AttributeError.
        if isinstance(data, dict):
            try:
                sev = Severity(str(data.get("severity", "info")).lower())
            except ValueError:
                sev = Severity.INFO
            return SecurityEvent(
                timestamp=_parse_iso(data.get("timestamp")),
                event_type=str(data.get("event_type", "generic")),
                source_ip=data.get("source_ip"),
                host=str(data.get("host", meta.get("host", "unknown"))),
                severity=sev,
                message=str(data.get("message", line)),
                raw=raw,
                normalized=data,
            )

    m = _SSH_FAIL.search(line)
    if m:
        return SecurityEvent(
            timestamp=_parse_ts_ssh(m.group("ts")),
            event_type="auth.ssh_failed",
            source_ip=m.group("ip"),
            host=meta.get("host", "edge-01"),
            severity=Severity.MEDIUM,
            message=f"Failed SSH for {m.group('user')} from {m.group('ip')}",
            raw=raw,
            normalized={"user": m.group("user")},
        )

    m = _SSH_OK.search(line)
    if m:
        return SecurityEvent(
            timestamp=_parse_ts_ssh(m.group("ts")),
            event_type="auth.ssh_success",
            source_ip=m.group("ip"),
            host=meta.get("host", "edge-01"),
            severity=Severity.LOW,
            message=f"Accepted SSH for {m.group('user')} via {m.group('method')}",
            raw=raw,
            normalized={"user": m.group("user"), "method": m.group("method")},
        )

    m = _SUDO.search(line)
    if m:
        # sudo lines carry no peer address; take it from metadata if supplied.
        return SecurityEvent(
            timestamp=_parse_ts_ssh(m.group("ts")),
            event_type="privilege.sudo",
            source_ip=meta.get("source_ip"),
            host=meta.get("host", "edge-01"),
            severity=Severity.HIGH,
            message=f"sudo escalation to {m.group('target')}: {m.group('cmd')[:120]}",
            raw=raw,
            normalized={"target_user": m.group("target"), "command": m.group("cmd")},
        )

    m = _NGINX.search(line)
    if m:
        code = int(m.group("code"))
        # Auth failures (401/403) are treated as security-relevant.
        sev = Severity.HIGH if code in (401, 403) else Severity.INFO
        return SecurityEvent(
            timestamp=_parse_nginx_date(m.group("date")),
            event_type="web.request",
            source_ip=m.group("ip"),
            host=meta.get("host", "web-01"),
            severity=sev,
            message=f"{m.group('method')} {m.group('path')} -> {code}",
            raw=raw,
            normalized={"path": m.group("path"), "status": code},
        )

    m = _K8S.search(line)
    if m:
        # The K8s pattern captures no timestamp, so ingestion time is used.
        return SecurityEvent(
            timestamp=datetime.now(timezone.utc),
            event_type="k8s.event",
            source_ip=None,
            host=meta.get("host", "k8s-api"),
            severity=Severity.MEDIUM,
            message=f"K8s {m.group('type')} {m.group('reason')} {m.group('name')}",
            raw=raw,
            normalized={"k8s_type": m.group("type"), "reason": m.group("reason")},
        )

    return SecurityEvent(
        timestamp=datetime.now(timezone.utc),
        event_type="generic.syslog",
        source_ip=meta.get("source_ip"),
        host=meta.get("host", "unknown"),
        severity=Severity.INFO,
        message=line[:512],
        raw=raw,
        normalized={},
    )
def _parse_iso(ts: Any) -> datetime:
if ts is None:
return datetime.now(timezone.utc)
if isinstance(ts, datetime):
return ts if ts.tzinfo else ts.replace(tzinfo=timezone.utc)
try:
return datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
except ValueError:
return datetime.now(timezone.utc)
def _parse_nginx_date(s: str) -> datetime:
try:
return datetime.strptime(s.split()[0], "%d/%b/%Y:%H:%M:%S").replace(tzinfo=timezone.utc)
except (ValueError, IndexError):
return datetime.now(timezone.utc)