runv-server

server tooling for runv.club
Log | Files | Refs | README

setup_email_aliases.py (8104B)


      1 #!/usr/bin/env python3
      2 """
      3 Prepara diretórios, permissões e grupo para pedidos de alias de email runv.club.
      4 
      5 Não configura Postfix, Dovecot, Mailgun nem DNS.
      6 """
      7 
      8 from __future__ import annotations
      9 
     10 import argparse
     11 import logging
     12 import os
     13 import subprocess
     14 import sys
     15 from pathlib import Path
     16 
     17 _SCRIPT_DIR = Path(__file__).resolve().parent
     18 _REPO_TOOLS_LIB = _SCRIPT_DIR.parent.parent / "tools" / "lib"
     19 if str(_REPO_TOOLS_LIB) not in sys.path:
     20     sys.path.insert(0, str(_REPO_TOOLS_LIB))
     21 
     22 import runv_community as rc  # noqa: E402
     23 
# Unix group used as the default value of the --group option.
DEFAULT_GROUP = "runv-members"
# Base state directory; created with mode 0o755 by apply_permissions().
VAR_LIB_RUNV = Path("/var/lib/runv")
# Alias file; initialised with an empty JSON object ("{}") when missing or blank.
ALIASES_JSON = VAR_LIB_RUNV / "email-aliases.json"
# Lock file; created empty with mode 0o660.
ALIASES_LOCK = VAR_LIB_RUNV / "email-aliases.lock"
# Request queue directory; gets "approved", "rejected" and "cancelled" sub-directories.
QUEUE_DIR = VAR_LIB_RUNV / "email-alias-queue"
# Member list read by add_existing_users() when --add-existing-users is given.
USERS_JSON = VAR_LIB_RUNV / "users.json"
     30 
     31 
     32 def _run(cmd: list[str], *, dry_run: bool, log: logging.Logger) -> subprocess.CompletedProcess[str]:
     33     log.info("exec: %s", " ".join(cmd))
     34     if dry_run:
     35         return subprocess.CompletedProcess(cmd, 0, "", "")
     36     return subprocess.run(cmd, capture_output=True, text=True, timeout=120)
     37 
     38 
     39 def require_root(dry_run: bool) -> None:
     40     if dry_run:
     41         return
     42     geteuid = getattr(os, "geteuid", None)
     43     if geteuid is None or geteuid() != 0:
     44         print("este script precisa ser executado como root (ou use --dry-run).", file=sys.stderr)
     45         raise SystemExit(1)
     46 
     47 
     48 def group_exists(name: str) -> bool:
     49     try:
     50         import grp
     51     except ModuleNotFoundError:
     52         return False
     53     try:
     54         grp.getgrnam(name)
     55         return True
     56     except KeyError:
     57         return False
     58 
     59 
     60 def ensure_group(name: str, *, dry_run: bool, log: logging.Logger) -> None:
     61     if dry_run:
     62         print(f"[dry-run] groupadd {name} (se não existir)")
     63         return
     64     if group_exists(name):
     65         log.info("grupo %s já existe", name)
     66         return
     67     r = _run(["groupadd", name], dry_run=dry_run, log=log)
     68     if dry_run:
     69         print(f"[dry-run] groupadd {name}")
     70         return
     71     if r.returncode != 0:
     72         err = (r.stderr or r.stdout or "").strip()
     73         print(f"groupadd {name} falhou: {err}", file=sys.stderr)
     74         raise SystemExit(1)
     75     log.info("grupo %s criado", name)
     76 
     77 
     78 def ensure_dir(path: Path, mode: int, *, dry_run: bool, log: logging.Logger) -> None:
     79     if dry_run:
     80         print(f"[dry-run] mkdir {path} mode {oct(mode)}")
     81         return
     82     path.mkdir(parents=True, exist_ok=True)
     83     os.chmod(path, mode)
     84     log.info("directório %s (%s)", path, oct(mode))
     85 
     86 
     87 def ensure_file(path: Path, default_content: str, mode: int, *, dry_run: bool, log: logging.Logger) -> None:
     88     if dry_run:
     89         action = "criar" if not path.is_file() else "manter"
     90         print(f"[dry-run] {action} {path}")
     91         return
     92     path.parent.mkdir(parents=True, exist_ok=True)
     93     if not path.is_file():
     94         path.write_text(default_content, encoding="utf-8")
     95         log.info("ficheiro criado: %s", path)
     96     else:
     97         raw = path.read_text(encoding="utf-8").strip()
     98         if not raw:
     99             path.write_text(default_content, encoding="utf-8")
    100             log.info("ficheiro inicializado: %s", path)
    101         else:
    102             log.info("ficheiro existente preservado: %s", path)
    103     os.chmod(path, mode)
    104 
    105 
    106 def chown_path(path: Path, user: str, group: str, *, dry_run: bool, log: logging.Logger) -> None:
    107     if dry_run:
    108         print(f"[dry-run] chown {user}:{group} {path}")
    109         return
    110     import grp
    111     import pwd
    112 
    113     try:
    114         uid = pwd.getpwnam(user).pw_uid
    115         gid = grp.getgrnam(group).gr_gid
    116         os.chown(path, uid, gid)
    117         log.info("chown %s:%s %s", user, group, path)
    118     except (KeyError, OSError) as e:
    119         log.warning("não foi possível chown %s: %s", path, e)
    120 
    121 
    122 def apply_permissions(group: str, *, dry_run: bool, log: logging.Logger) -> None:
    123     ensure_dir(VAR_LIB_RUNV, 0o755, dry_run=dry_run, log=log)
    124     if not dry_run:
    125         try:
    126             os.chown(VAR_LIB_RUNV, 0, 0)
    127         except OSError:
    128             pass
    129 
    130     ensure_file(ALIASES_JSON, "{}\n", 0o640, dry_run=dry_run, log=log)
    131     ensure_file(ALIASES_LOCK, "", 0o660, dry_run=dry_run, log=log)
    132 
    133     for sub in ("", "approved", "rejected", "cancelled"):
    134         d = QUEUE_DIR if not sub else QUEUE_DIR / sub
    135         ensure_dir(d, 0o2770, dry_run=dry_run, log=log)
    136 
    137     if dry_run:
    138         print(f"[dry-run] chown root:{group} em aliases e fila")
    139         return
    140 
    141     chown_path(ALIASES_JSON, "root", group, dry_run=False, log=log)
    142     chown_path(ALIASES_LOCK, "root", group, dry_run=False, log=log)
    143     for sub in ("", "approved", "rejected", "cancelled"):
    144         d = QUEUE_DIR if not sub else QUEUE_DIR / sub
    145         chown_path(d, "root", group, dry_run=False, log=log)
    146 
    147 
    148 def add_existing_users(group: str, *, dry_run: bool, log: logging.Logger) -> None:
    149     if not USERS_JSON.is_file():
    150         print(f"aviso: {USERS_JSON} não encontrado; --add-existing-users ignorado.")
    151         return
    152     names, warning = rc.load_member_usernames(USERS_JSON, rc.DEFAULT_HOME_ROOT)
    153     if warning:
    154         print(warning)
    155     if not names:
    156         print("aviso: nenhum username encontrado em users.json.")
    157         return
    158     import pwd
    159 
    160     for username in names:
    161         if dry_run:
    162             print(f"[dry-run] usermod -aG {group} {username}")
    163             continue
    164         try:
    165             pwd.getpwnam(username)
    166         except KeyError:
    167             print(f"aviso: utilizador Unix {username!r} não existe; ignorado.")
    168             continue
    169         r = _run(["usermod", "-aG", group, username], dry_run=False, log=log)
    170         if r.returncode != 0:
    171             err = (r.stderr or r.stdout or "").strip()
    172             print(f"aviso: usermod -aG {group} {username}: {err}")
    173         else:
    174             log.info("utilizador %s adicionado ao grupo %s", username, group)
    175 
    176 
    177 def print_final_instructions(repo_root: Path) -> None:
    178     tools_dir = repo_root / "tools"
    179     print()
    180     print("Setup de aliases de email concluído.\n")
    181     print("Para instalar comandos:")
    182     print(f"  cd {tools_dir}")
    183     print("  sudo python3 tools.py\n")
    184     print("Para testar como utilizador:")
    185     print("  runv-email-alias request seu-email@exemplo.com")
    186     print("  runv-email-alias status\n")
    187     print("Para admin:")
    188     print("  sudo runv-admin-email-alias pending")
    189     print("  sudo runv-admin-email-alias approve USER\n")
    190     print(
    191         "Se o servidor não usar o grupo runv-members, pode escolher outro com --group NOME."
    192     )
    193 
    194 
    195 def build_parser() -> argparse.ArgumentParser:
    196     p = argparse.ArgumentParser(
    197         description="Prepara /var/lib/runv para pedidos de alias de email (sem MTA).",
    198     )
    199     p.add_argument(
    200         "--dry-run",
    201         action="store_true",
    202         help="mostrar acções sem alterar o sistema",
    203     )
    204     p.add_argument(
    205         "--group",
    206         default=DEFAULT_GROUP,
    207         metavar="NOME",
    208         help=f"grupo Unix com acesso à fila (padrão: {DEFAULT_GROUP})",
    209     )
    210     p.add_argument(
    211         "--add-existing-users",
    212         action="store_true",
    213         help="adicionar usernames de /var/lib/runv/users.json ao grupo",
    214     )
    215     p.add_argument("-v", "--verbose", action="store_true", help="mais detalhe no log")
    216     return p
    217 
    218 
    219 def main(argv: list[str] | None = None) -> int:
    220     args = build_parser().parse_args(argv)
    221     logging.basicConfig(
    222         level=logging.DEBUG if args.verbose else logging.INFO,
    223         format="%(levelname)s: %(message)s",
    224     )
    225     log = logging.getLogger("setup_email_aliases")
    226     require_root(bool(args.dry_run))
    227     group = args.group.strip()
    228     if not group:
    229         print("nome de grupo inválido.", file=sys.stderr)
    230         return 1
    231 
    232     ensure_group(group, dry_run=bool(args.dry_run), log=log)
    233     apply_permissions(group, dry_run=bool(args.dry_run), log=log)
    234     if args.add_existing_users:
    235         add_existing_users(group, dry_run=bool(args.dry_run), log=log)
    236 
    237     repo_root = _SCRIPT_DIR.parent.parent
    238     if not args.dry_run:
    239         print_final_instructions(repo_root)
    240     else:
    241         print("\n[dry-run] nenhuma alteração aplicada.")
    242     return 0
    243 
    244 
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())