runv-server

server tooling for runv.club
Log | Files | Refs | README

runv_jail.py (10989B)


      1 from __future__ import annotations
      2 
      3 import logging
      4 import os
      5 import shutil
      6 import subprocess
      7 import tempfile
      8 from pathlib import Path
      9 
# Name of the system group that jailed accounts are added to.
RUNV_JAILED_GROUP = "runv-jailed"
# Accounts that are never jailed; both jail setup and teardown skip them.
JAIL_SKIP_USERNAMES = frozenset({"entre", "pmurad-admin"})
# Directory under which each user's jail tree (JAIL_ROOT/<username>) lives.
JAIL_ROOT = Path("/srv/jail")
# fstab file in which the bind-mount entries of this module are recorded.
FSTAB_PATH = Path("/etc/fstab")
     14 
     15 
     16 def jail_skip_username(username: str) -> bool:
     17     return username in JAIL_SKIP_USERNAMES
     18 
     19 
     20 def _run(cmd: list[str], *, log: logging.Logger) -> subprocess.CompletedProcess[str]:
     21     log.debug("exec: %s", " ".join(cmd))
     22     return subprocess.run(cmd, capture_output=True, text=True, timeout=600)
     23 
     24 
     25 def is_mounted(path: Path, log: logging.Logger) -> bool:
     26     """Detecta mountpoint usando findmnt quando disponível; fallback para os.path.ismount."""
     27     p = str(path.resolve())
     28     if shutil.which("findmnt") is not None:
     29         r = _run(["findmnt", "-R", "--target", p], log=log)
     30         if r.returncode == 0 and (r.stdout or "").strip():
     31             return True
     32         if r.returncode not in (0, 1):
     33             log.debug("findmnt %s: %s", p, (r.stderr or r.stdout or "").strip())
     34     return os.path.ismount(path)
     35 
     36 
     37 def ensure_runv_jailed_group(log: logging.Logger) -> None:
     38     r = _run(["groupadd", "-f", RUNV_JAILED_GROUP], log=log)
     39     if r.returncode != 0:
     40         err = (r.stderr or r.stdout or "").strip()
     41         raise RuntimeError(f"groupadd -f {RUNV_JAILED_GROUP} falhou: {err}")
     42 
     43 
     44 def ensure_user_in_jailed_group(username: str, log: logging.Logger) -> None:
     45     ensure_runv_jailed_group(log)
     46     r = _run(["getent", "group", RUNV_JAILED_GROUP], log=log)
     47     if r.returncode != 0 or not (r.stdout or "").strip():
     48         raise RuntimeError("grupo runv-jailed não existe após groupadd")
     49     line = (r.stdout or "").strip()
     50     members_field = line.split(":")[-1] if ":" in line else ""
     51     members = {m.strip() for m in members_field.split(",") if m.strip()}
     52     if username in members:
     53         log.debug("jail: %s já está em %s", username, RUNV_JAILED_GROUP)
     54         return
     55     r2 = _run(["usermod", "-aG", RUNV_JAILED_GROUP, username], log=log)
     56     if r2.returncode != 0:
     57         err = (r2.stderr or r2.stdout or "").strip()
     58         raise RuntimeError(f"usermod -aG {RUNV_JAILED_GROUP} {username}: {err}")
     59     log.info("jail: utilizador %s adicionado ao grupo %s", username, RUNV_JAILED_GROUP)
     60 
     61 
     62 def fstab_bind_line(real_home: Path, jail_mount_point: Path) -> str:
     63     src = str(real_home.resolve())
     64     dst = str(jail_mount_point.resolve())
     65     return f"{src}\t{dst}\tnone\tbind,nofail\t0\t0\n"
     66 
     67 
     68 def fstab_has_bind(real_home: Path, jail_mount_point: Path) -> bool:
     69     if not FSTAB_PATH.is_file():
     70         return False
     71     text = FSTAB_PATH.read_text(encoding="utf-8", errors="replace")
     72     src = str(real_home.resolve())
     73     dst = str(jail_mount_point.resolve())
     74     for raw in text.splitlines():
     75         line = raw.strip()
     76         if not line or line.startswith("#"):
     77             continue
     78         parts = line.split()
     79         if len(parts) >= 2 and parts[0] == src and parts[1] == dst:
     80             return True
     81     return False
     82 
     83 
     84 def append_fstab_bind(real_home: Path, jail_mount_point: Path, log: logging.Logger) -> None:
     85     if fstab_has_bind(real_home, jail_mount_point):
     86         log.debug("jail: fstab já contém bind %s -> %s", real_home, jail_mount_point)
     87         return
     88     with open(FSTAB_PATH, "a", encoding="utf-8") as f:
     89         f.write(fstab_bind_line(real_home, jail_mount_point))
     90     log.info("jail: fstab atualizado (bind %s)", real_home.name)
     91 
     92 
     93 def remove_fstab_bind(real_home: Path, jail_mount_point: Path, log: logging.Logger) -> bool:
     94     """Remove a linha de bind correspondente de ``/etc/fstab``. Devolve True se alterou o ficheiro."""
     95     if not FSTAB_PATH.is_file():
     96         return False
     97     src = str(real_home.resolve())
     98     dst = str(jail_mount_point.resolve())
     99     text = FSTAB_PATH.read_text(encoding="utf-8", errors="replace")
    100     lines = text.splitlines(keepends=True)
    101     out: list[str] = []
    102     removed = False
    103     for line in lines:
    104         s = line.strip()
    105         if not s or s.startswith("#"):
    106             out.append(line)
    107             continue
    108         parts = s.split()
    109         if len(parts) >= 2 and parts[0] == src and parts[1] == dst:
    110             removed = True
    111             log.info("jail: removida linha fstab bind %s -> %s", src, dst)
    112             continue
    113         out.append(line)
    114     if not removed:
    115         return False
    116     new_body = "".join(out)
    117     fd, tmp_name = tempfile.mkstemp(
    118         prefix="fstab.",
    119         suffix=".tmp",
    120         dir=str(FSTAB_PATH.parent),
    121     )
    122     tmp_path = Path(tmp_name)
    123     try:
    124         with os.fdopen(fd, "w", encoding="utf-8") as f:
    125             f.write(new_body)
    126             f.flush()
    127             os.fsync(f.fileno())
    128         os.replace(tmp_path, FSTAB_PATH)
    129     except Exception:
    130         tmp_path.unlink(missing_ok=True)
    131         raise
    132     return True
    133 
    134 
    135 def jail_bind_mountpoint(username: str) -> Path:
    136     """Caminho dentro do chroot onde a home real é montada (bind)."""
    137     return JAIL_ROOT / username / "home" / username
    138 
    139 
    140 def remove_user_from_jailed_group(username: str, log: logging.Logger) -> None:
    141     """Remove o utilizador do grupo ``runv-jailed`` (idempotente)."""
    142     r = _run(["getent", "group", RUNV_JAILED_GROUP], log=log)
    143     if r.returncode != 0 or not (r.stdout or "").strip():
    144         log.debug("jail: grupo %s inexistente — nada a remover", RUNV_JAILED_GROUP)
    145         return
    146     line = (r.stdout or "").strip()
    147     members_field = line.split(":")[-1] if ":" in line else ""
    148     members = {m.strip() for m in members_field.split(",") if m.strip()}
    149     if username not in members:
    150         log.debug("jail: %s já não está em %s", username, RUNV_JAILED_GROUP)
    151         return
    152     r2 = _run(["gpasswd", "-d", username, RUNV_JAILED_GROUP], log=log)
    153     if r2.returncode != 0:
    154         err = (r2.stderr or r2.stdout or "").strip()
    155         raise RuntimeError(f"gpasswd -d {username} {RUNV_JAILED_GROUP}: {err}")
    156     log.info("jail: %s removido do grupo %s", username, RUNV_JAILED_GROUP)
    157 
    158 
    159 def unbind_jail_home(jail_home: Path, log: logging.Logger) -> None:
    160     """Desmonta o bind em ``jail_home`` se estiver montado."""
    161     if not is_mounted(jail_home, log):
    162         log.debug("jail: %s não está montado", jail_home)
    163         return
    164     r = _run(["umount", str(jail_home.resolve())], log=log)
    165     if r.returncode != 0:
    166         err = (r.stderr or r.stdout or "").strip()
    167         log.warning("umount %s falhou (%s); tentando lazy umount", jail_home, err)
    168         lazy = _run(["umount", "-l", str(jail_home.resolve())], log=log)
    169         if lazy.returncode != 0:
    170             lazy_err = (lazy.stderr or lazy.stdout or "").strip()
    171             raise RuntimeError(f"umount {jail_home}: {err}; umount -l: {lazy_err}")
    172         log.info("jail: lazy umount em %s", jail_home)
    173         return
    174     log.info("jail: desmontado bind em %s", jail_home)
    175 
    176 
    177 def ensure_jail_layout(
    178     username: str,
    179     home: Path,
    180     log: logging.Logger,
    181     *,
    182     jk_profile: str = "extendedshell",
    183     no_jk_init: bool = False,
    184 ) -> Path:
    185     """
    186     Cria ``/srv/jail/user``, opcionalmente ``jk_init`` (perfil Jailkit), ``home/user``.
    187     Devolve o caminho do mountpoint do bind.
    188     """
    189     jail_root = JAIL_ROOT / username
    190     jail_root.mkdir(parents=True, exist_ok=True)
    191     os.chmod(jail_root, 0o755)
    192     try:
    193         os.chown(jail_root, 0, 0)
    194     except OSError as e:
    195         log.warning("jail: chown root em %s: %s", jail_root, e)
    196     marker = jail_root / "bin"
    197     if not marker.exists():
    198         if no_jk_init:
    199             raise RuntimeError(
    200                 f"jail: {jail_root} sem layout Jailkit (falta bin/) e --no-jk-init foi pedido — "
    201                 "crie o jail manualmente ou execute sem --no-jk-init."
    202             )
    203         if shutil.which("jk_init") is None:
    204             raise RuntimeError("jk_init não encontrado — instale jailkit e corra tools/tools.py")
    205         prof = (jk_profile or "extendedshell").strip()
    206         r = _run(["jk_init", "-j", str(jail_root), prof], log=log)
    207         if r.returncode != 0:
    208             err = (r.stderr or r.stdout or "").strip()
    209             raise RuntimeError(f"jk_init {prof!r} falhou: {err}")
    210         log.info("jail: jk_init %s em %s", prof, jail_root)
    211     else:
    212         log.debug("jail: %s já tem layout jk (bin presente)", jail_root)
    213     inner = jail_root / "home" / username
    214     inner.mkdir(parents=True, exist_ok=True)
    215     hp = inner.parent
    216     try:
    217         os.chmod(hp, 0o755)
    218         os.chown(hp, 0, 0)
    219         os.chown(inner, 0, 0)
    220     except OSError as e:
    221         log.warning("jail: permissões em %s: %s", inner, e)
    222     return inner
    223 
    224 
    225 def ensure_bind_mount(real_home: Path, jail_home: Path, log: logging.Logger) -> None:
    226     if os.path.ismount(jail_home):
    227         log.debug("jail: %s já montado", jail_home)
    228         return
    229     r = _run(
    230         ["mount", "--bind", str(real_home.resolve()), str(jail_home.resolve())],
    231         log=log,
    232     )
    233     if r.returncode != 0:
    234         err = (r.stderr or r.stdout or "").strip()
    235         raise RuntimeError(f"mount --bind falhou: {err}")
    236     log.info("jail: bind mount %s -> %s", real_home, jail_home)
    237 
    238 
    239 def ensure_runv_jail_for_user(
    240     username: str,
    241     home: Path,
    242     *,
    243     no_jail: bool,
    244     log: logging.Logger,
    245     jk_profile: str = "extendedshell",
    246     no_jk_init: bool = False,
    247 ) -> None:
    248     if no_jail:
    249         log.info("jail: omitido (--no-jail)")
    250         return
    251     if jail_skip_username(username):
    252         log.info("jail: omitido (conta excluída: %s)", username)
    253         return
    254     home = home.resolve()
    255     ensure_user_in_jailed_group(username, log)
    256     jail_home = ensure_jail_layout(
    257         username,
    258         home,
    259         log,
    260         jk_profile=jk_profile,
    261         no_jk_init=no_jk_init,
    262     )
    263     ensure_bind_mount(home, jail_home, log)
    264     append_fstab_bind(home, jail_home, log)
    265 
    266 
    267 def teardown_runv_jail_for_user(
    268     username: str,
    269     home: Path,
    270     log: logging.Logger,
    271     *,
    272     dry_run: bool = False,
    273 ) -> None:
    274     """
    275     Inverte ``ensure_runv_jail_for_user``: umount do bind, remove linha fstab, gpasswd -d,
    276     apaga ``/srv/jail/<user>``. Omitido para contas em ``JAIL_SKIP_USERNAMES``.
    277     """
    278     if jail_skip_username(username):
    279         log.info("jail teardown: omitido (conta excluída: %s)", username)
    280         return
    281     real_home = home.resolve()
    282     jail_home = jail_bind_mountpoint(username)
    283     jail_root = JAIL_ROOT / username
    284     if dry_run:
    285         log.info(
    286             "jail teardown [dry-run]: umount %s, fstab, gpasswd -d, rmtree %s",
    287             jail_home,
    288             jail_root,
    289         )
    290         return
    291     unbind_jail_home(jail_home, log)
    292     remove_fstab_bind(real_home, jail_home, log)
    293     remove_user_from_jailed_group(username, log)
    294     if jail_root.is_dir():
    295         unbind_jail_home(jail_home, log)
    296         shutil.rmtree(jail_root, ignore_errors=False)
    297         log.info("jail: removido %s", jail_root)
    298     elif jail_root.exists():
    299         log.warning("jail: %s existe mas não é directório; não removido automaticamente", jail_root)