Merge branch 'run_mapping' into for_webui

This commit is contained in:
misyaguziya
2024-09-25 16:13:37 +09:00
7 changed files with 419 additions and 419 deletions

View File

@@ -103,8 +103,8 @@ class Config:
return self._MAX_MIC_THRESHOLD
@property
def MAX_SPEAKER_ENERGY_THRESHOLD(self):
return self._MAX_SPEAKER_ENERGY_THRESHOLD
def MAX_SPEAKER_THRESHOLD(self):
return self._MAX_SPEAKER_THRESHOLD
# Read Write
@property
@@ -459,7 +459,7 @@ class Config:
@SELECTED_MIC_HOST.setter
def SELECTED_MIC_HOST(self, value):
if value in [host for host in device_manager.getInputDevices().keys()]:
if value in [host for host in device_manager.getMicDevices().keys()]:
self._SELECTED_MIC_HOST = value
saveJson(self.PATH_CONFIG, inspect.currentframe().f_code.co_name, value)
@@ -470,7 +470,7 @@ class Config:
@SELECTED_MIC_DEVICE.setter
def SELECTED_MIC_DEVICE(self, value):
if value in [device["name"] for device in device_manager.getInputDevices()[self.SELECTED_MIC_HOST]]:
if value in [device["name"] for device in device_manager.getMicDevices()[self.SELECTED_MIC_HOST]]:
self._SELECTED_MIC_DEVICE = value
saveJson(self.PATH_CONFIG, inspect.currentframe().f_code.co_name, value)
@@ -580,19 +580,19 @@ class Config:
@SELECTED_SPEAKER_DEVICE.setter
def SELECTED_SPEAKER_DEVICE(self, value):
if value in [device["name"] for device in device_manager.getOutputDevices()]:
if value in [device["name"] for device in device_manager.getSpeakerDevices()]:
self._SELECTED_SPEAKER_DEVICE = value
saveJson(self.PATH_CONFIG, inspect.currentframe().f_code.co_name, value)
@property
@json_serializable('SPEAKER_ENERGY_THRESHOLD')
def SPEAKER_ENERGY_THRESHOLD(self):
return self._SPEAKER_ENERGY_THRESHOLD
@json_serializable('SPEAKER_THRESHOLD')
def SPEAKER_THRESHOLD(self):
return self._SPEAKER_THRESHOLD
@SPEAKER_ENERGY_THRESHOLD.setter
def SPEAKER_ENERGY_THRESHOLD(self, value):
@SPEAKER_THRESHOLD.setter
def SPEAKER_THRESHOLD(self, value):
if isinstance(value, int):
self._SPEAKER_ENERGY_THRESHOLD = value
self._SPEAKER_THRESHOLD = value
saveJson(self.PATH_CONFIG, inspect.currentframe().f_code.co_name, value)
@property
@@ -991,7 +991,7 @@ class Config:
}
self._MAX_MIC_THRESHOLD = 2000
self._MAX_SPEAKER_ENERGY_THRESHOLD = 4000
self._MAX_SPEAKER_THRESHOLD = 4000
# Read Write
self._ENABLE_TRANSLATION = False
@@ -1101,8 +1101,8 @@ class Config:
"height": "654",
}
self._AUTO_MIC_SELECT = True
self._SELECTED_MIC_HOST = device_manager.getDefaultInputDevice()["host"]["name"]
self._SELECTED_MIC_DEVICE = device_manager.getDefaultInputDevice()["device"]["name"]
self._SELECTED_MIC_HOST = device_manager.getDefaultMicDevice()["host"]["name"]
self._SELECTED_MIC_DEVICE = device_manager.getDefaultMicDevice()["device"]["name"]
self._MIC_THRESHOLD = 300
self._MIC_AUTOMATIC_THRESHOLD = False
self._MIC_RECORD_TIMEOUT = 3
@@ -1112,8 +1112,8 @@ class Config:
self._MIC_AVG_LOGPROB=-0.8
self._MIC_NO_SPEECH_PROB=0.6
self._AUTO_SPEAKER_SELECT = True
self._SELECTED_SPEAKER_DEVICE = device_manager.getDefaultOutputDevice()["device"]["name"]
self._SPEAKER_ENERGY_THRESHOLD = 300
self._SELECTED_SPEAKER_DEVICE = device_manager.getDefaultSpeakerDevice()["device"]["name"]
self._SPEAKER_THRESHOLD = 300
self._SPEAKER_AUTOMATIC_THRESHOLD = False
self._SPEAKER_RECORD_TIMEOUT = 3
self._SPEAKER_PHRASE_TIMEOUT = 3

View File

