From 013079268284cac12bb214957de2bc8af5ef1bf6 Mon Sep 17 00:00:00 2001 From: misyaguziya <53165965+misyaguziya@users.noreply.github.com> Date: Thu, 9 Oct 2025 22:52:15 +0900 Subject: [PATCH] =?UTF-8?q?mainloop=E3=83=A2=E3=82=B8=E3=83=A5=E3=83=BC?= =?UTF-8?q?=E3=83=AB=E3=81=AE=E3=83=89=E3=82=AD=E3=83=A5=E3=83=A1=E3=83=B3?= =?UTF-8?q?=E3=83=88=E3=82=92=E6=96=B0=E8=A6=8F=E4=BD=9C=E6=88=90=E3=81=97?= =?UTF-8?q?=E3=80=81Main=E3=82=AF=E3=83=A9=E3=82=B9=E3=81=AEstart()/stop()?= =?UTF-8?q?=E3=83=A1=E3=82=BD=E3=83=83=E3=83=89=E3=82=92=E8=BF=BD=E5=8A=A0?= =?UTF-8?q?=E3=80=82=E5=8F=97=E4=BF=A1=E3=82=B9=E3=83=AC=E3=83=83=E3=83=89?= =?UTF-8?q?=E3=81=A8=E3=83=8F=E3=83=B3=E3=83=89=E3=83=A9=E3=82=B9=E3=83=AC?= =?UTF-8?q?=E3=83=83=E3=83=89=E3=81=AE=E3=83=A9=E3=82=A4=E3=83=95=E3=82=B5?= =?UTF-8?q?=E3=82=A4=E3=82=AF=E3=83=AB=E7=AE=A1=E7=90=86=E3=82=92=E6=98=8E?= =?UTF-8?q?=E7=A4=BA=E5=8C=96=E3=81=97=E3=80=81=E3=82=A8=E3=83=A9=E3=83=BC?= =?UTF-8?q?=E3=83=8F=E3=83=B3=E3=83=89=E3=83=AA=E3=83=B3=E3=82=B0=E3=82=92?= =?UTF-8?q?=E5=BC=B7=E5=8C=96=E3=80=82=E3=83=9D=E3=83=BC=E3=83=AA=E3=83=B3?= =?UTF-8?q?=E3=82=B0=E8=B2=A0=E8=8D=B7=E3=82=92=E4=BD=8E=E6=B8=9B=E3=81=99?= =?UTF-8?q?=E3=82=8B=E3=81=9F=E3=82=81=E3=81=ABqueue.get()=E3=81=AB?= =?UTF-8?q?=E3=82=BF=E3=82=A4=E3=83=A0=E3=82=A2=E3=82=A6=E3=83=88=E3=82=92?= =?UTF-8?q?=E8=BF=BD=E5=8A=A0=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src-python/docs/modules/mainloop.md | 43 +++++++++++ src-python/mainloop.py | 113 ++++++++++++++++++---------- 2 files changed, 116 insertions(+), 40 deletions(-) create mode 100644 src-python/docs/modules/mainloop.md diff --git a/src-python/docs/modules/mainloop.md b/src-python/docs/modules/mainloop.md new file mode 100644 index 00000000..897f9d5d --- /dev/null +++ b/src-python/docs/modules/mainloop.md @@ -0,0 +1,43 @@ +## mainloop モジュール(src-python/mainloop.py) + +このドキュメントは `mainloop.py` の実装と、2025-10-09 に行ったリファクタの概要をまとめます。`mainloop` は標準入力から JSON を受け取り、`controller` のメソッドにルーティングして標準出力へ JSON で応答を返す小さなメインループです。 + +重要な変更点(2025-10-09): +- `Main` クラスに `start()` / `stop()` を追加し、受信スレッドとハンドラスレッドのライフサイクル管理を明示化しました。 +- `queue.get(timeout=...)` を使ってポーリング負荷を下げ、`_stop_event` による安全なシャットダウンを可能にしました。 +- 標準入力の JSON パースエラーと一般例外のハンドリングを強化しました。 +- `startReceiver()` / `startHandler()` を使って個別にスレッドを起動することも可能です。 + +クラス: Main +- __init__(controller_instance: Controller, mapping_data: dict) -> None + - `controller_instance`: `Controller` のインスタンス。 + - `mapping_data`: `mainloop` 内で使用する `mapping`(エンドポイント -> ハンドラ情報)辞書。 +- start() -> None + - 内部で `startReceiver()` と `startHandler()` を呼び、両スレッドを起動します。 +- stop(wait: float = 2.0) -> None + - シャットダウンシグナルをセットし、スレッド終了を待ちます(デフォルト 2 秒)。 + +使い方(例): + +```python +from mainloop import Main, mapping, controller + +main_instance = Main(controller_instance=controller, mapping_data=mapping) +main_instance.start() + +# 実行中に別スレッドや外部シグナルで停止させる +main_instance.stop() +``` + +既存のスクリプト互換性: +- 既存コードが `startReceiver()` や `startHandler()` を直接呼んでいる場合、そのまま動作します。`start()` / `stop()` を使うと簡潔に起動 / 停止が行えます。 + +注意点と推奨事項: +- `stop()` を呼ばないとバックグラウンドスレッドがデーモンであってもプロセス終了前にクリーンアップが不十分になる場合があります。アプリ終了時は `stop()` を呼ぶことを推奨します。 +- `queue.get(timeout=...)` を使うことで即時性よりも CPU 使用量の低減を優先しています。非常に低レイテンシが必要なケースでは timeout を短くしてください(ただし CPU 使用量に注意)。 + +スクリプト連携: +- `mainloop.mapping` と `mainloop.run_mapping` は `scripts/print_mapping.py` などのツールから直接参照されます。mapping のキー/値を変更する場合はそれらのスクリプトも確認してください。 + +変更履歴: +- 2025-10-09: start/stop ライフサイクル、タイムアウト付きキュー取得、エラー処理強化を追加。 diff --git a/src-python/mainloop.py b/src-python/mainloop.py index 9315ab56..644037a2 100644 --- a/src-python/mainloop.py +++ b/src-python/mainloop.py @@ -2,8 +2,8 @@ import sys import json import time from typing import Any, Tuple -from threading import Thread -from queue import Queue +from threading import Thread, Event +from queue import Queue, Empty import logging from controller import Controller # noqa: E402 from utils import printLog, printResponse, errorLogging, encodeBase64 # noqa: E402 @@ -358,31 +358,47 @@ init_mapping = {key:value for key, value in mapping.items() if key.startswith("/ controller.setInitMapping(init_mapping) class Main: - def __init__(self, controller_instance, mapping_data) -> None: + def __init__(self, controller_instance: Controller, mapping_data: dict) -> None: # queue holds tuples of (endpoint, data) self.queue: Queue[Tuple[str, Any]] = Queue() - self.main_loop = True + self._stop_event: Event = Event() self.controller = controller_instance self.mapping = mapping_data + self._threads: list[Thread] = [] def receiver(self) -> None: - while True: - received_data = sys.stdin.readline().strip() - received_data = json.loads(received_data) + """Read lines from stdin, parse JSON and enqueue requests. - if received_data: - endpoint = received_data.get("endpoint", None) - data = received_data.get("data", None) - data = encodeBase64(data) if data is not None else None - printLog(endpoint, {"receive_data": data}) - self.queue.put((endpoint, data)) + Uses blocking readline but honors stop via _stop_event checked between reads. + """ + while not self._stop_event.is_set(): + try: + line = sys.stdin.readline() + if not line: + # EOF reached; sleep briefly and re-check stop event + time.sleep(0.1) + continue + received_data = json.loads(line.strip()) + + if received_data: + endpoint = received_data.get("endpoint") + data = received_data.get("data") + data = encodeBase64(data) if data is not None else None + printLog(endpoint, {"receive_data": data}) + self.queue.put((endpoint, data)) + except json.JSONDecodeError: + # malformed input; log and continue + errorLogging() + except Exception: + errorLogging() def startReceiver(self) -> None: - th_receiver = Thread(target=self.receiver) + th_receiver = Thread(target=self.receiver, name="main_receiver") th_receiver.daemon = True th_receiver.start() + self._threads.append(th_receiver) - def handleRequest(self, endpoint, data=None) -> tuple: + def handleRequest(self, endpoint: str, data: Any = None) -> tuple: result = None # デフォルト値を設定 status = 500 # デフォルト値を設定 @@ -396,45 +412,62 @@ class Main: else: try: response = handler["variable"](data) - status = response.get("status", None) - result = response.get("result", None) - time.sleep(0.2) # 処理の安定化のために少し待機 - except Exception as e: + status = response.get("status") + result = response.get("result") + time.sleep(0.2) # 処理の安定化のために少し待機 + except Exception: errorLogging() - result = str(e) + result = "Internal error" status = 500 return result, status def handler(self) -> None: - while True: - if not self.queue.empty(): - try: - endpoint, data = self.queue.get() - result, status = self.handleRequest(endpoint, data) - except Exception as e: - errorLogging() - result = str(e) - status = 500 + """Main handler loop. Uses queue.get with timeout to avoid busy polling and to allow graceful shutdown.""" + while not self._stop_event.is_set(): + try: + endpoint, data = self.queue.get(timeout=0.5) + except Empty: + continue - if status == 423: - self.queue.put((endpoint, data)) - else: - printLog(endpoint, {"status": status, "send_data": result}) - printResponse(status, endpoint, result) - time.sleep(0.1) + try: + result, status = self.handleRequest(endpoint, data) + except Exception: + errorLogging() + result = "Internal error" + status = 500 + + if status == 423: + # Locked endpoint: requeue with a small delay to avoid tight loop + time.sleep(0.1) + self.queue.put((endpoint, data)) + else: + printLog(endpoint, {"status": status, "send_data": result}) + printResponse(status, endpoint, result) def startHandler(self) -> None: - th_handler = Thread(target=self.handler) + th_handler = Thread(target=self.handler, name="main_handler") th_handler.daemon = True th_handler.start() + self._threads.append(th_handler) def start(self) -> None: - while self.main_loop: - time.sleep(1) + """Start receiver and handler threads.""" + self.startReceiver() + self.startHandler() - def stop(self) -> None: - self.main_loop = False + def stop(self, wait: float = 2.0) -> None: + """Signal threads to stop and wait for them to finish. + + Args: + wait: maximum seconds to wait for threads to join. + """ + self._stop_event.set() + # give threads a chance to exit + start = time.time() + for th in self._threads: + remaining = max(0.0, wait - (time.time() - start)) + th.join(timeout=remaining) # 外部から参照可能なインスタンスを提供 main_instance = Main(controller_instance=controller, mapping_data=mapping)