[Update] WebSocket server: Refactor threading and message handling

This commit is contained in:
misyaguziya
2025-05-18 13:16:52 +09:00
parent d940097e44
commit ed5ebcee90
2 changed files with 200 additions and 99 deletions

View File

@@ -106,18 +106,12 @@ class Model:
host=config.WEBSOCKET_HOST,
port=config.WEBSOCKET_PORT
)
self.th_websocket_server = None
def startWebSocketServer(self):
if self.th_websocket_server is None:
from threading import Thread
def run_server():
try:
self.websocket_server.start()
except Exception:
errorLogging()
self.th_websocket_server = Thread(target=run_server, daemon=True)
self.th_websocket_server.start()
def stopWebSocketServer(self):
if self.websocket_server:
@@ -125,7 +119,6 @@ class Model:
self.websocket_server.stop()
except Exception:
errorLogging()
self.th_websocket_server = None
def checkWebSocketServer(self):
if self.websocket_server:
@@ -135,13 +128,13 @@ class Model:
errorLogging()
return False
def websocketSendMessage(self, message_dict):
def websocketSendMessage(self, message):
"""
WebSocketサーバーから全クライアントにメッセージを送信する
:param message_dict: 送信する辞書型データ
:param message: 送信するメッセージ
"""
try:
self.websocket_server.send_message(message_dict)
self.websocket_server.send(str(message))
except Exception:
errorLogging()

View File

