update_user.py (28527B)
1 #!/usr/bin/env python3 2 """ 3 Atualiza utilizador Unix existente no runv.club: email do utilizador (users.json), chave SSH, 4 palavra-passe de login (chpasswd) e quotas ext4 (setquota). 5 6 Executar como root. Alinha-se a create_runv_user / del-user / runv_mount. 7 8 Modo interativo no terminal (sem argumentos ou -i) ou flags CLI. 9 10 Após gravar ``users.json``, pode sincronizar a landing pública com 11 ``site/genlanding.py --sync-public-only`` (como ``create_runv_user`` / ``del-user``). 12 13 Versão 0.03 — runv.club 14 """ 15 16 from __future__ import annotations 17 18 import argparse 19 import fcntl 20 import getpass 21 import json 22 import logging 23 import os 24 import pwd 25 import re 26 import shutil 27 import subprocess 28 import sys 29 import tempfile 30 from datetime import datetime, timezone 31 from pathlib import Path 32 from collections.abc import Callable 33 from typing import Any, Final 34 35 _SCRIPT_DIR = Path(__file__).resolve().parent 36 if str(_SCRIPT_DIR) not in sys.path: 37 sys.path.insert(0, str(_SCRIPT_DIR)) 38 39 from admin_guard import ensure_admin_cli 40 from runv_landing_sync import try_sync_landing_via_genlanding 41 42 USERNAME_PATTERN: Final[re.Pattern[str]] = re.compile(r"^[a-z][a-z0-9_-]{1,31}$") 43 EMAIL_PATTERN: Final[re.Pattern[str]] = re.compile( 44 r"^[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?" 45 r"(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+$" 46 ) 47 ALLOWED_KEY_TYPES: Final[tuple[str, ...]] = ( 48 "ssh-ed25519", 49 "sk-ssh-ed25519@openssh.com", 50 "ecdsa-sha2-nistp256", 51 "ecdsa-sha2-nistp384", 52 "ecdsa-sha2-nistp521", 53 "ssh-rsa", 54 ) 55 FINGERPRINT_SHA256_RE: Final[re.Pattern[str]] = re.compile(r"\b(SHA256:[+A-Za-z0-9/_=-]+)\b") 56 57 DEFAULT_METADATA_PATH: Final[Path] = Path("/var/lib/runv/users.json") 58 DEFAULT_LOCK_PATH: Final[Path] = Path("/var/lib/runv/users.lock") 59 60 DEFAULT_QUOTA_SOFT_MIB: Final[int] = 450 61 DEFAULT_QUOTA_HARD_MIB: Final[int] = 500 62 DEFAULT_QUOTA_INODE_SOFT: Final[int] = 10_000 63 DEFAULT_QUOTA_INODE_HARD: Final[int] = 12_000 64 65 VERSION: Final[str] = "0.03" 66 EXIT_OK: Final[int] = 0 67 EXIT_VALIDATION: Final[int] = 1 68 EXIT_SYSTEM: Final[int] = 2 69 70 MIN_UID_NORMAL_USER: Final[int] = 1000 71 72 73 def setup_update_user_log() -> logging.Logger: 74 log = logging.getLogger("runv.update_user") 75 log.setLevel(logging.INFO) 76 log.propagate = False 77 if not log.handlers: 78 h = logging.StreamHandler(sys.stderr) 79 h.setFormatter(logging.Formatter("%(levelname)s: %(message)s")) 80 log.addHandler(h) 81 return log 82 83 84 def maybe_sync_landing_after_metadata( 85 *, 86 skip_metadata: bool, 87 no_refresh_landing_members: bool, 88 landing_document_root: Path | None, 89 metadata_file: Path, 90 members_homes_root: Path | None, 91 dry_run: bool, 92 log: logging.Logger, 93 ) -> None: 94 if dry_run or skip_metadata or no_refresh_landing_members or landing_document_root is None: 95 return 96 root = landing_document_root.resolve() 97 if not root.is_dir(): 98 log.warning("DocumentRoot da landing inexistente (%s); sync omitido", root) 99 return 100 log.info("sincronizar landing (public + members) em %s", root) 101 try_sync_landing_via_genlanding( 102 document_root=root, 103 users_json=metadata_file, 104 homes_root=members_homes_root.resolve() if members_homes_root else None, 105 log=log, 106 ) 107 108 109 def eprint(msg: str) -> None: 110 print(msg, file=sys.stderr) 111 112 113 def require_root(*, dry_run: bool) -> None: 114 if not dry_run and os.geteuid() != 0: 115 eprint("Erro: execute como root (sudo).") 116 raise SystemExit(EXIT_VALIDATION) 117 118 119 def validate_username_syntax(username: str) -> str: 120 if not username or not username.strip(): 121 eprint("Erro: username é obrigatório.") 122 raise SystemExit(EXIT_VALIDATION) 123 u = username.strip() 124 if not USERNAME_PATTERN.fullmatch(u): 125 eprint( 126 "Erro: username inválido (letras minúsculas, dígitos, _ e -; 2–32 chars, começa com letra)." 127 ) 128 raise SystemExit(EXIT_VALIDATION) 129 return u 130 131 132 def validate_email(email: str) -> str: 133 e = email.strip() 134 if not EMAIL_PATTERN.fullmatch(e): 135 raise ValueError("formato de email inválido") 136 return e 137 138 139 def check_user_exists(username: str) -> tuple[int, int, Path]: 140 try: 141 pw = pwd.getpwnam(username) 142 except KeyError: 143 eprint(f"Erro: utilizador {username!r} não existe no sistema.") 144 raise SystemExit(EXIT_VALIDATION) 145 if pw.pw_uid < MIN_UID_NORMAL_USER: 146 eprint(f"Erro: UID {pw.pw_uid} < {MIN_UID_NORMAL_USER} (conta de sistema).") 147 raise SystemExit(EXIT_VALIDATION) 148 return pw.pw_uid, pw.pw_gid, Path(pw.pw_dir) 149 150 151 def normalize_public_key(raw: str) -> str: 152 if "\n" in raw or "\r" in raw: 153 raise ValueError("chave deve ser uma única linha") 154 line = raw.strip() 155 if not line: 156 raise ValueError("chave vazia") 157 parts = line.split() 158 if len(parts) < 2: 159 raise ValueError("chave malformada") 160 if parts[0] not in ALLOWED_KEY_TYPES: 161 raise ValueError(f"tipo de chave não permitido: {parts[0]!r}") 162 blob = parts[1] 163 if not re.fullmatch(r"[A-Za-z0-9+/]+=*", blob): 164 raise ValueError("dados base64 inválidos") 165 out = parts[0] + " " + blob 166 if len(parts) > 2: 167 out += " " + " ".join(parts[2:]) 168 return out 169 170 171 def compute_public_key_fingerprint(public_key_line: str) -> str: 172 line = normalize_public_key(public_key_line) 173 fd, tmppath = tempfile.mkstemp(prefix="runv-upd-key-", suffix=".pub") 174 path = Path(tmppath) 175 try: 176 with os.fdopen(fd, "w", encoding="utf-8") as f: 177 f.write(line + "\n") 178 proc = subprocess.run( 179 ["ssh-keygen", "-l", "-E", "sha256", "-f", str(path)], 180 capture_output=True, 181 text=True, 182 timeout=30, 183 ) 184 if proc.returncode != 0: 185 err = (proc.stderr or proc.stdout or "").strip() 186 raise ValueError(f"ssh-keygen: {err}") 187 first = (proc.stdout or "").strip().splitlines()[0] 188 m = FINGERPRINT_SHA256_RE.search(first) 189 if not m: 190 raise ValueError(f"fingerprint não encontrado: {first!r}") 191 return m.group(1) 192 finally: 193 path.unlink(missing_ok=True) 194 195 196 def mib_to_setquota_kib(mib: int) -> int: 197 if mib < 0: 198 raise ValueError("MiB negativo") 199 return mib * 1024 200 201 202 def quota_probe_path(home: Path) -> Path: 203 p = home.resolve() 204 if p.is_dir(): 205 return p 206 return p.parent if p.parent != p else Path("/").resolve() 207 208 209 def apply_setquota( 210 username: str, 211 home: Path, 212 soft_mib: int, 213 hard_mib: int, 214 inode_soft: int, 215 inode_hard: int, 216 *, 217 dry_run: bool, 218 ) -> tuple[str, str]: 219 from runv_mount import MountLookupError, find_mount_triple, quota_opts_allow_user 220 221 if soft_mib > hard_mib or inode_soft > inode_hard: 222 raise ValueError("soft não pode exceder hard (blocos ou inodes)") 223 probe = quota_probe_path(home) 224 try: 225 target, fstype, opts = find_mount_triple(probe) 226 except MountLookupError as e: 227 raise RuntimeError(str(e)) from e 228 if fstype != "ext4" or not quota_opts_allow_user(opts): 229 raise RuntimeError(f"sem ext4+usrquota em {target!r}") 230 if not shutil.which("setquota"): 231 raise RuntimeError("comando setquota não encontrado (apt install quota)") 232 bs = mib_to_setquota_kib(soft_mib) 233 bh = mib_to_setquota_kib(hard_mib) 234 cmd = ["setquota", "-u", username, str(bs), str(bh), str(inode_soft), str(inode_hard), target] 235 if dry_run: 236 print(f" [dry-run] {' '.join(cmd)}") 237 return target, fstype 238 r = subprocess.run(cmd, capture_output=True, text=True, timeout=120) 239 if r.returncode != 0: 240 err = (r.stderr or r.stdout or "").strip() 241 raise RuntimeError(f"setquota falhou: {err}") 242 return target, fstype 243 244 245 def write_authorized_keys_replace( 246 home: Path, 247 uid: int, 248 gid: int, 249 public_key_line: str, 250 *, 251 dry_run: bool, 252 ) -> None: 253 line = normalize_public_key(public_key_line) 254 ssh_dir = home / ".ssh" 255 auth = ssh_dir / "authorized_keys" 256 if dry_run: 257 print(f" [dry-run] escreveria {auth} com uma linha") 258 return 259 ssh_dir.mkdir(parents=True, exist_ok=True) 260 os.chmod(ssh_dir, 0o700) 261 os.chown(ssh_dir, uid, gid) 262 auth.write_text(line + "\n", encoding="utf-8") 263 os.chmod(auth, 0o600) 264 os.chown(auth, uid, gid) 265 266 267 def write_authorized_keys_append( 268 home: Path, 269 uid: int, 270 gid: int, 271 public_key_line: str, 272 *, 273 dry_run: bool, 274 ) -> None: 275 line = normalize_public_key(public_key_line) 276 ssh_dir = home / ".ssh" 277 auth = ssh_dir / "authorized_keys" 278 if dry_run: 279 print(f" [dry-run] acrescentaria chave em {auth}") 280 return 281 ssh_dir.mkdir(parents=True, exist_ok=True) 282 os.chmod(ssh_dir, 0o700) 283 os.chown(ssh_dir, uid, gid) 284 if auth.exists(): 285 existing = auth.read_text(encoding="utf-8") 286 if line in existing.splitlines(): 287 print(" [info] authorized_keys já continha esta chave.") 288 else: 289 with open(auth, "a", encoding="utf-8") as f: 290 f.write(line + "\n") 291 else: 292 auth.write_text(line + "\n", encoding="utf-8") 293 os.chmod(auth, 0o600) 294 os.chown(auth, uid, gid) 295 296 297 def set_password_chpasswd(username: str, password: str, *, dry_run: bool) -> None: 298 if dry_run: 299 print(f" [dry-run] chpasswd para {username!r}") 300 return 301 r = subprocess.run( 302 ["chpasswd"], 303 input=f"{username}:{password}\n", 304 text=True, 305 capture_output=True, 306 timeout=60, 307 ) 308 if r.returncode != 0: 309 err = (r.stderr or r.stdout or "").strip() 310 raise RuntimeError(f"chpasswd falhou: {err}") 311 312 313 def mutate_metadata( 314 metadata_path: Path, 315 lock_path: Path, 316 *, 317 dry_run: bool, 318 mutator: Callable[[list[dict[str, Any]]], bool], 319 ) -> bool: 320 """ 321 Lê lista JSON sob flock, chama mutator(data) -> True se deve gravar. 322 Gravação atómica na mesma secção crítica. 323 """ 324 metadata_path.parent.mkdir(parents=True, exist_ok=True) 325 lock_path.parent.mkdir(parents=True, exist_ok=True) 326 lock_f = open(lock_path, "a+", encoding="utf-8") 327 try: 328 fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX) 329 if not metadata_path.is_file(): 330 data: list[dict[str, Any]] = [] 331 else: 332 raw = metadata_path.read_text(encoding="utf-8").strip() 333 if not raw: 334 data = [] 335 else: 336 parsed = json.loads(raw) 337 if not isinstance(parsed, list): 338 raise ValueError("users.json: esperada lista JSON") 339 data = parsed 340 if not mutator(data): 341 return False 342 if dry_run: 343 print(f" [dry-run] gravaria {len(data)} entradas em {metadata_path}") 344 return True 345 tmp_fd, tmp_name = tempfile.mkstemp( 346 prefix="users.", 347 suffix=".tmp", 348 dir=str(metadata_path.parent), 349 ) 350 tmp_path = Path(tmp_name) 351 try: 352 with os.fdopen(tmp_fd, "w", encoding="utf-8") as out: 353 json.dump(data, out, indent=2, ensure_ascii=False) 354 out.flush() 355 os.fsync(out.fileno()) 356 os.replace(tmp_path, metadata_path) 357 except Exception: 358 tmp_path.unlink(missing_ok=True) 359 raise 360 return True 361 finally: 362 fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN) 363 lock_f.close() 364 365 366 def find_metadata_index(data: list[dict[str, Any]], username: str) -> int | None: 367 for i, row in enumerate(data): 368 if isinstance(row, dict) and row.get("username") == username: 369 return i 370 return None 371 372 373 def update_metadata_email( 374 metadata_path: Path, 375 lock_path: Path, 376 username: str, 377 email: str, 378 *, 379 dry_run: bool, 380 ) -> bool: 381 def m(data: list[dict[str, Any]]) -> bool: 382 idx = find_metadata_index(data, username) 383 if idx is None: 384 eprint( 385 f"Aviso: sem entrada em {metadata_path} para {username!r}; email não gravado em JSON." 386 ) 387 return False 388 data[idx]["email"] = email 389 return True 390 391 ok = mutate_metadata(metadata_path, lock_path, dry_run=dry_run, mutator=m) 392 if ok: 393 print(f" [ok] email em metadados atualizado para {email!r}") 394 return ok 395 396 397 def update_metadata_after_key( 398 metadata_path: Path, 399 lock_path: Path, 400 username: str, 401 fingerprint: str, 402 *, 403 dry_run: bool, 404 ) -> bool: 405 def m(data: list[dict[str, Any]]) -> bool: 406 idx = find_metadata_index(data, username) 407 if idx is None: 408 eprint(f"Aviso: sem entrada em metadados para {username!r}; fingerprint não gravado.") 409 return False 410 data[idx]["public_key_fingerprint"] = fingerprint 411 return True 412 413 if mutate_metadata(metadata_path, lock_path, dry_run=dry_run, mutator=m): 414 print(f" [ok] fingerprint em metadados: {fingerprint}") 415 return True 416 return False 417 418 419 def update_metadata_after_quota( 420 metadata_path: Path, 421 lock_path: Path, 422 username: str, 423 soft_mib: int, 424 hard_mib: int, 425 inode_soft: int, 426 inode_hard: int, 427 mountpoint: str, 428 fstype: str, 429 *, 430 dry_run: bool, 431 ) -> None: 432 def m(data: list[dict[str, Any]]) -> bool: 433 idx = find_metadata_index(data, username) 434 if idx is None: 435 eprint( 436 f"Aviso: sem entrada em metadados para {username!r}; quotas não reflectidas no JSON." 437 ) 438 return False 439 now = datetime.now(timezone.utc).isoformat() 440 row = data[idx] 441 row["quota_enabled"] = True 442 row["quota_soft_mb"] = soft_mib 443 row["quota_hard_mb"] = hard_mib 444 row["quota_inode_soft"] = inode_soft 445 row["quota_inode_hard"] = inode_hard 446 row["quota_mountpoint"] = mountpoint 447 row["quota_filesystem"] = fstype 448 row["quota_applied_at"] = now 449 row["quota_status"] = "applied" 450 if row.get("status") == "partial_quota": 451 row["status"] = "active" 452 return True 453 454 if mutate_metadata(metadata_path, lock_path, dry_run=dry_run, mutator=m): 455 print(" [ok] campos de quota actualizados em metadados") 456 457 458 def prompt_line(msg: str, default: str | None = None) -> str: 459 if default is not None: 460 s = input(f"{msg} [{default}]: ").strip() 461 return s if s else default 462 return input(f"{msg}: ").strip() 463 464 465 def interactive_loop( 466 username: str, 467 uid: int, 468 gid: int, 469 home: Path, 470 metadata_path: Path, 471 lock_path: Path, 472 *, 473 dry_run: bool, 474 skip_metadata: bool, 475 ) -> None: 476 print() 477 print(f"Utilizador: {username} (uid={uid}, home={home})") 478 print("Escolha o que alterar (número). Repita até terminar.") 479 print(" 1) Email do utilizador (users.json)") 480 print(" 2) Substituir ~/.ssh/authorized_keys por UMA chave (política runv típica)") 481 print(" 3) Acrescentar chave a authorized_keys") 482 print(" 4) Definir palavra-passe de login (chpasswd) — o runv costuma usar só SSH por chave") 483 print(" 5) Aplicar quota (MiB soft/hard + inodes, como create_runv_user)") 484 print(" 0) Sair") 485 print() 486 while True: 487 choice = input("Opção [0]: ").strip() or "0" 488 if choice == "0": 489 break 490 if choice == "1": 491 if skip_metadata: 492 print(" [skip] --skip-metadata activo.") 493 continue 494 em = prompt_line("Novo email do utilizador") 495 if not em: 496 continue 497 try: 498 em = validate_email(em) 499 except ValueError as e: 500 eprint(f"Erro: {e}") 501 continue 502 update_metadata_email(metadata_path, lock_path, username, em, dry_run=dry_run) 503 elif choice == "2": 504 print("Cole UMA linha de chave pública OpenSSH (Enter para cancelar):") 505 line = input().strip() 506 if not line: 507 continue 508 try: 509 fp = compute_public_key_fingerprint(line) 510 write_authorized_keys_replace(home, uid, gid, line, dry_run=dry_run) 511 if not skip_metadata: 512 update_metadata_after_key( 513 metadata_path, lock_path, username, fp, dry_run=dry_run 514 ) 515 except ValueError as e: 516 eprint(f"Erro: {e}") 517 elif choice == "3": 518 print("Cole linha de chave a acrescentar:") 519 line = input().strip() 520 if not line: 521 continue 522 try: 523 write_authorized_keys_append(home, uid, gid, line, dry_run=dry_run) 524 print(" [ok] chave acrescentada (metadados: use opção 2 ou edite JSON se quiser fingerprint único)") 525 except ValueError as e: 526 eprint(f"Erro: {e}") 527 elif choice == "4": 528 if not sys.stdin.isatty(): 529 eprint("Palavra-passe: use terminal interactivo ou não use esta opção.") 530 continue 531 p1 = getpass.getpass("Nova palavra-passe: ") 532 p2 = getpass.getpass("Repita: ") 533 if p1 != p2: 534 eprint("As palavras-passe não coincidem.") 535 continue 536 if not p1: 537 eprint("Palavra-passe vazia recusada.") 538 continue 539 try: 540 set_password_chpasswd(username, p1, dry_run=dry_run) 541 print(" [ok] palavra-passe alterada (login shell / chpasswd)") 542 except RuntimeError as e: 543 eprint(str(e)) 544 elif choice == "5": 545 try: 546 sm = int(prompt_line("MiB soft", str(DEFAULT_QUOTA_SOFT_MIB))) 547 hm = int(prompt_line("MiB hard", str(DEFAULT_QUOTA_HARD_MIB))) 548 isoft = int(prompt_line("Inode soft", str(DEFAULT_QUOTA_INODE_SOFT))) 549 ihard = int(prompt_line("Inode hard", str(DEFAULT_QUOTA_INODE_HARD))) 550 except ValueError: 551 eprint("Números inválidos.") 552 continue 553 try: 554 mp, fs = apply_setquota( 555 username, home, sm, hm, isoft, ihard, dry_run=dry_run 556 ) 557 if not skip_metadata: 558 update_metadata_after_quota( 559 metadata_path, 560 lock_path, 561 username, 562 sm, 563 hm, 564 isoft, 565 ihard, 566 mp, 567 fs, 568 dry_run=dry_run, 569 ) 570 except (ValueError, RuntimeError) as e: 571 eprint(str(e)) 572 else: 573 print("Opção desconhecida.") 574 print() 575 576 577 def parse_args(argv: list[str] | None) -> argparse.Namespace: 578 p = argparse.ArgumentParser( 579 description="Atualiza utilizador runv: email (JSON), SSH, palavra-passe, quota.", 580 ) 581 p.add_argument("--username", "-u", metavar="USER", help="utilizador Unix existente") 582 p.add_argument( 583 "-i", 584 "--interactive", 585 action="store_true", 586 help="menu interactivo (também é o padrão se não houver flags de alteração)", 587 ) 588 p.add_argument("--email", metavar="ADDR", help="email do utilizador (users.json)") 589 p.add_argument( 590 "--replace-public-key", 591 metavar="LINE", 592 help="substitui authorized_keys por esta linha OpenSSH", 593 ) 594 p.add_argument( 595 "--append-public-key", 596 metavar="LINE", 597 help="acrescenta linha a authorized_keys", 598 ) 599 p.add_argument( 600 "--ssh-replace-file", 601 type=Path, 602 metavar="PATH", 603 help="ficheiro com uma linha OpenSSH (substitui authorized_keys)", 604 ) 605 p.add_argument( 606 "--ssh-append-file", 607 type=Path, 608 metavar="PATH", 609 help="ficheiro com uma linha OpenSSH (acrescenta a authorized_keys)", 610 ) 611 p.add_argument( 612 "--set-password", 613 action="store_true", 614 help="pede nova palavra-passe (getpass); requer TTY", 615 ) 616 p.add_argument("--quota-soft-mb", type=int, metavar="MiB", default=None) 617 p.add_argument("--quota-hard-mb", type=int, metavar="MiB", default=None) 618 p.add_argument("--quota-inode-soft", type=int, default=None) 619 p.add_argument("--quota-inode-hard", type=int, default=None) 620 p.add_argument("--dry-run", action="store_true") 621 p.add_argument( 622 "--skip-metadata", 623 action="store_true", 624 help="não lê nem grava users.json", 625 ) 626 p.add_argument("--metadata-file", type=Path, default=DEFAULT_METADATA_PATH) 627 p.add_argument("--lock-file", type=Path, default=DEFAULT_LOCK_PATH) 628 p.add_argument( 629 "--landing-document-root", 630 type=Path, 631 default=Path("/var/www/runv.club/html"), 632 help=( 633 "DocumentRoot da landing; após gravar users.json, executa genlanding --sync-public-only " 634 "(omitido com --skip-metadata ou --no-refresh-landing-members)" 635 ), 636 ) 637 p.add_argument( 638 "--no-refresh-landing-members", 639 action="store_true", 640 help="não copiar site/public nem regenerar data/members.json após alterar metadados", 641 ) 642 p.add_argument( 643 "--members-homes-root", 644 type=Path, 645 default=None, 646 metavar="DIR", 647 help="opcional: --members-homes-root para genlanding (ex. /home)", 648 ) 649 p.add_argument("--version", action="version", version=f"%(prog)s {VERSION} — runv.club") 650 return p.parse_args(argv) 651 652 653 def read_key_file(path: Path) -> str: 654 raw = path.read_text(encoding="utf-8").strip() 655 lines = [ln.strip() for ln in raw.splitlines() if ln.strip() and not ln.strip().startswith("#")] 656 if len(lines) != 1: 657 raise ValueError("ficheiro deve conter exactamente uma linha de chave (sem comentários)") 658 return lines[0] 659 660 661 def main(argv: list[str] | None = None) -> int: 662 args = parse_args(argv) 663 dry_run = args.dry_run 664 ensure_admin_cli( 665 script_name=Path(__file__).name, 666 dry_run=bool(dry_run), 667 ) 668 log = setup_update_user_log() 669 require_root(dry_run=dry_run) 670 671 has_quota_flag = any( 672 [ 673 args.quota_soft_mb is not None, 674 args.quota_hard_mb is not None, 675 args.quota_inode_soft is not None, 676 args.quota_inode_hard is not None, 677 ] 678 ) 679 has_cli_change = any( 680 [ 681 args.email, 682 args.replace_public_key, 683 args.append_public_key, 684 args.ssh_replace_file is not None, 685 args.ssh_append_file is not None, 686 args.set_password, 687 has_quota_flag, 688 ] 689 ) 690 691 if not args.username: 692 if not sys.stdin.isatty(): 693 eprint("Erro: indique --username ou execute em modo interactivo com TTY.") 694 return EXIT_VALIDATION 695 u = prompt_line("Username Unix a atualizar") 696 username = validate_username_syntax(u) 697 else: 698 username = validate_username_syntax(args.username) 699 700 uid, gid, home = check_user_exists(username) 701 702 if args.interactive or not has_cli_change: 703 if args.interactive and has_cli_change: 704 eprint("Aviso: com -i/--interactive o menu ignora outras flags de alteração nesta execução.") 705 if args.set_password and not sys.stdin.isatty(): 706 eprint("Erro: --set-password requer TTY.") 707 return EXIT_VALIDATION 708 print(f"== update_user.py v{VERSION} — runv.club ==") 709 interactive_loop( 710 username, 711 uid, 712 gid, 713 home, 714 args.metadata_file, 715 args.lock_file, 716 dry_run=dry_run, 717 skip_metadata=args.skip_metadata, 718 ) 719 maybe_sync_landing_after_metadata( 720 skip_metadata=args.skip_metadata, 721 no_refresh_landing_members=args.no_refresh_landing_members, 722 landing_document_root=args.landing_document_root, 723 metadata_file=args.metadata_file, 724 members_homes_root=args.members_homes_root, 725 dry_run=dry_run, 726 log=log, 727 ) 728 return EXIT_OK 729 730 pk_replace: str | None = args.replace_public_key 731 if args.ssh_replace_file is not None: 732 if pk_replace is not None: 733 eprint("Erro: use só uma de --replace-public-key ou --ssh-replace-file.") 734 return EXIT_VALIDATION 735 try: 736 pk_replace = read_key_file(args.ssh_replace_file) 737 except (OSError, ValueError) as e: 738 eprint(f"Erro: {e}") 739 return EXIT_VALIDATION 740 741 pk_append: str | None = args.append_public_key 742 if args.ssh_append_file is not None: 743 if pk_append is not None: 744 eprint("Erro: use só uma de --append-public-key ou --ssh-append-file.") 745 return EXIT_VALIDATION 746 try: 747 pk_append = read_key_file(args.ssh_append_file) 748 except (OSError, ValueError) as e: 749 eprint(f"Erro: {e}") 750 return EXIT_VALIDATION 751 752 if pk_replace is not None and pk_append is not None: 753 eprint("Erro: numa só execução use substituir chave OU acrescentar, não ambos.") 754 return EXIT_VALIDATION 755 756 try: 757 if args.email: 758 if args.skip_metadata: 759 eprint("Erro: --email requer metadados; não use --skip-metadata.") 760 return EXIT_VALIDATION 761 em = validate_email(args.email) 762 update_metadata_email( 763 args.metadata_file, args.lock_file, username, em, dry_run=dry_run 764 ) 765 766 if pk_replace: 767 fp = compute_public_key_fingerprint(pk_replace) 768 write_authorized_keys_replace(home, uid, gid, pk_replace, dry_run=dry_run) 769 if not args.skip_metadata: 770 update_metadata_after_key( 771 args.metadata_file, args.lock_file, username, fp, dry_run=dry_run 772 ) 773 774 if pk_append: 775 write_authorized_keys_append(home, uid, gid, pk_append, dry_run=dry_run) 776 777 if args.set_password: 778 if not sys.stdin.isatty(): 779 eprint("Erro: --set-password requer TTY (use modo interactivo).") 780 return EXIT_VALIDATION 781 p1 = getpass.getpass("Nova palavra-passe: ") 782 p2 = getpass.getpass("Repita: ") 783 if p1 != p2 or not p1: 784 eprint("Palavra-passe inválida ou não coincide.") 785 return EXIT_VALIDATION 786 set_password_chpasswd(username, p1, dry_run=dry_run) 787 print(" [ok] palavra-passe alterada") 788 789 if ( 790 args.quota_soft_mb is not None 791 or args.quota_hard_mb is not None 792 or args.quota_inode_soft is not None 793 or args.quota_inode_hard is not None 794 ): 795 sm = args.quota_soft_mb if args.quota_soft_mb is not None else DEFAULT_QUOTA_SOFT_MIB 796 hm = args.quota_hard_mb if args.quota_hard_mb is not None else DEFAULT_QUOTA_HARD_MIB 797 iso = ( 798 args.quota_inode_soft 799 if args.quota_inode_soft is not None 800 else DEFAULT_QUOTA_INODE_SOFT 801 ) 802 ihd = ( 803 args.quota_inode_hard 804 if args.quota_inode_hard is not None 805 else DEFAULT_QUOTA_INODE_HARD 806 ) 807 mp, fs = apply_setquota(username, home, sm, hm, iso, ihd, dry_run=dry_run) 808 if not args.skip_metadata: 809 update_metadata_after_quota( 810 args.metadata_file, 811 args.lock_file, 812 username, 813 sm, 814 hm, 815 iso, 816 ihd, 817 mp, 818 fs, 819 dry_run=dry_run, 820 ) 821 except ValueError as e: 822 eprint(f"Erro: {e}") 823 return EXIT_VALIDATION 824 except RuntimeError as e: 825 eprint(str(e)) 826 return EXIT_SYSTEM 827 828 maybe_sync_landing_after_metadata( 829 skip_metadata=args.skip_metadata, 830 no_refresh_landing_members=args.no_refresh_landing_members, 831 landing_document_root=args.landing_document_root, 832 metadata_file=args.metadata_file, 833 members_homes_root=args.members_homes_root, 834 dry_run=dry_run, 835 log=log, 836 ) 837 838 return EXIT_OK 839 840 841 if __name__ == "__main__": 842 raise SystemExit(main())