From fac33e06ba5562306fbd569dfcaec92b6c8baad9 Mon Sep 17 00:00:00 2001 From: misyaguziya <53165965+misyaguziya@users.noreply.github.com> Date: Thu, 15 May 2025 11:46:13 +0900 Subject: [PATCH 1/5] [Add] websocket server --- src-python/models/websocket/__init__.py | 4 + src-python/models/websocket/model_part.py | 103 ++++++++++++++++++ .../models/websocket/websocket_server.py | 102 +++++++++++++++++ 3 files changed, 209 insertions(+) create mode 100644 src-python/models/websocket/__init__.py create mode 100644 src-python/models/websocket/model_part.py create mode 100644 src-python/models/websocket/websocket_server.py diff --git a/src-python/models/websocket/__init__.py b/src-python/models/websocket/__init__.py new file mode 100644 index 00000000..a01d3ba6 --- /dev/null +++ b/src-python/models/websocket/__init__.py @@ -0,0 +1,4 @@ +# WebSocketサーバーモジュール +from .websocket_server import WebSocketServer + +__all__ = ["WebSocketServer"] diff --git a/src-python/models/websocket/model_part.py b/src-python/models/websocket/model_part.py new file mode 100644 index 00000000..47758bf0 --- /dev/null +++ b/src-python/models/websocket/model_part.py @@ -0,0 +1,103 @@ +import copy +import gc +from subprocess import Popen +from os import makedirs as os_makedirs +from os import path as os_path +from datetime import datetime +from time import sleep +from queue import Queue +from threading import Thread +from requests import get as requests_get +from typing import Callable +from packaging.version import parse + +from flashtext import KeywordProcessor +from pykakasi import kakasi + +from device_manager import device_manager +from config import config + +from models.translation.translation_translator import Translator +from models.osc.osc import OSCHandler +from models.transcription.transcription_recorder import SelectedMicEnergyAndAudioRecorder, SelectedSpeakerEnergyAndAudioRecorder +from models.transcription.transcription_recorder import SelectedMicEnergyRecorder, SelectedSpeakerEnergyRecorder +from models.transcription.transcription_transcriber import AudioTranscriber +from models.translation.translation_languages import translation_lang +from models.transcription.transcription_languages import transcription_lang +from models.translation.translation_utils import checkCTranslate2Weight, downloadCTranslate2Weight, downloadCTranslate2Tokenizer +from models.transcription.transcription_whisper import checkWhisperWeight, downloadWhisperWeight +from models.overlay.overlay import Overlay +from models.overlay.overlay_image import OverlayImage +from models.watchdog.watchdog import Watchdog +from models.websocket.websocket_server import WebSocketServer +from utils import errorLogging, setupLogger + +class threadFnc(Thread): + def __init__(self, fnc, end_fnc=None, daemon=True, *args, **kwargs): + super(threadFnc, self).__init__(daemon=daemon, target=fnc, *args, **kwargs) + self.fnc = fnc + self.end_fnc = end_fnc + self.loop = True + self._pause = False + + def stop(self): + self.loop = False + + def pause(self): + self._pause = True + + def resume(self): + self._pause = False + + def run(self): + while self.loop: + self.fnc(*self._args, **self._kwargs) + while self._pause: + sleep(0.1) + + if callable(self.end_fnc): + self.end_fnc() + return + +class Model: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(Model, cls).__new__(cls) + cls._instance.init() + return cls._instance + + def init(self): + self.logger = None + self.th_check_device = None + self.mic_print_transcript = None + self.mic_audio_recorder = None + self.mic_transcriber = None + self.mic_energy_recorder = None + self.mic_energy_plot_progressbar = None + self.speaker_print_transcript = None + self.speaker_audio_recorder = None + self.speaker_transcriber = None + self.speaker_energy_recorder = None + self.speaker_energy_plot_progressbar = None + + self.previous_send_message = "" + self.previous_receive_message = "" + self.translator = Translator() + self.keyword_processor = KeywordProcessor() + overlay_small_log_settings = copy.deepcopy(config.OVERLAY_SMALL_LOG_SETTINGS) + overlay_large_log_settings = copy.deepcopy(config.OVERLAY_LARGE_LOG_SETTINGS) + overlay_large_log_settings["ui_scaling"] = overlay_large_log_settings["ui_scaling"] * 0.25 + overlay_settings = { + "small": overlay_small_log_settings, + "large": overlay_large_log_settings, + } + self.overlay = Overlay(overlay_settings) + self.overlay_image = OverlayImage() + self.mic_audio_queue = None + self.mic_mute_status = None + self.kks = kakasi() + self.watchdog = Watchdog(config.WATCHDOG_TIMEOUT, config.WATCHDOG_INTERVAL) + self.osc_handler = OSCHandler(config.OSC_IP_ADDRESS, config.OSC_PORT) + self.websocket_server = WebSocketServer(host="0.0.0.0", port=8765) diff --git a/src-python/models/websocket/websocket_server.py b/src-python/models/websocket/websocket_server.py new file mode 100644 index 00000000..73ee3787 --- /dev/null +++ b/src-python/models/websocket/websocket_server.py @@ -0,0 +1,102 @@ +import asyncio +import json +import logging +from typing import Dict, Set, Optional +import websockets +from websockets.server import WebSocketServerProtocol + +class WebSocketServer: + def __init__(self, host: str = "0.0.0.0", port: int = 8765): + self.host = host + self.port = port + self.clients: Set[WebSocketServerProtocol] = set() + self.server = None + self.is_running = False + self.logger = logging.getLogger('websocket_server') + + async def register(self, websocket: WebSocketServerProtocol): + """クライアント接続を登録する""" + self.clients.add(websocket) + self.logger.info(f"クライアント接続: {websocket.remote_address}, 現在の接続数: {len(self.clients)}") + + async def unregister(self, websocket: WebSocketServerProtocol): + """クライアント接続を解除する""" + self.clients.remove(websocket) + self.logger.info(f"クライアント切断: {websocket.remote_address}, 現在の接続数: {len(self.clients)}") + + async def handler(self, websocket: WebSocketServerProtocol, path: str): + """WebSocket接続ハンドラー""" + await self.register(websocket) + try: + async for message in websocket: + # クライアントからのメッセージを処理(必要に応じて) + # 現在はクライアントからのメッセージは特に処理していません + self.logger.debug(f"クライアントからのメッセージ: {message}") + except websockets.exceptions.ConnectionClosed: + pass + finally: + await self.unregister(websocket) + + async def broadcast(self, message: Dict): + """全クライアントにメッセージをブロードキャストする""" + if not self.clients: + return + + json_message = json.dumps(message) + tasks = [client.send(json_message) for client in self.clients] + await asyncio.gather(*tasks, return_exceptions=True) + + def send_transcription(self, transcription_data: Dict): + """文字起こし結果を送信する関数(通常のコードから呼び出し可能)""" + if not self.is_running or not self.clients: + return + + # asyncioのイベントループがあれば利用し、なければ新たに作成 + loop = asyncio.get_event_loop() if asyncio.get_event_loop().is_running() else asyncio.new_event_loop() + + # ブロードキャスト関数を実行 + asyncio.run_coroutine_threadsafe( + self.broadcast(transcription_data), + loop + ) + + async def start_server(self): + """WebSocketサーバーを起動する""" + if not self.is_running: + self.server = await websockets.serve(self.handler, self.host, self.port) + self.is_running = True + self.logger.info(f"WebSocketサーバーを起動しました - {self.host}:{self.port}") + return True + return False + + def start(self): + """非同期ループでWebSocketサーバーを起動する(通常のコードから呼び出し可能)""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.start_server()) + # バックグラウンドでループを実行 + self._task = asyncio.run_coroutine_threadsafe(self._run_forever(), loop) + + async def _run_forever(self): + """サーバーを永続的に実行する""" + while self.is_running: + await asyncio.sleep(0.1) + + async def stop_server(self): + """WebSocketサーバーを停止する""" + if self.is_running and self.server: + self.server.close() + await self.server.wait_closed() + self.is_running = False + self.clients.clear() + self.logger.info("WebSocketサーバーを停止しました") + return True + return False + + def stop(self): + """WebSocketサーバーを停止する(通常のコードから呼び出し可能)""" + if not self.is_running: + return + + loop = asyncio.get_event_loop() + asyncio.run_coroutine_threadsafe(self.stop_server(), loop) From d940097e44991a05822d271098de66f22cec4a98 Mon Sep 17 00:00:00 2001 From: misyaguziya <53165965+misyaguziya@users.noreply.github.com> Date: Sat, 17 May 2025 09:30:17 +0900 Subject: [PATCH 2/5] =?UTF-8?q?=F0=9F=91=8D=EF=B8=8F[Update]=20Model=20:?= =?UTF-8?q?=20WebSocket=20module?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- requirements.txt | 1 + requirements_cuda.txt | 1 + src-python/config.py | 34 ++++++ src-python/controller.py | 112 ++++++++++++------ src-python/mainloop.py | 9 ++ src-python/model.py | 45 +++++++ src-python/models/websocket/model_part.py | 103 ---------------- .../models/websocket/websocket_server.py | 31 +++-- 8 files changed, 188 insertions(+), 148 deletions(-) delete mode 100644 src-python/models/websocket/model_part.py diff --git a/requirements.txt b/requirements.txt index b384b8a0..d4115f63 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,6 +15,7 @@ pydub==0.25.1 psutil==5.9.8 pykakasi==2.3.0 pycaw==20240210 +websockets==15.0.1 translators @ git+https://github.com/misyaguziya/translators@5.9.2.1 SpeechRecognition @ git+https://github.com/misyaguziya/custom_speech_recognition@3.10.4.1 tinyoscquery @ git+https://github.com/cyberkitsune/tinyoscquery@0.1.3 \ No newline at end of file diff --git a/requirements_cuda.txt b/requirements_cuda.txt index 8642db8b..0f259323 100644 --- a/requirements_cuda.txt +++ b/requirements_cuda.txt @@ -16,6 +16,7 @@ pydub==0.25.1 psutil==5.9.8 pykakasi==2.3.0 pycaw==20240210 +websockets==15.0.1 translators @ git+https://github.com/misyaguziya/translators@5.9.2.1 SpeechRecognition @ git+https://github.com/misyaguziya/custom_speech_recognition@3.10.4.1 tinyoscquery @ git+https://github.com/cyberkitsune/tinyoscquery@0.1.3 \ No newline at end of file diff --git a/src-python/config.py b/src-python/config.py index 175ffa0f..7910400b 100644 --- a/src-python/config.py +++ b/src-python/config.py @@ -954,6 +954,37 @@ class Config: self._NOTIFICATION_VRC_SFX = value self.saveConfig(inspect.currentframe().f_code.co_name, value) + @property + def WEBSOCKET_SERVER(self): + return self._WEBSOCKET_SERVER + + @WEBSOCKET_SERVER.setter + def WEBSOCKET_SERVER(self, value): + if isinstance(value, bool): + self._WEBSOCKET_SERVER = value + self.saveConfig(inspect.currentframe().f_code.co_name, value) + + @property + def WEBSOCKET_HOST(self): + return self._WEBSOCKET_HOST + + @WEBSOCKET_HOST.setter + def WEBSOCKET_HOST(self, value): + if isinstance(value, str): + self._WEBSOCKET_HOST = value + self.saveConfig(inspect.currentframe().f_code.co_name, value) + + @property + def WEBSOCKET_PORT(self): + return self._WEBSOCKET_PORT + + @WEBSOCKET_PORT.setter + def WEBSOCKET_PORT(self, value): + if isinstance(value, int): + self._WEBSOCKET_PORT = value + self.saveConfig(inspect.currentframe().f_code.co_name, value) + + def init_config(self): # Read Only self._VERSION = "3.1.1" @@ -1139,6 +1170,9 @@ class Config: self._LOGGER_FEATURE = False self._VRC_MIC_MUTE_SYNC = False self._NOTIFICATION_VRC_SFX = True + self._WEBSOCKET_SERVER = True + self._WEBSOCKET_HOST = "0.0.0.0" + self._WEBSOCKET_PORT = 8765 def load_config(self): if os_path.isfile(self.PATH_CONFIG) is not False: diff --git a/src-python/controller.py b/src-python/controller.py index cdc1b037..398e3e37 100644 --- a/src-python/controller.py +++ b/src-python/controller.py @@ -294,6 +294,17 @@ class Controller: "translation":translation, "transliteration":transliteration }) + + if config.WEBSOCKET_SERVER is True: + model.websocketSendMessage( + { + "type":"SEND", + "message":message, + "translation":translation, + "transliteration":transliteration + } + ) + if config.LOGGER_FEATURE is True: if len(translation) > 0: translation = " (" + "/".join(translation) + ")" @@ -377,6 +388,17 @@ class Controller: "translation":translation, "transliteration":transliteration, }) + + if config.WEBSOCKET_SERVER is True: + model.websocketSendMessage( + { + "type":"RECEIVED", + "message":message, + "translation":translation, + "transliteration":transliteration + } + ) + if config.LOGGER_FEATURE is True: if len(translation) > 0: translation = " (" + "/".join(translation) + ")" @@ -434,6 +456,16 @@ class Controller: overlay_image = model.createOverlayImageLargeLog("send", message, translation[0] if len(translation) > 0 else "") model.updateOverlayLargeLog(overlay_image) + if config.WEBSOCKET_SERVER is True: + model.websocketSendMessage( + { + "type":"CHAT", + "message":message, + "translation":translation, + "transliteration":transliteration + } + ) + # update textbox message log (Sent) if config.LOGGER_FEATURE is True: if len(translation) > 0: @@ -1778,6 +1810,46 @@ class Controller: model.stopWatchdog() return {"status":200, "result":True} + @staticmethod + def getWebSocketHost(*args, **kwargs) -> dict: + return {"status":200, "result":config.WEBSOCKET_HOST} + + @staticmethod + def setWebSocketHost(data, *args, **kwargs) -> dict: + config.WEBSOCKET_HOST = data + if model.checkWebSocketServer() is True: + model.stopWebSocketServer() + model.startWebSocketServer() + return {"status":200, "result":config.WEBSOCKET_HOST} + + @staticmethod + def getWebSocketPort(*args, **kwargs) -> dict: + return {"status":200, "result":config.WEBSOCKET_PORT} + + @staticmethod + def setWebSocketPort(data, *args, **kwargs) -> dict: + config.WEBSOCKET_PORT = int(data) + if model.checkWebSocketServer() is True: + model.stopWebSocketServer() + model.startWebSocketServer() + return {"status":200, "result":config.WEBSOCKET_PORT} + + @staticmethod + def getWebSocketServer(*args, **kwargs) -> dict: + return {"status":200, "result":config.WEBSOCKET_SERVER} + + @staticmethod + def setEnableWebSocketServer(*args, **kwargs) -> dict: + config.WEBSOCKET_SERVER = True + model.startWebSocketServer() + return {"status":200, "result":config.WEBSOCKET_SERVER} + + @staticmethod + def setDisableWebSocketServer(*args, **kwargs) -> dict: + config.WEBSOCKET_SERVER = False + model.stopWebSocketServer() + return {"status":200, "result":config.WEBSOCKET_SERVER} + def initializationProgress(self, progress): self.run(200, self.run_mapping["initialization_progress"], progress) @@ -1825,41 +1897,7 @@ class Controller: printLog("Init Translation Engine Status") for engine in config.SELECTABLE_TRANSLATION_ENGINE_LIST: - match engine: - case "CTranslate2": - if model.checkTranslatorCTranslate2ModelWeight(config.CTRANSLATE2_WEIGHT_TYPE) is True: - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = True - else: - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = False - case "DeepL_API": - printLog("Start check DeepL API Key") - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = False - if config.AUTH_KEYS[engine] is not None: - if model.authenticationTranslatorDeepLAuthKey(auth_key=config.AUTH_KEYS[engine]) is True: - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = True - else: - # error update Auth key - auth_keys = config.AUTH_KEYS - auth_keys[engine] = None - config.AUTH_KEYS = auth_keys - case _: - if connected_network is True: - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = True - else: - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = False - - for engine in config.SELECTABLE_TRANSCRIPTION_ENGINE_LIST: - match engine: - case "Whisper": - if model.checkTranscriptionWhisperModelWeight(config.WHISPER_WEIGHT_TYPE) is True: - config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = True - else: - config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = False - case _: - if connected_network is True: - config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = True - else: - config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = False + config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = False self.initializationProgress(2) # set Translation Engine @@ -1911,6 +1949,10 @@ class Controller: if (config.OVERLAY_SMALL_LOG is True or config.OVERLAY_LARGE_LOG is True): model.startOverlay() + printLog("Init WebSocket Server") + if config.WEBSOCKET_SERVER is True: + model.startWebSocketServer() + printLog("Update settings") self.updateConfigSettings() diff --git a/src-python/mainloop.py b/src-python/mainloop.py index 5c5fb744..d8ad52e7 100644 --- a/src-python/mainloop.py +++ b/src-python/mainloop.py @@ -291,6 +291,15 @@ mapping = { "/set/enable/send_received_message_to_vrc": {"status": True, "variable":controller.setEnableSendReceivedMessageToVrc}, "/set/disable/send_received_message_to_vrc": {"status": True, "variable":controller.setDisableSendReceivedMessageToVrc}, + # WebSocket Settings + "/get/data/websocket_host": {"status": True, "variable":controller.getWebSocketHost}, + "/set/data/websocket_host": {"status": True, "variable":controller.setWebSocketHost}, + "/get/data/websocket_port": {"status": True, "variable":controller.getWebSocketPort}, + "/set/data/websocket_port": {"status": True, "variable":controller.setWebSocketPort}, + "/get/data/websocket_server": {"status": True, "variable":controller.getWebSocketServer}, + "/set/enable/websocket_server": {"status": True, "variable":controller.setEnableWebSocketServer}, + "/set/disable/websocket_server": {"status": True, "variable":controller.setDisableWebSocketServer}, + # Advanced Settings "/get/data/osc_ip_address": {"status": True, "variable":controller.getOscIpAddress}, "/set/data/osc_ip_address": {"status": True, "variable":controller.setOscIpAddress}, diff --git a/src-python/model.py b/src-python/model.py index 8f62aa5e..ee8ee250 100644 --- a/src-python/model.py +++ b/src-python/model.py @@ -29,6 +29,7 @@ from models.transcription.transcription_whisper import checkWhisperWeight, downl from models.overlay.overlay import Overlay from models.overlay.overlay_image import OverlayImage from models.watchdog.watchdog import Watchdog +from models.websocket.websocket_server import WebSocketServer from utils import errorLogging, setupLogger class threadFnc(Thread): @@ -100,6 +101,50 @@ class Model: self.watchdog = Watchdog(config.WATCHDOG_TIMEOUT, config.WATCHDOG_INTERVAL) self.osc_handler = OSCHandler(config.OSC_IP_ADDRESS, config.OSC_PORT) + # WebSocketサーバーの初期化 + self.websocket_server = WebSocketServer( + host=config.WEBSOCKET_HOST, + port=config.WEBSOCKET_PORT + ) + self.th_websocket_server = None + + def startWebSocketServer(self): + if self.th_websocket_server is None: + from threading import Thread + def run_server(): + try: + self.websocket_server.start() + except Exception: + errorLogging() + self.th_websocket_server = Thread(target=run_server, daemon=True) + self.th_websocket_server.start() + + def stopWebSocketServer(self): + if self.websocket_server: + try: + self.websocket_server.stop() + except Exception: + errorLogging() + self.th_websocket_server = None + + def checkWebSocketServer(self): + if self.websocket_server: + try: + return self.websocket_server.is_running + except Exception: + errorLogging() + return False + + def websocketSendMessage(self, message_dict): + """ + WebSocketサーバーから全クライアントにメッセージを送信する + :param message_dict: 送信する辞書型データ + """ + try: + self.websocket_server.send_message(message_dict) + except Exception: + errorLogging() + def checkTranslatorCTranslate2ModelWeight(self, weight_type:str): return checkCTranslate2Weight(config.PATH_LOCAL, weight_type) diff --git a/src-python/models/websocket/model_part.py b/src-python/models/websocket/model_part.py deleted file mode 100644 index 47758bf0..00000000 --- a/src-python/models/websocket/model_part.py +++ /dev/null @@ -1,103 +0,0 @@ -import copy -import gc -from subprocess import Popen -from os import makedirs as os_makedirs -from os import path as os_path -from datetime import datetime -from time import sleep -from queue import Queue -from threading import Thread -from requests import get as requests_get -from typing import Callable -from packaging.version import parse - -from flashtext import KeywordProcessor -from pykakasi import kakasi - -from device_manager import device_manager -from config import config - -from models.translation.translation_translator import Translator -from models.osc.osc import OSCHandler -from models.transcription.transcription_recorder import SelectedMicEnergyAndAudioRecorder, SelectedSpeakerEnergyAndAudioRecorder -from models.transcription.transcription_recorder import SelectedMicEnergyRecorder, SelectedSpeakerEnergyRecorder -from models.transcription.transcription_transcriber import AudioTranscriber -from models.translation.translation_languages import translation_lang -from models.transcription.transcription_languages import transcription_lang -from models.translation.translation_utils import checkCTranslate2Weight, downloadCTranslate2Weight, downloadCTranslate2Tokenizer -from models.transcription.transcription_whisper import checkWhisperWeight, downloadWhisperWeight -from models.overlay.overlay import Overlay -from models.overlay.overlay_image import OverlayImage -from models.watchdog.watchdog import Watchdog -from models.websocket.websocket_server import WebSocketServer -from utils import errorLogging, setupLogger - -class threadFnc(Thread): - def __init__(self, fnc, end_fnc=None, daemon=True, *args, **kwargs): - super(threadFnc, self).__init__(daemon=daemon, target=fnc, *args, **kwargs) - self.fnc = fnc - self.end_fnc = end_fnc - self.loop = True - self._pause = False - - def stop(self): - self.loop = False - - def pause(self): - self._pause = True - - def resume(self): - self._pause = False - - def run(self): - while self.loop: - self.fnc(*self._args, **self._kwargs) - while self._pause: - sleep(0.1) - - if callable(self.end_fnc): - self.end_fnc() - return - -class Model: - _instance = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super(Model, cls).__new__(cls) - cls._instance.init() - return cls._instance - - def init(self): - self.logger = None - self.th_check_device = None - self.mic_print_transcript = None - self.mic_audio_recorder = None - self.mic_transcriber = None - self.mic_energy_recorder = None - self.mic_energy_plot_progressbar = None - self.speaker_print_transcript = None - self.speaker_audio_recorder = None - self.speaker_transcriber = None - self.speaker_energy_recorder = None - self.speaker_energy_plot_progressbar = None - - self.previous_send_message = "" - self.previous_receive_message = "" - self.translator = Translator() - self.keyword_processor = KeywordProcessor() - overlay_small_log_settings = copy.deepcopy(config.OVERLAY_SMALL_LOG_SETTINGS) - overlay_large_log_settings = copy.deepcopy(config.OVERLAY_LARGE_LOG_SETTINGS) - overlay_large_log_settings["ui_scaling"] = overlay_large_log_settings["ui_scaling"] * 0.25 - overlay_settings = { - "small": overlay_small_log_settings, - "large": overlay_large_log_settings, - } - self.overlay = Overlay(overlay_settings) - self.overlay_image = OverlayImage() - self.mic_audio_queue = None - self.mic_mute_status = None - self.kks = kakasi() - self.watchdog = Watchdog(config.WATCHDOG_TIMEOUT, config.WATCHDOG_INTERVAL) - self.osc_handler = OSCHandler(config.OSC_IP_ADDRESS, config.OSC_PORT) - self.websocket_server = WebSocketServer(host="0.0.0.0", port=8765) diff --git a/src-python/models/websocket/websocket_server.py b/src-python/models/websocket/websocket_server.py index 73ee3787..330f6b79 100644 --- a/src-python/models/websocket/websocket_server.py +++ b/src-python/models/websocket/websocket_server.py @@ -1,30 +1,29 @@ import asyncio import json import logging -from typing import Dict, Set, Optional +from typing import Dict, Set import websockets -from websockets.server import WebSocketServerProtocol class WebSocketServer: def __init__(self, host: str = "0.0.0.0", port: int = 8765): self.host = host self.port = port - self.clients: Set[WebSocketServerProtocol] = set() + self.clients = set() self.server = None self.is_running = False self.logger = logging.getLogger('websocket_server') - async def register(self, websocket: WebSocketServerProtocol): + async def register(self, websocket): """クライアント接続を登録する""" self.clients.add(websocket) self.logger.info(f"クライアント接続: {websocket.remote_address}, 現在の接続数: {len(self.clients)}") - async def unregister(self, websocket: WebSocketServerProtocol): + async def unregister(self, websocket): """クライアント接続を解除する""" self.clients.remove(websocket) self.logger.info(f"クライアント切断: {websocket.remote_address}, 現在の接続数: {len(self.clients)}") - async def handler(self, websocket: WebSocketServerProtocol, path: str): + async def handler(self, websocket): """WebSocket接続ハンドラー""" await self.register(websocket) try: @@ -46,17 +45,17 @@ class WebSocketServer: tasks = [client.send(json_message) for client in self.clients] await asyncio.gather(*tasks, return_exceptions=True) - def send_transcription(self, transcription_data: Dict): - """文字起こし結果を送信する関数(通常のコードから呼び出し可能)""" + def send_message(self, message: Dict): + """メッセージを送信する関数(通常のコードから呼び出し可能)""" if not self.is_running or not self.clients: return # asyncioのイベントループがあれば利用し、なければ新たに作成 loop = asyncio.get_event_loop() if asyncio.get_event_loop().is_running() else asyncio.new_event_loop() - + # ブロードキャスト関数を実行 asyncio.run_coroutine_threadsafe( - self.broadcast(transcription_data), + self.broadcast(message), loop ) @@ -100,3 +99,15 @@ class WebSocketServer: loop = asyncio.get_event_loop() asyncio.run_coroutine_threadsafe(self.stop_server(), loop) + + def is_running(self) -> bool: + """サーバーが実行中かどうかを確認する""" + return self.is_running + + def get_clients(self) -> Set: + """現在のクライアント接続を取得する""" + return self.clients + + def get_client_count(self) -> int: + """現在のクライアント接続数を取得する""" + return len(self.clients) \ No newline at end of file From ed5ebcee9090fcee070955479199f54c18e81b1e Mon Sep 17 00:00:00 2001 From: misyaguziya <53165965+misyaguziya@users.noreply.github.com> Date: Sun, 18 May 2025 13:16:52 +0900 Subject: [PATCH 3/5] [Update] WebSocket server: Refactor threading and message handling --- src-python/model.py | 21 +- .../models/websocket/websocket_server.py | 278 ++++++++++++------ 2 files changed, 200 insertions(+), 99 deletions(-) diff --git a/src-python/model.py b/src-python/model.py index ee8ee250..d848fd94 100644 --- a/src-python/model.py +++ b/src-python/model.py @@ -106,18 +106,12 @@ class Model: host=config.WEBSOCKET_HOST, port=config.WEBSOCKET_PORT ) - self.th_websocket_server = None def startWebSocketServer(self): - if self.th_websocket_server is None: - from threading import Thread - def run_server(): - try: - self.websocket_server.start() - except Exception: - errorLogging() - self.th_websocket_server = Thread(target=run_server, daemon=True) - self.th_websocket_server.start() + try: + self.websocket_server.start() + except Exception: + errorLogging() def stopWebSocketServer(self): if self.websocket_server: @@ -125,7 +119,6 @@ class Model: self.websocket_server.stop() except Exception: errorLogging() - self.th_websocket_server = None def checkWebSocketServer(self): if self.websocket_server: @@ -135,13 +128,13 @@ class Model: errorLogging() return False - def websocketSendMessage(self, message_dict): + def websocketSendMessage(self, message): """ WebSocketサーバーから全クライアントにメッセージを送信する - :param message_dict: 送信する辞書型データ + :param message: 送信するメッセージ """ try: - self.websocket_server.send_message(message_dict) + self.websocket_server.send(str(message)) except Exception: errorLogging() diff --git a/src-python/models/websocket/websocket_server.py b/src-python/models/websocket/websocket_server.py index 330f6b79..f01122a9 100644 --- a/src-python/models/websocket/websocket_server.py +++ b/src-python/models/websocket/websocket_server.py @@ -1,113 +1,221 @@ import asyncio -import json -import logging -from typing import Dict, Set +import threading import websockets +from websockets.legacy.server import WebSocketServerProtocol +from typing import Callable, Set, Optional class WebSocketServer: - def __init__(self, host: str = "0.0.0.0", port: int = 8765): + """ + WebSocketサーバーを管理するクラス。 + 主な機能: + - サーバーの起動・停止 + - クライアント接続管理 (接続/切断の追跡) + - メッセージ受信のコールバック処理 + - メッセージのブロードキャスト機能 + - GUIスレッド等からメッセージ送信するためのキュー + """ + def __init__(self, host: str='localhost', port: int=8765): + """ + サーバーのホスト名とポートを指定して初期化します。 + """ self.host = host self.port = port - self.clients = set() - self.server = None - self.is_running = False - self.logger = logging.getLogger('websocket_server') + self.clients: Set[WebSocketServerProtocol] = set() # 接続クライアント集合 + self._message_handler: Optional[Callable[['WebSocketServer', WebSocketServerProtocol, str], None]] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._server: Optional[websockets.serve] = None + self._thread: Optional[threading.Thread] = None + self._send_queue: Optional[asyncio.Queue] = None # 外部スレッド向け非同期キュー + self.is_running: bool = False # サーバーの起動状態を示すフラグ - async def register(self, websocket): - """クライアント接続を登録する""" + def set_message_handler(self, handler: Callable[['WebSocketServer', WebSocketServerProtocol, str], None]): + """ + クライアントからメッセージ受信時に呼び出すコールバックを設定します。 + コールバックのシグネチャ: (server, websocket, message) -> None + """ + self._message_handler = handler + + async def _handler(self, websocket): + """ + 単一クライアントとのセッションを処理するハンドラです。 + 新規接続時にクライアントを集合に追加し、メッセージを受信してコールバックを呼び出します。 + 切断時には集合からクライアントを削除します。 + """ + # 接続クライアントを集合に追加 self.clients.add(websocket) - self.logger.info(f"クライアント接続: {websocket.remote_address}, 現在の接続数: {len(self.clients)}") - - async def unregister(self, websocket): - """クライアント接続を解除する""" - self.clients.remove(websocket) - self.logger.info(f"クライアント切断: {websocket.remote_address}, 現在の接続数: {len(self.clients)}") - - async def handler(self, websocket): - """WebSocket接続ハンドラー""" - await self.register(websocket) try: async for message in websocket: - # クライアントからのメッセージを処理(必要に応じて) - # 現在はクライアントからのメッセージは特に処理していません - self.logger.debug(f"クライアントからのメッセージ: {message}") + # メッセージ受信時にコールバック呼び出し + if self._message_handler: + self._message_handler(self, websocket, message) except websockets.exceptions.ConnectionClosed: + # クライアントが切断した場合 pass finally: - await self.unregister(websocket) + # 切断時に集合から削除 + self.clients.remove(websocket) - async def broadcast(self, message: Dict): - """全クライアントにメッセージをブロードキャストする""" + async def _broadcast_async(self, message: str): + """ + すべての接続クライアントにメッセージを送信する非同期メソッド。 + """ if not self.clients: return - - json_message = json.dumps(message) - tasks = [client.send(json_message) for client in self.clients] - await asyncio.gather(*tasks, return_exceptions=True) - - def send_message(self, message: Dict): - """メッセージを送信する関数(通常のコードから呼び出し可能)""" - if not self.is_running or not self.clients: - return - - # asyncioのイベントループがあれば利用し、なければ新たに作成 - loop = asyncio.get_event_loop() if asyncio.get_event_loop().is_running() else asyncio.new_event_loop() - - # ブロードキャスト関数を実行 - asyncio.run_coroutine_threadsafe( - self.broadcast(message), - loop + # 全クライアントへ並列に送信 + await asyncio.gather( + *[client.send(message) for client in self.clients], + return_exceptions=True ) - async def start_server(self): - """WebSocketサーバーを起動する""" - if not self.is_running: - self.server = await websockets.serve(self.handler, self.host, self.port) - self.is_running = True - self.logger.info(f"WebSocketサーバーを起動しました - {self.host}:{self.port}") - return True - return False + async def _send_loop(self): + """ + 内部キューからメッセージを取り出し、すべてのクライアントに送信するループ処理。 + GUIなど他スレッドから送信メッセージをキューに入れてもらい、このコルーチンで配信します。 + """ + assert self._send_queue is not None + while True: + message = await self._send_queue.get() + if message is None: + # Noneを受け取ったらシャットダウン指示とみなしてループを抜ける + break + await self._broadcast_async(message) + + def send(self, message: str): + """ + 外部スレッドからサーバーにメッセージを送信するためのメソッドです。 + イベントループ上で安全にキューにメッセージを積み、_send_loop()経由でブロードキャストします。 + """ + if self._loop and self._send_queue: + # キューにput_nowaitするコールをイベントループにスケジュール + self._loop.call_soon_threadsafe(self._send_queue.put_nowait, message) + + def broadcast(self, message: str): + """ + 外部スレッドや他コルーチンから全クライアントにメッセージを送信するユーティリティ。 + asyncio.run_coroutine_threadsafe を使ってループ上でブロードキャストを実行します。 + """ + if self._loop: + # コルーチン自体をrun_coroutine_threadsafeに渡す + asyncio.run_coroutine_threadsafe( + self._broadcast_async(message), self._loop + ) def start(self): - """非同期ループでWebSocketサーバーを起動する(通常のコードから呼び出し可能)""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(self.start_server()) - # バックグラウンドでループを実行 - self._task = asyncio.run_coroutine_threadsafe(self._run_forever(), loop) + """ + サーバーを起動します。新しいスレッド上で asyncio イベントループを動かし、serve()を実行します。 + """ + if self._thread and self._thread.is_alive(): + return # 既に起動中 + # 新しいスレッドでイベントループを開始 + self._thread = threading.Thread(target=self._run_loop, daemon=True) + self._thread.start() - async def _run_forever(self): - """サーバーを永続的に実行する""" - while self.is_running: - await asyncio.sleep(0.1) + def _run_loop(self): + """ + 別スレッド上で実行されるイベントループ用のメソッド。 + サーバーの起動と、送信用キューのタスク登録を行います。 + """ + # 新しいイベントループを作成してこのスレッドの現在のループとして設定 + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) - async def stop_server(self): - """WebSocketサーバーを停止する""" - if self.is_running and self.server: - self.server.close() - await self.server.wait_closed() - self.is_running = False - self.clients.clear() - self.logger.info("WebSocketサーバーを停止しました") - return True - return False + async def setup_server(): + # サーバーを起動し、listenを開始 + self._server = await websockets.serve(self._handler, self.host, self.port) + # 送信キューを初期化 + self._send_queue = asyncio.Queue() + # 送信ループタスクを開始 + self._loop.create_task(self._send_loop()) + # サーバーの起動を待機 + # 設定関数を実行してサーバーを起動 + self._loop.run_until_complete(setup_server()) + self.is_running = True + # サーバーが起動したら、接続待機を開始 + # print(f"WebSocket server started on ws://{self.host}:{self.port}") + try: + # サーバーが停止するまでループを継続 + self._loop.run_forever() + finally: + # 停止指示が出たらすべての接続を閉じ、イベントループを終了 + self._loop.run_until_complete(self._shutdown()) + self._loop.close() + + async def _shutdown(self): + """ + サーバーとクライアントを安全にシャットダウンする非同期処理。 + serveオブジェクトをcloseし、wait_closed()で完全に終了を待ちます。 + さらに接続中の各WebSocketをcloseします。 + """ + # サーバーのListenを停止 + if self._server: + self._server.close() + await self._server.wait_closed() + # 接続中クライアントを順次クローズ + for ws in list(self.clients): + try: + await ws.close() + except Exception: + pass def stop(self): - """WebSocketサーバーを停止する(通常のコードから呼び出し可能)""" - if not self.is_running: - return + """ + サーバーを停止します。別スレッドで動作中のイベントループに停止を指示し、スレッドを終了させます。 + """ + self.is_running = False + if self._loop: + # サーバーのlistenを停止し、ループ停止をスケジュール + self._loop.call_soon_threadsafe(self._server.close) + # None をキューに入れて_send_loopを抜けさせる + self._loop.call_soon_threadsafe(self._send_queue.put_nowait, None) + # ループ停止 + self._loop.call_soon_threadsafe(self._loop.stop) + # スレッドの終了を待つ + if self._thread: + self._thread.join() - loop = asyncio.get_event_loop() - asyncio.run_coroutine_threadsafe(self.stop_server(), loop) +if __name__ == "__main__": + # テスト用の簡単なメッセージハンドラ + def message_handler(server: WebSocketServer, websocket: WebSocketServerProtocol, message: str): + print(f"Received message from {websocket.remote_address}: {message}") + server.send(f"Echo: {message}") - def is_running(self) -> bool: - """サーバーが実行中かどうかを確認する""" - return self.is_running + def send_message(server: WebSocketServer, message: str): + server.send(message) - def get_clients(self) -> Set: - """現在のクライアント接続を取得する""" - return self.clients + # メイン処理を非同期関数に変更 + async def main(): + # サーバーを起動してメッセージハンドラを設定 + ws_server = WebSocketServer() + ws_server.set_message_handler(message_handler) + ws_server.start() + print("WebSocket server started.") + # 定期的にサーバーからメッセージを送信する例 + import threading + import time + def periodic_send(): + print("Starting periodic message sender...") + while ws_server.is_running: + time.sleep(5) + print("Sending periodic message...") + send_message(ws_server, "Periodic message") + print("Periodic message sender stopped.") + # 別スレッドで定期的にメッセージを送信 + time.sleep(5) + send_thread = threading.Thread(target=periodic_send, daemon=True) + send_thread.start() + # メインスレッドでサーバーを動かし続ける + try: + while True: + # 非同期スリープで待機 + await asyncio.sleep(1) - def get_client_count(self) -> int: - """現在のクライアント接続数を取得する""" - return len(self.clients) \ No newline at end of file + except KeyboardInterrupt: + # Ctrl+Cでサーバーを停止 + print("Stopping WebSocket server...") + ws_server.stop() + + # 非同期メイン関数を実行 + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("Stopping WebSocket server...") \ No newline at end of file From e7304247c78f4f85fc4848d5393aea45670eda63 Mon Sep 17 00:00:00 2001 From: misyaguziya <53165965+misyaguziya@users.noreply.github.com> Date: Sun, 18 May 2025 15:20:27 +0900 Subject: [PATCH 4/5] [Update] WebSocket configuration: Change host to 127.0.0.1 and port to 2231; refactor WebSocket server initialization and message handling --- src-python/config.py | 6 +- src-python/controller.py | 8 +- src-python/model.py | 116 ++++++++++++------ .../models/websocket/websocket_server.py | 2 +- 4 files changed, 88 insertions(+), 44 deletions(-) diff --git a/src-python/config.py b/src-python/config.py index 7910400b..16f3802f 100644 --- a/src-python/config.py +++ b/src-python/config.py @@ -965,6 +965,7 @@ class Config: self.saveConfig(inspect.currentframe().f_code.co_name, value) @property + @json_serializable('WEBSOCKET_HOST') def WEBSOCKET_HOST(self): return self._WEBSOCKET_HOST @@ -975,6 +976,7 @@ class Config: self.saveConfig(inspect.currentframe().f_code.co_name, value) @property + @json_serializable('WEBSOCKET_PORT') def WEBSOCKET_PORT(self): return self._WEBSOCKET_PORT @@ -1171,8 +1173,8 @@ class Config: self._VRC_MIC_MUTE_SYNC = False self._NOTIFICATION_VRC_SFX = True self._WEBSOCKET_SERVER = True - self._WEBSOCKET_HOST = "0.0.0.0" - self._WEBSOCKET_PORT = 8765 + self._WEBSOCKET_HOST = "127.0.0.1" + self._WEBSOCKET_PORT = 2231 def load_config(self): if os_path.isfile(self.PATH_CONFIG) is not False: diff --git a/src-python/controller.py b/src-python/controller.py index 398e3e37..0c3c4dca 100644 --- a/src-python/controller.py +++ b/src-python/controller.py @@ -298,7 +298,7 @@ class Controller: if config.WEBSOCKET_SERVER is True: model.websocketSendMessage( { - "type":"SEND", + "type":"SENT", "message":message, "translation":translation, "transliteration":transliteration @@ -466,11 +466,11 @@ class Controller: } ) - # update textbox message log (Sent) + # update textbox message log (Chat) if config.LOGGER_FEATURE is True: if len(translation) > 0: translation_text = " (" + "/".join(translation) + ")" - model.logger.info(f"[SENT] {message}{translation_text}") + model.logger.info(f"[CHAT] {message}{translation_text}") return {"status":200, "result":{ @@ -1840,8 +1840,8 @@ class Controller: @staticmethod def setEnableWebSocketServer(*args, **kwargs) -> dict: - config.WEBSOCKET_SERVER = True model.startWebSocketServer() + config.WEBSOCKET_SERVER = True return {"status":200, "result":config.WEBSOCKET_SERVER} @staticmethod diff --git a/src-python/model.py b/src-python/model.py index d848fd94..7d39294e 100644 --- a/src-python/model.py +++ b/src-python/model.py @@ -1,5 +1,7 @@ import copy import gc +import asyncio +import json from subprocess import Popen from os import makedirs as os_makedirs from os import path as os_path @@ -100,43 +102,10 @@ class Model: self.kks = kakasi() self.watchdog = Watchdog(config.WATCHDOG_TIMEOUT, config.WATCHDOG_INTERVAL) self.osc_handler = OSCHandler(config.OSC_IP_ADDRESS, config.OSC_PORT) - - # WebSocketサーバーの初期化 - self.websocket_server = WebSocketServer( - host=config.WEBSOCKET_HOST, - port=config.WEBSOCKET_PORT - ) - - def startWebSocketServer(self): - try: - self.websocket_server.start() - except Exception: - errorLogging() - - def stopWebSocketServer(self): - if self.websocket_server: - try: - self.websocket_server.stop() - except Exception: - errorLogging() - - def checkWebSocketServer(self): - if self.websocket_server: - try: - return self.websocket_server.is_running - except Exception: - errorLogging() - return False - - def websocketSendMessage(self, message): - """ - WebSocketサーバーから全クライアントにメッセージを送信する - :param message: 送信するメッセージ - """ - try: - self.websocket_server.send(str(message)) - except Exception: - errorLogging() + self.websocket_server = None + self.websocket_server_loop = False + self.websocket_server_alive = False + self.th_websocket_server = None def checkTranslatorCTranslate2ModelWeight(self, weight_type:str): return checkCTranslate2Weight(config.PATH_LOCAL, weight_type) @@ -865,4 +834,77 @@ class Model: self.th_watchdog.join() self.th_watchdog = None + def startWebSocketServer(self): + """WebSocketサーバーを起動し、別スレッドで実行する""" + self.websocket_server_loop = True + self.websocket_server_alive = False # 初期状態を明示 + + async def WebSocketServerMain(): + try: + self.websocket_server = WebSocketServer( + host=config.WEBSOCKET_HOST, + port=config.WEBSOCKET_PORT, + ) + self.websocket_server.start() + self.websocket_server_alive = True + + # イベントループが終了するまで待機 + while self.websocket_server_loop: + self.websocket_server.send("Server is running...") + await asyncio.sleep(0.5) # 応答性向上のため間隔短縮 + + except Exception: + errorLogging() + # 具体的なエラー内容をログに残す場合 + # self.logger.error(f"WebSocket server error: {str(e)}") + finally: + # 確実にサーバーを停止 + if hasattr(self, 'websocket_server') and self.websocket_server: + self.websocket_server.stop() + self.websocket_server_alive = False + + self.th_websocket_server = Thread(target=lambda: asyncio.run(WebSocketServerMain())) + self.th_websocket_server.daemon = True + self.th_websocket_server.start() + + def stopWebSocketServer(self): + """WebSocketサーバーを停止する""" + if not hasattr(self, 'th_websocket_server') or self.th_websocket_server is None: + return + + self.websocket_server_loop = False + + try: + # 一定時間待機してからタイムアウト + self.th_websocket_server.join(timeout=2.0) + + if self.th_websocket_server.is_alive(): + # タイムアウト後もスレッドが生きている場合の処理 + self.logger.warning("WebSocket server thread did not terminate properly") + except Exception: + errorLogging() + finally: + self.th_websocket_server = None + self.websocket_server = None + self.websocket_server_alive = False + + def checkWebSocketServer(self): + """WebSocketサーバーの稼働状態を確認する""" + return self.websocket_server_alive + + def websocketSendMessage(self, message_dict:dict): + """ + WebSocketサーバーから全クライアントにメッセージを送信する + :param message_dict: 送信するメッセージの辞書 + :return: 送信成功したかどうか + """ + if not self.websocket_server_alive or not self.websocket_server: + return False + try: + message_json = json.dumps(message_dict) + return self.websocket_server.send(message_json) + except Exception: + errorLogging() + return False + model = Model() \ No newline at end of file diff --git a/src-python/models/websocket/websocket_server.py b/src-python/models/websocket/websocket_server.py index f01122a9..34762e55 100644 --- a/src-python/models/websocket/websocket_server.py +++ b/src-python/models/websocket/websocket_server.py @@ -14,7 +14,7 @@ class WebSocketServer: - メッセージのブロードキャスト機能 - GUIスレッド等からメッセージ送信するためのキュー """ - def __init__(self, host: str='localhost', port: int=8765): + def __init__(self, host: str='127.0.0.1', port: int=8765): """ サーバーのホスト名とポートを指定して初期化します。 """ From 1cc22d35c3e3816e961a1ea8927a0733a0bec529 Mon Sep 17 00:00:00 2001 From: misyaguziya <53165965+misyaguziya@users.noreply.github.com> Date: Sun, 18 May 2025 15:23:58 +0900 Subject: [PATCH 5/5] [Update] WebSocket server: Add message handler for processing incoming messages --- src-python/model.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src-python/model.py b/src-python/model.py index 7d39294e..2bea7143 100644 --- a/src-python/model.py +++ b/src-python/model.py @@ -834,6 +834,10 @@ class Model: self.th_watchdog.join() self.th_watchdog = None + def message_handler(websocket, message): + """WebSocketメッセージ受信時の処理""" + pass + def startWebSocketServer(self): """WebSocketサーバーを起動し、別スレッドで実行する""" self.websocket_server_loop = True @@ -845,6 +849,7 @@ class Model: host=config.WEBSOCKET_HOST, port=config.WEBSOCKET_PORT, ) + self.websocket_server.set_message_handler(self.message_handler) self.websocket_server.start() self.websocket_server_alive = True