update transcription mic/spk
This commit is contained in:
121
transcription.py
121
transcription.py
@@ -1,3 +1,8 @@
|
||||
import io
|
||||
import queue
|
||||
import numpy as np
|
||||
import soundcard as sc
|
||||
import soundfile as sf
|
||||
import sounddevice as sd
|
||||
import speech_recognition as sr
|
||||
|
||||
@@ -5,7 +10,6 @@ import speech_recognition as sr
|
||||
class VoiceRecognizer():
|
||||
def __init__(self):
|
||||
self.r = sr.Recognizer()
|
||||
self.mic = None
|
||||
self.languages = [
|
||||
"ja-JP","en-US","en-GB","af-ZA","ar-DZ","ar-BH","ar-EG","ar-IL","ar-IQ","ar-JO","ar-KW","ar-LB","ar-MA",
|
||||
"ar-OM","ar-PS","ar-QA","ar-SA","ar-TN","ar-AE","eu-ES","bg-BG","ca-ES","cmn-Hans-CN","cmn-Hans-HK",
|
||||
@@ -16,6 +20,18 @@ class VoiceRecognizer():
|
||||
"es-NI","es-PA","es-PY","es-PE","es-PR","es-ES","es-UY","es-US","es-VE","sv-SE","th-TH","tr-TR","uk-UA",
|
||||
"vi-VN","zu-ZA"
|
||||
]
|
||||
self.mic = None
|
||||
self.enable_mic_recognize = False
|
||||
self.queue_mic = queue.Queue()
|
||||
|
||||
self.spk_device_name = None
|
||||
self.spk_sample_rate = 16000
|
||||
self.spk_interval = 3
|
||||
self.spk_buffer_size = 4096
|
||||
self.spk_language = "en-US"
|
||||
self.spk_audio = np.empty(self.spk_sample_rate * self.spk_interval + self.spk_buffer_size, dtype=np.float32)
|
||||
self.n = 0
|
||||
self.queue_spk = queue.Queue()
|
||||
|
||||
def search_input_device(self):
|
||||
device_list = sd.query_devices()
|
||||
@@ -28,21 +44,20 @@ class VoiceRecognizer():
|
||||
return input_device_list
|
||||
|
||||
def search_output_device(self):
|
||||
device_list = sd.query_devices()
|
||||
device_list = sc.all_speakers()
|
||||
output_device_list = []
|
||||
|
||||
for device in device_list:
|
||||
if device["max_output_channels"] > 0:
|
||||
output_device_list.append({"name": device["name"], "index": device["index"]})
|
||||
output_device_list.append(str(device.name))
|
||||
|
||||
return output_device_list
|
||||
|
||||
def search_default_device_index(self):
|
||||
def search_default_device(self):
|
||||
device_list = sd.query_devices()
|
||||
default_device_list = []
|
||||
for i in sd.default.device:
|
||||
default_device_list.append({"name": device_list[i]["name"], "index": device_list[i]["index"]})
|
||||
return default_device_list
|
||||
mic_index = sd.default.device[0]
|
||||
name_mic = device_list[mic_index]["name"]
|
||||
name_spk = str(sc.default_speaker().name)
|
||||
return name_mic, name_spk
|
||||
|
||||
def set_mic(self, device_name, threshold=50, is_dynamic=False):
|
||||
input_device_list = self.search_input_device()
|
||||
@@ -68,14 +83,86 @@ class VoiceRecognizer():
|
||||
else:
|
||||
return False
|
||||
|
||||
def listen_voice(self, language):
|
||||
def listen_mic(self):
|
||||
if self.mic != None:
|
||||
with self.mic as source:
|
||||
audio = self.r.listen(source)
|
||||
try:
|
||||
text = self.r.recognize_google(audio, language=language)
|
||||
return text
|
||||
except:
|
||||
return ""
|
||||
else:
|
||||
return False
|
||||
self.queue_mic.put(audio)
|
||||
|
||||
def recognize_mic(self, language):
|
||||
try:
|
||||
audio = self.queue_mic.get()
|
||||
text = self.r.recognize_google(audio, language=language)
|
||||
except:
|
||||
text = ""
|
||||
return text
|
||||
|
||||
def set_spk(self, device_name=str(sc.default_speaker().name), sample_rate=16000, interval=3, buffer_size=4096, language="en-US"):
|
||||
self.spk_device_name = device_name
|
||||
self.spk_sample_rate = sample_rate
|
||||
self.spk_interval = interval
|
||||
self.spk_buffer_size = buffer_size
|
||||
self.spk_language = language
|
||||
|
||||
def init_spk(self):
|
||||
self.spk_audio = np.empty(self.spk_sample_rate * self.spk_interval + self.spk_buffer_size, dtype=np.float32)
|
||||
self.n = 0
|
||||
|
||||
def listen_spk(self):
|
||||
audio = self.spk_audio
|
||||
n = self.n
|
||||
with sc.get_microphone(id=self.spk_device_name, include_loopback=True).recorder(samplerate=self.spk_sample_rate, channels=1) as source:
|
||||
while n < self.spk_sample_rate * self.spk_interval:
|
||||
data = source.record(self.spk_buffer_size)
|
||||
audio[n:n+len(data)] = data.reshape(-1)
|
||||
n += len(data)
|
||||
m = n * 4 // 5
|
||||
vol = np.convolve(audio[m:n] ** 2, np.ones(100) / 100, 'same')
|
||||
m += vol.argmin()
|
||||
audio_prev = audio.copy()
|
||||
self.queue_spk.put(audio[:m])
|
||||
audio = np.empty(self.spk_sample_rate * self.spk_interval + self.spk_buffer_size, dtype=np.float32)
|
||||
audio[:n-m] = audio_prev[m:n]
|
||||
n = n-m
|
||||
self.spk_audio = audio
|
||||
self.n = n
|
||||
|
||||
def recognize_spk(self):
|
||||
try:
|
||||
audio = self.queue_spk.get()
|
||||
with io.BytesIO() as memory_file:
|
||||
sf.write(file=memory_file, data=audio, format="WAV", samplerate=self.spk_sample_rate)
|
||||
memory_file.seek(0)
|
||||
with sr.AudioFile(memory_file) as source:
|
||||
audio = self.r.record(source)
|
||||
text = self.r.recognize_google(audio, language=self.spk_language)
|
||||
except Exception as e:
|
||||
text = ""
|
||||
return text
|
||||
|
||||
if __name__ == "__main__":
|
||||
import time
|
||||
import threading
|
||||
|
||||
vr = VoiceRecognizer()
|
||||
mic_name, spk_name = vr.search_default_device()
|
||||
vr.spk_enable_recognize = True
|
||||
vr.set_spk(language="ja-JP")
|
||||
vr.init_spk()
|
||||
|
||||
def vr_listen_spk():
|
||||
while True:
|
||||
vr.listen_spk()
|
||||
|
||||
def vr_recognize_spk():
|
||||
while True:
|
||||
text = vr.recognize_spk()
|
||||
print(text)
|
||||
|
||||
th_vr_listen_spk = threading.Thread(target=vr_listen_spk)
|
||||
th_vr_recognize_spk = threading.Thread(target=vr_recognize_spk)
|
||||
th_vr_listen_spk.start()
|
||||
th_vr_recognize_spk.start()
|
||||
|
||||
while True:
|
||||
time.sleep(60)
|
||||
Reference in New Issue
Block a user