runv-server

server tooling for runv.club
Log | Files | Refs | README

configure_msmtp_legacy.py (15630B)


      1 #!/usr/bin/env python3
      2 """
      3 LEGADO — Instalador/configurador runv.club: envio via msmtp + sendmail (Debian 13).
      4 
      5 O caminho predefinido do projeto é Mailgun API (`configure_mailgun.py`).
      6 Use este script apenas se precisar de SMTP local/msmtp.
      7 
      8 Executar como root. Ver docs/08-email.md no repositório.
      9 """
     10 
     11 from __future__ import annotations
     12 
     13 import argparse
     14 import json
     15 import logging
     16 import os
     17 import re
     18 import shutil
     19 import subprocess
     20 import sys
     21 import time
     22 from getpass import getpass
     23 from pathlib import Path
     24 from typing import Any
     25 
# System paths written or read by this installer.
MSMPTRC_PATH = Path("/etc/msmtprc")  # system-wide msmtp configuration
ALIASES_PATH = Path("/etc/msmtp_aliases")  # msmtp-format aliases file
NETRC_PATH = Path("/root/.netrc")  # SMTP credentials (login/password per machine)
STATE_PATH = Path("/etc/runv-email.json")  # non-secret metadata consumed by --test
PASS_SCRIPT_DIR = Path("/usr/local/lib/runv-email")
# Installed copy of the helper referenced by the `passwordeval` line in msmtprc.
PASS_SCRIPT_DEST = PASS_SCRIPT_DIR / "netrc_password.py"
LOGFILE_MSMT = Path("/var/log/msmtp.log")

# Directory containing this script; also used as the import root for lib.mailer.
MODULE_ROOT = Path(__file__).resolve().parent
# admin_guard lives in ../scripts/admin relative to this script, so that
# directory must be on sys.path before the import below can succeed.
ADMIN_DIR = MODULE_ROOT.parent / "scripts" / "admin"
if str(ADMIN_DIR) not in sys.path:
    sys.path.insert(0, str(ADMIN_DIR))

from admin_guard import ensure_admin_cli

# Source copy of the passwordeval helper shipped next to this script.
SOURCE_PASS_SCRIPT = MODULE_ROOT / "scripts" / "netrc_password.py"

# Debian packages installed by apt_install().
APT_PACKAGES = ("msmtp", "msmtp-mta", "ca-certificates", "bsd-mailx")

