154 lines
5.7 KiB
Python
154 lines
5.7 KiB
Python
import io
|
|
import queue
|
|
import numpy as np
|
|
import soundcard as sc
|
|
import soundfile as sf
|
|
import sounddevice as sd
|
|
import speech_recognition as sr
|
|
|
|
# VoiceRecognizer
|
|
class VoiceRecognizer():
    """Capture audio from a microphone or a speaker loopback and transcribe it.

    Both sources follow the same producer/consumer pattern: a ``listen_*``
    method records one chunk of audio and pushes it onto a queue, and the
    matching ``recognize_*`` method pops one chunk and returns its text
    obtained through ``Recognizer.recognize_google``.
    """

    def __init__(self):
        """Create the recognizer and set default microphone/speaker settings."""
        self.r = sr.Recognizer()
        # Language codes offered for recognition.
        # NOTE(review): "hr_HR" uses an underscore unlike every other entry;
        # it looks like a typo for "hr-HR" but is kept as-is because callers
        # may key on the exact string -- confirm before changing.
        self.languages = [
            "ja-JP", "en-US", "en-GB", "af-ZA", "ar-DZ", "ar-BH", "ar-EG", "ar-IL", "ar-IQ", "ar-JO", "ar-KW", "ar-LB", "ar-MA",
            "ar-OM", "ar-PS", "ar-QA", "ar-SA", "ar-TN", "ar-AE", "eu-ES", "bg-BG", "ca-ES", "cmn-Hans-CN", "cmn-Hans-HK",
            "cmn-Hant-TW", "yue-Hant-HK", "hr_HR", "cs-CZ", "da-DK", "en-AU", "en-CA", "en-IN", "en-IE", "en-NZ", "en-PH",
            "en-ZA", "fa-IR", "fr-FR", "fil-PH", "gl-ES", "de-DE", "el-GR", "fi-FI", "he-IL", "hi-IN", "hu-HU", "id-ID", "is-IS",
            "it-IT", "it-CH", "ko-KR", "lt-LT", "ms-MY", "nl-NL", "nb-NO", "pl-PL", "pt-BR", "pt-PT", "ro-RO", "ru-RU", "sr-RS",
            "sk-SK", "sl-SI", "es-AR", "es-BO", "es-CL", "es-CO", "es-CR", "es-DO", "es-EC", "es-SV", "es-GT", "es-HN", "es-MX",
            "es-NI", "es-PA", "es-PY", "es-PE", "es-PR", "es-ES", "es-UY", "es-US", "es-VE", "sv-SE", "th-TH", "tr-TR", "uk-UA",
            "vi-VN", "zu-ZA",
        ]

        # Microphone settings. Despite its name, ``mic_device_name`` holds the
        # device *index* once ``set_mic`` has been called.
        self.mic_device_name = None
        self.mic_threshold = 50
        self.mic_is_dynamic = False
        self.mic_queue = queue.Queue()

        # Speaker (loopback) settings and the rolling capture buffer.
        self.spk_device_name = None
        self.spk_sample_rate = 16000
        self.spk_interval = 3
        self.spk_buffer_size = 4096
        self.spk_audio = np.empty(self.spk_sample_rate * self.spk_interval + self.spk_buffer_size, dtype=np.float32)
        self.n = 0  # number of valid samples currently held in spk_audio
        self.spk_queue = queue.Queue()

    def search_input_device(self):
        """Return ``{"name", "index"}`` dicts for every device with input channels."""
        return [
            {"name": device["name"], "index": device["index"]}
            for device in sd.query_devices()
            if device["max_input_channels"] > 0
        ]

    def search_output_device(self):
        """Return the names of all speakers reported by ``soundcard``."""
        return [str(device.name) for device in sc.all_speakers()]

    def search_default_device(self):
        """Return ``(default_mic_name, default_speaker_name)``."""
        device_list = sd.query_devices()
        mic_index = sd.default.device[0]
        name_mic = device_list[mic_index]["name"]
        name_spk = str(sc.default_speaker().name)
        return name_mic, name_spk

    def set_mic(self, device_name, threshold=50, is_dynamic=False):
        """Select the microphone by name and store its recognition settings.

        Args:
            device_name: Name of an input device, as listed by
                ``search_input_device``.
            threshold: Value later assigned to the recognizer's
                ``energy_threshold`` by ``init_mic``.
            is_dynamic: When true, ``init_mic`` also calibrates against
                ambient noise.

        Raises:
            IndexError: If no input device has the given name.
        """
        for device in self.search_input_device():
            if device["name"] == device_name:
                self.mic_device_name = device["index"]
                break
        else:
            # Same exception type the bare ``[...][0]`` lookup used to raise,
            # but with a message that names the missing device.
            raise IndexError(f"no input device named {device_name!r}")
        self.mic_threshold = threshold
        self.mic_is_dynamic = is_dynamic

    def init_mic(self):
        """Apply the stored threshold and optionally calibrate for ambient noise."""
        self.r.energy_threshold = self.mic_threshold
        if self.mic_is_dynamic:
            # Open the configured microphone explicitly; the instance never
            # stores a ``mic`` attribute to reuse here.
            with sr.Microphone(device_index=self.mic_device_name) as source:
                self.r.adjust_for_ambient_noise(source, duration=3.0)

    def listen_mic(self):
        """Record one phrase from the microphone and enqueue it on ``mic_queue``."""
        with sr.Microphone(device_index=self.mic_device_name) as source:
            audio = self.r.listen(source)
        self.mic_queue.put(audio)

    def recognize_mic(self, language):
        """Pop one microphone chunk (blocking) and return its transcription.

        Args:
            language: Language code passed to ``recognize_google``.

        Returns:
            The recognized text, or ``""`` when recognition fails.
        """
        audio = self.mic_queue.get()
        try:
            return self.r.recognize_google(audio, language=language)
        except Exception:
            # Best effort: any recognition failure yields an empty result.
            # ``Exception`` (not a bare except) so KeyboardInterrupt and
            # SystemExit still propagate.
            return ""

    def set_spk(self, device_name=None, sample_rate=16000, interval=3, buffer_size=4096):
        """Store the speaker loopback capture settings.

        Args:
            device_name: Speaker name to capture from. ``None`` selects the
                system default speaker, looked up when this method is called
                rather than when the class is defined.
            sample_rate: Capture sample rate in Hz.
            interval: Multiplied by ``sample_rate`` to give the minimum
                number of samples gathered per ``listen_spk`` call.
            buffer_size: Number of frames requested per ``record`` call.
        """
        if device_name is None:
            device_name = str(sc.default_speaker().name)
        self.spk_device_name = device_name
        self.spk_sample_rate = sample_rate
        self.spk_interval = interval
        self.spk_buffer_size = buffer_size

    def init_spk(self):
        """Reallocate the capture buffer for the current settings and reset it."""
        self.spk_audio = np.empty(self.spk_sample_rate * self.spk_interval + self.spk_buffer_size, dtype=np.float32)
        self.n = 0

    def listen_spk(self):
        """Record speaker output and enqueue one chunk on ``spk_queue``.

        Recording continues until at least ``spk_sample_rate * spk_interval``
        samples are buffered. The chunk is then cut at the point of lowest
        smoothed energy within the last fifth of the buffer, and the samples
        after the cut are carried over as the start of the next chunk.
        """
        audio = self.spk_audio
        n = self.n
        target = self.spk_sample_rate * self.spk_interval

        loopback = sc.get_microphone(id=self.spk_device_name, include_loopback=True)
        with loopback.recorder(samplerate=self.spk_sample_rate, channels=1) as source:
            while n < target:
                data = source.record(self.spk_buffer_size)
                audio[n:n + len(data)] = data.reshape(-1)
                n += len(data)

        # Smooth the squared signal with a 100-sample moving average and cut
        # where it is lowest, searching only the last fifth of the buffer.
        m = n * 4 // 5
        vol = np.convolve(audio[m:n] ** 2, np.ones(100) / 100, "same")
        # Clamp: when the tail is shorter than the kernel, "same" mode returns
        # more values than the tail has samples, so argmin could pass n.
        m = min(m + int(vol.argmin()), n)

        self.spk_queue.put(audio[:m])

        # The queued chunk is a view of the old buffer, so continue in a fresh
        # buffer and copy only the leftover tail into it.
        carry = np.empty(target + self.spk_buffer_size, dtype=np.float32)
        carry[:n - m] = audio[m:n]
        self.spk_audio = carry
        self.n = n - m

    def recognize_spk(self, language):
        """Pop one speaker chunk (blocking) and return its transcription.

        The samples are written to an in-memory WAV file so they can be read
        back as ``AudioData`` through ``sr.AudioFile``.

        Args:
            language: Language code passed to ``recognize_google``.

        Returns:
            The recognized text, or ``""`` when encoding or recognition fails.
        """
        samples = self.spk_queue.get()
        try:
            with io.BytesIO() as memory_file:
                sf.write(file=memory_file, data=samples, format="WAV", samplerate=self.spk_sample_rate)
                memory_file.seek(0)
                with sr.AudioFile(memory_file) as source:
                    audio = self.r.record(source)
            return self.r.recognize_google(audio, language=language)
        except Exception:
            # Best effort: any failure yields an empty result.
            # ``Exception`` (not a bare except) so KeyboardInterrupt and
            # SystemExit still propagate.
            return ""
|
|
|
|
if __name__ == "__main__":
    import time
    import threading

    # Demo: continuously transcribe whatever plays on the default speaker.
    demo_language = "ja-JP"

    vr = VoiceRecognizer()
    _mic_name, spk_name = vr.search_default_device()
    # set_spk() takes capture settings only; the language is an argument of
    # recognize_spk(), so it is passed there instead.
    vr.set_spk(device_name=spk_name)
    vr.init_spk()

    def vr_listen_spk():
        """Producer: keep recording speaker chunks onto the queue."""
        while True:
            vr.listen_spk()

    def vr_recognize_spk():
        """Consumer: transcribe queued chunks and print the text."""
        while True:
            text = vr.recognize_spk(demo_language)
            print(text)

    # Daemon threads so the process can exit when the main thread stops.
    th_vr_listen_spk = threading.Thread(target=vr_listen_spk, daemon=True)
    th_vr_recognize_spk = threading.Thread(target=vr_recognize_spk, daemon=True)
    th_vr_listen_spk.start()
    th_vr_recognize_spk.start()

    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        # Ctrl-C ends the demo; the daemon workers stop with the process.
        pass