@@ -407,32 +407,32 @@ class Model:
command = [os_path.join(current_directory, batch_name), program_name]
Popen(command, cwd=current_directory)
def getListInputHost(self):
result = [host for host in device_manager.getInputDevices().keys()]
def getListMicHost(self):
result = [host for host in device_manager.getMicDevices().keys()]
return result
def getInputDefaultDevice(self):
result = device_manager.getInputDevices().get(config.SELECTED_MIC_HOST, [{"name": "NoDevice"}])[0]["name"]
def getMicDefaultDevice(self):
result = device_manager.getMicDevices().get(config.SELECTED_MIC_HOST, [{"name": "NoDevice"}])[0]["name"]
return result
def getListInputDevice(self):
result = [device["name"] for device in device_manager.getInputDevices().get(config.SELECTED_MIC_HOST, [{"name": "NoDevice"}])]
def getListMicDevice(self):
result = [device["name"] for device in device_manager.getMicDevices().get(config.SELECTED_MIC_HOST, [{"name": "NoDevice"}])]
return result
def getListOutputDevice(self):
result = [device["name"] for device in device_manager.getOutputDevices()]
def getListSpeakerDevice(self):
result = [device["name"] for device in device_manager.getSpeakerDevices()]
return result
def startMicTranscript(self, fnc):
if config.AUTO_MIC_SELECT is True:
default_device = device_manager.getDefaultInputDevice()
default_device = device_manager.getDefaultMicDevice()
mic_host_name = default_device["host"]["name"]
mic_device_name = default_device["device"]["name"]
else:
mic_host_name = config.SELECTED_MIC_HOST
mic_device_name = config.SELECTED_MIC_DEVICE
mic_device_list = device_manager.getInputDevices().get(mic_host_name, [{"name": "NoDevice"}])
mic_device_list = device_manager.getMicDevices().get(mic_host_name, [{"name": "NoDevice"}])
selected_mic_device = [device for device in mic_device_list if device["name"] == mic_device_name]
if len(selected_mic_device) == 0:
@@ -559,14 +559,14 @@ class Model:
self.check_mic_energy_fnc = fnc
if config.AUTO_MIC_SELECT is True:
default_device = device_manager.getDefaultInputDevice()
default_device = device_manager.getDefaultMicDevice()
mic_host_name = default_device["host"]["name"]
mic_device_name = default_device["device"]["name"]
else:
mic_host_name = config.SELECTED_MIC_HOST
mic_device_name = config.SELECTED_MIC_DEVICE
mic_device_list = device_manager.getInputDevices().get(mic_host_name, [{"name": "NoDevice"}])
mic_device_list = device_manager.getMicDevices().get(mic_host_name, [{"name": "NoDevice"}])
selected_mic_device = [device for device in mic_device_list if device["name"] == mic_device_name]
if len(selected_mic_device) == 0:
@@ -601,12 +601,12 @@ class Model:
def startSpeakerTranscript(self, fnc):
if config.AUTO_SPEAKER_SELECT is True:
default_device = device_manager.getDefaultOutputDevice()
default_device = device_manager.getDefaultSpeakerDevice()
speaker_device_name = default_device["device"]["name"]
else:
speaker_device_name = config.SELECTED_SPEAKER_DEVICE
speaker_device_list = device_manager.getOutputDevices()
speaker_device_list = device_manager.getSpeakerDevices()
selected_speaker_device = [device for device in speaker_device_list if device["name"] == speaker_device_name]
if len(selected_speaker_device) == 0:
@@ -622,7 +622,7 @@ class Model:
self.speaker_audio_recorder = SelectedSpeakerEnergyAndAudioRecorder(
device=speaker_device,
energy_threshold=config.SPEAKER_ENERGY_THRESHOLD,
energy_threshold=config.SPEAKER_THRESHOLD,
dynamic_energy_threshold=config.SPEAKER_AUTOMATIC_THRESHOLD,
record_timeout=record_timeout,
)
@@ -693,12 +693,12 @@ class Model:
self.check_speaker_energy_fnc = fnc
if config.AUTO_SPEAKER_SELECT is True:
default_device = device_manager.getDefaultOutputDevice()
default_device = device_manager.getDefaultSpeakerDevice()
speaker_device_name = default_device["device"]["name"]
else:
speaker_device_name = config.SELECTED_SPEAKER_DEVICE
speaker_device_list = device_manager.getOutputDevices()
speaker_device_list = device_manager.getSpeakerDevices()
selected_speaker_device = [device for device in speaker_device_list if device["name"] == speaker_device_name]
if len(selected_speaker_device) == 0:

View File

