概要
ThreadPoolExecutorにはinitializerという便利そうなオプションがあります。でもリファレンスの説明があっさりしていて、挙動がよくわからなかったので調べました。
先に断っておくと、このオプションはPython3.7で追加されたもので、それ以前のバージョンでは存在しません。その場合の代替案も書いておくので参考にしてください。
はじめに
とりあえず先に書いておくと、concurrent.futures.ThreadPoolExecutorは以下のように使えるものです。
import time import threading from concurrent.futures import ThreadPoolExecutor def job(t): print("in job:", t, int(time.time()) % 1000, threading.get_ident()) time.sleep(t) return t print(threading.get_ident()) with ThreadPoolExecutor(max_workers=2) as executor: futures = [executor.submit(job, i) for i in range(6)] result = [f.result() for f in futures] print("finished:", int(time.time()) % 1000) print(result) """ => 140248999868224 in job: 0 233 140248964409088 in job: 1 233 140248964409088 in job: 2 233 140248955684608 in job: 3 234 140248964409088 in job: 4 235 140248955684608 in job: 5 237 140248964409088 finished: 242 [0, 1, 2, 3, 4, 5] """
他の使い方もありますが、今回はこれしか検討しません。順番通りに結果が得られた方が都合が良いことの方が多いでしょう。
リファレンス
concurrent.futures -- 並列タスク実行 — Python 3.8.2 ドキュメント
表示されているUNIX時間の下3桁と、threading.get_identで取得されているスレッドのIDに注目しておいてください。親スレッドと、他に子スレッド2つが存在していることがわかります。ThreadPoolExecutorなので子スレッドは2つっきりで、同じスレッドが最初から最後まで使いまわされます。submitされたものはキューに突っ込まれて、空いているスレッドに放り込まれて終了までそのスレッドで実行されると考えるとわかりやすいでしょう。
なお、後述するinitializerはget_identでIDを取らないと基本的に役に立たないと思います。
initializerを使う
initializerはスレッド開始時の処理を書いておくと良いようです。
ちょっと改造したプログラムです。
import time import threading from concurrent.futures import ThreadPoolExecutor def initializer(): print("in init", threading.get_ident()) def job(t): print("in job:", t, int(time.time()) % 1000, threading.get_ident()) time.sleep(t) return t print(threading.get_ident()) with ThreadPoolExecutor(max_workers=2, initializer=initializer) as executor: futures = [executor.submit(job, i) for i in range(6)] result = [f.result() for f in futures] print("finished:", int(time.time()) % 1000) print(result) """ => 140134438733632 in init 140134403274496 in job: 0 369 140134403274496 in init 140134394550016 in job: 1 369 140134403274496 in job: 2 369 140134394550016 in job: 3 370 140134403274496 in job: 4 371 140134394550016 in job: 5 373 140134403274496 finished: 378 [0, 1, 2, 3, 4, 5] """
スレッド開始時に一回だけ呼ばれている様子がここからわかります。
そうは言ってもどう使うの? と思う人もいるかもしれませんが、意外と使いではあります。「スレッドごとに一回だけ最初にやれば後は使いまわせるが、最初の1回にはそこそこ時間がかかる」ような処理を考えてみます。たとえばI/OとかDBのコネクションを作るのに時間がかかる、みたいなシチュエーションでしょうか。
問題は、initializerはtargetとスコープを共有してくれたりはしない(そんな特殊なことはできるはずもない)ので、いまいち使いづらいことです。けっきょく自分でリソース管理を書かないといけません。といっても、GILがあるので、スレッドごとにリソースを確保して互いに触らない、という条件なら同期を考える必要まではありません。スレッドごとに独立にリソースアクセスできるようにグローバル変数でdictを置いておけば十分でしょう。
import time import random import threading from concurrent.futures import ThreadPoolExecutor resources = dict() def initializer(): id_ = threading.get_ident() resources[id_] = random.random() print("in", id_, ", its resource is ", resources[id_]) def job(t): print(threading.get_ident(), resources[threading.get_ident()]) time.sleep(2) return t print(threading.get_ident()) with ThreadPoolExecutor(max_workers=2, initializer=initializer) as executor: futures = [executor.submit(job, i) for i in range(6)] result = [f.result() for f in futures] print("finished:", int(time.time()) % 1000) print(result) """ => 140374921525056 in 140374878914304 , its resource is 0.06718002352497798 140374878914304 0.06718002352497798 in 140374870189824 , its resource is 0.4636817738099904 140374870189824 0.4636817738099904 140374878914304 0.06718002352497798 140374870189824 0.4636817738099904 140374878914304 0.06718002352497798 140374870189824 0.4636817738099904 finished: 781 [0, 1, 2, 3, 4, 5] """
代替案
普通にtarget先頭でif使えばいいのでは・・・
import time import random import threading from concurrent.futures import ThreadPoolExecutor resources = dict() def job(t): id_ = threading.get_ident() if id_ not in resources: resources[id_] = random.random() print(id_, int(time.time()) % 1000, resources[id_]) time.sleep(2) return t print(threading.get_ident()) with ThreadPoolExecutor(max_workers=2) as executor: futures = [executor.submit(job, i) for i in range(6)] result = [f.result() for f in futures] print("finished:", int(time.time()) % 1000) print(result) """ => 140465058035520 140465015424768 933 0.8531529694670728 140465006700288 933 0.279548446733512 140465015424768 935 0.8531529694670728 140465006700288 935 0.279548446733512 140465015424768 937 0.8531529694670728 140465006700288 937 0.279548446733512 finished: 939 [0, 1, 2, 3, 4, 5] """
最初の一回だけ実行されればいいので、なければないでなんとかなります。initializer使うとちょっと見通しが良いかな?
まとめ
なんかあってもなくても良いような気がしてきましたが、とにかくこんな感じで使えはします。
どちらかというと同じインターフェースのProcessPoolExecutorの方が使いでがあるかもしれません。プロセス内のグローバル変数として確保すれば素直に使えるはずだからです。インターフェースを揃えないといけないのでThreadPoolExecutorにもつけたとか・・・
もっと他に有益な使い方がある、ということを知っている人は教えてください。