diff --git a/src-python/test_client.py b/src-python/test_client.py index 55f738fa..4000b316 100644 --- a/src-python/test_client.py +++ b/src-python/test_client.py @@ -7,7 +7,7 @@ stdin/stdoutを介してバックエンドと通信し、エンドポイント 2. 別のターミナルでこのスクリプトを実行: python test_client.py 3. エンドポイントとデータを指定してテストを実行 """ - +import os import subprocess import json import base64 @@ -15,6 +15,9 @@ import sys import time from typing import Optional, Dict, Any +if os.path.exists("config.json"): + os.remove("config.json") + class Color: GREEN = '\033[32m' RED = '\033[31m' @@ -43,45 +46,71 @@ class TestClient: self._wait_for_initialization() print(f"{Color.GREEN}バックエンド起動完了{Color.RESET}\n") - def _wait_for_initialization(self, timeout: float = 60.0): + def _wait_for_initialization(self, timeout: Optional[float] = None): + """バックエンド初期化完了 (/run/initialization_complete) を待機する。 + + 旧仕様: 60秒で TimeoutError を発生させていた。 + 新仕様: + - timeout が None の場合は無期限待機。 + - 'VRCT_INIT_TIMEOUT' 環境変数が設定されていれば soft timeout 値として使用。 + - soft timeout 経過時は ERROR ではなく WARN を表示し継続待機。 + - 進捗: 30秒ごとに経過時間と最後に受信した endpoint を表示。 + - バックエンドプロセスが終了した場合のみ例外を投げる。 """ - バックエンドの初期化完了を待機 - /run/initialization_complete のレスポンスを受信するまで待つ - """ - start_time = time.time() - while True: - if time.time() - start_time > timeout: - print(f"{Color.RED}[ERROR]{Color.RESET} 初期化タイムアウト ({timeout}秒)") - raise TimeoutError(f"Backend initialization timeout after {timeout} seconds") - - # stdoutから1行読み込み - line = self.process.stdout.readline() - - if not line: - # プロセスが終了した場合 - if self.process.poll() is not None: - raise RuntimeError("Backend process terminated during initialization") - continue - - # JSON解析を試みる + import os + env_timeout = os.getenv("VRCT_INIT_TIMEOUT") + if timeout is None: try: - response = json.loads(line.strip()) + timeout = float(env_timeout) if env_timeout else None + except ValueError: + timeout = None + + start_time = time.time() + last_progress_endpoint = None + last_progress_time_log = 0.0 + + while True: + # プロセス終了検知 + if self.process.poll() is not None: + raise RuntimeError("Backend process terminated during initialization") + + # soft timeout 警告表示 + if timeout is not None and (time.time() - start_time) > timeout: + # 一度だけ警告を出し timeout を解除(以降は継続待機) + print(f"{Color.YELLOW}[WARN]{Color.RESET} 初期化が {timeout:.1f} 秒を超過しました。ダウンロード等で長時間かかっています。引き続き待機します。環境変数 VRCT_INIT_TIMEOUT を調整できます。") + timeout = None # 解除 + + # 30秒ごとの進捗ログ + now = time.time() + if now - last_progress_time_log >= 30: + elapsed = now - start_time + ep_info = last_progress_endpoint or "(受信なし)" + print(f"{Color.CYAN}[進捗]{Color.RESET} 初期化経過 {elapsed:.1f} 秒 / 最終 endpoint: {ep_info}") + last_progress_time_log = now + + line = self.process.stdout.readline() + if not line: + # 何も来ていないがプロセスは動作中 -> 継続 + continue + + stripped = line.strip() + if not stripped: + continue + + # JSON解析試行 + try: + response = json.loads(stripped) endpoint = response.get("endpoint", "") status = response.get("status", 0) - - # 初期化メッセージを表示 + last_progress_endpoint = endpoint or last_progress_endpoint print(f"{Color.CYAN} [初期化中]{Color.RESET} {endpoint} (Status: {status})") - - # 初期化完了を検出 if endpoint == "/run/initialization_complete" and status == 200: - print(f"{Color.GREEN} [初期化完了]{Color.RESET}") + total_elapsed = time.time() - start_time + print(f"{Color.GREEN} [初期化完了]{Color.RESET} 経過 {total_elapsed:.1f} 秒") return - except json.JSONDecodeError: - # JSON以外の行(ログなど)を表示 - stripped = line.strip() - if stripped: - print(f"{Color.CYAN} [Backend]{Color.RESET} {stripped}") + # ログ行として扱う + print(f"{Color.CYAN} [Backend]{Color.RESET} {stripped}") continue def send_request(self, endpoint: str, data: Optional[Any] = None, timeout: float = 30.0) -> Dict[str, Any]: @@ -263,6 +292,492 @@ def run_example_tests(client: TestClient): print(f"{Color.BOLD}=== サンプルテスト終了 ==={Color.RESET}\n") +class AutomatedEndpointTester: + """backend_test.py のロジックを stdin/stdout 通信向けに移植した自動テストクラス""" + def __init__(self, client: TestClient): + self.client = client + # config 的な値をキャッシュ + self.cache: Dict[str, Any] = {} + # エンドポイント分類 (動的取得手段が無いため暫定的にハードコード) + self.validity_endpoints = [ + "/set/enable/translation", + "/set/disable/translation", + "/set/enable/transcription_send", + "/set/disable/transcription_send", + "/set/enable/transcription_receive", + "/set/disable/transcription_receive", + "/set/enable/websocket_server", + "/set/disable/websocket_server", + "/set/enable/convert_message_to_romaji", + "/set/disable/convert_message_to_romaji", + "/set/enable/convert_message_to_hiragana", + "/set/disable/convert_message_to_hiragana", + ] + self.set_data_endpoints = [ + "/set/data/selected_tab_no", + "/set/data/selected_translation_engines", + "/set/data/selected_your_languages", + "/set/data/selected_target_languages", + "/set/data/selected_transcription_engine", + "/set/data/transparency", + "/set/data/ui_scaling", + "/set/data/textbox_ui_scaling", + "/set/data/message_box_ratio", + "/set/data/send_message_button_type", + "/set/data/font_family", + "/set/data/ui_language", + "/set/data/main_window_geometry", + "/set/data/selected_translation_compute_device", + "/set/data/selected_transcription_compute_device", + "/set/data/ctranslate2_weight_type", + "/set/data/plamo_model", + "/set/data/plamo_auth_key", + "/set/data/gemini_model", + "/set/data/gemini_auth_key", + "/set/data/openai_model", + "/set/data/openai_auth_key", + "/set/data/lmstudio_model", + "/set/data/lmstudio_url", + "/set/data/ollama_model", + "/set/data/deepl_auth_key", + "/set/data/selected_mic_host", + "/set/data/selected_mic_device", + "/set/data/mic_threshold", + "/set/data/mic_record_timeout", + "/set/data/mic_phrase_timeout", + "/set/data/mic_max_phrases", + "/set/data/hotkeys", + "/set/data/plugins_status", + "/set/data/mic_avg_logprob", + "/set/data/mic_no_speech_prob", + "/set/data/mic_word_filter", + "/set/data/selected_speaker_device", + "/set/data/speaker_threshold", + "/set/data/speaker_record_timeout", + "/set/data/speaker_phrase_timeout", + "/set/data/speaker_max_phrases", + "/set/data/speaker_avg_logprob", + "/set/data/speaker_no_speech_prob", + "/set/data/whisper_weight_type", + "/set/data/overlay_small_log_settings", + "/set/data/overlay_large_log_settings", + "/set/data/send_message_format_parts", + "/set/data/received_message_format_parts", + "/set/data/websocket_host", + "/set/data/websocket_port", + "/set/data/osc_ip_address", + "/set/data/osc_port", + "/set/data/selected_translation_compute_type", + "/set/data/selected_transcription_compute_type", + ] + self.run_endpoints = [ + "/run/send_message_box", + "/run/typing_message_box", + "/run/stop_typing_message_box", + "/run/send_text_overlay", + "/run/swap_your_language_and_target_language", + "/run/update_software", + "/run/update_cuda_software", + "/run/download_ctranslate2_weight", + "/run/download_whisper_weight", + "/run/open_filepath_logs", + "/run/open_filepath_config_file", + "/run/feed_watchdog", + "/run/lmstudio_connection", + "/run/ollama_connection", + ] + self.delete_data_endpoints = [ + "/delete/data/deepl_auth_key", + ] + self.results: Dict[str, Dict[str, Any]] = {} + + # ---------------------------------- Utility ---------------------------------- + def _record(self, endpoint: str, status: Optional[int], result: Any, expected_status: list[int]): + self.results[endpoint] = { + "status": status, + "result": result, + "expected_status": expected_status, + "success": status in expected_status if status is not None else False + } + + def _get(self, endpoint: str) -> Any: + """/get/data/* の結果をキャッシュしつつ取得""" + resp = self.client.send_request(endpoint) + if resp.get("status") == 200: + self.cache[endpoint.split("/")[-1]] = resp.get("result") + return resp.get("result") + return None + + # ---------------------------------- Generators ---------------------------------- + def _gen_set_data(self, endpoint: str): + expected = [200] + data = None + # ほぼ backend_test.py のロジックを踏襲 + if endpoint == "/set/data/selected_tab_no": + data = sys.modules.get('__random_tab_choices', None) or None # placeholder for future dynamic + data = data or "1" + elif endpoint == "/set/data/selected_translation_engines": + engines = self._get("/get/data/translation_engines") or [] + data = {i: (engines and (engines[0] if len(engines) else None)) for i in ["1","2","3"]} + elif endpoint == "/set/data/selected_your_languages": + lang_list = self._get("/get/data/selectable_language_list") or [] + if lang_list: + choice = lang_list[0] + data = {i: {"1": {**choice, "enable": True}} for i in ["1","2","3"]} + elif endpoint == "/set/data/selected_target_languages": + lang_list = self._get("/get/data/selectable_language_list") or [] + if lang_list: + base = lang_list[0] + data = {i: {j: {**base, "enable": (j=="1")} for j in ["1","2","3"]} for i in ["1","2","3"]} + elif endpoint == "/set/data/selected_transcription_engine": + engines = self._get("/get/data/transcription_engines") or [] + data = engines[0] if engines else None + elif endpoint == "/set/data/transparency": + import random + data = random.randint(0,100) + elif endpoint == "/set/data/ui_scaling" or endpoint == "/set/data/textbox_ui_scaling": + import random + data = random.randint(50,200) + elif endpoint == "/set/data/message_box_ratio": + import random + data = round(random.uniform(0.1,0.9),2) + elif endpoint == "/set/data/send_message_button_type": + import random + data = random.choice(["show","hide","show_and_disable_enter_key"]) + elif endpoint == "/set/data/font_family": + import random + data = random.choice(["Arial","Verdana","Times New Roman"]) + elif endpoint == "/set/data/ui_language": + import random + data = random.choice(["en","ja","ko","zh-Hant","zh-Hans"]) + elif endpoint == "/set/data/main_window_geometry": + import random + data = { + "x_pos": random.randint(0,1920), + "y_pos": random.randint(0,1080), + "width": random.randint(800,1920), + "height": random.randint(600,1080) + } + elif endpoint == "/set/data/selected_translation_compute_device": + lst = self._get("/get/data/translation_compute_device_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/selected_transcription_compute_device": + lst = self._get("/get/data/transcription_compute_device_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/ctranslate2_weight_type": + dct = self._get("/get/data/selectable_ctranslate2_weight_type_dict") or {} + keys = list(dct.keys()) + import random + data = random.choice(keys) if keys else None + elif endpoint == "/set/data/plamo_model": + lst = self._get("/get/data/plamo_model_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/plamo_auth_key": + data = "PLAMO_DUMMY_KEY" + expected = [200,400] + elif endpoint == "/set/data/gemini_model": + lst = self._get("/get/data/gemini_model_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/gemini_auth_key": + data = "GEMINI_DUMMY_KEY" + expected = [200,400] + elif endpoint == "/set/data/openai_model": + lst = self._get("/get/data/openai_model_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/openai_auth_key": + data = "OPENAI_DUMMY_KEY" + expected = [200,400] + elif endpoint == "/set/data/lmstudio_model": + lst = self._get("/get/data/lmstudio_model_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/lmstudio_url": + import random + data = random.choice(["http://localhost:1234/v1","http://127.0.0.1:1234/v1","http://invalid_host:9999/v1"]) + expected=[200,400] + elif endpoint == "/set/data/ollama_model": + lst = self._get("/get/data/ollama_model_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/deepl_auth_key": + data = "DEEPL_DUMMY_KEY" + expected=[200,400] + elif endpoint == "/set/data/selected_mic_host": + lst = self._get("/get/data/mic_host_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/selected_mic_device": + lst = self._get("/get/data/mic_device_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/mic_threshold": + import random + val = random.randint(-1000,3000) + data = val + expected=[200] if 0 <= val <= 2000 else [400] + elif endpoint == "/set/data/mic_record_timeout": + import random + val = random.randint(-1,10) + phrase = self._get("/get/data/mic_phrase_timeout") + data = val + expected=[200] if (phrase is not None and 0 <= val <= phrase) else [400] + elif endpoint == "/set/data/mic_phrase_timeout": + import random + val = random.randint(-1,10) + record = self._get("/get/data/mic_record_timeout") + data = val + expected=[200] if (record is not None and record <= val) else [400] + elif endpoint == "/set/data/mic_max_phrases": + import random + val = random.randint(-1,10) + data = val + expected=[200] if val >= 0 else [400] + elif endpoint == "/set/data/hotkeys": + data = {'toggle_vrct_visibility': None,'toggle_translation': None,'toggle_transcription_send': None,'toggle_transcription_receive': None} + elif endpoint == "/set/data/plugins_status": + plugins = self._get("/get/data/plugins") or [] + import random + data = {p: random.choice([True,False]) for p in plugins} + elif endpoint == "/set/data/mic_avg_logprob": + import random + data = random.uniform(-5,0) + elif endpoint == "/set/data/mic_no_speech_prob": + import random + data = random.uniform(0,1) + elif endpoint == "/set/data/mic_word_filter": + import random + data = random.choice([["test_0_0","test_0_1","test_0_2",None],["test_1_0","test_1_1",None],["test_2_0",None],[None]]) + elif endpoint == "/set/data/selected_speaker_device": + lst = self._get("/get/data/speaker_device_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/speaker_threshold": + import random + val = random.randint(-1000,5000) + data = val + expected=[200] if 0 <= val <= 4000 else [400] + elif endpoint == "/set/data/speaker_record_timeout": + import random + val = random.randint(-1,10) + phrase = self._get("/get/data/speaker_phrase_timeout") + data = val + expected=[200] if (phrase is not None and 0 <= val <= phrase) else [400] + elif endpoint == "/set/data/speaker_phrase_timeout": + import random + val = random.randint(-1,10) + record = self._get("/get/data/speaker_record_timeout") + data = val + expected=[200] if (record is not None and record <= val) else [400] + elif endpoint == "/set/data/speaker_max_phrases": + import random + val = random.randint(-1,10) + data = val + expected=[200] if val >= 0 else [400] + elif endpoint == "/set/data/speaker_avg_logprob": + import random + data = random.uniform(-5,0) + elif endpoint == "/set/data/speaker_no_speech_prob": + import random + data = random.uniform(0,1) + elif endpoint == "/set/data/whisper_weight_type": + dct = self._get("/get/data/selectable_whisper_weight_type_dict") or {} + import random + keys=[k for k,v in dct.items() if v] + data = random.choice(keys) if keys else None + elif endpoint == "/set/data/overlay_small_log_settings" or endpoint == "/set/data/overlay_large_log_settings": + import random + data = { + "x_pos": random.random(), + "y_pos": random.random(), + "z_pos": random.random(), + "x_rotation": random.random(), + "y_rotation": random.random(), + "z_rotation": random.random(), + "display_duration": random.randint(0,100), + "fadeout_duration": random.randint(0,100), + "opacity": random.random(), + "ui_scaling": random.random(), + "tracker": random.choice(["HMD","LeftHand","RightHand"]) + } + elif endpoint == "/set/data/send_message_format_parts": + fmt = self._get("/get/data/send_message_format_parts") + data = fmt + elif endpoint == "/set/data/received_message_format_parts": + fmt = self._get("/get/data/received_message_format_parts") + data = fmt + elif endpoint == "/set/data/websocket_host": + import random + val = random.choice(["127.0.0.1","aaaaadwafasdsd","0210.1564.845.0"]) + data = val + expected = [200,400] if val=="127.0.0.1" else [400] + elif endpoint == "/set/data/websocket_port": + import random + data = random.randint(1024,65535) + expected=[200,400] + elif endpoint == "/set/data/osc_ip_address": + import random + val = random.choice(["127.0.0.1","aaaaadwafasdsd","0210.1564.845.0"]) + data = val + expected = [200] if val=="127.0.0.1" else [400] + elif endpoint == "/set/data/osc_port": + import random + data = random.randint(1024,65535) + elif endpoint == "/set/data/selected_translation_compute_type": + device = self.cache.get("selected_translation_compute_device") or self._get("/get/data/selected_translation_compute_device") + if device and isinstance(device, dict): + import random + data = random.choice(device.get("compute_types", [])) if device.get("compute_types") else None + elif endpoint == "/set/data/selected_transcription_compute_type": + device = self.cache.get("selected_transcription_compute_device") or self._get("/get/data/selected_transcription_compute_device") + if device and isinstance(device, dict): + import random + data = random.choice(device.get("compute_types", [])) if device.get("compute_types") else None + return data, expected + + def _gen_run_data(self, endpoint: str): + expected = [200] + data = None + import random + if endpoint == "/run/send_message_box": + choices=[{"data":{"id":"000001","message":"test"},"status":[200]}, + {"data":{"id":"000002","message":"Hello World!"},"status":[200]}, + {"data":{"id":"000003","message":"こんにちわ 世界!"},"status":[200]}, + {"data":{"id":"000004","message":"안녕하세요 세계!"},"status":[200]}, + {"data":{"id":"000005","message":"你好,世界!"},"status":[200]}] + choice = random.choice(choices) + data = choice["data"] + expected = choice["status"] + elif endpoint in ["/run/typing_message_box","/run/stop_typing_message_box","/run/send_text_overlay","/run/swap_your_language_and_target_language"]: + data = "test_overlay" if endpoint == "/run/send_text_overlay" else None + elif endpoint in ["/run/update_software","/run/update_cuda_software","/run/download_ctranslate2_weight","/run/download_whisper_weight","/run/open_filepath_logs","/run/open_filepath_config_file","/run/feed_watchdog"]: + expected=[401] + elif endpoint in ["/run/lmstudio_connection","/run/ollama_connection"]: + expected=[200,400] + return data, expected + + # ---------------------------------- Tests ---------------------------------- + def test_validity_single(self, endpoint: str): + expected=[200] + if endpoint == "/set/enable/websocket_server": + expected=[200,400] + resp = self.client.send_request(endpoint) + status = resp.get("status") + result = resp.get("result") + self._record(endpoint, status, result, expected) + ok = status in expected + tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" + print(f"[Validity] {endpoint} -> {tag} ({status})") + return ok + + def test_set_data_single(self, endpoint: str): + data, expected = self._gen_set_data(endpoint) + if expected == [404]: + self._record(endpoint, None, None, expected) + print(f"[SetData] {endpoint} -> {Color.RED}UNKNOWN{Color.RESET}") + return False + if data is None: + self._record(endpoint, None, None, expected) + print(f"[SetData] {endpoint} -> {Color.YELLOW}SKIP(no data){Color.RESET}") + return True + resp = self.client.send_request(endpoint, data) + status = resp.get("status") + result = resp.get("result") + self._record(endpoint, status, result, expected) + ok = status in expected + tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" + print(f"[SetData] {endpoint} -> {tag} ({status}) data={data}") + return ok + + def test_run_single(self, endpoint: str): + data, expected = self._gen_run_data(endpoint) + if expected == [401]: + self._record(endpoint, None, None, expected) + print(f"[Run] {endpoint} -> {Color.YELLOW}SKIP(401){Color.RESET}") + return True + resp = self.client.send_request(endpoint, data) + status = resp.get("status") + result = resp.get("result") + self._record(endpoint, status, result, expected) + ok = status in expected + tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" + print(f"[Run] {endpoint} -> {tag} ({status})") + return ok + + def test_delete_single(self, endpoint: str): + expected=[200] + resp = self.client.send_request(endpoint) + status = resp.get("status") + result = resp.get("result") + self._record(endpoint, status, result, expected) + ok = status in expected + tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" + print(f"[Delete] {endpoint} -> {tag} ({status})") + return ok + + def run_all(self): + print(f"{Color.BOLD}=== 有効/無効エンドポイントテスト ==={Color.RESET}") + for ep in self.validity_endpoints: + self.test_validity_single(ep) + print(f"{Color.BOLD}=== データ設定エンドポイントテスト ==={Color.RESET}") + for ep in self.set_data_endpoints: + self.test_set_data_single(ep) + print(f"{Color.BOLD}=== 実行エンドポイントテスト ==={Color.RESET}") + for ep in self.run_endpoints: + self.test_run_single(ep) + print(f"{Color.BOLD}=== 削除エンドポイントテスト ==={Color.RESET}") + for ep in self.delete_data_endpoints: + self.test_delete_single(ep) + + def run_random(self, iterations: int = 500): + import random + print(f"{Color.BOLD}=== ランダムアクセステスト(iter={iterations}) ==={Color.RESET}") + groups = ["validity","set","run","delete"] + for i in range(iterations): + g = random.choice(groups) + if g == "validity": + ep = random.choice(self.validity_endpoints) + self.test_validity_single(ep) + elif g == "set": + ep = random.choice(self.set_data_endpoints) + self.test_set_data_single(ep) + elif g == "run": + ep = random.choice(self.run_endpoints) + self.test_run_single(ep) + else: + ep = random.choice(self.delete_data_endpoints) + self.test_delete_single(ep) + # 最後に disable 系を OFF + for ep in self.validity_endpoints: + if ep.startswith("/set/disable/"): + self.client.send_request(ep) + + def run_specific_random(self, iterations: int = 200): + import random + print(f"{Color.BOLD}=== 特定(osc/websocket)ランダムテスト(iter={iterations}) ==={Color.RESET}") + set_specific = ["/set/data/osc_ip_address","/set/data/osc_port","/set/data/websocket_host","/set/data/websocket_port"] + for i in range(iterations): + ep = random.choice(set_specific) + self.test_set_data_single(ep) + + def summary(self): + total = len(self.results) + passed = sum(1 for r in self.results.values() if r["success"]) + skipped = sum(1 for r in self.results.values() if r["expected_status"] == [401]) + failed = total - passed - skipped + print(f"\n{Color.BOLD}==== テストサマリー ==== {Color.RESET}") + print(f"総数: {total} / 成功: {passed} / 失敗: {failed} / スキップ(401): {skipped}") + if failed: + print(f"{Color.RED}失敗詳細:{Color.RESET}") + for ep, r in self.results.items(): + if not r["success"] and r["expected_status"] != [401]: + print(f"- {ep} status={r['status']} expected={r['expected_status']} result={r['result']}") + print(f"{Color.BOLD}======================={Color.RESET}\n") + def run_interactive_mode(client: TestClient): """対話モードでテストを実行""" print(f"{Color.BOLD}=== 対話モード ==={Color.RESET}") @@ -316,7 +831,10 @@ def main(): print("モードを選択してください:") print("1. サンプルテストを実行") print("2. 対話モードで実行") - mode = input(f"{Color.CYAN}選択 (1 or 2): {Color.RESET}").strip() + print("3. 自動テスト(全)を実行") + print("4. ランダムアクセステスト") + print("5. 特定(osc/websocket)ランダムテスト") + mode = input(f"{Color.CYAN}選択 (1-5): {Color.RESET}").strip() print() @@ -324,6 +842,18 @@ def main(): run_example_tests(client) elif mode == "2": run_interactive_mode(client) + elif mode == "3": + tester = AutomatedEndpointTester(client) + tester.run_all() + tester.summary() + elif mode == "4": + tester = AutomatedEndpointTester(client) + tester.run_random() + tester.summary() + elif mode == "5": + tester = AutomatedEndpointTester(client) + tester.run_specific_random() + tester.summary() else: print(f"{Color.YELLOW}無効な選択です。対話モードを開始します。{Color.RESET}\n") run_interactive_mode(client)