@@ -29,23 +29,26 @@ class DeviceManager:
return cls._instance
def init(self):
self.input_devices = {"NoHost": [{"name": "NoDevice"}]}
self.default_input_device = {"host": {"name": "NoHost"}, "device": {"name": "NoDevice"}}
self.output_devices = [{"name": "NoDevice"}]
self.default_output_device = {"device": {"name": "NoDevice"}}
self.mic_devices = {"NoHost": [{"name": "NoDevice"}]}
self.default_mic_device = {"host": {"name": "NoHost"}, "device": {"name": "NoDevice"}}
self.speaker_devices = [{"name": "NoDevice"}]
self.default_speaker_device = {"device": {"name": "NoDevice"}}
self.update()
self.callback_default_input_device = None
self.callback_default_output_device = None
self.callback_default_mic_device = None
self.callback_default_speaker_device = None
self.callback_host_list = None
self.callback_mic_device_list = None
self.callback_speaker_device_list = None
self.monitoring_flag = False
self.startMonitoring()
def update(self):
buffer_input_devices = {}
buffer_default_input_device = {"host": {"name": "NoHost"}, "device": {"name": "NoDevice"}}
buffer_output_devices = []
buffer_default_output_device = {"device": {"name": "NoDevice"}}
buffer_mic_devices = {}
buffer_default_mic_device = {"host": {"name": "NoHost"}, "device": {"name": "NoDevice"}}
buffer_speaker_devices = []
buffer_default_speaker_device = {"device": {"name": "NoDevice"}}
with PyAudio() as p:
for host_index in range(p.get_host_api_count()):
@@ -54,26 +57,26 @@ class DeviceManager:
for device_index in range(device_count):
device = p.get_device_info_by_host_api_device_index(host_index, device_index)
if device.get("maxInputChannels", 0) > 0 and not device.get("isLoopbackDevice", True):
buffer_input_devices.setdefault(host["name"], []).append(device)
if not buffer_input_devices:
buffer_input_devices = {"NoHost": [{"name": "NoDevice"}]}
buffer_mic_devices.setdefault(host["name"], []).append(device)
if not buffer_mic_devices:
buffer_mic_devices = {"NoHost": [{"name": "NoDevice"}]}
api_info = p.get_default_host_api_info()
default_input_device = api_info["defaultInputDevice"]
default_mic_device = api_info["defaultInputDevice"]
for host_index in range(p.get_host_api_count()):
host = p.get_host_api_info_by_index(host_index)
device_count = host.get('deviceCount', 0)
for device_index in range(device_count):
device = p.get_device_info_by_host_api_device_index(host_index, device_index)
if device["index"] == default_input_device:
buffer_default_input_device = {"host": host, "device": device}
if device["index"] == default_mic_device:
buffer_default_mic_device = {"host": host, "device": device}
break
else:
continue
break
output_devices = []
speaker_devices = []
wasapi_info = p.get_host_api_info_by_type(paWASAPI)
wasapi_name = wasapi_info["name"]
for host_index in range(p.get_host_api_count()):
@@ -85,34 +88,34 @@ class DeviceManager:
if not device.get("isLoopbackDevice", True):
for loopback in p.get_loopback_device_info_generator():
if device["name"] in loopback["name"]:
output_devices.append(loopback)
output_devices = [dict(t) for t in {tuple(d.items()) for d in output_devices}] or [{"name": "NoDevice"}]
buffer_output_devices = sorted(output_devices, key=lambda d: d['index'])
speaker_devices.append(loopback)
speaker_devices = [dict(t) for t in {tuple(d.items()) for d in speaker_devices}] or [{"name": "NoDevice"}]
buffer_speaker_devices = sorted(speaker_devices, key=lambda d: d['index'])
wasapi_info = p.get_host_api_info_by_type(paWASAPI)
default_output_device_index = wasapi_info["defaultOutputDevice"]
default_speaker_device_index = wasapi_info["defaultOutputDevice"]
for host_index in range(p.get_host_api_count()):
host_info = p.get_host_api_info_by_index(host_index)
device_count = host_info.get('deviceCount', 0)
for device_index in range(0, device_count):
device = p.get_device_info_by_host_api_device_index(host_index, device_index)
if device["index"] == default_output_device_index:
if device["index"] == default_speaker_device_index:
default_speakers = device
if not default_speakers.get("isLoopbackDevice", True):
for loopback in p.get_loopback_device_info_generator():
if default_speakers["name"] in loopback["name"]:
buffer_default_output_device = {"device": loopback}
buffer_default_speaker_device = {"device": loopback}
break
break
if buffer_default_output_device["device"]["name"] != "NoDevice":
if buffer_default_speaker_device["device"]["name"] != "NoDevice":
break
self.input_devices = buffer_input_devices
self.default_input_device = buffer_default_input_device
self.output_devices = buffer_output_devices
self.default_output_device = buffer_default_output_device
self.mic_devices = buffer_mic_devices
self.default_mic_device = buffer_default_mic_device
self.speaker_devices = buffer_speaker_devices
self.default_speaker_device = buffer_default_speaker_device
def monitoring(self):
comtypes.CoInitialize()
@@ -144,43 +147,67 @@ class DeviceManager:
self.monitoring_flag = False
self.th_monitoring.join()
def setCallbackDefaultInputDevice(self, callback):
self.callback_default_input_device = callback
def setCallbackDefaultMicDevice(self, callback):
self.callback_default_mic_device = callback
def clearCallbackDefaultInputDevice(self):
self.callback_default_input_device = None
def clearCallbackDefaultMicDevice(self):
self.callback_default_mic_device = None
def setCallbackDefaultOutputDevice(self, callback):
self.callback_default_output_device = callback
def setCallbackDefaultSpeakerDevice(self, callback):
self.callback_default_speaker_device = callback
def clearCallbackDefaultOutputDevice(self):
self.callback_default_output_device = None
def clearCallbackDefaultSpeakerDevice(self):
self.callback_default_speaker_device = None
def setCallbackHostList(self, callback):
self.callback_host_list = callback
def clearCallbackHostList(self):
self.callback_host_list = None
def setCallbackMicDeviceList(self, callback):
self.callback_mic_device_list = callback
def clearCallbackMicDeviceList(self):
self.callback_mic_device_list = None
def setCallbackSpeakerDeviceList(self, callback):
self.callback_speaker_device_list = callback
def clearCallbackSpeakerDeviceList(self):
self.callback_speaker_device_list = None
def noticeDefaultDevice(self):
if self.callback_default_input_device is not None:
self.callback_default_input_device(self.default_input_device["host"]["name"], self.default_input_device["device"]["name"])
if self.callback_default_output_device is not None:
self.callback_default_output_device(self.default_output_device["device"]["name"])
if self.callback_default_mic_device is not None:
self.callback_default_mic_device(self.default_mic_device["host"]["name"], self.default_mic_device["device"]["name"])
if self.callback_default_speaker_device is not None:
self.callback_default_speaker_device(self.default_speaker_device["device"]["name"])
if self.callback_host_list is not None:
self.callback_host_list()
if self.callback_mic_device_list is not None:
self.callback_mic_device_list()
if self.callback_speaker_device_list is not None:
self.callback_speaker_device_list()
def getInputDevices(self):
return self.input_devices
def getMicDevices(self):
return self.mic_devices
def getDefaultInputDevice(self):
return self.default_input_device
def getDefaultMicDevice(self):
return self.default_mic_device
def getOutputDevices(self):
return self.output_devices
def getSpeakerDevices(self):
return self.speaker_devices
def getDefaultOutputDevice(self):
return self.default_output_device
def getDefaultSpeakerDevice(self):
return self.default_speaker_device
device_manager = DeviceManager()
if __name__ == "__main__":
# print("getInputDevices()", device_manager.getInputDevices())
# print("getDefaultInputDevice()", device_manager.getDefaultInputDevice())
# print("getOutputDevices()", device_manager.getOutputDevices())
# print("getDefaultOutputDevice()", device_manager.getDefaultOutputDevice())
# print("getMicDevices()", device_manager.getMicDevices())
# print("getDefaultMicDevice()", device_manager.getDefaultMicDevice())
# print("getSpeakerDevices()", device_manager.getSpeakerDevices())
# print("getDefaultSpeakerDevice()", device_manager.getDefaultSpeakerDevice())
while True:
sleep(1)

View File

