diff --git a/requirements.txt b/requirements.txt index 93733326..95501ea2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,6 +15,7 @@ pydub==0.25.1 psutil==5.9.8 pykakasi==2.3.0 pycaw==20240210 +websockets==15.0.1 translators @ git+https://github.com/misyaguziya/translators@5.9.2.1 SpeechRecognition @ git+https://github.com/misyaguziya/custom_speech_recognition@3.10.4.1 tinyoscquery @ git+https://github.com/cyberkitsune/tinyoscquery@0.1.3 \ No newline at end of file diff --git a/requirements_cuda.txt b/requirements_cuda.txt index 898364b7..768f45ba 100644 --- a/requirements_cuda.txt +++ b/requirements_cuda.txt @@ -16,6 +16,7 @@ pydub==0.25.1 psutil==5.9.8 pykakasi==2.3.0 pycaw==20240210 +websockets==15.0.1 translators @ git+https://github.com/misyaguziya/translators@5.9.2.1 SpeechRecognition @ git+https://github.com/misyaguziya/custom_speech_recognition@3.10.4.1 tinyoscquery @ git+https://github.com/cyberkitsune/tinyoscquery@0.1.3 \ No newline at end of file diff --git a/src-python/config.py b/src-python/config.py index 2f8471e2..f1b7de65 100644 --- a/src-python/config.py +++ b/src-python/config.py @@ -954,6 +954,39 @@ class Config: self._NOTIFICATION_VRC_SFX = value self.saveConfig(inspect.currentframe().f_code.co_name, value) + @property + def WEBSOCKET_SERVER(self): + return self._WEBSOCKET_SERVER + + @WEBSOCKET_SERVER.setter + def WEBSOCKET_SERVER(self, value): + if isinstance(value, bool): + self._WEBSOCKET_SERVER = value + self.saveConfig(inspect.currentframe().f_code.co_name, value) + + @property + @json_serializable('WEBSOCKET_HOST') + def WEBSOCKET_HOST(self): + return self._WEBSOCKET_HOST + + @WEBSOCKET_HOST.setter + def WEBSOCKET_HOST(self, value): + if isinstance(value, str): + self._WEBSOCKET_HOST = value + self.saveConfig(inspect.currentframe().f_code.co_name, value) + + @property + @json_serializable('WEBSOCKET_PORT') + def WEBSOCKET_PORT(self): + return self._WEBSOCKET_PORT + + @WEBSOCKET_PORT.setter + def WEBSOCKET_PORT(self, value): + if isinstance(value, int): + self._WEBSOCKET_PORT = value + self.saveConfig(inspect.currentframe().f_code.co_name, value) + + def init_config(self): # Read Only self._VERSION = "3.1.2" @@ -1139,6 +1172,9 @@ class Config: self._LOGGER_FEATURE = False self._VRC_MIC_MUTE_SYNC = False self._NOTIFICATION_VRC_SFX = True + self._WEBSOCKET_SERVER = True + self._WEBSOCKET_HOST = "127.0.0.1" + self._WEBSOCKET_PORT = 2231 def load_config(self): if os_path.isfile(self.PATH_CONFIG) is not False: diff --git a/src-python/controller.py b/src-python/controller.py index 16c2f53f..4890617e 100644 --- a/src-python/controller.py +++ b/src-python/controller.py @@ -294,6 +294,17 @@ class Controller: "translation":translation, "transliteration":transliteration }) + + if config.WEBSOCKET_SERVER is True: + model.websocketSendMessage( + { + "type":"SENT", + "message":message, + "translation":translation, + "transliteration":transliteration + } + ) + if config.LOGGER_FEATURE is True: if len(translation) > 0: translation = " (" + "/".join(translation) + ")" @@ -377,6 +388,17 @@ class Controller: "translation":translation, "transliteration":transliteration, }) + + if config.WEBSOCKET_SERVER is True: + model.websocketSendMessage( + { + "type":"RECEIVED", + "message":message, + "translation":translation, + "transliteration":transliteration + } + ) + if config.LOGGER_FEATURE is True: if len(translation) > 0: translation = " (" + "/".join(translation) + ")" @@ -434,11 +456,21 @@ class Controller: overlay_image = model.createOverlayImageLargeLog("send", message, translation[0] if len(translation) > 0 else "") model.updateOverlayLargeLog(overlay_image) - # update textbox message log (Sent) + if config.WEBSOCKET_SERVER is True: + model.websocketSendMessage( + { + "type":"CHAT", + "message":message, + "translation":translation, + "transliteration":transliteration + } + ) + + # update textbox message log (Chat) if config.LOGGER_FEATURE is True: if len(translation) > 0: translation_text = " (" + "/".join(translation) + ")" - model.logger.info(f"[SENT] {message}{translation_text}") + model.logger.info(f"[CHAT] {message}{translation_text}") return {"status":200, "result":{ @@ -1778,6 +1810,46 @@ class Controller: model.stopWatchdog() return {"status":200, "result":True} + @staticmethod + def getWebSocketHost(*args, **kwargs) -> dict: + return {"status":200, "result":config.WEBSOCKET_HOST} + + @staticmethod + def setWebSocketHost(data, *args, **kwargs) -> dict: + config.WEBSOCKET_HOST = data + if model.checkWebSocketServer() is True: + model.stopWebSocketServer() + model.startWebSocketServer() + return {"status":200, "result":config.WEBSOCKET_HOST} + + @staticmethod + def getWebSocketPort(*args, **kwargs) -> dict: + return {"status":200, "result":config.WEBSOCKET_PORT} + + @staticmethod + def setWebSocketPort(data, *args, **kwargs) -> dict: + config.WEBSOCKET_PORT = int(data) + if model.checkWebSocketServer() is True: + model.stopWebSocketServer() + model.startWebSocketServer() + return {"status":200, "result":config.WEBSOCKET_PORT} + + @staticmethod + def getWebSocketServer(*args, **kwargs) -> dict: + return {"status":200, "result":config.WEBSOCKET_SERVER} + + @staticmethod + def setEnableWebSocketServer(*args, **kwargs) -> dict: + model.startWebSocketServer() + config.WEBSOCKET_SERVER = True + return {"status":200, "result":config.WEBSOCKET_SERVER} + + @staticmethod + def setDisableWebSocketServer(*args, **kwargs) -> dict: + config.WEBSOCKET_SERVER = False + model.stopWebSocketServer() + return {"status":200, "result":config.WEBSOCKET_SERVER} + def initializationProgress(self, progress): self.run(200, self.run_mapping["initialization_progress"], progress) @@ -1825,41 +1897,7 @@ class Controller: printLog("Init Translation Engine Status") for engine in config.SELECTABLE_TRANSLATION_ENGINE_LIST: - match engine: - case "CTranslate2": - if model.checkTranslatorCTranslate2ModelWeight(config.CTRANSLATE2_WEIGHT_TYPE) is True: - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = True - else: - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = False - case "DeepL_API": - printLog("Start check DeepL API Key") - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = False - if config.AUTH_KEYS[engine] is not None: - if model.authenticationTranslatorDeepLAuthKey(auth_key=config.AUTH_KEYS[engine]) is True: - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = True - else: - # error update Auth key - auth_keys = config.AUTH_KEYS - auth_keys[engine] = None - config.AUTH_KEYS = auth_keys - case _: - if connected_network is True: - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = True - else: - config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = False - - for engine in config.SELECTABLE_TRANSCRIPTION_ENGINE_LIST: - match engine: - case "Whisper": - if model.checkTranscriptionWhisperModelWeight(config.WHISPER_WEIGHT_TYPE) is True: - config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = True - else: - config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = False - case _: - if connected_network is True: - config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = True - else: - config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = False + config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = False self.initializationProgress(2) # set Translation Engine @@ -1911,6 +1949,10 @@ class Controller: if (config.OVERLAY_SMALL_LOG is True or config.OVERLAY_LARGE_LOG is True): model.startOverlay() + printLog("Init WebSocket Server") + if config.WEBSOCKET_SERVER is True: + model.startWebSocketServer() + printLog("Update settings") self.updateConfigSettings() diff --git a/src-python/mainloop.py b/src-python/mainloop.py index 5c5fb744..d8ad52e7 100644 --- a/src-python/mainloop.py +++ b/src-python/mainloop.py @@ -291,6 +291,15 @@ mapping = { "/set/enable/send_received_message_to_vrc": {"status": True, "variable":controller.setEnableSendReceivedMessageToVrc}, "/set/disable/send_received_message_to_vrc": {"status": True, "variable":controller.setDisableSendReceivedMessageToVrc}, + # WebSocket Settings + "/get/data/websocket_host": {"status": True, "variable":controller.getWebSocketHost}, + "/set/data/websocket_host": {"status": True, "variable":controller.setWebSocketHost}, + "/get/data/websocket_port": {"status": True, "variable":controller.getWebSocketPort}, + "/set/data/websocket_port": {"status": True, "variable":controller.setWebSocketPort}, + "/get/data/websocket_server": {"status": True, "variable":controller.getWebSocketServer}, + "/set/enable/websocket_server": {"status": True, "variable":controller.setEnableWebSocketServer}, + "/set/disable/websocket_server": {"status": True, "variable":controller.setDisableWebSocketServer}, + # Advanced Settings "/get/data/osc_ip_address": {"status": True, "variable":controller.getOscIpAddress}, "/set/data/osc_ip_address": {"status": True, "variable":controller.setOscIpAddress}, diff --git a/src-python/model.py b/src-python/model.py index 8f62aa5e..2bea7143 100644 --- a/src-python/model.py +++ b/src-python/model.py @@ -1,5 +1,7 @@ import copy import gc +import asyncio +import json from subprocess import Popen from os import makedirs as os_makedirs from os import path as os_path @@ -29,6 +31,7 @@ from models.transcription.transcription_whisper import checkWhisperWeight, downl from models.overlay.overlay import Overlay from models.overlay.overlay_image import OverlayImage from models.watchdog.watchdog import Watchdog +from models.websocket.websocket_server import WebSocketServer from utils import errorLogging, setupLogger class threadFnc(Thread): @@ -99,6 +102,10 @@ class Model: self.kks = kakasi() self.watchdog = Watchdog(config.WATCHDOG_TIMEOUT, config.WATCHDOG_INTERVAL) self.osc_handler = OSCHandler(config.OSC_IP_ADDRESS, config.OSC_PORT) + self.websocket_server = None + self.websocket_server_loop = False + self.websocket_server_alive = False + self.th_websocket_server = None def checkTranslatorCTranslate2ModelWeight(self, weight_type:str): return checkCTranslate2Weight(config.PATH_LOCAL, weight_type) @@ -827,4 +834,82 @@ class Model: self.th_watchdog.join() self.th_watchdog = None + def message_handler(websocket, message): + """WebSocketメッセージ受信時の処理""" + pass + + def startWebSocketServer(self): + """WebSocketサーバーを起動し、別スレッドで実行する""" + self.websocket_server_loop = True + self.websocket_server_alive = False # 初期状態を明示 + + async def WebSocketServerMain(): + try: + self.websocket_server = WebSocketServer( + host=config.WEBSOCKET_HOST, + port=config.WEBSOCKET_PORT, + ) + self.websocket_server.set_message_handler(self.message_handler) + self.websocket_server.start() + self.websocket_server_alive = True + + # イベントループが終了するまで待機 + while self.websocket_server_loop: + self.websocket_server.send("Server is running...") + await asyncio.sleep(0.5) # 応答性向上のため間隔短縮 + + except Exception: + errorLogging() + # 具体的なエラー内容をログに残す場合 + # self.logger.error(f"WebSocket server error: {str(e)}") + finally: + # 確実にサーバーを停止 + if hasattr(self, 'websocket_server') and self.websocket_server: + self.websocket_server.stop() + self.websocket_server_alive = False + + self.th_websocket_server = Thread(target=lambda: asyncio.run(WebSocketServerMain())) + self.th_websocket_server.daemon = True + self.th_websocket_server.start() + + def stopWebSocketServer(self): + """WebSocketサーバーを停止する""" + if not hasattr(self, 'th_websocket_server') or self.th_websocket_server is None: + return + + self.websocket_server_loop = False + + try: + # 一定時間待機してからタイムアウト + self.th_websocket_server.join(timeout=2.0) + + if self.th_websocket_server.is_alive(): + # タイムアウト後もスレッドが生きている場合の処理 + self.logger.warning("WebSocket server thread did not terminate properly") + except Exception: + errorLogging() + finally: + self.th_websocket_server = None + self.websocket_server = None + self.websocket_server_alive = False + + def checkWebSocketServer(self): + """WebSocketサーバーの稼働状態を確認する""" + return self.websocket_server_alive + + def websocketSendMessage(self, message_dict:dict): + """ + WebSocketサーバーから全クライアントにメッセージを送信する + :param message_dict: 送信するメッセージの辞書 + :return: 送信成功したかどうか + """ + if not self.websocket_server_alive or not self.websocket_server: + return False + try: + message_json = json.dumps(message_dict) + return self.websocket_server.send(message_json) + except Exception: + errorLogging() + return False + model = Model() \ No newline at end of file diff --git a/src-python/models/websocket/__init__.py b/src-python/models/websocket/__init__.py new file mode 100644 index 00000000..a01d3ba6 --- /dev/null +++ b/src-python/models/websocket/__init__.py @@ -0,0 +1,4 @@ +# WebSocketサーバーモジュール +from .websocket_server import WebSocketServer + +__all__ = ["WebSocketServer"] diff --git a/src-python/models/websocket/websocket_server.py b/src-python/models/websocket/websocket_server.py new file mode 100644 index 00000000..34762e55 --- /dev/null +++ b/src-python/models/websocket/websocket_server.py @@ -0,0 +1,221 @@ +import asyncio +import threading +import websockets +from websockets.legacy.server import WebSocketServerProtocol +from typing import Callable, Set, Optional + +class WebSocketServer: + """ + WebSocketサーバーを管理するクラス。 + 主な機能: + - サーバーの起動・停止 + - クライアント接続管理 (接続/切断の追跡) + - メッセージ受信のコールバック処理 + - メッセージのブロードキャスト機能 + - GUIスレッド等からメッセージ送信するためのキュー + """ + def __init__(self, host: str='127.0.0.1', port: int=8765): + """ + サーバーのホスト名とポートを指定して初期化します。 + """ + self.host = host + self.port = port + self.clients: Set[WebSocketServerProtocol] = set() # 接続クライアント集合 + self._message_handler: Optional[Callable[['WebSocketServer', WebSocketServerProtocol, str], None]] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._server: Optional[websockets.serve] = None + self._thread: Optional[threading.Thread] = None + self._send_queue: Optional[asyncio.Queue] = None # 外部スレッド向け非同期キュー + self.is_running: bool = False # サーバーの起動状態を示すフラグ + + def set_message_handler(self, handler: Callable[['WebSocketServer', WebSocketServerProtocol, str], None]): + """ + クライアントからメッセージ受信時に呼び出すコールバックを設定します。 + コールバックのシグネチャ: (server, websocket, message) -> None + """ + self._message_handler = handler + + async def _handler(self, websocket): + """ + 単一クライアントとのセッションを処理するハンドラです。 + 新規接続時にクライアントを集合に追加し、メッセージを受信してコールバックを呼び出します。 + 切断時には集合からクライアントを削除します。 + """ + # 接続クライアントを集合に追加 + self.clients.add(websocket) + try: + async for message in websocket: + # メッセージ受信時にコールバック呼び出し + if self._message_handler: + self._message_handler(self, websocket, message) + except websockets.exceptions.ConnectionClosed: + # クライアントが切断した場合 + pass + finally: + # 切断時に集合から削除 + self.clients.remove(websocket) + + async def _broadcast_async(self, message: str): + """ + すべての接続クライアントにメッセージを送信する非同期メソッド。 + """ + if not self.clients: + return + # 全クライアントへ並列に送信 + await asyncio.gather( + *[client.send(message) for client in self.clients], + return_exceptions=True + ) + + async def _send_loop(self): + """ + 内部キューからメッセージを取り出し、すべてのクライアントに送信するループ処理。 + GUIなど他スレッドから送信メッセージをキューに入れてもらい、このコルーチンで配信します。 + """ + assert self._send_queue is not None + while True: + message = await self._send_queue.get() + if message is None: + # Noneを受け取ったらシャットダウン指示とみなしてループを抜ける + break + await self._broadcast_async(message) + + def send(self, message: str): + """ + 外部スレッドからサーバーにメッセージを送信するためのメソッドです。 + イベントループ上で安全にキューにメッセージを積み、_send_loop()経由でブロードキャストします。 + """ + if self._loop and self._send_queue: + # キューにput_nowaitするコールをイベントループにスケジュール + self._loop.call_soon_threadsafe(self._send_queue.put_nowait, message) + + def broadcast(self, message: str): + """ + 外部スレッドや他コルーチンから全クライアントにメッセージを送信するユーティリティ。 + asyncio.run_coroutine_threadsafe を使ってループ上でブロードキャストを実行します。 + """ + if self._loop: + # コルーチン自体をrun_coroutine_threadsafeに渡す + asyncio.run_coroutine_threadsafe( + self._broadcast_async(message), self._loop + ) + + def start(self): + """ + サーバーを起動します。新しいスレッド上で asyncio イベントループを動かし、serve()を実行します。 + """ + if self._thread and self._thread.is_alive(): + return # 既に起動中 + # 新しいスレッドでイベントループを開始 + self._thread = threading.Thread(target=self._run_loop, daemon=True) + self._thread.start() + + def _run_loop(self): + """ + 別スレッド上で実行されるイベントループ用のメソッド。 + サーバーの起動と、送信用キューのタスク登録を行います。 + """ + # 新しいイベントループを作成してこのスレッドの現在のループとして設定 + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + async def setup_server(): + # サーバーを起動し、listenを開始 + self._server = await websockets.serve(self._handler, self.host, self.port) + # 送信キューを初期化 + self._send_queue = asyncio.Queue() + # 送信ループタスクを開始 + self._loop.create_task(self._send_loop()) + # サーバーの起動を待機 + # 設定関数を実行してサーバーを起動 + self._loop.run_until_complete(setup_server()) + self.is_running = True + # サーバーが起動したら、接続待機を開始 + # print(f"WebSocket server started on ws://{self.host}:{self.port}") + try: + # サーバーが停止するまでループを継続 + self._loop.run_forever() + finally: + # 停止指示が出たらすべての接続を閉じ、イベントループを終了 + self._loop.run_until_complete(self._shutdown()) + self._loop.close() + + async def _shutdown(self): + """ + サーバーとクライアントを安全にシャットダウンする非同期処理。 + serveオブジェクトをcloseし、wait_closed()で完全に終了を待ちます。 + さらに接続中の各WebSocketをcloseします。 + """ + # サーバーのListenを停止 + if self._server: + self._server.close() + await self._server.wait_closed() + # 接続中クライアントを順次クローズ + for ws in list(self.clients): + try: + await ws.close() + except Exception: + pass + + def stop(self): + """ + サーバーを停止します。別スレッドで動作中のイベントループに停止を指示し、スレッドを終了させます。 + """ + self.is_running = False + if self._loop: + # サーバーのlistenを停止し、ループ停止をスケジュール + self._loop.call_soon_threadsafe(self._server.close) + # None をキューに入れて_send_loopを抜けさせる + self._loop.call_soon_threadsafe(self._send_queue.put_nowait, None) + # ループ停止 + self._loop.call_soon_threadsafe(self._loop.stop) + # スレッドの終了を待つ + if self._thread: + self._thread.join() + +if __name__ == "__main__": + # テスト用の簡単なメッセージハンドラ + def message_handler(server: WebSocketServer, websocket: WebSocketServerProtocol, message: str): + print(f"Received message from {websocket.remote_address}: {message}") + server.send(f"Echo: {message}") + + def send_message(server: WebSocketServer, message: str): + server.send(message) + + # メイン処理を非同期関数に変更 + async def main(): + # サーバーを起動してメッセージハンドラを設定 + ws_server = WebSocketServer() + ws_server.set_message_handler(message_handler) + ws_server.start() + print("WebSocket server started.") + # 定期的にサーバーからメッセージを送信する例 + import threading + import time + def periodic_send(): + print("Starting periodic message sender...") + while ws_server.is_running: + time.sleep(5) + print("Sending periodic message...") + send_message(ws_server, "Periodic message") + print("Periodic message sender stopped.") + # 別スレッドで定期的にメッセージを送信 + time.sleep(5) + send_thread = threading.Thread(target=periodic_send, daemon=True) + send_thread.start() + # メインスレッドでサーバーを動かし続ける + try: + while True: + # 非同期スリープで待機 + await asyncio.sleep(1) + + except KeyboardInterrupt: + # Ctrl+Cでサーバーを停止 + print("Stopping WebSocket server...") + ws_server.stop() + + # 非同期メイン関数を実行 + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("Stopping WebSocket server...") \ No newline at end of file