diarization.py (4849B)
1 """ 2 Diarização opcional de falantes após STT. 3 4 Provedores futuros (AssemblyAI, pyannote, etc.) devem preencher intervalos 5 ``{start, end, speaker_id}`` em segundos; o texto é reorganizado em blocos 6 ``**Falante N**`` alinhados aos segmentos do Whisper quando possível. 7 """ 8 9 from __future__ import annotations 10 11 import logging 12 from typing import Any, Dict, List, Optional, Tuple 13 14 from .core.config import get_model_config 15 16 _log = logging.getLogger(__name__) 17 18 19 def _segment_midpoint_seconds(seg: Dict[str, Any]) -> float: 20 start = float(seg.get("start", 0) or 0) 21 end = float(seg.get("end", start) or start) 22 return (start + end) / 2.0 23 24 25 def _speaker_key(interval: Dict[str, Any]) -> str: 26 sid = interval.get("speaker_id", interval.get("speaker")) 27 if sid is None: 28 return "0" 29 return str(sid) 30 31 32 def _build_speaker_rank(intervals: List[Dict[str, Any]]) -> Dict[str, int]: 33 """Mapeia identificador bruto do provedor para Falante 1..N (ordem de primeira ocorrência).""" 34 seen: List[str] = [] 35 for iv in intervals: 36 k = _speaker_key(iv) 37 if k not in seen: 38 seen.append(k) 39 return {k: i + 1 for i, k in enumerate(seen)} 40 41 42 def _speaker_num_at_time( 43 t: float, intervals: List[Dict[str, Any]], rank: Dict[str, int] 44 ) -> int: 45 for iv in intervals: 46 try: 47 s = float(iv.get("start", 0)) 48 e = float(iv.get("end", s)) 49 except (TypeError, ValueError): 50 continue 51 if s <= t <= e: 52 k = _speaker_key(iv) 53 return rank.get(k, 1) 54 return 1 55 56 57 def merge_transcript_with_speakers( 58 raw_text: str, 59 segments: List[Dict[str, Any]], 60 intervals: List[Dict[str, Any]], 61 ) -> str: 62 """Junta texto bruto ou segmentos Whisper com rótulos **Falante N** (blocos consecutivos).""" 63 if not intervals: 64 return raw_text 65 66 rank = _build_speaker_rank(intervals) 67 68 if segments: 69 parts: List[str] = [] 70 current_sp: Optional[int] = None 71 buffer: List[str] = [] 72 73 def flush() -> None: 74 nonlocal buffer, current_sp 75 if current_sp is None or not buffer: 76 return 77 text = " ".join(buffer).strip() 78 if text: 79 parts.append(f"**Falante {current_sp}**\n\n{text}") 80 buffer = [] 81 82 for seg in segments: 83 text = (seg.get("text") or "").strip() 84 if not text: 85 continue 86 sp = _speaker_num_at_time(_segment_midpoint_seconds(seg), intervals, rank) 87 if current_sp is not None and sp != current_sp: 88 flush() 89 current_sp = sp 90 buffer.append(text) 91 flush() 92 return "\n\n".join(parts).strip() if parts else raw_text 93 94 lines_out: List[str] = [] 95 current_sp = 1 96 for para in raw_text.split("\n\n"): 97 p = para.strip() 98 if not p: 99 continue 100 lines_out.append(f"**Falante {current_sp}**\n\n{p}") 101 current_sp += 1 102 return "\n\n".join(lines_out).strip() if lines_out else raw_text 103 104 105 def diarize_audio_intervals(audio_path: str, provider: str) -> List[Dict[str, Any]]: 106 """ 107 Executa diarização externa. Retorna lista vazia se desligado ou não implementado. 108 109 Para integrar um provedor real, implemente aqui e defina LAZIER_DIARIZATION_PROVIDER. 110 """ 111 p = (provider or "").strip().lower() 112 if p in ("", "none", "off", "false"): 113 return [] 114 115 if p in ("assemblyai", "deepgram", "google", "pyannote"): 116 _log.warning( 117 "Diarização com provedor '%s' ainda não está integrada; " 118 "use LAZIER_DIARIZATION_PROVIDER=none ou contribua com o conector.", 119 p, 120 ) 121 return [] 122 123 _log.warning("LAZIER_DIARIZATION_PROVIDER='%s' desconhecido; ignorando.", provider) 124 return [] 125 126 127 def maybe_enrich_transcript_with_diarization( 128 audio_path: str, 129 raw_text: str, 130 segments: List[Dict[str, Any]], 131 metadata: Dict[str, Any], 132 ) -> Tuple[str, List[Dict[str, Any]]]: 133 """ 134 Opcionalmente altera o texto bruto (pré PT-BR) com marcadores **Falante N**. 135 ``segments`` são os do Whisper (opcionalmente vazios). 136 """ 137 cfg = get_model_config() 138 provider = (cfg.diarization_provider or "none").strip().lower() 139 metadata["diarization_provider"] = provider 140 141 if provider in ("", "none", "off", "false"): 142 metadata["diarization_status"] = "desligada" 143 return raw_text, segments 144 145 intervals = diarize_audio_intervals(audio_path, provider) 146 if not intervals: 147 metadata["diarization_status"] = "indisponivel" 148 return raw_text, segments 149 150 merged = merge_transcript_with_speakers(raw_text, segments, intervals) 151 metadata["diarization_status"] = "ativa" 152 metadata["diarization_segments"] = len(intervals) 153 return merged, segments