Python スレッドによる並行処理と終了の待ち合わせ、スレッド初期値の設定

Pythonで時間のかかる処理を並行して呼び出したいケースに遭遇したので、
スレッドについて調べてみます。

Python2



Python 3からはスレッドに関する便利なパッケージが追加されています。
concurrent.futures -- 並列タスク実行
asyncio --- 非同期 I/O

レンタルサーバーでPython2しか使用できないケースも(未だに)あるので、
Python2でも利用できることを前提に調べてみます。



最も簡単な例



threading.Threadのコンストラクタに関数を指定するだけでスレッド実行が行なえます。


  1. import threading
  2. import time
  3. def funA():
  4.     for i in range(5):
  5.         print('funA:%d' % i)
  6.         time.sleep(1)
  7. def funB():
  8.     for i in range(5):
  9.         print('funB:%d' % i)
  10.         time.sleep(1)
  11. if __name__ == '__main__':
  12.     threadA = threading.Thread(target=funA)
  13.     threadB = threading.Thread(target=funB)
  14.     threadA.start()
  15.     threadB.start()




実行結果


$ python sample.py
funA:0
funB:0
funA:1
funB:1
funA:2
funB:2
funA:3
funB:3
funA:4
funB:4



funA、funBが並列に実行されています。



threading.Thread



threading.Threadを継承してスレッドオブジェクトを定義することもできます。
start実行時、runメソッドが呼び出されます。


  1. import threading
  2. import time
  3. class Worker(threading.Thread):
  4.     # Thread.start時に呼び出される
  5.     def run(self):
  6.         for i in range(5):
  7.             print('Worker:%d' % i)
  8.             time.sleep(1)
  9. if __name__ == '__main__':
  10.     threadA = Worker()
  11.     threadA.start()
  12.     
  13.     threadB = Worker()
  14.     threadB.start()



実行結果


$ python sample.py
Worker:0
Worker:0
Worker:1
Worker:1
Worker:2
Worker:2
Worker:3
Worker:3
Worker:4
Worker:4





スレッドオブジェクトへの値の設定



せっかくオブジェクトにしたので、それぞれ別の個性を与えたいと思います。
オブジェクトに値を設定するにはコンストラクタを経由する方法があります。


  1. import threading
  2. import time
  3. class Worker(threading.Thread):
  4.     def __init__(self, worker_name):
  5.         threading.Thread.__init__(self)
  6.         self._worker_name = worker_name
  7.     # Thread.start時に呼び出される
  8.     def run(self):
  9.         for i in range(5):
  10.             print('Worker%s:%d' % (self._worker_name, i))
  11.             time.sleep(1)
  12. if __name__ == '__main__':
  13.     threadA = Worker('A')
  14.     threadA.start()
  15.     
  16.     threadB = Worker('B')
  17.     threadB.start()



実行結果


$ python sample.py
WorkerA:0
WorkerB:0
WorkerA:1
WorkerB:1
WorkerA:2
WorkerB:2
WorkerA:3
WorkerB:3
WorkerA:4
WorkerB:4




コンストラクタでなくても、別途値を設定するためのsetterを用意してやる手もあります。


  1. import threading
  2. import time
  3. class Worker(threading.Thread):
  4.     def set_name(self, worker_name):
  5.         self._worker_name = worker_name
  6.     # Thread.start時に呼び出される
  7.     def run(self):
  8.         for i in range(5):
  9.             print('Worker%s:%d' % (self._worker_name, i))
  10.             time.sleep(1)
  11. if __name__ == '__main__':
  12.     threadA = Worker()
  13.     threadA.set_name('A')
  14.     threadA.start()
  15.     
  16.     threadB = Worker()
  17.     threadB.set_name('B')
  18.     threadB.start()



実行結果


$ python sample.py
WorkerA:0
WorkerB:0
WorkerA:1
WorkerB:1
WorkerA:2
WorkerB:2
WorkerA:3
WorkerB:3
WorkerA:4
WorkerB:4





スレッド終了の待ち合わせ(join)



複数のスレッドで処理を行い、それぞれのスレッドで得られた結果を合算したい場合。


  1. import threading
  2. import time
  3. class Worker(threading.Thread):
  4.     def __init__(self, worker_name, max):
  5.         threading.Thread.__init__(self)
  6.         self._worker_name = worker_name
  7.         self._max = max
  8.         self._total = 0
  9.     # Thread.start時に呼び出される
  10.     def run(self):
  11.         for i in range(self._max):
  12.             print('Worker%s:%d' % (self._worker_name, i))
  13.             self._total += i
  14.             time.sleep(1)
  15.     def get_total(self):
  16.         return self._total
  17. if __name__ == '__main__':
  18.     threadA = Worker('A', 10)
  19.     threadA.start()
  20.     
  21.     threadB = Worker('B', 20)
  22.     threadB.start()
  23.     # すぐに実行されてしまう
  24.     gtotal = threadA.get_total() + threadB.get_total()
  25.     print('gtotal:%d' % gtotal)



実行結果


$ python sample.py
WorkerA:0
gtotal:0
WorkerB:0



スレッドで実行しているため、すぐに合計値の計算まで処理が進んでしまします。
runメソッドの終了を待つには、joinを呼び出します。


  1. import threading
  2. import time
  3. class Worker(threading.Thread):
  4.     def __init__(self, worker_name, max):
  5.         threading.Thread.__init__(self)
  6.         self._worker_name = worker_name
  7.         self._max = max
  8.         self._total = 0
  9.     # Thread.start時に呼び出される
  10.     def run(self):
  11.         for i in range(self._max):
  12.             print('Worker%s:%d' % (self._worker_name, i))
  13.             self._total += i
  14.             time.sleep(1)
  15.     def get_total(self):
  16.         return self._total
  17. if __name__ == '__main__':
  18.     threadA = Worker('A', 10)
  19.     threadA.start()
  20.     
  21.     threadB = Worker('B', 20)
  22.     threadB.start()
  23.     # スレッドの終了を待つ
  24.     threadA.join()
  25.     threadB.join()
  26.     # 計算結果を合計
  27.     gtotal = threadA.get_total() + threadB.get_total()
  28.     print('gtotal:%d' % gtotal)



実行結果


$ python sample.py
WorkerA:0
WorkerB:0
WorkerA:1
WorkerB:1
WorkerA:2
...
WorkerB:18
WorkerB:19
gtotal:235

関連記事

コメント

非公開コメント

プロフィール

Author:symfo
blog形式だと探しにくいので、まとめサイト作成中です。
Symfoware まとめ

PR




検索フォーム

月別アーカイブ