| import io |
| import os |
| import re |
| import sys |
|
|
| from ._core import Process |
|
|
| |
| |
| |
# Index of the parent-PID token in the tokenized per-process file when the
# host is detected as a BSD flavour (see ``_use_bsd_stat_format`` and
# ``_get_ppid``).
BSD_STAT_PPID = 2


# Index of the parent-PID token in the tokenized per-process file on every
# other host (the Linux layout).
LINUX_STAT_PPID = 3


# Tokenizer for the per-process file contents. A parenthesized run is kept as
# a single token (the match is greedy, so it may itself contain spaces and
# parentheses); everything else is split into runs of non-whitespace.
STAT_PATTERN = re.compile(r"\(.+\)|\S+")
|
|
|
|
def detect_proc():
    """Report which per-process file the local /proc filesystem provides.

    The directory ``/proc/<pid of this process>`` is probed for the known
    file names, in order, and the first one that exists is returned as a
    str:

    * ``"stat"``: Linux-style, i.e. ``/proc/{pid}/stat``.
    * ``"status"``: BSD-style, i.e. ``/proc/{pid}/status``.

    Raises ``ProcFormatError`` when neither file exists.
    """
    own_dir = os.path.join("/proc", str(os.getpid()))
    found = next(
        (
            candidate
            for candidate in ("stat", "status")
            if os.path.exists(os.path.join(own_dir, candidate))
        ),
        None,
    )
    if found is None:
        raise ProcFormatError("unsupported proc format")
    return found
|
|
|
|
| def _use_bsd_stat_format(): |
| try: |
| return os.uname().sysname.lower() in ("freebsd", "netbsd", "dragonfly") |
| except Exception: |
| return False |
|
|
|
|
def _get_ppid(pid, name):
    """Read the parent PID of *pid* from ``/proc/<pid>/<name>``.

    The file is decoded as ASCII (undecodable bytes are replaced), split
    into tokens with ``STAT_PATTERN``, and the token at the platform's PPID
    index is returned unchanged, i.e. as a str.
    """
    stat_path = os.path.join("/proc", str(pid), name)
    with io.open(stat_path, encoding="ascii", errors="replace") as stat_file:
        fields = STAT_PATTERN.findall(stat_file.read())

    # The PPID sits at a different position in the BSD and Linux layouts.
    ppid_index = BSD_STAT_PPID if _use_bsd_stat_format() else LINUX_STAT_PPID
    return fields[ppid_index]
|
|
|
|
| def _get_cmdline(pid): |
| path = os.path.join("/proc", str(pid), "cmdline") |
| encoding = sys.getfilesystemencoding() or "utf-8" |
| with io.open(path, encoding=encoding, errors="replace") as f: |
| |
| |
| |
| |
| return tuple(f.read().split("\0")[:-1]) |
|
|
|
|
class ProcFormatError(EnvironmentError):
    """Raised when /proc offers none of the per-process files we can parse."""
|
|
|
|
def iter_process_parents(pid, max_depth=10):
    """Walk up the process tree from *pid* using the /proc interface.

    Returns an iterator of ``Process`` records, starting with *pid* itself
    and following parent PIDs. Iteration ends after *max_depth* records or
    once a process whose parent PID is ``"0"`` has been yielded, whichever
    comes first.

    The /proc layout is detected before the iterator is created, so
    ``ProcFormatError`` is raised by this call itself rather than on the
    first ``next()``.
    """
    proc_style = detect_proc()

    # The generator lives in an inner function on purpose: keeping the
    # detection above outside of it makes an unsupported /proc fail eagerly,
    # which lets the caller pick a different implementation up front.
    def _walk_ancestors(current_pid, depth_limit):
        for _ in range(depth_limit):
            parent_pid = _get_ppid(current_pid, proc_style)
            cmdline = _get_cmdline(current_pid)
            yield Process(args=cmdline, pid=current_pid, ppid=parent_pid)
            if parent_pid == "0":
                return
            current_pid = parent_pid

    return _walk_ancestors(pid, max_depth)
|
|