runv-server

server tooling for runv.club
Log | Files | Refs | README

runv_community.py (6760B)


      1 #!/usr/bin/env python3
      2 """
      3 Utilitários partilhados pelos comandos comunitários runv.club (stdlib apenas).
      4 """
      5 
      6 from __future__ import annotations
      7 
      8 import json
      9 import os
     10 import re
     11 import sys
     12 from datetime import datetime, timezone
     13 from pathlib import Path
     14 from typing import Any, Final
     15 
     16 USERNAME_RE: Final[re.Pattern[str]] = re.compile(r"^[a-z][a-z0-9_-]{1,31}$")
     17 MAX_READ_BYTES: Final[int] = 16 * 1024
     18 DEFAULT_USERS_JSON: Final[Path] = Path("/var/lib/runv/users.json")
     19 DEFAULT_HOME_ROOT: Final[Path] = Path("/home")
     20 INSTALLED_LIB_DIR: Final[Path] = Path("/usr/local/share/runv/lib")
     21 
     22 PROFILE_DEFAULT: Final[dict[str, Any]] = {
     23     "display_name": "",
     24     "bio": "",
     25     "location": "",
     26     "links": [],
     27     "interests": [],
     28 }
     29 
     30 
     31 def install_bootstrap() -> None:
     32     """Permite importar este módulo a partir de tools/bin/ ou de /usr/local/bin/."""
     33     here = Path(__file__).resolve().parent
     34     candidates: list[Path] = [INSTALLED_LIB_DIR, here]
     35     try:
     36         script = Path(sys.argv[0]).resolve()
     37         if script.parent.name == "bin":
     38             candidates.insert(0, script.parent.parent / "lib")
     39     except (IndexError, OSError):
     40         pass
     41     for candidate in candidates:
     42         if (candidate / "runv_community.py").is_file():
     43             s = str(candidate)
     44             if s not in sys.path:
     45                 sys.path.insert(0, s)
     46             return
     47 
     48 
     49 def friendly_exit(msg: str, code: int = 1) -> None:
     50     print(msg, file=sys.stderr)
     51     raise SystemExit(code)
     52 
     53 
     54 def validate_username(username: str) -> str:
     55     u = username.strip()
     56     if not USERNAME_RE.fullmatch(u):
     57         friendly_exit(
     58             "nome de utilizador inválido: use letras minúsculas, dígitos, _ e -; "
     59             "comece com letra; entre 2 e 32 caracteres."
     60         )
     61     return u
     62 
     63 
     64 def path_is_file(path: Path) -> bool:
     65     try:
     66         return path.is_file()
     67     except OSError:
     68         return False
     69 
     70 
     71 def read_text_limited(path: Path, *, max_bytes: int = MAX_READ_BYTES) -> str | None:
     72     if not path_is_file(path):
     73         return None
     74     try:
     75         with path.open("rb") as f:
     76             data = f.read(max_bytes + 1)
     77         if len(data) > max_bytes:
     78             data = data[:max_bytes]
     79         return data.decode("utf-8", errors="replace")
     80     except OSError:
     81         return None
     82 
     83 
     84 def file_has_content(path: Path) -> bool:
     85     text = read_text_limited(path)
     86     return text is not None and bool(text.strip())
     87 
     88 
     89 def parse_profile_json(text: str | None) -> tuple[dict[str, Any] | None, str | None]:
     90     if text is None:
     91         return None, None
     92     raw = text.strip()
     93     if not raw:
     94         return None, None
     95     try:
     96         data = json.loads(raw)
     97     except json.JSONDecodeError as e:
     98         return None, f"aviso: profile.json inválido ({e.msg})."
     99     if not isinstance(data, dict):
    100         return None, "aviso: profile.json deve ser um objeto JSON."
    101     safe: dict[str, Any] = {}
    102     for key in ("display_name", "bio", "location"):
    103         val = data.get(key, "")
    104         safe[key] = val if isinstance(val, str) else ""
    105     links = data.get("links", [])
    106     safe["links"] = [x for x in links if isinstance(x, str)] if isinstance(links, list) else []
    107     interests = data.get("interests", [])
    108     safe["interests"] = (
    109         [x for x in interests if isinstance(x, str)] if isinstance(interests, list) else []
    110     )
    111     return safe, None
    112 
    113 
    114 def home_paths(username: str) -> dict[str, Path]:
    115     base = DEFAULT_HOME_ROOT / username
    116     return {
    117         "home": base,
    118         "profile": base / ".runv" / "profile.json",
    119         "plan": base / ".plan",
    120         "project": base / ".project",
    121         "public_index": base / "public_html" / "index.html",
    122     }
    123 
    124 
    125 def homepage_mtime_iso(path: Path) -> str | None:
    126     try:
    127         st = path.stat()
    128         ts = datetime.fromtimestamp(st.st_mtime, tz=timezone.utc)
    129         return ts.isoformat().replace("+00:00", "Z")
    130     except OSError:
    131         return None
    132 
    133 
    134 def format_mtime_local(iso_or_path: str | Path | None) -> str | None:
    135     if iso_or_path is None:
    136         return None
    137     if isinstance(iso_or_path, Path):
    138         iso = homepage_mtime_iso(iso_or_path)
    139         if iso is None:
    140             return None
    141     else:
    142         iso = iso_or_path
    143     try:
    144         if iso.endswith("Z"):
    145             dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
    146         else:
    147             dt = datetime.fromisoformat(iso)
    148         if dt.tzinfo is None:
    149             dt = dt.replace(tzinfo=timezone.utc)
    150         local = dt.astimezone()
    151         return local.strftime("%Y-%m-%d %H:%M")
    152     except (ValueError, TypeError):
    153         return None
    154 
    155 
    156 def _username_from_item(item: Any) -> str | None:
    157     if isinstance(item, str) and USERNAME_RE.fullmatch(item):
    158         return item
    159     if isinstance(item, dict):
    160         uname = item.get("username")
    161         if isinstance(uname, str) and USERNAME_RE.fullmatch(uname):
    162             return uname
    163     return None
    164 
    165 
    166 def _usernames_from_parsed(data: Any) -> list[str]:
    167     found: list[str] = []
    168     seen: set[str] = set()
    169     if isinstance(data, list):
    170         items = data
    171     elif isinstance(data, dict):
    172         if isinstance(data.get("users"), list):
    173             items = data["users"]
    174         else:
    175             items = list(data.keys())
    176     else:
    177         return []
    178     for item in items:
    179         uname = _username_from_item(item)
    180         if uname and uname not in seen:
    181             seen.add(uname)
    182             found.append(uname)
    183     return found
    184 
    185 
    186 def load_member_usernames(
    187     users_json_path: Path,
    188     home_root: Path,
    189 ) -> tuple[list[str], str | None]:
    190     warning: str | None = None
    191     if users_json_path.is_file():
    192         try:
    193             raw = users_json_path.read_text(encoding="utf-8").strip()
    194             if raw:
    195                 parsed = json.loads(raw)
    196                 names = _usernames_from_parsed(parsed)
    197                 if names:
    198                     return sorted(names, key=str.lower), None
    199         except (OSError, json.JSONDecodeError):
    200             warning = (
    201                 f"aviso: não foi possível ler {users_json_path}; "
    202                 "a listar diretórios em /home."
    203             )
    204     names_home: list[str] = []
    205     if home_root.is_dir():
    206         try:
    207             entries = home_root.iterdir()
    208         except OSError:
    209             entries = ()
    210         for entry in entries:
    211             if not entry.is_dir() or not USERNAME_RE.fullmatch(entry.name):
    212                 continue
    213             try:
    214                 if not os.access(entry, os.R_OK | os.X_OK):
    215                     continue
    216             except OSError:
    217                 continue
    218             names_home.append(entry.name)
    219     return sorted(set(names_home), key=str.lower), warning
    220 
    221 
    222 def profile_json_default_text() -> str:
    223     return json.dumps(PROFILE_DEFAULT, ensure_ascii=False, indent=2) + "\n"