Files
VRCT/transcription.py
2023-06-27 22:36:09 +09:00

201 lines
8.4 KiB
Python

import queue
import speech_recognition as sr
import pyaudiowpatch as pyaudio
# VoiceRecognizer
class VoiceRecognizer:
    """Transcribe microphone input and speaker (WASAPI loopback) output.

    Audio is captured into two bounded queues (``mic_queue`` and
    ``spk_queue``) and turned into text with ``Recognizer.recognize_google``.
    Capture and recognition are separate calls so that callers can run them
    in separate loops/threads.
    """

    # Bytes per sample of the ``pyaudio.paInt16`` format used for the
    # speaker loopback stream.
    _SPK_SAMPLE_WIDTH = 2

    def __init__(self):
        self.r = sr.Recognizer()
        self.p = pyaudio.PyAudio()
        # Language tags offered for recognition.
        self.languages = [
            "ja-JP", "en-US", "en-GB", "af-ZA", "ar-DZ", "ar-BH", "ar-EG", "ar-IL", "ar-IQ", "ar-JO", "ar-KW", "ar-LB", "ar-MA",
            "ar-OM", "ar-PS", "ar-QA", "ar-SA", "ar-TN", "ar-AE", "eu-ES", "bg-BG", "ca-ES", "cmn-Hans-CN", "cmn-Hans-HK",
            "cmn-Hant-TW", "yue-Hant-HK", "hr-HR", "cs-CZ", "da-DK", "en-AU", "en-CA", "en-IN", "en-IE", "en-NZ", "en-PH",
            "en-ZA", "fa-IR", "fr-FR", "fil-PH", "gl-ES", "de-DE", "el-GR", "fi-FI", "he-IL", "hi-IN", "hu-HU", "id-ID", "is-IS",
            "it-IT", "it-CH", "ko-KR", "lt-LT", "ms-MY", "nl-NL", "nb-NO", "pl-PL", "pt-BR", "pt-PT", "ro-RO", "ru-RU", "sr-RS",
            "sk-SK", "sl-SI", "es-AR", "es-BO", "es-CL", "es-CO", "es-CR", "es-DO", "es-EC", "es-SV", "es-GT", "es-HN", "es-MX",
            "es-NI", "es-PA", "es-PY", "es-PE", "es-PR", "es-ES", "es-UY", "es-US", "es-VE", "sv-SE", "th-TH", "tr-TR", "uk-UA",
            "vi-VN", "zu-ZA",
        ]

        # NOTE: despite its name, ``mic_device_name`` holds the device *index*
        # (see ``set_mic``); the attribute name is kept for compatibility.
        self.mic_device_name = None
        self.mic_threshold = 50
        self.mic_is_dynamic = False
        self.mic_language = "ja-JP"
        self.mic_queue = queue.Queue(10)

        self.spk_device = None      # device info dict, see ``set_spk``
        self.spk_interval = 3       # seconds of audio per recorded chunk
        self.spk_language = "ja-JP"
        self.spk_stream = None
        self.spk_queue = queue.Queue(10)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _iter_devices(p):
        """Yield the info dict of every device of every host API of *p*."""
        for host_index in range(p.get_host_api_count()):
            device_count = p.get_host_api_info_by_index(host_index)["deviceCount"]
            for device_index in range(device_count):
                yield p.get_device_info_by_host_api_device_index(host_index, device_index)

    @staticmethod
    def _drain(q):
        """Remove every item currently in queue *q* without ever blocking."""
        try:
            while True:
                q.get_nowait()
        except queue.Empty:
            pass

    @staticmethod
    def _first_channel(frames, channels):
        """Return the first channel of interleaved 16-bit PCM *frames*.

        *frames* is a bytes-like object of interleaved 2-byte samples and
        *channels* the number of interleaved channels. Data that is already
        mono is returned unchanged; a trailing partial frame is discarded.
        """
        if channels <= 1:
            return frames
        frame_width = channels * VoiceRecognizer._SPK_SAMPLE_WIDTH
        usable = len(frames) - len(frames) % frame_width
        if usable == 0:
            return b""
        # View the buffer as 16-bit samples and keep every ``channels``-th one.
        samples = memoryview(frames)[:usable].cast("h")
        return samples[::channels].tobytes()

    # ------------------------------------------------------------------
    # device discovery
    # ------------------------------------------------------------------
    def search_input_device(self):
        """Return the info dicts of all WASAPI, non-loopback input devices."""
        with pyaudio.PyAudio() as p:
            wasapi_index = p.get_host_api_info_by_type(pyaudio.paWASAPI)["index"]
            return [
                device for device in self._iter_devices(p)
                if device["hostApi"] == wasapi_index
                and device["maxInputChannels"] > 0
                and not device["isLoopbackDevice"]
            ]

    def search_output_device(self):
        """Return the info dicts of all WASAPI loopback devices."""
        with pyaudio.PyAudio() as p:
            wasapi_index = p.get_host_api_info_by_type(pyaudio.paWASAPI)["index"]
            return [
                device for device in self._iter_devices(p)
                if device["hostApi"] == wasapi_index and device["isLoopbackDevice"]
            ]

    def search_default_device(self):
        """Return ``(name_mic, name_spk)`` for the WASAPI default devices.

        ``name_mic`` is the name of the default input device and ``name_spk``
        the name of the loopback device matching the default output device.
        Either element is ``None`` when no such device can be found.
        """
        name_mic = None
        name_spk = None
        with pyaudio.PyAudio() as p:
            wasapi_info = p.get_host_api_info_by_type(pyaudio.paWASAPI)
            default_input = wasapi_info["defaultInputDevice"]
            default_output = wasapi_info["defaultOutputDevice"]

            default_speakers = None
            for device in self._iter_devices(p):
                if name_mic is None and device["index"] == default_input:
                    name_mic = device["name"]
                if default_speakers is None and device["index"] == default_output:
                    default_speakers = device

            if default_speakers is not None:
                if default_speakers["isLoopbackDevice"]:
                    # The default output already is a loopback device.
                    name_spk = default_speakers["name"]
                else:
                    # Find the loopback device whose name contains the
                    # default output device's name.
                    for loopback in p.get_loopback_device_info_generator():
                        if default_speakers["name"] in loopback["name"]:
                            name_spk = loopback["name"]
                            break
        return name_mic, name_spk

    # ------------------------------------------------------------------
    # microphone
    # ------------------------------------------------------------------
    def set_mic(self, device_name, threshold=50, is_dynamic=False, language="ja-JP"):
        """Select the microphone called *device_name* and store its settings.

        Raises:
            IndexError: if no input device has that name.
        """
        device_index = next(
            (device["index"] for device in self.search_input_device()
             if device["name"] == device_name),
            None,
        )
        if device_index is None:
            raise IndexError(f"input device not found: {device_name!r}")
        self.mic_device_name = device_index
        self.mic_threshold = threshold
        self.mic_is_dynamic = is_dynamic
        self.mic_language = language

    def init_mic(self):
        """Empty the microphone queue and apply the energy threshold.

        When ``mic_is_dynamic`` is set, the recognizer is additionally
        calibrated against 3 seconds of ambient noise from the selected
        microphone.
        """
        self._drain(self.mic_queue)
        self.r.energy_threshold = self.mic_threshold
        if self.mic_is_dynamic:
            with sr.Microphone(device_index=self.mic_device_name) as source:
                self.r.adjust_for_ambient_noise(source, 3.0)

    def listen_mic(self):
        """Record one phrase from the microphone and enqueue it.

        Blocks while listening, and while ``mic_queue`` is full.
        """
        with sr.Microphone(device_index=self.mic_device_name) as source:
            audio = self.r.listen(source)
        self.mic_queue.put(audio)

    def recognize_mic(self):
        """Wait for the next microphone phrase and return its transcription.

        Returns an empty string when recognition fails for any reason.
        """
        audio = self.mic_queue.get()
        try:
            return self.r.recognize_google(audio, language=self.mic_language)
        except Exception:
            # Best effort: unintelligible audio or a failed request yields no
            # text. KeyboardInterrupt/SystemExit are deliberately not caught.
            return ""

    # ------------------------------------------------------------------
    # speaker (loopback)
    # ------------------------------------------------------------------
    def set_spk(self, device_name, interval, language):
        """Select the loopback device called *device_name* and its settings.

        *interval* is the length in seconds of each recorded chunk.

        Raises:
            IndexError: if no loopback device has that name.
        """
        device = next(
            (device for device in self.search_output_device()
             if device["name"] == device_name),
            None,
        )
        if device is None:
            raise IndexError(f"loopback device not found: {device_name!r}")
        self.spk_device = device
        self.spk_interval = interval
        self.spk_language = language

    def init_spk(self):
        """Empty the speaker queue."""
        self._drain(self.spk_queue)

    def spk_record_callback(self, in_data, frame_count, time_info, status):
        """Stream callback: enqueue one recorded chunk and keep recording.

        The callback must not block, so when ``spk_queue`` is full the oldest
        chunk is discarded to make room for the newest one.
        """
        try:
            self.spk_queue.put_nowait(in_data)
        except queue.Full:
            try:
                self.spk_queue.get_nowait()
            except queue.Empty:
                pass  # a consumer emptied the queue in the meantime
            self.spk_queue.put_nowait(in_data)
        return (in_data, pyaudio.paContinue)

    def start_spk_recording(self):
        """(Re)open the loopback input stream on the selected speaker device.

        Any previously opened stream is closed first. ``set_spk`` must have
        been called before. Each callback delivers ``spk_interval`` seconds
        of 16-bit audio.
        """
        self.close_spk_stream()
        sample_rate = int(self.spk_device["defaultSampleRate"])
        self.spk_stream = self.p.open(
            format=pyaudio.paInt16,
            channels=self.spk_device["maxInputChannels"],
            rate=sample_rate,
            frames_per_buffer=sample_rate * self.spk_interval,
            input=True,
            input_device_index=self.spk_device["index"],
            stream_callback=self.spk_record_callback,
        )

    def stop_spk_stream(self):
        """Pause the speaker stream; does nothing when no stream is open."""
        if self.spk_stream is not None:
            self.spk_stream.stop_stream()

    def start_spk_stream(self):
        """Resume the speaker stream opened by ``start_spk_recording``."""
        self.spk_stream.start_stream()

    def close_spk_stream(self):
        """Stop and close the speaker stream, if one is open."""
        if self.spk_stream is not None:
            self.spk_stream.stop_stream()
            self.spk_stream.close()
            self.spk_stream = None

    def recognize_spk(self):
        """Wait for the next speaker chunk and return its transcription.

        Returns an empty string when recognition fails for any reason.
        """
        in_data = self.spk_queue.get()
        try:
            # The stream delivers interleaved 16-bit frames for every channel
            # of the device; the recognizer is fed a single channel.
            mono = self._first_channel(in_data, self.spk_device["maxInputChannels"])
            audio_data = sr.AudioData(
                mono,
                int(self.spk_device["defaultSampleRate"]),
                self._SPK_SAMPLE_WIDTH,
            )
            return self.r.recognize_google(audio_data, language=self.spk_language)
        except Exception:
            # Best effort: unintelligible audio or a failed request yields no
            # text. KeyboardInterrupt/SystemExit are deliberately not caught.
            return ""
if __name__ == "__main__":
    # Manual smoke test: print the available WASAPI input devices and the
    # names of the default microphone / speaker loopback devices.
    #
    # For live transcription, configure the recognizer with set_mic() (then
    # init_mic()) or set_spk() (then start_spk_recording()), and call
    # listen_mic() / recognize_mic() or recognize_spk() in loops running on
    # separate threads, printing every non-empty result.
    vr = VoiceRecognizer()
    print(vr.search_input_device())
    print(vr.search_default_device())