runv-server

server tooling for runv.club
Log | Files | Refs | README

entre_app.py (17460B)


      1 #!/usr/bin/env python3
      2 """
      3 Experiência SSH guiada para pedidos de entrada na runv.club (utilizador «entre»).
      4 
      5 Executado via ForceCommand no OpenSSH. Não cria contas Linux; apenas fila + log
      6 + notificação opcional.
      7 
      8 Versão 0.02 — runv.club
      9 """
     10 
     11 from __future__ import annotations
     12 
     13 import argparse
     14 import os
     15 import sys
     16 from datetime import datetime, timezone
     17 from pathlib import Path
     18 
# ASCII art from the landing page (site/public/index.html) — keep it aligned with the <pre class="ascii"> element.
RUNV_ASCII_ART: str = """██████╗ ██╗   ██╗███╗   ██╗██╗   ██╗
██╔══██╗██║   ██║████╗  ██║██║   ██║
██████╔╝██║   ██║██╔██╗ ██║██║   ██║
██╔══██╗╚██╗ ██╔╝██║╚██╗██║╚██╗ ██╔╝
██║  ██║ ╚████╔╝ ██║ ╚████║ ╚████╔╝
╚═╝  ╚═╝  ╚═══╝  ╚═╝  ╚═══╝  ╚═══╝"""

# Tagline written under the ASCII art on the opening splash screen.
ASCII_TAGLINE: str = ".club — um computador para compartilhar"