# Name of the msmtp account block generated in msmtprc (also set as the default).
ACCOUNT_NAME = "runv"
     47 
     48 
     49 def setup_logging(verbose: bool) -> None:
     50     level = logging.DEBUG if verbose else logging.INFO
     51     logging.basicConfig(level=level, format="%(levelname)s: %(message)s")
     52 
     53 
     54 def log() -> logging.Logger:
     55     return logging.getLogger("runv-email-legacy-smtp")
     56 
     57 
     58 def require_root() -> None:
     59     if os.geteuid() != 0:
     60         print("Execute como root (sudo).", file=sys.stderr)
     61         raise SystemExit(1)
     62 
     63 
     64 def run_cmd(
     65     cmd: list,
     66     *,
     67     dry_run: bool,
     68     timeout: int = 600,
     69 ) -> subprocess.CompletedProcess[str] | None:
     70     log().debug("exec: %s", " ".join(cmd))
     71     if dry_run:
     72         log().info("[dry-run] %s", " ".join(cmd))
     73         return None
     74     return subprocess.run(
     75         cmd,
     76         capture_output=True,
     77         text=True,
     78         timeout=timeout,
     79     )
     80 
     81 
     82 def apt_install(dry_run: bool) -> None:
     83     r = run_cmd(["apt-get", "update", "-qq"], dry_run=dry_run)
     84     if r is not None and r.returncode != 0:
     85         log().warning("apt-get update: código %s — %s", r.returncode, r.stderr.strip())
     86     r2 = run_cmd(
     87         ["apt-get", "install", "-y", *APT_PACKAGES],
     88         dry_run=dry_run,
     89     )
     90     if r2 is not None and r2.returncode != 0:
     91         raise RuntimeError(f"apt-get install falhou: {r2.stderr or r2.stdout}")
     92 
     93 
     94 def backup_if_exists(path: Path, *, dry_run: bool, force: bool) -> Path | None:
     95     if not path.is_file():
     96         return None
     97     bak = path.with_name(f"{path.name}.bak.{int(time.time())}")
     98     if dry_run:
     99         log().info("[dry-run] backup seria: %s -> %s", path, bak)
    100         return bak
    101     shutil.copy2(path, bak)
    102     log().info("Backup: %s", bak)
    103     return bak
    104 
    105 
    106 def confirm_overwrite(path: Path, *, force: bool) -> bool:
    107     if force:
    108         return True
    109     if not path.is_file():
    110         return True
    111     r = input(f"O ficheiro {path} já existe. Sobrescrever? [s/N]: ").strip().lower()
    112     return r in ("s", "sim", "y", "yes")
    113 
    114 
    115 def _remove_netrc_machine_block(text: str, host: str) -> str:
    116     """Remove o bloco que começa em 'machine <host>' até à linha antes do próximo 'machine '."""
    117     host_line = re.compile(rf"^machine\s+{re.escape(host)}\s*$", re.MULTILINE)
    118     next_machine = re.compile(r"^machine\s+", re.MULTILINE)
    119     lines = text.splitlines()
    120     out = []
    121     i = 0
    122     while i < len(lines):
    123         if host_line.match(lines[i]):
    124             i += 1
    125             while i < len(lines) and not next_machine.match(lines[i]):
    126                 i += 1
    127             continue
    128         out.append(lines[i])
    129         i += 1
    130     return "\n".join(out)
    131 
    132 
    133 def upsert_netrc_machine(host: str, login: str, password: str, *, dry_run: bool) -> None:
    134     """Atualiza ou acrescenta bloco machine HOST em /root/.netrc."""
    135     block = f"machine {host}\nlogin {login}\npassword {password}\n"
    136     if dry_run:
    137         log().info("[dry-run] atualizaria .netrc para machine %s", host)
    138         return
    139 
    140     existing = ""
    141     if NETRC_PATH.is_file():
    142         existing = NETRC_PATH.read_text(encoding="utf-8", errors="replace")
    143 
    144     stripped = _remove_netrc_machine_block(existing, host).rstrip()
    145     new_text = (stripped + "\n\n" + block if stripped else block).rstrip() + "\n"
    146 
    147     NETRC_PATH.parent.mkdir(parents=True, exist_ok=True)
    148     NETRC_PATH.write_text(new_text, encoding="utf-8")
    149     os.chmod(NETRC_PATH, 0o600)
    150     try:
    151         os.chown(NETRC_PATH, 0, 0)
    152     except OSError:
    153         pass
    154     log().info("Escrito %s (0600)", NETRC_PATH)
    155 
    156 
    157 def install_passwordeval_script(*, dry_run: bool) -> None:
    158     if not SOURCE_PASS_SCRIPT.is_file():
    159         raise FileNotFoundError(f"script em falta no módulo: {SOURCE_PASS_SCRIPT}")
    160     if dry_run:
    161         log().info("[dry-run] copiaria netrc_password.py para %s", PASS_SCRIPT_DEST)
    162         return
    163     PASS_SCRIPT_DIR.mkdir(parents=True, mode=0o755, exist_ok=True)
    164     shutil.copy2(SOURCE_PASS_SCRIPT, PASS_SCRIPT_DEST)
    165     PASS_SCRIPT_DEST.chmod(0o755)
    166     try:
    167         os.chown(PASS_SCRIPT_DEST, 0, 0)
    168     except OSError:
    169         pass
    170     log().info("Instalado %s", PASS_SCRIPT_DEST)
    171 
    172 
    173 def build_msmtprc(
    174     *,
    175     host: str,
    176     port: int,
    177     tls_on: bool,
    178     starttls_on: bool,
    179     auth_on: bool,
    180     user: str,
    181     default_from: str,
    182     use_aliases: bool,
    183 ) -> str:
    184     lines = [
    185         "# Gerido por runv.club configure_msmtp_legacy.py — não editar à mão sem cópia de segurança",
    186         "",
    187         "defaults",
    188         f"tls_trust_file /etc/ssl/certs/ca-certificates.crt",
    189         f"logfile {LOGFILE_MSMT}",
    190         "",
    191         f"account {ACCOUNT_NAME}",
    192         f"host {host}",
    193         f"port {port}",
    194         f"from {default_from}",
    195         "tls           " + ("on" if tls_on else "off"),
    196         "tls_starttls  " + ("on" if starttls_on else "off"),
    197     ]
    198     if auth_on and user:
    199         lines.append("auth on")
    200         lines.append(f"user {user}")
    201         lines.append(f"passwordeval {PASS_SCRIPT_DEST} {host}")
    202     else:
    203         lines.append("auth off")
    204 
    205     if use_aliases:
    206         lines.append(f"aliases {ALIASES_PATH}")
    207 
    208     lines.extend(
    209         [
    210             "",
    211             f"account default : {ACCOUNT_NAME}",
    212             "",
    213         ]
    214     )
    215     return "\n".join(lines)
    216 
    217 
    218 def write_msmtprc(content: str, *, dry_run: bool) -> None:
    219     if dry_run:
    220         log().info("[dry-run] escreveria %s", MSMPTRC_PATH)
    221         log().debug("%s", content)
    222         return
    223     MSMPTRC_PATH.write_text(content, encoding="utf-8")
    224     os.chmod(MSMPTRC_PATH, 0o600)
    225     try:
    226         os.chown(MSMPTRC_PATH, 0, 0)
    227     except OSError:
    228         pass
    229     log().info("Escrito %s (0600)", MSMPTRC_PATH)
    230 
    231 
    232 def write_aliases(admin_email: str, *, dry_run: bool) -> None:
    233     body = (
    234         f"# Gerido por runv.club configure_msmtp_legacy.py — formato msmtp (não Sendmail)\n"
    235         f"root: {admin_email}\n"
    236         f"cron: {admin_email}\n"
    237         f"default: {admin_email}\n"
    238     )
    239     if dry_run:
    240         log().info("[dry-run] escreveria %s", ALIASES_PATH)
    241         return
    242     backup_if_exists(ALIASES_PATH, dry_run=False, force=True)
    243     ALIASES_PATH.write_text(body, encoding="utf-8")
    244     os.chmod(ALIASES_PATH, 0o644)
    245     try:
    246         os.chown(ALIASES_PATH, 0, 0)
    247     except OSError:
    248         pass
    249     log().info("Escrito %s (0644)", ALIASES_PATH)
    250 
    251 
    252 def write_state(data: dict[str, Any], *, dry_run: bool) -> None:
    253     if dry_run:
    254         log().info("[dry-run] escreveria %s", STATE_PATH)
    255         return
    256     STATE_PATH.write_text(
    257         json.dumps(data, indent=2, ensure_ascii=False) + "\n",
    258         encoding="utf-8",
    259     )
    260     os.chmod(STATE_PATH, 0o600)
    261     try:
    262         os.chown(STATE_PATH, 0, 0)
    263     except OSError:
    264         pass
    265     log().info("Metadados em %s (sem segredos SMTP em texto claro — use .netrc)", STATE_PATH)
    266 
    267 
    268 def touch_logfile(*, dry_run: bool) -> None:
    269     if dry_run:
    270         return
    271     LOGFILE_MSMT.parent.mkdir(parents=True, exist_ok=True)
    272     if not LOGFILE_MSMT.exists():
    273         LOGFILE_MSMT.touch(mode=0o640)
    274     try:
    275         os.chown(LOGFILE_MSMT, 0, 0)
    276     except OSError:
    277         pass
    278 
    279 
    280 def load_state() -> dict[str, Any]:
    281     if not STATE_PATH.is_file():
    282         raise FileNotFoundError(
    283             f"Estado não encontrado: {STATE_PATH}. Execute configure_msmtp_legacy.py sem --test primeiro.",
    284         )
    285     return json.loads(STATE_PATH.read_text(encoding="utf-8"))
    286 
    287 
    288 def run_test_send(*, dry_run: bool) -> None:
    289     state = load_state()
    290     admin = str(state.get("admin_email", "")).strip()
    291     from_addr = str(state.get("default_from", "")).strip()
    292     if not admin or not from_addr:
    293         raise ValueError("admin_email ou default_from em falta no estado")
    294 
    295     sys.path.insert(0, str(MODULE_ROOT))
    296     from lib.mailer import render_template, send_mail  # type: ignore
    297 
    298     body = render_template(
    299         "system_test",
    300         admin_email=admin,
    301         default_from=from_addr,
    302         host=state.get("smtp_host", ""),
    303         api_base_url="(modo SMTP legado — não aplicável)",
    304         timestamp=str(int(time.time())),
    305     )
    306     subj = "[runv.club] Email de teste do sistema (SMTP legado)"
    307     if dry_run:
    308         log().info("[dry-run] enviaria teste para %s", admin)
    309         return
    310     send_mail(admin, subj, body, from_addr=from_addr, _state=state)
    311     log().info("Email de teste enviado para %s", admin)
    312 
    313 
    314 def prompt_yes_no(msg: str, default_no: bool = True) -> bool:
    315     suf = " [s/N]: " if default_no else " [S/n]: "
    316     r = input(msg + suf).strip().lower()
    317     if not r:
    318         return not default_no
    319     return r in ("s", "sim", "y", "yes")
    320 
    321 
    322 def prompt_line(msg: str, default: str = "") -> str:
    323     d = f" [{default}]" if default else ""
    324     r = input(f"{msg}{d}: ").strip()
    325     return r if r else default
    326 
    327 
    328 def interactive_config() -> dict[str, Any]:
    329     print("\n=== LEGADO: Configuração SMTP (msmtp + sendmail) ===\n")
    330     print("Nota: o caminho recomendado é Mailgun API (configure_mailgun.py).\n")
    331     host = prompt_line("Host SMTP")
    332     if not host:
    333         raise ValueError("Host SMTP obrigatório.")
    334 
    335     port_s = prompt_line("Porta SMTP", "587")
    336     port = int(port_s) if port_s.isdigit() else 587
    337 
    338     tls_on = prompt_yes_no("Usar TLS (tls)?", default_no=False)
    339     starttls_on = prompt_yes_no("Usar STARTTLS (tls_starttls)?", default_no=False)
    340     auth_on = prompt_yes_no("Autenticação SMTP (usuário/senha)?", default_no=False)
    341 
    342     user = ""
    343     if auth_on:
    344         user = prompt_line("Utilizador SMTP (login)")
    345         if not user:
    346             raise ValueError("Com auth on, o utilizador SMTP é obrigatório.")
    347 
    348     default_from = prompt_line("Remetente padrão (From)")
    349     if not default_from or "@" not in default_from:
    350         raise ValueError("Remetente (From) deve ser um endereço de email válido.")
    351 
    352     admin_email = prompt_line("Email do administrador (notificações)")
    353     if not admin_email or "@" not in admin_email:
    354         raise ValueError("Email do admin inválido.")
    355 
    356     password = ""
    357     if auth_on:
    358         p1 = getpass("Senha ou token SMTP (não ecoa): ")
    359         p2 = getpass("Repita a senha: ")
    360         if p1 != p2:
    361             raise ValueError("Senhas não coincidem.")
    362         password = p1
    363 
    364     return {
    365         "smtp_host": host,
    366         "smtp_port": port,
    367         "tls_on": tls_on,
    368         "starttls_on": starttls_on,
    369         "auth_on": auth_on,
    370         "smtp_user": user,
    371         "smtp_password": password,
    372         "default_from": default_from,
    373         "admin_email": admin_email,
    374     }
    375 
    376 
