runv-server

server tooling for runv.club
Log | Files | Refs | README

repair_user.py (9487B)


      1 #!/usr/bin/env python3
      2 """
      3 Repara artefatos básicos de membros runv sem sobrescrever conteúdo existente.
      4 
      5 Uso típico:
      6   sudo python3 scripts/admin/repair_user.py --user kirihito
      7   sudo python3 scripts/admin/repair_user.py --all-users --dry-run --verbose
      8 
      9 O script é intencionalmente conservador:
     10 - corrige dono/modo da home e dos diretórios públicos esperados;
     11 - cria diretórios ausentes;
     12 - cria index.html, gophermap e index.gmi somente se estiverem ausentes;
     13 - não faz chown recursivo e não toca em /var/vmail, email, Dovecot ou Maildir.
     14 """
     15 
     16 from __future__ import annotations
     17 
     18 import argparse
     19 import json
     20 import logging
     21 import os
     22 import pwd
     23 import sys
     24 from pathlib import Path
     25 from typing import Final
     26 
     27 _SCRIPT_DIR = Path(__file__).resolve().parent
     28 if str(_SCRIPT_DIR) not in sys.path:
     29     sys.path.insert(0, str(_SCRIPT_DIR))
     30 
     31 from admin_guard import ensure_admin_cli
     32 
     33 import create_runv_user
     34 
     35 DEFAULT_USERS_JSON: Final[Path] = Path("/var/lib/runv/users.json")
     36 SKIP_USERS: Final[set[str]] = {
     37     "root",
     38     "daemon",
     39     "bin",
     40     "sys",
     41     "sync",
     42     "games",
     43     "man",
     44     "lp",
     45     "mail",
     46     "news",
     47     "uucp",
     48     "proxy",
     49     "www-data",
     50     "backup",
     51     "list",
     52     "irc",
     53     "gnats",
     54     "nobody",
     55     "systemd-network",
     56     "systemd-resolve",
     57     "messagebus",
     58     "polkitd",
     59     "sshd",
     60     "entre",
     61     "pmurad-admin",
     62     "vmail",
     63 }
     64 
     65 
     66 def setup_logging(verbose: bool) -> logging.Logger:
     67     log = logging.getLogger("repair_user")
     68     log.setLevel(logging.DEBUG if verbose else logging.INFO)
     69     log.handlers.clear()
     70     handler = logging.StreamHandler(sys.stderr)
     71     handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
     72     log.addHandler(handler)
     73     return log
     74 
     75 
     76 def require_root(dry_run: bool, log: logging.Logger) -> None:
     77     if dry_run:
     78         return
     79     if os.geteuid() != 0:
     80         log.error("execute como root (ou use --dry-run)")
     81         raise SystemExit(2)
     82 
     83 
     84 def user_is_member_candidate(pw: pwd.struct_passwd) -> bool:
     85     if pw.pw_name in SKIP_USERS:
     86         return False
     87     if pw.pw_uid < 1000:
     88         return False
     89     if not pw.pw_dir.startswith("/home/"):
     90         return False
     91     return True
     92 
     93 
     94 def users_from_metadata(path: Path, log: logging.Logger) -> set[str]:
     95     if not path.is_file():
     96         log.debug("metadata ausente: %s", path)
     97         return set()
     98     try:
     99         data = json.loads(path.read_text(encoding="utf-8"))
    100     except (OSError, json.JSONDecodeError) as e:
    101         log.warning("não foi possível ler %s: %s", path, e)
    102         return set()
    103     if not isinstance(data, list):
    104         log.warning("%s não contém lista de membros", path)
    105         return set()
    106     names: set[str] = set()
    107     for item in data:
    108         if not isinstance(item, dict):
    109             continue
    110         username = str(item.get("username", "")).strip()
    111         if username and username not in SKIP_USERS:
    112             names.add(username)
    113     return names
    114 
    115 
    116 def resolve_users(args: argparse.Namespace, log: logging.Logger) -> list[str]:
    117     if args.user:
    118         return [args.user.strip()]
    119 
    120     names = users_from_metadata(args.users_json, log)
    121     for pw in pwd.getpwall():
    122         if user_is_member_candidate(pw):
    123             names.add(pw.pw_name)
    124     return sorted(names)
    125 
    126 
    127 def chmod_chown(path: Path, mode: int, uid: int, gid: int, *, dry_run: bool, log: logging.Logger) -> bool:
    128     changed = False
    129     st = path.stat()
    130     cur_mode = st.st_mode & 0o777
    131     if cur_mode != mode:
    132         changed = True
    133         if dry_run:
    134             log.info("[dry-run] chmod %03o %s (era %03o)", mode, path, cur_mode)
    135         else:
    136             os.chmod(path, mode)
    137             log.info("chmod %03o %s", mode, path)
    138     if st.st_uid != uid or st.st_gid != gid:
    139         changed = True
    140         if dry_run:
    141             log.info("[dry-run] chown %s:%s %s", uid, gid, path)
    142         else:
    143             os.chown(path, uid, gid)
    144             log.info("chown %s:%s %s", uid, gid, path)
    145     return changed
    146 
    147 
    148 def ensure_dir(path: Path, mode: int, uid: int, gid: int, *, dry_run: bool, log: logging.Logger) -> bool:
    149     changed = False
    150     if path.exists() and not path.is_dir():
    151         raise RuntimeError(f"{path} existe mas não é diretório")
    152     if not path.exists():
    153         changed = True
    154         if dry_run:
    155             log.info("[dry-run] mkdir -p %s", path)
    156         else:
    157             path.mkdir(parents=True, exist_ok=True)
    158             log.info("criado diretório %s", path)
    159     if path.exists():
    160         changed = chmod_chown(path, mode, uid, gid, dry_run=dry_run, log=log) or changed
    161     return changed
    162 
    163 
    164 def ensure_file(
    165     path: Path,
    166     body: str,
    167     mode: int,
    168     uid: int,
    169     gid: int,
    170     *,
    171     dry_run: bool,
    172     log: logging.Logger,
    173 ) -> bool:
    174     changed = False
    175     if path.exists() and not path.is_file():
    176         raise RuntimeError(f"{path} existe mas não é arquivo regular")
    177     if not path.exists():
    178         changed = True
    179         if dry_run:
    180             log.info("[dry-run] criaria %s", path)
    181         else:
    182             path.write_text(body, encoding="utf-8")
    183             log.info("criado arquivo %s", path)
    184     if path.exists():
    185         changed = chmod_chown(path, mode, uid, gid, dry_run=dry_run, log=log) or changed
    186     return changed
    187 
    188 
    189 def repair_one(username: str, *, dry_run: bool, log: logging.Logger) -> tuple[bool, list[str]]:
    190     warnings: list[str] = []
    191     pw = pwd.getpwnam(username)
    192     uid, gid = pw.pw_uid, pw.pw_gid
    193     home = Path(pw.pw_dir)
    194     changed = False
    195 
    196     if username in SKIP_USERS:
    197         warnings.append("usuário reservado ignorado")
    198         return False, warnings
    199     if not home.exists():
    200         raise RuntimeError(f"home ausente: {home}")
    201     if not home.is_dir():
    202         raise RuntimeError(f"home não é diretório: {home}")
    203     if not str(home).startswith("/home/"):
    204         raise RuntimeError(f"home fora de /home; reparo recusado: {home}")
    205 
    206     changed = chmod_chown(home, 0o755, uid, gid, dry_run=dry_run, log=log) or changed
    207     changed = ensure_dir(home / ".ssh", 0o700, uid, gid, dry_run=dry_run, log=log) or changed
    208     auth = home / ".ssh" / "authorized_keys"
    209     if auth.exists():
    210         changed = chmod_chown(auth, 0o600, uid, gid, dry_run=dry_run, log=log) or changed
    211     else:
    212         warnings.append("authorized_keys ausente; não criado porque falta chave pública")
    213 
    214     changed = ensure_dir(home / "public_html", 0o755, uid, gid, dry_run=dry_run, log=log) or changed
    215     changed = ensure_file(
    216         home / "public_html" / "index.html",
    217         create_runv_user.default_index_html(username),
    218         0o644,
    219         uid,
    220         gid,
    221         dry_run=dry_run,
    222         log=log,
    223     ) or changed
    224 
    225     changed = ensure_dir(home / "public_gopher", 0o755, uid, gid, dry_run=dry_run, log=log) or changed
    226     changed = ensure_file(
    227         home / "public_gopher" / "gophermap",
    228         create_runv_user.default_gophermap_text(username),
    229         0o644,
    230         uid,
    231         gid,
    232         dry_run=dry_run,
    233         log=log,
    234     ) or changed
    235 
    236     changed = ensure_dir(home / "public_gemini", 0o755, uid, gid, dry_run=dry_run, log=log) or changed
    237     changed = ensure_file(
    238         home / "public_gemini" / "index.gmi",
    239         create_runv_user.default_gemini_index_gmi(username),
    240         0o644,
    241         uid,
    242         gid,
    243         dry_run=dry_run,
    244         log=log,
    245     ) or changed
    246 
    247     return changed, warnings
    248 
    249 
    250 def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    251     p = argparse.ArgumentParser(
    252         description="Repara diretórios e páginas iniciais ausentes de membros runv.",
    253     )
    254     target = p.add_mutually_exclusive_group(required=True)
    255     target.add_argument("--user", metavar="USER", help="repara apenas um usuário")
    256     target.add_argument("--all-users", action="store_true", help="repara candidatos em users.json e /home")
    257     p.add_argument("--users-json", type=Path, default=DEFAULT_USERS_JSON)
    258     p.add_argument("--dry-run", action="store_true", help="mostra sem alterar")
    259     p.add_argument("--verbose", "-v", action="store_true", help="log detalhado")
    260     return p.parse_args(argv)
    261 
    262 
    263 def main(argv: list[str] | None = None) -> int:
    264     args = parse_args(argv)
    265     ensure_admin_cli(script_name=Path(__file__).name, dry_run=bool(args.dry_run))
    266     log = setup_logging(args.verbose)
    267     require_root(bool(args.dry_run), log)
    268 
    269     users = resolve_users(args, log)
    270     if not users:
    271         log.info("nenhum usuário candidato encontrado")
    272         return 0
    273 
    274     failures = 0
    275     changed_count = 0
    276     for username in users:
    277         log.info("--- reparando %s", username)
    278         try:
    279             changed, warnings = repair_one(username, dry_run=bool(args.dry_run), log=log)
    280         except KeyError:
    281             log.warning("%s: ausente em passwd; ignorado", username)
    282             continue
    283         except Exception as e:
    284             failures += 1
    285             log.error("%s: falha: %s", username, e)
    286             continue
    287         for warning in warnings:
    288             log.warning("%s: %s", username, warning)
    289         if changed:
    290             changed_count += 1
    291         else:
    292             log.info("%s: já estava ok", username)
    293 
    294     print("========== repair_user — resumo ==========")
    295     print(f"Modo: {'DRY-RUN' if args.dry_run else 'aplicação'}")
    296     print(f"Usuários processados: {len(users)}")
    297     print(f"Usuários alterados: {changed_count}")
    298     print(f"Falhas: {failures}")
    299     print("==========================================")
    300     return 1 if failures else 0
    301 
    302 
    303 if __name__ == "__main__":
    304     raise SystemExit(main())