From d21e0a09fc59f01fd19f53cd82f59fa11bc36693 Mon Sep 17 00:00:00 2001 From: misyaguziya Date: Mon, 16 Sep 2024 23:05:30 +0900 Subject: [PATCH] =?UTF-8?q?=F0=9F=91=8D=EF=B8=8F[Update]=20Model:=20Device?= =?UTF-8?q?=E3=81=AE=E3=82=A2=E3=82=AF=E3=82=BB=E3=82=B9=E5=B9=B2=E6=B8=89?= =?UTF-8?q?=E3=81=AE=E5=9B=9E=E9=81=BF=E5=87=A6=E7=90=86=E3=82=92=E3=81=99?= =?UTF-8?q?=E3=82=8B=E3=81=9F=E3=82=81=E3=81=ABDevice=E3=81=AE=E6=A4=9C?= =?UTF-8?q?=E5=87=BA=E3=82=92=E5=B8=B8=E3=81=AB=E8=A1=8C=E3=81=86=E3=82=88?= =?UTF-8?q?=E3=81=86=E3=81=AB=E5=A4=89=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src-python/config.py | 14 +- src-python/model.py | 67 ++----- .../transcription/transcription_utils.py | 175 ++++++++++++------ 3 files changed, 138 insertions(+), 118 deletions(-) diff --git a/src-python/config.py b/src-python/config.py index d781aacb..18d0e432 100644 --- a/src-python/config.py +++ b/src-python/config.py @@ -5,7 +5,7 @@ from json import load as json_load from json import dump as json_dump import tkinter as tk from tkinter import font -from models.transcription.transcription_utils import getInputDevices, getDefaultInputDevice, getOutputDevices, getDefaultOutputDevice +from models.transcription.transcription_utils import device_manager from models.transcription.transcription_languages import transcription_lang from utils import generatePercentageStringsList, isUniqueStrings @@ -470,7 +470,7 @@ class Config: @CHOICE_MIC_HOST.setter def CHOICE_MIC_HOST(self, value): - if value in [host for host in getInputDevices().keys()]: + if value in [host for host in device_manager.getInputDevices().keys()]: self._CHOICE_MIC_HOST = value saveJson(self.PATH_CONFIG, inspect.currentframe().f_code.co_name, value) @@ -481,7 +481,7 @@ class Config: @CHOICE_MIC_DEVICE.setter def CHOICE_MIC_DEVICE(self, value): - if value in [device["name"] for device in getInputDevices()[self.CHOICE_MIC_HOST]]: + if value in [device["name"] for device in device_manager.getInputDevices()[self.CHOICE_MIC_HOST]]: self._CHOICE_MIC_DEVICE = value saveJson(self.PATH_CONFIG, inspect.currentframe().f_code.co_name, value) @@ -591,7 +591,7 @@ class Config: @CHOICE_SPEAKER_DEVICE.setter def CHOICE_SPEAKER_DEVICE(self, value): - if value in [device["name"] for device in getOutputDevices()]: + if value in [device["name"] for device in device_manager.getOutputDevices()]: self._CHOICE_SPEAKER_DEVICE = value saveJson(self.PATH_CONFIG, inspect.currentframe().f_code.co_name, value) @@ -1117,8 +1117,8 @@ class Config: "height": "654", } self._ENABLE_MIC_AUTOMATIC_SELECTION = True - self._CHOICE_MIC_HOST = getDefaultInputDevice()["host"]["name"] - self._CHOICE_MIC_DEVICE = getDefaultInputDevice()["device"]["name"] + self._CHOICE_MIC_HOST = device_manager.getDefaultInputDevice()["host"]["name"] + self._CHOICE_MIC_DEVICE = device_manager.getDefaultInputDevice()["device"]["name"] self._INPUT_MIC_ENERGY_THRESHOLD = 300 self._INPUT_MIC_DYNAMIC_ENERGY_THRESHOLD = False self._INPUT_MIC_RECORD_TIMEOUT = 3 @@ -1128,7 +1128,7 @@ class Config: self._INPUT_MIC_AVG_LOGPROB=-0.8 self._INPUT_MIC_NO_SPEECH_PROB=0.6 self._ENABLE_SPEAKER_AUTOMATIC_SELECTION = True - self._CHOICE_SPEAKER_DEVICE = getDefaultOutputDevice()["device"]["name"] + self._CHOICE_SPEAKER_DEVICE = device_manager.getDefaultOutputDevice()["device"]["name"] self._INPUT_SPEAKER_ENERGY_THRESHOLD = 300 self._INPUT_SPEAKER_DYNAMIC_ENERGY_THRESHOLD = False self._INPUT_SPEAKER_RECORD_TIMEOUT = 3 diff --git a/src-python/model.py b/src-python/model.py index 6c7b7acf..02aeddc8 100644 --- a/src-python/model.py +++ b/src-python/model.py @@ -17,7 +17,7 @@ from typing import Callable from flashtext import KeywordProcessor from pykakasi import kakasi from models.translation.translation_translator import Translator -from models.transcription.transcription_utils import getInputDevices, getOutputDevices, getDefaultInputDevice, getDefaultOutputDevice +from models.transcription.transcription_utils import device_manager from models.osc.osc_tools import sendTyping, sendMessage, receiveOscParameters, getOSCParameterValue from models.transcription.transcription_recorder import SelectedMicEnergyAndAudioRecorder, SelectedSpeakerEnergyAndAudioRecorder from models.transcription.transcription_recorder import SelectedMicEnergyRecorder, SelectedSpeakerEnergyRecorder @@ -69,7 +69,6 @@ class Model: def init(self): self.logger = None - self.device_access = False self.th_check_device = None self.mic_print_transcript = None self.mic_audio_recorder = None @@ -408,53 +407,33 @@ class Model: command = [os_path.join(current_directory, batch_name), program_name] Popen(command, cwd=current_directory) - def startAccessDevice(self): - while self.device_access is True: - sleep(0.1) - self.device_access = True - - def stopAccessDevice(self): - self.device_access = False - def getListInputHost(self): - self.startAccessDevice() - result = [host for host in getInputDevices().keys()] - self.stopAccessDevice() + result = [host for host in device_manager.getInputDevices().keys()] return result def getInputDefaultDevice(self): - self.startAccessDevice() - result = getInputDevices().get(config.CHOICE_MIC_HOST, [{"name": "NoDevice"}])[0]["name"] - self.stopAccessDevice() + result = device_manager.getInputDevices().get(config.CHOICE_MIC_HOST, [{"name": "NoDevice"}])[0]["name"] return result def getListInputDevice(self): - self.startAccessDevice() - result = [device["name"] for device in getInputDevices().get(config.CHOICE_MIC_HOST, [{"name": "NoDevice"}])] - self.stopAccessDevice() + result = [device["name"] for device in device_manager.getInputDevices().get(config.CHOICE_MIC_HOST, [{"name": "NoDevice"}])] return result def getListOutputDevice(self): - self.startAccessDevice() - result = [device["name"] for device in getOutputDevices()] - self.stopAccessDevice() + result = [device["name"] for device in device_manager.getOutputDevices()] return result def startAutomaticDeviceSelection(self, fnc_mic, fnc_speaker): def checkDevice(fnc_mic, fnc_speaker): if config.ENABLE_MIC_AUTOMATIC_SELECTION is True: - self.startAccessDevice() - default_device = getDefaultInputDevice() - self.stopAccessDevice() + default_device = device_manager.getDefaultInputDevice() mic_host_name = default_device["host"]["name"] mic_device_name = default_device["device"]["name"] if mic_host_name != config.CHOICE_MIC_HOST or mic_device_name != config.CHOICE_MIC_DEVICE: fnc_mic(mic_host_name, mic_device_name) if config.ENABLE_SPEAKER_AUTOMATIC_SELECTION is True: - self.startAccessDevice() - default_device = getDefaultOutputDevice() - self.stopAccessDevice() + default_device = device_manager.getDefaultOutputDevice() speaker_device_name = default_device["device"]["name"] if speaker_device_name != config.CHOICE_SPEAKER_DEVICE: fnc_speaker(speaker_device_name) @@ -473,18 +452,14 @@ class Model: def startMicTranscript(self, fnc): if config.ENABLE_MIC_AUTOMATIC_SELECTION is True: - self.startAccessDevice() - default_device = getDefaultInputDevice() - self.stopAccessDevice() + default_device = device_manager.getDefaultInputDevice() mic_host_name = default_device["host"]["name"] mic_device_name = default_device["device"]["name"] else: mic_host_name = config.CHOICE_MIC_HOST mic_device_name = config.CHOICE_MIC_DEVICE - self.startAccessDevice() - mic_device_list = getInputDevices().get(mic_host_name, [{"name": "NoDevice"}]) - self.stopAccessDevice() + mic_device_list = device_manager.getInputDevices().get(mic_host_name, [{"name": "NoDevice"}]) choice_mic_device = [device for device in mic_device_list if device["name"] == mic_device_name] if len(choice_mic_device) == 0: @@ -611,18 +586,14 @@ class Model: self.check_mic_energy_fnc = fnc if config.ENABLE_MIC_AUTOMATIC_SELECTION is True: - self.startAccessDevice() - default_device = getDefaultInputDevice() - self.stopAccessDevice() + default_device = device_manager.getDefaultInputDevice() mic_host_name = default_device["host"]["name"] mic_device_name = default_device["device"]["name"] else: mic_host_name = config.CHOICE_MIC_HOST mic_device_name = config.CHOICE_MIC_DEVICE - self.startAccessDevice() - mic_device_list = getInputDevices().get(mic_host_name, [{"name": "NoDevice"}]) - self.stopAccessDevice() + mic_device_list = device_manager.getInputDevices().get(mic_host_name, [{"name": "NoDevice"}]) choice_mic_device = [device for device in mic_device_list if device["name"] == mic_device_name] if len(choice_mic_device) == 0: @@ -657,16 +628,12 @@ class Model: def startSpeakerTranscript(self, fnc): if config.ENABLE_SPEAKER_AUTOMATIC_SELECTION is True: - self.startAccessDevice() - default_device = getDefaultOutputDevice() - self.stopAccessDevice() + default_device = device_manager.getDefaultOutputDevice() speaker_device_name = default_device["device"]["name"] else: speaker_device_name = config.CHOICE_SPEAKER_DEVICE - self.startAccessDevice() - speaker_device_list = getOutputDevices() - self.stopAccessDevice() + speaker_device_list = device_manager.getOutputDevices() choice_speaker_device = [device for device in speaker_device_list if device["name"] == speaker_device_name] if len(choice_speaker_device) == 0: @@ -753,16 +720,12 @@ class Model: self.check_speaker_energy_fnc = fnc if config.ENABLE_SPEAKER_AUTOMATIC_SELECTION is True: - self.startAccessDevice() - default_device = getDefaultOutputDevice() - self.stopAccessDevice() + default_device = device_manager.getDefaultOutputDevice() speaker_device_name = default_device["device"]["name"] else: speaker_device_name = config.CHOICE_SPEAKER_DEVICE - self.startAccessDevice() - speaker_device_list = getOutputDevices() - self.stopAccessDevice() + speaker_device_list = device_manager.getOutputDevices() choice_speaker_device = [device for device in speaker_device_list if device["name"] == speaker_device_name] if len(choice_speaker_device) == 0: diff --git a/src-python/models/transcription/transcription_utils.py b/src-python/models/transcription/transcription_utils.py index 4292ba23..2281fa22 100644 --- a/src-python/models/transcription/transcription_utils.py +++ b/src-python/models/transcription/transcription_utils.py @@ -1,70 +1,127 @@ +from time import sleep +from threading import Thread from pyaudiowpatch import PyAudio, paWASAPI -def getInputDevices(): - devices = {} - with PyAudio() as p: - for host_index in range(0, p.get_host_api_count()): - host = p.get_host_api_info_by_index(host_index) - for device_index in range(0, p.get_host_api_info_by_index(host_index)['deviceCount']): - device = p.get_device_info_by_host_api_device_index(host_index, device_index) - if device["maxInputChannels"] > 0 and device["isLoopbackDevice"] is False: - if host["name"] in devices.keys(): - devices[host["name"]].append(device) - else: - devices[host["name"]] = [device] - if len(devices) == 0: - devices = {"NoHost": [{"name": "NoDevice"}]} - return devices +class DeviceManager: + _instance = None -def getDefaultInputDevice(): - with PyAudio() as p: - api_info = p.get_default_host_api_info() - defaultInputDevice = api_info["defaultInputDevice"] + def __new__(cls): + if cls._instance is None: + cls._instance = super(DeviceManager, cls).__new__(cls) + cls._instance.init() + return cls._instance - for host_index in range(0, p.get_host_api_count()): - host = p.get_host_api_info_by_index(host_index) - for device_index in range(0, p.get_host_api_info_by_index(host_index)['deviceCount']): - device = p.get_device_info_by_host_api_device_index(host_index, device_index) - if device["index"] == defaultInputDevice: - return {"host": host, "device": device} - return {"host": {"name": "NoHost"}, "device": {"name": "NoDevice"}} + def init(self): + self.input_devices = {"NoHost": [{"name": "NoDevice"}]} + self.default_input_device = {"host": {"name": "NoHost"}, "device": {"name": "NoDevice"}} + self.output_devices = [{"name": "NoDevice"}] + self.default_output_device = {"device": {"name": "NoDevice"}} + self.update() -def getOutputDevices(): - devices = [] - with PyAudio() as p: - wasapi_info = p.get_host_api_info_by_type(paWASAPI) - for host_index in range(0, p.get_host_api_count()): - host = p.get_host_api_info_by_index(host_index) - if host["name"] == wasapi_info["name"]: - for device_index in range(0, p.get_host_api_info_by_index(host_index)['deviceCount']): + self.monitoring_flag = True + self.th_monitoring = Thread(target=self.startMonitoring) + self.th_monitoring.daemon = True + self.th_monitoring.start() + + def update(self): + buffer_input_devices = {} + buffer_default_input_device = {"host": {"name": "NoHost"}, "device": {"name": "NoDevice"}} + buffer_output_devices = [] + buffer_default_output_device = {"device": {"name": "NoDevice"}} + + with PyAudio() as p: + for host_index in range(p.get_host_api_count()): + host = p.get_host_api_info_by_index(host_index) + device_count = host.get('deviceCount', 0) + for device_index in range(device_count): device = p.get_device_info_by_host_api_device_index(host_index, device_index) - if not device["isLoopbackDevice"]: - for loopback in p.get_loopback_device_info_generator(): - if device["name"] in loopback["name"]: - devices.append(loopback) + if device.get("maxInputChannels", 0) > 0 and not device.get("isLoopbackDevice", True): + buffer_input_devices.setdefault(host["name"], []).append(device) + if not buffer_input_devices: + buffer_input_devices = {"NoHost": [{"name": "NoDevice"}]} - if len(devices) == 0: - devices = [{"name": "NoDevice"}] - else: - devices = [dict(t) for t in {tuple(d.items()) for d in devices}] - return devices + api_info = p.get_default_host_api_info() + default_input_device = api_info["defaultInputDevice"] -def getDefaultOutputDevice(): - with PyAudio() as p: - wasapi_info = p.get_host_api_info_by_type(paWASAPI) - defaultOutputDevice = wasapi_info["defaultOutputDevice"] + for host_index in range(p.get_host_api_count()): + host = p.get_host_api_info_by_index(host_index) + device_count = host.get('deviceCount', 0) + for device_index in range(device_count): + device = p.get_device_info_by_host_api_device_index(host_index, device_index) + if device["index"] == default_input_device: + buffer_default_input_device = {"host": host, "device": device} + break + else: + continue + break - for host_index in range(0, p.get_host_api_count()): - for device_index in range(0, p. get_host_api_info_by_index(host_index)['deviceCount']): - device = p.get_device_info_by_host_api_device_index(host_index, device_index) - if device["index"] == defaultOutputDevice: - default_speakers = device - if not default_speakers["isLoopbackDevice"]: - for loopback in p.get_loopback_device_info_generator(): - if default_speakers["name"] in loopback["name"]: - return {"device": loopback} - return {"device": {"name": "NoDevice"}} + output_devices = [] + wasapi_info = p.get_host_api_info_by_type(paWASAPI) + wasapi_name = wasapi_info["name"] + for host_index in range(p.get_host_api_count()): + host = p.get_host_api_info_by_index(host_index) + if host["name"] == wasapi_name: + device_count = host.get('deviceCount', 0) + for device_index in range(device_count): + device = p.get_device_info_by_host_api_device_index(host_index, device_index) + if not device.get("isLoopbackDevice", True): + for loopback in p.get_loopback_device_info_generator(): + if device["name"] in loopback["name"]: + output_devices.append(loopback) + output_devices = [dict(t) for t in {tuple(d.items()) for d in output_devices}] or [{"name": "NoDevice"}] + buffer_output_devices = sorted(output_devices, key=lambda d: d['index']) + + wasapi_info = p.get_host_api_info_by_type(paWASAPI) + default_output_device_index = wasapi_info["defaultOutputDevice"] + + for host_index in range(p.get_host_api_count()): + host_info = p.get_host_api_info_by_index(host_index) + device_count = host_info.get('deviceCount', 0) + for device_index in range(0, device_count): + device = p.get_device_info_by_host_api_device_index(host_index, device_index) + if device["index"] == default_output_device_index: + default_speakers = device + if not default_speakers.get("isLoopbackDevice", True): + for loopback in p.get_loopback_device_info_generator(): + if default_speakers["name"] in loopback["name"]: + buffer_default_output_device = {"device": loopback} + break + break + + if buffer_default_output_device["device"]["name"] != "NoDevice": + break + + self.input_devices = buffer_input_devices + self.default_input_device = buffer_default_input_device + self.output_devices = buffer_output_devices + self.default_output_device = buffer_default_output_device + + def startMonitoring(self): + self.monitoring_flag = True + while self.monitoring_flag is True: + self.update() + sleep(1) + + def stopMonitoring(self): + self.monitoring_flag = False + self.th_monitoring.join() + + def getInputDevices(self): + return self.input_devices + + def getDefaultInputDevice(self): + return self.default_input_device + + def getOutputDevices(self): + return self.output_devices + + def getDefaultOutputDevice(self): + return self.default_output_device + +device_manager = DeviceManager() if __name__ == "__main__": - print("getOutputDevices()", getOutputDevices()) - print("getDefaultOutputDevice()", getDefaultOutputDevice()) \ No newline at end of file + print("getInputDevices()", device_manager.getInputDevices()) + print("getDefaultInputDevice()", device_manager.getDefaultInputDevice()) + print("getOutputDevices()", device_manager.getOutputDevices()) + print("getDefaultOutputDevice()", device_manager.getDefaultOutputDevice()) \ No newline at end of file