# In intro.txt: a line containing only this marker separates the screens of the narrative.
INTRO_PAGE_BREAK: str = "%%PAGE%%"
     31 
     32 from entre_core import (
     33     APP_VERSION,
     34     MAX_ONLINE_PRESENCE_LEN,
     35     ValidationError,
     36     build_request_payload,
     37     find_config_path,
     38     find_install_root,
     39     load_config,
     40     log_session,
     41     new_request_id,
     42     render_template,
     43     resolve_entre_notify_recipients,
     44     resolve_paths,
     45     save_request_json,
     46     sendmail_notify,
     47     setup_file_logger,
     48     ssh_remote_context,
     49     validate_email,
     50     validate_online_presence,
     51     validate_public_key_line,
     52     validate_username,
     53 )
     54 
     55 
     56 def eprint(msg: str) -> None:
     57     print(msg, file=sys.stderr)
     58 
     59 
     60 def pause(stdin, stdout) -> None:
     61     stdout.write("\n[Enter] continuar  ·  [q] sair\n")
     62     stdout.flush()
     63     line = stdin.readline()
     64     if not line:
     65         raise SystemExit(0)
     66     if line.strip().lower() in ("q", "quit", "sair"):
     67         print("\nAté logo.\n")
     68         raise SystemExit(0)
     69 
     70 
     71 def read_line(prompt: str, stdin, stdout) -> str:
     72     stdout.write(prompt)
     73     stdout.flush()
     74     line = stdin.readline()
     75     if not line:
     76         raise SystemExit(0)
     77     return line.rstrip("\r\n")
     78 
     79 
     80 def write_data_step_header(stdout, step: int, total: int, title: str) -> None:
     81     """Cabeçalho visível antes de cada campo do formulário."""
     82     clear_screen(stdout)
     83     g = "\033[92m" if _use_ansi_color(stdout) else ""
     84     c = "\033[96m" if _use_ansi_color(stdout) else ""
     85     b = "\033[1m" if _use_ansi_color(stdout) else ""
     86     r = "\033[0m" if g else ""
     87     bar = "━" * 52
     88     stdout.write(f"\n  {g}{bar}{r}\n")
     89     stdout.write(f"  {b}{c}Dados · passo {step}/{total}{r}\n")
     90     stdout.write(f"  {b}{g}{title}{r}\n")
     91     stdout.write(f"  {g}{bar}{r}\n\n")
     92 
     93 
     94 def read_multiline_until_dot(stdin, stdout, *, max_lines: int = 48) -> str:
     95     """Várias linhas; termina com uma linha só com '.' (como no SMTP clássico)."""
     96     d = "\033[2m" if _use_ansi_color(stdout) else ""
     97     r = "\033[0m" if d else ""
     98     stdout.write(
     99         f"{d}  (podes usar várias linhas; para terminar, uma linha só com . e Enter){r}\n\n"
    100     )
    101     stdout.flush()
    102     lines: list[str] = []
    103     for _ in range(max_lines):
    104         line = stdin.readline()
    105         if not line:
    106             raise SystemExit(0)
    107         s = line.rstrip("\r\n")
    108         if s == ".":
    109             if lines:
    110                 break
    111             continue
    112         lines.append(s)
    113         if len("\n".join(lines)) > MAX_ONLINE_PRESENCE_LEN:
    114             stdout.write(
    115                 f"\n  {d}(limite de tamanho atingido — campo fechado aqui.){r}\n"
    116             )
    117             break
    118     return "\n".join(lines).strip()
    119 
    120 
    121 def clear_screen(stdout) -> None:
    122     stdout.write("\033[2J\033[H")
    123     stdout.flush()
    124 
    125 
    126 def _use_ansi_color(stdout) -> bool:
    127     if not getattr(stdout, "isatty", lambda: False)():
    128         return False
    129     term = (os.environ.get("TERM") or "").strip().lower()
    130     if term in ("", "dumb"):
    131         return False
    132     if os.environ.get("NO_COLOR", "").strip():
    133         return False
    134     return True
    135 
    136 
    137 RUNV_CLUB_MARK: str = "runv.club"
    138 
    139 
    140 def style_runv_club(text: str, stdout) -> str:
    141     """Destaca runv.club a verde no terminal (todas as ocorrências)."""
    142     if not _use_ansi_color(stdout) or RUNV_CLUB_MARK not in text:
    143         return text
    144     g, r = "\033[92m", "\033[0m"
    145     return text.replace(RUNV_CLUB_MARK, f"{g}{RUNV_CLUB_MARK}{r}")
    146 
    147 
    148 def wait_any_key(stdin, stdout) -> None:
    149     """Lê uma tecla em modo cru (POSIX); senão, uma linha (Enter)."""
    150     if sys.platform != "win32" and stdin.isatty():
    151         try:
    152             import termios
    153             import tty
    154 
    155             fd = stdin.fileno()
    156             old = termios.tcgetattr(fd)
    157             try:
    158                 tty.setraw(fd)
    159                 ch = stdin.read(1)
    160             finally:
    161                 termios.tcsetattr(fd, termios.TCSADRAIN, old)
    162             if ch == "\x03":
    163                 raise KeyboardInterrupt
    164             if ch == "\x04" or ch == "":
    165                 raise SystemExit(0)
    166             return
    167         except (ImportError, OSError, termios.error):
    168             pass
    169     stdout.write("  (tecla Enter para continuar)\n")
    170     stdout.flush()
    171     line = stdin.readline()
    172     if not line:
    173         raise SystemExit(0)
    174 
    175 
    176 def show_opening_splash(stdin, stdout) -> None:
    177     clear_screen(stdout)
    178     green = "\033[92m" if _use_ansi_color(stdout) else ""
    179     reset = "\033[0m" if green else ""
    180     stdout.write("\n")
    181     for line in RUNV_ASCII_ART.splitlines():
    182         stdout.write(f"  {green}{line}{reset}\n")
    183     stdout.write(f"\n  {green}{ASCII_TAGLINE}{reset}\n\n")
    184     stdout.write(f"  {green}Aperte qualquer tecla para continuar...{reset}\n")
    185     stdout.flush()
    186     wait_any_key(stdin, stdout)
    187 
    188 
    189 def show_paged_template(stdin, stdout, template_path: Path) -> None:
    190     raw = template_path.read_text(encoding="utf-8")
    191     pages = [p.strip("\n") for p in raw.split(INTRO_PAGE_BREAK)]
    192     pages = [p for p in pages if p.strip()]
    193     total = len(pages)
    194     for i, page in enumerate(pages, start=1):
    195         clear_screen(stdout)
    196         if total > 1:
    197             stdout.write(f"  ({i}/{total})\n\n")
    198         page = style_runv_club(page, stdout)
    199         stdout.write(page)
    200         if not page.endswith("\n"):
    201             stdout.write("\n")
    202         stdout.flush()
    203         pause(stdin, stdout)
    204 
    205 
    206 def collect_loop(stdin, stdout, templates: Path) -> tuple[str, str, str, str, str]:
    207     username = email = online_presence = pubkey = ""
    208     fp = ""
    209     total = 4
    210     while True:
    211         write_data_step_header(stdout, 1, total, "Nome de utilizador Unix desejado")
    212         stdout.write(
    213             style_runv_club(
    214                 "Letras minúsculas, dígitos, _ ou -; começa com letra. "
    215                 "Deixa em branco só se ainda não tiveres escolhido.\n",
    216                 stdout,
    217             )
    218         )
    219         b = "\033[1m" if _use_ansi_color(stdout) else ""
    220         r = "\033[0m" if b else ""
    221         stdout.write(f"\n  {b}» Escreve abaixo e prima Enter:{r}\n\n  ")
    222         stdout.flush()
    223         u = read_line("", stdin, stdout).strip()
    224         if u:
    225             username = u
    226 
    227         write_data_step_header(stdout, 2, total, "Email de contacto")
    228         stdout.write(
    229             "Endereço para a equipa te responder sobre este pedido.\n"
    230         )
    231         stdout.write(f"\n  {b}» Escreve abaixo e prima Enter:{r}\n\n  ")
    232         stdout.flush()
    233         e = read_line("", stdin, stdout).strip()
    234         if e:
    235             email = e
    236 
    237         write_data_step_header(stdout, 3, total, "Onde te encontramos online?")
    238         stdout.write(
    239             style_runv_club(
    240                 "Links, perfis ou páginas onde aparece o teu trabalho, código ou participação "
    241                 "— por exemplo site, GitHub, Mastodon, itch.io, etc. "
    242                 "Uma sugestão por linha.\n",
    243                 stdout,
    244             )
    245         )
    246         stdout.write(f"\n  {b}» A tua resposta (várias linhas):{r}\n")
    247         stdout.flush()
    248         raw_on = read_multiline_until_dot(stdin, stdout)
    249         if raw_on:
    250             online_presence = raw_on
    251 
    252         write_data_step_header(stdout, 4, total, "Chave pública SSH")
    253         stdout.write(
    254             "Uma única linha, a mesma que irias pôr em authorized_keys. "
    255             "Só a pública.\n"
    256         )
    257         stdout.write(f"\n  {b}» Cola a linha abaixo e prima Enter:{r}\n\n  ")
    258         stdout.flush()
    259         pk = stdin.readline()
    260         if not pk:
    261             raise SystemExit(0)
    262         pk = pk.rstrip("\r\n")
    263         if pk.strip():
    264             pubkey = pk.strip()
    265 
    266         errors: list[str] = []
    267         try:
    268             vu = validate_username(username)
    269         except ValidationError as ex:
    270             errors.append(str(ex))
    271             vu = ""
    272         try:
    273             ve = validate_email(email)
    274         except ValidationError as ex:
    275             errors.append(str(ex))
    276             ve = ""
    277         try:
    278             v_on = validate_online_presence(online_presence)
    279         except ValidationError as ex:
    280             errors.append(str(ex))
    281             v_on = ""
    282         try:
    283             if not pubkey:
    284                 raise ValidationError("a chave pública é obrigatória.")
    285             nkey, fp = validate_public_key_line(pubkey)
    286         except ValidationError as ex:
    287             errors.append(str(ex))
    288             nkey, fp = "", ""
    289 
    290         if errors:
    291             clear_screen(stdout)
    292             stdout.write("— Corrige os dados —\n\n")
    293             for err in errors:
    294                 stdout.write(f"  • {err}\n")
    295             stdout.write("\n[Enter] para voltar ao início do formulário\n")
    296             stdout.flush()
    297             stdin.readline()
    298             continue
    299         return vu, ve, v_on, nkey, fp
    300 
    301 
    302 def confirm_loop(
    303     stdin,
    304     stdout,
    305     *,
    306     username: str,
    307     email: str,
    308     online_presence: str,
    309     fingerprint: str,
    310     templates: Path,
    311 ) -> str:
    312     now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    313     body = render_template(
    314         templates / "confirm.txt",
    315         {
    316             "username": username,
    317             "email": email,
    318             "online_presence": online_presence,
    319             "fingerprint": fingerprint,
    320             "submitted_preview": now,
    321         },
    322     )
    323     while True:
    324         clear_screen(stdout)
    325         stdout.write(style_runv_club(body, stdout))
    326         stdout.write("\n  [c] confirmar envio\n")
    327         stdout.write("  [e] editar dados\n")
    328         stdout.write("  [x] cancelar e sair\n\n")
    329         stdout.write("Opção: ")
    330         stdout.flush()
    331         line = stdin.readline()
    332         if not line:
    333             raise SystemExit(0)
    334         c = line.strip().lower()
    335         if c in ("c", "confirmar", "s", "sim", "y", "yes"):
    336             return "confirm"
    337         if c in ("e", "editar"):
    338             return "edit"
    339         if c in ("x", "cancelar", "n", "nao", "não"):
    340             return "cancel"
    341         stdout.write("Opção inválida.\n")
    342         stdout.write("[Enter]")
    343         stdin.readline()
    344 
    345 
    346 def main() -> int:
    347     parser = argparse.ArgumentParser(description="Fluxo SSH entre@runv.club (runv.club)")
    348     parser.add_argument("--version", action="version", version=f"%(prog)s {APP_VERSION}")
    349     args = parser.parse_args()
    350     del args
    351 
    352     stdin, stdout = sys.stdin, sys.stdout
    353 
    354     install_root = find_install_root()
    355     config_path = find_config_path(install_root)
    356     try:
    357         cfg = load_config(config_path)
    358     except (OSError, ValueError) as e:
    359         eprint(f"Erro de configuração: {e}")
    360         return 2
    361 
    362     paths = resolve_paths(cfg, install_root)
    363     logger = setup_file_logger(paths.log_file)
    364 
    365     ctx = ssh_remote_context()
    366     log_session(
    367         logger,
    368         f"sessão iniciada remote_addr={ctx.get('remote_addr')!r} tty={ctx.get('tty')!r}",
    369     )
    370 
    371     templates = paths.templates_dir
    372     if not templates.is_dir():
    373         eprint(f"Templates em falta: {templates}")
    374         log_session(logger, f"ERRO templates em falta: {templates}", level=40)
    375         return 2
    376 
    377     try:
    378         # --- Abertura: arte ASCII da landing (verde) + qualquer tecla
    379         show_opening_splash(stdin, stdout)
    380 
    381         # --- Etapa 1: narrativa (%%PAGE%% em intro.txt)
    382         show_paged_template(stdin, stdout, templates / "intro.txt")
    383 
    384         # --- Etapa 2: aviso chave (pode ter %%PAGE%% como intro.txt)
    385         show_paged_template(stdin, stdout, templates / "warning_public_key.txt")
    386 
    387         # --- Etapa 3–4: coleta e confirmação (com edição repetível)
    388         username, email, online_presence, pubkey, fingerprint = collect_loop(
    389             stdin, stdout, templates
    390         )
    391         while True:
    392             action = confirm_loop(
    393                 stdin,
    394                 stdout,
    395                 username=username,
    396                 email=email,
    397                 online_presence=online_presence,
    398                 fingerprint=fingerprint,
    399                 templates=templates,
    400             )
    401             if action == "cancel":
    402                 log_session(logger, "utilizador cancelou antes de gravar")
    403                 stdout.write("\nPedido cancelado. Até logo.\n\n")
    404                 return 0
    405             if action == "edit":
    406                 username, email, online_presence, pubkey, fingerprint = collect_loop(
    407                     stdin, stdout, templates
    408                 )
    409                 continue
    410             break
    411 
    412         request_id = ""
    413         path_saved = None
    414         for attempt in range(8):
    415             request_id = new_request_id()
    416             payload = build_request_payload(
    417                 request_id=request_id,
    418                 username=username,
    419                 email=email,
    420                 online_presence=online_presence,
    421                 public_key=pubkey,
    422                 fingerprint=fingerprint,
    423                 remote_addr=ctx.get("remote_addr"),
    424                 tty=ctx.get("tty"),
    425             )
    426             try:
    427                 path_saved = save_request_json(
    428                     queue_dir=paths.queue_dir,
    429                     request_id=request_id,
    430                     payload=payload,
    431                     logger=logger,
    432                 )
    433                 break
    434             except FileExistsError:
    435                 log_session(logger, f"colisão request_id, a gerar outro (tentativa {attempt + 1})")
    436                 continue
    437             except OSError as e:
    438                 log_session(logger, f"ERRO ao gravar pedido: {e}", level=40)
    439                 eprint("Não foi possível gravar o pedido. Contacte a administração.")
    440                 return 2
    441         else:
    442             log_session(logger, "ERRO: não foi possível obter request_id único", level=40)
    443             eprint("Erro interno: tente novamente.")
    444             return 2
    445 
    446         submitted_at = payload["submitted_at"]
    447         _ = path_saved
    448 
    449         # Aviso em consola ao admin (template curto)
    450         try:
    451             oneline = online_presence.replace("\n", " ").strip()
    452             if len(oneline) > 100:
    453                 oneline = oneline[:97] + "..."
    454             notice = render_template(
    455                 templates / "admin_console_notice.txt",
    456                 {
    457                     "request_id": request_id,
    458                     "username": username,
    459                     "email": email,
    460                     "fingerprint": fingerprint,
    461                     "submitted_at": submitted_at,
    462                     "online_presence_line": oneline,
    463                 },
    464             )
    465             log_session(logger, "admin_console_notice:\n" + notice.strip())
    466         except OSError:
    467             pass
    468 
    469         admin_email, mail_from = resolve_entre_notify_recipients(cfg, logger=logger)
    470         sendmail_path = str(cfg.get("sendmail_path", "/usr/sbin/sendmail")).strip()
    471         if admin_email:
    472             try:
    473                 subject = f"[runv] Novo pedido: {username}"
    474                 body = render_template(
    475                     templates / "admin_mail.txt",
    476                     {
    477                         "request_id": request_id,
    478                         "username": username,
    479                         "email": email,
    480                         "online_presence": online_presence,
    481                         "public_key": pubkey,
    482                         "fingerprint": fingerprint,
    483                         "submitted_at": submitted_at,
    484                         "remote_addr": ctx.get("remote_addr") or "",
    485                         "tty": ctx.get("tty") or "",
    486                     },
    487                 )
    488                 sendmail_notify(
    489                     admin_email=admin_email,
    490                     mail_from=mail_from,
    491                     subject=subject,
    492                     body=body,
    493                     sendmail_path=sendmail_path,
    494                     logger=logger,
    495                 )
    496             except OSError as e:
    497                 log_session(logger, f"template admin_mail falhou: {e}", level=40)
    498 
    499         # --- Etapa 7: despedida
    500         clear_screen(stdout)
    501         goodbye = render_template(
    502             templates / "goodbye.txt",
    503             {"request_id": request_id},
    504         )
    505         stdout.write(style_runv_club(goodbye, stdout))
    506         stdout.flush()
    507         log_session(logger, f"sessão concluída request_id={request_id}")
    508     except ValidationError as e:
    509         log_session(logger, f"validação: {e}", level=40)
    510         stdout.write(style_runv_club(f"\n{e}\n\n", stdout))
    511         return 1
    512     return 0
    513 
    514 
    515 if __name__ == "__main__":
    516     raise SystemExit(main())