マルチスレッド・プログラミング(1)

並行システム

                               システム情報工学研究科コンピュータサイエンス専攻、電子・情報工学系
                               新城 靖
                               <yas@is.tsukuba.ac.jp>

このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/sie/
http://www.cs.tsukuba.ac.jp/~yas/

■スレッド・プログラミング

◆スレッドとは

スレッド(thread) あるいは、 軽量プロセス(lightweight processes) とは、 1つの保護の単位としての プロセス(タスク,あるいは,アドレス空間) 内部にふくまれている平行処理(論理的な並列処理)の単位。

シングルスレッドのプログラム
1度に1つの手続き(Cの関数)しか動かない。
マルチスレッドのプログラム
1度にスレッドの数だけの手続きが論理的には同時に動く。 (同期でブロックされているものも含む)

図? シングルスレッドのプロセスとマルチスレッドのプロセス

図? シングルスレッドのプロセスとマルチスレッドのプロセス

軽量プロセスというと、内部にループを含むような語感がある。

◆スレッドの利用目的

◆本当にスレッドが必要か

John K. Ousterhout, "Why Threads Are A Bad Idea (for most purposes)", Invited Talk at the 1996 USENIX Technical Conference (January 25, 1996). [PDF] [PowerPoint]

マルチスレッドプログラミングは、非常に難しい。

シングルスレッドのイベント駆動で書けるなら、その方がいい。 GUI、分散など。

どこでスレッドを使うべきか。

◆スレッドの内容

個々のスレッドごとに持つもの プロセス全体で共有

◆スレッドの操作

◆Unix のユーザ・レベルの(重たい)プロセス

Unixカーネル内の並行処理

PDP-11 時代からのモデル

■Concurrent Pascal

逐次型プログラミング言語 Pascal を拡張したもの。

◆参考文献

Per Brinch Hansen: "The Architecture of Concurrent Program", Prentice Hall (1977).

Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).

◆プロセス

type procA_t = process(引数・・・);
   var
      局所データの宣言
   procedure proc1(引数・・・);
   procedure proc2(引数・・・);
begin
   cycle
       ・・・
   end;
end

var procA1 : procA_t ;
init procA1(引数);

◆モニタ

type monA_t = monitor(引数・・・);
   var
      局所データの宣言
   procedure entry proc1(引数・・・);
   procedure entry proc2(引数・・・);
begin
   ローカルデータの初期化;
end

var monA1 : monA_t ;
init monA1(引数);;

◆条件変数とキュー

  cv1 : condition;
  cv1.wait;      呼び出したプロセスを待たせる。
  cv1.signal;   待っているプロセスがいれば全て再開させる。
  q1 : queue;
  delay(q1);      呼び出したプロセスをそのキューで待たせる。
  continue(q1);   そのキューで待っているプロセスがいれば1つだけ再開させる。

◆有限バッファ

Unix のパイプのようなことを Concurrent Pascal のプロセス(スレッド)を使って実行したい。
producer | consumer
2つのスレッドの間には、バッファを置く。

図? 環状バッファ(有限バッファ)、生産者、消費者

図? 環状バッファ(有限バッファ)、生産者、消費者"

バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。

手続き

   1: const BUFFER_SIZE = 4;
   2: type circular_buffer =
   3: monitor
   4: var
   5:     rp : integer ;
   6:     rp : integer ;
   7:     data: array [0..BUFFER_SIZE-1] of integer;
   8:     used: integer;
   9:     not_empty : condition;
  10:     not_full  : condition;
  11: 
  12:     procedure entry put(x:integer);
  13:     begin
  14:         while( used = BUFFER_SIZE ) do
  15:             non_full.wait;
  16:         data[wp] := x;
  17:         wp := wp + 1 ;
  18:         if( wp >= BUFFER_SIZE )
  19:             wp := 0 ;
  20:         used := used + 1 ;
  21:         not_empty.signal;
  22:     end
  23: 
  24:     procedure entry get(result x:integer);
  25:     begin
  26:         while( used = 0 ) then
  27:             not_empty.wait;
  28:         x := data[rp];
  29:         rp := rp + 1 ;
  30:         if( rp >= BUFFER_SIZE )
  31:             rp := 0 ;
  32:         used := used - 1 ;
  33:         not_full.signal;
  34:     end
  35: begin
  36:     rp := 0 ;
  37:     wp := 0 ;
  38:     used := 0 ;
  39: end;
  40: 
  41: ...
  42: var buf : circular_buffer ;
  43: init buf;
  44: ...
  45: 

■Pthread

Pthread は、POSIX 1003.1c-1995という標準に準拠したスレッド・ライブラリ。 POSIX Thread とも呼ばれる。

◆Pthreadを利用したプログラムのコンパイル

Pthread を利用したプログラムを書く時には、次のようなヘッダ・ファイルを 読み込む。

#include <pthread.h>

Solaris (Unix International系)でのコンパイルとリンク。 -D_REENTRANT-lpthread を付ける。

% cc -D_REENTRANT -o create create.c -lpthread [←]
% ./create [←]
...
% []
セマフォを使う時、Solaris 6 (SunOS 5.6) では、リンク時に -lposix4 オプションを付ける。Solaris 7-10 (SunOS 5.7, 5.8, 5.9, 5.10) では、リンク時に -lrt オプションを付ける。

Linux, MacOSX, でのコンパイルとリンク。-lpthread を付ける。

% cc -o create create.c -lpthread [←]
% ./create [←]
...
% []

■スレッドの生成・消滅

