PyAudioとPySPTKで音声を逐次分析合成し続けるPythonスクリプト

表題のとおり。今回はPyWorldではなくPySPTKである。
フレーム化処理にlibrosaも必要である。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import numpy as np
import pyaudio
import pysptk
import librosa
from pysptk.synthesis import MLSADF, Synthesizer

sample_rate = 16000            # サンプリング周波数
input_buffer_size = 1024 * 10  # バッファサイズ(入力)
output_buffer_size = 2048      # バッファサイズ(出力)


# 音声の分析条件
frame_length = 512       # 分析窓長
frame_shift = 80         # 分析窓の移動量
min_f0 = 60              # 探索する基本周波数の下限値
max_f0 = 240             # 探索する基本周波数の上限値

# メルケプストラムの抽出条件
order = 25               # メルケプストラムの次数
alpha = 0.41             # 周波数軸の伸縮パラメタ
floor = 0.001            # フロア値


# 分析再合成
def analysis_resynthesis(signal):

    # フレーム化処理
    frames = librosa.util.frame(
        signal, frame_length=frame_length,
        hop_length=frame_shift).astype(np.float64).transpose()

    # 窓掛け
    frames *= pysptk.blackman(frame_length)

    # ピッチの抽出
    pitch = pysptk.swipe(signal, fs=sample_rate, hopsize=frame_shift,
                         min=min_f0, max=max_f0, otype="pitch")

    # メルケプストラムの抽出
    mc = pysptk.mcep(frames, order=order, alpha=alpha, eps=floor, etype=1)

    # ディジタルフィルタ係数に変換
    b = pysptk.mc2b(mc, alpha)

    # 励振信号の作成
    source_excitation = pysptk.excite(pitch, frame_shift)

    # 音声の再合成
    synthesizer = Synthesizer(MLSADF(order=order, alpha=alpha), frame_shift)
    synthesized = synthesizer.synthesis(source_excitation, b)

    return synthesized


if __name__ == "__main__":

    audio = pyaudio.PyAudio()

    stream_in = audio.open(format=pyaudio.paInt16,
                           channels=1,
                           rate=sample_rate,
                           frames_per_buffer=input_buffer_size,
                           input=True)

    stream_out = audio.open(format=pyaudio.paInt16,
                            channels=1,
                            rate=sample_rate,
                            frames_per_buffer=output_buffer_size,
                            output=True)

    try:
        print("分析合成を開始します。話しかけてください。")
        while stream_in.is_active():
            input = stream_in.read(input_buffer_size,
                                   exception_on_overflow=False)
            # PyAudioで取得直後はバイト列なので、16bit整数に変換後、64bit float値にキャスト
            signal = np.frombuffer(input, dtype='int16').astype(np.float64)

            # 分析再合成
            output = analysis_resynthesis(signal)

            # float値を16bit整数にキャストし、バイト列に戻す
            stream_out.write(output.astype(np.int16).tobytes())

    except KeyboardInterrupt:
        print("\nInterrupt.")

    finally:
        stream_in.stop_stream()
        stream_in.close()
        stream_out.stop_stream()
        stream_out.close()
        audio.terminate()
        print("Stop Streaming.")