Files
VRCT/src-python/models/telemetry/__init__.py

198 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
テレメトリAptabase管理モジュール
パブリック API を提供し、内部実装を隠蔽する。
"""
import asyncio
import threading
from concurrent.futures import Future
# Allow running as a script for quick verification.
try:
from .state import TelemetryState
from .core import TelemetryCore
except ImportError:
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
from models.telemetry.state import TelemetryState
from models.telemetry.core import TelemetryCore
class Telemetry:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.state = TelemetryState()
self.core = TelemetryCore(self.state)
self._loop = None
self._loop_thread = None
self._init_called = False # init()が呼ばれたかを追跡(重複初期化防止)
self._initialized = True
def _start_event_loop(self):
"""バックグラウンドでイベントループを開始"""
if self._loop_thread is not None and self._loop_thread.is_alive():
return
def run_loop():
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._loop.run_forever()
self._loop_thread = threading.Thread(target=run_loop, daemon=True, name="telemetry_loop")
self._loop_thread.start()
# ループが開始されるまで待機
while self._loop is None:
pass
def _stop_event_loop(self, timeout: float = 5.0):
"""イベントループを停止(フラッシュ完了を待つ)"""
if self._loop is None:
return
# ループにstop()を予約
self._loop.call_soon_threadsafe(self._loop.stop)
# ループスレッド終了を待機
if self._loop_thread is not None:
self._loop_thread.join(timeout=timeout)
self._loop = None
self._loop_thread = None
def _run_async(self, coro):
"""同期コンテキストから非同期関数を実行"""
if self._loop is None:
return
future = asyncio.run_coroutine_threadsafe(coro, self._loop)
try:
# タイムアウト付きで待機(ブロッキングしすぎないように)
return future.result(timeout=5.0)
except Exception:
# テレメトリ失敗は黙殺
pass
def _schedule_async(self, coro):
"""非同期タスクをバックグラウンドでスケジュール(待機しない)"""
if self._loop is None:
return
try:
asyncio.run_coroutine_threadsafe(coro, self._loop)
except Exception:
pass
def init(self, enabled: bool, app_version: str = "1.0.0"):
"""テレメトリ初期化(同期インターフェース)
重要:このメソッドは冪等です。複数回呼ばれても安全です。
既に初期化済みの場合は、有効/無効の状態のみを更新します。
これにより、設定変更時などに誤ってapp_startedイベントが重複送信される問題を防ぎます。
"""
# 既に初期化済みの場合は、状態の更新のみ
if self._init_called:
self.state.set_enabled(enabled)
return
# 初回初期化
self._init_called = True
self.state.set_enabled(enabled)
if enabled:
self._start_event_loop()
self._run_async(self._init_async(app_version))
async def _init_async(self, app_version: str):
"""非同期初期化処理"""
await self.core.start(app_version=app_version)
await self.core.send_event("app_started")
def shutdown(self):
"""テレメトリ終了(同期インターフェース)
重要Tauri sidecar環境では、このメソッド実行後にプロセス終了が発生します。
app_closed イベントが確実に送信されるように、以下の手順を実行します:
1. app_closed イベント送信を同期待機
2. Aptabase クライアント停止(フラッシュ含む)
3. イベントループ完全停止を待機
"""
try:
# app_closed 送信とクライアント停止を同期待機
if self.state.is_enabled():
try:
# _run_async で最大5秒間待機Aptabase のフラッシュ含む)
self._run_async(self._shutdown_async())
except Exception:
pass
# イベントループを完全停止(フラッシュ確認を待つ)
# sidecar 終了前に確実に完了させるため、タイムアウトを長めに設定
self._stop_event_loop(timeout=5.0)
except Exception:
# どの段階で失敗してもプロセス終了処理は進行させる
pass
finally:
# 状態をリセット
self.state.reset()
self._init_called = False # 初期化フラグもリセット
async def _shutdown_async(self):
"""非同期終了処理"""
await self.core.send_event("app_closed")
await self.core.stop()
def track(self, event: str, payload: dict = None):
"""汎用イベント送信(同期インターフェース)"""
if not self.state.is_enabled():
return
self._schedule_async(self.core.send_event(event, payload))
def track_core_feature(self, feature: str):
"""コア機能イベント送信(同期インターフェース)"""
if not self.state.is_enabled():
return
if self.core.is_duplicate_core_feature(feature):
return
self._schedule_async(self._track_core_feature_async(feature))
async def _track_core_feature_async(self, feature: str):
"""非同期コア機能送信処理"""
await self.core.send_event("core_feature", {"feature": feature})
self.state.record_feature_sent(feature)
def is_enabled(self) -> bool:
"""有効状態確認"""
return self.state.is_enabled()
def get_state(self) -> dict:
"""内部状態取得(デバッグ用)"""
return self.state.get_debug_info()
if __name__ == "__main__":
# 同期インターフェースのデモ
telemetry = Telemetry()
telemetry.init(enabled=True)
telemetry.track("debug_test", {"message": "telemetry main demo"})
telemetry.track_core_feature("text_input")
print("state:", telemetry.get_state())
# イベント送信完了を待つ
import time
time.sleep(2)
telemetry.shutdown()
print("telemetry demo finished")