runv_email_aliases.py (15664B)
1 #!/usr/bin/env python3 2 """ 3 Utilitários para pedidos e aprovação de aliases de email runv.club (stdlib apenas). 4 """ 5 6 from __future__ import annotations 7 8 import json 9 import os 10 import re 11 import secrets 12 import sys 13 import tempfile 14 from datetime import datetime, timezone 15 from pathlib import Path 16 from typing import Any, Final, Literal 17 18 import runv_community as rc 19 20 DEFAULT_ALIASES_PATH: Final[Path] = Path("/var/lib/runv/email-aliases.json") 21 DEFAULT_ALIASES_LOCK: Final[Path] = Path("/var/lib/runv/email-aliases.lock") 22 DEFAULT_QUEUE_DIR: Final[Path] = Path("/var/lib/runv/email-alias-queue") 23 DEFAULT_ALIAS_DOMAIN: Final[str] = "runv.club" 24 DEFAULT_MEMBERS_GROUP: Final[str] = "runv-members" 25 ALIASES_JSON_MODE: Final[int] = 0o640 26 ALIASES_LOCK_MODE: Final[int] = 0o660 27 28 ALIAS_RESERVED_USERNAMES: Final[frozenset[str]] = frozenset( 29 { 30 "root", 31 "admin", 32 "postmaster", 33 "abuse", 34 "security", 35 "support", 36 "contato", 37 "contact", 38 "noreply", 39 "no-reply", 40 "mailer-daemon", 41 "hostmaster", 42 "webmaster", 43 "entre", 44 "join", 45 "welcome", 46 } 47 ) 48 49 DEST_EMAIL_RE: Final[re.Pattern[str]] = re.compile( 50 r"^[^@\s\x00\r\n]+@[^@\s\x00\r\n]+\.[^@\s\x00\r\n]+$" 51 ) 52 53 ArchiveKind = Literal["approved", "rejected", "cancelled"] 54 55 56 def iso_utc_now() -> str: 57 return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") 58 59 60 def alias_domain() -> str: 61 raw = os.environ.get("RUNV_EMAIL_ALIAS_DOMAIN", "").strip().lower() 62 return raw if raw else DEFAULT_ALIAS_DOMAIN 63 64 65 def aliases_paths() -> tuple[Path, Path]: 66 aliases = os.environ.get("RUNV_EMAIL_ALIASES_PATH", "").strip() 67 lock = os.environ.get("RUNV_EMAIL_ALIASES_LOCK_PATH", "").strip() 68 return ( 69 Path(aliases) if aliases else DEFAULT_ALIASES_PATH, 70 Path(lock) if lock else DEFAULT_ALIASES_LOCK, 71 ) 72 73 74 def queue_paths() -> dict[str, Path]: 75 raw = os.environ.get("RUNV_EMAIL_ALIAS_QUEUE_DIR", "").strip() 76 base = Path(raw) if raw else DEFAULT_QUEUE_DIR 77 return { 78 "queue": base, 79 "approved": base / "approved", 80 "rejected": base / "rejected", 81 "cancelled": base / "cancelled", 82 } 83 84 85 def validate_alias_username(username: str) -> str: 86 u = rc.validate_username(username) 87 if u in ALIAS_RESERVED_USERNAMES: 88 rc.friendly_exit(f"nome de utilizador reservado para alias de email: {u!r}") 89 return u 90 91 92 def validate_destination_email(email: str) -> str: 93 addr = email.strip() 94 if not addr: 95 rc.friendly_exit("email de destino obrigatório.") 96 if len(addr) > 254: 97 rc.friendly_exit("email de destino demasiado longo (máximo 254 caracteres).") 98 if "\x00" in addr or "\n" in addr or "\r" in addr or " " in addr: 99 rc.friendly_exit("email de destino inválido.") 100 if addr.count("@") != 1: 101 rc.friendly_exit("email de destino inválido: deve conter exactamente um @.") 102 if not DEST_EMAIL_RE.fullmatch(addr): 103 rc.friendly_exit("email de destino inválido.") 104 local, domain = addr.rsplit("@", 1) 105 if not local or not domain: 106 rc.friendly_exit("email de destino inválido.") 107 if "." not in domain: 108 rc.friendly_exit("email de destino inválido: domínio deve conter pelo menos um ponto.") 109 if domain.lower() == alias_domain().lower(): 110 rc.friendly_exit( 111 f"email de destino não pode ser @{alias_domain()} (evita encaminhamento em loop)." 112 ) 113 return addr 114 115 116 def alias_address(username: str) -> str: 117 return f"{username}@{alias_domain()}" 118 119 120 def current_username() -> str: 121 import pwd 122 123 try: 124 name = pwd.getpwuid(os.getuid()).pw_name 125 except (KeyError, OSError) as e: 126 rc.friendly_exit(f"não foi possível determinar o utilizador: {e}") 127 return validate_alias_username(name) 128 129 130 def admin_operator() -> str: 131 sudo_user = os.environ.get("SUDO_USER", "").strip() 132 if sudo_user: 133 return sudo_user 134 import pwd 135 136 try: 137 return pwd.getpwuid(os.getuid()).pw_name 138 except (KeyError, OSError): 139 return "root" 140 141 142 def require_root() -> None: 143 geteuid = getattr(os, "geteuid", None) 144 if geteuid is None or geteuid() != 0: 145 rc.friendly_exit("este comando precisa ser executado como root.") 146 147 148 def new_request_id(username: str) -> str: 149 ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") 150 suffix = secrets.token_hex(3) 151 return f"{ts}-{username}-{suffix}" 152 153 154 def _read_json_file(path: Path) -> Any | None: 155 if not path.is_file(): 156 return None 157 try: 158 raw = path.read_text(encoding="utf-8").strip() 159 if not raw: 160 return None 161 return json.loads(raw) 162 except PermissionError: 163 geteuid = getattr(os, "geteuid", None) 164 if geteuid is not None and geteuid() != 0: 165 rc.friendly_exit( 166 "sem permissão para ler email-aliases.json.\n" 167 "Peça ao admin para executar:\n" 168 " sudo python3 scripts/admin/setup_email_aliases.py" 169 ) 170 return None 171 except (OSError, json.JSONDecodeError): 172 return None 173 174 175 def restore_aliases_json_permissions() -> None: 176 """Mantém email-aliases.json legível pelo grupo runv-members após escrita root.""" 177 aliases_path, lock_path = aliases_paths() 178 group = os.environ.get("RUNV_MEMBERS_GROUP", "").strip() or DEFAULT_MEMBERS_GROUP 179 try: 180 import grp 181 182 gid = grp.getgrnam(group).gr_gid 183 except KeyError: 184 return 185 for path, mode in ((aliases_path, ALIASES_JSON_MODE), (lock_path, ALIASES_LOCK_MODE)): 186 if not path.exists(): 187 continue 188 try: 189 os.chown(path, 0, gid) 190 os.chmod(path, mode) 191 except OSError: 192 pass 193 194 195 def load_aliases_unlocked(aliases_path: Path) -> dict[str, dict[str, Any]]: 196 parsed = _read_json_file(aliases_path) 197 if parsed is None: 198 return {} 199 if not isinstance(parsed, dict): 200 rc.friendly_exit(f"formato inválido em {aliases_path}: esperado objecto JSON.") 201 out: dict[str, dict[str, Any]] = {} 202 for key, val in parsed.items(): 203 if isinstance(key, str) and isinstance(val, dict): 204 out[key] = val 205 return out 206 207 208 def load_aliases() -> dict[str, dict[str, Any]]: 209 aliases_path, _ = aliases_paths() 210 return load_aliases_unlocked(aliases_path) 211 212 213 def save_aliases(data: dict[str, dict[str, Any]]) -> None: 214 import fcntl 215 216 aliases_path, lock_path = aliases_paths() 217 aliases_path.parent.mkdir(parents=True, exist_ok=True) 218 lock_path.parent.mkdir(parents=True, exist_ok=True) 219 220 lock_f = open(lock_path, "a+", encoding="utf-8") 221 try: 222 fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX) 223 tmp_fd, tmp_name = tempfile.mkstemp( 224 prefix="email-aliases.", 225 suffix=".tmp", 226 dir=str(aliases_path.parent), 227 ) 228 tmp_path = Path(tmp_name) 229 try: 230 with os.fdopen(tmp_fd, "w", encoding="utf-8") as out: 231 json.dump(data, out, indent=2, ensure_ascii=False) 232 out.write("\n") 233 out.flush() 234 os.fsync(out.fileno()) 235 os.replace(tmp_path, aliases_path) 236 restore_aliases_json_permissions() 237 except Exception: 238 tmp_path.unlink(missing_ok=True) 239 raise 240 finally: 241 fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN) 242 lock_f.close() 243 244 245 def get_active_alias(username: str) -> dict[str, Any] | None: 246 entry = load_aliases().get(username) 247 if not entry or entry.get("status") != "active": 248 return None 249 return entry 250 251 252 def _queue_request_file(request_id: str) -> Path: 253 return queue_paths()["queue"] / f"{request_id}.json" 254 255 256 def _parse_request(path: Path) -> dict[str, Any] | None: 257 data = _read_json_file(path) 258 if not isinstance(data, dict): 259 return None 260 return data 261 262 263 def iter_pending_requests() -> list[dict[str, Any]]: 264 qdir = queue_paths()["queue"] 265 if not qdir.is_dir(): 266 return [] 267 pending: list[dict[str, Any]] = [] 268 for path in sorted(qdir.glob("*.json")): 269 if not path.is_file(): 270 continue 271 req = _parse_request(path) 272 if req and req.get("status") == "pending": 273 pending.append({**req, "_path": str(path)}) 274 pending.sort(key=lambda r: str(r.get("created_at", ""))) 275 return pending 276 277 278 def find_pending_for_user(username: str) -> dict[str, Any] | None: 279 for req in iter_pending_requests(): 280 if req.get("username") == username: 281 return req 282 return None 283 284 285 def find_latest_pending_for_user(username: str) -> tuple[dict[str, Any], Path] | None: 286 matches: list[tuple[dict[str, Any], Path]] = [] 287 qdir = queue_paths()["queue"] 288 if not qdir.is_dir(): 289 return None 290 for path in qdir.glob("*.json"): 291 if not path.is_file(): 292 continue 293 req = _parse_request(path) 294 if req and req.get("username") == username and req.get("status") == "pending": 295 matches.append((req, path)) 296 if not matches: 297 return None 298 299 def sort_key(item: tuple[dict[str, Any], Path]) -> str: 300 req, path = item 301 created = req.get("created_at") 302 if isinstance(created, str) and created: 303 return created 304 try: 305 return datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc).strftime( 306 "%Y-%m-%dT%H:%M:%SZ" 307 ) 308 except OSError: 309 return "" 310 311 matches.sort(key=sort_key) 312 req, path = matches[-1] 313 return req, path 314 315 316 def create_pending_request(username: str, destination: str) -> dict[str, Any]: 317 paths = queue_paths() 318 try: 319 paths["queue"].mkdir(parents=True, exist_ok=True) 320 except OSError as e: 321 rc.friendly_exit(f"não foi possível criar o diretório da fila: {e}") 322 323 if find_pending_for_user(username) is not None: 324 rc.friendly_exit( 325 "Já existe pedido pendente.\n" 326 "Veja o status com:\n" 327 " runv-email-alias status\n\n" 328 "Para cancelar:\n" 329 " runv-email-alias cancel" 330 ) 331 332 request_id = new_request_id(username) 333 alias = alias_address(username) 334 payload: dict[str, Any] = { 335 "request_id": request_id, 336 "username": username, 337 "alias": alias, 338 "destination": destination, 339 "status": "pending", 340 "created_at": iso_utc_now(), 341 } 342 dest_path = _queue_request_file(request_id) 343 try: 344 fd = os.open( 345 dest_path, 346 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 347 0o664, 348 ) 349 except FileExistsError: 350 rc.friendly_exit("conflito ao criar pedido; tente novamente.") 351 except PermissionError: 352 rc.friendly_exit( 353 f"sem permissão para criar pedido em {paths['queue']}\n" 354 "peça ao admin para executar:\n" 355 " sudo python3 scripts/admin/setup_email_aliases.py" 356 ) 357 except OSError as e: 358 rc.friendly_exit(f"erro ao criar pedido: {e}") 359 360 try: 361 with os.fdopen(fd, "w", encoding="utf-8") as out: 362 json.dump(payload, out, indent=2, ensure_ascii=False) 363 out.write("\n") 364 except OSError as e: 365 dest_path.unlink(missing_ok=True) 366 rc.friendly_exit(f"erro ao gravar pedido: {e}") 367 return payload 368 369 370 def archive_request( 371 path: Path, 372 payload: dict[str, Any], 373 kind: ArchiveKind, 374 ) -> Path: 375 paths = queue_paths() 376 dest_dir = paths[kind] 377 dest_dir.mkdir(parents=True, exist_ok=True) 378 request_id = payload.get("request_id") 379 if not isinstance(request_id, str) or not request_id: 380 request_id = path.stem 381 dest = dest_dir / f"{request_id}.json" 382 if dest.exists(): 383 dest = dest_dir / f"{request_id}-{secrets.token_hex(2)}.json" 384 try: 385 path.replace(dest) 386 except OSError: 387 dest.write_text( 388 json.dumps(payload, indent=2, ensure_ascii=False) + "\n", 389 encoding="utf-8", 390 ) 391 path.unlink(missing_ok=True) 392 return dest 393 394 395 def cancel_latest_pending(username: str) -> dict[str, Any] | None: 396 found = find_latest_pending_for_user(username) 397 if found is None: 398 return None 399 req, path = found 400 req["status"] = "cancelled" 401 req["cancelled_at"] = iso_utc_now() 402 archive_request(path, req, "cancelled") 403 return req 404 405 406 def approve_pending(username: str, operator: str) -> dict[str, Any]: 407 validate_alias_username(username) 408 found = find_latest_pending_for_user(username) 409 if found is None: 410 rc.friendly_exit(f"nenhum pedido pendente para o utilizador {username!r}.") 411 req, path = found 412 destination = req.get("destination") 413 if not isinstance(destination, str): 414 rc.friendly_exit("pedido pendente sem email de destino válido.") 415 destination = validate_destination_email(destination) 416 417 aliases_path, _ = aliases_paths() 418 import fcntl 419 420 aliases_path.parent.mkdir(parents=True, exist_ok=True) 421 lock_path = aliases_paths()[1] 422 lock_path.parent.mkdir(parents=True, exist_ok=True) 423 424 lock_f = open(lock_path, "a+", encoding="utf-8") 425 try: 426 fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX) 427 data = load_aliases_unlocked(aliases_path) 428 now = iso_utc_now() 429 alias = alias_address(username) 430 existing = data.get(username) 431 created_at = now 432 if isinstance(existing, dict) and isinstance(existing.get("created_at"), str): 433 created_at = existing["created_at"] 434 data[username] = { 435 "username": username, 436 "alias": alias, 437 "destination": destination, 438 "status": "active", 439 "created_at": created_at, 440 "updated_at": now, 441 "approved_by": operator, 442 } 443 tmp_fd, tmp_name = tempfile.mkstemp( 444 prefix="email-aliases.", 445 suffix=".tmp", 446 dir=str(aliases_path.parent), 447 ) 448 tmp_path = Path(tmp_name) 449 try: 450 with os.fdopen(tmp_fd, "w", encoding="utf-8") as out: 451 json.dump(data, out, indent=2, ensure_ascii=False) 452 out.write("\n") 453 out.flush() 454 os.fsync(out.fileno()) 455 os.replace(tmp_path, aliases_path) 456 restore_aliases_json_permissions() 457 except Exception: 458 tmp_path.unlink(missing_ok=True) 459 raise 460 finally: 461 fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN) 462 lock_f.close() 463 464 req["status"] = "approved" 465 req["approved_at"] = iso_utc_now() 466 req["approved_by"] = operator 467 archive_request(path, req, "approved") 468 return data[username] 469 470 471 def reject_pending(username: str, operator: str, reason: str) -> dict[str, Any]: 472 validate_alias_username(username) 473 found = find_latest_pending_for_user(username) 474 if found is None: 475 rc.friendly_exit(f"nenhum pedido pendente para o utilizador {username!r}.") 476 req, path = found 477 req["status"] = "rejected" 478 req["rejected_at"] = iso_utc_now() 479 req["rejected_by"] = operator 480 req["reason"] = reason 481 archive_request(path, req, "rejected") 482 return req 483 484 485 def list_active_aliases() -> list[tuple[str, str, str]]: 486 data = load_aliases() 487 rows: list[tuple[str, str, str]] = [] 488 for username in sorted(data.keys(), key=str.lower): 489 entry = data[username] 490 if entry.get("status") != "active": 491 continue 492 alias = entry.get("alias") 493 dest = entry.get("destination") 494 if isinstance(alias, str) and isinstance(dest, str): 495 rows.append((username, alias, dest)) 496 return rows