diff --git a/src-python/backend_test.py b/src-python/backend_test.py index c43a622d..ffa043e2 100644 --- a/src-python/backend_test.py +++ b/src-python/backend_test.py @@ -180,17 +180,21 @@ class TestMainloop(): case "/set/data/selected_tab_no": data = random.choice(["1", "2", "3"]) case "/set/data/selected_translation_engines": + print("Fetching endpoint data for translation_engines...") + self.config_dict["translation_engines"], _ = self.main.handleRequest("/get/data/translation_engines", None) translation_engines = self.config_dict.get("translation_engines", None) data = {} for i in ["1", "2", "3"]: data[i] = random.choice(translation_engines) case "/set/data/selected_your_languages": + self.config_dict["selectable_language_list"], _ = self.main.handleRequest("/get/data/selectable_language_list", None) selectable_language_list = self.config_dict.get("selectable_language_list", None) data = {} for i in ["1", "2", "3"]: data[i] = {} data[i]["1"] = random.choice(selectable_language_list) | {"enable": True} case "/set/data/selected_target_languages": + self.config_dict["selectable_language_list"], _ = self.main.handleRequest("/get/data/selectable_language_list", None) selectable_language_list = self.config_dict.get("selectable_language_list", None) data = {} for i in ["1", "2", "3"]: @@ -198,6 +202,7 @@ class TestMainloop(): for j in ["1", "2", "3"]: data[i][j] = random.choice(selectable_language_list) | {"enable": random.choice([True, False])} case "/set/data/selected_transcription_engine": + self.config_dict["transcription_engines"], _ = self.main.handleRequest("/get/data/transcription_engines", None) transcription_engines = self.config_dict.get("transcription_engines", None) data = random.choice(transcription_engines) case "/set/data/transparency": @@ -222,11 +227,17 @@ class TestMainloop(): "height": random.randint(600, 1080) } case "/set/data/selected_translation_compute_device": - data = random.choice(self.config_dict["translation_compute_device_list"]) + self.config_dict["translation_compute_device_list"], _ = self.main.handleRequest("/get/data/translation_compute_device_list", None) + translation_compute_device_list = self.config_dict.get("translation_compute_device_list", None) + data = random.choice(translation_compute_device_list) case "/set/data/selected_transcription_compute_device": - data = random.choice(self.config_dict["transcription_compute_device_list"]) + self.config_dict["transcription_compute_device_list"], _ = self.main.handleRequest("/get/data/transcription_compute_device_list", None) + transcription_compute_device_list = self.config_dict.get("transcription_compute_device_list", None) + data = random.choice(transcription_compute_device_list) case "/set/data/ctranslate2_weight_type": - data = random.choice(list(self.config_dict["selectable_ctranslate2_weight_type_dict"].keys())) + self.config_dict["selectable_ctranslate2_weight_type_dict"], _ = self.main.handleRequest("/get/data/selectable_ctranslate2_weight_type_dict", None) + selectable_ctranslate2_weight_type_dict = self.config_dict.get("selectable_ctranslate2_weight_type_dict", None) + data = random.choice(list(selectable_ctranslate2_weight_type_dict.keys())) # LLM / API Clients case "/set/data/plamo_model": # 事前にモデルリストを取得 @@ -279,9 +290,13 @@ class TestMainloop(): data = "OPENAI_DUMMY_KEY" expected_status = [200, 400] case "/set/data/selected_mic_host": - data = random.choice(self.config_dict["mic_host_list"]) + self.config_dict["mic_host_list"], _ = self.main.handleRequest("/get/data/mic_host_list", None) + mic_host_list = self.config_dict.get("mic_host_list", None) + data = random.choice(mic_host_list) case "/set/data/selected_mic_device": - data = random.choice(self.config_dict["mic_device_list"]) + self.config_dict["mic_device_list"], _ = self.main.handleRequest("/get/data/mic_device_list", None) + mic_device_list = self.config_dict.get("mic_device_list", None) + data = random.choice(mic_device_list) case "/set/data/mic_threshold": data = random.randint(-1000, 3000) if 0 <= data <= 2000: @@ -290,13 +305,17 @@ class TestMainloop(): expected_status = [400] case "/set/data/mic_record_timeout": data = random.randint(-1, 10) - if 0 <= data <= self.config_dict["mic_phrase_timeout"]: + self.config_dict["mic_phrase_timeout"], _ = self.main.handleRequest("/get/data/mic_phrase_timeout", None) + mic_phrase_timeout = self.config_dict.get("mic_phrase_timeout", None) + if 0 <= data <= mic_phrase_timeout: pass else: expected_status = [400] case "/set/data/mic_phrase_timeout": data = random.randint(-1, 10) - if self.config_dict["mic_record_timeout"] <= data: + self.config_dict["mic_record_timeout"], _ = self.main.handleRequest("/get/data/mic_record_timeout", None) + mic_record_timeout = self.config_dict.get("mic_record_timeout", None) + if mic_record_timeout <= data: pass else: expected_status = [400] @@ -329,7 +348,9 @@ class TestMainloop(): ] ) case "/set/data/selected_speaker_device": - data = random.choice(self.config_dict["speaker_device_list"]) + self.config_dict["speaker_device_list"], _ = self.main.handleRequest("/get/data/speaker_device_list", None) + speaker_device_list = self.config_dict.get("speaker_device_list", None) + data = random.choice(speaker_device_list) case "/set/data/speaker_threshold": data = random.randint(-1000, 5000) if 0 <= data <= 4000: @@ -338,13 +359,17 @@ class TestMainloop(): expected_status = [400] case "/set/data/speaker_record_timeout": data = random.randint(-1, 10) - if 0 <= data <= self.config_dict["speaker_phrase_timeout"]: + self.config_dict["speaker_phrase_timeout"], _ = self.main.handleRequest("/get/data/speaker_phrase_timeout", None) + speaker_phrase_timeout = self.config_dict.get("speaker_phrase_timeout", None) + if 0 <= data <= speaker_phrase_timeout: pass else: expected_status = [400] case "/set/data/speaker_phrase_timeout": data = random.randint(-1, 10) - if self.config_dict["speaker_record_timeout"] <= data: + self.config_dict["speaker_record_timeout"], _ = self.main.handleRequest("/get/data/speaker_record_timeout", None) + speaker_record_timeout = self.config_dict.get("speaker_record_timeout", None) + if speaker_record_timeout <= data: pass else: expected_status = [400] @@ -359,7 +384,9 @@ class TestMainloop(): case "/set/data/speaker_no_speech_prob": data = random.uniform(0, 1) case "/set/data/whisper_weight_type": - data = random.choice([key for key, value in self.config_dict["selectable_whisper_weight_type_dict"].items() if value is True]) + self.config_dict["selectable_whisper_weight_type_dict"], _ = self.main.handleRequest("/get/data/selectable_whisper_weight_type_dict", None) + selectable_whisper_weight_type_dict = self.config_dict.get("selectable_whisper_weight_type_dict", None) + data = random.choice([key for key, value in selectable_whisper_weight_type_dict.items() if value is True]) case "/set/data/overlay_small_log_settings": data = { "x_pos": random.random(), @@ -389,9 +416,13 @@ class TestMainloop(): "tracker": random.choice(["HMD", "LeftHand", "RightHand"]), } case "/set/data/send_message_format_parts": - data = self.config_dict["send_message_format_parts"] + self.config_dict["send_message_format_parts"], _ = self.main.handleRequest("/get/data/send_message_format_parts", None) + send_message_format_parts = self.config_dict.get("send_message_format_parts", None) + data = send_message_format_parts case "/set/data/received_message_format_parts": - data = self.config_dict["received_message_format_parts"] + self.config_dict["received_message_format_parts"], _ = self.main.handleRequest("/get/data/received_message_format_parts", None) + received_message_format_parts = self.config_dict.get("received_message_format_parts", None) + data = received_message_format_parts case "/set/data/websocket_host": data = random.choice(["127.0.0.1", "aaaaadwafasdsd", "0210.1564.845.0"]) if data == "127.0.0.1": @@ -449,6 +480,8 @@ class TestMainloop(): print(f" Current config_dict: {self.config_dict}") else: print(f"-> {Color.YELLOW}[SKIP]{Color.RESET} No data to set for this endpoint: {endpoint}.") + status = None + result = None success = True self.record_test_result(endpoint, status, result if data is not None else None, expected_status) # テスト結果を記録 return success @@ -614,9 +647,9 @@ class TestMainloop(): ] self.set_data_specific_endpoints = [ - "/set/data/ctranslate2_weight_type", - "/set/data/websocket_host", - "/set/data/websocket_port", + # "/set/data/ctranslate2_weight_type", + # "/set/data/websocket_host", + # "/set/data/websocket_port", "/set/data/osc_ip_address", "/set/data/osc_port", ] @@ -625,7 +658,7 @@ class TestMainloop(): self.delete_data_endpoints = [] endpoint_types = [ - "validity", + # "validity", "set_data", # "run", # "delete", @@ -807,9 +840,9 @@ if __name__ == "__main__": # test.test_run_endpoints_all() # test.test_delete_data_endpoints_all() # test.test_endpoints_all_random() - test.test_endpoints_on_off_continuous() + # test.test_endpoints_on_off_continuous() # test.test_endpoints_on_off_random() - # test.test_endpoints_specific_random() + test.test_endpoints_specific_random() # test.test_translate_all_language_pairs() test.generate_summary() except KeyboardInterrupt: diff --git a/src-python/config.py b/src-python/config.py index 3444a4a3..24dd42e1 100644 --- a/src-python/config.py +++ b/src-python/config.py @@ -63,23 +63,201 @@ def _auto_register_descriptors(): make_serializer(name) +# Wrapper classes for mutable types that auto-save on modification +class ManagedDict(dict): + """Dict wrapper that saves changes back to config.""" + def __init__(self, instance, property_name, immediate_save): + self._instance = instance + self._property_name = property_name + self._immediate_save = immediate_save + self._internal_name = f"_{property_name}" + # Initialize from internal storage + super().__init__(getattr(instance, self._internal_name)) + + def _get_internal(self): + """Get reference to internal storage.""" + return getattr(self._instance, self._internal_name) + + def _save(self): + """Save current state back to config and sync internal storage.""" + try: + # Update internal storage directly + internal_dict = self._get_internal() + internal_dict.clear() + internal_dict.update(dict.items(self)) + # Trigger config save + self._instance.saveConfig(self._property_name, dict(self), immediate_save=self._immediate_save) + except Exception: + pass + + def __getitem__(self, key): + # Always read from internal storage to get latest value + return self._get_internal()[key] + + def __setitem__(self, key, value): + super().__setitem__(key, value) + self._save() + + def __delitem__(self, key): + super().__delitem__(key) + self._save() + + def __contains__(self, key): + return key in self._get_internal() + + def get(self, key, default=None): + return self._get_internal().get(key, default) + + def keys(self): + return self._get_internal().keys() + + def values(self): + return self._get_internal().values() + + def items(self): + return self._get_internal().items() + + def update(self, *args, **kwargs): + super().update(*args, **kwargs) + self._save() + + def pop(self, *args): + result = super().pop(*args) + self._save() + return result + + def popitem(self): + result = super().popitem() + self._save() + return result + + def clear(self): + super().clear() + self._save() + + def setdefault(self, key, default=None): + result = super().setdefault(key, default) + self._save() + return result + + +class ManagedList(list): + """List wrapper that saves changes back to config.""" + def __init__(self, instance, property_name, immediate_save): + self._instance = instance + self._property_name = property_name + self._immediate_save = immediate_save + self._internal_name = f"_{property_name}" + # Initialize from internal storage + super().__init__(getattr(instance, self._internal_name)) + + def _get_internal(self): + """Get reference to internal storage.""" + return getattr(self._instance, self._internal_name) + + def _save(self): + """Save current state back to config and sync internal storage.""" + try: + # Update internal storage directly + internal_list = self._get_internal() + internal_list.clear() + internal_list.extend(list.__iter__(self)) + # Trigger config save + self._instance.saveConfig(self._property_name, list(self), immediate_save=self._immediate_save) + except Exception: + pass + + def __getitem__(self, index): + # Always read from internal storage to get latest value + return self._get_internal()[index] + + def __setitem__(self, index, value): + super().__setitem__(index, value) + self._save() + + def __delitem__(self, index): + super().__delitem__(index) + self._save() + + def __len__(self): + return len(self._get_internal()) + + def __contains__(self, value): + return value in self._get_internal() + + def __iter__(self): + return iter(self._get_internal()) + + def append(self, value): + super().append(value) + self._save() + + def extend(self, iterable): + super().extend(iterable) + self._save() + + def insert(self, index, value): + super().insert(index, value) + self._save() + + def remove(self, value): + super().remove(value) + self._save() + + def pop(self, index=-1): + result = super().pop(index) + self._save() + return result + + def clear(self): + super().clear() + self._save() + + def sort(self, *args, **kwargs): + super().sort(*args, **kwargs) + self._save() + + def reverse(self): + super().reverse() + self._save() + + # Descriptor for simple managed config properties to reduce repetitive getters/setters. # It performs optional type validation, optional allowed-values check, and calls # instance.saveConfig(...) on successful set. class ManagedProperty: - def __init__(self, name: str, type_: type = None, allowed=None, immediate_save: bool = False, serialize: bool = True, readonly: bool = False): + def __init__(self, name: str, type_: type = None, allowed=None, immediate_save: bool = False, serialize: bool = True, readonly: bool = False, mutable_tracking: bool = False): self.name = name self.type_ = type_ self.allowed = allowed self.immediate_save = immediate_save self.serialize = serialize self.readonly = readonly + self.mutable_tracking = mutable_tracking self.private_name = f"_{name}" + self.wrapper_cache_name = f"_wrapper_{name}" def __get__(self, instance, owner): if instance is None: return self stored = getattr(instance, self.private_name) + + # If mutable_tracking is enabled, return cached wrapper or create new one + if self.mutable_tracking and isinstance(stored, dict): + wrapper = getattr(instance, self.wrapper_cache_name, None) + if wrapper is None or not isinstance(wrapper, ManagedDict): + wrapper = ManagedDict(instance, self.name, self.immediate_save) + setattr(instance, self.wrapper_cache_name, wrapper) + # Wrapper automatically syncs with internal storage on access + return wrapper + elif self.mutable_tracking and isinstance(stored, list): + wrapper = getattr(instance, self.wrapper_cache_name, None) + if wrapper is None or not isinstance(wrapper, ManagedList): + wrapper = ManagedList(instance, self.name, self.immediate_save) + setattr(instance, self.wrapper_cache_name, wrapper) + # Wrapper automatically syncs with internal storage on access + return wrapper + # Return deep copy for mutable types to prevent external modification if isinstance(stored, (dict, list)): return copy.deepcopy(stored) @@ -426,15 +604,15 @@ class Config: # --- Selectable dict/list properties (managed by descriptor, not serialized) --- # These are dynamically generated in init_config() based on installed packages/APIs - SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT = ManagedProperty('SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT', type_=dict, serialize=False) - SELECTABLE_WHISPER_WEIGHT_TYPE_DICT = ManagedProperty('SELECTABLE_WHISPER_WEIGHT_TYPE_DICT', type_=dict, serialize=False) - SELECTABLE_TRANSLATION_ENGINE_STATUS = ManagedProperty('SELECTABLE_TRANSLATION_ENGINE_STATUS', type_=dict, serialize=False) - SELECTABLE_TRANSCRIPTION_ENGINE_STATUS = ManagedProperty('SELECTABLE_TRANSCRIPTION_ENGINE_STATUS', type_=dict, serialize=False) - SELECTABLE_PLAMO_MODEL_LIST = ManagedProperty('SELECTABLE_PLAMO_MODEL_LIST', type_=list, serialize=False) - SELECTABLE_GEMINI_MODEL_LIST = ManagedProperty('SELECTABLE_GEMINI_MODEL_LIST', type_=list, serialize=False) - SELECTABLE_OPENAI_MODEL_LIST = ManagedProperty('SELECTABLE_OPENAI_MODEL_LIST', type_=list, serialize=False) - SELECTABLE_LMSTUDIO_MODEL_LIST = ManagedProperty('SELECTABLE_LMSTUDIO_MODEL_LIST', type_=list, serialize=False) - SELECTABLE_OLLAMA_MODEL_LIST = ManagedProperty('SELECTABLE_OLLAMA_MODEL_LIST', type_=list, serialize=False) + SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT = ManagedProperty('SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT', type_=dict, serialize=False, mutable_tracking=True) + SELECTABLE_WHISPER_WEIGHT_TYPE_DICT = ManagedProperty('SELECTABLE_WHISPER_WEIGHT_TYPE_DICT', type_=dict, serialize=False, mutable_tracking=True) + SELECTABLE_TRANSLATION_ENGINE_STATUS = ManagedProperty('SELECTABLE_TRANSLATION_ENGINE_STATUS', type_=dict, serialize=False, mutable_tracking=True) + SELECTABLE_TRANSCRIPTION_ENGINE_STATUS = ManagedProperty('SELECTABLE_TRANSCRIPTION_ENGINE_STATUS', type_=dict, serialize=False, mutable_tracking=True) + SELECTABLE_PLAMO_MODEL_LIST = ManagedProperty('SELECTABLE_PLAMO_MODEL_LIST', type_=list, serialize=False, mutable_tracking=True) + SELECTABLE_GEMINI_MODEL_LIST = ManagedProperty('SELECTABLE_GEMINI_MODEL_LIST', type_=list, serialize=False, mutable_tracking=True) + SELECTABLE_OPENAI_MODEL_LIST = ManagedProperty('SELECTABLE_OPENAI_MODEL_LIST', type_=list, serialize=False, mutable_tracking=True) + SELECTABLE_LMSTUDIO_MODEL_LIST = ManagedProperty('SELECTABLE_LMSTUDIO_MODEL_LIST', type_=list, serialize=False, mutable_tracking=True) + SELECTABLE_OLLAMA_MODEL_LIST = ManagedProperty('SELECTABLE_OLLAMA_MODEL_LIST', type_=list, serialize=False, mutable_tracking=True) # --- Save Json Data (ManagedProperty-based) --- # More simple boolean flags replaced with ManagedProperty diff --git a/src-python/controller.py b/src-python/controller.py index 60764865..02c42e6b 100644 --- a/src-python/controller.py +++ b/src-python/controller.py @@ -201,9 +201,7 @@ class Controller: def downloaded(self) -> None: if model.checkTranslatorCTranslate2ModelWeight(self.weight_type) is True: - weight_type_dict = config.SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT - weight_type_dict[self.weight_type] = True - config.SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT = weight_type_dict + config.SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT[self.weight_type] = True self.run( 200, @@ -236,9 +234,7 @@ class Controller: def downloaded(self) -> None: if model.checkTranscriptionWhisperModelWeight(self.weight_type) is True: - weight_type_dict = config.SELECTABLE_WHISPER_WEIGHT_TYPE_DICT - weight_type_dict[self.weight_type] = True - config.SELECTABLE_WHISPER_WEIGHT_TYPE_DICT = weight_type_dict + config.SELECTABLE_WHISPER_WEIGHT_TYPE_DICT[self.weight_type] = True self.run( 200, @@ -2649,10 +2645,8 @@ class Controller: return cleaned_text def updateDownloadedCTranslate2ModelWeight(self) -> None: - weight_type_dict = config.SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT - for weight_type in weight_type_dict.keys(): - weight_type_dict[weight_type] = model.checkTranslatorCTranslate2ModelWeight(weight_type) - config.SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT = weight_type_dict + for weight_type in config.SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT.keys(): + config.SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT[weight_type] = model.checkTranslatorCTranslate2ModelWeight(weight_type) def updateTranslationEngineAndEngineList(self): engines = config.SELECTED_TRANSLATION_ENGINES @@ -2674,10 +2668,8 @@ class Controller: self.run(200, self.run_mapping["translation_engines"], selectable_engines) def updateDownloadedWhisperModelWeight(self) -> None: - weight_type_dict = config.SELECTABLE_WHISPER_WEIGHT_TYPE_DICT - for weight_type in weight_type_dict.keys(): - weight_type_dict[weight_type] = model.checkTranscriptionWhisperModelWeight(weight_type) - config.SELECTABLE_WHISPER_WEIGHT_TYPE_DICT = weight_type_dict + for weight_type in config.SELECTABLE_WHISPER_WEIGHT_TYPE_DICT.keys(): + config.SELECTABLE_WHISPER_WEIGHT_TYPE_DICT[weight_type] = model.checkTranscriptionWhisperModelWeight(weight_type) def updateTranscriptionEngine(self): weight_type = config.WHISPER_WEIGHT_TYPE diff --git a/src-python/docs/details/backend_test.md b/src-python/docs/details/backend_test.md index af0413c0..9cd03d81 100644 --- a/src-python/docs/details/backend_test.md +++ b/src-python/docs/details/backend_test.md @@ -3,13 +3,26 @@ ## 概要 VRCTアプリケーションのAPIエンドポイントを包括的にテストするためのモジュールです。メインループの各種機能をランダムアクセスでテストし、システムの安定性と堅牢性を検証します。 +### 2025-10 更新点(commit a54538e 反映) + +動的値が必要な `/set/data/*` エンドポイントに対し、事前に対応する `/get/data/...` エンドポイントを呼び出して最新値を取得し、`self.config_dict` にキャッシュしてからランダム選択する方式に変更しました。これにより以下が改善されています: + +- 翻訳/転写エンジン選択時の存在しないキー送信防止 +- 計算デバイス / 重みタイプ選択の安定化(リスト更新後に選択) +- マイク/スピーカー関連タイムアウト値の正常範囲チェック(取得した最新値を基準にバリデーション) +- Whisper / CTranslate2 重みタイプ辞書のキー集合変化への追従 + +例: `"/set/data/selected_translation_engines"` 試験前に `"/get/data/translation_engines"` を呼び、取得したリストから `random.choice()`。従来の初期キャッシュ依存から、実行時取得へ移行。 + ## 主要機能 ### Color クラス + - ANSIエスケープシーケンスを使用したコンソール出力色彩管理 - テスト結果の視覚的表示(成功・失敗・スキップ等) ### TestMainloop クラス + - APIエンドポイントの包括的テスト実行 - ランダムアクセステスト - テスト結果の記録・分析 @@ -18,23 +31,27 @@ VRCTアプリケーションのAPIエンドポイントを包括的にテスト ## 主要メソッド ### テスト実行メソッド + - `test_endpoints_on_off_all()`: ON/OFF系エンドポイントの全テスト - `test_set_data_endpoints_all()`: データ設定系エンドポイントの全テスト - `test_run_endpoints_all()`: 実行系エンドポイントの全テスト - `test_endpoints_all_random()`: 全エンドポイントのランダムアクセステスト ### 特定機能テスト + - `test_translate_all_language_pairs()`: 全言語ペアでの翻訳テスト - `test_endpoints_on_off_continuous()`: ON/OFF連続切り替えテスト - `test_endpoints_specific_random()`: 特定エンドポイントのランダムテスト ### 結果分析 + - `generate_summary()`: テスト結果のサマリー生成 - `record_test_result()`: テスト結果の記録 ## 使用方法 ### 基本的な使い方 + ```python # テストインスタンスを作成 test = TestMainloop() @@ -49,6 +66,7 @@ test.generate_summary() ``` ### ランダムテストの実行 + ```python # 全エンドポイントのランダムアクセステスト test.test_endpoints_all_random() @@ -58,6 +76,7 @@ test.test_endpoints_specific_random() ``` ## 依存関係 + - `mainloop`: VRCTメインループモジュール - `random`: ランダムテストデータ生成 - `time`: テスト間隔制御 @@ -65,31 +84,48 @@ test.test_endpoints_specific_random() ## テスト対象エンドポイント ### 制御系 + - `/set/enable/*`: 機能有効化 - `/set/disable/*`: 機能無効化 ### データ設定系 + - `/set/data/*`: 各種設定データの更新 +動的取得対象(代表例): + +- `selected_translation_engines` → `/get/data/translation_engines` +- `selected_transcription_engine` → `/get/data/transcription_engines` +- `selected_translation_compute_device` → `/get/data/translation_compute_device_list` +- `ctranslate2_weight_type` → `/get/data/selectable_ctranslate2_weight_type_dict` +- `whisper_weight_type` → `/get/data/selectable_whisper_weight_type_dict` +- `selected_mic_host` / `selected_mic_device` → `/get/data/mic_host_list` / `/get/data/mic_device_list` +- `selected_speaker_device` → `/get/data/speaker_device_list` + ### 実行系 + - `/run/*`: 各種機能の実行 ### データ削除系 + - `/delete/data/*`: データの削除 ## 注意事項 + - テスト実行前に`config.json`を削除して初期化 - 重いAIモデルを使用するテストは実行時間に注意 - ランダムテストは指定回数(デフォルト1000-10000回)実行される - テスト終了時は自動的にすべての機能を無効化する ## エラーハンドリング + - 各テストは独立して実行され、一つの失敗が全体に影響しない - 期待されるステータスコードと実際の結果を比較 - VRAM不足等のリソースエラーも適切にハンドリング ## テスト結果の分類 + - **PASS**: 期待されるステータスコードと一致 - **ERROR**: 期待されるステータスコードと不一致 - **SKIP**: テスト実行不可(401ステータス) -- **Invalid**: 無効なエンドポイント(404ステータス) \ No newline at end of file +- **Invalid**: 無効なエンドポイント(404ステータス) diff --git a/src-python/docs/mainloop.md b/src-python/docs/mainloop.md index 4536d49e..8eb3691b 100644 --- a/src-python/docs/mainloop.md +++ b/src-python/docs/mainloop.md @@ -9,9 +9,11 @@ ### 1. グローバル変数 #### `run_mapping` (dict) + フロントエンドへの通知用エンドポイントマッピング。Controllerが `run()` コールバックを通じてフロントエンドに状態変化を通知する際に使用。 **主要なエンドポイント:** + - `/run/enable_translation` - 翻訳機能の有効/無効状態 - `/run/transcription_mic_message` - マイク音声認識結果 - `/run/transcription_speaker_message` - スピーカー音声認識結果 @@ -19,11 +21,14 @@ - `/run/initialization_complete` - 初期化完了通知 #### `mapping` (dict) + フロントエンドからのリクエストを処理する関数マッピング。各エンドポイントに対して: + - `status`: ロック状態(True: 処理可能, False: ロック中) - `variable`: 実行する Controller メソッド **エンドポイント分類:** + - `/get/data/*` - 設定値の取得(初期化時に使用) - `/set/data/*` - 設定値の更新 - `/set/enable/*` - 機能の有効化 @@ -31,6 +36,7 @@ - `/run/*` - アクション実行(メッセージ送信、ダウンロード等) #### `init_mapping` (dict) + 初期化時に実行される `/get/data/*` エンドポイントのサブセット。アプリケーション起動時に全設定値をフロントエンドに送信するために使用。 ### 2. Mainクラス @@ -38,11 +44,13 @@ #### コンストラクタ `__init__(controller_instance, mapping_data, worker_count)` **パラメータ:** + - `controller_instance`: Controller インスタンス - `mapping_data`: エンドポイントマッピング辞書 - `worker_count`: ハンドラワーカースレッド数(デフォルト: 3) **初期化処理:** + 1. リクエストキュー (`Queue[Tuple[str, Any]]`) の作成 2. 停止イベント (`Event`) の作成 3. エンドポイント別 Lock の生成: @@ -50,6 +58,7 @@ - 同一機能の有効化/無効化リクエストが競合しないよう排他制御 **正規化ロジックの例:** + ```python "/set/enable/translation" → "/lock/set/translation" "/set/disable/translation" → "/lock/set/translation" @@ -61,6 +70,7 @@ **責務:** stdin から JSON リクエストを読み取り、キューに投入 **処理フロー:** + 1. `sys.stdin.readline()` でブロッキング読み取り 2. JSON パース (`json.loads()`) 3. エンドポイントとデータを抽出 @@ -69,6 +79,7 @@ 6. キューに投入 `self.queue.put((endpoint, data))` **エラー処理:** + - JSON パースエラー: ログ出力して継続 - EOF 到達: 0.1秒待機して再試行 - その他の例外: `errorLogging()` でトレースバック記録 @@ -80,6 +91,7 @@ **責務:** キューからリクエストを取り出し、適切なロックを取得して処理 **処理フロー:** + 1. キューから `(endpoint, data)` を取得(0.5秒タイムアウト) 2. エンドポイントを正規化キーに変換 3. 対応する Lock を取得試行(非ブロッキング) @@ -89,10 +101,12 @@ 5. レスポンスを stdout に出力 (`printResponse()`) **排他制御の意義:** + - 例: 翻訳機能の有効化中に無効化リクエストが来た場合、無効化は待機 - 異なる機能のリクエストは並列実行可能 **再キューロジック:** + - status == 423 (Locked): 0.1秒待機して再キュー - これにより、初期化中の設定変更リクエストが適切にリトライされる @@ -103,6 +117,7 @@ **責務:** 実際のビジネスロジック実行 **処理フロー:** + 1. `mapping` から対応するハンドラを取得 2. エンドポイントが存在しない → status 404 3. ハンドラの `status` が False → status 423 (Locked) @@ -111,15 +126,33 @@ 6. status と result を抽出して返却 **エラー処理:** + - 例外発生時: `errorLogging()` でトレースバック記録、status 500 を返却 -#### `start()` / `stop(wait)` メソッド +#### `start()` / `stop(wait)` メソッド(2025-10 仕様) **start():** -- `startReceiver()` - stdin 読み取りスレッド起動 -- `startHandler()` - ハンドラワーカースレッド起動 +コミット `5ce281e` によりシンプルな「メインループ維持」機能のみになりました。`startReceiver()` / `startHandler()` は `start()` 内では呼ばれません。呼び出し側(`__main__` ブロックや外部プロセスマネージャ)で必要なスレッドを起動した後、`start()` を呼び出して以下の挙動を提供します。 + +```python +def start(self) -> None: + try: + while not self._stop_event.is_set(): + time.sleep(1) + except KeyboardInterrupt: + self.stop() +``` + +ポイント: + +- メインスレッドを 1 秒スリープしつつ存続させ、Ctrl+C(KeyboardInterrupt)で安全に停止へ遷移。 +- 既存の receiver / handler スレッド二重起動のリスクを除去。 +- ライフサイクル明確化: スレッド起動責務を `start()` から分離しテスト・拡張容易性向上。 + +従来仕様ドキュメントの「`start()` がスレッドを直接起動する」記述は廃止済みであり、最新コードでは不要です。 **stop(wait):** + - `_stop_event.set()` - 全スレッドに停止シグナル送信 - 各スレッドを `join(timeout=remaining)` で待機(最大 `wait` 秒) @@ -131,16 +164,22 @@ 2. `startReceiver()` - stdin リスニング開始 3. `startHandler()` - リクエスト処理開始 4. **Watchdog 設定:** - - `controller.setWatchdogCallback(main_instance.stop)` + - `controller.setWatchdogCallback(main_instance.stop)` - Watchdog がタイムアウトした場合にプロセス全体を停止 5. **Controller 初期化:** - - `controller.init()` + - `controller.init()` - Model の遅延初期化、デバイス列挙、ネットワーク接続チェック - `init_mapping` のすべてのエンドポイントを実行して初期設定をフロントエンドに送信 6. **マッピングのアンロック:** - すべての `mapping[key]["status"]` を True に設定 - これにより初期化中だった機能が利用可能になる -7. `main_instance.start()` - 実質的には何もしない(既に起動済み) +7. `main_instance.start()` - メインループ維持のみ(receiver / handler は事前に起動済み) + +### Watchdog エンドポイント連携 + +バックエンドはクライアント側から `/run/feed_watchdog` が一定間隔(例: 30 秒)で送信される前提の設計が可能です。テストクライアント(`test_client.py`)は初期化完了後にバックグラウンドスレッドでこのエンドポイントを自動送信し、内部監視(`controller` 側の Watchdog コールバック)タイムアウト防止を行います。 + +ログ上は通常レスポンス非取得(fire & forget)となるため、負荷低減のためにハートビート頻度調整が可能です。 ## 並列処理とスレッドセーフティ @@ -182,6 +221,7 @@ ``` **フィールド:** + - `endpoint`: 実行するエンドポイント(必須) - `data`: パラメータ(オプション、Base64 エンコード) @@ -196,12 +236,13 @@ ``` **フィールド:** + - `status`: HTTP ステータスコード相当 - - 200: 成功 - - 400: バリデーションエラー - - 404: 無効なエンドポイント - - 423: ロック中(リトライされる) - - 500: 内部エラー + - 200: 成功 + - 400: バリデーションエラー + - 404: 無効なエンドポイント + - 423: ロック中(リトライされる) + - 500: 内部エラー - `endpoint`: リクエストされたエンドポイント - `result`: 処理結果(型はエンドポイントに依存) @@ -218,52 +259,62 @@ ## エラーハンドリング ### 1. JSON パースエラー + - **発生箇所:** `receiver()` の `json.loads()` - **処理:** `errorLogging()` でトレースバック記録、リクエストをスキップ ### 2. ハンドラ実行エラー + - **発生箇所:** `_call_handler()` の `handler["variable"](data)` -- **処理:** - - `errorLogging()` でトレースバック記録 - - status 500 と "Internal error" を返却 - - プロセスは継続 +- **処理:** + - `errorLogging()` でトレースバック記録 + - status 500 と "Internal error" を返却 + - プロセスは継続 ### 3. JSON シリアライズエラー + - **発生箇所:** `printResponse()` の `json.dumps()` - **処理:** - - エラーログに詳細を記録 - - フォールバック JSON を出力(status 500) - - プロセスは継続 + - エラーログに詳細を記録 + - フォールバック JSON を出力(status 500) + - プロセスは継続 ### 4. EOF (stdin 終了) + - **発生箇所:** `receiver()` の `readline()` - **処理:** 0.1秒待機して再試行(フロントエンドの再起動待ち) ## パフォーマンス最適化 ### 1. 複数ワーカースレッド + - デフォルト3スレッドで並列処理 - CPU バウンドな処理(翻訳、文字起こし)を効率化 ### 2. 非ブロッキングロック + - ロック競合時に即座に再キュー - スレッドのブロッキング時間を最小化 ### 3. 処理安定化待機 + - 各ハンドラ実行後に 0.2秒待機 - 連続リクエストによる競合状態を回避 ## 制限事項 ### 1. 初期化中の制限 + - `mapping[key]["status"] = False` の間はリクエストが 423 でリトライされる - 初期化完了まで最大数秒のレイテンシが発生 ### 2. stdin の単方向性 + - stdin → キュー → ハンドラの一方向フロー - 複数のフロントエンドからの同時接続は非対応 ### 3. シリアル実行の保証 + - 同一エンドポイントのリクエストは排他的に実行されるが、 - 異なるエンドポイントは並列実行される可能性がある - 依存関係のある操作は呼び出し側で順序制御が必要 @@ -294,14 +345,17 @@ ## 今後の拡張性 ### 1. 双方向通信 + - WebSocket への移行でリアルタイム通知を改善 - stdin/stdout は互換性のため維持 ### 2. 動的ワーカー数調整 + - キューの深さに応じてスレッド数を自動調整 - CPU 負荷に応じた適応的なスケーリング ### 3. 優先度キュー + - 重要なリクエスト(エラー通知等)を優先処理 - `queue.PriorityQueue` への移行 @@ -315,6 +369,7 @@ ## コーディング規約 本ファイルは以下の規約に従う: + - PEP 8 スタイルガイド - 型ヒント (`typing` モジュール) - Docstring は Google スタイル @@ -323,6 +378,7 @@ ## テストシナリオ ### 1. 基本動作テスト + ```python # stdin に JSON を送信 echo '{"endpoint": "/get/data/version", "data": null}' | python mainloop.py @@ -330,14 +386,17 @@ echo '{"endpoint": "/get/data/version", "data": null}' | python mainloop.py ``` ### 2. 並列リクエストテスト + - 複数の設定変更リクエストを同時送信 - すべてが正常に処理されることを確認 ### 3. ロック競合テスト + - 翻訳の有効化と無効化を連続送信 - 両方が排他的に実行されることを確認 ### 4. エラー回復テスト + - 不正なJSON、無効なエンドポイント、不正なデータを送信 - プロセスがクラッシュせずエラーレスポンスを返すことを確認 diff --git a/src-python/docs/test_client.md b/src-python/docs/test_client.md new file mode 100644 index 00000000..b552815c --- /dev/null +++ b/src-python/docs/test_client.md @@ -0,0 +1,139 @@ +# test_client.py ドキュメント + +## 概要 +`test_client.py` は stdin/stdout 経由でバックエンド (`mainloop.py`) と通信し、各種 API エンドポイントを自動 / 半自動でテストするためのクライアントユーティリティ。初期化完了待機、ログ (status=348) 展開表示、サイレントモード、結果エクスポート(JSON/CSV)、Watchdog ハートビート送信 (/run/feed_watchdog) などの補助機能を備える。 + +## 主な責務 +- バックエンドプロセス起動と初期化完了待機 (`/run/initialization_complete`) +- エンドポイント単発送信/応答待機 (Base64 エンコード/デコード) +- status=348 ログエントリの全文展開表示 +- タイムアウト / 例外発生時の復旧メッセージ出力 +- Watchdog ハートビート送信スレッド管理 +- 自動テスター (`AutomatedEndpointTester`) による包括的エンドポイント試験 +- テスト結果の JSON / CSV エクスポート (インタラクティブ指定) + +## クラス構成 +### `Color` +ANSIカラー定数を定義し、可読性の高い出力を実現。 + +### `TestClient` +バックエンドとの 1 プロセス・1 チャンネル通信を管理。 + +| 属性 | 役割 | +|------|------| +| `process` | 起動した Python バックエンド subprocess | +| `_watchdog_stop_event` | Watchdog スレッド停止制御用 Event | +| `_watchdog_thread` | /run/feed_watchdog 送信スレッド | + +#### 初期化フロー +1. `subprocess.Popen([sys.executable, 'mainloop.py'], ...)` でバックエンド起動 +2. `_wait_for_initialization()` を呼び出し `/run/initialization_complete` 受信まで待機 + - VRCT_INIT_TIMEOUT 環境変数があれば soft timeout として利用 (超過時 WARN のみ) + - 30 秒間隔で進捗ログ (最後に受信した endpoint) + - status=348 レコードは全文 JSON 展開 +3. 初期化完了後 `_start_watchdog()` がバックグラウンドで /run/feed_watchdog を 30 秒間隔送信 + +#### 重要メソッド +- `send_request(endpoint, data=None, timeout=30.0, silent=False)` + - リクエスト JSON を構築し送信 + - `data` は JSON シリアライズ → Base64 → `data` フィールド + - 指定 endpoint のレスポンス行まで逐次読み取り (他 endpoint のログ行は通過) + - status=348 の場合ログとして全文表示(silent=False のとき) + - タイムアウト時 504 レスポンスを合成 + +- `_wait_for_initialization(timeout=None)` + - 無期限または soft timeout 待機 + - プロセス死亡検知で RuntimeError + +- `_start_watchdog()` / `cleanup()` + - Watchdog スレッド開始と安全停止 (Event セット後 join) + +### `AutomatedEndpointTester` +`backend_test.py` のロジック移植版(stdin/stdout プロトコル向け)。 + +| 属性 | 説明 | +|------|------| +| `silent` | True ならクライアント側詳細出力を抑制 | +| `export_path` | JSON エクスポート先 (None なら未出力) | +| `export_csv` | CSV 追加出力有無 | +| `results` | 収集したテストレコード一覧 | + +#### エンドポイント分類 (ハードコード暫定) +- 有効/無効化: `/set/enable/*`, `/set/disable/*` +- 設定更新: `/set/data/*` +- 実行系: `/run/*` +- 削除系: `/delete/data/*` + +#### 主メソッド +- `test_validity_single(endpoint)` 有効/無効化系単一試験 +- `test_set_data_single(endpoint)` 設定更新系単一試験(事前に動的取得が必要な値は `/get/data/...` を呼び最新値をキャッシュ) +- `test_run_single(endpoint)` 実行系単一試験 +- `test_delete_single(endpoint)` 削除系単一試験 +- `run_all()` 全カテゴリ順次実行 +- `run_random(count=1000)` 全エンドポイントプールからランダム選択 +- `run_specific_random(category, count)` 指定カテゴリ内ランダム +- `summary()` 結果集計出力および必要な場合 JSON/CSV エクスポート + +#### 結果レコード構造 +```json +{ + "endpoint": "/set/data/transparency", + "status": 200, + "result": 85, + "expected": "status==200" +} +``` + +CSV 例: +``` +endpoint,status,expected,success +/set/data/transparency,200,status==200,True +``` + +## status=348 ログ取り扱い +- 初期化待機中: 展開してインデント付き表示 +- リクエスト応答処理中: silent=False ならログエントリ全文を優先表示 +- 通常 API 応答 (status != 348) との区別を明確化しデバッグ容易化 + +## Watchdog ハートビート +- 30 秒間隔で `/run/feed_watchdog` を送信 (fire-and-forget) +- 送信失敗時は警告を表示しループ終了 +- クライアント終了時に停止イベントをセットしスレッド join でリーク防止 + +## エクスポート機能 +### JSON エクスポート +- `export_path` が指定されていれば `results` を UTF-8 / ensure_ascii=False で整形出力 +- フィールド: endpoint, status, result, expected, success + +### CSV エクスポート +- JSON パスから拡張子置換(`.csv`)で派生 (内部 `_derive_csv_path` 相当ロジック) +- 成功判定: `status` と `expected` 文字列評価結果に依存 (単純比較) + +## 例外 / エラー処理方針 +| ケース | 対応 | +|--------|------| +| バックエンド終了検知 | 初期化待機中: RuntimeError を投げる / 通常通信: 500 レスポンス合成 | +| JSONDecodeError | ログ行扱いでスキップ (初期化中は進捗ログとして表示) | +| BrokenPipe / OSError | 通信切断とみなして 500 レスポンス返却 | +| タイムアウト | 504 レスポンス返却 (endpoint 同梱) | + +## 制限事項 +- エンドポイント一覧は動的取得ではなくハードコード (将来的改善余地) +- レスポンスの並行受信は未対応(1 リクエスト同期待ち) +- status=200 以外の詳細な意味的検証は限定的 (expected = 単純条件) +- Watchdog レスポンスは読み取らないため送信失敗検知は例外経路のみ + +## 今後の改善候補 +1. CLI 引数サポート (`--mode random --count 500 --silent --export result.json`) +2. 動的エンドポイント列挙 API 追加後の自動反映 +3. リトライポリシー (指数バックオフ) 導入 +4. 応答時間測定とパフォーマンスレポート出力 +5. 並列テスト実行 (複数 subprocess / async IO) + +## 参考 +- `backend_test.py` : 元ロジック +- `utils.py` : status=348 ログ出力仕様 +- `mainloop.md` : 通信プロトコル詳細 + +## まとめ +`test_client.py` は VRCT バックエンドに対する包括的なテストおよび運用補助を 1 ファイルで実現するツール。初期化待機の堅牢化(無期限 + soft timeout)、Watchdog ハートビート、ログ展開、静音化オプション、結果エクスポートにより長時間動作・回帰試験・CI への組み込みを容易にする基盤を提供する。 diff --git a/src-python/mainloop.py b/src-python/mainloop.py index f4c3a451..84bc3919 100644 --- a/src-python/mainloop.py +++ b/src-python/mainloop.py @@ -551,9 +551,12 @@ class Main: self._threads.append(th_handler) def start(self) -> None: - """Start receiver and handler threads.""" - self.startReceiver() - self.startHandler() + """Start the main loop to keep the program running.""" + try: + while not self._stop_event.is_set(): + time.sleep(1) + except KeyboardInterrupt: + self.stop() def stop(self, wait: float = 2.0) -> None: """Signal threads to stop and wait for them to finish. diff --git a/src-python/test_client.py b/src-python/test_client.py new file mode 100644 index 00000000..6dd565d2 --- /dev/null +++ b/src-python/test_client.py @@ -0,0 +1,984 @@ +""" +フロントエンドを模擬したテストクライアント +stdin/stdoutを介してバックエンドと通信し、エンドポイントをテストします。 + +使用方法: +1. バックエンドを起動: python mainloop.py +2. 別のターミナルでこのスクリプトを実行: python test_client.py +3. エンドポイントとデータを指定してテストを実行 +""" +import os +import subprocess +import json +import base64 +import sys +import time +import threading +from typing import Optional, Dict, Any + +if os.path.exists("config.json"): + os.remove("config.json") + +class Color: + GREEN = '\033[32m' + RED = '\033[31m' + YELLOW = '\033[33m' + BLUE = '\033[34m' + CYAN = '\033[36m' + RESET = '\033[0m' + BOLD = '\033[1m' + +class TestClient: + def __init__(self): + """バックエンドプロセスを起動してstdin/stdout通信を確立""" + print(f"{Color.CYAN}バックエンドプロセスを起動中...{Color.RESET}") + self.process = subprocess.Popen( + [sys.executable, 'mainloop.py'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + cwd='.' + ) + self._watchdog_stop_event = threading.Event() + self._watchdog_thread: Optional[threading.Thread] = None + + # 初期化完了を待つ + print(f"{Color.CYAN}バックエンドの初期化を待機中...{Color.RESET}") + self._wait_for_initialization() + print(f"{Color.GREEN}バックエンド起動完了{Color.RESET}\n") + + # 初期化完了後 watchdog 開始 + self._start_watchdog() + + def _wait_for_initialization(self, timeout: Optional[float] = None): + """バックエンド初期化完了 (/run/initialization_complete) を待機する。 + + 旧仕様: 60秒で TimeoutError を発生させていた。 + 新仕様: + - timeout が None の場合は無期限待機。 + - 'VRCT_INIT_TIMEOUT' 環境変数が設定されていれば soft timeout 値として使用。 + - soft timeout 経過時は ERROR ではなく WARN を表示し継続待機。 + - 進捗: 30秒ごとに経過時間と最後に受信した endpoint を表示。 + - バックエンドプロセスが終了した場合のみ例外を投げる。 + """ + import os + env_timeout = os.getenv("VRCT_INIT_TIMEOUT") + if timeout is None: + try: + timeout = float(env_timeout) if env_timeout else None + except ValueError: + timeout = None + + start_time = time.time() + last_progress_endpoint = None + last_progress_time_log = 0.0 + + while True: + # プロセス終了検知 + if self.process.poll() is not None: + raise RuntimeError("Backend process terminated during initialization") + + # soft timeout 警告表示 + if timeout is not None and (time.time() - start_time) > timeout: + # 一度だけ警告を出し timeout を解除(以降は継続待機) + print(f"{Color.YELLOW}[WARN]{Color.RESET} 初期化が {timeout:.1f} 秒を超過しました。ダウンロード等で長時間かかっています。引き続き待機します。環境変数 VRCT_INIT_TIMEOUT を調整できます。") + timeout = None # 解除 + + # 30秒ごとの進捗ログ + now = time.time() + if now - last_progress_time_log >= 30: + elapsed = now - start_time + ep_info = last_progress_endpoint or "(受信なし)" + print(f"{Color.CYAN}[進捗]{Color.RESET} 初期化経過 {elapsed:.1f} 秒 / 最終 endpoint: {ep_info}") + last_progress_time_log = now + + line = self.process.stdout.readline() + if not line: + # 何も来ていないがプロセスは動作中 -> 継続 + continue + + stripped = line.strip() + if not stripped: + continue + + # JSON解析試行 + try: + response = json.loads(stripped) + endpoint = response.get("endpoint", "") + status = response.get("status", 0) + last_progress_endpoint = endpoint or last_progress_endpoint + if status == 348: + # 348 はログ扱い: 全フィールド展開 + print(f"{Color.CYAN} [初期化ログ]{Color.RESET} Status: {status} endpoint:{endpoint or '(none)'}") + expanded = json.dumps(response, ensure_ascii=False, indent=2) + for line in expanded.split('\n'): + print(f" {line}") + else: + print(f"{Color.CYAN} [初期化中]{Color.RESET} {endpoint} (Status: {status})") + if endpoint == "/run/initialization_complete" and status == 200: + total_elapsed = time.time() - start_time + print(f"{Color.GREEN} [初期化完了]{Color.RESET} 経過 {total_elapsed:.1f} 秒") + return + except json.JSONDecodeError: + # ログ行として扱う + print(f"{Color.CYAN} [Backend]{Color.RESET} {stripped}") + continue + + def send_request(self, endpoint: str, data: Optional[Any] = None, timeout: float = 30.0, silent: bool = False) -> Dict[str, Any]: + """ + エンドポイントにリクエストを送信し、レスポンスを取得 + 対応するエンドポイントの応答が返ってくるまで処理を待機します + + Args: + endpoint: テストするエンドポイント (例: "/get/data/version") + data: 送信するデータ (None, dict, str, int, float, bool, list等) + timeout: タイムアウト時間(秒) + + Returns: + レスポンスの辞書 {"status": int, "endpoint": str, "result": Any} + """ + try: + # プロセスが生きているかチェック + if self.process.poll() is not None: + print(f"{Color.RED}[ERROR]{Color.RESET} バックエンドプロセスが終了しています") + return {"status": 500, "endpoint": endpoint, "result": "Backend process is not running"} + + # リクエストの構築 + request = {"endpoint": endpoint} + + if data is not None: + # データをJSON文字列に変換してBase64エンコード + json_data = json.dumps(data, ensure_ascii=False) + encoded_data = base64.b64encode(json_data.encode('utf-8')).decode('utf-8') + request["data"] = encoded_data + + # リクエストを送信 + request_json = json.dumps(request, ensure_ascii=False) + if not silent: + print(f"{Color.BLUE}[送信]{Color.RESET} {endpoint}") + if data is not None: + print(f" データ: {data}") + print(" 応答待機中...", flush=True) + + try: + self.process.stdin.write(request_json + '\n') + self.process.stdin.flush() + except (OSError, BrokenPipeError) as e: + print(f"{Color.RED}[ERROR]{Color.RESET} バックエンドプロセスとの通信に失敗: {e}") + # stderrの内容を確認 + stderr_output = self.process.stderr.read() + if stderr_output: + print(f"{Color.RED}[Backend Error]{Color.RESET}") + print(stderr_output) + return {"status": 500, "endpoint": endpoint, "result": f"Communication error: {e}"} + + # 対応するエンドポイントのレスポンスを受信するまで待機 + start_time = time.time() + while True: + # タイムアウトチェック + if time.time() - start_time > timeout: + print(f"{Color.RED}[TIMEOUT]{Color.RESET} レスポンスがタイムアウトしました ({timeout}秒)") + return {"status": 504, "endpoint": endpoint, "result": f"Timeout after {timeout} seconds"} + + # レスポンスを受信 + response_line = self.process.stdout.readline() + + if not response_line: + # プロセスが終了した可能性 + if self.process.poll() is not None: + return {"status": 500, "endpoint": endpoint, "result": "Backend process terminated"} + continue + + # JSONとしてパース + try: + response = json.loads(response_line.strip()) + except json.JSONDecodeError: + # ログ出力など、JSONでない行を表示 + print(f"{Color.CYAN}[Backend出力]{Color.RESET} {response_line.strip()}") + continue + + # レスポンスのエンドポイントが一致するかチェック + response_endpoint = response.get("endpoint", "") + if response_endpoint == endpoint: + # 対応するレスポンスを受信 + status = response.get("status", 500) + result = response.get("result", None) + + # ステータスに応じて色分けして表示 + if not silent: + if status == 200: + print(f"{Color.GREEN}[受信]{Color.RESET} Status: {status}") + elif status == 400: + print(f"{Color.YELLOW}[受信]{Color.RESET} Status: {status}") + elif status == 348: + # 348 = ログ扱い: 中身を全展開 + print(f"{Color.CYAN}[LOG]{Color.RESET} Status: {status} (endpoint={response_endpoint})") + else: + print(f"{Color.RED}[受信]{Color.RESET} Status: {status}") + + # 結果を整形して表示 + if not silent: + # 348 の場合はログとしてフルコンテンツ優先表示 + if status == 348: + print(" ログエントリ全体:") + full_str = json.dumps(response, ensure_ascii=False, indent=2) + for line in full_str.split('\n'): + print(f" {line}") + print() + else: + print(f" エンドポイント: {response_endpoint}") + print(" 結果:") + if isinstance(result, (dict, list)): + # dict/listの場合はインデント付きで表示 + result_str = json.dumps(result, ensure_ascii=False, indent=2) + for line in result_str.split('\n'): + print(f" {line}") + else: + print(f" {result}") + + # レスポンス全体も表示 + print(" 完全なレスポンス:") + response_str = json.dumps(response, ensure_ascii=False, indent=2) + for line in response_str.split('\n'): + print(f" {line}") + print() + + return response + else: + # 別のエンドポイントのレスポンス、またはログメッセージ + if not silent: + if response_endpoint: + print(f"{Color.YELLOW}[他のエンドポイントの応答]{Color.RESET} {response_endpoint}") + else: + # endpointキーがない場合はログメッセージ + print(f"{Color.CYAN}[Backendログ]{Color.RESET}") + print(f" {json.dumps(response, ensure_ascii=False)}") + continue + + except json.JSONDecodeError as e: + print(f"{Color.RED}[ERROR]{Color.RESET} JSONデコードエラー: {e}") + return {"status": 500, "endpoint": endpoint, "result": f"JSON decode error: {e}"} + except (BrokenPipeError, OSError) as e: + print(f"{Color.RED}[ERROR]{Color.RESET} プロセス通信エラー: {e}") + # プロセスの状態を確認 + if self.process.poll() is not None: + print(f"{Color.RED}[ERROR]{Color.RESET} バックエンドプロセスが終了しています") + # stderrの内容を表示 + try: + stderr_output = self.process.stderr.read() + if stderr_output: + print(f"{Color.RED}[Backend Error Output]{Color.RESET}") + print(stderr_output) + except Exception: + pass + return {"status": 500, "endpoint": endpoint, "result": f"Process communication error: {e}"} + except Exception as e: + print(f"{Color.RED}[ERROR]{Color.RESET} 予期しないエラー: {e}") + import traceback + traceback.print_exc() + return {"status": 500, "endpoint": endpoint, "result": f"Error: {e}"} + + def _start_watchdog(self): + """watchdog スレッドを開始 (30秒間隔で /run/feed_watchdog 送信)""" + def _watchdog_loop(): + print(f"{Color.CYAN}[Watchdog]{Color.RESET} 開始 (30秒間隔)") + while not self._watchdog_stop_event.is_set(): + if self._watchdog_stop_event.wait(timeout=30): + break + if self.process.poll() is None: + try: + request = {"endpoint": "/run/feed_watchdog"} + request_json = json.dumps(request, ensure_ascii=False) + self.process.stdin.write(request_json + '\n') + self.process.stdin.flush() + # レスポンスは受信しない(バックグラウンド送信) + except Exception as e: + print(f"{Color.YELLOW}[Watchdog]{Color.RESET} 送信エラー: {e}") + break + print(f"{Color.CYAN}[Watchdog]{Color.RESET} 終了") + + self._watchdog_thread = threading.Thread(target=_watchdog_loop, daemon=True) + self._watchdog_thread.start() + + def cleanup(self): + """バックエンドプロセスを終了""" + print(f"\n{Color.CYAN}バックエンドプロセスを終了中...{Color.RESET}") + # watchdog 停止 + if self._watchdog_thread and self._watchdog_thread.is_alive(): + self._watchdog_stop_event.set() + self._watchdog_thread.join(timeout=2) + # プロセス終了 + self.process.terminate() + try: + self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.process.kill() + print(f"{Color.GREEN}終了完了{Color.RESET}") + +def run_example_tests(client: TestClient): + """サンプルテストの実行""" + print(f"{Color.BOLD}=== サンプルテスト開始 ==={Color.RESET}\n") + + # 1. バージョン情報の取得 + print(f"{Color.BOLD}Test 1: バージョン情報の取得{Color.RESET}") + client.send_request("/get/data/version") + + # 2. 透明度の設定 + print(f"{Color.BOLD}Test 2: 透明度の設定{Color.RESET}") + client.send_request("/set/data/transparency", 75) + + # 3. UI言語の設定 + print(f"{Color.BOLD}Test 3: UI言語の設定{Color.RESET}") + client.send_request("/set/data/ui_language", "ja") + + # 4. 翻訳機能の有効化 + print(f"{Color.BOLD}Test 4: 翻訳機能の有効化{Color.RESET}") + client.send_request("/set/enable/translation") + + # 5. 翻訳機能の無効化 + print(f"{Color.BOLD}Test 5: 翻訳機能の無効化{Color.RESET}") + client.send_request("/set/disable/translation") + + # 6. 無効なエンドポイント + print(f"{Color.BOLD}Test 6: 無効なエンドポイント{Color.RESET}") + client.send_request("/invalid/endpoint") + + print(f"{Color.BOLD}=== サンプルテスト終了 ==={Color.RESET}\n") + +class AutomatedEndpointTester: + """backend_test.py のロジックを stdin/stdout 通信向けに移植した自動テストクラス + + Args: + client: TestClient インスタンス + silent: True の場合詳細ログを抑制 + export_path: テスト結果を JSON に書き出すパス (None なら書き出し無し) + export_csv: True の場合 CSV も併せて書き出す + """ + def __init__(self, client: TestClient, silent: bool = False, export_path: Optional[str] = None, export_csv: bool = False): + self.client = client + self.silent = silent + self.export_path = export_path + self.export_csv = export_csv + # config 的な値をキャッシュ + self.cache: Dict[str, Any] = {} + # エンドポイント分類 (動的取得手段が無いため暫定的にハードコード) + self.validity_endpoints = [ + "/set/enable/translation", + "/set/disable/translation", + "/set/enable/transcription_send", + "/set/disable/transcription_send", + "/set/enable/transcription_receive", + "/set/disable/transcription_receive", + "/set/enable/websocket_server", + "/set/disable/websocket_server", + "/set/enable/convert_message_to_romaji", + "/set/disable/convert_message_to_romaji", + "/set/enable/convert_message_to_hiragana", + "/set/disable/convert_message_to_hiragana", + ] + self.set_data_endpoints = [ + "/set/data/selected_tab_no", + "/set/data/selected_translation_engines", + "/set/data/selected_your_languages", + "/set/data/selected_target_languages", + "/set/data/selected_transcription_engine", + "/set/data/transparency", + "/set/data/ui_scaling", + "/set/data/textbox_ui_scaling", + "/set/data/message_box_ratio", + "/set/data/send_message_button_type", + "/set/data/font_family", + "/set/data/ui_language", + "/set/data/main_window_geometry", + "/set/data/selected_translation_compute_device", + "/set/data/selected_transcription_compute_device", + "/set/data/ctranslate2_weight_type", + "/set/data/plamo_model", + "/set/data/plamo_auth_key", + "/set/data/gemini_model", + "/set/data/gemini_auth_key", + "/set/data/openai_model", + "/set/data/openai_auth_key", + "/set/data/lmstudio_model", + "/set/data/lmstudio_url", + "/set/data/ollama_model", + "/set/data/deepl_auth_key", + "/set/data/selected_mic_host", + "/set/data/selected_mic_device", + "/set/data/mic_threshold", + "/set/data/mic_record_timeout", + "/set/data/mic_phrase_timeout", + "/set/data/mic_max_phrases", + "/set/data/hotkeys", + "/set/data/plugins_status", + "/set/data/mic_avg_logprob", + "/set/data/mic_no_speech_prob", + "/set/data/mic_word_filter", + "/set/data/selected_speaker_device", + "/set/data/speaker_threshold", + "/set/data/speaker_record_timeout", + "/set/data/speaker_phrase_timeout", + "/set/data/speaker_max_phrases", + "/set/data/speaker_avg_logprob", + "/set/data/speaker_no_speech_prob", + "/set/data/whisper_weight_type", + "/set/data/overlay_small_log_settings", + "/set/data/overlay_large_log_settings", + "/set/data/send_message_format_parts", + "/set/data/received_message_format_parts", + "/set/data/websocket_host", + "/set/data/websocket_port", + "/set/data/osc_ip_address", + "/set/data/osc_port", + "/set/data/selected_translation_compute_type", + "/set/data/selected_transcription_compute_type", + ] + self.run_endpoints = [ + "/run/send_message_box", + "/run/typing_message_box", + "/run/stop_typing_message_box", + "/run/send_text_overlay", + "/run/swap_your_language_and_target_language", + "/run/update_software", + "/run/update_cuda_software", + "/run/download_ctranslate2_weight", + "/run/download_whisper_weight", + "/run/open_filepath_logs", + "/run/open_filepath_config_file", + "/run/feed_watchdog", + "/run/lmstudio_connection", + "/run/ollama_connection", + ] + self.delete_data_endpoints = [ + "/delete/data/deepl_auth_key", + ] + self.results: Dict[str, Dict[str, Any]] = {} + + # ---------------------------------- Utility ---------------------------------- + def _record(self, endpoint: str, status: Optional[int], result: Any, expected_status: list[int]): + self.results[endpoint] = { + "status": status, + "result": result, + "expected_status": expected_status, + "success": status in expected_status if status is not None else False + } + + def _get(self, endpoint: str) -> Any: + """/get/data/* の結果をキャッシュしつつ取得""" + resp = self.client.send_request(endpoint, silent=self.silent) + if resp.get("status") == 200: + self.cache[endpoint.split("/")[-1]] = resp.get("result") + return resp.get("result") + return None + + # ---------------------------------- Generators ---------------------------------- + def _gen_set_data(self, endpoint: str): + expected = [200] + data = None + # ほぼ backend_test.py のロジックを踏襲 + if endpoint == "/set/data/selected_tab_no": + data = sys.modules.get('__random_tab_choices', None) or None # placeholder for future dynamic + data = data or "1" + elif endpoint == "/set/data/selected_translation_engines": + engines = self._get("/get/data/translation_engines") or [] + data = {i: (engines and (engines[0] if len(engines) else None)) for i in ["1","2","3"]} + elif endpoint == "/set/data/selected_your_languages": + lang_list = self._get("/get/data/selectable_language_list") or [] + if lang_list: + choice = lang_list[0] + data = {i: {"1": {**choice, "enable": True}} for i in ["1","2","3"]} + elif endpoint == "/set/data/selected_target_languages": + lang_list = self._get("/get/data/selectable_language_list") or [] + if lang_list: + base = lang_list[0] + data = {i: {j: {**base, "enable": (j=="1")} for j in ["1","2","3"]} for i in ["1","2","3"]} + elif endpoint == "/set/data/selected_transcription_engine": + engines = self._get("/get/data/transcription_engines") or [] + data = engines[0] if engines else None + elif endpoint == "/set/data/transparency": + import random + data = random.randint(0,100) + elif endpoint == "/set/data/ui_scaling" or endpoint == "/set/data/textbox_ui_scaling": + import random + data = random.randint(50,200) + elif endpoint == "/set/data/message_box_ratio": + import random + data = round(random.uniform(0.1,0.9),2) + elif endpoint == "/set/data/send_message_button_type": + import random + data = random.choice(["show","hide","show_and_disable_enter_key"]) + elif endpoint == "/set/data/font_family": + import random + data = random.choice(["Arial","Verdana","Times New Roman"]) + elif endpoint == "/set/data/ui_language": + import random + data = random.choice(["en","ja","ko","zh-Hant","zh-Hans"]) + elif endpoint == "/set/data/main_window_geometry": + import random + data = { + "x_pos": random.randint(0,1920), + "y_pos": random.randint(0,1080), + "width": random.randint(800,1920), + "height": random.randint(600,1080) + } + elif endpoint == "/set/data/selected_translation_compute_device": + lst = self._get("/get/data/translation_compute_device_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/selected_transcription_compute_device": + lst = self._get("/get/data/transcription_compute_device_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/ctranslate2_weight_type": + dct = self._get("/get/data/selectable_ctranslate2_weight_type_dict") or {} + keys = list(dct.keys()) + import random + data = random.choice(keys) if keys else None + elif endpoint == "/set/data/plamo_model": + lst = self._get("/get/data/plamo_model_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/plamo_auth_key": + data = "PLAMO_DUMMY_KEY" + expected = [200,400] + elif endpoint == "/set/data/gemini_model": + lst = self._get("/get/data/gemini_model_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/gemini_auth_key": + data = "GEMINI_DUMMY_KEY" + expected = [200,400] + elif endpoint == "/set/data/openai_model": + lst = self._get("/get/data/openai_model_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/openai_auth_key": + data = "OPENAI_DUMMY_KEY" + expected = [200,400] + elif endpoint == "/set/data/lmstudio_model": + lst = self._get("/get/data/lmstudio_model_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/lmstudio_url": + import random + data = random.choice(["http://localhost:1234/v1","http://127.0.0.1:1234/v1","http://invalid_host:9999/v1"]) + expected=[200,400] + elif endpoint == "/set/data/ollama_model": + lst = self._get("/get/data/ollama_model_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/deepl_auth_key": + data = "DEEPL_DUMMY_KEY" + expected=[200,400] + elif endpoint == "/set/data/selected_mic_host": + lst = self._get("/get/data/mic_host_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/selected_mic_device": + lst = self._get("/get/data/mic_device_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/mic_threshold": + import random + val = random.randint(-1000,3000) + data = val + expected=[200] if 0 <= val <= 2000 else [400] + elif endpoint == "/set/data/mic_record_timeout": + import random + val = random.randint(-1,10) + phrase = self._get("/get/data/mic_phrase_timeout") + data = val + expected=[200] if (phrase is not None and 0 <= val <= phrase) else [400] + elif endpoint == "/set/data/mic_phrase_timeout": + import random + val = random.randint(-1,10) + record = self._get("/get/data/mic_record_timeout") + data = val + expected=[200] if (record is not None and record <= val) else [400] + elif endpoint == "/set/data/mic_max_phrases": + import random + val = random.randint(-1,10) + data = val + expected=[200] if val >= 0 else [400] + elif endpoint == "/set/data/hotkeys": + data = {'toggle_vrct_visibility': None,'toggle_translation': None,'toggle_transcription_send': None,'toggle_transcription_receive': None} + elif endpoint == "/set/data/plugins_status": + plugins = self._get("/get/data/plugins") or [] + import random + data = {p: random.choice([True,False]) for p in plugins} + elif endpoint == "/set/data/mic_avg_logprob": + import random + data = random.uniform(-5,0) + elif endpoint == "/set/data/mic_no_speech_prob": + import random + data = random.uniform(0,1) + elif endpoint == "/set/data/mic_word_filter": + import random + data = random.choice([["test_0_0","test_0_1","test_0_2",None],["test_1_0","test_1_1",None],["test_2_0",None],[None]]) + elif endpoint == "/set/data/selected_speaker_device": + lst = self._get("/get/data/speaker_device_list") or [] + import random + data = random.choice(lst) if lst else None + elif endpoint == "/set/data/speaker_threshold": + import random + val = random.randint(-1000,5000) + data = val + expected=[200] if 0 <= val <= 4000 else [400] + elif endpoint == "/set/data/speaker_record_timeout": + import random + val = random.randint(-1,10) + phrase = self._get("/get/data/speaker_phrase_timeout") + data = val + expected=[200] if (phrase is not None and 0 <= val <= phrase) else [400] + elif endpoint == "/set/data/speaker_phrase_timeout": + import random + val = random.randint(-1,10) + record = self._get("/get/data/speaker_record_timeout") + data = val + expected=[200] if (record is not None and record <= val) else [400] + elif endpoint == "/set/data/speaker_max_phrases": + import random + val = random.randint(-1,10) + data = val + expected=[200] if val >= 0 else [400] + elif endpoint == "/set/data/speaker_avg_logprob": + import random + data = random.uniform(-5,0) + elif endpoint == "/set/data/speaker_no_speech_prob": + import random + data = random.uniform(0,1) + elif endpoint == "/set/data/whisper_weight_type": + dct = self._get("/get/data/selectable_whisper_weight_type_dict") or {} + import random + keys=[k for k,v in dct.items() if v] + data = random.choice(keys) if keys else None + elif endpoint == "/set/data/overlay_small_log_settings" or endpoint == "/set/data/overlay_large_log_settings": + import random + data = { + "x_pos": random.random(), + "y_pos": random.random(), + "z_pos": random.random(), + "x_rotation": random.random(), + "y_rotation": random.random(), + "z_rotation": random.random(), + "display_duration": random.randint(0,100), + "fadeout_duration": random.randint(0,100), + "opacity": random.random(), + "ui_scaling": random.random(), + "tracker": random.choice(["HMD","LeftHand","RightHand"]) + } + elif endpoint == "/set/data/send_message_format_parts": + fmt = self._get("/get/data/send_message_format_parts") + data = fmt + elif endpoint == "/set/data/received_message_format_parts": + fmt = self._get("/get/data/received_message_format_parts") + data = fmt + elif endpoint == "/set/data/websocket_host": + import random + val = random.choice(["127.0.0.1","aaaaadwafasdsd","0210.1564.845.0"]) + data = val + expected = [200,400] if val=="127.0.0.1" else [400] + elif endpoint == "/set/data/websocket_port": + import random + data = random.randint(1024,65535) + expected=[200,400] + elif endpoint == "/set/data/osc_ip_address": + import random + val = random.choice(["127.0.0.1","aaaaadwafasdsd","0210.1564.845.0"]) + data = val + expected = [200] if val=="127.0.0.1" else [400] + elif endpoint == "/set/data/osc_port": + import random + data = random.randint(1024,65535) + elif endpoint == "/set/data/selected_translation_compute_type": + device = self.cache.get("selected_translation_compute_device") or self._get("/get/data/selected_translation_compute_device") + if device and isinstance(device, dict): + import random + data = random.choice(device.get("compute_types", [])) if device.get("compute_types") else None + elif endpoint == "/set/data/selected_transcription_compute_type": + device = self.cache.get("selected_transcription_compute_device") or self._get("/get/data/selected_transcription_compute_device") + if device and isinstance(device, dict): + import random + data = random.choice(device.get("compute_types", [])) if device.get("compute_types") else None + return data, expected + + def _gen_run_data(self, endpoint: str): + expected = [200] + data = None + import random + if endpoint == "/run/send_message_box": + choices=[{"data":{"id":"000001","message":"test"},"status":[200]}, + {"data":{"id":"000002","message":"Hello World!"},"status":[200]}, + {"data":{"id":"000003","message":"こんにちわ 世界!"},"status":[200]}, + {"data":{"id":"000004","message":"안녕하세요 세계!"},"status":[200]}, + {"data":{"id":"000005","message":"你好,世界!"},"status":[200]}] + choice = random.choice(choices) + data = choice["data"] + expected = choice["status"] + elif endpoint in ["/run/typing_message_box","/run/stop_typing_message_box","/run/send_text_overlay","/run/swap_your_language_and_target_language"]: + data = "test_overlay" if endpoint == "/run/send_text_overlay" else None + elif endpoint in ["/run/update_software","/run/update_cuda_software","/run/download_ctranslate2_weight","/run/download_whisper_weight","/run/open_filepath_logs","/run/open_filepath_config_file","/run/feed_watchdog"]: + expected=[401] + elif endpoint in ["/run/lmstudio_connection","/run/ollama_connection"]: + expected=[200,400] + return data, expected + + # ---------------------------------- Tests ---------------------------------- + def test_validity_single(self, endpoint: str): + expected=[200] + if endpoint == "/set/enable/websocket_server": + expected=[200,400] + resp = self.client.send_request(endpoint) + status = resp.get("status") + result = resp.get("result") + self._record(endpoint, status, result, expected) + ok = status in expected + tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" + print(f"[Validity] {endpoint} -> {tag} ({status})") + return ok + + def test_set_data_single(self, endpoint: str): + data, expected = self._gen_set_data(endpoint) + if expected == [404]: + self._record(endpoint, None, None, expected) + print(f"[SetData] {endpoint} -> {Color.RED}UNKNOWN{Color.RESET}") + return False + if data is None: + self._record(endpoint, None, None, expected) + print(f"[SetData] {endpoint} -> {Color.YELLOW}SKIP(no data){Color.RESET}") + return True + resp = self.client.send_request(endpoint, data, silent=self.silent) + status = resp.get("status") + result = resp.get("result") + self._record(endpoint, status, result, expected) + ok = status in expected + tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" + print(f"[SetData] {endpoint} -> {tag} ({status}) data={data}") + return ok + + def test_run_single(self, endpoint: str): + data, expected = self._gen_run_data(endpoint) + if expected == [401]: + self._record(endpoint, None, None, expected) + print(f"[Run] {endpoint} -> {Color.YELLOW}SKIP(401){Color.RESET}") + return True + resp = self.client.send_request(endpoint, data, silent=self.silent) + status = resp.get("status") + result = resp.get("result") + self._record(endpoint, status, result, expected) + ok = status in expected + tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" + print(f"[Run] {endpoint} -> {tag} ({status})") + return ok + + def test_delete_single(self, endpoint: str): + expected=[200] + resp = self.client.send_request(endpoint, silent=self.silent) + status = resp.get("status") + result = resp.get("result") + self._record(endpoint, status, result, expected) + ok = status in expected + tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}" + print(f"[Delete] {endpoint} -> {tag} ({status})") + return ok + + def run_all(self): + print(f"{Color.BOLD}=== 有効/無効エンドポイントテスト ==={Color.RESET}") + for ep in self.validity_endpoints: + self.test_validity_single(ep) + print(f"{Color.BOLD}=== データ設定エンドポイントテスト ==={Color.RESET}") + for ep in self.set_data_endpoints: + self.test_set_data_single(ep) + print(f"{Color.BOLD}=== 実行エンドポイントテスト ==={Color.RESET}") + for ep in self.run_endpoints: + self.test_run_single(ep) + print(f"{Color.BOLD}=== 削除エンドポイントテスト ==={Color.RESET}") + for ep in self.delete_data_endpoints: + self.test_delete_single(ep) + + def run_random(self, iterations: int = 500): + import random + print(f"{Color.BOLD}=== ランダムアクセステスト(iter={iterations}) ==={Color.RESET}") + groups = ["validity","set","run","delete"] + for i in range(iterations): + g = random.choice(groups) + if g == "validity": + ep = random.choice(self.validity_endpoints) + self.test_validity_single(ep) + elif g == "set": + ep = random.choice(self.set_data_endpoints) + self.test_set_data_single(ep) + elif g == "run": + ep = random.choice(self.run_endpoints) + self.test_run_single(ep) + else: + ep = random.choice(self.delete_data_endpoints) + self.test_delete_single(ep) + # 最後に disable 系を OFF + for ep in self.validity_endpoints: + if ep.startswith("/set/disable/"): + self.client.send_request(ep, silent=True) + + def run_specific_random(self, iterations: int = 200): + import random + print(f"{Color.BOLD}=== 特定(osc/websocket)ランダムテスト(iter={iterations}) ==={Color.RESET}") + set_specific = ["/set/data/osc_ip_address","/set/data/osc_port","/set/data/websocket_host","/set/data/websocket_port"] + for i in range(iterations): + ep = random.choice(set_specific) + self.test_set_data_single(ep) + + def summary(self): + total = len(self.results) + passed = sum(1 for r in self.results.values() if r["success"]) + skipped = sum(1 for r in self.results.values() if r["expected_status"] == [401]) + failed = total - passed - skipped + print(f"\n{Color.BOLD}==== テストサマリー ==== {Color.RESET}") + print(f"総数: {total} / 成功: {passed} / 失敗: {failed} / スキップ(401): {skipped}") + if failed: + print(f"{Color.RED}失敗詳細:{Color.RESET}") + for ep, r in self.results.items(): + if not r["success"] and r["expected_status"] != [401]: + print(f"- {ep} status={r['status']} expected={r['expected_status']} result={r['result']}") + print(f"{Color.BOLD}======================={Color.RESET}\n") + # インタラクティブ指定によるエクスポート + if self.export_path: + try: + self.export_results(self.export_path) + print(f"{Color.GREEN}[EXPORT]{Color.RESET} JSONに書き出しました: {self.export_path}") + if self.export_csv: + csv_path = self._derive_csv_path(self.export_path) + self.export_results_csv(csv_path) + print(f"{Color.GREEN}[EXPORT]{Color.RESET} CSVに書き出しました: {csv_path}") + except Exception as e: + print(f"{Color.RED}[EXPORT ERROR]{Color.RESET} {e}") + + def _derive_csv_path(self, json_path: str) -> str: + base, ext = os.path.splitext(json_path) + return base + '.csv' + + def export_results(self, filename: str): + """結果をJSONファイルへ書き出し""" + payload = { + "generated_at": time.strftime("%Y-%m-%dT%H:%M:%S"), + "total": len(self.results), + "results": self.results, + } + with open(filename, "w", encoding="utf-8") as f: + json.dump(payload, f, ensure_ascii=False, indent=2) + + def export_results_csv(self, filename: str): + """結果をCSVへ書き出し (簡易)""" + import csv + with open(filename, "w", encoding="utf-8", newline="") as f: + w = csv.writer(f) + w.writerow(["endpoint", "status", "expected", "success"]) + for ep, r in self.results.items(): + w.writerow([ep, r.get("status"), ",".join(map(str, r.get("expected_status", []))), r.get("success")]) + +def run_interactive_mode(client: TestClient): + """対話モードでテストを実行""" + print(f"{Color.BOLD}=== 対話モード ==={Color.RESET}") + print("エンドポイントとデータを指定してテストを実行します。") + print("終了するには 'quit' または 'exit' と入力してください。\n") + + while True: + try: + # エンドポイントの入力 + endpoint = input(f"{Color.CYAN}エンドポイントを入力 (例: /get/data/version): {Color.RESET}").strip() + + if endpoint.lower() in ['quit', 'exit', 'q']: + break + + if not endpoint: + print(f"{Color.YELLOW}エンドポイントを入力してください{Color.RESET}\n") + continue + + # データの入力 + data_input = input(f"{Color.CYAN}データを入力 (JSON形式, なければEnter): {Color.RESET}").strip() + + data = None + if data_input: + try: + data = json.loads(data_input) + except json.JSONDecodeError: + print(f"{Color.YELLOW}JSONとしてパースできませんでした。文字列として送信します{Color.RESET}") + data = data_input + + # リクエスト送信 + client.send_request(endpoint, data) + + except KeyboardInterrupt: + print(f"\n{Color.YELLOW}中断されました{Color.RESET}") + break + except Exception as e: + print(f"{Color.RED}エラー: {e}{Color.RESET}\n") + +def main(): + """メイン処理""" + print(f"{Color.BOLD}{'='*60}{Color.RESET}") + print(f"{Color.BOLD} VRCT Backend Test Client{Color.RESET}") + print(f"{Color.BOLD}{'='*60}{Color.RESET}\n") + + client = None + try: + # テストクライアントの初期化 + client = TestClient() + + # モード選択 + print("モードを選択してください:") + print("1. サンプルテストを実行") + print("2. 対話モードで実行") + print("3. 自動テスト(全)を実行") + print("4. ランダムアクセステスト") + print("5. 特定(osc/websocket)ランダムテスト") + mode = input(f"{Color.CYAN}選択 (1-5): {Color.RESET}").strip() + + # 追加オプション選択 + silent_choice = input(f"{Color.CYAN}詳細ログを抑制しますか? (y/N): {Color.RESET}").strip().lower() + silent = silent_choice == 'y' + export_choice = input(f"{Color.CYAN}結果をJSON出力しますか? (y/N): {Color.RESET}").strip().lower() + export_path = None + export_csv = False + if export_choice == 'y': + default_name = f"test_results_{int(time.time())}.json" + path_in = input(f"{Color.CYAN}出力ファイル名[{default_name}]: {Color.RESET}").strip() + export_path = path_in or default_name + csv_choice = input(f"{Color.CYAN}CSVも出力しますか? (y/N): {Color.RESET}").strip().lower() + export_csv = csv_choice == 'y' + + print() + + if mode == "1": + run_example_tests(client) + elif mode == "2": + run_interactive_mode(client) + elif mode == "3": + tester = AutomatedEndpointTester(client, silent=silent, export_path=export_path, export_csv=export_csv) + tester.run_all() + tester.summary() + elif mode == "4": + tester = AutomatedEndpointTester(client, silent=silent, export_path=export_path, export_csv=export_csv) + tester.run_random() + tester.summary() + elif mode == "5": + tester = AutomatedEndpointTester(client, silent=silent, export_path=export_path, export_csv=export_csv) + tester.run_specific_random() + tester.summary() + else: + print(f"{Color.YELLOW}無効な選択です。対話モードを開始します。{Color.RESET}\n") + run_interactive_mode(client) + + except KeyboardInterrupt: + print(f"\n{Color.YELLOW}ユーザーによって中断されました{Color.RESET}") + except Exception as e: + print(f"{Color.RED}エラーが発生しました: {e}{Color.RESET}") + import traceback + traceback.print_exc() + finally: + if client: + client.cleanup() + +if __name__ == "__main__": + main() diff --git a/src-python/utils.py b/src-python/utils.py index 1e250e87..a251e9b5 100644 --- a/src-python/utils.py +++ b/src-python/utils.py @@ -100,7 +100,7 @@ def getComputeDeviceList() -> List[Dict[str, Any]]: "device": "cpu", "device_index": 0, "device_name": "cpu", - "compute_types": ["auto"] + list(get_supported_compute_types("cpu", 0)), + "compute_types": ["auto"] + sorted(list(get_supported_compute_types("cpu", 0))), } ] @@ -108,7 +108,7 @@ def getComputeDeviceList() -> List[Dict[str, Any]]: if torch is not None and hasattr(torch, "cuda") and torch.cuda.is_available(): for device_index in range(torch.cuda.device_count()): gpu_device_name = torch.cuda.get_device_name(device_index) - gpu_compute_types = ["auto"] + list(get_supported_compute_types("cuda", device_index)) + gpu_compute_types = ["auto"] + sorted(list(get_supported_compute_types("cuda", device_index))) # デバイスごとの計算タイプの制限 if "GTX" in gpu_device_name: @@ -157,12 +157,11 @@ def getBestComputeType(device: str, device_index: int) -> str: } # デバイス名に基づいて優先タイプを選択 + selected_types = preferred_types["default"] for key in preferred_types: if key in device_name: selected_types = preferred_types[key] break - else: - selected_types = preferred_types["default"] # 利用可能な計算タイプを返す for compute_type in selected_types: