""" フロントエンドを模擬したテストクライアント stdin/stdoutを介してバックエンドと通信し、エンドポイントをテストします。 使用方法: 1. バックエンドを起動: python mainloop.py 2. 別のターミナルでこのスクリプトを実行: python test_client.py 3. エンドポイントとデータを指定してテストを実行 """ import os import subprocess import json import base64 import sys import time import threading from typing import Optional, Dict, Any if os.path.exists("config.json"): os.remove("config.json") class Color: GREEN = '\033[32m' RED = '\033[31m' YELLOW = '\033[33m' BLUE = '\033[34m' CYAN = '\033[36m' RESET = '\033[0m' BOLD = '\033[1m' class TestClient: def __init__(self): """バックエンドプロセスを起動してstdin/stdout通信を確立""" print(f"{Color.CYAN}バックエンドプロセスを起動中...{Color.RESET}") self.process = subprocess.Popen( [sys.executable, 'mainloop.py'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, cwd='.' ) self._watchdog_stop_event = threading.Event() self._watchdog_thread: Optional[threading.Thread] = None # 初期化完了を待つ print(f"{Color.CYAN}バックエンドの初期化を待機中...{Color.RESET}") self._wait_for_initialization() print(f"{Color.GREEN}バックエンド起動完了{Color.RESET}\n") # 初期化完了後 watchdog 開始 self._start_watchdog() def _wait_for_initialization(self, timeout: Optional[float] = None): """バックエンド初期化完了 (/run/initialization_complete) を待機する。 旧仕様: 60秒で TimeoutError を発生させていた。 新仕様: - timeout が None の場合は無期限待機。 - 'VRCT_INIT_TIMEOUT' 環境変数が設定されていれば soft timeout 値として使用。 - soft timeout 経過時は ERROR ではなく WARN を表示し継続待機。 - 進捗: 30秒ごとに経過時間と最後に受信した endpoint を表示。 - バックエンドプロセスが終了した場合のみ例外を投げる。 """ import os env_timeout = os.getenv("VRCT_INIT_TIMEOUT") if timeout is None: try: timeout = float(env_timeout) if env_timeout else None except ValueError: timeout = None start_time = time.time() last_progress_endpoint = None last_progress_time_log = 0.0 while True: # プロセス終了検知 if self.process.poll() is not None: raise RuntimeError("Backend process terminated during initialization") # soft timeout 警告表示 if timeout is not None and (time.time() - start_time) > timeout: # 一度だけ警告を出し timeout を解除(以降は継続待機) print(f"{Color.YELLOW}[WARN]{Color.RESET} 初期化が {timeout:.1f} 秒を超過しました。ダウンロード等で長時間かかっています。引き続き待機します。環境変数 VRCT_INIT_TIMEOUT を調整できます。") timeout = None # 解除 # 30秒ごとの進捗ログ now = time.time() if now - last_progress_time_log >= 30: elapsed = now - start_time ep_info = last_progress_endpoint or "(受信なし)" print(f"{Color.CYAN}[進捗]{Color.RESET} 初期化経過 {elapsed:.1f} 秒 / 最終 endpoint: {ep_info}") last_progress_time_log = now line = self.process.stdout.readline() if not line: # 何も来ていないがプロセスは動作中 -> 継続 continue stripped = line.strip() if not stripped: continue # JSON解析試行 try: response = json.loads(stripped) endpoint = response.get("endpoint", "") status = response.get("status", 0) last_progress_endpoint = endpoint or last_progress_endpoint if status == 348: # 348 はログ扱い: 全フィールド展開 print(f"{Color.CYAN} [初期化ログ]{Color.RESET} Status: {status} endpoint:{endpoint or '(none)'}") expanded = json.dumps(response, ensure_ascii=False, indent=2) for line in expanded.split('\n'): print(f" {line}") else: print(f"{Color.CYAN} [初期化中]{Color.RESET} {endpoint} (Status: {status})") if endpoint == "/run/initialization_complete" and status == 200: total_elapsed = time.time() - start_time print(f"{Color.GREEN} [初期化完了]{Color.RESET} 経過 {total_elapsed:.1f} 秒") return except json.JSONDecodeError: # ログ行として扱う print(f"{Color.CYAN} [Backend]{Color.RESET} {stripped}") continue def send_request(self, endpoint: str, data: Optional[Any] = None, timeout: float = 30.0, silent: bool = False) -> Dict[str, Any]: """ エンドポイントにリクエストを送信し、レスポンスを取得 対応するエンドポイントの応答が返ってくるまで処理を待機します Args: endpoint: テストするエンドポイント (例: "/get/data/version") data: 送信するデータ (None, dict, str, int, float, bool, list等) timeout: タイムアウト時間(秒) Returns: レスポンスの辞書 {"status": int, "endpoint": str, "result": Any} """ try: # プロセスが生きているかチェック if self.process.poll() is not None: print(f"{Color.RED}[ERROR]{Color.RESET} バックエンドプロセスが終了しています") return {"status": 500, "endpoint": endpoint, "result": "Backend process is not running"} # リクエストの構築 request = {"endpoint": endpoint} if data is not None: # データをJSON文字列に変換してBase64エンコード json_data = json.dumps(data, ensure_ascii=False) encoded_data = base64.b64encode(json_data.encode('utf-8')).decode('utf-8') request["data"] = encoded_data # リクエストを送信 request_json = json.dumps(request, ensure_ascii=False) if not silent: print(f"{Color.BLUE}[送信]{Color.RESET} {endpoint}") if data is not None: print(f" データ: {data}") print(" 応答待機中...", flush=True) try: self.process.stdin.write(request_json + '\n') self.process.stdin.flush() except (OSError, BrokenPipeError) as e: print(f"{Color.RED}[ERROR]{Color.RESET} バックエンドプロセスとの通信に失敗: {e}") # stderrの内容を確認 stderr_output = self.process.stderr.read() if stderr_output: print(f"{Color.RED}[Backend Error]{Color.RESET}") print(stderr_output) return {"status": 500, "endpoint": endpoint, "result": f"Communication error: {e}"} # 対応するエンドポイントのレスポンスを受信するまで待機 start_time = time.time() while True: # タイムアウトチェック if time.time() - start_time > timeout: print(f"{Color.RED}[TIMEOUT]{Color.RESET} レスポンスがタイムアウトしました ({timeout}秒)") return {"status": 504, "endpoint": endpoint, "result": f"Timeout after {timeout} seconds"} # レスポンスを受信 response_line = self.process.stdout.readline() if not response_line: # プロセスが終了した可能性 if self.process.poll() is not None: return {"status": 500, "endpoint": endpoint, "result": "Backend process terminated"} continue # JSONとしてパース try: response = json.loads(response_line.strip()) except json.JSONDecodeError: # ログ出力など、JSONでない行を表示 print(f"{Color.CYAN}[Backend出力]{Color.RESET} {response_line.strip()}") continue # レスポンスのエンドポイントが一致するかチェック response_endpoint = response.get("endpoint", "") if response_endpoint == endpoint: # 対応するレスポンスを受信 status = response.get("status", 500) result = response.get("result", None) # ステータスに応じて色分けして表示 if not silent: if status == 200: print(f"{Color.GREEN}[受信]{Color.RESET} Status: {status}") elif status == 400: print(f"{Color.YELLOW}[受信]{Color.RESET} Status: {status}") elif status == 348: # 348 = ログ扱い: 中身を全展開 print(f"{Color.CYAN}[LOG]{Color.RESET} Status: {status} (endpoint={response_endpoint})") else: print(f"{Color.RED}[受信]{Color.RESET} Status: {status}") # 結果を整形して表示 if not silent: # 348 の場合はログとしてフルコンテンツ優先表示 if status == 348: print(" ログエントリ全体:") full_str = json.dumps(response, ensure_ascii=False, indent=2) for line in full_str.split('\n'): print(f" {line}") print() else: print(f" エンドポイント: {response_endpoint}") print(" 結果:") if isinstance(result, (dict, list)): # dict/listの場合はインデント付きで表示 result_str = json.dumps(result, ensure_ascii=False, indent=2) for line in result_str.split('\n'): print(f" {line}") else: print(f" {result}") # レスポンス全体も表示 print(" 完全なレスポンス:") response_str = json.dumps(response, ensure_ascii=False, indent=2) for line in response_str.split('\n'): print(f" {line}") print() return response else: # 別のエンドポイントのレスポンス、またはログメッセージ if not silent: if response_endpoint: print(f"{Color.YELLOW}[他のエンドポイントの応答]{Color.RESET} {response_endpoint}") else: # endpointキーがない場合はログメッセージ print(f"{Color.CYAN}[Backendログ]{Color.RESET}") print(f" {json.dumps(response, ensure_ascii=False)}") continue except json.JSONDecodeError as e: print(f"{Color.RED}[ERROR]{Color.RESET} JSONデコードエラー: {e}") return {"status": 500, "endpoint": endpoint, "result": f"JSON decode error: {e}"} except (BrokenPipeError, OSError) as e: print(f"{Color.RED}[ERROR]{Color.RESET} プロセス通信エラー: {e}") # プロセスの状態を確認 if self.process.poll() is not None: print(f"{Color.RED}[ERROR]{Color.RESET} バックエンドプロセスが終了しています") # stderrの内容を表示 try: stderr_output = self.process.stderr.read() if stderr_output: print(f"{Color.RED}[Backend Error Output]{Color.RESET}") print(stderr_output) except Exception: pass return {"status": 500, "endpoint": endpoint, "result": f"Process communication error: {e}"} except Exception as e: print(f"{Color.RED}[ERROR]{Color.RESET} 予期しないエラー: {e}") import traceback traceback.print_exc() return {"status": 500, "endpoint": endpoint, "result": f"Error: {e}"} def _start_watchdog(self): """watchdog スレッドを開始 (30秒間隔で /run/feed_watchdog 送信)""" def _watchdog_loop(): print(f"{Color.CYAN}[Watchdog]{Color.RESET} 開始 (30秒間隔)") while not self._watchdog_stop_event.is_set(): if self._watchdog_stop_event.wait(timeout=30): break if self.process.poll() is None: try: request = {"endpoint": "/run/feed_watchdog"} request_json = json.dumps(request, ensure_ascii=False) self.process.stdin.write(request_json + '\n') self.process.stdin.flush() # レスポンスは受信しない(バックグラウンド送信) except Exception as e: print(f"{Color.YELLOW}[Watchdog]{Color.RESET} 送信エラー: {e}") break print(f"{Color.CYAN}[Watchdog]{Color.RESET} 終了") self._watchdog_thread = threading.Thread(target=_watchdog_loop, daemon=True) self._watchdog_thread.start() def cleanup(self): """バックエンドプロセスを終了""" print(f"\n{Color.CYAN}バックエンドプロセスを終了中...{Color.RESET}") # watchdog 停止 if self._watchdog_thread and self._watchdog_thread.is_alive(): self._watchdog_stop_event.set() self._watchdog_thread.join(timeout=2) # プロセス終了 self.process.terminate() try: self.process.wait(timeout=5) except subprocess.TimeoutExpired: self.process.kill() print(f"{Color.GREEN}終了完了{Color.RESET}") def run_example_tests(client: TestClient): """サンプルテストの実行""" print(f"{Color.BOLD}=== サンプルテスト開始 ==={Color.RESET}\n") # 1. バージョン情報の取得 print(f"{Color.BOLD}Test 1: バージョン情報の取得{Color.RESET}") client.send_request("/get/data/version") # 2. 透明度の設定 print(f"{Color.BOLD}Test 2: 透明度の設定{Color.RESET}") client.send_request("/set/data/transparency", 75) # 3. UI言語の設定 print(f"{Color.BOLD}Test 3: UI言語の設定{Color.RESET}") client.send_request("/set/data/ui_language", "ja") # 4. 翻訳機能の有効化 print(f"{Color.BOLD}Test 4: 翻訳機能の有効化{Color.RESET}") client.send_request("/set/enable/translation") # 5. 翻訳機能の無効化 print(f"{Color.BOLD}Test 5: 翻訳機能の無効化{Color.RESET}") client.send_request("/set/disable/translation") # 6. 無効なエンドポイント print(f"{Color.BOLD}Test 6: 無効なエンドポイント{Color.RESET}") client.send_request("/invalid/endpoint") print(f"{Color.BOLD}=== サンプルテスト終了 ==={Color.RESET}\n") class AutomatedEndpointTester: """backend_test.py のロジックを stdin/stdout 通信向けに移植した自動テストクラス Args: client: TestClient インスタンス silent: True の場合詳細ログを抑制 export_path: テスト結果を JSON に書き出すパス (None なら書き出し無し) export_csv: True の場合 CSV も併せて書き出す """ def __init__(self, client: TestClient, silent: bool = False, export_path: Optional[str] = None, export_csv: bool = False): self.client = client self.silent = silent self.export_path = export_path self.export_csv = export_csv # config 的な値をキャッシュ self.cache: Dict[str, Any] = {} # エンドポイント分類 (動的取得手段が無いため暫定的にハードコード) self.validity_endpoints = [ "/set/enable/translation", "/set/disable/translation", "/set/enable/transcription_send", "/set/disable/transcription_send", "/set/enable/transcription_receive", "/set/disable/transcription_receive", "/set/enable/websocket_server", "/set/disable/websocket_server", "/set/enable/convert_message_to_romaji", "/set/disable/convert_message_to_romaji", "/set/enable/convert_message_to_hiragana", "/set/disable/convert_message_to_hiragana", ] self.set_data_endpoints = [ "/set/data/selected_tab_no", "/set/data/selected_translation_engines", "/set/data/selected_your_languages", "/set/data/selected_target_languages", "/set/data/selected_transcription_engine", "/set/data/transparency", "/set/data/ui_scaling", "/set/data/textbox_ui_scaling", "/set/data/message_box_ratio", "/set/data/send_message_button_type", "/set/data/font_family", "/set/data/ui_language", "/set/data/main_window_geometry", "/set/data/selected_translation_compute_device", "/set/data/selected_transcription_compute_device", "/set/data/ctranslate2_weight_type", "/set/data/plamo_model", "/set/data/plamo_auth_key", "/set/data/gemini_model", "/set/data/gemini_auth_key", "/set/data/openai_model", "/set/data/openai_auth_key", "/set/data/lmstudio_model", "/set/data/lmstudio_url", "/set/data/ollama_model", "/set/data/deepl_auth_key", "/set/data/selected_mic_host", "/set/data/selected_mic_device", "/set/data/mic_threshold", "/set/data/mic_record_timeout", "/set/data/mic_phrase_timeout", "/set/data/mic_max_phrases", "/set/data/hotkeys", "/set/data/plugins_status", "/set/data/mic_avg_logprob", "/set/data/mic_no_speech_prob", "/set/data/mic_word_filter", "/set/data/selected_speaker_device", "/set/data/speaker_threshold", "/set/data/speaker_record_timeout", "/set/data/speaker_phrase_timeout", "/set/data/speaker_max_phrases", "/set/data/speaker_avg_logprob", "/set/data/speaker_no_speech_prob", "/set/data/whisper_weight_type", "/set/data/overlay_small_log_settings", "/set/data/overlay_large_log_settings", "/set/data/send_message_format_parts", "/set/data/received_message_format_parts", "/set/data/websocket_host", "/set/data/websocket_port", "/set/data/osc_ip_address", "/set/data/osc_port", "/set/data/selected_translation_compute_type", "/set/data/selected_transcription_compute_type", ] self.run_endpoints = [ "/run/send_message_box", "/run/typing_message_box", "/run/stop_typing_message_box", "/run/send_text_overlay", "/run/swap_your_language_and_target_language", "/run/update_software", "/run/update_cuda_software", "/run/download_ctranslate2_weight", "/run/download_whisper_weight", "/run/open_filepath_logs", "/run/open_filepath_config_file", "/run/feed_watchdog", "/run/lmstudio_connection", "/run/ollama_connection", ] self.delete_data_endpoints = [ "/delete/data/deepl_auth_key", ] self.results: Dict[str, Dict[str, Any]] = {} # ---------------------------------- Utility ---------------------------------- def _record(self, endpoint: str, status: Optional[int], result: Any, expected_status: list[int]): self.results[endpoint] = { "status": status, "result": result, "expected_status": expected_status, "success": status in expected_status if status is not None else False } def _get(self, endpoint: str) -> Any: """/get/data/* の結果をキャッシュしつつ取得""" resp = self.client.send_request(endpoint, silent=self.silent) if resp.get("status") == 200: self.cache[endpoint.split("/")[-1]] = resp.get("result") return resp.get("result") return None # ---------------------------------- Generators ---------------------------------- def _gen_set_data(self, endpoint: str): expected = [200] data = None # ほぼ backend_test.py のロジックを踏襲 if endpoint == "/set/data/selected_tab_no": data = sys.modules.get('__random_tab_choices', None) or None # placeholder for future dynamic data = data or "1" elif endpoint == "/set/data/selected_translation_engines": engines = self._get("/get/data/translation_engines") or [] data = {i: (engines and (engines[0] if len(engines) else None)) for i in ["1","2","3"]} elif endpoint == "/set/data/selected_your_languages": lang_list = self._get("/get/data/selectable_language_list") or [] if lang_list: choice = lang_list[0] data = {i: {"1": {**choice, "enable": True}} for i in ["1","2","3"]} elif endpoint == "/set/data/selected_target_languages": lang_list = self._get("/get/data/selectable_language_list") or [] if lang_list: base = lang_list[0] data = {i: {j: {**base, "enable": (j=="1")} for j in ["1","2","3"]} for i in ["1","2","3"]} elif endpoint == "/set/data/selected_transcription_engine": engines = self._get("/get/data/transcription_engines") or [] data = engines[0] if engines else None elif endpoint == "/set/data/transparency": import random data = random.randint(0,100) elif endpoint == "/set/data/ui_scaling" or endpoint == "/set/data/textbox_ui_scaling": import random data = random.randint(50,200) elif endpoint == "/set/data/message_box_ratio": import random data = round(random.uniform(0.1,0.9),2) elif endpoint == "/set/data/send_message_button_type": import random data = random.choice(["show","hide","show_and_disable_enter_key"]) elif endpoint == "/set/data/font_family": import random data = random.choice(["Arial","Verdana","Times New Roman"]) elif endpoint == "/set/data/ui_language": import random data = random.choice(["en","ja","ko","zh-Hant","zh-Hans"]) elif endpoint == "/set/data/main_window_geometry": import random data = { "x_pos": random.randint(0,1920), "y_pos": random.randint(0,1080), "width": random.randint(800,1920), "height": random.randint(600,1080) } elif endpoint == "/set/data/selected_translation_compute_device": lst = self._get("/get/data/translation_compute_device_list") or [] import random data = random.choice(lst) if lst else None elif endpoint == "/set/data/selected_transcription_compute_device": lst = self._get("/get/data/transcription_compute_device_list") or [] import random data = random.choice(lst) if lst else None elif endpoint == "/set/data/ctranslate2_weight_type": dct = self._get("/get/data/selectable_ctranslate2_weight_type_dict") or {} keys = list(dct.keys()) import random data = random.choice(keys) if keys else None elif endpoint == "/set/data/plamo_model": lst = self._get("/get/data/plamo_model_list") or [] import random data = random.choice(lst) if lst else None expected = [200,400] elif endpoint == "/set/data/plamo_auth_key": data = "PLAMO_DUMMY_KEY" expected = [200,400] elif endpoint == "/set/data/gemini_model": lst = self._get("/get/data/gemini_model_list") or [] import random data = random.choice(lst) if lst else None expected = [200,400] elif endpoint == "/set/data/gemini_auth_key": data = "GEMINI_DUMMY_KEY" expected = [200,400] elif endpoint == "/set/data/openai_model": lst = self._get("/get/data/openai_model_list") or [] import random data = random.choice(lst) if lst else None expected = [200,400] elif endpoint == "/set/data/openai_auth_key": data = "OPENAI_DUMMY_KEY" expected = [200,400] elif endpoint == "/set/data/lmstudio_model": lst = self._get("/get/data/lmstudio_model_list") or [] import random data = random.choice(lst) if lst else None expected = [200,400] elif endpoint == "/set/data/lmstudio_url": import random data = random.choice(["http://localhost:1234/v1","http://127.0.0.1:1234/v1","http://invalid_host:9999/v1"]) expected=[200,400] elif endpoint == "/set/data/ollama_model": lst = self._get("/get/data/ollama_model_list") or [] import random data = random.choice(lst) if lst else None expected = [200,400] elif endpoint == "/set/data/deepl_auth_key": data = "DEEPL_DUMMY_KEY" expected=[200,400] elif endpoint == "/set/data/selected_mic_host": lst = self._get("/get/data/mic_host_list") or [] import random data = random.choice(lst) if lst else None elif endpoint == "/set/data/selected_mic_device": lst = self._get("/get/data/mic_device_list") or [] import random data = random.choice(lst) if lst else None elif endpoint == "/set/data/mic_threshold": import random val = random.randint(-1000,3000) data = val expected=[200] if 0 <= val <= 2000 else [400] elif endpoint == "/set/data/mic_record_timeout": import random val = random.randint(-1,10) phrase = self._get("/get/data/mic_phrase_timeout") data = val expected=[200] if (phrase is not None and 0 <= val <= phrase) else [400] elif endpoint == "/set/data/mic_phrase_timeout": import random val = random.randint(-1,10) record = self._get("/get/data/mic_record_timeout") data = val expected=[200] if (record is not None and record <= val) else [400] elif endpoint == "/set/data/mic_max_phrases": import random val = random.randint(-1,10) data = val expected=[200] if val >= 0 else [400] elif endpoint == "/set/data/hotkeys": data = {'toggle_vrct_visibility': None,'toggle_translation': None,'toggle_transcription_send': None,'toggle_transcription_receive': None} elif endpoint == "/set/data/plugins_status": plugins = self._get("/get/data/plugins") or [] import random data = {p: random.choice([True,False]) for p in plugins} elif endpoint == "/set/data/mic_avg_logprob": import random data = random.uniform(-5,0) elif endpoint == "/set/data/mic_no_speech_prob": import random data = random.uniform(0,1) elif endpoint == "/set/data/mic_word_filter": import random data = random.choice([["test_0_0","test_0_1","test_0_2",None],["test_1_0","test_1_1",None],["test_2_0",None],[None]]) elif endpoint == "/set/data/selected_speaker_device": lst = self._get("/get/data/speaker_device_list") or [] import random data = random.choice(lst) if lst else None elif endpoint == "/set/data/speaker_threshold": import random val = random.randint(-1000,5000) data = val expected=[200] if 0 <= val <= 4000 else [400] elif endpoint == "/set/data/speaker_record_timeout": import random val = random.randint(-1,10) phrase = self._get("/get/data/speaker_phrase_timeout") data = val expected=[200] if (phrase is not None and 0 <= val <= phrase) else [400] elif endpoint == "/set/data/speaker_phrase_timeout": import random val = random.randint(-1,10) record = self._get("/get/data/speaker_record_timeout") data = val expected=[200] if (record is not None and record <= val) else [400] elif endpoint == "/set/data/speaker_max_phrases": import random val = random.randint(-1,10) data = val expected=[200] if val >= 0 else [400] elif endpoint == "/set/data/speaker_avg_logprob": import random data = random.uniform(-5,0) elif endpoint == "/set/data/speaker_no_speech_prob": import random data = random.uniform(0,1) elif endpoint == "/set/data/whisper_weight_type": dct = self._get("/get/data/selectable_whisper_weight_type_dict") or {} import random keys=[k for k,v in dct.items() if v] data = random.choice(keys) if keys else None elif endpoint == "/set/data/overlay_small_log_settings" or endpoint == "/set/data/overlay_large_log_settings": import random data = { "x_pos": random.random(), "y_pos": random.random(), "z_pos": random.random(), "x_rotation": random.random(), "y_rotation": random.random(), "z_rotation": random.random(), "display_duration": random.randint(0,100), "fadeout_duration": random.randint(0,100), "opacity": random.random(), "ui_scaling": random.random(), "tracker": random.choice(["HMD","LeftHand","RightHand"]) } elif endpoint == "/set/data/send_message_format_parts": fmt = self._get("/get/data/send_message_format_parts") data = fmt elif endpoint == "/set/data/received_message_format_parts": fmt = self._get("/get/data/received_message_format_parts") data = fmt elif endpoint == "/set/data/websocket_host": import random val = random.choice(["127.0.0.1","aaaaadwafasdsd","0210.1564.845.0"]) data = val expected = [200,400] if val=="127.0.0.1" else [400] elif endpoint == "/set/data/websocket_port": import random data = random.randint(1024,65535) expected=[200,400] elif endpoint == "/set/data/osc_ip_address": import random val = random.choice(["127.0.0.1","aaaaadwafasdsd","0210.1564.845.0"]) data = val expected = [200] if val=="127.0.0.1" else [400] elif endpoint == "/set/data/osc_port": import random data = random.randint(1024,65535) elif endpoint == "/set/data/selected_translation_compute_type": device = self.cache.get("selected_translation_compute_device") or self._get("/get/data/selected_translation_compute_device") if device and isinstance(device, dict): import random data = random.choice(device.get("compute_types", [])) if device.get("compute_types") else None elif endpoint == "/set/data/selected_transcription_compute_type": device = self.cache.get("selected_transcription_compute_device") or self._get("/get/data/selected_transcription_compute_device") if device and isinstance(device, dict): import random data = random.choice(device.get("compute_types", [])) if device.get("compute_types") else None return data, expected def _gen_run_data(self, endpoint: str): expected = [200] data = None import random if endpoint == "/run/send_message_box": choices=[{"data":{"id":"000001","message":"test"},"status":[200]}, {"data":{"id":"000002","message":"Hello World!"},"status":[200]}, {"data":{"id":"000003","message":"こんにちわ 世界!"},"status":[200]}, {"data":{"id":"000004","message":"안녕하세요 세계!"},"status":[200]}, {"data":{"id":"000005","message":"你好,世界!"},"status":[200]}] choice = random.choice(choices) data = choice["data"] expected = choice["status"] elif endpoint in ["/run/typing_message_box","/run/stop_typing_message_box","/run/send_text_overlay","/run/swap_your_language_and_target_language"]: data = "test_overlay" if endpoint == "/run/send_text_overlay" else None elif endpoint in ["/run/update_software","/run/update_cuda_software","/run/download_ctranslate2_weight","/run/download_whisper_weight","/run/open_filepath_logs","/run/open_filepath_config_file","/run/feed_watchdog"]: expected=[401] elif endpoint in ["/run/lmstudio_connection","/run/ollama_connection"]: expected=[200,400] return data, expected # ---------------------------------- Tests ---------------------------------- def test_validity_single(self, endpoint: str): expected=[200] if endpoint == "/set/enable/websocket_server": expected=[200,400] resp = self.client.send_request(endpoint) status = resp.get("status") result = resp.get("result") self._record(endpoint, status, result, expected) ok = status in expected tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" print(f"[Validity] {endpoint} -> {tag} ({status})") return ok def test_set_data_single(self, endpoint: str): data, expected = self._gen_set_data(endpoint) if expected == [404]: self._record(endpoint, None, None, expected) print(f"[SetData] {endpoint} -> {Color.RED}UNKNOWN{Color.RESET}") return False # data が None でも expected に 400 が含まれていれば送信してテスト if data is None and 400 not in expected: self._record(endpoint, None, None, expected) print(f"[SetData] {endpoint} -> {Color.YELLOW}SKIP(no data){Color.RESET}") return True resp = self.client.send_request(endpoint, data, silent=self.silent) status = resp.get("status") result = resp.get("result") self._record(endpoint, status, result, expected) ok = status in expected tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" print(f"[SetData] {endpoint} -> {tag} ({status}) data={data}") return ok def test_run_single(self, endpoint: str): data, expected = self._gen_run_data(endpoint) if expected == [401]: self._record(endpoint, None, None, expected) print(f"[Run] {endpoint} -> {Color.YELLOW}SKIP(401){Color.RESET}") return True resp = self.client.send_request(endpoint, data, silent=self.silent) status = resp.get("status") result = resp.get("result") self._record(endpoint, status, result, expected) ok = status in expected tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" print(f"[Run] {endpoint} -> {tag} ({status})") return ok def test_delete_single(self, endpoint: str): expected=[200] resp = self.client.send_request(endpoint, silent=self.silent) status = resp.get("status") result = resp.get("result") self._record(endpoint, status, result, expected) ok = status in expected tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" print(f"[Delete] {endpoint} -> {tag} ({status})") return ok def run_all(self): print(f"{Color.BOLD}=== 有効/無効エンドポイントテスト ==={Color.RESET}") for ep in self.validity_endpoints: self.test_validity_single(ep) print(f"{Color.BOLD}=== データ設定エンドポイントテスト ==={Color.RESET}") for ep in self.set_data_endpoints: self.test_set_data_single(ep) print(f"{Color.BOLD}=== 実行エンドポイントテスト ==={Color.RESET}") for ep in self.run_endpoints: self.test_run_single(ep) print(f"{Color.BOLD}=== 削除エンドポイントテスト ==={Color.RESET}") for ep in self.delete_data_endpoints: self.test_delete_single(ep) def run_random(self, iterations: int = 500): import random print(f"{Color.BOLD}=== ランダムアクセステスト(iter={iterations}) ==={Color.RESET}") groups = ["validity","set","run","delete"] for i in range(iterations): g = random.choice(groups) if g == "validity": ep = random.choice(self.validity_endpoints) self.test_validity_single(ep) elif g == "set": ep = random.choice(self.set_data_endpoints) self.test_set_data_single(ep) elif g == "run": ep = random.choice(self.run_endpoints) self.test_run_single(ep) else: ep = random.choice(self.delete_data_endpoints) self.test_delete_single(ep) # 最後に disable 系を OFF for ep in self.validity_endpoints: if ep.startswith("/set/disable/"): self.client.send_request(ep, silent=True) def run_specific_random(self, iterations: int = 200): import random print(f"{Color.BOLD}=== 特定(osc/websocket)ランダムテスト(iter={iterations}) ==={Color.RESET}") set_specific = ["/set/data/osc_ip_address","/set/data/osc_port","/set/data/websocket_host","/set/data/websocket_port"] for i in range(iterations): ep = random.choice(set_specific) self.test_set_data_single(ep) def summary(self): total = len(self.results) passed = sum(1 for r in self.results.values() if r["success"]) skipped = sum(1 for r in self.results.values() if r["expected_status"] == [401]) failed = total - passed - skipped print(f"\n{Color.BOLD}==== テストサマリー ==== {Color.RESET}") print(f"総数: {total} / 成功: {passed} / 失敗: {failed} / スキップ(401): {skipped}") if failed: print(f"{Color.RED}失敗詳細:{Color.RESET}") for ep, r in self.results.items(): if not r["success"] and r["expected_status"] != [401]: print(f"- {ep} status={r['status']} expected={r['expected_status']} result={r['result']}") print(f"{Color.BOLD}======================={Color.RESET}\n") # インタラクティブ指定によるエクスポート if self.export_path: try: self.export_results(self.export_path) print(f"{Color.GREEN}[EXPORT]{Color.RESET} JSONに書き出しました: {self.export_path}") if self.export_csv: csv_path = self._derive_csv_path(self.export_path) self.export_results_csv(csv_path) print(f"{Color.GREEN}[EXPORT]{Color.RESET} CSVに書き出しました: {csv_path}") except Exception as e: print(f"{Color.RED}[EXPORT ERROR]{Color.RESET} {e}") def _derive_csv_path(self, json_path: str) -> str: base, ext = os.path.splitext(json_path) return base + '.csv' def export_results(self, filename: str): """結果をJSONファイルへ書き出し""" payload = { "generated_at": time.strftime("%Y-%m-%dT%H:%M:%S"), "total": len(self.results), "results": self.results, } with open(filename, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) def export_results_csv(self, filename: str): """結果をCSVへ書き出し (簡易)""" import csv with open(filename, "w", encoding="utf-8", newline="") as f: w = csv.writer(f) w.writerow(["endpoint", "status", "expected", "success"]) for ep, r in self.results.items(): w.writerow([ep, r.get("status"), ",".join(map(str, r.get("expected_status", []))), r.get("success")]) def run_interactive_mode(client: TestClient): """対話モードでテストを実行""" print(f"{Color.BOLD}=== 対話モード ==={Color.RESET}") print("エンドポイントとデータを指定してテストを実行します。") print("終了するには 'quit' または 'exit' と入力してください。\n") while True: try: # エンドポイントの入力 endpoint = input(f"{Color.CYAN}エンドポイントを入力 (例: /get/data/version): {Color.RESET}").strip() if endpoint.lower() in ['quit', 'exit', 'q']: break if not endpoint: print(f"{Color.YELLOW}エンドポイントを入力してください{Color.RESET}\n") continue # データの入力 data_input = input(f"{Color.CYAN}データを入力 (JSON形式, なければEnter): {Color.RESET}").strip() data = None if data_input: try: data = json.loads(data_input) except json.JSONDecodeError: print(f"{Color.YELLOW}JSONとしてパースできませんでした。文字列として送信します{Color.RESET}") data = data_input # リクエスト送信 client.send_request(endpoint, data) except KeyboardInterrupt: print(f"\n{Color.YELLOW}中断されました{Color.RESET}") break except Exception as e: print(f"{Color.RED}エラー: {e}{Color.RESET}\n") def main(): """メイン処理""" print(f"{Color.BOLD}{'='*60}{Color.RESET}") print(f"{Color.BOLD} VRCT Backend Test Client{Color.RESET}") print(f"{Color.BOLD}{'='*60}{Color.RESET}\n") client = None try: # テストクライアントの初期化 client = TestClient() # モード選択 print("モードを選択してください:") print("1. サンプルテストを実行") print("2. 対話モードで実行") print("3. 自動テスト(全)を実行") print("4. ランダムアクセステスト") print("5. 特定(osc/websocket)ランダムテスト") mode = input(f"{Color.CYAN}選択 (1-5): {Color.RESET}").strip() # 追加オプション選択 silent_choice = input(f"{Color.CYAN}詳細ログを抑制しますか? (y/N): {Color.RESET}").strip().lower() silent = silent_choice == 'y' export_choice = input(f"{Color.CYAN}結果をJSON出力しますか? (y/N): {Color.RESET}").strip().lower() export_path = None export_csv = False if export_choice == 'y': default_name = f"test_results_{int(time.time())}.json" path_in = input(f"{Color.CYAN}出力ファイル名[{default_name}]: {Color.RESET}").strip() export_path = path_in or default_name csv_choice = input(f"{Color.CYAN}CSVも出力しますか? (y/N): {Color.RESET}").strip().lower() export_csv = csv_choice == 'y' print() if mode == "1": run_example_tests(client) elif mode == "2": run_interactive_mode(client) elif mode == "3": tester = AutomatedEndpointTester(client, silent=silent, export_path=export_path, export_csv=export_csv) tester.run_all() tester.summary() elif mode == "4": tester = AutomatedEndpointTester(client, silent=silent, export_path=export_path, export_csv=export_csv) tester.run_random() tester.summary() elif mode == "5": tester = AutomatedEndpointTester(client, silent=silent, export_path=export_path, export_csv=export_csv) tester.run_specific_random() tester.summary() else: print(f"{Color.YELLOW}無効な選択です。対話モードを開始します。{Color.RESET}\n") run_interactive_mode(client) except KeyboardInterrupt: print(f"\n{Color.YELLOW}ユーザーによって中断されました{Color.RESET}") except Exception as e: print(f"{Color.RED}エラーが発生しました: {e}{Color.RESET}") import traceback traceback.print_exc() finally: if client: client.cleanup() if __name__ == "__main__": main()