mainloopモジュールのドキュメントを新規作成し、Mainクラスのstart()/stop()メソッドを追加。受信スレッドとハンドラスレッドのライフサイクル管理を明示化し、エラーハンドリングを強化。ポーリング負荷を低減するためにqueue.get()にタイムアウトを追加。

This commit is contained in:
misyaguziya
2025-10-09 22:52:15 +09:00
parent 6f33f8afbd
commit 0130792682
2 changed files with 116 additions and 40 deletions

View File

@@ -0,0 +1,43 @@
## mainloop モジュールsrc-python/mainloop.py
このドキュメントは `mainloop.py` の実装と、2025-10-09 に行ったリファクタの概要をまとめます。`mainloop` は標準入力から JSON を受け取り、`controller` のメソッドにルーティングして標準出力へ JSON で応答を返す小さなメインループです。
重要な変更点2025-10-09:
- `Main` クラスに `start()` / `stop()` を追加し、受信スレッドとハンドラスレッドのライフサイクル管理を明示化しました。
- `queue.get(timeout=...)` を使ってポーリング負荷を下げ、`_stop_event` による安全なシャットダウンを可能にしました。
- 標準入力の JSON パースエラーと一般例外のハンドリングを強化しました。
- `startReceiver()` / `startHandler()` を使って個別にスレッドを起動することも可能です。
クラス: Main
- __init__(controller_instance: Controller, mapping_data: dict) -> None
- `controller_instance`: `Controller` のインスタンス。
- `mapping_data`: `mainloop` 内で使用する `mapping`(エンドポイント -> ハンドラ情報)辞書。
- start() -> None
- 内部で `startReceiver()``startHandler()` を呼び、両スレッドを起動します。
- stop(wait: float = 2.0) -> None
- シャットダウンシグナルをセットし、スレッド終了を待ちます(デフォルト 2 秒)。
使い方(例):
```python
from mainloop import Main, mapping, controller
main_instance = Main(controller_instance=controller, mapping_data=mapping)
main_instance.start()
# 実行中に別スレッドや外部シグナルで停止させる
main_instance.stop()
```
既存のスクリプト互換性:
- 既存コードが `startReceiver()``startHandler()` を直接呼んでいる場合、そのまま動作します。`start()` / `stop()` を使うと簡潔に起動 / 停止が行えます。
注意点と推奨事項:
- `stop()` を呼ばないとバックグラウンドスレッドがデーモンであってもプロセス終了前にクリーンアップが不十分になる場合があります。アプリ終了時は `stop()` を呼ぶことを推奨します。
- `queue.get(timeout=...)` を使うことで即時性よりも CPU 使用量の低減を優先しています。非常に低レイテンシが必要なケースでは timeout を短くしてください(ただし CPU 使用量に注意)。
スクリプト連携:
- `mainloop.mapping``mainloop.run_mapping``scripts/print_mapping.py` などのツールから直接参照されます。mapping のキー/値を変更する場合はそれらのスクリプトも確認してください。
変更履歴:
- 2025-10-09: start/stop ライフサイクル、タイムアウト付きキュー取得、エラー処理強化を追加。

View File

