スレッドプログラミング (pthread)

(2007.6.26 新規作成。2007.7.11 更新。)

C言語でのpthreadの関数の使い方について簡単に解説します。C++だと boost とかを使ったほうがいいですが、内部でどのように動いているか見るためにprimitiveなライブラリを使います。

Windows でも互換ライブラリがあります。vcpkg package - pthreads

まず、わざとデッドロックさせてみます。ここまではすでにいろいろなサイトで解説されています。加えて、どうやってデッドロックを回避するか考えてみます。

以下は、Fedora 7 Linuxでテストしています。

題材

題材ですが、あまりにメジャーな「哲学者たちの食事」Dining Philosophers Problem を用います。

哲学者たちの食事;

専ら考えることを生業とする5人の哲学者たちがひとつの丸い食卓に座っている。中央には大きな皿にスパゲッティが盛ってある。5つのフォークが哲学者たちの間に置いてある。また各哲学者の前には取り皿がおいてある。

思考中腹がへったことに気がついた哲学者は、やおら自分の左側に置いてあるフォークを左手にもち、更に右手で右側にあったフォークをとり、二つのフォークを使って自分の皿にスパゲッティをとり、食べる。終わったら二つのフォークを元に戻し再び思考を開始する。

さてここで、たまたま5人の哲学者たちが同時にスパゲッティを取ろうと自分の左側のフォークをとったらどうなるであろうか? 5人とも右側のフォークが空くまで永久に待ちつづけてしまい、飢餓状態に陥る

哲学者がそれぞれ並列に動くタスク, フォークが共有資源です。ちな、この問題を検索すると、取り皿ではなく、目の前にスパゲティを盛り付けた画像が多い。それだと食べるフォークをシェアしてて、いややん?


図の出典 http://yucchi.jp/blog/?p=2038

ヘッダファイル

まずは適切な関数を使えるように、マクロを定義します。

_XOPEN_SOURCE_POSIX_C_SOURCE を定義します。これで IEEE 1003.1-2004 POSIX (Single UNIX Specification; SUSv3) になります。

Linuxの場合, /usr/include/features.h を見ると, _XOPEN_SOURCE を定義しておけば _POSIX_C_SOURCE は自動的に定義されるようです。

_XOPEN_SOURCE対応する _POSIX_C_SOURCE
700 200809L
600 200112L
500 199506L
< 500 2

_REENTRANT は obsolete. _POSIX_C_SOURCE=199506L と同じ。もはや移植性の観点から, 定義しないほうがいい。

それからヘッダファイルをincludeします。ファイルは<pthread.h>です。

  10| #define _XOPEN_SOURCE 600
  11| // 199506L以上に設定する
  12| #define _POSIX_C_SOURCE 200112L
  13| #undef _REENTRANT
  14| 
  15| #include <pthread.h>
  16| #include <stdio.h>
  17| #include <unistd.h>
  18| #include <stdlib.h>

哲学者を用意

それぞれの哲学者にスレッドを割り当てましょう。

スレッドの生成は pthread_create(), スレッドの終了の待ち合わせは pthread_join() です。

int pthread_create(pthread_t* newthread, pthread_attr_t* attr, void* (*start_routine)(void*), void* arg)
新しいスレッドを生成し、start_routine(arg) を呼び出す。attr には新しいスレッドに適用するスレッド属性を指定する。attrNULL でもよく、その場合はデフォルトの属性が用いられる。

スレッドの生成に成功すると、生成したスレッドの識別子を *newthread に格納し、pthread_create() は 0 を返す。失敗した場合は、非0のエラーコードを返す。

新しいスレッドでは、start_routine() から抜けるか、pthread_exit() を呼び出すと終了する。

int pthread_join(pthread_t th, void** thread_return)
th で指定するスレッドの終了を待つ (ブロックする)。指定したスレッドがすでに終了していたときはすぐに戻る。

thread_returnNULL でないときは、th のスレッドの終了コードが *thread_return に格納される。pthread_join() は、成功すると 0 を返し、エラーが発生すると非0を返す。

スレッドは終了しても資源を解放しない。pthread_join() されたときに解放される。したがって、各スレッドについて pthread_join() を1回ずつ呼び出さなければならない。

以上を踏まえて、main() を書いてみます。テーブルに哲学者を5人並べます。