スレッドは、普通のプログラムの、 サブルーチン(C言語の関数、手続き) に近い。 サブルーチンの場合,呼び出すと、呼び出された方が動き、自分自身は,止ま る。スレッドでは,新たにスレッドを生成した場合,生成した方と生成さ れた方は,論理的には2つとも同時に動く。

■fork-joinモデル

図? fork-joinモデルの実現

図? fork-joinモデルの実現

  1. 逐次処理(スレッド/プロセスが1つ)の状態から始まる
  2. 並列性が必要になった時、fork命令で複数のスレッド/プロセスに分か れて並列処理を行う。
  3. 並列に動作できる部分が終ると join 命令で再び逐次処理に戻る。

◆Unixのfork

fork() システムコールでコピーが作られる。 join の代わりに、子どもは exit()、親は wait()。

◆Pthreadはcreate

Pthread では、コピーではなく create で新たにスレッドを作る。同じ関数を 実行したい時には、直接 call する。(別の関数を実行するなら呼ばなくても よい。) 子スレッドでは、pthread_exit() (トップの 手続きからリターン)、 親は、pthread_join() する。

後で join する必要がない時には、pthread_detach() を使って切り離す。 (joinしなくてもゾンビが残らない。)

◆スレッドの生成とjoin

   1: 
   2: /*
   3:         create-join-2.c -- スレッドを2つ作るプログラム
   4: */
   5: 
   6: #include <stdio.h> /* printf() */
   7: #include <pthread.h>
   8: 
   9: void func1( int x );
  10: void func2( int x );
  11: 
  12: main()
  13: {
  14:     pthread_t t1 ;
  15:     pthread_t t2 ;
  16:         printf("main()\n");
  17:         pthread_create( &t1, NULL, (void *)func1, (void *)10 );
  18:         pthread_create( &t2, NULL, (void *)func2, (void *)20 );
  19:         pthread_join( t1, NULL );
  20:         pthread_join( t2, NULL );
  21: }
  22: 
  23: void func1( int x )
  24: {
  25:     int i ;
  26:         for( i = 0 ; i<3 ; i++ )
  27:         {
  28:             printf("func1( %d ): %d \n",x, i );
  29:         }
  30: }
  31: 
  32: void func2( int x )
  33: {
  34:     int i ;
  35:         for( i = 0 ; i<3 ; i++ )
  36:         {
  37:             printf("func2( %d ): %d \n",x, i );
  38:         }
  39: }
実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/create-join-2.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/Makefile [←]
% make create-join-2 [←]
gcc    -c -o create-join-2.o create-join-2.c
gcc create-join-2.o -lpthread -o create-join-2
% ./create-join-2  [←]
main()
func1( 10 ): 0 
func2( 20 ): 0 
func1( 10 ): 1 
func2( 20 ): 1 
func1( 10 ): 2 
func2( 20 ): 2 
% ./create-join-2 [←]
main()
func1( 10 ): 0 
func1( 10 ): 1 
func1( 10 ): 2 
func2( 20 ): 0 
func2( 20 ): 1 
func2( 20 ): 2 
% ./create-join-2 [←]
main()
func1( 10 ): 0 
func1( 10 ): 1 
func1( 10 ): 2 
func2( 20 ): 0 
func2( 20 ): 1 
func2( 20 ): 2 
% []
この例では、次の3つのスレッドが作られる。
  1. main を実行しているスレッド
  2. func1 から作られたスレッド t1
  3. func2 から作られたスレッド t2
マルチスレッド・プログラミングでは、main関数もまた1つのスレッドが実行 していると考える。これを 初期スレッド 、あるいは、 メインスレッド とよぶ。Pthrad では、 メインスレッド以外のスレッドは、 pthread_create() により作られる。

どういう順序で実行されるかは、決まっていない。 決まっていない。スレッドは、もともと順番を決めないような処理、 非同期的(asynchronous) な処理を表現するためのもの。どうしても他のスレッドと同期を行なう必要が 出てきた時には、mutex や条件変数といった同期機能を使う。

pthread_create()で指定された関数からリターンすると、そ のスレッドが終了する。pthread_exit() を呼び出してもよい。 ただし、 初期スレッド が終了すると、プロセス全体が終了する。 exit() システムコールを呼び出しても終了する。

■mutexによるスレッド間の同期

◆共有資源

複数のプロセス/スレッドで共有しなければならないもの。

プロセスの場合、ファイル、端末、ウインドウ、主記憶、CPU時間。

主記憶やCPU時間といった資源は、横取りしても平気なので、あまり問題になりない。

スレッドの場合は、プログラミング言語の変数。

変更されない変数の値を読むだけなら、特に問題は起きない。それ意外の時、 特に、複数のスレッドで値を読み書きする時に問題が起きる。

◆複数のスレッドによる共有資源のアクセス(ロックなし)

   1: 
   2: /*
   3:  * mutex-nolock.c -- 共有資源をロックなしでアクセスするプログラム
   4:  */
   5: 
   6: #include <stdio.h>
   7: #include <pthread.h>
   8: 
   9: void thread_A(), thread_B();
  10: int     shared_resource ;
  11: 
  12: main() {
  13:     pthread_t t1 ;
  14:     pthread_t t2 ;
  15:         shared_resource = 0 ;
  16:         pthread_setconcurrency( 2 );
  17:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  18:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  19:         pthread_join( t1, NULL );
  20:         pthread_join( t2, NULL );
  21:         printf("main(): shared_resource == %d\n", shared_resource );
  22: }
  23: 
  24: void thread_A()
  25: {
  26:     int i, x ;
  27:         for( i = 0 ; i<1000000 ; i++ )
  28:         {
  29:             x = shared_resource ;
  30:             x = x + 1 ;
  31:             shared_resource = x ;
  32:         }
  33: }
  34: 
  35: void thread_B() {
  36:     int i, x ;
  37:         for( i = 0 ; i<1000000 ; i++ ) {
  38:             x = shared_resource ;
  39:             x = x + 1 ;
  40:             shared_resource = x ;
  41:         }
  42: }

