Merge branch 'websocket' into develop

This commit is contained in:
misyaguziya
2025-05-18 15:24:35 +09:00
8 changed files with 436 additions and 37 deletions

View File

@@ -15,6 +15,7 @@ pydub==0.25.1
psutil==5.9.8
pykakasi==2.3.0
pycaw==20240210
websockets==15.0.1
translators @ git+https://github.com/misyaguziya/translators@5.9.2.1
SpeechRecognition @ git+https://github.com/misyaguziya/custom_speech_recognition@3.10.4.1
tinyoscquery @ git+https://github.com/cyberkitsune/tinyoscquery@0.1.3

View File

@@ -16,6 +16,7 @@ pydub==0.25.1
psutil==5.9.8
pykakasi==2.3.0
pycaw==20240210
websockets==15.0.1
translators @ git+https://github.com/misyaguziya/translators@5.9.2.1
SpeechRecognition @ git+https://github.com/misyaguziya/custom_speech_recognition@3.10.4.1
tinyoscquery @ git+https://github.com/cyberkitsune/tinyoscquery@0.1.3

View File

@@ -954,6 +954,39 @@ class Config:
self._NOTIFICATION_VRC_SFX = value
self.saveConfig(inspect.currentframe().f_code.co_name, value)
@property
def WEBSOCKET_SERVER(self):
return self._WEBSOCKET_SERVER
@WEBSOCKET_SERVER.setter
def WEBSOCKET_SERVER(self, value):
if isinstance(value, bool):
self._WEBSOCKET_SERVER = value
self.saveConfig(inspect.currentframe().f_code.co_name, value)
@property
@json_serializable('WEBSOCKET_HOST')
def WEBSOCKET_HOST(self):
return self._WEBSOCKET_HOST
@WEBSOCKET_HOST.setter
def WEBSOCKET_HOST(self, value):
if isinstance(value, str):
self._WEBSOCKET_HOST = value
self.saveConfig(inspect.currentframe().f_code.co_name, value)
@property
@json_serializable('WEBSOCKET_PORT')
def WEBSOCKET_PORT(self):
return self._WEBSOCKET_PORT
@WEBSOCKET_PORT.setter
def WEBSOCKET_PORT(self, value):
if isinstance(value, int):
self._WEBSOCKET_PORT = value
self.saveConfig(inspect.currentframe().f_code.co_name, value)
def init_config(self):
# Read Only
self._VERSION = "3.1.2"
@@ -1139,6 +1172,9 @@ class Config:
self._LOGGER_FEATURE = False
self._VRC_MIC_MUTE_SYNC = False
self._NOTIFICATION_VRC_SFX = True
self._WEBSOCKET_SERVER = True
self._WEBSOCKET_HOST = "127.0.0.1"
self._WEBSOCKET_PORT = 2231
def load_config(self):
if os_path.isfile(self.PATH_CONFIG) is not False:

View File

