先日の視覚に続き、聴覚(耳)を手に入れたいと思います。お試しというよりもしっかりと使える(と思える)ものにしていきたいと思います。使用するのは、speech_recognition と pyaudioです。
聴く耳(マイク)を手に入れる
やはりロボットには、人間の喋った言葉を理解する能力が必要不可欠です。今回は、Raspberry Pi 4Bに、以下のマイクを使って音声認識をしてみたいと思います。(マイクはどれでも大丈夫だと思います。)
まずは、マイクを認識させたいと思います。マイクを接続して、次のコマンドでハードウェアデバイスの情報を表示します。
$ arecord -l
**** ハードウェアデバイス CAPTURE のリスト ****
カード 2: Device [USB Audio Device], デバイス 0: USB Audio [USB Audio]
サブデバイス: 1/1
サブデバイス #0: subdevice #0
一度、テストのために音声を録音して再生したいと思います。なお、音声の再生にはスピーカも必要になります。
$ arecord out.wav
録音中 WAV 'out.wav' : Unsigned 8 bit, レート 8000 Hz, モノラル
上記のコマンドを実行後にマイクに向かってしゃべってください。録音の終了は、「Ctrl」キー+「c」キーで中断させて終了します。
特に何もしていませんが、基本的には繋ぐだけで認識するようですね。
音声認識プログラムの作成
必要なソフトウェアをインストールしていきたいと思います。私の場合は、後からあれが足りない、これが足りないと悩みながらインストールしましたが、一発で揃えたいと思います。(Raspberry Pi OSで試した結果なので、別ディストリビューションだと他にも不足パッケージはあるかもしれないです)
pip install --upgrade pip
sudo apt-get install portaudio19-dev flac
「portaudio19-dev」は pyaudio をインストールする際に必要になります。「flac」は speech_recognition を動かす際に必要になります。続いて、pip install を行います。
pip install speechrecognition pyaudio
プログラム作成のための準備は、少しサンプルプログラムを作成して使い方を習得します。
import pyaudio
pa = pyaudio.PyAudio()
print(pa.get_default_host_api_info())
input(">> ")
for i in range(pa.get_device_count()):
painfo = pa.get_device_info_by_index(i)
print("Device index:", i)
print(painfo)
pa.terminate()
pyaudio を認識しているデバイスの一覧を取得しています。少し気になるのは、「pa = pyaudio.PyAudio()」の行を実行すると、大量のデバッグメッセージのようなものが表示されます。
ホストAPI情報では、nameに「ALSA」と表示されるので、サウンドデバイスは、入力も出力もALSA(Advanced Linux Sound Architecture)の上で動いているみたいですね。ホストAPI情報の後で「>>」プロンプトに対して「Enter」を入力すると、デバイス情報が表示されます。実行したRaspberry Pi 4B では、12のサウンドデバイスが表示されました。おそらく実際に音を入出力が出来るデバイスは、0~4までで後は仮想デバイスのように思えます。この中からマイクを見つけないといけないのですが、それぞれのデバイス情報の中に「maxInputChannels」という情報があり、スピーカーのような出力デバイスは「0」、マイクのような入力デバイスは「1以上」になっています。これでマイクの入力デバイスを検出したいと思います。
以下が今回作成した、聞いたことを書き出すプログラム「WriteByListen」です。
import speech_recognition as sr
import pyaudio
import numpy as np
import time
from datetime import datetime
from pynput.keyboard import Key, Listener
# 定数設定
CHUNK_SIZE = 1024 # チャンクサイズ
CHANNEL_IN = 1 # インプットチャンネル(1:モノラル、2:ステレオ)
THRESHOLD = 0.3 # 音量のしきい値
RECORD_SECONDS = 1.8 # 何秒間無音なら録音を止めるか
# 聞いたことを書き出すプログラム
class WriteByListen:
def __init__(self,channel_in=CHANNEL_IN,rate=0,chunk_size=CHUNK_SIZE):
# 音声認識用インスタンスを作成する
self.listener = sr.Recognizer()
# PyAudioを作成する
self.pa = pyaudio.PyAudio()
# デバッグメッセージが大量に出力される為、空行を出力して送る
print("The above is a pyaudio debug message, so please ignore is.")
for i in range(20):
print()
# デバイス番号を初期化
self.device_no = 0
# マイクを探す
for i in range(self.pa.get_device_count()):
painfo = self.pa.get_device_info_by_index(i)
if painfo['maxInputChannels'] > 0:
self.device_no = i
self.device_info = painfo
# チャンネル:モノラルなら1 ステレオなら2(初期値は1)
self.channels = channel_in
# レート(周波数)
# 引数のレートが0ならデバイス情報のデフォルトサンプルレートを使う
if rate == 0:
self.rate = int(painfo['defaultSampleRate'])
else:
self.rate = rate
# チャンク(1回あたりの読み込みデータサイズ)
self.chunk = chunk_size
break
# マイクが見つからなかった場合は、終了する。
if self.device_no == 0:
print("Nof found microphone device!")
sys.exit(0)
# 最初に見つかったマイクを使用する
print("Microphone device infomation")
print("device index: ", self.device_no)
print("device name: ", self.device_info["name"])
print("rate: ", self.rate, " chunk: ", self.chunk)
# ストリームを開く
self.stream = self.pa.open(
format = pyaudio.paInt16,
channels = self.channels,
rate = self.rate,
input = True,
input_device_index = self.device_no,
frames_per_buffer = self.chunk,
stream_callback = self.callback
)
# 音声入力中かどうかのフラグ
self.is_recording = False
# 音声データの2次元配列(無音時には取り込まない)
self.data_array = [[]]
self.recording_no = 0
# 音声入力中の無音回数カウント
self.silences_count = 0
# 録音回数
self.recording_count = 0
# 無音期間の終了カウント数(コールバックの間隔が長いようなので1/4にする)
self.silences_max = int(self.rate / self.chunk * RECORD_SECONDS)
# 終了コマンド
self.command = ""
# キーボード入力受付
self.key_listener = Listener(on_press=self.on_press,on_release=self.on_release)
# 結果書き込み用ファイル
filename = "./data/text/hearing_" + datetime.now().strftime("%Y%m%d_%H%M%S_%f") + ".txt"
self.file = open(filename, 'w', encoding='UTF-8')
# 初期化完了
print("Initialization processing completed.")
# コールバック関数(サンプリング間隔で呼ばれ続ける)
def callback(self, in_data, frame_count, time_info, status):
try:
# 音声認識に投げるかどうか?
go_recognize = False
# 音量を数値化する
x = np.frombuffer(in_data, dtype="int16") / 37268.0
# レコーディング回数をカウントアップ
if self.is_recording:
self.recording_count += 1
# print("volume: {:.2f}".format(x.max()))
# しきい値以上の場合は、バッファに追加する
if x.max() > THRESHOLD:
self.silences_count = 0
if not self.is_recording:
print("sampling_start")
self.is_recording = True
self.data_array[self.recording_no].append(in_data)
else:
if (self.is_recording):
self.silences_count += 1
if (self.silences_count > self.silences_max):
print("sampling finished")
go_recognize = True
else:
self.data_array[self.recording_no].append(in_data)
# 終了ステータスを設定
if self.command == "quit":
ret_status = pyaudio.paComplete
else:
ret_status = pyaudio.paContinue
# 音声認識を行う場合
if go_recognize:
# DEBUG PRINT BEGIN
print("Process speech recognizing: #", self.recording_no,
" recording count:", self.recording_count,
" data_size:", len(self.data_array[self.recording_no]))
# DEBUG PRINT END
voice_time = float(self.chunk / self.rate * # perf-measure
len(self.data_array[self.recording_no])) # perf-measure
start_time = time.time() # perf-measure
# 録音番号を進める
rec_no = self.recording_no
self.recording_no += 1
# 空配列を追加する
self.data_array.append([])
# まずは、変数の初期化をしておく
self.recording_count = 0
self.is_recording = False
self.silences_count = 0
# 音声データを連結する
all_data = b''.join(self.data_array[rec_no])
time1 = time.time() - start_time # perf-measure
start_time = time.time() # perf-measure
# 入ってきたオーディオデータを変換する
audiodata = sr.AudioData(all_data, self.rate, 2)
# 翻訳してもらう(グーグルに聞く)
text = self.listener.recognize_google(audiodata, language="ja-JP")
time2 = time.time() - start_time # perf-measure
# DEBUG PRINT BEGIN
print("recognize: voice: {:.2f}[s]".format(voice_time),
" preprocess: {:.2f}[s]".format(time1),
" mainprocess: {:.2f}[s]".format(time2),
" text: {}[char]".format(len(text)))
# DEBUG PRINT END
if len(text) > 0:
self.write(text)
else:
print("Sorry, I could't hear your voice.")
# 配列をクリーンアップ
self.data_array[rec_no] = []
except sr.UnknownValueError as e1:
print(e1)
pass
except sr.RequestError as e2:
print(e2)
pass
except Exception as e3:
print(e3)
finally:
return None, ret_status
# テキストに変換した内容を書き出す
def write(self, text):
# 標準出力とファイルに書き出す
print(text)
self.file.write(text + "\n")
# キーボード制御
def on_press(self,key):
print('{0} pressed'.format(key))
def on_release(self,key):
if key == Key.esc:
self.command = "quit"
return True
# リスニング開始
def start(self):
self.stream.start_stream()
self.key_listener.start()
# 後処理
def stop(self):
self.file.close()
self.stream.stop_stream()
self.stream.close()
self.pa.terminate()
self.key_listener.stop()
self.key_listener = None
# メインループ
def main(self):
# 聞き取り開始
print("Start Listning...(Save text file)")
self.start()
print("To exit the program, \"ESC\" key press.")
#メインループ
while self.stream.is_active():
time.sleep(0.1)
# 終了
print("\nEnd Listning")
self.stop()
# メインプログラム
# 標準的なレートは 44100 だが、一番 48000 が聞き取り確率が高い
if __name__ == "__main__":
writer = WriteByListen(rate=48000)
writer.main()
いくつかポイントを説明します。
まず、callback関数を登録してチャンクサイズ=1024周期で呼び出すのですが、今使っているマイクの一般的な周波数は、44100Hz(丁度良い周波数は最終的には 48000Hzでした)です。つまり1秒間に43回もcallback関数が呼び出されます。そうなると喋りかけてもぶつ切り状態で音声認識が行われて、全く文字に変換ができません。そこで無音が少し続くまで音声を貯めて声が途切れたタイミングで一気に変換するようにしました。無音の判定は、numpyを使ってそのタイミングの最大音量で計測しています。
プログラムの終了については、ESCキー入力をpynputで監視して実行しています。
実際に音声認識をうごかしてみる
リアルタイムにニュース映像の音声を聞きながらテキスト翻訳を動かしてみました。これが高い精度で可能であれば、テレビやネットの映像情報から自動学習も夢じゃないと思います。
実際にプログラムを実行してみました。人がゆっくり話す分には、何も問題なく変換はできます。しかし、今回のようにニュースの音声からテキストを生成するのは、上手くできたという結果には至りませんでした。おそらく音声認識を実行している間は、録音が止まっているようです。なので動画のように翻訳中は止めてみたのですが、それでもテキスト生成は完全ではありませんでした。
ただ、微妙にポイントととなる部分のテキストが生成されているので、興味深いものもありました。
リアルタイムでのテキスト生成は、あまり上手くいきませんでしたが、ヒアリング&レコーディングで音声ファイルを作成しておき、非同期に音声ファイルからテキストファイルを作る方法についても試してみました。こちらは、少々大きな音声ファイルでも変換可能かと思いますので、将来的には、数秒単位にぶつ切りにせずにある程度のまとまった音声で認識すればもっとうまくテキスト生成できるだろうと思っています。
マイクからWaveファイルを作成するPythonプログラム:listern_to_wav.py
Waveファイルからテキストを生成するPythonプログラム:wav_to_text.py
以上
コメント