Pythonによる並列処理の書き方(joblib および concurrent.futures)

joblibとconcurrent.futuresを使った並列処理の実現方法についてそれぞれメモを残しておく。Pythonによる並列処理の関連記事はいくらでも見つかるのだが、個人的な備忘録ということで。

想定している状況:処理対象となるファイル数が多いので、直列で処理を動かすと時間がかかる。それを並列処理させることで、トータルでの実行時間の削減を目指す。例えば音声データ群(よくあるのは数百から数千ファイル)からの特徴量抽出を考えると、通常は音声ファイルごとに独立して抽出を行うので、並列処理に向いているというわけである。

サンプルコードの仕様:

  • 並列させたい処理: 関数 your_function にて定義
  • 処理対象となるファイル群: リスト変数 file_list で保持しておく

joblibを使う方法

joblibをpipでインストールしたのち、Parallelクラスおよびdelayedモジュールをインポートして使う。

from joblib import Parallel, delayed

def your_function(file_name):
    """並列処理実行
        
    Args:
         file_name (str): 並列処理の対象となるファイル
    """
    # 何らかの処理

Parallel(n_jobs=4)( # job数はお好みで
    delayed(your_function)(_file) for _file in file_list
)

delayedに関数を与えて、続いてその関数の引数を与える形である。

concurrent.futuresを使う方法

concurrent.futuresからProcessPoolExecutorクラスをインポートして使う。

from concurrent.futures import ProcessPoolExecutor

def your_function(file_name):
    """並列処理実行
        
    Args:
         file_name (str): 並列処理の対象となるファイル
    """
    # 何らかの処理

with ProcessPoolExecutor(max_workers=4) as executor: # max_workersがjob数に対応
    futures = [executor.submit(your_function, _file) for _file in file_list]
    for future in futures:
        future.result()

executor.submitの第1引数が並列処理対象の関数で、以降は関数の引数を与える形である。当然、関数の引数は複数個与えてもOK。

マルチスレッド用のThreadPoolExecutorもあるが、本記事では省略。

参考

github.com

tech.morikatron.ai