[Update] 機能部分をmodelsフォルダに移動

This commit is contained in:
misygauziya
2023-08-20 01:20:30 +09:00
parent 31e786f2a8
commit 02d8fbdb1c
14 changed files with 392 additions and 388 deletions

View File

@@ -0,0 +1,91 @@
transcription_lang = {
"Japanese (Japan)":"ja-JP",
"English (United States)":"en-US",
"English (United Kingdom)":"en-GB",
"Afrikaans (South Africa)":"af-ZA",
"Arabic (Algeria)":"ar-DZ",
"Arabic (Bahrain)":"ar-BH",
"Arabic (Egypt)":"ar-EG",
"Arabic (Israel)":"ar-IL",
"Arabic (Iraq)":"ar-IQ",
"Arabic (Jordan)":"ar-JO",
"Arabic (Kuwait)":"ar-KW",
"Arabic (Lebanon)":"ar-LB",
"Arabic (Morocco)":"ar-MA",
"Arabic (Oman)":"ar-OM",
"Arabic (State of Palestine)":"ar-PS",
"Arabic (Qatar)":"ar-QA",
"Arabic (Saudi Arabia)":"ar-SA",
"Arabic (Tunisia)":"ar-TN",
"Arabic (United Arab Emirates)":"ar-AE",
"Basque (Spain)":"eu-ES",
"Bulgarian (Bulgaria)":"bg-BG",
"Catalan (Spain)":"ca-ES",
"Chinese, Mandarin (Simplified, China)":"cmn-Hans-CN",
"Chinese, Mandarin (Simplified, Hong Kong)":"cmn-Hans-HK",
"Chinese, Mandarin (Traditional, Taiwan)":"cmn-Hant-TW",
"Chinese, Cantonese (Traditional Hong Kong)":"yue-Hant-HK",
"Croatian (Croatia)":"hr-HR",
"Czech (Czech Republic)":"cs-CZ",
"Danish (Denmark)":"da-DK",
"English (Australia)":"en-AU",
"English (Canada)":"en-CA",
"English (India)":"en-IN",
"English (Ireland)":"en-IE",
"English (New Zealand)":"en-NZ",
"English (Philippines)":"en-PH",
"English (South Africa)":"en-ZA",
"Persian (Iran)":"fa-IR",
"French (France)":"fr-FR",
"Filipino (Philippines)":"fil-PH",
"Galician (Spain)":"gl-ES",
"German (Germany)":"de-DE",
"Greek (Greece)":"el-GR",
"Finnish (Finland)":"fi-FI",
"Hebrew (Israel)":"he-IL",
"Hindi (India)":"hi-IN",
"Hungarian (Hungary)":"hu-HU",
"Indonesian (Indonesia)":"id-ID",
"Icelandic (Iceland)":"is-IS",
"Italian (Italy)":"it-IT",
"Italian (Switzerland)":"it-CH",
"Korean (South Korea)":"ko-KR",
"Lithuanian (Lithuania)":"lt-LT",
"Malay (Malaysia)":"ms-MY",
"Dutch (Netherlands)":"nl-NL",
"Norwegian Bokmål (Norway)":"nb-NO",
"Polish (Poland)":"pl-PL",
"Portuguese (Brazil)":"pt-BR",
"Portuguese (Portugal)":"pt-PT",
"Romanian (Romania)":"ro-RO",
"Russian (Russia)":"ru-RU",
"Serbian (Serbia)":"sr-RS",
"Slovak (Slovakia)":"sk-SK",
"Slovenian (Slovenia)":"sl-SI",
"Spanish (Argentina)":"es-AR",
"Spanish (Bolivia)":"es-BO",
"Spanish (Chile)":"es-CL",
"Spanish (Colombia)":"es-CO",
"Spanish (Costa Rica)":"es-CR",
"Spanish (Dominican Republic)":"es-DO",
"Spanish (Ecuador)":"es-EC",
"Spanish (El Salvador)":"es-SV",
"Spanish (Guatemala)":"es-GT",
"Spanish (Honduras)":"es-HN",
"Spanish (Mexico)":"es-MX",
"Spanish (Nicaragua)":"es-NI",
"Spanish (Panama)":"es-PA",
"Spanish (Paraguay)":"es-PY",
"Spanish (Peru)":"es-PE",
"Spanish (Puerto Rico)":"es-PR",
"Spanish (Spain)":"es-ES",
"Spanish (Uruguay)":"es-UY",
"Spanish (United States)":"es-US",
"Spanish (Venezuela)":"es-VE",
"Swedish (Sweden)":"sv-SE",
"Thai (Thailand)":"th-TH",
"Turkish (Turkey)":"tr-TR",
"Ukrainian (Ukraine)":"uk-UA",
"Vietnamese (Vietnam)":"vi-VN",
"Zulu (South Africa)":"zu-ZA"
}

