Bakulog

獏の夢日記的な何か。

Python/qi Frameworkを使ってPepperのマイクからストリーミングする

前に書いた記事の続編です。たぶんコッチの方が使い勝手がいいです。

もくじ

  1. はじめに
  2. NAOqi Frameworkとqi Frameworkってどう違うの?
  3. 公式のサンプルに近いのがあるじゃないか!
  4. リアルタイム再生機能を追加したコード
  5. まとめ

  

はじめに

本記事ではPepperが聞いている音をリアルタイムにLAN内のPCで受信して聞き取る、という盗聴まがいのことをする方法を紹介します。Pepperの聞いてる音が聞けると裏側にオペレータが居る場合に運用しやすいとか、音声認識の精度が低い時に音響環境がチェックできるといった利点があり、完全自律で動かす場合以外であれば知ってると便利な小ネタです。

この話題は以前の記事「【Python】Pepperマイク to PCスピーカー【Pepper】」でも紹介したのですが、前の記事で紹介した方法はPepperと通信できるフレームワークの中でもNAOqi Frameworkという古いものを使っていました。現在ではこれに代わる新しいフレームワークとしてqi Frameworkというのが公開されており、新しいコードを書く際はqi Frameworkを使うことが推奨されています。

 

そこで、本記事ではqi Frameworkで以前やってみたマイクからの集音処理を置き換えてみます。PepperにとってスタンダードなPython2.7を使う方法に加え、本記事の次の記事では私が以前にラッパーを作ったC#で同様の事をする方法も紹介する予定です1

   

そもそもNAOqi Frameworkとqi Frameworkってどう違うの?

細かい差はいくつかありますが、基本的にはqi Frameworkの方が設計改善されてシンプルで使いやすい!という程度の認識で良いと思います。カッチリ説明しようとすると長くなってしまうので「【和訳記事】NAOqiのプログラムをqi Frameworkに移行しよう!」等をご覧ください。同記事ではPythonのコードを踏まえながら要点を説明しています。

   

公式のサンプルに近いのがあるじゃないか!

一からコードを書いてもいいのですが、ラクをしたいので関連ドキュメンテーションを頑張って漁ってみたところ、Choregraphe 2.4の付属ドキュメンテーションであるNAO Documentationに非常に近い例がありました。ライセンス上ちょっとグレーですが、思い切ってスクリプトを丸ごとこちらに掲載してしまいます。本来はNAO Documentationのトップディレクトリから見て"/dev/python/examples/audio/audio_soundprocessing.html"という箇所に下記のソースが載っているので、ドキュメンテーションの正しい場所が追えそうな場合はそちらもご確認下さい。Windowsの場合NAO Documentationはインストール後に自動で配置されるデスクトップアイコンをダブルクリックして開くか、あるいはスタートメニューに"NAO"とか入れて検索することでも普通に見つかります。

 

[expand title="公式のサンプルコード(クリックで展開)"]

#! /usr/bin/env python
# -*- encoding: UTF-8 -*-

"""Example: Get Signal from Front Microphone & Calculate its rms Power"""


import qi
import argparse
import sys
import time
import numpy as np


class SoundProcessingModule(object):
    """
    A simple get signal from the front microphone of Nao & calculate its rms power.
    It requires numpy.
    """

    def __init__( self, app):
        """
        Initialise services and variables.
        """
        super(SoundProcessingModule, self).__init__()
        app.start()
        session = app.session

        # Get the service ALAudioDevice.
        self.audio_service = session.service("ALAudioDevice")
        self.isProcessingDone = False
        self.nbOfFramesToProcess = 20
        self.framesCount=0
        self.micFront = []
        self.module_name = "SoundProcessingModule"

    def startProcessing(self):
        """
        Start processing
        """
        # ask for the front microphone signal sampled at 16kHz
        # if you want the 4 channels call setClientPreferences(self.module_name, 48000, 0, 0)
        self.audio_service.setClientPreferences(self.module_name, 16000, 3, 0)
        self.audio_service.subscribe(self.module_name)

        while self.isProcessingDone == False:
            time.sleep(1)

        self.audio_service.unsubscribe(self.module_name)

    def processRemote(self, nbOfChannels, nbOfSamplesByChannel, timeStamp, inputBuffer):
        """
        Compute RMS from mic.
        """
        self.framesCount = self.framesCount + 1

        if (self.framesCount <= self.nbOfFramesToProcess):
            # convert inputBuffer to signed integer as it is interpreted as a string by python
            self.micFront=self.convertStr2SignedInt(inputBuffer)
            #compute the rms level on front mic
            rmsMicFront = self.calcRMSLevel(self.micFront)
            print "rms level mic front = " + str(rmsMicFront)
        else :
            self.isProcessingDone=True

    def calcRMSLevel(self,data) :
        """
        Calculate RMS level
        """
        rms = 20 * np.log10( np.sqrt( np.sum( np.power(data,2) / len(data)  )))
        return rms

    def convertStr2SignedInt(self, data) :
        """
        This function takes a string containing 16 bits little endian sound
        samples as input and returns a vector containing the 16 bits sound
        samples values converted between -1 and 1.
        """
        signedData=[]
        ind=0;
        for i in range (0,len(data)/2) :
            signedData.append(data[ind]+data[ind+1]*256)
            ind=ind+2

        for i in range (0,len(signedData)) :
            if signedData[i]>=32768 :
                signedData[i]=signedData[i]-65536

        for i in range (0,len(signedData)) :
            signedData[i]=signedData[i]/32768.0

        return signedData


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip", type=str, default="127.0.0.1",
                        help="Robot IP address. On robot or Local Naoqi: use '127.0.0.1'.")
    parser.add_argument("--port", type=int, default=9559,
                        help="Naoqi port number")

    args = parser.parse_args()
    try:
        # Initialize qi framework.
        connection_url = "tcp://" + args.ip + ":" + str(args.port)
        app = qi.Application(["SoundProcessingModule", "--qi-url=" + connection_url])
    except RuntimeError:
        print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " + str(args.port) +".\n"
               "Please check your script arguments. Run with -h option for help.")
        sys.exit(1)
    MySoundProcessingModule = SoundProcessingModule(app)
    app.session.registerService("SoundProcessingModule", MySoundProcessingModule)
    MySoundProcessingModule.startProcessing()