(2007.7.24追記。)
ここでは pthread_create() に哲学者の番号として (i + 1) を渡しています。呼び出し元の自動変数へのポインタを渡してはいけないことに注意してください。生成されたスレッドがオブジェクトにアクセスしようとしたときに、呼び出し元のスレッドでは、すでにその自動変数のスコープから抜けている可能性があります。

  97| int main() {
  98|   pthread_t threadid[5];
  99|   int i;
 100| 
 101|   srand(time(NULL));
 102| 
 103|   for (i = 0; i < 5; i++) {
 104|     // pthread_create()はスレッドを生成する。スレッド生成に成功すると、threadidを
 105|     // 設定する。
 106|     if (pthread_create(&threadid[i], NULL, thread_main, (void*) (i + 1))) {
 107|       printf("pthread_create() error\n");
 108|       return 1;
 109|     }
 110|   }
 111| 
 112|   for (i = 0; i < 5; i++) {
 113|     void* ret = NULL;
 114|     // pthread_join()は、スレッドが終了するまでブロックする。
 115|     // 指定したスレッドがすでに終了していれば何もしない。
 116|     if (pthread_join(threadid[i], &ret)) {
 117|       printf("pthread_join() error.\n");
 118|       return 1;
 119|     }
 120|   }
 121| 
 122|   return 0;
 123| }

しかし、実際的な問題では、ブロックしないように、スレッドが終了してから pthread_join() を呼ぶべきです。

よくあるシチュエィションは、GUIを持つ主スレッド (メインスレッド) から、裏で仕事をするワーカスレッドを起動します。ワーカースレッドは、自分が終了するときにイベントなり何なりをメインスレッドに投げ、メインスレッドではそのイベントをトリガとして pthread_join() を呼び出せばいいでしょう。

フォークを並べる

テーブルは別にどうでもよく、フォークが重要です。スレッド (哲学者) の間で共有されます。

スレッド間で共有する資源は、適切に排他制御しなければなりません。

資源を更新するとき、大抵は, (1) 読み込み, (2) 計算し, (3) 書き戻す、という手順になります。あるスレッドが更新作業に入っているときに別のスレッドが同じ資源を並行して読み書きすると、一方の計算結果に他方の計算結果が上書きされてしまうことがあります。

ここでは、mutex (ミューテクス, ミューテックス) と条件変数を使ってみます。Mutexは、

  1. あるスレッドに保有されている状態と
  2. どのスレッドにも保有されていない状態、
の二つをとるオブジェクトです。別のスレッドに保有されているmutexを保有しようとすると、後のほうのスレッドはブロックされます。

pthreadでは、ミューテクスは pthread_mutex_t 型のオブジェクトです。これを資源 (フォーク) の数だけ用意します。

Linux では pthread_mutex_t 型は、/usr/include/bits/pthreadtypes.h で定義されています。どのメンバもアプリケーションからアクセスしてはいけません。が、どのようなメンバがあるか見れば、何をするものなのかイメージが湧きます。

ミューテックス関係のAPIは、下記のものがあります。

PTHREAD_MUTEX_INITIALIZER は POSIX 標準。このほか GNU 拡張で, PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP などがある。

int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* mutexattr)
ミューテックスオブジェクトを mutexattr で指定する属性オブジェクトに従って初期化する。mutexattrが NULL なら、デフォルトの属性が用いられる。

mutexattrを NULL にして初期化する代わりに、定数オブジェクト PTHREAD_MUTEX_INITIALIZER をmutex オブジェクトに代入しても同じ効果が得られる。

int pthread_mutex_lock(pthread_mutex_t* mutex)
ミューテックスを保有し、ロックする。(以下、デフォルトのミューテックス属性のとき。)

他のスレッドがすでにこのミューテックスを保有(ロック)しているときは、そちらが開放され保有できるようになるまで pthread_mutex_lock() を呼び出したスレッドをブロックする。

ミューテックスを保有しているスレッドが同じミューテックスオブジェクトを再びロックしようとしたときは、デッドロックする。

int pthread_mutex_unlock(pthread_mutex_t* mutex)
ミューテックスのロックを解放する。ミューテックスをロックしたスレッドから呼び出さなければならない。
int pthread_mutex_destroy(pthread_mutex_t* mutex)
ミューテックスオブジェクトを破壊し、関連する資源を解放する。

このほか、ブロックしない pthread_mutex_trylock(), 指定した時間だけロックできるか試す pthread_mutex_timedlock() もあります。

ミューテックスのみを使うヴァージョン