View File

@@ -0,0 +1,91 @@
from speech_recognition import Recognizer, Microphone
from pyaudiowpatch import get_sample_size, paInt16
from datetime import datetime
class BaseRecorder:
def __init__(self, source, energy_threshold, dynamic_energy_threshold, record_timeout):
self.recorder = Recognizer()
self.recorder.energy_threshold = energy_threshold
self.recorder.dynamic_energy_threshold = dynamic_energy_threshold
self.record_timeout = record_timeout
self.stop = None
if source is None:
raise ValueError("audio source can't be None")
self.source = source
def adjust_for_noise(self):
with self.source:
self.recorder.adjust_for_ambient_noise(self.source)
def record_into_queue(self, audio_queue):
def record_callback(_, audio):
audio_queue.put((audio.get_raw_data(), datetime.now()))
self.stop = self.recorder.listen_in_background(self.source, record_callback, phrase_time_limit=self.record_timeout)
class SelectedMicRecorder(BaseRecorder):
def __init__(self, device, energy_threshold, dynamic_energy_threshold, record_timeout):
source=Microphone(
device_index=device['index'],
sample_rate=int(device["defaultSampleRate"]),
)
super().__init__(source=source, energy_threshold=energy_threshold, dynamic_energy_threshold=dynamic_energy_threshold, record_timeout=record_timeout)
# self.adjust_for_noise()
class SelectedSpeakerRecorder(BaseRecorder):
def __init__(self, device, energy_threshold, dynamic_energy_threshold, record_timeout):
source = Microphone(speaker=True,
device_index= device["index"],
sample_rate=int(device["defaultSampleRate"]),
chunk_size=get_sample_size(paInt16),
channels=device["maxInputChannels"]
)
super().__init__(source=source, energy_threshold=energy_threshold, dynamic_energy_threshold=dynamic_energy_threshold, record_timeout=record_timeout)
# self.adjust_for_noise()
class BaseEnergyRecorder:
def __init__(self, source):
self.recorder = Recognizer()
self.recorder.energy_threshold = 0
self.recorder.dynamic_energy_threshold = False
self.record_timeout = 0
self.stop = None
if source is None:
raise ValueError("audio source can't be None")
self.source = source
def adjust_for_noise(self):
with self.source:
self.recorder.adjust_for_ambient_noise(self.source)
def record_into_queue(self, energy_queue):
def record_callback(_, energy):
energy_queue.put(energy)
self.stop = self.recorder.listen_energy_in_background(self.source, record_callback)
class SelectedMicEnergyRecorder(BaseEnergyRecorder):
def __init__(self, device):
source=Microphone(
device_index=device['index'],
sample_rate=int(device["defaultSampleRate"]),
)
super().__init__(source=source)
# self.adjust_for_noise()
class SelectedSpeakeEnergyRecorder(BaseEnergyRecorder):
def __init__(self, device):
source = Microphone(speaker=True,
device_index= device["index"],
sample_rate=int(device["defaultSampleRate"]),
chunk_size=get_sample_size(paInt16),
channels=device["maxInputChannels"]
)
super().__init__(source=source)
# self.adjust_for_noise()

View File

