並行分散ソフトウェア/並列分散ソフトウェア 電子・情報工学系 新城 靖 <yas@is.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/sie/pdsoft-2005/2005-12-09
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/sie/
http://www.cs.tsukuba.ac.jp/~yas/
軽量プロセスというと、内部にループを含むような語感がある。
マルチスレッドプログラミングは、非常に難しい。
どこでスレッドを使うべきか。
Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).
type procA_t = process(引数・・・); var 局所データの宣言 procedure proc1(引数・・・); procedure proc2(引数・・・); begin cycle ・・・ end; end var procA1 : procA_t ; init procA1(引数);
type monA_t = monitor(引数・・・); var 局所データの宣言 procedure entry proc1(引数・・・); procedure entry proc2(引数・・・); begin ローカルデータの初期化; end var monA1 : monA_t ; init monA1(引数);;
cv1 : condition; cv1.wait; 呼び出したプロセスを待たせる。 cv1.signal; 待っているプロセスがいれば全て再開させる。
q1 : queue; delay(q1); 呼び出したプロセスをそのキューで待たせる。 continue(q1); そのキューで待っているプロセスがいれば1つだけ再開させる。
producer | consumer2つのスレッドの間には、バッファを置く。
バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。 手続き
1: const BUFFER_SIZE = 4; 2: type circular_buffer = 3: monitor 4: var 5: rp : integer ; 6: rp : integer ; 7: data: array [0..BUFFER_SIZE-1] of integer; 8: used: integer; 9: not_empty : condition; 10: not_full : condition; 11: 12: procedure entry put(x:integer); 13: begin 14: while( used = BUFFER_SIZE ) do 15: non_full.wait; 16: data[wp] := x; 17: wp := wp + 1 ; 18: if( wp >= BUFFER_SIZE ) 19: wp := 0 ; 20: used := used + 1 ; 21: not_empty.signal; 22: end 23: 24: procedure entry get(result x:integer); 25: begin 26: while( used = 0 ) then 27: not_empty.wait; 28: x := data[rp]; 29: rp := rp + 1 ; 30: if( rp >= BUFFER_SIZE ) 31: rp := 0 ; 32: used := used - 1 ; 33: not_full.signal; 34: end 35: begin 36: rp := 0 ; 37: wp := 0 ; 38: used := 0 ; 39: end; 40: 41: ... 42: var buf : circular_buffer ; 43: init buf; 44: ... 45:
Pthread を利用したプログラムを書く時には、次のようなヘッダ・ファイルを 読み込む。
#include <pthread.h>
Solaris (Unix International系)でのコンパイルとリンク。
-D_REENTRANT
と-lpthread
を付ける。
% cc -D_REENTRANT -o create create.c -lpthreadセマフォを使う時、Solaris 6 (SunOS 5.6) では、リンク時に% ./create
... %
![]()
-lposix4
オプションを付ける。Solaris 7-10 (SunOS 5.7, 5.8, 5.9, 5.10) では、リンク時に -lrt
オプションを付ける。
Linux, MacOSX, でのコンパイルとリンク。-lpthread
を付ける。
% cc -o create create.c -lpthread% ./create
... %
![]()
図? fork-joinモデルの実現
後で join する必要がない時には、pthread_detach() を使って切り離す。 (joinしなくてもゾンビが残らない。)
1: 2: /* 3: create-join-2.c -- スレッドを2つ作るプログラム 4: */ 5: 6: #include <pthread.h> 7: 8: void func1( int x ); 9: void func2( int x ); 10: 11: main() 12: { 13: pthread_t t1 ; 14: pthread_t t2 ; 15: pthread_create( &t1, NULL, (void *)func1, (void *)10 ); 16: pthread_create( &t2, NULL, (void *)func2, (void *)20 ); 17: printf("main()\n"); 18: pthread_join( t1, NULL ); 19: pthread_join( t2, NULL ); 20: } 21: 22: void func1( int x ) 23: { 24: int i ; 25: for( i = 0 ; i<3 ; i++ ) 26: { 27: printf("func1( %d ): %d \n",x, i ); 28: } 29: } 30: 31: void func2( int x ) 32: { 33: int i ; 34: for( i = 0 ; i<3 ; i++ ) 35: { 36: printf("func2( %d ): %d \n",x, i ); 37: } 38: }実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/create-join-2.cこの例では、次の3つのスレッドが作られる。% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/Makefile
% make create-join-2
gcc -g -c create-join-2.c -o create-join-2.o gcc -g -o create-join-2 create-join-2.o -lpthread % ./create-join-2
main() func1( 10 ): 0 func1( 10 ): 1 func1( 10 ): 2 func2( 20 ): 0 func2( 20 ): 1 func2( 20 ): 2 %
![]()
func2
から作られたスレッド t2
func1
から作られたスレッド t1
pthread_create()
により作られる。
どういう順序で実行されるかは、決まっていない。 決まっていない。スレッドは、もともと順番を決めないような処理、 非同期的(asynchronous) な処理を表現するためのもの。どうしても他のスレッドと同期を行なう必要が 出てきた時には、mutex や条件変数といった同期機能を使う。
pthread_create()
で指定された関数からリターンすると、そ
のスレッドが終了する。pthread_exit()
を呼び出してもよい。
ただし、
初期スレッド
が終了すると、プロセス全体が終了する。
exit()
システムコールを呼び出しても終了する。
プロセスの場合、ファイル、端末、ウインドウ、主記憶、CPU時間。
主記憶やCPU時間といった資源は、横取りしても平気なので、あまり問題になりない。
スレッドの場合は、プログラミング言語の変数。
変更されない変数の値を読むだけなら、特に問題は起きない。それ意外の時、 特に、複数のスレッドで値を読み書きする時に問題が起きる。
1: 2: /* 3: * mutex-nolock.c -- 共有資源をロックなしでアクセスするプログラム 4: */ 5: 6: #include <pthread.h> 7: 8: void thread_A(), thread_B(); 9: int shared_resource ; 10: 11: main() { 12: pthread_t t1 ; 13: pthread_t t2 ; 14: shared_resource = 0 ; 15: pthread_setconcurrency( 2 ); 16: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 17: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 18: pthread_join( t1, NULL ); 19: pthread_join( t2, NULL ); 20: printf("main(): shared_resource == %d\n", shared_resource ); 21: } 22: 23: void thread_A() 24: { 25: int i, x ; 26: for( i = 0 ; i<1000000 ; i++ ) 27: { 28: x = shared_resource ; 29: x = x + 1 ; 30: shared_resource = x ; 31: } 32: } 33: 34: void thread_B() { 35: int i, x ; 36: for( i = 0 ; i<1000000 ; i++ ) { 37: x = shared_resource ; 38: x = x + 1 ; 39: shared_resource = x ; 40: } 41: }
pthread_setconcurrency() は、利用したい CPU 数をシステムに要求するもの。 (古い Solaris では、thr_setconcurrency() を使う。)
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/mutex-nolock.c% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/Makefile
% make mutex-nolock
gcc -c mutex-nolock.c -o mutex-nolock.o gcc mutex-nolock.o -lpthread -o mutex-nolock % ./mutex-nolock
main(): shared_resource == 1004100 % ./mutex-nolock
main(): shared_resource == 1003470 % ./mutex-nolock
main(): shared_resource == 1003316 % ./mutex-nolock
main(): shared_resource == 1003143 %
thread_A()
, thread_B()
と2つのスレッドが作
られている。整数型の変数 shared_resource
は、その2つの
スレッドの両方からアクセスされる。2つのスレッドで合計2000000 増える予
定だが、実際に実行すると、そうはならない。実行する度に答えが違う。
単一プロセッサの Linux 等では、何度か実行しても常に 2000000 だけ増える ことがある。
このプログラムが共有メモリ型マルチプロセッサで動いているとして、動きを 考えてえる。
thread_A() thread_B() : : : : 28: x = shared_resource ; 37: x = shared_resource ; 29: x = x + 1 ; 38: x = x + 1 ; 30: shared_resource = x ; 39: shared_resource = x ; : : : :
shared_resource++
と書いてもだめ。複数の機械語命令に分割
されるので。
相互排除(mutual exclusion): ある資源をアクセスできるスレッドの数を 多くても1つにする。
プログラムの字面上、相互排除が必要な部分を 際どい部分(critical section) ( クリティカルセクション ) という。
Pthread では、相互排除を行なうために、 mutex という仕組みがある。次のようにして、相互排除を行ないたい部分を lock と unlock で囲む。
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER; : : pthread_mutex_lock( &murex1 ); <相互排除したい部分(際どい部分)> pthread_mutex_unlock( &mutex1 );
1: /* 2: * mutex-lock.c -- 共有資源をロックしながらアクセスするプログラム 3: */ 4: 5: #include <pthread.h> 6: 7: void thread_A(), thread_B(); 8: int shared_resource ; 9: pthread_mutex_t mutex1 ; 10: 11: main() { 12: pthread_t t1 ; 13: pthread_t t2 ; 14: shared_resource = 0 ; 15: pthread_mutex_init( &mutex1, NULL ); 16: pthread_setconcurrency( 2 ); 17: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 18: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 19: pthread_join( t1, NULL ); 20: pthread_join( t2, NULL ); 21: printf("main(): shared_resource == %d\n", shared_resource ); 22: } 23: 24: void thread_A() 25: { 26: int i, x ; 27: for( i = 0 ; i<1000000 ; i++ ) 28: { 29: pthread_mutex_lock( &mutex1 ); 30: x = shared_resource ; 31: x = x + 1 ; 32: shared_resource = x ; 33: pthread_mutex_unlock( &mutex1 ); 34: } 35: } 36: 37: void thread_B() { 38: int i, x ; 39: for( i = 0 ; i<1000000 ; i++ ) { 40: pthread_mutex_lock( &mutex1 ); 41: x = shared_resource ; 42: x = x + 1 ; 43: shared_resource = x ; 44: pthread_mutex_unlock( &mutex1 ); 45: } 46: }
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/mutex-lock.c% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/Makefile
% make mutex-lock
gcc mutex-lock.o -lpthread -o mutex-lock % ./mutex-lock
main(): shared_resource == 2000000 % ./mutex-lock
main(): shared_resource == 2000000 % ./mutex-lock
main(): shared_resource == 2000000 %
![]()
thread_A() thread_B() : : 29: pthread_mutex_lock( &mutex1 ); : 40: pthread_mutex_lock( &mutex1 ); 30: x = shared_resource ; <他のスレッドが実行中なので 31: x = x + 1 ; この状態でしばらく待たされる> 32: shared_resource = x ; : 33: pthread_mutex_unlock( &mutex1 ); : : <実行再開> : 41: x = shared_resource ; 42: x = x + 1 ; 43: shared_resource = x ; 44: pthread_mutex_unlock( &mutex1 );後から来た
thread_B()
は、他のスレッドが実行中は、待たさ
れる。
thread_A | thread_B2つのスレッドの間には、バッファを置く。
バッファが空の時、thread_B() は、thread_A() が何かデー タをバッファに入れるのを待つ。バッファがいっぱいの時、thread_A() は、thread_B() がバッファから何かデータを取り出すのを待つ。
条件変数の操作:
1: 2: /* 3: * condv-buffer.c -- 条件変数を使った環状バッファ 4: */ 5: 6: #include <pthread.h> 7: 8: void thread_A(), thread_B(); 9: 10: #define BUFFER_SIZE 4 /* バッファの大きさ */ 11: struct circular_buffer 12: { 13: int rp ; /* 読み出す位置 */ 14: int wp ; /* 書き込む位置 */ 15: int used ; /* バッファ内の要素数 */ 16: int data[BUFFER_SIZE]; /* データを保存する場所 */ 17: pthread_mutex_t mutex ; /* この構造体の相互排除のための mutex */ 18: pthread_cond_t not_full ; /* バッファが一杯ではない状態を待つための条件変数 */ 19: pthread_cond_t not_empty ; /* バッファが空ではない状態を待つための条件変数 */ 20: }; 21: 22: void put( struct circular_buffer *b,int x ) 23: { 24: pthread_mutex_lock( &b->mutex ); 25: loop: if( b->used == BUFFER_SIZE ) 26: { 27: pthread_cond_wait( &b->not_full,&b->mutex ); 28: goto loop; 29: } 30: b->data[ b->wp++ ] = x ; 31: if( b->wp >= BUFFER_SIZE ) 32: b->wp = 0 ; 33: b->used ++ ; 34: pthread_cond_signal( &b->not_empty ); 35: pthread_mutex_unlock( &b->mutex ); 36: } 37: 38: int get( struct circular_buffer *b ) 39: { 40: int x ; 41: pthread_mutex_lock( &b->mutex ); 42: loop: if( b->used == 0 ) 43: { 44: pthread_cond_wait( &b->not_empty,&b->mutex ); 45: goto loop; 46: } 47: x = b->data[ b->rp++ ] ; 48: if( b->rp >= BUFFER_SIZE ) 49: b->rp = 0 ; 50: b->used -- ; 51: pthread_cond_signal( &b->not_full ); 52: pthread_mutex_unlock( &b->mutex ); 53: return( x ); 54: } 55: 56: main() 57: { 58: pthread_t t1 ; 59: pthread_t t2 ; 60: struct circular_buffer *b ; 61: b = (struct circular_buffer *)malloc(sizeof(struct circular_buffer)); 62: if( b == NULL ) 63: { 64: perror("no memory for struct buffer\n"); 65: exit( -1 ); 66: } 67: b->rp = 0 ; 68: b->wp = 0 ; 69: b->used = 0 ; 70: pthread_mutex_init( &b->mutex, NULL ); 71: pthread_cond_init( &b->not_full,NULL ); 72: pthread_cond_init( &b->not_empty,NULL ); 73: pthread_setconcurrency( 2 ); 74: pthread_create( &t1, NULL, (void *)thread_A, (void *)b ); 75: pthread_create( &t2, NULL, (void *)thread_B, (void *)b ); 76: pthread_join( t1, NULL ); 77: pthread_join( t2, NULL ); 78: } 79: 80: void thread_A( struct circular_buffer *b ) /* producer */ 81: { 82: int i,x ; 83: for( i = 0 ; i<10 ; i++ ) 84: { 85: x = i ; 86: printf("thread_A(): put( %d )\n",x ); 87: put( b,x ); 88: } 89: } 90: 91: void thread_B( struct circular_buffer *b ) /* consumer */ 92: { 93: int i, x ; 94: for( i = 0 ; i<10 ; i++ ) 95: { 96: x = get( b ); 97: printf("thread_B(): get() %d.\n", x ); 98: } 99: }
put()
は、バッファにデータを追加する時に使う手続き。
基本的には、入口で pthread_mutex_lock()
し、
出口で pthread_mutex_unlock()
する。
バッファが一杯の時には、条件変数b->not_full
で、
一杯でないという条件になるまで待つ。
待っている間は、mutex のロックは解除される。
pthread_cond_wait()
からリターンして来る時には、もう一度
ロックされた状態に戻るが、待っている間に、他の変数
(rp,wp,data
)が書き換えられている可能性があるので、もう一
度最初から調べる。
get()
は、バッファからデータを取り出す時に使う手
続き。put()
とほぼ対称形。バッファが空の時に、wait
し、バッファがもはや一杯ではないことをsignal する。
thread_A()
は、10回バッファにデータを書き込むスレッド。
thread_B()
は逆に、10回バッファからデータを読み出すス
レッド。
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/condv-buffer.c複数のスレッドが同時に動いている。バッファにためられるのは、最大4なの に、put() が 5 回連続して成功しているように見える。printf() の順番と put(), get() の順番は違うことがある。% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/Makefile
% make condv-buffer
gcc -c condv-buffer.c -o condv-buffer.o gcc condv-buffer.o -lpthread -o condv-buffer % ./condv-buffer
thread_A(): put( 0 ) thread_A(): put( 1 ) thread_A(): put( 2 ) thread_A(): put( 3 ) thread_A(): put( 4 ) thread_A(): put( 5 ) thread_B(): get() 0. thread_B(): get() 1. thread_B(): get() 2. thread_B(): get() 3. thread_B(): get() 4. thread_A(): put( 6 ) thread_A(): put( 7 ) thread_A(): put( 8 ) thread_A(): put( 9 ) thread_B(): get() 5. thread_B(): get() 6. thread_B(): get() 7. thread_B(): get() 8. thread_B(): get() 9. %
![]()
pthread_mutex_lock( &mutex ); while( 1 ) { if( 条件 ) pthread_cond_wait( &cv, &mutex ); 処理; }
バッファに要素を「1つずつ」追加しているので、
pthread_cond_signal()
でもよい。
pthread_cond_broadcast()
に変えても動くようにプログラムを
作る。
pthread_cond_wait() で待っている間に条件が変わっているかもしれないので、 最初から調べ直す。signal で1人だけしか起き上がらないと仮定してはいけ ない。
「1つずつ」ではなく、複数個同時に読み書きする時には、
pthread_cond_broadcast()
でないとだめ。
迷った時には、pthread_cond_broadcast()
。
図? 開いたモジュールと閉じたモジュール
1: /* 2: * mutex-reclock-normal.c 3: * Start: 2001/01/08 09:41:11 4: */ 5: 6: #include <pthread.h> 7: 8: void thread_A(), thread_B(); 9: int shared_resource ; 10: pthread_mutex_t mutex1 ; 11: 12: deposit( int n ) 13: { 14: pthread_mutex_lock( &mutex1 ); 15: shared_resource += n ; 16: pthread_mutex_unlock( &mutex1 ); 17: } 18: 19: add_interest() 20: { 21: int i ; 22: pthread_mutex_lock( &mutex1 ); 23: i = shared_resource * 0.05 ; 24: deposit( i ); 25: pthread_mutex_unlock( &mutex1 ); 26: } 27: 28: main() { 29: pthread_t t1 ; 30: pthread_t t2 ; 31: shared_resource = 1000000 ; 32: pthread_mutex_init( &mutex1, NULL ); 33: 34: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 35: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 36: pthread_join( t1, NULL ); 37: pthread_join( t2, NULL ); 38: printf("main(): shared_resource == %d\n", shared_resource ); 39: } 40: 41: void thread_A() 42: { 43: printf("thread_A(): deposit( 10000 ) ... \n"); 44: deposit( 10000 ); 45: printf("thread_A(): deposit( 10000 ) done. \n"); 46: } 47: 48: void thread_B() 49: { 50: printf("thread_B(): add_interest() ... \n"); 51: add_interest(); 52: printf("thread_B(): add_interest() done. \n"); 53: }実行例。
% wget http://www.hlla.is.tsukuba.ac.jp/~yas/sie/pdsoft-2003/2004-01-22/ex/mutex-reclock-normal.c% wget http://www.hlla.is.tsukuba.ac.jp/~yas/sie/pdsoft-2003/2004-01-22/ex/Makefile
% make mutex-reclock-normal
gcc -c mutex-reclock-normal.c -o mutex-reclock-normal.o gcc mutex-reclock-normal.o -lpthread -o mutex-reclock-normal % ./mutex-reclock-normal
thread_A(): deposit( 10000 ) ... thread_A(): deposit( 10000 ) done. thread_B(): add_interest() ... ^C %
![]()
注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義で動作しないとう報告が 来ています。その場合は、コンパイル時に次のフラグを付けてみてください。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c![]()
1: /* 2: * mutex-reclock-recursive.c 3: * Start: 2001/01/08 09:41:11 4: */ ... 28: static int 29: my_pthread_mutex_init_recursive( pthread_mutex_t *mutex ) 30: { 31: pthread_mutexattr_t attr ; 32: int err ; 33: if( (err=pthread_mutexattr_init( &attr )) < 0 ) 34: return( 0 ); 35: if( (err=pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE)) <0 ) 36: return( 0 ); 37: err = pthread_mutex_init( mutex,&attr ); 38: return( err ); 39: } 40: 41: main() 42: { 43: pthread_t t1 ; 44: pthread_t t2 ; 45: shared_resource = 1000000 ; 46: my_pthread_mutex_init_recursive( &mutex1 ); 47: 48: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 49: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 50: pthread_join( t1, NULL ); 51: pthread_join( t2, NULL ); 52: printf("main(): shared_resource == %d\n", shared_resource ); 53: } ...実行例。deposit() と add_interest() のタイミングによっては、最終結果は 違うことがある。
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/mutex-reclock-recursive.c% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/Makefile
% make mutex-reclock-recursive
gcc -c mutex-reclock-recursive.c -o mutex-reclock-recursive.o gcc mutex-reclock-recursive.o -lpthread -o mutex-reclock-recursive % ./mutex-reclock-recursive
thread_A(): deposit( 10000 ) ... thread_A(): deposit( 10000 ) done. thread_B(): add_interest() ... thread_B(): add_interest() done. main(): shared_resource == 1060500 %
![]()
注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義で動作しないとう報告が 来ています。その場合は、コンパイル時に次のフラグを付けてみてください。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c![]()
printf("hello,world\n");上と下は、結果が違う。
printf("hello,"); /* ここに他のスレッドの出力が混じることがある */ printf("world\n");これを避けるには、
flockfile(),funlockfile(),ftrylockfile()
を使う。
flockfile(stdout); printf("hello,"); /* ここに他のスレッドの出力が混じることはない */ printf("world\n"); funlockfile(stdout);
putchar()
や
getchar()
は、遅すぎる。
flockfile()/funlockfile()
の中で使うための
putchar_unlocked(),getchar_unlocked(),putc_unlocked(),getc_unlocked()
が用意されている。printf_unsafe()
はない。
<−>再帰呼出し
スレッド間でポインタを渡す時には、スレッドの寿命にも注意。
シングルスレッドのプログラムでは、static変数は、プログラムのモジュール性 を高めるために有効に使われてきた。
マルチスレッドと相性が非常に悪い。static変数もextern変数と同様に複数の スレッドで共有される。変更する場合には、mutex でロックが必要になる。
gethostbyname()
は、
static変数に値をセットして、リターン・バリューとして返してる。
struct hostent *gethostbyname( char *name ){ static struct hostent ret ; ..... return( &ret ); }複数のスレッドが同時にこの関数を呼び出した場合、同じstatic変数が使われ る。
externやstaticを使わず、auto変数やmalloc()だけを使っているような手続き は、スレッド・セーフ。
別のスレッド・セーフでない手続きを呼んでいれば、それはスレッド・セーフ ではない。
Sun のマニュアルより: struct hostent *gethostbyname(const char *name); struct hostent *gethostbyname_r(const char *name, struct hostent *result, char *buffer, int buflen, int *h_errnop);O2には、gethostbyname_r() はない。
次のような関数が利用可能である。
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value)
int sem_wait(sem_t * sem)
int sem_trywait(sem_t * sem)
sem_wait()
。0でも止まらずエラーを返す。
int sem_post(sem_t * sem)
int sem_getvalue(sem_t * sem, int * sval)
int sem_destroy(sem_t * sem)
注意:名前付きのセマフォもある。sem_open() で作成/初期化し、 sem_unlink() で削除する。
注意:Solaris には、POSIX のセマフォとは別に、カーネル内でのデバイス・ ドライバ作成のためのセマフォが用意されている。
Java には、最初から言語のレベルでスレッドの機能が入っている。
---------------------------------------------------------------------- Java Pthread ---------------------------------------------------------------------- new Thread(); start(); pthread_create() join() pthread_join() synchronized pthread_mutex_lock()とpthread_mutex_unlock()の組 wait() pthread_cond_wait() wait(long timeout) pthread_cond_timedwait() notify() pthread_cond_signal() notifyAll() pthread_cond_broadcast() ----------------------------------------------------------------------
Java の synchronized は、再帰可能。PTHREAD_MUTEX_RECURSIVE 相当。 1つのスレッドが2度同じオブジェクトをロックしてもよい。
Pthreads のプログラムで、1つの mutex と1つの条件変数を使ったものなら、 Java で簡単に書き直せる。
循環バッファのプログラムは、1つの mutex で2つの条件変数を使っているので、単純には Java で書き直せない。 生産者側と消費者側が同時に待つことはないという性質を利用する。
Java で書かれたスレッドのプログラムは、汚いものがけっこうある。スレッ ド「間」の同期で、対称系になるべき所を、片方のスレッドのメソッドにして、 非対称になっていることがある。Java でプログラムを書く時にも、active object (thread) と passive object (threadなし)をきちんと分けた方がよい。
1: 2: /* 3: * CircularBuffer.java -- Java による環状バッファ 4: */ 5: 6: class CircularBuffer 7: { 8: static final int BUFFER_SIZE = 4 ; 9: int rp ; // 読み出す位置 10: int wp ; // 書き込む位置 11: int data[]; // データを保存する場所 12: int used ; // バッファ内の要素数 13: CircularBuffer() 14: { 15: data = new int[BUFFER_SIZE]; 16: rp = 0 ; 17: wp = 0 ; 18: } 19: 20: public synchronized void put( int x ) throws InterruptedException 21: { 22: while( used == data.length ) 23: wait(); 24: data[ wp++ ] = x ; 25: if( wp == data.length ) 26: wp = 0 ; 27: if( used++ == 0 ) 28: notifyAll(); 29: } 30: public synchronized int get() throws InterruptedException 31: { 32: int x ; 33: while( used == 0 ) 34: wait(); 35: x = data[ rp++ ] ; 36: if( rp >= data.length ) 37: rp = 0 ; 38: if( used-- == data.length ) 39: notifyAll(); 40: return( x ); 41: } 42: }
1: 2: /* 3: * CircularBufferDemo.java -- Java による環状バッファのデモ 4: */ 5: 6: class Thread_A implements Runnable // Producer 7: { 8: CircularBuffer b; 9: Thread_A( CircularBuffer b ) 10: { 11: this.b = b; 12: } 13: public void run() 14: { 15: int i,x ; 16: for( i = 0 ; i<10 ; i++ ) 17: { 18: try 19: { 20: x = i ; 21: System.out.println("Thread_A(): put( "+x+" )"); 22: b.put( x ); 23: } 24: catch( InterruptedException e ) 25: { 26: System.err.println("Thread_A(): Interrupted"); 27: break; 28: } 29: } 30: } 31: } 32: 33: class Thread_B implements Runnable // Producer 34: { 35: CircularBuffer b; 36: Thread_B( CircularBuffer b ) 37: { 38: this.b = b; 39: } 40: public void run() 41: { 42: int i,x ; 43: for( i = 0 ; i<10 ; i++ ) 44: { 45: try 46: { 47: x = b.get(); 48: System.out.println("Thread_B(): got() "+x+"."); 49: } 50: catch( InterruptedException e ) 51: { 52: System.err.println("Thread_B(): Interrupted"); 53: break; 54: } 55: } 56: } 57: } 58: 59: class CircularBufferDemo 60: { 61: public static void main(String argv[]) 62: { 63: final CircularBuffer b = new CircularBuffer(); 64: Thread t1 = new Thread( new Thread_A(b) ); 65: t1.start(); 66: Thread t2 = new Thread( new Thread_B(b) ); 67: t2.start(); 68: try 69: { 70: t1.join(); 71: t2.join(); 72: } 73: catch( InterruptedException e ) 74: { 75: System.err.println("main(): Interrupted"); 76: } 77: } 78: }
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/CircularBuffer.java% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2005-12-09/ex/CircularBufferDemo.java
% javac CircularBuffer.java CircularBufferDemo.java
% java CircularBufferDemo
Thread_A(): put( 0 ) Thread_A(): put( 1 ) Thread_A(): put( 2 ) Thread_A(): put( 3 ) Thread_A(): put( 4 ) Thread_B(): got() 0. Thread_B(): got() 1. Thread_B(): got() 2. Thread_B(): got() 3. Thread_A(): put( 5 ) Thread_A(): put( 6 ) Thread_A(): put( 7 ) Thread_A(): put( 8 ) Thread_B(): got() 4. Thread_B(): got() 5. Thread_B(): got() 6. Thread_B(): got() 7. Thread_A(): put( 9 ) Thread_B(): got() 8. Thread_B(): got() 9. %
![]()
一度に wait する必要があるのは、put() 側か get() のどちらか一方だけである。 よって、両方とも同じキューにつないでも、動作する。
balance() { ... } withdraw( int n ) { ... }
プログラミング言語は、C言語(Pthread)または Java を用いなさい。
注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義で動作しないとう報告が 来ています。その場合は、コンパイル時に次のフラグを付けてみてください。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c![]()
注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義で動作しないとう報告が 来ています。その場合は、コンパイル時に次のフラグを付けてみてください。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c![]()
1度に1方向の車しか通さない狭い橋がある。橋の上に同時に3台の車が通る と橋が落ちる。この橋が落ちないように、車の交通整理を実現するプログラム を書きなさい。車は、スレッドで実現されるものとする。そして、次のような 手続きを呼び出す。
vehicle(int direction) { arrive_bridge( direction ); cross_bridge( direction ); exit_bridge( direction ); }このコードで
direction
は、0 または 1 であり、橋のどの方向に車
が渡ろうとしているかを示している。
手続き arrive_bridge()
と exit_bridge()
を、mutex と条件
変数を使って書きなさい。arrive_bridge()
は、安全にその方向で車
が通れるまでリターンしてはならない。衝突したり、重量オーバーで橋が落ち
たりすることがないようにしなければならない。デバッグのためのメッセージ
を適宜画面に出力する。exit_bridge()
は、橋を渡り終えことを告げ
るために呼ばれる。この時、可能ならば、待っている車を渡らせ始める
(arrive_bridge()
からリターンさせる)。
ここでは、公平性は実現しなくてもよい。また、飢餓状態にならないことを保 証しなくてもよい。
完成したプログラムにおいて、新しい車が来た時、既に別の車が待っていたと する。この場合、新しい車が先に橋を渡るか、それとも古い車が先に橋を渡る か、それとも、予想できないか(非決定的か)。その理由を簡単に説明しなさ い。
水素原子や酸素原子ではなく、水素分子 (H2)や酸素分子 (O2)を使ってもよい。
pthread_create( ..., NULL, (void *)H_func, ... ); pthread_create( ..., NULL, (void *)H_func, ... ); pthread_create( ..., NULL, (void *)H_func, ... ); pthread_create( ..., NULL, (void *)H_func, ... ); pthread_create( ..., NULL, (void *)O_func, ... ); pthread_create( ..., NULL, (void *)O_func, ... ); pthread_create( ..., NULL, (void *)O_func, ... ); H_func( ... ) { printf(...); H_Ready(...); } O_func( ... ) { printf(...); O_Ready(...); } H2O_func( ... ) { printf(...); } H_Ready(...) { .... } O_Ready(...) { .... } make_water() { pthread_create( ..., NULL, (void *)H2O_func, ... ); }
注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義で動作しないとう報告が 来ています。その場合は、コンパイル時に次のフラグを付けてみてください。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c![]()
セマフォを使った循環バッファのプログラムの作成は、次の教科書の練習問 題にもなっている(巻末に回答もある)。
清水謙多郎: "オペレーティングシステム",岩波書店 (1992). ISBN: 4000078526.