Files
VRCT/src-python/test_client.py

991 lines
47 KiB
Python

"""
フロントエンドを模擬したテストクライアント
stdin/stdoutを介してバックエンドと通信し、エンドポイントをテストします。
使用方法:
1. バックエンドを起動: python mainloop.py
2. 別のターミナルでこのスクリプトを実行: python test_client.py
3. エンドポイントとデータを指定してテストを実行
"""
import os
import subprocess
import json
import base64
import sys
import time
import threading
from typing import Optional, Dict, Any
if os.path.exists("config.json"):
os.remove("config.json")
class Color:
GREEN = '\033[32m'
RED = '\033[31m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
CYAN = '\033[36m'
RESET = '\033[0m'
BOLD = '\033[1m'
class TestClient:
def __init__(self):
"""バックエンドプロセスを起動してstdin/stdout通信を確立"""
print(f"{Color.CYAN}バックエンドプロセスを起動中...{Color.RESET}")
self.process = subprocess.Popen(
[sys.executable, 'mainloop.py'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
cwd='.'
)
self._watchdog_stop_event = threading.Event()
self._watchdog_thread: Optional[threading.Thread] = None
# 初期化完了を待つ
print(f"{Color.CYAN}バックエンドの初期化を待機中...{Color.RESET}")
self._wait_for_initialization()
print(f"{Color.GREEN}バックエンド起動完了{Color.RESET}\n")
# 初期化完了後 watchdog 開始
self._start_watchdog()
def _wait_for_initialization(self, timeout: Optional[float] = None):
"""バックエンド初期化完了 (/run/initialization_complete) を待機する。
旧仕様: 60秒で TimeoutError を発生させていた。
新仕様:
- timeout が None の場合は無期限待機。
- 'VRCT_INIT_TIMEOUT' 環境変数が設定されていれば soft timeout 値として使用。
- soft timeout 経過時は ERROR ではなく WARN を表示し継続待機。
- 進捗: 30秒ごとに経過時間と最後に受信した endpoint を表示。
- バックエンドプロセスが終了した場合のみ例外を投げる。
"""
import os
env_timeout = os.getenv("VRCT_INIT_TIMEOUT")
if timeout is None:
try:
timeout = float(env_timeout) if env_timeout else None
except ValueError:
timeout = None
start_time = time.time()
last_progress_endpoint = None
last_progress_time_log = 0.0
while True:
# プロセス終了検知
if self.process.poll() is not None:
raise RuntimeError("Backend process terminated during initialization")
# soft timeout 警告表示
if timeout is not None and (time.time() - start_time) > timeout:
# 一度だけ警告を出し timeout を解除(以降は継続待機)
print(f"{Color.YELLOW}[WARN]{Color.RESET} 初期化が {timeout:.1f} 秒を超過しました。ダウンロード等で長時間かかっています。引き続き待機します。環境変数 VRCT_INIT_TIMEOUT を調整できます。")
timeout = None # 解除
# 30秒ごとの進捗ログ
now = time.time()
if now - last_progress_time_log >= 30:
elapsed = now - start_time
ep_info = last_progress_endpoint or "(受信なし)"
print(f"{Color.CYAN}[進捗]{Color.RESET} 初期化経過 {elapsed:.1f} 秒 / 最終 endpoint: {ep_info}")
last_progress_time_log = now
line = self.process.stdout.readline()
if not line:
# 何も来ていないがプロセスは動作中 -> 継続
continue
stripped = line.strip()
if not stripped:
continue
# JSON解析試行
try:
response = json.loads(stripped)
endpoint = response.get("endpoint", "")
status = response.get("status", 0)
last_progress_endpoint = endpoint or last_progress_endpoint
if status == 348:
# 348 はログ扱い: 全フィールド展開
print(f"{Color.CYAN} [初期化ログ]{Color.RESET} Status: {status} endpoint:{endpoint or '(none)'}")
expanded = json.dumps(response, ensure_ascii=False, indent=2)
for line in expanded.split('\n'):
print(f" {line}")
else:
print(f"{Color.CYAN} [初期化中]{Color.RESET} {endpoint} (Status: {status})")
if endpoint == "/run/initialization_complete" and status == 200:
total_elapsed = time.time() - start_time
print(f"{Color.GREEN} [初期化完了]{Color.RESET} 経過 {total_elapsed:.1f}")
return
except json.JSONDecodeError:
# ログ行として扱う
print(f"{Color.CYAN} [Backend]{Color.RESET} {stripped}")
continue
def send_request(self, endpoint: str, data: Optional[Any] = None, timeout: float = 30.0, silent: bool = False) -> Dict[str, Any]:
"""
エンドポイントにリクエストを送信し、レスポンスを取得
対応するエンドポイントの応答が返ってくるまで処理を待機します
Args:
endpoint: テストするエンドポイント (例: "/get/data/version")
data: 送信するデータ (None, dict, str, int, float, bool, list等)
timeout: タイムアウト時間(秒)
Returns:
レスポンスの辞書 {"status": int, "endpoint": str, "result": Any}
"""
try:
# プロセスが生きているかチェック
if self.process.poll() is not None:
print(f"{Color.RED}[ERROR]{Color.RESET} バックエンドプロセスが終了しています")
return {"status": 500, "endpoint": endpoint, "result": "Backend process is not running"}
# リクエストの構築
request = {"endpoint": endpoint}
if data is not None:
# データをJSON文字列に変換してBase64エンコード
json_data = json.dumps(data, ensure_ascii=False)
encoded_data = base64.b64encode(json_data.encode('utf-8')).decode('utf-8')
request["data"] = encoded_data
# リクエストを送信
request_json = json.dumps(request, ensure_ascii=False)
if not silent:
print(f"{Color.BLUE}[送信]{Color.RESET} {endpoint}")
if data is not None:
print(f" データ: {data}")
print(" 応答待機中...", flush=True)
try:
self.process.stdin.write(request_json + '\n')
self.process.stdin.flush()
except (OSError, BrokenPipeError) as e:
print(f"{Color.RED}[ERROR]{Color.RESET} バックエンドプロセスとの通信に失敗: {e}")
# stderrの内容を確認
stderr_output = self.process.stderr.read()
if stderr_output:
print(f"{Color.RED}[Backend Error]{Color.RESET}")
print(stderr_output)
return {"status": 500, "endpoint": endpoint, "result": f"Communication error: {e}"}
# 対応するエンドポイントのレスポンスを受信するまで待機
start_time = time.time()
while True:
# タイムアウトチェック
if time.time() - start_time > timeout:
print(f"{Color.RED}[TIMEOUT]{Color.RESET} レスポンスがタイムアウトしました ({timeout}秒)")
return {"status": 504, "endpoint": endpoint, "result": f"Timeout after {timeout} seconds"}
# レスポンスを受信
response_line = self.process.stdout.readline()
if not response_line:
# プロセスが終了した可能性
if self.process.poll() is not None:
return {"status": 500, "endpoint": endpoint, "result": "Backend process terminated"}
continue
# JSONとしてパース
try:
response = json.loads(response_line.strip())
except json.JSONDecodeError:
# ログ出力など、JSONでない行を表示
print(f"{Color.CYAN}[Backend出力]{Color.RESET} {response_line.strip()}")
continue
# レスポンスのエンドポイントが一致するかチェック
response_endpoint = response.get("endpoint", "")
if response_endpoint == endpoint:
# 対応するレスポンスを受信
status = response.get("status", 500)
result = response.get("result", None)
# ステータスに応じて色分けして表示
if not silent:
if status == 200:
print(f"{Color.GREEN}[受信]{Color.RESET} Status: {status}")
elif status == 400:
print(f"{Color.YELLOW}[受信]{Color.RESET} Status: {status}")
elif status == 348:
# 348 = ログ扱い: 中身を全展開
print(f"{Color.CYAN}[LOG]{Color.RESET} Status: {status} (endpoint={response_endpoint})")
else:
print(f"{Color.RED}[受信]{Color.RESET} Status: {status}")
# 結果を整形して表示
if not silent:
# 348 の場合はログとしてフルコンテンツ優先表示
if status == 348:
print(" ログエントリ全体:")
full_str = json.dumps(response, ensure_ascii=False, indent=2)
for line in full_str.split('\n'):
print(f" {line}")
print()
else:
print(f" エンドポイント: {response_endpoint}")
print(" 結果:")
if isinstance(result, (dict, list)):
# dict/listの場合はインデント付きで表示
result_str = json.dumps(result, ensure_ascii=False, indent=2)
for line in result_str.split('\n'):
print(f" {line}")
else:
print(f" {result}")
# レスポンス全体も表示
print(" 完全なレスポンス:")
response_str = json.dumps(response, ensure_ascii=False, indent=2)
for line in response_str.split('\n'):
print(f" {line}")
print()
return response
else:
# 別のエンドポイントのレスポンス、またはログメッセージ
if not silent:
if response_endpoint:
print(f"{Color.YELLOW}[他のエンドポイントの応答]{Color.RESET} {response_endpoint}")
else:
# endpointキーがない場合はログメッセージ
print(f"{Color.CYAN}[Backendログ]{Color.RESET}")
print(f" {json.dumps(response, ensure_ascii=False)}")
continue
except json.JSONDecodeError as e:
print(f"{Color.RED}[ERROR]{Color.RESET} JSONデコードエラー: {e}")
return {"status": 500, "endpoint": endpoint, "result": f"JSON decode error: {e}"}
except (BrokenPipeError, OSError) as e:
print(f"{Color.RED}[ERROR]{Color.RESET} プロセス通信エラー: {e}")
# プロセスの状態を確認
if self.process.poll() is not None:
print(f"{Color.RED}[ERROR]{Color.RESET} バックエンドプロセスが終了しています")
# stderrの内容を表示
try:
stderr_output = self.process.stderr.read()
if stderr_output:
print(f"{Color.RED}[Backend Error Output]{Color.RESET}")
print(stderr_output)
except Exception:
pass
return {"status": 500, "endpoint": endpoint, "result": f"Process communication error: {e}"}
except Exception as e:
print(f"{Color.RED}[ERROR]{Color.RESET} 予期しないエラー: {e}")
import traceback
traceback.print_exc()
return {"status": 500, "endpoint": endpoint, "result": f"Error: {e}"}
def _start_watchdog(self):
"""watchdog スレッドを開始 (30秒間隔で /run/feed_watchdog 送信)"""
def _watchdog_loop():
print(f"{Color.CYAN}[Watchdog]{Color.RESET} 開始 (30秒間隔)")
while not self._watchdog_stop_event.is_set():
if self._watchdog_stop_event.wait(timeout=30):
break
if self.process.poll() is None:
try:
request = {"endpoint": "/run/feed_watchdog"}
request_json = json.dumps(request, ensure_ascii=False)
self.process.stdin.write(request_json + '\n')
self.process.stdin.flush()
# レスポンスは受信しない(バックグラウンド送信)
except Exception as e:
print(f"{Color.YELLOW}[Watchdog]{Color.RESET} 送信エラー: {e}")
break
print(f"{Color.CYAN}[Watchdog]{Color.RESET} 終了")
self._watchdog_thread = threading.Thread(target=_watchdog_loop, daemon=True)
self._watchdog_thread.start()
def cleanup(self):
"""バックエンドプロセスを終了"""
print(f"\n{Color.CYAN}バックエンドプロセスを終了中...{Color.RESET}")
# watchdog 停止
if self._watchdog_thread and self._watchdog_thread.is_alive():
self._watchdog_stop_event.set()
self._watchdog_thread.join(timeout=2)
# プロセス終了
self.process.terminate()
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
print(f"{Color.GREEN}終了完了{Color.RESET}")
def run_example_tests(client: TestClient):
"""サンプルテストの実行"""
print(f"{Color.BOLD}=== サンプルテスト開始 ==={Color.RESET}\n")
# 1. バージョン情報の取得
print(f"{Color.BOLD}Test 1: バージョン情報の取得{Color.RESET}")
client.send_request("/get/data/version")
# 2. 透明度の設定
print(f"{Color.BOLD}Test 2: 透明度の設定{Color.RESET}")
client.send_request("/set/data/transparency", 75)
# 3. UI言語の設定
print(f"{Color.BOLD}Test 3: UI言語の設定{Color.RESET}")
client.send_request("/set/data/ui_language", "ja")
# 4. 翻訳機能の有効化
print(f"{Color.BOLD}Test 4: 翻訳機能の有効化{Color.RESET}")
client.send_request("/set/enable/translation")
# 5. 翻訳機能の無効化
print(f"{Color.BOLD}Test 5: 翻訳機能の無効化{Color.RESET}")
client.send_request("/set/disable/translation")
# 6. 無効なエンドポイント
print(f"{Color.BOLD}Test 6: 無効なエンドポイント{Color.RESET}")
client.send_request("/invalid/endpoint")
print(f"{Color.BOLD}=== サンプルテスト終了 ==={Color.RESET}\n")
class AutomatedEndpointTester:
"""backend_test.py のロジックを stdin/stdout 通信向けに移植した自動テストクラス
Args:
client: TestClient インスタンス
silent: True の場合詳細ログを抑制
export_path: テスト結果を JSON に書き出すパス (None なら書き出し無し)
export_csv: True の場合 CSV も併せて書き出す
"""
def __init__(self, client: TestClient, silent: bool = False, export_path: Optional[str] = None, export_csv: bool = False):
self.client = client
self.silent = silent
self.export_path = export_path
self.export_csv = export_csv
# config 的な値をキャッシュ
self.cache: Dict[str, Any] = {}
# エンドポイント分類 (動的取得手段が無いため暫定的にハードコード)
self.validity_endpoints = [
"/set/enable/translation",
"/set/disable/translation",
"/set/enable/transcription_send",
"/set/disable/transcription_send",
"/set/enable/transcription_receive",
"/set/disable/transcription_receive",
"/set/enable/websocket_server",
"/set/disable/websocket_server",
"/set/enable/convert_message_to_romaji",
"/set/disable/convert_message_to_romaji",
"/set/enable/convert_message_to_hiragana",
"/set/disable/convert_message_to_hiragana",
]
self.set_data_endpoints = [
"/set/data/selected_tab_no",
"/set/data/selected_translation_engines",
"/set/data/selected_your_languages",
"/set/data/selected_target_languages",
"/set/data/selected_transcription_engine",
"/set/data/transparency",
"/set/data/ui_scaling",
"/set/data/textbox_ui_scaling",
"/set/data/message_box_ratio",
"/set/data/send_message_button_type",
"/set/data/font_family",
"/set/data/ui_language",
"/set/data/main_window_geometry",
"/set/data/selected_translation_compute_device",
"/set/data/selected_transcription_compute_device",
"/set/data/selected_ctranslate2_weight_type",
"/set/data/selected_plamo_model",
"/set/data/plamo_auth_key",
"/set/data/selected_gemini_model",
"/set/data/gemini_auth_key",
"/set/data/selected_openai_model",
"/set/data/openai_auth_key",
"/set/data/selected_lmstudio_model",
"/set/data/lmstudio_url",
"/set/data/selected_ollama_model",
"/set/data/deepl_auth_key",
"/set/data/selected_mic_host",
"/set/data/selected_mic_device",
"/set/data/mic_threshold",
"/set/data/mic_record_timeout",
"/set/data/mic_phrase_timeout",
"/set/data/mic_max_phrases",
"/set/data/hotkeys",
"/set/data/plugins_status",
"/set/data/mic_avg_logprob",
"/set/data/mic_no_speech_prob",
"/set/data/mic_word_filter",
"/set/data/selected_speaker_device",
"/set/data/speaker_threshold",
"/set/data/speaker_record_timeout",
"/set/data/speaker_phrase_timeout",
"/set/data/speaker_max_phrases",
"/set/data/speaker_avg_logprob",
"/set/data/speaker_no_speech_prob",
"/set/data/selected_whisper_weight_type",
"/set/data/overlay_small_log_settings",
"/set/data/overlay_large_log_settings",
"/set/data/send_message_format_parts",
"/set/data/received_message_format_parts",
"/set/data/websocket_host",
"/set/data/websocket_port",
"/set/data/osc_ip_address",
"/set/data/osc_port",
"/set/data/selected_translation_compute_type",
"/set/data/selected_transcription_compute_type",
]
self.run_endpoints = [
"/run/send_message_box",
"/run/typing_message_box",
"/run/stop_typing_message_box",
"/run/send_text_overlay",
"/run/swap_your_language_and_target_language",
"/run/update_software",
"/run/update_cuda_software",
"/run/download_ctranslate2_weight",
"/run/download_whisper_weight",
"/run/open_filepath_logs",
"/run/open_filepath_config_file",
"/run/feed_watchdog",
"/run/lmstudio_connection",
"/run/ollama_connection",
]
self.delete_data_endpoints = [
"/delete/data/deepl_auth_key",
]
self.results: Dict[str, Dict[str, Any]] = {}
# ---------------------------------- Utility ----------------------------------
def _record(self, endpoint: str, status: Optional[int], result: Any, expected_status: list[int]):
self.results[endpoint] = {
"status": status,
"result": result,
"expected_status": expected_status,
"success": status in expected_status if status is not None else False
}
def _get(self, endpoint: str) -> Any:
"""/get/data/* の結果をキャッシュしつつ取得"""
resp = self.client.send_request(endpoint, silent=self.silent)
if resp.get("status") == 200:
self.cache[endpoint.split("/")[-1]] = resp.get("result")
return resp.get("result")
return None
# ---------------------------------- Generators ----------------------------------
def _gen_set_data(self, endpoint: str):
expected = [200]
data = None
# ほぼ backend_test.py のロジックを踏襲
if endpoint == "/set/data/selected_tab_no":
data = sys.modules.get('__random_tab_choices', None) or None # placeholder for future dynamic
data = data or "1"
elif endpoint == "/set/data/selected_translation_engines":
engines = self._get("/get/data/selectable_translation_engines") or []
data = {i: (engines and (engines[0] if len(engines) else None)) for i in ["1","2","3"]}
elif endpoint == "/set/data/selected_your_languages":
lang_list = self._get("/get/data/selectable_language_list") or []
if lang_list:
choice = lang_list[0]
data = {i: {"1": {**choice, "enable": True}} for i in ["1","2","3"]}
elif endpoint == "/set/data/selected_target_languages":
lang_list = self._get("/get/data/selectable_language_list") or []
if lang_list:
base = lang_list[0]
data = {i: {j: {**base, "enable": (j=="1")} for j in ["1","2","3"]} for i in ["1","2","3"]}
elif endpoint == "/set/data/selected_transcription_engine":
engines = self._get("/get/data/selectable_transcription_engines") or []
data = engines[0] if engines else None
elif endpoint == "/set/data/transparency":
import random
data = random.randint(0,100)
elif endpoint == "/set/data/ui_scaling" or endpoint == "/set/data/textbox_ui_scaling":
import random
data = random.randint(50,200)
elif endpoint == "/set/data/message_box_ratio":
import random
data = round(random.uniform(0.1,0.9),2)
elif endpoint == "/set/data/send_message_button_type":
import random
data = random.choice(["show","hide","show_and_disable_enter_key"])
elif endpoint == "/set/data/font_family":
import random
data = random.choice(["Arial","Verdana","Times New Roman"])
elif endpoint == "/set/data/ui_language":
import random
data = random.choice(["en","ja","ko","zh-Hant","zh-Hans"])
elif endpoint == "/set/data/main_window_geometry":
import random
data = {
"x_pos": random.randint(0,1920),
"y_pos": random.randint(0,1080),
"width": random.randint(800,1920),
"height": random.randint(600,1080)
}
elif endpoint == "/set/data/selected_translation_compute_device":
lst = self._get("/get/data/selectable_translation_compute_device_list") or []
import random
data = random.choice(lst) if lst else None
elif endpoint == "/set/data/selected_transcription_compute_device":
lst = self._get("/get/data/selectable_transcription_compute_device_list") or []
import random
data = random.choice(lst) if lst else None
elif endpoint == "/set/data/selected_ctranslate2_weight_type":
dct = self._get("/get/data/selectable_ctranslate2_weight_type_dict") or {}
keys = list(dct.keys())
import random
data = random.choice(keys) if keys else None
elif endpoint == "/set/data/selected_plamo_model":
lst = self._get("/get/data/selectable_plamo_model_list") or []
import random
data = random.choice(lst) if lst else None
expected = [200,400]
elif endpoint == "/set/data/plamo_auth_key":
data = "PLAMO_DUMMY_KEY"
expected = [200,400]
elif endpoint == "/set/data/selected_gemini_model":
lst = self._get("/get/data/selectable_gemini_model_list") or []
import random
data = random.choice(lst) if lst else None
expected = [200,400]
elif endpoint == "/set/data/gemini_auth_key":
data = "GEMINI_DUMMY_KEY"
expected = [200,400]
elif endpoint == "/set/data/selected_openai_model":
lst = self._get("/get/data/selectable_openai_model_list") or []
import random
data = random.choice(lst) if lst else None
expected = [200,400]
elif endpoint == "/set/data/openai_auth_key":
data = "OPENAI_DUMMY_KEY"
expected = [200,400]
elif endpoint == "/set/data/selected_lmstudio_model":
lst = self._get("/get/data/selectable_lmstudio_model_list") or []
import random
data = random.choice(lst) if lst else None
expected = [200,400]
elif endpoint == "/set/data/lmstudio_url":
import random
data = random.choice(["http://localhost:1234/v1","http://127.0.0.1:1234/v1","http://invalid_host:9999/v1"])
expected=[200,400]
elif endpoint == "/set/data/selected_ollama_model":
lst = self._get("/get/data/selectable_ollama_model_list") or []
import random
data = random.choice(lst) if lst else None
expected = [200,400]
elif endpoint == "/set/data/deepl_auth_key":
data = "DEEPL_DUMMY_KEY"
expected=[200,400]
elif endpoint == "/set/data/selected_mic_host":
lst = self._get("/get/data/selectable_mic_host_list") or []
import random
data = random.choice(lst) if lst else None
elif endpoint == "/set/data/selected_mic_device":
lst = self._get("/get/data/selectable_mic_device_list") or []
import random
data = random.choice(lst) if lst else None
elif endpoint == "/set/data/mic_threshold":
import random
val = random.randint(-1000,3000)
data = val
expected=[200] if 0 <= val <= 2000 else [400]
elif endpoint == "/set/data/mic_record_timeout":
import random
val = random.randint(-1,10)
phrase = self._get("/get/data/mic_phrase_timeout")
data = val
expected=[200] if (phrase is not None and 0 <= val <= phrase) else [400]
elif endpoint == "/set/data/mic_phrase_timeout":
import random
val = random.randint(-1,10)
record = self._get("/get/data/mic_record_timeout")
data = val
expected=[200] if (record is not None and record <= val) else [400]
elif endpoint == "/set/data/mic_max_phrases":
import random
val = random.randint(-1,10)
data = val
expected=[200] if val >= 0 else [400]
elif endpoint == "/set/data/hotkeys":
data = {'toggle_vrct_visibility': None,'toggle_translation': None,'toggle_transcription_send': None,'toggle_transcription_receive': None}
elif endpoint == "/set/data/plugins_status":
plugins = self._get("/get/data/plugins") or []
import random
data = {p: random.choice([True,False]) for p in plugins}
elif endpoint == "/set/data/mic_avg_logprob":
import random
data = random.uniform(-5,0)
elif endpoint == "/set/data/mic_no_speech_prob":
import random
data = random.uniform(0,1)
elif endpoint == "/set/data/mic_word_filter":
import random
data = random.choice([["test_0_0","test_0_1","test_0_2",None],["test_1_0","test_1_1",None],["test_2_0",None],[None]])
elif endpoint == "/set/data/selected_speaker_device":
lst = self._get("/get/data/selectable_speaker_device_list") or []
import random
data = random.choice(lst) if lst else None
elif endpoint == "/set/data/speaker_threshold":
import random
val = random.randint(-1000,5000)
data = val
expected=[200] if 0 <= val <= 4000 else [400]
elif endpoint == "/set/data/speaker_record_timeout":
import random
val = random.randint(-1,10)
phrase = self._get("/get/data/speaker_phrase_timeout")
data = val
expected=[200] if (phrase is not None and 0 <= val <= phrase) else [400]
elif endpoint == "/set/data/speaker_phrase_timeout":
import random
val = random.randint(-1,10)
record = self._get("/get/data/speaker_record_timeout")
data = val
expected=[200] if (record is not None and record <= val) else [400]
elif endpoint == "/set/data/speaker_max_phrases":
import random
val = random.randint(-1,10)
data = val
expected=[200] if val >= 0 else [400]
elif endpoint == "/set/data/speaker_avg_logprob":
import random
data = random.uniform(-5,0)
elif endpoint == "/set/data/speaker_no_speech_prob":
import random
data = random.uniform(0,1)
elif endpoint == "/set/data/selected_whisper_weight_type":
dct = self._get("/get/data/selectable_whisper_weight_type_dict") or {}
import random
keys=[k for k,v in dct.items() if v]
data = random.choice(keys) if keys else None
elif endpoint == "/set/data/overlay_small_log_settings" or endpoint == "/set/data/overlay_large_log_settings":
import random
data = {
"x_pos": random.random(),
"y_pos": random.random(),
"z_pos": random.random(),
"x_rotation": random.random(),
"y_rotation": random.random(),
"z_rotation": random.random(),
"display_duration": random.randint(0,100),
"fadeout_duration": random.randint(0,100),
"opacity": random.random(),
"ui_scaling": random.random(),
"tracker": random.choice(["HMD","LeftHand","RightHand"])
}
elif endpoint == "/set/data/send_message_format_parts":
fmt = self._get("/get/data/send_message_format_parts")
data = fmt
elif endpoint == "/set/data/received_message_format_parts":
fmt = self._get("/get/data/received_message_format_parts")
data = fmt
elif endpoint == "/set/data/websocket_host":
import random
val = random.choice(["127.0.0.1","aaaaadwafasdsd","0210.1564.845.0"])
data = val
expected = [200,400] if val=="127.0.0.1" else [400]
elif endpoint == "/set/data/websocket_port":
import random
data = random.randint(1024,65535)
expected=[200,400]
elif endpoint == "/set/data/osc_ip_address":
import random
val = random.choice(["127.0.0.1","aaaaadwafasdsd","0210.1564.845.0"])
data = val
expected = [200] if val=="127.0.0.1" else [400]
elif endpoint == "/set/data/osc_port":
import random
data = random.randint(1024,65535)
elif endpoint == "/set/data/selected_translation_compute_type":
device = self.cache.get("selected_translation_compute_device") or self._get("/get/data/selected_translation_compute_device")
if device and isinstance(device, dict):
import random
data = random.choice(device.get("compute_types", [])) if device.get("compute_types") else None
elif endpoint == "/set/data/selected_transcription_compute_type":
device = self.cache.get("selected_transcription_compute_device") or self._get("/get/data/selected_transcription_compute_device")
if device and isinstance(device, dict):
import random
data = random.choice(device.get("compute_types", [])) if device.get("compute_types") else None
return data, expected
def _gen_run_data(self, endpoint: str):
expected = [200]
data = None
import random
if endpoint == "/run/send_message_box":
choices=[{"data":{"id":"000001","message":"test"},"status":[200]},
{"data":{"id":"000002","message":"Hello World!"},"status":[200]},
{"data":{"id":"000003","message":"こんにちわ 世界!"},"status":[200]},
{"data":{"id":"000004","message":"안녕하세요 세계!"},"status":[200]},
{"data":{"id":"000005","message":"你好,世界!"},"status":[200]}]
choice = random.choice(choices)
data = choice["data"]
expected = choice["status"]
elif endpoint in ["/run/typing_message_box","/run/stop_typing_message_box","/run/send_text_overlay","/run/swap_your_language_and_target_language"]:
data = "test_overlay" if endpoint == "/run/send_text_overlay" else None
elif endpoint in ["/run/update_software","/run/update_cuda_software","/run/download_ctranslate2_weight","/run/download_whisper_weight","/run/open_filepath_logs","/run/open_filepath_config_file","/run/feed_watchdog"]:
expected=[401]
elif endpoint in ["/run/lmstudio_connection","/run/ollama_connection"]:
expected=[200,400]
return data, expected
# ---------------------------------- Tests ----------------------------------
def test_validity_single(self, endpoint: str):
expected=[200]
if endpoint == "/set/enable/websocket_server":
expected=[200,400]
resp = self.client.send_request(endpoint)
status = resp.get("status")
result = resp.get("result")
self._record(endpoint, status, result, expected)
ok = status in expected
tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}"
print(f"[Validity] {endpoint} -> {tag} ({status})")
return ok
def test_set_data_single(self, endpoint: str):
data, expected = self._gen_set_data(endpoint)
if expected == [404]:
self._record(endpoint, None, None, expected)
print(f"[SetData] {endpoint} -> {Color.RED}UNKNOWN{Color.RESET}")
return False
# data が None でも expected に 400 が含まれていれば送信してテスト
if data is None and 400 not in expected:
self._record(endpoint, None, None, expected)
print(f"[SetData] {endpoint} -> {Color.YELLOW}SKIP(no data){Color.RESET}")
return True
resp = self.client.send_request(endpoint, data, silent=self.silent)
status = resp.get("status")
result = resp.get("result")
self._record(endpoint, status, result, expected)
ok = status in expected
tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}"
print(f"[SetData] {endpoint} -> {tag} ({status}) data={data}")
return ok
def test_run_single(self, endpoint: str):
data, expected = self._gen_run_data(endpoint)
if expected == [401]:
self._record(endpoint, None, None, expected)
print(f"[Run] {endpoint} -> {Color.YELLOW}SKIP(401){Color.RESET}")
return True
resp = self.client.send_request(endpoint, data, silent=self.silent)
status = resp.get("status")
result = resp.get("result")
self._record(endpoint, status, result, expected)
ok = status in expected
tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}"
print(f"[Run] {endpoint} -> {tag} ({status})")
return ok
def test_delete_single(self, endpoint: str):
expected=[200]
resp = self.client.send_request(endpoint, silent=self.silent)
status = resp.get("status")
result = resp.get("result")
self._record(endpoint, status, result, expected)
ok = status in expected
tag = f"{Color.GREEN}PASS{Color.RESET}" if ok else f"{Color.RED}FAIL{Color.RESET}"
print(f"[Delete] {endpoint} -> {tag} ({status})")
return ok
def run_all(self):
print(f"{Color.BOLD}=== 有効/無効エンドポイントテスト ==={Color.RESET}")
for ep in self.validity_endpoints:
self.test_validity_single(ep)
print(f"{Color.BOLD}=== データ設定エンドポイントテスト ==={Color.RESET}")
for ep in self.set_data_endpoints:
self.test_set_data_single(ep)
print(f"{Color.BOLD}=== 実行エンドポイントテスト ==={Color.RESET}")
for ep in self.run_endpoints:
self.test_run_single(ep)
print(f"{Color.BOLD}=== 削除エンドポイントテスト ==={Color.RESET}")
for ep in self.delete_data_endpoints:
self.test_delete_single(ep)
def run_random(self, iterations: int = 500):
import random
print(f"{Color.BOLD}=== ランダムアクセステスト(iter={iterations}) ==={Color.RESET}")
groups = ["validity","set","run","delete"]
for i in range(iterations):
g = random.choice(groups)
if g == "validity":
ep = random.choice(self.validity_endpoints)
self.test_validity_single(ep)
elif g == "set":
ep = random.choice(self.set_data_endpoints)
self.test_set_data_single(ep)
elif g == "run":
ep = random.choice(self.run_endpoints)
self.test_run_single(ep)
else:
ep = random.choice(self.delete_data_endpoints)
self.test_delete_single(ep)
# 最後に disable 系を OFF
for ep in self.validity_endpoints:
if ep.startswith("/set/disable/"):
self.client.send_request(ep, silent=True)
def run_specific_random(self, iterations: int = 200):
import random
print(f"{Color.BOLD}=== 特定(osc/websocket)ランダムテスト(iter={iterations}) ==={Color.RESET}")
set_specific = ["/set/data/osc_ip_address","/set/data/osc_port","/set/data/websocket_host","/set/data/websocket_port"]
for i in range(iterations):
ep = random.choice(set_specific)
self.test_set_data_single(ep)
def summary(self):
total = len(self.results)
passed = sum(1 for r in self.results.values() if r["success"])
skipped = sum(1 for r in self.results.values() if r["expected_status"] == [401])
failed = total - passed - skipped
print(f"\n{Color.BOLD}==== テストサマリー ==== {Color.RESET}")
print(f"総数: {total} / 成功: {passed} / 失敗: {failed} / スキップ(401): {skipped}")
if failed:
print(f"{Color.RED}失敗詳細:{Color.RESET}")
for ep, r in self.results.items():
if not r["success"] and r["expected_status"] != [401]:
print(f"- {ep} status={r['status']} expected={r['expected_status']} result={r['result']}")
print(f"{Color.BOLD}======================={Color.RESET}\n")
# インタラクティブ指定によるエクスポート
if self.export_path:
try:
self.export_results(self.export_path)
print(f"{Color.GREEN}[EXPORT]{Color.RESET} JSONに書き出しました: {self.export_path}")
if self.export_csv:
csv_path = self._derive_csv_path(self.export_path)
self.export_results_csv(csv_path)
print(f"{Color.GREEN}[EXPORT]{Color.RESET} CSVに書き出しました: {csv_path}")
except Exception as e:
print(f"{Color.RED}[EXPORT ERROR]{Color.RESET} {e}")
def _derive_csv_path(self, json_path: str) -> str:
base, ext = os.path.splitext(json_path)
return base + '.csv'
def export_results(self, filename: str):
"""結果をJSONファイルへ書き出し"""
payload = {
"generated_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
"total": len(self.results),
"results": self.results,
}
with open(filename, "w", encoding="utf-8") as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
def export_results_csv(self, filename: str):
"""結果をCSVへ書き出し (簡易)"""
import csv
with open(filename, "w", encoding="utf-8", newline="") as f:
w = csv.writer(f)
w.writerow(["endpoint", "status", "expected", "success"])
for ep, r in self.results.items():
w.writerow([ep, r.get("status"), ",".join(map(str, r.get("expected_status", []))), r.get("success")])
def run_interactive_mode(client: TestClient):
"""対話モードでテストを実行"""
print(f"{Color.BOLD}=== 対話モード ==={Color.RESET}")
print("エンドポイントとデータを指定してテストを実行します。")
print("終了するには 'quit' または 'exit' と入力してください。\n")
while True:
try:
# エンドポイントの入力
endpoint = input(f"{Color.CYAN}エンドポイントを入力 (例: /get/data/version): {Color.RESET}").strip()
if endpoint.lower() in ['quit', 'exit', 'q']:
break
if not endpoint:
print(f"{Color.YELLOW}エンドポイントを入力してください{Color.RESET}\n")
continue
# データの入力
data_input = input(f"{Color.CYAN}データを入力 (JSON形式, なければEnter): {Color.RESET}").strip()
data = None
if data_input:
try:
data = json.loads(data_input)
except json.JSONDecodeError:
print(f"{Color.YELLOW}JSONとしてパースできませんでした。文字列として送信します{Color.RESET}")
data = data_input
# リクエスト送信
client.send_request(endpoint, data)
except KeyboardInterrupt:
print(f"\n{Color.YELLOW}中断されました{Color.RESET}")
break
except Exception as e:
print(f"{Color.RED}エラー: {e}{Color.RESET}\n")
def main():
"""メイン処理"""
print(f"{Color.BOLD}{'='*60}{Color.RESET}")
print(f"{Color.BOLD} VRCT Backend Test Client{Color.RESET}")
print(f"{Color.BOLD}{'='*60}{Color.RESET}\n")
client = None
try:
# テストクライアントの初期化
client = TestClient()
# モード選択
print("モードを選択してください:")
print("1. サンプルテストを実行")
print("2. 対話モードで実行")
print("3. 自動テスト(全)を実行")
print("4. ランダムアクセステスト")
print("5. 特定(osc/websocket)ランダムテスト")
mode = input(f"{Color.CYAN}選択 (1-5): {Color.RESET}").strip()
# 追加オプション選択
silent_choice = input(f"{Color.CYAN}詳細ログを抑制しますか? (y/N): {Color.RESET}").strip().lower()
silent = silent_choice == 'y'
export_choice = input(f"{Color.CYAN}結果をJSON出力しますか? (y/N): {Color.RESET}").strip().lower()
export_path = None
export_csv = False
if export_choice == 'y':
default_name = f"test_results_{int(time.time())}.json"
path_in = input(f"{Color.CYAN}出力ファイル名[{default_name}]: {Color.RESET}").strip()
export_path = path_in or default_name
csv_choice = input(f"{Color.CYAN}CSVも出力しますか? (y/N): {Color.RESET}").strip().lower()
export_csv = csv_choice == 'y'
print()
if mode == "1":
run_example_tests(client)
elif mode == "2":
run_interactive_mode(client)
elif mode == "3":
tester = AutomatedEndpointTester(client, silent=silent, export_path=export_path, export_csv=export_csv)
tester.run_all()
tester.summary()
elif mode == "4":
tester = AutomatedEndpointTester(client, silent=silent, export_path=export_path, export_csv=export_csv)
tester.run_random()
tester.summary()
elif mode == "5":
tester = AutomatedEndpointTester(client, silent=silent, export_path=export_path, export_csv=export_csv)
tester.run_specific_random()
tester.summary()
else:
print(f"{Color.YELLOW}無効な選択です。対話モードを開始します。{Color.RESET}\n")
run_interactive_mode(client)
except KeyboardInterrupt:
print(f"\n{Color.YELLOW}ユーザーによって中断されました{Color.RESET}")
except Exception as e:
print(f"{Color.RED}エラーが発生しました: {e}{Color.RESET}")
import traceback
traceback.print_exc()
finally:
if client:
client.cleanup()
if __name__ == "__main__":
main()