Bakulog

獏の夢日記的な何か。

【Python】PCのマイクで拾った音をPepperに送って再生させる【qi Framework】

C#で試した内容をPythonでも確認しました、というだけの内容です。

  2016/3/15追記: 本記事のプログラムはNuGetパッケージとして公開してるBaku.LibqiDotNetのver1.0.1では動作しますが、GitHubの最新コードやNuGetパッケージの最新版(2.0)では動作しません。ご注意下さい。  

もくじ 1. はじめに 2. サンプルコード 3. まとめ

   

はじめに

今回は「Python/qi Frameworkを使ってPepperのマイクからストリーミングする」の対極となる処理、つまり手元のマイクに向かって喋った音をPepperに投げつけて再生させる、言い換えるとPepperの口を乗っ取る方法を紹介します。

 

本記事の内容はC#で全く同じ事をやったものPython移植版になります。実装にあたってはC++の事例紹介であるアルデバラン社の方が非公式で試された方法紹介公式ドキュメントの当該箇所を参考にしていますが、特にポイントになる点は2点だけです。

  • Pythonでも他の言語と同様ALAudioDevicesendRemoteBufferToOutput関数を使って音声再生ができる
  • Pythonプログラムでマイク入力を受け取るにはPyAudioライブラリなどを使えばOK

 

Pythonでマイク入力を拾えるライブラリは他にもあるようですが、「Python/qi Frameworkを使ってPepperのマイクからストリーミングする」とライブラリが揃ってる方が何かと便利そうだったため、今回は同じものを使っています。

   

サンプルコード

コマンドラインからIPやらポートやら指定できる機能(argparseの部分のコードはqi Frameworkの公式ドキュメントのサンプルコピペをもとにしています)などをつけて、こんな感じに書けます。

# -*- coding: utf-8 -*-

"""Pepperへ音声を投げるスクリプト"""

import argparse
import sys
import time

import qi
from pyaudio import PyAudio

fut = None

class SoundDetector(object):
    def __init__(self, callback):
        self.pa = PyAudio()
        self.listen_callback = callback
        self.stream = self.open_mic_stream()

    def stop(self):
        self.stream.close()

    def find_input_device(self):
        device_index = None            
        for i in range( self.pa.get_device_count() ):     
            devinfo = self.pa.get_device_info_by_index(i)   
            print( "Device %d: %s"%(i,devinfo["name"]) )

            for keyword in ["mic","input"]:
                if keyword in devinfo["name"].lower():
                    print( "Found an input: device %d - %s"%(i,devinfo["name"]) )
                    device_index = i
                    return device_index

        if device_index == None:
            print( "No preferred input found; using default input device." )

        return device_index

    def open_mic_stream( self ):
        device_index = self.find_input_device()

        stream = self.pa.open(
            format = self.pa.get_format_from_width(2),
            channels = 2,
            rate = 16000,
            input = True,
            input_device_index = device_index,
            frames_per_buffer = 1600
            )

        return stream

    def listen(self):
        try:
            block = self.stream.read(1600)
        except IOError, e:
            print(e.message)
            return

        if self.listen_callback != None:
            self.listen_callback(block)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip", type=str, default="127.0.0.1",
                        help="Robot IP address. On robot or Local Naoqi: use '127.0.0.1'.")
    parser.add_argument("--port", type=int, default=9559,
                        help="Naoqi port number")

    args = parser.parse_args()
    try:
        # Initialize qi framework.
        connection_url = "tcp://" + args.ip + ":" + str(args.port)
        app = qi.Application(["sound_setter", "--qi-url=" + connection_url])
    except RuntimeError:
        print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " + str(args.port) +".\n"
               "Please check your script arguments. Run with -h option for help.")
        sys.exit(1)

    #こっから主処理

    session = app.session
    session.connect(connection_url)
    ad = session.service("ALAudioDevice")
    ad.setParameter("outputSampleRate", 16000)

    def sound_callback(block):
        global fut

        print("get block, len = {0}".format(len(block)))
        #blocking style
        #ad.sendRemoteBufferToOutput(len(block) / 4, block)

        #non-blocking style
        if fut != None:
            fut.value()
        fut = qi.async(ad.sendRemoteBufferToOutput, len(block) / 4, block)

    sd = SoundDetector(sound_callback)

    listen_time = 1000
    try:
        for i in range(listen_time):
            print("{0} / {1}".format(i, listen_time))
            sd.listen()
        time.sleep(1.0)

    except KeyboardInterrupt:
        sd.stop()
        if fut != None:
            #wait until async call completes
            fut.value()
        time.sleep(1.0)

 

実行時には下記のようにIPとポートを引数で指定して実行します。

#名前での指定
python pepper_sound_output.py --ip pepper.local --port 9559
#あるいはこういう感じ
python pepper_sound_output.py --ip 192.168.xx.xx --port 9559

 

うまく行けばマイクに向かって喋った音がPepperから再生されます。

   

まとめ

マイクの入出力が両方とも拾えるようになったので、適切に組み合わせればPepperを経由した会話が本体のAPI主体で(タブレットに頼らないでも)可能になることが分かりました。本ブログではC#Pythonに関しての紹介になりましたが当然Java(というかAndroid)等でも同様に出来るはずです。音声合成にこだわらないのであれば、ちょっと特殊なアプリケーション作成に役立つのではないかなあ、と思います。