OSCモジュールのドキュメントを更新し、使用例や注意点を追加。型注釈を強化し、エラーハンドリングを改善。OSCHandlerクラスの初期化メソッドを修正し、サービスのアドバタイズ中の例外処理を追加。テストファイルを新規作成し、OSCモジュールのインポートテストを追加。
This commit is contained in:
@@ -1,82 +1,116 @@
|
||||
"""OSC helpers and a thin OSCQuery-enabled server wrapper.
|
||||
|
||||
This module provides `OSCHandler`, a convenience wrapper used by the
|
||||
application to send OSC messages and expose OSCQuery endpoints when the
|
||||
target address is localhost. The implementation is defensive: missing
|
||||
utilities are handled gracefully and logging helpers are used where
|
||||
available.
|
||||
"""
|
||||
|
||||
import time
|
||||
from typing import Any
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
from time import sleep
|
||||
from threading import Thread
|
||||
from pythonosc import udp_client, dispatcher, osc_server
|
||||
from tinyoscquery.queryservice import OSCQueryService
|
||||
from tinyoscquery.query import OSCQueryBrowser, OSCQueryClient
|
||||
from tinyoscquery.utility import get_open_udp_port, get_open_tcp_port
|
||||
from tinyoscquery.shared.node import OSCAccess
|
||||
try:
|
||||
from tinyoscquery.queryservice import OSCQueryService
|
||||
from tinyoscquery.query import OSCQueryBrowser, OSCQueryClient
|
||||
from tinyoscquery.utility import get_open_udp_port, get_open_tcp_port
|
||||
from tinyoscquery.shared.node import OSCAccess
|
||||
except Exception:
|
||||
# tinyoscquery is optional for non-local usage; functionality that
|
||||
# depends on it will be disabled if it's missing.
|
||||
OSCQueryService = None # type: ignore
|
||||
OSCQueryBrowser = None # type: ignore
|
||||
OSCQueryClient = None # type: ignore
|
||||
def get_open_udp_port() -> int: # type: ignore
|
||||
return 0
|
||||
|
||||
def get_open_tcp_port() -> int: # type: ignore
|
||||
return 0
|
||||
OSCAccess = None # type: ignore
|
||||
|
||||
try:
|
||||
from utils import errorLogging
|
||||
except ImportError:
|
||||
def errorLogging():
|
||||
except Exception:
|
||||
def errorLogging() -> None:
|
||||
import traceback
|
||||
print("Error occurred:", traceback.format_exc())
|
||||
|
||||
class OSCHandler:
|
||||
def __init__(self, ip_address="127.0.0.1", port=9000) -> None:
|
||||
"""Thin wrapper managing OSC send/receive and optional OSCQuery advertising.
|
||||
|
||||
if ip_address in ["127.0.0.1", "localhost"]:
|
||||
self.is_osc_query_enabled = True
|
||||
else:
|
||||
self.is_osc_query_enabled = False
|
||||
Args:
|
||||
ip_address: OSC server client target / bind address
|
||||
port: UDP port to send to
|
||||
"""
|
||||
|
||||
self.osc_ip_address = ip_address
|
||||
self.osc_port = port
|
||||
self.osc_parameter_muteself = "/avatar/parameters/MuteSelf"
|
||||
self.osc_parameter_chatbox_typing = "/chatbox/typing"
|
||||
self.osc_parameter_chatbox_input = "/chatbox/input"
|
||||
def __init__(self, ip_address: str = "127.0.0.1", port: int = 9000) -> None:
|
||||
|
||||
self.is_osc_query_enabled: bool = ip_address in ["127.0.0.1", "localhost"]
|
||||
|
||||
self.osc_ip_address: str = ip_address
|
||||
self.osc_port: int = port
|
||||
self.osc_parameter_muteself: str = "/avatar/parameters/MuteSelf"
|
||||
self.osc_parameter_chatbox_typing: str = "/chatbox/typing"
|
||||
self.osc_parameter_chatbox_input: str = "/chatbox/input"
|
||||
self.udp_client = udp_client.SimpleUDPClient(self.osc_ip_address, self.osc_port)
|
||||
self.osc_server_name = "VRChat-Client"
|
||||
self.osc_server_name: str = "VRChat-Client"
|
||||
self.osc_server = None
|
||||
self.osc_query_service = None
|
||||
self.osc_query_service_name = "VRCT"
|
||||
self.osc_server_ip_address = ip_address
|
||||
self.http_port = None
|
||||
self.osc_server_port = None
|
||||
self.dict_filter_and_target = {}
|
||||
self.osc_query_service_name: str = "VRCT"
|
||||
self.osc_server_ip_address: str = ip_address
|
||||
self.http_port: Optional[int] = None
|
||||
self.osc_server_port: Optional[int] = None
|
||||
self.dict_filter_and_target: Dict[str, Callable] = {}
|
||||
self.browser = None
|
||||
|
||||
def getIsOscQueryEnabled(self) -> bool:
|
||||
"""Return whether OSCQuery support is enabled (local addresses only)."""
|
||||
return self.is_osc_query_enabled
|
||||
|
||||
def setOscIpAddress(self, ip_address:str) -> None:
|
||||
if ip_address in ["127.0.0.1", "localhost"]:
|
||||
self.is_osc_query_enabled = True
|
||||
else:
|
||||
self.is_osc_query_enabled = False
|
||||
def setOscIpAddress(self, ip_address: str) -> None:
|
||||
"""Change the OSC target IP address and reinitialize services."""
|
||||
self.is_osc_query_enabled = ip_address in ["127.0.0.1", "localhost"]
|
||||
|
||||
self.oscServerStop()
|
||||
self.osc_ip_address = ip_address
|
||||
self.udp_client = udp_client.SimpleUDPClient(self.osc_ip_address, self.osc_port)
|
||||
self.receiveOscParameters()
|
||||
|
||||
def setOscPort(self, port:int) -> None:
|
||||
def setOscPort(self, port: int) -> None:
|
||||
"""Change the OSC UDP port used for sending and reinitialize services."""
|
||||
self.oscServerStop()
|
||||
self.osc_port = port
|
||||
self.udp_client = udp_client.SimpleUDPClient(self.osc_ip_address, self.osc_port)
|
||||
self.receiveOscParameters()
|
||||
|
||||
# send OSC message typing
|
||||
def sendTyping(self, flag:bool=False) -> None:
|
||||
def sendTyping(self, flag: bool = False) -> None:
|
||||
"""Send /chatbox/typing with a boolean flag."""
|
||||
self.udp_client.send_message(self.osc_parameter_chatbox_typing, [flag])
|
||||
|
||||
# send OSC message
|
||||
def sendMessage(self, message:str="", notification:bool=True) -> None:
|
||||
def sendMessage(self, message: str = "", notification: bool = True) -> None:
|
||||
"""Send /chatbox/input if message is non-empty.
|
||||
|
||||
The second argument historically was a boolean flag for clearing; we keep
|
||||
compatibility by sending [message, True, notification].
|
||||
"""
|
||||
if len(message) > 0:
|
||||
self.udp_client.send_message(self.osc_parameter_chatbox_input, [f"{message}", True, notification])
|
||||
|
||||
def getOSCParameterValue(self, address:str) -> Any:
|
||||
def getOSCParameterValue(self, address: str) -> Any:
|
||||
if not self.is_osc_query_enabled:
|
||||
# OSCQueryが無効な場合はNoneを返す
|
||||
return None
|
||||
|
||||
value = None
|
||||
value: Any = None
|
||||
try:
|
||||
# browserインスタンスを再利用し、毎回の生成と破棄を避ける
|
||||
if self.browser is None:
|
||||
# OSCQueryBrowser may not be available; guard
|
||||
if OSCQueryBrowser is None:
|
||||
return None
|
||||
self.browser = OSCQueryBrowser()
|
||||
sleep(1) # 初回のみスリープ
|
||||
|
||||
@@ -99,15 +133,22 @@ class OSCHandler:
|
||||
self.browser = None
|
||||
return value
|
||||
|
||||
def getOSCParameterMuteSelf(self) -> bool:
|
||||
def getOSCParameterMuteSelf(self) -> Optional[bool]:
|
||||
"""Return the value of the MuteSelf parameter when available, else None."""
|
||||
return self.getOSCParameterValue(self.osc_parameter_muteself)
|
||||
|
||||
def setDictFilterAndTarget(self, dict_filter_and_target:dict) -> None:
|
||||
def setDictFilterAndTarget(self, dict_filter_and_target: Dict[str, Callable]) -> None:
|
||||
"""Set the mapping from OSC address filters to handler callables."""
|
||||
self.dict_filter_and_target = dict_filter_and_target
|
||||
|
||||
def receiveOscParameters(self) -> None:
|
||||
if self.is_osc_query_enabled is False:
|
||||
# OSCQueryが無効な場合は何もしない
|
||||
"""Start a local OSC server and advertise OSCQuery endpoints when supported.
|
||||
|
||||
If `tinyoscquery` is not available or OSCQuery is disabled, this is a
|
||||
no-op.
|
||||
"""
|
||||
if not self.is_osc_query_enabled or OSCQueryService is None:
|
||||
# OSCQuery が無効またはライブラリが無い場合は何もしない
|
||||
return
|
||||
|
||||
self.osc_server_port = get_open_udp_port()
|
||||
@@ -120,28 +161,39 @@ class OSCHandler:
|
||||
|
||||
while True:
|
||||
try:
|
||||
# osc_server_name + UTC timestampでユニークなサービス名を生成
|
||||
# osc_server_name + UTC timestamp でユニークなサービス名を生成
|
||||
service_name = f"{self.osc_query_service_name}:{int(time.time())}"
|
||||
self.osc_query_service = OSCQueryService(service_name, self.http_port, self.osc_server_port)
|
||||
for filter, target in self.dict_filter_and_target.items():
|
||||
self.osc_query_service.advertise_endpoint(filter, access=OSCAccess.READWRITE_VALUE)
|
||||
# OSCAccess may be None when tinyoscquery is not present; guard
|
||||
if OSCAccess is not None:
|
||||
self.osc_query_service.advertise_endpoint(filter, access=OSCAccess.READWRITE_VALUE)
|
||||
break
|
||||
except Exception:
|
||||
errorLogging()
|
||||
sleep(1)
|
||||
|
||||
def oscServerServe(self) -> None:
|
||||
"""Run the OSC server loop with a longer poll interval to reduce CPU."""
|
||||
# ポーリング間隔を長くして(2秒から10秒に)CPUの使用率を削減
|
||||
self.osc_server.serve_forever(10)
|
||||
if self.osc_server is not None:
|
||||
self.osc_server.serve_forever(10)
|
||||
|
||||
def oscServerStop(self) -> None:
|
||||
"""Stop and clean up any running OSC server and OSCQuery service."""
|
||||
if isinstance(self.osc_server, osc_server.ThreadingOSCUDPServer):
|
||||
self.osc_server.shutdown()
|
||||
try:
|
||||
self.osc_server.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
self.osc_server = None
|
||||
if isinstance(self.osc_query_service, OSCQueryService):
|
||||
self.osc_query_service.http_server.shutdown()
|
||||
if OSCQueryService is not None and isinstance(self.osc_query_service, OSCQueryService):
|
||||
try:
|
||||
self.osc_query_service.http_server.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
self.osc_query_service = None
|
||||
# browserがある場合はクリーンアップ
|
||||
# browser がある場合はクリーンアップ
|
||||
if self.browser is not None:
|
||||
try:
|
||||
if hasattr(self.browser, 'zc') and self.browser.zc is not None:
|
||||
|
||||
Reference in New Issue
Block a user