pthread_setconcurrency() は、利用したい CPU 数をシステムに要求するもの。 (古い Solaris では、thr_setconcurrency() を使う。)

◆実行結果(ロックなし)

共有メモリ型マルチプロセッサや Solaris, MacOSX での実行結果。
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/mutex-nolock.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/Makefile [←]
% make mutex-nolock [←]
gcc    -c -o mutex-nolock.o mutex-nolock.c
gcc mutex-nolock.o -lpthread -o mutex-nolock
% ./mutex-nolock  [←]
main(): shared_resource == 1014838
% ./mutex-nolock [←]
main(): shared_resource == 1062224
% ./mutex-nolock [←]
main(): shared_resource == 1096882
% ./mutex-nolock [←]
main(): shared_resource == 1091488
% ./mutex-nolock [←]
main(): shared_resource == 1096291
% []
thread_A(), thread_B()と2つのスレッドが作 られている。整数型の変数 shared_resource は、その2つの スレッドの両方からアクセスされる。2つのスレッドで合計2000000 増える予 定だが、実際に実行すると、そうはならない。実行する度に答えが違う。

単一プロセッサの Linux 等では、何度か実行しても常に 2000000 だけ増える ことがある。

◆考察(ロックなし)

このプログラムが共有メモリ型マルチプロセッサで動いているとして、動きを 考えてえる。

thread_A()			thread_B()
	:				:
	:				:
29: x = shared_resource ;
				38: x = shared_resource ;
30: x = x + 1 ;
				39: x = x + 1 ;
31: shared_resource = x ;
				40: shared_resource = x ;
	:				:
	:				:
shared_resource++と書いてもだめ。複数の機械語命令に分割 されるので。

◆相互排除

相互排除(mutual exclusion): ある資源をアクセスできるスレッドの数を 多くても1つにする。

プログラムの字面上、相互排除が必要な部分を 際どい部分(critical section) ( クリティカルセクション ) という。

◆Pthreadでの相互排除

Pthread では、相互排除を行なうために、 mutex という仕組みがある。次のようにして、相互排除を行ないたい部分を lock と unlock で囲む。

    pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
	:
	:
    pthread_mutex_lock( &murex1 );
	<相互排除したい部分(際どい部分)>
    pthread_mutex_unlock( &mutex1 );

◆複数のスレッドによる共有資源のアクセス(ロック付き)

ロックなしのプログラムを、mutex を使っ て書き直したもの。
   1: /*
   2:  * mutex-lock.c -- 共有資源をロックしながらアクセスするプログラム
   3:  */
   4: 
   5: #include <stdio.h>
   6: #include <pthread.h>
   7: 
   8: void thread_A(), thread_B();
   9: int     shared_resource ;
  10: pthread_mutex_t mutex1 ;
  11: 
  12: main() {
  13:     pthread_t t1 ;
  14:     pthread_t t2 ;
  15:         shared_resource = 0 ;
  16:         pthread_mutex_init( &mutex1, NULL );
  17:         pthread_setconcurrency( 2 );
  18:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  19:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  20:         pthread_join( t1, NULL );
  21:         pthread_join( t2, NULL );
  22:         printf("main(): shared_resource == %d\n", shared_resource );
  23: }
  24: 
  25: void thread_A()
  26: {
  27:     int i, x ;
  28:         for( i = 0 ; i<1000000 ; i++ )
  29:         {
  30:             pthread_mutex_lock( &mutex1 );
  31:             x = shared_resource ;
  32:             x = x + 1 ;
  33:             shared_resource = x ;
  34:             pthread_mutex_unlock( &mutex1 );
  35:         }
  36: }
  37: 
  38: void thread_B() {
  39:     int i, x ;
  40:         for( i = 0 ; i<1000000 ; i++ ) {
  41:             pthread_mutex_lock( &mutex1 );
  42:             x = shared_resource ;
  43:             x = x + 1 ;
  44:             shared_resource = x ;
  45:             pthread_mutex_unlock( &mutex1 );
  46:         }
  47: }

◆実行結果(ロック付き)

% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/mutex-lock.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/Makefile [←]
% make mutex-lock [←]
gcc    -c -o mutex-lock.o mutex-lock.c
gcc mutex-lock.o -lpthread -o mutex-lock
% ./mutex-lock [←]
main(): shared_resource == 2000000
% ./mutex-lock [←]
main(): shared_resource == 2000000
% ./mutex-lock [←]
main(): shared_resource == 2000000
% []

◆考察(ロック付き)

このプログラムが共有メモリ型マルチプロセッサで動いているとして、動きを 考えてえる。
thread_A()				thread_B()

	:					:
30: pthread_mutex_lock( &mutex1 );		:
					41: pthread_mutex_lock( &mutex1 );
31: x = shared_resource ;		<他のスレッドが実行中なので
32: x = x + 1 ;				 この状態でしばらく待たされる>
33: shared_resource = x ;			:
34: pthread_mutex_unlock( &mutex1 );		:
	:				<実行再開>
	:				42: x = shared_resource ;
					43: x = x + 1 ;
					44: shared_resource = x ;
					45: pthread_mutex_unlock( &mutex1 );
