33 KiB
33 KiB
websocket_server.py - WebSocket通信サーバー
概要
非同期WebSocket通信を提供する包括的なサーバーシステムです。クライアント接続管理、メッセージ配信、外部スレッドからの安全な操作を統合し、VRCTアプリケーションとWebフロントエンド間のリアルタイム通信を実現します。
主要機能
非同期WebSocket通信
- asyncio/websockets による高性能WebSocketサーバー
- 複数クライアント同時接続対応
- 自動接続・切断管理
メッセージング機能
- リアルタイムメッセージ受信処理
- 全クライアントへのブロードキャスト配信
- カスタムメッセージハンドラー対応
スレッド間通信
- GUI等の外部スレッドからの安全なメッセージ送信
- 非同期キューによる効率的な通信制御
- スレッドセーフな操作保証
クラス構造
WebSocketServer クラス
class WebSocketServer:
def __init__(self, host: str='127.0.0.1', port: int=8765):
self.host: str # サーバーホスト
self.port: int # サーバーポート
self.clients: Set[WebSocketServerProtocol] # 接続クライアント集合
self._message_handler: Optional[Callable] # メッセージハンドラー
self._loop: Optional[asyncio.AbstractEventLoop] # イベントループ
self._server: Optional[websockets.serve] # WebSocketサーバー
self._thread: Optional[threading.Thread] # サーバースレッド
self._send_queue: Optional[asyncio.Queue] # 送信キュー
self.is_running: bool # 動作状態フラグ
WebSocket通信の中核管理クラス
主要メソッド
サーバー制御
def start_server(self) -> None
WebSocketサーバーを開始(バックグラウンドスレッド)
def stop_server(self) -> None
WebSocketサーバーを停止・リソース解放
メッセージハンドリング
def set_message_handler(self, handler: Callable[['WebSocketServer', WebSocketServerProtocol, str], None]) -> None
クライアントからのメッセージ受信時コールバック設定
パラメータ
- handler: メッセージハンドラー関数
(server, websocket, message) -> None
メッセージ送信
def send(self, message: str) -> None
外部スレッドから安全にメッセージを全クライアントに送信
パラメータ
- message: 送信するメッセージ文字列
def broadcast(self, message: str) -> None
非同期的に全クライアントにメッセージをブロードキャスト
パラメータ
- message: ブロードキャストするメッセージ
使用方法
基本的なWebSocketサーバー
from models.websocket.websocket_server import WebSocketServer
import time
import json
# メッセージハンドラーの定義
def on_message_received(server, websocket, message):
"""クライアントからのメッセージ処理"""
print(f"クライアントからメッセージ受信: {message}")
try:
# JSONメッセージの解析
data = json.loads(message)
if data.get('type') == 'translation_request':
# 翻訳要求の処理
handle_translation_request(server, data)
elif data.get('type') == 'config_update':
# 設定更新の処理
handle_config_update(server, data)
else:
# エコーバック
response = {
'type': 'echo',
'original_message': data,
'timestamp': time.time()
}
server.broadcast(json.dumps(response))
except json.JSONDecodeError:
# テキストメッセージの場合
response = f"受信しました: {message}"
server.broadcast(response)
def handle_translation_request(server, data):
"""翻訳要求の処理"""
text = data.get('text', '')
target_lang = data.get('target_language', 'English')
# 実際の翻訳処理(ここではモック)
translated_text = f"[{target_lang}] {text}"
response = {
'type': 'translation_result',
'original': text,
'translated': translated_text,
'target_language': target_lang
}
server.broadcast(json.dumps(response))
def handle_config_update(server, data):
"""設定更新の処理"""
config_key = data.get('key')
config_value = data.get('value')
print(f"設定更新: {config_key} = {config_value}")
response = {
'type': 'config_updated',
'key': config_key,
'value': config_value,
'status': 'success'
}
server.broadcast(json.dumps(response))
# WebSocketサーバーの起動
ws_server = WebSocketServer(host='127.0.0.1', port=8765)
ws_server.set_message_handler(on_message_received)
ws_server.start_server()
print("WebSocketサーバーが起動しました: ws://127.0.0.1:8765")
# 定期的なステータス送信
for i in range(10):
status_message = {
'type': 'status',
'server_time': time.time(),
'uptime': i * 5,
'connected_clients': len(ws_server.clients)
}
ws_server.send(json.dumps(status_message))
time.sleep(5)
# サーバー停止
ws_server.stop_server()
VRCTアプリケーション統合
class VRCTWebSocketInterface:
"""VRCT用WebSocketインターフェース"""
def __init__(self, controller, port=8765):
self.controller = controller # VRCTコントローラー
self.ws_server = WebSocketServer(host='127.0.0.1', port=port)
self.ws_server.set_message_handler(self.handle_web_message)
def handle_web_message(self, server, websocket, message):
"""Webクライアントからのメッセージ処理"""
try:
data = json.loads(message)
command = data.get('command')
if command == 'get_config':
self.send_config(server)
elif command == 'set_config':
self.update_config(server, data)
elif command == 'start_translation':
self.start_translation_service(server, data)
elif command == 'stop_translation':
self.stop_translation_service(server)
elif command == 'get_status':
self.send_status(server)
elif command == 'translate_text':
self.translate_text(server, data)
else:
self.send_error(server, f"未知のコマンド: {command}")
except Exception as e:
self.send_error(server, f"メッセージ処理エラー: {e}")
def send_config(self, server):
"""設定情報をWebクライアントに送信"""
config_data = {
'type': 'config',
'data': {
'source_language': self.controller.config.source_language,
'target_language': self.controller.config.target_language,
'translation_engine': self.controller.config.translation_engine,
'osc_enabled': self.controller.config.osc_enabled,
'overlay_enabled': self.controller.config.overlay_enabled
}
}
server.broadcast(json.dumps(config_data))
def update_config(self, server, data):
"""設定更新"""
config_updates = data.get('config', {})
for key, value in config_updates.items():
if hasattr(self.controller.config, key):
setattr(self.controller.config, key, value)
print(f"設定更新: {key} = {value}")
# 更新確認を送信
response = {
'type': 'config_updated',
'status': 'success',
'updated_keys': list(config_updates.keys())
}
server.broadcast(json.dumps(response))
def start_translation_service(self, server, data):
"""翻訳サービス開始"""
try:
self.controller.start_translation()
response = {
'type': 'service_status',
'service': 'translation',
'status': 'started',
'message': '翻訳サービスが開始されました'
}
server.broadcast(json.dumps(response))
except Exception as e:
self.send_error(server, f"翻訳サービス開始エラー: {e}")
def stop_translation_service(self, server):
"""翻訳サービス停止"""
try:
self.controller.stop_translation()
response = {
'type': 'service_status',
'service': 'translation',
'status': 'stopped',
'message': '翻訳サービスが停止されました'
}
server.broadcast(json.dumps(response))
except Exception as e:
self.send_error(server, f"翻訳サービス停止エラー: {e}")
def send_status(self, server):
"""システム状態送信"""
status_data = {
'type': 'system_status',
'data': {
'translation_active': self.controller.is_translation_active(),
'osc_connected': self.controller.is_osc_connected(),
'overlay_active': self.controller.is_overlay_active(),
'connected_clients': len(server.clients),
'uptime': self.controller.get_uptime(),
'memory_usage': self.controller.get_memory_usage()
}
}
server.broadcast(json.dumps(status_data))
def translate_text(self, server, data):
"""即座翻訳実行"""
text = data.get('text', '')
source_lang = data.get('source_language')
target_lang = data.get('target_language')
try:
# 翻訳実行
result = self.controller.translate_text(
text, source_lang, target_lang
)
response = {
'type': 'translation_result',
'original': text,
'translated': result,
'source_language': source_lang,
'target_language': target_lang,
'timestamp': time.time()
}
server.broadcast(json.dumps(response))
except Exception as e:
self.send_error(server, f"翻訳エラー: {e}")
def send_error(self, server, error_message):
"""エラーメッセージ送信"""
error_data = {
'type': 'error',
'message': error_message,
'timestamp': time.time()
}
server.broadcast(json.dumps(error_data))
def start(self):
"""WebSocketインターフェース開始"""
self.ws_server.start_server()
print(f"VRCT WebSocketインターフェース開始: ws://127.0.0.1:{self.ws_server.port}")
def stop(self):
"""WebSocketインターフェース停止"""
self.ws_server.stop_server()
print("VRCT WebSocketインターフェース停止")
def notify_translation_result(self, original, translated, source_lang, target_lang):
"""翻訳結果の通知(VRCTコントローラーから呼び出し)"""
notification = {
'type': 'live_translation',
'original': original,
'translated': translated,
'source_language': source_lang,
'target_language': target_lang,
'timestamp': time.time()
}
self.ws_server.send(json.dumps(notification))
# 使用例(VRCTアプリケーション内)
# vrct_ws_interface = VRCTWebSocketInterface(controller)
# vrct_ws_interface.start()
リアルタイム監視ダッシュボード
class MonitoringDashboard:
"""リアルタイム監視ダッシュボード"""
def __init__(self, system_components, port=8766):
self.components = system_components
self.ws_server = WebSocketServer(host='127.0.0.1', port=port)
self.ws_server.set_message_handler(self.handle_dashboard_message)
self.monitoring_active = False
def handle_dashboard_message(self, server, websocket, message):
"""ダッシュボードからのメッセージ処理"""
try:
data = json.loads(message)
action = data.get('action')
if action == 'start_monitoring':
self.start_monitoring(server)
elif action == 'stop_monitoring':
self.stop_monitoring(server)
elif action == 'get_metrics':
self.send_metrics(server)
elif action == 'get_logs':
self.send_logs(server, data.get('limit', 100))
except Exception as e:
self.send_dashboard_error(server, str(e))
def start_monitoring(self, server):
"""監視開始"""
if not self.monitoring_active:
self.monitoring_active = True
# 監視スレッド開始
import threading
monitor_thread = threading.Thread(
target=self.monitoring_loop,
args=(server,),
daemon=True
)
monitor_thread.start()
response = {
'type': 'monitoring_status',
'status': 'started'
}
server.broadcast(json.dumps(response))
def stop_monitoring(self, server):
"""監視停止"""
self.monitoring_active = False
response = {
'type': 'monitoring_status',
'status': 'stopped'
}
server.broadcast(json.dumps(response))
def monitoring_loop(self, server):
"""リアルタイム監視ループ"""
while self.monitoring_active:
try:
# システムメトリクス収集
metrics = self.collect_metrics()
# ダッシュボードに送信
dashboard_data = {
'type': 'live_metrics',
'metrics': metrics,
'timestamp': time.time()
}
server.broadcast(json.dumps(dashboard_data))
time.sleep(2) # 2秒間隔で更新
except Exception as e:
print(f"監視ループエラー: {e}")
time.sleep(5)
def collect_metrics(self):
"""システムメトリクス収集"""
import psutil
metrics = {
'system': {
'cpu_percent': psutil.cpu_percent(),
'memory_percent': psutil.virtual_memory().percent,
'disk_usage': psutil.disk_usage('/').percent
},
'network': {
'bytes_sent': psutil.net_io_counters().bytes_sent,
'bytes_recv': psutil.net_io_counters().bytes_recv
},
'vrct': {
'translation_count': self.components.get('translation_count', 0),
'osc_messages_sent': self.components.get('osc_count', 0),
'overlay_updates': self.components.get('overlay_count', 0),
'active_connections': len(self.ws_server.clients)
}
}
return metrics
def send_metrics(self, server):
"""メトリクス送信"""
metrics = self.collect_metrics()
response = {
'type': 'metrics_snapshot',
'metrics': metrics,
'timestamp': time.time()
}
server.broadcast(json.dumps(response))
def send_logs(self, server, limit):
"""ログ送信"""
# ログファイルから最新のログを取得(実装例)
logs = self.get_recent_logs(limit)
response = {
'type': 'log_data',
'logs': logs,
'count': len(logs)
}
server.broadcast(json.dumps(response))
def get_recent_logs(self, limit):
"""最新ログ取得"""
# 実際のログファイル読み込み処理
mock_logs = [
{'level': 'INFO', 'message': 'システム開始', 'timestamp': time.time() - 60},
{'level': 'DEBUG', 'message': '翻訳処理完了', 'timestamp': time.time() - 30},
{'level': 'WARNING', 'message': 'メモリ使用量増加', 'timestamp': time.time() - 10}
]
return mock_logs[-limit:]
def send_dashboard_error(self, server, error_message):
"""ダッシュボードエラー送信"""
error_data = {
'type': 'dashboard_error',
'message': error_message,
'timestamp': time.time()
}
server.broadcast(json.dumps(error_data))
def start_dashboard(self):
"""ダッシュボード開始"""
self.ws_server.start_server()
print(f"監視ダッシュボード開始: ws://127.0.0.1:{self.ws_server.port}")
def stop_dashboard(self):
"""ダッシュボード停止"""
self.monitoring_active = False
self.ws_server.stop_server()
# 使用例
system_components = {
'translation_count': 150,
'osc_count': 75,
'overlay_count': 200
}
dashboard = MonitoringDashboard(system_components)
dashboard.start_dashboard()
# しばらく実行
time.sleep(60)
dashboard.stop_dashboard()
高度なメッセージルーティング
class WebSocketRouter:
"""WebSocketメッセージルーティングシステム"""
def __init__(self, port=8767):
self.ws_server = WebSocketServer(host='127.0.0.1', port=port)
self.ws_server.set_message_handler(self.route_message)
self.routes = {}
self.middleware = []
self.client_subscriptions = {}
def add_route(self, message_type, handler):
"""メッセージタイプに対するハンドラー登録"""
self.routes[message_type] = handler
def add_middleware(self, middleware_func):
"""ミドルウェア追加"""
self.middleware.append(middleware_func)
def route_message(self, server, websocket, message):
"""メッセージルーティング処理"""
try:
# JSON解析
data = json.loads(message)
message_type = data.get('type')
# ミドルウェア実行
for middleware in self.middleware:
data = middleware(data, websocket)
if data is None: # ミドルウェアがNoneを返した場合は処理中断
return
# ルーティング実行
if message_type in self.routes:
handler = self.routes[message_type]
response = handler(data, websocket, server)
if response:
server.broadcast(json.dumps(response))
else:
# 未定義メッセージタイプ
error_response = {
'type': 'error',
'message': f'未対応メッセージタイプ: {message_type}',
'original_type': message_type
}
websocket.send(json.dumps(error_response))
except json.JSONDecodeError as e:
error_response = {
'type': 'error',
'message': f'JSON解析エラー: {e}'
}
websocket.send(json.dumps(error_response))
except Exception as e:
error_response = {
'type': 'error',
'message': f'処理エラー: {e}'
}
websocket.send(json.dumps(error_response))
def subscription_middleware(self, data, websocket):
"""購読管理ミドルウェア"""
message_type = data.get('type')
if message_type == 'subscribe':
# 購読登録
topics = data.get('topics', [])
client_id = id(websocket)
self.client_subscriptions[client_id] = topics
response = {
'type': 'subscription_confirmed',
'topics': topics
}
websocket.send(json.dumps(response))
return None # 処理終了
elif message_type == 'unsubscribe':
# 購読解除
client_id = id(websocket)
if client_id in self.client_subscriptions:
del self.client_subscriptions[client_id]
response = {
'type': 'unsubscription_confirmed'
}
websocket.send(json.dumps(response))
return None
return data # そのまま次の処理に進む
def authentication_middleware(self, data, websocket):
"""認証ミドルウェア"""
# 簡易認証例
api_key = data.get('api_key')
if api_key != 'valid_api_key_123':
error_response = {
'type': 'authentication_error',
'message': '無効なAPIキー'
}
websocket.send(json.dumps(error_response))
return None
return data
def logging_middleware(self, data, websocket):
"""ログ記録ミドルウェア"""
client_ip = websocket.remote_address[0] if websocket.remote_address else 'unknown'
message_type = data.get('type', 'unknown')
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {client_ip} -> {message_type}")
return data
def broadcast_to_subscribers(self, topic, message_data):
"""購読者へのトピック配信"""
message_data['topic'] = topic
message_json = json.dumps(message_data)
for client_id, topics in self.client_subscriptions.items():
if topic in topics:
# 該当クライアントを検索
for client in self.ws_server.clients:
if id(client) == client_id:
try:
client.send(message_json)
except Exception as e:
print(f"配信エラー: {e}")
break
def start_router(self):
"""ルーター開始"""
self.ws_server.start_server()
print(f"WebSocketルーター開始: ws://127.0.0.1:{self.ws_server.port}")
def stop_router(self):
"""ルーター停止"""
self.ws_server.stop_server()
# 使用例
def handle_chat_message(data, websocket, server):
"""チャットメッセージハンドラー"""
username = data.get('username', 'Anonymous')
message = data.get('message', '')
response = {
'type': 'chat_broadcast',
'username': username,
'message': message,
'timestamp': time.time()
}
return response
def handle_translation_request(data, websocket, server):
"""翻訳要求ハンドラー"""
text = data.get('text', '')
# 翻訳処理(モック)
translated = f"[翻訳] {text}"
response = {
'type': 'translation_response',
'original': text,
'translated': translated
}
return response
# ルーター設定
router = WebSocketRouter()
# ミドルウェア登録
router.add_middleware(router.logging_middleware)
router.add_middleware(router.subscription_middleware)
# router.add_middleware(router.authentication_middleware) # 認証が必要な場合
# ルート登録
router.add_route('chat_message', handle_chat_message)
router.add_route('translation_request', handle_translation_request)
router.start_router()
# トピック配信テスト
time.sleep(2)
router.broadcast_to_subscribers('system_updates', {
'type': 'system_notification',
'message': 'システム更新完了',
'severity': 'info'
})
time.sleep(10)
router.stop_router()
高度な機能・パターン
接続プール管理
class ConnectionPoolManager:
"""WebSocket接続プール管理"""
def __init__(self):
self.pools = {} # pool_name -> set of websockets
def assign_to_pool(self, websocket, pool_name):
"""クライアントをプールに割り当て"""
if pool_name not in self.pools:
self.pools[pool_name] = set()
self.pools[pool_name].add(websocket)
print(f"クライアントを {pool_name} プールに追加")
def remove_from_pools(self, websocket):
"""すべてのプールからクライアントを削除"""
for pool_name, pool in self.pools.items():
if websocket in pool:
pool.discard(websocket)
print(f"クライアントを {pool_name} プールから削除")
def broadcast_to_pool(self, pool_name, message):
"""特定プールに対してブロードキャスト"""
if pool_name in self.pools:
for websocket in self.pools[pool_name].copy():
try:
websocket.send(message)
except Exception:
# 切断されたクライアントを削除
self.pools[pool_name].discard(websocket)
def get_pool_stats(self):
"""プール統計情報"""
stats = {}
for pool_name, pool in self.pools.items():
stats[pool_name] = len(pool)
return stats
メッセージ永続化・再送機能
class PersistentMessageSystem:
"""メッセージ永続化・再送システム"""
def __init__(self, max_history=1000):
self.message_history = []
self.max_history = max_history
self.client_last_seen = {} # client_id -> last_message_id
def store_message(self, message_data):
"""メッセージを履歴に保存"""
message_id = len(self.message_history)
stored_message = {
'id': message_id,
'data': message_data,
'timestamp': time.time()
}
self.message_history.append(stored_message)
# 履歴サイズ制限
if len(self.message_history) > self.max_history:
self.message_history = self.message_history[-self.max_history:]
return message_id
def get_missed_messages(self, client_id, last_seen_id):
"""クライアントが見逃したメッセージを取得"""
missed_messages = []
for msg in self.message_history:
if msg['id'] > last_seen_id:
missed_messages.append(msg)
return missed_messages
def client_reconnected(self, websocket, client_id):
"""クライアント再接続時の処理"""
last_seen = self.client_last_seen.get(client_id, -1)
missed_messages = self.get_missed_messages(client_id, last_seen)
# 見逃したメッセージを再送
for msg in missed_messages:
try:
recovery_data = {
'type': 'message_recovery',
'original_message': msg['data'],
'message_id': msg['id'],
'original_timestamp': msg['timestamp']
}
websocket.send(json.dumps(recovery_data))
except Exception as e:
print(f"メッセージ再送エラー: {e}")
print(f"クライアント {client_id} に {len(missed_messages)} 件のメッセージを再送")
def update_client_position(self, client_id, message_id):
"""クライアントの最新メッセージ位置更新"""
self.client_last_seen[client_id] = message_id
パフォーマンス・スケーラビリティ
負荷分散・最適化
class OptimizedWebSocketServer(WebSocketServer):
"""最適化されたWebSocketサーバー"""
def __init__(self, host='127.0.0.1', port=8765):
super().__init__(host, port)
self.message_stats = {
'total_messages': 0,
'messages_per_second': 0,
'last_reset_time': time.time()
}
self.compression_enabled = True
self.batch_size = 50
self.batch_timeout = 0.1
def enable_message_batching(self, batch_size=50, timeout=0.1):
"""メッセージバッチング有効化"""
self.batch_size = batch_size
self.batch_timeout = timeout
async def optimized_broadcast(self, message_batch):
"""最適化されたバッチブロードキャスト"""
if not self.clients:
return
# 圧縮対応
if self.compression_enabled and len(message_batch) > 1:
# 複数メッセージをまとめて送信
combined_message = json.dumps({
'type': 'batch',
'messages': message_batch,
'count': len(message_batch)
})
else:
combined_message = json.dumps(message_batch[0])
# 並列送信(エラー処理付き)
send_tasks = []
for client in self.clients.copy():
send_tasks.append(self.safe_send(client, combined_message))
results = await asyncio.gather(*send_tasks, return_exceptions=True)
# 失敗したクライアントを削除
for i, result in enumerate(results):
if isinstance(result, Exception):
failed_client = list(self.clients)[i]
self.clients.discard(failed_client)
print(f"クライアント削除(送信失敗): {result}")
# 統計更新
self.update_message_stats(len(message_batch))
async def safe_send(self, client, message):
"""安全なメッセージ送信"""
try:
await client.send(message)
except Exception as e:
raise e # gather で捕捉される
def update_message_stats(self, message_count):
"""メッセージ統計更新"""
self.message_stats['total_messages'] += message_count
current_time = time.time()
time_diff = current_time - self.message_stats['last_reset_time']
if time_diff >= 1.0: # 1秒ごとに速度計算
self.message_stats['messages_per_second'] = message_count / time_diff
self.message_stats['last_reset_time'] = current_time
def get_performance_stats(self):
"""パフォーマンス統計取得"""
return {
'connected_clients': len(self.clients),
'total_messages': self.message_stats['total_messages'],
'messages_per_second': self.message_stats['messages_per_second'],
'compression_enabled': self.compression_enabled,
'batch_size': self.batch_size
}
依存関係・システム要件
必須依存関係
asyncio: 非同期処理フレームワークwebsockets: WebSocketライブラリthreading: マルチスレッド制御json: JSON形式データ処理
システム要件
system_requirements = {
"python_version": "3.7以上",
"asyncio_support": "非同期処理対応",
"network_stack": "TCP/WebSocket対応",
"memory": "同時接続数に応じた十分なメモリ"
}
performance_characteristics = {
"concurrent_connections": "数百~数千接続対応",
"message_throughput": "秒間数千メッセージ処理可能",
"latency": "低レイテンシー(ミリ秒オーダー)",
"memory_per_connection": "約1-5MB(接続当たり)"
}
オプション依存関係
ujson: 高速JSON処理(パフォーマンス向上)compression: メッセージ圧縮(帯域節約)
注意事項・制限
ネットワーク制限
- ファイアウォール設定の要確認
- プロキシ環境での制限可能性
- ブラウザーのWebSocket接続制限
スケーラビリティ制限
- 単一プロセスでの同時接続数制限
- メモリ使用量の線形増加
- CPU集約的な処理での性能劣化
セキュリティ考慮事項
security_considerations = {
"authentication": "認証機構の実装推奨",
"authorization": "適切な認可制御",
"rate_limiting": "レート制限の実装",
"input_validation": "入力データの検証必須",
"cors_policy": "CORS設定の適切な構成"
}
関連モジュール
config.py: WebSocket設定管理controller.py: WebSocket制御インターフェースutils.py: エラーログ・ユーティリティmodel.py: WebSocket機能統合
将来の改善点
- Redis等を用いたメッセージブローカー連携
- 負荷分散・クラスタリング対応
- より高度な認証・認可システム
- WebRTC等のより高速な通信プロトコル対応
- GraphQL over WebSocketサポート
- リアルタイム監視・分析機能の強化