スレッドプログラミング (pthread)

(2007.6.26 新規作成。2007.7.11 更新。)

C言語でのpthreadの関数の使い方について簡単に解説します。C++だとboostとかを使ったほうがいいですが、内部でどのように動いているか見るためにprimitiveなライブラリを使います。

まず、わざとデッドロックさせてみます。ここまではすでにいろいろなサイトで解説されています。加えて、どうやってデッドロックを回避するか考えてみます。

以下は、Fedora 7 Linuxでテストしています。

題材

題材ですが、あまりにメジャーな「哲学者たちの食事」を用います。

問題を引用しておくと、

専ら考えることを生業とする5人の哲学者たちがひとつの丸い食卓に座っている。中央には大きな皿にスパゲッティが盛ってある。5つのフォークが哲学者たちの間に置いてある。また各哲学者の前には取り皿がおいてある。思考中腹がへったことに気がついた哲学者は、やおら自分の左側に置いてあるフォークを左手にもち、更に右手で右側にあったフォークをとり、二つのフォークを使って自分の皿にスパゲッティをとり、食べる。終わったら二つのフォークを元に戻し再び思考を開始する。さてここで、たまたま5人の哲学者たちが同時にスパゲッティを取ろうと自分の左側のフォークをとったらどうなるであろうか? 5人とも右側のフォークが空くまで永久に待ちつづけてしまい、飢餓状態に陥る

ヘッダファイル

まずは適切な関数を使えるように、マクロを定義します。

_XOPEN_SOURCE と_POSIX_C_SOURCE を定義します。これで IEEE 1003.1-2004 POSIX (Single UNIX Specification; SUSv3) になります。Linuxの場合、/usr/include/features.hを見ると、_XOPEN_SOURCEを定義しておけば_POSIX_C_SOURCEは自動的に定義されるようです。

_REENTRANTは移植性の観点からは定義しないほうがいいような気がします。

それからヘッダファイルをincludeします。ファイルは<pthread.h>です。

  10| #define _XOPEN_SOURCE 600
  11| // 199506L以上に設定する
  12| #define _POSIX_C_SOURCE 200112L
  13| #undef _REENTRANT
  14| 
  15| #include <pthread.h>
  16| #include <stdio.h>
  17| #include <unistd.h>
  18| #include <stdlib.h>

哲学者を用意

それぞれの哲学者にスレッドを割り当てましょう。

スレッドの生成は pthread_create(), スレッドの終了の待ち合わせはpthread_join() です。

int pthread_create(pthread_t* newthread, pthread_attr_t* attr, void* (*start_routine)(void*), void* arg);
新しいスレッドを生成し、start_routine(arg) を呼び出す。attrには新しいスレッドに適用するスレッド属性を指定する。attrはNULLでもよく、その場合はデフォルトの属性が用いられる。

スレッドの生成に成功すると、生成したスレッドの識別子を*newthreadに格納し、pthread_create() は0を返す。失敗した場合は、非0のエラーコードを返す。

新しいスレッドでは、start_routine() から抜けるか、pthread_exit() を呼び出すと終了する。

int pthread_join(pthread_t th, void** thread_return);
thで指定するスレッドの終了を待つ(ブロックする)。指定したスレッドがすでに終了していたときはすぐに戻る。

thread_returnがNULLでないときは、thのスレッドの終了コードが*thread_returnに格納される。pthread_join() は、成功すると0を返し、エラーが発生すると非0を返す。

スレッドは終了しても資源を解放しない。pthread_join() されたときに解放される。したがって、各スレッドについて、pthread_join() を1回ずつ呼び出さなければならない。

以上を踏まえて、main() を書いてみます。テーブルに哲学者を5人並べます。

(2007.7.24追記。)
ここでは pthread_create() に哲学者の番号として (i + 1) を渡しています。呼び出し元の自動変数へのポインタを渡してはいけないことに注意してください。生成されたスレッドがオブジェクトにアクセスしようとしたときに、呼び出し元のスレッドでは、すでにその自動変数のスコープから抜けている可能性があります。

  97| int main() {
  98|   pthread_t threadid[5];
  99|   int i;
 100| 
 101|   srand(time(NULL));
 102| 
 103|   for (i = 0; i < 5; i++) {
 104|     // pthread_create()はスレッドを生成する。スレッド生成に成功すると、threadidを
 105|     // 設定する。
 106|     if (pthread_create(&threadid[i], NULL, thread_main, (void*) (i + 1))) {
 107|       printf("pthread_create() error\n");
 108|       return 1;
 109|     }
 110|   }
 111| 
 112|   for (i = 0; i < 5; i++) {
 113|     void* ret = NULL;
 114|     // pthread_join()は、スレッドが終了するまでブロックする。
 115|     // 指定したスレッドがすでに終了していれば何もしない。
 116|     if (pthread_join(threadid[i], &ret)) {
 117|       printf("pthread_join() error.\n");
 118|       return 1;
 119|     }
 120|   }
 121| 
 122|   return 0;
 123| }