後から来た thread_B() は、他のスレッドが実行中は、待たさ れる。

■条件変数によるスレッド間の同期

スレッドでプログラムを作っていると、あるスレッドが別のスレッドの仕事の 完了を待つ必要が出がある。

◆パイプと循環バッファ

Unix のパイプのようなことをスレッドを使って実行したい。
thread_A | thread_B
2つのスレッドの間には、バッファを置く。

図? 環状バッファ、生産者スレッド、消費者スレッド

図? 環状バッファ、生産者スレッド、消費者スレッド

バッファが空の時、thread_B() は、thread_A() が何かデー タをバッファに入れるのを待つ。バッファがいっぱいの時、thread_A() は、thread_B() がバッファから何かデータを取り出すのを待つ。

◆条件変数

条件変数(condition variable) で、ある条件が生じたことを待つ。

条件変数の操作:

wait
ある条件が満たされるまで待つ
signal
ある条件が満たされたことを伝える。待っているスレッドが1つだけ起き上がる。
broadcast
ある条件が満たされたことを伝える。待っているスレッドが全て起き上がる。

◆条件変数を使った環状バッファ

   1: 
   2: /*
   3:  * condv-buffer.c -- 条件変数を使った環状バッファ
   4:  */
   5: 
   6: #include <stdio.h>      /* printf() */
   7: #include <stdlib.h>     /* malloc(), exit() */
   8: #include <pthread.h>
   9: 
  10: void thread_A(), thread_B();
  11: 
  12: #define BUFFER_SIZE     4               /* バッファの大きさ */
  13: struct circular_buffer
  14: {
  15:         int rp ;                        /* 読み出す位置 */
  16:         int wp ;                        /* 書き込む位置 */
  17:         int used ;                      /* バッファ内の要素数 */
  18:         int data[BUFFER_SIZE];          /* データを保存する場所 */
  19:         pthread_mutex_t mutex ;         /* この構造体の相互排除のための mutex */
  20:         pthread_cond_t  not_full ;      /* バッファが一杯ではない状態を待つための条件変数 */
  21:         pthread_cond_t  not_empty ;     /* バッファが空ではない状態を待つための条件変数 */
  22: };
  23: 
  24: void put( struct circular_buffer *b,int x )
  25: {
  26:         pthread_mutex_lock( &b->mutex );
  27: loop:   if( b->used == BUFFER_SIZE )
  28:         {
  29:             pthread_cond_wait( &b->not_full,&b->mutex );
  30:             goto loop;
  31:         }
  32:         b->data[ b->wp++ ] = x ;
  33:         if( b->wp >= BUFFER_SIZE )
  34:             b->wp = 0 ;
  35:         b->used ++ ;
  36:         pthread_cond_signal( &b->not_empty );
  37:         pthread_mutex_unlock( &b->mutex );
  38: }
  39: 
  40: int get( struct circular_buffer *b )
  41: {
  42:     int x ;
  43:         pthread_mutex_lock( &b->mutex );
  44: loop:   if( b->used == 0 )
  45:         {
  46:             pthread_cond_wait( &b->not_empty,&b->mutex );
  47:             goto loop;
  48:         }
  49:         x = b->data[ b->rp++ ] ;
  50:         if( b->rp >= BUFFER_SIZE )
  51:             b->rp = 0 ;
  52:         b->used -- ;
  53:         pthread_cond_signal( &b->not_full );
  54:         pthread_mutex_unlock( &b->mutex );
  55:         return( x );
  56: }
  57: 
  58: main()
  59: {
  60:     pthread_t t1 ;
  61:     pthread_t t2 ;
  62:     struct circular_buffer *b  ;
  63:         b = (struct circular_buffer *)malloc(sizeof(struct circular_buffer));
  64:         if( b == NULL )
  65:         {
  66:             perror("no memory for struct buffer\n");
  67:             exit( -1 );
  68:         }
  69:         b->rp = 0 ;
  70:         b->wp = 0 ;
  71:         b->used = 0 ;
  72:         pthread_mutex_init( &b->mutex, NULL );
  73:         pthread_cond_init( &b->not_full,NULL );
  74:         pthread_cond_init( &b->not_empty,NULL );
  75:         pthread_setconcurrency( 2 );
  76:         pthread_create( &t1, NULL, (void *)thread_A, (void *)b );
  77:         pthread_create( &t2, NULL, (void *)thread_B, (void *)b );
  78:         pthread_join( t1, NULL );
  79:         pthread_join( t2, NULL );
  80: }
  81: 
  82: void thread_A( struct circular_buffer *b )      /* producer */
  83: {
  84:     int i,x ;
  85:         for( i = 0 ; i<10 ; i++ )
  86:         {
  87:             x = i ;
  88:             printf("thread_A(): put( %d )\n",x );
  89:             put( b,x );
  90:         }
  91: }
  92: 
  93: void thread_B( struct circular_buffer *b )      /* consumer */
  94: {
  95:     int i, x ;
  96:         for( i = 0 ; i<10 ; i++ )
  97:         {
  98:             x = get( b );
  99:             printf("thread_B(): get() %d.\n", x );
 100:         }
 101: }

put() は、バッファにデータを追加する時に使う手続き。

基本的には、入口で pthread_mutex_lock() し、 出口で pthread_mutex_unlock() する。

バッファが一杯の時には、条件変数b->not_full で、 一杯でないという条件になるまで待つ。

待っている間は、mutex のロックは解除される。

pthread_cond_wait() からリターンして来る時には、もう一度 ロックされた状態に戻るが、待っている間に、他の変数 (rp,wp,data)が書き換えられている可能性があるので、もう一 度最初から調べる。