@@ -20,313 +20,290 @@ class Controller:
self.run = run
# response functions
class DownloadSoftwareProgressBar:
def __init__(self, parent) -> None:
self.parent = parent
def downloadSoftwareProgressBar(self, progress) -> None:
self.run(
200,
self.run_mapping["download_software"],
progress,
)
def set(self, progress) -> None:
self.parent.run(
def updateSoftwareProgressBar(self, progress) -> None:
self.run(
200,
self.run_mapping["update_software"],
progress,
)
def updateMicHostList(self) -> None:
self.run(
200,
self.run_mapping["mic_host_list"],
model.getListMicHost(),
)
def updateMicDeviceList(self) -> None:
self.run(
200,
self.run_mapping["mic_device_list"],
model.getListMicDevice(),
)
def updateSpeakerDeviceList(self) -> None:
self.run(
200,
self.run_mapping["speaker_device_list"],
model.getListSpeakerDevice(),
)
def updateSelectedMicDevice(self, host, device) -> None:
config.SELECTED_MIC_HOST = host
config.SELECTED_MIC_DEVICE = device
self.run(
200,
self.run_mapping["selected_mic_device"],
{"host":host, "device":device},
)
def updateSelectedSpeakerDevice(self, device) -> None:
config.SELECTED_SPEAKER_DEVICE = device
self.run(
200,
self.run_mapping["selected_speaker_device"],
device,
)
def progressBarMicEnergy(self, energy) -> None:
if energy is False:
self.run(
400,
self.run_mapping["error_device"],
{"message":"No mic device detected."},
)
else:
self.run(
200,
self.parent.run_mapping["download_software"],
progress,
self.run_mapping["check_mic_volume"],
energy,
)
class UpdateSoftwareProgressBar:
def __init__(self, parent) -> None:
self.parent = parent
def set(self, progress) -> None:
self.parent.run(
def progressBarSpeakerEnergy(self, energy) -> None:
if energy is False:
self.run(
400,
self.run_mapping["error_device"],
{"message":"No mic device detected."},
)
else:
self.run(
200,
self.parent.run_mapping["update_software"],
progress,
self.run_mapping["check_speaker_volume"],
energy,
)
class UpdateSelectedMicDevice:
def __init__(self, parent) -> None:
self.parent = parent
def downloadCTranslate2ProgressBar(self, progress) -> None:
printLog("CTranslate2 Weight Download Progress", progress)
self.run(
200,
self.run_mapping["download_ctranslate2"],
progress,
)
def set(self, host, device) -> None:
config.SELECTED_MIC_HOST = host
config.SELECTED_MIC_DEVICE = device
self.parent.run(
200,
self.parent.run_mapping["selected_mic_device"],
{"host":host, "device":device},
def downloadWhisperProgressBar(self, progress) -> None:
printLog("Whisper Weight Download Progress", progress)
self.run(
200,
self.run_mapping["download_whisper"],
progress,
)
def micMessage(self, message: Union[str, bool]) -> None:
if isinstance(message, bool) and message is False:
self.run(
400,
self.run_mapping["error_device"],
{"message":"No mic device detected."},
)
class UpdateSelectedSpeakerDevice:
def __init__(self, parent) -> None:
self.parent = parent
def set(self, device) -> None:
config.SELECTED_SPEAKER_DEVICE = device
self.parent.run(
200,
self.parent.run_mapping["selected_speaker_device"],
device,
)
class ProgressBarMicEnergy:
def __init__(self, parent) -> None:
self.parent = parent
def set(self, energy) -> None:
if energy is False:
self.parent.run(
400,
self.parent.run_mapping["error_device"],
{"message":"No mic device detected."},
)
else:
self.parent.run(
elif isinstance(message, str) and len(message) > 0:
# addSentMessageLog(message)
translation = []
transliteration = []
if model.checkKeywords(message):
self.run(
200,
self.parent.run_mapping["check_mic_volume"],
energy,
)
class ProgressBarSpeakerEnergy:
def __init__(self, parent) -> None:
self.parent = parent
def set(self, energy) -> None:
if energy is False:
self.parent.run(
400,
self.parent.run_mapping["error_device"],
{"message":"No mic device detected."},
self.run_mapping["word_filter"],
{"message":f"Detected by word filter:{message}"},
)
return
elif model.detectRepeatSendMessage(message):
return
elif config.ENABLE_TRANSLATION is False:
pass
else:
self.parent.run(
200,
self.parent.run_mapping["check_speaker_volume"],
energy,
)
class DownloadCTranslate2ProgressBar:
def __init__(self, parent) -> None:
self.parent = parent
def set(self, progress) -> None:
printLog("CTranslate2 Weight Download Progress", progress)
self.parent.run(
200,
self.parent.run_mapping["download_ctranslate2"],
progress,
)
class DownloadWhisperProgressBar:
def __init__(self, parent) -> None:
self.parent = parent
def set(self, progress) -> None:
printLog("Whisper Weight Download Progress", progress)
self.parent.run(
200,
self.parent.run_mapping["download_whisper"],
progress,
)
class MicMessage:
def __init__(self, parent) -> None:
self.parent = parent
def send(self, message: Union[str, bool]) -> None:
if isinstance(message, bool) and message is False:
self.parent.run(
400,
self.parent.run_mapping["error_device"],
{"message":"No mic device detected."},
)
elif isinstance(message, str) and len(message) > 0:
# addSentMessageLog(message)
translation = []
transliteration = []
if model.checkKeywords(message):
self.parent.run(
200,
self.parent.run_mapping["word_filter"],
{"message":f"Detected by word filter:{message}"},
translation, success = model.getInputTranslate(message)
if all(success) is not True:
self.changeToCTranslate2Process()
self.run(
400,
self.run_mapping["error_translation_engine"],
{"message":"translation engine limit error"},
)
return
elif model.detectRepeatSendMessage(message):
return
elif config.ENABLE_TRANSLATION is False:
pass
else:
translation, success = model.getInputTranslate(message)
if all(success) is not True:
self.parent.changeToCTranslate2Process()
self.parent.run(
400,
self.parent.run_mapping["error_translation_engine"],
{"message":"translation engine limit error"},
)
if config.CONVERT_MESSAGE_TO_ROMAJI is True or config.CONVERT_MESSAGE_TO_HIRAGANA is True:
if config.SELECTED_TARGET_LANGUAGES[config.SELECTED_TAB_NO]["primary"]["language"] == "Japanese":
transliteration = model.convertMessageToTransliteration(translation[0])
if config.CONVERT_MESSAGE_TO_ROMAJI is True or config.CONVERT_MESSAGE_TO_HIRAGANA is True:
if config.SELECTED_TARGET_LANGUAGES[config.SELECTED_TAB_NO]["primary"]["language"] == "Japanese":
transliteration = model.convertMessageToTransliteration(translation[0])
if config.ENABLE_TRANSCRIPTION_SEND is True:
if config.SEND_MESSAGE_TO_VRC is True:
if config.SEND_ONLY_TRANSLATED_MESSAGES is True:
if config.ENABLE_TRANSLATION is False:
osc_message = self.parent.messageFormatter("SEND", "", [message])
else:
osc_message = self.parent.messageFormatter("SEND", "", translation)
else:
osc_message = self.parent.messageFormatter("SEND", translation, [message])
model.oscSendMessage(osc_message)
self.parent.run(
200,
self.parent.run_mapping["transcription_mic"],
{
"message":message,
"translation":translation,
"transliteration":transliteration
})
if config.LOGGER_FEATURE is True:
if len(translation) > 0:
translation = " (" + "/".join(translation) + ")"
model.logger.info(f"[SENT] {message}{translation}")
# if config.OVERLAY_SMALL_LOG is True:
# overlay_image = model.createOverlayImageShort(message, translation)
# model.updateOverlay(overlay_image)
# overlay_image = model.createOverlayImageLong("send", message, translation)
# model.updateOverlay(overlay_image)
class SpeakerMessage:
def __init__(self, parent) -> None:
self.parent = parent
def receive(self, message):
if isinstance(message, bool) and message is False:
self.parent.run(
400,
self.parent.run_mapping["error_device"],
{"message":"No mic device detected."},
)
elif isinstance(message, str) and len(message) > 0:
translation = []
transliteration = []
if model.detectRepeatReceiveMessage(message):
return
elif config.ENABLE_TRANSLATION is False:
pass
else:
translation, success = model.getOutputTranslate(message)
if all(success) is not True:
self.parent.changeToCTranslate2Process()
self.parent.run(
400,
self.parent.run_mapping["error_translation_engine"],
{"message":"translation engine limit error"},
)
if config.CONVERT_MESSAGE_TO_ROMAJI is True or config.CONVERT_MESSAGE_TO_HIRAGANA is True:
if config.SELECTED_TARGET_LANGUAGES[config.SELECTED_TAB_NO]["primary"]["language"] == "Japanese":
transliteration = model.convertMessageToTransliteration(message)
if config.ENABLE_TRANSCRIPTION_RECEIVE is True:
if config.OVERLAY_SMALL_LOG is True:
if model.overlay.initialized is True:
overlay_image = model.createOverlayImageShort(message, translation)
model.updateOverlay(overlay_image)
# overlay_image = model.createOverlayImageLong("receive", message, translation)
# model.updateOverlay(overlay_image)
# ------------Speaker2Chatbox------------
if config.ENABLE_SPEAKER2CHATBOX is True:
# send OSC message
if config.SEND_RECEIVED_MESSAGE_TO_VRC is True:
osc_message = self.parent.messageFormatter("RECEIVED", translation, [message])
model.oscSendMessage(osc_message)
# ------------Speaker2Chatbox------------
# update textbox message log (Received)
self.parent.run(
200,
self.parent.run_mapping["speaker"],
{
"message":message,
"translation":translation,
"transliteration":transliteration,
})
if config.LOGGER_FEATURE is True:
if len(translation) > 0:
translation = " (" + "/".join(translation) + ")"
model.logger.info(f"[RECEIVED] {message}{translation}")
class ChatMessage:
def __init__(self, parent) -> None:
self.parent = parent
def send(self, data):
id = data["id"]
message = data["message"]
if len(message) > 0:
# addSentMessageLog(message)
translation = []
transliteration = []
if config.ENABLE_TRANSLATION is False:
pass
else:
if config.USE_EXCLUDE_WORDS is True:
replacement_message, replacement_dict = self.parent.replaceExclamationsWithRandom(message)
translation, success = model.getInputTranslate(replacement_message)
message = self.parent.removeExclamations(message)
for i in range(len(translation)):
translation[i] = self.parent.restoreText(translation[i], replacement_dict)
else:
translation, success = model.getInputTranslate(message)
if all(success) is not True:
self.parent.changeToCTranslate2Process()
self.parent.run(
400,
self.parent.run_mapping["error_translation_engine"],
{"message":"translation engine limit error"},
)
if config.CONVERT_MESSAGE_TO_ROMAJI is True or config.CONVERT_MESSAGE_TO_HIRAGANA is True:
if config.SELECTED_TARGET_LANGUAGES[config.SELECTED_TAB_NO]["primary"]["language"] == "Japanese":
transliteration = model.convertMessageToTransliteration(translation[0])
# send OSC message
if config.ENABLE_TRANSCRIPTION_SEND is True:
if config.SEND_MESSAGE_TO_VRC is True:
if config.SEND_ONLY_TRANSLATED_MESSAGES is True:
if config.ENABLE_TRANSLATION is False:
osc_message = self.parent.messageFormatter("SEND", "", [message])
osc_message = self.messageFormatter("SEND", "", [message])
else:
osc_message = self.parent.messageFormatter("SEND", "", translation)
osc_message = self.messageFormatter("SEND", "", translation)
else:
osc_message = self.parent.messageFormatter("SEND", translation, [message])
osc_message = self.messageFormatter("SEND", translation, [message])
model.oscSendMessage(osc_message)
self.run(
200,
self.run_mapping["transcription_mic"],
{
"message":message,
"translation":translation,
"transliteration":transliteration
})
if config.LOGGER_FEATURE is True:
if len(translation) > 0:
translation = " (" + "/".join(translation) + ")"
model.logger.info(f"[SENT] {message}{translation}")
# if config.OVERLAY_SMALL_LOG is True:
# overlay_image = model.createOverlayImageShort(message, translation)
# model.updateOverlay(overlay_image)
# overlay_image = model.createOverlayImageLong("send", message, translation)
# model.updateOverlay(overlay_image)
# update textbox message log (Sent)
if config.LOGGER_FEATURE is True:
if len(translation) > 0:
translation_text = " (" + "/".join(translation) + ")"
model.logger.info(f"[SENT] {message}{translation_text}")
def speakerMessage(self, message) -> None:
if isinstance(message, bool) and message is False:
self.run(
400,
self.run_mapping["error_device"],
{"message":"No mic device detected."},
)
elif isinstance(message, str) and len(message) > 0:
translation = []
transliteration = []
if model.detectRepeatReceiveMessage(message):
return
elif config.ENABLE_TRANSLATION is False:
pass
else:
translation, success = model.getOutputTranslate(message)
if all(success) is not True:
self.changeToCTranslate2Process()
self.run(
400,
self.run_mapping["error_translation_engine"],
{"message":"translation engine limit error"},
)
return {"status":200,
"result":{
"id":id,
if config.CONVERT_MESSAGE_TO_ROMAJI is True or config.CONVERT_MESSAGE_TO_HIRAGANA is True:
if config.SELECTED_TARGET_LANGUAGES[config.SELECTED_TAB_NO]["primary"]["language"] == "Japanese":
transliteration = model.convertMessageToTransliteration(message)
if config.ENABLE_TRANSCRIPTION_RECEIVE is True:
if config.OVERLAY_SMALL_LOG is True:
if model.overlay.initialized is True:
overlay_image = model.createOverlayImageShort(message, translation)
model.updateOverlay(overlay_image)
# overlay_image = model.createOverlayImageLong("receive", message, translation)
# model.updateOverlay(overlay_image)
# ------------Speaker2Chatbox------------
if config.ENABLE_SPEAKER2CHATBOX is True:
# send OSC message
if config.SEND_RECEIVED_MESSAGE_TO_VRC is True:
osc_message = self.messageFormatter("RECEIVED", translation, [message])
model.oscSendMessage(osc_message)
# ------------Speaker2Chatbox------------
# update textbox message log (Received)
self.run(
200,
self.run_mapping["speaker"],
{
"message":message,
"translation":translation,
"transliteration":transliteration,
},
}
})
if config.LOGGER_FEATURE is True:
if len(translation) > 0:
translation = " (" + "/".join(translation) + ")"
model.logger.info(f"[RECEIVED] {message}{translation}")
def chatMessage(self, data) -> None:
id = data["id"]
message = data["message"]
if len(message) > 0:
# addSentMessageLog(message)
translation = []
transliteration = []
if config.ENABLE_TRANSLATION is False:
pass
else:
if config.USE_EXCLUDE_WORDS is True:
replacement_message, replacement_dict = self.replaceExclamationsWithRandom(message)
translation, success = model.getInputTranslate(replacement_message)
message = self.removeExclamations(message)
for i in range(len(translation)):
translation[i] = self.restoreText(translation[i], replacement_dict)
else:
translation, success = model.getInputTranslate(message)
if all(success) is not True:
self.changeToCTranslate2Process()
self.run(
400,
self.run_mapping["error_translation_engine"],
{"message":"translation engine limit error"},
)
if config.CONVERT_MESSAGE_TO_ROMAJI is True or config.CONVERT_MESSAGE_TO_HIRAGANA is True:
if config.SELECTED_TARGET_LANGUAGES[config.SELECTED_TAB_NO]["primary"]["language"] == "Japanese":
transliteration = model.convertMessageToTransliteration(translation[0])
# send OSC message
if config.SEND_MESSAGE_TO_VRC is True:
if config.SEND_ONLY_TRANSLATED_MESSAGES is True:
if config.ENABLE_TRANSLATION is False:
osc_message = self.messageFormatter("SEND", "", [message])
else:
osc_message = self.messageFormatter("SEND", "", translation)
else:
osc_message = self.messageFormatter("SEND", translation, [message])
model.oscSendMessage(osc_message)
# if config.OVERLAY_SMALL_LOG is True:
# overlay_image = model.createOverlayImageShort(message, translation)
# model.updateOverlay(overlay_image)
# overlay_image = model.createOverlayImageLong("send", message, translation)
# model.updateOverlay(overlay_image)
# update textbox message log (Sent)
if config.LOGGER_FEATURE is True:
if len(translation) > 0:
translation_text = " (" + "/".join(translation) + ")"
model.logger.info(f"[SENT] {message}{translation_text}")
return {"status":200,
"result":{
"id":id,
"message":message,
"translation":translation,
"transliteration":transliteration,
},
}
@staticmethod
def getVersion(*args, **kwargs) -> dict:
@@ -361,12 +338,12 @@ class Controller:
return {"status":200, "result":config.SELECTABLE_WHISPER_WEIGHT_TYPE_DICT}
@staticmethod
def getMaxMicEnergyThreshold(*args, **kwargs) -> dict:
def getMaxMicThreshold(*args, **kwargs) -> dict:
return {"status":200, "result":config.MAX_MIC_THRESHOLD}
@staticmethod
def getMaxSpeakerEnergyThreshold(*args, **kwargs) -> dict:
return {"status":200, "result":config.MAX_SPEAKER_ENERGY_THRESHOLD}
def getMaxSpeakerThreshold(*args, **kwargs) -> dict:
return {"status":200, "result":config.MAX_SPEAKER_THRESHOLD}
@staticmethod
def setEnableTranslation(*args, **kwargs) -> dict:
@@ -434,15 +411,15 @@ class Controller:
@staticmethod
def getMicHostList(*args, **kwargs) -> dict:
return {"status":200, "result": model.getListInputHost()}
return {"status":200, "result": model.getListMicHost()}
@staticmethod
def getMicDeviceList(*args, **kwargs) -> dict:
return {"status":200, "result": model.getListInputDevice()}
return {"status":200, "result": model.getListMicDevice()}
@staticmethod
def getSpeakerDeviceList(*args, **kwargs) -> dict:
return {"status":200, "result": model.getListOutputDevice()}
return {"status":200, "result": model.getListSpeakerDevice()}
@staticmethod
def getSelectedTranslationEngines(*args, **kwargs) -> dict:
@@ -624,14 +601,13 @@ class Controller:
def setEnableAutoMicSelect(self, *args, **kwargs) -> dict:
config.AUTO_MIC_SELECT = True
update_device = self.UpdateSelectedMicDevice(self)
device_manager.setCallbackDefaultInputDevice(update_device.set)
device_manager.setCallbackDefaultMicDevice(self.updateSelectedMicDevice)
device_manager.noticeDefaultDevice()
return {"status":200, "result":config.AUTO_MIC_SELECT}
@staticmethod
def setDisableAutoMicSelect(*args, **kwargs) -> dict:
device_manager.clearCallbackDefaultInputDevice()
device_manager.clearCallbackDefaultMicDevice()
config.AUTO_MIC_SELECT = False
return {"status":200, "result":config.AUTO_MIC_SELECT}
@@ -642,7 +618,7 @@ class Controller:
@staticmethod
def setSelectedMicHost(data, *args, **kwargs) -> dict:
config.SELECTED_MIC_HOST = data
config.SELECTED_MIC_DEVICE = model.getInputDefaultDevice()
config.SELECTED_MIC_DEVICE = model.getMicDefaultDevice()
if config.ENABLE_CHECK_ENERGY_SEND is True:
model.stopCheckMicEnergy()
model.startCheckMicEnergy()
@@ -807,14 +783,13 @@ class Controller:
def setEnableAutoSpeakerSelect(self, *args, **kwargs) -> dict:
config.AUTO_SPEAKER_SELECT = True
update_device = self.UpdateSelectedSpeakerDevice(self)
device_manager.setCallbackDefaultOutputDevice(update_device.set)
device_manager.setCallbackDefaultSpeakerDevice(self.updateSelectedSpeakerDevice)
device_manager.noticeDefaultDevice()
return {"status":200, "result":config.AUTO_SPEAKER_SELECT}
@staticmethod
def setDisableAutoSpeakerSelect(*args, **kwargs) -> dict:
device_manager.clearCallbackDefaultInputDevice()
device_manager.clearCallbackDefaultSpeakerDevice()
config.AUTO_SPEAKER_SELECT = False
return {"status":200, "result":config.AUTO_SPEAKER_SELECT}
@@ -831,22 +806,22 @@ class Controller:
return {"status":200, "result":config.SELECTED_SPEAKER_DEVICE}
@staticmethod
def getSpeakerEnergyThreshold(*args, **kwargs) -> dict:
return {"status":200, "result":config.SPEAKER_ENERGY_THRESHOLD}
def getSpeakerThreshold(*args, **kwargs) -> dict:
return {"status":200, "result":config.SPEAKER_THRESHOLD}
@staticmethod
def setSpeakerEnergyThreshold(data, *args, **kwargs) -> dict:
def setSpeakerThreshold(data, *args, **kwargs) -> dict:
printLog("Set Speaker Energy Threshold", data)
try:
data = int(data)
if 0 <= data <= config.MAX_SPEAKER_ENERGY_THRESHOLD:
config.SPEAKER_ENERGY_THRESHOLD = data
if 0 <= data <= config.MAX_SPEAKER_THRESHOLD:
config.SPEAKER_THRESHOLD = data
else:
raise ValueError()
except Exception:
response = {"status":400, "result":{"message":"Error Set Speaker Energy Threshold"}}
else:
response = {"status":200, "result":config.SPEAKER_ENERGY_THRESHOLD}
response = {"status":200, "result":config.SPEAKER_THRESHOLD}
return response
@staticmethod
@@ -1285,9 +1260,8 @@ class Controller:
return {"status":200, "result":config.VRC_MIC_MUTE_SYNC}
def setEnableCheckSpeakerThreshold(self, *args, **kwargs) -> dict:
progressbar_speaker_energy = self.ProgressBarSpeakerEnergy(self)
model.startCheckSpeakerEnergy(
progressbar_speaker_energy.set,
self.progressBarSpeakerEnergy,
)
config.ENABLE_CHECK_ENERGY_RECEIVE = True
return {"status":200, "result":config.ENABLE_CHECK_ENERGY_RECEIVE}
@@ -1299,9 +1273,8 @@ class Controller:
return {"status":200, "result":config.ENABLE_CHECK_ENERGY_RECEIVE}
def setEnableCheckMicThreshold(self, *args, **kwargs) -> dict:
progressbar_mic_energy = self.ProgressBarMicEnergy(self)
model.startCheckMicEnergy(
progressbar_mic_energy.set,
self.progressBarMicEnergy,
)
config.ENABLE_CHECK_ENERGY_SEND = True
return {"status":200, "result":config.ENABLE_CHECK_ENERGY_SEND}
@@ -1314,9 +1287,7 @@ class Controller:
# def updateSoftware(*args, **kwargs) -> dict:
# printLog("Update callbackUpdateSoftware")
# download = DownloadSoftwareProgressBar(self)
# update = UpdateSoftwareProgressBar(self)
# model.updateSoftware(restart=True, download=download.set, update=update.set)
# model.updateSoftware(restart=True, download=self.downloadSoftwareProgressBar, update=self.updateSoftwareProgressBar)
# return {"status":200, "result":True}
# def restartSoftware(*args, **kwargs) -> dict:
@@ -1358,8 +1329,7 @@ class Controller:
return {"status":200, "result":config.ENABLE_TRANSCRIPTION_RECEIVE}
def sendMessageBox(self, data, *args, **kwargs) -> dict:
chat = self.ChatMessage(self)
response = chat.send(data)
response = self.chatMessage(data)
return response
@staticmethod
@@ -1395,13 +1365,11 @@ class Controller:
}
def downloadCtranslate2Weight(self, *args, **kwargs) -> dict:
download = self.DownloadCTranslate2ProgressBar(self)
self.startThreadingDownloadCtranslate2Weight(download.set)
self.startThreadingDownloadCtranslate2Weight(self.downloadCTranslate2ProgressBar)
return {"status":200}
def downloadWhisperWeight(self, *args, **kwargs) -> dict:
download = self.DownloadWhisperProgressBar(self)
self.startThreadingDownloadWhisperWeight(download.set)
self.startThreadingDownloadWhisperWeight(self.downloadWhisperProgressBar)
return {"status":200}
@staticmethod
@@ -1422,13 +1390,12 @@ class Controller:
osc_message = FORMAT.replace("[message]", "/".join(message))
return osc_message
@staticmethod
def changeToCTranslate2Process() -> None:
def changeToCTranslate2Process(self) -> None:
config.SELECTED_TRANSLATION_ENGINES[config.SELECTED_TAB_NO] = "CTranslate2"
self.run(200, self.run_mapping["translation_engines"], "CTranslate2")
def startTranscriptionSendMessage(self) -> None:
mic_message = self.MicMessage(self)
model.startMicTranscript(mic_message.send)
model.startMicTranscript(self.micMessage)
@staticmethod
def stopTranscriptionSendMessage() -> None:
@@ -1464,8 +1431,7 @@ class Controller:
th_stopTranscriptionSendMessage.start()
def startTranscriptionReceiveMessage(self) -> None:
speaker_message = self.SpeakerMessage(self)
model.startSpeakerTranscript(speaker_message.receive)
model.startSpeakerTranscript(self.speakerMessage)
@staticmethod
def stopTranscriptionReceiveMessage() -> None:
@@ -1546,6 +1512,8 @@ class Controller:
if engine not in engines:
engine = engines[0]
config.SELECTED_TRANSLATION_ENGINES[config.SELECTED_TAB_NO] = engine
self.run(200, self.run_mapping["selected_translation_engine"], engine)
self.run(200, self.run_mapping["translation_engines"], engines)
@staticmethod
def startThreadingDownloadCtranslate2Weight(callback:Callable[[float], None]) -> None:
@@ -1577,8 +1545,7 @@ class Controller:
# check Downloaded CTranslate2 Model Weight
printLog("Check Downloaded CTranslate2 Model Weight")
if config.USE_TRANSLATION_FEATURE is True and model.checkCTranslatorCTranslate2ModelWeight() is False:
download = self.DownloadCTranslate2ProgressBar(self)
self.startThreadingDownloadCtranslate2Weight(download.set)
self.startThreadingDownloadCtranslate2Weight(self.downloadCTranslate2ProgressBar)
# set Transcription Engine
printLog("Set Transcription Engine")
@@ -1590,8 +1557,7 @@ class Controller:
# check Downloaded Whisper Model Weight
printLog("Check Downloaded Whisper Model Weight")
if config.USE_WHISPER_FEATURE is True and model.checkTranscriptionWhisperModelWeight() is False:
download = self.DownloadWhisperProgressBar(self)
self.startThreadingDownloadWhisperWeight(download.set)
self.startThreadingDownloadWhisperWeight(self.downloadWhisperProgressBar)
# set word filter
printLog("Set Word Filter")
@@ -1616,11 +1582,13 @@ class Controller:
# init Auto device selection
printLog("Init Auto Device Selection")
if config.AUTO_MIC_SELECT is True:
update_mic_device = self.UpdateSelectedMicDevice(self)
device_manager.setCallbackDefaultInputDevice(update_mic_device.set)
device_manager.setCallbackDefaultMicDevice(self.updateSelectedMicDevice)
if config.AUTO_SPEAKER_SELECT is True:
update_speaker_device = self.UpdateSelectedSpeakerDevice(self)
device_manager.setCallbackDefaultOutputDevice(update_speaker_device.set)
device_manager.setCallbackDefaultSpeakerDevice(self.updateSelectedSpeakerDevice)
device_manager.setCallbackHostList(self.updateMicHostList)
device_manager.setCallbackMicDeviceList(self.updateMicDeviceList)
device_manager.setCallbackSpeakerDeviceList(self.updateSpeakerDeviceList)
printLog("End Initialization")

View File

@@ -26,8 +26,15 @@ run_mapping = {
"download_ctranslate2":"/run/download_ctranslate2_weight",
"download_whisper":"/run/download_whisper_weight",
"selected_mic_device":"/set/data/selected_mic_host",
"selected_speaker_device":"/set/data/selected_speaker_device",
"selected_mic_device":"/run/selected_mic_device",
"selected_speaker_device":"/run/selected_speaker_device",
"selected_translation_engine":"/run/selected_translation_engine",
"translation_engines":"/run/translation_engines",
"mic_host_list":"/run/mic_host_list",
"mic_device_list":"/run/mic_device_list",
"speaker_device_list":"/run/speaker_device_list",
}
controller.setRunMapping(run_mapping)
@@ -151,8 +158,8 @@ mapping = {
"/get/data/mic_device_list": {"status": True, "variable":controller.getMicDeviceList},
"/get/data/speaker_device_list": {"status": True, "variable":controller.getSpeakerDeviceList},
"/get/data/max_mic_energy_threshold": {"status": True, "variable":controller.getMaxMicEnergyThreshold},
"/get/data/max_speaker_energy_threshold": {"status": True, "variable":controller.getMaxSpeakerEnergyThreshold},
"/get/data/max_mic_threshold": {"status": True, "variable":controller.getMaxMicThreshold},
"/get/data/max_speaker_threshold": {"status": True, "variable":controller.getMaxSpeakerThreshold},
"/get/data/auto_mic_select": {"status": True, "variable":controller.getAutoMicSelect},
"/set/enable/auto_mic_select": {"status": True, "variable":controller.setEnableAutoMicSelect},
@@ -200,8 +207,8 @@ mapping = {
"/get/data/selected_speaker_device": {"status": True, "variable":controller.getSelectedSpeakerDevice},
"/set/data/selected_speaker_device": {"status": True, "variable":controller.setSelectedSpeakerDevice},
"/get/data/speaker_energy_threshold": {"status": True, "variable":controller.getSpeakerEnergyThreshold},
"/set/data/speaker_energy_threshold": {"status": True, "variable":controller.setSpeakerEnergyThreshold},
"/get/data/speaker_threshold": {"status": True, "variable":controller.getSpeakerThreshold},
"/set/data/speaker_threshold": {"status": True, "variable":controller.setSpeakerThreshold},
"/get/data/speaker_automatic_threshold": {"status": True, "variable":controller.getSpeakerAutomaticThreshold},
"/set/enable/speaker_automatic_threshold": {"status": True, "variable":controller.setEnableSpeakerAutomaticThreshold},
@@ -508,7 +515,7 @@ if __name__ == "__main__":
data = 1
case "/set/data/mic_phrase_timeout":
data = 5
case "/set/input_set_mic_max_phrases":
case "/set/data/mic_max_phrases":
data = 5
case "/set//data/mic_word_filter":
data = "test0, test1, test2"
@@ -516,7 +523,7 @@ if __name__ == "__main__":
data = "test1"
case "/set/data/selected_speaker_device":
data = "スピーカー (Realtek High Definition Audio)"
case "/set/data/speaker_energy_threshold":
case "/set/data/speaker_threshold":
data = 0.5
case "/set/data/speaker_record_timeout":
data = 5
@@ -564,8 +571,6 @@ if __name__ == "__main__":
data = 0.5
case "/set/data/mic_avg_logprob":
data = 0.5
case "/set/data/mic_max_phrases":
data = 5
case _:
data = None

View File

@@ -7,11 +7,11 @@ export const useSpeakerThreshold = () => {
const { updateEnableAutomaticSpeakerThreshold, currentEnableAutomaticSpeakerThreshold, pendingEnableAutomaticSpeakerThreshold } = useStore_EnableAutomaticSpeakerThreshold();
const getSpeakerThreshold = () => {
asyncStdoutToPython("/get/data/speaker_energy_threshold");
asyncStdoutToPython("/get/data/speaker_threshold");
};
const setSpeakerThreshold = (speaker_threshold) => {
asyncStdoutToPython("/set/data/speaker_energy_threshold", speaker_threshold);
asyncStdoutToPython("/set/data/speaker_threshold", speaker_threshold);
};
const getEnableAutomaticSpeakerThreshold = () => {

View File

@@ -155,8 +155,8 @@ export const useReceiveRoutes = () => {
"/get/data/mic_threshold": updateMicThreshold,
"/set/data/mic_threshold": updateMicThreshold,
"/get/data/speaker_energy_threshold": updateSpeakerThreshold,
"/set/data/speaker_energy_threshold": updateSpeakerThreshold,
"/get/data/speaker_threshold": updateSpeakerThreshold,
"/set/data/speaker_threshold": updateSpeakerThreshold,
"/get/data/mic_automatic_threshold": updateEnableAutomaticMicThreshold,
"/set/enable/mic_automatic_threshold": updateEnableAutomaticMicThreshold,