@@ -0,0 +1,98 @@
from io import BytesIO
from threading import Event
import wave
from speech_recognition import Recognizer, AudioData, AudioFile
from datetime import timedelta
from pyaudiowpatch import get_sample_size, paInt16
from .transcription_languages import transcription_lang
PHRASE_TIMEOUT = 3
MAX_PHRASES = 10
class AudioTranscriber:
def __init__(self, speaker, source, phrase_timeout, max_phrases):
self.speaker = speaker
self.phrase_timeout = phrase_timeout
self.max_phrases = max_phrases
self.transcript_data = []
self.transcript_changed_event = Event()
self.audio_recognizer = Recognizer()
self.audio_sources = {
"sample_rate": source.SAMPLE_RATE,
"sample_width": source.SAMPLE_WIDTH,
"channels": source.channels,
"last_sample": bytes(),
"last_spoken": None,
"new_phrase": True,
"process_data_func": self.process_speaker_data if speaker else self.process_speaker_data
}
def transcribe_audio_queue(self, audio_queue, language):
# while True:
audio, time_spoken = audio_queue.get()
self.update_last_sample_and_phrase_status(audio, time_spoken)
text = ''
try:
# fd, path = tempfile.mkstemp(suffix=".wav")
# os.close(fd)
audio_data = self.audio_sources["process_data_func"]()
text = self.audio_recognizer.recognize_google(audio_data, language=transcription_lang[language])
except Exception as e:
pass
finally:
pass
# os.unlink(path)
if text != '':
self.update_transcript(text)
def update_last_sample_and_phrase_status(self, data, time_spoken):
source_info = self.audio_sources
if source_info["last_spoken"] and time_spoken - source_info["last_spoken"] > timedelta(seconds=self.phrase_timeout):
source_info["last_sample"] = bytes()
source_info["new_phrase"] = True
else:
source_info["new_phrase"] = False
source_info["last_sample"] += data
source_info["last_spoken"] = time_spoken
def process_mic_data(self):
audio_data = AudioData(self.audio_sources["last_sample"], self.audio_sources["sample_rate"], self.audio_sources["sample_width"])
return audio_data
def process_speaker_data(self):
temp_file = BytesIO()
with wave.open(temp_file, 'wb') as wf:
wf.setnchannels(self.audio_sources["channels"])
wf.setsampwidth(get_sample_size(paInt16))
wf.setframerate(self.audio_sources["sample_rate"])
wf.writeframes(self.audio_sources["last_sample"])
temp_file.seek(0)
with AudioFile(temp_file) as source:
audio = self.audio_recognizer.record(source)
return audio
def update_transcript(self, text):
source_info = self.audio_sources
transcript = self.transcript_data
if source_info["new_phrase"] or len(transcript) == 0:
if len(transcript) > self.max_phrases:
transcript.pop(-1)
transcript.insert(0, text)
else:
transcript[0] = text
def get_transcript(self):
if len(self.transcript_data) > 0:
text = self.transcript_data.pop(-1)
else:
text = ""
return text
def clear_transcript_data(self):
self.transcript_data.clear()
self.audio_sources["last_sample"] = bytes()
self.audio_sources["new_phrase"] = True

View File

@@ -0,0 +1,52 @@
from pyaudiowpatch import PyAudio, paWASAPI
def get_input_device_list():
devices = {}
with PyAudio() as p:
for host_index in range(0, p.get_host_api_count()):
host = p.get_host_api_info_by_index(host_index)
for device_index in range(0, p.get_host_api_info_by_index(host_index)['deviceCount']):
device = p.get_device_info_by_host_api_device_index(host_index, device_index)
if device["maxInputChannels"] > 0 and device["isLoopbackDevice"] is False:
if host["name"] in devices.keys():
devices[host["name"]].append(device)
else:
devices[host["name"]] = [device]
return devices
def get_output_device_list():
devices =[]
with PyAudio() as p:
wasapi_info = p.get_host_api_info_by_type(paWASAPI)
for device in p.get_loopback_device_info_generator():
if device["hostApi"] == wasapi_info["index"] and device["isLoopbackDevice"] is True:
devices.append(device)
return devices
def get_default_input_device():
with PyAudio() as p:
api_info = p.get_default_host_api_info()
defaultInputDevice = api_info["defaultInputDevice"]
for host_index in range(0, p.get_host_api_count()):
host = p.get_host_api_info_by_index(host_index)
for device_index in range(0, p. get_host_api_info_by_index(host_index)['deviceCount']):
device = p.get_device_info_by_host_api_device_index(host_index, device_index)
if device["index"] == defaultInputDevice:
return {"host":host, "device": device}
def get_default_output_device():
with PyAudio() as p:
wasapi_info = p.get_host_api_info_by_type(paWASAPI)
defaultOutputDevice = wasapi_info["defaultOutputDevice"]
for host_index in range(0, p.get_host_api_count()):
for device_index in range(0, p. get_host_api_info_by_index(host_index)['deviceCount']):
device = p.get_device_info_by_host_api_device_index(host_index, device_index)
if device["index"] == defaultOutputDevice:
default_speakers = device
if not default_speakers["isLoopbackDevice"]:
for loopback in p.get_loopback_device_info_generator():
if default_speakers["name"] in loopback["name"]:
default_device = loopback
return default_device