👍️ [Update] pythonのメイン処理部分を移動/webui_mainloop.pyをビルドできるように修正

This commit is contained in:
misyaguziya
2024-07-27 01:30:36 +09:00
parent 7ce3bc9be9
commit 1be04cb571
21 changed files with 46 additions and 28 deletions

1164
src-python/config.py Normal file

File diff suppressed because it is too large Load Diff

1192
src-python/controller.py Normal file

File diff suppressed because it is too large Load Diff

33
src-python/main.py Normal file
View File

@@ -0,0 +1,33 @@
if __name__ == "__main__":
try:
import ctypes
ctypes.windll.shcore.SetProcessDpiAwareness(0)
from vrct_gui.splash_window import SplashWindow
splash = SplashWindow()
splash.showSplash()
from config import config
# version 2.2.0からweightフォルダをweightsに変更する
from utils import renameWeightFolder
renameWeightFolder(config.PATH_LOCAL)
from models.translation.translation_utils import downloadCTranslate2Weight
if config.USE_TRANSLATION_FEATURE is True:
downloadCTranslate2Weight(config.PATH_LOCAL, config.CTRANSLATE2_WEIGHT_TYPE, splash.updateDownloadProgress)
from models.transcription.transcription_whisper import downloadWhisperWeight
if config.USE_WHISPER_FEATURE is True:
downloadWhisperWeight(config.PATH_LOCAL, config.WHISPER_WEIGHT_TYPE, splash.updateDownloadProgress)
splash.toProgress(0)
import controller
controller.createMainWindow(splash)
splash.destroySplash()
controller.showMainWindow()
except Exception:
import traceback
with open('error.log', 'a') as f:
traceback.print_exc(file=f)

727
src-python/model.py Normal file
View File