get() は、バッファからデータを取り出す時に使う手 続き。put()とほぼ対称形。バッファが空の時に、wait し、バッファがもはや一杯ではないことをsignal する。

thread_A() は、10回バッファにデータを書き込むスレッド。 thread_B() は逆に、10回バッファからデータを読み出すス レッド。

◆実行結果

% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/condv-buffer.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/Makefile [←]
% make condv-buffer [←]
gcc    -c -o condv-buffer.o condv-buffer.c
gcc condv-buffer.o -lpthread -o condv-buffer
% ./condv-buffer  [←]
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_A(): put( 2 )
thread_A(): put( 3 )
thread_A(): put( 4 )
thread_B(): get() 0.
thread_B(): get() 1.
thread_B(): get() 2.
thread_B(): get() 3.
thread_A(): put( 5 )
thread_A(): put( 6 )
thread_A(): put( 7 )
thread_A(): put( 8 )
thread_B(): get() 4.
thread_B(): get() 5.
thread_B(): get() 6.
thread_B(): get() 7.
thread_A(): put( 9 )
thread_B(): get() 8.
thread_B(): get() 9.
% []
複数のスレッドが同時に動いている。バッファにためられるのは、最大4なの に、put() が 5 回連続して成功しているように見える。printf() の順番と put(), get() の順番は違うことがある。

◆Concurrent Pascal と Pthread の比較

共通点 相違点

◆種村流

    pthread_mutex_lock( &mutex );
    while( 1 )
    {
        if( 条件 )
            pthread_cond_wait( &cv, &mutex );
	処理;
    }

◆signalかbroadcastか

バッファに要素を「1つずつ」追加しているので、 pthread_cond_signal() でもよい。 pthread_cond_broadcast() に変えても動くようにプログラムを 作る。

pthread_cond_wait() で待っている間に条件が変わっているかもしれないので、 最初から調べ直す。signal で1人だけしか起き上がらないと仮定してはいけ ない。

「1つずつ」ではなく、複数個同時に読み書きする時には、 pthread_cond_broadcast() でないとだめ。

迷った時には、pthread_cond_broadcast()

◆ダブルバッファリング

整数を1つバッファに書き込むだけでロック/アンロックを行なっていると、 実際の並列処理では重たい。ロックの回数を減らすために、ダブルバッファリ ングと呼ばれる技術がよく使われる。読み手と書き手で別々にバッファをもう け、1つのバッファの処理をしている間は、ロックを行なわない。

■再帰的 mutex

1つのスレッドで1つの mutex を複数回ロックしたい。

開いたモジュールでは一度外にだたスレッドがもう一度入ってくる。

図? 開いたモジュールと閉じたモジュール

◆標準mutexでのデッドロック

export している関数の入口で lock, 出口 unlockを入れて、 スレッド・セーフなモジュールを作りたい。 export している関数が、他の export している関数を呼び出すと、デッドロッ クになる。
   1: /*
   2:  * mutex-reclock-normal.c -- 通常の mutex を使う例(デッドロック)
   3:  */
   4: 
   5: #include <stdio.h>      /* printf() */
   6: #include <pthread.h>
   7: 
   8: void thread_A(), thread_B();
   9: int     shared_resource ;
  10: pthread_mutex_t mutex1 ;
  11: 
  12: deposit( int n )
  13: {
  14:         pthread_mutex_lock( &mutex1 );
  15:         shared_resource += n ;
  16:         pthread_mutex_unlock( &mutex1 );
  17: }
  18: 
  19: add_interest()
  20: {
  21:     int i ;
  22:         pthread_mutex_lock( &mutex1 );
  23:         i = shared_resource * 0.05 ;
  24:         deposit( i );
  25:         pthread_mutex_unlock( &mutex1 );
  26: }
  27: 
  28: main() {
  29:     pthread_t t1 ;
  30:     pthread_t t2 ;
  31:         shared_resource = 1000000 ;
  32:         pthread_mutex_init( &mutex1, NULL );
  33: 
  34:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  35:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  36:         pthread_join( t1, NULL );
  37:         pthread_join( t2, NULL );
  38:         printf("main(): shared_resource == %d\n", shared_resource );
  39: }
  40: 
  41: void thread_A()
  42: {
  43:         printf("thread_A(): deposit( 10000 ) ... \n");
  44:         deposit( 10000 );       
  45:         printf("thread_A(): deposit( 10000 ) done. \n");
  46: }
  47: 
  48: void thread_B()
  49: {
  50:         printf("thread_B(): add_interest() ... \n");
  51:         add_interest();
  52:         printf("thread_B(): add_interest() done. \n");
  53: }
実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/mutex-reclock-normal.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/Makefile [←]
% make mutex-reclock-normal [←]
gcc    -c -o mutex-reclock-normal.o mutex-reclock-normal.c
gcc mutex-reclock-normal.o -lpthread -o mutex-reclock-normal
% ./mutex-reclock-normal  [←]
thread_A(): deposit( 10000 ) ... 
thread_B(): add_interest() ... 
thread_A(): deposit( 10000 ) done. 
^C (強制終了)
% []
% []

注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、次のフラグを付けてみてください。

% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c [←]

◆再帰的mutex

   1: /*
   2:  * mutex-reclock-recursive.c -- 再帰的 mutex を使う例
   3:  */