@@ -1,113 +1,221 @@
import asyncio
import json
import logging
from typing import Dict, Set
import threading
import websockets
from websockets.legacy.server import WebSocketServerProtocol
from typing import Callable, Set, Optional
class WebSocketServer:
def __init__(self, host: str = "0.0.0.0", port: int = 8765):
"""
WebSocketサーバーを管理するクラス。
主な機能:
- サーバーの起動・停止
- クライアント接続管理 (接続/切断の追跡)
- メッセージ受信のコールバック処理
- メッセージのブロードキャスト機能
- GUIスレッド等からメッセージ送信するためのキュー
"""
def __init__(self, host: str='localhost', port: int=8765):
"""
サーバーのホスト名とポートを指定して初期化します。
"""
self.host = host
self.port = port
self.clients = set()
self.server = None
self.is_running = False
self.logger = logging.getLogger('websocket_server')
self.clients: Set[WebSocketServerProtocol] = set() # 接続クライアント集合
self._message_handler: Optional[Callable[['WebSocketServer', WebSocketServerProtocol, str], None]] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._server: Optional[websockets.serve] = None
self._thread: Optional[threading.Thread] = None
self._send_queue: Optional[asyncio.Queue] = None # 外部スレッド向け非同期キュー
self.is_running: bool = False # サーバーの起動状態を示すフラグ
async def register(self, websocket):
"""クライアント接続を登録する"""
def set_message_handler(self, handler: Callable[['WebSocketServer', WebSocketServerProtocol, str], None]):
"""
クライアントからメッセージ受信時に呼び出すコールバックを設定します。
コールバックのシグネチャ: (server, websocket, message) -> None
"""
self._message_handler = handler
async def _handler(self, websocket):
"""
単一クライアントとのセッションを処理するハンドラです。
新規接続時にクライアントを集合に追加し、メッセージを受信してコールバックを呼び出します。
切断時には集合からクライアントを削除します。
"""
# 接続クライアントを集合に追加
self.clients.add(websocket)
self.logger.info(f"クライアント接続: {websocket.remote_address}, 現在の接続数: {len(self.clients)}")
async def unregister(self, websocket):
"""クライアント接続を解除する"""
self.clients.remove(websocket)
self.logger.info(f"クライアント切断: {websocket.remote_address}, 現在の接続数: {len(self.clients)}")
async def handler(self, websocket):
"""WebSocket接続ハンドラー"""
await self.register(websocket)
try:
async for message in websocket:
# クライアントからのメッセージを処理(必要に応じて)
# 現在はクライアントからのメッセージは特に処理していません
self.logger.debug(f"クライアントからのメッセージ: {message}")
# メッセージ受信時にコールバック呼び出し
if self._message_handler:
self._message_handler(self, websocket, message)
except websockets.exceptions.ConnectionClosed:
# クライアントが切断した場合
pass
finally:
await self.unregister(websocket)
# 切断時に集合から削除
self.clients.remove(websocket)
async def broadcast(self, message: Dict):
"""全クライアントにメッセージをブロードキャストする"""
async def _broadcast_async(self, message: str):
"""
すべての接続クライアントにメッセージを送信する非同期メソッド。
"""
if not self.clients:
return
json_message = json.dumps(message)
tasks = [client.send(json_message) for client in self.clients]
await asyncio.gather(*tasks, return_exceptions=True)
def send_message(self, message: Dict):
"""メッセージを送信する関数(通常のコードから呼び出し可能)"""
if not self.is_running or not self.clients:
return
# asyncioのイベントループがあれば利用し、なければ新たに作成
loop = asyncio.get_event_loop() if asyncio.get_event_loop().is_running() else asyncio.new_event_loop()
# ブロードキャスト関数を実行
asyncio.run_coroutine_threadsafe(
self.broadcast(message),
loop
# 全クライアントへ並列に送信
await asyncio.gather(
*[client.send(message) for client in self.clients],
return_exceptions=True
)
async def start_server(self):
"""WebSocketサーバーを起動する"""
if not self.is_running:
self.server = await websockets.serve(self.handler, self.host, self.port)
self.is_running = True
self.logger.info(f"WebSocketサーバーを起動しました - {self.host}:{self.port}")
return True
return False
async def _send_loop(self):
"""
内部キューからメッセージを取り出し、すべてのクライアントに送信するループ処理。
GUIなど他スレッドから送信メッセージをキューに入れてもらい、このコルーチンで配信します。
"""
assert self._send_queue is not None
while True:
message = await self._send_queue.get()
if message is None:
# Noneを受け取ったらシャットダウン指示とみなしてループを抜ける
break
await self._broadcast_async(message)
def send(self, message: str):
"""
外部スレッドからサーバーにメッセージを送信するためのメソッドです。
イベントループ上で安全にキューにメッセージを積み、_send_loop()経由でブロードキャストします。
"""
if self._loop and self._send_queue:
# キューにput_nowaitするコールをイベントループにスケジュール
self._loop.call_soon_threadsafe(self._send_queue.put_nowait, message)
def broadcast(self, message: str):
"""
外部スレッドや他コルーチンから全クライアントにメッセージを送信するユーティリティ。
asyncio.run_coroutine_threadsafe を使ってループ上でブロードキャストを実行します。
"""
if self._loop:
# コルーチン自体をrun_coroutine_threadsafeに渡す
asyncio.run_coroutine_threadsafe(
self._broadcast_async(message), self._loop
)
def start(self):
"""非同期ループでWebSocketサーバーを起動する通常のコードから呼び出し可能"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.start_server())
# バックグラウンドでループを実行
self._task = asyncio.run_coroutine_threadsafe(self._run_forever(), loop)
"""
サーバーを起動します。新しいスレッド上で asyncio イベントループを動かし、serve()を実行します。
"""
if self._thread and self._thread.is_alive():
return # 既に起動中
# 新しいスレッドでイベントループを開始
self._thread = threading.Thread(target=self._run_loop, daemon=True)
self._thread.start()
async def _run_forever(self):
"""サーバーを永続的に実行する"""
while self.is_running:
await asyncio.sleep(0.1)
def _run_loop(self):
"""
別スレッド上で実行されるイベントループ用のメソッド。
サーバーの起動と、送信用キューのタスク登録を行います。
"""
# 新しいイベントループを作成してこのスレッドの現在のループとして設定
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
async def stop_server(self):
"""WebSocketサーバーを停止する"""
if self.is_running and self.server:
self.server.close()
await self.server.wait_closed()
self.is_running = False
self.clients.clear()
self.logger.info("WebSocketサーバーを停止しました")
return True
return False
async def setup_server():
# サーバーを起動し、listenを開始
self._server = await websockets.serve(self._handler, self.host, self.port)
# 送信キューを初期化
self._send_queue = asyncio.Queue()
# 送信ループタスクを開始
self._loop.create_task(self._send_loop())
# サーバーの起動を待機
# 設定関数を実行してサーバーを起動
self._loop.run_until_complete(setup_server())
self.is_running = True
# サーバーが起動したら、接続待機を開始
# print(f"WebSocket server started on ws://{self.host}:{self.port}")
try:
# サーバーが停止するまでループを継続
self._loop.run_forever()
finally:
# 停止指示が出たらすべての接続を閉じ、イベントループを終了
self._loop.run_until_complete(self._shutdown())
self._loop.close()
async def _shutdown(self):
"""
サーバーとクライアントを安全にシャットダウンする非同期処理。
serveオブジェクトをcloseし、wait_closed()で完全に終了を待ちます。
さらに接続中の各WebSocketをcloseします。
"""
# サーバーのListenを停止
if self._server:
self._server.close()
await self._server.wait_closed()
# 接続中クライアントを順次クローズ
for ws in list(self.clients):
try:
await ws.close()
except Exception:
pass
def stop(self):
"""WebSocketサーバーを停止する通常のコードから呼び出し可能"""
if not self.is_running:
return
"""
サーバーを停止します。別スレッドで動作中のイベントループに停止を指示し、スレッドを終了させます。
"""
self.is_running = False
if self._loop:
# サーバーのlistenを停止し、ループ停止をスケジュール
self._loop.call_soon_threadsafe(self._server.close)
# None をキューに入れて_send_loopを抜けさせる
self._loop.call_soon_threadsafe(self._send_queue.put_nowait, None)
# ループ停止
self._loop.call_soon_threadsafe(self._loop.stop)
# スレッドの終了を待つ
if self._thread:
self._thread.join()
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(self.stop_server(), loop)
if __name__ == "__main__":
# テスト用の簡単なメッセージハンドラ
def message_handler(server: WebSocketServer, websocket: WebSocketServerProtocol, message: str):
print(f"Received message from {websocket.remote_address}: {message}")
server.send(f"Echo: {message}")
def is_running(self) -> bool:
"""サーバーが実行中かどうかを確認する"""
return self.is_running
def send_message(server: WebSocketServer, message: str):
server.send(message)
def get_clients(self) -> Set:
"""現在のクライアント接続を取得する"""
return self.clients
# メイン処理を非同期関数に変更
async def main():
# サーバーを起動してメッセージハンドラを設定
ws_server = WebSocketServer()
ws_server.set_message_handler(message_handler)
ws_server.start()
print("WebSocket server started.")
# 定期的にサーバーからメッセージを送信する例
import threading
import time
def periodic_send():
print("Starting periodic message sender...")
while ws_server.is_running:
time.sleep(5)
print("Sending periodic message...")
send_message(ws_server, "Periodic message")
print("Periodic message sender stopped.")
# 別スレッドで定期的にメッセージを送信
time.sleep(5)
send_thread = threading.Thread(target=periodic_send, daemon=True)
send_thread.start()
# メインスレッドでサーバーを動かし続ける
try:
while True:
# 非同期スリープで待機
await asyncio.sleep(1)
def get_client_count(self) -> int:
"""現在のクライアント接続数を取得する"""
return len(self.clients)
except KeyboardInterrupt:
# Ctrl+Cでサーバーを停止
print("Stopping WebSocket server...")
ws_server.stop()
# 非同期メイン関数を実行
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Stopping WebSocket server...")