entre_app.py (17460B)
1 #!/usr/bin/env python3 2 """ 3 Experiência SSH guiada para pedidos de entrada na runv.club (utilizador «entre»). 4 5 Executado via ForceCommand no OpenSSH. Não cria contas Linux; apenas fila + log 6 + notificação opcional. 7 8 Versão 0.02 — runv.club 9 """ 10 11 from __future__ import annotations 12 13 import argparse 14 import os 15 import sys 16 from datetime import datetime, timezone 17 from pathlib import Path 18 19 # Arte ASCII da landing (site/public/index.html) — manter alinhado ao <pre class="ascii">. 20 RUNV_ASCII_ART: str = """██████╗ ██╗ ██╗███╗ ██╗██╗ ██╗ 21 ██╔══██╗██║ ██║████╗ ██║██║ ██║ 22 ██████╔╝██║ ██║██╔██╗ ██║██║ ██║ 23 ██╔══██╗╚██╗ ██╔╝██║╚██╗██║╚██╗ ██╔╝ 24 ██║ ██║ ╚████╔╝ ██║ ╚████║ ╚████╔╝ 25 ╚═╝ ╚═╝ ╚═══╝ ╚═╝ ╚═══╝ ╚═══╝""" 26 27 ASCII_TAGLINE: str = ".club — um computador para compartilhar" 28 29 # Em intro.txt: linha só com este marcador separa ecrãs da narrativa. 30 INTRO_PAGE_BREAK: str = "%%PAGE%%" 31 32 from entre_core import ( 33 APP_VERSION, 34 MAX_ONLINE_PRESENCE_LEN, 35 ValidationError, 36 build_request_payload, 37 find_config_path, 38 find_install_root, 39 load_config, 40 log_session, 41 new_request_id, 42 render_template, 43 resolve_entre_notify_recipients, 44 resolve_paths, 45 save_request_json, 46 sendmail_notify, 47 setup_file_logger, 48 ssh_remote_context, 49 validate_email, 50 validate_online_presence, 51 validate_public_key_line, 52 validate_username, 53 ) 54 55 56 def eprint(msg: str) -> None: 57 print(msg, file=sys.stderr) 58 59 60 def pause(stdin, stdout) -> None: 61 stdout.write("\n[Enter] continuar · [q] sair\n") 62 stdout.flush() 63 line = stdin.readline() 64 if not line: 65 raise SystemExit(0) 66 if line.strip().lower() in ("q", "quit", "sair"): 67 print("\nAté logo.\n") 68 raise SystemExit(0) 69 70 71 def read_line(prompt: str, stdin, stdout) -> str: 72 stdout.write(prompt) 73 stdout.flush() 74 line = stdin.readline() 75 if not line: 76 raise SystemExit(0) 77 return line.rstrip("\r\n") 78 79 80 def write_data_step_header(stdout, step: int, total: int, title: str) -> None: 81 """Cabeçalho visível antes de cada campo do formulário.""" 82 clear_screen(stdout) 83 g = "\033[92m" if _use_ansi_color(stdout) else "" 84 c = "\033[96m" if _use_ansi_color(stdout) else "" 85 b = "\033[1m" if _use_ansi_color(stdout) else "" 86 r = "\033[0m" if g else "" 87 bar = "━" * 52 88 stdout.write(f"\n {g}{bar}{r}\n") 89 stdout.write(f" {b}{c}Dados · passo {step}/{total}{r}\n") 90 stdout.write(f" {b}{g}{title}{r}\n") 91 stdout.write(f" {g}{bar}{r}\n\n") 92 93 94 def read_multiline_until_dot(stdin, stdout, *, max_lines: int = 48) -> str: 95 """Várias linhas; termina com uma linha só com '.' (como no SMTP clássico).""" 96 d = "\033[2m" if _use_ansi_color(stdout) else "" 97 r = "\033[0m" if d else "" 98 stdout.write( 99 f"{d} (podes usar várias linhas; para terminar, uma linha só com . e Enter){r}\n\n" 100 ) 101 stdout.flush() 102 lines: list[str] = [] 103 for _ in range(max_lines): 104 line = stdin.readline() 105 if not line: 106 raise SystemExit(0) 107 s = line.rstrip("\r\n") 108 if s == ".": 109 if lines: 110 break 111 continue 112 lines.append(s) 113 if len("\n".join(lines)) > MAX_ONLINE_PRESENCE_LEN: 114 stdout.write( 115 f"\n {d}(limite de tamanho atingido — campo fechado aqui.){r}\n" 116 ) 117 break 118 return "\n".join(lines).strip() 119 120 121 def clear_screen(stdout) -> None: 122 stdout.write("\033[2J\033[H") 123 stdout.flush() 124 125 126 def _use_ansi_color(stdout) -> bool: 127 if not getattr(stdout, "isatty", lambda: False)(): 128 return False 129 term = (os.environ.get("TERM") or "").strip().lower() 130 if term in ("", "dumb"): 131 return False 132 if os.environ.get("NO_COLOR", "").strip(): 133 return False 134 return True 135 136 137 RUNV_CLUB_MARK: str = "runv.club" 138 139 140 def style_runv_club(text: str, stdout) -> str: 141 """Destaca runv.club a verde no terminal (todas as ocorrências).""" 142 if not _use_ansi_color(stdout) or RUNV_CLUB_MARK not in text: 143 return text 144 g, r = "\033[92m", "\033[0m" 145 return text.replace(RUNV_CLUB_MARK, f"{g}{RUNV_CLUB_MARK}{r}") 146 147 148 def wait_any_key(stdin, stdout) -> None: 149 """Lê uma tecla em modo cru (POSIX); senão, uma linha (Enter).""" 150 if sys.platform != "win32" and stdin.isatty(): 151 try: 152 import termios 153 import tty 154 155 fd = stdin.fileno() 156 old = termios.tcgetattr(fd) 157 try: 158 tty.setraw(fd) 159 ch = stdin.read(1) 160 finally: 161 termios.tcsetattr(fd, termios.TCSADRAIN, old) 162 if ch == "\x03": 163 raise KeyboardInterrupt 164 if ch == "\x04" or ch == "": 165 raise SystemExit(0) 166 return 167 except (ImportError, OSError, termios.error): 168 pass 169 stdout.write(" (tecla Enter para continuar)\n") 170 stdout.flush() 171 line = stdin.readline() 172 if not line: 173 raise SystemExit(0) 174 175 176 def show_opening_splash(stdin, stdout) -> None: 177 clear_screen(stdout) 178 green = "\033[92m" if _use_ansi_color(stdout) else "" 179 reset = "\033[0m" if green else "" 180 stdout.write("\n") 181 for line in RUNV_ASCII_ART.splitlines(): 182 stdout.write(f" {green}{line}{reset}\n") 183 stdout.write(f"\n {green}{ASCII_TAGLINE}{reset}\n\n") 184 stdout.write(f" {green}Aperte qualquer tecla para continuar...{reset}\n") 185 stdout.flush() 186 wait_any_key(stdin, stdout) 187 188 189 def show_paged_template(stdin, stdout, template_path: Path) -> None: 190 raw = template_path.read_text(encoding="utf-8") 191 pages = [p.strip("\n") for p in raw.split(INTRO_PAGE_BREAK)] 192 pages = [p for p in pages if p.strip()] 193 total = len(pages) 194 for i, page in enumerate(pages, start=1): 195 clear_screen(stdout) 196 if total > 1: 197 stdout.write(f" ({i}/{total})\n\n") 198 page = style_runv_club(page, stdout) 199 stdout.write(page) 200 if not page.endswith("\n"): 201 stdout.write("\n") 202 stdout.flush() 203 pause(stdin, stdout) 204 205 206 def collect_loop(stdin, stdout, templates: Path) -> tuple[str, str, str, str, str]: 207 username = email = online_presence = pubkey = "" 208 fp = "" 209 total = 4 210 while True: 211 write_data_step_header(stdout, 1, total, "Nome de utilizador Unix desejado") 212 stdout.write( 213 style_runv_club( 214 "Letras minúsculas, dígitos, _ ou -; começa com letra. " 215 "Deixa em branco só se ainda não tiveres escolhido.\n", 216 stdout, 217 ) 218 ) 219 b = "\033[1m" if _use_ansi_color(stdout) else "" 220 r = "\033[0m" if b else "" 221 stdout.write(f"\n {b}» Escreve abaixo e prima Enter:{r}\n\n ") 222 stdout.flush() 223 u = read_line("", stdin, stdout).strip() 224 if u: 225 username = u 226 227 write_data_step_header(stdout, 2, total, "Email de contacto") 228 stdout.write( 229 "Endereço para a equipa te responder sobre este pedido.\n" 230 ) 231 stdout.write(f"\n {b}» Escreve abaixo e prima Enter:{r}\n\n ") 232 stdout.flush() 233 e = read_line("", stdin, stdout).strip() 234 if e: 235 email = e 236 237 write_data_step_header(stdout, 3, total, "Onde te encontramos online?") 238 stdout.write( 239 style_runv_club( 240 "Links, perfis ou páginas onde aparece o teu trabalho, código ou participação " 241 "— por exemplo site, GitHub, Mastodon, itch.io, etc. " 242 "Uma sugestão por linha.\n", 243 stdout, 244 ) 245 ) 246 stdout.write(f"\n {b}» A tua resposta (várias linhas):{r}\n") 247 stdout.flush() 248 raw_on = read_multiline_until_dot(stdin, stdout) 249 if raw_on: 250 online_presence = raw_on 251 252 write_data_step_header(stdout, 4, total, "Chave pública SSH") 253 stdout.write( 254 "Uma única linha, a mesma que irias pôr em authorized_keys. " 255 "Só a pública.\n" 256 ) 257 stdout.write(f"\n {b}» Cola a linha abaixo e prima Enter:{r}\n\n ") 258 stdout.flush() 259 pk = stdin.readline() 260 if not pk: 261 raise SystemExit(0) 262 pk = pk.rstrip("\r\n") 263 if pk.strip(): 264 pubkey = pk.strip() 265 266 errors: list[str] = [] 267 try: 268 vu = validate_username(username) 269 except ValidationError as ex: 270 errors.append(str(ex)) 271 vu = "" 272 try: 273 ve = validate_email(email) 274 except ValidationError as ex: 275 errors.append(str(ex)) 276 ve = "" 277 try: 278 v_on = validate_online_presence(online_presence) 279 except ValidationError as ex: 280 errors.append(str(ex)) 281 v_on = "" 282 try: 283 if not pubkey: 284 raise ValidationError("a chave pública é obrigatória.") 285 nkey, fp = validate_public_key_line(pubkey) 286 except ValidationError as ex: 287 errors.append(str(ex)) 288 nkey, fp = "", "" 289 290 if errors: 291 clear_screen(stdout) 292 stdout.write("— Corrige os dados —\n\n") 293 for err in errors: 294 stdout.write(f" • {err}\n") 295 stdout.write("\n[Enter] para voltar ao início do formulário\n") 296 stdout.flush() 297 stdin.readline() 298 continue 299 return vu, ve, v_on, nkey, fp 300 301 302 def confirm_loop( 303 stdin, 304 stdout, 305 *, 306 username: str, 307 email: str, 308 online_presence: str, 309 fingerprint: str, 310 templates: Path, 311 ) -> str: 312 now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") 313 body = render_template( 314 templates / "confirm.txt", 315 { 316 "username": username, 317 "email": email, 318 "online_presence": online_presence, 319 "fingerprint": fingerprint, 320 "submitted_preview": now, 321 }, 322 ) 323 while True: 324 clear_screen(stdout) 325 stdout.write(style_runv_club(body, stdout)) 326 stdout.write("\n [c] confirmar envio\n") 327 stdout.write(" [e] editar dados\n") 328 stdout.write(" [x] cancelar e sair\n\n") 329 stdout.write("Opção: ") 330 stdout.flush() 331 line = stdin.readline() 332 if not line: 333 raise SystemExit(0) 334 c = line.strip().lower() 335 if c in ("c", "confirmar", "s", "sim", "y", "yes"): 336 return "confirm" 337 if c in ("e", "editar"): 338 return "edit" 339 if c in ("x", "cancelar", "n", "nao", "não"): 340 return "cancel" 341 stdout.write("Opção inválida.\n") 342 stdout.write("[Enter]") 343 stdin.readline() 344 345 346 def main() -> int: 347 parser = argparse.ArgumentParser(description="Fluxo SSH entre@runv.club (runv.club)") 348 parser.add_argument("--version", action="version", version=f"%(prog)s {APP_VERSION}") 349 args = parser.parse_args() 350 del args 351 352 stdin, stdout = sys.stdin, sys.stdout 353 354 install_root = find_install_root() 355 config_path = find_config_path(install_root) 356 try: 357 cfg = load_config(config_path) 358 except (OSError, ValueError) as e: 359 eprint(f"Erro de configuração: {e}") 360 return 2 361 362 paths = resolve_paths(cfg, install_root) 363 logger = setup_file_logger(paths.log_file) 364 365 ctx = ssh_remote_context() 366 log_session( 367 logger, 368 f"sessão iniciada remote_addr={ctx.get('remote_addr')!r} tty={ctx.get('tty')!r}", 369 ) 370 371 templates = paths.templates_dir 372 if not templates.is_dir(): 373 eprint(f"Templates em falta: {templates}") 374 log_session(logger, f"ERRO templates em falta: {templates}", level=40) 375 return 2 376 377 try: 378 # --- Abertura: arte ASCII da landing (verde) + qualquer tecla 379 show_opening_splash(stdin, stdout) 380 381 # --- Etapa 1: narrativa (%%PAGE%% em intro.txt) 382 show_paged_template(stdin, stdout, templates / "intro.txt") 383 384 # --- Etapa 2: aviso chave (pode ter %%PAGE%% como intro.txt) 385 show_paged_template(stdin, stdout, templates / "warning_public_key.txt") 386 387 # --- Etapa 3–4: coleta e confirmação (com edição repetível) 388 username, email, online_presence, pubkey, fingerprint = collect_loop( 389 stdin, stdout, templates 390 ) 391 while True: 392 action = confirm_loop( 393 stdin, 394 stdout, 395 username=username, 396 email=email, 397 online_presence=online_presence, 398 fingerprint=fingerprint, 399 templates=templates, 400 ) 401 if action == "cancel": 402 log_session(logger, "utilizador cancelou antes de gravar") 403 stdout.write("\nPedido cancelado. Até logo.\n\n") 404 return 0 405 if action == "edit": 406 username, email, online_presence, pubkey, fingerprint = collect_loop( 407 stdin, stdout, templates 408 ) 409 continue 410 break 411 412 request_id = "" 413 path_saved = None 414 for attempt in range(8): 415 request_id = new_request_id() 416 payload = build_request_payload( 417 request_id=request_id, 418 username=username, 419 email=email, 420 online_presence=online_presence, 421 public_key=pubkey, 422 fingerprint=fingerprint, 423 remote_addr=ctx.get("remote_addr"), 424 tty=ctx.get("tty"), 425 ) 426 try: 427 path_saved = save_request_json( 428 queue_dir=paths.queue_dir, 429 request_id=request_id, 430 payload=payload, 431 logger=logger, 432 ) 433 break 434 except FileExistsError: 435 log_session(logger, f"colisão request_id, a gerar outro (tentativa {attempt + 1})") 436 continue 437 except OSError as e: 438 log_session(logger, f"ERRO ao gravar pedido: {e}", level=40) 439 eprint("Não foi possível gravar o pedido. Contacte a administração.") 440 return 2 441 else: 442 log_session(logger, "ERRO: não foi possível obter request_id único", level=40) 443 eprint("Erro interno: tente novamente.") 444 return 2 445 446 submitted_at = payload["submitted_at"] 447 _ = path_saved 448 449 # Aviso em consola ao admin (template curto) 450 try: 451 oneline = online_presence.replace("\n", " ").strip() 452 if len(oneline) > 100: 453 oneline = oneline[:97] + "..." 454 notice = render_template( 455 templates / "admin_console_notice.txt", 456 { 457 "request_id": request_id, 458 "username": username, 459 "email": email, 460 "fingerprint": fingerprint, 461 "submitted_at": submitted_at, 462 "online_presence_line": oneline, 463 }, 464 ) 465 log_session(logger, "admin_console_notice:\n" + notice.strip()) 466 except OSError: 467 pass 468 469 admin_email, mail_from = resolve_entre_notify_recipients(cfg, logger=logger) 470 sendmail_path = str(cfg.get("sendmail_path", "/usr/sbin/sendmail")).strip() 471 if admin_email: 472 try: 473 subject = f"[runv] Novo pedido: {username}" 474 body = render_template( 475 templates / "admin_mail.txt", 476 { 477 "request_id": request_id, 478 "username": username, 479 "email": email, 480 "online_presence": online_presence, 481 "public_key": pubkey, 482 "fingerprint": fingerprint, 483 "submitted_at": submitted_at, 484 "remote_addr": ctx.get("remote_addr") or "", 485 "tty": ctx.get("tty") or "", 486 }, 487 ) 488 sendmail_notify( 489 admin_email=admin_email, 490 mail_from=mail_from, 491 subject=subject, 492 body=body, 493 sendmail_path=sendmail_path, 494 logger=logger, 495 ) 496 except OSError as e: 497 log_session(logger, f"template admin_mail falhou: {e}", level=40) 498 499 # --- Etapa 7: despedida 500 clear_screen(stdout) 501 goodbye = render_template( 502 templates / "goodbye.txt", 503 {"request_id": request_id}, 504 ) 505 stdout.write(style_runv_club(goodbye, stdout)) 506 stdout.flush() 507 log_session(logger, f"sessão concluída request_id={request_id}") 508 except ValidationError as e: 509 log_session(logger, f"validação: {e}", level=40) 510 stdout.write(style_runv_club(f"\n{e}\n\n", stdout)) 511 return 1 512 return 0 513 514 515 if __name__ == "__main__": 516 raise SystemExit(main())