ヘッダです。共有資源に対応するようにミューテクスを宣言します。

fork.h

C++
[RAW]
  1. #define _XOPEN_SOURCE 700
  2. #include <pthread.h>
  3. class Fork
  4. {
  5. // テーブルにあるとき 1, 持ち上げると 0
  6. int volatile on_table;
  7. pthread_mutex_t mutex;
  8. public:
  9. int id;
  10. Fork(int id);
  11. ~Fork();
  12. void pickup(int threadid);
  13. void release();
  14. };
  15. extern Fork* forks[5];
  16. extern void* thread_main(void* arg);

本体です。Fork::pickup() メソッドがキモ。

fork.cpp

C++
[RAW]
  1. #include "fork.h"
  2. #include <stdio.h>
  3. using namespace std;
  4. Fork::Fork(int id_): on_table(1), mutex(PTHREAD_MUTEX_INITIALIZER), id(id_) {
  5. }
  6. Fork::~Fork() {
  7. pthread_mutex_destroy(&mutex);
  8. }
  9. // 持ち上げる
  10. void Fork::pickup(int threadid) {
  11. pthread_mutex_lock(&mutex);
  12. if (on_table == 0)
  13. printf("%d: waiting fork %d...\n", threadid, id);
  14. while (on_table == 0) {
  15. pthread_mutex_unlock(&mutex);
  16. sched_yield(); // CPU を明け渡す
  17. pthread_mutex_lock(&mutex);
  18. }
  19. on_table = 0;
  20. pthread_mutex_unlock(&mutex);
  21. }
  22. // テーブルに置く
  23. void Fork::release() {
  24. pthread_mutex_lock(&mutex);
  25. on_table = 1;
  26. pthread_mutex_unlock(&mutex);
  27. }

Fork::pickup() を見てください。共有したいオブジェクト on_table のアクセスを囲むように, pthread_mutex_lock(), pthread_mutex_unlock() します。