def main() -> int:
    """Command-line entry point: install and configure msmtp, or send a test mail.

    Returns:
        0 on success, 1 when the user declines an overwrite or any error is
        raised, and 130 when interrupted (Ctrl-C or end of input).
    """
    parser = argparse.ArgumentParser(
        description="LEGADO: instala msmtp/sendmail e configura SMTP runv.club.",
    )
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--verbose", "-v", action="store_true")
    parser.add_argument("--force", "-f", action="store_true", help="sobrescrever sem perguntar")
    parser.add_argument(
        "--test",
        action="store_true",
        help="enviar apenas email de teste (requer config e %s)" % STATE_PATH,
    )
    parser.add_argument("--skip-apt", action="store_true", help="não executar apt-get")
    args = parser.parse_args()
    # NOTE(review): ensure_admin_cli comes from scripts/admin/admin_guard and is
    # not visible here — presumably it enforces an admin-only policy; confirm.
    ensure_admin_cli(
        script_name=Path(__file__).name,
        dry_run=bool(args.dry_run),
    )

    setup_logging(args.verbose)
    require_root()

    try:
        # --test short-circuits everything else and relies on the saved state.
        if args.test:
            run_test_send(dry_run=args.dry_run)
            print("Teste concluído.")
            return 0

        if not args.skip_apt:
            apt_install(args.dry_run)

        touch_logfile(dry_run=args.dry_run)
        install_passwordeval_script(dry_run=args.dry_run)

        cfg = interactive_config()

        # msmtprc: confirm, then back up the current file before it is replaced.
        if not confirm_overwrite(MSMPTRC_PATH, force=args.force):
            print("Cancelado.")
            return 1
        backup_if_exists(MSMPTRC_PATH, dry_run=args.dry_run, force=args.force)

        # Credentials go into .netrc only; msmtprc fetches them via passwordeval.
        if cfg["auth_on"]:
            if not cfg.get("smtp_password"):
                raise ValueError("Com autenticação ligada, a senha/token é obrigatório.")
            if not confirm_overwrite(NETRC_PATH, force=args.force):
                print("Cancelado.")
                return 1
            backup_if_exists(NETRC_PATH, dry_run=args.dry_run, force=args.force)
            upsert_netrc_machine(
                cfg["smtp_host"],
                cfg["smtp_user"],
                cfg["smtp_password"],
                dry_run=args.dry_run,
            )

        mc = build_msmtprc(
            host=cfg["smtp_host"],
            port=int(cfg["smtp_port"]),
            tls_on=bool(cfg["tls_on"]),
            starttls_on=bool(cfg["starttls_on"]),
            auth_on=bool(cfg["auth_on"]),
            user=cfg["smtp_user"],
            default_from=cfg["default_from"],
            use_aliases=True,
        )
        write_msmtprc(mc, dry_run=args.dry_run)

        # write_aliases makes its own backup of an existing aliases file.
        if not confirm_overwrite(ALIASES_PATH, force=args.force):
            print("Cancelado.")
            return 1
        write_aliases(cfg["admin_email"], dry_run=args.dry_run)

        # Only non-secret settings are persisted; the password is left out.
        state_public: dict[str, Any] = {
            "backend": "sendmail",
            "provider": "smtp_msmtp",
            "email_package_root": str(MODULE_ROOT),
            "admin_email": cfg["admin_email"],
            "default_from": cfg["default_from"],
            "smtp_host": cfg["smtp_host"],
            "smtp_port": cfg["smtp_port"],
        }
        write_state(state_public, dry_run=args.dry_run)

        # Optional immediate test; a failure here is reported but not fatal.
        if not args.dry_run and prompt_yes_no("\nEnviar email de teste agora?", default_no=True):
            try:
                run_test_send(dry_run=False)
                log().info("Teste enviado.")
            except Exception as e:
                log().warning("Teste falhou (config pode estar correta mesmo assim): %s", e)

        print("\n=== Resumo (backend legado: SMTP / sendmail) ===")
        print(f"  msmtp:     {MSMPTRC_PATH}")
        print(f"  aliases:   {ALIASES_PATH}")
        print(f"  netrc:     {NETRC_PATH} (credenciais — não partilhar)")
        print(f"  estado:    {STATE_PATH}")
        print(f"  sendmail:  /usr/sbin/sendmail (msmtp-mta)")
        print("\nDocumentação: docs/08-email.md (repositório)")
        print("Teste posterior: sudo python3 email/configure_msmtp_legacy.py --test")
        print("Mailgun (recomendado): sudo python3 email/configure_mailgun.py")
        return 0

    except (KeyboardInterrupt, EOFError):
        # Ctrl-C or closed stdin during a prompt.
        print("\nInterrompido.", file=sys.stderr)
        return 130
    except Exception as e:
        # Top-level boundary: report the error and signal failure via exit code.
        log().error("%s", e)
        return 1
    484 
    485 
# Run only when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())