From ed5ebcee9090fcee070955479199f54c18e81b1e Mon Sep 17 00:00:00 2001 From: misyaguziya <53165965+misyaguziya@users.noreply.github.com> Date: Sun, 18 May 2025 13:16:52 +0900 Subject: [PATCH] [Update] WebSocket server: Refactor threading and message handling --- src-python/model.py | 21 +- .../models/websocket/websocket_server.py | 278 ++++++++++++------ 2 files changed, 200 insertions(+), 99 deletions(-) diff --git a/src-python/model.py b/src-python/model.py index ee8ee250..d848fd94 100644 --- a/src-python/model.py +++ b/src-python/model.py @@ -106,18 +106,12 @@ class Model: host=config.WEBSOCKET_HOST, port=config.WEBSOCKET_PORT ) - self.th_websocket_server = None def startWebSocketServer(self): - if self.th_websocket_server is None: - from threading import Thread - def run_server(): - try: - self.websocket_server.start() - except Exception: - errorLogging() - self.th_websocket_server = Thread(target=run_server, daemon=True) - self.th_websocket_server.start() + try: + self.websocket_server.start() + except Exception: + errorLogging() def stopWebSocketServer(self): if self.websocket_server: @@ -125,7 +119,6 @@ class Model: self.websocket_server.stop() except Exception: errorLogging() - self.th_websocket_server = None def checkWebSocketServer(self): if self.websocket_server: @@ -135,13 +128,13 @@ class Model: errorLogging() return False - def websocketSendMessage(self, message_dict): + def websocketSendMessage(self, message): """ WebSocketサーバーから全クライアントにメッセージを送信する - :param message_dict: 送信する辞書型データ + :param message: 送信するメッセージ """ try: - self.websocket_server.send_message(message_dict) + self.websocket_server.send(str(message)) except Exception: errorLogging() diff --git a/src-python/models/websocket/websocket_server.py b/src-python/models/websocket/websocket_server.py index 330f6b79..f01122a9 100644 --- a/src-python/models/websocket/websocket_server.py +++ b/src-python/models/websocket/websocket_server.py @@ -1,113 +1,221 @@ import asyncio -import json -import logging -from typing import Dict, Set +import threading import websockets +from websockets.legacy.server import WebSocketServerProtocol +from typing import Callable, Set, Optional class WebSocketServer: - def __init__(self, host: str = "0.0.0.0", port: int = 8765): + """ + WebSocketサーバーを管理するクラス。 + 主な機能: + - サーバーの起動・停止 + - クライアント接続管理 (接続/切断の追跡) + - メッセージ受信のコールバック処理 + - メッセージのブロードキャスト機能 + - GUIスレッド等からメッセージ送信するためのキュー + """ + def __init__(self, host: str='localhost', port: int=8765): + """ + サーバーのホスト名とポートを指定して初期化します。 + """ self.host = host self.port = port - self.clients = set() - self.server = None - self.is_running = False - self.logger = logging.getLogger('websocket_server') + self.clients: Set[WebSocketServerProtocol] = set() # 接続クライアント集合 + self._message_handler: Optional[Callable[['WebSocketServer', WebSocketServerProtocol, str], None]] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._server: Optional[websockets.serve] = None + self._thread: Optional[threading.Thread] = None + self._send_queue: Optional[asyncio.Queue] = None # 外部スレッド向け非同期キュー + self.is_running: bool = False # サーバーの起動状態を示すフラグ - async def register(self, websocket): - """クライアント接続を登録する""" + def set_message_handler(self, handler: Callable[['WebSocketServer', WebSocketServerProtocol, str], None]): + """ + クライアントからメッセージ受信時に呼び出すコールバックを設定します。 + コールバックのシグネチャ: (server, websocket, message) -> None + """ + self._message_handler = handler + + async def _handler(self, websocket): + """ + 単一クライアントとのセッションを処理するハンドラです。 + 新規接続時にクライアントを集合に追加し、メッセージを受信してコールバックを呼び出します。 + 切断時には集合からクライアントを削除します。 + """ + # 接続クライアントを集合に追加 self.clients.add(websocket) - self.logger.info(f"クライアント接続: {websocket.remote_address}, 現在の接続数: {len(self.clients)}") - - async def unregister(self, websocket): - """クライアント接続を解除する""" - self.clients.remove(websocket) - self.logger.info(f"クライアント切断: {websocket.remote_address}, 現在の接続数: {len(self.clients)}") - - async def handler(self, websocket): - """WebSocket接続ハンドラー""" - await self.register(websocket) try: async for message in websocket: - # クライアントからのメッセージを処理(必要に応じて) - # 現在はクライアントからのメッセージは特に処理していません - self.logger.debug(f"クライアントからのメッセージ: {message}") + # メッセージ受信時にコールバック呼び出し + if self._message_handler: + self._message_handler(self, websocket, message) except websockets.exceptions.ConnectionClosed: + # クライアントが切断した場合 pass finally: - await self.unregister(websocket) + # 切断時に集合から削除 + self.clients.remove(websocket) - async def broadcast(self, message: Dict): - """全クライアントにメッセージをブロードキャストする""" + async def _broadcast_async(self, message: str): + """ + すべての接続クライアントにメッセージを送信する非同期メソッド。 + """ if not self.clients: return - - json_message = json.dumps(message) - tasks = [client.send(json_message) for client in self.clients] - await asyncio.gather(*tasks, return_exceptions=True) - - def send_message(self, message: Dict): - """メッセージを送信する関数(通常のコードから呼び出し可能)""" - if not self.is_running or not self.clients: - return - - # asyncioのイベントループがあれば利用し、なければ新たに作成 - loop = asyncio.get_event_loop() if asyncio.get_event_loop().is_running() else asyncio.new_event_loop() - - # ブロードキャスト関数を実行 - asyncio.run_coroutine_threadsafe( - self.broadcast(message), - loop + # 全クライアントへ並列に送信 + await asyncio.gather( + *[client.send(message) for client in self.clients], + return_exceptions=True ) - async def start_server(self): - """WebSocketサーバーを起動する""" - if not self.is_running: - self.server = await websockets.serve(self.handler, self.host, self.port) - self.is_running = True - self.logger.info(f"WebSocketサーバーを起動しました - {self.host}:{self.port}") - return True - return False + async def _send_loop(self): + """ + 内部キューからメッセージを取り出し、すべてのクライアントに送信するループ処理。 + GUIなど他スレッドから送信メッセージをキューに入れてもらい、このコルーチンで配信します。 + """ + assert self._send_queue is not None + while True: + message = await self._send_queue.get() + if message is None: + # Noneを受け取ったらシャットダウン指示とみなしてループを抜ける + break + await self._broadcast_async(message) + + def send(self, message: str): + """ + 外部スレッドからサーバーにメッセージを送信するためのメソッドです。 + イベントループ上で安全にキューにメッセージを積み、_send_loop()経由でブロードキャストします。 + """ + if self._loop and self._send_queue: + # キューにput_nowaitするコールをイベントループにスケジュール + self._loop.call_soon_threadsafe(self._send_queue.put_nowait, message) + + def broadcast(self, message: str): + """ + 外部スレッドや他コルーチンから全クライアントにメッセージを送信するユーティリティ。 + asyncio.run_coroutine_threadsafe を使ってループ上でブロードキャストを実行します。 + """ + if self._loop: + # コルーチン自体をrun_coroutine_threadsafeに渡す + asyncio.run_coroutine_threadsafe( + self._broadcast_async(message), self._loop + ) def start(self): - """非同期ループでWebSocketサーバーを起動する(通常のコードから呼び出し可能)""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(self.start_server()) - # バックグラウンドでループを実行 - self._task = asyncio.run_coroutine_threadsafe(self._run_forever(), loop) + """ + サーバーを起動します。新しいスレッド上で asyncio イベントループを動かし、serve()を実行します。 + """ + if self._thread and self._thread.is_alive(): + return # 既に起動中 + # 新しいスレッドでイベントループを開始 + self._thread = threading.Thread(target=self._run_loop, daemon=True) + self._thread.start() - async def _run_forever(self): - """サーバーを永続的に実行する""" - while self.is_running: - await asyncio.sleep(0.1) + def _run_loop(self): + """ + 別スレッド上で実行されるイベントループ用のメソッド。 + サーバーの起動と、送信用キューのタスク登録を行います。 + """ + # 新しいイベントループを作成してこのスレッドの現在のループとして設定 + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) - async def stop_server(self): - """WebSocketサーバーを停止する""" - if self.is_running and self.server: - self.server.close() - await self.server.wait_closed() - self.is_running = False - self.clients.clear() - self.logger.info("WebSocketサーバーを停止しました") - return True - return False + async def setup_server(): + # サーバーを起動し、listenを開始 + self._server = await websockets.serve(self._handler, self.host, self.port) + # 送信キューを初期化 + self._send_queue = asyncio.Queue() + # 送信ループタスクを開始 + self._loop.create_task(self._send_loop()) + # サーバーの起動を待機 + # 設定関数を実行してサーバーを起動 + self._loop.run_until_complete(setup_server()) + self.is_running = True + # サーバーが起動したら、接続待機を開始 + # print(f"WebSocket server started on ws://{self.host}:{self.port}") + try: + # サーバーが停止するまでループを継続 + self._loop.run_forever() + finally: + # 停止指示が出たらすべての接続を閉じ、イベントループを終了 + self._loop.run_until_complete(self._shutdown()) + self._loop.close() + + async def _shutdown(self): + """ + サーバーとクライアントを安全にシャットダウンする非同期処理。 + serveオブジェクトをcloseし、wait_closed()で完全に終了を待ちます。 + さらに接続中の各WebSocketをcloseします。 + """ + # サーバーのListenを停止 + if self._server: + self._server.close() + await self._server.wait_closed() + # 接続中クライアントを順次クローズ + for ws in list(self.clients): + try: + await ws.close() + except Exception: + pass def stop(self): - """WebSocketサーバーを停止する(通常のコードから呼び出し可能)""" - if not self.is_running: - return + """ + サーバーを停止します。別スレッドで動作中のイベントループに停止を指示し、スレッドを終了させます。 + """ + self.is_running = False + if self._loop: + # サーバーのlistenを停止し、ループ停止をスケジュール + self._loop.call_soon_threadsafe(self._server.close) + # None をキューに入れて_send_loopを抜けさせる + self._loop.call_soon_threadsafe(self._send_queue.put_nowait, None) + # ループ停止 + self._loop.call_soon_threadsafe(self._loop.stop) + # スレッドの終了を待つ + if self._thread: + self._thread.join() - loop = asyncio.get_event_loop() - asyncio.run_coroutine_threadsafe(self.stop_server(), loop) +if __name__ == "__main__": + # テスト用の簡単なメッセージハンドラ + def message_handler(server: WebSocketServer, websocket: WebSocketServerProtocol, message: str): + print(f"Received message from {websocket.remote_address}: {message}") + server.send(f"Echo: {message}") - def is_running(self) -> bool: - """サーバーが実行中かどうかを確認する""" - return self.is_running + def send_message(server: WebSocketServer, message: str): + server.send(message) - def get_clients(self) -> Set: - """現在のクライアント接続を取得する""" - return self.clients + # メイン処理を非同期関数に変更 + async def main(): + # サーバーを起動してメッセージハンドラを設定 + ws_server = WebSocketServer() + ws_server.set_message_handler(message_handler) + ws_server.start() + print("WebSocket server started.") + # 定期的にサーバーからメッセージを送信する例 + import threading + import time + def periodic_send(): + print("Starting periodic message sender...") + while ws_server.is_running: + time.sleep(5) + print("Sending periodic message...") + send_message(ws_server, "Periodic message") + print("Periodic message sender stopped.") + # 別スレッドで定期的にメッセージを送信 + time.sleep(5) + send_thread = threading.Thread(target=periodic_send, daemon=True) + send_thread.start() + # メインスレッドでサーバーを動かし続ける + try: + while True: + # 非同期スリープで待機 + await asyncio.sleep(1) - def get_client_count(self) -> int: - """現在のクライアント接続数を取得する""" - return len(self.clients) \ No newline at end of file + except KeyboardInterrupt: + # Ctrl+Cでサーバーを停止 + print("Stopping WebSocket server...") + ws_server.stop() + + # 非同期メイン関数を実行 + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("Stopping WebSocket server...") \ No newline at end of file