しかし、実際的な問題では、ブロックしないように、スレッドが終了してから pthread_join() を呼ぶべきです。

よくあるシチュエーションは、GUIを持つ主スレッド(メインスレッド)から、裏で仕事をするワーカスレッドを起動します。ワーカースレッドは、自分が終了するときにイベントなり何なりをメインスレッドに投げ、メインスレッドではそのイベントをトリガとしてpthread_join() を呼び出せばいいでしょう。

フォークを並べる

テーブルはどうでもよくって、フォークが重要です。スレッド(哲学者)の間で共有されます。

スレッド間で共有する資源は、適切に排他制御しなければなりません。

資源を更新するとき、大抵は、(1) 読み込み、(2) 計算し、(3) 書き戻す、という手順になります。あるスレッドが更新作業に入っているときに別のスレッドが同じ資源を並行して読み書きすると、一方の計算結果に他方の計算結果が上書きされてしまうことがあります。

ここでは、mutex (ミューテクス, ミューテックス) と条件変数を使ってみます。mutexは、

  1. あるスレッドに保有されている状態と
  2. どのスレッドにも保有されていない状態、
の二つをとるオブジェクトです。別のスレッドに保有されているmutexを保有しようとすると、後のほうのスレッドはブロックされます。

pthreadでは、ミューテクスはpthread_mutex_t型のオブジェクトです。これを資源(フォーク)の数だけ用意します。

pthread_mutex_t は、/usr/include/bits/pthreadtypes.h で定義されています。どのメンバもアプリケーションからアクセスしてはいけません。が、どのようなメンバがあるか見れば、何をするものなのかイメージが湧きます。

ミューテックス関係のAPIは、下記のものがあります。

int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* mutexattr);
ミューテックスオブジェクトをmutexattrで指定する属性オブジェクトに従って初期化する。mutexattrがNULLなら、デフォルトの属性が用いられる。

mutexattrをNULLにして初期化する代わりに、定数オブジェクト PTHREAD_MUTEX_INITIALIZER をmutex オブジェクトに代入しても同じ効果が得られる。

int pthread_mutex_lock(pthread_mutex_t* mutex);
ミューテックスを保有し、ロックする。(以下、デフォルトのミューテックス属性のとき。)

他のスレッドがすでにこのミューテックスを保有(ロック)しているときは、そちらが開放され保有できるようになるまでpthread_mutex_lock() を呼び出したスレッドをブロックする。

ミューテックスを保有しているスレッドが同じミューテックスオブジェクトを再びロックしようとしたときは、デッドロックする。

int pthread_mutex_unlock(pthread_mutex_t* mutex);
ミューテックスのロックを解放する。ミューテックスをロックしたスレッドから呼び出さなければならない。
int pthread_mutex_destroy(pthread_mutex_t* mutex);
ミューテックスオブジェクトを破壊し、関連する資源を解放する。

このほか、ブロックしないpthread_mutex_trylock(), 指定した時間だけロックできるか試すpthread_mutex_timedlock() もあります。

●TODO:条件変数の説明

クラスForkを作ります。インスタンスの状態として、テーブルにあるかを表すon_tableを持ちます。テーブルにあるときがon_table = 1です。あとは排他制御関係です。

  20| class Fork {
  21|   int volatile on_table;
  22|   pthread_mutex_t mutex;
  23|   pthread_cond_t wait;
  24| public:
  25|   Fork(): on_table(1) {
  26|     pthread_mutex_init(&mutex, NULL);
  27|     pthread_cond_init(&wait, NULL);
  28|   }
  29| 
  30|   ~Fork() {
  31|     pthread_cond_destroy(&wait);
  32|     pthread_mutex_destroy(&mutex);
  33|   }
  34| 
  35|   void pickup(int threadid) {
  36|     pthread_mutex_lock(&mutex);
  37|     if (on_table == 0) {
  38|       printf("%d: waiting...\n", threadid);
  39|       pthread_cond_wait(&wait, &mutex); // 待つ -> デッドロック
  40|     }
  41|     on_table = 0;
  42|     pthread_mutex_unlock(&mutex);
  43|   }
  44| 
  45|   void release() {
  46|     pthread_mutex_lock(&mutex);
  47|     on_table = 1;
  48|     pthread_cond_signal(&wait);
  49|     pthread_mutex_unlock(&mutex);
  50|   }
  51| };
  52| 
  53| Fork forks[5];