...
  28: static int
  29: my_pthread_mutex_init_recursive( pthread_mutex_t *mutex )
  30: {
  31:     pthread_mutexattr_t attr ;
  32:     int err ;
  33:         if( (err=pthread_mutexattr_init( &attr )) < 0 )
  34:             return( 0 );
  35:         if( (err=pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE)) <0 )
  36:             return( 0 );
  37:         err = pthread_mutex_init( mutex,&attr );
  38:         return( err );
  39: }
  40: 
  41: main()
  42: {
  43:     pthread_t t1 ;
  44:     pthread_t t2 ;
  45:         shared_resource = 1000000 ;
  46:         my_pthread_mutex_init_recursive( &mutex1 );
  47: 
  48:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  49:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  50:         pthread_join( t1, NULL );
  51:         pthread_join( t2, NULL );
  52:         printf("main(): shared_resource == %d\n", shared_resource );
  53: }
実行例。deposit() と add_interest() のタイミングによっては、最終結果は 違うことがある。
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/mutex-reclock-recursive.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/Makefile [←]
% make mutex-reclock-recursive [←]
gcc    -c -o mutex-reclock-recursive.o mutex-reclock-recursive.c
gcc mutex-reclock-recursive.o -lpthread -o mutex-reclock-recursive
% ./mutex-reclock-recursive  [←]
thread_A(): deposit( 10000 ) ... 
thread_A(): deposit( 10000 ) done. 
thread_B(): add_interest() ... 
thread_B(): add_interest() done. 
main(): shared_resource == 1060500
% ./mutex-reclock-recursive [←]
thread_A(): deposit( 10000 ) ... 
thread_A(): deposit( 10000 ) done. 
thread_B(): add_interest() ... 
thread_B(): add_interest() done. 
main(): shared_resource == 1060500
% []

注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、次のフラグを付けてみてください。

% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c [←]

◆stdio専用再帰的lock

printf(), fputs() などの標準入出力(stdio)ライブラリ自体は、スレッド・ セーフなので、単体で使う分にはロックは不要である。 一連の入出力をまとめるために、ロックしなければならないことがある。
	printf("hello,world\n");
上と下は、結果が違う。
	printf("hello,");
	/* ここに他のスレッドの出力が混じることがある */
	printf("world\n");
これを避けるには、 flockfile(),funlockfile(),ftrylockfile()を使う。
	flockfile(stdout);
	printf("hello,");
	/* ここに他のスレッドの出力が混じることはない */
	printf("world\n");
	funlockfile(stdout);
putchar()getchar()は、遅すぎる。 flockfile()/funlockfile()の中で使うための putchar_unlocked(),getchar_unlocked(),putc_unlocked(),getc_unlocked() が用意されている。printf_unsafe() はない。

■Pthreadとメモリ

◆auto変数

各スレッドには、独立したスタックが割り当てられる。C言語の auto 変数は、 スレッドごとにコピーが作られる。

<−>再帰呼出し

スレッド間でポインタを渡す時には、スレッドの寿命にも注意。

◆static変数

シングルスレッドのプログラムでは、static変数は、プログラムのモジュール性 を高めるために有効に使われてきた。

マルチスレッドと相性が非常に悪い。static変数もextern変数と同様に複数の スレッドで共有される。変更する場合には、mutex でロックが必要になる。

◆static変数を使ったライブラリ関数

TCP/IP でプログラムを書く時に使う gethostbyname() は、 static変数に値をセットして、その番地を返す。

struct hostent *gethostbyname( char *name ){
    static struct hostent ret ;
	.....
	return( &ret );
}

複数のスレッドが同時にこの関数を呼び出した場合、同じstatic変数が使われ る。

◆スレッド・セーフ

複数のスレッドで呼び出してもきちんと動作することを、 スレッド・セーフ(thread-safe) という。 MT-Safe(multi-thread-safe)再入可能(reentrant) ということもある。

externやstaticを使わず、auto変数やmalloc()だけを使っているような手続き は、スレッド・セーフ。

別のスレッド・セーフでない手続きを呼んでいれば、それはスレッド・セーフ ではない。

◆スレッド・セーフなインタフェース

スレッド・セーフになるようにするには、インタフェースを変更する必要があ る。
Sun のマニュアルより:
struct hostent *gethostbyname(const char *name);

struct hostent *gethostbyname_r(const char *name,
     struct hostent *result, char *buffer, int buflen,
     int *h_errnop);

◆スレッド・セーフではない手続きを使う

一見無関係の手続きが内部で変数を共有している場合がある。

■Pthreadでのセマフォの利用

Pthread には、実時間機能を実現することを目的として、セマフォが使えるよ うになっている。実時間以外の目的でも、セマフォを使ってもよい。

次のような関数が利用可能である。


#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value)
初期化。psharedが0だとプロセス内で有効。valueは初期値。
int sem_wait(sem_t * sem)
P命令。値を減らす。0の場合は止まる。
int sem_trywait(sem_t * sem)
非ブロックのsem_wait()。0でも止まらずエラーを返す。
int sem_post(sem_t * sem)
V命令。値を増やす。
int sem_getvalue(sem_t * sem, int * sval)
現在の値を返す。普通は役には立たない。次の瞬間には他のスレッドがP/Vしているかもしれないので。
int sem_destroy(sem_t * sem)
セマフォを破棄する。
注意: SystemV 由来のセマフォ(semget(),semop(),semctl())とは違う。

注意:名前付きのセマフォもある。sem_open() で作成/初期化し、 sem_unlink() で削除する。

注意:Solaris には、POSIX のセマフォとは別に、カーネル内でのデバイス・ ドライバ作成のためのセマフォが用意されている。

■Javaのスレッド

