runv-server

server tooling for runv.club
Log | Files | Refs | README

runv_email_aliases.py (15664B)


      1 #!/usr/bin/env python3
      2 """
      3 Utilitários para pedidos e aprovação de aliases de email runv.club (stdlib apenas).
      4 """
      5 
      6 from __future__ import annotations
      7 
      8 import json
      9 import os
     10 import re
     11 import secrets
     12 import sys
     13 import tempfile
     14 from datetime import datetime, timezone
     15 from pathlib import Path
     16 from typing import Any, Final, Literal
     17 
     18 import runv_community as rc
     19 
     20 DEFAULT_ALIASES_PATH: Final[Path] = Path("/var/lib/runv/email-aliases.json")
     21 DEFAULT_ALIASES_LOCK: Final[Path] = Path("/var/lib/runv/email-aliases.lock")
     22 DEFAULT_QUEUE_DIR: Final[Path] = Path("/var/lib/runv/email-alias-queue")
     23 DEFAULT_ALIAS_DOMAIN: Final[str] = "runv.club"
     24 DEFAULT_MEMBERS_GROUP: Final[str] = "runv-members"
     25 ALIASES_JSON_MODE: Final[int] = 0o640
     26 ALIASES_LOCK_MODE: Final[int] = 0o660
     27 
     28 ALIAS_RESERVED_USERNAMES: Final[frozenset[str]] = frozenset(
     29     {
     30         "root",
     31         "admin",
     32         "postmaster",
     33         "abuse",
     34         "security",
     35         "support",
     36         "contato",
     37         "contact",
     38         "noreply",
     39         "no-reply",
     40         "mailer-daemon",
     41         "hostmaster",
     42         "webmaster",
     43         "entre",
     44         "join",
     45         "welcome",
     46     }
     47 )
     48 
     49 DEST_EMAIL_RE: Final[re.Pattern[str]] = re.compile(
     50     r"^[^@\s\x00\r\n]+@[^@\s\x00\r\n]+\.[^@\s\x00\r\n]+$"
     51 )
     52 
     53 ArchiveKind = Literal["approved", "rejected", "cancelled"]
     54 
     55 
     56 def iso_utc_now() -> str:
     57     return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
     58 
     59 
     60 def alias_domain() -> str:
     61     raw = os.environ.get("RUNV_EMAIL_ALIAS_DOMAIN", "").strip().lower()
     62     return raw if raw else DEFAULT_ALIAS_DOMAIN
     63 
     64 
     65 def aliases_paths() -> tuple[Path, Path]:
     66     aliases = os.environ.get("RUNV_EMAIL_ALIASES_PATH", "").strip()
     67     lock = os.environ.get("RUNV_EMAIL_ALIASES_LOCK_PATH", "").strip()
     68     return (
     69         Path(aliases) if aliases else DEFAULT_ALIASES_PATH,
     70         Path(lock) if lock else DEFAULT_ALIASES_LOCK,
     71     )
     72 
     73 
     74 def queue_paths() -> dict[str, Path]:
     75     raw = os.environ.get("RUNV_EMAIL_ALIAS_QUEUE_DIR", "").strip()
     76     base = Path(raw) if raw else DEFAULT_QUEUE_DIR
     77     return {
     78         "queue": base,
     79         "approved": base / "approved",
     80         "rejected": base / "rejected",
     81         "cancelled": base / "cancelled",
     82     }
     83 
     84 
     85 def validate_alias_username(username: str) -> str:
     86     u = rc.validate_username(username)
     87     if u in ALIAS_RESERVED_USERNAMES:
     88         rc.friendly_exit(f"nome de utilizador reservado para alias de email: {u!r}")
     89     return u
     90 
     91 
     92 def validate_destination_email(email: str) -> str:
     93     addr = email.strip()
     94     if not addr:
     95         rc.friendly_exit("email de destino obrigatório.")
     96     if len(addr) > 254:
     97         rc.friendly_exit("email de destino demasiado longo (máximo 254 caracteres).")
     98     if "\x00" in addr or "\n" in addr or "\r" in addr or " " in addr:
     99         rc.friendly_exit("email de destino inválido.")
    100     if addr.count("@") != 1:
    101         rc.friendly_exit("email de destino inválido: deve conter exactamente um @.")
    102     if not DEST_EMAIL_RE.fullmatch(addr):
    103         rc.friendly_exit("email de destino inválido.")
    104     local, domain = addr.rsplit("@", 1)
    105     if not local or not domain:
    106         rc.friendly_exit("email de destino inválido.")
    107     if "." not in domain:
    108         rc.friendly_exit("email de destino inválido: domínio deve conter pelo menos um ponto.")
    109     if domain.lower() == alias_domain().lower():
    110         rc.friendly_exit(
    111             f"email de destino não pode ser @{alias_domain()} (evita encaminhamento em loop)."
    112         )
    113     return addr
    114 
    115 
    116 def alias_address(username: str) -> str:
    117     return f"{username}@{alias_domain()}"
    118 
    119 
    120 def current_username() -> str:
    121     import pwd
    122 
    123     try:
    124         name = pwd.getpwuid(os.getuid()).pw_name
    125     except (KeyError, OSError) as e:
    126         rc.friendly_exit(f"não foi possível determinar o utilizador: {e}")
    127     return validate_alias_username(name)
    128 
    129 
    130 def admin_operator() -> str:
    131     sudo_user = os.environ.get("SUDO_USER", "").strip()
    132     if sudo_user:
    133         return sudo_user
    134     import pwd
    135 
    136     try:
    137         return pwd.getpwuid(os.getuid()).pw_name
    138     except (KeyError, OSError):
    139         return "root"
    140 
    141 
    142 def require_root() -> None:
    143     geteuid = getattr(os, "geteuid", None)
    144     if geteuid is None or geteuid() != 0:
    145         rc.friendly_exit("este comando precisa ser executado como root.")
    146 
    147 
    148 def new_request_id(username: str) -> str:
    149     ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    150     suffix = secrets.token_hex(3)
    151     return f"{ts}-{username}-{suffix}"
    152 
    153 
    154 def _read_json_file(path: Path) -> Any | None:
    155     if not path.is_file():
    156         return None
    157     try:
    158         raw = path.read_text(encoding="utf-8").strip()
    159         if not raw:
    160             return None
    161         return json.loads(raw)
    162     except PermissionError:
    163         geteuid = getattr(os, "geteuid", None)
    164         if geteuid is not None and geteuid() != 0:
    165             rc.friendly_exit(
    166                 "sem permissão para ler email-aliases.json.\n"
    167                 "Peça ao admin para executar:\n"
    168                 "  sudo python3 scripts/admin/setup_email_aliases.py"
    169             )
    170         return None
    171     except (OSError, json.JSONDecodeError):
    172         return None
    173 
    174 
    175 def restore_aliases_json_permissions() -> None:
    176     """Mantém email-aliases.json legível pelo grupo runv-members após escrita root."""
    177     aliases_path, lock_path = aliases_paths()
    178     group = os.environ.get("RUNV_MEMBERS_GROUP", "").strip() or DEFAULT_MEMBERS_GROUP
    179     try:
    180         import grp
    181 
    182         gid = grp.getgrnam(group).gr_gid
    183     except KeyError:
    184         return
    185     for path, mode in ((aliases_path, ALIASES_JSON_MODE), (lock_path, ALIASES_LOCK_MODE)):
    186         if not path.exists():
    187             continue
    188         try:
    189             os.chown(path, 0, gid)
    190             os.chmod(path, mode)
    191         except OSError:
    192             pass
    193 
    194 
    195 def load_aliases_unlocked(aliases_path: Path) -> dict[str, dict[str, Any]]:
    196     parsed = _read_json_file(aliases_path)
    197     if parsed is None:
    198         return {}
    199     if not isinstance(parsed, dict):
    200         rc.friendly_exit(f"formato inválido em {aliases_path}: esperado objecto JSON.")
    201     out: dict[str, dict[str, Any]] = {}
    202     for key, val in parsed.items():
    203         if isinstance(key, str) and isinstance(val, dict):
    204             out[key] = val
    205     return out
    206 
    207 
    208 def load_aliases() -> dict[str, dict[str, Any]]:
    209     aliases_path, _ = aliases_paths()
    210     return load_aliases_unlocked(aliases_path)
    211 
    212 
    213 def save_aliases(data: dict[str, dict[str, Any]]) -> None:
    214     import fcntl
    215 
    216     aliases_path, lock_path = aliases_paths()
    217     aliases_path.parent.mkdir(parents=True, exist_ok=True)
    218     lock_path.parent.mkdir(parents=True, exist_ok=True)
    219 
    220     lock_f = open(lock_path, "a+", encoding="utf-8")
    221     try:
    222         fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX)
    223         tmp_fd, tmp_name = tempfile.mkstemp(
    224             prefix="email-aliases.",
    225             suffix=".tmp",
    226             dir=str(aliases_path.parent),
    227         )
    228         tmp_path = Path(tmp_name)
    229         try:
    230             with os.fdopen(tmp_fd, "w", encoding="utf-8") as out:
    231                 json.dump(data, out, indent=2, ensure_ascii=False)
    232                 out.write("\n")
    233                 out.flush()
    234                 os.fsync(out.fileno())
    235             os.replace(tmp_path, aliases_path)
    236             restore_aliases_json_permissions()
    237         except Exception:
    238             tmp_path.unlink(missing_ok=True)
    239             raise
    240     finally:
    241         fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN)
    242         lock_f.close()
    243 
    244 
    245 def get_active_alias(username: str) -> dict[str, Any] | None:
    246     entry = load_aliases().get(username)
    247     if not entry or entry.get("status") != "active":
    248         return None
    249     return entry
    250 
    251 
    252 def _queue_request_file(request_id: str) -> Path:
    253     return queue_paths()["queue"] / f"{request_id}.json"
    254 
    255 
    256 def _parse_request(path: Path) -> dict[str, Any] | None:
    257     data = _read_json_file(path)
    258     if not isinstance(data, dict):
    259         return None
    260     return data
    261 
    262 
    263 def iter_pending_requests() -> list[dict[str, Any]]:
    264     qdir = queue_paths()["queue"]
    265     if not qdir.is_dir():
    266         return []
    267     pending: list[dict[str, Any]] = []
    268     for path in sorted(qdir.glob("*.json")):
    269         if not path.is_file():
    270             continue
    271         req = _parse_request(path)
    272         if req and req.get("status") == "pending":
    273             pending.append({**req, "_path": str(path)})
    274     pending.sort(key=lambda r: str(r.get("created_at", "")))
    275     return pending
    276 
    277 
    278 def find_pending_for_user(username: str) -> dict[str, Any] | None:
    279     for req in iter_pending_requests():
    280         if req.get("username") == username:
    281             return req
    282     return None
    283 
    284 
    285 def find_latest_pending_for_user(username: str) -> tuple[dict[str, Any], Path] | None:
    286     matches: list[tuple[dict[str, Any], Path]] = []
    287     qdir = queue_paths()["queue"]
    288     if not qdir.is_dir():
    289         return None
    290     for path in qdir.glob("*.json"):
    291         if not path.is_file():
    292             continue
    293         req = _parse_request(path)
    294         if req and req.get("username") == username and req.get("status") == "pending":
    295             matches.append((req, path))
    296     if not matches:
    297         return None
    298 
    299     def sort_key(item: tuple[dict[str, Any], Path]) -> str:
    300         req, path = item
    301         created = req.get("created_at")
    302         if isinstance(created, str) and created:
    303             return created
    304         try:
    305             return datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc).strftime(
    306                 "%Y-%m-%dT%H:%M:%SZ"
    307             )
    308         except OSError:
    309             return ""
    310 
    311     matches.sort(key=sort_key)
    312     req, path = matches[-1]
    313     return req, path
    314 
    315 
    316 def create_pending_request(username: str, destination: str) -> dict[str, Any]:
    317     paths = queue_paths()
    318     try:
    319         paths["queue"].mkdir(parents=True, exist_ok=True)
    320     except OSError as e:
    321         rc.friendly_exit(f"não foi possível criar o diretório da fila: {e}")
    322 
    323     if find_pending_for_user(username) is not None:
    324         rc.friendly_exit(
    325             "Já existe pedido pendente.\n"
    326             "Veja o status com:\n"
    327             "  runv-email-alias status\n\n"
    328             "Para cancelar:\n"
    329             "  runv-email-alias cancel"
    330         )
    331 
    332     request_id = new_request_id(username)
    333     alias = alias_address(username)
    334     payload: dict[str, Any] = {
    335         "request_id": request_id,
    336         "username": username,
    337         "alias": alias,
    338         "destination": destination,
    339         "status": "pending",
    340         "created_at": iso_utc_now(),
    341     }
    342     dest_path = _queue_request_file(request_id)
    343     try:
    344         fd = os.open(
    345             dest_path,
    346             os.O_WRONLY | os.O_CREAT | os.O_EXCL,
    347             0o664,
    348         )
    349     except FileExistsError:
    350         rc.friendly_exit("conflito ao criar pedido; tente novamente.")
    351     except PermissionError:
    352         rc.friendly_exit(
    353             f"sem permissão para criar pedido em {paths['queue']}\n"
    354             "peça ao admin para executar:\n"
    355             "  sudo python3 scripts/admin/setup_email_aliases.py"
    356         )
    357     except OSError as e:
    358         rc.friendly_exit(f"erro ao criar pedido: {e}")
    359 
    360     try:
    361         with os.fdopen(fd, "w", encoding="utf-8") as out:
    362             json.dump(payload, out, indent=2, ensure_ascii=False)
    363             out.write("\n")
    364     except OSError as e:
    365         dest_path.unlink(missing_ok=True)
    366         rc.friendly_exit(f"erro ao gravar pedido: {e}")
    367     return payload
    368 
    369 
    370 def archive_request(
    371     path: Path,
    372     payload: dict[str, Any],
    373     kind: ArchiveKind,
    374 ) -> Path:
    375     paths = queue_paths()
    376     dest_dir = paths[kind]
    377     dest_dir.mkdir(parents=True, exist_ok=True)
    378     request_id = payload.get("request_id")
    379     if not isinstance(request_id, str) or not request_id:
    380         request_id = path.stem
    381     dest = dest_dir / f"{request_id}.json"
    382     if dest.exists():
    383         dest = dest_dir / f"{request_id}-{secrets.token_hex(2)}.json"
    384     try:
    385         path.replace(dest)
    386     except OSError:
    387         dest.write_text(
    388             json.dumps(payload, indent=2, ensure_ascii=False) + "\n",
    389             encoding="utf-8",
    390         )
    391         path.unlink(missing_ok=True)
    392     return dest
    393 
    394 
    395 def cancel_latest_pending(username: str) -> dict[str, Any] | None:
    396     found = find_latest_pending_for_user(username)
    397     if found is None:
    398         return None
    399     req, path = found
    400     req["status"] = "cancelled"
    401     req["cancelled_at"] = iso_utc_now()
    402     archive_request(path, req, "cancelled")
    403     return req
    404 
    405 
    406 def approve_pending(username: str, operator: str) -> dict[str, Any]:
    407     validate_alias_username(username)
    408     found = find_latest_pending_for_user(username)
    409     if found is None:
    410         rc.friendly_exit(f"nenhum pedido pendente para o utilizador {username!r}.")
    411     req, path = found
    412     destination = req.get("destination")
    413     if not isinstance(destination, str):
    414         rc.friendly_exit("pedido pendente sem email de destino válido.")
    415     destination = validate_destination_email(destination)
    416 
    417     aliases_path, _ = aliases_paths()
    418     import fcntl
    419 
    420     aliases_path.parent.mkdir(parents=True, exist_ok=True)
    421     lock_path = aliases_paths()[1]
    422     lock_path.parent.mkdir(parents=True, exist_ok=True)
    423 
    424     lock_f = open(lock_path, "a+", encoding="utf-8")
    425     try:
    426         fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX)
    427         data = load_aliases_unlocked(aliases_path)
    428         now = iso_utc_now()
    429         alias = alias_address(username)
    430         existing = data.get(username)
    431         created_at = now
    432         if isinstance(existing, dict) and isinstance(existing.get("created_at"), str):
    433             created_at = existing["created_at"]
    434         data[username] = {
    435             "username": username,
    436             "alias": alias,
    437             "destination": destination,
    438             "status": "active",
    439             "created_at": created_at,
    440             "updated_at": now,
    441             "approved_by": operator,
    442         }
    443         tmp_fd, tmp_name = tempfile.mkstemp(
    444             prefix="email-aliases.",
    445             suffix=".tmp",
    446             dir=str(aliases_path.parent),
    447         )
    448         tmp_path = Path(tmp_name)
    449         try:
    450             with os.fdopen(tmp_fd, "w", encoding="utf-8") as out:
    451                 json.dump(data, out, indent=2, ensure_ascii=False)
    452                 out.write("\n")
    453                 out.flush()
    454                 os.fsync(out.fileno())
    455             os.replace(tmp_path, aliases_path)
    456             restore_aliases_json_permissions()
    457         except Exception:
    458             tmp_path.unlink(missing_ok=True)
    459             raise
    460     finally:
    461         fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN)
    462         lock_f.close()
    463 
    464     req["status"] = "approved"
    465     req["approved_at"] = iso_utc_now()
    466     req["approved_by"] = operator
    467     archive_request(path, req, "approved")
    468     return data[username]
    469 
    470 
    471 def reject_pending(username: str, operator: str, reason: str) -> dict[str, Any]:
    472     validate_alias_username(username)
    473     found = find_latest_pending_for_user(username)
    474     if found is None:
    475         rc.friendly_exit(f"nenhum pedido pendente para o utilizador {username!r}.")
    476     req, path = found
    477     req["status"] = "rejected"
    478     req["rejected_at"] = iso_utc_now()
    479     req["rejected_by"] = operator
    480     req["reason"] = reason
    481     archive_request(path, req, "rejected")
    482     return req
    483 
    484 
    485 def list_active_aliases() -> list[tuple[str, str, str]]:
    486     data = load_aliases()
    487     rows: list[tuple[str, str, str]] = []
    488     for username in sorted(data.keys(), key=str.lower):
    489         entry = data[username]
    490         if entry.get("status") != "active":
    491             continue
    492         alias = entry.get("alias")
    493         dest = entry.get("destination")
    494         if isinstance(alias, str) and isinstance(dest, str):
    495             rows.append((username, alias, dest))
    496     return rows