ドキュメントを更新し、型注釈を追加してコードの可読性とメンテナンス性を向上。各モジュールの使用例や依存関係を明示化し、エラーハンドリングを改善。

This commit is contained in:
misyaguziya
2025-10-09 17:35:55 +09:00
parent b26129af68
commit 690a2f081b
5 changed files with 276 additions and 60 deletions

View File

@@ -1,7 +1,14 @@
"""Runtime transcriber that wraps Google SpeechRecognition and faster-whisper.
This class focuses on converting incoming raw audio buffers into text using
either the Google web recognizer (online) or a local Whisper model (offline).
"""
import time
from io import BytesIO
from threading import Event
import wave
from typing import Any, Callable, Dict, List, Optional, Tuple
from speech_recognition import Recognizer, AudioData, AudioFile
from speech_recognition.exceptions import UnknownValueError
from datetime import timedelta
@@ -20,38 +27,71 @@ warnings.simplefilter('ignore', RuntimeWarning)
PHRASE_TIMEOUT = 3
MAX_PHRASES = 10
class AudioTranscriber:
def __init__(self, speaker, source, phrase_timeout, max_phrases, transcription_engine, root=None, whisper_weight_type=None, device="cpu", device_index=0, compute_type="auto"):
"""Convert queued audio buffers into transcripts.
Public attributes set by the constructor:
- speaker: bool
- phrase_timeout: int
- max_phrases: int
Methods are intentionally permissive about input types to match the
existing codebase; this wrapper adds typing for clarity.
"""
def __init__(
self,
speaker: bool,
source: Any,
phrase_timeout: int,
max_phrases: int,
transcription_engine: str,
root: Optional[str] = None,
whisper_weight_type: Optional[str] = None,
device: str = "cpu",
device_index: int = 0,
compute_type: str = "auto",
) -> None:
self.speaker = speaker
self.phrase_timeout = phrase_timeout
self.max_phrases = max_phrases
self.transcript_data = []
self.transcript_data: List[Dict[str, Any]] = []
self.transcript_changed_event = Event()
self.audio_recognizer = Recognizer()
self.transcription_engine = "Google"
self.whisper_model = None
self.audio_sources = {
"sample_rate": source.SAMPLE_RATE,
"sample_width": source.SAMPLE_WIDTH,
"channels": source.channels,
"last_sample": bytes(),
"last_spoken": None,
"new_phrase": True,
"process_data_func": self.processSpeakerData if speaker else self.processSpeakerData
self.audio_sources: Dict[str, Any] = {
"sample_rate": source.SAMPLE_RATE,
"sample_width": source.SAMPLE_WIDTH,
"channels": source.channels,
"last_sample": bytes(),
"last_spoken": None,
"new_phrase": True,
"process_data_func": self.processSpeakerData if speaker else self.processSpeakerData,
}
if transcription_engine == "Whisper" and checkWhisperWeight(root, whisper_weight_type) is True:
self.whisper_model = getWhisperModel(root, whisper_weight_type, device=device, device_index=device_index, compute_type=compute_type)
self.whisper_model = getWhisperModel(
root, whisper_weight_type, device=device, device_index=device_index, compute_type=compute_type
)
self.transcription_engine = "Whisper"
def transcribeAudioQueue(self, audio_queue, languages, countries, avg_logprob=-0.8, no_speech_prob=0.6):
def transcribeAudioQueue(
self,
audio_queue: Any,
languages: List[str],
countries: List[str],
avg_logprob: float = -0.8,
no_speech_prob: float = 0.6,
) -> bool:
if audio_queue.empty():
time.sleep(0.01)
return False
audio, time_spoken = audio_queue.get()
self.updateLastSampleAndPhraseStatus(audio, time_spoken)
confidences = [{"confidence": 0, "text": "", "language": None}]
confidences: List[Dict[str, Any]] = [{"confidence": 0, "text": "", "language": None}]
try:
audio_data = self.audio_sources["process_data_func"]()
match self.transcription_engine:
@@ -67,13 +107,19 @@ class AudioTranscriber:
except Exception:
pass
case "Whisper":
audio_data = np.frombuffer(audio_data.get_raw_data(convert_rate=16000, convert_width=2), np.int16).flatten().astype(np.float32) / 32768.0
audio_data = np.frombuffer(
audio_data.get_raw_data(convert_rate=16000, convert_width=2), np.int16
).flatten().astype(np.float32) / 32768.0
if isinstance(audio_data, torch.Tensor):
audio_data = audio_data.detach().numpy()
for language, country in zip(languages, countries):
text = ""
source_language = transcription_lang[language][country][self.transcription_engine] if len(languages) == 1 else None
source_language = (
transcription_lang[language][country][self.transcription_engine]
if len(languages) == 1
else None
)
segments, info = self.whisper_model.transcribe(
audio_data,
beam_size=5,
@@ -85,13 +131,15 @@ class AudioTranscriber:
without_timestamps=True,
task="transcribe",
vad_filter=False,
)
)
for s in segments:
if s.avg_logprob < avg_logprob or s.no_speech_prob > no_speech_prob:
continue
text += s.text
confidences.append({"confidence": info.language_probability, "text": text, "language": language})
if (len(languages) == 1) or (transcription_lang[language][country][self.transcription_engine] == info.language):
if (len(languages) == 1) or (
transcription_lang[language][country][self.transcription_engine] == info.language
):
break
except UnknownValueError:
@@ -106,7 +154,7 @@ class AudioTranscriber:
self.updateTranscript(result)
return True
def updateLastSampleAndPhraseStatus(self, data, time_spoken):
def updateLastSampleAndPhraseStatus(self, data: bytes, time_spoken) -> None:
source_info = self.audio_sources
if source_info["last_spoken"] and time_spoken - source_info["last_spoken"] > timedelta(seconds=self.phrase_timeout):
source_info["last_sample"] = bytes()
@@ -117,11 +165,13 @@ class AudioTranscriber:
source_info["last_sample"] += data
source_info["last_spoken"] = time_spoken
def processMicData(self):
audio_data = AudioData(self.audio_sources["last_sample"], self.audio_sources["sample_rate"], self.audio_sources["sample_width"])
def processMicData(self) -> AudioData:
audio_data = AudioData(
self.audio_sources["last_sample"], self.audio_sources["sample_rate"], self.audio_sources["sample_width"]
)
return audio_data
def processSpeakerData(self):
def processSpeakerData(self) -> AudioData:
temp_file = BytesIO()
with wave.open(temp_file, 'wb') as wf:
wf.setnchannels(self.audio_sources["channels"])
@@ -141,7 +191,7 @@ class AudioTranscriber:
audio = self.audio_recognizer.record(source)
return audio
def updateTranscript(self, result):
def updateTranscript(self, result: dict) -> None:
source_info = self.audio_sources
transcript = self.transcript_data
@@ -152,14 +202,14 @@ class AudioTranscriber:
else:
transcript[0] = result
def getTranscript(self):
def getTranscript(self) -> dict:
if len(self.transcript_data) > 0:
result = self.transcript_data.pop(-1)
else:
result = {"confidence": 0, "text": "", "language": None}
return result
def clearTranscriptData(self):
def clearTranscriptData(self) -> None:
self.transcript_data.clear()
self.audio_sources["last_sample"] = bytes()
self.audio_sources["new_phrase"] = True