はじめに
concurrent.futures.ProcessPoolExecutorは便利そうなので、Poolの代わりに使ってみようと思います。
17.2. multiprocessing — プロセスベースの並列処理 — Python 3.6.5 ドキュメント
17.4. concurrent.futures – 並列タスク実行 — Python 3.6.5 ドキュメント
相違点
非同期で使えることを除くと、以下のような違いがあります。
- ProcessPoolExecutor.mapは複数の引数をサポートする。つまり普通のmapみたいに使える
- ただし返り値がitertools.chainになる。リストでほしければ自分で変換する
利便性の観点からすると、まあまあよさげです。
実験1:使えるかどうか確認する
とりあえず同等に使えることを確認するため、次のようなコードを書いてみました。
import time import numpy as np from multiprocessing import Pool from concurrent.futures import ProcessPoolExecutor def f1(x): a, b = x return a * b def f2(a, b): return a * b def main(): p1 = Pool(4) p2 = ProcessPoolExecutor(4) a, b = np.random.rand(2, 10**2, 10**2) alst = [a+x for x in range(1000)] blst = [b+x for x in range(1000)] t1 = time.time() result1 = p1.map(f1, zip(alst, blst)) t2 = time.time() print("Pool:{:.6f}".format(t2 - t1)) t1 = time.time() result2 = list(p2.map(f2, alst, blst)) t2 = time.time() print("ProcessPoolExecutor:{:.6f}".format(t2 - t1)) print(np.array_equal(result1, result2)) if __name__ == "__main__": main() # 結果 """ => Pool:0.586397 ProcessPoolExecutor:1.137316 True """
動いてるけど、おっそ。ドキュメントには「長いデータ渡すときはchunksizeを指定すると速くなるよ」と書いてあったので、そうしてみます。
# 中略 result2 = list(p2.map(f2, alst, blst, chunksize=128)) # 中略 """ => Pool:0.637173 ProcessPoolExecutor:0.655285 True """
何回か回しているとなんとなく微妙に遅い気もするのですが(たぶん5%~10%くらい?)、とりあえず一応ほぼ互角。
実験2:メソッドを試す
インスタンスメソッドでも行けるかどうか見ます。
import time import numpy as np from multiprocessing import Pool from concurrent.futures import ProcessPoolExecutor class F: def f1(self, x): a, b = x return a * b def f2(self, a, b): return a * b def main(): p1 = Pool(4) p2 = ProcessPoolExecutor(4) a, b = np.random.rand(2, 10**2, 10**2) alst = [a+x for x in range(1000)] blst = [b+x for x in range(1000)] f = F() t1 = time.time() result1 = p1.map(f.f1, zip(alst, blst)) t2 = time.time() print("Pool:{:.6f}".format(t2 - t1)) t1 = time.time() result2 = list(p2.map(f.f2, alst, blst, chunksize=128)) t2 = time.time() print("ProcessPoolExecutor:{:.6f}".format(t2 - t1)) print(np.array_equal(result1, result2)) if __name__ == "__main__": main()
普通に走ったので問題なさそう。古めのPythonだとメソッドは渡せなかったはずなのですが、最近は渡せるみたいですね。
実験3:if __name__ == "__main__":を外す
multiprocessingを使っているとたまにこれがないと落ちることがあるみたいなので、外してみます。
import time import numpy as np from multiprocessing import Pool from concurrent.futures import ProcessPoolExecutor def f1(x): a, b = x return a * b def f2(a, b): return a * b p1 = Pool(4) p2 = ProcessPoolExecutor(4) a, b = np.random.rand(2, 10**2, 10**2) alst = [a+x for x in range(1000)] blst = [b+x for x in range(1000)] t1 = time.time() result1 = p1.map(f1, zip(alst, blst)) t2 = time.time() print("Pool:{:.6f}".format(t2 - t1)) t1 = time.time() result2 = list(p2.map(f2, alst, blst, chunksize=128)) t2 = time.time() print("ProcessPoolExecutor:{:.6f}".format(t2 - t1)) print(np.array_equal(result1, result2))
これも問題なかったです。
結論
概ね同等に使えます。なにしろ引数を複数渡せるのが便利なところで、wrapper関数をわざわざ書かなくて済みます。あとはlistに変換するまでは非同期で動いている訳で、それを期待して使うのも状況次第ではありでしょう。
ただしchunksizeは明示的に大きめに指定することです。