runv-server

server tooling for runv.club
Log | Files | Refs | README

mailgun_client.py (9911B)


      1 #!/usr/bin/env python3
      2 """
      3 Cliente HTTP Mailgun (API de envio) — stdlib apenas.
      4 
      5 Basic Auth: utilizador fixo ``api``, palavra-passe = API key.
      6 Documentação: https://documentation.mailgun.com/en/latest/api-sending.html
      7 """
      8 
      9 from __future__ import annotations
     10 
     11 import base64
     12 import json
     13 import os
     14 import re
     15 import ssl
     16 import urllib.error
     17 import urllib.parse
     18 import urllib.request
     19 from pathlib import Path
     20 from typing import Any, Final, Mapping, Sequence
     21 
     22 DEFAULT_STATE_PATH = Path("/etc/runv-email.json")
     23 DEFAULT_SECRETS_PATH = Path("/etc/runv-email.secrets.json")
     24 
     25 # Placeholder neutro para testes/documentação — nunca credenciais reais.
     26 EXAMPLE_DOMAIN: Final[str] = "example.com"
     27 
     28 _REGIONS: Final[frozenset[str]] = frozenset({"us", "eu"})
     29 
     30 # Domínio verificável: hostname ou subdomínio típico Mailgun (mg.example.com)
     31 _DOMAIN_RE: Final[re.Pattern[str]] = re.compile(
     32     r"^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$"
     33 )
     34 
     35 
     36 class MailgunConfigError(ValueError):
     37     """Configuração inválida ou incompleta."""
     38 
     39 
     40 class MailgunHTTPError(RuntimeError):
     41     """Resposta HTTP não-sucesso da API Mailgun."""
     42 
     43     def __init__(self, message: str, *, status: int, body_snippet: str) -> None:
     44         super().__init__(message)
     45         self.status = status
     46         self.body_snippet = body_snippet
     47 
     48 
     49 def mailgun_base_url(region: str) -> str:
     50     """
     51     URL base da API (sem ``/v3/...``) para a região escolhida.
     52     ``region`` deve ser ``us`` ou ``eu`` (minúsculas).
     53     """
     54     r = region.strip().lower()
     55     if r not in _REGIONS:
     56         raise MailgunConfigError(f"região inválida: {region!r} (use 'us' ou 'eu')")
     57     if r == "eu":
     58         return "https://api.eu.mailgun.net"
     59     return "https://api.mailgun.net"
     60 
     61 
     62 def build_mailgun_messages_url(*, base_url: str, domain: str) -> str:
     63     """URL completa ``POST .../v3/{domain}/messages``."""
     64     b = base_url.rstrip("/")
     65     d = domain.strip().lower()
     66     if not d:
     67         raise MailgunConfigError("domínio Mailgun vazio")
     68     return f"{b}/v3/{urllib.parse.quote(d, safe='.')}/messages"
     69 
     70 
     71 def mask_secret(value: str | None, *, visible_tail: int = 4) -> str:
     72     """Mascara segredos para logs ou mensagens de diagnóstico."""
     73     if value is None:
     74         return "(não definido)"
     75     s = value.strip()
     76     if not s:
     77         return "(vazio)"
     78     if len(s) <= visible_tail + 3:
     79         return "***"
     80     return s[:3] + "…" + s[-visible_tail:]  # type: ignore
     81 
     82 
     83 def validate_mailgun_inputs(
     84     *,
     85     domain: str,
     86     region: str,
     87     from_addr: str,
     88     admin_email: str,
     89     api_key: str,
     90 ) -> dict[str, str]:
     91     """
     92     Valida entradas interactivas / ficheiro. Devolve dict normalizado
     93     (domain, region, from_addr, admin_email) — não devolve a key.
     94     """
     95     core = validate_mailgun_send_fields(
     96         domain=domain,
     97         region=region,
     98         from_addr=from_addr,
     99         api_key=api_key,
    100     )
    101     ad = admin_email.strip()
    102     if not ad or "@" not in ad:
    103         raise MailgunConfigError("email do administrador inválido.")
    104     return {**core, "admin_email": ad}
    105 
    106 
    107 def validate_mailgun_send_fields(
    108     *,
    109     domain: str,
    110     region: str,
    111     from_addr: str,
    112     api_key: str,
    113 ) -> dict[str, str]:
    114     """Valida domínio, região, From e API key (envio em tempo de execução)."""
    115     d = domain.strip().lower()
    116     if not d:
    117         raise MailgunConfigError("domínio de envio obrigatório (não pode estar vazio).")
    118     if not _DOMAIN_RE.match(d):
    119         raise MailgunConfigError(
    120             f"domínio inválido: {domain!r} — use um hostname FQDN (ex.: {EXAMPLE_DOMAIN}).",
    121         )
    122 
    123     r = region.strip().lower()
    124     mailgun_base_url(r)  # valida região
    125 
    126     fa = from_addr.strip()
    127     if not fa or "@" not in fa:
    128         raise MailgunConfigError("remetente (From) deve ser um endereço de email válido.")
    129 
    130     key = api_key.strip()
    131     if not key:
    132         raise MailgunConfigError("API key Mailgun obrigatória (não pode estar vazia).")
    133 
    134     return {
    135         "domain": d,
    136         "region": r,
    137         "from_addr": fa,
    138     }
    139 
    140 
    141 def state_path() -> Path:
    142     raw = os.environ.get("RUNV_EMAIL_STATE_PATH", "").strip()
    143     return Path(raw) if raw else DEFAULT_STATE_PATH
    144 
    145 
    146 def secrets_path_from_state(public: Mapping[str, Any]) -> Path:
    147     raw = str(public.get("secrets_path") or "").strip()
    148     if raw:
    149         return Path(raw)
    150     raw_env = os.environ.get("RUNV_EMAIL_SECRETS_PATH", "").strip()
    151     if raw_env:
    152         return Path(raw_env)
    153     return DEFAULT_SECRETS_PATH
    154 
    155 
    156 def load_public_state(path: Path | None = None) -> dict[str, Any]:
    157     p = path or state_path()
    158     if not p.is_file():
    159         raise FileNotFoundError(
    160             f"Estado de email não encontrado: {p}. Execute o configurador Mailgun.",
    161         )
    162     return json.loads(p.read_text(encoding="utf-8"))
    163 
    164 
    165 def load_mailgun_api_key(public: Mapping[str, Any]) -> tuple[str, str]:
    166     """
    167     Carrega API key. Ordem: ``RUNV_MAILGUN_API_KEY``, depois ficheiro de segredos.
    168     Devolve (api_key, fonte_descritiva) — fonte nunca contém a key.
    169     """
    170     env_key = os.environ.get("RUNV_MAILGUN_API_KEY", "").strip()
    171     if env_key:
    172         return env_key, "environment"
    173 
    174     sp = secrets_path_from_state(public)
    175     if not sp.is_file():
    176         raise MailgunConfigError(
    177             f"API key em falta: defina RUNV_MAILGUN_API_KEY ou crie {sp} (0600) com mailgun_api_key.",
    178         )
    179     try:
    180         sec = json.loads(sp.read_text(encoding="utf-8"))
    181     except json.JSONDecodeError as e:
    182         raise MailgunConfigError(f"ficheiro de segredos JSON inválido: {sp}: {e}") from e
    183     key = str(sec.get("mailgun_api_key", "")).strip()
    184     if not key:
    185         raise MailgunConfigError(f"mailgun_api_key vazio em {sp}")
    186     return key, f"file:{sp}"
    187 
    188 
    189 def build_mailgun_runtime_config(public: Mapping[str, Any]) -> dict[str, Any]:
    190     """Junta estado público + key (em memória) para envio."""
    191     if public.get("backend") == "sendmail":
    192         raise MailgunConfigError("estado explícito backend=sendmail — não usar Mailgun")
    193     domain = str(public.get("mailgun_domain", "")).strip()
    194     region = str(public.get("mailgun_region", "")).strip().lower()
    195     default_from = str(public.get("default_from", "")).strip()
    196     api_key, _src = load_mailgun_api_key(public)
    197     base = str(public.get("api_base_url") or mailgun_base_url(region))
    198     validate_mailgun_send_fields(
    199         domain=domain,
    200         region=region,
    201         from_addr=default_from,
    202         api_key=api_key,
    203     )
    204     return {
    205         "domain": domain,
    206         "region": region,
    207         "api_base_url": base,
    208         "default_from": default_from,
    209         "api_key": api_key,
    210     }
    211 
    212 
    213 def send_via_mailgun_api(
    214     *,
    215     base_url: str,
    216     domain: str,
    217     api_key: str,
    218     from_addr: str,
    219     to_addrs: str | Sequence[str],
    220     subject: str,
    221     text: str,
    222     html: str | None = None,
    223     timeout: int = 120,
    224 ) -> tuple[int, str]:
    225     """
    226     POST application/x-www-form-urlencoded para ``/v3/{domain}/messages``.
    227 
    228     :return: (status_code, body_text) em sucesso 200.
    229     :raises MailgunHTTPError: status não 2xx.
    230     :raises MailgunConfigError: destinatários vazios.
    231     """
    232     if isinstance(to_addrs, str):
    233         recipients = [to_addrs.strip()]
    234     else:
    235         recipients = [a.strip() for a in to_addrs if a and str(a).strip()]
    236     if not recipients:
    237         raise MailgunConfigError("lista de destinatários vazia")
    238 
    239     url = build_mailgun_messages_url(base_url=base_url, domain=domain)
    240     pairs: list[tuple[str, str]] = [
    241         ("from", from_addr),
    242         ("subject", subject),
    243         ("text", text),
    244     ]
    245     for a in recipients:
    246         pairs.append(("to", a))
    247     if html:
    248         pairs.append(("html", html))
    249 
    250     body = urllib.parse.urlencode(pairs).encode("utf-8")
    251     token = base64.b64encode(f"api:{api_key}".encode("utf-8")).decode("ascii")
    252     req = urllib.request.Request(url, data=body, method="POST")
    253     req.add_header("Authorization", f"Basic {token}")
    254     req.add_header("Content-Type", "application/x-www-form-urlencoded; charset=utf-8")
    255 
    256     ctx = ssl.create_default_context()
    257     try:
    258         with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
    259             raw = resp.read().decode("utf-8", errors="replace")
    260             return resp.getcode() or 200, raw
    261     except urllib.error.HTTPError as e:
    262         err_body = e.read().decode("utf-8", errors="replace") if e.fp else ""
    263         snippet = err_body[:500].strip()  # type: ignore
    264         raise MailgunHTTPError(
    265             f"Mailgun HTTP {e.code}",
    266             status=e.code,
    267             body_snippet=snippet,
    268         ) from None
    269     except urllib.error.URLError as e:
    270         reason = getattr(e, "reason", e)
    271         raise MailgunConfigError(f"rede/SSL ao contactar Mailgun: {reason}") from e
    272     except TimeoutError as e:
    273         raise MailgunConfigError("timeout ao contactar API Mailgun") from e
    274 
    275 
    276 def format_mailgun_failure(status: int, body_snippet: str) -> str:
    277     """Mensagem legível para operadores (sem expor segredos)."""
    278     base = f"HTTP {status}"
    279     if status in (401, 403):
    280         return (
    281             f"{base}: API key inválida, domínio/região incorrectos, ou **IP allowlist** no "
    282             f"painel Mailgun a bloquear este servidor. Confirme chave HTTP (não password SMTP), "
    283             f"domínio na URL, e em Security/API a lista de IPs permitidos."
    284         )
    285     if status == 400:
    286         return f"{base}: pedido inválido — verifique domínio, From autorizado e campos obrigatórios. Resposta: {body_snippet[:200]}"  # type: ignore
    287     if status == 404:
    288         return f"{base}: domínio ou URL/região incorretos (confirme US vs EU e o domínio no painel Mailgun). Resposta: {body_snippet[:200]}"  # type: ignore
    289     if status >= 500:
    290         return f"{base}: erro no serviço Mailgun. Tente mais tarde. Resposta: {body_snippet[:200]}"  # type: ignore
    291     return f"{base}: {body_snippet[:300]}"  # type: ignore