Java には、最初から言語のレベルでスレッドの機能が入っている。
Java Pthread
new Thread(); start(); pthread_create()
join() pthread_join()
synchronized pthread_mutex_lock()とpthread_mutex_unlock()の組
wait() pthread_cond_wait()
wait(long timeout) pthread_cond_timedwait()
notify() pthread_cond_signal()
notifyAll() pthread_cond_broadcast()

Java の synchronized は、再帰可能。PTHREAD_MUTEX_RECURSIVE 相当。 1つのスレッドが2度同じオブジェクトをロックしてもよい。

Pthreads のプログラムで、1つの mutex と1つの条件変数を使ったものなら、 Java で簡単に書き直せる。

循環バッファのプログラムは、1つの mutex で2つの条件変数を使っているので、単純には Java で書き直せない。 生産者側と消費者側が同時に待つことはないという性質を利用する。

Java で書かれたスレッドのプログラムは、汚いものがけっこうある。スレッ ド「間」の同期で、対称系になるべき所を、片方のスレッドのメソッドにして、 非対称になっていることがある。Java でプログラムを書く時にも、active object (thread) と passive object (threadなし)をきちんと分けた方がよい。

◆Concurrent Pascal と Java の比較

共通点 相違点

◆条件変数を使った環状バッファ(Java)

   1: 
   2: /*
   3:  * CircularBuffer.java -- Java による環状バッファ
   4:  */
   5: 
   6: class CircularBuffer
   7: {
   8:     static final int BUFFER_SIZE = 4 ;
   9:     int rp ;            // 読み出す位置
  10:     int wp ;            // 書き込む位置
  11:     int data[];         // データを保存する場所
  12:     int used ;          // バッファ内の要素数
  13:     CircularBuffer()
  14:     {
  15:         data = new int[BUFFER_SIZE];
  16:         rp = 0 ;
  17:         wp = 0 ;
  18:     }
  19: 
  20:     public synchronized void put( int x ) throws InterruptedException
  21:     {
  22:         while( used == data.length )
  23:             wait();
  24:         data[ wp++ ] = x ;
  25:         if( wp == data.length )
  26:             wp = 0 ;
  27:         if( used++ == 0 )
  28:             notifyAll();
  29:     }
  30:     public synchronized int get() throws InterruptedException
  31:     {
  32:         int x ;
  33:         while( used == 0 )
  34:             wait();         
  35:         x = data[ rp++ ] ;
  36:         if( rp >= data.length )
  37:             rp = 0 ;
  38:         if( used-- == data.length )
  39:             notifyAll();
  40:         return( x );
  41:     }
  42: }
   1: 
   2: /*
   3:  * CircularBufferDemo.java -- Java による環状バッファのデモ
   4:  */
   5: 
   6: class Thread_A implements Runnable // Producer
   7: {
   8:     CircularBuffer b;
   9:     Thread_A( CircularBuffer b )
  10:     {
  11:         this.b = b;
  12:     }
  13:     public void run()
  14:     {                           
  15:         int i,x ;
  16:         for( i = 0 ; i<10 ; i++ )
  17:         {
  18:             try
  19:             {
  20:                 x = i ;
  21:                 System.out.println("Thread_A(): put( "+x+" )");
  22:                 b.put( x );
  23:             }
  24:             catch( InterruptedException e )
  25:             {
  26:                 System.err.println("Thread_A(): Interrupted");
  27:                 break;
  28:             }
  29:         }
  30:     }
  31: }
  32: 
  33: class Thread_B implements Runnable // Producer
  34: {
  35:     CircularBuffer b;
  36:     Thread_B( CircularBuffer b )
  37:     {
  38:         this.b = b;
  39:     }
  40:     public void run()
  41:     {                           
  42:         int i,x ;
  43:         for( i = 0 ; i<10 ; i++ )
  44:         {
  45:             try
  46:             {
  47:                 x = b.get();
  48:                 System.out.println("Thread_B(): got() "+x+".");
  49:             }
  50:             catch( InterruptedException e )
  51:             {
  52:                 System.err.println("Thread_B(): Interrupted");
  53:                 break;
  54:             }
  55:         }
  56:     }
  57: }
  58: 
  59: class CircularBufferDemo
  60: {
  61:     public static void main(String argv[])
  62:     {
  63:         final CircularBuffer b = new CircularBuffer();
  64:         Thread t1 = new Thread( new Thread_A(b) );
  65:         t1.start();
  66:         Thread t2 = new Thread( new Thread_B(b) );
  67:         t2.start();
  68:         try
  69:         {
  70:             t1.join();
  71:             t2.join();
  72:         }
  73:         catch( InterruptedException e )
  74:         {
  75:             System.err.println("main(): Interrupted");
  76:         }
  77:     }
  78: }
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/CircularBuffer.java [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-14/ex/CircularBufferDemo.java [←]
% javac -encoding EUC-JP CircularBuffer.java CircularBufferDemo.java  [←]
% java CircularBufferDemo  [←]
Thread_A(): put( 0 )
Thread_A(): put( 1 )
Thread_A(): put( 2 )
Thread_A(): put( 3 )
Thread_A(): put( 4 )
Thread_B(): got() 0.
Thread_B(): got() 1.
Thread_B(): got() 2.
Thread_B(): got() 3.
Thread_B(): got() 4.
Thread_A(): put( 5 )
Thread_A(): put( 6 )
Thread_B(): got() 5.
Thread_A(): put( 7 )
Thread_A(): put( 8 )
Thread_A(): put( 9 )
Thread_B(): got() 6.
Thread_B(): got() 7.
Thread_B(): got() 8.
Thread_B(): got() 9.
% []

◆BlockingQueue

Java 2 Standard Edition (J2SE) 5.0 には、 java.util.concurrent パッケージに interface BlockingQueue が追加された。 これを使えば、自分で環状バッファを記述する必要はない。 具体的には、ArrayBlockingQueue を使う。 上限を指定しない BlockingQueue もあるが、利用しないことを奨める。

◆java.util.concurrent.locks パッケージ

Java 2 Standard Edition (J2SE) 5.0 には、次のようなクラスが追加された。
Lock
Pthread の mutex 相当。 ブロックを越えてロックできる。
Condition
Pthread の条件変数相当。 Lock と合わせてつかう。
ReadWriteLock
読書きロック。 Pthread pthread_rwlock_t 相当。後述。

◆Java のセマフォ

Java 2 Standard Edition (J2SE) 5.0 には、 java.util.concurrent.Semaphore クラスがある。

■練習問題

★練習問題(1) スレッドの数

相互排除のプログラムや条件変数プログラムでスレッドの数を増やしてみなさい。

★練習問題(2) 手続きとスレッド

1つの手続き(C言語の関数)から複数のスレッドを生成してみなさい。

★練習問題(3) ダブルバッファリング

循環バッファのプログラムダブルバッファリングを行なうプログラムに 変更しなさい。

★練習問題(4) 条件変数の削減

循環バッファのプログラムで、条件変数を1つになるように変更しなさい。

一度に wait する必要があるのは、put() 側か get() のどちらか一方だけである。 よって、両方とも同じキューにつないでも、動作する。

★練習問題(5) Javaにおける再帰的モニタの確認

再帰的mutexの例題で使った Pthread のプログラムを Java で書き直しなさい。

★練習問題(6) 引き出し機能の追加

再帰的mutexの例題で使ったプ ログラムに、口座の残高を返す手続きと口座から現金を引き出す手続きを付け なさい。ただし、現金の引出しでは、残高が負にならないようにしなさい。

balance()
{
...
}
withdraw( int n )
{
...
}

プログラミング言語は、C言語(Pthread)または Java を用いなさい。

★練習問題(7) 複数要素を受付ける有限バッファ

条件変数を使った環状バッファのプログラムのうち、 (C言語Pthread版) 、または、 (Java版) を書き換えて、 一度に n 個(n は可変)の要素を受付けるようにしなさい。すなわち、

★練習問題(8) 狭い橋

1度に1方向の車しか通さない狭い橋がある。橋の上に同時に3台の車が通る と橋が落ちる。この橋が落ちないように、車の交通整理を実現するプログラム を書きなさい。車は、スレッドで実現されるものとする。そして、次のような 手続きを呼び出す。

vehicle(int direction)
{
	arrive_bridge( direction );
	cross_bridge( direction );
	exit_bridge( direction );
}
このコードで direction は、0 または 1 であり、橋のどの方向に車 が渡ろうとしているかを示している。 手続き arrive_bridge()exit_bridge() を、mutex と条件 変数を使って書きなさい。arrive_bridge() は、安全にその方向で車 が通れるまでリターンしてはならない。衝突したり、重量オーバーで橋が落ち たりすることがないようにしなければならない。デバッグのためのメッセージ を適宜画面に出力する。exit_bridge() は、橋を渡り終えことを告げ るために呼ばれる。この時、可能ならば、待っている車を渡らせ始める (arrive_bridge() からリターンさせる)。

ここでは、公平性は実現しなくてもよい。また、飢餓状態にならないことを保 証しなくてもよい。

完成したプログラムにおいて、新しい車が来た時、既に別の車が待っていたと する。この場合、新しい車が先に橋を渡るか、それとも古い車が先に橋を渡る か、それとも、予想できないか(非決定的か)。その理由を簡単に説明しなさ い。

★練習問題(9) 化学反応

酸素原子と水素原子から水が作られる反応を、スレッドを使って書きなさい。 このような手続きを、mutex と条件変数、または、セマフォを使って書きなさい。

水素原子や酸素原子ではなく、水素分子 (H2)や酸素分子 (O2)を使ってもよい。

pthread_create( ..., NULL, (void *)H_func, ... );
pthread_create( ..., NULL, (void *)H_func, ... );
pthread_create( ..., NULL, (void *)H_func, ... );
pthread_create( ..., NULL, (void *)H_func, ... );
pthread_create( ..., NULL, (void *)O_func, ... );
pthread_create( ..., NULL, (void *)O_func, ... );
pthread_create( ..., NULL, (void *)O_func, ... );

H_func( ... )
{
    printf(...);
    H_Ready(...);
}

O_func( ... )
{
    printf(...);
    O_Ready(...);
}

H2O_func( ... )
{
    printf(...);
}

H_Ready(...)
{
    ....
}

O_Ready(...)
{
    ....
}

make_water()
{
    pthread_create( ..., NULL, (void *)H2O_func, ... );
}

★練習問題(10) 循環バッファをセマフォで実現する

循環バッファのプログラムを、セマフォを使って書き直しなさい。

セマフォを使った循環バッファのプログラムの作成は、次の教科書の練習問 題にもなっている(巻末に回答もある)。

清水謙多郎: "オペレーティングシステム",岩波書店 (1992). ISBN: 4000078526.

プログラミング言語は、C言語(Pthread)または Java を用いなさい。


↑[もどる] ←[12月7日] ・[12月14日] →[12月21日]
Last updated: 2007/12/14 10:22:48
Yasushi Shinjo / <yas@is.tsukuba.ac.jp>