runv_jail.py (10989B)
"""Provision and tear down per-user Jailkit chroots under ``JAIL_ROOT``.

Provisioning adds the account to the ``runv-jailed`` group, creates
``/srv/jail/<user>`` (optionally initialised with ``jk_init``), bind-mounts
the real home at ``/srv/jail/<user>/home/<user>`` and records that bind in
``/etc/fstab``.  Teardown reverses those steps.
"""
from __future__ import annotations

import logging
import os
import shutil
import subprocess
import tempfile
from pathlib import Path

RUNV_JAILED_GROUP = "runv-jailed"
JAIL_SKIP_USERNAMES = frozenset({"entre", "pmurad-admin"})
JAIL_ROOT = Path("/srv/jail")
FSTAB_PATH = Path("/etc/fstab")

# Upper bound on stacked bind mounts that teardown peels off before giving up.
_MAX_STACKED_UNMOUNTS = 16


def jail_skip_username(username: str) -> bool:
    """Return True when *username* is listed in ``JAIL_SKIP_USERNAMES``."""
    return username in JAIL_SKIP_USERNAMES


def _run(cmd: list[str], *, log: logging.Logger) -> subprocess.CompletedProcess[str]:
    """Run *cmd* without a shell, capturing text output (600 s timeout).

    The return code is not checked here; callers inspect ``returncode``.
    ``subprocess.TimeoutExpired`` propagates if the timeout is hit.
    """
    log.debug("exec: %s", " ".join(cmd))
    return subprocess.run(cmd, capture_output=True, text=True, timeout=600)


def _detail(proc: subprocess.CompletedProcess[str]) -> str:
    """Return stderr (or stdout when stderr is empty) of *proc*, stripped."""
    return (proc.stderr or proc.stdout or "").strip()


def _user_jail_root(username: str) -> Path:
    """Return ``JAIL_ROOT / username`` after rejecting path-escaping names.

    An empty name would resolve to ``JAIL_ROOT`` itself and ``..`` or a name
    containing ``/`` would leave it; since teardown calls ``shutil.rmtree`` on
    this path, such names are refused.

    Raises:
        ValueError: if *username* is empty, ``.``, ``..`` or contains ``/``
            or a NUL byte.
    """
    if not username or username in {".", ".."} or "/" in username or "\x00" in username:
        raise ValueError(f"jail: nome de utilizador inválido: {username!r}")
    return JAIL_ROOT / username


def is_mounted(path: Path, log: logging.Logger) -> bool:
    """Report whether *path* is a mountpoint.

    Uses ``findmnt`` when it is on ``PATH`` and falls back to
    ``os.path.ismount`` otherwise (or when ``findmnt`` finds nothing).
    """
    target = str(path.resolve())
    if shutil.which("findmnt") is not None:
        # Per findmnt(8), --target only walks up to parent path elements when
        # --submounts (-R) is NOT given, so with -R this matches exactly.
        r = _run(["findmnt", "-R", "--target", target], log=log)
        if r.returncode == 0 and (r.stdout or "").strip():
            return True
        if r.returncode not in (0, 1):
            log.debug("findmnt %s: %s", target, _detail(r))
    return os.path.ismount(path)


def ensure_runv_jailed_group(log: logging.Logger) -> None:
    """Create the ``runv-jailed`` group if missing (``groupadd -f``).

    Raises:
        RuntimeError: if ``groupadd`` exits non-zero.
    """
    r = _run(["groupadd", "-f", RUNV_JAILED_GROUP], log=log)
    if r.returncode != 0:
        err = _detail(r)
        raise RuntimeError(f"groupadd -f {RUNV_JAILED_GROUP} falhou: {err}")


def _jailed_group_members(log: logging.Logger) -> set[str] | None:
    """Return the member names ``getent`` lists for the jailed group.

    Returns ``None`` when ``getent`` fails or prints nothing (group absent).
    The members are taken from the last ``:``-separated field of the entry.
    """
    r = _run(["getent", "group", RUNV_JAILED_GROUP], log=log)
    entry = (r.stdout or "").strip()
    if r.returncode != 0 or not entry:
        return None
    members_field = entry.split(":")[-1] if ":" in entry else ""
    return {m.strip() for m in members_field.split(",") if m.strip()}


def ensure_user_in_jailed_group(username: str, log: logging.Logger) -> None:
    """Add *username* to the jailed group unless already listed as a member.

    Raises:
        RuntimeError: if the group cannot be created or read back, or if
            ``usermod -aG`` exits non-zero.
    """
    ensure_runv_jailed_group(log)
    members = _jailed_group_members(log)
    if members is None:
        raise RuntimeError("grupo runv-jailed não existe após groupadd")
    if username in members:
        log.debug("jail: %s já está em %s", username, RUNV_JAILED_GROUP)
        return
    r = _run(["usermod", "-aG", RUNV_JAILED_GROUP, username], log=log)
    if r.returncode != 0:
        err = _detail(r)
        raise RuntimeError(f"usermod -aG {RUNV_JAILED_GROUP} {username}: {err}")
    log.info("jail: utilizador %s adicionado ao grupo %s", username, RUNV_JAILED_GROUP)