注意したいのは、ミューテクスは「読み取り〜更新」の排他制御で, スレッドがフォークが置かれるまで待つ、という制御ではないことです。フォークが置かれるまで待つ部分は, 行 21〜 while (on_table == 0) { ブロックです。

さらに、別スレッドが on_table を更新できるように, いったん unlock して明け渡すことにも注目してください。

条件変数と組み合わせる

条件変数 condition variable は, 名前から内容が分かりにくいですが、上記の while ブロックと同じことをおこないます。

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

  • この関数を呼び出すとき, mutex は呼出しスレッドによりロックされていなければならない。
  • この関数は, アトミックに, mutex を解放し, 条件変数 cond で呼出しスレッドをブロックします。
  • 別スレッドで pthread_cond_broadcast() or pthread_cond_signal() が呼出された場合, 改めて呼出しスレッドがミューテクスのロックを獲得し, この関数から戻ります。

条件変数は, デッドロックしないように注意が必要です。待っているスレッドが二つ以上になりうる場合は, pthread_cond_signal() を使うべきではありません。このサンプルは一つのフォークを二人の哲学者が取りあうため、暗黙に待っているスレッドが高々一つとの仮定があります。

CON38-C. Preserve thread safety and liveness when using condition variables

クラス Fork を作ります。インスタンスの状態として、テーブルにあるかを表すon_tableを持ちます。テーブルにあるときがon_table = 1です。あとは排他制御関係です。

  20| class Fork {
  21|   int volatile on_table;
  22|   pthread_mutex_t mutex;
  23|   pthread_cond_t wait;
  24| public:
  25|   Fork(): on_table(1) {
  26|     pthread_mutex_init(&mutex, NULL);
  27|     pthread_cond_init(&wait, NULL);
  28|   }
  29| 
  30|   ~Fork() {
  31|     pthread_cond_destroy(&wait);
  32|     pthread_mutex_destroy(&mutex);
  33|   }
  34| 
  35|   void pickup(int threadid) {
  36|     pthread_mutex_lock(&mutex);
  37|     if (on_table == 0) {  ●ここif不可。シグナルの割り込みでも戻る
  38|       printf("%d: waiting...\n", threadid);
  39|       pthread_cond_wait(&wait, &mutex); // 待つ
  40|     }
  41|     on_table = 0;
  42|     pthread_mutex_unlock(&mutex);
  43|   }
  44| 
  45|   void release() {
  46|     pthread_mutex_lock(&mutex);
  47|     on_table = 1;
  48|     pthread_cond_signal(&wait);  ●ロック獲得状態で呼び出すこと。待っているスレッドがないときは単に戻る。いたときも起床させてこの関数は戻る。
  49|     pthread_mutex_unlock(&mutex);  ●なので、これも必要
  50|   }
  51| };
  52| 
  53| Fork forks[5];

持ち上げようとしたフォークがテーブルにないとき(隣の哲学者が持っているとき)、pthread_cond_wait() で待ち合わせます。

その他のコード

あとはスレッドの起点であるthread_mainなどを見ておきましょう。

  55| void sleep_rand(float max) {
  56|   int sec = 1 + (int) (max * rand() / (RAND_MAX + 1.0)); // 1..max
  57|   sleep(sec);
  58| }
  59| 
  60| enum LeftOrRight {
  61|   LEFT = 0,
  62|   RIGHT
  63| };
  64| 
  65| int fork_n(int id, LeftOrRight left_or_right) {
  66|   return left_or_right == LEFT ? id - 1 : id % 5;
  67| }
  68| 
  69| Fork* get_fork(int id, LeftOrRight left_or_right) {
  70|   return &forks[fork_n(id, left_or_right)];
  71| }
  72| 
  73| void* thread_main(void* arg) {
  74|   int id = (int) arg;   // 1..5
  75| 
  76|   while (true) {
  77|     sleep_rand(5.0); // 考え中
  78|     get_fork(id, LEFT)->pickup(id);
  79|     printf("%d: taked the fork %d\n", id, fork_n(id, LEFT));
  80|     sleep_rand(3.0); // ここを長くするとデッドロックしやすくなる。
  81|     get_fork(id, RIGHT)->pickup(id);
  82|     printf("%d: taked the fork %d, and eat\n", id, fork_n(id, RIGHT));
  83|     sleep_rand(2.0);
  84| 
  85|     get_fork(id, LEFT)->release();
  86|     get_fork(id, RIGHT)->release();
  87|     printf("%d: released the fork %d, %d\n", 
  88| 	   id, fork_n(id, LEFT), fork_n(id, RIGHT));
  89|   }
  90| 
  91|   // pthread_exit()はスレッドを終了させる。thread_main()から戻ってもいい。
  92|   pthread_exit(NULL);
  93| }

以上のプログラムを実行すると、タイミングにもよりますが、しばらく見ているとデッドロックします。みんなが左のフォークを持った瞬間です。

ではどうすればいいのか。

デッドロック対策

(2007.7.10 この節追加。)

デッドロックしないようにするには、次の手段があります。

1. 資源を共有しない

今回の例で言えば、なぜフォークを共有するのか。10本用意すればスレッド間で共有不要です。

あるいは、フォークを並べておかなくても、給仕 (ウェイター) を用意して、スパゲティを振る舞っても、資源を共有せずにすみます。これは、非同期キュー(待ち行列)を用います。

2. 資源を取得する順序

複数の資源を取得(ロック)するときに、ロックを掛ける順序が違うスレッドがあるとロックします。次の順でロックするとデッドロックしません。スレッドCが3 => 1の順でロックすると、デッドロックするタイミングがあります。

資源123
スレッドA1 => 2
スレッドB 2 => 3
スレッドC1 => 3

実装するときは、ロックしたmutexを覚えておいて、あらかじめ定めておいた順序と違う場合はエラーにするようにします。(ロック階層)

3. × 全部ロックできないときは失敗

複数の資源をロックしようとするときに、(a) すべてをロックできるか、(b) 全部のロックを取り消すか、二者択一にする方法があります。

上のスレッドCが3 => 1の順でロックする場合でも、資源1をロックしようとするときにエラーにして巻き戻せばデッドロックになりません。pthreadでは、pthread_mutex_trylock() を使います。

が、この方法ではライブロック (livelock) の問題があります。哲学者がみんな同時に、左のフォークを取り、右を取ろうとして両方離し、を繰り返すタイミングがあります。どのスレッドも動いているのに、全体としてデッドロックと同じ状態に陥ります。よりたちが悪い。ですので、これは採用してはいけません。

4. ロックをタイムアウトさせる

ロックは仕方ないとして、バグによってデッドロックを起こさないように、フェイルセーフ fail-safeとして、一定時間ロックを掛けられなかったときはタイムアウトするようにする手もあります。

pthread_mutex_lock() ではなく、pthread_mutex_timedlock() を使います。