mailgun_client.py (9911B)
1 #!/usr/bin/env python3 2 """ 3 Cliente HTTP Mailgun (API de envio) — stdlib apenas. 4 5 Basic Auth: utilizador fixo ``api``, palavra-passe = API key. 6 Documentação: https://documentation.mailgun.com/en/latest/api-sending.html 7 """ 8 9 from __future__ import annotations 10 11 import base64 12 import json 13 import os 14 import re 15 import ssl 16 import urllib.error 17 import urllib.parse 18 import urllib.request 19 from pathlib import Path 20 from typing import Any, Final, Mapping, Sequence 21 22 DEFAULT_STATE_PATH = Path("/etc/runv-email.json") 23 DEFAULT_SECRETS_PATH = Path("/etc/runv-email.secrets.json") 24 25 # Placeholder neutro para testes/documentação — nunca credenciais reais. 26 EXAMPLE_DOMAIN: Final[str] = "example.com" 27 28 _REGIONS: Final[frozenset[str]] = frozenset({"us", "eu"}) 29 30 # Domínio verificável: hostname ou subdomínio típico Mailgun (mg.example.com) 31 _DOMAIN_RE: Final[re.Pattern[str]] = re.compile( 32 r"^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$" 33 ) 34 35 36 class MailgunConfigError(ValueError): 37 """Configuração inválida ou incompleta.""" 38 39 40 class MailgunHTTPError(RuntimeError): 41 """Resposta HTTP não-sucesso da API Mailgun.""" 42 43 def __init__(self, message: str, *, status: int, body_snippet: str) -> None: 44 super().__init__(message) 45 self.status = status 46 self.body_snippet = body_snippet 47 48 49 def mailgun_base_url(region: str) -> str: 50 """ 51 URL base da API (sem ``/v3/...``) para a região escolhida. 52 ``region`` deve ser ``us`` ou ``eu`` (minúsculas). 53 """ 54 r = region.strip().lower() 55 if r not in _REGIONS: 56 raise MailgunConfigError(f"região inválida: {region!r} (use 'us' ou 'eu')") 57 if r == "eu": 58 return "https://api.eu.mailgun.net" 59 return "https://api.mailgun.net" 60 61 62 def build_mailgun_messages_url(*, base_url: str, domain: str) -> str: 63 """URL completa ``POST .../v3/{domain}/messages``.""" 64 b = base_url.rstrip("/") 65 d = domain.strip().lower() 66 if not d: 67 raise MailgunConfigError("domínio Mailgun vazio") 68 return f"{b}/v3/{urllib.parse.quote(d, safe='.')}/messages" 69 70 71 def mask_secret(value: str | None, *, visible_tail: int = 4) -> str: 72 """Mascara segredos para logs ou mensagens de diagnóstico.""" 73 if value is None: 74 return "(não definido)" 75 s = value.strip() 76 if not s: 77 return "(vazio)" 78 if len(s) <= visible_tail + 3: 79 return "***" 80 return s[:3] + "…" + s[-visible_tail:] # type: ignore 81 82 83 def validate_mailgun_inputs( 84 *, 85 domain: str, 86 region: str, 87 from_addr: str, 88 admin_email: str, 89 api_key: str, 90 ) -> dict[str, str]: 91 """ 92 Valida entradas interactivas / ficheiro. Devolve dict normalizado 93 (domain, region, from_addr, admin_email) — não devolve a key. 94 """ 95 core = validate_mailgun_send_fields( 96 domain=domain, 97 region=region, 98 from_addr=from_addr, 99 api_key=api_key, 100 ) 101 ad = admin_email.strip() 102 if not ad or "@" not in ad: 103 raise MailgunConfigError("email do administrador inválido.") 104 return {**core, "admin_email": ad} 105 106 107 def validate_mailgun_send_fields( 108 *, 109 domain: str, 110 region: str, 111 from_addr: str, 112 api_key: str, 113 ) -> dict[str, str]: 114 """Valida domínio, região, From e API key (envio em tempo de execução).""" 115 d = domain.strip().lower() 116 if not d: 117 raise MailgunConfigError("domínio de envio obrigatório (não pode estar vazio).") 118 if not _DOMAIN_RE.match(d): 119 raise MailgunConfigError( 120 f"domínio inválido: {domain!r} — use um hostname FQDN (ex.: {EXAMPLE_DOMAIN}).", 121 ) 122 123 r = region.strip().lower() 124 mailgun_base_url(r) # valida região 125 126 fa = from_addr.strip() 127 if not fa or "@" not in fa: 128 raise MailgunConfigError("remetente (From) deve ser um endereço de email válido.") 129 130 key = api_key.strip() 131 if not key: 132 raise MailgunConfigError("API key Mailgun obrigatória (não pode estar vazia).") 133 134 return { 135 "domain": d, 136 "region": r, 137 "from_addr": fa, 138 } 139 140 141 def state_path() -> Path: 142 raw = os.environ.get("RUNV_EMAIL_STATE_PATH", "").strip() 143 return Path(raw) if raw else DEFAULT_STATE_PATH 144 145 146 def secrets_path_from_state(public: Mapping[str, Any]) -> Path: 147 raw = str(public.get("secrets_path") or "").strip() 148 if raw: 149 return Path(raw) 150 raw_env = os.environ.get("RUNV_EMAIL_SECRETS_PATH", "").strip() 151 if raw_env: 152 return Path(raw_env) 153 return DEFAULT_SECRETS_PATH 154 155 156 def load_public_state(path: Path | None = None) -> dict[str, Any]: 157 p = path or state_path() 158 if not p.is_file(): 159 raise FileNotFoundError( 160 f"Estado de email não encontrado: {p}. Execute o configurador Mailgun.", 161 ) 162 return json.loads(p.read_text(encoding="utf-8")) 163 164 165 def load_mailgun_api_key(public: Mapping[str, Any]) -> tuple[str, str]: 166 """ 167 Carrega API key. Ordem: ``RUNV_MAILGUN_API_KEY``, depois ficheiro de segredos. 168 Devolve (api_key, fonte_descritiva) — fonte nunca contém a key. 169 """ 170 env_key = os.environ.get("RUNV_MAILGUN_API_KEY", "").strip() 171 if env_key: 172 return env_key, "environment" 173 174 sp = secrets_path_from_state(public) 175 if not sp.is_file(): 176 raise MailgunConfigError( 177 f"API key em falta: defina RUNV_MAILGUN_API_KEY ou crie {sp} (0600) com mailgun_api_key.", 178 ) 179 try: 180 sec = json.loads(sp.read_text(encoding="utf-8")) 181 except json.JSONDecodeError as e: 182 raise MailgunConfigError(f"ficheiro de segredos JSON inválido: {sp}: {e}") from e 183 key = str(sec.get("mailgun_api_key", "")).strip() 184 if not key: 185 raise MailgunConfigError(f"mailgun_api_key vazio em {sp}") 186 return key, f"file:{sp}" 187 188 189 def build_mailgun_runtime_config(public: Mapping[str, Any]) -> dict[str, Any]: 190 """Junta estado público + key (em memória) para envio.""" 191 if public.get("backend") == "sendmail": 192 raise MailgunConfigError("estado explícito backend=sendmail — não usar Mailgun") 193 domain = str(public.get("mailgun_domain", "")).strip() 194 region = str(public.get("mailgun_region", "")).strip().lower() 195 default_from = str(public.get("default_from", "")).strip() 196 api_key, _src = load_mailgun_api_key(public) 197 base = str(public.get("api_base_url") or mailgun_base_url(region)) 198 validate_mailgun_send_fields( 199 domain=domain, 200 region=region, 201 from_addr=default_from, 202 api_key=api_key, 203 ) 204 return { 205 "domain": domain, 206 "region": region, 207 "api_base_url": base, 208 "default_from": default_from, 209 "api_key": api_key, 210 } 211 212 213 def send_via_mailgun_api( 214 *, 215 base_url: str, 216 domain: str, 217 api_key: str, 218 from_addr: str, 219 to_addrs: str | Sequence[str], 220 subject: str, 221 text: str, 222 html: str | None = None, 223 timeout: int = 120, 224 ) -> tuple[int, str]: 225 """ 226 POST application/x-www-form-urlencoded para ``/v3/{domain}/messages``. 227 228 :return: (status_code, body_text) em sucesso 200. 229 :raises MailgunHTTPError: status não 2xx. 230 :raises MailgunConfigError: destinatários vazios. 231 """ 232 if isinstance(to_addrs, str): 233 recipients = [to_addrs.strip()] 234 else: 235 recipients = [a.strip() for a in to_addrs if a and str(a).strip()] 236 if not recipients: 237 raise MailgunConfigError("lista de destinatários vazia") 238 239 url = build_mailgun_messages_url(base_url=base_url, domain=domain) 240 pairs: list[tuple[str, str]] = [ 241 ("from", from_addr), 242 ("subject", subject), 243 ("text", text), 244 ] 245 for a in recipients: 246 pairs.append(("to", a)) 247 if html: 248 pairs.append(("html", html)) 249 250 body = urllib.parse.urlencode(pairs).encode("utf-8") 251 token = base64.b64encode(f"api:{api_key}".encode("utf-8")).decode("ascii") 252 req = urllib.request.Request(url, data=body, method="POST") 253 req.add_header("Authorization", f"Basic {token}") 254 req.add_header("Content-Type", "application/x-www-form-urlencoded; charset=utf-8") 255 256 ctx = ssl.create_default_context() 257 try: 258 with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp: 259 raw = resp.read().decode("utf-8", errors="replace") 260 return resp.getcode() or 200, raw 261 except urllib.error.HTTPError as e: 262 err_body = e.read().decode("utf-8", errors="replace") if e.fp else "" 263 snippet = err_body[:500].strip() # type: ignore 264 raise MailgunHTTPError( 265 f"Mailgun HTTP {e.code}", 266 status=e.code, 267 body_snippet=snippet, 268 ) from None 269 except urllib.error.URLError as e: 270 reason = getattr(e, "reason", e) 271 raise MailgunConfigError(f"rede/SSL ao contactar Mailgun: {reason}") from e 272 except TimeoutError as e: 273 raise MailgunConfigError("timeout ao contactar API Mailgun") from e 274 275 276 def format_mailgun_failure(status: int, body_snippet: str) -> str: 277 """Mensagem legível para operadores (sem expor segredos).""" 278 base = f"HTTP {status}" 279 if status in (401, 403): 280 return ( 281 f"{base}: API key inválida, domínio/região incorrectos, ou **IP allowlist** no " 282 f"painel Mailgun a bloquear este servidor. Confirme chave HTTP (não password SMTP), " 283 f"domínio na URL, e em Security/API a lista de IPs permitidos." 284 ) 285 if status == 400: 286 return f"{base}: pedido inválido — verifique domínio, From autorizado e campos obrigatórios. Resposta: {body_snippet[:200]}" # type: ignore 287 if status == 404: 288 return f"{base}: domínio ou URL/região incorretos (confirme US vs EU e o domínio no painel Mailgun). Resposta: {body_snippet[:200]}" # type: ignore 289 if status >= 500: 290 return f"{base}: erro no serviço Mailgun. Tente mais tarde. Resposta: {body_snippet[:200]}" # type: ignore 291 return f"{base}: {body_snippet[:300]}" # type: ignore