Merge branch 'transliteration' into develop
This commit is contained in:
@@ -915,12 +915,16 @@ class Controller:
|
||||
@staticmethod
|
||||
def setEnableConvertMessageToRomaji(*args, **kwargs) -> dict:
|
||||
if config.CONVERT_MESSAGE_TO_ROMAJI is False:
|
||||
if config.CONVERT_MESSAGE_TO_HIRAGANA is False:
|
||||
model.startTransliteration()
|
||||
config.CONVERT_MESSAGE_TO_ROMAJI = True
|
||||
return {"status":200, "result":config.CONVERT_MESSAGE_TO_ROMAJI}
|
||||
|
||||
@staticmethod
|
||||
def setDisableConvertMessageToRomaji(*args, **kwargs) -> dict:
|
||||
if config.CONVERT_MESSAGE_TO_ROMAJI is True:
|
||||
if config.CONVERT_MESSAGE_TO_HIRAGANA is False:
|
||||
model.stopTransliteration()
|
||||
config.CONVERT_MESSAGE_TO_ROMAJI = False
|
||||
return {"status":200, "result":config.CONVERT_MESSAGE_TO_ROMAJI}
|
||||
|
||||
@@ -931,12 +935,16 @@ class Controller:
|
||||
@staticmethod
|
||||
def setEnableConvertMessageToHiragana(*args, **kwargs) -> dict:
|
||||
if config.CONVERT_MESSAGE_TO_HIRAGANA is False:
|
||||
if config.CONVERT_MESSAGE_TO_ROMAJI is False:
|
||||
model.startTransliteration()
|
||||
config.CONVERT_MESSAGE_TO_HIRAGANA = True
|
||||
return {"status":200, "result":config.CONVERT_MESSAGE_TO_HIRAGANA}
|
||||
|
||||
@staticmethod
|
||||
def setDisableConvertMessageToHiragana(*args, **kwargs) -> dict:
|
||||
if config.CONVERT_MESSAGE_TO_HIRAGANA is True:
|
||||
if config.CONVERT_MESSAGE_TO_ROMAJI is False:
|
||||
model.stopTransliteration()
|
||||
config.CONVERT_MESSAGE_TO_HIRAGANA = False
|
||||
return {"status":200, "result":config.CONVERT_MESSAGE_TO_HIRAGANA}
|
||||
|
||||
@@ -2466,6 +2474,11 @@ class Controller:
|
||||
self.updateDownloadedWhisperModelWeight()
|
||||
self.updateTranscriptionEngine()
|
||||
|
||||
# set Transliteration status
|
||||
printLog("Set Transliteration")
|
||||
if config.CONVERT_MESSAGE_TO_ROMAJI is True or config.CONVERT_MESSAGE_TO_HIRAGANA is True:
|
||||
model.startTransliteration()
|
||||
|
||||
self.initializationProgress(3)
|
||||
|
||||
# set word filter
|
||||
|
||||
@@ -99,7 +99,7 @@ class Model:
|
||||
self.overlay_image = OverlayImage(config.PATH_LOCAL)
|
||||
self.mic_audio_queue = None
|
||||
self.mic_mute_status = None
|
||||
self.transliterator = Transliterator()
|
||||
self.transliterator = None
|
||||
self.watchdog = Watchdog(config.WATCHDOG_TIMEOUT, config.WATCHDOG_INTERVAL)
|
||||
self.osc_handler = OSCHandler(config.OSC_IP_ADDRESS, config.OSC_PORT)
|
||||
self.websocket_server = None
|
||||
@@ -277,6 +277,14 @@ class Model:
|
||||
self.previous_receive_message = message
|
||||
return repeat_flag
|
||||
|
||||
def startTransliteration(self):
|
||||
if self.transliterator is None:
|
||||
self.transliterator = Transliterator()
|
||||
|
||||
def stopTransliteration(self):
|
||||
if self.transliterator is not None:
|
||||
self.transliterator = None
|
||||
|
||||
def convertMessageToTransliteration(self, message: str, hiragana: bool=True, romaji: bool=True) -> str:
|
||||
if hiragana is False and romaji is False:
|
||||
return message
|
||||
@@ -287,6 +295,9 @@ class Model:
|
||||
if romaji:
|
||||
keys_to_keep.add("hepburn")
|
||||
|
||||
if self.transliterator is None:
|
||||
self.startTransliteration()
|
||||
|
||||
data_list = self.transliterator.analyze(message, use_macron=False)
|
||||
filtered_list = [
|
||||
{key: value for key, value in item.items() if key in keys_to_keep}
|
||||
|
||||
@@ -100,6 +100,9 @@ class Translator():
|
||||
|
||||
def translate(self, translator_name, source_language, target_language, target_country, message):
|
||||
try:
|
||||
if source_language == target_language:
|
||||
return message
|
||||
|
||||
result = ""
|
||||
source_language, target_language = self.getLanguageCode(translator_name, target_country, source_language, target_language)
|
||||
match translator_name:
|
||||
|
||||
@@ -0,0 +1,134 @@
|
||||
from typing import List, Dict
|
||||
import re
|
||||
|
||||
"""Contextual transliteration rules for tokenized results.
|
||||
|
||||
This module provides a compact rule engine that can modify token
|
||||
readings (kana) based on neighboring tokens. Rules are embedded in
|
||||
``DEFAULT_RULES`` to simplify packaging (no external JSON required).
|
||||
|
||||
Key points
|
||||
- Rules are applied in descending ``priority`` order.
|
||||
- Supported match modes: ``equals`` (exact match) and ``regex``.
|
||||
- ``direction`` chooses whether to inspect the next or previous token.
|
||||
- When a rule sets ``kana``, the engine overwrites ``kana`` and clears
|
||||
``hira``/``hepburn``; callers should recompute them after rules run.
|
||||
|
||||
The engine mutates the provided ``results`` list in-place and also
|
||||
returns it for convenience.
|
||||
"""
|
||||
DEFAULT_RULES = {
|
||||
"rules": [
|
||||
{
|
||||
"name": "nan_next_tdna",
|
||||
"target": "何",
|
||||
"match_mode": "equals",
|
||||
"direction": "next",
|
||||
"kana_set": list("タチツテトダヂヅデドナニヌネノ"),
|
||||
"on_true": {"kana": "ナン"},
|
||||
"on_false": {"kana": "ナニ"}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
|
||||
def apply_context_rules(results: List[Dict], use_macron: bool = False) -> List[Dict]:
|
||||
"""Apply contextual rewrite rules to `results`.
|
||||
|
||||
Parameters
|
||||
- results: list of token dicts produced by Transliterator.split_kanji_okurigana
|
||||
where each entry contains at least the keys: 'orig', 'kana', 'hira', 'hepburn'.
|
||||
- use_macron: passed through for compatibility; rules themselves don't use it
|
||||
|
||||
Returns
|
||||
- The (possibly modified) `results` list. The list is also modified in-place.
|
||||
|
||||
The engine supports 'equals' and 'regex' match modes, next/prev neighbor
|
||||
inspection, and simple actions that overwrite `kana` (caller must recalc
|
||||
`hira`/`hepburn` afterwards).
|
||||
"""
|
||||
|
||||
# prepare rules: sort by priority (desc) and precompile regex where provided
|
||||
raw_rules = DEFAULT_RULES.get("rules", [])
|
||||
rules = sorted(raw_rules, key=lambda r: r.get("priority", 0), reverse=True)
|
||||
for r in rules:
|
||||
if r.get("match_mode") == "regex" and r.get("pattern"):
|
||||
try:
|
||||
r["_re"] = re.compile(r["pattern"])
|
||||
except Exception:
|
||||
r["_re"] = None
|
||||
|
||||
i = 0
|
||||
n = len(results)
|
||||
while i < n:
|
||||
entry = results[i]
|
||||
orig = entry.get("orig", "")
|
||||
# skip tokens with empty orig (symbols, whitespace, etc.)
|
||||
if not orig:
|
||||
i += 1
|
||||
continue
|
||||
|
||||
for rule in rules:
|
||||
target = rule.get("target")
|
||||
mode = rule.get("match_mode", "equals")
|
||||
direction = rule.get("direction", "next")
|
||||
kana_set = set(rule.get("kana_set", []))
|
||||
on_true = rule.get("on_true", {})
|
||||
on_false = rule.get("on_false", {})
|
||||
|
||||
matched = False
|
||||
if mode == "equals" and orig == target:
|
||||
matched = True
|
||||
elif mode == "regex":
|
||||
cre = rule.get("_re")
|
||||
if cre and cre.search(orig):
|
||||
matched = True
|
||||
# regex or other modes can be added later
|
||||
|
||||
if not matched:
|
||||
continue
|
||||
|
||||
# decide neighbor token based on direction
|
||||
neighbor_entry = None
|
||||
if direction == "next":
|
||||
j = i + 1
|
||||
while j < n:
|
||||
if results[j].get("orig"):
|
||||
neighbor_entry = results[j]
|
||||
break
|
||||
j += 1
|
||||
elif direction == "prev":
|
||||
j = i - 1
|
||||
while j >= 0:
|
||||
if results[j].get("orig"):
|
||||
neighbor_entry = results[j]
|
||||
break
|
||||
j -= 1
|
||||
|
||||
condition = False
|
||||
if neighbor_entry:
|
||||
nk = neighbor_entry.get("kana", "")
|
||||
if nk:
|
||||
first = nk[0]
|
||||
if first in kana_set:
|
||||
condition = True
|
||||
else:
|
||||
# fallback to orig-first-char check
|
||||
fo = neighbor_entry.get("orig", "")[:1]
|
||||
if fo and 'ァ' <= fo <= 'ン' and fo in kana_set:
|
||||
condition = True
|
||||
|
||||
# Apply action: simple overwrite of kana/hira/hepburn for the matched token
|
||||
action = on_true if condition else on_false
|
||||
if "kana" in action:
|
||||
entry["kana"] = action["kana"]
|
||||
entry["hira"] = ""
|
||||
entry["hepburn"] = ""
|
||||
# once a rule applied, do not apply further rules to this token
|
||||
break
|
||||
|
||||
i += 1
|
||||
|
||||
# return the (possibly modified) results for convenience/pure-function style usage
|
||||
return results
|
||||
@@ -4,10 +4,14 @@ try:
|
||||
from .transliteration_kana_to_hepburn import katakana_to_hepburn
|
||||
except ImportError:
|
||||
from transliteration_kana_to_hepburn import katakana_to_hepburn
|
||||
try:
|
||||
from .transliteration_context_rules import apply_context_rules
|
||||
except ImportError:
|
||||
from transliteration_context_rules import apply_context_rules
|
||||
|
||||
class Transliterator:
|
||||
def __init__(self):
|
||||
self.tokenizer_obj = dictionary.Dictionary().create()
|
||||
self.tokenizer_obj = dictionary.Dictionary(dict_type="full").create()
|
||||
self.mode = tokenizer.Tokenizer.SplitMode.C
|
||||
|
||||
@staticmethod
|
||||
@@ -22,11 +26,25 @@ class Transliterator:
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def split_kanji_okurigana(surface: str, reading_kana: str):
|
||||
"""
|
||||
1語の表層形(surface)と読み(reading_kana)を
|
||||
[ {"orig":..., "kana":..., "hira":..., "hepburn":...}, ... ] に分割
|
||||
def split_kanji_okurigana(surface: str, reading_kana: str, use_macron: bool = True):
|
||||
"""Split a single surface word and its kana reading into parts.
|
||||
|
||||
Inputs:
|
||||
- surface: the surface form (may contain kanji + kana)
|
||||
- reading_kana: the katakana reading for the whole surface
|
||||
|
||||
Output:
|
||||
- a list of dicts: [{"orig": str, "kana": str, "hira": str, "hepburn": str}, ...]
|
||||
|
||||
Notes:
|
||||
- The function allocates portions of ``reading_kana`` to each contiguous
|
||||
kanji/non-kanji block in ``surface``. Allocation is heuristic: an
|
||||
initial allocation based on block length is used and any remainder is
|
||||
distributed left-to-right preferring kanji blocks.
|
||||
- This function is pure (no external side effects) and returns the
|
||||
constructed list.
|
||||
"""
|
||||
|
||||
result = []
|
||||
|
||||
# 表層を「漢字ブロック」と「非漢字ブロック」に分割
|
||||
@@ -46,55 +64,73 @@ class Transliterator:
|
||||
|
||||
# 読みを分配
|
||||
kana_left = reading_kana
|
||||
for i, (is_kan, part) in enumerate(blocks):
|
||||
if is_kan:
|
||||
# 漢字ブロックの処理
|
||||
if len(blocks) == 1:
|
||||
# 単一ブロック(全て漢字)の場合
|
||||
kana_for_kan = kana_left
|
||||
elif i == len(blocks) - 1:
|
||||
# 最後のブロック(漢字)の場合
|
||||
kana_for_kan = kana_left
|
||||
else:
|
||||
# 中間の漢字ブロックの場合
|
||||
# 後続の非漢字ブロックの文字数を計算
|
||||
remaining_non_kanji = sum(len(p) for is_k, p in blocks[i+1:] if not is_k)
|
||||
if remaining_non_kanji > 0 and len(kana_left) > remaining_non_kanji:
|
||||
kana_for_kan = kana_left[:-remaining_non_kanji]
|
||||
else:
|
||||
# 漢字1文字あたり最低1文字の読みを割り当て
|
||||
min_kana = len(part)
|
||||
kana_for_kan = kana_left[:max(min_kana, len(kana_left) - remaining_non_kanji)]
|
||||
|
||||
# 空の読みを避ける
|
||||
if not kana_for_kan and kana_left:
|
||||
kana_for_kan = kana_left[:1]
|
||||
|
||||
result.append(
|
||||
{
|
||||
"orig": part,
|
||||
"kana": kana_for_kan,
|
||||
"hira": Transliterator.kata_to_hira(kana_for_kan),
|
||||
"hepburn": katakana_to_hepburn(kana_for_kan, use_macron=True)
|
||||
}
|
||||
)
|
||||
kana_left = kana_left[len(kana_for_kan):]
|
||||
else:
|
||||
# 非漢字部分(送り仮名など)
|
||||
kana_for_okuri = kana_left[:len(part)]
|
||||
result.append(
|
||||
{
|
||||
"orig": part,
|
||||
"kana": kana_for_okuri,
|
||||
"hira": Transliterator.kata_to_hira(kana_for_okuri),
|
||||
"hepburn": katakana_to_hepburn(kana_for_okuri, use_macron=True)
|
||||
}
|
||||
)
|
||||
kana_left = kana_left[len(kana_for_okuri):]
|
||||
# We'll allocate kana to each block by initial guess = len(part) (characters)
|
||||
# and distribute any remaining kana left-to-right preferring kanji blocks.
|
||||
kana_len = len(kana_left)
|
||||
|
||||
# initial allocation per block
|
||||
allocs = [len(part) for _, part in blocks]
|
||||
allocated = sum(allocs)
|
||||
remaining = kana_len - allocated
|
||||
|
||||
# distribute extra kana to kanji blocks first (left-to-right)
|
||||
if remaining > 0:
|
||||
for idx, (is_kan, _) in enumerate(blocks):
|
||||
if remaining <= 0:
|
||||
break
|
||||
if is_kan:
|
||||
allocs[idx] += 1
|
||||
remaining -= 1
|
||||
# if still remaining, distribute to all blocks left-to-right
|
||||
idx = 0
|
||||
while remaining > 0 and len(blocks) > 0:
|
||||
allocs[idx] += 1
|
||||
remaining -= 1
|
||||
idx = (idx + 1) % len(blocks)
|
||||
|
||||
# if remaining < 0 (reading shorter than base), shrink allocations from right
|
||||
if remaining < 0:
|
||||
# remove from rightmost blocks as needed
|
||||
need = -remaining
|
||||
idx = len(blocks) - 1
|
||||
while need > 0 and idx >= 0:
|
||||
take = min(allocs[idx] - 1, need) if allocs[idx] > 1 else 0
|
||||
allocs[idx] -= take
|
||||
need -= take
|
||||
idx -= 1
|
||||
|
||||
# now slice kana_left according to allocs
|
||||
pos = 0
|
||||
for (is_kan, part), cnt in zip(blocks, allocs):
|
||||
kana_for_part = kana_left[pos:pos+cnt]
|
||||
pos += cnt
|
||||
result.append({
|
||||
"orig": part,
|
||||
"kana": kana_for_part,
|
||||
"hira": Transliterator.kata_to_hira(kana_for_part),
|
||||
"hepburn": katakana_to_hepburn(kana_for_part, use_macron=use_macron)
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
def analyze(self, text: str, use_macron: bool = True):
|
||||
def analyze(self, text: str, use_macron: bool = False):
|
||||
"""Tokenize ``text`` and produce per-subunit reading information.
|
||||
|
||||
Returns a list of dicts for each token/sub-part with keys:
|
||||
- orig: original surface string (one or more characters)
|
||||
- kana: katakana reading for this part (may be adapted by context rules)
|
||||
- hira: hiragana reading (derived from kana)
|
||||
- hepburn: Latin transcription (derived from kana)
|
||||
|
||||
Side-effects / notes:
|
||||
- The function calls ``apply_context_rules(results, use_macron=...)``
|
||||
which both mutates ``results`` in-place and returns it. This method
|
||||
safely accepts the returned list and then recalculates ``hira`` and
|
||||
``hepburn`` for entries whose ``kana`` was changed.
|
||||
- If rule application fails, analysis still returns the best-effort
|
||||
results.
|
||||
"""
|
||||
|
||||
tokens = self.tokenizer_obj.tokenize(text, self.mode)
|
||||
|
||||
results = []
|
||||
@@ -103,7 +139,7 @@ class Transliterator:
|
||||
reading = t.reading_form()
|
||||
pos = t.part_of_speech()
|
||||
|
||||
if pos and pos[0] in ["記号", "補助記号"]:
|
||||
if pos and pos[0] in ["記号", "補助記号", "空白"]:
|
||||
reading = surface
|
||||
|
||||
if surface == reading:
|
||||
@@ -125,75 +161,41 @@ class Transliterator:
|
||||
"hepburn": katakana_to_hepburn(reading, use_macron=use_macron)
|
||||
})
|
||||
else:
|
||||
# 複数文字の場合は文字種別で分割
|
||||
i = 0
|
||||
reading_pos = 0
|
||||
|
||||
while i < len(surface):
|
||||
char = surface[i]
|
||||
|
||||
if self.is_kanji(char):
|
||||
# 漢字の場合、連続する漢字をまとめて処理
|
||||
kanji_block = ""
|
||||
while i < len(surface) and self.is_kanji(surface[i]):
|
||||
kanji_block += surface[i]
|
||||
i += 1
|
||||
|
||||
# 漢字ブロックの読みを推定
|
||||
if i < len(surface):
|
||||
# 後に文字がある場合、送り仮名を考慮
|
||||
remaining_chars = len(surface) - i
|
||||
kanji_reading = reading[reading_pos:-remaining_chars] if remaining_chars > 0 else reading[reading_pos:]
|
||||
else:
|
||||
# 最後の漢字ブロックの場合
|
||||
kanji_reading = reading[reading_pos:]
|
||||
# 複数文字の場合は既存のユーティリティで分割
|
||||
parts = self.split_kanji_okurigana(surface, reading, use_macron=use_macron)
|
||||
results.extend(parts)
|
||||
|
||||
# 空の読みを避ける
|
||||
if not kanji_reading and reading_pos < len(reading):
|
||||
kanji_reading = reading[reading_pos:]
|
||||
if not kanji_reading and kanji_block:
|
||||
# 読みが空だが漢字ブロックがある場合、残りの読みを全て割り当てる
|
||||
kanji_reading = reading[reading_pos:]
|
||||
# 文脈ルールを適用(別ファイル)
|
||||
try:
|
||||
results = apply_context_rules(results, use_macron=use_macron) or results
|
||||
except Exception:
|
||||
# ルール適用で失敗しても解析結果は返す
|
||||
pass
|
||||
|
||||
# reading_posの更新を正確に行うために、割り当てられた読みの長さをチェック
|
||||
len_allocated_reading = len(kanji_reading)
|
||||
if reading_pos + len_allocated_reading > len(reading):
|
||||
len_allocated_reading = len(reading) - reading_pos
|
||||
|
||||
results.append({
|
||||
"orig": kanji_block,
|
||||
"kana": kanji_reading,
|
||||
"hira": self.kata_to_hira(kanji_reading),
|
||||
"hepburn": katakana_to_hepburn(kanji_reading, use_macron=use_macron)
|
||||
})
|
||||
reading_pos += len_allocated_reading
|
||||
else:
|
||||
# 非漢字の場合
|
||||
non_kanji_block = ""
|
||||
while i < len(surface) and not self.is_kanji(surface[i]):
|
||||
non_kanji_block += surface[i]
|
||||
i += 1
|
||||
|
||||
# 非漢字部分の読み(通常は文字数分、または残りの読みの分だけ)
|
||||
len_block = len(non_kanji_block)
|
||||
non_kanji_reading = reading[reading_pos:reading_pos + len_block]
|
||||
|
||||
# 割り当てられた読みの長さ
|
||||
len_allocated_reading = len(non_kanji_reading)
|
||||
|
||||
results.append({
|
||||
"orig": non_kanji_block,
|
||||
"kana": non_kanji_reading,
|
||||
"hira": self.kata_to_hira(non_kanji_reading),
|
||||
"hepburn": katakana_to_hepburn(non_kanji_reading, use_macron=use_macron)
|
||||
})
|
||||
reading_pos += len_allocated_reading
|
||||
# apply_context_rules が kana を書き換えた場合、hira と hepburn を再計算
|
||||
for entry in results:
|
||||
kana = entry.get("kana", "")
|
||||
if kana:
|
||||
entry["hira"] = self.kata_to_hira(kana)
|
||||
entry["hepburn"] = katakana_to_hepburn(kana, use_macron=use_macron)
|
||||
|
||||
return results
|
||||
|
||||
# --- テスト ---
|
||||
if __name__ == "__main__":
|
||||
import pprint
|
||||
test_cases = [
|
||||
"向こうへ行く",
|
||||
"行事を行う",
|
||||
"上がる",
|
||||
"上る",
|
||||
"入り込む",
|
||||
"何",
|
||||
"何が好き?",
|
||||
"何色が好き?",
|
||||
"何色ありますか?",
|
||||
"何語ですか?",
|
||||
"テーブルに色鉛筆は何色ありますか?"
|
||||
"美しい花を見る",
|
||||
"東京に行く",
|
||||
"漢字とカタカナの混在",
|
||||
@@ -217,4 +219,4 @@ if __name__ == "__main__":
|
||||
|
||||
transliterator = Transliterator()
|
||||
for case in test_cases:
|
||||
print(transliterator.analyze(case))
|
||||
pprint.pprint(transliterator.analyze(case), sort_dicts=False)
|
||||
Reference in New Issue
Block a user