並行システム システム情報系情報工学域, システム情報工学研究科コンピュータサイエンス専攻 新城 靖 <yas@cs.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/cs/
http://www.cs.tsukuba.ac.jp/~yas/
図? シングルスレッドのプロセスとマルチスレッドのプロセス / A process with a single thread and a process with multiple threads
軽量プロセスというと、内部にループを含むような語感がある。 A lightweight process can contain a loop in it.
マルチスレッドプログラミングは、非常に難しい。
どこでスレッドを使うべきか。
Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).
Per Brinch Hansen: "The programming language Concurrent Pascal," IEEE Transactions on Software Engineering, Vol.SE-1, No.2, pp.199-207, June 1975.
type procA_t = process(args...); var local variables... procedure proc1(args...); procedure proc2(args...); begin cycle ... end; end var procA1 : procA_t ; init procA1(args...);
type monA_t = monitor(args ・・・); var loal variables procedure entry proc1(args ・・・); procedure entry proc2(args ・・・); begin initialization of local variables; end var monA1 : monA_t ; init monA1(引数);ローカル変数は、entry の手続きでしかアクセスできない。 1 つのプロセスが entry の手続きを実行中は、他のプロセスは入れない(mutual exclusion)。
図? Concurrent Pascalのプロセスとモニタ/Processes and a monitor in Concurrent Pascal
cv1 : condition; cv1.wait; 呼び出したプロセスを待たせる。block the current process. cv1.signal; 待っているプロセスがいれば全て再開させる。unblock all waiting processes.
q1 : queue; delay(q1); 呼び出したプロセスをそのキューで待たせる。 block the current process. continue(q1); そのキューで待っているプロセスがいれば1つだけ再開させる。 unblock a single process in the queue.
$ producer | consumer
2つのスレッドの間には、バッファを置く。
図? 有限バッファ(環状バッファ)、生産者、消費者 / bounded buffer (circular buffer), producer, consumer
バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。
手続き procedures
1: const BUFFER_SIZE = 4; 2: type circular_buffer = 3: monitor 4: var 5: rp : integer ; 6: wp : integer ; 7: data: array [0..BUFFER_SIZE-1] of integer; 8: used: integer; 9: not_empty : condition; 10: not_full : condition; 11: 12: procedure entry put(x:integer); 13: begin 14: while( used = BUFFER_SIZE ) do 15: non_full.wait; 16: data[wp] := x; 17: wp := wp + 1 ; 18: if( wp >= BUFFER_SIZE ) 19: wp := 0 ; 20: used := used + 1 ; 21: not_empty.signal; 22: end 23: 24: procedure entry get(result x:integer); 25: begin 26: while( used = 0 ) then 27: not_empty.wait; 28: x := data[rp]; 29: rp := rp + 1 ; 30: if( rp >= BUFFER_SIZE ) 31: rp := 0 ; 32: used := used - 1 ; 33: not_full.signal; 34: end 35: begin 36: rp := 0 ; 37: wp := 0 ; 38: used := 0 ; 39: end; 40: 41: ... 42: var buf : circular_buffer ; 43: init buf; 44: ... 45:
Pthread を利用したプログラムを書く時には、次のようなヘッダ・ファイルを 読み込む。
#include <pthread.h>
Linux, MacOSX, でのコンパイルとリンク。-lpthread
を付ける。
% cc -o create create.c -lpthread
% ./create
...
%
Solaris (Unix International系)でのコンパイルとリンク。
-D_REENTRANT
と-lpthread
を付ける。
% cc -D_REENTRANT -o create create.c -lpthread
% ./create
...
%
セマフォを使う時、Solaris 6 (SunOS 5.6) では、リンク時に -lposix4
オプションを付ける。Solaris 7-10 (SunOS 5.7, 5.8, 5.9, 5.10) では、リンク時に -lrt
オプションを付ける。
図? fork-joinモデルの実現
後で join する必要がない時には、pthread_detach() を使って切り離す。 (joinしなくてもゾンビが残らない。)
1: 2: /* 3: create-join-2.c -- スレッドを2つ作るプログラム 4: */ 5: 6: #include <stdlib.h> /* malloc() */ 7: #include <stdio.h> /* printf() */ 8: #include <pthread.h> 9: 10: struct func1_arg { 11: int x; 12: }; 13: struct func2_arg { 14: int x; 15: }; 16: 17: void func1( struct func1_arg *arg ); 18: void func2( struct func2_arg *arg ); 19: 20: main() 21: { 22: pthread_t t1 ; 23: pthread_t t2 ; 24: struct func1_arg *arg1; 25: struct func2_arg *arg2; 26: 27: printf("main()\n"); 28: arg1 = malloc( sizeof(struct func1_arg) ); 29: if( arg1 == NULL ) 30: { 31: perror("no memory for arg1"); 32: exit( 1 ); 33: } 34: arg1->x = 10; 35: pthread_create( &t1, NULL, (void *)func1, (void *)arg1 ); 36: arg2 = malloc( sizeof(struct func2_arg) ); 37: if( arg2 == NULL ) 38: { 39: perror("no memory for arg2"); 40: exit( 1 ); 41: } 42: arg2->x = 20; 43: pthread_create( &t2, NULL, (void *)func2, (void *)arg2 ); 44: pthread_join( t1, NULL ); 45: pthread_join( t2, NULL ); 46: } 47: 48: void func1( struct func1_arg *arg ) 49: { 50: int i ; 51: for( i = 0 ; i<3 ; i++ ) 52: { 53: printf("func1( %d ): %d \n",arg->x, i ); 54: } 55: } 56: 57: void func2( struct func2_arg *arg ) 58: { 59: int i ; 60: for( i = 0 ; i<3 ; i++ ) 61: { 62: printf("func2( %d ): %d \n",arg->x, i ); 63: } 64: }実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/create-join-2.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/Makefile
% make create-join-2
gcc -c -o create-join-2.o create-join-2.c
gcc create-join-2.o -lpthread -o create-join-2
% ./create-join-2
main()
func1( 10 ): 0
func2( 20 ): 0
func1( 10 ): 1
func2( 20 ): 1
func1( 10 ): 2
func2( 20 ): 2
% ./create-join-2
main()
func1( 10 ): 0
func1( 10 ): 1
func1( 10 ): 2
func2( 20 ): 0
func2( 20 ): 1
func2( 20 ): 2
% ./create-join-2
main()
func1( 10 ): 0
func1( 10 ): 1
func1( 10 ): 2
func2( 20 ): 0
func2( 20 ): 1
func2( 20 ): 2
%
この例では、次の3つのスレッドが作られる。Three threads were created.
func1
から作られたスレッド t1
func2
から作られたスレッド t2
pthread_create()
により作られる。
どういう順序で実行されるかは、決まっていない(nondeterministic)。 スレッドは、もともと順番を決めないような処理、 非同期的(asynchronous) な処理を表現するためのもの。どうしても順序を決めたければ、 mutex や条件変数といった同期機能を使う。
pthread_create()
で指定された関数からリターンすると、そ
のスレッドが終了する。pthread_exit()
を呼び出してもよい。
ただし、
初期スレッド
が終了すると、プロセス全体が終了する。
exit()
システムコールを呼び出しても終了する。
プロセスの場合、ファイル、端末、ウインドウ、主記憶、CPU時間。
主記憶やCPU時間といった資源は、横取り(preemption)しても平気なので、あま り問題になりない。
スレッドの場合は、プログラミング言語の変数(variables)。
変更されない変数(immutable variables)の値を読む(read-only)だけなら、特 に問題は起きない。それ意外の時、特に、複数のスレッドで値を読み書きする 時に問題が起きる。
1: 2: /* 3: * mutex-nolock.c -- 共有資源をロックなしでアクセスするプログラム 4: */ 5: 6: #include <stdio.h> 7: #include <pthread.h> 8: 9: void thread_A(), thread_B(); 10: int shared_resource ; 11: 12: main() { 13: pthread_t t1 ; 14: pthread_t t2 ; 15: shared_resource = 0 ; 16: pthread_setconcurrency( 2 ); 17: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 18: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 19: pthread_join( t1, NULL ); 20: pthread_join( t2, NULL ); 21: printf("main(): shared_resource == %d\n", shared_resource ); 22: } 23: 24: void thread_A() 25: { 26: int i, x ; 27: for( i = 0 ; i<1000000 ; i++ ) 28: { 29: x = shared_resource ; 30: x = x + 1 ; 31: shared_resource = x ; 32: } 33: } 34: 35: void thread_B() { 36: int i, x ; 37: for( i = 0 ; i<1000000 ; i++ ) { 38: x = shared_resource ; 39: x = x + 1 ; 40: shared_resource = x ; 41: } 42: }
pthread_setconcurrency() は、利用したい CPU 数をシステムに要求するもの。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/mutex-nolock.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/Makefile
% make mutex-nolock
gcc -c -o mutex-nolock.o mutex-nolock.c
gcc mutex-nolock.o -lpthread -o mutex-nolock
% ./mutex-nolock
main(): shared_resource == 1014838
% ./mutex-nolock
main(): shared_resource == 1062224
% ./mutex-nolock
main(): shared_resource == 1096882
% ./mutex-nolock
main(): shared_resource == 1091488
% ./mutex-nolock
main(): shared_resource == 1096291
%
thread_A()
, thread_B()
と2つのスレッドが作
られている。整数型の変数 shared_resource
は、その2つの
スレッドの両方からアクセスされる。2つのスレッドで合計2000000 増える予
定だが、実際に実行すると、そうはならない。実行する度に答えが違う。
単一プロセッサの Linux 等では、何度か実行しても常に 2000000 だけ増える ことがある。
このプログラムが SMP で動いているとして、動きを 考えてえる。
thread_A() thread_B() : : : : 29: x = shared_resource ; 38: x = shared_resource ; 30: x = x + 1 ; 39: x = x + 1 ; 31: shared_resource = x ; 40: shared_resource = x ; : : : :
shared_resource++
と書いてもだめ。複数の機械語命令に分割
されるので。
相互排除(mutual exclusion): ある資源をアクセスできるスレッドの数を 多くても1つ(at most one)にする。
プログラムの字面上、相互排除が必要な部分を 際どい部分(critical section) ( クリティカルセクション ) という。
Pthread では、相互排除を行なうために、 mutex という仕組みがある。次のようにして、相互排除を行ないたい部分を lock と unlock で囲む。
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER; : : pthread_mutex_lock( &murex1 ); <際どい部分。critical section.> pthread_mutex_unlock( &mutex1 );
1: /* 2: * mutex-lock.c -- 共有資源をロックしながらアクセスするプログラム 3: */ 4: 5: #include <stdio.h> 6: #include <pthread.h> 7: 8: void thread_A(), thread_B(); 9: int shared_resource ; 10: pthread_mutex_t mutex1 ; 11: 12: main() { 13: pthread_t t1 ; 14: pthread_t t2 ; 15: shared_resource = 0 ; 16: pthread_mutex_init( &mutex1, NULL ); 17: pthread_setconcurrency( 2 ); 18: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 19: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 20: pthread_join( t1, NULL ); 21: pthread_join( t2, NULL ); 22: printf("main(): shared_resource == %d\n", shared_resource ); 23: } 24: 25: void thread_A() 26: { 27: int i, x ; 28: for( i = 0 ; i<1000000 ; i++ ) 29: { 30: pthread_mutex_lock( &mutex1 ); 31: x = shared_resource ; 32: x = x + 1 ; 33: shared_resource = x ; 34: pthread_mutex_unlock( &mutex1 ); 35: } 36: } 37: 38: void thread_B() { 39: int i, x ; 40: for( i = 0 ; i<1000000 ; i++ ) { 41: pthread_mutex_lock( &mutex1 ); 42: x = shared_resource ; 43: x = x + 1 ; 44: shared_resource = x ; 45: pthread_mutex_unlock( &mutex1 ); 46: } 47: }
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/mutex-lock.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/Makefile
% make mutex-lock
gcc -c -o mutex-lock.o mutex-lock.c
gcc mutex-lock.o -lpthread -o mutex-lock
% ./mutex-lock
main(): shared_resource == 2000000
% ./mutex-lock
main(): shared_resource == 2000000
% ./mutex-lock
main(): shared_resource == 2000000
%
thread_A() thread_B() : : 30: pthread_mutex_lock( &mutex1 ); : 41: pthread_mutex_lock( &mutex1 ); 31: x = shared_resource ; <他のスレッドが実行中なので 32: x = x + 1 ; この状態でしばらく待たされる> 33: shared_resource = x ; : 34: pthread_mutex_unlock( &mutex1 ); : : <実行再開> : 42: x = shared_resource ; 43: x = x + 1 ; 44: shared_resource = x ; 45: pthread_mutex_unlock( &mutex1 );後から来た
thread_B()
は、他のスレッドが実行中は、待たさ
れる。
thread_A | thread_B
2つのスレッドの間には、
有限バッファ(bounded buffer)を置く。
バッファが空の時、thread_B() は、thread_A() が何かデータをバッファに入 れるのを待つ。バッファがいっぱいの時、thread_A() は、thread_B() がバッ ファから何かデータを取り出すのを待つ。
条件変数の操作 operations:
1: 2: /* 3: * condv-buffer.c -- 条件変数を使った有限バッファ 4: */ 5: 6: #include <stdio.h> /* printf() */ 7: #include <stdlib.h> /* malloc(), exit() */ 8: #include <pthread.h> 9: 10: #define BUFFER_SIZE 4 /* バッファの大きさ */ 11: struct circular_buffer 12: { 13: int rp ; /* 読み出す位置 */ 14: int wp ; /* 書き込む位置 */ 15: int used ; /* バッファ内の要素数 */ 16: int data[BUFFER_SIZE]; /* データを保存する場所 */ 17: pthread_mutex_t mutex ; /* この構造体の相互排除のための mutex */ 18: pthread_cond_t not_full ; /* バッファが一杯ではない状態を待つための条件変数 */ 19: pthread_cond_t not_empty ; /* バッファが空ではない状態を待つための条件変数 */ 20: }; 21: 22: void thread_A(struct circular_buffer *b); 23: void thread_B(struct circular_buffer *b); 24: 25: void put( struct circular_buffer *b,int x ) 26: { 27: pthread_mutex_lock( &b->mutex ); 28: loop: if( b->used == BUFFER_SIZE ) 29: { 30: pthread_cond_wait( &b->not_full,&b->mutex ); 31: goto loop; 32: } 33: b->data[ b->wp++ ] = x ; 34: if( b->wp >= BUFFER_SIZE ) 35: b->wp = 0 ; 36: b->used ++ ; 37: pthread_cond_broadcast( &b->not_empty ); 38: pthread_mutex_unlock( &b->mutex ); 39: } 40: 41: int get( struct circular_buffer *b ) 42: { 43: int x ; 44: pthread_mutex_lock( &b->mutex ); 45: loop: if( b->used == 0 ) 46: { 47: pthread_cond_wait( &b->not_empty,&b->mutex ); 48: goto loop; 49: } 50: x = b->data[ b->rp++ ] ; 51: if( b->rp >= BUFFER_SIZE ) 52: b->rp = 0 ; 53: b->used -- ; 54: pthread_cond_broadcast( &b->not_full ); 55: pthread_mutex_unlock( &b->mutex ); 56: return( x ); 57: } 58: 59: main() 60: { 61: pthread_t t1 ; 62: pthread_t t2 ; 63: struct circular_buffer *b ; 64: b = (struct circular_buffer *)malloc(sizeof(struct circular_buffer)); 65: if( b == NULL ) 66: { 67: perror("no memory for struct buffer\n"); 68: exit( -1 ); 69: } 70: b->rp = 0 ; 71: b->wp = 0 ; 72: b->used = 0 ; 73: pthread_mutex_init( &b->mutex, NULL ); 74: pthread_cond_init( &b->not_full,NULL ); 75: pthread_cond_init( &b->not_empty,NULL ); 76: pthread_setconcurrency( 2 ); 77: pthread_create( &t1, NULL, (void *)thread_A, (void *)b ); 78: pthread_create( &t2, NULL, (void *)thread_B, (void *)b ); 79: pthread_join( t1, NULL ); 80: pthread_join( t2, NULL ); 81: } 82: 83: void thread_A( struct circular_buffer *b ) /* producer */ 84: { 85: int i,x ; 86: for( i = 0 ; i<10 ; i++ ) 87: { 88: x = i ; 89: printf("thread_A(): put( %d )\n",x ); 90: put( b,x ); 91: } 92: } 93: 94: void thread_B( struct circular_buffer *b ) /* consumer */ 95: { 96: int i, x ; 97: for( i = 0 ; i<10 ; i++ ) 98: { 99: x = get( b ); 100: printf("thread_B(): get() %d.\n", x ); 101: } 102: }
put()
は、バッファにデータを追加する時に使う手続き。
基本的には、入口で pthread_mutex_lock()
し、
出口で pthread_mutex_unlock()
する。
バッファが一杯の時には、条件変数b->not_full
で、
一杯でないという条件になるまで待つ。
待っている間は、mutex のロックは解除される。
pthread_cond_wait()
からリターンして来る時には、もう一度
ロックされた状態に戻る。
pthread_cond_wait()
でスリープしてある間に、
変数
(rp,wp,data
)が書き換えられている可能性があるので、
もう一度最初から調べる。
get()
は、バッファからデータを取り出す時に使う手
続き。put()
とほぼ対称形。バッファが空の時に、wait
し、バッファがもはや一杯ではないことをbroadcast する。
thread_A()
は、10回バッファにデータを書き込むスレッド。
thread_B()
は逆に、10回バッファからデータを読み出すス
レッド。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/condv-buffer.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/Makefile
% make condv-buffer
gcc -c -o condv-buffer.o condv-buffer.c
gcc condv-buffer.o -lpthread -o condv-buffer
% ./condv-buffer
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_A(): put( 2 )
thread_A(): put( 3 )
thread_A(): put( 4 )
thread_B(): get() 0.
thread_B(): get() 1.
thread_B(): get() 2.
thread_B(): get() 3.
thread_A(): put( 5 )
thread_A(): put( 6 )
thread_A(): put( 7 )
thread_A(): put( 8 )
thread_B(): get() 4.
thread_B(): get() 5.
thread_B(): get() 6.
thread_B(): get() 7.
thread_A(): put( 9 )
thread_B(): get() 8.
thread_B(): get() 9.
%
複数のスレッドが同時に動いている。バッファにためられるのは、最大4なの
に、put() が 5 回連続して成功しているように見える。printf() の順番と
put(), get() の順番は違うことがある。
pthread_mutex_lock( &mutex ); while( 1 ) { if( condition_checking() ) pthread_cond_wait( &cv, &mutex ); perform(); }
バッファに要素を「1つずつ」追加しているので、
pthread_cond_signal()
でもよい。
pthread_cond_broadcast()
に変えても動くようにプログラムを
作る。
pthread_cond_wait() で待っている間に条件が変わっているかもしれないので、 最初から調べ直す。signal で1人だけしか起き上がらないと仮定してはいけ ない。
「1つずつ」ではなく、複数個同時に読み書きする時には、
pthread_cond_broadcast()
でないとだめ。
迷った時には、pthread_cond_broadcast()
。
図? 開いたモジュールと閉じたモジュール
1: /* 2: * mutex-reclock-normal.c -- 通常の mutex を使う例(デッドロック) 3: */ 4: 5: #include <stdio.h> /* printf() */ 6: #include <pthread.h> 7: 8: void thread_A(), thread_B(); 9: int shared_resource ; 10: pthread_mutex_t mutex1 ; 11: 12: deposit( int n ) 13: { 14: pthread_mutex_lock( &mutex1 ); 15: shared_resource += n ; 16: pthread_mutex_unlock( &mutex1 ); 17: } 18: 19: add_interest() 20: { 21: int i ; 22: pthread_mutex_lock( &mutex1 ); 23: i = shared_resource * 0.05 ; 24: deposit( i ); 25: pthread_mutex_unlock( &mutex1 ); 26: } 27: 28: main() { 29: pthread_t t1 ; 30: pthread_t t2 ; 31: shared_resource = 1000000 ; 32: pthread_mutex_init( &mutex1, NULL ); 33: 34: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 35: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 36: pthread_join( t1, NULL ); 37: pthread_join( t2, NULL ); 38: printf("main(): shared_resource == %d\n", shared_resource ); 39: } 40: 41: void thread_A() 42: { 43: printf("thread_A(): deposit( 10000 ) ... \n"); 44: deposit( 10000 ); 45: printf("thread_A(): deposit( 10000 ) done. \n"); 46: } 47: 48: void thread_B() 49: { 50: printf("thread_B(): add_interest() ... \n"); 51: add_interest(); 52: printf("thread_B(): add_interest() done. \n"); 53: }実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/mutex-reclock-normal.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/Makefile
% make mutex-reclock-normal
gcc -c -o mutex-reclock-normal.o mutex-reclock-normal.c
gcc mutex-reclock-normal.o -lpthread -o mutex-reclock-normal
% ./mutex-reclock-normal
thread_A(): deposit( 10000 ) ...
thread_B(): add_interest() ...
thread_A(): deposit( 10000 ) done.
^C (強制終了)
%
%
注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、Makefile の次の行をコメントアウトしなさい。
#CFLAGS= -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NPこの結果、次のように -D が付いた形でコンパイルされる。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c
1: /* 2: * mutex-reclock-recursive.c -- 再帰的 mutex を使う例 3: */ ... 28: static int 29: my_pthread_mutex_init_recursive( pthread_mutex_t *mutex ) 30: { 31: pthread_mutexattr_t attr ; 32: int err ; 33: if( (err=pthread_mutexattr_init( &attr )) < 0 ) 34: return( 0 ); 35: if( (err=pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE)) <0 ) 36: return( 0 ); 37: err = pthread_mutex_init( mutex,&attr ); 38: return( err ); 39: } 40: 41: main() 42: { 43: pthread_t t1 ; 44: pthread_t t2 ; 45: shared_resource = 1000000 ; 46: my_pthread_mutex_init_recursive( &mutex1 ); 47: 48: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 49: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 50: pthread_join( t1, NULL ); 51: pthread_join( t2, NULL ); 52: printf("main(): shared_resource == %d\n", shared_resource ); 53: }実行例。deposit() と add_interest() のタイミングによっては、最終結果は 違うことがある。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/mutex-reclock-recursive.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/Makefile
% make mutex-reclock-recursive
gcc -c -o mutex-reclock-recursive.o mutex-reclock-recursive.c
gcc mutex-reclock-recursive.o -lpthread -o mutex-reclock-recursive
% ./mutex-reclock-recursive
thread_A(): deposit( 10000 ) ...
thread_A(): deposit( 10000 ) done.
thread_B(): add_interest() ...
thread_B(): add_interest() done.
main(): shared_resource == 1060500
% ./mutex-reclock-recursive
thread_A(): deposit( 10000 ) ...
thread_A(): deposit( 10000 ) done.
thread_B(): add_interest() ...
thread_B(): add_interest() done.
main(): shared_resource == 1060500
%
注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、Makefile の次の行をコメントアウトしなさい。
#CFLAGS= -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NPこの結果、次のように -D が付いた形でコンパイルされる。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c
printf("hello,world\n");上と下は、結果が違う。
printf("hello,"); /* ここに他のスレッドの出力が混じることがある。 Other threads can print something here. */ printf("world\n");これを避けるには、
flockfile(),funlockfile(),ftrylockfile()
を使う。
flockfile(stdout); printf("hello,"); /* ここに他のスレッドの出力が混じることはない Other threads cannot print something here. */ printf("world\n"); funlockfile(stdout);
putchar()
や
getchar()
は、遅すぎる。
flockfile()/funlockfile()
の中で使うための
putchar_unlocked(),getchar_unlocked(),putc_unlocked(),getc_unlocked()
が用意されている。printf_unsafe()
はない。
<−>再帰呼出し
スレッド間でポインタを渡す時には、スレッドの寿命にも注意。
シングルスレッドのプログラムでは、static変数は、プログラムのモジュール 性(modularity) を高めるために有効に使われてきた。
マルチスレッドと相性が非常に悪い。static変数もextern変数と同様に複数の スレッドで共有される。変更する場合には、mutex でロックが必要になる。
gethostbyname()
は、
static変数に値をセットして、その番地を返す。
struct hostent *gethostbyname( char *name ){ static struct hostent ret ; ..... return( &ret ); }複数のスレッドが同時にこの関数を呼び出した場合、同じstatic変数が使われ る。
externやstaticを使わず、auto変数やmalloc()だけを使っているような手続き は、スレッド・セーフ。
別のスレッド・セーフでない手続きを呼んでいれば、それはスレッド・セーフ ではない。
Sun のマニュアルより: struct hostent *gethostbyname(const char *name); struct hostent *gethostbyname_r(const char *name, struct hostent *result, char *buffer, int buflen, int *h_errnop);
次のような関数が利用可能である。
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value)
int sem_wait(sem_t * sem)
int sem_trywait(sem_t * sem)
sem_wait()
。0でも止まらずエラーを返す。
int sem_post(sem_t * sem)
int sem_getvalue(sem_t * sem, int * sval)
int sem_destroy(sem_t * sem)
注意:名前付きのセマフォもある。sem_open() で作成/初期化し、 sem_unlink() で削除する。
注意:Solaris には、POSIX のセマフォとは別に、カーネル内でのデバイス・ ドライバ作成のためのセマフォが用意されている。
Java には、最初から言語のレベルでスレッドの機能が入っている。
Java | Pthread |
new Thread(); start(); | pthread_create() |
join() | pthread_join() |
synchronized | pthread_mutex_lock()とpthread_mutex_unlock()の組 |
wait() | pthread_cond_wait() |
wait(long timeout) | pthread_cond_timedwait() |
notify() | pthread_cond_signal() |
notifyAll() | pthread_cond_broadcast() |
Java の synchronized は、再帰可能(recursive)。PTHREAD_MUTEX_RECURSIVE 相当。 1つのスレッドが2度同じオブジェクトをロックしてもよい。
Pthreads のプログラムで、1つの mutex と1つの条件変数を使ったものなら、 Java で簡単に書き直せる。
Pthreadでの有限バッファのプログラムは、1つの mutex で2つの条件変数を使っているので、単純には Java で書き直せない。 生産者側と消費者側が同時に待つことはないという性質を利用する。
Java で書かれたスレッドのプログラムは、汚いものがけっこうある。スレッ ド「間」の同期で、対称系になるべき所を、片方のスレッドのメソッドにして、 非対称になっていることがある。Java でプログラムを書く時にも、active object (thread) と passive object (threadなし)をきちんと分けた方がよい。
1: 2: /* 3: * CircularBuffer.java -- Java による環状バッファ 4: */ 5: 6: class CircularBuffer 7: { 8: static final int BUFFER_SIZE = 4 ; 9: int rp ; // 読み出す位置 10: int wp ; // 書き込む位置 11: int data[]; // データを保存する場所 12: int used ; // バッファ内の要素数 13: CircularBuffer() 14: { 15: data = new int[BUFFER_SIZE]; 16: rp = 0 ; 17: wp = 0 ; 18: used = 0; 19: } 20: 21: public synchronized void put( int x ) throws InterruptedException 22: { 23: while( used == data.length ) 24: wait(); 25: data[ wp++ ] = x ; 26: if( wp == data.length ) 27: wp = 0 ; 28: used++ ; 29: notifyAll(); 30: } 31: public synchronized int get() throws InterruptedException 32: { 33: int x ; 34: while( used == 0 ) 35: wait(); 36: x = data[ rp++ ] ; 37: if( rp >= data.length ) 38: rp = 0 ; 39: used--; 40: notifyAll(); 41: return( x ); 42: } 43: }
1: 2: /* 3: * CircularBufferDemo.java -- Java による有限バッファのデモ 4: */ 5: 6: class Thread_A implements Runnable // Producer 7: { 8: CircularBuffer b; 9: Thread_A( CircularBuffer b ) 10: { 11: this.b = b; 12: } 13: public void run() 14: { 15: int i,x ; 16: for( i = 0 ; i<10 ; i++ ) 17: { 18: try 19: { 20: x = i ; 21: System.out.println("Thread_A(): put( "+x+" )"); 22: b.put( x ); 23: } 24: catch( InterruptedException e ) 25: { 26: System.err.println("Thread_A(): Interrupted"); 27: break; 28: } 29: } 30: } 31: } 32: 33: class Thread_B implements Runnable // Producer 34: { 35: CircularBuffer b; 36: Thread_B( CircularBuffer b ) 37: { 38: this.b = b; 39: } 40: public void run() 41: { 42: int i,x ; 43: for( i = 0 ; i<10 ; i++ ) 44: { 45: try 46: { 47: x = b.get(); 48: System.out.println("Thread_B(): got() "+x+"."); 49: } 50: catch( InterruptedException e ) 51: { 52: System.err.println("Thread_B(): Interrupted"); 53: break; 54: } 55: } 56: } 57: } 58: 59: class CircularBufferDemo 60: { 61: public static void main(String argv[]) 62: { 63: final CircularBuffer b = new CircularBuffer(); 64: Thread t1 = new Thread( new Thread_A(b) ); 65: t1.start(); 66: Thread t2 = new Thread( new Thread_B(b) ); 67: t2.start(); 68: try 69: { 70: t1.join(); 71: t2.join(); 72: } 73: catch( InterruptedException e ) 74: { 75: System.err.println("main(): Interrupted"); 76: } 77: } 78: }
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/CircularBuffer.java
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-10-08/ex/CircularBufferDemo.java
% javac -encoding EUC-JP CircularBuffer.java CircularBufferDemo.java
% java CircularBufferDemo
Thread_A(): put( 0 )
Thread_A(): put( 1 )
Thread_A(): put( 2 )
Thread_A(): put( 3 )
Thread_A(): put( 4 )
Thread_B(): got() 0.
Thread_B(): got() 1.
Thread_B(): got() 2.
Thread_B(): got() 3.
Thread_B(): got() 4.
Thread_A(): put( 5 )
Thread_A(): put( 6 )
Thread_B(): got() 5.
Thread_A(): put( 7 )
Thread_A(): put( 8 )
Thread_A(): put( 9 )
Thread_B(): got() 6.
Thread_B(): got() 7.
Thread_B(): got() 8.
Thread_B(): got() 9.
%
締切りは、2013年10月13日、 23:59:59 とする。
The program of bounded buffer uses two condition variables. Rewrite this program to use a single condition variables. Since a thread can block either on put() or get(), we can share a single queue of a condition variable.
int balance() { ... // return the balance 残高を返す } int withdraw( int n ) { ... // return 1 if succeeded. 成功の時は 1 を返す // return 0 if failed due to insufficient money. 残高不足の時は 0 }
プログラミング言語は、C言語(Pthread)または Java を用いなさい。 You should use the programming language C with Thread or Java.
プログラミング言語は、C言語(Pthread)または Java を用いなさい。 You should use the programming language C with Thread or Java.
セマフォを使った循環バッファのプログラムの作成は、次の教科書の練習問 題にもなっている(巻末に回答もある)。 The following text book of operating systems contains exercise of a bounded buffer. The answer is also included in the end of the book.
清水謙多郎: "オペレーティングシステム",岩波書店 (1992). ISBN: 4000078526.