Merge branch 'develop' into translate_api

# Conflicts:
#	requirements.txt
#	requirements_cuda.txt
#	src-python/config.py
#	src-python/mainloop.py
#	src-python/model.py
#	src-python/models/osc/osc.py
#	src-python/models/translation/translation_languages.py
#	src-python/models/translation/translation_translator.py
#	src-python/models/translation/translation_utils.py
This commit is contained in:
misyaguziya
2025-10-14 12:47:47 +09:00
86 changed files with 20001 additions and 1584 deletions

View File

@@ -10,11 +10,10 @@ from time import sleep
from queue import Queue
from threading import Thread
from requests import get as requests_get
from typing import Callable
from typing import Callable, Optional, cast
from packaging.version import parse
from flashtext import KeywordProcessor
from pykakasi import kakasi
from device_manager import device_manager
from config import config
@@ -28,6 +27,7 @@ from models.translation.translation_languages import translation_lang
from models.transcription.transcription_languages import transcription_lang
from models.translation.translation_utils import checkCTranslate2Weight, downloadCTranslate2Weight, downloadCTranslate2Tokenizer
from models.transcription.transcription_whisper import checkWhisperWeight, downloadWhisperWeight
from models.transliteration.transliteration_transliterator import Transliterator
from models.overlay.overlay import Overlay
from models.overlay.overlay_image import OverlayImage
from models.watchdog.watchdog import Watchdog
@@ -35,30 +35,47 @@ from models.websocket.websocket_server import WebSocketServer
from utils import errorLogging, setupLogger
class threadFnc(Thread):
def __init__(self, fnc, end_fnc=None, daemon=True, *args, **kwargs):
super(threadFnc, self).__init__(daemon=daemon, target=fnc, *args, **kwargs)
"""A tiny Thread wrapper that repeatedly calls a function.
Usage: threadFnc(fnc, end_fnc=None, daemon=True, *args, **kwargs)
The target function will be called repeatedly inside run().
"""
def __init__(self, fnc, end_fnc=None, daemon: bool = True, *args, **kwargs):
# Do not pass target to super; manage call explicitly so we can
# store args/kwargs on the instance for later use.
super(threadFnc, self).__init__(daemon=daemon)
self.fnc = fnc
self.end_fnc = end_fnc
self.loop = True
self._pause = False
self._args = args
self._kwargs = kwargs
def stop(self):
def stop(self) -> None:
self.loop = False
def pause(self):
def pause(self) -> None:
self._pause = True
def resume(self):
def resume(self) -> None:
self._pause = False
def run(self):
while self.loop:
self.fnc(*self._args, **self._kwargs)
while self._pause:
sleep(0.1)
if callable(self.end_fnc):
self.end_fnc()
def run(self) -> None:
try:
while self.loop:
try:
self.fnc(*self._args, **self._kwargs)
except Exception:
# Protect the thread from terminating on user exceptions
errorLogging()
while self._pause:
sleep(0.1)
finally:
if callable(self.end_fnc):
try:
self.end_fnc()
except Exception:
errorLogging()
return
class Model:
@@ -67,10 +84,22 @@ class Model:
def __new__(cls):
if cls._instance is None:
cls._instance = super(Model, cls).__new__(cls)
cls._instance.init()
# Do NOT call init() here to avoid heavy import-time work.
# Callers should call `model.init()` explicitly or rely on
# `ensure_initialized()` which will lazy-initialize on demand.
cls._instance._inited = False
return cls._instance
def init(self):
"""Perform full initialization of resources.
This method performs heavy construction (models, overlay, threads)
and is intentionally not called at import time. Call explicitly
or let `ensure_initialized()` call it lazily.
"""
if getattr(self, '_inited', False):
return
self.logger = None
self.th_check_device = None
self.mic_print_transcript = None
@@ -99,23 +128,41 @@ class Model:
self.overlay_image = OverlayImage(config.PATH_LOCAL)
self.mic_audio_queue = None
self.mic_mute_status = None
self.kks = kakasi()
self.transliterator = None
self.watchdog = Watchdog(config.WATCHDOG_TIMEOUT, config.WATCHDOG_INTERVAL)
self.osc_handler = OSCHandler(config.OSC_IP_ADDRESS, config.OSC_PORT)
self.websocket_server = None
self.websocket_server_loop = False
self.websocket_server_alive = False
self.th_websocket_server = None
# default no-op callbacks for energy check functions
self.check_mic_energy_fnc: Callable[[float], None] = lambda v: None
self.check_speaker_energy_fnc: Callable[[float], None] = lambda v: None
self._inited = True
def ensure_initialized(self) -> None:
"""Ensure the model has been initialized. This is safe to call from
public methods that require initialized resources.
"""
if not getattr(self, '_inited', False):
try:
self.init()
except Exception:
# Log and continue; callers should handle missing features.
errorLogging()
def checkTranslatorCTranslate2ModelWeight(self, weight_type:str):
return checkCTranslate2Weight(config.PATH_LOCAL, weight_type)
def changeTranslatorCTranslate2Model(self):
self.ensure_initialized()
self.translator.changeCTranslate2Model(
config.PATH_LOCAL,
config.CTRANSLATE2_WEIGHT_TYPE,
config.SELECTED_TRANSLATION_COMPUTE_DEVICE["device"],
config.SELECTED_TRANSLATION_COMPUTE_DEVICE["device_index"])
path=config.PATH_LOCAL,
model_type=config.CTRANSLATE2_WEIGHT_TYPE,
device=config.SELECTED_TRANSLATION_COMPUTE_DEVICE["device"],
device_index=config.SELECTED_TRANSLATION_COMPUTE_DEVICE["device_index"],
compute_type=config.SELECTED_TRANSLATION_COMPUTE_TYPE
)
def downloadCTranslate2ModelWeight(self, weight_type, callback=None, end_callback=None):
return downloadCTranslate2Weight(config.PATH_LOCAL, weight_type, callback, end_callback)
@@ -124,8 +171,17 @@ class Model:
return downloadCTranslate2Tokenizer(config.PATH_LOCAL, weight_type)
def isLoadedCTranslate2Model(self):
self.ensure_initialized()
return self.translator.isLoadedCTranslate2Model()
def isChangedTranslatorParameters(self):
self.ensure_initialized()
return self.translator.isChangedTranslatorParameters()
def setChangedTranslatorParameters(self, is_changed):
self.ensure_initialized()
self.translator.setChangedTranslatorParameters(is_changed)
def checkTranscriptionWhisperModelWeight(self, weight_type:str):
return checkWhisperWeight(config.PATH_LOCAL, weight_type)
@@ -133,10 +189,12 @@ class Model:
return downloadWhisperWeight(config.PATH_LOCAL, weight_type, callback, end_callback)
def resetKeywordProcessor(self):
self.ensure_initialized()
del self.keyword_processor
self.keyword_processor = KeywordProcessor()
def authenticationTranslatorDeepLAuthKey(self, auth_key: str) -> bool:
self.ensure_initialized()
result = self.translator.authenticationDeepLAuthKey(auth_key)
return result
@@ -149,12 +207,14 @@ class Model:
return result
def startLogger(self):
self.ensure_initialized()
os_makedirs(config.PATH_LOGS, exist_ok=True)
file_name = os_path.join(config.PATH_LOGS, f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.log")
self.logger = setupLogger("log", file_name)
self.logger.disabled = False
def stopLogger(self):
self.ensure_initialized()
self.logger.disabled = True
self.logger = None
@@ -195,6 +255,7 @@ class Model:
return compatible_engines
def getTranslate(self, translator_name, source_language, target_language, target_country, message):
self.ensure_initialized()
success_flag = False
translation = self.translator.translate(
translator_name=translator_name,
@@ -224,6 +285,7 @@ class Model:
return translation, success_flag
def getInputTranslate(self, message, source_language=None):
self.ensure_initialized()
translator_name=config.SELECTED_TRANSLATION_ENGINES[config.SELECTED_TAB_NO]
if source_language is None:
source_language=config.SELECTED_YOUR_LANGUAGES[config.SELECTED_TAB_NO]["1"]["language"]
@@ -249,6 +311,7 @@ class Model:
return translations, success_flags
def getOutputTranslate(self, message, source_language=None):
self.ensure_initialized()
translator_name=config.SELECTED_TRANSLATION_ENGINES[config.SELECTED_TAB_NO]
if source_language is None:
source_language=config.SELECTED_TARGET_LANGUAGES[config.SELECTED_TAB_NO]["1"]["language"]
@@ -265,10 +328,12 @@ class Model:
return [translation], [success_flag]
def addKeywords(self):
self.ensure_initialized()
for f in config.MIC_WORD_FILTER:
self.keyword_processor.add_keyword(f)
def checkKeywords(self, message):
self.ensure_initialized()
return len(self.keyword_processor.extract_keywords(message)) != 0
def detectRepeatSendMessage(self, message):
@@ -285,34 +350,63 @@ class Model:
self.previous_receive_message = message
return repeat_flag
def convertMessageToTransliteration(self, message: str) -> str:
data_list = self.kks.convert(message)
keys_to_keep = {"orig", "hira", "hepburn"}
filtered_list = []
for item in data_list:
filtered_item = {key: value for key, value in item.items() if key in keys_to_keep}
filtered_list.append(filtered_item)
def startTransliteration(self):
self.ensure_initialized()
if self.transliterator is None:
self.transliterator = Transliterator()
def stopTransliteration(self):
self.ensure_initialized()
if self.transliterator is not None:
self.transliterator = None
def convertMessageToTransliteration(self, message: str, hiragana: bool=True, romaji: bool=True) -> list:
self.ensure_initialized()
if hiragana is False and romaji is False:
return []
keys_to_keep = {"orig"}
if hiragana:
keys_to_keep.add("hira")
if romaji:
keys_to_keep.add("hepburn")
if self.transliterator is None:
self.startTransliteration()
data_list = self.transliterator.analyze(message, use_macron=False)
filtered_list = [
{key: value for key, value in item.items() if key in keys_to_keep}
for item in data_list
]
return filtered_list
def setOscIpAddress(self, ip_address):
self.ensure_initialized()
self.osc_handler.setOscIpAddress(ip_address)
def setOscPort(self, port):
self.ensure_initialized()
self.osc_handler.setOscPort(port)
def oscStartSendTyping(self):
self.ensure_initialized()
self.osc_handler.sendTyping(flag=True)
def oscStopSendTyping(self):
self.ensure_initialized()
self.osc_handler.sendTyping(flag=False)
def oscSendMessage(self, message:str):
self.ensure_initialized()
self.osc_handler.sendMessage(message=message, notification=config.NOTIFICATION_VRC_SFX)
def setMuteSelfStatus(self):
self.ensure_initialized()
self.mic_mute_status = self.osc_handler.getOSCParameterMuteSelf()
def startReceiveOSC(self):
self.ensure_initialized()
def changeHandlerMute(address, osc_arguments):
if config.ENABLE_TRANSCRIPTION_SEND is True:
if osc_arguments is True and self.mic_mute_status is False:
@@ -329,9 +423,11 @@ class Model:
self.osc_handler.receiveOscParameters()
def stopReceiveOSC(self):
self.ensure_initialized()
self.osc_handler.oscServerStop()
def getIsOscQueryEnabled(self):
self.ensure_initialized()
return self.osc_handler.getIsOscQueryEnabled()
@staticmethod
@@ -396,22 +492,47 @@ class Model:
Popen([program_name, "--cuda"], cwd=current_directory)
def getListMicHost(self):
result = [host for host in device_manager.getMicDevices().keys()]
self.ensure_initialized()
try:
dm = device_manager.getMicDevices()
result = [host for host in dm.keys()]
except Exception:
errorLogging()
result = []
return result
def getMicDefaultDevice(self):
result = device_manager.getMicDevices().get(config.SELECTED_MIC_HOST, [{"name": "NoDevice"}])[0]["name"]
self.ensure_initialized()
try:
dm = device_manager.getMicDevices()
result = dm.get(config.SELECTED_MIC_HOST, [{"name": "NoDevice"}])[0]["name"]
except Exception:
errorLogging()
result = "NoDevice"
return result
def getListMicDevice(self):
result = [device["name"] for device in device_manager.getMicDevices().get(config.SELECTED_MIC_HOST, [{"name": "NoDevice"}])]
self.ensure_initialized()
try:
dm = device_manager.getMicDevices()
result = [device["name"] for device in dm.get(config.SELECTED_MIC_HOST, [{"name": "NoDevice"}])]
except Exception:
errorLogging()
result = ["NoDevice"]
return result
def getListSpeakerDevice(self):
result = [device["name"] for device in device_manager.getSpeakerDevices()]
self.ensure_initialized()
try:
sd = device_manager.getSpeakerDevices()
result = [device["name"] for device in sd]
except Exception:
errorLogging()
result = ["NoDevice"]
return result
def startMicTranscript(self, fnc):
self.ensure_initialized()
mic_host_name = config.SELECTED_MIC_HOST
mic_device_name = config.SELECTED_MIC_DEVICE
@@ -448,6 +569,7 @@ class Model:
whisper_weight_type=config.WHISPER_WEIGHT_TYPE,
device=config.SELECTED_TRANSCRIPTION_COMPUTE_DEVICE["device"],
device_index=config.SELECTED_TRANSCRIPTION_COMPUTE_DEVICE["device_index"],
compute_type=config.SELECTED_TRANSCRIPTION_COMPUTE_TYPE,
)
def sendMicTranscript():
try:
@@ -497,6 +619,7 @@ class Model:
self.changeMicTranscriptStatus()
def resumeMicTranscript(self):
self.ensure_initialized()
# キューをクリア
if isinstance(self.mic_audio_queue, Queue):
while not self.mic_audio_queue.empty():
@@ -511,6 +634,7 @@ class Model:
self.mic_audio_recorder.resume()
def pauseMicTranscript(self):
self.ensure_initialized()
# 文字起こしを一時停止
# if isinstance(self.mic_print_transcript, threadFnc):
# self.mic_print_transcript.pause()
@@ -544,6 +668,7 @@ class Model:
self.resumeMicTranscript()
def stopMicTranscript(self):
self.ensure_initialized()
if isinstance(self.mic_print_transcript, threadFnc):
self.mic_print_transcript.stop()
self.mic_print_transcript.join()
@@ -556,9 +681,11 @@ class Model:
# self.mic_get_energy.stop()
# self.mic_get_energy = None
def startCheckMicEnergy(self, fnc:Callable[[float], None]=None) -> None:
if isinstance(fnc, Callable):
self.check_mic_energy_fnc = fnc
def startCheckMicEnergy(self, fnc:Optional[Callable[[float], None]]=None) -> None:
self.ensure_initialized()
# fnc may be None or a callable. Use cast after checking for None to satisfy type checker.
if fnc is not None:
self.check_mic_energy_fnc = cast(Callable[[float], None], fnc)
mic_host_name = config.SELECTED_MIC_HOST
mic_device_name = config.SELECTED_MIC_DEVICE
@@ -578,7 +705,7 @@ class Model:
errorLogging()
sleep(0.01)
mic_energy_queue = Queue()
mic_energy_queue: Queue = Queue()
mic_device = selected_mic_device[0]
self.mic_energy_recorder = SelectedMicEnergyRecorder(mic_device)
self.mic_energy_recorder.recordIntoQueue(mic_energy_queue)
@@ -587,6 +714,7 @@ class Model:
self.mic_energy_plot_progressbar.start()
def stopCheckMicEnergy(self):
self.ensure_initialized()
if isinstance(self.mic_energy_plot_progressbar, threadFnc):
self.mic_energy_plot_progressbar.stop()
self.mic_energy_plot_progressbar.join()
@@ -596,17 +724,19 @@ class Model:
self.mic_energy_recorder.stop()
self.mic_energy_recorder = None
def startSpeakerTranscript(self, fnc):
def startSpeakerTranscript(self, fnc:Optional[Callable[[dict], None]]=None) -> None:
self.ensure_initialized()
speaker_device_name = config.SELECTED_SPEAKER_DEVICE
speaker_device_list = device_manager.getSpeakerDevices()
selected_speaker_device = [device for device in speaker_device_list if device["name"] == speaker_device_name]
if len(selected_speaker_device) == 0 or speaker_device_name == "NoDevice":
fnc({"text": False, "language": None})
# fnc may be None; only call if callable
if callable(fnc):
fnc({"text": False, "language": None})
else:
speaker_audio_queue = Queue()
# speaker_energy_queue = Queue()
speaker_audio_queue: Queue = Queue()
speaker_device = selected_speaker_device[0]
record_timeout = config.SPEAKER_RECORD_TIMEOUT
phrase_timeout = config.SPEAKER_PHRASE_TIMEOUT
@@ -631,6 +761,7 @@ class Model:
whisper_weight_type=config.WHISPER_WEIGHT_TYPE,
device=config.SELECTED_TRANSCRIPTION_COMPUTE_DEVICE["device"],
device_index=config.SELECTED_TRANSCRIPTION_COMPUTE_DEVICE["device_index"],
compute_type=config.SELECTED_TRANSCRIPTION_COMPUTE_TYPE,
)
def sendSpeakerTranscript():
try:
@@ -678,6 +809,7 @@ class Model:
# self.speaker_get_energy.start()
def stopSpeakerTranscript(self):
self.ensure_initialized()
if isinstance(self.speaker_print_transcript, threadFnc):
self.speaker_print_transcript.stop()
self.speaker_print_transcript.join()
@@ -689,9 +821,11 @@ class Model:
# self.speaker_get_energy.stop()
# self.speaker_get_energy = None
def startCheckSpeakerEnergy(self, fnc:Callable[[float], None]=None) -> None:
if isinstance(fnc, Callable):
self.check_speaker_energy_fnc = fnc
def startCheckSpeakerEnergy(self, fnc:Optional[Callable[[float], None]]=None) -> None:
self.ensure_initialized()
# Accept None as default and assign safely with cast after None-check
if fnc is not None:
self.check_speaker_energy_fnc = cast(Callable[[float], None], fnc)
speaker_device_name = config.SELECTED_SPEAKER_DEVICE
speaker_device_list = device_manager.getSpeakerDevices()
@@ -701,7 +835,7 @@ class Model:
self.check_speaker_energy_fnc(False)
else:
def sendSpeakerEnergy():
if speaker_energy_queue.empty() is False:
if not speaker_energy_queue.empty():
energy = speaker_energy_queue.get()
try:
self.check_speaker_energy_fnc(energy)
@@ -709,7 +843,7 @@ class Model:
errorLogging()
sleep(0.01)
speaker_energy_queue = Queue()
speaker_energy_queue: Queue = Queue()
speaker_device = selected_speaker_device[0]
self.speaker_energy_recorder = SelectedSpeakerEnergyRecorder(speaker_device)
self.speaker_energy_recorder.recordIntoQueue(speaker_energy_queue)
@@ -718,6 +852,7 @@ class Model:
self.speaker_energy_plot_progressbar.start()
def stopCheckSpeakerEnergy(self):
self.ensure_initialized()
if isinstance(self.speaker_energy_plot_progressbar, threadFnc):
self.speaker_energy_plot_progressbar.stop()
self.speaker_energy_plot_progressbar.join()
@@ -727,11 +862,16 @@ class Model:
self.speaker_energy_recorder.stop()
self.speaker_energy_recorder = None
def createOverlayImageSmallLog(self, message:str, your_language:str, translation:list, target_language:dict):
target_language = [data["language"] for data in target_language.values() if data["enable"] is True]
return self.overlay_image.createOverlayImageSmallLog(message, your_language, translation, target_language)
def createOverlayImageSmallLog(self, message:Optional[str], your_language:Optional[str], translation:list, target_language:Optional[dict]) -> object:
self.ensure_initialized()
# target_language may be provided as dict or None
target_language_list = []
if isinstance(target_language, dict):
target_language_list = [data["language"] for data in target_language.values() if data.get("enable") is True]
return self.overlay_image.createOverlayImageSmallLog(message, your_language, translation, target_language_list)
def createOverlayImageSmallMessage(self, message):
self.ensure_initialized()
ui_language = config.UI_LANGUAGE
convert_languages = {
"en": "Default",
@@ -744,12 +884,15 @@ class Model:
return self.overlay_image.createOverlayImageSmallLog(message, language)
def clearOverlayImageSmallLog(self):
self.ensure_initialized()
self.overlay.clearImage("small")
def updateOverlaySmallLog(self, img):
self.ensure_initialized()
self.overlay.updateImage(img, "small")
def updateOverlaySmallLogSettings(self):
self.ensure_initialized()
size = "small"
if (self.overlay.settings[size]["x_pos"] != config.OVERLAY_SMALL_LOG_SETTINGS["x_pos"] or
@@ -778,11 +921,16 @@ class Model:
if (self.overlay.settings[size]["ui_scaling"] != config.OVERLAY_SMALL_LOG_SETTINGS["ui_scaling"]):
self.overlay.updateUiScaling(config.OVERLAY_SMALL_LOG_SETTINGS["ui_scaling"], size)
def createOverlayImageLargeLog(self, message_type:str, message:str, your_language:str, translation:list, target_language:dict):
target_language = [data["language"] for data in target_language.values() if data["enable"] is True]
return self.overlay_image.createOverlayImageLargeLog(message_type, message, your_language, translation, target_language)
def createOverlayImageLargeLog(self, message_type:str, message:Optional[str], your_language:Optional[str], translation:list, target_language:Optional[dict]=None):
self.ensure_initialized()
# normalize target_language dict -> list of language strings
target_language_list = []
if isinstance(target_language, dict):
target_language_list = [data["language"] for data in target_language.values() if data.get("enable") is True]
return self.overlay_image.createOverlayImageLargeLog(message_type, message, your_language, translation, target_language_list)
def createOverlayImageLargeMessage(self, message):
self.ensure_initialized()
ui_language = config.UI_LANGUAGE
convert_languages = {
"en": "Default",
@@ -800,12 +948,15 @@ class Model:
return overlay_image.createOverlayImageLargeLog("send", message, language)
def clearOverlayImageLargeLog(self):
self.ensure_initialized()
self.overlay.clearImage("large")
def updateOverlayLargeLog(self, img):
self.ensure_initialized()
self.overlay.updateImage(img, "large")
def updateOverlayLargeLogSettings(self):
self.ensure_initialized()
size = "large"
if (self.overlay.settings[size]["x_pos"] != config.OVERLAY_LARGE_LOG_SETTINGS["x_pos"] or
self.overlay.settings[size]["y_pos"] != config.OVERLAY_LARGE_LOG_SETTINGS["y_pos"] or
@@ -834,23 +985,29 @@ class Model:
self.overlay.updateUiScaling(config.OVERLAY_LARGE_LOG_SETTINGS["ui_scaling"] * 0.25, size)
def startOverlay(self):
self.ensure_initialized()
self.overlay.startOverlay()
def shutdownOverlay(self):
self.ensure_initialized()
self.overlay.shutdownOverlay()
def startWatchdog(self):
self.ensure_initialized()
self.th_watchdog = threadFnc(self.watchdog.start)
self.th_watchdog.daemon = True
self.th_watchdog.start()
def feedWatchdog(self):
self.ensure_initialized()
self.watchdog.feed()
def setWatchdogCallback(self, callback):
self.ensure_initialized()
self.watchdog.setCallback(callback)
def stopWatchdog(self):
self.ensure_initialized()
if isinstance(self.th_watchdog, threadFnc):
self.th_watchdog.stop()
self.th_watchdog.join()
@@ -862,6 +1019,7 @@ class Model:
def startWebSocketServer(self, host, port):
"""WebSocketサーバーを起動し、別スレッドで実行する"""
self.ensure_initialized()
if self.websocket_server_alive is True:
# サーバーが既に起動している場合は何もしない
return
@@ -900,6 +1058,7 @@ class Model:
def stopWebSocketServer(self):
"""WebSocketサーバーを停止する"""
self.ensure_initialized()
if not hasattr(self, 'th_websocket_server') or self.th_websocket_server is None:
return
@@ -921,6 +1080,7 @@ class Model:
def checkWebSocketServerAlive(self):
"""WebSocketサーバーの稼働状態を確認する"""
self.ensure_initialized()
return self.websocket_server_alive
def websocketSendMessage(self, message_dict:dict):
@@ -929,6 +1089,7 @@ class Model:
:param message_dict: 送信するメッセージの辞書
:return: 送信成功したかどうか
"""
self.ensure_initialized()
if not self.websocket_server_alive or not self.websocket_server:
return False
try: