commit ae760bda9b2361b42d0150bb739d7ba815b6d133
parent 9aee2918523d3b3c972a83289f6154b0ba94e9c2
Author: Pablo Murad <pblmrd@gmail.com>
Date: Sun, 3 May 2026 12:57:10 -0300
mais inteligente
Diffstat:
18 files changed, 2578 insertions(+), 717 deletions(-)
diff --git a/.agent/skills b/.agent/skills
@@ -0,0 +1 @@
+Subproject commit 44d6277b698618538f3ae68fa8c803d7e5f07482
diff --git a/.env.example b/.env.example
@@ -14,3 +14,30 @@ ADMIN_PASSWORD=
# Opcional: YouTube PO Token para acesso a formatos Android
# Obtenha em: https://github.com/yt-dlp/yt-dlp/wiki/PO-Token-Guide
# YOUTUBE_PO_TOKEN=android.gvs+XXX
+
+# ---------------------------------------------------------------------------
+# Modelos OpenAI (opcional, defaults sao seguros)
+# ---------------------------------------------------------------------------
+
+# Modelo de chat usado para sumario, conversao para PT-BR e deteccao de tipo.
+# Default: gpt-5-mini. Alternativas: gpt-5, gpt-5-nano, gpt-4.1, gpt-4.1-mini, gpt-4o-mini.
+# OPENAI_CHAT_MODEL=gpt-5-mini
+
+# Modelo padrao de transcricao (saida em texto).
+# Default: gpt-4o-mini-transcribe. Alternativas: gpt-4o-transcribe, whisper-1.
+# OPENAI_TRANSCRIBE_MODEL=gpt-4o-mini-transcribe
+
+# Modelo usado quando precisamos de timestamps (verbose_json) para gerar capitulos.
+# Default: whisper-1 (ainda e o mais confiavel para segments com start/end).
+# OPENAI_TRANSCRIBE_TIMESTAMPS_MODEL=whisper-1
+
+# Liga/desliga geracao de capitulos com timestamps em audio/video.
+# OPENAI_ENABLE_CHAPTERS=true
+
+# Liga/desliga sumario estruturado (TL;DR, key points, decisoes, action items, etc).
+# Quando false, mantem o sumario textual legado.
+# OPENAI_ENABLE_SMART_SUMMARY=true
+
+# Esforco de raciocinio para modelos da familia gpt-5/o-series.
+# Valores validos: minimal, low, medium, high. Default: medium.
+# OPENAI_REASONING_EFFORT=medium
diff --git a/README.md b/README.md
@@ -1,6 +1,6 @@
# Lazier
-Sistema CLI e Web para **transcrição** e **sumarização** de áudios, vídeos, textos e PDFs usando OpenAI (Whisper + GPT). Suporta upload de arquivos, URLs do YouTube e de centenas de outros sites (via yt-dlp), e páginas web. Exporta em DOCX, TXT, Markdown, JSON e PDF.
+Sistema CLI e Web para **transcrição** e **sumarização** de áudios, vídeos, textos e PDFs usando os modelos mais recentes da OpenAI (família GPT-5 + `gpt-4o-mini-transcribe`/`whisper-1`). Além de transcrição/sumário tradicionais, gera **sumário inteligente** estruturado (TL;DR, pontos-chave, decisões, *action items*, tópicos e citações) e **capítulos com timestamps** para áudio/vídeo. Suporta upload de arquivos, URLs do YouTube e de centenas de outros sites (via yt-dlp), e páginas web. Exporta em DOCX, TXT, Markdown, JSON e PDF.
## Requisitos
@@ -22,6 +22,19 @@ cp .env.example .env
| `SESSION_SECRET_KEY` | Sim (Web) | Chave para sessões. Gere com: `openssl rand -hex 32` |
| `ADMIN_USER` / `ADMIN_PASSWORD` | Não | Usuário admin da WebGUI (criado na primeira execução se definidos) |
+### Modelos e inteligência (opcionais)
+
+Defaults seguros já vêm configurados; sobrescreva apenas se quiser mudar.
+
+| Variável | Default | Descrição |
+|----------|---------|-----------|
+| `OPENAI_CHAT_MODEL` | `gpt-5-mini` | Modelo de chat usado para sumário, conversão para PT-BR, detecção de tipo e capítulos. Alternativas: `gpt-5`, `gpt-5-nano`, `gpt-4.1`, `gpt-4o-mini`. |
+| `OPENAI_TRANSCRIBE_MODEL` | `gpt-4o-mini-transcribe` | Modelo padrão de transcrição (saída em texto). Alternativas: `gpt-4o-transcribe`, `whisper-1`. |
+| `OPENAI_TRANSCRIBE_TIMESTAMPS_MODEL` | `whisper-1` | Modelo usado quando precisamos de `verbose_json` para gerar capítulos. Atualmente `whisper-1` é o mais confiável para retornar `start`/`end`. |
+| `OPENAI_ENABLE_SMART_SUMMARY` | `true` | Liga/desliga o sumário estruturado (TL;DR, pontos-chave, decisões, ações, tópicos, citações, perguntas em aberto). Quando `false`, mantém o sumário textual legado. |
+| `OPENAI_ENABLE_CHAPTERS` | `true` | Liga/desliga geração de capítulos com timestamps em áudio/vídeo. |
+| `OPENAI_REASONING_EFFORT` | `medium` | Esforço de raciocínio para modelos da família `gpt-5`/`o-series`. Valores: `minimal`, `low`, `medium`, `high`. |
+
Opcional: `YOUTUBE_PO_TOKEN` para melhor suporte a alguns vídeos do YouTube ([guia](https://github.com/yt-dlp/yt-dlp/wiki/PO-Token-Guide)).
**Segredos:** nunca commite ficheiros `.env` com valores reais, chaves API ou passwords. Mantenha apenas `.env.example` no Git; use variáveis de ambiente no host ou *secrets* do CI/CD (GitHub Actions, etc.).
@@ -74,23 +87,86 @@ Acesse **http://localhost:19283** (ou use `--port` para outra porta).
### CLI
```bash
-# Transcrever e sumarizar
-lazier audio.mp3
-lazier video.mp4
-lazier document.pdf
-lazier "https://www.youtube.com/watch?v=VIDEO_ID"
-
-# Opções
-lazier audio.mp3 --format json # Formato: docx, txt, md, json, pdf
-lazier transcribe video.mp4 # Apenas transcrição
-lazier web # Inicia servidor web
-lazier cache clear # Limpa cache
+# Transcrição (gera o texto em PT-BR)
+lazier transcribe audio.mp3
+lazier transcribe video.mp4
+lazier transcribe "https://www.youtube.com/watch?v=VIDEO_ID"
+
+# Sumário (texto + sumário inteligente + capítulos quando aplicável)
+lazier summarize document.pdf
+lazier summarize "https://example.com/artigo" --format md
+lazier summarize aula.mp3 --gpt-model gpt-5 --reasoning high
+
+# Desligar features novas explicitamente
+lazier summarize aula.mp3 --no-smart --no-chapters
+
+# Outras opções
+lazier summarize video.mp4 --format json # docx, txt, md, json, pdf
+lazier config # mostra a config corrente
+lazier web # inicia servidor web
+lazier cache clear # limpa cache
```
+Flags principais:
+
+- `--model` sobrescreve `OPENAI_TRANSCRIBE_MODEL` para esse comando.
+- `--gpt-model` sobrescreve `OPENAI_CHAT_MODEL`.
+- `--smart/--no-smart` força ligar/desligar o sumário estruturado.
+- `--chapters/--no-chapters` força ligar/desligar capítulos com timestamps.
+- `--reasoning {minimal,low,medium,high}` ajusta o esforço de raciocínio para modelos `gpt-5`/`o-series`.
+
### WebGUI
Acesse http://localhost:19283 após iniciar com `lazier web` ou Docker. Na primeira vez, faça login com o usuário e senha definidos em `ADMIN_USER` e `ADMIN_PASSWORD` no `.env` (se configurados).
+### API HTTP
+
+`POST /api/process` e `POST /api/upload` aceitam, além dos campos clássicos, overrides por requisição:
+
+```json
+{
+ "url": "https://example.com/aula.mp3",
+ "format": "md",
+ "mode": "summarize",
+ "chat_model": "gpt-5",
+ "transcribe_model": "gpt-4o-transcribe",
+ "smart": true,
+ "chapters": true
+}
+```
+
+Os campos `chat_model`, `transcribe_model`, `smart` e `chapters` são opcionais e sobrescrevem os defaults definidos no `.env` apenas para esse job. `GET /api/jobs/{id}/details` passa a devolver `smart_summary`, `chapters` e `content_type` quando disponíveis.
+
+## Inteligência
+
+### Sumário estruturado
+
+Quando `OPENAI_ENABLE_SMART_SUMMARY=true` (default), o Lazier gera um objeto `SmartSummary` validado por Pydantic com Structured Outputs. Cada saída inclui:
+
+- `tldr`: resumo de 1-3 frases
+- `key_points`: pontos principais em ordem lógica
+- `decisions`: decisões explícitas tomadas no conteúdo
+- `action_items`: lista de `{owner, task, due_hint}`
+- `topics`: temas curtos
+- `quotes`: trechos literais marcantes
+- `open_questions`: perguntas em aberto
+
+O caminho legado (`summarize_text` em texto livre) ainda é usado como fallback se o modelo não suportar Structured Outputs ou se `OPENAI_ENABLE_SMART_SUMMARY=false`.
+
+### Detecção de tipo de conteúdo
+
+Antes de gerar o sumário, o sistema chama um classificador curto que escolhe entre `lecture`, `podcast`, `interview`, `news`, `tutorial`, `meeting`, `tech_doc` ou `other`. O tipo detectado é exibido na CLI/API e usado para ajustar o prompt do sumário (ex.: reuniões enfatizam `decisions`/`action_items`; palestras enfatizam conceitos e exemplos).
+
+### Capítulos com timestamps
+
+Para áudio/vídeo, quando `OPENAI_ENABLE_CHAPTERS=true`, o Lazier:
+
+1. Pede ao modelo de transcrição (`whisper-1` por padrão para timestamps) saída em `verbose_json` com `segments`.
+2. Envia esses segmentos ao modelo de chat, que devolve uma lista de capítulos com título, resumo e índices inicial/final.
+3. Materializa cada capítulo com `start`, `end` e `start_hms`/`end_hms`.
+
+Os capítulos aparecem nos exports DOCX/Markdown/PDF/TXT e no JSON cru.
+
## Sites suportados (vídeo/áudio)
Além do YouTube, você pode colar URLs de vídeo ou áudio de **centenas de sites**. O Lazier usa o [yt-dlp](https://github.com/yt-dlp/yt-dlp) para extrair o áudio; se a URL não for um vídeo, o sistema tenta extrair o texto da página e sumarizar.
diff --git a/lazier/api/routes.py b/lazier/api/routes.py
@@ -38,6 +38,10 @@ class ProcessRequest(BaseModel):
mode: Optional[str] = None
transcribe: Optional[bool] = None
summarize: Optional[bool] = None
+ chat_model: Optional[str] = None
+ transcribe_model: Optional[str] = None
+ smart: Optional[bool] = None
+ chapters: Optional[bool] = None
def _resolve_mode(
@@ -90,6 +94,25 @@ def _progress_updater(job_id: str):
return callback
+def _build_overrides(
+ *,
+ chat_model: Optional[str],
+ transcribe_model: Optional[str],
+ smart: Optional[bool],
+ chapters: Optional[bool],
+) -> dict:
+ overrides: dict = {}
+ if chat_model:
+ overrides["chat_model"] = chat_model
+ if transcribe_model:
+ overrides["transcribe_model"] = transcribe_model
+ if smart is not None:
+ overrides["smart"] = smart
+ if chapters is not None:
+ overrides["chapters"] = chapters
+ return overrides
+
+
def _create_job(
*,
mode: str,
@@ -98,10 +121,14 @@ def _create_job(
source_url: Optional[str] = None,
file_path: Optional[str] = None,
input_type: Optional[str] = None,
+ overrides: Optional[dict] = None,
) -> dict:
store = get_job_store()
job_id = str(uuid.uuid4())
created_at = datetime.now().isoformat()
+ metadata: dict = {}
+ if overrides:
+ metadata["_overrides"] = overrides
return store.create_job(
{
"id": job_id,
@@ -113,7 +140,7 @@ def _create_job(
"source_url": source_url,
"file_path": file_path,
"format": output_format,
- "metadata": {},
+ "metadata": metadata,
"created_at": created_at,
}
)
@@ -128,6 +155,7 @@ def _process_job(job_id: str) -> None:
source = job.get("source_url") or job.get("file_path")
progress_callback = _progress_updater(job_id)
+ overrides = (job.get("metadata") or {}).get("_overrides") or {}
try:
store.update_job(job_id, status="processing", progress=1, error=None)
@@ -137,17 +165,26 @@ def _process_job(job_id: str) -> None:
source,
mode=job["mode"],
output_format=job["format"],
+ model=overrides.get("transcribe_model"),
+ gpt_model=overrides.get("chat_model"),
+ use_smart_summary=overrides.get("smart"),
+ use_chapters=overrides.get("chapters"),
run_id=job_id,
source_name=job.get("source_name"),
created_at=job.get("created_at"),
progress_callback=progress_callback,
)
+
+ merged_metadata = result.get("metadata", {}) or {}
+ if overrides:
+ merged_metadata = {**merged_metadata, "_overrides": overrides}
+
store.update_job(
job_id,
status="completed",
progress=100,
input_type=result.get("input_type", job.get("input_type")),
- metadata=result.get("metadata", {}),
+ metadata=merged_metadata,
transcription=result.get("transcription"),
summary=result.get("summary"),
result_path=result.get("result_path"),
@@ -225,10 +262,20 @@ async def upload_files(
mode: Optional[str] = Form(None),
transcribe: Optional[bool] = Form(None),
summarize: Optional[bool] = Form(None),
+ chat_model: Optional[str] = Form(None),
+ transcribe_model: Optional[str] = Form(None),
+ smart: Optional[bool] = Form(None),
+ chapters: Optional[bool] = Form(None),
):
"""Upload de arquivos para processamento."""
resolved_mode = _resolve_mode(mode, transcribe, summarize)
+ overrides = _build_overrides(
+ chat_model=chat_model,
+ transcribe_model=transcribe_model,
+ smart=smart,
+ chapters=chapters,
+ )
valid_extensions = AUDIO_EXTENSIONS | VIDEO_EXTENSIONS | TEXT_EXTENSIONS | {".pdf"}
jobs = []
@@ -260,6 +307,7 @@ async def upload_files(
source_name=file.filename,
file_path=str(file_path),
input_type="file",
+ overrides=overrides,
)
background_tasks.add_task(_process_job, job["id"])
jobs.append(job["id"])
@@ -278,12 +326,20 @@ async def process_url(request: ProcessRequest, background_tasks: BackgroundTasks
detail="Playlists continuam disponiveis apenas no fluxo explicito da CLI.",
)
+ overrides = _build_overrides(
+ chat_model=request.chat_model,
+ transcribe_model=request.transcribe_model,
+ smart=request.smart,
+ chapters=request.chapters,
+ )
+
job = _create_job(
mode=resolved_mode,
output_format=request.format,
source_name=request.url,
source_url=request.url,
input_type="url",
+ overrides=overrides,
)
background_tasks.add_task(_process_job, job["id"])
return {"job_id": job["id"], "status": "processing", "mode": resolved_mode}
@@ -322,12 +378,16 @@ async def get_job_details(job_id: str):
if job["status"] not in {"completed", "interrupted"}:
raise HTTPException(status_code=400, detail="Job ainda nao concluido")
+ metadata = job.get("metadata", {}) or {}
return {
"id": job["id"],
"mode": job["mode"],
"transcription": job.get("transcription"),
"summary": job.get("summary"),
- "metadata": job.get("metadata", {}),
+ "metadata": metadata,
+ "smart_summary": metadata.get("smart_summary"),
+ "chapters": metadata.get("chapters") or [],
+ "content_type": metadata.get("content_type"),
"format": job.get("format", "docx"),
"result_path": job.get("result_path"),
"transcription_path": job.get("transcription_path"),
diff --git a/lazier/cli.py b/lazier/cli.py
@@ -18,6 +18,7 @@ from rich.console import Console
from rich.panel import Panel
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
+from .core.config import VALID_REASONING_EFFORTS, get_model_config, reset_model_config_cache
from .core.playlist import is_playlist_url, process_playlist
from .core.processing import process_source
from .core.cache import get_cache_manager
@@ -44,9 +45,16 @@ def _run_mode(
mode: str,
output: Optional[str],
format_type: str,
- model: str,
- gpt_model: str,
+ model: Optional[str],
+ gpt_model: Optional[str],
+ smart: Optional[bool] = None,
+ chapters: Optional[bool] = None,
+ reasoning: Optional[str] = None,
):
+ if reasoning:
+ os.environ["OPENAI_REASONING_EFFORT"] = reasoning
+ reset_model_config_cache()
+
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
@@ -80,6 +88,8 @@ def _run_mode(
output_format=format_type,
model=model,
gpt_model=gpt_model,
+ use_smart_summary=smart,
+ use_chapters=chapters,
output_path=output,
run_id=str(uuid.uuid4()),
source_name=Path(input_path).name if not input_path.startswith(("http://", "https://")) else input_path,
@@ -97,6 +107,10 @@ def _run_mode(
console.print(f"\n[bold green]✓ Processamento concluido![/bold green]")
console.print(f"[cyan]Arquivo gerado:[/cyan] {result['result_path']}")
+ if result.get("content_type"):
+ console.print(f"[cyan]Tipo detectado:[/cyan] {result['content_type']}")
+ if result.get("chapters"):
+ console.print(f"[cyan]Capitulos:[/cyan] {len(result['chapters'])} gerados")
@click.group(invoke_without_command=True)
@@ -120,13 +134,54 @@ def cli(ctx, input_path):
click.echo(ctx.get_help())
+def _model_options(func):
+ """Adiciona as flags compartilhadas de modelo/inteligencia."""
+ func = click.option(
+ "--reasoning",
+ type=click.Choice(sorted(VALID_REASONING_EFFORTS)),
+ default=None,
+ help="Esforco de raciocinio para modelos gpt-5/o-series (sobrescreve OPENAI_REASONING_EFFORT)",
+ )(func)
+ func = click.option(
+ "--chapters/--no-chapters",
+ "chapters_flag",
+ default=None,
+ help="Liga/desliga geracao de capitulos com timestamps (default: OPENAI_ENABLE_CHAPTERS)",
+ )(func)
+ func = click.option(
+ "--smart/--no-smart",
+ "smart_flag",
+ default=None,
+ help="Liga/desliga sumario estruturado (default: OPENAI_ENABLE_SMART_SUMMARY)",
+ )(func)
+ func = click.option(
+ "--gpt-model",
+ default=None,
+ help="Modelo de chat (default: OPENAI_CHAT_MODEL ou gpt-5-mini)",
+ )(func)
+ func = click.option(
+ "--model",
+ default=None,
+ help="Modelo de transcricao (default: OPENAI_TRANSCRIBE_MODEL ou gpt-4o-mini-transcribe)",
+ )(func)
+ return func
+
+
@cli.command()
@click.argument("input_path", type=str)
@click.option("--output", "-o", type=str, help="Nome do arquivo de saida")
@click.option("--format", "-f", "format_type", type=click.Choice(["docx", "txt", "md", "json", "pdf"]), default="docx", help="Formato de saida")
-@click.option("--model", default="whisper-1", help="Modelo Whisper (padrao: whisper-1)")
-@click.option("--gpt-model", default="gpt-4o-mini", help="Modelo GPT para conversao ao portugues (padrao: gpt-4o-mini)")
-def transcribe(input_path: str, output: Optional[str], format_type: str, model: str, gpt_model: str):
+@_model_options
+def transcribe(
+ input_path: str,
+ output: Optional[str],
+ format_type: str,
+ model: Optional[str],
+ gpt_model: Optional[str],
+ smart_flag: Optional[bool],
+ chapters_flag: Optional[bool],
+ reasoning: Optional[str],
+):
"""Transcreve ou converte o conteudo para portugues."""
_run_mode(
input_path=input_path,
@@ -135,6 +190,9 @@ def transcribe(input_path: str, output: Optional[str], format_type: str, model:
format_type=format_type,
model=model,
gpt_model=gpt_model,
+ smart=smart_flag,
+ chapters=chapters_flag,
+ reasoning=reasoning,
)
@@ -142,9 +200,17 @@ def transcribe(input_path: str, output: Optional[str], format_type: str, model:
@click.argument("input_path", type=str)
@click.option("--output", "-o", type=str, help="Nome do arquivo de saida")
@click.option("--format", "-f", "format_type", type=click.Choice(["docx", "txt", "md", "json", "pdf"]), default="docx", help="Formato de saida")
-@click.option("--model", default="whisper-1", help="Modelo Whisper (padrao: whisper-1)")
-@click.option("--gpt-model", default="gpt-4o-mini", help="Modelo GPT para sumario (padrao: gpt-4o-mini)")
-def summarize(input_path: str, output: Optional[str], format_type: str, model: str, gpt_model: str):
+@_model_options
+def summarize(
+ input_path: str,
+ output: Optional[str],
+ format_type: str,
+ model: Optional[str],
+ gpt_model: Optional[str],
+ smart_flag: Optional[bool],
+ chapters_flag: Optional[bool],
+ reasoning: Optional[str],
+):
"""Gera um sumario em portugues do conteudo informado."""
_run_mode(
input_path=input_path,
@@ -153,6 +219,27 @@ def summarize(input_path: str, output: Optional[str], format_type: str, model: s
format_type=format_type,
model=model,
gpt_model=gpt_model,
+ smart=smart_flag,
+ chapters=chapters_flag,
+ reasoning=reasoning,
+ )
+
+
+@cli.command()
+def config():
+ """Mostra a configuracao corrente de modelos."""
+ cfg = get_model_config(refresh=True)
+ console.print(
+ Panel.fit(
+ f"[bold]Modelos[/bold]\n\n"
+ f"Chat: [cyan]{cfg.chat_model}[/cyan]\n"
+ f"Transcricao: [cyan]{cfg.transcribe_model}[/cyan]\n"
+ f"Transcricao (timestamps): [cyan]{cfg.transcribe_timestamps_model}[/cyan]\n"
+ f"Reasoning effort: [cyan]{cfg.reasoning_effort}[/cyan]\n"
+ f"Smart summary: [cyan]{'on' if cfg.enable_smart_summary else 'off'}[/cyan]\n"
+ f"Capitulos: [cyan]{'on' if cfg.enable_chapters else 'off'}[/cyan]",
+ title="Lazier - Config Atual",
+ )
)
diff --git a/lazier/core/chapters.py b/lazier/core/chapters.py
@@ -0,0 +1,227 @@
+"""
+Constroi capitulos com timestamps a partir de segmentos transcritos.
+
+Recebe a lista de segmentos (`{"start": float, "end": float, "text": str}`) e
+pede ao modelo de chat (com Structured Outputs) os indices iniciais e finais de
+cada capitulo, alem de titulo e resumo curto. A partir desses indices,
+calculamos `start`/`end` reais.
+"""
+
+from __future__ import annotations
+
+import json
+import math
+import os
+from typing import Any, Dict, List, Optional
+
+try:
+ from openai import OpenAI
+except ImportError: # pragma: no cover
+ OpenAI = None
+
+from .config import get_model_config
+
+
+CHAPTERS_JSON_SCHEMA: Dict[str, Any] = {
+ "name": "ChaptersResponse",
+ "strict": True,
+ "schema": {
+ "type": "object",
+ "additionalProperties": False,
+ "required": ["chapters"],
+ "properties": {
+ "chapters": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "additionalProperties": False,
+ "required": ["title", "summary", "start_index", "end_index"],
+ "properties": {
+ "title": {"type": "string"},
+ "summary": {"type": "string"},
+ "start_index": {"type": "integer", "minimum": 0},
+ "end_index": {"type": "integer", "minimum": 0},
+ },
+ },
+ }
+ },
+ },
+}
+
+
+# Limites operacionais.
+MIN_SEGMENTS_FOR_CHAPTERS = 6
+MAX_CHAPTERS = 12
+MIN_CHAPTERS = 3
+MAX_SEGMENTS_INLINE = 1200 # se for maior, fazemos downsample
+
+
+def _ensure_client() -> "OpenAI":
+ api_key = os.getenv("OPENAI_API_KEY")
+ if not api_key:
+ raise Exception("OPENAI_API_KEY nao encontrada.")
+ if OpenAI is None:
+ raise Exception("openai nao esta instalado neste ambiente.")
+ return OpenAI(api_key=api_key)
+
+
+def _format_seconds(seconds: float) -> str:
+ seconds = max(0, int(seconds))
+ h = seconds // 3600
+ m = (seconds % 3600) // 60
+ s = seconds % 60
+ return f"{h:02d}:{m:02d}:{s:02d}"
+
+
+def _downsample_segments(segments: List[Dict[str, Any]], target: int) -> List[Dict[str, Any]]:
+ """Reduz a lista de segmentos enviada ao modelo preservando inicio/fim."""
+ if len(segments) <= target:
+ return segments
+ step = math.ceil(len(segments) / target)
+ sampled = [segments[i] for i in range(0, len(segments), step)]
+ if sampled[-1] is not segments[-1]:
+ sampled.append(segments[-1])
+ return sampled
+
+
+def build_chapters(
+ segments: List[Dict[str, Any]],
+ *,
+ model: Optional[str] = None,
+ content_type: Optional[str] = None,
+) -> List[Dict[str, Any]]:
+ """Gera lista de capitulos a partir dos segments. Sempre retorna lista.
+
+ Cada capitulo tem `title`, `summary`, `start`, `end`, `start_hms`, `end_hms`.
+ Quando nao for possivel gerar via LLM, devolve uma divisao por tempo simples.
+ """
+
+ if not segments or len(segments) < MIN_SEGMENTS_FOR_CHAPTERS:
+ return _fallback_chapters(segments)
+
+ config = get_model_config()
+ chosen_model = model or config.chat_model
+
+ sampled = _downsample_segments(segments, MAX_SEGMENTS_INLINE)
+
+ # Mapa de indice na amostra -> indice real e tempos absolutos.
+ indexed_sample = []
+ for sample_idx, seg in enumerate(sampled):
+ indexed_sample.append(
+ {
+ "i": sample_idx,
+ "start": float(seg.get("start", 0.0)),
+ "end": float(seg.get("end", 0.0)),
+ "text": (seg.get("text") or "").strip(),
+ }
+ )
+
+ persona = ""
+ if content_type:
+ persona = f"O conteudo foi classificado como '{content_type}'. "
+
+ system = (
+ persona
+ + "Voce divide transcricoes em capitulos coerentes baseando-se em mudancas de "
+ "topico. Use APENAS os indices fornecidos. Cada capitulo deve cobrir um intervalo "
+ f"continuo. Gere entre {MIN_CHAPTERS} e {MAX_CHAPTERS} capitulos, sem sobrepor."
+ )
+ user = (
+ "Segmentos disponiveis (cada um com indice 'i', tempo de inicio em segundos e "
+ "trecho de texto):\n\n"
+ + json.dumps(indexed_sample, ensure_ascii=False)
+ + "\n\nDevolva uma lista de capitulos com title curto (<=80 chars) e summary (1-2 frases)."
+ )
+
+ try:
+ client = _ensure_client()
+ kwargs: Dict[str, Any] = {
+ "model": chosen_model,
+ "messages": [
+ {"role": "system", "content": system},
+ {"role": "user", "content": user},
+ ],
+ "response_format": {"type": "json_schema", "json_schema": CHAPTERS_JSON_SCHEMA},
+ }
+ if config.supports_reasoning(chosen_model):
+ kwargs["reasoning_effort"] = config.reasoning_effort
+ else:
+ kwargs["temperature"] = 0.2
+
+ response = client.chat.completions.create(**kwargs)
+ raw = response.choices[0].message.content or "{}"
+ data = json.loads(raw)
+ chapters = data.get("chapters") or []
+ return _materialize_chapters(chapters, indexed_sample, segments)
+ except Exception as exc:
+ print(f"Aviso: geracao de capitulos via LLM falhou ({exc}); usando fallback.")
+ return _fallback_chapters(segments)
+
+
+def _materialize_chapters(
+ raw_chapters: List[Dict[str, Any]],
+ indexed_sample: List[Dict[str, Any]],
+ full_segments: List[Dict[str, Any]],
+) -> List[Dict[str, Any]]:
+ if not raw_chapters:
+ return _fallback_chapters(full_segments)
+
+ materialized: List[Dict[str, Any]] = []
+ sample_count = len(indexed_sample)
+
+ for chapter in raw_chapters:
+ start_idx = max(0, min(int(chapter.get("start_index", 0)), sample_count - 1))
+ end_idx = max(start_idx, min(int(chapter.get("end_index", start_idx)), sample_count - 1))
+ start_seconds = float(indexed_sample[start_idx]["start"])
+ end_seconds = float(indexed_sample[end_idx]["end"])
+ materialized.append(
+ {
+ "title": (chapter.get("title") or "").strip() or f"Capitulo {len(materialized) + 1}",
+ "summary": (chapter.get("summary") or "").strip(),
+ "start": start_seconds,
+ "end": end_seconds,
+ "start_hms": _format_seconds(start_seconds),
+ "end_hms": _format_seconds(end_seconds),
+ }
+ )
+
+ materialized.sort(key=lambda c: c["start"])
+ # Garante que nao haja sobreposicao agressiva.
+ for i in range(1, len(materialized)):
+ if materialized[i]["start"] < materialized[i - 1]["end"]:
+ materialized[i]["start"] = materialized[i - 1]["end"]
+ materialized[i]["start_hms"] = _format_seconds(materialized[i]["start"])
+ return materialized
+
+
+def _fallback_chapters(segments: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ """Divide o audio em ate 5 capitulos por tempo quando nao temos LLM."""
+ if not segments:
+ return []
+
+ total_start = float(segments[0].get("start", 0.0))
+ total_end = float(segments[-1].get("end", total_start))
+ duration = max(total_end - total_start, 1.0)
+
+ target_chunks = min(5, max(MIN_CHAPTERS, len(segments) // 30 or MIN_CHAPTERS))
+ if len(segments) < MIN_SEGMENTS_FOR_CHAPTERS:
+ target_chunks = 1
+
+ boundaries = [
+ total_start + (duration * i / target_chunks) for i in range(target_chunks + 1)
+ ]
+ chapters: List[Dict[str, Any]] = []
+ for i in range(target_chunks):
+ start = boundaries[i]
+ end = boundaries[i + 1]
+ chapters.append(
+ {
+ "title": f"Parte {i + 1}",
+ "summary": "",
+ "start": start,
+ "end": end,
+ "start_hms": _format_seconds(start),
+ "end_hms": _format_seconds(end),
+ }
+ )
+ return chapters
diff --git a/lazier/core/config.py b/lazier/core/config.py
@@ -0,0 +1,107 @@
+"""
+Configuracao centralizada dos modelos OpenAI e flags de comportamento.
+
+Todas as opcoes podem ser sobrescritas via .env ou variaveis de ambiente. A
+funcao `get_model_config()` faz cache em memoria e e usada pela CLI, API e
+pipeline para escolher modelos, ativar capitulos/sumario inteligente etc.
+"""
+
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass, asdict
+from typing import Optional
+
+try:
+ from dotenv import load_dotenv
+except ImportError: # pragma: no cover
+ def load_dotenv():
+ return False
+
+
+load_dotenv()
+
+
+# Defaults shipados (Maio 2026):
+# - chat: gpt-5-mini -> equilibrio qualidade/custo
+# - transcribe: gpt-4o-mini-transcribe -> sucessor de whisper-1, mais preciso
+# - timestamps: whisper-1 -> ainda e a forma confiavel de obter `verbose_json`
+# com segments para construir capitulos.
+DEFAULT_CHAT_MODEL = "gpt-5-mini"
+DEFAULT_TRANSCRIBE_MODEL = "gpt-4o-mini-transcribe"
+DEFAULT_TRANSCRIBE_TIMESTAMPS_MODEL = "whisper-1"
+DEFAULT_REASONING_EFFORT = "medium"
+
+VALID_REASONING_EFFORTS = {"minimal", "low", "medium", "high"}
+
+
+def _env_bool(key: str, default: bool) -> bool:
+ raw = os.getenv(key)
+ if raw is None:
+ return default
+ return raw.strip().lower() in {"1", "true", "yes", "on", "y"}
+
+
+def _env_str(key: str, default: str) -> str:
+ raw = os.getenv(key)
+ if raw is None or not raw.strip():
+ return default
+ return raw.strip()
+
+
+@dataclass(frozen=True)
+class ModelConfig:
+ """Snapshot imutavel de configuracao de modelos."""
+
+ chat_model: str = DEFAULT_CHAT_MODEL
+ transcribe_model: str = DEFAULT_TRANSCRIBE_MODEL
+ transcribe_timestamps_model: str = DEFAULT_TRANSCRIBE_TIMESTAMPS_MODEL
+ enable_chapters: bool = True
+ enable_smart_summary: bool = True
+ reasoning_effort: str = DEFAULT_REASONING_EFFORT
+
+ def supports_reasoning(self, model: Optional[str] = None) -> bool:
+ """Indica se devemos enviar `reasoning_effort` para um modelo."""
+ target = (model or self.chat_model or "").lower()
+ # Modelos da familia gpt-5 e o-series aceitam reasoning_effort.
+ return target.startswith("gpt-5") or target.startswith("o1") or target.startswith("o3") or target.startswith("o4")
+
+ def as_dict(self) -> dict:
+ return asdict(self)
+
+
+_cached_config: Optional[ModelConfig] = None
+
+
+def get_model_config(refresh: bool = False) -> ModelConfig:
+ """Retorna a configuracao corrente, com cache em memoria.
+
+ Use `refresh=True` em testes que mudam variaveis de ambiente.
+ """
+
+ global _cached_config
+ if _cached_config is not None and not refresh:
+ return _cached_config
+
+ reasoning = _env_str("OPENAI_REASONING_EFFORT", DEFAULT_REASONING_EFFORT).lower()
+ if reasoning not in VALID_REASONING_EFFORTS:
+ reasoning = DEFAULT_REASONING_EFFORT
+
+ _cached_config = ModelConfig(
+ chat_model=_env_str("OPENAI_CHAT_MODEL", DEFAULT_CHAT_MODEL),
+ transcribe_model=_env_str("OPENAI_TRANSCRIBE_MODEL", DEFAULT_TRANSCRIBE_MODEL),
+ transcribe_timestamps_model=_env_str(
+ "OPENAI_TRANSCRIBE_TIMESTAMPS_MODEL", DEFAULT_TRANSCRIBE_TIMESTAMPS_MODEL
+ ),
+ enable_chapters=_env_bool("OPENAI_ENABLE_CHAPTERS", True),
+ enable_smart_summary=_env_bool("OPENAI_ENABLE_SMART_SUMMARY", True),
+ reasoning_effort=reasoning,
+ )
+ return _cached_config
+
+
+def reset_model_config_cache() -> None:
+ """Limpa o cache (uso em testes)."""
+
+ global _cached_config
+ _cached_config = None
diff --git a/lazier/core/content_type.py b/lazier/core/content_type.py
@@ -0,0 +1,131 @@
+"""
+Deteccao automatica do tipo de conteudo a sumarizar.
+
+Faz uma chamada curta ao modelo de chat com Structured Outputs e devolve um
+dos tipos suportados (`lecture`, `podcast`, `interview`, `news`, `tutorial`,
+`meeting`, `tech_doc`, `other`). Em caso de erro, devolve um default seguro
+sem propagar a excecao para nao quebrar o pipeline principal.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+from typing import Any, Dict, List, Optional
+
+try:
+ from openai import OpenAI
+except ImportError: # pragma: no cover
+ OpenAI = None
+
+from .config import get_model_config
+
+
+SUPPORTED_TYPES: List[str] = [
+ "lecture",
+ "podcast",
+ "interview",
+ "news",
+ "tutorial",
+ "meeting",
+ "tech_doc",
+ "other",
+]
+
+
+CONTENT_TYPE_JSON_SCHEMA: Dict[str, Any] = {
+ "name": "ContentType",
+ "strict": True,
+ "schema": {
+ "type": "object",
+ "additionalProperties": False,
+ "required": ["content_type", "confidence", "rationale"],
+ "properties": {
+ "content_type": {"type": "string", "enum": SUPPORTED_TYPES},
+ "confidence": {"type": "number", "minimum": 0.0, "maximum": 1.0},
+ "rationale": {"type": "string"},
+ },
+ },
+}
+
+
+SAMPLE_CHAR_BUDGET = 8_000
+
+
+def _ensure_client() -> "OpenAI":
+ api_key = os.getenv("OPENAI_API_KEY")
+ if not api_key:
+ raise Exception("OPENAI_API_KEY nao encontrada.")
+ if OpenAI is None:
+ raise Exception("openai nao esta instalado neste ambiente.")
+ return OpenAI(api_key=api_key)
+
+
+def _build_sample(text: str) -> str:
+ """Pega comeco + fim do texto para detectar o tipo sem gastar tokens demais."""
+ if len(text) <= SAMPLE_CHAR_BUDGET:
+ return text
+ half = SAMPLE_CHAR_BUDGET // 2
+ return text[:half] + "\n\n[...trecho omitido...]\n\n" + text[-half:]
+
+
+def detect_content_type(
+ text: str,
+ metadata: Optional[Dict[str, Any]] = None,
+ model: Optional[str] = None,
+) -> Dict[str, Any]:
+ """Detecta o tipo do conteudo. Sempre retorna um dict.
+
+ Returns:
+ { "content_type": str, "confidence": float, "rationale": str }
+ """
+
+ default_response = {"content_type": "other", "confidence": 0.0, "rationale": ""}
+
+ if not text or not text.strip():
+ return default_response
+
+ config = get_model_config()
+ chosen_model = model or config.chat_model
+
+ sample = _build_sample(text)
+ title = (metadata or {}).get("title") or ""
+ uploader = (metadata or {}).get("uploader") or ""
+ hint_block = ""
+ if title or uploader:
+ hint_block = f"Titulo conhecido: {title}\nAutor/canal: {uploader}\n\n"
+
+ system = (
+ "Voce classifica conteudos em um dos tipos: lecture, podcast, interview, news, "
+ "tutorial, meeting, tech_doc, other. Use 'other' apenas quando nenhum dos "
+ "demais se encaixar bem."
+ )
+ user = (
+ f"{hint_block}Classifique o conteudo abaixo. Devolva tambem confidence (0-1) e "
+ "uma rationale curta em portugues.\n\nTrecho:\n\n" + sample
+ )
+
+ try:
+ client = _ensure_client()
+ kwargs: Dict[str, Any] = {
+ "model": chosen_model,
+ "messages": [
+ {"role": "system", "content": system},
+ {"role": "user", "content": user},
+ ],
+ "response_format": {"type": "json_schema", "json_schema": CONTENT_TYPE_JSON_SCHEMA},
+ }
+ if config.supports_reasoning(chosen_model):
+ kwargs["reasoning_effort"] = "minimal"
+ else:
+ kwargs["temperature"] = 0.0
+
+ response = client.chat.completions.create(**kwargs)
+ raw = response.choices[0].message.content or "{}"
+ parsed = json.loads(raw)
+ if parsed.get("content_type") not in SUPPORTED_TYPES:
+ parsed["content_type"] = "other"
+ return parsed
+ except Exception as exc:
+ print(f"Aviso: deteccao de tipo de conteudo falhou ({exc}); usando 'other'.")
+ return default_response
diff --git a/lazier/core/formats.py b/lazier/core/formats.py
@@ -1,11 +1,16 @@
"""
-Exportadores de múltiplos formatos (TXT, Markdown, JSON, DOCX, PDF)
+Exportadores de multiplos formatos (TXT, Markdown, JSON, DOCX, PDF).
+
+Quando `metadata` contiver `smart_summary` e/ou `chapters`, secoes dedicadas
+sao renderizadas em todos os formatos. Quando essas chaves nao existem, o
+comportamento legado (texto livre) e mantido.
"""
import json
from datetime import datetime
from pathlib import Path
-from typing import Optional, Dict, Any
+from typing import Any, Dict, List, Optional
+
try:
from docx import Document # noqa: F401
from docx.shared import Pt, Inches, RGBColor # noqa: F401
@@ -18,92 +23,189 @@ from ..docx_generator import _format_duration
from ..utils import sanitize_xml_string
+# ---------------------------------------------------------------------------
+# Helpers de renderizacao do smart summary e capitulos
+# ---------------------------------------------------------------------------
+
+SMART_SECTION_LABELS = [
+ ("key_points", "Pontos-chave"),
+ ("decisions", "Decisoes"),
+ ("topics", "Topicos"),
+ ("quotes", "Citacoes"),
+ ("open_questions", "Perguntas em aberto"),
+]
+
+
+def _format_action_item(action: Dict[str, Any]) -> str:
+ owner = (action.get("owner") or "").strip() or "(sem responsavel)"
+ task = (action.get("task") or "").strip()
+ due = (action.get("due_hint") or "").strip()
+ rendered = f"{owner}: {task}".strip(": ").strip()
+ if due:
+ rendered += f" (prazo: {due})"
+ return rendered
+
+
+def _smart_summary_lines_txt(smart: Dict[str, Any], heading_decoration: str = "") -> List[str]:
+ lines: List[str] = []
+ tldr = (smart.get("tldr") or "").strip()
+ if tldr:
+ lines.append(f"{heading_decoration}TL;DR")
+ lines.append(tldr)
+ lines.append("")
+
+ for key, label in SMART_SECTION_LABELS:
+ items = smart.get(key) or []
+ if not items:
+ continue
+ lines.append(f"{heading_decoration}{label}")
+ for item in items:
+ lines.append(f" - {item}")
+ lines.append("")
+
+ actions = smart.get("action_items") or []
+ if actions:
+ lines.append(f"{heading_decoration}Acoes")
+ for action in actions:
+ lines.append(f" - {_format_action_item(action)}")
+ lines.append("")
+ return lines
+
+
+def _chapters_lines_txt(chapters: List[Dict[str, Any]]) -> List[str]:
+ lines: List[str] = []
+ if not chapters:
+ return lines
+ lines.append("Capitulos")
+ for chapter in chapters:
+ title = chapter.get("title") or "Capitulo"
+ start = chapter.get("start_hms") or _format_duration(int(chapter.get("start", 0)))
+ end = chapter.get("end_hms") or _format_duration(int(chapter.get("end", 0)))
+ summary = (chapter.get("summary") or "").strip()
+ line = f" - [{start} - {end}] {title}"
+ if summary:
+ line += f": {summary}"
+ lines.append(line)
+ lines.append("")
+ return lines
+
+
+def _smart_summary_lines_md(smart: Dict[str, Any]) -> List[str]:
+ lines: List[str] = []
+ tldr = (smart.get("tldr") or "").strip()
+ if tldr:
+ lines.append("### TL;DR")
+ lines.append("")
+ lines.append(tldr)
+ lines.append("")
+
+ for key, label in SMART_SECTION_LABELS:
+ items = smart.get(key) or []
+ if not items:
+ continue
+ lines.append(f"### {label}")
+ lines.append("")
+ for item in items:
+ lines.append(f"- {item}")
+ lines.append("")
+
+ actions = smart.get("action_items") or []
+ if actions:
+ lines.append("### Acoes")
+ lines.append("")
+ for action in actions:
+ lines.append(f"- {_format_action_item(action)}")
+ lines.append("")
+ return lines
+
+
+def _chapters_lines_md(chapters: List[Dict[str, Any]]) -> List[str]:
+ if not chapters:
+ return []
+ lines = ["## Capitulos", ""]
+ for chapter in chapters:
+ title = chapter.get("title") or "Capitulo"
+ start = chapter.get("start_hms") or _format_duration(int(chapter.get("start", 0)))
+ end = chapter.get("end_hms") or _format_duration(int(chapter.get("end", 0)))
+ summary = (chapter.get("summary") or "").strip()
+ lines.append(f"- **{start} - {end}** | {title}")
+ if summary:
+ lines.append(f" - {summary}")
+ lines.append("")
+ return lines
+
+
def export_txt(
transcription: str,
summary: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
output_path: str = "output.txt"
) -> str:
- """
- Exporta transcrição e sumário em formato TXT
-
- Args:
- transcription: Texto transcrito
- summary: Texto sumarizado (opcional)
- metadata: Metadados do vídeo/áudio
- output_path: Caminho do arquivo de saída
-
- Returns:
- Caminho do arquivo criado
- """
output_path_obj = Path(output_path)
output_path_obj.parent.mkdir(parents=True, exist_ok=True)
-
- lines = []
-
- # Título
- title = metadata.get('title', 'Transcrição') if metadata else 'Transcrição'
+
+ metadata = metadata or {}
+ smart = metadata.get("smart_summary")
+ chapters = metadata.get("chapters") or []
+ content_type = metadata.get("content_type")
+
+ lines: List[str] = []
+
+ title = metadata.get("title", "Transcricao")
lines.append("=" * 80)
lines.append(title.center(80))
lines.append("=" * 80)
lines.append("")
-
- # Metadados
+
lines.append(f"Data de processamento: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
-
- if metadata:
- if metadata.get('duration'):
- duration_sec = metadata['duration']
- duration_str = _format_duration(duration_sec)
- lines.append(f"Duração: {duration_str}")
-
- if metadata.get('uploader'):
- lines.append(f"Canal/Criador: {metadata['uploader']}")
-
- if metadata.get('webpage_url'):
- lines.append(f"Link do vídeo: {metadata['webpage_url']}")
-
+
+ if metadata.get("duration"):
+ lines.append(f"Duracao: {_format_duration(metadata['duration'])}")
+ if metadata.get("uploader"):
+ lines.append(f"Canal/Criador: {metadata['uploader']}")
+ if metadata.get("webpage_url"):
+ lines.append(f"Link: {metadata['webpage_url']}")
+ if content_type:
+ lines.append(f"Tipo detectado: {content_type}")
+
lines.append("")
lines.append("-" * 80)
lines.append("")
-
- # Sumário
- if summary:
- lines.append("SUMÁRIO")
+
+ if smart:
+ lines.append("SUMARIO INTELIGENTE")
+ lines.append("-" * 80)
+ lines.extend(_smart_summary_lines_txt(smart))
lines.append("-" * 80)
-
- # Divide sumário em parágrafos e preserva estrutura
- summary_paragraphs = summary.split('\n\n')
- if len(summary_paragraphs) == 1:
- summary_paragraphs = summary.split('\n')
-
- for para in summary_paragraphs:
+ lines.append("")
+ elif summary:
+ lines.append("SUMARIO")
+ lines.append("-" * 80)
+ for para in (summary.split("\n\n") if "\n\n" in summary else summary.split("\n")):
if para.strip():
lines.append(para.strip())
- lines.append("") # Linha em branco entre parágrafos
-
+ lines.append("")
+ lines.append("-" * 80)
+ lines.append("")
+
+ if chapters:
+ lines.append("CAPITULOS")
+ lines.append("-" * 80)
+ lines.extend(_chapters_lines_txt(chapters))
lines.append("-" * 80)
lines.append("")
-
- # Transcrição
+
if transcription and transcription.strip():
- lines.append("TRANSCRIÇÃO COMPLETA")
+ lines.append("TRANSCRICAO COMPLETA")
lines.append("-" * 80)
-
- # Divide transcrição em parágrafos e preserva estrutura
- transcription_paragraphs = transcription.split('\n\n')
- if len(transcription_paragraphs) == 1:
- transcription_paragraphs = transcription.split('\n')
-
- for para in transcription_paragraphs:
+ for para in (transcription.split("\n\n") if "\n\n" in transcription else transcription.split("\n")):
if para.strip():
lines.append(para.strip())
- lines.append("") # Linha em branco entre parágrafos
-
- # Salva arquivo
- with open(output_path_obj, 'w', encoding='utf-8') as f:
- f.write('\n'.join(lines))
-
+ lines.append("")
+
+ with open(output_path_obj, "w", encoding="utf-8") as f:
+ f.write("\n".join(lines))
+
return str(output_path_obj)
@@ -113,84 +215,65 @@ def export_markdown(
metadata: Optional[Dict[str, Any]] = None,
output_path: str = "output.md"
) -> str:
- """
- Exporta transcrição e sumário em formato Markdown
-
- Args:
- transcription: Texto transcrito
- summary: Texto sumarizado (opcional)
- metadata: Metadados do vídeo/áudio
- output_path: Caminho do arquivo de saída
-
- Returns:
- Caminho do arquivo criado
- """
output_path_obj = Path(output_path)
output_path_obj.parent.mkdir(parents=True, exist_ok=True)
-
- lines = []
-
- # Título
- title = metadata.get('title', 'Transcrição') if metadata else 'Transcrição'
+
+ metadata = metadata or {}
+ smart = metadata.get("smart_summary")
+ chapters = metadata.get("chapters") or []
+ content_type = metadata.get("content_type")
+
+ lines: List[str] = []
+
+ title = metadata.get("title", "Transcricao")
lines.append(f"# {title}")
lines.append("")
-
- # Metadados
+
lines.append("## Metadados")
lines.append("")
lines.append(f"- **Data de processamento:** {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
-
- if metadata:
- if metadata.get('duration'):
- duration_sec = metadata['duration']
- duration_str = _format_duration(duration_sec)
- lines.append(f"- **Duração:** {duration_str}")
-
- if metadata.get('uploader'):
- lines.append(f"- **Canal/Criador:** {metadata['uploader']}")
-
- if metadata.get('webpage_url'):
- lines.append(f"- **Link do vídeo:** [{metadata['webpage_url']}]({metadata['webpage_url']})")
-
+ if metadata.get("duration"):
+ lines.append(f"- **Duracao:** {_format_duration(metadata['duration'])}")
+ if metadata.get("uploader"):
+ lines.append(f"- **Canal/Criador:** {metadata['uploader']}")
+ if metadata.get("webpage_url"):
+ lines.append(f"- **Link:** [{metadata['webpage_url']}]({metadata['webpage_url']})")
+ if content_type:
+ lines.append(f"- **Tipo detectado:** `{content_type}`")
lines.append("")
-
- # Sumário
- if summary:
- lines.append("## Sumário")
+
+ if smart:
+ lines.append("## Sumario inteligente")
lines.append("")
-
- # Divide sumário em parágrafos e formata corretamente
- summary_paragraphs = summary.split('\n\n')
- if len(summary_paragraphs) == 1:
- summary_paragraphs = summary.split('\n')
-
- for para in summary_paragraphs:
+ lines.extend(_smart_summary_lines_md(smart))
+ lines.append("---")
+ lines.append("")
+ elif summary:
+ lines.append("## Sumario")
+ lines.append("")
+ for para in (summary.split("\n\n") if "\n\n" in summary else summary.split("\n")):
if para.strip():
lines.append(para.strip())
- lines.append("") # Linha em branco entre parágrafos (Markdown requer)
-
+ lines.append("")
+ lines.append("---")
+ lines.append("")
+
+ if chapters:
+ lines.extend(_chapters_lines_md(chapters))
lines.append("---")
lines.append("")
-
- # Transcrição
+
if transcription and transcription.strip():
- lines.append("## Transcrição Completa")
+ lines.append("## Transcricao Completa")
lines.append("")
-
- # Divide transcrição em parágrafos e formata corretamente
- transcription_paragraphs = transcription.split('\n\n')
- if len(transcription_paragraphs) == 1:
- transcription_paragraphs = transcription.split('\n')
-
- for para in transcription_paragraphs:
+ for para in (transcription.split("\n\n") if "\n\n" in transcription else transcription.split("\n")):
if para.strip():
lines.append(para.strip())
- lines.append("") # Linha em branco entre parágrafos (Markdown requer)
-
- # Salva arquivo
- with open(output_path_obj, 'w', encoding='utf-8') as f:
- f.write('\n'.join(lines))
-
+ lines.append("")
+
+ with open(output_path_obj, "w", encoding="utf-8") as f:
+ f.write("\n".join(lines))
+
return str(output_path_obj)
@@ -200,37 +283,29 @@ def export_json(
metadata: Optional[Dict[str, Any]] = None,
output_path: str = "output.json"
) -> str:
- """
- Exporta transcrição e sumário em formato JSON
-
- Args:
- transcription: Texto transcrito
- summary: Texto sumarizado (opcional)
- metadata: Metadados do vídeo/áudio
- output_path: Caminho do arquivo de saída
-
- Returns:
- Caminho do arquivo criado
- """
output_path_obj = Path(output_path)
output_path_obj.parent.mkdir(parents=True, exist_ok=True)
-
- data = {
- 'metadata': {
- 'title': metadata.get('title', 'Transcrição') if metadata else 'Transcrição',
- 'processed_at': datetime.now().isoformat(),
- 'duration': metadata.get('duration') if metadata else None,
- 'uploader': metadata.get('uploader') if metadata else None,
- 'webpage_url': metadata.get('webpage_url') if metadata else None,
+
+ metadata = metadata or {}
+
+ data: Dict[str, Any] = {
+ "metadata": {
+ "title": metadata.get("title", "Transcricao"),
+ "processed_at": datetime.now().isoformat(),
+ "duration": metadata.get("duration"),
+ "uploader": metadata.get("uploader"),
+ "webpage_url": metadata.get("webpage_url"),
+ "content_type": metadata.get("content_type"),
},
- 'transcription': transcription if transcription and transcription.strip() else None,
- 'summary': summary,
+ "transcription": transcription if transcription and transcription.strip() else None,
+ "summary": summary,
+ "smart_summary": metadata.get("smart_summary"),
+ "chapters": metadata.get("chapters") or [],
}
-
- # Salva arquivo
- with open(output_path_obj, 'w', encoding='utf-8') as f:
+
+ with open(output_path_obj, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
-
+
return str(output_path_obj)
@@ -240,18 +315,6 @@ def export_docx(
metadata: Optional[Dict[str, Any]] = None,
output_path: str = "output.docx"
) -> str:
- """
- Exporta transcrição e sumário em formato DOCX (reutiliza função existente)
-
- Args:
- transcription: Texto transcrito
- summary: Texto sumarizado (opcional)
- metadata: Metadados do vídeo/áudio
- output_path: Caminho do arquivo de saída
-
- Returns:
- Caminho do arquivo criado
- """
from ..docx_generator import create_document
return create_document(transcription, summary, metadata, output_path)
@@ -262,85 +325,121 @@ def export_pdf(
metadata: Optional[Dict[str, Any]] = None,
output_path: str = "output.pdf"
) -> str:
- """
- Exporta transcrição e sumário em formato PDF.
-
- Args:
- transcription: Texto transcrito
- summary: Texto sumarizado (opcional)
- metadata: Metadados do vídeo/áudio
- output_path: Caminho do arquivo de saída
-
- Returns:
- Caminho do arquivo criado
- """
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import cm
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
-
+
output_path_obj = Path(output_path)
output_path_obj.parent.mkdir(parents=True, exist_ok=True)
-
+
+ metadata = metadata or {}
+ smart = metadata.get("smart_summary")
+ chapters = metadata.get("chapters") or []
+
doc = SimpleDocTemplate(
str(output_path_obj),
pagesize=A4,
- rightMargin=2*cm,
- leftMargin=2*cm,
- topMargin=2*cm,
- bottomMargin=2*cm,
+ rightMargin=2 * cm,
+ leftMargin=2 * cm,
+ topMargin=2 * cm,
+ bottomMargin=2 * cm,
)
styles = getSampleStyleSheet()
title_style = ParagraphStyle(
- 'CustomTitle',
- parent=styles['Heading1'],
+ "CustomTitle",
+ parent=styles["Heading1"],
fontSize=16,
spaceAfter=12,
alignment=1,
)
- heading_style = styles['Heading2']
- body_style = styles['Normal']
-
+ heading_style = styles["Heading2"]
+ sub_heading_style = styles["Heading3"]
+ body_style = styles["Normal"]
+
story = []
-
- title = sanitize_xml_string(
- metadata.get('title', 'Transcrição') if metadata else 'Transcrição'
- )
+
+ title = sanitize_xml_string(metadata.get("title", "Transcricao"))
story.append(Paragraph(title, title_style))
story.append(Spacer(1, 12))
-
+
meta_line = f"Data de processamento: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}"
- if metadata:
- if metadata.get('duration'):
- duration_str = _format_duration(metadata['duration'])
- meta_line += f" | Duração: {duration_str}"
- if metadata.get('uploader'):
- meta_line += f" | Canal/Criador: {sanitize_xml_string(str(metadata['uploader']))}"
- if metadata.get('webpage_url'):
- meta_line += f" | Link do vídeo: {sanitize_xml_string(str(metadata['webpage_url']))}"
+ if metadata.get("duration"):
+ meta_line += f" | Duracao: {_format_duration(metadata['duration'])}"
+ if metadata.get("uploader"):
+ meta_line += f" | Canal/Criador: {sanitize_xml_string(str(metadata['uploader']))}"
+ if metadata.get("webpage_url"):
+ meta_line += f" | Link: {sanitize_xml_string(str(metadata['webpage_url']))}"
+ if metadata.get("content_type"):
+ meta_line += f" | Tipo: {metadata['content_type']}"
story.append(Paragraph(sanitize_xml_string(meta_line), body_style))
story.append(Spacer(1, 16))
-
- if summary:
- story.append(Paragraph("Sumário", heading_style))
+
+ def _safe(text: str) -> str:
+ cleaned = sanitize_xml_string(text or "")
+ return (
+ cleaned.replace("&", "&")
+ .replace("<", "<")
+ .replace(">", ">")
+ .replace("**", "")
+ .replace("\n", "<br/>")
+ )
+
+ if smart:
+ story.append(Paragraph("Sumario inteligente", heading_style))
story.append(Spacer(1, 8))
- for para in summary.split('\n\n'):
+ if smart.get("tldr"):
+ story.append(Paragraph("TL;DR", sub_heading_style))
+ story.append(Paragraph(_safe(smart["tldr"]), body_style))
+ story.append(Spacer(1, 6))
+ for key, label in SMART_SECTION_LABELS:
+ items = smart.get(key) or []
+ if not items:
+ continue
+ story.append(Paragraph(label, sub_heading_style))
+ for item in items:
+ story.append(Paragraph(f"• {_safe(str(item))}", body_style))
+ story.append(Spacer(1, 6))
+ actions = smart.get("action_items") or []
+ if actions:
+ story.append(Paragraph("Acoes", sub_heading_style))
+ for action in actions:
+ story.append(Paragraph(f"• {_safe(_format_action_item(action))}", body_style))
+ story.append(Spacer(1, 6))
+ story.append(Spacer(1, 12))
+ elif summary:
+ story.append(Paragraph("Sumario", heading_style))
+ story.append(Spacer(1, 8))
+ for para in summary.split("\n\n"):
if not para.strip():
continue
- text = sanitize_xml_string(para.strip()).replace('&', '&').replace('<', '<').replace('>', '>').replace('\n', '<br/>').replace('**', '')
- story.append(Paragraph(text, body_style))
- story.append(Spacer(1, 16))
-
+ story.append(Paragraph(_safe(para.strip()), body_style))
+ story.append(Spacer(1, 12))
+
+ if chapters:
+ story.append(Paragraph("Capitulos", heading_style))
+ story.append(Spacer(1, 8))
+ for chapter in chapters:
+ start = chapter.get("start_hms") or _format_duration(int(chapter.get("start", 0)))
+ end = chapter.get("end_hms") or _format_duration(int(chapter.get("end", 0)))
+ title = chapter.get("title") or "Capitulo"
+ line = f"<b>[{start} - {end}]</b> {_safe(title)}"
+ story.append(Paragraph(line, body_style))
+ chapter_summary = (chapter.get("summary") or "").strip()
+ if chapter_summary:
+ story.append(Paragraph(_safe(chapter_summary), body_style))
+ story.append(Spacer(1, 4))
+ story.append(Spacer(1, 12))
+
transcription_safe = sanitize_xml_string(transcription or "")
if transcription_safe.strip():
- story.append(Paragraph("Transcrição Completa", heading_style))
+ story.append(Paragraph("Transcricao Completa", heading_style))
story.append(Spacer(1, 8))
- for para in transcription_safe.split('\n\n'):
+ for para in transcription_safe.split("\n\n"):
if not para.strip():
continue
- text = para.strip().replace('&', '&').replace('<', '<').replace('>', '>').replace('\n', '<br/>').replace('**', '')
- story.append(Paragraph(text, body_style))
-
+ story.append(Paragraph(_safe(para.strip()), body_style))
+
doc.build(story)
return str(output_path_obj)
@@ -352,35 +451,21 @@ def export(
output_path: str = "output",
format_type: str = "docx"
) -> str:
- """
- Exporta transcrição e sumário no formato especificado
-
- Args:
- transcription: Texto transcrito
- summary: Texto sumarizado (opcional)
- metadata: Metadados do vídeo/áudio
- output_path: Caminho base do arquivo (sem extensão)
- format_type: Tipo de formato (txt, md, json, docx, pdf)
-
- Returns:
- Caminho do arquivo criado
- """
- # Adiciona extensão se não tiver
output_path_obj = Path(output_path)
if not output_path_obj.suffix:
output_path = f"{output_path}.{format_type}"
-
+
format_type = format_type.lower()
-
- if format_type == 'txt':
+
+ if format_type == "txt":
return export_txt(transcription, summary, metadata, output_path)
- elif format_type == 'md' or format_type == 'markdown':
+ elif format_type in {"md", "markdown"}:
return export_markdown(transcription, summary, metadata, output_path)
- elif format_type == 'json':
+ elif format_type == "json":
return export_json(transcription, summary, metadata, output_path)
- elif format_type == 'docx':
+ elif format_type == "docx":
return export_docx(transcription, summary, metadata, output_path)
- elif format_type == 'pdf':
+ elif format_type == "pdf":
return export_pdf(transcription, summary, metadata, output_path)
else:
- raise ValueError(f"Formato não suportado: {format_type}. Use: txt, md, json, docx, pdf")
+ raise ValueError(f"Formato nao suportado: {format_type}. Use: txt, md, json, docx, pdf")
diff --git a/lazier/core/processing.py b/lazier/core/processing.py
@@ -1,25 +1,47 @@
"""
Pipeline compartilhado de processamento para CLI e API.
+
+Inclui as etapas de:
+- Validacao de input
+- Download (YouTube/web) ou preparacao de audio
+- Transcricao (com ou sem timestamps)
+- Conversao para PT-BR
+- Deteccao de tipo de conteudo
+- Sumario (legado ou estruturado)
+- Capitulos com timestamps
+- Export final
"""
+from __future__ import annotations
+
import os
from datetime import datetime
from pathlib import Path
-from typing import Any, Callable, Dict, Optional
+from typing import Any, Callable, Dict, List, Optional, Tuple
-from .cache import calculate_file_hash, calculate_url_hash, get_cache_manager
+from .cache import calculate_file_hash, calculate_string_hash, calculate_url_hash, get_cache_manager
+from .chapters import build_chapters
+from .config import get_model_config
+from .content_type import detect_content_type
from .exceptions import MusicContentError
from .jobs import build_job_artifact_path, get_outputs_root
from ..audio_processor import prepare_audio_file
from ..downloader import download_video_audio, download_youtube_audio
-from ..summarizer import render_text_in_portuguese, summarize_text
-from ..transcriber import transcribe_audio
+from ..summarizer import (
+ format_smart_summary_as_text,
+ render_text_in_portuguese,
+ summarize_smart,
+ summarize_text,
+)
+from ..transcriber import transcribe_audio, transcribe_audio_with_timestamps
from ..utils import cleanup_files, validate_input
from ..web.extractor import extract_pdf_content, extract_text_file_content, extract_web_content
from .formats import export
ProgressCallback = Optional[Callable[[int, str, Optional[str]], None]]
+MEDIA_INPUT_TYPES = {"audio", "video", "youtube"}
+
def _notify(callback: ProgressCallback, progress: int, status: str, message: Optional[str] = None) -> None:
if callback:
@@ -47,6 +69,22 @@ def _ensure_mode(mode: str) -> str:
return mode
+def _resolve_runtime(
+ model: Optional[str],
+ gpt_model: Optional[str],
+ use_smart_summary: Optional[bool],
+ use_chapters: Optional[bool],
+) -> Dict[str, Any]:
+ config = get_model_config()
+ return {
+ "transcribe_model": model or config.transcribe_model,
+ "transcribe_timestamps_model": config.transcribe_timestamps_model,
+ "chat_model": gpt_model or config.chat_model,
+ "smart_summary": config.enable_smart_summary if use_smart_summary is None else use_smart_summary,
+ "chapters_enabled": config.enable_chapters if use_chapters is None else use_chapters,
+ }
+
+
def _export_selected_artifact(
mode: str,
format_type: str,
@@ -72,80 +110,171 @@ def _export_selected_artifact(
)
-def _transcribe_media_to_portuguese(
+def _transcribe_media(
audio_file: str,
- cache_prefix: str,
- cache_identifier: str,
- model: str,
- gpt_model: str,
+ *,
+ runtime: Dict[str, Any],
metadata: Dict[str, Any],
progress_callback: ProgressCallback,
progress_start: int = 30,
progress_end: int = 70,
-) -> str:
+) -> Tuple[str, List[Dict[str, Any]]]:
+ """Transcreve audio retornando (texto_pt, segments). Usa cache + render PT."""
+
cache = _get_cache_manager_safe()
- cached = cache.get(cache_prefix, cache_identifier) if cache else None
+ file_hash = calculate_file_hash(audio_file)
+ chapters_enabled = runtime["chapters_enabled"]
+ cache_prefix = "transcription_ts" if chapters_enabled else "transcription"
+ cached = cache.get(cache_prefix, file_hash) if cache else None
if cached and cached.get("transcription"):
_notify(progress_callback, progress_end, "processing", "Transcricao encontrada no cache")
- return cached["transcription"]
+ return cached["transcription"], cached.get("segments", []) or []
midpoint = progress_start + ((progress_end - progress_start) // 2)
- _notify(progress_callback, progress_start, "processing", "Transcrevendo audio...")
- raw_transcription = transcribe_audio(audio_file, language=None, model=model)
+ segments: List[Dict[str, Any]] = []
+
+ if chapters_enabled:
+ _notify(progress_callback, progress_start, "processing", "Transcrevendo audio com timestamps...")
+ result = transcribe_audio_with_timestamps(
+ audio_file,
+ language=None,
+ model=runtime["transcribe_timestamps_model"],
+ )
+ raw_text = result.get("text", "")
+ segments = result.get("segments", []) or []
+ else:
+ _notify(progress_callback, progress_start, "processing", "Transcrevendo audio...")
+ raw_text = transcribe_audio(audio_file, language=None, model=runtime["transcribe_model"])
+
_notify(progress_callback, midpoint, "processing", "Convertendo conteudo para portugues...")
- portuguese_text = render_text_in_portuguese(raw_transcription, model=gpt_model)
+ portuguese_text = render_text_in_portuguese(raw_text, model=runtime["chat_model"])
if cache:
cache.set(
cache_prefix,
- cache_identifier,
+ file_hash,
{
"transcription": portuguese_text,
+ "segments": segments,
"metadata": metadata,
"timestamp": datetime.now().isoformat(),
},
)
_notify(progress_callback, progress_end, "processing", "Transcricao concluida")
- return portuguese_text
+ return portuguese_text, segments
+
+
+def _detect_and_attach_content_type(
+ text: str,
+ metadata: Dict[str, Any],
+ runtime: Dict[str, Any],
+) -> Optional[str]:
+ """Detecta o tipo (uma vez) e adiciona em metadata."""
+ if not text:
+ return None
+ if metadata.get("content_type"):
+ return metadata["content_type"]
+ detection = detect_content_type(text, metadata=metadata, model=runtime["chat_model"])
+ metadata["content_type"] = detection.get("content_type", "other")
+ metadata["content_type_confidence"] = detection.get("confidence", 0.0)
+ return metadata["content_type"]
-def _summarize_portuguese_text(
+def _build_summary(
text: str,
- model: str,
+ *,
+ metadata: Dict[str, Any],
+ runtime: Dict[str, Any],
progress_callback: ProgressCallback,
progress_start: int = 75,
progress_end: int = 88,
-) -> str:
+) -> Optional[str]:
+ """Gera sumario (estruturado ou textual) com cache. Atualiza metadata."""
+
+ if not text:
+ return None
+
cache = _get_cache_manager_safe()
- text_hash = calculate_url_hash(text)
- cached = cache.get("summary", text_hash) if cache else None
- if cached and cached.get("summary"):
+ text_hash = calculate_string_hash(text)
+ smart_enabled = runtime["smart_summary"]
+ cache_prefix = "smart_summary" if smart_enabled else "summary"
+
+ cached = cache.get(cache_prefix, text_hash) if cache else None
+ if cached:
_notify(progress_callback, progress_end, "processing", "Sumario encontrado no cache")
- return cached["summary"]
+ if smart_enabled and cached.get("smart_summary"):
+ metadata["smart_summary"] = cached["smart_summary"]
+ return cached.get("summary") or format_smart_summary_as_text(cached["smart_summary"])
+ if not smart_enabled and cached.get("summary"):
+ return cached["summary"]
+
+ if smart_enabled:
+ _notify(progress_callback, progress_start, "processing", "Gerando sumario inteligente...")
+ try:
+ smart = summarize_smart(
+ text,
+ model=runtime["chat_model"],
+ content_type=metadata.get("content_type"),
+ )
+ metadata["smart_summary"] = smart
+ summary_text = format_smart_summary_as_text(smart)
+ if cache:
+ cache.set(
+ cache_prefix,
+ text_hash,
+ {
+ "smart_summary": smart,
+ "summary": summary_text,
+ "timestamp": datetime.now().isoformat(),
+ },
+ )
+ _notify(progress_callback, progress_end, "processing", "Sumario inteligente concluido")
+ return summary_text
+ except Exception as exc:
+ print(f"Aviso: smart summary falhou ({exc}); caindo para sumario legado.")
_notify(progress_callback, progress_start, "processing", "Gerando sumario em portugues...")
- summary = summarize_text(text, model=model, language="pt-BR")
+ summary = summarize_text(text, model=runtime["chat_model"], language="pt-BR")
if cache:
cache.set(
"summary",
text_hash,
- {
- "summary": summary,
- "timestamp": datetime.now().isoformat(),
- },
+ {"summary": summary, "timestamp": datetime.now().isoformat()},
)
_notify(progress_callback, progress_end, "processing", "Sumario concluido")
return summary
+def _build_chapters_if_possible(
+ *,
+ runtime: Dict[str, Any],
+ segments: List[Dict[str, Any]],
+ metadata: Dict[str, Any],
+ progress_callback: ProgressCallback,
+) -> List[Dict[str, Any]]:
+ if not runtime["chapters_enabled"] or not segments:
+ return []
+ _notify(progress_callback, 70, "processing", "Gerando capitulos...")
+ chapters = build_chapters(
+ segments,
+ model=runtime["chat_model"],
+ content_type=metadata.get("content_type"),
+ )
+ if chapters:
+ metadata["chapters"] = chapters
+ return chapters
+
+
def process_source(
source: str,
*,
mode: str,
output_format: str = "docx",
- model: str = "whisper-1",
- gpt_model: str = "gpt-4o-mini",
+ model: Optional[str] = None,
+ gpt_model: Optional[str] = None,
+ use_smart_summary: Optional[bool] = None,
+ use_chapters: Optional[bool] = None,
output_path: Optional[str] = None,
output_root: Optional[Path] = None,
run_id: Optional[str] = None,
@@ -154,22 +283,23 @@ def process_source(
progress_callback: ProgressCallback = None,
keep_files: bool = False,
) -> Dict[str, Any]:
- """
- Processa um input e retorna texto em portugues, sumario e caminhos de saida.
- """
+ """Processa um input e retorna texto em portugues, sumario e caminhos de saida."""
_ensure_api_key()
mode = _ensure_mode(mode)
+ runtime = _resolve_runtime(model, gpt_model, use_smart_summary, use_chapters)
+
_notify(progress_callback, 5, "processing", "Validando entrada...")
is_valid, input_type, error_msg = validate_input(source)
if not is_valid:
raise Exception(error_msg)
cache = _get_cache_manager_safe()
- files_to_cleanup = []
+ files_to_cleanup: List[str] = []
metadata: Dict[str, Any] = {}
portuguese_text: Optional[str] = None
summary: Optional[str] = None
+ segments: List[Dict[str, Any]] = []
try:
output_root = output_root or get_outputs_root()
@@ -178,49 +308,25 @@ def process_source(
url_hash = calculate_url_hash(source) if cache else ""
cached = cache.get("youtube", url_hash) if cache else None
if cached and cached.get("transcription"):
- metadata = cached.get("metadata", {})
+ metadata = cached.get("metadata", {}) or {}
portuguese_text = cached.get("transcription")
summary = cached.get("summary")
+ segments = cached.get("segments", []) or []
+ if cached.get("smart_summary"):
+ metadata["smart_summary"] = cached["smart_summary"]
+ if cached.get("chapters"):
+ metadata["chapters"] = cached["chapters"]
_notify(progress_callback, 70, "processing", "Conteudo do YouTube encontrado no cache")
- if mode == "summarize" and not summary:
- summary = _summarize_portuguese_text(portuguese_text, gpt_model, progress_callback)
- if cache:
- cache.set(
- "youtube",
- url_hash,
- {
- "transcription": portuguese_text,
- "summary": summary,
- "metadata": metadata,
- "timestamp": datetime.now().isoformat(),
- },
- )
else:
_notify(progress_callback, 15, "processing", "Baixando video do YouTube...")
audio_file, metadata = download_youtube_audio(source)
files_to_cleanup.append(audio_file)
- portuguese_text = _transcribe_media_to_portuguese(
- audio_file=audio_file,
- cache_prefix="transcription",
- cache_identifier=calculate_file_hash(audio_file),
- model=model,
- gpt_model=gpt_model,
+ portuguese_text, segments = _transcribe_media(
+ audio_file,
+ runtime=runtime,
metadata=metadata,
progress_callback=progress_callback,
)
- if mode == "summarize":
- summary = _summarize_portuguese_text(portuguese_text, gpt_model, progress_callback)
- if cache:
- cache.set(
- "youtube",
- url_hash,
- {
- "transcription": portuguese_text,
- "summary": summary,
- "metadata": metadata,
- "timestamp": datetime.now().isoformat(),
- },
- )
elif input_type == "web":
url_hash = calculate_url_hash(source) if cache else ""
@@ -230,68 +336,33 @@ def process_source(
files_to_cleanup.append(audio_file)
cached = cache.get("video", url_hash) if cache else None
if cached and cached.get("transcription"):
- metadata = cached.get("metadata", metadata)
+ metadata = cached.get("metadata", metadata) or metadata
portuguese_text = cached.get("transcription")
summary = cached.get("summary")
+ segments = cached.get("segments", []) or []
+ if cached.get("smart_summary"):
+ metadata["smart_summary"] = cached["smart_summary"]
+ if cached.get("chapters"):
+ metadata["chapters"] = cached["chapters"]
_notify(progress_callback, 70, "processing", "Conteudo de video encontrado no cache")
- if mode == "summarize" and not summary:
- summary = _summarize_portuguese_text(portuguese_text, gpt_model, progress_callback)
- if cache:
- cache.set(
- "video",
- url_hash,
- {
- "transcription": portuguese_text,
- "summary": summary,
- "metadata": metadata,
- "timestamp": datetime.now().isoformat(),
- },
- )
else:
- portuguese_text = _transcribe_media_to_portuguese(
- audio_file=audio_file,
- cache_prefix="transcription",
- cache_identifier=calculate_file_hash(audio_file),
- model=model,
- gpt_model=gpt_model,
+ portuguese_text, segments = _transcribe_media(
+ audio_file,
+ runtime=runtime,
metadata=metadata,
progress_callback=progress_callback,
)
- if mode == "summarize":
- summary = _summarize_portuguese_text(portuguese_text, gpt_model, progress_callback)
- if cache:
- cache.set(
- "video",
- url_hash,
- {
- "transcription": portuguese_text,
- "summary": summary,
- "metadata": metadata,
- "timestamp": datetime.now().isoformat(),
- },
- )
except MusicContentError:
raise
except Exception:
cached = cache.get("web", url_hash) if cache else None
if cached and cached.get("content"):
- metadata = cached.get("metadata", {})
+ metadata = cached.get("metadata", {}) or {}
portuguese_text = cached.get("content")
summary = cached.get("summary")
+ if cached.get("smart_summary"):
+ metadata["smart_summary"] = cached["smart_summary"]
_notify(progress_callback, 70, "processing", "Conteudo web encontrado no cache")
- if mode == "summarize" and not summary:
- summary = _summarize_portuguese_text(portuguese_text, gpt_model, progress_callback)
- if cache:
- cache.set(
- "web",
- url_hash,
- {
- "content": portuguese_text,
- "summary": summary,
- "metadata": metadata,
- "timestamp": datetime.now().isoformat(),
- },
- )
else:
_notify(progress_callback, 20, "processing", "Extraindo texto da pagina web...")
content_data = extract_web_content(source)
@@ -299,38 +370,22 @@ def process_source(
"title": content_data.get("title", "Pagina Web"),
"webpage_url": source,
}
- portuguese_text = render_text_in_portuguese(content_data["content"], model=gpt_model)
+ portuguese_text = render_text_in_portuguese(
+ content_data["content"], model=runtime["chat_model"]
+ )
_notify(progress_callback, 70, "processing", "Texto convertido para portugues")
- if mode == "summarize":
- summary = _summarize_portuguese_text(portuguese_text, gpt_model, progress_callback)
- if cache:
- cache.set(
- "web",
- url_hash,
- {
- "content": portuguese_text,
- "summary": summary,
- "metadata": metadata,
- "timestamp": datetime.now().isoformat(),
- },
- )
elif input_type in {"audio", "video"}:
_notify(progress_callback, 15, "processing", "Preparando audio...")
audio_file = prepare_audio_file(source, is_video=(input_type == "video"))
if audio_file != source:
files_to_cleanup.append(audio_file)
- portuguese_text = _transcribe_media_to_portuguese(
- audio_file=audio_file,
- cache_prefix="transcription",
- cache_identifier=calculate_file_hash(audio_file),
- model=model,
- gpt_model=gpt_model,
+ portuguese_text, segments = _transcribe_media(
+ audio_file,
+ runtime=runtime,
metadata=metadata,
progress_callback=progress_callback,
)
- if mode == "summarize":
- summary = _summarize_portuguese_text(portuguese_text, gpt_model, progress_callback)
elif input_type in {"pdf", "text"}:
_notify(progress_callback, 20, "processing", "Extraindo conteudo do arquivo...")
@@ -342,16 +397,88 @@ def process_source(
"title": content_data.get("title", "Documento"),
"file_path": source,
}
- portuguese_text = render_text_in_portuguese(content_data["content"], model=gpt_model)
+ portuguese_text = render_text_in_portuguese(
+ content_data["content"], model=runtime["chat_model"]
+ )
_notify(progress_callback, 70, "processing", "Conteudo convertido para portugues")
- if mode == "summarize":
- summary = _summarize_portuguese_text(portuguese_text, gpt_model, progress_callback)
if metadata is None:
metadata = {}
if source.startswith(("http://", "https://")) and not metadata.get("webpage_url"):
metadata = {**metadata, "webpage_url": source}
+ # ---- Etapas pos-transcricao comuns a todos os tipos ----
+ if portuguese_text:
+ _detect_and_attach_content_type(portuguese_text, metadata, runtime)
+
+ chapters = _build_chapters_if_possible(
+ runtime=runtime,
+ segments=segments,
+ metadata=metadata,
+ progress_callback=progress_callback,
+ )
+
+ if mode == "summarize" and not summary:
+ summary = _build_summary(
+ portuguese_text or "",
+ metadata=metadata,
+ runtime=runtime,
+ progress_callback=progress_callback,
+ )
+ elif mode == "summarize" and summary and runtime["smart_summary"] and not metadata.get("smart_summary"):
+ # Cache antigo guardou apenas summary textual; reaproveitamos como tldr.
+ metadata["smart_summary"] = {
+ "tldr": summary[:600],
+ "key_points": [],
+ "decisions": [],
+ "action_items": [],
+ "topics": [],
+ "quotes": [],
+ "open_questions": [],
+ }
+
+ # ---- Atualiza caches "ricos" para fontes de midia ----
+ if cache and input_type == "youtube":
+ cache.set(
+ "youtube",
+ url_hash,
+ {
+ "transcription": portuguese_text,
+ "summary": summary,
+ "metadata": metadata,
+ "segments": segments,
+ "smart_summary": metadata.get("smart_summary"),
+ "chapters": chapters,
+ "timestamp": datetime.now().isoformat(),
+ },
+ )
+ elif cache and input_type == "web" and portuguese_text and segments:
+ cache.set(
+ "video",
+ url_hash,
+ {
+ "transcription": portuguese_text,
+ "summary": summary,
+ "metadata": metadata,
+ "segments": segments,
+ "smart_summary": metadata.get("smart_summary"),
+ "chapters": chapters,
+ "timestamp": datetime.now().isoformat(),
+ },
+ )
+ elif cache and input_type == "web" and portuguese_text and not segments:
+ cache.set(
+ "web",
+ url_hash,
+ {
+ "content": portuguese_text,
+ "summary": summary,
+ "metadata": metadata,
+ "smart_summary": metadata.get("smart_summary"),
+ "timestamp": datetime.now().isoformat(),
+ },
+ )
+
run_id = run_id or datetime.now().strftime("%Y%m%d%H%M%S%f")
resolved_source_name = source_name or metadata.get("title") or source
@@ -388,6 +515,9 @@ def process_source(
"metadata": metadata,
"transcription": portuguese_text,
"summary": summary,
+ "smart_summary": metadata.get("smart_summary"),
+ "chapters": chapters,
+ "content_type": metadata.get("content_type"),
"result_path": exported_path,
"transcription_path": exported_path if mode == "transcribe" else None,
"summary_path": exported_path if mode == "summarize" else None,
diff --git a/lazier/docx_generator.py b/lazier/docx_generator.py
@@ -71,30 +71,41 @@ def create_document(
metadata_section.add_run('\nLink do vídeo: ').bold = True
metadata_section.add_run(sanitize_xml_string(str(metadata['webpage_url'])))
+ # Tipo de conteudo detectado (quando disponivel)
+ if metadata and metadata.get('content_type'):
+ metadata_section.add_run('\nTipo detectado: ').bold = True
+ metadata_section.add_run(sanitize_xml_string(str(metadata['content_type'])))
+
# Espaçamento
doc.add_paragraph()
-
- # Seção de Sumário (se disponível)
- if summary:
+
+ # Sumario inteligente (Structured Outputs) tem precedencia sobre o texto livre
+ smart = metadata.get('smart_summary') if metadata else None
+ if smart:
+ _add_smart_summary_to_doc(doc, smart)
+ elif summary:
doc.add_heading('Sumário', level=2)
-
+
# Sanitizar sumário antes de processar
summary = sanitize_xml_string(summary)
-
+
# Divide sumário em parágrafos
summary_paragraphs = summary.split('\n\n')
if len(summary_paragraphs) == 1:
- # Se não tem \n\n, tenta dividir por \n
summary_paragraphs = summary.split('\n')
-
+
for para_text in summary_paragraphs:
if para_text.strip():
_add_markdown_paragraph(doc, para_text.strip(), is_summary=True)
-
- # Linha separadora
+
doc.add_paragraph('_' * 80)
doc.add_paragraph()
-
+
+ # Capitulos com timestamps (quando disponiveis)
+ chapters = metadata.get('chapters') if metadata else None
+ if chapters:
+ _add_chapters_to_doc(doc, chapters)
+
# Seção de Transcrição
transcription = sanitize_xml_string(transcription)
if transcription.strip():
@@ -126,6 +137,93 @@ def create_document(
return str(output_path_obj)
+def _add_smart_summary_to_doc(doc, smart: Dict[str, Any]) -> None:
+ """Renderiza um SmartSummary com headings e listas no DOCX."""
+ if not smart:
+ return
+
+ doc.add_heading('Sumário inteligente', level=2)
+
+ tldr = (smart.get('tldr') or '').strip()
+ if tldr:
+ doc.add_heading('TL;DR', level=3)
+ para = doc.add_paragraph()
+ _parse_markdown_to_runs(tldr, para)
+ para.paragraph_format.space_after = Pt(8)
+ para.paragraph_format.line_spacing = 1.15
+
+ section_labels = [
+ ('key_points', 'Pontos-chave'),
+ ('decisions', 'Decisões'),
+ ('topics', 'Tópicos'),
+ ('quotes', 'Citações'),
+ ('open_questions', 'Perguntas em aberto'),
+ ]
+
+ for key, label in section_labels:
+ items = smart.get(key) or []
+ if not items:
+ continue
+ doc.add_heading(label, level=3)
+ for item in items:
+ value = sanitize_xml_string(str(item)).strip()
+ if not value:
+ continue
+ para = doc.add_paragraph(style='List Bullet')
+ _parse_markdown_to_runs(value, para)
+ para.paragraph_format.space_after = Pt(4)
+ para.paragraph_format.line_spacing = 1.15
+
+ actions = smart.get('action_items') or []
+ if actions:
+ doc.add_heading('Ações', level=3)
+ for action in actions:
+ owner = sanitize_xml_string((action.get('owner') or '').strip() or '(sem responsável)')
+ task = sanitize_xml_string((action.get('task') or '').strip())
+ due = sanitize_xml_string((action.get('due_hint') or '').strip())
+ para = doc.add_paragraph(style='List Bullet')
+ run_owner = para.add_run(f"{owner}: ")
+ run_owner.bold = True
+ para.add_run(task)
+ if due:
+ run_due = para.add_run(f" (prazo: {due})")
+ run_due.italic = True
+ para.paragraph_format.space_after = Pt(4)
+ para.paragraph_format.line_spacing = 1.15
+
+ doc.add_paragraph('_' * 80)
+ doc.add_paragraph()
+
+
+def _add_chapters_to_doc(doc, chapters) -> None:
+ """Renderiza lista de capitulos com timestamps."""
+ if not chapters:
+ return
+
+ doc.add_heading('Capítulos', level=2)
+ for chapter in chapters:
+ title = sanitize_xml_string((chapter.get('title') or 'Capítulo').strip())
+ start = chapter.get('start_hms') or _format_duration(int(chapter.get('start', 0)))
+ end = chapter.get('end_hms') or _format_duration(int(chapter.get('end', 0)))
+ chapter_summary = sanitize_xml_string((chapter.get('summary') or '').strip())
+
+ para = doc.add_paragraph()
+ run_time = para.add_run(f"[{start} - {end}] ")
+ run_time.bold = True
+ run_time.font.color.rgb = RGBColor(80, 80, 80)
+ para.add_run(title)
+ para.paragraph_format.space_after = Pt(2)
+
+ if chapter_summary:
+ sub = doc.add_paragraph()
+ sub.paragraph_format.left_indent = Inches(0.3)
+ sub.paragraph_format.space_after = Pt(4)
+ _parse_markdown_to_runs(chapter_summary, sub)
+
+ doc.add_paragraph('_' * 80)
+ doc.add_paragraph()
+
+
def _format_duration(seconds: int) -> str:
"""Formata duração em segundos para formato legível"""
hours = seconds // 3600
diff --git a/lazier/summarizer.py b/lazier/summarizer.py
@@ -1,10 +1,23 @@
"""
-Módulo para sumarização de texto usando OpenAI GPT API
-Suporta textos, páginas web e PDFs
+Sumario e renderizacao em portugues usando os modelos de chat da OpenAI.
+
+Inclui dois caminhos:
+
+- `summarize_text` (legado) -> texto livre, mantido por compatibilidade.
+- `summarize_smart` -> Structured Outputs com schema `SmartSummary` (TL;DR,
+ key points, decisoes, action items, topicos, citacoes, perguntas em aberto).
+ E o caminho preferido quando `OPENAI_ENABLE_SMART_SUMMARY=true`.
+
+A `render_text_in_portuguese` continua igual, agora usando o modelo do
+`get_model_config()` quando o caller nao especifica um.
"""
+from __future__ import annotations
+
+import json
import os
-from typing import Optional
+from typing import Any, Dict, List, Optional
+
try:
from openai import OpenAI
except ImportError: # pragma: no cover - ambiente sem openai
@@ -14,305 +27,644 @@ try:
except ImportError: # pragma: no cover - ambiente sem python-dotenv
def load_dotenv():
return False
+try:
+ from pydantic import BaseModel, Field
+except ImportError: # pragma: no cover - pydantic indisponivel
+ BaseModel = None # type: ignore[misc,assignment]
-from .web.extractor import extract_web_content, extract_pdf_content, extract_text_file_content
-
-load_dotenv()
-
+ def Field(*args, **kwargs): # type: ignore[misc]
+ return None
-def summarize_text(text: str, model: str = 'gpt-4o-mini', language: str = 'pt-BR') -> str:
- """
- Sumariza um texto usando OpenAI GPT API
-
- Args:
- text: Texto a ser sumarizado
- model: Modelo GPT a usar (padrão: gpt-4o-mini)
- language: Idioma para o prompt (padrão: pt-BR)
-
- Returns:
- Texto sumarizado
- """
- api_key = os.getenv('OPENAI_API_KEY')
- if not api_key:
- raise Exception(
- "OPENAI_API_KEY não encontrada. "
- "Configure a variável de ambiente OPENAI_API_KEY ou crie um arquivo .env"
- )
- if OpenAI is None:
- raise Exception("openai não está instalado neste ambiente.")
+from .web.extractor import extract_pdf_content, extract_text_file_content, extract_web_content
+from .core.config import get_model_config
- if not text or not text.strip():
- return "Texto vazio - não é possível gerar sumário."
-
- # Estratégia de chunking para textos muito longos
- # GPT-4o-mini tem contexto de ~128k tokens, mas vamos limitar a ~100k para segurança
- # Aproximadamente 1 token = 4 caracteres em português
- max_chars = 400000 # ~100k tokens
-
- if len(text) <= max_chars:
- return _summarize_chunk(text, model, language)
- else:
- # Divide em chunks e sumariza cada um, depois sumariza os sumários
- chunks = _split_text_into_chunks(text, max_chars)
- chunk_summaries = []
-
- print(f"Texto longo detectado ({len(text)} caracteres). Dividido em {len(chunks)} partes.")
-
- for i, chunk in enumerate(chunks):
- try:
- print(f"Sumarizando parte {i+1}/{len(chunks)}...")
- summary = _summarize_chunk(chunk, model, language)
- chunk_summaries.append(summary)
- except Exception as e:
- print(f"Erro ao sumarizar parte {i+1}: {e}")
- chunk_summaries.append(f"[Erro nesta parte: {str(e)}]")
-
- # Filtra sumários válidos
- valid_summaries = [s for s in chunk_summaries if not s.startswith("[Erro")]
-
- if not valid_summaries:
- return "Falha ao gerar sumário: todas as partes falharam."
-
- # Se temos múltiplos chunks, sumariza os sumários
- if len(valid_summaries) > 1:
- print("Consolidando sumários parciais...")
- combined_summaries = "\n\n".join(valid_summaries)
- return _summarize_chunk(combined_summaries, model, language, is_final=True)
- else:
- return valid_summaries[0]
+load_dotenv()
-def render_text_in_portuguese(text: str, model: str = 'gpt-4o-mini') -> str:
- """
- Converte qualquer texto para portugues do Brasil preservando detalhes.
- Se o texto ja estiver em portugues, apenas normaliza a redacao.
- """
- api_key = os.getenv('OPENAI_API_KEY')
+# ---------------------------------------------------------------------------
+# Schema do sumario inteligente
+# ---------------------------------------------------------------------------
+
+if BaseModel is not None:
+
+ class ActionItem(BaseModel):
+ owner: str = Field(default="", description="Responsavel ou pessoa indicada")
+ task: str = Field(default="", description="Acao a ser executada")
+ due_hint: str = Field(default="", description="Prazo, data ou referencia temporal")
+
+
+ class SmartSummary(BaseModel):
+ tldr: str = Field(default="", description="Sumario de 1-3 frases")
+ key_points: List[str] = Field(default_factory=list)
+ decisions: List[str] = Field(default_factory=list)
+ action_items: List[ActionItem] = Field(default_factory=list)
+ topics: List[str] = Field(default_factory=list)
+ quotes: List[str] = Field(default_factory=list)
+ open_questions: List[str] = Field(default_factory=list)
+
+else: # pragma: no cover - apenas seguranca em ambientes sem pydantic
+ ActionItem = dict # type: ignore[misc,assignment]
+ SmartSummary = dict # type: ignore[misc,assignment]
+
+
+# Schema explicito para `response_format=json_schema` (mais seguro que confiar
+# em `model_json_schema()` para garantir strict-mode).
+SMART_SUMMARY_JSON_SCHEMA: Dict[str, Any] = {
+ "name": "SmartSummary",
+ "strict": True,
+ "schema": {
+ "type": "object",
+ "additionalProperties": False,
+ "required": [
+ "tldr",
+ "key_points",
+ "decisions",
+ "action_items",
+ "topics",
+ "quotes",
+ "open_questions",
+ ],
+ "properties": {
+ "tldr": {"type": "string"},
+ "key_points": {"type": "array", "items": {"type": "string"}},
+ "decisions": {"type": "array", "items": {"type": "string"}},
+ "action_items": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "additionalProperties": False,
+ "required": ["owner", "task", "due_hint"],
+ "properties": {
+ "owner": {"type": "string"},
+ "task": {"type": "string"},
+ "due_hint": {"type": "string"},
+ },
+ },
+ },
+ "topics": {"type": "array", "items": {"type": "string"}},
+ "quotes": {"type": "array", "items": {"type": "string"}},
+ "open_questions": {"type": "array", "items": {"type": "string"}},
+ },
+ },
+}
+
+
+# Personas/instrucoes complementares por tipo de conteudo.
+PERSONA_HINTS: Dict[str, str] = {
+ "lecture": (
+ "O texto e uma palestra ou aula. Enfatize conceitos-chave, definicoes, "
+ "exemplos e a sequencia logica de raciocinio. Em action_items, registre "
+ "exercicios ou leituras sugeridas."
+ ),
+ "podcast": (
+ "O texto e um episodio de podcast. Destaque tese central, opinioes dos "
+ "convidados, exemplos e historias contadas. Em quotes, prefira trechos "
+ "marcantes; em action_items, recomendacoes praticas dadas ao ouvinte."
+ ),
+ "interview": (
+ "O texto e uma entrevista. Estruture key_points por blocos tematicos das "
+ "perguntas, e em quotes registre as respostas mais reveladoras."
+ ),
+ "news": (
+ "O texto e jornalistico. Foque em fatos verificaveis, numeros, fontes "
+ "citadas e linha do tempo. Em open_questions, deixe lacunas relatadas "
+ "pela propria materia."
+ ),
+ "tutorial": (
+ "O texto e um tutorial ou how-to. Em key_points, liste passos na ordem; "
+ "em action_items, transforme cada passo em uma acao clara com pre-req."
+ ),
+ "meeting": (
+ "O texto e uma reuniao. Seja muito explicito em decisions e action_items "
+ "(owner + task + due_hint). Em open_questions, registre pendencias."
+ ),
+ "tech_doc": (
+ "O texto e um documento tecnico. Preserve nomes de APIs, parametros, "
+ "constantes e pre-requisitos em key_points."
+ ),
+ "other": "",
+}
+
+
+# Limite de chars por chamada (~tokens muito generoso). Reduz risco de hit no
+# input_tokens limit em modelos com janela menor.
+DEFAULT_CHUNK_CHAR_LIMIT = 300_000
+
+
+# ---------------------------------------------------------------------------
+# Infra OpenAI
+# ---------------------------------------------------------------------------
+
+def _ensure_client() -> "OpenAI":
+ api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise Exception(
- "OPENAI_API_KEY não encontrada. "
- "Configure a variável de ambiente OPENAI_API_KEY ou crie um arquivo .env"
+ "OPENAI_API_KEY nao encontrada. "
+ "Configure a variavel de ambiente OPENAI_API_KEY ou crie um arquivo .env"
)
if OpenAI is None:
- raise Exception("openai não está instalado neste ambiente.")
+ raise Exception("openai nao esta instalado neste ambiente.")
+ return OpenAI(api_key=api_key)
+
+
+def _wrap_chat_error(exc: Exception, action: str) -> Exception:
+ error_msg = str(exc)
+ lowered = error_msg.lower()
+ if "api_key" in lowered or "authentication" in lowered:
+ return Exception("Erro de autenticacao com OpenAI API. Verifique sua OPENAI_API_KEY.")
+ return Exception(f"Erro ao {action}: {error_msg}")
+
+
+def _chat_completions_kwargs(
+ *,
+ model: str,
+ messages: List[Dict[str, Any]],
+ response_format: Optional[Dict[str, Any]] = None,
+ temperature: Optional[float] = None,
+) -> Dict[str, Any]:
+ """Monta kwargs respeitando peculiaridades por familia de modelo."""
+
+ config = get_model_config()
+ kwargs: Dict[str, Any] = {"model": model, "messages": messages}
+ if response_format is not None:
+ kwargs["response_format"] = response_format
+ if config.supports_reasoning(model):
+ kwargs["reasoning_effort"] = config.reasoning_effort
+ elif temperature is not None:
+ # Modelos que nao sao da familia gpt-5/o aceitam temperature normalmente.
+ kwargs["temperature"] = temperature
+ return kwargs
+
+
+# ---------------------------------------------------------------------------
+# Conversao para PT-BR (legacy mantido)
+# ---------------------------------------------------------------------------
+
+def render_text_in_portuguese(text: str, model: Optional[str] = None) -> str:
+ """Converte qualquer texto para portugues do Brasil preservando detalhes."""
if not text or not text.strip():
return ""
- max_chars = 300000
+ config = get_model_config()
+ chosen_model = model or config.chat_model
+
+ max_chars = DEFAULT_CHUNK_CHAR_LIMIT
if len(text) <= max_chars:
- return _render_portuguese_chunk(text, model)
+ return _render_portuguese_chunk(text, chosen_model)
chunks = _split_text_into_chunks(text, max_chars)
- rendered_chunks = []
+ rendered_chunks: List[str] = []
print(f"Texto longo detectado ({len(text)} caracteres). Convertendo {len(chunks)} partes para portugues...")
for i, chunk in enumerate(chunks):
- print(f"Convertendo parte {i+1}/{len(chunks)}...")
- rendered_chunks.append(_render_portuguese_chunk(chunk, model))
+ print(f"Convertendo parte {i + 1}/{len(chunks)}...")
+ rendered_chunks.append(_render_portuguese_chunk(chunk, chosen_model))
return "\n\n".join(chunk.strip() for chunk in rendered_chunks if chunk.strip())
def _render_portuguese_chunk(text: str, model: str) -> str:
- api_key = os.getenv('OPENAI_API_KEY')
- client = OpenAI(api_key=api_key)
- prompt = """Converta o texto a seguir para portugues do Brasil.
-
-Regras:
-- Se o texto ja estiver em portugues, mantenha em portugues do Brasil natural.
-- Nao resuma, nao explique, nao comente o texto.
-- Preserve nomes proprios, numeros, datas, links, listas e estrutura.
-- Mantenha o maximo de fidelidade possivel ao conteudo original.
-
-Texto:
-
-"""
+ client = _ensure_client()
+ prompt = (
+ "Converta o texto a seguir para portugues do Brasil.\n\n"
+ "Regras:\n"
+ "- Se o texto ja estiver em portugues, mantenha em portugues do Brasil natural.\n"
+ "- Nao resuma, nao explique, nao comente o texto.\n"
+ "- Preserve nomes proprios, numeros, datas, links, listas e estrutura.\n"
+ "- Mantenha o maximo de fidelidade possivel ao conteudo original.\n\n"
+ "Texto:\n\n"
+ )
try:
- response = client.chat.completions.create(
+ kwargs = _chat_completions_kwargs(
model=model,
messages=[
{
"role": "system",
- "content": "Voce e um tradutor tecnico e editorial que preserva fielmente o conteudo."
+ "content": "Voce e um tradutor tecnico e editorial que preserva fielmente o conteudo.",
},
- {"role": "user", "content": prompt + text}
+ {"role": "user", "content": prompt + text},
],
temperature=0.1,
)
- return response.choices[0].message.content.strip()
- except Exception as e:
- error_msg = str(e)
- if 'api_key' in error_msg.lower() or 'authentication' in error_msg.lower():
- raise Exception("Erro de autenticação com OpenAI API. Verifique sua OPENAI_API_KEY.")
- raise Exception(f"Erro ao converter texto para português: {error_msg}")
+ response = client.chat.completions.create(**kwargs)
+ return (response.choices[0].message.content or "").strip()
+ except Exception as exc:
+ raise _wrap_chat_error(exc, "converter texto para portugues") from exc
-def _summarize_chunk(text: str, model: str, language: str, is_final: bool = False) -> str:
- """Sumariza um chunk de texto"""
- api_key = os.getenv('OPENAI_API_KEY')
- client = OpenAI(api_key=api_key)
-
- if language.startswith('pt'):
- prompt = """Você é um assistente especializado em criar sumários detalhados e completos.
-
-Crie um sumário COMPLETO e DETALHADO do seguinte texto em português do Brasil. O sumário DEVE:
-- Manter TODOS os pontos importantes, chave e informações relevantes
-- Preservar números, datas, nomes, estatísticas e dados técnicos
-- Manter a estrutura lógica e sequência do conteúdo original
-- Ser detalhado o suficiente para não perder informações essenciais
-- Destacar os principais temas e subtemas
-- Ser escrito em português do Brasil
-- Ter pelo menos 30-40% do tamanho do texto original (para textos longos)
-
-IMPORTANTE: Não omita informações importantes. O objetivo é condensar mantendo a completude dos pontos-chave. Se o texto contém listas, exemplos específicos, ou dados numéricos, inclua-os no sumário.
-
-Texto para sumarizar:
+# ---------------------------------------------------------------------------
+# Sumario textual legado
+# ---------------------------------------------------------------------------
-"""
- if is_final:
- prompt = """Você é um assistente especializado em criar sumários finais detalhados e completos.
+def summarize_text(text: str, model: Optional[str] = None, language: str = "pt-BR") -> str:
+ """Sumariza texto em formato livre (caminho legado)."""
-Você recebeu múltiplos sumários parciais de um texto longo. Crie um sumário final unificado e COMPLETO em português do Brasil que:
-- Integre TODOS os pontos principais dos sumários parciais
-- Mantenha TODAS as informações importantes, números, datas, nomes e dados técnicos
-- Seja coerente e bem estruturado
-- Destaque os temas e informações mais importantes sem perder detalhes relevantes
-- Seja escrito em português do Brasil
-- Preserve a completude das informações essenciais
+ if not text or not text.strip():
+ return "Texto vazio - nao e possivel gerar sumario."
-IMPORTANTE: Não omita informações importantes ao consolidar. O objetivo é criar um sumário final que mantenha a riqueza de detalhes dos sumários parciais.
+ config = get_model_config()
+ chosen_model = model or config.chat_model
+ max_chars = 400_000
-Sumários parciais para consolidar:
+ if len(text) <= max_chars:
+ return _summarize_chunk(text, chosen_model, language)
-"""
- else:
- prompt = f"""You are an assistant specialized in creating concise and informative summaries.
+ chunks = _split_text_into_chunks(text, max_chars)
+ chunk_summaries: List[str] = []
+ print(f"Texto longo detectado ({len(text)} caracteres). Dividido em {len(chunks)} partes.")
+ for i, chunk in enumerate(chunks):
+ try:
+ print(f"Sumarizando parte {i + 1}/{len(chunks)}...")
+ chunk_summaries.append(_summarize_chunk(chunk, chosen_model, language))
+ except Exception as exc:
+ print(f"Erro ao sumarizar parte {i + 1}: {exc}")
+ chunk_summaries.append(f"[Erro nesta parte: {exc}]")
+
+ valid_summaries = [s for s in chunk_summaries if not s.startswith("[Erro")]
+ if not valid_summaries:
+ return "Falha ao gerar sumario: todas as partes falharam."
+ if len(valid_summaries) > 1:
+ print("Consolidando sumarios parciais...")
+ combined = "\n\n".join(valid_summaries)
+ return _summarize_chunk(combined, chosen_model, language, is_final=True)
+ return valid_summaries[0]
-Please create a detailed summary of the following text in {language}. The summary should:
-- Be clear and objective
-- Highlight the main points
-- Maintain the logical structure of the content
-Text to summarize:
+def _summarize_chunk(text: str, model: str, language: str, is_final: bool = False) -> str:
+ client = _ensure_client()
+
+ if language.startswith("pt"):
+ prompt = (
+ "Voce e um assistente especializado em criar sumarios detalhados e completos.\n\n"
+ "Crie um sumario COMPLETO e DETALHADO do seguinte texto em portugues do Brasil. O sumario DEVE:\n"
+ "- Manter TODOS os pontos importantes, chave e informacoes relevantes\n"
+ "- Preservar numeros, datas, nomes, estatisticas e dados tecnicos\n"
+ "- Manter a estrutura logica e sequencia do conteudo original\n"
+ "- Ser detalhado o suficiente para nao perder informacoes essenciais\n"
+ "- Destacar os principais temas e subtemas\n"
+ "- Ser escrito em portugues do Brasil\n\n"
+ "Texto para sumarizar:\n\n"
+ )
+ if is_final:
+ prompt = (
+ "Voce recebeu multiplos sumarios parciais de um texto longo. Crie um sumario final "
+ "unificado e COMPLETO em portugues do Brasil que integre TODOS os pontos principais, "
+ "preserve numeros/datas/nomes e seja coerente.\n\nSumarios parciais:\n\n"
+ )
+ else:
+ prompt = (
+ f"You are an assistant specialized in creating concise and informative summaries.\n\n"
+ f"Please create a detailed summary of the following text in {language}.\n\nText:\n\n"
+ )
-"""
-
try:
- response = client.chat.completions.create(
+ kwargs = _chat_completions_kwargs(
model=model,
messages=[
- {"role": "system", "content": "You are a helpful assistant that creates detailed and comprehensive summaries, preserving all important information."},
- {"role": "user", "content": prompt + text}
+ {
+ "role": "system",
+ "content": "You are a helpful assistant that creates detailed and comprehensive summaries.",
+ },
+ {"role": "user", "content": prompt + text},
],
- temperature=0.2, # Temperatura mais baixa para sumários mais precisos e completos
+ temperature=0.2,
)
-
- return response.choices[0].message.content.strip()
-
- except Exception as e:
- error_msg = str(e)
- if 'api_key' in error_msg.lower() or 'authentication' in error_msg.lower():
- raise Exception("Erro de autenticação com OpenAI API. Verifique sua OPENAI_API_KEY.")
- else:
- raise Exception(f"Erro ao sumarizar texto: {error_msg}")
+ response = client.chat.completions.create(**kwargs)
+ return (response.choices[0].message.content or "").strip()
+ except Exception as exc:
+ raise _wrap_chat_error(exc, "sumarizar texto") from exc
+
+
+# ---------------------------------------------------------------------------
+# Sumario inteligente (Structured Outputs)
+# ---------------------------------------------------------------------------
+
+def summarize_smart(
+ text: str,
+ *,
+ model: Optional[str] = None,
+ content_type: Optional[str] = None,
+ language: str = "pt-BR",
+) -> Dict[str, Any]:
+ """Gera sumario estruturado e devolve dicionario serializavel.
+
+ Sempre devolve um dict no formato `SmartSummary` (mesmo em fallback). Se a
+ chamada com structured outputs falhar, faz uma tentativa adicional com
+ `json_object`. Se ainda assim falhar, embrulha o texto livre em `tldr`.
+ """
+ if BaseModel is None:
+ raise Exception("pydantic nao esta instalado neste ambiente.")
-def summarize_text_file(file_path: str, model: str = 'gpt-4o-mini', language: str = 'pt-BR') -> str:
- """
- Sumariza conteúdo de um arquivo de texto
-
- Args:
- file_path: Caminho do arquivo de texto
- model: Modelo GPT a usar
- language: Idioma para o prompt
-
- Returns:
- Texto sumarizado
- """
- # Extrai conteúdo do arquivo
+ if not text or not text.strip():
+ return SmartSummary(tldr="Texto vazio - nao e possivel gerar sumario.").model_dump()
+
+ config = get_model_config()
+ chosen_model = model or config.chat_model
+
+ chunks = _split_text_into_chunks(text, DEFAULT_CHUNK_CHAR_LIMIT) if len(text) > DEFAULT_CHUNK_CHAR_LIMIT else [text]
+
+ if len(chunks) == 1:
+ return _summarize_smart_chunk(chunks[0], chosen_model, content_type, language).model_dump()
+
+ print(f"Texto longo detectado ({len(text)} chars) - rodando sumario inteligente em {len(chunks)} partes.")
+ parciais: List[SmartSummary] = []
+ for index, chunk in enumerate(chunks):
+ print(f"Sumario inteligente parte {index + 1}/{len(chunks)}...")
+ try:
+ parciais.append(_summarize_smart_chunk(chunk, chosen_model, content_type, language))
+ except Exception as exc:
+ print(f"Erro ao sumarizar parte {index + 1}: {exc}")
+
+ if not parciais:
+ return SmartSummary(tldr="Falha ao gerar sumario: todas as partes falharam.").model_dump()
+
+ if len(parciais) == 1:
+ return parciais[0].model_dump()
+
+ return _merge_smart_summaries(parciais, chosen_model, content_type).model_dump()
+
+
+def _build_smart_messages(
+ text: str,
+ content_type: Optional[str],
+ language: str,
+ is_merge: bool = False,
+) -> List[Dict[str, Any]]:
+ persona_extra = PERSONA_HINTS.get((content_type or "other").lower(), "")
+
+ system = (
+ "Voce e um analista que cria sumarios estruturados de altissima qualidade em "
+ f"{'portugues do Brasil' if language.startswith('pt') else language}. "
+ "Mantenha fidelidade ao texto, sem inventar dados. Quando um campo nao tiver "
+ "informacao, devolva uma lista vazia ou string vazia."
+ )
+ if persona_extra:
+ system += " " + persona_extra
+
+ if is_merge:
+ user = (
+ "Voce recebera varios sumarios estruturados parciais de um mesmo texto longo. "
+ "Consolide-os em UM unico SmartSummary final, eliminando duplicatas, mantendo "
+ "todos os pontos relevantes e preservando numeros, datas e nomes.\n\n"
+ "Sumarios parciais (JSON):\n\n" + text
+ )
+ else:
+ user = (
+ "Crie um SmartSummary do texto abaixo. Regras:\n"
+ "- tldr: 1-3 frases.\n"
+ "- key_points: pontos principais em ordem logica.\n"
+ "- decisions: decisoes explicitas tomadas no texto (vazio se nao houver).\n"
+ "- action_items: tarefas concretas com owner/task/due_hint quando possivel.\n"
+ "- topics: temas curtos (2-4 palavras cada).\n"
+ "- quotes: trechos literais marcantes.\n"
+ "- open_questions: perguntas em aberto ou pontos nao resolvidos.\n\n"
+ "Texto:\n\n" + text
+ )
+
+ return [
+ {"role": "system", "content": system},
+ {"role": "user", "content": user},
+ ]
+
+
+def _summarize_smart_chunk(
+ text: str,
+ model: str,
+ content_type: Optional[str],
+ language: str,
+) -> SmartSummary:
+ client = _ensure_client()
+ messages = _build_smart_messages(text, content_type, language, is_merge=False)
+
+ try:
+ kwargs = _chat_completions_kwargs(
+ model=model,
+ messages=messages,
+ response_format={"type": "json_schema", "json_schema": SMART_SUMMARY_JSON_SCHEMA},
+ temperature=0.2,
+ )
+ response = client.chat.completions.create(**kwargs)
+ raw = response.choices[0].message.content or "{}"
+ return _parse_smart_summary(raw)
+ except Exception as exc:
+ # Fallback: pede JSON livre e tenta parsear.
+ print(f"Aviso: structured outputs falhou ({exc}); tentando json_object...")
+ return _summarize_smart_fallback_json(client, model, messages)
+
+
+def _summarize_smart_fallback_json(
+ client: "OpenAI",
+ model: str,
+ messages: List[Dict[str, Any]],
+) -> SmartSummary:
+ fallback_messages = list(messages)
+ fallback_messages[0] = {
+ **messages[0],
+ "content": messages[0]["content"]
+ + " Devolva apenas um JSON valido com as chaves: tldr, key_points, decisions, "
+ "action_items (lista de objetos com owner/task/due_hint), topics, quotes, open_questions.",
+ }
+
+ try:
+ kwargs = _chat_completions_kwargs(
+ model=model,
+ messages=fallback_messages,
+ response_format={"type": "json_object"},
+ temperature=0.2,
+ )
+ response = client.chat.completions.create(**kwargs)
+ raw = response.choices[0].message.content or "{}"
+ return _parse_smart_summary(raw)
+ except Exception as exc:
+ print(f"Aviso: fallback JSON tambem falhou ({exc}); embrulhando texto livre.")
+ # Ultimo recurso: roda o sumario textual e enfia tudo em tldr.
+ try:
+ text_only_kwargs = _chat_completions_kwargs(
+ model=model,
+ messages=fallback_messages,
+ response_format=None,
+ temperature=0.2,
+ )
+ response = client.chat.completions.create(**text_only_kwargs)
+ free_text = (response.choices[0].message.content or "").strip()
+ except Exception:
+ free_text = ""
+ return SmartSummary(tldr=free_text or "Falha ao gerar sumario estruturado.")
+
+
+def _merge_smart_summaries(
+ parciais: List[SmartSummary],
+ model: str,
+ content_type: Optional[str],
+) -> SmartSummary:
+ client = _ensure_client()
+ serialized = json.dumps([p.model_dump() for p in parciais], ensure_ascii=False, indent=2)
+ messages = _build_smart_messages(serialized, content_type, "pt-BR", is_merge=True)
+
+ try:
+ kwargs = _chat_completions_kwargs(
+ model=model,
+ messages=messages,
+ response_format={"type": "json_schema", "json_schema": SMART_SUMMARY_JSON_SCHEMA},
+ temperature=0.2,
+ )
+ response = client.chat.completions.create(**kwargs)
+ raw = response.choices[0].message.content or "{}"
+ return _parse_smart_summary(raw)
+ except Exception as exc:
+ print(f"Aviso: merge estruturado falhou ({exc}); aplicando merge local.")
+ return _local_merge_summaries(parciais)
+
+
+def _local_merge_summaries(parciais: List[SmartSummary]) -> SmartSummary:
+ """Merge determinista local quando a API falha no merge final."""
+ seen: Dict[str, set] = {
+ "key_points": set(),
+ "decisions": set(),
+ "topics": set(),
+ "quotes": set(),
+ "open_questions": set(),
+ }
+ merged = SmartSummary()
+ tldr_parts: List[str] = []
+ actions_seen: set = set()
+
+ for part in parciais:
+ if part.tldr:
+ tldr_parts.append(part.tldr)
+ for field, target_set in seen.items():
+ for value in getattr(part, field, []) or []:
+ if value and value not in target_set:
+ target_set.add(value)
+ getattr(merged, field).append(value)
+ for action in part.action_items or []:
+ key = (action.owner or "", action.task or "", action.due_hint or "")
+ if key not in actions_seen and (action.task or action.owner):
+ actions_seen.add(key)
+ merged.action_items.append(action)
+
+ merged.tldr = " ".join(tldr_parts).strip()[:600]
+ return merged
+
+
+def _parse_smart_summary(raw: str) -> SmartSummary:
+ data = json.loads(raw)
+ return SmartSummary(**data)
+
+
+# ---------------------------------------------------------------------------
+# Render para texto Markdown (compat com exports atuais)
+# ---------------------------------------------------------------------------
+
+def format_smart_summary_as_text(summary: Dict[str, Any]) -> str:
+ """Converte um dict SmartSummary em Markdown legivel."""
+
+ if not summary:
+ return ""
+
+ lines: List[str] = []
+ tldr = summary.get("tldr") or ""
+ if tldr.strip():
+ lines.append("**TL;DR**")
+ lines.append("")
+ lines.append(tldr.strip())
+ lines.append("")
+
+ def _bullet_block(title: str, items: List[Any]) -> None:
+ if not items:
+ return
+ lines.append(f"**{title}**")
+ lines.append("")
+ for item in items:
+ if isinstance(item, dict):
+ owner = item.get("owner") or "(sem responsavel)"
+ task = item.get("task") or ""
+ due = item.get("due_hint") or ""
+ rendered = f"- {owner}: {task}".rstrip()
+ if due:
+ rendered += f" (prazo: {due})"
+ lines.append(rendered)
+ else:
+ value = str(item).strip()
+ if value:
+ lines.append(f"- {value}")
+ lines.append("")
+
+ _bullet_block("Pontos-chave", summary.get("key_points") or [])
+ _bullet_block("Decisoes", summary.get("decisions") or [])
+ _bullet_block("Acoes", summary.get("action_items") or [])
+ _bullet_block("Topicos", summary.get("topics") or [])
+ _bullet_block("Citacoes", summary.get("quotes") or [])
+ _bullet_block("Perguntas em aberto", summary.get("open_questions") or [])
+
+ return "\n".join(lines).strip()
+
+
+# ---------------------------------------------------------------------------
+# Helpers de alto nivel para fontes de texto
+# ---------------------------------------------------------------------------
+
+def summarize_text_file(file_path: str, model: Optional[str] = None, language: str = "pt-BR") -> str:
content_data = extract_text_file_content(file_path)
- text = content_data['content']
-
+ text = content_data.get("content", "")
if not text or not text.strip():
- return "Arquivo vazio - não é possível gerar sumário."
-
+ return "Arquivo vazio - nao e possivel gerar sumario."
return summarize_text(text, model=model, language=language)
-def summarize_web_page(url: str, model: str = 'gpt-4o-mini', language: str = 'pt-BR') -> str:
- """
- Sumariza conteúdo de uma página web
-
- Args:
- url: URL da página web
- model: Modelo GPT a usar
- language: Idioma para o prompt
-
- Returns:
- Texto sumarizado
- """
- # Extrai conteúdo da web
+def summarize_web_page(url: str, model: Optional[str] = None, language: str = "pt-BR") -> str:
content_data = extract_web_content(url)
- text = content_data['content']
-
+ text = content_data.get("content", "")
if not text or not text.strip():
- return "Não foi possível extrair conteúdo da página web."
-
+ return "Nao foi possivel extrair conteudo da pagina web."
return summarize_text(text, model=model, language=language)
-def summarize_pdf(file_path: str, model: str = 'gpt-4o-mini', language: str = 'pt-BR') -> str:
- """
- Sumariza conteúdo de um arquivo PDF
-
- Args:
- file_path: Caminho do arquivo PDF
- model: Modelo GPT a usar
- language: Idioma para o prompt
-
- Returns:
- Texto sumarizado
- """
- # Extrai conteúdo do PDF
+def summarize_pdf(file_path: str, model: Optional[str] = None, language: str = "pt-BR") -> str:
content_data = extract_pdf_content(file_path)
- text = content_data['content']
-
+ text = content_data.get("content", "")
if not text or not text.strip():
- return "PDF vazio ou não foi possível extrair texto."
-
+ return "PDF vazio ou nao foi possivel extrair texto."
return summarize_text(text, model=model, language=language)
-def _split_text_into_chunks(text: str, max_chars: int) -> list[str]:
- """Divide texto em chunks respeitando limites de tamanho"""
- chunks = []
+# ---------------------------------------------------------------------------
+# Chunking utilitario
+# ---------------------------------------------------------------------------
+
+def _split_text_into_chunks(text: str, max_chars: int) -> List[str]:
+ """Divide texto em chunks respeitando paragrafos, sentencas e o limite."""
+
+ chunks: List[str] = []
current_chunk = ""
-
- # Divide por parágrafos primeiro
- paragraphs = text.split('\n\n')
-
+
+ paragraphs = text.split("\n\n")
+
for paragraph in paragraphs:
if len(current_chunk) + len(paragraph) + 2 <= max_chars:
- current_chunk += paragraph + '\n\n'
+ current_chunk += paragraph + "\n\n"
+ continue
+
+ if current_chunk:
+ chunks.append(current_chunk.strip())
+ current_chunk = ""
+
+ if len(paragraph) > max_chars:
+ sentences = paragraph.split(". ")
+ temp_chunk = ""
+ for sentence in sentences:
+ if len(temp_chunk) + len(sentence) + 2 <= max_chars:
+ temp_chunk += sentence + ". "
+ else:
+ if temp_chunk:
+ chunks.append(temp_chunk.strip())
+ temp_chunk = sentence + ". "
+ current_chunk = temp_chunk
else:
- if current_chunk:
- chunks.append(current_chunk.strip())
- # Se parágrafo sozinho é maior que max_chars, divide por sentenças
- if len(paragraph) > max_chars:
- sentences = paragraph.split('. ')
- temp_chunk = ""
- for sentence in sentences:
- if len(temp_chunk) + len(sentence) + 2 <= max_chars:
- temp_chunk += sentence + '. '
- else:
- if temp_chunk:
- chunks.append(temp_chunk.strip())
- temp_chunk = sentence + '. '
- current_chunk = temp_chunk
- else:
- current_chunk = paragraph + '\n\n'
-
+ current_chunk = paragraph + "\n\n"
+
if current_chunk:
chunks.append(current_chunk.strip())
-
+
return chunks
diff --git a/lazier/transcriber.py b/lazier/transcriber.py
@@ -1,10 +1,24 @@
"""
-Módulo para transcrição de áudio usando OpenAI Whisper API
+Modulo para transcricao de audio usando a API da OpenAI.
+
+Suporta dois modos:
+
+- `transcribe_audio` -> texto plano (rapido, default usa OPENAI_TRANSCRIBE_MODEL).
+- `transcribe_audio_with_timestamps` -> dicionario com `text` e `segments`
+ (cada segmento com `start`, `end`, `text`). Usado quando precisamos gerar
+ capitulos com timestamps. Para isso usamos o modelo definido em
+ `OPENAI_TRANSCRIBE_TIMESTAMPS_MODEL` (default `whisper-1`), porque ainda e a
+ forma mais confiavel de obter `verbose_json`.
+
+Ambos lidam com chunking automatico para arquivos > 24MB (limite da API e ~25MB).
"""
+from __future__ import annotations
+
import os
from pathlib import Path
-from typing import Optional
+from typing import Any, Dict, List, Optional
+
try:
from openai import OpenAI
except ImportError: # pragma: no cover - ambiente sem openai
@@ -19,90 +33,172 @@ load_dotenv()
from .audio_processor import split_audio
+from .core.config import get_model_config
-def transcribe_audio(audio_path: str, language: Optional[str] = None, model: str = 'whisper-1') -> str:
- """
- Transcreve um arquivo de áudio usando OpenAI Whisper API,
- com suporte a divisão automática de arquivos grandes.
-
- Args:
- audio_path: Caminho do arquivo de áudio
- language: Codigo do idioma. Quando None, a API detecta automaticamente.
- model: Modelo Whisper a usar (padrão: whisper-1)
-
- Returns:
- Texto transcrito
- """
- api_key = os.getenv('OPENAI_API_KEY')
+MAX_CHUNK_SIZE_BYTES = 24 * 1024 * 1024 # 24MB para margem sob o limite de 25MB
+
+
+def _ensure_client() -> "OpenAI":
+ api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise Exception(
- "OPENAI_API_KEY não encontrada. "
- "Configure a variável de ambiente OPENAI_API_KEY ou crie um arquivo .env"
+ "OPENAI_API_KEY nao encontrada. "
+ "Configure a variavel de ambiente OPENAI_API_KEY ou crie um arquivo .env"
)
if OpenAI is None:
- raise Exception("openai não está instalado neste ambiente.")
+ raise Exception("openai nao esta instalado neste ambiente.")
+ return OpenAI(api_key=api_key)
- if not os.path.exists(audio_path):
- raise FileNotFoundError(f"Arquivo de áudio não encontrado: {audio_path}")
-
- # Verifica se precisa dividir o arquivo (limite da API é 25MB)
+
+def _wrap_openai_error(exc: Exception) -> Exception:
+ error_msg = str(exc)
+ lowered = error_msg.lower()
+ if "api_key" in lowered or "authentication" in lowered:
+ return Exception("Erro de autenticacao com OpenAI API. Verifique sua OPENAI_API_KEY.")
+ if "file_size" in lowered or "too large" in lowered:
+ return Exception("Arquivo muito grande para a API. Limite e 25MB por chunk.")
+ return Exception(f"Erro ao transcrever audio: {error_msg}")
+
+
+def _resolve_chunks(audio_path: str) -> List[str]:
file_size = os.path.getsize(audio_path)
- max_size_per_chunk = 24 * 1024 * 1024 # 24MB para margem de segurança
-
+ if file_size <= MAX_CHUNK_SIZE_BYTES:
+ return [audio_path]
+ print(f"Arquivo grande detectado ({file_size / 1024 / 1024:.2f}MB). Dividindo em chunks...")
+ return split_audio(audio_path, chunk_size_mb=24)
+
+
+def transcribe_audio(
+ audio_path: str,
+ language: Optional[str] = None,
+ model: Optional[str] = None,
+) -> str:
+ """Transcreve audio em texto plano. Retorna apenas a string da transcricao."""
+
+ if not os.path.exists(audio_path):
+ raise FileNotFoundError(f"Arquivo de audio nao encontrado: {audio_path}")
+
+ config = get_model_config()
+ chosen_model = model or config.transcribe_model
+ client = _ensure_client()
+
try:
- client = OpenAI(api_key=api_key)
-
- if file_size > max_size_per_chunk:
- print(f"Arquivo grande detectado ({file_size / 1024 / 1024:.2f}MB). Dividindo em chunks...")
- chunks = split_audio(audio_path, chunk_size_mb=24)
- transcriptions = []
-
- for i, chunk_path in enumerate(chunks):
- print(f"Processando chunk {i+1}/{len(chunks)}...")
- with open(chunk_path, 'rb') as audio_file:
- request_kwargs = {
- 'model': model,
- 'file': audio_file,
- 'response_format': 'text'
- }
- if language:
- request_kwargs['language'] = language
- transcript = client.audio.transcriptions.create(**request_kwargs)
-
- if hasattr(transcript, 'text'):
- text = transcript.text
- else:
- text = str(transcript)
-
- transcriptions.append(text.strip())
-
- return " ".join(transcriptions)
- else:
- # Caso contrário, transcreve direto
- with open(audio_path, 'rb') as audio_file:
- request_kwargs = {
- 'model': model,
- 'file': audio_file,
- 'response_format': 'text'
+ chunks = _resolve_chunks(audio_path)
+ transcriptions: List[str] = []
+ total = len(chunks)
+ for index, chunk_path in enumerate(chunks):
+ if total > 1:
+ print(f"Processando chunk {index + 1}/{total}...")
+ with open(chunk_path, "rb") as audio_file:
+ request_kwargs: Dict[str, Any] = {
+ "model": chosen_model,
+ "file": audio_file,
+ "response_format": "text",
}
if language:
- request_kwargs['language'] = language
+ request_kwargs["language"] = language
transcript = client.audio.transcriptions.create(**request_kwargs)
-
- # Se retornou como objeto, pega o texto
- if hasattr(transcript, 'text'):
- return transcript.text
+
+ if hasattr(transcript, "text"):
+ text = transcript.text
elif isinstance(transcript, str):
- return transcript
+ text = transcript
else:
- return str(transcript)
-
- except Exception as e:
- error_msg = str(e)
- if 'api_key' in error_msg.lower() or 'authentication' in error_msg.lower():
- raise Exception("Erro de autenticação com OpenAI API. Verifique sua OPENAI_API_KEY.")
- elif 'file_size' in error_msg.lower() or 'too large' in error_msg.lower():
- raise Exception(f"Arquivo muito grande para a API. Limite é 25MB por chunk.")
- else:
- raise Exception(f"Erro ao transcrever áudio: {error_msg}")
+ text = str(transcript)
+ transcriptions.append((text or "").strip())
+
+ return " ".join(part for part in transcriptions if part)
+ except Exception as exc:
+ raise _wrap_openai_error(exc) from exc
+
+
+def transcribe_audio_with_timestamps(
+ audio_path: str,
+ language: Optional[str] = None,
+ model: Optional[str] = None,
+) -> Dict[str, Any]:
+ """Transcreve com `verbose_json` retornando texto e segmentos com timestamps.
+
+ Returns:
+ {
+ "text": str, # texto completo concatenado
+ "segments": [ # lista ordenada com tempos absolutos
+ {"start": float, "end": float, "text": str},
+ ...
+ ],
+ "duration": float | None, # duracao total estimada (s)
+ }
+ """
+
+ if not os.path.exists(audio_path):
+ raise FileNotFoundError(f"Arquivo de audio nao encontrado: {audio_path}")
+
+ config = get_model_config()
+ chosen_model = model or config.transcribe_timestamps_model
+ client = _ensure_client()
+
+ try:
+ chunks = _resolve_chunks(audio_path)
+ all_segments: List[Dict[str, Any]] = []
+ all_text_parts: List[str] = []
+ offset_seconds = 0.0
+ total = len(chunks)
+
+ for index, chunk_path in enumerate(chunks):
+ if total > 1:
+ print(f"Processando chunk {index + 1}/{total} (com timestamps)...")
+ with open(chunk_path, "rb") as audio_file:
+ request_kwargs: Dict[str, Any] = {
+ "model": chosen_model,
+ "file": audio_file,
+ "response_format": "verbose_json",
+ }
+ if language:
+ request_kwargs["language"] = language
+ response = client.audio.transcriptions.create(**request_kwargs)
+
+ payload = _normalize_verbose_response(response)
+ chunk_duration = payload.get("duration")
+ chunk_segments = payload.get("segments") or []
+
+ for seg in chunk_segments:
+ all_segments.append(
+ {
+ "start": float(seg.get("start", 0.0)) + offset_seconds,
+ "end": float(seg.get("end", 0.0)) + offset_seconds,
+ "text": (seg.get("text") or "").strip(),
+ }
+ )
+
+ chunk_text = (payload.get("text") or "").strip()
+ if chunk_text:
+ all_text_parts.append(chunk_text)
+
+ if isinstance(chunk_duration, (int, float)):
+ offset_seconds += float(chunk_duration)
+ elif chunk_segments:
+ # Fallback: usa o final do ultimo segmento como nova base.
+ offset_seconds = max(seg["end"] for seg in all_segments)
+
+ return {
+ "text": " ".join(all_text_parts).strip(),
+ "segments": all_segments,
+ "duration": offset_seconds if offset_seconds > 0 else None,
+ }
+ except Exception as exc:
+ raise _wrap_openai_error(exc) from exc
+
+
+def _normalize_verbose_response(response: Any) -> Dict[str, Any]:
+ """Aceita tanto dict quanto objeto Pydantic da SDK e normaliza."""
+ if hasattr(response, "model_dump"):
+ return response.model_dump()
+ if isinstance(response, dict):
+ return response
+ # ultimo recurso: tenta acessar atributos diretamente
+ return {
+ "text": getattr(response, "text", ""),
+ "segments": getattr(response, "segments", []) or [],
+ "duration": getattr(response, "duration", None),
+ }
diff --git a/pyproject.toml b/pyproject.toml
@@ -23,7 +23,8 @@ classifiers = [
dependencies = [
"click>=8.1.0",
"yt-dlp>=2024.0.0",
- "openai>=1.0.0",
+ "openai>=1.50.0",
+ "pydantic>=2.6.0",
"python-docx>=1.1.0",
"python-dotenv>=1.0.0",
"fastapi>=0.104.0",
diff --git a/tests/test_api.py b/tests/test_api.py
@@ -79,6 +79,76 @@ class ApiTests(unittest.TestCase):
download = self.client.get(f"/api/jobs/{job_id}/download")
self.assertEqual(download.status_code, 200)
+ def test_process_passes_overrides_to_pipeline(self):
+ output_dir = Path(os.environ["LAZIER_OUTPUT_DIR"]) / "2026" / "05" / "03" / "smart-job"
+ output_dir.mkdir(parents=True, exist_ok=True)
+ output_path = output_dir / "sumario.md"
+ output_path.write_text("Resumo inteligente", encoding="utf-8")
+
+ smart_payload = {
+ "tldr": "TL;DR exemplo",
+ "key_points": ["A", "B"],
+ "decisions": [],
+ "action_items": [],
+ "topics": ["IA"],
+ "quotes": [],
+ "open_questions": [],
+ }
+ chapters_payload = [
+ {"title": "Intro", "start": 0, "end": 60, "summary": "Abertura"},
+ {"title": "Core", "start": 60, "end": 180, "summary": "Conteudo principal"},
+ ]
+
+ with patch(
+ "lazier.api.routes.process_source",
+ return_value={
+ "mode": "summarize",
+ "input_type": "audio",
+ "source_name": "sample.mp3",
+ "metadata": {
+ "title": "Sample",
+ "smart_summary": smart_payload,
+ "chapters": chapters_payload,
+ "content_type": "podcast",
+ },
+ "transcription": "Texto completo",
+ "summary": "Resumo inteligente",
+ "smart_summary": smart_payload,
+ "chapters": chapters_payload,
+ "content_type": "podcast",
+ "result_path": str(output_path),
+ "transcription_path": None,
+ "summary_path": str(output_path),
+ },
+ ) as mock_process:
+ response = self.client.post(
+ "/api/process",
+ json={
+ "url": "https://example.com/article",
+ "format": "md",
+ "mode": "summarize",
+ "chat_model": "gpt-5",
+ "transcribe_model": "gpt-4o-transcribe",
+ "smart": True,
+ "chapters": True,
+ },
+ )
+
+ self.assertEqual(response.status_code, 200)
+ job_id = response.json()["job_id"]
+
+ kwargs = mock_process.call_args.kwargs
+ self.assertEqual(kwargs.get("gpt_model"), "gpt-5")
+ self.assertEqual(kwargs.get("model"), "gpt-4o-transcribe")
+ self.assertTrue(kwargs.get("use_smart_summary"))
+ self.assertTrue(kwargs.get("use_chapters"))
+
+ details = self.client.get(f"/api/jobs/{job_id}/details")
+ payload = details.json()
+ self.assertEqual(payload["smart_summary"], smart_payload)
+ self.assertEqual(payload["chapters"], chapters_payload)
+ self.assertEqual(payload["content_type"], "podcast")
+
def test_history_survives_app_recreation(self):
output_dir = Path(os.environ["LAZIER_OUTPUT_DIR"]) / "2026" / "03" / "31" / "sample-job"
output_dir.mkdir(parents=True, exist_ok=True)
diff --git a/tests/test_chapters.py b/tests/test_chapters.py
@@ -0,0 +1,93 @@
+"""Testes do gerador de capitulos."""
+
+from __future__ import annotations
+
+import json
+import os
+import unittest
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from lazier.core.chapters import build_chapters
+from lazier.core.config import reset_model_config_cache
+
+
+class _FakeChoice:
+ def __init__(self, content: str) -> None:
+ self.message = SimpleNamespace(content=content)
+
+
+class _FakeResponse:
+ def __init__(self, content: str) -> None:
+ self.choices = [_FakeChoice(content)]
+
+
+def _segments(count: int, step: float = 5.0):
+ """Gera segments simples com timestamps incrementais."""
+ return [
+ {
+ "start": i * step,
+ "end": (i + 1) * step,
+ "text": f"trecho {i + 1}",
+ }
+ for i in range(count)
+ ]
+
+
+class ChaptersTests(unittest.TestCase):
+ def setUp(self) -> None:
+ os.environ["OPENAI_API_KEY"] = "test-key"
+ os.environ.pop("OPENAI_REASONING_EFFORT", None)
+ reset_model_config_cache()
+
+ def tearDown(self) -> None:
+ reset_model_config_cache()
+
+ def test_returns_empty_when_no_segments(self):
+ self.assertEqual(build_chapters([]), [])
+
+ def test_uses_fallback_for_short_audio(self):
+ # Menos que MIN_SEGMENTS_FOR_CHAPTERS (6) -> fallback determinista.
+ result = build_chapters(_segments(3))
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]["start"], 0.0)
+ self.assertGreater(result[0]["end"], 0.0)
+ self.assertIn("start_hms", result[0])
+
+ def test_builds_chapters_from_llm_response(self):
+ segments = _segments(20, step=10.0) # 0..200s
+ llm_payload = {
+ "chapters": [
+ {"title": "Intro", "summary": "Abertura.", "start_index": 0, "end_index": 4},
+ {"title": "Core", "summary": "Conteudo principal.", "start_index": 5, "end_index": 14},
+ {"title": "Wrap", "summary": "Fechamento.", "start_index": 15, "end_index": 19},
+ ]
+ }
+
+ client = MagicMock()
+ client.chat.completions.create.return_value = _FakeResponse(json.dumps(llm_payload))
+
+ with patch("lazier.core.chapters._ensure_client", return_value=client):
+ chapters = build_chapters(segments, model="gpt-4o-mini")
+
+ self.assertEqual(len(chapters), 3)
+ self.assertEqual(chapters[0]["title"], "Intro")
+ self.assertEqual(chapters[0]["start"], 0.0)
+ self.assertGreaterEqual(chapters[1]["start"], chapters[0]["end"])
+ self.assertEqual(chapters[2]["title"], "Wrap")
+
+ def test_falls_back_when_llm_fails(self):
+ segments = _segments(10, step=6.0)
+ client = MagicMock()
+ client.chat.completions.create.side_effect = Exception("network down")
+
+ with patch("lazier.core.chapters._ensure_client", return_value=client):
+ chapters = build_chapters(segments)
+
+ self.assertGreaterEqual(len(chapters), 1)
+ self.assertEqual(chapters[0]["start"], 0.0)
+ self.assertEqual(chapters[0]["title"], "Parte 1")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_processing.py b/tests/test_processing.py
@@ -5,6 +5,7 @@ import uuid
from pathlib import Path
from unittest.mock import patch
+from lazier.core.config import reset_model_config_cache
from lazier.core.processing import process_source
@@ -15,9 +16,14 @@ class ProcessingTests(unittest.TestCase):
os.environ["OPENAI_API_KEY"] = "test-key"
os.environ["LAZIER_OUTPUT_DIR"] = str(self.temp_dir)
os.environ["LAZIER_DATA_DIR"] = str(self.temp_dir)
+ # Garantir defaults previsiveis para os testes legados.
+ os.environ.pop("OPENAI_ENABLE_SMART_SUMMARY", None)
+ os.environ.pop("OPENAI_ENABLE_CHAPTERS", None)
+ reset_model_config_cache()
def tearDown(self):
shutil.rmtree(self.temp_dir, ignore_errors=True)
+ reset_model_config_cache()
def test_audio_transcribe_generates_portuguese_transcription_file(self):
audio_path = self.temp_dir / "sample.mp3"
@@ -25,21 +31,27 @@ class ProcessingTests(unittest.TestCase):
with patch("lazier.core.processing.transcribe_audio", return_value="Hello world"), patch(
"lazier.core.processing.render_text_in_portuguese", return_value="Olá mundo"
+ ), patch(
+ "lazier.core.processing.detect_content_type",
+ return_value={"content_type": "podcast", "confidence": 0.9, "rationale": ""},
):
result = process_source(
str(audio_path),
mode="transcribe",
output_format="txt",
+ use_smart_summary=False,
+ use_chapters=False,
run_id="job-audio",
source_name="sample.mp3",
output_root=self.temp_dir,
)
self.assertEqual(result["transcription"], "Olá mundo")
+ self.assertEqual(result["content_type"], "podcast")
self.assertTrue(result["result_path"].endswith("transcricao.txt"))
self.assertTrue(Path(result["result_path"]).exists())
- def test_text_summarize_generates_summary_file(self):
+ def test_text_summarize_generates_summary_file_legacy(self):
text_path = self.temp_dir / "article.txt"
text_path.write_text("This is a long article in English.", encoding="utf-8")
@@ -49,11 +61,16 @@ class ProcessingTests(unittest.TestCase):
), patch(
"lazier.core.processing.summarize_text",
return_value="Resumo em português.",
+ ), patch(
+ "lazier.core.processing.detect_content_type",
+ return_value={"content_type": "tech_doc", "confidence": 0.7, "rationale": ""},
):
result = process_source(
str(text_path),
mode="summarize",
output_format="txt",
+ use_smart_summary=False,
+ use_chapters=False,
run_id="job-text",
source_name="article.txt",
output_root=self.temp_dir,
@@ -61,5 +78,55 @@ class ProcessingTests(unittest.TestCase):
self.assertEqual(result["summary"], "Resumo em português.")
self.assertEqual(result["transcription"], "Este é um artigo longo em português.")
+ self.assertEqual(result["content_type"], "tech_doc")
+ self.assertIsNone(result["smart_summary"])
self.assertTrue(result["result_path"].endswith("sumario.txt"))
self.assertTrue(Path(result["result_path"]).exists())
+
+ def test_text_summarize_uses_smart_summary_when_enabled(self):
+ text_path = self.temp_dir / "smart.txt"
+ text_path.write_text("conteudo original em ingles", encoding="utf-8")
+
+ smart_payload = {
+ "tldr": "Resumo curto",
+ "key_points": ["Ponto 1", "Ponto 2"],
+ "decisions": [],
+ "action_items": [{"owner": "Maria", "task": "Revisar texto", "due_hint": "amanha"}],
+ "topics": ["IA"],
+ "quotes": [],
+ "open_questions": [],
+ }
+
+ with patch(
+ "lazier.core.processing.render_text_in_portuguese",
+ return_value="Texto convertido para portugues",
+ ), patch(
+ "lazier.core.processing.detect_content_type",
+ return_value={"content_type": "lecture", "confidence": 0.85, "rationale": ""},
+ ), patch(
+ "lazier.core.processing.summarize_smart",
+ return_value=smart_payload,
+ ) as mock_smart, patch(
+ "lazier.core.processing.summarize_text",
+ ) as mock_legacy:
+ result = process_source(
+ str(text_path),
+ mode="summarize",
+ output_format="md",
+ use_smart_summary=True,
+ use_chapters=False,
+ run_id="job-smart",
+ source_name="smart.txt",
+ output_root=self.temp_dir,
+ )
+
+ mock_smart.assert_called_once()
+ mock_legacy.assert_not_called()
+ self.assertEqual(result["smart_summary"], smart_payload)
+ self.assertEqual(result["content_type"], "lecture")
+ self.assertIn("Resumo curto", result["summary"])
+ self.assertTrue(result["result_path"].endswith("sumario.md"))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_smart_summary.py b/tests/test_smart_summary.py
@@ -0,0 +1,153 @@
+"""Testes do sumario inteligente (`summarize_smart`)."""
+
+from __future__ import annotations
+
+import json
+import os
+import unittest
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from lazier.core.config import reset_model_config_cache
+
+
+class _FakeChoice:
+ def __init__(self, content: str) -> None:
+ self.message = SimpleNamespace(content=content)
+
+
+class _FakeResponse:
+ def __init__(self, content: str) -> None:
+ self.choices = [_FakeChoice(content)]
+
+
+def _make_client(*responses):
+ """Cria um mock que devolve `responses` em sequencia (e re-usa o ultimo)."""
+
+ queue = list(responses)
+
+ def _create(**_kwargs):
+ if len(queue) > 1:
+ return queue.pop(0)
+ return queue[0]
+
+ client = MagicMock()
+ client.chat.completions.create.side_effect = _create
+ return client
+
+
+class SummarizeSmartTests(unittest.TestCase):
+ def setUp(self) -> None:
+ os.environ["OPENAI_API_KEY"] = "test-key"
+ os.environ.pop("OPENAI_REASONING_EFFORT", None)
+ os.environ.pop("OPENAI_CHAT_MODEL", None)
+ reset_model_config_cache()
+
+ def tearDown(self) -> None:
+ reset_model_config_cache()
+
+ def test_smart_summary_parses_structured_output(self):
+ from lazier.summarizer import summarize_smart
+
+ payload = {
+ "tldr": "Resumo curto",
+ "key_points": ["A", "B"],
+ "decisions": ["Decisao 1"],
+ "action_items": [
+ {"owner": "Maria", "task": "Revisar", "due_hint": "amanha"}
+ ],
+ "topics": ["IA"],
+ "quotes": [],
+ "open_questions": [],
+ }
+
+ client = _make_client(_FakeResponse(json.dumps(payload)))
+ with patch("lazier.summarizer._ensure_client", return_value=client):
+ result = summarize_smart("texto longo " * 100)
+
+ self.assertEqual(result["tldr"], "Resumo curto")
+ self.assertEqual(result["key_points"], ["A", "B"])
+ self.assertEqual(result["action_items"][0]["owner"], "Maria")
+
+ def test_smart_summary_falls_back_to_json_object_on_schema_error(self):
+ from lazier.summarizer import summarize_smart
+
+ good_payload = {
+ "tldr": "Fallback",
+ "key_points": ["X"],
+ "decisions": [],
+ "action_items": [],
+ "topics": [],
+ "quotes": [],
+ "open_questions": [],
+ }
+
+ client = MagicMock()
+ client.chat.completions.create.side_effect = [
+ Exception("structured outputs unavailable"),
+ _FakeResponse(json.dumps(good_payload)),
+ ]
+ with patch("lazier.summarizer._ensure_client", return_value=client):
+ result = summarize_smart("conteudo")
+
+ self.assertEqual(result["tldr"], "Fallback")
+ self.assertEqual(result["key_points"], ["X"])
+ self.assertEqual(client.chat.completions.create.call_count, 2)
+
+ def test_smart_summary_merges_chunks(self):
+ """Quando o texto entra em multiplos chunks, o merge final e aplicado."""
+ from lazier import summarizer
+ from lazier.summarizer import summarize_smart, SmartSummary
+
+ parcial_a = SmartSummary(
+ tldr="Parte 1",
+ key_points=["P1A", "P1B"],
+ topics=["t1"],
+ )
+ parcial_b = SmartSummary(
+ tldr="Parte 2",
+ key_points=["P2A"],
+ decisions=["D2"],
+ topics=["t2"],
+ )
+ merged = SmartSummary(
+ tldr="Merge final",
+ key_points=["P1A", "P1B", "P2A"],
+ decisions=["D2"],
+ topics=["t1", "t2"],
+ )
+
+ chunk_responses = [parcial_a, parcial_b]
+
+ def _chunk_side_effect(_text, _model, _content_type, _language):
+ return chunk_responses.pop(0) if chunk_responses else parcial_b
+
+ long_text = ("paragrafo " * 50 + "\n\n") * 3
+ with patch.object(
+ summarizer,
+ "_summarize_smart_chunk",
+ side_effect=_chunk_side_effect,
+ ) as mock_chunk, patch.object(
+ summarizer,
+ "_merge_smart_summaries",
+ return_value=merged,
+ ) as mock_merge, patch.object(
+ summarizer, "DEFAULT_CHUNK_CHAR_LIMIT", len(long_text) // 2
+ ):
+ result = summarize_smart(long_text)
+
+ self.assertEqual(result["tldr"], "Merge final")
+ self.assertEqual(result["decisions"], ["D2"])
+ self.assertGreaterEqual(mock_chunk.call_count, 2)
+ mock_merge.assert_called_once()
+
+ def test_smart_summary_handles_empty_text(self):
+ from lazier.summarizer import summarize_smart
+
+ result = summarize_smart(" ")
+ self.assertIn("Texto vazio", result["tldr"])
+ self.assertEqual(result["key_points"], [])
+
+
+if __name__ == "__main__":
+ unittest.main()