def _fstab_escape(path: str) -> str:
    """Octal-escape characters that would break fstab's whitespace-split fields."""
    # Backslash first so the escapes introduced below are not re-escaped.
    return (
        path.replace("\\", "\\134")
        .replace(" ", "\\040")
        .replace("\t", "\\011")
        .replace("\n", "\\012")
    )


def _fstab_fields(real_home: Path, jail_mount_point: Path) -> tuple[str, str]:
    """Return the (source, target) fstab fields for the bind, resolved and escaped."""
    return (
        _fstab_escape(str(real_home.resolve())),
        _fstab_escape(str(jail_mount_point.resolve())),
    )


def _is_bind_entry(raw_line: str, src: str, dst: str) -> bool:
    """Return True when *raw_line* is a non-comment entry whose first two fields are *src*, *dst*."""
    stripped = raw_line.strip()
    if not stripped or stripped.startswith("#"):
        return False
    fields = stripped.split()
    return len(fields) >= 2 and fields[0] == src and fields[1] == dst


def fstab_bind_line(real_home: Path, jail_mount_point: Path) -> str:
    """Build the tab-separated fstab line (``bind,nofail``) for the home bind.

    Both paths are resolved; whitespace and backslashes are octal-escaped so
    the line still parses as six fields.
    """
    src, dst = _fstab_fields(real_home, jail_mount_point)
    return f"{src}\t{dst}\tnone\tbind,nofail\t0\t0\n"


def fstab_has_bind(real_home: Path, jail_mount_point: Path) -> bool:
    """Return True if ``FSTAB_PATH`` already has an entry for this source/target pair."""
    if not FSTAB_PATH.is_file():
        return False
    src, dst = _fstab_fields(real_home, jail_mount_point)
    text = FSTAB_PATH.read_text(encoding="utf-8", errors="replace")
    return any(_is_bind_entry(raw, src, dst) for raw in text.splitlines())


def _lacks_trailing_newline(path: Path) -> bool:
    """Return True when *path* exists, is non-empty and does not end with a newline."""
    try:
        with open(path, "rb") as fh:
            fh.seek(0, os.SEEK_END)
            if fh.tell() == 0:
                return False
            fh.seek(-1, os.SEEK_END)
            return fh.read(1) != b"\n"
    except FileNotFoundError:
        return False


def append_fstab_bind(real_home: Path, jail_mount_point: Path, log: logging.Logger) -> None:
    """Append the bind entry to ``FSTAB_PATH`` unless it is already present."""
    if fstab_has_bind(real_home, jail_mount_point):
        log.debug("jail: fstab já contém bind %s -> %s", real_home, jail_mount_point)
        return
    entry = fstab_bind_line(real_home, jail_mount_point)
    if _lacks_trailing_newline(FSTAB_PATH):
        # Otherwise the new entry would be glued onto the last existing line.
        entry = "\n" + entry
    with open(FSTAB_PATH, "a", encoding="utf-8") as f:
        f.write(entry)
    log.info("jail: fstab atualizado (bind %s)", real_home.name)


def remove_fstab_bind(real_home: Path, jail_mount_point: Path, log: logging.Logger) -> bool:
    """Remove the matching bind line from ``FSTAB_PATH``.

    The file is rewritten through a temporary file in the same directory and
    swapped in with ``os.replace``.  The original permission bits (and, when
    permitted, ownership) are carried over, because ``mkstemp`` creates the
    temporary file as 0600.

    Returns:
        True if the file was changed, False if it is missing or had no match.
    """
    if not FSTAB_PATH.is_file():
        return False
    src, dst = _fstab_fields(real_home, jail_mount_point)
    # surrogateescape round-trips any non-UTF-8 bytes in unrelated lines.
    text = FSTAB_PATH.read_text(encoding="utf-8", errors="surrogateescape")
    kept: list[str] = []
    removed = False
    for line in text.splitlines(keepends=True):
        if _is_bind_entry(line, src, dst):
            removed = True
            log.info("jail: removida linha fstab bind %s -> %s", src, dst)
            continue
        kept.append(line)
    if not removed:
        return False
    original_stat = FSTAB_PATH.stat()
    fd, tmp_name = tempfile.mkstemp(
        prefix="fstab.",
        suffix=".tmp",
        dir=str(FSTAB_PATH.parent),
    )
    tmp_path = Path(tmp_name)
    try:
        with os.fdopen(fd, "w", encoding="utf-8", errors="surrogateescape") as f:
            f.write("".join(kept))
            f.flush()
            try:
                os.fchown(f.fileno(), original_stat.st_uid, original_stat.st_gid)
            except OSError as e:
                log.debug("jail: fchown %s: %s", tmp_path, e)
            os.fchmod(f.fileno(), original_stat.st_mode & 0o7777)
            os.fsync(f.fileno())
        os.replace(tmp_path, FSTAB_PATH)
    except Exception:
        tmp_path.unlink(missing_ok=True)
        raise
    return True


