mainloopのスレッド管理を改善し、マルチワーカー化を実装。デバイス管理の初期化を遅延させ、エラーハンドリングを強化。ドキュメントを更新し、設定の変更点を明示化。

This commit is contained in:
misyaguziya
2025-10-13 08:28:27 +09:00
parent 0130792682
commit 914789c9cb
10 changed files with 328 additions and 86 deletions

View File

@@ -2,7 +2,7 @@ import sys
import json
import time
from typing import Any, Tuple
from threading import Thread, Event
from threading import Thread, Event, Lock
from queue import Queue, Empty
import logging
from controller import Controller # noqa: E402
@@ -357,14 +357,38 @@ mapping = {
init_mapping = {key:value for key, value in mapping.items() if key.startswith("/get/data/")}
controller.setInitMapping(init_mapping)
DEFAULT_WORKER_COUNT = 3 # 必要なら増やす
class Main:
def __init__(self, controller_instance: Controller, mapping_data: dict) -> None:
# queue holds tuples of (endpoint, data)
def __init__(self, controller_instance: Controller, mapping_data: dict, worker_count: int = DEFAULT_WORKER_COUNT) -> None:
self.queue: Queue[Tuple[str, Any]] = Queue()
self._stop_event: Event = Event()
self.controller = controller_instance
self.mapping = mapping_data
self._threads: list[Thread] = []
self._worker_count = worker_count
# エンドポイントごとの排他制御用 Lock を作成
# enable/disable ペアは同じロックキーに正規化する
def _canonical_lock_key(endpoint: str) -> str:
if not isinstance(endpoint, str):
return str(endpoint)
if endpoint.startswith("/set/enable/"):
return "/lock/set/" + endpoint[len("/set/enable/"):]
if endpoint.startswith("/set/disable/"):
return "/lock/set/" + endpoint[len("/set/disable/"):]
return endpoint
# mapping に含まれるすべてのエンドポイントを走査して正規化キー集合を作る
lock_keys = set()
for key in self.mapping.keys():
lock_keys.add(_canonical_lock_key(key))
# 正規化キーごとに Lock を割り当てる
self._endpoint_locks: dict[str, Lock] = {k: Lock() for k in lock_keys}
# 正規化関数をインスタンスに保存
self._canonical_lock_key = _canonical_lock_key
def receiver(self) -> None:
"""Read lines from stdin, parse JSON and enqueue requests.
@@ -422,23 +446,51 @@ class Main:
return result, status
def _call_handler(self, endpoint: str, data: Any = None) -> tuple:
result = None
status = 500
handler = self.mapping.get(endpoint)
if handler is None:
response = "Invalid endpoint"
status = 404
else:
try:
response = handler["variable"](data)
status = response.get("status", 500)
result = response.get("result", None)
time.sleep(0.2)
except Exception:
errorLogging()
result = "Internal error"
status = 500
return result, status
def handler(self) -> None:
"""Main handler loop. Uses queue.get with timeout to avoid busy polling and to allow graceful shutdown."""
while not self._stop_event.is_set():
try:
endpoint, data = self.queue.get(timeout=0.5)
except Empty:
continue
try:
result, status = self.handleRequest(endpoint, data)
except Exception:
errorLogging()
result = "Internal error"
status = 500
# endpoint をロック用の正規化キーに変換してロックを取得
lock_key = self._canonical_lock_key(endpoint)
lock = self._endpoint_locks.get(lock_key)
if lock is not None:
acquired = lock.acquire(blocking=False)
if not acquired:
# 同一機能で既に処理中 -> 少し待って再キュー
time.sleep(0.05)
self.queue.put((endpoint, data))
continue
try:
result, status = self._call_handler(endpoint, data)
finally:
lock.release()
else:
result, status = self._call_handler(endpoint, data)
if status == 423:
# Locked endpoint: requeue with a small delay to avoid tight loop
time.sleep(0.1)
self.queue.put((endpoint, data))
else:
@@ -446,10 +498,11 @@ class Main:
printResponse(status, endpoint, result)
def startHandler(self) -> None:
th_handler = Thread(target=self.handler, name="main_handler")
th_handler.daemon = True
th_handler.start()
self._threads.append(th_handler)
for i in range(max(1, self._worker_count)):
th_handler = Thread(target=self.handler, name=f"main_handler_{i}")
th_handler.daemon = True
th_handler.start()
self._threads.append(th_handler)
def start(self) -> None:
"""Start receiver and handler threads."""