[/expand]

 

上記のコードでは20個分だけデータを落として波形に基づいた計算を行い、プログラムが終了するようになっています。これを少し改造し、以前の記事でも紹介したPyAudioと組み合わせればリアルタイム再生ができそうです。

なお、上のような「クラスを定義してregisterServiceでサービスとして登録する」という処理手順は「Python/qi FrameworkでPepperに自作機能を登録する手順」でも紹介してるので、より詳しい事についてはこちらをご覧ください。

   

リアルタイム再生機能を追加したコード

上記のサンプルに「【Python】Pepperマイク to PCスピーカー【Pepper】」で使っているPyAudioを組み合わせると次のようなコードになります。

 

# -*- coding: utf-8 -*-

"""Pepperが聞いている音声をPC側に転送してリアルタイム再生するスクリプト"""

import argparse
import sys
import time

import qi
from pyaudio import PyAudio

class SoundDownloadPlayer(object):
    """
    Service for download and play sound which Pepper here in real-time.
    PyAudio is required.
    """

    def __init__( self, app):
        """Initialize service and PyAudio stream"""

        super(SoundDownloadPlayer, self).__init__()
        app.start()
        session = app.session

        self.robot_audio = session.service("ALAudioDevice")

    @qi.nobind
    def start(self, serviceName):
        """Start processing"""

        self.pyaudio = PyAudio()
        #引数の意味について:
        #format "2": 量子化ビット数が16bit=2byte
        #channels "1": モノラルマイク
        #rate "16000": サンプリングレート
        self.pyaudioStream = self.pyaudio.open(
            format=self.pyaudio.get_format_from_width(2),
            channels=1,
            rate=16000,
            output=True
            )
        #16000Hz, 3は「前方マイク」、0は「インターリーブ無し」を意味する
        self.robot_audio.setClientPreferences(serviceName, 16000, 3, 0)
        self.robot_audio.subscribe(serviceName)
        #Ctrl + C で止める想定
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass

        self.robot_audio.unsubscribe(serviceName)
        self.pyaudioStream.close()
        self.pyaudio.terminate()
        print("SoundDownloadPlayer stopped successfully.")

    def processRemote(self, nbOfChannels, nbOfSamplesByChannel, timeStamp, inputBuffer):
        """Write to pyaudio stream buffer to play real-time"""
        #print("process remote called")
        self.pyaudioStream.write(str(inputBuffer))

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip", type=str, default="127.0.0.1",
                        help="Robot IP address. On robot or Local Naoqi: use '127.0.0.1'.")
    parser.add_argument("--port", type=int, default=9559,
                        help="Naoqi port number")

    args = parser.parse_args()
    try:
        # Initialize qi framework.
        connection_url = "tcp://" + args.ip + ":" + str(args.port)
        app = qi.Application(["MySoundDownloadPlayer", "--qi-url=" + connection_url])
    except RuntimeError:
        print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " + str(args.port) +".\n"
               "Please check your script arguments. Run with -h option for help.")
        sys.exit(1)

    player = SoundDownloadPlayer(app)
    app.session.registerService("MySoundDownloadPlayer", player)
    player.start("MySoundDownloadPlayer")

 

使う時はコマンドラインで接続先PepperのIPとポートを指定します。例えば上記のスクリプトを"pepper_sound.py"として保存した場合はこんな感じ。

python pepper_sound.py --ip 192.168.xx.xx --port 9559

 

うまく動作すればPepperが聞いてる音がそのままPC上で聞けます。

   

まとめ

以前の記事とは使ってるフレームワークこそ違いますが、だいたい同じようなノリで同じ事が出来ること事が分かりました。本記事で参考にしたNAO Documentationにはけっこう有用な情報が転がっているので、時間に余裕がある際はぜひガッツリ読んでみてください。  


  1. 実は本記事の内容は「C#でPepperのマイク音を拾うにはどうすればいいんだろう?」と頑張ってみた結果得られた知見に基づいています。Pythonと違ってC#では現状NAOqi Frameworkを使う方法が存在せず、以前の記事に書いた方法を移植するわけにもいかなかったのでqi Framework上での実装法を詳しく調べた、という経緯があります。