@@ -0,0 +1,727 @@
import gc
import tempfile
from zipfile import ZipFile
from subprocess import Popen
from os import makedirs as os_makedirs
from os import path as os_path
from shutil import copyfile
from datetime import datetime
from logging import getLogger, FileHandler, Formatter, INFO
from time import sleep
from queue import Queue
from threading import Thread
from requests import get as requests_get
import webbrowser
from typing import Callable
from flashtext import KeywordProcessor
from models.translation.translation_translator import Translator
from models.transcription.transcription_utils import getInputDevices, getOutputDevices
from models.osc.osc_tools import sendTyping, sendMessage, receiveOscParameters, getOSCParameterValue
from models.transcription.transcription_recorder import SelectedMicEnergyAndAudioRecorder, SelectedSpeakerEnergyAndAudioRecorder
from models.transcription.transcription_recorder import SelectedMicEnergyRecorder, SelectedSpeakerEnergyRecorder
from models.transcription.transcription_transcriber import AudioTranscriber
from models.xsoverlay.notification import xsoverlayForVRCT
from models.translation.translation_languages import translation_lang
from models.transcription.transcription_languages import transcription_lang
from models.translation.translation_utils import checkCTranslate2Weight
from models.transcription.transcription_whisper import checkWhisperWeight
from models.overlay.overlay import Overlay
from models.overlay.overlay_image import OverlayImage
from config import config
class threadFnc(Thread):
def __init__(self, fnc, end_fnc=None, daemon=True, *args, **kwargs):
super(threadFnc, self).__init__(daemon=daemon, target=fnc, *args, **kwargs)
self.fnc = fnc
self.end_fnc = end_fnc
self.loop = True
self._pause = False
def stop(self):
self.loop = False
def pause(self):
self._pause = True
def resume(self):
self._pause = False
def run(self):
while self.loop:
self.fnc(*self._args, **self._kwargs)
while self._pause:
sleep(0.1)
if callable(self.end_fnc):
self.end_fnc()
return
class Model:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(Model, cls).__new__(cls)
cls._instance.init()
return cls._instance
def init(self):
self.logger = None
self.mic_print_transcript = None
self.mic_audio_recorder = None
self.mic_energy_recorder = None
self.mic_energy_plot_progressbar = None
self.speaker_print_transcript = None
self.speaker_audio_recorder = None
self.speaker_energy_recorder = None
self.speaker_energy_plot_progressbar = None
self.previous_send_message = ""
self.previous_receive_message = ""
self.translator = Translator()
self.keyword_processor = KeywordProcessor()
self.overlay = Overlay(
config.OVERLAY_SMALL_LOG_SETTINGS["x_pos"],
config.OVERLAY_SMALL_LOG_SETTINGS["y_pos"],
config.OVERLAY_SMALL_LOG_SETTINGS["z_pos"],
config.OVERLAY_SMALL_LOG_SETTINGS["x_rotation"],
config.OVERLAY_SMALL_LOG_SETTINGS["y_rotation"],
config.OVERLAY_SMALL_LOG_SETTINGS["x_rotation"],
config.OVERLAY_SMALL_LOG_SETTINGS["display_duration"],
config.OVERLAY_SMALL_LOG_SETTINGS["fadeout_duration"],
config.OVERLAY_SETTINGS["opacity"],
config.OVERLAY_SETTINGS["ui_scaling"],
)
self.overlay_image = OverlayImage()
self.pre_overlay_message = None
self.th_overlay = None
self.mic_audio_queue = None
self.mic_mute_status = None
self.mic_mute_status_check = None
def checkCTranslatorCTranslate2ModelWeight(self):
return checkCTranslate2Weight(config.PATH_LOCAL, config.CTRANSLATE2_WEIGHT_TYPE)
def changeTranslatorCTranslate2Model(self):
self.translator.changeCTranslate2Model(config.PATH_LOCAL, config.CTRANSLATE2_WEIGHT_TYPE)
def isLoadedCTranslate2Model(self):
return self.translator.isLoadedCTranslate2Model()
def checkTranscriptionWhisperModelWeight(self):
return checkWhisperWeight(config.PATH_LOCAL, config.WHISPER_WEIGHT_TYPE)
def resetKeywordProcessor(self):
del self.keyword_processor
self.keyword_processor = KeywordProcessor()
def authenticationTranslatorDeepLAuthKey(self, auth_key):
result = self.translator.authenticationDeepLAuthKey(auth_key)
return result
def startLogger(self):
os_makedirs(config.PATH_LOGS, exist_ok=True)
logger = getLogger()
logger.setLevel(INFO)
file_name = os_path.join(config.PATH_LOGS, f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.log")
file_handler = FileHandler(file_name, encoding="utf-8", delay=True)
formatter = Formatter("[%(asctime)s] %(message)s")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
self.logger = logger
self.logger.disabled = False
def stopLogger(self):
self.logger.disabled = True
self.logger = None
def getListLanguageAndCountry(self):
transcription_langs = list(transcription_lang.keys())
translation_langs = []
for tl_key in translation_lang.keys():
for lang in translation_lang[tl_key]["source"]:
translation_langs.append(lang)
translation_langs = list(set(translation_langs))
supported_langs = list(filter(lambda x: x in transcription_langs, translation_langs))
languages = []
for language in supported_langs:
for country in transcription_lang[language]:
languages.append(
{
"language" : language,
"country" : country,
}
)
languages = sorted(languages, key=lambda x: x['language'])
return languages
def findTranslationEngines(self, source_lang, target_lang):
compatible_engines = []
for engine in list(translation_lang.keys()):
languages = translation_lang.get(engine, {}).get("source", {})
if source_lang in languages and target_lang in languages:
compatible_engines.append(engine)
if "DeepL_API" in compatible_engines:
if config.AUTH_KEYS["DeepL_API"] is None:
compatible_engines.remove('DeepL_API')
return compatible_engines
def getTranslate(self, translator_name, source_language, target_language, target_country, message):
success_flag = False
translation = self.translator.translate(
translator_name=translator_name,
source_language=source_language,
target_language=target_language,
target_country=target_country,
message=message
)
# 翻訳失敗時のフェールセーフ処理
if isinstance(translation, str):
success_flag = True
else:
while True:
translation = self.translator.translate(
translator_name="CTranslate2",
source_language=source_language,
target_language=target_language,
target_country=target_country,
message=message
)
if translation is not False:
break
sleep(0.1)
return translation, success_flag
def getInputTranslate(self, message):
translator_name=config.CHOICE_INPUT_TRANSLATOR
source_language=config.SOURCE_LANGUAGE
target_language=config.TARGET_LANGUAGE
target_country = config.TARGET_COUNTRY
translation, success_flag = self.getTranslate(
translator_name,
source_language,
target_language,
target_country,
message
)
return translation, success_flag
def getOutputTranslate(self, message):
translator_name=config.CHOICE_OUTPUT_TRANSLATOR
source_language=config.TARGET_LANGUAGE
target_language=config.SOURCE_LANGUAGE
target_country=config.SOURCE_COUNTRY
translation, success_flag = self.getTranslate(
translator_name,
source_language,
target_language,
target_country,
message
)
return translation, success_flag
def addKeywords(self):
for f in config.INPUT_MIC_WORD_FILTER:
self.keyword_processor.add_keyword(f)
def checkKeywords(self, message):
return len(self.keyword_processor.extract_keywords(message)) != 0
def detectRepeatSendMessage(self, message):
repeat_flag = False
if self.previous_send_message == message:
repeat_flag = True
self.previous_send_message = message
return repeat_flag
def detectRepeatReceiveMessage(self, message):
repeat_flag = False
if self.previous_receive_message == message:
repeat_flag = True
self.previous_receive_message = message
return repeat_flag
@staticmethod
def oscStartSendTyping():
sendTyping(True, config.OSC_IP_ADDRESS, config.OSC_PORT)
@staticmethod
def oscStopSendTyping():
sendTyping(False, config.OSC_IP_ADDRESS, config.OSC_PORT)
@staticmethod
def oscSendMessage(message):
sendMessage(message, config.OSC_IP_ADDRESS, config.OSC_PORT)
@staticmethod
def getMuteSelfStatus():
return getOSCParameterValue(address="/avatar/parameters/MuteSelf")
def startCheckMuteSelfStatus(self):
def checkMuteSelfStatus():
if self.mic_mute_status is not None:
self.changeMicTranscriptStatus()
self.stopCheckMuteSelfStatus()
status = self.getMuteSelfStatus()
if status is not None:
self.mic_mute_status = status
self.changeMicTranscriptStatus()
self.stopCheckMuteSelfStatus()
if not isinstance(self.mic_mute_status_check, threadFnc):
self.mic_mute_status_check = threadFnc(checkMuteSelfStatus)
self.mic_mute_status_check.daemon = True
self.mic_mute_status_check.start()
def stopCheckMuteSelfStatus(self):
if isinstance(self.mic_mute_status_check, threadFnc):
self.mic_mute_status_check.stop()
self.mic_mute_status_check = None
def startReceiveOSC(self):
osc_parameter_prefix = "/avatar/parameters/"
param_MuteSelf = "MuteSelf"
def change_handler_mute(address, osc_arguments):
if osc_arguments is True and self.mic_mute_status is False:
self.mic_mute_status = osc_arguments
self.changeMicTranscriptStatus()
elif osc_arguments is False and self.mic_mute_status is True:
self.mic_mute_status = osc_arguments
self.changeMicTranscriptStatus()
dict_filter_and_target = {
osc_parameter_prefix + param_MuteSelf: change_handler_mute,
}
th_osc_server = Thread(target=receiveOscParameters, args=(dict_filter_and_target,))
th_osc_server.daemon = True
th_osc_server.start()
@staticmethod
def checkSoftwareUpdated():
# check update
update_flag = False
response = requests_get(config.GITHUB_URL)
new_version = response.json()["name"]
if new_version != config.VERSION:
update_flag = True
print("software version", "now:", config.VERSION, "new:", new_version)
return update_flag
@staticmethod
def updateSoftware(restart:bool=True, func=None):
def updateSoftwareTask():
filename = 'VRCT.zip'
program_name = 'VRCT.exe'
folder_name = '_internal'
tmp_directory_name = 'tmp'
batch_name = 'update.bat'
current_directory = config.PATH_LOCAL
try:
res = requests_get(config.GITHUB_URL)
assets = res.json()['assets']
url = [i["browser_download_url"] for i in assets if i["name"] == filename][0]
with tempfile.TemporaryDirectory() as tmp_path:
res = requests_get(url, stream=True)
file_size = int(res.headers.get('content-length', 0))
total_chunk = 0
with open(os_path.join(tmp_path, filename), 'wb') as file:
for chunk in res.iter_content(chunk_size=1024*5):
file.write(chunk)
total_chunk += len(chunk)
if isinstance(func, Callable):
func(progress=total_chunk/file_size, progress_type="downloading")
print(f"downloaded {total_chunk}/{file_size}")
with ZipFile(os_path.join(tmp_path, filename)) as zf:
total_files = len(zf.infolist())
extracted_files = 0
for file_info in zf.infolist():
extracted_files += 1
zf.extract(file_info, os_path.join(current_directory, tmp_directory_name))
if isinstance(func, Callable):
func(progress=extracted_files/total_files, progress_type="extracting")
print(f"extracted {extracted_files}/{total_files}")
copyfile(os_path.join(current_directory, folder_name, "batch", batch_name), os_path.join(current_directory, batch_name))
command = [os_path.join(current_directory, batch_name), program_name, folder_name, tmp_directory_name, str(restart)]
Popen(command, cwd=current_directory)
except Exception:
import traceback
with open('error.log', 'a') as f:
traceback.print_exc(file=f)
webbrowser.open(config.BOOTH_URL, new=2, autoraise=True)
th_update_software = Thread(target=updateSoftwareTask)
th_update_software.daemon = True
th_update_software.start()
@staticmethod
def reStartSoftware():
program_name = 'VRCT.exe'
folder_name = '_internal'
batch_name = 'restart.bat'
current_directory = config.PATH_LOCAL
copyfile(os_path.join(current_directory, folder_name, "batch", batch_name), os_path.join(current_directory, batch_name))
command = [os_path.join(current_directory, batch_name), program_name]
Popen(command, cwd=current_directory)
@staticmethod
def getListInputHost():
return [host for host in getInputDevices().keys()]
@staticmethod
def getInputDefaultDevice():
return getInputDevices().get(config.CHOICE_MIC_HOST, [{"name": "NoDevice"}])[0]["name"]
@staticmethod
def getListInputDevice():
return [device["name"] for device in getInputDevices().get(config.CHOICE_MIC_HOST, [{"name": "NoDevice"}])]
@staticmethod
def getListOutputDevice():
return [device["name"] for device in getOutputDevices()]
def startMicTranscript(self, fnc):
mic_device_list = getInputDevices().get(config.CHOICE_MIC_HOST, [{"name": "NoDevice"}])
choice_mic_device = [device for device in mic_device_list if device["name"] == config.CHOICE_MIC_DEVICE]
if len(choice_mic_device) == 0:
return False
self.mic_audio_queue = Queue()
# self.mic_energy_queue = Queue()
mic_device = choice_mic_device[0]
record_timeout = config.INPUT_MIC_RECORD_TIMEOUT
phrase_timeout = config.INPUT_MIC_PHRASE_TIMEOUT
if record_timeout > phrase_timeout:
record_timeout = phrase_timeout
self.mic_audio_recorder = SelectedMicEnergyAndAudioRecorder(
device=mic_device,
energy_threshold=config.INPUT_MIC_ENERGY_THRESHOLD,
dynamic_energy_threshold=config.INPUT_MIC_DYNAMIC_ENERGY_THRESHOLD,
record_timeout=record_timeout,
)
# self.mic_audio_recorder.recordIntoQueue(self.mic_audio_queue, mic_energy_queue)
self.mic_audio_recorder.recordIntoQueue(self.mic_audio_queue, None)
self.mic_transcriber = AudioTranscriber(
speaker=False,
source=self.mic_audio_recorder.source,
phrase_timeout=phrase_timeout,
max_phrases=config.INPUT_MIC_MAX_PHRASES,
transcription_engine=config.SELECTED_TRANSCRIPTION_ENGINE,
root=config.PATH_LOCAL,
whisper_weight_type=config.WHISPER_WEIGHT_TYPE,
)
def sendMicTranscript():
try:
res = self.mic_transcriber.transcribeAudioQueue(
self.mic_audio_queue,
config.SOURCE_LANGUAGE,
config.SOURCE_COUNTRY,
config.INPUT_MIC_AVG_LOGPROB,
config.INPUT_MIC_NO_SPEECH_PROB
)
if res:
message = self.mic_transcriber.getTranscript()
fnc(message)
except Exception:
pass
def endMicTranscript():
while not self.mic_audio_queue.empty():
self.mic_audio_queue.get()
# while not self.mic_energy_queue.empty():
# self.mic_energy_queue.get()
del self.mic_transcriber
gc.collect()
# def sendMicEnergy():
# if mic_energy_queue.empty() is False:
# energy = mic_energy_queue.get()
# # print("mic energy:", energy)
# try:
# fnc(energy)
# except Exception:
# pass
# sleep(0.01)
self.mic_print_transcript = threadFnc(sendMicTranscript, end_fnc=endMicTranscript)
self.mic_print_transcript.daemon = True
self.mic_print_transcript.start()
# self.mic_get_energy = threadFnc(sendMicEnergy)
# self.mic_get_energy.daemon = True
# self.mic_get_energy.start()
self.changeMicTranscriptStatus()
def resumeMicTranscript(self):
# キューをクリア
if isinstance(self.mic_audio_queue, Queue):
while not self.mic_audio_queue.empty():
self.mic_audio_queue.get()
# 文字起こしを再開
# if isinstance(self.mic_print_transcript, threadFnc):
# self.mic_print_transcript.resume()
# 音声のレコードを再開
if isinstance(self.mic_audio_recorder, SelectedMicEnergyAndAudioRecorder):
self.mic_audio_recorder.resume()
def pauseMicTranscript(self):
# 文字起こしを一時停止
# if isinstance(self.mic_print_transcript, threadFnc):
# self.mic_print_transcript.pause()
# 音声のレコードを一時停止
if isinstance(self.mic_audio_recorder, SelectedMicEnergyAndAudioRecorder):
self.mic_audio_recorder.pause()
def changeMicTranscriptStatus(self):
if config.ENABLE_VRC_MIC_MUTE_SYNC is True:
if self.mic_mute_status is True:
self.pauseMicTranscript()
elif self.mic_mute_status is False:
self.resumeMicTranscript()
else:
pass
else:
self.resumeMicTranscript()
def stopMicTranscript(self):
if isinstance(self.mic_print_transcript, threadFnc):
self.mic_print_transcript.stop()
self.mic_print_transcript.join()
self.mic_print_transcript = None
if isinstance(self.mic_audio_recorder, SelectedMicEnergyAndAudioRecorder):
self.mic_audio_recorder.resume()
self.mic_audio_recorder.stop()
self.mic_audio_recorder = None
# if isinstance(self.mic_get_energy, threadFnc):
# self.mic_get_energy.stop()
# self.mic_get_energy = None
def startCheckMicEnergy(self, fnc, end_fnc, error_fnc=None):
mic_device_list = getInputDevices().get(config.CHOICE_MIC_HOST, [{"name": "NoDevice"}])
choice_mic_device = [device for device in mic_device_list if device["name"] == config.CHOICE_MIC_DEVICE]
if len(choice_mic_device) == 0:
try:
error_fnc()
except Exception:
pass
return
def sendMicEnergy():
if mic_energy_queue.empty() is False:
energy = mic_energy_queue.get()
try:
fnc(energy)
except Exception:
pass
sleep(0.01)
mic_energy_queue = Queue()
mic_device = choice_mic_device[0]
self.mic_energy_recorder = SelectedMicEnergyRecorder(mic_device)
self.mic_energy_recorder.recordIntoQueue(mic_energy_queue)
self.mic_energy_plot_progressbar = threadFnc(sendMicEnergy, end_fnc=end_fnc)
self.mic_energy_plot_progressbar.daemon = True
self.mic_energy_plot_progressbar.start()
def stopCheckMicEnergy(self):
if isinstance(self.mic_energy_plot_progressbar, threadFnc):
self.mic_energy_plot_progressbar.stop()
self.mic_energy_plot_progressbar = None
if isinstance(self.mic_energy_recorder, SelectedMicEnergyRecorder):
self.mic_energy_recorder.stop()
self.mic_energy_recorder = None
def startSpeakerTranscript(self, fnc, error_fnc=None):
speaker_device_list = getOutputDevices()
choice_speaker_device = [device for device in speaker_device_list if device["name"] == config.CHOICE_SPEAKER_DEVICE]
if len(choice_speaker_device) == 0:
try:
error_fnc()
except Exception:
pass
return
speaker_audio_queue = Queue()
# speaker_energy_queue = Queue()
speaker_device = choice_speaker_device[0]
record_timeout = config.INPUT_SPEAKER_RECORD_TIMEOUT
phrase_timeout = config.INPUT_SPEAKER_PHRASE_TIMEOUT
if record_timeout > phrase_timeout:
record_timeout = phrase_timeout
self.speaker_audio_recorder = SelectedSpeakerEnergyAndAudioRecorder(
device=speaker_device,
energy_threshold=config.INPUT_SPEAKER_ENERGY_THRESHOLD,
dynamic_energy_threshold=config.INPUT_SPEAKER_DYNAMIC_ENERGY_THRESHOLD,
record_timeout=record_timeout,
)
# self.speaker_audio_recorder.recordIntoQueue(speaker_audio_queue, speaker_energy_queue)
self.speaker_audio_recorder.recordIntoQueue(speaker_audio_queue, None)
self.speaker_transcriber = AudioTranscriber(
speaker=True,
source=self.speaker_audio_recorder.source,
phrase_timeout=phrase_timeout,
max_phrases=config.INPUT_SPEAKER_MAX_PHRASES,
transcription_engine=config.SELECTED_TRANSCRIPTION_ENGINE,
root=config.PATH_LOCAL,
whisper_weight_type=config.WHISPER_WEIGHT_TYPE,
)
def sendSpeakerTranscript():
try:
res = self.speaker_transcriber.transcribeAudioQueue(
speaker_audio_queue,
config.TARGET_LANGUAGE,
config.TARGET_COUNTRY,
config.INPUT_SPEAKER_AVG_LOGPROB,
config.INPUT_SPEAKER_NO_SPEECH_PROB
)
if res:
message = self.speaker_transcriber.getTranscript()
fnc(message)
except Exception:
pass
def endSpeakerTranscript():
speaker_audio_queue.queue.clear()
# speaker_energy_queue.queue.clear()
del self.speaker_transcriber
gc.collect()
# def sendSpeakerEnergy():
# if speaker_energy_queue.empty() is False:
# energy = speaker_energy_queue.get()
# # print("speaker energy:", energy)
# try:
# fnc(energy)
# except Exception:
# pass
# sleep(0.01)
self.speaker_print_transcript = threadFnc(sendSpeakerTranscript, end_fnc=endSpeakerTranscript)
self.speaker_print_transcript.daemon = True
self.speaker_print_transcript.start()
# self.speaker_get_energy = threadFnc(sendSpeakerEnergy)
# self.speaker_get_energy.daemon = True
# self.speaker_get_energy.start()
def stopSpeakerTranscript(self):
if isinstance(self.speaker_print_transcript, threadFnc):
self.speaker_print_transcript.stop()
self.speaker_print_transcript.join()
self.speaker_print_transcript = None
if isinstance(self.speaker_audio_recorder, SelectedSpeakerEnergyAndAudioRecorder):
self.speaker_audio_recorder.stop()
self.speaker_audio_recorder = None
# if isinstance(self.speaker_get_energy, threadFnc):
# self.speaker_get_energy.stop()
# self.speaker_get_energy = None
def startCheckSpeakerEnergy(self, fnc, end_fnc, error_fnc=None):
speaker_device_list = getOutputDevices()
choice_speaker_device = [device for device in speaker_device_list if device["name"] == config.CHOICE_SPEAKER_DEVICE]
if len(choice_speaker_device) == 0:
try:
error_fnc()
except Exception:
pass
return
def sendSpeakerEnergy():
if speaker_energy_queue.empty() is False:
energy = speaker_energy_queue.get()
try:
fnc(energy)
except Exception:
pass
sleep(0.01)
speaker_energy_queue = Queue()
speaker_device = choice_speaker_device[0]
self.speaker_energy_recorder = SelectedSpeakerEnergyRecorder(speaker_device)
self.speaker_energy_recorder.recordIntoQueue(speaker_energy_queue)
self.speaker_energy_plot_progressbar = threadFnc(sendSpeakerEnergy, end_fnc=end_fnc)
self.speaker_energy_plot_progressbar.daemon = True
self.speaker_energy_plot_progressbar.start()
def stopCheckSpeakerEnergy(self):
if isinstance(self.speaker_energy_plot_progressbar, threadFnc):
self.speaker_energy_plot_progressbar.stop()
self.speaker_energy_plot_progressbar = None
if isinstance(self.speaker_energy_recorder, SelectedSpeakerEnergyRecorder):
self.speaker_energy_recorder.stop()
self.speaker_energy_recorder = None
def notificationXSOverlay(self, message):
xsoverlayForVRCT(content=f"{message}")
def createOverlayImageShort(self, message, translation):
your_language = config.TARGET_LANGUAGE
target_language = config.SOURCE_LANGUAGE
ui_type = config.OVERLAY_UI_TYPE
self.pre_overlay_message = {
"message" : message,
"your_language" : your_language,
"translation" : translation,
"target_language" : target_language,
"ui_type" : ui_type,
}
return self.overlay_image.createOverlayImageShort(message, your_language, translation, target_language, ui_type)
# def createOverlayImageLong(self, message_type, message, translation):
# your_language = config.TARGET_LANGUAGE if message_type == "receive" else config.SOURCE_LANGUAGE
# target_language = config.SOURCE_LANGUAGE if message_type == "receive" else config.TARGET_LANGUAGE
# return self.overlay_image.create_overlay_image_long(message_type, message, your_language, translation, target_language)
def clearOverlayImage(self):
self.overlay.clearImage()
def updateOverlay(self, img):
self.overlay.updateImage(img)
def startOverlay(self):
self.overlay.startOverlay()
def updateOverlayPosition(self):
self.overlay.updatePosition(
config.OVERLAY_SMALL_LOG_SETTINGS["x_pos"],
config.OVERLAY_SMALL_LOG_SETTINGS["y_pos"],
config.OVERLAY_SMALL_LOG_SETTINGS["z_pos"],
config.OVERLAY_SMALL_LOG_SETTINGS["x_rotation"],
config.OVERLAY_SMALL_LOG_SETTINGS["y_rotation"],
config.OVERLAY_SMALL_LOG_SETTINGS["z_rotation"],
)
def updateOverlayTimes(self):
display_duration = config.OVERLAY_SMALL_LOG_SETTINGS["display_duration"]
self.overlay.updateDisplayDuration(display_duration)
fadeout_duration = config.OVERLAY_SMALL_LOG_SETTINGS["fadeout_duration"]
self.overlay.updateFadeoutDuration(fadeout_duration)
def updateOverlayImageOpacity(self):
opacity = config.OVERLAY_SETTINGS["opacity"]
self.overlay.updateOpacity(opacity, with_fade=True)
def updateOverlayImageUiScaling(self):
ui_scaling = config.OVERLAY_SETTINGS["ui_scaling"]
self.overlay.updateUiScaling(ui_scaling)
def shutdownOverlay(self):
self.overlay.shutdownOverlay()
model = Model()

View File

@@ -0,0 +1,103 @@
from time import sleep
from pythonosc import osc_message_builder
from pythonosc import udp_client
from pythonosc import dispatcher
from pythonosc import osc_server
from tinyoscquery.queryservice import OSCQueryService
from tinyoscquery.query import OSCQueryBrowser, OSCQueryClient
from tinyoscquery.utility import get_open_udp_port, get_open_tcp_port
# send OSC message typing
def sendTyping(flag=False, ip_address="127.0.0.1", port=9000):
typing = osc_message_builder.OscMessageBuilder(address="/chatbox/typing")
typing.add_arg(flag)
b_typing = typing.build()
client = udp_client.SimpleUDPClient(ip_address, port)
client.send(b_typing)
# send OSC message
def sendMessage(message=None, ip_address="127.0.0.1", port=9000):
if message is not None:
msg = osc_message_builder.OscMessageBuilder(address="/chatbox/input")
msg.add_arg(f"{message}")
msg.add_arg(True)
msg.add_arg(True)
b_msg = msg.build()
client = udp_client.SimpleUDPClient(ip_address, port)
client.send(b_msg)
def sendTestAction(ip_address="127.0.0.1", port=9000):
client = udp_client.SimpleUDPClient(ip_address, port)
client.send_message("/input/Vertical", 1)
sleep(0.01)
client.send_message("/input/Vertical", False)
# send Input Voice
def sendInputVoice(flag=False, ip_address="127.0.0.1", port=9000):
input_voice = osc_message_builder.OscMessageBuilder(address="/input/Voice")
input_voice.add_arg(flag)
b_input_voice = input_voice.build()
client = udp_client.SimpleUDPClient(ip_address, port)
client.send(b_input_voice)
def sendChangeVoice(ip_address="127.0.0.1", port=9000):
sendInputVoice(flag=0, ip_address=ip_address, port=port)
sleep(0.05)
sendInputVoice(flag=1, ip_address=ip_address, port=port)
sleep(0.05)
sendInputVoice(flag=0, ip_address=ip_address, port=port)
sleep(0.05)
def getOSCParameterValue(address, server_name="VRChat-Client"):
value = None
try:
browser = OSCQueryBrowser()
sleep(1)
service = browser.find_service_by_name(server_name)
if service is not None:
oscq = OSCQueryClient(service)
mute_self_node = oscq.query_node(address)
value = mute_self_node.value[0]
browser.zc.close()
browser.browser.cancel()
except Exception:
pass
return value
def receiveOscParameters(dict_filter_and_target, ip_address="127.0.0.1", title="VRCT"):
osc_port = get_open_udp_port()
http_port = get_open_tcp_port()
osc_dispatcher = dispatcher.Dispatcher()
for filter, target in dict_filter_and_target.items():
osc_dispatcher.map(filter, target)
osc_udp_server = osc_server.ThreadingOSCUDPServer((ip_address, osc_port), osc_dispatcher)
osc_client = OSCQueryService(title, http_port, osc_port)
for filter, target in dict_filter_and_target.items():
osc_client.advertise_endpoint(filter)
osc_udp_server.serve_forever()
if __name__ == "__main__":
osc_parameter_prefix = "/avatar/parameters/"
osc_avatar_change_path = "/avatar/change"
param_MuteSelf = "MuteSelf"
param_Voice = "Voice"
def print_handler_all(address, *args):
print(f"all {address}: {args}")
def print_handler_muteself(address, *args):
print(f"muteself {address}: {args}")
def print_handler_voice(address, *args):
print(f"voice {address}: {args}")
dict_filter_and_target = {
# osc_parameter_prefix + "*": print_handler_all,
osc_parameter_prefix + param_MuteSelf: print_handler_muteself,
osc_parameter_prefix + param_Voice: print_handler_voice,
}
receiveOscParameters(dict_filter_and_target)

View File

@@ -0,0 +1,304 @@
import os
import ctypes
import time
from psutil import process_iter
from threading import Thread
import openvr
import numpy as np
from PIL import Image
try:
from . import overlay_utils as utils
except ImportError:
import overlay_utils as utils
def mat34Id(array):
arr = openvr.HmdMatrix34_t()
for i in range(3):
for j in range(4):
arr[i][j] = array[i][j]
return arr
def getBaseMatrix(x_pos, y_pos, z_pos, x_rotation, y_rotation, z_rotation):
arr = np.zeros((3, 4))
rot = utils.euler_to_rotation_matrix((x_rotation, y_rotation, z_rotation))
for i in range(3):
for j in range(3):
arr[i][j] = rot[i][j]
arr[0][3] = x_pos * z_pos
arr[1][3] = y_pos * z_pos
arr[2][3] = - z_pos
return arr
def getHMDBaseMatrix():
x_pos = 0.0
y_pos = -0.4
z_pos = 1.0
x_rotation = 0.0
y_rotation = 0.0
z_rotation = 0.0
arr = getBaseMatrix(x_pos, y_pos, z_pos, x_rotation, y_rotation, z_rotation)
return arr
def getLeftHandBaseMatrix():
x_pos = 0.0
y_pos = -0.06
z_pos = -0.14
x_rotation = -62.0
y_rotation = 154.0
z_rotation = 71.0
arr = getBaseMatrix(x_pos, y_pos, z_pos, x_rotation, y_rotation, z_rotation)
return arr
def getRightHandBaseMatrix():
x_pos = 0.0
y_pos = -0.06
z_pos = -0.14
x_rotation = -62.0
y_rotation = -154.0
z_rotation = -71.0
arr = getBaseMatrix(x_pos, y_pos, z_pos, x_rotation, y_rotation, z_rotation)
return arr
class Overlay:
def __init__(self, x_pos, y_pos, z_pos, x_rotation, y_rotation, z_rotation, display_duration, fadeout_duration, opacity, ui_scaling):
self.initialized = False
settings = {
"color": [1, 1, 1],
"opacity": opacity,
"x_pos": x_pos,
"y_pos": y_pos,
"z_pos": z_pos,
"x_rotation": x_rotation,
"y_rotation": y_rotation,
"z_rotation": z_rotation,
"display_duration": display_duration,
"fadeout_duration": fadeout_duration,
"ui_scaling": ui_scaling,
}
self.settings = settings
self.system = None
self.overlay = None
self.handle = None
self.lastUpdate = time.monotonic()
self.thread_overlay = None
self.fadeRatio = 1
self.loop = True
def init(self):
try:
self.system = openvr.init(openvr.VRApplication_Background)
self.overlay = openvr.IVROverlay()
self.overlay_system = openvr.IVRSystem()
self.handle = self.overlay.createOverlay("Overlay_Speaker2log", "SOverlay_Speaker2log_UI")
self.overlay.showOverlay(self.handle)
self.initialized = True
self.updateImage(Image.new("RGBA", (1, 1), (0, 0, 0, 0)))
self.updateColor(self.settings["color"])
self.updateOpacity(self.settings["opacity"])
self.updateUiScaling(self.settings["ui_scaling"])
self.updatePosition(
self.settings["x_pos"],
self.settings["y_pos"],
self.settings["z_pos"],
self.settings["x_rotation"],
self.settings["y_rotation"],
self.settings["z_rotation"],
)
except Exception as e:
print("Could not initialise OpenVR", e)
def updateImage(self, img):
if self.initialized is True:
width, height = img.size
img = img.tobytes()
img = (ctypes.c_char * len(img)).from_buffer_copy(img)
self.overlay.setOverlayRaw(self.handle, img, width, height, 4)
self.updateOpacity(self.settings["opacity"])
self.lastUpdate = time.monotonic()
def clearImage(self):
if self.initialized is True:
self.updateImage(Image.new("RGBA", (1, 1), (0, 0, 0, 0)))
def updateColor(self, col):
"""
col is a 3-tuple representing (r, g, b)
"""
self.settings["color"] = col
if self.initialized is True:
r, g, b = self.settings["color"]
self.overlay.setOverlayColor(self.handle, r, g, b)
def updateOpacity(self, opacity, with_fade=False):
self.settings["opacity"] = opacity
if self.initialized is True:
if with_fade is True:
if self.fadeRatio > 0:
self.overlay.setOverlayAlpha(self.handle, self.fadeRatio * self.settings["opacity"])
else:
self.overlay.setOverlayAlpha(self.handle, self.settings["opacity"])
def updateUiScaling(self, ui_scaling):
self.settings["ui_scaling"] = ui_scaling
if self.initialized is True:
self.overlay.setOverlayWidthInMeters(self.handle, self.settings["ui_scaling"])
def updatePosition(self, x_pos, y_pos, z_pos, x_rotation, y_rotation, z_rotation, tracker="HMD"):
"""
x_pos, y_pos, z_pos are floats representing the position of overlay
x_rotation, y_rotation, z_rotation are floats representing the rotation of overlay
tracker is a string representing the tracker to use ("HMD", "LeftHand", "RightHand")
"""
self.settings["x_pos"] = x_pos
self.settings["y_pos"] = y_pos
self.settings["z_pos"] = z_pos
self.settings["x_rotation"] = x_rotation
self.settings["y_rotation"] = y_rotation
self.settings["z_rotation"] = z_rotation
match tracker:
case "HMD":
base_matrix = getHMDBaseMatrix()
trackerIndex = openvr.k_unTrackedDeviceIndex_Hmd
case "LeftHand":
base_matrix = getLeftHandBaseMatrix()
trackerIndex = self.overlay_system.getTrackedDeviceIndexForControllerRole(openvr.TrackedControllerRole_LeftHand)
case "RightHand":
base_matrix = getRightHandBaseMatrix()
trackerIndex = self.overlay_system.getTrackedDeviceIndexForControllerRole(openvr.TrackedControllerRole_RightHand)
case _:
base_matrix = getHMDBaseMatrix()
trackerIndex = openvr.k_unTrackedDeviceIndex_Hmd
translation = (self.settings["x_pos"], self.settings["y_pos"], - self.settings["z_pos"])
rotation = (self.settings["x_rotation"], self.settings["y_rotation"], self.settings["z_rotation"])
transform = utils.transform_matrix(base_matrix, translation, rotation)
self.transform = mat34Id(transform)
if self.initialized is True:
self.overlay.setOverlayTransformTrackedDeviceRelative(
self.handle,
trackerIndex,
self.transform
)
def updateDisplayDuration(self, display_duration):
self.settings["display_duration"] = display_duration
def updateFadeoutDuration(self, fadeout_duration):
self.settings["fadeout_duration"] = fadeout_duration
def checkActive(self):
try:
if self.system is not None and self.initialized is True:
new_event = openvr.VREvent_t()
while self.system.pollNextEvent(new_event):
if new_event.eventType == openvr.VREvent_Quit:
return False
return True
except Exception as e:
print("Could not check SteamVR running")
print(e)
return False
def evaluateOpacityFade(self, lastUpdate, currentTime):
if (currentTime - lastUpdate) > self.settings["display_duration"]:
timeThroughInterval = currentTime - lastUpdate - self.settings["display_duration"]
self.fadeRatio = 1 - timeThroughInterval / self.settings["fadeout_duration"]
if self.fadeRatio < 0:
self.fadeRatio = 0
self.overlay.setOverlayAlpha(self.handle, self.fadeRatio * self.settings["opacity"])
def update(self):
currTime = time.monotonic()
if self.settings["fadeout_duration"] != 0:
self.evaluateOpacityFade(self.lastUpdate, currTime)
else:
self.updateOpacity(self.settings["opacity"])
def mainloop(self):
self.loop = True
while self.checkActive() is True and self.loop is True:
startTime = time.monotonic()
self.update()
sleepTime = (1 / 16) - (time.monotonic() - startTime)
if sleepTime > 0:
time.sleep(sleepTime)
def main(self):
self.init()
if self.initialized is True:
self.mainloop()
def startOverlay(self):
self.thread_overlay = Thread(target=self.main)
self.thread_overlay.daemon = True
self.thread_overlay.start()
def shutdownOverlay(self):
if isinstance(self.thread_overlay, Thread):
self.loop = False
self.thread_overlay.join()
self.thread_overlay = None
if isinstance(self.overlay, openvr.IVROverlay) and isinstance(self.handle, int):
self.overlay.destroyOverlay(self.handle)
self.overlay = None
if isinstance(self.system, openvr.IVRSystem):
openvr.shutdown()
self.system = None
self.initialized = False
@staticmethod
def checkSteamvrRunning() -> bool:
_proc_name = "vrmonitor.exe" if os.name == "nt" else "vrmonitor"
return _proc_name in (p.name() for p in process_iter())
if __name__ == "__main__":
# from overlay_image import OverlayImage
# overlay_image = OverlayImage()
# overlay = Overlay(0, 0, 1, 1, 0, 1, 1)
# overlay.startOverlay()
# time.sleep(1)
# # Example usage
# img = overlay_image.createOverlayImageShort("こんにちは、世界!さようなら", "Japanese", "Hello,World!Goodbye", "Japanese")
# overlay.updateImage(img)
# time.sleep(100000)
# for i in range(100):
# print(i)
# overlay = Overlay(0, 0, 1, 1, 1, 1, 1)
# overlay.startOverlay()
# time.sleep(1)
# # Example usage
# img = overlay_image.createOverlayImageShort("こんにちは、世界!さようなら", "Japanese", "Hello,World!Goodbye", "Japanese", ui_type="sakura")
# overlay.updateImage(img)
# time.sleep(0.5)
# img = overlay_image.createOverlayImageShort("こんにちは、世界!さようなら", "Japanese", "Hello,World!Goodbye", "Japanese")
# overlay.updateImage(img)
# time.sleep(0.5)
# overlay.shutdownOverlay()
x_pos = 0
y_pos = 0
z_pos = 0
x_rotation = 0
y_rotation = 0
z_rotation = 0
base_matrix = getLeftHandBaseMatrix()
translation = (x_pos * z_pos, y_pos * z_pos, z_pos)
rotation = (x_rotation, y_rotation, z_rotation)
transform = utils.transform_matrix(base_matrix, translation, rotation)
transform = mat34Id(transform)
print(transform)

View File

@@ -0,0 +1,231 @@
from os import path as os_path
# from datetime import datetime
from typing import Tuple
from PIL import Image, ImageDraw, ImageFont
class OverlayImage:
# TEXT_COLOR_LARGE = (223, 223, 223)
# TEXT_COLOR_SMALL = (190, 190, 190)
# TEXT_COLOR_SEND = (70, 161, 146)
# TEXT_COLOR_RECEIVE = (220, 20, 60)
# TEXT_COLOR_TIME = (120, 120, 120)
# FONT_SIZE_LARGE = HEIGHT
# FONT_SIZE_SMALL = int(FONT_SIZE_LARGE * 2 / 3)
LANGUAGES = {
"Japanese": "NotoSansJP-Regular",
"Korean": "NotoSansKR-Regular",
"Chinese Simplified": "NotoSansSC-Regular",
"Chinese Traditional": "NotoSansTC-Regular",
}
def __init__(self):
pass
@staticmethod
def concatenateImagesVertically(img1: Image, img2: Image) -> Image:
dst = Image.new("RGBA", (img1.width, img1.height + img2.height))
dst.paste(img1, (0, 0))
dst.paste(img2, (0, img1.height))
return dst
@staticmethod
def addImageMargin(image: Image, top: int, right: int, bottom: int, left: int, color: Tuple[int, int, int, int]) -> Image:
width, height = image.size
new_width = width + right + left
new_height = height + top + bottom
result = Image.new(image.mode, (new_width, new_height), color)
result.paste(image, (left, top))
return result
# def create_textimage(self, message_type, size, text, language):
# font_size = self.FONT_SIZE_LARGE if size == "large" else self.FONT_SIZE_SMALL
# text_color = self.TEXT_COLOR_LARGE if size == "large" else self.TEXT_COLOR_SMALL
# anchor = "lm" if message_type == "receive" else "rm"
# text_x = 0 if message_type == "receive" else self.WIDTH
# align = "left" if message_type == "receive" else "right"
# font_family = self.LANGUAGES.get(language, "NotoSansJP-Regular")
# img = Image.new("RGBA", (0, 0), (0, 0, 0, 0))
# draw = ImageDraw.Draw(img)
# font = ImageFont.truetype(os_path.join(os_path.dirname(__file__), "fonts", f"{font_family}.ttf"), font_size)
# # font = ImageFont.truetype(os_path.join("./fonts", f"{font_family}.ttf"), font_size)
# text_width = draw.textlength(text, font)
# character_width = text_width // len(text)
# character_line_num = int(self.WIDTH // character_width)
# if len(text) > character_line_num:
# text = "\n".join([text[i:i+character_line_num] for i in range(0, len(text), character_line_num)])
# n_num = len(text.split("\n")) - 1
# text_height = int(font_size*(n_num+2))
# img = Image.new("RGBA", (self.WIDTH, text_height), (0, 0, 0, 0))
# draw = ImageDraw.Draw(img)
# text_y = text_height // 2
# draw.multiline_text((text_x, text_y), text, text_color, anchor=anchor, stroke_width=0, font=font, align=align)
# return img
# def create_textimage_message_type(self, message_type):
# anchor = "lm" if message_type == "receive" else "rm"
# text = "Receive" if message_type == "receive" else "Send"
# text_color = self.TEXT_COLOR_RECEIVE if message_type == "receive" else self.TEXT_COLOR_SEND
# text_color_time = self.TEXT_COLOR_TIME
# now = datetime.now()
# formatted_time = now.strftime("%H:%M")
# font_size = self.FONT_SIZE_SMALL
# img = Image.new("RGBA", (0, 0), (0, 0, 0, 0))
# draw = ImageDraw.Draw(img)
# font = ImageFont.truetype(os_path.join(os_path.dirname(__file__), "fonts", "NotoSansJP-Regular.ttf"), font_size)
# # font = ImageFont.truetype(os_path.join("./fonts", "NotoSansJP-Regular.ttf"), font_size)
# text_height = font_size*2
# text_width = draw.textlength(formatted_time, font)
# character_width = text_width // len(formatted_time)
# img = Image.new("RGBA", (self.WIDTH, text_height), (0, 0, 0, 0))
# draw = ImageDraw.Draw(img)
# text_y = text_height // 2
# text_time_x = 0 if message_type == "receive" else self.WIDTH - (text_width + character_width)
# text_x = (text_width + character_width) if message_type == "receive" else self.WIDTH
# draw.text((text_time_x, text_y), formatted_time, text_color_time, anchor=anchor, stroke_width=0, font=font)
# draw.text((text_x, text_y), text, text_color, anchor=anchor, stroke_width=0, font=font)
# return img
# def create_textbox(self, message_type, message, your_language, translation, target_language):
# message_type_img = self.create_textimage_message_type(message_type)
# if len(translation) > 0 and target_language is not None:
# img = self.create_textimage(message_type, "small", message, your_language)
# translation_img = self.create_textimage(message_type, "large",translation, target_language)
# img = self.concatenateImagesVertically(img, translation_img)
# else:
# img = self.create_textimage(message_type, "large", message, your_language)
# return self.concatenateImagesVertically(message_type_img, img)
# def create_overlay_image_long(self, message_type, message, your_language, translation="", target_language=None):
# if len(self.log_data) > 10:
# self.log_data.pop(0)
# self.log_data.append(
# {
# "message_type":message_type,
# "message":message,
# "your_language":your_language,
# "translation":translation,
# "target_language":target_language,
# }
# )
# imgs = []
# for log in self.log_data:
# message_type = log["message_type"]
# message = log["message"]
# your_language = log["your_language"]
# translation = log["translation"]
# target_language = log["target_language"]
# img = self.create_textbox(message_type, message, your_language, translation, target_language)
# imgs.append(img)
# img = imgs[0]
# for i in imgs[1:]:
# img = self.concatenateImagesVertically(img, i)
# img = self.addImageMargin(img, 0, 20, 0, 20, (0, 0, 0, 0))
# width, height = img.size
# background = Image.new("RGBA", (width, height), (0, 0, 0, 0))
# draw = ImageDraw.Draw(background)
# draw.rounded_rectangle([(0, 0), (width, height)], radius=15, fill=self.BACKGROUND_COLOR, outline=self.BACKGROUND_OUTLINE_COLOR, width=5)
# img = Image.alpha_composite(background, img)
# return img
def getUiSize(self):
return {
"width": int(960*4),
"height": int(23*4),
"font_size": int(23*4),
}
def getUiColors(self, ui_type):
match ui_type:
case "default":
background_color = (41, 42, 45)
background_outline_color = (41, 42, 45)
text_color = (223, 223, 223)
case "sakura":
background_color = (225, 40, 30)
background_outline_color = (255, 255, 255)
text_color = (223, 223, 223)
return {
"background_color": background_color,
"background_outline_color": background_outline_color,
"text_color": text_color
}
def createDecorationImage(self, ui_type, image_size):
decoration_image = Image.new("RGBA", image_size, (0, 0, 0, 0))
match ui_type:
case "default":
pass
case "sakura":
margin = 7
alpha_ratio = 0.4
overlay_tl = Image.open(os_path.join(os_path.dirname(os_path.dirname(os_path.dirname(__file__))), "img", "overlay_tl_sakura.png"))
overlay_br = Image.open(os_path.join(os_path.dirname(os_path.dirname(os_path.dirname(__file__))), "img", "overlay_br_sakura.png"))
if overlay_tl.size[1] > image_size[1]:
overlay_tl = overlay_tl.resize((image_size[1]-margin, image_size[1]-margin))
if overlay_br.size[1] > image_size[1]:
overlay_br = overlay_br.resize((image_size[1]-margin, image_size[1]-margin))
alpha = overlay_tl.getchannel("A")
alpha = alpha.point(lambda x: x * alpha_ratio)
overlay_tl.putalpha(alpha)
alpha = overlay_br.getchannel("A")
alpha = alpha.point(lambda x: x * alpha_ratio)
overlay_br.putalpha(alpha)
decoration_image.paste(overlay_tl, (margin, margin))
decoration_image.paste(overlay_br, (image_size[0]-overlay_br.size[0]-margin, image_size[1]-overlay_br.size[1]-margin))
return decoration_image
def createTextboxShort(self, text, language, text_color, base_width, base_height, font_size):
font_family = self.LANGUAGES.get(language, "NotoSansJP-Regular")
img = Image.new("RGBA", (base_width, base_height), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
font = ImageFont.truetype(os_path.join(os_path.dirname(os_path.dirname(os_path.dirname(__file__))), "fonts", f"{font_family}.ttf"), font_size)
text_width = draw.textlength(text, font)
character_width = text_width // len(text)
character_line_num = int((base_width) // character_width) - 12
if len(text) > character_line_num:
text = "\n".join([text[i:i+character_line_num] for i in range(0, len(text), character_line_num)])
text_height = font_size * (len(text.split("\n")) + 1) + 20
img = Image.new("RGBA", (base_width, text_height), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
text_x = base_width // 2
text_y = text_height // 2
draw.text((text_x, text_y), text, text_color, anchor="mm", stroke_width=0, font=font, align="center")
return img
def createOverlayImageShort(self, message, your_language, translation="", target_language=None, ui_type="default"):
ui_size = self.getUiSize()
height = ui_size["height"]
width = ui_size["width"]
font_size = ui_size["font_size"]
ui_colors = self.getUiColors(ui_type)
text_color = ui_colors["text_color"]
background_color = ui_colors["background_color"]
background_outline_color = ui_colors["background_outline_color"]
img = self.createTextboxShort(message, your_language, text_color, width, height, font_size)
if len(translation) > 0 and target_language is not None:
translation_img = self.createTextboxShort(translation, target_language, text_color, width, height, font_size)
img = self.concatenateImagesVertically(img, translation_img)
background = Image.new("RGBA", img.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(background)
draw.rounded_rectangle([(0, 0), img.size], radius=30, fill=background_color, outline=background_outline_color, width=5)
decoration_image = self.createDecorationImage(ui_type, img.size)
background = Image.alpha_composite(background, decoration_image)
img = Image.alpha_composite(background, img)
return img

View File

@@ -0,0 +1,87 @@
import numpy as np
def toHomogeneous(matrix):
homogeneous_matrix = np.vstack([matrix, [0, 0, 0, 1]])
return homogeneous_matrix
# 移動行列を生成する関数
def calcTranslationMatrix(translation):
tx, ty, tz = translation
return np.array([
[1, 0, 0, tx],
[0, 1, 0, ty],
[0, 0, 1, tz],
[0, 0, 0, 1]
])
# X軸周りの回転行列を生成する関数
def calcRotationMatrixX(angle):
c = np.cos(np.pi/180*angle)
s = np.sin(np.pi/180*angle)
return np.array([
[1, 0, 0, 0],
[0, c, -s, 0],
[0, s, c, 0],
[0, 0, 0, 1]
])
# Y軸周りの回転行列を生成する関数
def calcRotationMatrixY(angle):
c = np.cos(np.pi/180*angle)
s = np.sin(np.pi/180*angle)
return np.array([
[c, 0, s, 0],
[0, 1, 0, 0],
[-s, 0, c, 0],
[0, 0, 0, 1]
])
# Z軸周りの回転行列を生成する関数
def calcRotationMatrixZ(angle):
c = np.cos(np.pi/180*angle)
s = np.sin(np.pi/180*angle)
return np.array([
[c, -s, 0, 0],
[s, c, 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1]
])
# 3x4行列の座標を基準として回転や移動を行う関数
def transform_matrix(base_matrix, translation, rotation):
homogeneous_base_matrix = toHomogeneous(base_matrix)
translation_matrix = calcTranslationMatrix(translation)
rotation_matrix_x = calcRotationMatrixX(rotation[0])
rotation_matrix_y = calcRotationMatrixY(rotation[1])
rotation_matrix_z = calcRotationMatrixZ(rotation[2])
rotation_matrix = np.dot(rotation_matrix_z, np.dot(rotation_matrix_y, rotation_matrix_x))
transformation_matrix = translation_matrix.copy()
transformation_matrix[:3, :3] = rotation_matrix[:3, :3]
result_matrix = np.dot(homogeneous_base_matrix, transformation_matrix)
return result_matrix[:3, :]
def euler_to_rotation_matrix(angles):
phi = angles[0] * np.pi / 180
theta = angles[1] * np.pi / 180
psi = angles[2]* np.pi / 180
R_x = np.array([[1, 0, 0],
[0, np.cos(phi), -np.sin(phi)],
[0, np.sin(phi), np.cos(phi)]])
R_y = np.array([[np.cos(theta), 0, np.sin(theta)],
[0, 1, 0],
[-np.sin(theta), 0, np.cos(theta)]])
R_z = np.array([[np.cos(psi), -np.sin(psi), 0],
[np.sin(psi), np.cos(psi), 0],
[0, 0, 1]])
return np.dot(R_z, np.dot(R_y, R_x))
if __name__ == "__main__":
base_matrix = np.array([
[1, 0, 0, 1],
[0, 1, 0, 1],
[0, 0, 1, 1]
])
translation = [1, 2, 3]
rotation = [0, 0, 90]
result_matrix = transform_matrix(base_matrix, translation, rotation)
print(result_matrix)

View File

@@ -0,0 +1,730 @@
transcription_lang = {
"Afrikaans":{
"South Africa":{
"Google": "af-ZA",
"Whisper": "af",
},
},
"Albanian":{
"Albania":{
"Google": "sq-AL",
"Whisper": "sq",
},
},
"Amharic":{
"Ethiopia":{
"Google": "am-ET",
"Whisper": "am",
},
},
"Arabic":{
"Algeria":{
"Google": "ar-DZ",
"Whisper": "ar",
},
"Bahrain":{
"Google": "ar-BH",
"Whisper": "ar",
},
"Egypt":{
"Google": "ar-EG",
"Whisper": "ar",
},
"Israel":{
"Google": "ar-IL",
"Whisper": "ar",
},
"Iraq":{
"Google": "ar-IQ",
"Whisper": "ar",
},
"Jordan":{
"Google": "ar-JO",
"Whisper": "ar",
},
"Kuwait":{
"Google": "ar-KW",
"Whisper": "ar",
},
"Lebanon":{
"Google": "ar-LB",
"Whisper": "ar",
},
"Mauritania":{
"Google": "ar-MR",
"Whisper": "ar",
},
"Morocco":{
"Google": "ar-MA",
"Whisper": "ar",
},
"Oman":{
"Google": "ar-OM",
"Whisper": "ar",
},
"Qatar":{
"Google": "ar-QA",
"Whisper": "ar",
},
"Saudi Arabia":{
"Google": "ar-SA",
"Whisper": "ar",
},
"Palestine":{
"Google": "ar-PS",
"Whisper": "ar",
},
"Syria":{
"Google": "ar-SY",
"Whisper": "ar",
},
"Tunisia":{
"Google": "ar-TN",
"Whisper": "ar",
},
"United Arab Emirates":{
"Google": "ar-AE",
"Whisper": "ar",
},
"Yemen":{
"Google": "ar-YE",
"Whisper": "ar",
},
},
"Armenian": {
"Armenia": {
"Google": "hy-AM",
"Whisper": "hy",
},
},
"Azerbaijani": {
"Azerbaijan": {
"Google": "az-AZ",
"Whisper": "az",
},
},
"Basque":{
"Spain":{
"Google": "eu-ES",
"Whisper": "eu",
},
},
"Bengali":{
"Bangladesh":{
"Google": "bn-BD",
"Whisper": "bn",
},
"India":{
"Google": "bn-IN",
"Whisper": "bn",
},
},
"Bosnian":{
"Bosnia and Herzegovina":{
"Google": "bs-BA",
"Whisper": "bs",
}
},
"Bulgarian":{
"Bulgaria":{
"Google": "bg-BG",
"Whisper": "bg",
},
},
"Burmese":{
"Myanmar":{
"Google": "my-MM",
"Whisper": "my",
},
},
"Catalan":{
"Spain":{
"Google": "ca-ES",
"Whisper": "ca",
},
},
"Chinese Simplified":{
"China":{
"Google": "cmn-Hans-CN",
"Whisper": "zh",
},
"Hong Kong":{
"Google": "cmn-Hans-HK",
"Whisper": "zh",
},
},
"Chinese Traditional":{
"Taiwan":{
"Google": "cmn-Hant-TW",
"Whisper": "zh",
},
"Hong Kong":{
"Google": "yue-Hant-HK",
"Whisper": "yue",
},
},
"Croatian":{
"Croatia":{
"Google": "hr-HR",
"Whisper": "hr",
},
},
"Czech":{
"Czech Republic":{
"Google": "cs-CZ",
"Whisper": "cs",
},
},
"Danish":{
"Denmark":{
"Google": "da-DK",
"Whisper": "da",
},
},
"Dutch":{
"Belgium":{
"Google": "nl-BE",
"Whisper": "nl",
},
"Netherlands":{
"Google": "nl-NL",
"Whisper": "nl",
},
},
"English": {
"Australia":{
"Google": "en-AU",
"Whisper": "en",
},
"Canada":{
"Google": "en-CA",
"Whisper": "en",
},
"Ghana":{
"Google": "en-GH",
"Whisper": "en",
},
"Hong Kong":{
"Google": "en-HK",
"Whisper": "en",
},
"India":{
"Google": "en-IN",
"Whisper": "en",
},
"Ireland":{
"Google": "en-IE",
"Whisper": "en",
},
"Kenya":{
"Google": "en-KE",
"Whisper": "en",
},
"New Zealand":{
"Google": "en-NZ",
"Whisper": "en",
},
"Nigeria":{
"Google": "en-NG",
"Whisper": "en",
},
"Philippines":{
"Google": "en-PH",
"Whisper": "en",
},
"Singapore":{
"Google": "en-SG",
"Whisper": "en",
},
"South Africa":{
"Google": "en-ZA",
"Whisper": "en",
},
"Tanzania":{
"Google": "en-TZ",
"Whisper": "en",
},
"United Kingdom":{
"Google": "en-GB",
"Whisper": "en",
},
"United States":{
"Google": "en-US",
"Whisper": "en",
},
},
"Estonian":{
"Estonia":{
"Google": "et-EE",
"Whisper": "et",
},
},
"Filipino":{
"Philippines":{
"Google": "fil-PH",
"Whisper": "tl",
},
},
"Finnish":{
"Finland":{
"Google": "fi-FI",
"Whisper": "fi",
},
},
"French":{
"Belgium":{
"Google": "fr-BE",
"Whisper": "fr",
},
"Canada":{
"Google": "fr-CA",
"Whisper": "fr",
},
"France":{
"Google": "fr-FR",
"Whisper": "fr",
},
"Switzerland":{
"Google": "fr-CH",
"Whisper": "fr",
},
},
"Galician":{
"Spain":{
"Google": "gl-ES",
"Whisper": "gl",
},
},
"Georgian":{
"Georgia":{
"Google": "ka-GE",
"Whisper": "ka",
},
},
"German":{
"Austria":{
"Google": "de-AT",
"Whisper": "de",
},
"Germany":{
"Google": "de-DE",
"Whisper": "de",
},
"Switzerland":{
"Google": "de-CH",
"Whisper": "de",
},
},
"Greek":{
"Greece":{
"Google": "el-GR",
"Whisper": "el",
},
},
"Gujarati":{
"India":{
"Google": "gu-IN",
"Whisper": "gu",
},
},
"Hebrew":{
"Israel":{
"Google": "iw-IL",
"Whisper": "he",
},
},
"Hindi": {
"India":{
"Google": "hi-IN",
"Whisper": "hi",
},
},
"Hungarian":{
"Hungary":{
"Google": "hu-HU",
"Whisper": "hu",
},
},
"Icelandic":{
"Iceland":{
"Google": "is-IS",
"Whisper": "is",
},
},
"Indonesian":{
"Indonesia":{
"Google": "id-ID",
"Whisper": "id",
},
},
"Italian":{
"Italy":{
"Google": "it-IT",
"Whisper": "it",
},
"Switzerland":{
"Google": "it-CH",
"Whisper": "it",
},
},
"Japanese":{
"Japan":{
"Google": "ja-JP",
"Whisper": "ja",
},
},
# "Javanese":{
# "Indonesia":{
# "Google": "jv-ID",
# },
# },
"Kannada":{
"India":{
"Google": "kn-IN",
"Whisper": "kn",
},
},
"Kazakh":{
"Kazakhstan":{
"Google": "kk-KZ",
"Whisper": "kk",
},
},
"Khmer":{
"Cambodia":{
"Google": "km-KH",
"Whisper": "km",
},
},
# "Kinyarwanda":{
# "rwanda":{
# "Google": "rw-RW",
# },
# },
"Korean":{
"South Korea":{
"Google": "ko-KR",
"Whisper": "ko",
},
},
"Lao":{
"Laos":{
"Google": "lo-LA",
"Whisper": "lo",
},
},
"Latvian":{
"Latvia":{
"Google": "lv-LV",
"Whisper": "lv",
},
},
"Lithuanian":{
"Lithuania":{
"Google": "lt-LT",
"Whisper": "lt",
},
},
"Macedonian":{
"North Macedonia":{
"Google": "mk-MK",
"Whisper": "mk",
},
},
"Malay":{
"Malaysia":{
"Google": "ms-MY",
"Whisper": "ms",
},
},
"Malayalam":{
"India":{
"Google": "ml-IN",
"Whisper": "ml",
},
},
"Mongolian":{
"Mongolia":{
"Google": "mn-MN",
"Whisper": "mn",
},
},
"Nepali":{
"Nepal":{
"Google": "ne-NP",
"Whisper": "ne",
},
},
"Norwegian":{
"Norway":{
"Google": "no-NO",
"Whisper": "no",
},
},
"Persian":{
"Iran":{
"Google": "fa-IR",
"Whisper": "fa",
},
},
"Polish":{
"Poland":{
"Google": "pl-PL",
"Whisper": "pl",
},
},
"Portuguese":{
"Brazil":{
"Google": "pt-BR",
"Whisper": "pt",
},
"Portugal":{
"Google": "pt-PT",
"Whisper": "pt",
},
},
# "Punjabi":{
# "India":{
# "Google": "pa-Guru-IN",
# },
# },
"Romanian":{
"Romania":{
"Google": "ro-RO",
"Whisper": "ro",
},
},
"Russian":{
"Russia":{
"Google": "ru-RU",
"Whisper": "ru",
},
},
"Serbian":{
"Serbia":{
"Google": "sr-RS",
"Whisper": "sr",
},
},
"Sinhala":{
"Sri Lanka":{
"Google": "si-LK",
"Whisper": "si",
},
},
"Slovak":{
"Slovakia":{
"Google": "sk-SK",
"Whisper": "sk",
},
},
"Slovenian":{
"Slovenia":{
"Google": "sl-SI",
"Whisper": "sl",
},
},
# "Sesotho":{
# "South Africa":{
# "Google": "st-ZA",
# },
# },
"Spanish":{
"Argentina":{
"Google": "es-AR",
"Whisper": "es",
},
"Bolivia":{
"Google": "es-BO",
"Whisper": "es",
},
"Chile":{
"Google": "es-CL",
"Whisper": "es",
},
"Colombia":{
"Google": "es-CO",
"Whisper": "es",
},
"Costa Rica":{
"Google": "es-CR",
"Whisper": "es",
},
"Dominican Republic":{
"Google": "es-DO",
"Whisper": "es",
},
"Ecuador":{
"Google": "es-EC",
"Whisper": "es",
},
"El Salvador":{
"Google": "es-SV",
"Whisper": "es",
},
"Guatemala":{
"Google": "es-GT",
"Whisper": "es",
},
"Honduras":{
"Google": "es-HN",
"Whisper": "es",
},
"Mexico":{
"Google": "es-MX",
"Whisper": "es",
},
"Nicaragua":{
"Google": "es-NI",
"Whisper": "es",
},
"Panama":{
"Google": "es-PA",
"Whisper": "es",
},
"Paraguay":{
"Google": "es-PY",
"Whisper": "es",
},
"Peru":{
"Google": "es-PE",
"Whisper": "es",
},
"Puerto Rico":{
"Google": "es-PR",
"Whisper": "es",
},
"Spain":{
"Google": "es-ES",
"Whisper": "es",
},
"United States":{
"Google": "es-US",
"Whisper": "es",
},
"Uruguay":{
"Google": "es-UY",
"Whisper": "es",
},
"Venezuela":{
"Google": "es-VE",
"Whisper": "es",
},
},
"Sundanese":{
"Indonesia":{
"Google": "su-ID",
"Whisper": "su",
},
},
"Swahili":{
"Kenya":{
"Google": "sw-KE",
"Whisper": "sw",
},
"Tanzania":{
"Google": "sw-TZ",
"Whisper": "sw",
},
},
# "Swazi":{
# "Eswatini":{
# "Google": "ss-Latn-ZA",
# },
# },
"Swedish":{
"Sweden":{
"Google": "sv-SE",
"Whisper": "sv",
},
},
"Tamil":{
"India":{
"Google": "ta-IN",
"Whisper": "ta",
},
"malaysia":{
"Google": "ta-MY",
"Whisper": "ta",
},
"Singapore":{
"Google": "ta-SG",
"Whisper": "ta",
},
"Sri Lanka":{
"Google": "ta-LK",
"Whisper": "ta",
},
},
"Telugu":{
"India":{
"Google": "te-IN",
"Whisper": "te",
},
},
"Thai":{
"Thailand":{
"Google": "th-TH",
"Whisper": "th",
},
},
# "Tsonga":{
# "South Africa":{
# "Google": "ts-ZA",
# },
# },
# "Setswana":{
# "South Africa":{
# "Google": "tn-Latn-ZA",
# },
# },
"Turkish":{
"Turkey":{
"Google": "tr-TR",
"Whisper": "tr",
},
},
"Ukrainian":{
"Ukraine":{
"Google": "uk-UA",
"Whisper": "uk",
},
},
"Urdu":{
"India":{
"Google": "ur-IN",
"Whisper": "ur",
},
"Pakistan":{
"Google": "ur-PK",
"Whisper": "ur",
},
},
"Uzbek":{
"Uzbekistan":{
"Google": "uz-UZ",
"Whisper": "uz",
},
},
# "Venda":{
# "South Africa":{
# "Google": "ve-ZA",
# },
# },
"Vietnamese":{
"Vietnam":{
"Google": "vi-VN",
"Whisper": "vi",
},
},
# "Xhosa":{
# "South Africa":{
# "Google": "xh-ZA",
# },
# },
# "Zulu":{
# "South Africa":{
# "Google": "zu-ZA",
# },
# },
}

View File

@@ -0,0 +1,142 @@
from speech_recognition import Recognizer, Microphone
from pyaudiowpatch import get_sample_size, paInt16
from datetime import datetime
from queue import Queue
class BaseRecorder:
def __init__(self, source, energy_threshold, dynamic_energy_threshold, record_timeout):
self.recorder = Recognizer()
self.recorder.energy_threshold = energy_threshold
self.recorder.dynamic_energy_threshold = dynamic_energy_threshold
self.record_timeout = record_timeout
self.stop = None
if source is None:
raise ValueError("audio source can't be None")
self.source = source
def adjustForNoise(self):
with self.source:
self.recorder.adjust_for_ambient_noise(self.source)
def recordIntoQueue(self, audio_queue):
def record_callback(_, audio):
audio_queue.put((audio.get_raw_data(), datetime.now()))
self.stop, self.pause, self.resume = self.recorder.listen_in_background(self.source, record_callback, phrase_time_limit=self.record_timeout)
class SelectedMicRecorder(BaseRecorder):
def __init__(self, device, energy_threshold, dynamic_energy_threshold, record_timeout):
source=Microphone(
device_index=device['index'],
sample_rate=int(device["defaultSampleRate"]),
)
super().__init__(source=source, energy_threshold=energy_threshold, dynamic_energy_threshold=dynamic_energy_threshold, record_timeout=record_timeout)
# self.adjustForNoise()
class SelectedSpeakerRecorder(BaseRecorder):
def __init__(self, device, energy_threshold, dynamic_energy_threshold, record_timeout):
source = Microphone(speaker=True,
device_index= device["index"],
sample_rate=int(device["defaultSampleRate"]),
chunk_size=get_sample_size(paInt16),
channels=device["maxInputChannels"]
)
super().__init__(source=source, energy_threshold=energy_threshold, dynamic_energy_threshold=dynamic_energy_threshold, record_timeout=record_timeout)
# self.adjustForNoise()
class BaseEnergyRecorder:
def __init__(self, source):
self.recorder = Recognizer()
self.recorder.energy_threshold = 0
self.recorder.dynamic_energy_threshold = False
self.record_timeout = 0
self.stop = None
if source is None:
raise ValueError("audio source can't be None")
self.source = source
def adjustForNoise(self):
with self.source:
self.recorder.adjust_for_ambient_noise(self.source)
def recordIntoQueue(self, energy_queue):
def recordCallback(_, energy):
energy_queue.put(energy)
self.stop, self.pause, self.resume = self.recorder.listen_energy_in_background(self.source, recordCallback)
class SelectedMicEnergyRecorder(BaseEnergyRecorder):
def __init__(self, device):
source=Microphone(
device_index=device['index'],
sample_rate=int(device["defaultSampleRate"]),
)
super().__init__(source=source)
# self.adjustForNoise()
class SelectedSpeakerEnergyRecorder(BaseEnergyRecorder):
def __init__(self, device):
source = Microphone(speaker=True,
device_index= device["index"],
sample_rate=int(device["defaultSampleRate"]),
channels=device["maxInputChannels"]
)
super().__init__(source=source)
# self.adjustForNoise()
class BaseEnergyAndAudioRecorder:
def __init__(self, source, energy_threshold, dynamic_energy_threshold, record_timeout):
self.recorder = Recognizer()
self.recorder.energy_threshold = energy_threshold
self.recorder.dynamic_energy_threshold = dynamic_energy_threshold
self.record_timeout = record_timeout
self.stop = None
if source is None:
raise ValueError("audio source can't be None")
self.source = source
def adjustForNoise(self):
with self.source:
self.recorder.adjust_for_ambient_noise(self.source)
def recordIntoQueue(self, audio_queue, energy_queue=None):
def audioRecordCallback(_, audio):
audio_queue.put((audio.get_raw_data(), datetime.now()))
def energyRecordCallback(energy):
energy_queue.put(energy)
self.stop, self.pause, self.resume = self.recorder.listen_energy_and_audio_in_background(
source=self.source,
callback=audioRecordCallback,
phrase_time_limit=self.record_timeout,
callback_energy=energyRecordCallback if energy_queue is not None else None)
class SelectedMicEnergyAndAudioRecorder(BaseEnergyAndAudioRecorder):
def __init__(self, device, energy_threshold, dynamic_energy_threshold, record_timeout):
source=Microphone(
device_index=device['index'],
sample_rate=int(device["defaultSampleRate"]),
)
super().__init__(source=source, energy_threshold=energy_threshold, dynamic_energy_threshold=dynamic_energy_threshold, record_timeout=record_timeout)
# self.adjustForNoise()
class SelectedSpeakerEnergyAndAudioRecorder(BaseEnergyAndAudioRecorder):
def __init__(self, device, energy_threshold, dynamic_energy_threshold, record_timeout):
source = Microphone(speaker=True,
device_index= device["index"],
sample_rate=int(device["defaultSampleRate"]),
chunk_size=get_sample_size(paInt16),
channels=device["maxInputChannels"]
)
super().__init__(source=source, energy_threshold=energy_threshold, dynamic_energy_threshold=dynamic_energy_threshold, record_timeout=record_timeout)
# self.adjustForNoise()

View File

@@ -0,0 +1,141 @@
import time
from io import BytesIO
from threading import Event
import wave
from speech_recognition import Recognizer, AudioData, AudioFile
from datetime import timedelta
from pyaudiowpatch import get_sample_size, paInt16
from .transcription_languages import transcription_lang
from .transcription_whisper import getWhisperModel, checkWhisperWeight
import torch
import numpy as np
from pydub import AudioSegment
PHRASE_TIMEOUT = 3
MAX_PHRASES = 10
class AudioTranscriber:
def __init__(self, speaker, source, phrase_timeout, max_phrases, transcription_engine, root=None, whisper_weight_type=None):
self.speaker = speaker
self.phrase_timeout = phrase_timeout
self.max_phrases = max_phrases
self.transcript_data = []
self.transcript_changed_event = Event()
self.audio_recognizer = Recognizer()
self.transcription_engine = "Google"
self.whisper_model = None
self.audio_sources = {
"sample_rate": source.SAMPLE_RATE,
"sample_width": source.SAMPLE_WIDTH,
"channels": source.channels,
"last_sample": bytes(),
"last_spoken": None,
"new_phrase": True,
"process_data_func": self.processSpeakerData if speaker else self.processSpeakerData
}
if transcription_engine == "Whisper" and checkWhisperWeight(root, whisper_weight_type) is True:
self.whisper_model = getWhisperModel(root, whisper_weight_type)
self.transcription_engine = "Whisper"
def transcribeAudioQueue(self, audio_queue, language, country, avg_logprob=-0.8, no_speech_prob=0.6):
if audio_queue.empty():
time.sleep(0.01)
return False
audio, time_spoken = audio_queue.get()
self.updateLastSampleAndPhraseStatus(audio, time_spoken)
text = ''
try:
audio_data = self.audio_sources["process_data_func"]()
match self.transcription_engine:
case "Google":
text = self.audio_recognizer.recognize_google(audio_data, language=transcription_lang[language][country][self.transcription_engine])
case "Whisper":
audio_data = np.frombuffer(audio_data.get_raw_data(convert_rate=16000, convert_width=2), np.int16).flatten().astype(np.float32) / 32768.0
if isinstance(audio_data, torch.Tensor):
audio_data = audio_data.detach().numpy()
segments, _ = self.whisper_model.transcribe(
audio_data,
beam_size=5,
temperature=0.0,
log_prob_threshold=-0.8,
no_speech_threshold=0.6,
language=transcription_lang[language][country][self.transcription_engine],
word_timestamps=False,
without_timestamps=True,
task="transcribe",
vad_filter=False,
)
for s in segments:
if s.avg_logprob < avg_logprob or s.no_speech_prob > no_speech_prob:
continue
text += s.text
except Exception:
pass
finally:
pass
if text != '':
self.updateTranscript(text)
return True
def updateLastSampleAndPhraseStatus(self, data, time_spoken):
source_info = self.audio_sources
if source_info["last_spoken"] and time_spoken - source_info["last_spoken"] > timedelta(seconds=self.phrase_timeout):
source_info["last_sample"] = bytes()
source_info["new_phrase"] = True
else:
source_info["new_phrase"] = False
source_info["last_sample"] += data
source_info["last_spoken"] = time_spoken
def processMicData(self):
audio_data = AudioData(self.audio_sources["last_sample"], self.audio_sources["sample_rate"], self.audio_sources["sample_width"])
return audio_data
def processSpeakerData(self):
temp_file = BytesIO()
with wave.open(temp_file, 'wb') as wf:
wf.setnchannels(self.audio_sources["channels"])
wf.setsampwidth(get_sample_size(paInt16))
wf.setframerate(self.audio_sources["sample_rate"])
wf.writeframes(self.audio_sources["last_sample"])
temp_file.seek(0)
if self.audio_sources["channels"] > 2:
audio = AudioSegment.from_file(temp_file, format="wav")
mono_audio = audio.set_channels(1)
temp_file = BytesIO()
mono_audio.export(temp_file, format="wav")
temp_file.seek(0)
with AudioFile(temp_file) as source:
audio = self.audio_recognizer.record(source)
return audio
def updateTranscript(self, text):
source_info = self.audio_sources
transcript = self.transcript_data
if source_info["new_phrase"] or len(transcript) == 0:
if len(transcript) > self.max_phrases:
transcript.pop(-1)
transcript.insert(0, text)
else:
transcript[0] = text
def getTranscript(self):
if len(self.transcript_data) > 0:
text = self.transcript_data.pop(-1)
else:
text = ""
return text
def clearTranscriptData(self):
self.transcript_data.clear()
self.audio_sources["last_sample"] = bytes()
self.audio_sources["new_phrase"] = True

View File

@@ -0,0 +1,70 @@
from pyaudiowpatch import PyAudio, paWASAPI
def getInputDevices():
devices = {}
with PyAudio() as p:
for host_index in range(0, p.get_host_api_count()):
host = p.get_host_api_info_by_index(host_index)
for device_index in range(0, p.get_host_api_info_by_index(host_index)['deviceCount']):
device = p.get_device_info_by_host_api_device_index(host_index, device_index)
if device["maxInputChannels"] > 0 and device["isLoopbackDevice"] is False:
if host["name"] in devices.keys():
devices[host["name"]].append(device)
else:
devices[host["name"]] = [device]
if len(devices) == 0:
devices = {"NoHost": [{"name": "NoDevice"}]}
return devices
def getDefaultInputDevice():
with PyAudio() as p:
api_info = p.get_default_host_api_info()
defaultInputDevice = api_info["defaultInputDevice"]
for host_index in range(0, p.get_host_api_count()):
host = p.get_host_api_info_by_index(host_index)
for device_index in range(0, p.get_host_api_info_by_index(host_index)['deviceCount']):
device = p.get_device_info_by_host_api_device_index(host_index, device_index)
if device["index"] == defaultInputDevice:
return {"host": host, "device": device}
return {"host": {"name": "NoHost"}, "device": {"name": "NoDevice"}}
def getOutputDevices():
devices = []
with PyAudio() as p:
wasapi_info = p.get_host_api_info_by_type(paWASAPI)
for host_index in range(0, p.get_host_api_count()):
host = p.get_host_api_info_by_index(host_index)
if host["name"] == wasapi_info["name"]:
for device_index in range(0, p.get_host_api_info_by_index(host_index)['deviceCount']):
device = p.get_device_info_by_host_api_device_index(host_index, device_index)
if not device["isLoopbackDevice"]:
for loopback in p.get_loopback_device_info_generator():
if device["name"] in loopback["name"]:
devices.append(loopback)
if len(devices) == 0:
devices = [{"name": "NoDevice"}]
else:
devices = [dict(t) for t in {tuple(d.items()) for d in devices}]
return devices
def getDefaultOutputDevice():
with PyAudio() as p:
wasapi_info = p.get_host_api_info_by_type(paWASAPI)
defaultOutputDevice = wasapi_info["defaultOutputDevice"]
for host_index in range(0, p.get_host_api_count()):
for device_index in range(0, p. get_host_api_info_by_index(host_index)['deviceCount']):
device = p.get_device_info_by_host_api_device_index(host_index, device_index)
if device["index"] == defaultOutputDevice:
default_speakers = device
if not default_speakers["isLoopbackDevice"]:
for loopback in p.get_loopback_device_info_generator():
if default_speakers["name"] in loopback["name"]:
return {"device": loopback}
return {"device": {"name": "NoDevice"}}
if __name__ == "__main__":
print("getOutputDevices()", getOutputDevices())
print("getDefaultOutputDevice()", getDefaultOutputDevice())

View File

@@ -0,0 +1,98 @@
from os import path as os_path, makedirs as os_makedirs
from requests import get as requests_get
from typing import Callable
import huggingface_hub
from faster_whisper import WhisperModel
import logging
logger = logging.getLogger('faster_whisper')
logger.setLevel(logging.CRITICAL)
_MODELS = {
"tiny": "Systran/faster-whisper-tiny",
"base": "Systran/faster-whisper-base",
"small": "Systran/faster-whisper-small",
"medium": "Systran/faster-whisper-medium",
"large-v1": "Systran/faster-whisper-large-v1",
"large-v2": "Systran/faster-whisper-large-v2",
"large-v3": "Systran/faster-whisper-large-v3",
}
_FILENAMES = [
"config.json",
"preprocessor_config.json",
"model.bin",
"tokenizer.json",
"vocabulary.txt",
"vocabulary.json",
]
def downloadFile(url, path, func=None):
try:
res = requests_get(url, stream=True)
res.raise_for_status()
file_size = int(res.headers.get('content-length', 0))
total_chunk = 0
with open(os_path.join(path), 'wb') as file:
for chunk in res.iter_content(chunk_size=1024*5):
file.write(chunk)
if isinstance(func, Callable):
total_chunk += len(chunk)
func(total_chunk/file_size)
except Exception as e:
print("error:downloadFile()", e)
def checkWhisperWeight(root, weight_type):
path = os_path.join(root, "weights", "whisper", weight_type)
result = False
try:
WhisperModel(
path,
device="cpu",
device_index=0,
compute_type="int8",
cpu_threads=4,
num_workers=1,
local_files_only=True,
)
result = True
except Exception:
pass
return result
def downloadWhisperWeight(root, weight_type, callbackFunc):
path = os_path.join(root, "weights", "whisper", weight_type)
os_makedirs(path, exist_ok=True)
if checkWhisperWeight(root, weight_type) is True:
return
for filename in _FILENAMES:
print("Downloading", filename, "...")
file_path = os_path.join(path, filename)
url = huggingface_hub.hf_hub_url(_MODELS[weight_type], filename)
downloadFile(url, file_path, func=callbackFunc)
def getWhisperModel(root, weight_type):
path = os_path.join(root, "weights", "whisper", weight_type)
return WhisperModel(
path,
device="cpu",
device_index=0,
compute_type="int8",
cpu_threads=4,
num_workers=1,
local_files_only=True,
)
if __name__ == "__main__":
def callback(value):
print(value)
pass
downloadWhisperWeight("./", "tiny", callback)
downloadWhisperWeight("./", "base", callback)
downloadWhisperWeight("./", "small", callback)
downloadWhisperWeight("./", "medium", callback)
downloadWhisperWeight("./", "large-v1", callback)
downloadWhisperWeight("./", "large-v2", callback)
downloadWhisperWeight("./", "large-v3", callback)

View File

@@ -0,0 +1,384 @@
translation_lang = {}
dict_deepl_languages = {
"Arabic":"ar",
"Bulgarian":"bg",
"Czech":"cs",
"Danish":"da",
"German":"de",
"Greek":"el",
"English":"en",
"Spanish":"es",
"Estonian":"et",
"Finnish":"fi",
"French":"fr",
"Irish":"ga",
"Croatian":"hr",
"Hungarian":"hu",
"Indonesian":"id",
"Icelandic":"is",
"Italian":"it",
"Japanese":"ja",
"Korean":"ko",
"Lithuanian":"lt",
"Latvian":"lv",
"Maltese":"mt",
"Bokmal":"nb",
"Dutch":"nl",
"Norwegian":"no",
"Polish":"pl",
"Portuguese":"pt",
"Romanian":"ro",
"Russian":"ru",
"Slovak":"sk",
"Slovenian":"sl",
"Swedish":"sv",
"Turkish":"tr",
"Ukrainian":"uk",
"Chinese Simplified":"zh",
"Chinese Traditional":"zh"
}
translation_lang["DeepL"] = {
"source":dict_deepl_languages,
"target":dict_deepl_languages,
}
dict_deepl_api_source_languages = {
"Japanese":"ja",
"English":"en",
"Bulgarian":"bg",
"Czech":"cs",
"Danish":"da",
"German":"de",
"Greek":"el",
"Spanish":"es",
"Estonian":"et",
"Finnish":"fi",
"French":"fr",
"Hungarian":"hu",
"Indonesian":"id",
"Italian":"it",
"Korean":"ko",
"Lithuanian":"lt",
"Latvian":"lv",
"Norwegian":"nb",
"Dutch":"nl",
"Polish":"pl",
"Portuguese":"pt",
"Romanian":"ro",
"Russian":"ru",
"Slovak":"sk",
"Slovenian":"sl",
"Swedish":"sv",
"Turkish":"tr",
"Ukrainian":"uk",
"Chinese Simplified":"zh",
"Chinese Traditional":"zh"
}
dict_deepl_api_target_languages = {
"Japanese":"ja",
"English American":"en-US",
"English British":"en-GB",
"Bulgarian":"bg",
"Czech":"cs",
"Danish":"da",
"German":"de",
"Greek":"el",
"English":"en",
"Spanish":"es",
"Estonian":"et",
"Finnish":"fi",
"French":"fr",
"Hungarian":"hu",
"Indonesian":"id",
"Italian":"it",
"Korean":"ko",
"Lithuanian":"lt",
"Latvian":"lv",
"Norwegian":"nb",
"Dutch":"nl",
"Polish":"pl",
"Portuguese Brazilian":"pt-BR",
"Portuguese European":"pt-PT",
"Romanian":"ro",
"Russian":"ru",
"Slovak":"sk",
"Slovenian":"sl",
"Swedish":"sv",
"Turkish":"tr",
"Ukrainian":"uk",
"Chinese Simplified":"zh",
"Chinese Traditional":"zh"
}
translation_lang["DeepL_API"] = {
"source": dict_deepl_api_source_languages,
"target": dict_deepl_api_target_languages,
}
dict_google_languages = {
"Japanese":"ja",
"English":"en",
"Chinese Simplified":"zh",
"Chinese Traditional":"zh-TW",
"Arabic":"ar",
"Russian":"ru",
"French":"fr",
"German":"de",
"Spanish":"es",
"Portuguese":"pt",
"Italian":"it",
"Korean":"ko",
"Greek":"el",
"Dutch":"nl",
"Hindi":"hi",
"Turkish":"tr",
"Malay":"ms",
"Thai":"th",
"Vietnamese":"vi",
"Indonesian":"id",
"Hebrew":"he",
"Polish":"pl",
"Mongolian":"mn",
"Czech":"cs",
"Hungarian":"hu",
"Estonian":"et",
"Bulgarian":"bg",
"Danish":"da",
"Finnish":"fi",
"Romanian":"ro",
"Swedish":"sv",
"Slovenian":"sl",
"Persian/Farsi":"fa",
"Bosnian":"bs",
"Serbian":"sr",
"Filipino":"tl",
"Haitiancreole":"ht",
"Catalan":"ca",
"Croatian":"hr",
"Latvian":"lv",
"Lithuanian":"lt",
"Urdu":"ur",
"Ukrainian":"uk",
"Welsh":"cy",
"Swahili":"sw",
"Samoan":"sm",
"Slovak":"sk",
"Afrikaans":"af",
"Norwegian":"no",
"Bengali":"bn",
"Malagasy":"mg",
"Maltese":"mt",
"Gujarati":"gu",
"Tamil":"ta",
"Telugu":"te",
"Punjabi":"pa",
"Amharic":"am",
"Azerbaijani":"az",
"Belarusian":"be",
"Cebuano":"ceb",
"Esperanto":"eo",
"Basque":"eu",
"Irish":"ga"
}
translation_lang["Google"] = {
"source":dict_google_languages,
"target":dict_google_languages,
}
dict_bing_languages = {
"Japanese":"ja",
"English":"en",
"Chinese Simplified":"zh",
"Chinese Traditional":"zh-Hant",
"Arabic":"ar",
"Russian":"ru",
"French":"fr",
"German":"de",
"Spanish":"es",
"Portuguese":"pt",
"Italian":"it",
"Korean":"ko",
"Greek":"el",
"Dutch":"nl",
"Hindi":"hi",
"Turkish":"tr",
"Malay":"ms",
"Thai":"th",
"Vietnamese":"vi",
"Indonesian":"id",
"Hebrew":"he",
"Polish":"pl",
"Czech":"cs",
"Hungarian":"hu",
"Estonian":"et",
"Bulgarian":"bg",
"Danish":"da",
"Finnish":"fi",
"Romanian":"ro",
"Swedish":"sv",
"Slovenian":"sl",
"Persian/Farsi":"fa",
"Bosnian":"bs",
"Serbian":"sr",
"Fijian":"fj",
"Filipino":"tl",
"Haitiancreole":"ht",
"Catalan":"ca",
"Croatian":"hr",
"Latvian":"lv",
"Lithuanian":"lt",
"Urdu":"ur",
"Ukrainian":"uk",
"Welsh":"cy",
"Tahiti":"ty",
"Tongan":"to",
"Swahili":"sw",
"Samoan":"sm",
"Slovak":"sk",
"Afrikaans":"af",
"Norwegian":"no",
"Bengali":"bn",
"Malagasy":"mg",
"Maltese":"mt",
"Queretaro otomi":"otq",
"Klingon/tlhingan Hol":"tlh",
"Gujarati":"gu",
"Tamil":"ta",
"Telugu":"te",
"Punjabi":"pa",
"Irish":"ga"
}
translation_lang["Bing"] = {
"source":dict_bing_languages,
"target":dict_bing_languages,
}
dict_papago_languages = {
"German": "de",
"English": "en",
"Spanish":"es",
"French": "fr",
"Hindi": "hi",
"Indonesian": "id",
"Italian": "it",
"Japanese": "ja",
"Korean": "ko",
"Portuguese": "pt",
"Russian": "ru",
"Thai": "th",
"Vietnamese": "vi",
"Chinese Simplified":"zh-CN",
"Chinese Traditional":"zh-TW",
}
translation_lang["Papago"] = {
"source":dict_papago_languages,
"target":dict_papago_languages,
}
dict_ctranslate2_languages = {
"English": "en",
"Chinese Simplified": "zh",
"Chinese Traditional":"zh",
"German": "de",
"Spanish": "es",
"Russian": "ru",
"Korean": "ko",
"French": "fr",
"Japanese": "ja",
"Portuguese": "pt",
"Turkish": "tr",
"Polish": "pl",
"Catalan": "ca",
"Dutch": "nl",
"Arabic": "ar",
"Swedish": "sv",
"Italian": "it",
"Indonesian": "id",
"Hindi": "hi",
"Finnish": "fi",
"Vietnamese": "vi",
"Hebrew": "he",
"Ukrainian": "uk",
"Greek": "el",
"Malay": "ms",
"Czech": "cs",
"Romanian": "ro",
"Danish": "da",
"Hungarian": "hu",
"Tamil": "ta",
"Norwegian": "no",
"Thai": "th",
"Urdu": "ur",
"Croatian": "hr",
"Bulgarian": "bg",
"Lithuanian": "lt",
"Latin": "la",
"Maori": "mi",
"Malayalam": "ml",
"Welsh": "cy",
"Slovak": "sk",
"Telugu": "te",
"Persian": "fa",
"Latvian": "lv",
"Bengali": "bn",
"Serbian": "sr",
"Azerbaijani": "az",
"Slovenian": "sl",
"Kannada": "kn",
"Estonian": "et",
"Macedonian": "mk",
"Breton": "br",
"Basque": "eu",
"Icelandic": "is",
"Armenian": "hy",
"Nepali": "ne",
"Mongolian": "mn",
"Bosnian": "bs",
"Kazakh": "kk",
"Albanian": "sq",
"Swahili": "sw",
"Galician": "gl",
"Marathi": "mr",
"Punjabi": "pa",
"Sinhala": "si",
"Khmer": "km",
"Shona": "sn",
"Yoruba": "yo",
"Somali": "so",
"Afrikaans": "af",
"Occitan": "oc",
"Georgian": "ka",
"Belarusian": "be",
"Tajik": "tg",
"Sindhi": "sd",
"Gujarati": "gu",
"Amharic": "am",
"Yiddish": "yi",
"Lao": "lo",
"Uzbek": "uz",
"Faroese": "fo",
"Haitian creole": "ht",
"Pashto": "ps",
"Turkmen": "tk",
"Nynorsk": "nn",
"Maltese": "mt",
"Sanskrit": "sa",
"Luxembourgish": "lb",
"Myanmar": "my",
"Tibetan": "bo",
"Filipino": "tl",
"Malagasy": "mg",
"Assamese": "as",
"Tatar": "tt",
"Hawaiian": "haw",
"Lingala": "ln",
"Hausa": "ha",
"Bashkir": "ba",
"Javanese": "jw",
"Sundanese": "su"
}
translation_lang["CTranslate2"] = {
"source":dict_ctranslate2_languages,
"target":dict_ctranslate2_languages,
}

View File

@@ -0,0 +1,140 @@
import os
from deepl import Translator as deepl_Translator
from translators import translate_text as other_web_Translator
from .translation_languages import translation_lang
from .translation_utils import ctranslate2_weights
import ctranslate2
import transformers
# Translator
class Translator():
def __init__(self):
self.deepl_client = None
self.ctranslate2_translator = None
self.ctranslate2_tokenizer = None
self.is_loaded_ctranslate2_model = False
def authenticationDeepLAuthKey(self, authkey):
result = True
try:
self.deepl_client = deepl_Translator(authkey)
self.deepl_client.translate_text(" ", target_lang="EN-US")
except Exception:
self.deepl_client = None
result = False
return result
def changeCTranslate2Model(self, path, model_type):
self.is_loaded_ctranslate2_model = False
directory_name = ctranslate2_weights[model_type]["directory_name"]
tokenizer = ctranslate2_weights[model_type]["tokenizer"]
weight_path = os.path.join(path, "weights", "ctranslate2", directory_name)
tokenizer_path = os.path.join(path, "weights", "ctranslate2", directory_name, "tokenizer")
self.ctranslate2_translator = ctranslate2.Translator(
weight_path,
device="cpu",
device_index=0,
compute_type="int8",
inter_threads=1,
intra_threads=4
)
try:
self.ctranslate2_tokenizer = transformers.AutoTokenizer.from_pretrained(tokenizer, cache_dir=tokenizer_path)
except Exception as e:
print("Error: changeCTranslate2Model()", e)
tokenizer_path = os.path.join("./weights", "ctranslate2", directory_name, "tokenizer")
self.ctranslate2_tokenizer = transformers.AutoTokenizer.from_pretrained(tokenizer, cache_dir=tokenizer_path)
self.is_loaded_ctranslate2_model = True
def isLoadedCTranslate2Model(self):
return self.is_loaded_ctranslate2_model
def translateCTranslate2(self, message, source_language, target_language):
result = False
if self.is_loaded_ctranslate2_model is True:
try:
self.ctranslate2_tokenizer.src_lang = source_language
source = self.ctranslate2_tokenizer.convert_ids_to_tokens(self.ctranslate2_tokenizer.encode(message))
target_prefix = [self.ctranslate2_tokenizer.lang_code_to_token[target_language]]
results = self.ctranslate2_translator.translate_batch([source], target_prefix=[target_prefix])
target = results[0].hypotheses[0][1:]
result = self.ctranslate2_tokenizer.decode(self.ctranslate2_tokenizer.convert_tokens_to_ids(target))
except Exception:
pass
return result
@staticmethod
def getLanguageCode(translator_name, target_country, source_language, target_language):
match translator_name:
case "DeepL_API":
if target_language == "English":
if target_country in ["United States", "Canada", "Philippines"]:
target_language = "English American"
else:
target_language = "English British"
elif target_language == "Portuguese":
if target_country in ["Portugal"]:
target_language = "Portuguese European"
else:
target_language = "Portuguese Brazilian"
case _:
pass
source_language=translation_lang[translator_name]["source"][source_language]
target_language=translation_lang[translator_name]["target"][target_language]
return source_language, target_language
def translate(self, translator_name, source_language, target_language, target_country, message):
try:
result = ""
source_language, target_language = self.getLanguageCode(translator_name, target_country, source_language, target_language)
match translator_name:
case "DeepL":
result = other_web_Translator(
query_text=message,
translator="deepl",
from_language=source_language,
to_language=target_language,
)
case "DeepL_API":
if self.deepl_client is None:
result = False
else:
result = self.deepl_client.translate_text(
message,
source_lang=source_language,
target_lang=target_language,
).text
case "Google":
result = other_web_Translator(
query_text=message,
translator="google",
from_language=source_language,
to_language=target_language,
)
case "Bing":
result = other_web_Translator(
query_text=message,
translator="bing",
from_language=source_language,
to_language=target_language,
)
case "Papago":
result = other_web_Translator(
query_text=message,
translator="papago",
from_language=source_language,
to_language=target_language,
)
case "CTranslate2":
result = self.translateCTranslate2(
message=message,
source_language=source_language,
target_language=target_language,
)
except Exception:
import traceback
with open('error.log', 'a') as f:
traceback.print_exc(file=f)
result = False
return result

View File

@@ -0,0 +1,86 @@
import tempfile
from zipfile import ZipFile
from os import path as os_path
from os import makedirs as os_makedirs
from requests import get as requests_get
from typing import Callable
import hashlib
ctranslate2_weights = {
"Small": { # M2M-100 418M-parameter model
"url": "https://github.com/misyaguziya/VRCT-weights/releases/download/v1.0/m2m100_418m.zip",
"directory_name": "m2m100_418m",
"tokenizer": "facebook/m2m100_418M",
"hash": {
"model.bin": "e7c26a9abb5260abd0268fbe3040714070dec254a990b4d7fd3f74c5230e3acb",
"sentencepiece.model": "d8f7c76ed2a5e0822be39f0a4f95a55eb19c78f4593ce609e2edbc2aea4d380a",
"shared_vocabulary.txt": "bd440aa21b8ca3453fc792a0018a1f3fe68b3464aadddd4d16a4b72f73c86d8c",
}
},
"Large": { # M2M-100 1.2B-parameter model
"url": "https://github.com/misyaguziya/VRCT-weights/releases/download/v1.0/m2m100_12b.zip",
"directory_name": "m2m100_12b",
"tokenizer": "facebook/m2m100_1.2b",
"hash": {
"model.bin": "abb7bf4ba7e5e016b6e3ed480c752459b2f783ac8fca372e7587675e5bf3a919",
"sentencepiece.model": "d8f7c76ed2a5e0822be39f0a4f95a55eb19c78f4593ce609e2edbc2aea4d380a",
"shared_vocabulary.txt": "bd440aa21b8ca3453fc792a0018a1f3fe68b3464aadddd4d16a4b72f73c86d8c",
}
},
}
def calculate_file_hash(file_path, block_size=65536):
hash_object = hashlib.sha256()
with open(file_path, 'rb') as file:
for block in iter(lambda: file.read(block_size), b''):
hash_object.update(block)
return hash_object.hexdigest()
def checkCTranslate2Weight(path, weight_type="Small"):
weight_directory_name = ctranslate2_weights[weight_type]["directory_name"]
hash_data = ctranslate2_weights[weight_type]["hash"]
files = [
"model.bin",
"sentencepiece.model",
"shared_vocabulary.txt"
]
# check already downloaded
already_downloaded = False
if all(os_path.exists(os_path.join(path, weight_directory_name, file)) for file in files):
# check hash
for file in files:
original_hash = hash_data[file]
current_hash = calculate_file_hash(os_path.join(path, weight_directory_name, file))
if original_hash != current_hash:
break
already_downloaded = True
return already_downloaded
def downloadCTranslate2Weight(root, weight_type="Small", func=None):
url = ctranslate2_weights[weight_type]["url"]
filename = "weight.zip"
path = os_path.join(root, "weights", "ctranslate2")
os_makedirs(path, exist_ok=True)
if checkCTranslate2Weight(path, weight_type):
return
try:
with tempfile.TemporaryDirectory() as tmp_path:
res = requests_get(url, stream=True)
file_size = int(res.headers.get('content-length', 0))
total_chunk = 0
with open(os_path.join(tmp_path, filename), 'wb') as file:
for chunk in res.iter_content(chunk_size=1024*5):
file.write(chunk)
if isinstance(func, Callable):
total_chunk += len(chunk)
func(total_chunk/file_size)
with ZipFile(os_path.join(tmp_path, filename)) as zf:
zf.extractall(path)
except Exception as e:
print("error:downloadCTranslate2Weight()", e)

View File

@@ -0,0 +1,73 @@
# ###########################################################################################################################
# DOCUMENT:https://xiexe.github.io/XSOverlayDocumentation/#/NotificationsAPI
# SOURCE:https://zenn.dev/eeharumt/scraps/95f49a62dd809a
# messageType: int = 0 # 1: ポップアップ通知, 2: メディアプレーヤー情報
# index: int = 0 # メディアプレーヤーでのみ使用され、手首のアイコンを変更する
# timeout: float = 0.5 # 通知インジケータが表示され続ける時間[秒]
# height: float = 175 # 通知インジケータの高さ
# opacity: float = 1 # 通知インジケータの透明度。0.0-1.0の範囲で低いほど透明に
# volume: float = 0.7 # 通知音の大きさ
# audioPath: str = "" # 通知音ファイルのパス。規定音として"default", "error", "warning"を指定可能。空文字列で通知音なしにできる。
# title: str = "" # 通知タイトル、リッチテキストフォーマットをサポート。
# content: str = "" # 通知内容、リッチテキストフォーマットをサポート。省略することで小サイズ通知となる。
# useBase64Icon: bool = False # TrueにすることでBase64の画像を表示する
# icon: str = "" # Base64画像イメージまたは画像ファイルパス。規定アイコンとして"default", "error", or "warning"を指定可能
# sourceApp: str = "" # 通知したアプリ名(デバック用)
# ##########################################################################################################################
import socket
import json
import base64
from os import path as os_path
def XSOverlay(
endpoint:tuple=("127.0.0.1", 42069), messageType:int=1, index:int=0, timeout:float=2,
height:float=120.0, opacity:float=1.0, volume:float=0.0, audioPath:str="",
title:str="", content:str="", useBase64Icon:bool=False, icon:str="default", sourceApp:str=""
) -> int:
if icon in ["default", "error", "warning"]:
icon_data = icon
elif useBase64Icon:
try:
with open(icon, "rb") as f:
icon_data_bytes = f.read()
icon_data = base64.b64encode(icon_data_bytes).decode("utf-8")
except Exception:
icon_data = "default"
else:
icon_data = icon
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
data_msg = {
"messageType": messageType,
"index": index,
"timeout":timeout,
"height": height,
"opacity": opacity,
"volume": volume,
"audioPath": audioPath,
"title": title,
"content": content,
"useBase64Icon": useBase64Icon,
"icon": icon_data,
"sourceApp": sourceApp,
}
msg_str = json.dumps(data_msg)
response = sock.sendto(msg_str.encode("utf-8"), endpoint)
sock.close()
return response
def xsoverlayForVRCT(content:str="") -> int:
response = XSOverlay(
title="VRCT",
content=content,
useBase64Icon=True,
icon=os_path.join(os_path.dirname(__file__), "img", "xsoverlay2.png"),
sourceApp="VRCT"
)
return response
if __name__ == "__main__":
xsoverlayForVRCT(content="notification test")

72
src-python/utils.py Normal file
View File

@@ -0,0 +1,72 @@
import random
from typing import Union
from os import path as os_path, rename as os_rename
from PIL.Image import open as Image_open
def getImageFile(file_name):
img = Image_open(os_path.join(os_path.dirname(__file__), "img", file_name))
return img
def getKeyByValue(dictionary, value):
for key, val in dictionary.items():
if val == value:
return key
return None
def callFunctionIfCallable(function, *args):
if callable(function) is True:
function(*args)
def isEven(number):
return number % 2 == 0
def makeEven(number, minus:bool=False):
if minus is True:
return number if isEven(number) else number - 1
return number if isEven(number) else number + 1
def generatePercentageStringsList(start:int, end:int, step:int):
strings = []
for percent in range(start, end + 1, step):
strings.append(f"{percent}%")
return strings
def intToPctStr(value:int):
return f"{value}%"
def floatToPctStr(value:float):
return f"{int(value*100)}%"
def strPctToInt(value:str):
return int(value.replace("%", ""))
def isUniqueStrings(unique_strings:Union[str, list], input_string:str, require=False):
import re
if isinstance(unique_strings, str):
unique_strings = [unique_strings]
patterns = [re.escape(s) for s in unique_strings]
counts = [len(re.findall(pattern, input_string)) for pattern in patterns]
if require is True:
# If require is True, unique_strings must appear once
return all(count == 1 for count in counts) and counts.count(1) == 2
else:
# If require is False, check if unique strings are used exactly once
return all(count == 1 for count in counts)
# path先のweightフォルダがある場合にはそのフォルダ名をweightsに変更する
def renameWeightFolder(path):
weight_path = os_path.join(path, "weight")
if os_path.exists(weight_path):
os_rename(weight_path, os_path.join(path, "weights"))
def splitList(lst:list, split_count:int, to_shuffle:bool=False):
if to_shuffle is True:
random.shuffle(lst)
split_lists = []
for i in range(0, len(lst), split_count):
sub_list = lst[i:i+split_count]
split_lists.append(sub_list)
return split_lists

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,274 @@
import warnings
warnings.simplefilter('ignore', RuntimeWarning)
import sys
import json
import time
from config import config
import webui_controller as controller
config_mapping = {
"/config/version": "VERSION",
"/config/transparency_range": "TRANSPARENCY_RANGE",
"/config/appearance_theme_list": "APPEARANCE_THEME_LIST",
"/config/ui_scaling_list": "UI_SCALING_LIST",
"/config/textbox_ui_scaling_range": "TEXTBOX_UI_SCALING_RANGE",
"/config/message_box_ratio_range": "MESSAGE_BOX_RATIO_RANGE",
"/config/selectable_ui_languages_dict": "SELECTABLE_UI_LANGUAGES_DICT",
"/config/selectable_ctranslate2_weight_type_dict": "SELECTABLE_CTRANSLATE2_WEIGHT_TYPE_DICT",
"/config/selectable_whisper_weight_type_dict": "SELECTABLE_WHISPER_WEIGHT_TYPE_DICT",
"/config/max_mic_energy_threshold": "MAX_MIC_ENERGY_THRESHOLD",
"/config/max_speaker_energy_threshold": "MAX_SPEAKER_ENERGY_THRESHOLD",
"/config/enable_translation": "ENABLE_TRANSLATION",
"/config/enable_transcription_send": "ENABLE_TRANSCRIPTION_SEND",
"/config/enable_transcription_receive": "ENABLE_TRANSCRIPTION_RECEIVE",
"/config/enable_foreground": "ENABLE_FOREGROUND",
"/config/source_country": "SOURCE_COUNTRY",
"/config/source_language": "SOURCE_LANGUAGE",
"/config/target_country": "TARGET_COUNTRY",
"/config/target_language": "TARGET_LANGUAGE",
"/config/choice_input_translator": "CHOICE_INPUT_TRANSLATOR",
"/config/choice_output_translator": "CHOICE_OUTPUT_TRANSLATOR",
"/config/is_reset_button_displayed_for_translation": "IS_RESET_BUTTON_DISPLAYED_FOR_TRANSLATION",
"/config/is_reset_button_displayed_for_whisper": "IS_RESET_BUTTON_DISPLAYED_FOR_WHISPER",
"/config/selected_tab_no": "SELECTED_TAB_NO",
"/config/selected_tab_your_translator_engines": "SELECTED_TAB_YOUR_TRANSLATOR_ENGINES",
"/config/selected_tab_target_translator_engines": "SELECTED_TAB_TARGET_TRANSLATOR_ENGINES",
"/config/selected_tab_your_languages": "SELECTED_TAB_YOUR_LANGUAGES",
"/config/selected_tab_target_languages": "SELECTED_TAB_TARGET_LANGUAGES",
"/config/selected_transcription_engine": "SELECTED_TRANSCRIPTION_ENGINE",
"/config/is_main_window_sidebar_compact_mode": "IS_MAIN_WINDOW_SIDEBAR_COMPACT_MODE",
"/config/transparency": "TRANSPARENCY",
"/config/appearance_theme": "APPEARANCE_THEME",
"/config/ui_scaling": "UI_SCALING",
"/config/textbox_ui_scaling": "TEXTBOX_UI_SCALING",
"/config/message_box_ratio": "MESSAGE_BOX_RATIO",
"/config/font_family": "FONT_FAMILY",
"/config/ui_language": "UI_LANGUAGE",
"/config/enable_restore_main_window_geometry": "ENABLE_RESTORE_MAIN_WINDOW_GEOMETRY",
"/config/main_window_geometry": "MAIN_WINDOW_GEOMETRY",
"/config/choice_mic_host": "CHOICE_MIC_HOST",
"/config/choice_mic_device": "CHOICE_MIC_DEVICE",
"/config/input_mic_energy_threshold": "INPUT_MIC_ENERGY_THRESHOLD",
"/config/input_mic_dynamic_energy_threshold": "INPUT_MIC_DYNAMIC_ENERGY_THRESHOLD",
"/config/input_mic_record_timeout": "INPUT_MIC_RECORD_TIMEOUT",
"/config/input_mic_phrase_timeout": "INPUT_MIC_PHRASE_TIMEOUT",
"/config/input_mic_max_phrases": "INPUT_MIC_MAX_PHRASES",
"/config/input_mic_word_filter": "INPUT_MIC_WORD_FILTER",
"/config/input_mic_avg_logprob": "INPUT_MIC_AVG_LOGPROB",
"/config/input_mic_no_speech_prob": "INPUT_MIC_NO_SPEECH_PROB",
"/config/choice_speaker_device": "CHOICE_SPEAKER_DEVICE",
"/config/input_speaker_energy_threshold": "INPUT_SPEAKER_ENERGY_THRESHOLD",
"/config/input_speaker_dynamic_energy_threshold": "INPUT_SPEAKER_DYNAMIC_ENERGY_THRESHOLD",
"/config/input_speaker_record_timeout": "INPUT_SPEAKER_RECORD_TIMEOUT",
"/config/input_speaker_phrase_timeout": "INPUT_SPEAKER_PHRASE_TIMEOUT",
"/config/input_speaker_max_phrases": "INPUT_SPEAKER_MAX_PHRASES",
"/config/input_speaker_avg_logprob": "INPUT_SPEAKER_AVG_LOGPROB",
"/config/input_speaker_no_speech_prob": "INPUT_SPEAKER_NO_SPEECH_PROB",
"/config/osc_ip_address": "OSC_IP_ADDRESS",
"/config/osc_port": "OSC_PORT",
"/config/auth_keys": "AUTH_KEYS",
"/config/use_translation_feature": "USE_TRANSLATION_FEATURE",
"/config/use_whisper_feature": "USE_WHISPER_FEATURE",
"/config/ctranslate2_weight_type": "CTRANSLATE2_WEIGHT_TYPE",
"/config/whisper_weight_type": "WHISPER_WEIGHT_TYPE",
"/config/enable_auto_clear_message_box": "ENABLE_AUTO_CLEAR_MESSAGE_BOX",
"/config/enable_send_only_translated_messages": "ENABLE_SEND_ONLY_TRANSLATED_MESSAGES",
"/config/send_message_button_type": "SEND_MESSAGE_BUTTON_TYPE",
"/config/enable_notice_xsoverlay": "ENABLE_NOTICE_XSOVERLAY",
"/config/overlay_settings": "OVERLAY_SETTINGS",
"/config/enable_overlay_small_log": "ENABLE_OVERLAY_SMALL_LOG",
"/config/overlay_small_log_settings": "OVERLAY_SMALL_LOG_SETTINGS",
"/config/overlay_ui_type": "OVERLAY_UI_TYPE",
"/config/enable_send_message_to_vrc": "ENABLE_SEND_MESSAGE_TO_VRC",
"/config/send_message_format": "SEND_MESSAGE_FORMAT",
"/config/send_message_format_with_t": "SEND_MESSAGE_FORMAT_WITH_T",
"/config/received_message_format": "RECEIVED_MESSAGE_FORMAT",
"/config/received_message_format_with_t": "RECEIVED_MESSAGE_FORMAT_WITH_T",
"/config/enable_speaker2chatbox_pass": "ENABLE_SPEAKER2CHATBOX_PASS",
"/config/enable_send_received_message_to_vrc": "ENABLE_SEND_RECEIVED_MESSAGE_TO_VRC",
"/config/enable_logger": "ENABLE_LOGGER",
"/config/enable_vrc_mic_mute_sync": "ENABLE_VRC_MIC_MUTE_SYNC",
"/config/is_config_window_compact_mode": "IS_CONFIG_WINDOW_COMPACT_MODE",
}
controller_mapping = {
"/controller/list_language_and_country": controller.getListLanguageAndCountry,
"/controller/list_mic_host": controller.getListInputHost,
"/controller/list_mic_device": controller.getListInputDevice,
"/controller/list_speaker_device": controller.getListOutputDevice,
# "/controller/callback_update_software": controller.callbackUpdateSoftware,
# "/controller/callback_restart_software": controller.callbackRestartSoftware,
"/controller/callback_filepath_logs": controller.callbackFilepathLogs,
"/controller/callback_filepath_config_file": controller.callbackFilepathConfigFile,
"/controller/callback_open_config_window": controller.callbackOpenConfigWindow,
"/controller/callback_close_config_window": controller.callbackCloseConfigWindow,
"/controller/callback_enable_main_window_sidebar_compact_mode": controller.callbackEnableMainWindowSidebarCompactMode,
"/controller/callback_disable_main_window_sidebar_compact_mode": controller.callbackDisableMainWindowSidebarCompactMode,
"/controller/callback_enable_translation": controller.callbackEnableTranslation,
"/controller/callback_disable_translation": controller.callbackDisableTranslation,
"/controller/callback_enable_transcription_send": controller.callbackEnableTranscriptionSend,
"/controller/callback_disable_transcription_send": controller.callbackDisableTranscriptionSend,
"/controller/callback_enable_transcription_receive": controller.callbackEnableTranscriptionReceive,
"/controller/callback_disable_transcription_receive": controller.callbackDisableTranscriptionReceive,
"/controller/callback_enable_foreground": controller.callbackEnableForeground,
"/controller/callback_disable_foreground": controller.callbackDisableForeground,
"/controller/set_your_language_and_country": controller.setYourLanguageAndCountry,
"/controller/set_target_language_and_country": controller.setTargetLanguageAndCountry,
"/controller/swap_your_language_and_target_language": controller.swapYourLanguageAndTargetLanguage,
"/controller/callback_selected_language_preset_tab": controller.callbackSelectedLanguagePresetTab,
"/controller/callback_selected_translation_engine": controller.callbackSelectedTranslationEngine,
"/controller/callback_disable_config_window_compact_mode": controller.callbackEnableConfigWindowCompactMode,
"/controller/callback_enable_config_window_compact_mode": controller.callbackDisableConfigWindowCompactMode,
"/controller/callback_set_transparency": controller.callbackSetTransparency,
"/controller/callback_set_appearance": controller.callbackSetAppearance,
"/controller/callback_set_ui_scaling": controller.callbackSetUiScaling,
"/controller/callback_set_textbox_ui_scaling": controller.callbackSetTextboxUiScaling,
"/controller/callback_set_message_box_ratio": controller.callbackSetMessageBoxRatio,
"/controller/callback_set_font_family": controller.callbackSetFontFamily,
"/controller/callback_set_ui_language": controller.callbackSetUiLanguage,
"/controller/callback_set_enable_restore_main_window_geometry": controller.callbackSetEnableRestoreMainWindowGeometry,
"/controller/callback_set_use_translation_feature": controller.callbackSetUseTranslationFeature,
"/controller/callback_set_ctranslate2_weight_type": controller.callbackSetCtranslate2WeightType,
"/controller/callback_set_deepl_auth_key": controller.callbackSetDeeplAuthKey,
"/controller/callback_clear_deepl_auth_key": controller.callbackClearDeeplAuthKey,
"/controller/callback_set_mic_host": controller.callbackSetMicHost,
"/controller/callback_set_mic_device": controller.callbackSetMicDevice,
"/controller/callback_set_mic_energy_threshold": controller.callbackSetMicEnergyThreshold,
"/controller/callback_set_mic_dynamic_energy_threshold": controller.callbackSetMicDynamicEnergyThreshold,
"/controller/callback_enable_check_mic_threshold": controller.callbackEnableCheckMicThreshold,
"/controller/callback_disable_check_mic_threshold": controller.callbackDisableCheckMicThreshold,
"/controller/callback_set_mic_record_timeout": controller.callbackSetMicRecordTimeout,
"/controller/callback_set_mic_phrase_timeout": controller.callbackSetMicPhraseTimeout,
"/controller/callback_set_mic_max_phrases": controller.callbackSetMicMaxPhrases,
"/controller/callback_set_mic_word_filter": controller.callbackSetMicWordFilter,
"/controller/callback_delete_mic_word_filter": controller.callbackDeleteMicWordFilter,
"/controller/callback_set_speaker_device": controller.callbackSetSpeakerDevice,
"/controller/callback_set_speaker_energy_threshold": controller.callbackSetSpeakerEnergyThreshold,
"/controller/callback_set_speaker_dynamic_energy_threshold": controller.callbackSetSpeakerDynamicEnergyThreshold,
"/controller/callback_check_speaker_threshold": controller.callbackCheckSpeakerThreshold,
"/controller/callback_set_speaker_record_timeout": controller.callbackSetSpeakerRecordTimeout,
"/controller/callback_set_speaker_phrase_timeout": controller.callbackSetSpeakerPhraseTimeout,
"/controller/callback_set_speaker_max_phrases": controller.callbackSetSpeakerMaxPhrases,
"/controller/callback_set_use_whisper_feature": controller.callbackSetUserWhisperFeature,
"/controller/callback_set_whisper_weight_type": controller.callbackSetWhisperWeightType,
"/controller/callback_set_overlay_settings": controller.callbackSetOverlaySettings,
"/controller/callback_set_enable_overlay_small_log": controller.callbackSetEnableOverlaySmallLog,
"/controller/callback_set_overlay_small_log_settings": controller.callbackSetOverlaySmallLogSettings,
"/controller/callback_set_enable_auto_clear_chatbox": controller.callbackSetEnableAutoClearMessageBox,
"/controller/callback_set_send_only_translated_messages": controller.callbackSetEnableSendOnlyTranslatedMessages,
"/controller/callback_set_send_message_button_type": controller.callbackSetSendMessageButtonType,
"/controller/callback_set_enable_notice_xsoverlay": controller.callbackSetEnableNoticeXsoverlay,
"/controller/callback_set_enable_auto_export_message_logs": controller.callbackSetEnableAutoExportMessageLogs,
"/controller/callback_set_enable_vrc_mic_mute_sync": controller.callbackSetEnableVrcMicMuteSync,
"/controller/callback_set_enable_send_message_to_vrc": controller.callbackSetEnableSendMessageToVrc,
"/controller/callback_set_send_message_format": controller.callbackSetSendMessageFormat,
"/controller/callback_set_send_message_format_with_t": controller.callbackSetSendMessageFormatWithT,
"/controller/callback_set_received_message_format": controller.callbackSetReceivedMessageFormat,
"/controller/callback_set_received_message_format_with_t": controller.callbackSetReceivedMessageFormatWithT,
"/controller/callback_set_enable_send_received_message_to_vrc": controller.callbackSetEnableSendReceivedMessageToVrc,
"/controller/callback_set_osc_ip_address": controller.callbackSetOscIpAddress,
"/controller/callback_set_osc_port": controller.callbackSetOscPort,
}
action_mapping = {
"/controller/callback_enable_transcription_send": "/action/transcription_send_message",
"/controller/callback_disable_transcription_send": "/action/transcription_send_stopped",
"/controller/callback_enable_transcription_receive": "/action/transcription_receive_message",
"/controller/callback_disable_transcription_receive": "/action/transcription_receive_stopped",
"/controller/callback_enable_check_mic_threshold": "/action/check_mic_threshold_energy",
}
def handleConfigRequest(endpoint):
handler = config_mapping.get(endpoint)
if handler is None:
response = "Invalid endpoint"
status = 404
else:
response = getattr(config, handler)
status = 200
return response, status
def handleControllerRequest(endpoint, data=None):
handler = controller_mapping.get(endpoint)
if handler is None:
response = "Invalid endpoint"
status = 404
else:
action_endpoint = action_mapping.get(endpoint, None)
if action_endpoint is not None:
response = handler(data, Action(action_endpoint).transmit)
else:
response = handler(data)
status = 200
return response, status
class Action:
def __init__(self, endpoint:str) -> None:
self.endpoint = endpoint
def transmit(self, data:dict) -> None:
response = {
"endpoint": self.endpoint,
"status": 200,
"data": data,
}
response = json.dumps(response)
print(response, flush=True)
def main():
received_data = sys.stdin.readline().strip()
received_data = json.loads(received_data)
if received_data is True:
response_data = {
"status": "ok",
"id": received_data["id"],
"data": received_data["data"],
}
response = json.dumps(response_data)
time.sleep(2)
print(response, flush=True)
# endpoint = received_data.get("endpoint", None)
# data = received_data.get("data", None)
# match endpoint.split("/")[1]:
# case "config":
# response_data, status = handleConfigRequest(endpoint, data)
# case "controller":
# response_data, status = handleControllerRequest(endpoint, data)
# case _:
# pass
# response = {
# "status": status,
# "endpoint": endpoint,
# "data": response_data,
# }
# response = json.dumps(response)
# time.sleep(2)
# print(response, flush=True)
if __name__ == "__main__":
# endpoint = "/controller/list_mic_host"
# data = None
# response_data, status = handleControllerRequest(endpoint, data)
# response = {
# "status": status,
# "endpoint": endpoint,
# "data": response_data,
# }
# response = json.dumps(response)
# print(response, flush=True)
try:
print(json.dumps({"init_key_from_py": "Initialization from Python."}), flush=True)
while True:
main()
except Exception:
import traceback
with open('error.log', 'a') as f:
traceback.print_exc(file=f)