def jail_bind_mountpoint(username: str) -> Path:
    """Return the path inside the chroot where the real home is bind-mounted.

    Raises:
        ValueError: if *username* could escape ``JAIL_ROOT``.
    """
    return _user_jail_root(username) / "home" / username


def remove_user_from_jailed_group(username: str, log: logging.Logger) -> None:
    """Remove *username* from the ``runv-jailed`` group (idempotent).

    Raises:
        RuntimeError: if ``gpasswd -d`` exits non-zero.
    """
    members = _jailed_group_members(log)
    if members is None:
        log.debug("jail: grupo %s inexistente — nada a remover", RUNV_JAILED_GROUP)
        return
    if username not in members:
        log.debug("jail: %s já não está em %s", username, RUNV_JAILED_GROUP)
        return
    r = _run(["gpasswd", "-d", username, RUNV_JAILED_GROUP], log=log)
    if r.returncode != 0:
        err = _detail(r)
        raise RuntimeError(f"gpasswd -d {username} {RUNV_JAILED_GROUP}: {err}")
    log.info("jail: %s removido do grupo %s", username, RUNV_JAILED_GROUP)


def unbind_jail_home(jail_home: Path, log: logging.Logger) -> None:
    """Unmount the bind at *jail_home* if it is mounted.

    Tries a plain ``umount`` first and falls back to ``umount -l``.

    Raises:
        RuntimeError: if both the plain and the lazy unmount fail.
    """
    if not is_mounted(jail_home, log):
        log.debug("jail: %s não está montado", jail_home)
        return
    target = str(jail_home.resolve())
    r = _run(["umount", target], log=log)
    if r.returncode == 0:
        log.info("jail: desmontado bind em %s", jail_home)
        return
    err = _detail(r)
    log.warning("umount %s falhou (%s); tentando lazy umount", jail_home, err)
    lazy = _run(["umount", "-l", target], log=log)
    if lazy.returncode != 0:
        lazy_err = _detail(lazy)
        raise RuntimeError(f"umount {jail_home}: {err}; umount -l: {lazy_err}")
    log.info("jail: lazy umount em %s", jail_home)


def _unmount_until_clear(jail_home: Path, log: logging.Logger) -> None:
    """Unmount *jail_home* repeatedly until nothing is mounted there.

    Bind mounts can be stacked on the same target; each ``umount`` removes
    only the top one.

    Raises:
        RuntimeError: if the path is still a mountpoint after
            ``_MAX_STACKED_UNMOUNTS`` attempts.
    """
    for _ in range(_MAX_STACKED_UNMOUNTS):
        if not is_mounted(jail_home, log):
            return
        unbind_jail_home(jail_home, log)
    if is_mounted(jail_home, log):
        raise RuntimeError(
            f"jail: {jail_home} continua montado após {_MAX_STACKED_UNMOUNTS} "
            "tentativas de umount"
        )


def ensure_jail_layout(
    username: str,
    home: Path,
    log: logging.Logger,
    *,
    jk_profile: str = "extendedshell",
    no_jk_init: bool = False,
) -> Path:
    """Create ``/srv/jail/<user>`` and ``home/<user>`` inside it.

    When the jail has no ``bin`` entry yet, ``jk_init`` is run with
    *jk_profile* unless *no_jk_init* is set.  *home* is accepted for interface
    symmetry and is not used here.

    Returns:
        The bind mountpoint path (``/srv/jail/<user>/home/<user>``).

    Raises:
        ValueError: if *username* could escape ``JAIL_ROOT``.
        RuntimeError: if the Jailkit layout is missing and cannot be created.
    """
    jail_root = _user_jail_root(username)
    jail_root.mkdir(parents=True, exist_ok=True)
    os.chmod(jail_root, 0o755)
    try:
        os.chown(jail_root, 0, 0)
    except OSError as e:
        log.warning("jail: chown root em %s: %s", jail_root, e)
    # Presence of bin/ is used as the marker that jk_init already ran.
    if (jail_root / "bin").exists():
        log.debug("jail: %s já tem layout jk (bin presente)", jail_root)
    else:
        if no_jk_init:
            raise RuntimeError(
                f"jail: {jail_root} sem layout Jailkit (falta bin/) e --no-jk-init foi pedido — "
                "crie o jail manualmente ou execute sem --no-jk-init."
            )
        if shutil.which("jk_init") is None:
            raise RuntimeError("jk_init não encontrado — instale jailkit e corra tools/tools.py")
        # Fall back to the default also for a whitespace-only profile name.
        prof = (jk_profile or "").strip() or "extendedshell"
        r = _run(["jk_init", "-j", str(jail_root), prof], log=log)
        if r.returncode != 0:
            err = _detail(r)
            raise RuntimeError(f"jk_init {prof!r} falhou: {err}")
        log.info("jail: jk_init %s em %s", prof, jail_root)
    inner = jail_root / "home" / username
    inner.mkdir(parents=True, exist_ok=True)
    home_parent = inner.parent
    try:
        os.chmod(home_parent, 0o755)
        os.chown(home_parent, 0, 0)
        os.chown(inner, 0, 0)
    except OSError as e:
        log.warning("jail: permissões em %s: %s", inner, e)
    return inner


