commit f03ce8665e55aa997acd082b4d50d08e7238362e
parent ded9e411ecc4c7b916c832b17bc31f68dfb4f593
Author: Pablo Murad <pblmrd@gmail.com>
Date: Sat, 24 Jan 2026 21:53:54 -0300
ajuste xml
Diffstat:
4 files changed, 205 insertions(+), 35 deletions(-)
diff --git a/lazier/docx_generator.py b/lazier/docx_generator.py
@@ -10,6 +10,8 @@ from docx.shared import Pt, Inches, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH
from typing import Optional, Dict, Any
+from .utils import sanitize_xml_string
+
def create_document(
transcription: str,
@@ -39,28 +41,29 @@ def create_document(
# Título
title = metadata.get('title', 'Transcrição') if metadata else 'Transcrição'
+ title = sanitize_xml_string(title)
title_para = doc.add_heading(title, level=1)
title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
# Metadados
metadata_section = doc.add_paragraph()
metadata_section.add_run('Data de processamento: ').bold = True
- metadata_section.add_run(datetime.now().strftime('%d/%m/%Y %H:%M:%S'))
+ metadata_section.add_run(sanitize_xml_string(datetime.now().strftime('%d/%m/%Y %H:%M:%S')))
if metadata:
if metadata.get('duration'):
duration_sec = metadata['duration']
duration_str = _format_duration(duration_sec)
metadata_section.add_run('\nDuração: ').bold = True
- metadata_section.add_run(duration_str)
+ metadata_section.add_run(sanitize_xml_string(str(duration_str)))
if metadata.get('uploader'):
metadata_section.add_run('\nCanal/Criador: ').bold = True
- metadata_section.add_run(metadata['uploader'])
+ metadata_section.add_run(sanitize_xml_string(str(metadata['uploader'])))
if metadata.get('webpage_url'):
metadata_section.add_run('\nURL: ').bold = True
- metadata_section.add_run(metadata['webpage_url'])
+ metadata_section.add_run(sanitize_xml_string(str(metadata['webpage_url'])))
# Espaçamento
doc.add_paragraph()
@@ -69,6 +72,9 @@ def create_document(
if summary:
doc.add_heading('Sumário', level=2)
+ # Sanitizar sumário antes de processar
+ summary = sanitize_xml_string(summary)
+
# Divide sumário em parágrafos
summary_paragraphs = summary.split('\n\n')
if len(summary_paragraphs) == 1:
@@ -86,6 +92,9 @@ def create_document(
# Seção de Transcrição
doc.add_heading('Transcrição Completa', level=2)
+ # Sanitizar transcrição antes de processar
+ transcription = sanitize_xml_string(transcription)
+
# Divide transcrição em parágrafos
transcription_paragraphs = transcription.split('\n\n')
if len(transcription_paragraphs) == 1:
@@ -131,6 +140,9 @@ def _parse_markdown_to_runs(text: str, paragraph):
Parse markdown básico e adiciona runs formatados ao parágrafo
Suporta: **negrito**, *itálico*, `código`
"""
+ # Sanitizar texto antes de processar
+ text = sanitize_xml_string(text)
+
# Padrão para negrito **texto** ou __texto__
bold_pattern = r'\*\*(.*?)\*\*|__(.*?)__'
# Padrão para código inline `código`
@@ -144,6 +156,8 @@ def _parse_markdown_to_runs(text: str, paragraph):
# Processa negrito primeiro
for match in re.finditer(bold_pattern, text):
content = match.group(1) if match.group(1) else match.group(2)
+ # Sanitizar conteúdo extraído
+ content = sanitize_xml_string(content)
matches.append((match.start(), match.end(), content, 'bold'))
# Processa código (não pode sobrepor negrito)
@@ -151,7 +165,8 @@ def _parse_markdown_to_runs(text: str, paragraph):
# Verifica se não está dentro de um match de negrito
is_inside_bold = any(start <= match.start() < end for start, end, _, _ in matches)
if not is_inside_bold:
- matches.append((match.start(), match.end(), match.group(1), 'code'))
+ content = sanitize_xml_string(match.group(1))
+ matches.append((match.start(), match.end(), content, 'code'))
# Processa itálico (não pode sobrepor negrito ou código)
for match in re.finditer(italic_pattern, text):
@@ -159,6 +174,7 @@ def _parse_markdown_to_runs(text: str, paragraph):
# Verifica se não está dentro de um match existente
is_inside_other = any(start <= match.start() < end for start, end, _, _ in matches)
if not is_inside_other:
+ content = sanitize_xml_string(content)
matches.append((match.start(), match.end(), content, 'italic'))
# Ordena matches por posição
@@ -169,7 +185,8 @@ def _parse_markdown_to_runs(text: str, paragraph):
for start, end, content, style in matches:
# Adiciona texto antes do match
if start > last_pos:
- paragraph.add_run(text[last_pos:start])
+ text_before = sanitize_xml_string(text[last_pos:start])
+ paragraph.add_run(text_before)
# Adiciona run formatado
run = paragraph.add_run(content)
@@ -185,21 +202,23 @@ def _parse_markdown_to_runs(text: str, paragraph):
# Adiciona texto restante
if last_pos < len(text):
- paragraph.add_run(text[last_pos:])
+ text_remaining = sanitize_xml_string(text[last_pos:])
+ paragraph.add_run(text_remaining)
def _add_markdown_paragraph(doc, text: str, is_summary: bool = False):
"""
Adiciona parágrafo ao documento processando markdown
"""
- text = text.strip()
+ # Sanitizar texto antes de processar
+ text = sanitize_xml_string(text.strip())
if not text:
return
# Detecta títulos
if text.startswith('#'):
level = len(text) - len(text.lstrip('#'))
- title_text = text.lstrip('#').strip()
+ title_text = sanitize_xml_string(text.lstrip('#').strip())
if level <= 6 and title_text:
doc.add_heading(title_text, level=min(level, 6))
return
diff --git a/lazier/utils.py b/lazier/utils.py
@@ -7,6 +7,7 @@ import re
import shutil
import threading
import time
+import unicodedata
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import urlparse
@@ -133,6 +134,47 @@ def sanitize_filename(filename: str) -> str:
return filename.strip()
+def sanitize_xml_string(text: str) -> str:
+    """
+    Return *text* with characters that are unsafe for XML output removed.
+
+    Steps, in order: strip C0/C1 control characters, apply NFKC normalization,
+    then drop every character whose Unicode category starts with "C", keeping
+    only newline, carriage return and tab. Besides controls (Cc) that last
+    step also removes format (Cf), surrogate (Cs), private-use (Co) and
+    unassigned (Cn) code points.
+
+    Args:
+        text: Value to clean. ``None`` yields ``""``; any other non-string is
+            converted with ``str()`` first.
+
+    Returns:
+        The sanitized string, or ``""`` when the value cannot be converted.
+    """
+    if text is None:
+        # str(None) would leak the literal word "None" into the output.
+        return ""
+    if not isinstance(text, str):
+        try:
+            text = str(text)
+        except Exception:
+            # Broken __str__: give up quietly, but unlike a bare except this
+            # still lets KeyboardInterrupt/SystemExit propagate.
+            return ""
+
+    # Drop NUL and the other C0/C1 controls before normalizing. U+0085 (NEL)
+    # is spared by this pattern but removed by the category filter below.
+    text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x84\x86-\x9F]', '', text)
+
+    # NFKC folds compatibility forms (ligatures, full-width characters, ...).
+    text = unicodedata.normalize('NFKC', text)
+    # Keep tab/newline/carriage return; discard everything else in "C*".
+    return ''.join(
+        char for char in text
+        if char in '\n\r\t' or unicodedata.category(char)[0] != 'C'
+    )
+
+
def cleanup_files(file_paths: list[str]) -> None:
"""Remove arquivos temporários"""
for file_path in file_paths:
diff --git a/lazier/web/extractor.py b/lazier/web/extractor.py
@@ -4,6 +4,7 @@ Extração de conteúdo de páginas web e PDFs
import os
import re
+import random
from pathlib import Path
from typing import Optional, Dict, Any
import requests
@@ -11,31 +12,66 @@ from bs4 import BeautifulSoup
import pypdf
import pdfplumber
-
-"""
-Extração de conteúdo de páginas web e PDFs
-"""
-
-import os
-import re
-from pathlib import Path
-from typing import Optional, Dict, Any
-import requests
-from bs4 import BeautifulSoup
-import pypdf
-import pdfplumber
+from ..utils import sanitize_xml_string
def _extract_with_bs4(url: str, timeout: int) -> Dict[str, Any]:
"""Extrai conteúdo usando BeautifulSoup (método rápido, sem JavaScript)"""
+ # User-Agents rotativos para evitar bloqueios
+ user_agents = [
+ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+ 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+ 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
+ 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15',
+ ]
+
headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
+ 'User-Agent': random.choice(user_agents),
+ 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
+ 'Accept-Language': 'pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7',
+ 'Accept-Encoding': 'gzip, deflate, br',
+ 'Connection': 'keep-alive',
+ 'Upgrade-Insecure-Requests': '1',
+ 'Sec-Fetch-Dest': 'document',
+ 'Sec-Fetch-Mode': 'navigate',
+ 'Sec-Fetch-Site': 'none',
}
- response = requests.get(url, headers=headers, timeout=timeout)
+ response = requests.get(url, headers=headers, timeout=timeout, allow_redirects=True)
response.raise_for_status()
- soup = BeautifulSoup(response.content, 'html.parser')
+ # Detectar encoding automaticamente
+ encoding = None
+ if response.encoding:
+ encoding = response.encoding
+ else:
+ # Tentar detectar encoding do conteúdo
+ try:
+ import chardet
+ detected = chardet.detect(response.content)
+ encoding = detected.get('encoding', 'utf-8')
+ except ImportError:
+ # Fallback: tentar charset_normalizer se disponível
+ try:
+ from charset_normalizer import detect
+ detected = detect(response.content)
+ if detected:
+ encoding = detected.get('encoding', 'utf-8')
+ except ImportError:
+ encoding = 'utf-8'
+
+ # Decodificar conteúdo
+ try:
+ if encoding:
+ content = response.content.decode(encoding, errors='replace')
+ else:
+ content = response.content.decode('utf-8', errors='replace')
+ except (UnicodeDecodeError, LookupError):
+ # Fallback para utf-8 com replace
+ content = response.content.decode('utf-8', errors='replace')
+
+ soup = BeautifulSoup(content, 'html.parser')
# Remove scripts e styles
for script in soup(["script", "style", "nav", "header", "footer", "aside"]):
@@ -49,10 +85,12 @@ def _extract_with_bs4(url: str, timeout: int) -> Dict[str, Any]:
title = soup.find('h1').get_text()
elif soup.find('meta', property='og:title'):
title = soup.find('meta', property='og:title').get('content')
+ elif soup.find('meta', attrs={'name': 'title'}):
+ title = soup.find('meta', attrs={'name': 'title'}).get('content')
# Extrai texto principal
main_content = None
- for tag in ['article', 'main', '[role="main"]', '.content', '.post', '.entry']:
+ for tag in ['article', 'main', '[role="main"]', '.content', '.post', '.entry', '.article']:
main_content = soup.select_one(tag)
if main_content:
break
@@ -70,8 +108,12 @@ def _extract_with_bs4(url: str, timeout: int) -> Dict[str, Any]:
lines = [line.strip() for line in text.split('\n') if line.strip()]
text = '\n'.join(lines)
+ # Sanitizar antes de retornar
+ text = sanitize_xml_string(text)
+ title = sanitize_xml_string(title) if title else 'Conteúdo Web'
+
return {
- 'title': title or 'Conteúdo Web',
+ 'title': title,
'content': text,
'url': url,
'length': len(text),
@@ -84,21 +126,76 @@ def _extract_with_playwright(url: str, timeout: int) -> Dict[str, Any]:
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
- browser = p.chromium.launch(headless=True)
- page = browser.new_page()
+ browser = p.chromium.launch(
+ headless=True,
+ args=[
+ '--disable-blink-features=AutomationControlled',
+ '--disable-dev-shm-usage',
+ '--no-sandbox',
+ '--disable-setuid-sandbox',
+ ]
+ )
+ context = browser.new_context(
+ user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+ viewport={'width': 1920, 'height': 1080},
+ locale='pt-BR',
+ timezone_id='America/Sao_Paulo',
+ )
+ page = context.new_page()
+
+ # Adicionar headers extras
+ page.set_extra_http_headers({
+ 'Accept-Language': 'pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7',
+ 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
+ })
# Configurar timeout
page.set_default_timeout(timeout * 1000)
- # Navegar para a página
- page.goto(url, wait_until='networkidle', timeout=timeout * 1000)
+ # Navegar para a página com múltiplas estratégias de espera
+ try:
+ page.goto(url, wait_until='networkidle', timeout=timeout * 1000)
+ except:
+ # Se networkidle falhar, tentar domcontentloaded
+ try:
+ page.goto(url, wait_until='domcontentloaded', timeout=timeout * 1000)
+ except:
+ # Último recurso: load
+ page.goto(url, wait_until='load', timeout=timeout * 1000)
# Espera um pouco para garantir que JS terminou de renderizar
- page.wait_for_timeout(2000)
+ page.wait_for_timeout(3000)
+
+ # Tenta esperar por conteúdo principal se possível
+ try:
+ # Espera por elementos comuns de conteúdo
+ selectors = ['article', 'main', '[role="main"]', '.content', 'body']
+ for selector in selectors:
+ try:
+ page.wait_for_selector(selector, timeout=5000)
+ break
+ except:
+ continue
+ except:
+ pass # Continua mesmo se não encontrar
# Extrai conteúdo
title = page.title()
- content = page.inner_text('body')
+
+ # Tenta extrair conteúdo principal primeiro
+ content = None
+ for selector in ['article', 'main', '[role="main"]', '.content', '.post', '.article']:
+ try:
+ element = page.query_selector(selector)
+ if element:
+ content = element.inner_text()
+ break
+ except:
+ continue
+
+ # Se não encontrou conteúdo específico, pega o body inteiro
+ if not content:
+ content = page.inner_text('body')
browser.close()
@@ -111,8 +208,12 @@ def _extract_with_playwright(url: str, timeout: int) -> Dict[str, Any]:
if len(text) > max_length:
text = text[:max_length] + "\n\n[... conteúdo truncado ...]"
+ # Sanitizar antes de retornar
+ text = sanitize_xml_string(text)
+ title = sanitize_xml_string(title) if title else 'Conteúdo Web'
+
return {
- 'title': title or 'Conteúdo Web',
+ 'title': title,
'content': text,
'url': url,
'length': len(text),
@@ -235,7 +336,10 @@ def extract_pdf_content(file_path: str) -> Dict[str, Any]:
if len(text) > max_length:
text = text[:max_length] + "\n\n[... conteúdo truncado ...]"
+ # Sanitizar antes de retornar
+ text = sanitize_xml_string(text)
title = metadata.get('title', '') or Path(file_path).stem
+ title = sanitize_xml_string(title)
return {
'title': title,
@@ -284,8 +388,12 @@ def extract_text_file_content(file_path: str) -> Dict[str, Any]:
if len(content) > max_length:
content = content[:max_length] + "\n\n[... conteúdo truncado ...]"
+ # Sanitizar antes de retornar
+ content = sanitize_xml_string(content)
+ title = sanitize_xml_string(Path(file_path).stem)
+
return {
- 'title': Path(file_path).stem,
+ 'title': title,
'content': content,
'file_path': file_path,
'length': len(content),
diff --git a/requirements.txt b/requirements.txt
@@ -17,3 +17,4 @@ aiofiles>=23.2.0
redis>=5.0.0
hiredis>=2.2.0
playwright>=1.40.0
+chardet>=5.0.0