持ち上げようとしたフォークがテーブルにないとき(隣の哲学者が持っているとき)、pthread_cond_wait() で待ち合わせます。

あとはスレッドの起点であるthread_mainなどを見ておきましょう。

  55| void sleep_rand(float max) {
  56|   int sec = 1 + (int) (max * rand() / (RAND_MAX + 1.0)); // 1..max
  57|   sleep(sec);
  58| }
  59| 
  60| enum LeftOrRight {
  61|   LEFT = 0,
  62|   RIGHT
  63| };
  64| 
  65| int fork_n(int id, LeftOrRight left_or_right) {
  66|   return left_or_right == LEFT ? id - 1 : id % 5;
  67| }
  68| 
  69| Fork* get_fork(int id, LeftOrRight left_or_right) {
  70|   return &forks[fork_n(id, left_or_right)];
  71| }
  72| 
  73| void* thread_main(void* arg) {
  74|   int id = (int) arg;   // 1..5
  75| 
  76|   while (true) {
  77|     sleep_rand(5.0); // 考え中
  78|     get_fork(id, LEFT)->pickup(id);
  79|     printf("%d: taked the fork %d\n", id, fork_n(id, LEFT));
  80|     sleep_rand(3.0); // ここを長くするとデッドロックしやすくなる。
  81|     get_fork(id, RIGHT)->pickup(id);
  82|     printf("%d: taked the fork %d, and eat\n", id, fork_n(id, RIGHT));
  83|     sleep_rand(2.0);
  84| 
  85|     get_fork(id, LEFT)->release();
  86|     get_fork(id, RIGHT)->release();
  87|     printf("%d: released the fork %d, %d\n", 
  88| 	   id, fork_n(id, LEFT), fork_n(id, RIGHT));
  89|   }
  90| 
  91|   // pthread_exit()はスレッドを終了させる。thread_main()から戻ってもいい。
  92|   pthread_exit(NULL);
  93| }

以上のプログラムを実行すると、タイミングにもよりますが、しばらく見ているとデッドロックします。みんなが左のフォークを持った瞬間です。

ではどうすればいいのか。

デッドロック対策

(2007.7.10 この節追加。)

デッドロックしないようにするには、次の手段があります。

1. 資源を共有しない

今回の例で言えば、なぜフォークを共有するのか。10本用意すればスレッド間で共有不要です。

あるいは、フォークを並べておかなくても、給仕 (ウェイター) を用意して、スパゲティを振る舞っても、資源を共有せずにすみます。これは、非同期キュー(待ち行列)を用います。

2. 資源を取得する順序

複数の資源を取得(ロック)するときに、ロックを掛ける順序が違うスレッドがあるとロックします。次の順でロックするとデッドロックしません。スレッドCが3 => 1の順でロックすると、デッドロックするタイミングがあります。

資源123
スレッドA1 => 2
スレッドB 2 => 3
スレッドC1 => 3

実装するときは、ロックしたmutexを覚えておいて、あらかじめ定めておいた順序と違う場合はエラーにするようにします。(ロック階層)

3. 全部ロックできないときは失敗

複数の資源をロックしようとするときに、(a) すべてをロックできるか、(b) 全部のロックを取り消すか、二者択一にする方法があります。

上のスレッドCが3 => 1の順でロックする場合でも、資源1をロックしようとするときにエラーにして巻き戻せばデッドロックになりません。pthreadでは、pthread_mutex_trylock() を使います。

が、この方法ではライブロック (livelock) の問題があります。哲学者がみんな同時に、左のフォークを取り、右を取ろうとして両方離し、を繰り返すタイミングがあります。どのスレッドも動いているのに、全体としてデッドロックと同じ状態に陥ります。よりたちが悪い。ですので、これは採用してはいけません。

4. ロックをタイムアウトさせる

ロックは仕方ないとして、バグによってデッドロックを起こさないように、フェイルセーフ fail-safeとして、一定時間ロックを掛けられなかったときはタイムアウトするようにする手もあります。

pthread_mutex_lock() ではなく、pthread_mutex_timedlock() を使います。