def ensure_bind_mount(real_home: Path, jail_home: Path, log: logging.Logger) -> None:
    """Bind-mount *real_home* on *jail_home* unless something is already mounted there.

    Uses ``is_mounted`` rather than ``os.path.ismount``: the latter cannot
    reliably detect bind mounts on the same filesystem, which would let
    repeated calls stack mounts.

    Raises:
        RuntimeError: if ``mount --bind`` exits non-zero.
    """
    if is_mounted(jail_home, log):
        log.debug("jail: %s já montado", jail_home)
        return
    r = _run(
        ["mount", "--bind", str(real_home.resolve()), str(jail_home.resolve())],
        log=log,
    )
    if r.returncode != 0:
        err = _detail(r)
        raise RuntimeError(f"mount --bind falhou: {err}")
    log.info("jail: bind mount %s -> %s", real_home, jail_home)


def ensure_runv_jail_for_user(
    username: str,
    home: Path,
    *,
    no_jail: bool,
    log: logging.Logger,
    jk_profile: str = "extendedshell",
    no_jk_init: bool = False,
) -> None:
    """Provision the jail for *username*: group, layout, bind mount, fstab entry.

    Does nothing when *no_jail* is set or the account is in
    ``JAIL_SKIP_USERNAMES``.

    Raises:
        ValueError: if *username* could escape ``JAIL_ROOT``.
        RuntimeError: if any provisioning step fails.
    """
    if no_jail:
        log.info("jail: omitido (--no-jail)")
        return
    if jail_skip_username(username):
        log.info("jail: omitido (conta excluída: %s)", username)
        return
    # Validate before touching the system so a bad name leaves no partial setup.
    _user_jail_root(username)
    home = home.resolve()
    ensure_user_in_jailed_group(username, log)
    jail_home = ensure_jail_layout(
        username,
        home,
        log,
        jk_profile=jk_profile,
        no_jk_init=no_jk_init,
    )
    ensure_bind_mount(home, jail_home, log)
    append_fstab_bind(home, jail_home, log)


def teardown_runv_jail_for_user(
    username: str,
    home: Path,
    log: logging.Logger,
    *,
    dry_run: bool = False,
) -> None:
    """Reverse ``ensure_runv_jail_for_user``.

    Unmounts the bind, removes the fstab line, removes the user from the
    jailed group and deletes ``/srv/jail/<user>``.  Skipped for accounts in
    ``JAIL_SKIP_USERNAMES``; with *dry_run* only the plan is logged.

    Raises:
        ValueError: if *username* could escape ``JAIL_ROOT``.
        RuntimeError: if unmounting or group removal fails.
    """
    if jail_skip_username(username):
        log.info("jail teardown: omitido (conta excluída: %s)", username)
        return
    jail_root = _user_jail_root(username)
    jail_home = jail_bind_mountpoint(username)
    real_home = home.resolve()
    if dry_run:
        log.info(
            "jail teardown [dry-run]: umount %s, fstab, gpasswd -d, rmtree %s",
            jail_home,
            jail_root,
        )
        return
    _unmount_until_clear(jail_home, log)
    remove_fstab_bind(real_home, jail_home, log)
    remove_user_from_jailed_group(username, log)
    if jail_root.is_dir():
        # Re-check right before deleting: rmtree would descend into a bind
        # that is still mounted and delete the user's real home contents.
        _unmount_until_clear(jail_home, log)
        shutil.rmtree(jail_root, ignore_errors=False)
        log.info("jail: removido %s", jail_root)
    elif jail_root.exists():
        log.warning("jail: %s existe mas não é directório; não removido automaticamente", jail_root)