@@ -2,8 +2,8 @@ import sys
import json import json
import time import time
from typing import Any, Tuple from typing import Any, Tuple
from threading import Thread from threading import Thread, Event
from queue import Queue from queue import Queue, Empty
import logging import logging
from controller import Controller # noqa: E402 from controller import Controller # noqa: E402
from utils import printLog, printResponse, errorLogging, encodeBase64 # noqa: E402 from utils import printLog, printResponse, errorLogging, encodeBase64 # noqa: E402
@@ -358,31 +358,47 @@ init_mapping = {key:value for key, value in mapping.items() if key.startswith("/
controller.setInitMapping(init_mapping) controller.setInitMapping(init_mapping)
class Main: class Main:
def __init__(self, controller_instance, mapping_data) -> None: def __init__(self, controller_instance: Controller, mapping_data: dict) -> None:
# queue holds tuples of (endpoint, data) # queue holds tuples of (endpoint, data)
self.queue: Queue[Tuple[str, Any]] = Queue() self.queue: Queue[Tuple[str, Any]] = Queue()
self.main_loop = True self._stop_event: Event = Event()
self.controller = controller_instance self.controller = controller_instance
self.mapping = mapping_data self.mapping = mapping_data
self._threads: list[Thread] = []
def receiver(self) -> None: def receiver(self) -> None:
while True: """Read lines from stdin, parse JSON and enqueue requests.
received_data = sys.stdin.readline().strip()
received_data = json.loads(received_data)
if received_data: Uses blocking readline but honors stop via _stop_event checked between reads.
endpoint = received_data.get("endpoint", None) """
data = received_data.get("data", None) while not self._stop_event.is_set():
data = encodeBase64(data) if data is not None else None try:
printLog(endpoint, {"receive_data": data}) line = sys.stdin.readline()
self.queue.put((endpoint, data)) if not line:
# EOF reached; sleep briefly and re-check stop event
time.sleep(0.1)
continue
received_data = json.loads(line.strip())
if received_data:
endpoint = received_data.get("endpoint")
data = received_data.get("data")
data = encodeBase64(data) if data is not None else None
printLog(endpoint, {"receive_data": data})
self.queue.put((endpoint, data))
except json.JSONDecodeError:
# malformed input; log and continue
errorLogging()
except Exception:
errorLogging()
def startReceiver(self) -> None: def startReceiver(self) -> None:
th_receiver = Thread(target=self.receiver) th_receiver = Thread(target=self.receiver, name="main_receiver")
th_receiver.daemon = True th_receiver.daemon = True
th_receiver.start() th_receiver.start()
self._threads.append(th_receiver)
def handleRequest(self, endpoint, data=None) -> tuple: def handleRequest(self, endpoint: str, data: Any = None) -> tuple:
result = None # デフォルト値を設定 result = None # デフォルト値を設定
status = 500 # デフォルト値を設定 status = 500 # デフォルト値を設定
@@ -396,45 +412,62 @@ class Main:
else: else:
try: try:
response = handler["variable"](data) response = handler["variable"](data)
status = response.get("status", None) status = response.get("status")
result = response.get("result", None) result = response.get("result")
time.sleep(0.2) # 処理の安定化のために少し待機 time.sleep(0.2) # 処理の安定化のために少し待機
except Exception as e: except Exception:
errorLogging() errorLogging()
result = str(e) result = "Internal error"
status = 500 status = 500
return result, status return result, status
def handler(self) -> None: def handler(self) -> None:
while True: """Main handler loop. Uses queue.get with timeout to avoid busy polling and to allow graceful shutdown."""
if not self.queue.empty(): while not self._stop_event.is_set():
try: try:
endpoint, data = self.queue.get() endpoint, data = self.queue.get(timeout=0.5)
result, status = self.handleRequest(endpoint, data) except Empty:
except Exception as e: continue
errorLogging()
result = str(e)
status = 500
if status == 423: try:
self.queue.put((endpoint, data)) result, status = self.handleRequest(endpoint, data)
else: except Exception:
printLog(endpoint, {"status": status, "send_data": result}) errorLogging()
printResponse(status, endpoint, result) result = "Internal error"
time.sleep(0.1) status = 500
if status == 423:
# Locked endpoint: requeue with a small delay to avoid tight loop
time.sleep(0.1)
self.queue.put((endpoint, data))
else:
printLog(endpoint, {"status": status, "send_data": result})
printResponse(status, endpoint, result)
def startHandler(self) -> None: def startHandler(self) -> None:
th_handler = Thread(target=self.handler) th_handler = Thread(target=self.handler, name="main_handler")
th_handler.daemon = True th_handler.daemon = True
th_handler.start() th_handler.start()
self._threads.append(th_handler)
def start(self) -> None: def start(self) -> None:
while self.main_loop: """Start receiver and handler threads."""
time.sleep(1) self.startReceiver()
self.startHandler()
def stop(self) -> None: def stop(self, wait: float = 2.0) -> None:
self.main_loop = False """Signal threads to stop and wait for them to finish.
Args:
wait: maximum seconds to wait for threads to join.
"""
self._stop_event.set()
# give threads a chance to exit
start = time.time()
for th in self._threads:
remaining = max(0.0, wait - (time.time() - start))
th.join(timeout=remaining)
# 外部から参照可能なインスタンスを提供 # 外部から参照可能なインスタンスを提供
main_instance = Main(controller_instance=controller, mapping_data=mapping) main_instance = Main(controller_instance=controller, mapping_data=mapping)