@@ -294,6 +294,17 @@ class Controller:
"translation":translation,
"transliteration":transliteration
})
if config.WEBSOCKET_SERVER is True:
model.websocketSendMessage(
{
"type":"SENT",
"message":message,
"translation":translation,
"transliteration":transliteration
}
)
if config.LOGGER_FEATURE is True:
if len(translation) > 0:
translation = " (" + "/".join(translation) + ")"
@@ -377,6 +388,17 @@ class Controller:
"translation":translation,
"transliteration":transliteration,
})
if config.WEBSOCKET_SERVER is True:
model.websocketSendMessage(
{
"type":"RECEIVED",
"message":message,
"translation":translation,
"transliteration":transliteration
}
)
if config.LOGGER_FEATURE is True:
if len(translation) > 0:
translation = " (" + "/".join(translation) + ")"
@@ -434,11 +456,21 @@ class Controller:
overlay_image = model.createOverlayImageLargeLog("send", message, translation[0] if len(translation) > 0 else "")
model.updateOverlayLargeLog(overlay_image)
# update textbox message log (Sent)
if config.WEBSOCKET_SERVER is True:
model.websocketSendMessage(
{
"type":"CHAT",
"message":message,
"translation":translation,
"transliteration":transliteration
}
)
# update textbox message log (Chat)
if config.LOGGER_FEATURE is True:
if len(translation) > 0:
translation_text = " (" + "/".join(translation) + ")"
model.logger.info(f"[SENT] {message}{translation_text}")
model.logger.info(f"[CHAT] {message}{translation_text}")
return {"status":200,
"result":{
@@ -1778,6 +1810,46 @@ class Controller:
model.stopWatchdog()
return {"status":200, "result":True}
@staticmethod
def getWebSocketHost(*args, **kwargs) -> dict:
return {"status":200, "result":config.WEBSOCKET_HOST}
@staticmethod
def setWebSocketHost(data, *args, **kwargs) -> dict:
config.WEBSOCKET_HOST = data
if model.checkWebSocketServer() is True:
model.stopWebSocketServer()
model.startWebSocketServer()
return {"status":200, "result":config.WEBSOCKET_HOST}
@staticmethod
def getWebSocketPort(*args, **kwargs) -> dict:
return {"status":200, "result":config.WEBSOCKET_PORT}
@staticmethod
def setWebSocketPort(data, *args, **kwargs) -> dict:
config.WEBSOCKET_PORT = int(data)
if model.checkWebSocketServer() is True:
model.stopWebSocketServer()
model.startWebSocketServer()
return {"status":200, "result":config.WEBSOCKET_PORT}
@staticmethod
def getWebSocketServer(*args, **kwargs) -> dict:
return {"status":200, "result":config.WEBSOCKET_SERVER}
@staticmethod
def setEnableWebSocketServer(*args, **kwargs) -> dict:
model.startWebSocketServer()
config.WEBSOCKET_SERVER = True
return {"status":200, "result":config.WEBSOCKET_SERVER}
@staticmethod
def setDisableWebSocketServer(*args, **kwargs) -> dict:
config.WEBSOCKET_SERVER = False
model.stopWebSocketServer()
return {"status":200, "result":config.WEBSOCKET_SERVER}
def initializationProgress(self, progress):
self.run(200, self.run_mapping["initialization_progress"], progress)
@@ -1825,41 +1897,7 @@ class Controller:
printLog("Init Translation Engine Status")
for engine in config.SELECTABLE_TRANSLATION_ENGINE_LIST:
match engine:
case "CTranslate2":
if model.checkTranslatorCTranslate2ModelWeight(config.CTRANSLATE2_WEIGHT_TYPE) is True:
config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = True
else:
config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = False
case "DeepL_API":
printLog("Start check DeepL API Key")
config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = False
if config.AUTH_KEYS[engine] is not None:
if model.authenticationTranslatorDeepLAuthKey(auth_key=config.AUTH_KEYS[engine]) is True:
config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = True
else:
# error update Auth key
auth_keys = config.AUTH_KEYS
auth_keys[engine] = None
config.AUTH_KEYS = auth_keys
case _:
if connected_network is True:
config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = True
else:
config.SELECTABLE_TRANSLATION_ENGINE_STATUS[engine] = False
for engine in config.SELECTABLE_TRANSCRIPTION_ENGINE_LIST:
match engine:
case "Whisper":
if model.checkTranscriptionWhisperModelWeight(config.WHISPER_WEIGHT_TYPE) is True:
config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = True
else:
config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = False
case _:
if connected_network is True:
config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = True
else:
config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = False
config.SELECTABLE_TRANSCRIPTION_ENGINE_STATUS[engine] = False
self.initializationProgress(2)
# set Translation Engine
@@ -1911,6 +1949,10 @@ class Controller:
if (config.OVERLAY_SMALL_LOG is True or config.OVERLAY_LARGE_LOG is True):
model.startOverlay()
printLog("Init WebSocket Server")
if config.WEBSOCKET_SERVER is True:
model.startWebSocketServer()
printLog("Update settings")
self.updateConfigSettings()

View File

@@ -291,6 +291,15 @@ mapping = {
"/set/enable/send_received_message_to_vrc": {"status": True, "variable":controller.setEnableSendReceivedMessageToVrc},
"/set/disable/send_received_message_to_vrc": {"status": True, "variable":controller.setDisableSendReceivedMessageToVrc},
# WebSocket Settings
"/get/data/websocket_host": {"status": True, "variable":controller.getWebSocketHost},
"/set/data/websocket_host": {"status": True, "variable":controller.setWebSocketHost},
"/get/data/websocket_port": {"status": True, "variable":controller.getWebSocketPort},
"/set/data/websocket_port": {"status": True, "variable":controller.setWebSocketPort},
"/get/data/websocket_server": {"status": True, "variable":controller.getWebSocketServer},
"/set/enable/websocket_server": {"status": True, "variable":controller.setEnableWebSocketServer},
"/set/disable/websocket_server": {"status": True, "variable":controller.setDisableWebSocketServer},
# Advanced Settings
"/get/data/osc_ip_address": {"status": True, "variable":controller.getOscIpAddress},
"/set/data/osc_ip_address": {"status": True, "variable":controller.setOscIpAddress},

View File

@@ -1,5 +1,7 @@
import copy
import gc
import asyncio
import json
from subprocess import Popen
from os import makedirs as os_makedirs
from os import path as os_path
@@ -29,6 +31,7 @@ from models.transcription.transcription_whisper import checkWhisperWeight, downl
from models.overlay.overlay import Overlay
from models.overlay.overlay_image import OverlayImage
from models.watchdog.watchdog import Watchdog
from models.websocket.websocket_server import WebSocketServer
from utils import errorLogging, setupLogger
class threadFnc(Thread):
@@ -99,6 +102,10 @@ class Model:
self.kks = kakasi()
self.watchdog = Watchdog(config.WATCHDOG_TIMEOUT, config.WATCHDOG_INTERVAL)
self.osc_handler = OSCHandler(config.OSC_IP_ADDRESS, config.OSC_PORT)
self.websocket_server = None
self.websocket_server_loop = False
self.websocket_server_alive = False
self.th_websocket_server = None
def checkTranslatorCTranslate2ModelWeight(self, weight_type:str):
return checkCTranslate2Weight(config.PATH_LOCAL, weight_type)
@@ -827,4 +834,82 @@ class Model:
self.th_watchdog.join()
self.th_watchdog = None
def message_handler(websocket, message):
"""WebSocketメッセージ受信時の処理"""
pass
def startWebSocketServer(self):
"""WebSocketサーバーを起動し、別スレッドで実行する"""
self.websocket_server_loop = True
self.websocket_server_alive = False # 初期状態を明示
async def WebSocketServerMain():
try:
self.websocket_server = WebSocketServer(
host=config.WEBSOCKET_HOST,
port=config.WEBSOCKET_PORT,
)
self.websocket_server.set_message_handler(self.message_handler)
self.websocket_server.start()
self.websocket_server_alive = True
# イベントループが終了するまで待機
while self.websocket_server_loop:
self.websocket_server.send("Server is running...")
await asyncio.sleep(0.5) # 応答性向上のため間隔短縮
except Exception:
errorLogging()
# 具体的なエラー内容をログに残す場合
# self.logger.error(f"WebSocket server error: {str(e)}")
finally:
# 確実にサーバーを停止
if hasattr(self, 'websocket_server') and self.websocket_server:
self.websocket_server.stop()
self.websocket_server_alive = False
self.th_websocket_server = Thread(target=lambda: asyncio.run(WebSocketServerMain()))
self.th_websocket_server.daemon = True
self.th_websocket_server.start()
def stopWebSocketServer(self):
"""WebSocketサーバーを停止する"""
if not hasattr(self, 'th_websocket_server') or self.th_websocket_server is None:
return
self.websocket_server_loop = False
try:
# 一定時間待機してからタイムアウト
self.th_websocket_server.join(timeout=2.0)
if self.th_websocket_server.is_alive():
# タイムアウト後もスレッドが生きている場合の処理
self.logger.warning("WebSocket server thread did not terminate properly")
except Exception:
errorLogging()
finally:
self.th_websocket_server = None
self.websocket_server = None
self.websocket_server_alive = False
def checkWebSocketServer(self):
"""WebSocketサーバーの稼働状態を確認する"""
return self.websocket_server_alive
def websocketSendMessage(self, message_dict:dict):
"""
WebSocketサーバーから全クライアントにメッセージを送信する
:param message_dict: 送信するメッセージの辞書
:return: 送信成功したかどうか
"""
if not self.websocket_server_alive or not self.websocket_server:
return False
try:
message_json = json.dumps(message_dict)
return self.websocket_server.send(message_json)
except Exception:
errorLogging()
return False
model = Model()

View File

@@ -0,0 +1,4 @@
# WebSocketサーバーモジュール
from .websocket_server import WebSocketServer
__all__ = ["WebSocketServer"]

View File

@@ -0,0 +1,221 @@
import asyncio
import threading
import websockets
from websockets.legacy.server import WebSocketServerProtocol
from typing import Callable, Set, Optional
class WebSocketServer:
"""
WebSocketサーバーを管理するクラス。
主な機能:
- サーバーの起動・停止
- クライアント接続管理 (接続/切断の追跡)
- メッセージ受信のコールバック処理
- メッセージのブロードキャスト機能
- GUIスレッド等からメッセージ送信するためのキュー
"""
def __init__(self, host: str='127.0.0.1', port: int=8765):
"""
サーバーのホスト名とポートを指定して初期化します。
"""
self.host = host
self.port = port
self.clients: Set[WebSocketServerProtocol] = set() # 接続クライアント集合
self._message_handler: Optional[Callable[['WebSocketServer', WebSocketServerProtocol, str], None]] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._server: Optional[websockets.serve] = None
self._thread: Optional[threading.Thread] = None
self._send_queue: Optional[asyncio.Queue] = None # 外部スレッド向け非同期キュー
self.is_running: bool = False # サーバーの起動状態を示すフラグ
def set_message_handler(self, handler: Callable[['WebSocketServer', WebSocketServerProtocol, str], None]):
"""
クライアントからメッセージ受信時に呼び出すコールバックを設定します。
コールバックのシグネチャ: (server, websocket, message) -> None
"""
self._message_handler = handler
async def _handler(self, websocket):
"""
単一クライアントとのセッションを処理するハンドラです。
新規接続時にクライアントを集合に追加し、メッセージを受信してコールバックを呼び出します。
切断時には集合からクライアントを削除します。
"""
# 接続クライアントを集合に追加
self.clients.add(websocket)
try:
async for message in websocket:
# メッセージ受信時にコールバック呼び出し
if self._message_handler:
self._message_handler(self, websocket, message)
except websockets.exceptions.ConnectionClosed:
# クライアントが切断した場合
pass
finally:
# 切断時に集合から削除
self.clients.remove(websocket)
async def _broadcast_async(self, message: str):
"""
すべての接続クライアントにメッセージを送信する非同期メソッド。
"""
if not self.clients:
return
# 全クライアントへ並列に送信
await asyncio.gather(
*[client.send(message) for client in self.clients],
return_exceptions=True
)
async def _send_loop(self):
"""
内部キューからメッセージを取り出し、すべてのクライアントに送信するループ処理。
GUIなど他スレッドから送信メッセージをキューに入れてもらい、このコルーチンで配信します。
"""
assert self._send_queue is not None
while True:
message = await self._send_queue.get()
if message is None:
# Noneを受け取ったらシャットダウン指示とみなしてループを抜ける
break
await self._broadcast_async(message)
def send(self, message: str):
"""
外部スレッドからサーバーにメッセージを送信するためのメソッドです。
イベントループ上で安全にキューにメッセージを積み、_send_loop()経由でブロードキャストします。
"""
if self._loop and self._send_queue:
# キューにput_nowaitするコールをイベントループにスケジュール
self._loop.call_soon_threadsafe(self._send_queue.put_nowait, message)
def broadcast(self, message: str):
"""
外部スレッドや他コルーチンから全クライアントにメッセージを送信するユーティリティ。
asyncio.run_coroutine_threadsafe を使ってループ上でブロードキャストを実行します。
"""
if self._loop:
# コルーチン自体をrun_coroutine_threadsafeに渡す
asyncio.run_coroutine_threadsafe(
self._broadcast_async(message), self._loop
)
def start(self):
"""
サーバーを起動します。新しいスレッド上で asyncio イベントループを動かし、serve()を実行します。
"""
if self._thread and self._thread.is_alive():
return # 既に起動中
# 新しいスレッドでイベントループを開始
self._thread = threading.Thread(target=self._run_loop, daemon=True)
self._thread.start()
def _run_loop(self):
"""
別スレッド上で実行されるイベントループ用のメソッド。
サーバーの起動と、送信用キューのタスク登録を行います。
"""
# 新しいイベントループを作成してこのスレッドの現在のループとして設定
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
async def setup_server():
# サーバーを起動し、listenを開始
self._server = await websockets.serve(self._handler, self.host, self.port)
# 送信キューを初期化
self._send_queue = asyncio.Queue()
# 送信ループタスクを開始
self._loop.create_task(self._send_loop())
# サーバーの起動を待機
# 設定関数を実行してサーバーを起動
self._loop.run_until_complete(setup_server())
self.is_running = True
# サーバーが起動したら、接続待機を開始
# print(f"WebSocket server started on ws://{self.host}:{self.port}")
try:
# サーバーが停止するまでループを継続
self._loop.run_forever()
finally:
# 停止指示が出たらすべての接続を閉じ、イベントループを終了
self._loop.run_until_complete(self._shutdown())
self._loop.close()
async def _shutdown(self):
"""
サーバーとクライアントを安全にシャットダウンする非同期処理。
serveオブジェクトをcloseし、wait_closed()で完全に終了を待ちます。
さらに接続中の各WebSocketをcloseします。
"""
# サーバーのListenを停止
if self._server:
self._server.close()
await self._server.wait_closed()
# 接続中クライアントを順次クローズ
for ws in list(self.clients):
try:
await ws.close()
except Exception:
pass
def stop(self):
"""
サーバーを停止します。別スレッドで動作中のイベントループに停止を指示し、スレッドを終了させます。
"""
self.is_running = False
if self._loop:
# サーバーのlistenを停止し、ループ停止をスケジュール
self._loop.call_soon_threadsafe(self._server.close)
# None をキューに入れて_send_loopを抜けさせる
self._loop.call_soon_threadsafe(self._send_queue.put_nowait, None)
# ループ停止
self._loop.call_soon_threadsafe(self._loop.stop)
# スレッドの終了を待つ
if self._thread:
self._thread.join()
if __name__ == "__main__":
# テスト用の簡単なメッセージハンドラ
def message_handler(server: WebSocketServer, websocket: WebSocketServerProtocol, message: str):
print(f"Received message from {websocket.remote_address}: {message}")
server.send(f"Echo: {message}")
def send_message(server: WebSocketServer, message: str):
server.send(message)
# メイン処理を非同期関数に変更
async def main():
# サーバーを起動してメッセージハンドラを設定
ws_server = WebSocketServer()
ws_server.set_message_handler(message_handler)
ws_server.start()
print("WebSocket server started.")
# 定期的にサーバーからメッセージを送信する例
import threading
import time
def periodic_send():
print("Starting periodic message sender...")
while ws_server.is_running:
time.sleep(5)
print("Sending periodic message...")
send_message(ws_server, "Periodic message")
print("Periodic message sender stopped.")
# 別スレッドで定期的にメッセージを送信
time.sleep(5)
send_thread = threading.Thread(target=periodic_send, daemon=True)
send_thread.start()
# メインスレッドでサーバーを動かし続ける
try:
while True:
# 非同期スリープで待機
await asyncio.sleep(1)
except KeyboardInterrupt:
# Ctrl+Cでサーバーを停止
print("Stopping WebSocket server...")
ws_server.stop()
# 非同期メイン関数を実行
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Stopping WebSocket server...")