repair_user.py (9487B)
1 #!/usr/bin/env python3 2 """ 3 Repara artefatos básicos de membros runv sem sobrescrever conteúdo existente. 4 5 Uso típico: 6 sudo python3 scripts/admin/repair_user.py --user kirihito 7 sudo python3 scripts/admin/repair_user.py --all-users --dry-run --verbose 8 9 O script é intencionalmente conservador: 10 - corrige dono/modo da home e dos diretórios públicos esperados; 11 - cria diretórios ausentes; 12 - cria index.html, gophermap e index.gmi somente se estiverem ausentes; 13 - não faz chown recursivo e não toca em /var/vmail, email, Dovecot ou Maildir. 14 """ 15 16 from __future__ import annotations 17 18 import argparse 19 import json 20 import logging 21 import os 22 import pwd 23 import sys 24 from pathlib import Path 25 from typing import Final 26 27 _SCRIPT_DIR = Path(__file__).resolve().parent 28 if str(_SCRIPT_DIR) not in sys.path: 29 sys.path.insert(0, str(_SCRIPT_DIR)) 30 31 from admin_guard import ensure_admin_cli 32 33 import create_runv_user 34 35 DEFAULT_USERS_JSON: Final[Path] = Path("/var/lib/runv/users.json") 36 SKIP_USERS: Final[set[str]] = { 37 "root", 38 "daemon", 39 "bin", 40 "sys", 41 "sync", 42 "games", 43 "man", 44 "lp", 45 "mail", 46 "news", 47 "uucp", 48 "proxy", 49 "www-data", 50 "backup", 51 "list", 52 "irc", 53 "gnats", 54 "nobody", 55 "systemd-network", 56 "systemd-resolve", 57 "messagebus", 58 "polkitd", 59 "sshd", 60 "entre", 61 "pmurad-admin", 62 "vmail", 63 } 64 65 66 def setup_logging(verbose: bool) -> logging.Logger: 67 log = logging.getLogger("repair_user") 68 log.setLevel(logging.DEBUG if verbose else logging.INFO) 69 log.handlers.clear() 70 handler = logging.StreamHandler(sys.stderr) 71 handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s")) 72 log.addHandler(handler) 73 return log 74 75 76 def require_root(dry_run: bool, log: logging.Logger) -> None: 77 if dry_run: 78 return 79 if os.geteuid() != 0: 80 log.error("execute como root (ou use --dry-run)") 81 raise SystemExit(2) 82 83 84 def user_is_member_candidate(pw: pwd.struct_passwd) -> bool: 85 if pw.pw_name in SKIP_USERS: 86 return False 87 if pw.pw_uid < 1000: 88 return False 89 if not pw.pw_dir.startswith("/home/"): 90 return False 91 return True 92 93 94 def users_from_metadata(path: Path, log: logging.Logger) -> set[str]: 95 if not path.is_file(): 96 log.debug("metadata ausente: %s", path) 97 return set() 98 try: 99 data = json.loads(path.read_text(encoding="utf-8")) 100 except (OSError, json.JSONDecodeError) as e: 101 log.warning("não foi possível ler %s: %s", path, e) 102 return set() 103 if not isinstance(data, list): 104 log.warning("%s não contém lista de membros", path) 105 return set() 106 names: set[str] = set() 107 for item in data: 108 if not isinstance(item, dict): 109 continue 110 username = str(item.get("username", "")).strip() 111 if username and username not in SKIP_USERS: 112 names.add(username) 113 return names 114 115 116 def resolve_users(args: argparse.Namespace, log: logging.Logger) -> list[str]: 117 if args.user: 118 return [args.user.strip()] 119 120 names = users_from_metadata(args.users_json, log) 121 for pw in pwd.getpwall(): 122 if user_is_member_candidate(pw): 123 names.add(pw.pw_name) 124 return sorted(names) 125 126 127 def chmod_chown(path: Path, mode: int, uid: int, gid: int, *, dry_run: bool, log: logging.Logger) -> bool: 128 changed = False 129 st = path.stat() 130 cur_mode = st.st_mode & 0o777 131 if cur_mode != mode: 132 changed = True 133 if dry_run: 134 log.info("[dry-run] chmod %03o %s (era %03o)", mode, path, cur_mode) 135 else: 136 os.chmod(path, mode) 137 log.info("chmod %03o %s", mode, path) 138 if st.st_uid != uid or st.st_gid != gid: 139 changed = True 140 if dry_run: 141 log.info("[dry-run] chown %s:%s %s", uid, gid, path) 142 else: 143 os.chown(path, uid, gid) 144 log.info("chown %s:%s %s", uid, gid, path) 145 return changed 146 147 148 def ensure_dir(path: Path, mode: int, uid: int, gid: int, *, dry_run: bool, log: logging.Logger) -> bool: 149 changed = False 150 if path.exists() and not path.is_dir(): 151 raise RuntimeError(f"{path} existe mas não é diretório") 152 if not path.exists(): 153 changed = True 154 if dry_run: 155 log.info("[dry-run] mkdir -p %s", path) 156 else: 157 path.mkdir(parents=True, exist_ok=True) 158 log.info("criado diretório %s", path) 159 if path.exists(): 160 changed = chmod_chown(path, mode, uid, gid, dry_run=dry_run, log=log) or changed 161 return changed 162 163 164 def ensure_file( 165 path: Path, 166 body: str, 167 mode: int, 168 uid: int, 169 gid: int, 170 *, 171 dry_run: bool, 172 log: logging.Logger, 173 ) -> bool: 174 changed = False 175 if path.exists() and not path.is_file(): 176 raise RuntimeError(f"{path} existe mas não é arquivo regular") 177 if not path.exists(): 178 changed = True 179 if dry_run: 180 log.info("[dry-run] criaria %s", path) 181 else: 182 path.write_text(body, encoding="utf-8") 183 log.info("criado arquivo %s", path) 184 if path.exists(): 185 changed = chmod_chown(path, mode, uid, gid, dry_run=dry_run, log=log) or changed 186 return changed 187 188 189 def repair_one(username: str, *, dry_run: bool, log: logging.Logger) -> tuple[bool, list[str]]: 190 warnings: list[str] = [] 191 pw = pwd.getpwnam(username) 192 uid, gid = pw.pw_uid, pw.pw_gid 193 home = Path(pw.pw_dir) 194 changed = False 195 196 if username in SKIP_USERS: 197 warnings.append("usuário reservado ignorado") 198 return False, warnings 199 if not home.exists(): 200 raise RuntimeError(f"home ausente: {home}") 201 if not home.is_dir(): 202 raise RuntimeError(f"home não é diretório: {home}") 203 if not str(home).startswith("/home/"): 204 raise RuntimeError(f"home fora de /home; reparo recusado: {home}") 205 206 changed = chmod_chown(home, 0o755, uid, gid, dry_run=dry_run, log=log) or changed 207 changed = ensure_dir(home / ".ssh", 0o700, uid, gid, dry_run=dry_run, log=log) or changed 208 auth = home / ".ssh" / "authorized_keys" 209 if auth.exists(): 210 changed = chmod_chown(auth, 0o600, uid, gid, dry_run=dry_run, log=log) or changed 211 else: 212 warnings.append("authorized_keys ausente; não criado porque falta chave pública") 213 214 changed = ensure_dir(home / "public_html", 0o755, uid, gid, dry_run=dry_run, log=log) or changed 215 changed = ensure_file( 216 home / "public_html" / "index.html", 217 create_runv_user.default_index_html(username), 218 0o644, 219 uid, 220 gid, 221 dry_run=dry_run, 222 log=log, 223 ) or changed 224 225 changed = ensure_dir(home / "public_gopher", 0o755, uid, gid, dry_run=dry_run, log=log) or changed 226 changed = ensure_file( 227 home / "public_gopher" / "gophermap", 228 create_runv_user.default_gophermap_text(username), 229 0o644, 230 uid, 231 gid, 232 dry_run=dry_run, 233 log=log, 234 ) or changed 235 236 changed = ensure_dir(home / "public_gemini", 0o755, uid, gid, dry_run=dry_run, log=log) or changed 237 changed = ensure_file( 238 home / "public_gemini" / "index.gmi", 239 create_runv_user.default_gemini_index_gmi(username), 240 0o644, 241 uid, 242 gid, 243 dry_run=dry_run, 244 log=log, 245 ) or changed 246 247 return changed, warnings 248 249 250 def parse_args(argv: list[str] | None = None) -> argparse.Namespace: 251 p = argparse.ArgumentParser( 252 description="Repara diretórios e páginas iniciais ausentes de membros runv.", 253 ) 254 target = p.add_mutually_exclusive_group(required=True) 255 target.add_argument("--user", metavar="USER", help="repara apenas um usuário") 256 target.add_argument("--all-users", action="store_true", help="repara candidatos em users.json e /home") 257 p.add_argument("--users-json", type=Path, default=DEFAULT_USERS_JSON) 258 p.add_argument("--dry-run", action="store_true", help="mostra sem alterar") 259 p.add_argument("--verbose", "-v", action="store_true", help="log detalhado") 260 return p.parse_args(argv) 261 262 263 def main(argv: list[str] | None = None) -> int: 264 args = parse_args(argv) 265 ensure_admin_cli(script_name=Path(__file__).name, dry_run=bool(args.dry_run)) 266 log = setup_logging(args.verbose) 267 require_root(bool(args.dry_run), log) 268 269 users = resolve_users(args, log) 270 if not users: 271 log.info("nenhum usuário candidato encontrado") 272 return 0 273 274 failures = 0 275 changed_count = 0 276 for username in users: 277 log.info("--- reparando %s", username) 278 try: 279 changed, warnings = repair_one(username, dry_run=bool(args.dry_run), log=log) 280 except KeyError: 281 log.warning("%s: ausente em passwd; ignorado", username) 282 continue 283 except Exception as e: 284 failures += 1 285 log.error("%s: falha: %s", username, e) 286 continue 287 for warning in warnings: 288 log.warning("%s: %s", username, warning) 289 if changed: 290 changed_count += 1 291 else: 292 log.info("%s: já estava ok", username) 293 294 print("========== repair_user — resumo ==========") 295 print(f"Modo: {'DRY-RUN' if args.dry_run else 'aplicação'}") 296 print(f"Usuários processados: {len(users)}") 297 print(f"Usuários alterados: {changed_count}") 298 print(f"Falhas: {failures}") 299 print("==========================================") 300 return 1 if failures else 0 301 302 303 if __name__ == "__main__": 304 raise SystemExit(main())